目录:
- LengthFieldBasedFrameDecoder解码器概述
- LengthFieldBasedFrameDecoder解码器Demo
- 服务端解码描述
- 客户端编码描述
- 总结
1. LengthFieldBasedFrameDecoder解码器概述
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18
| public class RequestParam {
private byte type;
private byte flag;
private int length;
private String body; }
|
直接上数据包的Demo来即系下对应的参数。可以看到如下对应的有3个标记字段即type、flag、length。实际消息则为body。那可以看到type、flag、length对应一共占多少个字节呢?1(type)+1(flag)+4(length),也就是说这些标记类型的字段则为6个字节。
如果使用LengthFieldBasedFrameDecoder来解析上面的这个消息,我们来看看到底该怎么使用这些参数。
1 2 3
| public LengthFieldBasedFrameDecoder( int maxFrameLength, int lengthFieldOffset, int lengthFieldLength)
|
使用这个最简单的模型。
maxFrameLength:表示的是包的最大长度。65534代表数据发送最大64m。
lengthFieldOffset:表示跳过2个字节之后的才是长度域(lengthFieldLength),即代表我们需要往后移动2位之后才到长度域(lengthFieldLength)。因为大家可以看到上面定义的消息type、flag是两个字节,即需要往后偏移两个字节之后才能获取到消息体的长度
lengthFieldLength:记录该帧数据长度的字段本身的长度,即消息体的长度。获取到消息体的长度之后,那么就好获取对应的真实消息体内容了
2. LengthFieldBasedFrameDecoder解码器Demo
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24
|
public class RequestParam {
private byte type;
private byte flag;
private int length;
private String body; }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27 28 29
| public class RcpServer {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup bossGroup = new NioEventLoopGroup(1); EventLoopGroup workerGroup = new NioEventLoopGroup(); try { ServerBootstrap b = new ServerBootstrap(); b.group(bossGroup, workerGroup) .channel(NioServerSocketChannel.class) .childOption(ChannelOption.TCP_NODELAY, true) .childHandler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536, 2, 4)); ch.pipeline().addLast(new RequestDecoder()); ch.pipeline().addLast(new RequestHandler()); } }); ChannelFuture channelFuture = b.bind(6379).sync(); channelFuture.channel().closeFuture().sync(); }finally { bossGroup.shutdownGracefully(); workerGroup.shutdownGracefully(); }
} }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26
|
public class RequestDecoder extends ByteToMessageDecoder {
@Override protected void decode(ChannelHandlerContext ctx, ByteBuf in, List<Object> out) throws Exception { byte type = in.readByte(); byte flag = in.readByte(); int length = in.readInt(); String body = null; if (length > 0){ ByteBuf buf = in.readBytes(length); byte[] req = new byte[buf.readableBytes()]; buf.readBytes(req); body = new String(req, "UTF-8"); } RequestParam requestParam = new RequestParam(type,flag,length,body); out.add(requestParam); }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23
| public class RequestHandler extends ChannelInboundHandlerAdapter {
@Override public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception { if (msg instanceof RequestParam) { RequestParam requestParam = (RequestParam) msg; switch (requestParam.getFlag()){ case 1: System.out.println("this is beat message received msg := " + requestParam.toString()); break; case 2: System.out.println("this is business message received msg := " + requestParam.toString()); break; default: System.out.println("error message " + requestParam.toString()); break; } } else { System.out.println("error"); } }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14 15 16 17 18 19 20 21 22 23 24 25 26 27
| public class RcpClient {
public static void main(String[] args) throws InterruptedException {
EventLoopGroup group = new NioEventLoopGroup();
try { Bootstrap b = new Bootstrap(); b.group(group).channel(NioSocketChannel.class) .option(ChannelOption.TCP_NODELAY, true) .handler(new ChannelInitializer<SocketChannel>() { @Override protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new RequestEncoder()); } }); ChannelFuture channelFuture = b.connect("127.0.0.1", 6379).sync(); String body = "this message is from client"; channelFuture.channel().writeAndFlush(new RequestParam((byte) 1, (byte) 2, body.length(), body)); System.out.println("message already send."); channelFuture.channel().closeFuture().sync(); } finally { group.shutdownGracefully(); } }
}
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public class RequestEncoder extends MessageToByteEncoder<RequestParam> {
@Override protected void encode(ChannelHandlerContext ctx, RequestParam requestParam, ByteBuf out) throws Exception { if( requestParam == null){ throw new Exception("request param null"); } out.writeByte(requestParam.getType()); out.writeByte(requestParam.getFlag()); out.writeInt(requestParam.getLength()); out.writeBytes(requestParam.getBody().getBytes()); }
}
|
启动服务端,然后通过客户端发送消息后最终输出结果:
3.服务端解码描述
1 2 3 4 5 6
| protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new LengthFieldBasedFrameDecoder(65536, 2, 4)); ch.pipeline().addLast(new RequestDecoder()); ch.pipeline().addLast(new RequestHandler()); }
|
可以看到服务端这里先是添加的长度解码器LengthFieldBasedFrameDecoder,由它解码之后再把解码好的ByteBuf也就是一个完整的数据包传播给RequestDecoder,在这里在对ByteBuf进行业务解码,最终再把解码后的消息传播给Handler进行处理。
4.客户端编码描述
1 2 3
| protected void initChannel(SocketChannel ch) throws Exception { ch.pipeline().addLast(new RequestEncoder()); }
|
1 2 3 4 5 6 7 8 9 10 11 12 13 14
| public class RequestEncoder extends MessageToByteEncoder<RequestParam> {
@Override protected void encode(ChannelHandlerContext ctx, RequestParam requestParam, ByteBuf out) throws Exception { if( requestParam == null){ throw new Exception("request param null"); } out.writeByte(requestParam.getType()); out.writeByte(requestParam.getFlag()); out.writeInt(requestParam.getLength()); out.writeBytes(requestParam.getBody().getBytes()); }
}
|
1
| channelFuture.channel().writeAndFlush(new RequestParam((byte) 1, (byte) 2, body.length(), body));
|
可以看到从Client通过Channel写数据到通道。writeAndFlush然后经过编码,把RequestParam进行编码。尤其注意看一定要按照对应的顺序来写数据包。
第一个字节一定要是type:out.writeByte(requestParam.getType());
第二给字节一定要是flag:out.writeByte(requestParam.getFlag());
然后从第三字节到第6个字节为消息体的长度。int型(4字节):out.writeInt(requestParam.getLength());
最后再从第7个字节写入消息体:out.writeBytes(requestParam.getBody().getBytes());
按照这么一个顺序写入之后,通过LengthFieldBasedFrameDecoder解码器的一些参数,就知道如何读取截取消息
5.总结
以上相当于模拟一个Rpc框架客户端调用服务端这么一个逻辑。
当然Rpc框架的消息肯定更加负责,但是主要应该是在消息头或消息体做文章,主要还是看怎么去解析与定于这么一个消息包。比如需要增加对应接口名称、方法名称,另外消息体上的参数。那么当服务器接收到这么一个消息后进行解析然后定位到注入的Service,比如Spring的管理对象然在再通过动态代理的方式去调用它们的方法。
不过上面还仅仅只是最简单的使用LengthFieldBasedFrameDecoder,像咱们的这个demo可以看到它在decoder的时候还是会有对应的长度出来,其实也可以通过在实例化LengthFieldBasedFrameDecoder的时候只让消息体往后传播如:
而这里的第五个参数就是initialBytesToStrip,这里为6,表示获取完一个完整的数据包之后,忽略前面的6个字节,应用解码器拿到的就是不带长度域的数据包。即直接打到消息体body。
当然他还有其他属性如lengthAdjestment,但是只要理解上面的这个demo,然后在这个基础之上在去理解其他的参数。其实也就是换着法子来怎么解析你传递过来的消息。这个规则都是咱们小马哥定义的。