(1).BossGroup(NioEventLoop)初始化完成后,是如何接收请求(ACCEPT事件),并把请求分发给WorkGroup(NioEventLoop)的呢?
答案在:NioEventLoop.run()方法里.
(2).NioEventLoop.run
/**
 * Excerpt of the NIO event loop as traced in these notes; only {@code run()} is shown.
 * Comments marked "trace" record values observed during one debugging session.
 */
public final class NioEventLoop extends SingleThreadEventLoop {
/**
 * Event-loop body. Each iteration computes a select strategy, optionally blocks in
 * {@code select(...)}, processes the selected keys, runs queued tasks, and finally
 * checks whether the loop is shutting down.
 */
protected void run() {
// Counts consecutive iterations that neither ran tasks nor reported ready keys.
int selectCnt = 0;
for (;;) {
try {
int strategy;
try {
// trace: strategy = -1
strategy = selectStrategy.calculateStrategy(selectNowSupplier, hasTasks());
switch (strategy) {
case SelectStrategy.CONTINUE:
continue;
case SelectStrategy.BUSY_WAIT:
// ******************************************
// Polling case: there are no statements here, so it falls through to SELECT.
// ******************************************
case SelectStrategy.SELECT:
// trace: curDeadlineNanos = -1
long curDeadlineNanos = nextScheduledTaskDeadlineNanos();
if (curDeadlineNanos == -1L) { // trace: true
// trace: NONE = 9223372036854775807
curDeadlineNanos = NONE; // nothing on the calendar
}
// Publish the deadline before selecting; it is reset to AWAKE in the finally below.
nextWakeupNanos.set(curDeadlineNanos);
try {
if (!hasTasks()) { // trace: true
// **************************************
// trace: strategy = -1 before this call
// Step 3: call Selector.select() and block waiting for a client connection.
// **************************************
strategy = select(curDeadlineNanos);
}
} finally {
nextWakeupNanos.lazySet(AWAKE);
}
// fall through
default:
}
} catch (IOException e) {
// The selector failed: rebuild it, reset the counter, and start the next iteration.
rebuildSelector0();
selectCnt = 0;
handleLoopException(e);
continue;
} // end catch
selectCnt++;
cancelledKeys = 0;
needsToSelectAgain = false;
final int ioRatio = this.ioRatio;
boolean ranTasks;
// trace: ioRatio = 50
if (ioRatio == 100) {
try {
if (strategy > 0) {
processSelectedKeys();
}
} finally {
// Ensure we always run tasks.
ranTasks = runAllTasks();
}
} else if (strategy > 0) {
final long ioStartTime = System.nanoTime();
try {
// ******************************************
// Step 4: handle the selected events.
// ******************************************
processSelectedKeys();
} finally {
// Ensure we always run tasks.
// trace: ioTime = 650326301514
final long ioTime = System.nanoTime() - ioStartTime;
// **************************************************
// Step 9: run all tasks (SingleThreadEventExecutor.runAllTasks).
// The task time budget is the I/O time scaled by (100 - ioRatio) / ioRatio.
// **************************************************
ranTasks = runAllTasks(ioTime * (100 - ioRatio) / ioRatio);
}
} else {
ranTasks = runAllTasks(0); // This will run the minimum number of tasks
}
if (ranTasks || strategy > 0) {
if (selectCnt > MIN_PREMATURE_SELECTOR_RETURNS && logger.isDebugEnabled()) {
logger.debug("Selector.select() returned prematurely {} times in a row for Selector {}.",
selectCnt - 1, selector);
}
selectCnt = 0;
} else if (unexpectedSelectorWakeup(selectCnt)) { // Unexpected wakeup (unusual case)
selectCnt = 0;
}
} catch (CancelledKeyException e) {
// Harmless exception - log anyway
if (logger.isDebugEnabled()) {
logger.debug(CancelledKeyException.class.getSimpleName() + " raised by a Selector {} - JDK bug?",
selector, e);
}
} catch (Throwable t) {
handleLoopException(t);
}
// Always handle shutdown even if the loop processing threw an exception.
try {
if (isShuttingDown()) {
closeAll();
if (confirmShutdown()) {
return;
}
}
} catch (Throwable t) {
handleLoopException(t);
}// end catch
} // end for
} // end run
}
(3).NioEventLoop.select
/**
 * Performs one select operation on {@code selector}, picking the variant from the deadline.
 *
 * @param deadlineNanos wake-up deadline in nanoseconds, or {@code NONE} for "no deadline"
 * @return whatever the chosen selector call returns
 * @throws IOException if the selector call fails
 */
private int select(long deadlineNanos) throws IOException {
    // trace: deadlineNanos = 9223372036854775807, i.e. NONE
    if (deadlineNanos != NONE) {
        // Timeout will only be 0 if deadline is within 5 microsecs
        final long delayMillis = deadlineToDelayNanos(deadlineNanos + 995000L) / 1000000L;
        if (delayMillis <= 0) {
            return selector.selectNow();
        }
        return selector.select(delayMillis);
    }
    // trace: selector = io.netty.channel.nio.SelectedSelectionKeySetSelector
    // ************************************************
    // select() is a blocking call: it waits for client connections. Per these notes,
    // once a connection arrives the event is stored in the SelectedSelectionKeySet.
    // ************************************************
    return selector.select();
}
(4).NioEventLoop.processSelectedKeys
/**
 * Dispatches the ready keys: uses the optimized path when the {@code selectedKeys}
 * field is set, otherwise iterates the selector's own selected-key set.
 */
private void processSelectedKeys() {
    if (selectedKeys == null) {
        processSelectedKeysPlain(selector.selectedKeys());
        return;
    }
    // NOTE(review): the notes' trace labels selectedKeys as sun.nio.ch.SelectionKeyImpl,
    // but the optimized path reads selectedKeys.size and selectedKeys.keys[i], so it
    // looks like a key-set container rather than a single key - confirm.
    processSelectedKeysOptimized();
}
(5). NioEventLoop.processSelectedKeysOptimized
/**
 * Walks the {@code selectedKeys} array and dispatches every ready key according to
 * the type of its attachment. When {@code needsToSelectAgain} is set during
 * processing, the remaining slots are reset, a new select is performed, and the
 * walk restarts from index 0.
 */
private void processSelectedKeysOptimized() {
    // *************************************************
    // Per these notes, NioEventLoop.select() adds the triggered events to selectedKeys.
    // *************************************************
    // trace: selectedKeys.size = 1 (only one client has connected so far)
    int index = 0;
    while (index < selectedKeys.size) {
        // trace: key = sun.nio.ch.SelectionKeyImpl
        final SelectionKey key = selectedKeys.keys[index];
        // Clear the slot immediately after reading the key out of it.
        selectedKeys.keys[index] = null;

        // trace: attachment = NioServerSocketChannel
        final Object attachment = key.attachment();
        if (attachment instanceof AbstractNioChannel) { // trace: true
            processSelectedKey(key, (AbstractNioChannel) attachment);
        } else {
            @SuppressWarnings("unchecked")
            NioTask<SelectableChannel> nioTask = (NioTask<SelectableChannel>) attachment;
            processSelectedKey(key, nioTask);
        }

        if (!needsToSelectAgain) { // trace: needsToSelectAgain = false
            ++index;
            continue;
        }
        // null out entries in the array to allow to have it GC'ed once the Channel close
        // See https://github.com/netty/netty/issues/2363
        selectedKeys.reset(index + 1);
        selectAgain();
        // Restart from the first slot after selecting again.
        index = 0;
    }
}
(6).NioEventLoop.processSelectedKey
/**
 * Handles a single ready key for a channel: closes the channel when the key is no
 * longer valid and the channel still belongs to this loop; otherwise reacts to the
 * CONNECT, WRITE and READ/ACCEPT ready bits, in that order.
 *
 * @param k  the selection key reported as ready
 * @param ch the channel attached to {@code k}
 */
private void processSelectedKey(SelectionKey k, AbstractNioChannel ch) {
// trace: k = sun.nio.ch.SelectionKeyImpl
// trace: ch = NioServerSocketChannel
// Obtain the channel's internal Unsafe object.
// trace: unsafe = AbstractNioMessageChannel$NioMessageUnsafe
final AbstractNioChannel.NioUnsafe unsafe = ch.unsafe();
if (!k.isValid()) { // trace: false
final EventLoop eventLoop;
try {
eventLoop = ch.eventLoop();
} catch (Throwable ignored) {
// If the channel implementation throws an exception because there is no event loop, we ignore this
// because we are only trying to determine if ch is registered to this event loop and thus has authority
// to close ch.
return;
}
// Only close ch if ch is still registered to this EventLoop. ch could have deregistered from the event loop
// and thus the SelectionKey could be cancelled as part of the deregistration process, but the channel is
// still healthy and should not be closed.
// See https://github.com/netty/netty/issues/5125
if (eventLoop == this) {
// close the channel if the key is not valid anymore
unsafe.close(unsafe.voidPromise());
}
return;
}// end if
try {
// trace: readyOps = 16 (the value of SelectionKey.OP_ACCEPT)
int readyOps = k.readyOps();
// We first need to call finishConnect() before try to trigger a read(...) or write(...) as otherwise
// the NIO JDK channel implementation may throw a NotYetConnectedException.
if ((readyOps & SelectionKey.OP_CONNECT) != 0) { // trace: false
// remove OP_CONNECT as otherwise Selector.select(..) will always return without blocking
// See https://github.com/netty/netty/issues/924
int ops = k.interestOps();
ops &= ~SelectionKey.OP_CONNECT;
k.interestOps(ops);
unsafe.finishConnect();
}
// Process OP_WRITE first as we may be able to write some queued buffers and so free memory.
if ((readyOps & SelectionKey.OP_WRITE) != 0) { // trace: false
// Call forceFlush which will also take care of clear the OP_WRITE once there is nothing left to write
ch.unsafe().forceFlush();
}
// Also check for readyOps of 0 to workaround possible JDK bug which may otherwise lead
// to a spin loop
// trace: the condition below is true
if ((readyOps & (SelectionKey.OP_READ | SelectionKey.OP_ACCEPT)) != 0 || readyOps == 0) {
// *****************************************************
// Step 6: AbstractNioMessageChannel.NioMessageUnsafe.read()
// *****************************************************
unsafe.read();
}
} catch (CancelledKeyException ignored) {
// The key was cancelled while being processed: close the channel.
unsafe.close(unsafe.voidPromise());
}
}
(7).AbstractNioMessageChannel.NioMessageUnsafe.read
/**
 * Reads messages via {@code doReadMessages} into {@code readBuf}, fires
 * {@code channelRead} on the pipeline for each of them, then fires
 * {@code channelReadComplete}. A captured read exception is reported through
 * {@code fireExceptionCaught}, and the channel is closed when the read signalled
 * closure.
 */
public void read() {
assert eventLoop().inEventLoop();
// trace: config = NioServerSocketChannel$NioServerSocketChannelConfig
final ChannelConfig config = config();
// trace: the pipeline holds a logging handler and ServerBootstrap$ServerBootstrapAcceptor
// trace: pipeline = DefaultChannelPipeline{(LoggingHandler#0 = io.netty.handler.logging.LoggingHandler), (ServerBootstrap$ServerBootstrapAcceptor#0 = io.netty.bootstrap.ServerBootstrap$ServerBootstrapAcceptor)}
final ChannelPipeline pipeline = pipeline();
// trace: allocHandle = io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl
final RecvByteBufAllocator.Handle allocHandle = unsafe().recvBufAllocHandle();
allocHandle.reset(config);
boolean closed = false;
Throwable exception = null;
try {
try {
do {
// Calls NioServerSocketChannel.doReadMessages (in this trace)
// to read messages into readBuf.
int localRead = doReadMessages(readBuf);
if (localRead == 0) { // trace: false
break;
}
if (localRead < 0) { // trace: false
closed = true;
break;
}
// trace: allocHandle = io.netty.channel.AdaptiveRecvByteBufAllocator$HandleImpl
allocHandle.incMessagesRead(localRead);
} while (allocHandle.continueReading()); // end do while
} catch (Throwable t) {
// Remember the failure; it is reported after the already-read messages are delivered.
exception = t;
} // end catch
int size = readBuf.size();
for (int i = 0; i < size; i ++) {
readPending = false;
// ******************************************************
// trace: readBuf.get(i) = NioSocketChannel
// Invokes fireChannelRead so the handlers configured for the BossGroup see the message.
// The NioSocketChannel passed here is the one just created by doReadMessages.
// ******************************************************
// Step 8: ServerBootstrap$ServerBootstrapAcceptor.channelRead()
pipeline.fireChannelRead(readBuf.get(i));
} // end for
// Clear the collected messages.
readBuf.clear();
// Mark the read as complete.
allocHandle.readComplete();
// ***************************************************
// Invokes channelReadComplete(ctx) on the ChannelHandlers in the pipeline.
// ***************************************************
pipeline.fireChannelReadComplete();
if (exception != null) { // trace: false
closed = closeOnReadError(exception);
pipeline.fireExceptionCaught(exception);
}// end if
if (closed) { // trace: false
inputShutdown = true;
if (isOpen()) {
close(voidPromise());
} // end if
}// end if
} finally {
// Check if there is a readPending which was not processed yet.
// This could be for two reasons:
// * The user called Channel.read() or ChannelHandlerContext.read() in channelRead(...) method
// * The user called Channel.read() or ChannelHandlerContext.read() in channelReadComplete(...) method
//
// See https://github.com/netty/netty/issues/2254
if (!readPending && !config.isAutoRead()) { // trace: false
removeReadOp();
}
} // end finally
}// end read
(8).NioServerSocketChannel.doReadMessages
/**
 * Accepts one pending connection, wraps it in a {@code NioSocketChannel} and appends
 * it to {@code buf}.
 *
 * @param buf list that receives the newly created child channel
 * @return {@code 1} when a channel was added; {@code 0} when nothing was accepted or
 *         wrapping the accepted socket failed
 * @throws Exception if accepting on the underlying channel fails
 */
protected int doReadMessages(List<Object> buf) throws Exception {
    // trace: buf = []
    // **************************************************************
    // trace: javaChannel = sun.nio.ch.ServerSocketChannelImpl
    // Per these notes this ends up in ServerSocketChannel.accept(), which yields a
    // java.nio.channels.SocketChannel.
    // **************************************************************
    final java.nio.channels.SocketChannel accepted = SocketUtils.accept(javaChannel());
    if (accepted == null) {
        return 0;
    }

    try {
        // Create an io.netty.channel.socket.nio.NioSocketChannel that holds both this
        // channel (as its first constructor argument) and the accepted SocketChannel,
        // then add it to buf.
        buf.add(new NioSocketChannel(this, accepted));
        return 1;
    } catch (Throwable cause) {
        logger.warn("Failed to create a new channel from an accepted socket.", cause);
        try {
            accepted.close();
        } catch (Throwable closeFailure) {
            logger.warn("Failed to close a socket.", closeFailure);
        }
    }
    return 0;
}
(9).ServerBootstrap$ServerBootstrapAcceptor.channelRead
// Group that each accepted child channel is registered with (see channelRead).
private final EventLoopGroup childGroup;
// Handler appended to every accepted child channel's pipeline.
private final ChannelHandler childHandler;
// Options applied to every accepted child channel via setChannelOptions.
private final Entry<ChannelOption<?>, Object>[] childOptions;
// Attributes applied to every accepted child channel via setAttributes.
private final Entry<AttributeKey<?>, Object>[] childAttrs;
// NOTE(review): not used in the excerpt shown here; presumably re-enables auto-read - confirm.
private final Runnable enableAutoReadTask;
/**
 * Configures a freshly accepted child channel and registers it with {@code childGroup}.
 * If registration fails, or throws, the child channel is force-closed.
 *
 * @param ctx the handler context (trace: DefaultChannelHandlerContext)
 * @param msg the accepted child channel (trace: NioSocketChannel)
 */
public void channelRead(ChannelHandlerContext ctx, Object msg) {
    final Channel acceptedChannel = (Channel) msg;

    // Append the configured child handler to the child channel's pipeline.
    acceptedChannel.pipeline().addLast(childHandler);
    // Apply the configured child options.
    setChannelOptions(acceptedChannel, childOptions, logger);
    // Apply the configured child attributes.
    setAttributes(acceptedChannel, childAttrs);

    try {
        // Register the child channel with the worker group (an EventLoopGroup).
        final ChannelFuture registration = childGroup.register(acceptedChannel);
        registration.addListener(new ChannelFutureListener() {
            @Override
            public void operationComplete(ChannelFuture future) throws Exception {
                if (future.isSuccess()) {
                    return;
                }
                forceClose(acceptedChannel, future.cause());
            }
        });
    } catch (Throwable cause) {
        forceClose(acceptedChannel, cause);
    }
}
(10).SingleThreadEventExecutor.runAllTasks(由NioEventLoop.run调用)
/**
 * Polls and executes queued tasks until the queue is empty or the time budget is
 * found to be exhausted; the budget is only checked after every 64th task.
 *
 * @param timeoutNanos time budget in nanoseconds; when it is not positive the
 *                     deadline is set to {@code 0}
 * @return {@code false} if no task was available, {@code true} if at least one ran
 */
protected boolean runAllTasks(long timeoutNanos) {
    // trace: timeoutNanos = 650326301514
    fetchFromScheduledTaskQueue();
    // Pull the first task.
    Runnable current = pollTask();
    if (current == null) { // trace: true
        afterRunningAllTasks();
        return false;
    }

    final long deadline;
    if (timeoutNanos > 0) {
        deadline = ScheduledFutureTask.nanoTime() + timeoutNanos;
    } else {
        deadline = 0;
    }

    long executedCount = 0;
    long finishedAt;
    while (true) {
        safeExecute(current);
        executedCount++;

        // Check timeout every 64 tasks because nanoTime() is relatively expensive.
        // XXX: Hard-coded value - will make it configurable if it is really a problem.
        if ((executedCount & 0x3F) == 0) {
            finishedAt = ScheduledFutureTask.nanoTime();
            if (finishedAt >= deadline) {
                break;
            }
        }

        current = pollTask();
        if (current == null) {
            finishedAt = ScheduledFutureTask.nanoTime();
            break;
        }
    }

    afterRunningAllTasks();
    this.lastExecutionTime = finishedAt;
    return true;
}
(11).ACCEPT过程图解
(12).总结
- NioEventLoop.run方法会不停地循环.当selectStrategy.calculateStrategy返回的策略为SelectStrategy.SELECT时,进入select流程.
- 调用:SelectedSelectionKeySetSelector.select()方法,该方法会阻塞当前线程,直到有事件进入,并把事件存储到:SelectedSelectionKeySet集合中(即Selector存储事件的对象已被修改为该集合).
- 遍历集合(SelectedSelectionKeySet)中所有的事件(此处为SelectionKey.OP_ACCEPT),逐个交给NioEventLoop.processSelectedKey处理.
- 调用:AbstractNioMessageChannel.NioMessageUnsafe.read处理ACCEPT/READ事件
- 调用:NioServerSocketChannel.doReadMessages方法,会委托给:ServerSocketChannel.accept,返回一个SocketChannel.
- 调用:DefaultChannelPipeline.fireChannelRead,通知所有ChannelHandler.channelRead方法(此处为ServerBootstrap.ServerBootstrapAcceptor.channelRead):
  - 6.1 为子Channel(NioSocketChannel)的pipeline添加childHandler,并设置childOptions/childAttrs
  - 6.2 向WorkGroup(EventLoopGroup)注册子Channel(NioSocketChannel)
- 调用:DefaultChannelPipeline.fireChannelReadComplete.通知所有ChannelHandler.channelReadComplete方法