(1). 概述
在前面对BallotBox追加任务进行了剖析,在这一部份,主要剖析日志确认这一部份,RAFT要求,过半加一的节点确认后,这条日志才能真正的成功,才能推动commitIndex和applyIndex.
(2). BallotBoxTest
@Test
public void testCommitAt() {
// *********************************************************
// 这一部份,在上一篇有剖析过了,就不粘源码了.
// *********************************************************
// ... ...
// ********************************************************************
// 注意,注意,注意:
// 由于是三个节点,所以,需要2个节点确认,才会真正的确认来着的
// ********************************************************************
// ********************************************************************
// 8081 节点确认日志复制完成
// ********************************************************************
assertTrue(this.box.commitAt(1, 1, new PeerId("localhost", 8081)));
assertEquals(0, this.box.getLastCommittedIndex());
assertEquals(1, this.box.getPendingIndex());
// ********************************************************************
// 8082 节点确认日志复制完成
// ********************************************************************
assertTrue(this.box.commitAt(1, 1, new PeerId("localhost", 8082)));
assertEquals(1, this.box.getLastCommittedIndex());
assertEquals(2, this.box.getPendingIndex());
}
(3). BallotBox.commitAt
public boolean commitAt(final long firstLogIndex, final long lastLogIndex, final PeerId peer) {
// TODO use lock-free algorithm here?
final long stamp = this.stampedLock.writeLock();
// 定义临时:lastCommittedIndex
long lastCommittedIndex = 0;
try {
// commitAt的目的是处理队列里的数据来着,pendingIndex=0,代表队列为空,所以,不需要往下走了.
if (this.pendingIndex == 0) {
return false;
}
if (lastLogIndex < this.pendingIndex) {
return true;
}
if (lastLogIndex >= this.pendingIndex + this.pendingMetaQueue.size()) {
throw new ArrayIndexOutOfBoundsException();
}
final long startAt = Math.max(this.pendingIndex, firstLogIndex);
Ballot.PosHint hint = new Ballot.PosHint();
for (long logIndex = startAt; logIndex <= lastLogIndex; logIndex++) {
// *********************************************************************
// 这是个重点:Ballot对象是多线程共享的,多个节点反馈信息后,回调commitAt时,弹出的都是同一实例对象.
// *********************************************************************
final Ballot bl = this.pendingMetaQueue.get((int) (logIndex - this.pendingIndex));
// *********************************************************************
// 进行投票
// *********************************************************************
hint = bl.grant(peer, hint);
// *********************************************************************
// 验证投票(this.quorum <= 0 && this.oldQuorum <= 0;)
// 要求投票后,quorum和oldQuorum结果必须是零,才对临时lastCommittedIndex进行赋值.
// *********************************************************************
if (bl.isGranted()) {
// 这时候的:临时lastCommittedIndex切换成了:logIndex
lastCommittedIndex = logIndex;
}
}
// 当投票后,quorum不为零的情况下,临时lastCommittedIndex是没有变来着的,所以,这时候,不继续往下走了.
if (lastCommittedIndex == 0) {
return true;
}
// *******************************************************************************
// 过半以上的节点确认之后(临时lastCommittedIndex>0),从队列里拿出稳除数据
// 代表这条数据,已经多数节点复制成功了的
// *******************************************************************************
this.pendingMetaQueue.removeFromFirst((int) (lastCommittedIndex - this.pendingIndex) + 1);
LOG.debug("Committed log fromIndex={}, toIndex={}.", this.pendingIndex, lastCommittedIndex);
// *************************************************************
// pendingIndex往前推进
// *************************************************************
this.pendingIndex = lastCommittedIndex + 1;
this.lastCommittedIndex = lastCommittedIndex;
} finally {
this.stampedLock.unlockWrite(stamp);
}
// *************************************************************
// 这部份,我们前面剖析过了:
// 委派给FSMCaller进行提交,最终会调用到:StateMachine.onApply
// *************************************************************
this.waiter.onCommitted(lastCommittedIndex);
return true;
}
(4). Ballot.PosHint.grant
// Ballot 摘抄属性
private final List<UnfoundPeerId> peers = new ArrayList<>();
private int quorum;
private final List<UnfoundPeerId> oldPeers = new ArrayList<>();
private int oldQuorum;
public PosHint grant(final PeerId peerId, final PosHint hint) {
// ***************************************************************
// peers处理
// 遍历peers里所有的节点
// ***************************************************************
UnfoundPeerId peer = findPeer(peerId, this.peers, hint.pos0);
if (peer != null) {
if (!peer.found) {
// 为节点的:found赋值
peer.found = true;
// ********************************************************************
// 重点: 对投票数进行递减
// ********************************************************************
this.quorum--;
}
hint.pos0 = peer.index;
} else {
hint.pos0 = -1;
}
// ***************************************************************
// oldPerrs处理
// 遍历oldPerrs里所有的节点
// ***************************************************************
if (this.oldPeers.isEmpty()) {
hint.pos1 = -1;
return hint;
}
peer = findPeer(peerId, this.oldPeers, hint.pos1);
if (peer != null) {
if (!peer.found) {
peer.found = true;
// ********************************************************************
// 重点: 对投票数进行递减
// ********************************************************************
this.oldQuorum--;
}
hint.pos1 = peer.index;
} else {
hint.pos1 = -1;
}
return hint;
}
(5). 总结
我们可以理解成:BallotBox主要用于缓存日志请求,然后,等待过半加一的节点确认日志后,从缓存中剔除日志.