Netty 解码器LengthFieldBasedFrameDecoder Demo(六)

目录:

  1. LengthFieldBasedFrameDecoder解码器概述
  2. LengthFieldBasedFrameDecoder解码器Demo
  3. 服务端解码描述
  4. 客户端编码描述
  5. 总结

1. LengthFieldBasedFrameDecoder解码器概述

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class RequestParam {
/**
* 类型数值以后扩展1,2,3......
*/
private byte type;
/**
* 信息标志 1:表示心跳包 2:业务信息包
*/
private byte flag;
/**
* 请求消息内容长度
*/
private int length;
/**
* 请求消息体
*/
private String body;
}

直接上数据包的Demo来即系下对应的参数。可以看到如下对应的有3个标记字段即type、flag、length。实际消息则为body。那可以看到type、flag、length对应一共占多少个字节呢?1(type)+1(flag)+4(length),也就是说这些标记类型的字段则为6个字节。

如果使用LengthFieldBasedFrameDecoder来解析上面的这个消息,我们来看看到底该怎么使用这些参数。

1
2
3
public LengthFieldBasedFrameDecoder(
int maxFrameLength,
int lengthFieldOffset, int lengthFieldLength)

使用这个最简单的模型。

maxFrameLength:表示的是包的最大长度。65534代表数据发送最大64m。

lengthFieldOffset:表示跳过2个字节之后的才是长度域(lengthFieldLength),即代表我们需要往后移动2位之后才到长度域(lengthFieldLength)。因为大家可以看到上面定义的消息type、flag是两个字节,即需要往后偏移两个字节之后才能获取到消息体的长度

lengthFieldLength:记录该帧数据长度的字段本身的长度,即消息体的长度。获取到消息体的长度之后,那么就好获取对应的真实消息体内容了

2. LengthFieldBasedFrameDecoder解码器Demo

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/*
* +--------+--------------------+
* | 1 | 1 | 4 | ? |
* | type | flag | length | body |
* +--------+--------------------+
*/
public class RequestParam {
/**
* 类型数值以后扩展1,2,3......
*/
private byte type;
/**
* 信息标志 1:表示心跳包 2:业务信息包
*/
private byte flag;
/**
* 请求消息内容长度
*/
private int length;
/**
* 请求消息体
*/
private String body;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
public class RcpServer {

public static void main(String[] args) throws InterruptedException {

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();
try {
ServerBootstrap b = new ServerBootstrap();
b.group(bossGroup, workerGroup)
.channel(NioServerSocketChannel.class)
.childOption(ChannelOption.TCP_NODELAY, true)
.childHandler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536,
2, 4));
ch.pipeline().addLast(new RequestDecoder());
ch.pipeline().addLast(new RequestHandler());
}
});
ChannelFuture channelFuture = b.bind(6379).sync();
channelFuture.channel().closeFuture().sync();
}finally {
bossGroup.shutdownGracefully();
workerGroup.shutdownGracefully();
}

}
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
/*
* +--------+--------------------+
* | 1 | 1 | 4 | ? |
* | type | flag | length | body |
* +--------+--------------------+
*/
public class RequestDecoder extends ByteToMessageDecoder {

@Override
protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception {
//注意在读的过程中,readIndex的指针也在移动
byte type = in.readByte();
byte flag = in.readByte();
int length = in.readInt();
String body = null;
if (length > 0){
ByteBuf buf = in.readBytes(length);
byte[] req = new byte[buf.readableBytes()];
buf.readBytes(req);
body = new String(req, "UTF-8");
}
RequestParam requestParam = new RequestParam(type,flag,length,body);
out.add(requestParam);
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
public class RequestHandler extends ChannelInboundHandlerAdapter {

@Override
public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
if (msg instanceof RequestParam) {
RequestParam requestParam = (RequestParam) msg;
switch (requestParam.getFlag()){
case 1:
System.out.println("this is beat message received msg := " + requestParam.toString());
break;
case 2:
System.out.println("this is business message received msg := " + requestParam.toString());
break;
default:
System.out.println("error message " + requestParam.toString());
break;
}
} else {
System.out.println("error");
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
public class RcpClient {

public static void main(String[] args) throws InterruptedException {

EventLoopGroup group = new NioEventLoopGroup();

try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new RequestEncoder());
}
});
ChannelFuture channelFuture = b.connect("127.0.0.1", 6379).sync();
String body = "this message is from client";
channelFuture.channel().writeAndFlush(new RequestParam((byte) 1, (byte) 2, body.length(), body));
System.out.println("message already send.");
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}

}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class RequestEncoder extends MessageToByteEncoder<RequestParam> {

@Override
protected void encode(ChannelHandlerContext ctx, RequestParam requestParam, ByteBuf out) throws Exception {
if( requestParam == null){
throw new Exception("request param null");
}
out.writeByte(requestParam.getType());
out.writeByte(requestParam.getFlag());
out.writeInt(requestParam.getLength());
out.writeBytes(requestParam.getBody().getBytes());
}

}

启动服务端,然后通过客户端发送消息后最终输出结果:

3.服务端解码描述

1
2
3
4
5
6
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536,
2, 4));
ch.pipeline().addLast(new RequestDecoder());
ch.pipeline().addLast(new RequestHandler());
}

可以看到服务端这里先是添加的长度解码器LengthFieldBasedFrameDecoder,由它解码之后再把解码好的ByteBuf也就是一个完整的数据包传播给RequestDecoder,在这里在对ByteBuf进行业务解码,最终再把解码后的消息传播给Handler进行处理。

4.客户端编码描述

1
2
3
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new RequestEncoder());
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
public class RequestEncoder extends MessageToByteEncoder<RequestParam> {

@Override
protected void encode(ChannelHandlerContext ctx, RequestParam requestParam, ByteBuf out) throws Exception {
if( requestParam == null){
throw new Exception("request param null");
}
out.writeByte(requestParam.getType());
out.writeByte(requestParam.getFlag());
out.writeInt(requestParam.getLength());
out.writeBytes(requestParam.getBody().getBytes());
}

}
1
channelFuture.channel().writeAndFlush(new RequestParam((byte) 1, (byte) 2, body.length(), body));

可以看到从Client通过Channel写数据到通道。writeAndFlush然后经过编码,把RequestParam进行编码。尤其注意看一定要按照对应的顺序来写数据包。

第一个字节一定要是type:out.writeByte(requestParam.getType());

第二给字节一定要是flag:out.writeByte(requestParam.getFlag());

然后从第三字节到第6个字节为消息体的长度。int型(4字节):out.writeInt(requestParam.getLength());

最后再从第7个字节写入消息体:out.writeBytes(requestParam.getBody().getBytes());

按照这么一个顺序写入之后,通过LengthFieldBasedFrameDecoder解码器的一些参数,就知道如何读取截取消息

5.总结

以上相当于模拟一个Rpc框架客户端调用服务端这么一个逻辑。

当然Rpc框架的消息肯定更加负责,但是主要应该是在消息头或消息体做文章,主要还是看怎么去解析与定于这么一个消息包。比如需要增加对应接口名称、方法名称,另外消息体上的参数。那么当服务器接收到这么一个消息后进行解析然后定位到注入的Service,比如Spring的管理对象然在再通过动态代理的方式去调用它们的方法。

不过上面还仅仅只是最简单的使用LengthFieldBasedFrameDecoder,像咱们的这个demo可以看到它在decoder的时候还是会有对应的长度出来,其实也可以通过在实例化LengthFieldBasedFrameDecoder的时候只让消息体往后传播如:

而这里的第五个参数就是initialBytesToStrip,这里为6,表示获取完一个完整的数据包之后,忽略前面的6个字节,应用解码器拿到的就是不带长度域的数据包。即直接打到消息体body。

当然他还有其他属性如lengthAdjestment,但是只要理解上面的这个demo,然后在这个基础之上在去理解其他的参数。其实也就是换着法子来怎么解析你传递过来的消息。这个规则都是咱们小马哥定义的。

分享到 评论