(1). 概述
前面对BootstrapDiscoveryBuilder源码分析,最终是调用:BootstrapDiscoveryBuilder.build方法,构建:BootstrapDiscoveryProvider,所以,这一篇主要对:BootstrapDiscoveryProvider进行剖析.
(2). 先看下BootstrapDiscoveryProvider的接口
public interface NodeDiscoveryProvider
extends ListenerService<NodeDiscoveryEvent, NodeDiscoveryEventListener>,
Configured<NodeDiscoveryConfig> {
// 返回活跃的其它节点
Set<Node> getNodes();
// 本节点加入集群(集群其它成员在配置文件里)
CompletableFuture<Void> join(BootstrapService bootstrap, Node localNode);
// 本节点离开集群
CompletableFuture<Void> leave(Node localNode);
}
(3). BootstrapDiscoveryProvider构建器
BootstrapDiscoveryProvider(BootstrapDiscoveryConfig config) {
// Hold住:BootstrapDiscoveryConfig
this.config = checkNotNull(config);
//Hold住种子节点
this.bootstrapNodes = ImmutableSet.copyOf(config.getNodes().stream().map(Node::new).collect(Collectors.toList()));
}
(4). BootstrapDiscoveryProvider属性
public class BootstrapDiscoveryProvider
extends AbstractListenerManager<NodeDiscoveryEvent, NodeDiscoveryEventListener>
implements NodeDiscoveryProvider {
private static final Serializer SERIALIZER = Serializer.using(Namespace.builder()
.register(Namespaces.BASIC)
.nextId(Namespaces.BEGIN_USER_CUSTOM_ID)
.register(Node.class)
.register(NodeId.class)
.register(new AddressSerializer(), Address.class)
.build());
private static final String HEARTBEAT_MESSAGE = "atomix-cluster-heartbeat";
// ***********************************************************************
// 引导时的种子节点
// ***********************************************************************
private final Collection<Node> bootstrapNodes;
private final BootstrapDiscoveryConfig config;
// ***********************************************************************
// 这个接口先不要关心,只要清楚这个类主要提供通信服务
// ***********************************************************************
private volatile BootstrapService bootstrap;
// ***********************************************************************
// 最终活跃的所有节点
// ***********************************************************************
private Map<Address, Node> nodes = Maps.newConcurrentMap();
// ***********************************************************************
// 定时任务
// ***********************************************************************
private final ScheduledExecutorService heartbeatScheduler = Executors.newSingleThreadScheduledExecutor(namedThreads("atomix-bootstrap-heartbeat-sender", LOGGER));
private final ExecutorService heartbeatExecutor = Executors.newSingleThreadExecutor(namedThreads("atomix-bootstrap-heartbeat-receiver", LOGGER));
}
(5). NodeDiscoveryProvider.join
public CompletableFuture<Void> join(BootstrapService bootstrap, Node localNode) {
// nodes 是最终活跃的节点
// 判断本节点是否存在,如果不存在,则进行初始化
if (nodes.putIfAbsent(localNode.address(), localNode) == null) {
// hold住:BootstrapService
this.bootstrap = bootstrap;
// 典型的观察者模式
post(new NodeDiscoveryEvent(NodeDiscoveryEvent.Type.JOIN, localNode));
// ***********************************************************************
// 配置接收心跳请求的处理.
// ***********************************************************************
bootstrap.getMessagingService().registerHandler(
HEARTBEAT_MESSAGE,
(BiFunction<Address, byte[], byte[]>) (a, p) ->
// 解码,并反序列化
handleHeartbeat(localNode, SERIALIZER.decode(p)), heartbeatExecutor);
ComposableFuture<Void> future = new ComposableFuture<>();
// 发送心跳
sendHeartbeats(localNode)
.whenComplete((r, e) -> {
future.complete(null);
});
// 开启心跳定时任务
heartbeatFuture = heartbeatScheduler.scheduleAtFixedRate(() -> {
sendHeartbeats(localNode);
}, 0, config.getHeartbeatInterval().toMillis(), TimeUnit.MILLISECONDS);
return future.thenRun(() -> {
LOGGER.info("Joined");
});
}
return CompletableFuture.completedFuture(null);
}
(6). NodeDiscoveryProvider.handleHeartbeat
private byte[] handleHeartbeat(Node localNode, Node node) { // node为cliet发起心跳的节点.
LOGGER.trace("{} - Received heartbeat: {}", localNode.address(), localNode.address());
failureDetectors.computeIfAbsent(localNode.address(), n -> new PhiAccrualFailureDetector()).report();
Node oldNode = nodes.put(node.address(), node);
if (oldNode != null && !oldNode.id().equals(node.id())) {
failureDetectors.remove(oldNode.address());
post(new NodeDiscoveryEvent(NodeDiscoveryEvent.Type.LEAVE, oldNode));
post(new NodeDiscoveryEvent(NodeDiscoveryEvent.Type.JOIN, node));
} else if (oldNode == null) {
post(new NodeDiscoveryEvent(NodeDiscoveryEvent.Type.JOIN, node));
}
// 序列化本节点所有的活跃节点返回给发送请求的远程节点.
return SERIALIZER.encode(Lists.newArrayList(nodes.values()));
}
(7). NodeDiscoveryProvider.sendHeartbeats
private CompletableFuture<Void> sendHeartbeat(Node localNode, Address address) {
return bootstrap.getMessagingService()
// address : 为远程节点
// type : 业务类型(心跳)
// payload : 消息体
.sendAndReceive(address, HEARTBEAT_MESSAGE, SERIALIZER.encode(localNode))
.whenCompleteAsync((response, error) -> {
if (error == null) {
// *****************************************************************************
// 远程节点,会返回它所有活跃的成员信息.
// *****************************************************************************
// 没有错误的情况下,代表发送心跳请求是成功的,对返回的结果进行转码.
// 解码消息体
Collection<Node> nodes = SERIALIZER.decode(response);
for (Node node : nodes) {
if (node.address().equals(address)) { // 如果,发送远程请求的网络地址包含在请求的网络地址中,则...
Node oldNode = this.nodes.put(node.address(), node);
if (oldNode != null && !oldNode.id().equals(node.id())) { // 如果节点已经存在,但是节点的id不同
failureDetectors.remove(oldNode.address());
// 让以前的节点离开
post(new NodeDiscoveryEvent(NodeDiscoveryEvent.Type.LEAVE, oldNode));
// 新的节点加入
post(new NodeDiscoveryEvent(NodeDiscoveryEvent.Type.JOIN, node));
} else if (oldNode == null) { // 节点不存在的情况下,代表节点是一个新的节点
// 新的节点加入
post(new NodeDiscoveryEvent(NodeDiscoveryEvent.Type.JOIN, node));
}
} else if (!this.nodes.containsKey(node.address()) || !this.nodes.get(node.address()).id().equals(node.id())) {
// 向其它的节点发送心跳请求
sendHeartbeat(localNode, node.address());
}
}
} else {
// 网络请求失败的情况下处理,在这里先不关心
LOGGER.debug("{} - Sending heartbeat to {} failed", localNode, address, error);
PhiAccrualFailureDetector failureDetector = failureDetectors.computeIfAbsent(address, n -> new PhiAccrualFailureDetector());
double phi = failureDetector.phi();
if (phi >= config.getFailureThreshold()
|| (phi == 0.0 && System.currentTimeMillis() - failureDetector.lastUpdated() > config.getFailureTimeout().toMillis())) {
Node node = this.nodes.remove(address);
if (node != null) {
failureDetectors.remove(node.address());
post(new NodeDiscoveryEvent(NodeDiscoveryEvent.Type.LEAVE, node));
}
}
}
}, heartbeatExecutor).exceptionally(e -> null)
.thenApply(v -> null);
}
(8). UML图
(9). 总结
BootstrapDiscoveryProvider在启动时,会向所有的种子节点(远程节点)进行心跳,而种子节点(远程节点)会返回它所有活跃的其它节点,这样就实现了节点之间的发现.