(1). 概述
最近一直在看RAFT相关的源码,Atomix突然开始转型(用Go重写),不得不看其它RAFT的实现,从蚂蚁金服官网clone RAFT演示代码,进行源码剖析,在这里先看RPC初始化处理.
(2). CounterServer
// serverId.getEndpoint() == 127.0.0.1:8081
RpcServer rpcServer = RaftRpcServerFactory.createRaftRpcServer(serverId.getEndpoint());
(3). RaftRpcServerFactory静态代码块
我们知道,初始化一个类,首先会初始化静态代码块.
public class RaftRpcServerFactory {
static {
// PROTOBUF处理,在这里,不进行深究
ProtobufMsgFactory.load();
}
}
(4). RaftRpcServerFactory.createRaftRpcServer
public static RpcServer createRaftRpcServer(final Endpoint endpoint) {
return createRaftRpcServer(endpoint, null, null);
}
(5). RaftRpcServerFactory.createRaftRpcServer
public static RpcServer createRaftRpcServer(final Endpoint endpoint, final Executor raftExecutor,
final Executor cliExecutor) {
// 委托给:RpcFactoryHelper处理
// RpcFactoryHelper内部通过SPI加载:RaftRpcFactory,然后,创建:RpcServer
final RpcServer rpcServer = RpcFactoryHelper.rpcFactory().createRpcServer(endpoint);
// ****************************************************************
// 为RpcServer配置RAFT处理,RpcServer是接受网络请求,最终要承载业务处理.
// ****************************************************************
addRaftRequestProcessors(rpcServer, raftExecutor, cliExecutor);
return rpcServer;
}
(6). RpcFactoryHelper.rpcFactory
public class RpcFactoryHelper {
// ***************************************************************************************
// 通过SPI加载RaftRpcFactory的实现类(com.alipay.sofa.jraft.rpc.impl.GrpcRaftRpcFactory).
// ***************************************************************************************
private static final RaftRpcFactory RPC_FACTORY = JRaftServiceLoader.load(RaftRpcFactory.class) //
.first();
public static RaftRpcFactory rpcFactory() {
return RPC_FACTORY;
} // end
}
(7). RaftRpcServerFactory.addRaftRequestProcessors
为RpcServer配置业务处理.
public static void addRaftRequestProcessors(final RpcServer rpcServer, final Executor raftExecutor,
final Executor cliExecutor) {
// raft core processors
final AppendEntriesRequestProcessor appendEntriesRequestProcessor = new AppendEntriesRequestProcessor(raftExecutor);
rpcServer.registerConnectionClosedEventListener(appendEntriesRequestProcessor);
rpcServer.registerProcessor(appendEntriesRequestProcessor);
rpcServer.registerProcessor(new GetFileRequestProcessor(raftExecutor));
rpcServer.registerProcessor(new InstallSnapshotRequestProcessor(raftExecutor));
rpcServer.registerProcessor(new RequestVoteRequestProcessor(raftExecutor));
rpcServer.registerProcessor(new PingRequestProcessor());
rpcServer.registerProcessor(new TimeoutNowRequestProcessor(raftExecutor));
rpcServer.registerProcessor(new ReadIndexRequestProcessor(raftExecutor));
// raft cli service
rpcServer.registerProcessor(new AddPeerRequestProcessor(cliExecutor));
rpcServer.registerProcessor(new RemovePeerRequestProcessor(cliExecutor));
rpcServer.registerProcessor(new ResetPeerRequestProcessor(cliExecutor));
rpcServer.registerProcessor(new ChangePeersRequestProcessor(cliExecutor));
rpcServer.registerProcessor(new GetLeaderRequestProcessor(cliExecutor));
rpcServer.registerProcessor(new SnapshotRequestProcessor(cliExecutor));
rpcServer.registerProcessor(new TransferLeaderRequestProcessor(cliExecutor));
rpcServer.registerProcessor(new GetPeersRequestProcessor(cliExecutor));
rpcServer.registerProcessor(new AddLearnersRequestProcessor(cliExecutor));
rpcServer.registerProcessor(new RemoveLearnersRequestProcessor(cliExecutor));
rpcServer.registerProcessor(new ResetLearnersRequestProcessor(cliExecutor));
}
(8). RaftRpcFactory接口
public interface RaftRpcFactory {
void registerProtobufSerializer(final String className, final Object... args);
RpcClient createRpcClient(final ConfigHelper<RpcClient> helper);
RpcServer createRpcServer(final Endpoint endpoint, final ConfigHelper<RpcServer> helper);
RpcResponseFactory getRpcResponseFactory();
}
(9). RaftRpcServerFactory类图
(10). 总结
RaftRpcServerFactory的主要职责是创建:RpcServer,并为RpcServer配置业务处理(看到了吧!典型的工厂模式).