(1). 概述
在这一篇,继续剖析完一个比较重要的环节,那就是应用任务,这是JRaft提供给业务开发人员的一个重要接口.
(2). NodeImpl.apply
apply方法最终仅仅是把task组装成:LogEntry,并,提交给:disruptor,反而是要看disruptor是如何实现的了.
public void apply(final Task task) {
// 验证下是否关闭了
if (this.shutdownLatch != null) {
ThreadPoolsFactory.runClosureInThread(this.groupId, task.getDone(), new Status(RaftError.ENODESHUTDOWN, "Node is shutting down."));
throw new IllegalStateException("Node is shutting down");
}
// 要求传入的task不能为空
Requires.requireNonNull(task, "Null task");
// **************************************************************************
// 把从task里拿出data,传入到:LogEntry
// **************************************************************************
final LogEntry entry = new LogEntry();
entry.setData(task.getData());
// 创建:EventTranslator的实现,实际就是给:LogEntryAndClosure赋值.
final EventTranslator<LogEntryAndClosure> translator = (event, sequence) -> {
event.reset();
event.done = task.getDone();
event.entry = entry;
event.expectedTerm = task.getExpectedTerm();
};
// ***************************************************************************
// 不论任务模式是阻塞或非阻塞,都是往:disruptor里传数据.
// ***************************************************************************
switch(this.options.getApplyTaskMode()) {
case Blocking:
// ****************************************************************************
// 提交任务给:disruptor
// ****************************************************************************
this.applyQueue.publishEvent(translator);
break;
case NonBlocking:
default:
// ****************************************************************************
// 提交任务给:disruptor
// ****************************************************************************
if (!this.applyQueue.tryPublishEvent(translator)) {
String errorMsg = "Node is busy, has too many tasks, queue is full and bufferSize="+ this.applyQueue.getBufferSize();
ThreadPoolsFactory.runClosureInThread(this.groupId, task.getDone(), new Status(RaftError.EBUSY, errorMsg));
LOG.warn("Node {} applyQueue is overload.", getNodeId());
this.metrics.recordTimes("apply-task-overload-times", 1);
if(task.getDone() == null) {
throw new OverloadException(errorMsg);
} // end if
} // end if
break;
} // end switch
} // end apply
(3). NodeImpl.init
在disruptor初始化时,会配置事件的处理,所以,我们主要剖析:LogEntryAndClosureHandler即可.
this.applyDisruptor.handleEventsWith(new LogEntryAndClosureHandler());
(4). LogEntryAndClosureHandler.onEvent
@Override
public void onEvent(final LogEntryAndClosure event, final long sequence, final boolean endOfBatch)
throws Exception {
// ************************************************************************************
// 如果,event里有配置shutdownLatch,代表任务是要立即执行的,否则,代表任务是要批量执行的.
// ************************************************************************************
if (event.shutdownLatch != null) { // 如果,event里的shutdownLatch不为空的情况下
// 如果,缓冲列表不为空
if (!this.tasks.isEmpty()) {
// 判断应用任务
executeApplyingTasks(this.tasks);
// 由于event是交还给disruptor,所以,要把event全部属性给清空,还要把缓存列表(tasks)给清空.
reset();
}
// 计数器进行计数
final int num = GLOBAL_NUM_NODES.decrementAndGet();
LOG.info("The number of active nodes decrement to {}.", num);
event.shutdownLatch.countDown();
return;
}
// 添加任务到任务列表
this.tasks.add(event);
// 如果任务达到了批(或者disruptor里列表结尾了),则批量应用任务
if (this.tasks.size() >= NodeImpl.this.raftOptions.getApplyBatch() || endOfBatch) {
executeApplyingTasks(this.tasks);
reset();
}
}
(5). NodeImpl.executeApplyingTasks
private void executeApplyingTasks(final List<LogEntryAndClosure> tasks) {
this.writeLock.lock();
try {
final int size = tasks.size();
// ******************************************************************************
// 针对Leader的处理.
// ******************************************************************************
if (this.state != State.STATE_LEADER) {
final Status st = new Status();
if (this.state != State.STATE_TRANSFERRING) {
st.setError(RaftError.EPERM, "Is not leader.");
} else {
st.setError(RaftError.EBUSY, "Is transferring leadership.");
}
LOG.debug("Node {} can't apply, status={}.", getNodeId(), st);
final List<Closure> dones = tasks.stream().map(ele -> ele.done)
.filter(Objects::nonNull).collect(Collectors.toList());
ThreadPoolsFactory.runInThread(this.groupId, () -> {
for (final Closure done : dones) {
done.run(st);
}
});
return;
}
final List<LogEntry> entries = new ArrayList<>(size);
for (int i = 0; i < size; i++) {
final LogEntryAndClosure task = tasks.get(i);
// 再次验证下:term是否相同.
if (task.expectedTerm != -1 && task.expectedTerm != this.currTerm) {
LOG.debug("Node {} can't apply task whose expectedTerm={} doesn't match currTerm={}.", getNodeId(),
task.expectedTerm, this.currTerm);
if (task.done != null) {
final Status st = new Status(RaftError.EPERM, "expected_term=%d doesn't match current_term=%d",
task.expectedTerm, this.currTerm);
ThreadPoolsFactory.runClosureInThread(this.groupId, task.done, st);
task.reset();
}
continue;
}
if (!this.ballotBox.appendPendingTask(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf(), task.done)) {
ThreadPoolsFactory.runClosureInThread(this.groupId, task.done, new Status(RaftError.EINTERNAL, "Fail to append task."));
task.reset();
continue;
}
// ***************************************************************************
// 为任务配置:term/type
// ***************************************************************************
// set task entry info before adding to list.
task.entry.getId().setTerm(this.currTerm);
task.entry.setType(EnumOutter.EntryType.ENTRY_TYPE_DATA);
// ***************************************************************************
// 添加任务到集合里
// ***************************************************************************
entries.add(task.entry);
task.reset();
}
// *****************************************************************************
// 交给LogManager追加日志,追加日志成功后,
// *****************************************************************************
this.logManager.appendEntries(entries, new LeaderStableClosure(entries));
// update conf.first
checkAndSetConfiguration(true);
} finally {
this.writeLock.unlock();
}
}
(6). LeaderStableClosure
class LeaderStableClosure extends LogManager.StableClosure {
public LeaderStableClosure(final List<LogEntry> entries) {
super(entries);
}
@Override
public void run(final Status status) {
if (status.isOk()) {
// ********************************************************************************
// 提交日志,前面分析过了,就不提了.
// ********************************************************************************
NodeImpl.this.ballotBox.commitAt(this.firstLogIndex, this.firstLogIndex + this.nEntries - 1, NodeImpl.this.serverId);
} else {
LOG.error("Node {} append [{}, {}] failed, status={}.", getNodeId(), this.firstLogIndex, this.firstLogIndex + this.nEntries - 1, status);
}
} // end run
}
(7). 总结
apply方法的作用如下:
- 把task通过LogEntry进行包裹.
- 通过Disruptor提交任务.
- LogEntryAndClosureHandler接受处理任务.
- 在提交任务之前,验证是否为Leader/Term.
- 如果任务是需要立即给出反馈的(shutdownLatch),则,立即执行.
- 如果任务不是需要立即给出反馈的,则,会添加到集合中,进行批量提交.
- 最终任务会交给:LogManager进行任务的追加.
- 日志追加完毕后,会调用:LeaderStableClosure,而,LeaderStableClosure会委托给:BallotBox.
- BallotBox会要求”法定票数”的机器都成功之后,才能算成功.
- BallotBox会调用:FSMCaller的onCommitted(实际就是更新lastAppliedIndex).