Netty 编码 Encoder(七)

mark

目录:

  1. Netty编码概述
  2. Encoder Demo举例
  3. MessageToByteEncoder
  4. writeAndFlush()方法
  5. 总结

如何把对象也就是咱们业务当中的对象转换为字节流,最终写入到socket底层?

1. Netty编码概述

1
2
channelFuture.channel().writeAndFlush(new RequestParam((byte) 1, (byte) 1, body.length(), body));
channelFuture.channel().writeAndFlush(new RequestParam((byte) 2, (byte) 2, body.length(), body));

以上这个就是通过Netty中的channel把对象写入到流中,但是这里仅仅还只是把RequestParam这个对象输入到Netty的Outbound传播时间当中, 也就是相当触发了pipeline当中的writeAndFlush。

writeAndFlush方法具体流程

  • 从tail节点开始往前传播
  • 逐个调用channelHandler的write方法
  • 逐个调用channelHandler的flush方法

2. MessageToByteEncoder

根据上面的代码通过writeAndflush写入到事件中。

NioSocketChannel当中的writeAndFlush方法如下:

1
2
3
public ChannelFuture writeAndFlush(Object msg) {
return pipeline.writeAndFlush(msg);
}

ChannelPipeline当中的writeAndFlush方法如下:

1
2
3
public ChannelFuture writeAndFlush(Object msg) {
return tail.writeAndFlush(msg);
}

根据上面的Channel以及对应的Pipeline中可以看到,Netty当中的把对象写入到传播事件中,首先会经过Tail,也就是咱们之前说的Outbound,它是先触达到末尾,然后再往上传播到你注册的Decoder,最终通过自定义的Decoder把相关的字段写入到通道中。

相当于从上面的流程触发,最终到MessageToByteEncoder。

这里最终又是到达了HeadContext的write方法,然后再通过Unsafe类来操作具体的write和flush。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
public abstract class MessageToByteEncoder<I> extends ChannelOutboundHandlerAdapter {
private final TypeParameterMatcher matcher;
private final boolean preferDirect;

protected MessageToByteEncoder() {
this(true);
}

protected MessageToByteEncoder(Class<? extends I> outboundMessageType) {
this(outboundMessageType, true);
}

protected MessageToByteEncoder(boolean preferDirect) {
this.matcher = TypeParameterMatcher.find(this, MessageToByteEncoder.class, "I");
this.preferDirect = preferDirect;
}

protected MessageToByteEncoder(Class<? extends I> outboundMessageType, boolean preferDirect) {
this.matcher = TypeParameterMatcher.get(outboundMessageType);
this.preferDirect = preferDirect;
}

public boolean acceptOutboundMessage(Object msg) throws Exception {
return this.matcher.match(msg);
}

public void write(ChannelHandlerContext ctx, Object msg, ChannelPromise promise) throws Exception {
ByteBuf buf = null;

try {
// 判断是否可以处理该对象,匹配对象,
// 如果不能则else,再把它进行往下传播write
if (this.acceptOutboundMessage(msg)) {
I cast = msg;
// 分配内存空间 默认堆外内存
if (this.preferDirect) {
buf = ctx.alloc().ioBuffer();
} else {
buf = ctx.alloc().heapBuffer();
}

try {
// 编码实现,把转换的内容填充到buf
this.encode(ctx, cast, buf);
} finally {
//释放对象
ReferenceCountUtil.release(msg);
}

// buf写了数据之后,就代表是可读状态了
if (buf.isReadable()) {
// 再继续往下传播。这个时候就会传播到我们的head节点
ctx.write(buf, promise);
} else {
buf.release();
ctx.write(Unpooled.EMPTY_BUFFER, promise);
}

buf = null;
} else {
ctx.write(msg, promise);
}
} catch (EncoderException var17) {
throw var17;
} catch (Throwable var18) {
throw new EncoderException(var18);
} finally {
if (buf != null) {
buf.release();
}

}

}

protected abstract void encode(ChannelHandlerContext var1, I var2, ByteBuf var3) throws Exception;

2. Encoder Demo

可以看如下的Demo代码。发送的协议消息固定字节长度为6,一个变动长度body。

可以看到type、flag、length这3个字段加一起一共6个字节。length当中则记录了最终消息体body的长度。

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
/*
* +--------+--------------------+
* | 1 | 1 | 4 | ? |
* | type | flag | length | body |
* +--------+--------------------+
*/
public class RequestParam {
/**
* 类型数值以后扩展1,2,3......
*/
private byte type;
/**
* 信息标志 1:表示心跳包 2:业务信息包
*/
private byte flag;
/**
* 请求消息内容长度
*/
private int length;
/**
* 请求消息体
*/
private String body;
}
1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
public class RequestEncoder extends MessageToByteEncoder<RequestParam> {

@Override
protected void encode(ChannelHandlerContext ctx, RequestParam requestParam, ByteBuf out) throws Exception {
if( requestParam == null){
throw new Exception("request param null");
}
// 写入1个字节
out.writeByte(requestParam.getType());
// 写入1个字节
out.writeByte(requestParam.getFlag());
// 写入4个字节
out.writeInt(requestParam.getLength());
// 写入body(变长,length则记录了body的长度)
out.writeBytes(requestParam.getBody().getBytes());
}

}

这里写入整个消息包,最终通过事件传播到encoder进行处理

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
public class RcpClient {

public static void main(String[] args) throws InterruptedException {

EventLoopGroup group = new NioEventLoopGroup();

try {
Bootstrap b = new Bootstrap();
b.group(group).channel(NioSocketChannel.class)
.option(ChannelOption.TCP_NODELAY, true)
.handler(new ChannelInitializer<SocketChannel>() {
@Override
protected void initChannel(SocketChannel ch) throws Exception {
ch.pipeline().addLast(new RequestEncoder());
}
});
ChannelFuture channelFuture = b.connect("127.0.0.1", 6379).sync();
String body = "this message is from client";
// 这里写入整个消息包,最终通过事件传播到encoder进行处理
channelFuture.channel().writeAndFlush(new RequestParam((byte) 1, (byte) 2, body.length(), body));
System.out.println("message already send.");
channelFuture.channel().closeFuture().sync();
} finally {
group.shutdownGracefully();
}
}

}

4.writeAndFlush()方法

HeadContext的writeAndFlush方法

1
2
3
4
5
6
7
8
9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
public ChannelFuture writeAndFlush(Object msg, ChannelPromise promise) {
if (msg == null) {
throw new NullPointerException("msg");
}

if (!validatePromise(promise, true)) {
ReferenceCountUtil.release(msg);
// cancelled
return promise;
}

write(msg, true, promise);

return promise;
}

private void write(Object msg, boolean flush, ChannelPromise promise) {
AbstractChannelHandlerContext next = findContextOutbound();
final Object m = pipeline.touch(msg, next);
EventExecutor executor = next.executor();
if (executor.inEventLoop()) {
if (flush) {
next.invokeWriteAndFlush(m, promise);
} else {
next.invokeWrite(m, promise);
}
} else {
AbstractWriteTask task;
if (flush) {
task = WriteAndFlushTask.newInstance(next, m, promise);
} else {
task = WriteTask.newInstance(next, m, promise);
}
safeExecute(executor, task, promise, m);
}
}

private void invokeWriteAndFlush(Object msg, ChannelPromise promise) {
if (invokeHandler()) {
// 先走write
invokeWrite0(msg, promise);
// 再走flush
invokeFlush0();
} else {
writeAndFlush(msg, promise);
}
}
  1. write()

    • 把ByteBuf转换为堆外内存Direct
    • 插入写队列
    • 更新设置写状态
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    public final void write(Object msg, ChannelPromise promise) {
    // 判断是否EventLoop线程
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
    safeSetFailure(promise, WRITE_CLOSED_CHANNEL_EXCEPTION);
    // release message now to prevent resource-leak
    ReferenceCountUtil.release(msg);
    return;
    }

    int size;
    try {
    // 转换为directByteBuf,如果已经是堆外内存则直接返回
    msg = filterOutboundMessage(msg);
    size = pipeline.estimatorHandle().size(msg);
    if (size < 0) {
    size = 0;
    }
    } catch (Throwable t) {
    safeSetFailure(promise, t);
    ReferenceCountUtil.release(msg);
    return;
    }
    // 插入写队列并且设置写状态(重要)
    // 将已经转化为堆外内存的msg插入到写队列
    outboundBuffer.addMessage(msg, size, promise);
    }

    public void addMessage(Object msg, int size, ChannelPromise promise) {
    //包装消息,为写请求Entry
    Entry entry = Entry.newInstance(msg, size, total(msg), promise);
    if (tailEntry == null) {
    // 即将被消费的开始节点
    flushedEntry = null;
    // 最后一个节点
    tailEntry = entry;
    } else {
    Entry tail = tailEntry;
    tail.next = entry;
    tailEntry = entry;
    }
    if (unflushedEntry == null) {
    // 被添加的开始节点,但没有准备好被消费。
    unflushedEntry = entry;
    }

    // increment pending bytes after adding message to the unflushed arrays.
    // See https://github.com/netty/netty/issues/1619
    //在添加消息到未刷新写请求链表后,更新待发送的字节数 (这步时统计当前有多少字节需要被写出)
    incrementPendingOutboundBytes(size, false);
    }

    private void incrementPendingOutboundBytes(long size, boolean invokeLater) {
    if (size == 0) {
    return;
    }
    //TOTAL_PENDING_SIZE_UPDATER当前缓冲区里面有多少待写的字节
    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, size);
    //如果待发送的字节数,大于通道写buf大小,则更新通道可状态
    //getWriteBufferHighWaterMark() 最高不能超过64k
    if (newWriteBufferSize > channel.config().getWriteBufferHighWaterMark()) {
    //更新通道写状态
    setUnwritable(invokeLater);
    }
    }

    private void setUnwritable(boolean invokeLater) {
    // 这里通过自旋和cas操作, 传播一个ChannelWritabilityChanged事件, 最终会调用handler的channelWritabilityChanged方法进行处理
    for (;;) {
    final int oldValue = unwritable;
    final int newValue = oldValue | 1;
    if (UNWRITABLE_UPDATER.compareAndSet(this, oldValue, newValue)) {
    if (oldValue == 0 && newValue != 0) {
    //写状态改变,则触发通道ChannelWritabilityChanged事件
    fireChannelWritabilityChanged(invokeLater);
    }
    break;
    }
    }
    }
  2. flush()

    • 添加刷新标志并设置写状态
    • 遍历buffer队列,过滤ByteBuf
    • 调用jdk底层api进行自旋写
    1
    2
    3
    4
    5
    6
    7
    8
    9
    10
    11
    12
    13
    14
    15
    16
    17
    18
    19
    20
    21
    22
    23
    24
    25
    26
    27
    28
    29
    30
    31
    32
    33
    34
    35
    36
    37
    38
    39
    40
    41
    42
    43
    44
    45
    46
    47
    48
    49
    50
    51
    52
    53
    54
    55
    56
    57
    58
    59
    60
    61
    62
    63
    64
    65
    66
    67
    68
    69
    70
    71
    72
    73
    74
    75
    76
    77
    78
    79
    80
    81
    82
    83
    84
    85
    86
    87
    88
    89
    90
    91
    92
    93
    94
    95
    96
    97
    98
    99
    100
    101
    102
    103
    104
    105
    106
    107
    108
    109
    110
    111
    112
    113
    114
    115
    116
    117
    118
    119
    120
    121
    122
    123
    124
    125
    126
    127
    128
    129
    130
    131
    132
    133
    134
    135
    136
    137
    138
    139
    140
    141
    142
    143
    144
    145
    146
    147
    148
    149
    150
    151
    152
    153
    154
    155
    156
    157
    public final void flush() {
    assertEventLoop();

    ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null) {
    return;
    }
    // 添加刷新标志并设置写状态
    outboundBuffer.addFlush();
    flush0();
    }
    public void addFlush() {
    // There is no need to process all entries if there was already a flush before and no new messages
    // where added in the meantime.
    //
    // See https://github.com/netty/netty/issues/2577
    Entry entry = unflushedEntry;
    if (entry != null) {
    if (flushedEntry == null) {
    // there is no flushedEntry yet, so start with the entry
    flushedEntry = entry;
    }
    do {
    flushed ++;
    if (!entry.promise.setUncancellable()) {
    // Was cancelled so make sure we free up memory and notify about the freed bytes
    int pending = entry.cancel();
    decrementPendingOutboundBytes(pending, false, true);
    }
    entry = entry.next;
    } while (entry != null);

    // All flushed so reset unflushedEntry
    unflushedEntry = null;
    }
    }
    private void decrementPendingOutboundBytes(long size, boolean invokeLater, boolean notifyWritability) {
    if (size == 0) {
    return;
    }

    long newWriteBufferSize = TOTAL_PENDING_SIZE_UPDATER.addAndGet(this, -size);
    if (notifyWritability && newWriteBufferSize < channel.config().getWriteBufferLowWaterMark()) {
    setWritable(invokeLater);
    }
    }
    protected void flush0() {
    if (inFlush0) {
    // Avoid re-entrance
    return;
    }

    final ChannelOutboundBuffer outboundBuffer = this.outboundBuffer;
    if (outboundBuffer == null || outboundBuffer.isEmpty()) {
    return;
    }

    inFlush0 = true;

    // Mark all pending write requests as failure if the channel is inactive.
    if (!isActive()) {
    try {
    if (isOpen()) {
    outboundBuffer.failFlushed(FLUSH0_NOT_YET_CONNECTED_EXCEPTION, true);
    } else {
    // Do not trigger channelWritabilityChanged because the channel is closed already.
    outboundBuffer.failFlushed(FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
    }
    } finally {
    inFlush0 = false;
    }
    return;
    }

    try {
    doWrite(outboundBuffer);
    } catch (Throwable t) {
    if (t instanceof IOException && config().isAutoClose()) {
    /**
    * Just call {@link #close(ChannelPromise, Throwable, boolean)} here which will take care of
    * failing all flushed messages and also ensure the actual close of the underlying transport
    * will happen before the promises are notified.
    *
    * This is needed as otherwise {@link #isActive()} , {@link #isOpen()} and {@link #isWritable()}
    * may still return {@code true} even if the channel should be closed as result of the exception.
    */
    close(voidPromise(), t, FLUSH0_CLOSED_CHANNEL_EXCEPTION, false);
    } else {
    outboundBuffer.failFlushed(t, true);
    }
    } finally {
    inFlush0 = false;
    }
    }

    protected void doWrite(ChannelOutboundBuffer in) throws Exception {
    int writeSpinCount = -1;

    boolean setOpWrite = false;
    for (;;) {
    Object msg = in.current();
    if (msg == null) {
    // Wrote all messages.
    clearOpWrite();
    // Directly return here so incompleteWrite(...) is not called.
    return;
    }

    if (msg instanceof ByteBuf) {
    ByteBuf buf = (ByteBuf) msg;
    int readableBytes = buf.readableBytes();
    if (readableBytes == 0) {
    in.remove();
    continue;
    }

    boolean done = false;
    long flushedAmount = 0;
    if (writeSpinCount == -1) {
    writeSpinCount = config().getWriteSpinCount();
    }
    for (int i = writeSpinCount - 1; i >= 0; i --) {
    // 调用jdk写
    int localFlushedAmount = doWriteBytes(buf);
    if (localFlushedAmount == 0) {
    setOpWrite = true;
    break;
    }

    flushedAmount += localFlushedAmount;
    if (!buf.isReadable()) {
    done = true;
    break;
    }
    }

    in.progress(flushedAmount);

    if (done) {
    in.remove();
    } else {
    // Break the loop and so incompleteWrite(...) is called.
    break;
    }
    } else {
    // Should not reach here.
    throw new Error();
    }
    }
    incompleteWrite(setOpWrite);
    }

    protected int doWriteBytes(ByteBuf buf) throws Exception {
    final int expectedWrittenBytes = buf.readableBytes();
    // 获取jdk channel写
    return buf.readBytes(javaChannel(), expectedWrittenBytes);
    }

5.总结

以上主要讲述了一个编码的过程。当中有两个地方时需要注意,也就是说Netty在写数据的过程当中有一个可写状态,当超过它的最大阈值即64k的时候。你发送的数据实际是不会写入到底层的通道中去的。它会给你触发一个ChannelWritabilityChanged事件。还有一个低水位的说法

WRITE_BUFFER_HIGH_WATER_MARK:Netty参数,写高水位标记,默认值64KB。如果Netty的写缓冲区中的字节超过该值,Channel的isWritable()返回False。

WRITE_BUFFER_LOW_WATER_MARK:Netty参数,写低水位标记,默认值32KB。当Netty的写缓冲区中的字节超过高水位之后若下降到低水位,则Channel的isWritable()返回True。写高低水位标记使用户可以控制写入数据速度,从而实现流量控制。

推荐做法是:每次调用channl.write(msg)方法首先调用channel.isWritable()判断是否可写。然后通过队列的方式进行写入,或者自旋检测增加吞吐。

分享到 评论