(1). 概述
一直都好奇,FSMCaller与StateMachine的onApply方法是如何交互的,曾还一度怀疑可能不存在交互,这一小篇主要剖析:FSMCaller与StateMachine的onApply方法交互.
(2). FSMCallerTest
@RunWith(value = MockitoJUnitRunner.class)
public class FSMCallerTest {
private static final String GROUP_ID = "group001";
private FSMCallerImpl fsmCaller;
@Mock
private NodeImpl node;
@Mock
private StateMachine fsm;
@Mock
private LogManager logManager;
private ClosureQueueImpl closureQueue;
@Before
public void setup() {
this.fsmCaller = new FSMCallerImpl();
this.closureQueue = new ClosureQueueImpl(GROUP_ID);
final FSMCallerOptions opts = new FSMCallerOptions();
Mockito.when(this.node.getNodeMetrics()).thenReturn(new NodeMetrics(false));
Mockito.when(this.node.getGroupId()).thenReturn(GROUP_ID);
opts.setNode(this.node);
opts.setFsm(this.fsm);
opts.setLogManager(this.logManager);
opts.setBootstrapId(new LogId(10, 1));
opts.setClosureQueue(this.closureQueue);
assertTrue(this.fsmCaller.init(opts));
} // end setup
@Test
public void testOnCommitted() throws Exception {
final LogEntry log = new LogEntry(EntryType.ENTRY_TYPE_DATA);
log.getId().setIndex(11);
log.getId().setTerm(1);
Mockito.when(this.logManager.getTerm(11)).thenReturn(1L);
Mockito.when(this.logManager.getEntry(11)).thenReturn(log);
final ArgumentCaptor<Iterator> itArg = ArgumentCaptor.forClass(Iterator.class);
// ******************************************************************
// 提交committedIndex
// ******************************************************************
assertTrue(this.fsmCaller.onCommitted(11));
this.fsmCaller.flush();
Thread.sleep(30);
} // end testOnCommitted
}
(3). FSMCaller.onCommitted
public boolean onCommitted(final long committedIndex) {
// 在前面剖析过了,是扔给了:Disruptor
return enqueueTask((task, sequence) -> {
// 为Task配置Type和committedIndex,
task.type = TaskType.COMMITTED;
task.committedIndex = committedIndex;
});
}
(4). FSMCaller.runApplyTask
private long runApplyTask(final ApplyTask task, long maxCommittedIndex, final boolean endOfBatch) {
CountDownLatch shutdown = null;
if (task.type == TaskType.COMMITTED) {
// maxCommittedIndex 为上一次提交的index,不过,每一次处理完后,都把:maxCommittedIndex还原成了-1来着的.
if (task.committedIndex > maxCommittedIndex) {
maxCommittedIndex = task.committedIndex;
}
task.reset();
} else {
// ... ...
}
try {
if (endOfBatch && maxCommittedIndex >= 0) {
this.currTask = TaskType.COMMITTED;
// ***************************************************************************
// 开始提交
// ***************************************************************************
doCommitted(maxCommittedIndex);
// ***************************************************************************
// 重置:maxCommittedIndex
// ***************************************************************************
maxCommittedIndex = -1L; // reset maxCommittedIndex
}
this.currTask = TaskType.IDLE;
return maxCommittedIndex;
} finally {
if (shutdown != null) {
shutdown.countDown();
}
} // end try
}
(5). FSMCaller.doCommitted
private void doCommitted(final long committedIndex) {
if (!this.error.getStatus().isOk()) {
return;
}
// 10
final long lastAppliedIndex = this.lastAppliedIndex.get();
// We can tolerate the disorder of committed_index
// 10 >= 11
if (lastAppliedIndex >= committedIndex) { // false
return;
}
// 把committedIndex改成:11
this.lastCommittedIndex.set(committedIndex);
final long startMs = Utils.monotonicMs();
try {
final List<Closure> closures = new ArrayList<>();
final List<TaskClosure> taskClosures = new ArrayList<>();
// 弹出数据
final long firstClosureIndex = this.closureQueue.popClosureUntil(committedIndex, closures, taskClosures);
// Calls TaskClosure#onCommitted if necessary
onTaskCommitted(taskClosures);
Requires.requireTrue(firstClosureIndex >= 0, "Invalid firstClosureIndex");
// IteratorImpl初始化时,就会通过LogManager加载lastAppliedIndex处的下一条数据来着.
final IteratorImpl iterImpl = new IteratorImpl(this, this.logManager, closures, firstClosureIndex, lastAppliedIndex, committedIndex, this.applyingIndex);
// **********************************************************
// 注意: 游标并未移动来着的,仅仅只是验证下是否有数据而已.
// **********************************************************
while (iterImpl.isGood()) {
// logEntry = lastAppliedIndex + 1
final LogEntry logEntry = iterImpl.entry();
// ******************************************************************************
// 在这里仅处理Entry类型为:ENTRY_TYPE_CONFIGURATION的日志,游标才会移动,其余情况下游标是不会移动的
// ******************************************************************************
if (logEntry.getType() != EnumOutter.EntryType.ENTRY_TYPE_DATA) { // 如果日志内容类型不是数据
if (logEntry.getType() == EnumOutter.EntryType.ENTRY_TYPE_CONFIGURATION) { // 如果日志类型为配置
if (logEntry.getOldPeers() != null && !logEntry.getOldPeers().isEmpty()) {
// Joint stage is not supposed to be noticeable by end users.
// ******************************************************************************
// 调用状态机的:onConfigurationCommitted
// ******************************************************************************
this.fsm.onConfigurationCommitted(new Configuration(iterImpl.entry().getPeers()));
}
}
if (iterImpl.done() != null) {
iterImpl.done().run(Status.OK());
}
// ******************************************************************************
// ENTRY_TYPE_CONFIGURATION才会移动游标
// ******************************************************************************
iterImpl.next();
continue;
}
// ******************************************************************************
// 调用状态机apply任务
// ******************************************************************************
// Apply data task to user state machine
doApplyTasks(iterImpl);
}
if (iterImpl.hasError()) {
setError(iterImpl.getError());
iterImpl.runTheRestClosureWithError();
}
// 12 - 1 = 11
long lastIndex = iterImpl.getIndex() - 1;
// 1
final long lastTerm = this.logManager.getTerm(lastIndex);
// ******************************************************************************
// 设置lastApplied
// ******************************************************************************
setLastApplied(lastIndex, lastTerm);
} finally {
this.nodeMetrics.recordLatency("fsm-commit", Utils.monotonicMs() - startMs);
}
}
(6). FSMCaller.doApplyTasks
private void doApplyTasks(final IteratorImpl iterImpl) {
final IteratorWrapper iter = new IteratorWrapper(iterImpl);
final long startApplyMs = Utils.monotonicMs();
final long startIndex = iter.getIndex();
try {
// *******************************************************************************
// 回调业务自定义的: StateMachine.onApply
// *******************************************************************************
this.fsm.onApply(iter);
} finally {
this.nodeMetrics.recordLatency("fsm-apply-tasks", Utils.monotonicMs() - startApplyMs);
this.nodeMetrics.recordSize("fsm-apply-tasks-count", iter.getIndex() - startIndex);
}
// 如果迭代器里还有数据,则进行日志提示
if (iter.hasNext()) {
LOG.error("Iterator is still valid, did you return before iterator reached the end?");
}
// 尝试移动到下一个日志(这里的next是被JRAFT给重写过了的)
// Try move to next in case that we pass the same log twice.
iter.next();
}
(7). FSMCaller.setLastApplied
void setLastApplied(long lastIndex, final long lastTerm) {
// lastIndex = 11
// lastTerm = 1
final LogId lastAppliedId = new LogId(lastIndex, lastTerm);
this.lastAppliedIndex.set(lastIndex);
this.lastAppliedTerm = lastTerm;
this.logManager.setAppliedId(lastAppliedId);
// 回调:LastAppliedLogIndexListener
notifyLastAppliedIndexUpdated(lastIndex);
}
(8). 总结
FSMCaller与StateMachine是什么关系呢?
FSMCaller是一个中控管理类,可以理解,StateMachine是FSMCaller留出来的一个接口(钩子),只是,这个接口,要求开发员自己必须要实现而已.