Netty Component NioEventLoop (Part 2)

Contents:

  1. Creating the NioEventLoop
  2. Starting the NioEventLoop
  3. The NioEventLoop execution logic
  4. Summary

1. Creating the NioEventLoop

Tracing through the code shows that the NioEventLoops are created at the same time as their NioEventLoopGroup: constructing the boss and worker groups also constructs the specified number of NioEventLoops.

EventLoopGroup bossGroup = new NioEventLoopGroup(1);
EventLoopGroup workerGroup = new NioEventLoopGroup();

In the code above, bossGroup is created with an argument of 1, while workerGroup is given no argument and uses the default. Let's analyze the difference between specifying the thread count and omitting it.

public NioEventLoopGroup(int nThreads) {
    this(nThreads, null);
}
protected MultithreadEventLoopGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, threadFactory, args);
}

static {
    DEFAULT_EVENT_LOOP_THREADS = Math.max(1, SystemPropertyUtil.getInt(
            "io.netty.eventLoopThreads", Runtime.getRuntime().availableProcessors() * 2));

    if (logger.isDebugEnabled()) {
        logger.debug("-Dio.netty.eventLoopThreads: {}", DEFAULT_EVENT_LOOP_THREADS);
    }
}

The hierarchy is NioEventLoopGroup -> MultithreadEventLoopGroup -> the abstract MultithreadEventExecutorGroup. If nThreads is passed, that value is used; otherwise the default is CPU cores * 2 threads.
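
A quick way to check what that default resolves to on a given machine is to replay the same computation with plain JDK calls; a minimal sketch (SystemPropertyUtil.getInt behaves like Integer.getInteger here):

public class DefaultThreadsSketch {
    public static void main(String[] args) {
        // Same computation as the static block above, using only the JDK:
        // the system property wins if set, otherwise cores * 2, floored at 1
        int defaultThreads = Math.max(1, Integer.getInteger(
                "io.netty.eventLoopThreads",
                Runtime.getRuntime().availableProcessors() * 2));
        System.out.println("DEFAULT_EVENT_LOOP_THREADS would be: " + defaultThreads);
        // Override at launch time with: java -Dio.netty.eventLoopThreads=4 ...
    }
}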

In practice the bossGroup usually needs only one event loop, because only one port is being listened on, so a single NioEventLoop suffices; the workerGroup, on the other hand, gets multiple event loops. See the following code.

protected MultithreadEventExecutorGroup(int nThreads, ThreadFactory threadFactory, Object... args) {
    if (nThreads <= 0) {
        throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
    }
    // Thread factory, used for naming the event loop threads
    if (threadFactory == null) {
        threadFactory = newDefaultThreadFactory();
    }
    // Following the logic above:
    // the bossGroup's array has length 1,
    // the workerGroup's array has length cores * 2
    children = new SingleThreadEventExecutor[nThreads];

    // When there are multiple NioEventLoops, the chooser optimizes how the next
    // NioEventLoop is selected (the selection strategy)
    if (isPowerOfTwo(children.length)) {
        chooser = new PowerOfTwoEventExecutorChooser();
    } else {
        chooser = new GenericEventExecutorChooser();
    }

    for (int i = 0; i < nThreads; i ++) {
        boolean success = false;
        try {
            // For the boss group this creates a single NioEventLoop;
            // for the worker group it creates cores * 2 NioEventLoops
            children[i] = newChild(threadFactory, args);
            success = true;
        } catch (Exception e) {
            // TODO: Think about if this is a good exception type
            throw new IllegalStateException("failed to create a child event loop", e);
        } finally {
            if (!success) {
                for (int j = 0; j < i; j ++) {
                    children[j].shutdownGracefully();
                }

                for (int j = 0; j < i; j ++) {
                    EventExecutor e = children[j];
                    try {
                        while (!e.isTerminated()) {
                            e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
                        }
                    } catch (InterruptedException interrupted) {
                        Thread.currentThread().interrupt();
                        break;
                    }
                }
            }
        }
    }

    final FutureListener<Object> terminationListener = new FutureListener<Object>() {
        @Override
        public void operationComplete(Future<Object> future) throws Exception {
            if (terminatedChildren.incrementAndGet() == children.length) {
                terminationFuture.setSuccess(null);
            }
        }
    };

    for (EventExecutor e: children) {
        e.terminationFuture().addListener(terminationListener);
    }
}

protected EventExecutor newChild(
        ThreadFactory threadFactory, Object... args) throws Exception {
    return new NioEventLoop(this, threadFactory, (SelectorProvider) args[0]);
}

From this code we can see that when the group is created, the NioEventLoops, i.e. the actual workers, are created along with it: the array of event loops that handles new connections and the array that handles reads and writes.
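
The chooser branch above is a small but neat optimization: when the number of event loops is a power of two, the round-robin index can be wrapped with a bitwise AND instead of a modulo. A simplified, self-contained sketch of the two strategies (paraphrased from the inner classes named above, not copied verbatim):

import java.util.concurrent.atomic.AtomicInteger;

final class ChooserSketch<T> {
    private final AtomicInteger childIndex = new AtomicInteger();
    private final T[] children;

    ChooserSketch(T[] children) {
        this.children = children;
    }

    // PowerOfTwoEventExecutorChooser: length is a power of two, so
    // (i & (length - 1)) equals (i % length) but needs only a single AND
    T nextPowerOfTwo() {
        return children[childIndex.getAndIncrement() & children.length - 1];
    }

    // GenericEventExecutorChooser: plain round-robin; Math.abs guards against
    // the index overflowing into negative values
    T nextGeneric() {
        return children[Math.abs(childIndex.getAndIncrement() % children.length)];
    }
}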

Next, let's look at what happens when a NioEventLoop itself is created.

NioEventLoop(NioEventLoopGroup parent, ThreadFactory threadFactory, SelectorProvider selectorProvider) {
    super(parent, threadFactory, false);
    if (selectorProvider == null) {
        throw new NullPointerException("selectorProvider");
    }
    provider = selectorProvider;
    // Open a Selector: every NioEventLoop has its own Selector
    selector = openSelector();
}
protected SingleThreadEventLoop(EventLoopGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    super(parent, threadFactory, addTaskWakesUp);
}
protected SingleThreadEventExecutor(
        EventExecutorGroup parent, ThreadFactory threadFactory, boolean addTaskWakesUp) {
    // threadFactory is used for thread naming
    if (threadFactory == null) {
        throw new NullPointerException("threadFactory");
    }

    this.parent = parent;
    this.addTaskWakesUp = addTaskWakesUp;

    // Create the single execution thread via the thread factory; this is the NioEventLoop thread
    thread = threadFactory.newThread(new Runnable() {
        @Override
        public void run() {
            boolean success = false;
            updateLastExecutionTime();
            try {
                SingleThreadEventExecutor.this.run();
                success = true;
            } catch (Throwable t) {
                logger.warn("Unexpected exception from an event executor: ", t);
            } finally {
                for (;;) {
                    int oldState = STATE_UPDATER.get(SingleThreadEventExecutor.this);
                    if (oldState >= ST_SHUTTING_DOWN || STATE_UPDATER.compareAndSet(
                            SingleThreadEventExecutor.this, oldState, ST_SHUTTING_DOWN)) {
                        break;
                    }
                }
                // Check if confirmShutdown() was called at the end of the loop.
                if (success && gracefulShutdownStartTime == 0) {
                    logger.error(
                            "Buggy " + EventExecutor.class.getSimpleName() + " implementation; " +
                            SingleThreadEventExecutor.class.getSimpleName() + ".confirmShutdown() must be called " +
                            "before run() implementation terminates.");
                }

                try {
                    // Run all remaining tasks and shutdown hooks.
                    for (;;) {
                        if (confirmShutdown()) {
                            break;
                        }
                    }
                } finally {
                    try {
                        cleanup();
                    } finally {
                        STATE_UPDATER.set(SingleThreadEventExecutor.this, ST_TERMINATED);
                        threadLock.release();
                        if (!taskQueue.isEmpty()) {
                            logger.warn(
                                    "An event executor terminated with " +
                                    "non-empty task queue (" + taskQueue.size() + ')');
                        }

                        terminationFuture.setSuccess(null);
                    }
                }
            }
        }
    });
    // Create the task queue via PlatformDependent.newMpscQueue() (NioEventLoop overrides
    // newTaskQueue()). Netty's MpscLinkedQueue is a queue designed for NIO tasks that allows
    // multiple producers but only a single consumer: a linked list plus careful CAS operations
    // give a concise, efficient lock-free queue that fits Netty's serialized, lock-free design.
    taskQueue = newTaskQueue();
}

thread = threadFactory.newThread(new Runnable()) is the key line: this thread is effectively the NioEventLoop's main thread, and all later startup and execution happens on it.
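
The taskQueue comment in the constructor above is also worth making concrete. A minimal sketch of the MPSC contract (assuming PlatformDependent.newMpscQueue() is available in the Netty version on the classpath; the sleep is a crude stand-in for real coordination):

import io.netty.util.internal.PlatformDependent;

import java.util.Queue;

public class MpscQueueSketch {
    public static void main(String[] args) throws InterruptedException {
        // Many threads may offer(), but only ONE thread may ever poll()
        final Queue<Runnable> taskQueue = PlatformDependent.newMpscQueue();

        // Producers: any caller thread may enqueue, like external execute() calls
        for (int i = 0; i < 4; i++) {
            final int id = i;
            new Thread(new Runnable() {
                @Override
                public void run() {
                    taskQueue.offer(new Runnable() {
                        @Override
                        public void run() {
                            System.out.println("task " + id);
                        }
                    });
                }
            }).start();
        }

        Thread.sleep(100); // crude: wait for the producers (illustration only)

        // The single consumer drains the queue, like the event loop thread does
        Runnable task;
        while ((task = taskQueue.poll()) != null) {
            task.run();
        }
    }
}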

With that, the creation of the NioEventLoop is complete.

2. Starting the NioEventLoop

From the analysis above we know the NioEventLoops are instantiated when the group is created. So where do they actually start?

This can be traced through the main code flow covered in the previous article, Netty源码解析-基本组件架构&启动过程(一) (part 1 on basic component architecture and the startup process).

private static void doBind0(
        final ChannelFuture regFuture, final Channel channel,
        final SocketAddress localAddress, final ChannelPromise promise) {

    // This method is invoked before channelRegistered() is triggered. Give user handlers a chance to set up
    // the pipeline in its channelRegistered() implementation.
    // Get the NioEventLoop through the channel, then call execute() to run this task.
    // Note that execute() here overrides the JDK executor method.
    channel.eventLoop().execute(new Runnable() {
        @Override
        public void run() {
            if (regFuture.isSuccess()) {
                channel.bind(localAddress, promise).addListener(ChannelFutureListener.CLOSE_ON_FAILURE);
            } else {
                promise.setFailure(regFuture.cause());
            }
        }
    });
}

As this code shows, the NioEventLoop for a channel is already bound during channel instantiation and initialization.

--> register(final SelectableChannel ch, final int interestOps, final NioTask<?> task)
--> channel.unsafe().register(this, promise)
--> register(EventLoop eventLoop, ChannelPromise promise) -- DefaultServerUnsafe -> AbstractUnsafe

That is: NioEventLoop -> SingleThreadEventLoop -> SingleThreadEventExecutor -> ExecutorService.

SingleThreadEventExecutor overrides execute(), so the task submission above, channel.eventLoop().execute(), lands in the method below, which is also the entry point that starts the NioEventLoop.

@Override
public void execute(Runnable task) {
    if (task == null) {
        throw new NullPointerException("task");
    }
    // Check whether we are already on the event loop thread. For the submission above
    // this is false the first time, since the caller is still the main thread
    boolean inEventLoop = inEventLoop();
    if (inEventLoop) {
        addTask(task);
    } else {
        // Start the NioEventLoop thread, i.e. the thread created in the constructor
        startThread();
        // Add the submitted task to the task queue
        addTask(task);
        if (isShutdown() && removeTask(task)) {
            reject();
        }
    }

    if (!addTaskWakesUp && wakesUpForTask(task)) {
        wakeup(inEventLoop);
    }
}

private void startThread() {
    if (STATE_UPDATER.get(this) == ST_NOT_STARTED) {
        if (STATE_UPDATER.compareAndSet(this, ST_NOT_STARTED, ST_STARTED)) {
            delayedTaskQueue.add(new ScheduledFutureTask<Void>(
                    this, delayedTaskQueue, Executors.<Void>callable(new PurgeTask(), null),
                    ScheduledFutureTask.deadlineNanos(SCHEDULE_PURGE_INTERVAL), -SCHEDULE_PURGE_INTERVAL));
            // Start the thread created earlier in the constructor
            thread.start();
        }
    }
}
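
To make the inEventLoop() branch concrete, here is a minimal sketch (the group and loop here are illustrative, not from the source above): the first execute() from the main thread triggers startThread(), while a nested execute() from inside the loop only enqueues.

import io.netty.channel.EventLoop;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;

public class ExecuteDemo {
    public static void main(String[] args) {
        EventLoopGroup group = new NioEventLoopGroup(1);
        final EventLoop loop = group.next();

        // Called from main: inEventLoop() is false -> startThread() + addTask()
        loop.execute(new Runnable() {
            @Override
            public void run() {
                System.out.println("outer task on " + Thread.currentThread().getName());
                // Called from the loop thread: inEventLoop() is true -> addTask() only
                loop.execute(new Runnable() {
                    @Override
                    public void run() {
                        System.out.println("nested task, same thread");
                    }
                });
            }
        });

        group.shutdownGracefully();
    }
}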

At this point the NioEventLoop thread has been started; next we need to look at what the NioEventLoop actually does.

3. The NioEventLoop execution logic

Once the NioEventLoop has started as above, its thread executes SingleThreadEventExecutor.this.run(), which is where the NioEventLoop's real logic runs. The main code follows.

protected void run() {
    for (;;) {
        oldWakenUp = wakenUp.getAndSet(false);
        try {
            // If tasks have been submitted to this NioEventLoop, select without blocking
            if (hasTasks()) {
                selectNow();
            } else {
                // Otherwise block, waiting for new connections or read/write readiness
                select();

                if (wakenUp.get()) {
                    selector.wakeup();
                }
            }

            cancelledKeys = 0;
            needsToSelectAgain = false;
            final int ioRatio = this.ioRatio;
            if (ioRatio == 100) {
                // Process the selected accept/read/write events
                processSelectedKeys();
                // Then run all tasks queued on this event loop
                runAllTasks();
            } else {
                final long ioStartTime = System.nanoTime();

                processSelectedKeys();

                final long ioTime = System.nanoTime() - ioStartTime;
                runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
            }

            if (isShuttingDown()) {
                closeAll();
                if (confirmShutdown()) {
                    break;
                }
            }
        } catch (Throwable t) {
            logger.warn("Unexpected exception in the selector loop.", t);

            // Prevent possible consecutive immediate failures that lead to
            // excessive CPU consumption.
            try {
                Thread.sleep(1000);
            } catch (InterruptedException e) {
                // Ignore.
            }
        }
    }
}

private void processSelectedKeys() {
    if (selectedKeys != null) {
        processSelectedKeysOptimized(selectedKeys.flip());
    } else {
        processSelectedKeysPlain(selector.selectedKeys());
    }
}

private static void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
    // Get the channel's Unsafe implementation:
    // NioServerSocketChannel is bound to NioMessageUnsafe (new connections go through it);
    // NioSocketChannel is bound to NioByteUnsafe (reads and writes go through it)
    final NioUnsafe unsafe = ch.unsafe();
    if (!k.isValid()) {
        // close the channel if the key is not valid anymore
        unsafe.close(unsafe.voidPromise());
        return;
    }

    try {
        int readyOps = k.readyOps();
        // Also check for readOps of 0 to workaround possible JDK bug which may otherwise lead
        // to a spin loop
        if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
            // New connections and reads both dispatch to the matching Unsafe here,
            // i.e. NioMessageUnsafe or NioByteUnsafe
            unsafe.read();
            if (!ch.isOpen()) {
                // Connection already closed - no need to handle write.
                return;
            }
        }
        if ((readyOps & SelectionKey.OP_WRITE) != 0) {
            // Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
            ch.unsafe().forceFlush();
        }
        if ((readyOps & SelectionKey.OP_CONNECT) != 0) {
            // remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
            // See https://github.com/netty/netty/issues/924
            int ops = k.interestOps();
            ops &= ~SelectionKey.OP_CONNECT;
            k.interestOps(ops);

            unsafe.finishConnect();
        }
    } catch (CancelledKeyException e) {
        unsafe.close(unsafe.voidPromise());
    }
}
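
The ioRatio arithmetic in run() above is easiest to see with concrete numbers plugged in; a worked sketch (50 is the field's default value):

// Suppose processSelectedKeys() spent 2 ms on I/O:
long ioTime = 2_000_000L;                   // nanoseconds

// ioRatio = 50 (default): runAllTasks gets ioTime * (100 - 50) / 50 = ioTime,
// i.e. the task queue gets the same 2 ms budget as the I/O work took
long budget50 = ioTime * (100 - 50) / 50;   // 2_000_000 ns

// ioRatio = 80: runAllTasks gets ioTime * 20 / 80 = ioTime / 4 = 0.5 ms
long budget80 = ioTime * (100 - 80) / 80;   // 500_000 ns

// ioRatio = 100 skips the budget entirely: runAllTasks() drains everything queued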

Ultimately NioMessageUnsafe handles the read, which for a NioServerSocketChannel means doReadMessages() accepting new connections. It can accept several new connections per pass, at most 16. Each accepted connection is then propagated down the pipeline via the fireChannelRead event until it reaches the final handler.

private final class NioMessageUnsafe extends AbstractNioUnsafe {

    private final List<Object> readBuf = new ArrayList<Object>();

    @Override
    public void read() {
        assert eventLoop().inEventLoop();
        final ChannelConfig config = config();
        if (!config.isAutoRead() && !isReadPending()) {
            // ChannelConfig.setAutoRead(false) was called in the meantime
            removeReadOp();
            return;
        }

        final int maxMessagesPerRead = config.getMaxMessagesPerRead();
        final ChannelPipeline pipeline = pipeline();
        boolean closed = false;
        Throwable exception = null;
        try {
            try {
                for (;;) {
                    // Multiple new connections can be accepted here, at most 16 per pass
                    int localRead = doReadMessages(readBuf);
                    if (localRead == 0) {
                        break;
                    }
                    if (localRead < 0) {
                        closed = true;
                        break;
                    }

                    // stop reading and remove op
                    if (!config.isAutoRead()) {
                        break;
                    }
                    // maxMessagesPerRead = 16;
                    if (readBuf.size() >= maxMessagesPerRead) {
                        break;
                    }
                }
            } catch (Throwable t) {
                exception = t;
            }
            setReadPending(false);
            int size = readBuf.size();
            for (int i = 0; i < size; i ++) {
                // Propagate the event; the new channel still has to be registered
                pipeline.fireChannelRead(readBuf.get(i));
            }

            readBuf.clear();
            pipeline.fireChannelReadComplete();

            if (exception != null) {
                if (exception instanceof IOException) {
                    // ServerChannel should not be closed even on IOException because it can often continue
                    // accepting incoming connections. (e.g. too many open files)
                    closed = !(AbstractNioMessageChannel.this instanceof ServerChannel);
                }

                pipeline.fireExceptionCaught(exception);
            }

            if (closed) {
                if (isOpen()) {
                    close(voidPromise());
                }
            }
        } finally {
            if (!config.isAutoRead() && !isReadPending()) {
                removeReadOp();
            }
        }
    }
}
protected int doReadMessages(List<Object> buf) throws Exception {
    SocketChannel ch = javaChannel().accept();

    try {
        if (ch != null) {
            // Wrap the JDK SocketChannel in a NioSocketChannel
            buf.add(new NioSocketChannel(this, ch));
            return 1;
        }
    } catch (Throwable t) {
        logger.warn("Failed to create a new channel from an accepted socket.", t);

        try {
            ch.close();
        } catch (Throwable t2) {
            logger.warn("Failed to close a socket.", t2);
        }
    }

    return 0;
}

The following code shows that when a NioSocketChannel is created, i.e. the channel that handles client reads and writes, the interest op it registers is OP_READ.

public NioSocketChannel(Channel parent, SocketChannel socket) {
    super(parent, socket);
    config = new NioSocketChannelConfig(this, socket.socket());
}

protected AbstractNioByteChannel(Channel parent, SelectableChannel ch) {
    super(parent, ch, SelectionKey.OP_READ);
}

protected AbstractNioChannel(Channel parent, SelectableChannel ch, int readInterestOp) {
    super(parent);
    this.ch = ch;
    this.readInterestOp = readInterestOp;
    try {
        ch.configureBlocking(false);
    } catch (IOException e) {
        try {
            ch.close();
        } catch (IOException e2) {
            if (logger.isWarnEnabled()) {
                logger.warn(
                        "Failed to close a partially initialized socket.", e2);
            }
        }

        throw new ChannelException("Failed to enter non-blocking mode.", e);
    }
}

protected AbstractChannel(Channel parent) {
    this.parent = parent;
    unsafe = newUnsafe();
    pipeline = new DefaultChannelPipeline(this);
}

After the client channel is created as above, it still needs to be registered. The fireChannelRead event propagates it down the pipeline to the final ServerBootstrapAcceptor handler, which was added when the NioServerSocketChannel was instantiated.

for (int i = 0; i < size; i ++) {
    // Propagate the event; the new channel still has to be registered
    pipeline.fireChannelRead(readBuf.get(i));
}
--> init(Channel channel) -- in ServerBootstrap; this mainly re-initializes the channel above, binding its options and attributes
--> ch.pipeline().addLast(new ServerBootstrapAcceptor(currentChildGroup, currentChildHandler, currentChildOptions, currentChildAttrs)) -- this is the most important step: it adds the acceptor that assigns a worker thread to each new connection
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel child = (Channel) msg;

    child.pipeline().addLast(childHandler);

    for (Entry<ChannelOption<?>, Object> e: childOptions) {
        try {
            if (!child.config().setOption((ChannelOption<Object>) e.getKey(), e.getValue())) {
                logger.warn("Unknown channel option: " + e);
            }
        } catch (Throwable t) {
            logger.warn("Failed to set a channel option: " + child, t);
        }
    }

    for (Entry<AttributeKey<?>, Object> e: childAttrs) {
        child.attr((AttributeKey<Object>) e.getKey()).set(e.getValue());
    }

    try {
        childGroup.register(child).addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (!future.isSuccess()) {
                    forceClose(child, future.cause());
                }
            }
        });
    } catch (Throwable t) {
        forceClose(child, t);
    }
}

4. Summary

To recap the thread above: from the creation of bossGroup and workerGroup through the creation of the NioEventLoops, the bossGroup defines how many event loops serve new connections, while the workerGroup defines how many event loops handle read and write operations.
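
Tying the two groups together, here is a minimal boss/worker wiring skeleton (port 8080 and the empty child handler are placeholders, not from the source above):

import io.netty.bootstrap.ServerBootstrap;
import io.netty.channel.ChannelFuture;
import io.netty.channel.ChannelInboundHandlerAdapter;
import io.netty.channel.ChannelInitializer;
import io.netty.channel.EventLoopGroup;
import io.netty.channel.nio.NioEventLoopGroup;
import io.netty.channel.socket.SocketChannel;
import io.netty.channel.socket.nio.NioServerSocketChannel;

public class BossWorkerSketch {
    public static void main(String[] args) throws InterruptedException {
        EventLoopGroup bossGroup = new NioEventLoopGroup(1);   // accepts new connections
        EventLoopGroup workerGroup = new NioEventLoopGroup();  // handles reads/writes
        try {
            ServerBootstrap b = new ServerBootstrap();
            b.group(bossGroup, workerGroup)
             .channel(NioServerSocketChannel.class)
             .childHandler(new ChannelInitializer<SocketChannel>() {
                 @Override
                 protected void initChannel(SocketChannel ch) {
                     // placeholder; real business handlers would go here
                     ch.pipeline().addLast(new ChannelInboundHandlerAdapter());
                 }
             });
            ChannelFuture f = b.bind(8080).sync(); // doBind0 runs on the boss NioEventLoop
            f.channel().closeFuture().sync();
        } finally {
            bossGroup.shutdownGracefully();
            workerGroup.shutdownGracefully();
        }
    }
}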

Then, when the application starts, the doBind() path starts the NioEventLoop inside the bossGroup, whose run() method listens for and processes new connections (the source walked through above does not cover the actual data reads and writes, only new-connection handling).

Handling a new connection involves the following steps:

  • Detect the new connection
  • Instantiate a NioSocketChannel
  • Propagate the NioSocketChannel down the pipeline via the fireChannelRead event
  • Let ServerBootstrapAcceptor handle the propagated NioSocketChannel: register its OP_READ interest and assign it a worker NioEventLoop
    • ServerBootstrapAcceptor was added to the pipeline when the NioServerSocketChannel was initialized

That is roughly the flow of a NioEventLoop handling a new connection. Of course, a NioEventLoop handles reads and writes as well, but the approach is the same; you can follow this same thread to read through that path in detail.
