(1). 概述
在前一小节,对StandaloneGateway创建AtomixCluster进行了源码剖析,还得继续往下看源码,这一小节,主要剖析的是与ActorScheduler相关的内容.
(2). StandaloneGateway.createActorScheduler
private ActorScheduler createActorScheduler(final GatewayCfg config) {
return ActorScheduler.newActorScheduler()
// 1 设置CPU线程数
.setCpuBoundActorThreadCount(config.getThreads().getManagementThreads())
// 2. 设置IO线程数
.setIoBoundActorThreadCount(0)
// 3. 设置线程名称
.setSchedulerName("gateway-scheduler")
.setActorClock(clockConfig.getClock())
// *************************************************************
// 4. 构建:ActorScheduler
// *************************************************************
.build();
}
(3). ActorScheduler.build
public ActorScheduler build() {
// 初始化线程工厂
initActorThreadFactory();
// 初始化CPU线程组
initCpuBoundActorThreadGroup();
// 初始化IO线程数
initIoBoundActorThreadGroup();
// 初始化Executor
initActorExecutor();
return new ActorScheduler(this);
}
(4). ActorScheduler.initActorThreadFactory
ActorThreadFactory的主要职职责是创建:ActorThread线程.
private void initActorThreadFactory() {
if (actorThreadFactory == null) {
// 1. 创建DefaultActorThreadFactory
actorThreadFactory = new DefaultActorThreadFactory();
}
} // end initActorThreadFactory
// 1.1 DefaultActorThreadFactory实现了:ActorThreadFactory,所以,需要先看下该接口有哪些签名
public interface ActorThreadFactory {
ActorThread newThread(
String name,
int id,
ActorThreadGroup threadGroup,
TaskScheduler taskScheduler,
ActorClock clock,
ActorTimerQueue timerQueue);
} // end
// 1.2 通过对ActorThreadFactory的分析DefaultActorThreadFactory的主要责任应该就是创建Thread,只是这个Thread是被Zeebe包装过的.
public static class DefaultActorThreadFactory implements ActorThreadFactory {
@Override
public ActorThread newThread(
final String name,
final int id,
final ActorThreadGroup threadGroup,
final TaskScheduler taskScheduler,
final ActorClock clock,
final ActorTimerQueue timerQueue) {
return new ActorThread(name, id, threadGroup, taskScheduler, clock, timerQueue);
}
}
(5). ActorScheduler.initCpuBoundActorThreadGroup
// 1.1 初始化Cpu线程组
private void initCpuBoundActorThreadGroup() {
if (cpuBoundActorGroup == null) {
cpuBoundActorGroup = new CpuThreadGroup(this);
}
} // end initCpuBoundActorThreadGroup
// **********************************************
// 1.2 CpuThreadGroup继承于:ActorThreadGroup,所以,要分析下这个抽象类的行为
// **********************************************
public abstract class ActorThreadGroup {
// 线程组名称
protected final String groupName;
// 看类的名称应该是Thread的实现类,在这里我不详细剖析这个类了,就把它理解为Thread的增加吧,后面再进行详细剖析
protected final ActorThread[] threads;
// 不太理解这个类是干嘛的,先凉在这里.
protected final MultiLevelWorkstealingGroup tasks;
// 线程的数量
protected final int numOfThreads;
public ActorThreadGroup(
final String groupName,
final int numOfThreads,
final int numOfQueuesPerThread,
final ActorSchedulerBuilder builder) {
// 设置线程名称
this.groupName = groupName;
// 设置线程数量
this.numOfThreads = numOfThreads;
// MultiLevelWorkstealingGroup内部结构实际上是参考了:Disruptor
// 自己实现了一套环形队列,感觉Zeebe好像不太喜欢用开源已经成熟了的东西
// 当然,也有一种可能性,那就是我没能理解他的精隋
tasks = new MultiLevelWorkstealingGroup(numOfThreads, numOfQueuesPerThread);
// 根据线程数量,创建N个线程数组,Zeebe并没有用Java提供的线程池,而是自己用数组创建了N个线程养着.
threads = new ActorThread[numOfThreads];
// **************************************************************************************
// 对线程池里的线程进行初始化.
// **************************************************************************************
for (int t = 0; t < numOfThreads; t++) {
final String threadName = String.format("%s-%d", groupName, t);
// **************************************************************************************
// 调用子类(CpuThreadGroup)创建:TaskScheduler
// **************************************************************************************
final TaskScheduler taskScheduler = createTaskScheduler(tasks, builder);
final ActorThread thread =
builder
.getActorThreadFactory()
// 调用ActorThreadFactory创建线程.
.newThread(
threadName,
t,
this,
taskScheduler,
builder.getActorClock(),
builder.getActorTimerQueue());
threads[t] = thread;
}
}
// 预留给子类自己去实现.
protected TaskScheduler createTaskScheduler(final MultiLevelWorkstealingGroup tasks, final ActorSchedulerBuilder builder);
// *****************************************************************
// 从ActorThreadGroup类的方法签名上能看出来,这个类的主要目的在于提交task.
// *****************************************************************
public void submit(final ActorTask actorTask) {
final int level = getLevel(actorTask);
final ActorThread current = ActorThread.current();
if (current != null && current.getActorThreadGroup() == this) {
tasks.submit(actorTask, level, current.getRunnerId());
} else {
final int threadId = ThreadLocalRandom.current().nextInt(numOfThreads);
tasks.submit(actorTask, level, threadId);
threads[threadId].hintWorkAvailable();
}
}
// 调用所有线程的start方法
public void start() {
for (final ActorThread actorThread : threads) {
actorThread.start();
}
}
}
// **********************************************
// CpuThreadGroup的职责如下:
// 1. 实现ActorThreadGroup.createTaskScheduler方法,一看名字:PriorityScheduler就知道这是一个有优先级的调度器.
// 2. MultiLevelWorkstealingGroup内部实际是一个Queue来着的.
// 3. 设置调度的级别(HIGH:0 / REGULAR:1 / LOW:2 )
// **********************************************
public final class CpuThreadGroup extends ActorThreadGroup {
public CpuThreadGroup(final ActorSchedulerBuilder builder) {
super(
String.format("%s-%s", builder.getSchedulerName(), "zb-actors"),
builder.getCpuBoundActorThreadCount(),
builder.getPriorityQuotas().length,
builder);
}
@Override
protected TaskScheduler createTaskScheduler(final MultiLevelWorkstealingGroup tasks, final ActorSchedulerBuilder builder) {
// PriorityScheduler一看名字就知道这是一个有优先级的调度器,稍微看一下源码会发现:PriorityScheduler也是Zeebe自己封装的.
return new PriorityScheduler(tasks::getNextTask, builder.getPriorityQuotas());
} // end createTaskScheduler
@Override
protected int getLevel(final ActorTask actorTask) {
return actorTask.getPriority();
} // end getLevel
}
(6). ActorScheduler.initActorExecutor
private void initActorExecutor() {
if (actorExecutor == null) {
actorExecutor = new ActorExecutor(this);
}
} // end initActorExecutor
// **************************************************
// ActorExecutor相当于是Hold住两个线程组,是两个线程组的一个聚合类而已.
// 主要提供:启动线程池/提交任务/关闭线程池
// **************************************************
public final class ActorExecutor {
private final ActorThreadGroup cpuBoundThreads;
private final ActorThreadGroup ioBoundThreads;
public ActorExecutor(final ActorSchedulerBuilder builder) {
ioBoundThreads = builder.getIoBoundActorThreads();
cpuBoundThreads = builder.getCpuBoundActorThreads();
}
public ActorFuture<Void> submitCpuBound(final ActorTask task) {
return submitTask(task, cpuBoundThreads);
}
public ActorFuture<Void> submitIoBoundTask(final ActorTask task) {
return submitTask(task, ioBoundThreads);
}
private ActorFuture<Void> submitTask(final ActorTask task, final ActorThreadGroup threadGroup) {
if (task.getLifecyclePhase() != ActorLifecyclePhase.CLOSED) {
throw new IllegalStateException("ActorTask was already submitted!");
}
final ActorFuture<Void> startingFuture = task.onTaskScheduled(this, threadGroup);
threadGroup.submit(task);
return startingFuture;
}
public void start() {
cpuBoundThreads.start();
ioBoundThreads.start();
}
public CompletableFuture<Void> closeAsync() {
return CompletableFuture.allOf(ioBoundThreads.closeAsync(), cpuBoundThreads.closeAsync());
}
public ActorThreadGroup getCpuBoundThreads() {
return cpuBoundThreads;
}
public ActorThreadGroup getIoBoundThreads() {
return ioBoundThreads;
}
} // end ActorExecutor
(7). ActorScheduler构建器
public final class ActorScheduler implements AutoCloseable, ActorSchedulingService {
private final AtomicReference<SchedulerState> state = new AtomicReference<>();
private final ActorExecutor actorTaskExecutor;
public ActorScheduler(final ActorSchedulerBuilder builder) {
// 设置状态
state.set(SchedulerState.NEW);
// 包裹着两个ActorThreadGroup(CPU/IO)
actorTaskExecutor = builder.getActorExecutor();
} // end
}
(8). 总结
ActorScheduler的内部维护着两个线程组(CPU密集型和IO密集型),这两个线程组是Zeebe自己开发的,并没有使用JDK自带的,因为,这两个线程组内部还维护着优先级来着的,更深的代码在这里不具体看了,毕竟,Zeebe后面还需要剖析源码的地方还很多,不能在这上面浪费太多时间了.