(1).NioEventLoopGroup继承关系图
(2).new NioEventLoopGroup
// Starting point of the trace: construct the group through the single-int constructor with nThreads = 1.
EventLoopGroup bossGroup = new NioEventLoopGroup(1);
(3).NioEventLoopGroup 构造器
/**
 * Forwards to the (nThreads, Executor) overload, passing a null executor.
 *
 * @param nThreads requested number of threads; 1 in the traced call
 */
public NioEventLoopGroup(int nThreads) {
// Traced value: nThreads = 1
this(nThreads, (Executor) null);
}
(4).NioEventLoopGroup 构造器
/**
 * Forwards to the three-argument overload, supplying the result of
 * SelectorProvider.provider() as the selector provider.
 *
 * @param nThreads requested number of threads; 1 in the traced call
 * @param executor executor passed along unchanged; null in the traced call
 */
public NioEventLoopGroup(int nThreads, Executor executor) {
// Traced values:
//   nThreads = 1
//   executor = null
// SelectorProvider.provider() is called here to obtain a SelectorProvider.
this(nThreads, executor, SelectorProvider.provider());
}
(5).NioEventLoopGroup 构造器
/**
 * Forwards to the four-argument overload, supplying
 * DefaultSelectStrategyFactory.INSTANCE as the select-strategy factory.
 *
 * @param nThreads         requested number of threads; 1 in the traced call
 * @param executor         executor passed along unchanged; null in the traced call
 * @param selectorProvider selector provider passed along unchanged
 */
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider) {
// Traced values:
//   nThreads = 1
//   executor = null
//   selectorProvider = sun.nio.ch.KQueueSelectorProvider (the trace was taken on a Mac)
this(nThreads, executor, selectorProvider, DefaultSelectStrategyFactory.INSTANCE);
}
(6).NioEventLoopGroup 构造器
/**
 * Hands all arguments to the superclass (MultithreadEventLoopGroup) constructor,
 * appending RejectedExecutionHandlers.reject() as the rejection handler.
 *
 * @param nThreads              requested number of threads; 1 in the traced call
 * @param executor              executor passed along unchanged; null in the traced call
 * @param selectorProvider      selector provider passed along unchanged
 * @param selectStrategyFactory select-strategy factory passed along unchanged
 */
public NioEventLoopGroup(int nThreads, Executor executor, final SelectorProvider selectorProvider,final SelectStrategyFactory selectStrategyFactory) {
// Traced values:
//   nThreads = 1
//   executor = null
//   selectorProvider = sun.nio.ch.KQueueSelectorProvider
//   selectStrategyFactory = io.netty.channel.DefaultSelectStrategyFactory
// The rejection policy is chosen here (RejectedExecutionHandlers.reject()),
// then the MultithreadEventLoopGroup constructor is invoked.
super(nThreads, executor, selectorProvider, selectStrategyFactory, RejectedExecutionHandlers.reject());
}
(7).MultithreadEventLoopGroup 构造器
/**
 * Calls the superclass (MultithreadEventExecutorGroup) constructor, replacing
 * an nThreads of 0 with DEFAULT_EVENT_LOOP_THREADS.
 *
 * @param nThreads requested number of threads; 0 selects DEFAULT_EVENT_LOOP_THREADS
 * @param executor executor passed along unchanged; null in the traced call
 * @param args     extra arguments passed along unchanged
 */
protected MultithreadEventLoopGroup(int nThreads, Executor executor, Object... args) {
// Traced values:
//   nThreads = 1
//   executor = null
//   args holds, in order: SelectorProvider, SelectStrategyFactory, rejection handler
//   args = [sun.nio.ch.KQueueSelectorProvider, io.netty.channel.DefaultSelectStrategyFactory, io.netty.util.concurrent.RejectedExecutionHandlers$1]
// Invokes the parent MultithreadEventExecutorGroup constructor.
super(nThreads == 0 ? DEFAULT_EVENT_LOOP_THREADS : nThreads, executor, args);
}
(8).MultithreadEventExecutorGroup 构造器
/**
 * Forwards to the four-argument overload, supplying
 * DefaultEventExecutorChooserFactory.INSTANCE as the chooser factory.
 *
 * @param nThreads requested number of threads; 1 in the traced call
 * @param executor executor passed along unchanged; null in the traced call
 * @param args     extra arguments passed along unchanged
 */
protected MultithreadEventExecutorGroup(int nThreads, Executor executor, Object... args) {
// Traced values:
//   nThreads = 1
//   executor = null
//   args = [sun.nio.ch.KQueueSelectorProvider, io.netty.channel.DefaultSelectStrategyFactory, io.netty.util.concurrent.RejectedExecutionHandlers$1]
this(nThreads, executor, DefaultEventExecutorChooserFactory.INSTANCE, args);
}
(9).MultithreadEventExecutorGroup 构造器
// Fields of MultithreadEventExecutorGroup used by the constructor shown next.
// NOTE(review): that constructor also assigns `readonlyChildren`, whose
// declaration is not included in this excerpt.
// Array holding the child executors created in the constructor.
EventExecutor[] children;
// Chooser obtained from the chooser factory in the constructor.
EventExecutorChooserFactory.EventExecutorChooser chooser;
// Counter incremented by the termination listener each time it fires.
AtomicInteger terminatedChildren = new AtomicInteger();
// Promise completed by the termination listener once the counter reaches children.length.
Promise<?> terminationFuture = new DefaultPromise(GlobalEventExecutor.INSTANCE);
// ********************************************
// Key part of the walkthrough: this constructor does the real work.
// ********************************************
/**
 * Creates nThreads child executors via newChild, builds the chooser from them,
 * attaches a termination listener to every child, and stores a read-only set
 * of the children.
 *
 * @param nThreads       number of children to create; must be greater than 0
 * @param executor       executor handed to each child; when null a
 *                       ThreadPerTaskExecutor is created
 * @param chooserFactory factory used to build the chooser over the children
 * @param args           extra arguments passed to every newChild call
 * @throws IllegalArgumentException if nThreads is less than or equal to 0
 * @throws IllegalStateException    if newChild throws an Exception
 */
protected MultithreadEventExecutorGroup(
int nThreads,
Executor executor,
EventExecutorChooserFactory chooserFactory,
Object... args) {
// Traced values:
//   nThreads = 1
//   executor = null
//   chooserFactory = io.netty.util.concurrent.DefaultEventExecutorChooserFactory
//   args = [
//     sun.nio.ch.KQueueSelectorProvider
//     io.netty.channel.DefaultSelectStrategyFactory
//     io.netty.util.concurrent.RejectedExecutionHandlers$1
//   ]
if (nThreads <= 0) { // false in the traced call
throw new IllegalArgumentException(String.format("nThreads: %d (expected: > 0)", nThreads));
}
// Case where the caller did not supply an executor.
if (executor == null) { // true in the traced call
// Create the default Executor.
// Its source is covered in a separate section; its internals are not examined here.
executor = new ThreadPerTaskExecutor(newDefaultThreadFactory());
}
// `children` is the EventExecutor[] field declared on the class.
// Allocate the array with one slot per requested thread.
children = new EventExecutor[nThreads];
for (int i = 0; i < nThreads; i ++) {
boolean success = false;
try {
// ************************************************************
// Skipped for now; explained in detail later.
// 1. Creation is delegated to the subclass (NioEventLoopGroup.newChild in this trace).
//    Traced result: [io.netty.channel.nio.NioEventLoop@6b81ce95]
// ************************************************************
children[i] = newChild(executor, args);
success = true;
} catch (Exception e) {
// TODO: Think about if this is a good exception type
throw new IllegalStateException("failed to create a child event loop", e);
} finally {
if (!success) { // cleanup when creating child i failed
// Request graceful shutdown of every child created before the failure.
for (int j = 0; j < i; j ++) {
children[j].shutdownGracefully();
} // end for
// Then wait for each of those children to report termination.
for (int j = 0; j < i; j ++) {
EventExecutor e = children[j];
try {
while (!e.isTerminated()) {
e.awaitTermination(Integer.MAX_VALUE, TimeUnit.SECONDS);
} // end while
} catch (InterruptedException interrupted) {
// Restore the interrupt flag and stop waiting on the remaining children.
Thread.currentThread().interrupt();
break;
} // end catch
} // end for
} // end if
} // end finally
} // end for
// ************************************************************
// Skipped for now; explained in detail later.
// Per the author's notes, the chooser selects one EventExecutor out of children (the NioEventLoop[]).
// Traced values:
//   chooserFactory = io.netty.util.concurrent.DefaultEventExecutorChooserFactory
//   chooser = io.netty.util.concurrent.DefaultEventExecutorChooserFactory$PowerOfTwoEventExecutorChooser
// ************************************************************
chooser = chooserFactory.newChooser(children);
// Create the termination listener: each invocation bumps terminatedChildren, and the
// invocation that brings the count to children.length completes terminationFuture.
final FutureListener<Object> terminationListener = new FutureListener<Object>() {
@Override
public void operationComplete(Future<Object> future) throws Exception {
if (terminatedChildren.incrementAndGet() == children.length) {
terminationFuture.setSuccess(null);
}
}
}; // end of listener
// Register the termination listener on every child (NioEventLoop instances in this trace).
for (EventExecutor e: children) {
e.terminationFuture().addListener(terminationListener);
}
// Copy the children array into a LinkedHashSet.
Set<EventExecutor> childrenSet = new LinkedHashSet<EventExecutor>(children.length);
Collections.addAll(childrenSet, children);
// Keep an unmodifiable view of that set.
readonlyChildren = Collections.unmodifiableSet(childrenSet);
}
(10).NioEventLoopGroup.newChild
/**
 * Creates one NioEventLoop owned by this group.
 *
 * <p>The varargs are read positionally: args[0] is cast to SelectorProvider,
 * args[1] to SelectStrategyFactory (its newSelectStrategy() result is used),
 * args[2] to RejectedExecutionHandler, and, only when exactly four arguments
 * are present, args[3] to EventLoopTaskQueueFactory.
 *
 * @param executor executor handed to the new event loop
 * @param args     positional construction arguments as described above
 * @return the newly constructed NioEventLoop
 * @throws Exception declared by the signature
 */
protected EventLoop newChild(Executor executor, Object... args) throws Exception {
    // Traced values:
    //   executor = io.netty.util.concurrent.ThreadPerTaskExecutor
    //   args = [
    //     sun.nio.ch.KQueueSelectorProvider
    //     io.netty.channel.DefaultSelectStrategyFactory
    //     io.netty.util.concurrent.RejectedExecutionHandlers$1
    //   ]
    // Only three arguments arrive in the traced call, so the queue factory stays null.
    EventLoopTaskQueueFactory queueFactory = null;
    if (args.length == 4) {
        queueFactory = (EventLoopTaskQueueFactory) args[3];
    }

    final SelectorProvider selectorProvider = (SelectorProvider) args[0];

    // ********************************************
    // NioEventLoop itself is covered in a later section; skipped here.
    // ********************************************
    return new NioEventLoop(
            this,
            executor,
            selectorProvider,
            ((SelectStrategyFactory) args[1]).newSelectStrategy(),
            (RejectedExecutionHandler) args[2],
            queueFactory);
}
(11).总结
NioEventLoopGroup实际是根据配置的nThreads,创建N个NioEventLoop. NioEventLoopGroup和NioEventLoop都实现了:EventLoopGroup,相当于这两个类都间接地实现了ScheduledExecutorService(定时调度方法)和Iterable(迭代器).
而NioEventLoopGroup的大部分调度方法都是委派给:NioEventLoop去调用的,自己实际并不执行任务或定时任务的内容.