(1). 概述
上一篇剖析了FSMCaller的初始化和基本方法,发现大量的API操作最终只是往Disruptor里发布事件.因此,这一篇要分析的是Disruptor的消费端(ApplyTaskHandler).
(2). ApplyTaskHandler
/**
 * Event handler that consumes {@link ApplyTask} events and forwards each one to
 * {@code runApplyTask}. The highest committed index seen so far is carried from one
 * event to the next through {@link #maxCommittedIndex}.
 */
private class ApplyTaskHandler implements EventHandler<ApplyTask> {
    boolean firstRun = true;
    // max committed index in current batch, reset to -1 every batch
    private long maxCommittedIndex = -1;

    /**
     * Handles one published task.
     *
     * @param event      the task to process
     * @param sequence   sequence number supplied by the caller; not used here
     * @param endOfBatch whether this event is the last one of the current batch
     */
    @Override
    public void onEvent(final ApplyTask event, final long sequence, final boolean endOfBatch) throws Exception {
        setFsmThread();
        // All real work is delegated to runApplyTask; its result becomes the
        // pending committed index passed in with the next event.
        final long pendingIndex = runApplyTask(event, this.maxCommittedIndex, endOfBatch);
        this.maxCommittedIndex = pendingIndex;
    }

    /** Remembers the thread of the very first invocation as the FSM thread. */
    private void setFsmThread() {
        if (!this.firstRun) {
            return;
        }
        fsmThread = Thread.currentThread();
        this.firstRun = false;
    }
}
(3). FSMCaller.runApplyTask
/**
 * Processes a single {@link ApplyTask}.
 *
 * <p>COMMITTED tasks are not applied immediately: they only raise the pending
 * {@code maxCommittedIndex}. The pending index is handed to {@code doCommitted}
 * either right before a task of any other type is dispatched, or when
 * {@code endOfBatch} is true.
 *
 * @param task              the task to process; it is reset before this method returns
 * @param maxCommittedIndex pending committed index carried over from the previous call
 *                          (a negative value means nothing is pending)
 * @param endOfBatch        whether this task is the last one of the current batch
 * @return the committed index that is still pending after this call; it is -1 once the
 *         pending index has been passed to {@code doCommitted}
 */
private long runApplyTask(final ApplyTask task, long maxCommittedIndex, final boolean endOfBatch) {
// Latch to release at the very end; only set by SHUTDOWN and FLUSH tasks.
CountDownLatch shutdown = null;
if (task.type == TaskType.COMMITTED) {
// Only remember the highest committed index; applying is deferred.
if (task.committedIndex > maxCommittedIndex) {
maxCommittedIndex = task.committedIndex;
}
task.reset();
} else {
// Apply any pending committed index first, before the non-COMMITTED task runs.
if (maxCommittedIndex >= 0) {
this.currTask = TaskType.COMMITTED;
doCommitted(maxCommittedIndex);
maxCommittedIndex = -1L; // reset maxCommittedIndex
}
final long startMs = Utils.monotonicMs();
try {
switch (task.type) {
case COMMITTED:
// COMMITTED is handled by the outer if-branch, so this case is not expected.
Requires.requireTrue(false, "Impossible");
break;
case SNAPSHOT_SAVE:
// ********************************************************************
// Save a snapshot
// ********************************************************************
onSnapshotSaveSync(task);
break;
case SNAPSHOT_LOAD:
// ********************************************************************
// Load a snapshot
// ********************************************************************
this.currTask = TaskType.SNAPSHOT_LOAD;
if (passByStatus(task.done)) {
doSnapshotLoad((LoadSnapshotClosure) task.done);
}
break;
case LEADER_STOP:
this.currTask = TaskType.LEADER_STOP;
doLeaderStop(task.status);
break;
case LEADER_START:
this.currTask = TaskType.LEADER_START;
doLeaderStart(task.term);
break;
case START_FOLLOWING:
this.currTask = TaskType.START_FOLLOWING;
doStartFollowing(task.leaderChangeCtx);
break;
case STOP_FOLLOWING:
this.currTask = TaskType.STOP_FOLLOWING;
doStopFollowing(task.leaderChangeCtx);
break;
case ERROR:
this.currTask = TaskType.ERROR;
doOnError((OnErrorClosure) task.done);
break;
case IDLE:
// IDLE is never expected as the type of a dispatched task.
Requires.requireTrue(false, "Can't reach here");
break;
case SHUTDOWN:
this.currTask = TaskType.SHUTDOWN;
shutdown = task.shutdownLatch;
break;
case FLUSH:
this.currTask = TaskType.FLUSH;
shutdown = task.shutdownLatch;
break;
}
} finally {
// The metric name is read from task.type, so it must be recorded before the reset below.
this.nodeMetrics.recordLatency(task.type.metricName(), Utils.monotonicMs() - startMs);
// Reset the task object
task.reset();
}
}
try {
// At the end of a batch, apply whatever committed index is still pending.
if (endOfBatch && maxCommittedIndex >= 0) {
this.currTask = TaskType.COMMITTED;
doCommitted(maxCommittedIndex);
maxCommittedIndex = -1L; // reset maxCommittedIndex
}
this.currTask = TaskType.IDLE;
return maxCommittedIndex;
} finally {
// Release the latch last, even if the end-of-batch commit above throws.
if (shutdown != null) {
shutdown.countDown();
}
}
}
(4). FSMCaller.onSnapshotSaveSync
/**
 * Marks SNAPSHOT_SAVE as the current task and, when {@code passByStatus} accepts the
 * task's closure, hands that closure to {@code doSnapshotSave}.
 *
 * @param task the snapshot-save task; its {@code done} closure is cast to
 *             {@link SaveSnapshotClosure}
 */
private void onSnapshotSaveSync(final ApplyTask task) {
    this.currTask = TaskType.SNAPSHOT_SAVE;
    // NOTE(review): passByStatus presumably rejects the closure when an error has
    // already been recorded, in which case nothing further may run - confirm.
    if (!passByStatus(task.done)) {
        return;
    }
    doSnapshotSave((SaveSnapshotClosure) task.done);
}
(5). FSMCaller.doSnapshotSave
/**
 * Builds the snapshot metadata for the current last-applied log position and asks the
 * state machine to save a snapshot.
 *
 * <p>The metadata records the last applied index and term plus the peers and learners of
 * the configuration found at that index (and of the old configuration, when one exists).
 *
 * @param done closure that supplies the {@link SnapshotWriter} and receives the outcome;
 *             must not be null
 */
private void doSnapshotSave(final SaveSnapshotClosure done) {
    Requires.requireNonNull(done, "SaveSnapshotClosure is null");

    final long appliedIndex = this.lastAppliedIndex.get();

    // Start the snapshot metadata (SnapshotMeta) with the last applied position.
    final RaftOutter.SnapshotMeta.Builder meta = RaftOutter.SnapshotMeta.newBuilder();
    meta.setLastIncludedIndex(appliedIndex);
    meta.setLastIncludedTerm(this.lastAppliedTerm);

    final ConfigurationEntry entry = this.logManager.getConfiguration(appliedIndex);
    final boolean hasConf = entry != null && !entry.isEmpty();
    if (!hasConf) {
        LOG.error("Empty conf entry for lastAppliedIndex={}", appliedIndex);
        ThreadPoolsFactory.runClosureInThread(
            getNode().getGroupId(),
            done,
            new Status(RaftError.EINVAL, "Empty conf entry for lastAppliedIndex=%s", appliedIndex));
        return;
    }

    // Copy the membership of the current configuration into the metadata.
    for (final PeerId member : entry.getConf()) {
        meta.addPeers(member.toString());
    }
    for (final PeerId learner : entry.getConf().getLearners()) {
        meta.addLearners(learner.toString());
    }

    // Copy the membership of the old configuration, if there is one.
    if (entry.getOldConf() != null) {
        for (final PeerId oldMember : entry.getOldConf()) {
            meta.addOldPeers(oldMember.toString());
        }
        for (final PeerId oldLearner : entry.getOldConf().getLearners()) {
            meta.addOldLearners(oldLearner.toString());
        }
    }

    // The SnapshotWriter comes from the start method of the externally supplied closure.
    final SnapshotWriter snapshotWriter = done.start(meta.build());
    if (snapshotWriter != null) {
        // Delegate the actual save to the state machine.
        this.fsm.onSnapshotSave(snapshotWriter, done);
        return;
    }
    done.run(new Status(RaftError.EINVAL, "snapshot_storage create SnapshotWriter failed"));
}
(6). FSMCaller.doSnapshotLoad
/**
 * Loads a snapshot into the state machine and, on success, moves the committed/applied
 * bookkeeping to the snapshot's last included position.
 *
 * <p>The closure is completed with an error status when the reader cannot be opened, the
 * metadata cannot be loaded, the snapshot is older than what has already been applied,
 * or the state machine reports a load failure.
 *
 * @param done closure that supplies the {@link SnapshotReader} and receives the outcome;
 *             must not be null
 */
private void doSnapshotLoad(final LoadSnapshotClosure done) {
    Requires.requireNonNull(done, "LoadSnapshotClosure is null");

    // The SnapshotReader comes from the start method of the externally supplied closure.
    final SnapshotReader snapshotReader = done.start();
    if (snapshotReader == null) {
        done.run(new Status(RaftError.EINVAL, "open SnapshotReader failed"));
        return;
    }

    // Read the snapshot metadata (SnapshotMeta) through the reader.
    final RaftOutter.SnapshotMeta snapshotMeta = snapshotReader.load();
    if (snapshotMeta == null) {
        done.run(new Status(RaftError.EINVAL, "SnapshotReader load meta failed"));
        // Only an EIO reported by the reader is additionally recorded via setError.
        if (snapshotReader.getRaftError() == RaftError.EIO) {
            setError(new RaftException(EnumOutter.ErrorType.ERROR_TYPE_SNAPSHOT, RaftError.EIO,
                "Fail to load snapshot meta"));
        }
        return;
    }

    // What has already been applied must not be newer than the snapshot; otherwise the
    // snapshot is stale and must not be loaded.
    final LogId appliedId = new LogId(this.lastAppliedIndex.get(), this.lastAppliedTerm);
    final LogId loadedId = new LogId(snapshotMeta.getLastIncludedIndex(), snapshotMeta.getLastIncludedTerm());
    final boolean stale = appliedId.compareTo(loadedId) > 0;
    if (stale) {
        done.run(new Status(
            RaftError.ESTALE,
            "Loading a stale snapshot last_applied_index=%d last_applied_term=%d snapshot_index=%d snapshot_term=%d",
            appliedId.getIndex(), appliedId.getTerm(), loadedId.getIndex(), loadedId.getTerm()));
        return;
    }

    // Delegate the actual load to the state machine.
    final boolean loaded = this.fsm.onSnapshotLoad(snapshotReader);
    if (!loaded) {
        done.run(new Status(-1, "StateMachine onSnapshotLoad failed"));
        setError(new RaftException(EnumOutter.ErrorType.ERROR_TYPE_STATE_MACHINE,
            RaftError.ESTATEMACHINE, "StateMachine onSnapshotLoad failed"));
        return;
    }

    if (snapshotMeta.getOldPeersCount() == 0) {
        // Joint stage is not supposed to be noticeable by end users.
        final Configuration conf = new Configuration();
        final int peerCount = snapshotMeta.getPeersCount();
        for (int idx = 0; idx < peerCount; idx++) {
            final PeerId peer = new PeerId();
            final boolean parsed = peer.parse(snapshotMeta.getPeers(idx));
            Requires.requireTrue(parsed, "Parse peer failed");
            conf.addPeer(peer);
        }
        this.fsm.onConfigurationCommitted(conf);
    }

    // Move the bookkeeping to the position covered by the snapshot.
    final long includedIndex = snapshotMeta.getLastIncludedIndex();
    this.lastCommittedIndex.set(includedIndex);
    this.lastAppliedIndex.set(includedIndex);
    this.lastAppliedTerm = snapshotMeta.getLastIncludedTerm();
    done.run(Status.OK());
}
(7). 总结
在这一篇,对FSMCaller里的:onSnapshotSaveSync/doSnapshotSave/doSnapshotLoad进行了剖析,内容比较简单,最终是委托给了StateMachine(onSnapshotSave/onSnapshotLoad)进行操作.可以理解为:FSMCaller是对StateMachine的包装,包括一些验证,以及记录一些信息(lastCommittedIndex/lastAppliedIndex/lastAppliedTerm).