基于Netty实现自定义消息通信协议(协议设计及解析应用实战)
所谓的协议,是由语法、语义、时序这三个要素组成的一种规范,通信双方按照该协议规范来实现网络数据传输,这样通信双方才能实现数据正常通信和解析。
由于不同的中间件在功能方面有一定差异,所以其实应该是没有一种标准化协议来满足不同差异化需求,因此很多中间件都会定义自己的通信协议,另外通信协议可以解决粘包和拆包问题。
在本篇文章中,我们来实现一个自定义消息协议。
自定义协议的要素
自定义协议,那这个协议必须要有组成的元素,
- 魔数: 用来判断数据包的有效性
- 版本号: 可以支持协议升级
- 序列化算法: 消息正文采用什么样的序列化和反序列化方式,比如json、protobuf、hessian等
- 指令类型:也就是当前发送的是一个什么类型的消息,像zookeeper中,它传递了一个Type
- 请求序号: 基于双工协议,提供异步能力,也就是收到的异步消息需要找到前面的通信请求进行响应处理
- 消息长度
- 消息正文
协议定义
sessionId | reqType | Content-Length | Content |
其中Version
,Content-Length
,SessionId
就是Header信息,Content
就是交互的主体。
定义项目结构以及引入包
<dependency>
<groupId>域名y</groupId>
<artifactId>netty-all</artifactId>
</dependency>
<dependency>
<groupId>域名j</groupId>
<artifactId>slf4j-log4j12</artifactId>
</dependency>
<dependency>
<groupId>域名ectlombok</groupId>
<artifactId>lombok</artifactId>
</dependency>
项目结构如图4-1所示:
- netty-message-mic : 表示协议模块。
- netty-message-server :表示nettyserver。
- 引入域名erties
在nettyMessage-mic中,包的结构如下。
定义Header
表示消息头
@Data
public class Header{
private long sessionId; //会话id : 占8个字节
private byte type; //消息类型: 占1个字节
private int length; //消息长度 : 占4个字节
}
定义MessageRecord
表示消息体
@Data
public class MessageRecord{
private Header header;
private Object body;
}
OpCode
定义操作类型
public enum OpCode {
BUSI_REQ((byte)0),
BUSI_RESP((byte)1),
PING((byte)3),
PONG((byte)4);
private byte code;
private OpCode(byte code) {
域名=code;
}
public byte code(){
return 域名;
}
}
定义编解码器
分别定义对该消息协议的编解码器
MessageRecordEncoder
@Slf4j
public class MessageRecordEncoder extends MessageToByteEncoder<MessageRecord> {
@Override
protected void encode(ChannelHandlerContext channelHandlerContext, MessageRecord record, ByteBuf byteBuf) throws Exception {
域名("===========开始编码Header部分===========");
Header header=域名eader();
域名eLong(域名essionId()); //保存8个字节的sessionId
域名eByte(域名ype()); //写入1个字节的请求类型
域名("===========开始编码Body部分===========");
Object body=域名ody();
if(body!=null){
ByteArrayOutputStream bos=new ByteArrayOutputStream();
ObjectOutputStream oos=new ObjectOutputStream(bos);
域名eObject(body);
byte[] bytes=域名teArray();
域名eInt(域名th); //写入消息体长度:占4个字节
域名eBytes(bytes); //写入消息体内容
}else{
域名eInt(0); //写入消息长度占4个字节,长度为0
}
}
}
MessageRecordDecode
@Slf4j
public class MessageRecordDecode extends ByteToMessageDecoder {
@Override
protected void decode(ChannelHandlerContext channelHandlerContext, ByteBuf byteBuf, List<Object> list) throws Exception {
MessageRecord record=new MessageRecord();
Header header=new Header();
域名essionId(域名Long()); //读取8个字节的sessionid
域名ype(域名Byte()); //读取一个字节的操作类型
域名eader(header);
//如果byteBuf剩下的长度还有大于4个字节,说明body不为空
if(域名ableBytes()>4){
int length=域名Int(); //读取四个字节的长度
域名ength(length);
byte[] contents=new byte[length];
域名Bytes(contents,0,length);
ByteArrayInputStream bis=new ByteArrayInputStream(contents);
ObjectInputStream ois=new ObjectInputStream(bis);
域名ody(域名Object());
域名(record);
域名("序列化出来的结果:"+record);
}else{
域名r("消息内容为空");
}
}
}
测试协议的解析和编码
EmbeddedChannel是netty专门改进针对ChannelHandler的单元测试而提供的
public class CodesMainTest {
public static void main( String[] args ) throws Exception {
EmbeddedChannel channel=new EmbeddedChannel(
new LoggingHandler(),
new MessageRecordEncoder(),
new MessageRecordDecode());
Header header=new Header();
域名essionId(123456);
域名ype(域名());
MessageRecord record=new MessageRecord();
域名ody("Hello World");
域名eader(header);
域名eOutbound(record);
ByteBuf buf= 域名er();
new MessageRecordEncoder().encode(null,record,buf);
域名eInbound(buf);
}
}
编码包分析
运行上述代码后,会得到下面的一个信息
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 00 00 01 e2 40 03 00 00 00 12 ac ed 00 |.......@........|
|00000010| 05 74 00 0b 48 65 6c 6c 6f 20 57 6f 72 6c 64 |.t..Hello World |
+--------+-------------------------------------------------+----------------+
按照协议规范:
- 前面8个字节表示sessionId
- 一个字节表示请求类型
- 4个字节表示长度
- 后面部分内容表示消息体
测试粘包和半包问题
通过slice方法进行拆分,得到两个包。
ByteBuf中提供了一个slice方法,这个方法可以在不做数据拷贝的情况下对原始ByteBuf进行拆分。
public class CodesMainTest {
public static void main( String[] args ) throws Exception {
//EmbeddedChannel是netty专门针对ChannelHandler的单元测试而提供的类。可以通过这个类来测试channel输入入站和出站的实现
EmbeddedChannel channel=new EmbeddedChannel(
//解决粘包和半包问题
// new LengthFieldBasedFrameDecoder(2048,10,4,0,0),
new LoggingHandler(),
new MessageRecordEncoder(),
new MessageRecordDecode());
Header header=new Header();
域名essionId(123456);
域名ype(域名());
MessageRecord record=new MessageRecord();
域名ody("Hello World");
域名eader(header);
域名eOutbound(record);
ByteBuf buf= 域名er();
new MessageRecordEncoder().encode(null,record,buf);
//*********模拟半包和粘包问题************//
//把一个包通过slice拆分成两个部分
ByteBuf bb1=域名e(0,7); //获取前面7个字节
ByteBuf bb2=域名e(7,域名ableBytes()-7); //获取后面的字节
域名in();
域名eInbound(bb1);
域名eInbound(bb2);
}
}
运行上述代码会得到如下异常, readerIndex(0) +length(8)表示要读取8个字节,但是只收到7个字节,所以直接报错。
2021-08-31 15:53:01,385 [域名域名ingHandler]-[DEBUG] [id: 0xembedded, L:embedded - R:embedded] READ: 7B
+-------------------------------------------------+
| 0 1 2 3 4 5 6 7 8 9 a b c d e f |
+--------+-------------------------------------------------+----------------+
|00000000| 00 00 00 00 00 01 e2 |....... |
+--------+-------------------------------------------------+----------------+
2021-08-31 15:53:01,397 [域名域名ingHandler]-[DEBUG] [id: 0xembedded, L:embedded - R:embedded] READ COMPLETE
Exception in thread "main" 域名域名derException: 域名xOutOfBoundsException: readerIndex(0) + length(8) exceeds writerIndex(7): UnpooledSlicedByteBuf(ridx: 0, widx: 7, cap: 7/7, unwrapped: PooledUnsafeDirectByteBuf(ridx: 0, widx: 31, cap: 256))
解决拆包问题
LengthFieldBasedFrameDecoder是长度域解码器,它是解决拆包粘包最常用的解码器,基本上能覆盖大部分基于长度拆包的场景。其中开源的消息中间件RocketMQ就是使用该解码器进行解码的。
首先来说明一下该解码器的核心参数
- lengthFieldOffset,长度字段的偏移量,也就是存放长度数据的起始位置
- lengthFieldLength,长度字段锁占用的字节数
- lengthAdjustment,在一些较为复杂的协议设计中,长度域不仅仅包含消息的长度,还包含其他数据比如版本号、数据类型、数据状态等,这个时候我们可以使用lengthAdjustment进行修正,它的值=包体的长度值-长度域的值
- initialBytesToStrip,解码后需要跳过的初始字节数,也就是消息内容字段的起始位置
- lengthFieldEndOffset,长度字段结束的偏移量, 该属性的值=lengthFieldOffset+lengthFieldLength
public class CodesMainTest {
public static void main( String[] args ) throws Exception {
EmbeddedChannel channel=new EmbeddedChannel(
//解决粘包和半包问题
new LengthFieldBasedFrameDecoder(1024,
9,4,0,0),
new LoggingHandler(),
new MessageRecordEncoder(),
new MessageRecordDecode());
Header header=new Header();
域名essionId(123456);
域名ype(域名());
MessageRecord record=new MessageRecord();
域名ody("Hello World");
域名eader(header);
域名eOutbound(record);
ByteBuf buf= 域名er();
new MessageRecordEncoder().encode(null,record,buf);
//*********模拟半包和粘包问题************//
//把一个包通过slice拆分成两个部分
ByteBuf bb1=域名e(0,7);
ByteBuf bb2=域名e(7,域名ableBytes()-7);
域名in();
域名eInbound(bb1);
域名eInbound(bb2);
}
}
添加一个长度解码器,就解决了拆包带来的问题。运行结果如下
2021-08-31 16:09:35,115 [域名域名ageRecordDecode]-[INFO] 序列化出来的结果:MessageRecord(header=Header(sessionId=123456, type=3, length=18), body=Hello World)
2021-08-31 16:09:35,116 [域名域名ingHandler]-[DEBUG] [id: 0xembedded, L:embedded - R:embedded] READ COMPLETE
基于自定义消息协议通信
下面我们把整个通信过程编写完整,代码结构如图4-2所示.
服务端开发
@Slf4j
public class ProtocolServer {
public static void main(String[] args){
EventLoopGroup boss = new NioEventLoopGroup();
//2 用于对接受客户端连接读写操作的线程工作组
EventLoopGroup work = new NioEventLoopGroup();
ServerBootstrap b = new ServerBootstrap();
域名p(boss, work) //绑定两个工作线程组
.channel(域名s) //设置NIO的模式
// 初始化绑定服务通道
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel sc) throws Exception {
域名line()
.addLast(
new LengthFieldBasedFrameDecoder(1024,
9,4,0,0))
.addLast(new MessageRecordEncoder())
.addLast(new MessageRecordDecode())
.addLast(new ServerHandler());
}
});
ChannelFuture cf= null;
try {
cf = 域名(8080).sync();
域名("ProtocolServer start success");
域名nel().closeFuture().sync();
} catch (InterruptedException e) {
域名tStackTrace();
}finally {
域名downGracefully();
域名downGracefully();
}
}
}
ServerHandler
@Slf4j
public class ServerHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
MessageRecord messageRecord=(MessageRecord)msg;
域名("server receive message:"+messageRecord);
MessageRecord res=new MessageRecord();
Header header=new Header();
域名essionId(域名eader().getSessionId());
域名ype(域名());
String message="Server Response Message!";
域名ody(message);
域名ength(域名th());
域名eAndFlush(res);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
域名r("服务器读取数据异常");
域名ptionCaught(ctx, cause);
域名e();
}
}
客户端开发
public class ProtocolClient {
public static void main(String[] args) {
//创建工作线程组
EventLoopGroup group = new NioEventLoopGroup();
Bootstrap b = new Bootstrap();
域名p(group).channel(域名s)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
public void initChannel(SocketChannel ch) throws Exception {
域名line().addLast(new LengthFieldBasedFrameDecoder(1024,
9,4,0,0))
.addLast(new MessageRecordEncoder())
.addLast(new MessageRecordDecode())
.addLast(new ClientHandler());
}
});
// 发起异步连接操作
try {
ChannelFuture future = 域名ect(new InetSocketAddress("localhost", 8080)).sync();
Channel c = 域名nel();
for (int i = 0; i < 500; i++) {
MessageRecord message = new MessageRecord();
Header header = new Header();
域名essionId(10001);
域名ype((byte) 域名());
域名eader(header);
String context="我是请求数据"+i;
域名ength(域名th());
域名ody(context);
域名eAndFlush(message);
}
//closeFuture().sync()就是让当前线程(即主线程)同步等待Netty server的close事件,Netty server的channel close后,主线程才会继续往下执行。closeFuture()在channel close的时候会通知当前线程。
域名nel().closeFuture().sync();
} catch (InterruptedException e) {
域名tStackTrace();
}finally {
域名downGracefully();
}
}
}
ClientHandler
@Slf4j
public class ClientHandler extends ChannelInboundHandlerAdapter {
@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
MessageRecord record=(MessageRecord)msg;
域名("Client Receive message:"+record);
域名nelRead(ctx, msg);
}
@Override
public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
域名ptionCaught(ctx, cause);
域名e();
}
}
版权声明:本博客所有文章除特别声明外,均采用 CC BY-NC-SA 4.0 许可协议。转载请注明来自
Mic带你学架构
!
如果本篇文章对您有帮助,还请帮忙点个关注和赞,您的坚持是我不断创作的动力。欢迎关注「跟着Mic学架构」公众号公众号获取更多技术干货!