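(1). Overview
This section was originally meant to analyze SnapshotExecutor, but SnapshotExecutor turns out to depend on FSMCaller underneath, so we analyze FSMCaller first.
(2). FSMCaller UML diagram
(3). FSMCaller interface signature
Important: look closely at the FSMCaller interface and you will find that it has no method for applying a task (apply).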
public interface FSMCaller extends Lifecycle<FSMCallerOptions>, Describer {
    interface LastAppliedLogIndexListener {
        void onApplied(final long lastAppliedLogIndex);
    } // end LastAppliedLogIndexListener
    boolean isRunningOnFSMThread();
    void addLastAppliedLogIndexListener(final LastAppliedLogIndexListener listener);
    boolean onCommitted(final long committedIndex);
    boolean onSnapshotLoad(final LoadSnapshotClosure done);
    public void onSnapshotSaveSync(SaveSnapshotClosure done);
    boolean onSnapshotSave(final SaveSnapshotClosure done);
    boolean onLeaderStop(final Status status);
    boolean onLeaderStart(final long term);
    boolean onStartFollowing(final LeaderChangeContext ctx);
    boolean onStopFollowing(final LeaderChangeContext ctx);
    boolean onError(final RaftException error);
    long getLastAppliedIndex();
    long getLastCommittedIndex();
    void join() throws InterruptedException;
}
(4). FSMCaller dependencies
private volatile Thread fsmThread;
// ************************************************************************************
// Log management
// ************************************************************************************
private LogManager logManager;
// ************************************************************************************
// State machine
// ************************************************************************************
private StateMachine fsm;
private ClosureQueue closureQueue;
// ************************************************************************************
// Index of the last log entry applied to the state machine
// ************************************************************************************
private final AtomicLong lastAppliedIndex;
// ************************************************************************************
// Index of the last committed log entry
// ************************************************************************************
private final AtomicLong lastCommittedIndex;
// ************************************************************************************
// Term of the last log entry applied to the state machine
// ************************************************************************************
private long lastAppliedTerm;
private Closure afterShutdown;
// ************************************************************************************
// The node holds the network communication components
// ************************************************************************************
private NodeImpl node;
private volatile TaskType currTask;
private final AtomicLong applyingIndex;
private volatile RaftException error;
// ************************************************************************************
// In the end, every operation is converted into a task and handed to the Disruptor
// ************************************************************************************
private Disruptor<ApplyTask> disruptor;
private RingBuffer<ApplyTask> taskQueue;
private volatile CountDownLatch shutdownLatch;
private NodeMetrics nodeMetrics;
// Listeners exposed to application code; they are told when the applied index changes
private final CopyOnWriteArrayList<LastAppliedLogIndexListener> lastAppliedLogIndexListeners = new CopyOnWriteArrayList<>();
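The last field above is a list of LastAppliedLogIndexListener callbacks. As a rough illustration of how such a list is typically used, here is a minimal JDK-only sketch. The class AppliedIndexNotifier and its method setLastApplied are hypothetical names made up for this example, and the notify-after-update order is an assumption rather than something taken from the FSMCallerImpl source.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.atomic.AtomicLong;

// Hypothetical, simplified stand-in for the listener part of FSMCallerImpl.
class AppliedIndexNotifier {
    // Same shape as FSMCaller.LastAppliedLogIndexListener.
    interface LastAppliedLogIndexListener {
        void onApplied(long lastAppliedLogIndex);
    }

    private final AtomicLong lastAppliedIndex = new AtomicLong(0);
    // CopyOnWriteArrayList allows listeners to be added while another thread iterates.
    private final CopyOnWriteArrayList<LastAppliedLogIndexListener> listeners =
        new CopyOnWriteArrayList<>();

    void addLastAppliedLogIndexListener(LastAppliedLogIndexListener listener) {
        this.listeners.add(listener);
    }

    // Assumed behavior: record the new index first, then notify every listener.
    void setLastApplied(long index) {
        this.lastAppliedIndex.set(index);
        for (LastAppliedLogIndexListener listener : this.listeners) {
            listener.onApplied(index);
        }
    }

    long getLastAppliedIndex() {
        return this.lastAppliedIndex.get();
    }

    public static void main(String[] args) {
        AppliedIndexNotifier notifier = new AppliedIndexNotifier();
        List<Long> seen = new ArrayList<>();
        notifier.addLastAppliedLogIndexListener(index -> seen.add(index));
        notifier.setLastApplied(5);
        notifier.setLastApplied(9);
        System.out.println(seen); // prints [5, 9]
    }
}
```

A copy-on-write list suits this case because listeners are registered rarely but iterated on every update.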
(5). FSMCaller constructor
public FSMCallerImpl() {
    super();
    // ************************************************************************************
    // Mark the current task as idle
    // ************************************************************************************
    this.currTask = TaskType.IDLE;
    this.lastAppliedIndex = new AtomicLong(0);
    this.applyingIndex = new AtomicLong(0);
    this.lastCommittedIndex = new AtomicLong(0);
}
(6). FSMCaller initialization
Initializing FSMCaller depends on the following objects: Node, LogManager, and StateMachine.
public boolean init(final FSMCallerOptions opts) {
    this.logManager = opts.getLogManager();
    this.fsm = opts.getFsm();
    this.closureQueue = opts.getClosureQueue();
    this.afterShutdown = opts.getAfterShutdown();
    this.node = opts.getNode();
    this.nodeMetrics = this.node.getNodeMetrics();
    this.lastCommittedIndex.set(opts.getBootstrapId().getIndex());
    this.lastAppliedIndex.set(opts.getBootstrapId().getIndex());
    // Fire every registered LastAppliedLogIndexListener
    notifyLastAppliedIndexUpdated(this.lastAppliedIndex.get());
    this.lastAppliedTerm = opts.getBootstrapId().getTerm();
    // *******************************************************************************************
    // Disruptor initialization (event factory, ring buffer size, consumer wait strategy)
    // *******************************************************************************************
    this.disruptor = DisruptorBuilder.<ApplyTask> newInstance() //
        .setEventFactory(new ApplyTaskFactory()) // event factory
        .setRingBufferSize(opts.getDisruptorBufferSize()) //
        .setThreadFactory(new NamedThreadFactory("JRaft-FSMCaller-Disruptor-", true)) //
        .setProducerType(ProducerType.MULTI) //
        .setWaitStrategy(new BlockingWaitStrategy()) // the consumer blocks while waiting
        .build();
    // *******************************************************************************************
    // Register the class that consumes Disruptor events
    // *******************************************************************************************
    this.disruptor.handleEventsWith(new ApplyTaskHandler());
    // Configure Disruptor exception handling
    this.disruptor.setDefaultExceptionHandler(new LogExceptionHandler<Object>(getClass().getSimpleName()));
    // Start the Disruptor and keep a reference to its ring buffer
    this.taskQueue = this.disruptor.start();
    // Metrics
    if (this.nodeMetrics.getMetricRegistry() != null) {
        this.nodeMetrics.getMetricRegistry().register("jraft-fsm-caller-disruptor",
            new DisruptorMetricSet(this.taskQueue));
    }
    this.error = new RaftException(EnumOutter.ErrorType.ERROR_TYPE_NONE);
    LOG.info("Starts FSMCaller successfully.");
    return true;
}
(7). TaskType
private enum TaskType {
    IDLE, //
    COMMITTED, //
    SNAPSHOT_SAVE, //
    SNAPSHOT_LOAD, //
    LEADER_STOP, //
    LEADER_START, //
    START_FOLLOWING, //
    STOP_FOLLOWING, //
    SHUTDOWN, //
    FLUSH, //
    ERROR;
}
(8). ApplyTask
// Event model: the object carried through the Disruptor
private static class ApplyTask {
    TaskType type;
    // union fields
    long committedIndex;
    long term;
    Status status;
    LeaderChangeContext leaderChangeCtx;
    Closure done;
    CountDownLatch shutdownLatch;
    public void reset() {
        this.type = null;
        this.committedIndex = 0;
        this.term = 0;
        this.status = null;
        this.leaderChangeCtx = null;
        this.done = null;
        this.shutdownLatch = null;
    }
}
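A Disruptor ring buffer pre-allocates its event objects through the EventFactory and hands the same objects out again as sequences wrap around, which is presumably why ApplyTask carries a reset() method. The following JDK-only sketch illustrates the idea with a plain array; ReusableSlots, Event, and slotFor are hypothetical names for this example and are not part of JRaft or the Disruptor.

```java
// Hypothetical miniature of a ring of pre-allocated, reusable events.
class ReusableSlots {
    static class Event {
        String type;
        Object done;

        // Clear every field so nothing leaks into the next use of this slot.
        void reset() {
            this.type = null;
            this.done = null;
        }
    }

    private final Event[] slots;

    ReusableSlots(int size) {
        this.slots = new Event[size];
        for (int i = 0; i < size; i++) {
            this.slots[i] = new Event(); // allocated once, up front
        }
    }

    // Sequences wrap around, so the same Event object is handed out again later.
    Event slotFor(long sequence) {
        return this.slots[(int) (sequence % this.slots.length)];
    }

    public static void main(String[] args) {
        ReusableSlots ring = new ReusableSlots(4);
        Event first = ring.slotFor(0);
        first.type = "SNAPSHOT_SAVE";
        first.done = new Object();

        Event reused = ring.slotFor(4);       // wraps around to slot 0
        System.out.println(reused == first);  // true
        System.out.println(reused.type);      // SNAPSHOT_SAVE (stale value)
        first.reset();
        System.out.println(reused.type);      // null
    }
}
```

Without the reset, a reused slot would still reference the previous closure or status, which could both confuse the next handler run and keep those objects from being garbage collected.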
(9). ApplyTaskFactory
// ********************************************************************************************
// Must implement com.lmax.disruptor.EventFactory
// As the name suggests, this is the factory that creates events; here the event is our ApplyTask model
// ********************************************************************************************
private static class ApplyTaskFactory implements com.lmax.disruptor.EventFactory<ApplyTask> {
    @Override
    public ApplyTask newInstance() {
        return new ApplyTask();
    }
}
(10). ApplyTaskHandler
// ********************************************************************************************
// Callback invoked when the consumer consumes an event
// Must implement com.lmax.disruptor.EventHandler
// ********************************************************************************************
private class ApplyTaskHandler implements com.lmax.disruptor.EventHandler<ApplyTask> {
    boolean firstRun = true;
    // max committed index in current batch, reset to -1 every batch
    private long maxCommittedIndex = -1;
    // ********************************************************************************************
    // Disruptor callback
    // ********************************************************************************************
    @Override
    public void onEvent(final ApplyTask event, final long sequence, final boolean endOfBatch) throws Exception {
        setFsmThread();
        // ********************************************************************************************
        // Delegates to runApplyTask
        // Skipped here; it is analyzed in the next article
        // ********************************************************************************************
        this.maxCommittedIndex = runApplyTask(event, this.maxCommittedIndex, endOfBatch);
    }
    // Remember the consumer thread the first time an event is handled
    private void setFsmThread() {
        if (firstRun) {
            fsmThread = Thread.currentThread();
            firstRun = false;
        }
    }
}
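The handler records the consumer thread in fsmThread on its first run, which is what lets a method like isRunningOnFSMThread() answer by comparing threads. The sketch below shows that technique with a single-thread executor in place of the Disruptor consumer. SingleConsumer and its methods are hypothetical names, and the thread-identity comparison is an assumption about how the check is done, not a quote from FSMCallerImpl.

```java
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.function.Supplier;

// Hypothetical stand-in: one worker thread plays the role of the Disruptor consumer.
class SingleConsumer {
    private volatile Thread consumerThread;
    private boolean firstRun = true; // only touched by the worker thread
    private final ExecutorService worker = Executors.newSingleThreadExecutor();

    // Runs the task on the worker thread and waits for its result (demo only).
    <T> T runOnConsumer(Supplier<T> task) {
        try {
            return this.worker.submit(() -> {
                if (this.firstRun) {
                    this.consumerThread = Thread.currentThread();
                    this.firstRun = false;
                }
                return task.get();
            }).get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        } catch (ExecutionException e) {
            throw new IllegalStateException(e.getCause());
        }
    }

    // True only when the caller is the recorded consumer thread.
    boolean isRunningOnConsumerThread() {
        return Thread.currentThread() == this.consumerThread;
    }

    void shutdown() {
        this.worker.shutdown();
    }

    public static void main(String[] args) {
        SingleConsumer consumer = new SingleConsumer();
        System.out.println(consumer.isRunningOnConsumerThread()); // false
        System.out.println(consumer.runOnConsumer(consumer::isRunningOnConsumerThread)); // true
        consumer.shutdown();
    }
}
```

A check like this is useful for guarding against blocking calls made from the consumer thread itself, since waiting there for work that only the same thread can perform would deadlock.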
(11). FSMCallerImpl.enqueueTask
private boolean enqueueTask(final EventTranslator<ApplyTask> tpl) {
    // ... ...
    // *****************************************************************************
    // Publish the event to the Disruptor
    // *****************************************************************************
    this.taskQueue.publishEvent(tpl);
    return true;
}
(12). Analysis of commonly used FSMCallerImpl methods
public boolean onSnapshotLoad(final LoadSnapshotClosure done) {
    // ****************************************************************************
    // The lambda is simply an implementation of com.lmax.disruptor.EventTranslator.
    // If that is hard to picture, read it as:
    // new EventTranslator<ApplyTask>() {
    //     @Override
    //     public void translateTo(ApplyTask task, long sequence) {
    //         task.type = TaskType.SNAPSHOT_LOAD;
    //         task.done = done;
    //     }
    // }
    // In the end it always delegates to the enqueueTask method analyzed above,
    // which does nothing more than publish an event to the Disruptor.
    // ****************************************************************************
    return enqueueTask((task, sequence) -> {
        task.type = TaskType.SNAPSHOT_LOAD;
        task.done = done;
    });
} // end
public boolean onSnapshotSave(final SaveSnapshotClosure done) {
    // ****************************************************************************
    // The lambda is an EventTranslator<ApplyTask> that fills in the task as
    // SNAPSHOT_SAVE with its closure; enqueueTask then publishes it to the Disruptor.
    // ****************************************************************************
    return enqueueTask((task, sequence) -> {
        task.type = TaskType.SNAPSHOT_SAVE;
        task.done = done;
    });
} // end
public boolean onLeaderStop(final Status status) {
    // ****************************************************************************
    // The lambda is an EventTranslator<ApplyTask> that fills in the task as
    // LEADER_STOP with a copy of the status; enqueueTask then publishes it to the Disruptor.
    // ****************************************************************************
    return enqueueTask((task, sequence) -> {
        task.type = TaskType.LEADER_STOP;
        task.status = new Status(status);
    });
} // end
public boolean onLeaderStart(final long term) {
    // ****************************************************************************
    // The lambda is an EventTranslator<ApplyTask> that fills in the task as
    // LEADER_START with the new term; enqueueTask then publishes it to the Disruptor.
    // ****************************************************************************
    return enqueueTask((task, sequence) -> {
        task.type = TaskType.LEADER_START;
        task.term = term;
    });
} // end
public boolean onStartFollowing(final LeaderChangeContext ctx) {
    // ****************************************************************************
    // The lambda is an EventTranslator<ApplyTask> that fills in the task as
    // START_FOLLOWING with a copy of the context; enqueueTask then publishes it to the Disruptor.
    // ****************************************************************************
    return enqueueTask((task, sequence) -> {
        task.type = TaskType.START_FOLLOWING;
        task.leaderChangeCtx = new LeaderChangeContext(ctx.getLeaderId(), ctx.getTerm(), ctx.getStatus());
    });
} // end
public boolean onStopFollowing(final LeaderChangeContext ctx) {
    // ****************************************************************************
    // The lambda is an EventTranslator<ApplyTask> that fills in the task as
    // STOP_FOLLOWING with a copy of the context; enqueueTask then publishes it to the Disruptor.
    // ****************************************************************************
    return enqueueTask((task, sequence) -> {
        task.type = TaskType.STOP_FOLLOWING;
        task.leaderChangeCtx = new LeaderChangeContext(ctx.getLeaderId(), ctx.getTerm(), ctx.getStatus());
    });
} // end
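The comments in these methods say that each lambda is just an implementation of com.lmax.disruptor.EventTranslator. To make that equivalence concrete without pulling in the Disruptor library, the sketch below defines a hypothetical Translator interface with the same translateTo(event, sequence) shape and a hypothetical publish helper that stands in for publishEvent. Task mirrors only two of the ApplyTask fields.

```java
// Hypothetical stand-ins: Translator mirrors the shape of EventTranslator#translateTo,
// Task mirrors two ApplyTask fields, publish mimics handing a slot to the translator.
class TranslatorDemo {
    static class Task {
        String type;
        long term;
    }

    interface Translator<T> {
        void translateTo(T event, long sequence);
    }

    // Stand-in for publishEvent: give the translator an event object to fill in.
    static Task publish(Translator<Task> translator, long sequence) {
        Task slot = new Task();
        translator.translateTo(slot, sequence);
        return slot;
    }

    // Lambda form, as used in the FSMCallerImpl methods.
    static Task viaLambda(long term) {
        return publish((task, sequence) -> {
            task.type = "LEADER_START";
            task.term = term;
        }, 0L);
    }

    // Equivalent anonymous-class form.
    static Task viaAnonymousClass(long term) {
        return publish(new Translator<Task>() {
            @Override
            public void translateTo(Task task, long sequence) {
                task.type = "LEADER_START";
                task.term = term;
            }
        }, 0L);
    }

    public static void main(String[] args) {
        Task a = viaLambda(3);
        Task b = viaAnonymousClass(3);
        // prints LEADER_START 3 / LEADER_START 3
        System.out.println(a.type + " " + a.term + " / " + b.type + " " + b.term);
    }
}
```

Both forms fill the event identically; the lambda works because the interface has a single abstract method.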
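(13). Summary
Most of FSMCaller's API methods do the same thing underneath: they convert the call into an ApplyTask and submit it to the Disruptor for asynchronous processing.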
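The whole flow can be sketched end to end with JDK classes only. In this rough model a LinkedBlockingQueue and one consumer thread stand in for the Disruptor ring buffer and ApplyTaskHandler; MiniFsmCaller, its reduced TaskType, and shutdownAndJoin are hypothetical names for illustration, and the dispatch logic is a simplification rather than the real runApplyTask.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.function.Consumer;

// Hypothetical miniature of the FSMCaller flow: API call -> task -> queue -> one consumer.
class MiniFsmCaller {
    enum TaskType { LEADER_START, LEADER_STOP, SHUTDOWN }

    static class Task {
        TaskType type;
        long term;
        CountDownLatch shutdownLatch;
    }

    private final BlockingQueue<Task> queue = new LinkedBlockingQueue<>();
    private final List<String> handled = new ArrayList<>(); // written by the consumer only
    private final CountDownLatch done = new CountDownLatch(1);

    MiniFsmCaller() {
        Thread consumer = new Thread(this::consumeLoop, "mini-fsm-caller");
        consumer.setDaemon(true);
        consumer.start();
    }

    // Every public method funnels through here, like enqueueTask in FSMCallerImpl.
    private boolean enqueueTask(Consumer<Task> filler) {
        Task task = new Task();
        filler.accept(task);
        return this.queue.offer(task);
    }

    boolean onLeaderStart(long term) {
        return enqueueTask(task -> {
            task.type = TaskType.LEADER_START;
            task.term = term;
        });
    }

    boolean onLeaderStop() {
        return enqueueTask(task -> task.type = TaskType.LEADER_STOP);
    }

    // Enqueues a SHUTDOWN task and waits until the consumer has reached it.
    List<String> shutdownAndJoin() {
        enqueueTask(task -> {
            task.type = TaskType.SHUTDOWN;
            task.shutdownLatch = this.done;
        });
        try {
            this.done.await();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException(e);
        }
        return this.handled;
    }

    // Single consumer: dispatches on the task type, in submission order.
    private void consumeLoop() {
        try {
            while (true) {
                Task task = this.queue.take();
                switch (task.type) {
                    case LEADER_START:
                        this.handled.add("start:" + task.term);
                        break;
                    case LEADER_STOP:
                        this.handled.add("stop");
                        break;
                    case SHUTDOWN:
                        task.shutdownLatch.countDown();
                        return;
                }
            }
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
        }
    }

    public static void main(String[] args) {
        MiniFsmCaller caller = new MiniFsmCaller();
        caller.onLeaderStart(2);
        caller.onLeaderStop();
        System.out.println(caller.shutdownAndJoin()); // prints [start:2, stop]
    }
}
```

Because a single thread consumes the queue, the handlers run one at a time and in submission order, so the state touched by the consumer needs no extra locking.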