(1). 概述
前面,剖析了,发起投票请求,这一篇主要剖析,接受投票请求.
(2). NodeRequestProcessor.processRequest
public Message processRequest(final T request, final RpcRequestClosure done) {
final PeerId peer = new PeerId();
final String peerIdStr = getPeerId(request);
if (peer.parse(peerIdStr)) {
final String groupId = getGroupId(request);
// ******************************************************************
// 根据groupId和peer取出Node对象
// ******************************************************************
final Node node = NodeManager.getInstance().get(groupId, peer);
if (node != null) {
// ******************************************************************
// 把node强制转换成:RaftServerService
// ******************************************************************
return processRequest0((RaftServerService) node, request, done);
} else {
// ... ...
}
} else {
// ... ...
}
}
(3). RequestVoteRequestProcessor.processRequest0
public Message processRequest0(final RaftServerService service, final RequestVoteRequest request,
final RpcRequestClosure done) {
if (request.getPreVote()) { // 针对预投票处理
return service.handlePreVoteRequest(request);
} else {
// *****************************************************************
// 投票处理
// *****************************************************************
return service.handleRequestVoteRequest(request);
}
}
(4). NodeImpl.handleRequestVoteRequest
public Message handleRequestVoteRequest(final RequestVoteRequest request) {
boolean doUnlock = true;
this.writeLock.lock();
try {
if (!this.state.isActive()) {
LOG.warn("Node {} is not in active state, currTerm={}.", getNodeId(), this.currTerm);
return RpcFactoryHelper //
.responseFactory() //
.newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
"Node %s is not in active state, state %s.", getNodeId(), this.state.name());
}
final PeerId candidateId = new PeerId();
if (!candidateId.parse(request.getServerId())) {
LOG.warn("Node {} received RequestVoteRequest from {} serverId bad format.", getNodeId(), request.getServerId());
return RpcFactoryHelper //
.responseFactory() //
.newResponse(RequestVoteResponse.getDefaultInstance(), RaftError.EINVAL,
"Parse candidateId failed: %s.", request.getServerId());
}
// noinspection ConstantConditions
do {
// request = 127.0.0.1:8081
// curr = 127.0.0.1:8082
// 检查,发起投票的节点(request)的任期 大于或者等于当前节点的任期
// check term
if (request.getTerm() >= this.currTerm) {
LOG.info("Node {} received RequestVoteRequest from {}, term={}, currTerm={}.", getNodeId(), request.getServerId(), request.getTerm(), this.currTerm);
// increase current term, change state to follower
if (request.getTerm() > this.currTerm) {
// 当节节点停止.
stepDown(request.getTerm(), false, new Status(RaftError.EHIGHERTERMRESPONSE, "Raft node receives higher term RequestVoteRequest."));
}
} else {
// ignore older term
LOG.info("Node {} ignore RequestVoteRequest from {}, term={}, currTerm={}.", getNodeId(), request.getServerId(), request.getTerm(), this.currTerm);
break;
}
doUnlock = false;
this.writeLock.unlock();
final LogId lastLogId = this.logManager.getLastLogId(true);
doUnlock = true;
this.writeLock.lock();
// vote need ABA check after unlock&writeLock
if (request.getTerm() != this.currTerm) {
LOG.warn("Node {} raise term {} when get lastLogId.", getNodeId(), this.currTerm);
break;
}
// 发起请求(request)的lastLogIndex和lastLogTerm > 当前节点的lastLogIndex和lastLogTerm
final boolean logIsOk = new LogId(request.getLastLogIndex(), request.getLastLogTerm()).compareTo(lastLogId) >= 0;
if (logIsOk && (this.votedId == null || this.votedId.isEmpty())) {
stepDown(request.getTerm(), false, new Status(RaftError.EVOTEFORCANDIDATE, "Raft node votes for some candidate, step down to restart election_timer."));
this.votedId = candidateId.copy();
// 存储投票给哪个节点
this.metaStorage.setVotedFor(candidateId);
}
} while (false);
return RequestVoteResponse.newBuilder() //
.setTerm(this.currTerm) //
.setGranted(request.getTerm() == this.currTerm && candidateId.equals(this.votedId)) //
.build();
} finally {
if (doUnlock) {
this.writeLock.unlock();
}
}
}
(5). 总结
服务端处理投票比较简单,只要发起请求的节点(比如:127.0.0.1:8081)的term和lastLogIndex比本地(比如:127.0.0.1:8082,127.0.0.1:8083)的大,即,授权通过.