跳到主要内容

自定义协议(重点)

一些协议概念

  • RPC(Remote Procedure Call Protocol)远程过程调用协议。一个通俗的描述是:客户端在不知道调用细节的情况下,调用存在于远程计算机上的某个对象,就像调用本地应用程序中的对象一样。

  • HTTP(超文本传输协议,Hypertext Transfer Protocol是一种用于从网络传输超文本到本地浏览器的传输协议。它定义了客户端与服务器之间请求和响应的格式。HTTP 工作在 TCP/IP 模型之上,通常使用端口 80

  • 传输控制协议(TCP,Transmission Control Protocol是一种面向连接的、可靠的、基于字节流的传输层通信协议,

  • UDP(User Datagram Protocol)即用户数据报协议,在网络中它与TCP协议一样用于处理数据包,是一种无连接的协议。

  • HTTP只是一个通信协议,工作在OSI的第七层,不是一个完整的远程调用方案。

  • 其实rpc不是一种协议,rpc是一种调用过程的方案/范式/实现。RPC是一个完整的远程调用方案,它包括了:接口规范+序列化反序列化规范+通信协议等。

为什么要自定义RPC协议

性能优化:

  • 标准的 HTTP/REST 协议虽然使用广泛,但由于其报文头部开销较大,不适合高性能的 RPC 场景。
  • 自定义的二进制协议,如 Protobuf、Thrift 等,可以大幅降低数据传输的开销,提升 RPC 的吞吐量和延迟。

自定义RPC协议设计

自定义RPC协议两大核心部分:

  • 自定义网络传输
  • 自定义消息结构

网络传输设计

由于HTTP本身是应用层协议,我们现在要设计的RPC协议也是 应用层协议,性能不如底层(TCP与UDP)传输效率高 ,因此对于高性能的追求,我们选择使用TCP协议进行网络传输。

消息结构设计

我们设计消息结构的目的是为了用最少的空间来传输需要的信息。

int占4个字节、32个比特(bit)位、而byte占1个字节(8个bit位),尽量要选择使用byte。

但是Java中bit运算麻烦,因此要尽量凑到整个字节

消息结构设计,我们的RPC消息所需要的信息:

  • 魔数:标识当前消息是 RPC 协议的消息,避免与其他协议的消息混淆,提高消息的可靠性。
  • 版本号:用于标识当前 RPC 协议的版本,以便于后续的协议升级和兼容性管理。
  • 序列化方式:标识消息体采用的序列化方式,如 Protobuf、Hessian 等,便于接收方进行正确的反序列化。
  • 类型:标识当前消息的类型,如请求、响应、通知
  • 状态:标识当前消息的状态,如成功、失败等
  • 请求ID:标识当前消息的唯一标识符,便于接收方与对应的请求进行关联。
  • 消息体长度:标识消息体的长度,便于接收方准确获取完整的消息内容(TCP有半包和粘包问题,传输信息不完整)
  • 消息体内容:携带实际的业务数据,如方法名、参数列表、返回值等。

结构如下:

message.svg

请求头的大小=1+1+1+1+1+8+4=17字节=1+1+1+1+1+8+4=17字节,我们将整个结构拼接在一起成紧凑的数据。

在后续实现消息编码器和消息解码器的时候:

  • 首先编码器按照顺序向缓冲区Buffer写入这些数据
  • 解码器在按照这个顺序依次读取,比如读magic,只需要读第一个 字节(8bit)即可。

使用这种方式,我们就不用使用key=value的格式 ,这样可以更省内存 。

Redis中很多数据结构都是这样设计的如:参考链接

image-20240503153829665

Dubbo协议设计:

/dev-guide/images/dubbo_protocol_header.jpg

代码实现

消息结构代码

package com.yunfei.rpc.protocol;

import lombok.AllArgsConstructor;
import lombok.Data;
import lombok.NoArgsConstructor;

/**
* 协议消息结构
*/
@Data
@AllArgsConstructor
@NoArgsConstructor
public class ProtocolMessage<T> {
/**
* 消息头
*/
private Header header;

/**
* 消息体 (请求或响应对象)
*/
private T body;

/**
* 协议消息头
*/
@Data
public static class Header {
/**
* 魔数
*/
private byte magic;

/**
* 协议版本
*/
private byte version;

/**
* 序列化器
*/
private byte serializer;

/**
* 消息类型 - 请求/响应
*/
private byte type;


/**
* 状态
*/
private byte status;

/**
* 请求 ID
*/
private long requestId;

/**
* 消息体长度
*/
private int bodyLength;
}

}

消息编码

Vert.x的TCP服务器收发消息都是Buffer类型,不可以直接写入Java对象,我们需要实现一个编码器和解码器,使得Java对象和Buffer之间可以相互转换

encode.svg

消息编码

public class ProtocolMessageEncoder {
/**
* 编码
*/
public static Buffer encode(ProtocolMessage<?> protocolMessage) throws Exception {
if (protocolMessage == null || protocolMessage.getHeader() == null) {
return Buffer.buffer();
}
ProtocolMessage.Header header = protocolMessage.getHeader();
// 依次向缓冲区写入字节
Buffer buffer = Buffer.buffer();
buffer.appendByte(header.getMagic());
buffer.appendByte(header.getVersion());
buffer.appendByte(header.getSerializer());
buffer.appendByte(header.getType());
buffer.appendByte(header.getStatus());
buffer.appendLong(header.getRequestId());

// 获取序列化器
ProtocolMessageSerializerEnum serializerEnum = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());
if (serializerEnum == null) {
throw new RuntimeException("不支持的序列化器");
}
Serializer serializer = SerializerFactory.getInstance(serializerEnum.getValue());
byte[] bodyBytes = serializer.serialize(protocolMessage.getBody());
// 写入 body 长度 和 数据
buffer.appendInt(bodyBytes.length);
buffer.appendBytes(bodyBytes);
return buffer;
}
}

解释:

获取 protocolMessageheader 属性,并将其各个字段写入到一个新创建的 Buffer 对象中。这些字段包括:

  • magic: 消息头的"魔数",用于标识消息协议。byte类型,1个字节
  • version: 消息协议的版本号。byte类型,1个字节
  • serializer: 消息体的序列化方式。byte类型,1个字节
  • type: 消息的类型,如请求、响应等。byte类型,1个字节
  • status: 消息的状态,如成功、失败等。byte类型,1个字节
  • requestId: 消息的请求 ID。long类型,8个字节

然后根据 header 中的 serializer 字段,获取对应的序列化器实现。使用序列化器将 protocolMessagebody 属性序列化为字节数组。

  • bodyLength:将字节数组的长度写入 Buffer 对象。int类型,4个字节
  • body:然后将字节数组本身也写入 Buffer 对象。未知

encode1.svg

消息解码

在消息解码的时候,我们就可以参考上面的图了,依次从中读取对应的数据,同时为了解决粘包的问题,我们读取的body的大小应该是bodyLength的值。

/**
* 协议消息解码器
*/
public class ProtocolMessageDecoder {
public static ProtocolMessage<?> decode(Buffer buffer) throws Exception {
// 分别从指定位置读出Buffer
ProtocolMessage.Header header = new ProtocolMessage.Header();
byte magic = buffer.getByte(0);

// 校验魔数
if (magic != ProtocolConstant.PROTOCOL_MAGIC) {
throw new Exception("Invalid magic!");
}
header.setMagic(magic);
header.setVersion(buffer.getByte(1));
header.setSerializer(buffer.getByte(2));
header.setType(buffer.getByte(3));
header.setStatus(buffer.getByte(4));
header.setRequestId(buffer.getLong(5));
header.setBodyLength(buffer.getInt(13));

// 解决粘包问题,只读取指定长度的数据
byte[] bodyBytes = buffer.getBytes(17, 17 + header.getBodyLength());
// 解析消息体
ProtocolMessageSerializerEnum serializerEnum = ProtocolMessageSerializerEnum.getEnumByKey(header.getSerializer());
if (serializerEnum == null) {
throw new RuntimeException("序列化消息的协议不存在");
}
Serializer serializer = SerializerFactory.getInstance(serializerEnum.getValue());
ProtocolMessageTypeEnum messageTypeEnum = ProtocolMessageTypeEnum.getEnum(header.getType());
if (messageTypeEnum == null) {
throw new RuntimeException("序列化消息的类型不存在");
}
switch (messageTypeEnum) {
case REQUEST:
RpcRequest request = serializer.deserialize(bodyBytes, RpcRequest.class);
return new ProtocolMessage<>(header, request);
case RESPONSE:
RpcResponse response = serializer.deserialize(bodyBytes, RpcResponse.class);
return new ProtocolMessage<>(header, response);
case HEAT_BEAT:
case OTHER:
default:
throw new RuntimeException("不支持的消息类型");
}
}
}

TCP服务器实现

我们使用Vert.x创建一个TCP服务

/**
* TCP服务器实现
*/
public class VertxTcpServer implements HttpServer {
@Override
public void doStart(int port) {
// 创建一个Vertx实例
Vertx vertx = Vertx.vertx();

// 创建一个TCP服务器
NetServer server = vertx.createNetServer();

// 处理连接请求
server.connectHandler(new TcpServerHandler());

// 启动TCP服务器并监听指定端口
server.listen(port, res -> {
if (res.succeeded()) {
System.out.println("TCP server is now listening on actual port: " + server.actualPort());
} else {
System.err.println("Failed to bind!");
}
});
}

public static void main(String[] args) {
new VertxTcpServer().doStart(8080);
}
}

其中处理连接请求在后面 server.connectHandler(new TcpServerHandler());,主要涉及半包、粘包等问题的处理

TCP客户端实现

public class VertxTcpClient {

public static RpcResponse doRequest(RpcRequest rpcRequest, ServiceMetaInfo metaInfo) throws Exception {
// 发送TCP请求
Vertx vertx = Vertx.vertx();
NetClient netClient = vertx.createNetClient();
CompletableFuture<RpcResponse> responseFuture = new CompletableFuture<>();
netClient.connect(metaInfo.getServicePort(), metaInfo.getServiceHost(), res -> {
if (!res.succeeded()) {
System.err.println("Failed to connect to TCP server");
return;
}
System.out.println("Connected to TCP server");
NetSocket socket = res.result();

// 发送数据
ProtocolMessage<Object> protocolMessage = new ProtocolMessage<>();
ProtocolMessage.Header header = new ProtocolMessage.Header();

header.setMagic(ProtocolConstant.PROTOCOL_MAGIC);
header.setVersion(ProtocolConstant.PROTOCOL_VERSION);
header.setSerializer((byte) ProtocolMessageSerializerEnum.getEnumByValue(RpcApplication.getRpcConfig().getSerializer()).getKey());
header.setType((byte) ProtocolMessageTypeEnum.REQUEST.getKey());
header.setRequestId(IdUtil.getSnowflakeNextId());

protocolMessage.setHeader(header);
protocolMessage.setBody(rpcRequest);
// 编码请求
try {
Buffer encodeBuffer = ProtocolMessageEncoder.encode(protocolMessage);
socket.write(encodeBuffer);
} catch (Exception e) {
throw new RuntimeException(e);
}

// 接收响应
TcpBufferHandlerWrapper tcpBufferHandlerWrapper = new TcpBufferHandlerWrapper(buffer -> {
try {
ProtocolMessage<RpcResponse> responseProtocolMessage = (ProtocolMessage<RpcResponse>) ProtocolMessageDecoder.decode(buffer);
responseFuture.complete(responseProtocolMessage.getBody());
} catch (Exception e) {
throw new RuntimeException("协议消息码错误");
}

});
socket.handler(tcpBufferHandlerWrapper);


});
System.out.println("Waiting for response");
RpcResponse rpcResponse = null;
rpcResponse = responseFuture.get(5, TimeUnit.SECONDS);
System.out.println("Received response");
netClient.close();
return rpcResponse;
}

public void start() {
Vertx vertx = Vertx.vertx();
vertx.createNetClient().connect(8082, "localhost", res -> {
if (res.succeeded()) {
System.out.println("Connected to Tcp Server!");
NetSocket socket = res.result();
for (int i = 0; i < 1000; i++) {
Buffer buffer = Buffer.buffer();
String str = "hello,server!hello,server!hello,server!hello,server!";
buffer.appendInt(0);
buffer.appendInt(str.getBytes().length);
System.out.println("Send data to server:" + str);
buffer.appendBytes(str.getBytes());
socket.write(buffer);
}
// 接收数据
socket.handler(buffer -> {
System.out.println("Received data from server:" + buffer.toString());
});

} else {
System.out.println("Failed to connect: " + res.cause().getMessage());
}
});
}

public static void main(String[] args) {
new VertxTcpClient().start();
}
}

请求处理器(服务提供者)

请求处理器的主要作用是接受请求,通过反射调用对应的服务实现类

我们通过Vert.x提供的Handler<NetSocket>接口,来实现TCP请求处理器

public class TcpServerHandler implements Handler<NetSocket> {

/**
* 处理请求
*
* @param socket the event to handle
*/
@Override
public void handle(NetSocket socket) {
TcpBufferHandlerWrapper bufferHandlerWrapper = new TcpBufferHandlerWrapper(buffer -> {
// 接受请求,解码
ProtocolMessage<RpcRequest> protocolMessage;
try {
protocolMessage = (ProtocolMessage<RpcRequest>) ProtocolMessageDecoder.decode(buffer);
} catch (Exception e) {
throw new RuntimeException("协议消息解码错误");
}
RpcRequest rpcRequest = protocolMessage.getBody();
ProtocolMessage.Header header = protocolMessage.getHeader();

// 处理请求
// 构造响应结果对象
RpcResponse rpcResponse = new RpcResponse();
try {
// 获取要调用的服务实现类,通过反射调用
Class<?> implClass = LocalRegistry.get(rpcRequest.getServiceName());
Method method = implClass.getMethod(rpcRequest.getMethodName(), rpcRequest.getParameterTypes());
Object result = method.invoke(implClass.newInstance(), rpcRequest.getArgs());
// 封装返回结果
rpcResponse.setData(result);
rpcResponse.setDataType(method.getReturnType());
rpcResponse.setMessage("ok");
} catch (Exception e) {
e.printStackTrace();
rpcResponse.setMessage(e.getMessage());
rpcResponse.setException(e);
}

// 发送响应,编码
header.setType((byte) ProtocolMessageTypeEnum.RESPONSE.getKey());
header.setStatus((byte) ProtocolMessageStatusEnum.OK.getValue());
ProtocolMessage<RpcResponse> responseProtocolMessage = new ProtocolMessage<>(header, rpcResponse);
try {
Buffer encode = ProtocolMessageEncoder.encode(responseProtocolMessage);
socket.write(encode);
} catch (Exception e) {
throw new RuntimeException("协议消息编码错误");
}
});
socket.handler(bufferHandlerWrapper);
}

}

我们的 Vert.x 的 TCP 服务端处理器,主要实现了以下功能:

  1. 接收客户端连接请求,并创建 TcpBufferHandlerWrapper 对象处理接收的数据。

  2. TcpBufferHandlerWrapper 中实现了以下逻辑:

    • 使用 ProtocolMessageDecoder.decode() 方法解码接收到的数据,得到 ProtocolMessage<RpcRequest> 对象。
    • ProtocolMessage 中获取 RpcRequest 对象,并根据其中的服务名称和方法名,通过反射调用对应的服务实现类方法,获取执行结果。
    • 创建 RpcResponse 对象,将执行结果封装其中,并设置响应状态为成功。如果在调用过程中出现异常,则将异常信息设置到 RpcResponse 中。
    • 使用 ProtocolMessageEncoder.encode() 方法将 ProtocolMessage<RpcResponse> 对象编码为字节序列,并通过 socket.write() 方法写回给客户端。

请求发送(服务消费者)

/**
* 动态代理
*/
public class ServiceProxy implements InvocationHandler {

// 指定序列化器
final Serializer serializer = SerializerFactory.getInstance(RpcApplication.getRpcConfig().getSerializer());

@Override
public Object invoke(Object proxy, Method method, Object[] args) throws Throwable {

// 构造请求
RpcRequest rpcRequest = RpcRequest.builder()
.serviceName(method.getDeclaringClass().getName())
.methodName(method.getName())
.parameterTypes(method.getParameterTypes())
.args(args)
.build();


// 从注册中心获取服务提供者请求地址
RpcConfig rpcConfig = RpcApplication.getRpcConfig();
Registry registry = RegistryFactory.getInstance(rpcConfig.getRegistryConfig().getRegistry());
ServiceMetaInfo serviceMetaInfo = new ServiceMetaInfo();
// 构造请求
String serviceName = method.getDeclaringClass().getName();
serviceMetaInfo.setServiceName(serviceName);
serviceMetaInfo.setServiceVersion(RpcConstant.DEFAULT_SERVICE_VERSION);
List<ServiceMetaInfo> serviceMetaInfos = registry.serviceDiscovery(serviceMetaInfo.getServiceKey());
if (CollUtil.isEmpty(serviceMetaInfos)) {
throw new RuntimeException("暂无可用服务提供者");
}

// 负载均衡
LoadBalancer loadBalancer = LoadBalancerFactory.getInstance(rpcConfig.getLoadBalancer());
HashMap<String, Object> requestParams = new HashMap<>();
requestParams.put("methodName", rpcRequest.getMethodName());
ServiceMetaInfo metaInfo = loadBalancer.select(requestParams, serviceMetaInfos);

// 发送TCP请求
// 使用重试策略
RpcResponse response ;
try {
RetryStrategy retryStrategy = RetryStrategyFactory.getInstance(rpcConfig.getRetryStrategy());
response = retryStrategy.doRetry(() -> {
return VertxTcpClient.doRequest(rpcRequest, metaInfo);
});
} catch (Exception e) {
TolerantStrategy strategy = TolerantStrategyFactory.getInstance(rpcConfig.getTolerantStrategy());
// 构造上下文
Map<String, Object> context = new HashMap<>();
context.put(TolerantStrategyConstant.SERVICE_LIST, serviceMetaInfos);
context.put(TolerantStrategyConstant.CURRENT_SERVICE, metaInfo);
context.put(TolerantStrategyConstant.RPC_REQUEST, rpcRequest);
response = strategy.doTolerant(context, e);
}
return response.getData();
}
}

VertxTcpClient.doRequest(rpcRequest, metaInfo);在上面已经有了

Vert.x提供的请求处理器是异步的,反应式的,为了更方便的获取结果,我们使用CompletableFuture将异步转为同步,阻塞代码responseFuture.get(),直到拿到了结果才会继续往下执行

CompletableFuture<RpcResponse> responseFuture = new CompletableFuture<>();
netClient.connect(metaInfo.getServicePort(), metaInfo.getServiceHost(), res -> {
// 接收响应
TcpBufferHandlerWrapper tcpBufferHandlerWrapper = new TcpBufferHandlerWrapper(buffer -> {
//返回处理结果
responseFuture.complete(responseProtocolMessage.getBody());
});
socket.handler(tcpBufferHandlerWrapper);
});
RpcResponse rpcResponse = null;
rpcResponse = responseFuture.get(5, TimeUnit.SECONDS);

半包粘包

什么是半包粘包?

举例:

如果我们客户端要发送的消息为hello,cxk!hello,cxk!

  • 半包:收到消息少了,例如hello,cxk!
  • 粘包:收到消息多了,例如hello,cxk!hello,cxk!hello,cxk!

如何解决半包粘包问题?

如何解决半包?

我们在消息头中已经设置了请求体的长度,在服务端接收的时候,判断每次消息的长度是否符合我们的预期,如果消息不完整,那么我们就留到下一次再读取

如何解决粘包问题

解决思路类似,我们每次只读取指定长度的数据,超过的长度留到下一次接收消息的时候再读取

在Vert.x中,我们可以使用内置的RecordParser来解决半包和粘包问题,它可以保证下次读取到特定长度的字符,这是我们解决半包粘包问题的基础。

具体为:RecordParser.newFixed(len)

我们封装一个TcpBufferHandlerWrapper类,这里我们使用了设计模式中的装饰者模式,使用RecordParser来对原来的Buffer处理器功能进行增强

public class TcpBufferHandlerWrapper implements Handler<Buffer> {
private final RecordParser recordParser;

public TcpBufferHandlerWrapper(Handler<Buffer> bufferHandler) {
this.recordParser = initRecordParser(bufferHandler);
}

@Override
public void handle(Buffer buffer) {
recordParser.handle(buffer);
}

private RecordParser initRecordParser(Handler<Buffer> bufferHandler) {
// 构造parser
RecordParser parser = RecordParser.newFixed(ProtocolConstant.MESSAGE_HEADER_LENGTH);
parser.setOutput(new Handler<Buffer>() {
// 初始化
int size = -1;
Buffer resultBuffer = Buffer.buffer();

@Override
public void handle(Buffer buffer) {
if (size == -1) {
// 读取消息体的长度
size = buffer.getInt(13);
parser.fixedSizeMode(size);
// 写入头信息到结果
resultBuffer.appendBuffer(buffer);
} else {
// 写入体信息到结果
resultBuffer.appendBuffer(buffer);
// 已拼接为完整的Buffer,执行处理
bufferHandler.handle(resultBuffer);
// 重置parser
parser.fixedSizeMode(ProtocolConstant.MESSAGE_HEADER_LENGTH);
size = -1;
resultBuffer = Buffer.buffer();
}
}
});
return parser;
}
}

TcpBufferHandlerWrapper 类是一个 Vert.x 的 Handler<Buffer> 实现,用于处理从 TCP 连接中接收到的二进制数据。它的主要功能如下:

  1. 在构造函数中初始化一个 RecordParser 对象,用于解析接收到的数据。

  2. 实现 handle(Buffer buffer) 方法,将接收到的二进制数据传递给 RecordParser 进行处理。

  3. initRecordParser() 方法中实现了 RecordParser 的初始化逻辑:

    • 创建一个新的 RecordParser 实例,并设置其固定长度为 ProtocolConstant.MESSAGE_HEADER_LENGTH(消息头长度)。
    • 设置 RecordParser 的输出处理器,该处理器内部实现了以下逻辑:
      • 首先读取消息体的长度,并设置 RecordParser 的固定长度模式为该长度。
      • 将消息头部分数据写入临时缓冲区 resultBuffer
      • 当接收到完整的消息体数据后,将整个消息写入 resultBuffer,并将其传递给外部处理器进行处理。
      • 重置 RecordParser 的固定长度模式为消息头长度,并清空 resultBuffer

这个 TcpBufferHandlerWrapper 类的作用是将从 TCP 连接中接收到的二进制数据流,按照消息协议的格式进行解析,并将解析后的完整消息传递给外部处理器进行处理。

            if (size == -1) {
// 读取消息体的长度
size = buffer.getInt(13);
parser.fixedSizeMode(size);
// 写入头信息到结果
resultBuffer.appendBuffer(buffer);
} else {
// 写入体信息到结果
resultBuffer.appendBuffer(buffer);
// 已拼接为完整的Buffer,执行处理
bufferHandler.handle(resultBuffer);
// 重置parser
parser.fixedSizeMode(ProtocolConstant.MESSAGE_HEADER_LENGTH);
size = -1;
resultBuffer = Buffer.buffer();
}

在这段代码中,如果size==-1,那么我们就读取从位置13开始的数据,读一个int,这个数据就是消息体的长度。

等到下次size不等于-1了,因为我们设置了parser.fixedSizeMode(size);就是保证可以读取size 长度,也就是下次一定可以读取到整个消息体内容,读完了之后再重置一下。

MESSAGE_HEADER_LENGTH=17,在初始化设置为这样就是为了读取整个消息头的内容,然后可以获取消息体的长度。

因为我们在消息结构是这样设计的:消息结构代码

具体使用:

TcpServerHandler进行增强

public class TcpServerHandler implements Handler<NetSocket> {
@Override
public void handle(NetSocket socket) {
TcpBufferHandlerWrapper bufferHandlerWrapper = new TcpBufferHandlerWrapper(buffer -> {
//...
socket.handler(bufferHandlerWrapper);
}
}

VertxTcpClient进行增强

public class VertxTcpClient {
netClient.connect(metaInfo.getServicePort(), metaInfo.getServiceHost(), res -> {
// 接收响应
TcpBufferHandlerWrapper tcpBufferHandlerWrapper = new TcpBufferHandlerWrapper(buffer -> {
});
socket.handler(tcpBufferHandlerWrapper);
});
}