当前位置: 首页 > article >正文

分布式服务框架 如何设计一个更合理的协议

1、概述

        前面我们聊了如何设计一款分布式服务框架的问题,并且编码实现了一个简单的分布式服务框架 cheese, 目前 cheese 基本具备分布式服务框架的基本功能。后面我们又引入了缓存机制,以及使用Socket替代了最开始的 RestTemplate。并且还学习了网络相关的知识点和HTTP协议。今天我们就继续分析,如何继续优化我们的Cheese,让她变得更加高效简洁。

2、HTTP协议合不合适

        我们回忆一下之前学习的HTTP请求的组成,大概是这个样子,

如果展开请求头和响应头 内容会更多,如果是浏览器访问肯定是没问题,但是如果用在Cheese 里面是否合适呢。你想想,Jerry就是想问Tom 要一块奶酪,一个简单的请求,难道还要按照HTTP协议约定的格式拼接一大堆和奶酪无关的东西,然后组成一个HTTP请求发过去,这样不仅会影响Cheese的远程调用的效率更是一种资源浪费。想想一个HTTP请求从源端到目标端经历的那些是不是头皮发麻。 (参考这里)

 这个问题就是HTTP协议里面存在很多和业务需求无关的信息,但是我们又不能去掉,否则解析的时候会出现问题。 那么怎么解决呢,显而易见解决方案就是放弃HTTP协议。

3、一个更为合理的协议

3.1、协议的概念

        首先说明一下 这里我们说的协议是工作在应用层的,也就是类似HTTP协议的那种,传输层依然还是TCP协议。这里大家可以先 回忆一下之前的文章中反复提到的一句话  客户端和服务器可以通过 Socket 建立网络连接 然后使用Socket对象的输入输出流 进行数据交换 ,(传送门)

我们仔细想想,既然是数据交换,那么协议的本质就是定义一组固定格式的数据,客户端按照这种格式组装请求报文,服务端按照同样的方式解析请求报文。这里的  固定格式  就是协议的体现了

3.2、 设计一个合理的固定格式

        在这之前我们还是先给我们的协议取个名字吧,姑且就叫 Cheese 协议吧。我们回忆一下之前的 RemoteServer 2.0 设计的思路。也就是回到 Tom & Jerry的那个案例中去 

之前的步骤 大概是这样 (懒得打字了,截图吧)

这是一个抽象的过程,我们这里将上述步骤可以进一步细化,如下所示

我们这里设计的 Cheese 协议说的透彻点就是如何封装请求对象和响应对象。根据上图我们可以大概的知道这一组对象主要包括的属性,其中请求对象 至少需要5个属性,首先是服务对应的类名,客户端需要告诉服务端执行哪一个类,接着还有方法名以及参数类型和返回值类型等等。返回对象主要有返回值和返回状态标识,如果发生异常了还需要将异常信息返回给客户端。

设计完成后按照上图描述的 Jerry 和 Tom 的步骤 传递这一组对象就可以实现这个通信的过程了。

4、Cheese 协议的落地

4.1、请求对象和响应对象的设计

        这部分内容主要涉及请求和响应对象,首先定义出一个顶层的接口,里面包含设置返回码和服务端ip

package org.wcan.cheese.protocol;

import java.io.Serializable;

/**
 * @Description
 * @Author wcan
 * @Date 2025/1/16 下午 23:37
 * @Version 1.0
 */
public interface CheeseProtocol extends Serializable {
     public void setResponseCode(Integer responseCode);
     public void setServerIp(String serverIp);
}

接着我们实现请求对象和响应对象的适配层,这里引入一个抽象类 AbstractCheeseProtocol 

package org.wcan.cheese.protocol;

/**
 * @Description
 * @Author wcan
 * @Date 2025/1/17 下午 17:12
 * @Version 1.0
 */
public class AbstractCheeseProtocol implements CheeseProtocol{

    public  Integer responseCode = 200;

    public String serverIp = null;

    public  Integer getResponseCode() {
        return responseCode;
    }

    public String getServerIp() {
        return serverIp;
    }

    @Override
    public void setResponseCode(Integer responseCode) {
        this.responseCode= responseCode;
    }

    @Override
    public void setServerIp(String serverIp) {
        this.serverIp = serverIp;
    }
}

最后给出 CheeseRequest 和 CheeseResponse 的代码

package org.wcan.cheese.protocol;

import java.util.Arrays;
import java.util.Objects;

/**
 * @Description Cheese 请求对象
 * @Author wcan
 * @Date 2025/1/16 下午 23:15
 * @Version 1.0
 */
public class CheeseRequest extends AbstractCheeseProtocol{

    public String className;
    public String methodName;
    private Class<?> returnValueType;
    private Class[] parameterTypes;
    private Object[] parameterValue;

    public CheeseRequest() {

    }
    public CheeseRequest(String className, String methodName, Class<?> returnValueType, Class[] parameterTypes, Object[] parameterValue) {
        this.className = className;
        this.methodName = methodName;
        this.returnValueType = returnValueType;
        this.parameterTypes = parameterTypes;
        this.parameterValue = parameterValue;
    }

    public String getClassName() {
        return className;
    }

    public void setClassName(String className) {
        this.className = className;
    }

    public String getMethodName() {
        return methodName;
    }

    public void setMethodName(String methodName) {
        this.methodName = methodName;
    }

    public Class<?> getReturnValueType() {
        return returnValueType;
    }

    public void setReturnValueType(Class<?> returnValueType) {
        this.returnValueType = returnValueType;
    }

    public Class[] getParameterTypes() {
        return parameterTypes;
    }

    public void setParameterTypes(Class[] parameterTypes) {
        this.parameterTypes = parameterTypes;
    }

    public Object[] getParameterValue() {
        return parameterValue;
    }

    public void setParameterValue(Object[] parameterValue) {
        this.parameterValue = parameterValue;
    }

    @Override
    public boolean equals(Object o) {
        if (this == o) return true;
        if (o == null || getClass() != o.getClass()) return false;
        CheeseRequest that = (CheeseRequest) o;
        return Objects.equals(className, that.className) && Objects.equals(methodName, that.methodName) && Objects.equals(returnValueType, that.returnValueType) && Arrays.equals(parameterTypes, that.parameterTypes) && Arrays.equals(parameterValue, that.parameterValue);
    }

    @Override
    public int hashCode() {
        int result = Objects.hash(className, methodName, returnValueType);
        result = 31 * result + Arrays.hashCode(parameterTypes);
        result = 31 * result + Arrays.hashCode(parameterValue);
        return result;
    }

    @Override
    public String toString() {
        return "CheeseRequest{" +
                "className='" + className + '\'' +
                ", methodName='" + methodName + '\'' +
                ", returnValueType=" + returnValueType +
                ", parameterTypes=" + Arrays.toString(parameterTypes) +
                ", parameterValue=" + Arrays.toString(parameterValue) +
                '}';
    }
}
package org.wcan.cheese.protocol;

/**
 * @Description Cheese 请求对象
 * @Author wcan
 * @Date 2025/1/16 下午 23:15
 * @Version 1.0
 */
public class CheeseResponse extends AbstractCheeseProtocol {

    private Object returnValue;
    private Exception exceptionValue;


    public CheeseResponse() {
    }

    public CheeseResponse(Object returnValue, Exception exceptionValue) {
        this.returnValue = returnValue;
        this.exceptionValue = exceptionValue;
    }

    public Object getReturnValue() {
        return returnValue;
    }

    public void setReturnValue(Object returnValue) {
        this.returnValue = returnValue;
    }

    public Exception getExceptionValue() {
        return exceptionValue;
    }

    public void setExceptionValue(Exception exceptionValue) {
        this.exceptionValue = exceptionValue;
    }

    @Override
    public String toString() {
        return "CheeseResponse{" +
                "returnValue=" + returnValue +
                ", exceptionValue=" + exceptionValue +
                '}';
    }
}

 4.2、服务端组件设计

        服务端的逻辑相对简单一些 ,主要就是解析CheeseRequest对象,获取类名、方法名、返回值类型以及入参类型这几个关键的信息,然后通过反射调用对应的目标方法,最后将结果封装成CheeseResponse对象返回给客户端。

首先我们使用 NIO 实现 ServerEndpoint

package org.wcan.cheese.remote.cheese;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.wcan.cheese.execute.ReflectionExecute;
import org.wcan.cheese.protocol.CheeseRequest;
import org.wcan.cheese.protocol.CheeseResponse;

import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.ServerSocketChannel;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.Iterator;

/**
 * @Description rpc 通信端点
 * @Author wcan
 * @Date 2025/1/17 下午 14:15
 * @Version 1.0
 */
@Component
public class ServerEndpoint {

    @Autowired
    private ReflectionExecute reflectionExecute;

    @Value(value = "${cheesePort:8000}")
    private int cheesePort;  // 监听端口
    public void ServerStart() {
        try {
            // 打开服务器端的 ServerSocketChannel
            ServerSocketChannel serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new java.net.InetSocketAddress(cheesePort));
            serverSocketChannel.configureBlocking(false);
            // 打开 Selector
            Selector selector = Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("服务器正在运行,监听端口 " + cheesePort);
            //TODO 注册端口
            while (true) {
                // 阻塞,等待 I/O 事件发生
                selector.select();
                // 获取所有发生的事件
                Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
                while (selectedKeys.hasNext()) {
                    SelectionKey key = selectedKeys.next();
                    selectedKeys.remove();
                    if (key.isAcceptable()) {
                        // 接受连接请求
                        handleAccept(serverSocketChannel, selector);
                    } else if (key.isReadable()) {
                        // 处理读取请求
                        handleResponse(key);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    private static void handleAccept(ServerSocketChannel serverSocketChannel, Selector selector) throws IOException {
        // 接受客户端连接
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.configureBlocking(false);
        // 注册到 Selector,监听读事件
        socketChannel.register(selector, SelectionKey.OP_READ);
        System.out.println("新连接接入:" + socketChannel.getRemoteAddress());
    }


    private void handleResponse(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int bytesRead = socketChannel.read(buffer);
        if (bytesRead == -1) {
            socketChannel.close();
            System.out.println("连接关闭");
            return;
        }
        //获取请求内容
        String request = new String(buffer.array(), 0, bytesRead, StandardCharsets.UTF_8);
        ObjectMapper objectMapper = new ObjectMapper();
        CheeseRequest cheeseRequest = objectMapper.readValue(request, CheeseRequest.class);
        //执行请求
        CheeseResponse cheeseResponse = reflectionExecute.execute(cheeseRequest);
        //返回内容
        String response = objectMapper.writeValueAsString(cheeseResponse);
        ByteBuffer byteBuffer = ByteBuffer.wrap(response.getBytes(StandardCharsets.UTF_8));
        // 发送响应数据
        while (byteBuffer.hasRemaining()) {
            socketChannel.write(byteBuffer);
        }
        socketChannel.close();
        System.out.println("响应已发送");
    }
}

接着 我们编写反射调用指定方法的工具类 

package org.wcan.cheese.execute;

import org.springframework.stereotype.Component;
import org.wcan.cheese.protocol.CheeseRequest;
import org.wcan.cheese.protocol.CheeseResponse;

import java.lang.reflect.Method;
import java.util.Arrays;

/**
 * @Description
 * @Author wcan
 * @Date 2025/1/16 下午 23:26
 * @Version 1.0
 */
@Component
public class ReflectionExecute {

    public CheeseResponse execute(CheeseRequest cheeseRequest) {
        CheeseResponse cheeseResponse = new CheeseResponse();
        String className = cheeseRequest.getClassName();
        String methodName = cheeseRequest.getMethodName();
//        Class<?> returnValueType = cheeseRequest.getReturnValueType();
//        Class[] parameterTypes = cheeseRequest.getParameterTypes();
        try {
            Class<?> aClass = Class.forName(className);
            Method method = aClass.getMethod(methodName, cheeseRequest.getParameterTypes());
            Object invoke = method.invoke(aClass.newInstance(), cheeseRequest.getParameterValue());
            cheeseResponse.setReturnValue(invoke);
        } catch (Exception e) {
            cheeseResponse.setExceptionValue(e);
        }
        return cheeseResponse;
    }

}

至此服务端的逻辑已经完成了

4.3、服务端功能测试 

        我们先测试一下编写的服务组件,在 tom-store 工程里 新建一个Service类 (工程代码从这里下载)

package org.tom.service;

import org.springframework.stereotype.Service;

import java.util.HashMap;
import java.util.Map;

/**
 * @Description
 * @Author wcan
 * @Date 2025/1/20 下午 14:26
 * @Version 1.0
 */
@Service
public class CheeseService {

    public Map<String, Object> getCheese() throws Exception{
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("cheese", "A piece of cheese on the small table in the room");
        map.put("msg","我正在约会 不要打扰我");
        return map;
    }

    public Map<String, Object> getCheeseByName(String name,Integer size) throws Exception{
        Map<String, Object> map = new HashMap<String, Object>();
        map.put("cheese", "A "+ size+ "-centimeter piece of cheese is on the table");
        map.put("msg","hi "+name +"! 我正在约会 不要打扰我");
        return map;
    }
}

接着我们编写一个测试类

public class TestDemo {

    public static void main(String[] args) {
        CheeseRequest cheeseRequest = new CheeseRequest();
        cheeseRequest.setClassName("org.tom.service.CheeseService");
        cheeseRequest.setMethodName("getCheeseByName");
        cheeseRequest.setReturnValueType(Map.class);
        cheeseRequest.setParameterTypes(new Class[]{String.class,Integer.class});
        cheeseRequest.setParameterValue(new Object[]{"乔峰",35});
        CheeseResponse cheeseResponse = new ReflectionExecute().execute(cheeseRequest);
        Object returnValue = cheeseResponse.getReturnValue();
        System.out.println(returnValue);
    }
}

执行后结果如下图所示: 

 至此服务端的反射调用的逻辑能走通,接着我们后续就能通过Socket 来发送  CheeseRequest 对象了。

4.4、客户端组件设计

        客户端主要承载的能力是 连接服务端,将Jerry 需要调用Tom 的服务的元信息(类名、入参、入参类型、返回值类型)封装成 CheeseRequest 对象,然后通过Socket 发送到服务端。

package org.wcan.cheese.remote.cheese;

import com.fasterxml.jackson.databind.ObjectMapper;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.stereotype.Component;
import org.wcan.cheese.protocol.CheeseRequest;
import org.wcan.cheese.protocol.CheeseResponse;

import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SocketChannel;
import java.nio.charset.Charset;

/**
 * @Description
 * @Author wcan
 * @Date 2025/1/17 下午 15:13
 * @Version 1.0
 */
@Component
public class ClientEndpoint {

    @Value(value = "${cheesePort:8000}")
    private int cheesePort;  // 监听端口

    public CheeseResponse doRequest(CheeseRequest cheeseRequest){
        SocketChannel client = null;
        CheeseResponse cheeseResponse = null;
        try {
            // 创建一个客户端SocketChannel,并连接到服务端
            client = SocketChannel.open(new InetSocketAddress(cheeseRequest.getServerIp(), 8000));
            // 设置为非阻塞模式
            client.configureBlocking(false);
            ObjectMapper objectMapper = new ObjectMapper();
            // 准备发送的数据
            String request = objectMapper.writeValueAsString(cheeseRequest);
            // 使用Charset将字符串编码成字节
            ByteBuffer buffer = Charset.forName("UTF-8").encode(request);
            // 发送数据到服务端
            while (buffer.hasRemaining()) {
                client.write(buffer);
            }
            System.out.println("Message sent to the server: " + request);
            // 等待服务端响应并读取响应数据
            // 创建一个缓冲区用于接收服务端的响应
            ByteBuffer readBuffer = ByteBuffer.allocate(1024);
            int bytesRead = 0;
            // 不断尝试读取服务端响应
            while (bytesRead == 0) {
                bytesRead = client.read(readBuffer);
            }
            // 读取数据完成,解码并输出响应消息
            if (bytesRead > 0) {
                readBuffer.flip();
                String response = Charset.forName("UTF-8").decode(readBuffer).toString();
                cheeseResponse = objectMapper.readValue(response, CheeseResponse.class);
                System.out.println("Response from server: " + cheeseResponse);
            }

        } catch (IOException e) {
            throw new RuntimeException(e);
        } finally {
            try {
                if (client != null && client.isOpen()) {
                    client.close();
                }
            } catch (IOException e) {
                e.printStackTrace();
            }
        }
        return cheeseResponse;
    }
}

我们编写好了 ClientEndpoint,客户端的核心内容还有一部分,那就是我们需要和服务发现组件集成。

4.5、CheeseRemoteExecute 实现

         这里我们设计一个 CheeseRemoteExecute 组件,通过这个组件将 ClientEndpoint 组件和DiscoveryServer组件关联起来,DiscoveryServer组件从注册中心获取服务端的ip和端口号信息 然后交给ClientEndpoint处理。

package org.wcan.cheese.execute;

import org.springframework.stereotype.Service;
import org.wcan.cheese.DiscoveryServer;
import org.wcan.cheese.RemoteExecute;
import org.wcan.cheese.protocol.CheeseRequest;
import org.wcan.cheese.protocol.CheeseResponse;
import org.wcan.cheese.remote.cheese.ClientEndpoint;

import java.util.HashMap;
import java.util.Map;

/**
 * @Description
 * @Author wcan
 * @Date 2024/10/27 下午 19:24
 * @ClassName HttpRemoteExecute
 * @Version 1.0
 */
@Service
public class CheeseRemoteExecute implements RemoteExecute {

    private DiscoveryServer discoveryServer;
    private ClientEndpoint clientEndpoint;

    public CheeseRemoteExecute(DiscoveryServer discoveryServer,ClientEndpoint clientEndpoint) {
        this.discoveryServer = discoveryServer;
        this.clientEndpoint = clientEndpoint;
    }

    @Override
    public String execute(String serviceName) throws Exception {
        return null;
    }

    @Override
    public Map<String, Object> execute(String serviceName, Object[] params) throws Exception {

        String[] serviceNames = serviceName.split("#");
        String className = serviceNames[0];
        String methodName = serviceNames[1];
        String serverUrl = discoveryServer.getSingleServer(className);
        String[] split = serverUrl.split(":");
        String serverIp = split[0];
        CheeseRequest cheeseRequest = new CheeseRequest();
        cheeseRequest.setClassName(className);
        cheeseRequest.setMethodName(methodName);
        cheeseRequest.setReturnValueType(Map.class);
        Class[] classes = null;
        if(params.length>0){
            classes = new Class[params.length];
            for (int i = 0; i <params.length; i++) {
                classes[i] = params[i].getClass();
            }
        }
        cheeseRequest.setParameterTypes(classes);
        cheeseRequest.setParameterValue(params);
        cheeseRequest.setServerIp(serverIp);
        CheeseResponse cheeseResponse = clientEndpoint.doRequest(cheeseRequest);
        Object returnValue = cheeseResponse.getReturnValue();
        HashMap<String, Object> objectHashMap = new HashMap<>();
        objectHashMap.put("data", returnValue);
        return objectHashMap;
    }
}

 到这里我们的 Cheese 协议已经开发完成了,后面我们需要做的就是把它集成到框架里进行工程化落地。

5、工程化落地

5.1、ServerEndpoint 集成方案

        当我们将 tom-store 服务启动起来后,ServerEndpoint 怎么启动呢,毫无疑问我们首先想到的就是 Spring容器启动的时候就初始化 ServerEndpoint 组件,然后随着SpringBoot内置的Tomcat启动而启动,这个时候 tom-store 服务大概就是这个样子 

9000端口负责处理HTTP请求,8000端口处理我们自己设计的 Cheese 协议请求。同样的我们启动Jerry 服务也是上图中描述的样子,无论是 Tom 还是Jerry 他们既可以是服务端也可以是客户端。

集成方案如下,我们改造一下 ServerLister 组件,将 ServerEndpoint 组件注入进去,并且在ServerLister 的构造器中使用一个线程异步启动 ServerEndpoint 组件。相关代码如下

package org.wcan.cheese.event;

import org.springframework.boot.context.event.ApplicationReadyEvent;
import org.springframework.context.ApplicationListener;
import org.springframework.stereotype.Component;
import org.springframework.stereotype.Controller;
import org.springframework.stereotype.Service;
import org.wcan.cheese.config.CheeseConfig;
import org.wcan.cheese.remote.cheese.ServerEndpoint;

import java.util.HashMap;
import java.util.Map;

@Component
public class ServerLister implements ApplicationListener<ApplicationReadyEvent> {

    private final RegisterEvent registerEvent;

    private final CheeseConfig cheeseConfig;

    private final ServerEndpoint serverEndpoint;

    public ServerLister(RegisterEvent registerEvent, CheeseConfig cheeseConfig, ServerEndpoint serverEndpoint) {
        this.registerEvent = registerEvent;
        this.cheeseConfig = cheeseConfig;
        this.serverEndpoint = serverEndpoint;
        new Thread(() -> serverEndpoint.ServerStart()).start();
    }


    @Override
    public void onApplicationEvent(ApplicationReadyEvent event) {
        System.out.println("所有的 Controller 和它们的请求映射:");
        Map<String, Object> serverMap = new HashMap<String, Object>();
        Map<String, Object> controllers = event.getApplicationContext().getBeansWithAnnotation(Controller.class);
        String serverPackage = cheeseConfig.getServerPackage();
        if (null == serverPackage || "".equals(serverPackage))
            return;
        controllers.forEach((key, value) -> {
            if (value.toString().contains(serverPackage)) {
                serverMap.put(key.toString(), value.getClass().getName());
            }
        });
        Map<String, Object> servers = event.getApplicationContext().getBeansWithAnnotation(Service.class);
        servers.forEach((key, value) -> {
            if (value.toString().contains(serverPackage)) {
                serverMap.put(key.toString(), value.getClass().getName());
            }
        });
        registerEvent.registerServer(serverMap);
    }

}

5.2、集成测试

PS: 大家可以去仓库拉取完整的工程化代码运行

我们分别启动 tom-store 和 jerry-store 两个工程 ,然后浏览器访问 

http://localhost:8001/getCheeseByName?name=jerry&size=5

我们就能看到效果了

我们观察控制台,也能看到调用的的时候打印的信息

5.3、如何处理 ServerEndpoint 的启停

        相信聪明的你肯定 能想到单独开一个线程启动 ServerEndpoint 组件的好处,第一是异步启动不会影响到主服务的启动效率,第二是子线程如果发生了异常不会影响到主线程的运行。这就是偷偷new 一个线程去处理 ServerEndpoint 的好处

        但是这么做也有一个问题或许要考虑,假设子线程发生了异常终止,在不重启服务的情况下 怎么把 ServerEndpoint 重新拉起来呢。或者我想要不停服的情况下单独停止 ServerEndpoint,假设ServerEndpoint 遭受了恶意攻击,影响了主服务业务我们要单独停掉 ServerEndpoint 。这种情况需要怎么处理呢 

我们改造一下ServerEndpoint 类 ,加入两个方法,改造后的代码如下

@Component
public class ServerEndpoint {

    @Autowired
    private ReflectionExecute reflectionExecute;

    private ServerSocketChannel serverSocketChannel;
    private Selector selector;


    @Value(value = "${cheesePort:8000}")
    private int cheesePort = 8000;  // 监听端口
    public void ServerStart() {
        try {
            // 打开服务器端的 ServerSocketChannel
            serverSocketChannel = ServerSocketChannel.open();
            serverSocketChannel.bind(new java.net.InetSocketAddress(cheesePort));
            serverSocketChannel.configureBlocking(false);
            // 打开 Selector
             selector = Selector.open();
            serverSocketChannel.register(selector, SelectionKey.OP_ACCEPT);
            System.out.println("服务器正在运行,监听端口 " + cheesePort);
            //TODO 注册端口
            while (true) {
                // 阻塞,等待 I/O 事件发生
                selector.select();
                // 获取所有发生的事件
                Iterator<SelectionKey> selectedKeys = selector.selectedKeys().iterator();
                while (selectedKeys.hasNext()) {
                    SelectionKey key = selectedKeys.next();
                    selectedKeys.remove();
                    if (key.isAcceptable()) {
                        // 接受连接请求
                        handleAccept(serverSocketChannel, selector);
                    } else if (key.isReadable()) {
                        // 处理读取请求
                        handleResponse(key);
                    }
                }
            }
        } catch (IOException e) {
            e.printStackTrace();
        }
    }

    public void stopServer() throws IOException {
        this.serverSocketChannel.close();
        this.selector.close();
        System.out.println("服务器已关闭");
    }
    public void restartServer() throws IOException {
        stopServer();
        new Thread(() -> ServerStart()).start();
        System.out.println("服务器正在重启");
    }

    private static void handleAccept(ServerSocketChannel serverSocketChannel, Selector selector) throws IOException {
        // 接受客户端连接
        SocketChannel socketChannel = serverSocketChannel.accept();
        socketChannel.configureBlocking(false);
        // 注册到 Selector,监听读事件
        socketChannel.register(selector, SelectionKey.OP_READ);
        System.out.println("新连接接入:" + socketChannel.getRemoteAddress());
    }



    private void handleResponse(SelectionKey key) throws IOException {
        SocketChannel socketChannel = (SocketChannel) key.channel();
        ByteBuffer buffer = ByteBuffer.allocate(1024);
        int bytesRead = socketChannel.read(buffer);
        if (bytesRead == -1) {
            socketChannel.close();
            System.out.println("连接关闭");
            return;
        }
        //获取请求内容
        String request = new String(buffer.array(), 0, bytesRead, StandardCharsets.UTF_8);
        ObjectMapper objectMapper = new ObjectMapper();
        CheeseRequest cheeseRequest = objectMapper.readValue(request, CheeseRequest.class);
        //执行请求
        CheeseResponse cheeseResponse = reflectionExecute.execute(cheeseRequest);
        //返回内容
        String response = objectMapper.writeValueAsString(cheeseResponse);
        ByteBuffer byteBuffer = ByteBuffer.wrap(response.getBytes(StandardCharsets.UTF_8));
        // 发送响应数据
        while (byteBuffer.hasRemaining()) {
            socketChannel.write(byteBuffer);
        }
        socketChannel.close();
        System.out.println("响应已发送");
    }
}

我们继续在 tom-store工程里加入一个Controller

@RestController
public class AdminController {


    @Autowired
    private ServerEndpoint serverEndpoint;

    @RequestMapping("/stop")
    public String stopServer() throws IOException {
        serverEndpoint.stopServer();
        return "stop success";
    }

    @RequestMapping("/restart")
    public String restartServer() throws IOException {
        serverEndpoint.restartServer();
        return "restart success";
    }

}

我们重启 tom-store  然后访问下面的地址 

重启: http://localhost:9000/restart
​停止: http://localhost:9000/stop​

这是一个比较简单的方案,把 ServerEndpoint的存活权交给用户处理。

6、总结与思考

6.1、总结

        这篇文章给大家介绍了如何自定义应用层的协议,根据我们的实际业务设计一个更合理的协议。我们主要的实现方案就是 基于 NIO 创建 Socket 通道传输根据Cheese协议封装的对象来完成客户端和服务端的数据交换。

6.2、关于Cheese的思考

        数据从内存里传递到网络中这个过程会涉及到序列化,本篇文章我们使用的是 JSON序列化,这种方式比较直观,因为人可以看懂所以也方便调试。缺点就是效率不高,我们后面会研究一下常用的序列化方式,为我们的Cheese 协议 选择一种最佳的方案

       在服务端我们使用的是反射执行目标方法,其实这是一个很耗时的操作,因为每次我们都要加载类的元信息,而类信息又是固定的,所以这里其实是可以优化的。

      目前我们是把服务接口配在了配置文件中,比如 jerry-store 项目的配置文件中serviceName 就是指定调用的服务接口,程序拿着这个接口去注册中心获取这个接口的注册信息

spring.application.name=jerry-store
server.port=8001

zkUrl=XXXX
zookeeperBasePath=/jerry
nodePath=/config
timeout=5000
#serverPackage=org.jerry

##配置接口编号
serviceName=org.tom.service.CheeseService#getCheeseByName

##cheese协议端口
cheesePort=8002

 这种方法实现比较简单,但是你肯定看的出来这并不是一个好的解决方案,我们应该思考将serviceName放在哪里合适。

后面我们讨论的话题将主要围绕这三点展开,让我们的 Cheese 更加健壮。


http://www.kler.cn/a/541849.html

相关文章:

  • vue知识点2
  • 从云原生到 AI 原生,谈谈我经历的网关发展历程和趋势
  • Visual Studio Code (VSCode) 的基本设置指南,帮助你优化开发环境
  • Android图片加载框架Coil,Kotlin
  • C++ STL容器之vector的使用及复现
  • 手动配置IP
  • 爬取彩票网站数据
  • rpx和px混用方案
  • 【2024最新Java面试宝典】—— SpringBoot面试题(44道含答案)_java spingboot 面试题
  • el-table多列勾选
  • Vue2生命周期面试题
  • Access数据库教案(Excel+VBA+Access数据库SQL Server编程)
  • (3/100)每日小游戏平台系列
  • Visual Studio 2022环境下Miracl Lib库报错“无法解析的外部命令”
  • 数字孪生平台 v5.2 发布
  • Vulnhub empire-lupinone靶机攻击实战(一)
  • 【Elasticsearch】Elasticsearch检索方式全解析:从基础到实战(一)
  • 系统开发:大文件下载报错问题
  • 【自然语言处理】TextRank 算法提取关键词、短语、句(Python源码实现)
  • 【算法-动态规划】、魔法卷轴: 两次清零机会整个数组最大累加和
  • 代发考试战报:2月5号最近考过的思科和华为考试战报
  • 请求响应-请求-日期参数json参数路径参数
  • 【SpringBoot苍穹外卖】debugDay02
  • SpringBoot中为什么要引入消息队列依赖项
  • 防御保护-----前言
  • 用于构建基于大型语言模型(LLM)的开源框架LangChain介绍及代码实践