(1). 概述
前面剖析了Node的启动方法,会判断是否有其它节点,如果有其它节点的情况下,会判断是否要进行选举,这一小篇,剖析下定时任务中的选举处理.
(2). Node预投票注意属性
// 预投票处理
private final Ballot prevVoteCtx = new Ballot();
// Client
private RaftClientService rpcService;
(3). Node.init
// ... ...
// 定义选举定时任务
name = "JRaft-ElectionTimer-" + suffix;
this.electionTimer = new RepeatedTimer(
name,
this.options.getElectionTimeoutMs(),
TIMER_FACTORY.getElectionTimer(this.options.isSharedElectionTimer(), name)
) {
@Override
protected void onTrigger() {
// ***********************************************************************
// 处理投票超时
// ***********************************************************************
handleElectionTimeout();
}
@Override
protected int adjustTimeout(final int timeoutMs) {
return randomTimeout(timeoutMs);
}
};
// ... ...
// 在Node初始化时,就把状态设置成:STATE_FOLLOWER.
// set state to follower
this.state = State.STATE_FOLLOWER;
(4). Node.handleElectionTimeout
private void handleElectionTimeout() {
// ... ...
// RAFT要求,节点启动时是FOLLOWER
if (this.state != State.STATE_FOLLOWER) {
return;
} // end if
// ... ...
// *************************************************************************
// 准备投票
// *************************************************************************
preVote();
// ... ...
}
(5). Node.preVote
private void preVote() {
// ... ...
// *******************************************************************************
// 前面剖析过:Ballot会根据节点的数量,计算出法定票数.
// *******************************************************************************
// Ballot prevVoteCtx = new Ballot();
// this.conf.getConf() = 127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083
this.prevVoteCtx.init(this.conf.getConf(), this.conf.isStable() ? null : this.conf.getOldConf());
// ************************************************************************************
// 一个节点一个节点的连接,发起预投票请求
// ************************************************************************************
for (final PeerId peer : this.conf.listPeers()) {
if (peer.equals(this.serverId)) {
continue;
}
// ************************************************************************************
// PingRequest(在这里,我忽略掉,不做分析了)
// ************************************************************************************
if (!this.rpcService.connect(peer.getEndpoint())) {
LOG.warn("Node {} channel init failed, address={}.", getNodeId(), peer.getEndpoint());
continue;
}
// 创建回调处理,
final OnPreVoteRpcDone done = new OnPreVoteRpcDone(peer, this.currTerm);
done.request = RequestVoteRequest.newBuilder() //
.setPreVote(true) // it's a pre-vote request.
.setGroupId(this.groupId) //
.setServerId(this.serverId.toString()) //
.setPeerId(peer.toString()) //
.setTerm(this.currTerm + 1) // next term
.setLastLogIndex(lastLogId.getIndex()) //
.setLastLogTerm(lastLogId.getTerm()) //
.build();
// ************************************************************************************
// 向服务器发起预投票请求
// ************************************************************************************
this.rpcService.preVote(peer.getEndpoint(), done.request, done);
} // end for
this.prevVoteCtx.grant(this.serverId);
if (this.prevVoteCtx.isGranted()) { // 过半的节点预投票成功的情况下,进入投票阶段
// ... ...
} // end if
// ... ...
}
(6). Node.handlePreVoteResponse
public void handlePreVoteResponse(final PeerId peerId, final long term, final RequestVoteResponse response) {
boolean doUnlock = true;
this.writeLock.lock();
try {
if (this.state != State.STATE_FOLLOWER) { // 先验证,在投票之前,本节点的状态是否为:FOLLOWER
LOG.warn("Node {} received invalid PreVoteResponse from {}, state not in STATE_FOLLOWER but {}.",
getNodeId(), peerId, this.state);
return;
}
if (term != this.currTerm) { // 验证任期
LOG.warn("Node {} received invalid PreVoteResponse from {}, term={}, currTerm={}.", getNodeId(),
peerId, term, this.currTerm);
return;
}
// 如果其它节点(参与投票的节点)的任期,比当前节点的任期还要大
if (response.getTerm() > this.currTerm) {
LOG.warn("Node {} received invalid PreVoteResponse from {}, term {}, expect={}.", getNodeId(), peerId,
response.getTerm(), this.currTerm);
stepDown(response.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE,
"Raft node receives higher term pre_vote_response."));
return;
}
LOG.info("Node {} received PreVoteResponse from {}, term={}, granted={}.", getNodeId(), peerId, response.getTerm(), response.getGranted());
// check granted quorum?
if (response.getGranted()) { // 参与投票的节点,投了一票给本节点
// *******************************************************************************
// 接受,其余节点(Peer)处理预投票请求后,返回的信息后,进行预投票的判断,判断是否达到法定票数.
// *******************************************************************************
this.prevVoteCtx.grant(peerId);
if (this.prevVoteCtx.isGranted()) { // 只有达到了法定票数,才进入真正的选举.
doUnlock = false;
// *****************************************************************************
// 开始选举,这里的内容,放到后面剖析.
// *****************************************************************************
electSelf();
}
}
} finally {
if (doUnlock) {
this.writeLock.unlock();
}
}
}
(7). 总结
Node节点在初始化时会做以下几件事情:
- 本节点(Peer)为: 127.0.0.1:8081
- 其余节点(Peer)为: 127.0.0.1:8082,127.0.0.1:8083
- 从参数中,拿出所有的Peer(127.0.0.1:8081,127.0.0.1:8082,127.0.0.1:8083)
- 创建Ballot类,传入所有的Peer后,能自动计算出法定票数(quorum),注意:Ballot是Node的实例属性
- 遍历所有的Peer节点,发起心跳请求建立起连接
- 遍历所有的Peer节点,发起预投票请求(preVote)
- 其余节点(比如:127.0.0.1:8082)处理预投票请求(这个下一篇分析)
- 本节点处理投票后的结果(OnPreVoteRpcDone),实际就是调用:Ballot.grant方法,对法定票数(quorum)进行自减,当:法定票数为零时,代表多数节点已经确认通过.