(1). 概述
AtomixCluster是由AtomixClusterFactory工厂创建,所以,要先看下:AtomixCluster的构建过程.
(2). AtomixClusterFactory
public final class AtomixClusterFactory {
private static final Logger LOG = Loggers.CLUSTERING_LOGGER;
private AtomixClusterFactory() {}
// 读取brokder配置,创建:AtomixCluster对象
public static AtomixCluster fromConfiguration(final BrokerCfg configuration) {
final var clusterCfg = configuration.getCluster();
final var nodeId = clusterCfg.getNodeId();
final var localMemberId = Integer.toString(nodeId);
final var networkCfg = configuration.getNetwork();
final var discoveryProvider = createDiscoveryProvider(clusterCfg, localMemberId);
final var membershipCfg = clusterCfg.getMembership();
final var membershipProtocol =
SwimMembershipProtocol.builder()
.withFailureTimeout(membershipCfg.getFailureTimeout())
.withGossipInterval(membershipCfg.getGossipInterval())
.withProbeInterval(membershipCfg.getProbeInterval())
.withProbeTimeout(membershipCfg.getProbeTimeout())
.withBroadcastDisputes(membershipCfg.isBroadcastDisputes())
.withBroadcastUpdates(membershipCfg.isBroadcastUpdates())
.withGossipFanout(membershipCfg.getGossipFanout())
.withNotifySuspect(membershipCfg.isNotifySuspect())
.withSuspectProbes(membershipCfg.getSuspectProbes())
.withSyncInterval(membershipCfg.getSyncInterval())
.build();
final var atomixBuilder =
new AtomixClusterBuilder(new ClusterConfig())
//zeebe-cluster
.withClusterId(clusterCfg.getClusterName())
// 0
.withMemberId(localMemberId)
.withMembershipProtocol(membershipProtocol)
// 0.0.0.0
.withMessagingInterface(networkCfg.getInternalApi().getHost())
// 26502
.withMessagingPort(networkCfg.getInternalApi().getPort())
.withAddress(
Address.from(
// 0.0.0.0
networkCfg.getInternalApi().getAdvertisedHost(),
// 26502
networkCfg.getInternalApi().getAdvertisedPort()))
.withMembershipProvider(discoveryProvider)
// NONE
.withMessageCompression(clusterCfg.getMessageCompression());
final var securityCfg = networkCfg.getSecurity();
if (securityCfg.isEnabled()) {
atomixBuilder.withSecurity(securityCfg.getCertificateChainPath(), securityCfg.getPrivateKeyPath());
}
// ********************************************************************
// 通过AtomixClusterBuilder构建出:AtomixCluster
// ********************************************************************
return atomixBuilder.build();
}
private static NodeDiscoveryProvider createDiscoveryProvider(
final ClusterCfg clusterCfg, final String localMemberId) {
final BootstrapDiscoveryBuilder builder = BootstrapDiscoveryProvider.builder();
final List<String> initialContactPoints = clusterCfg.getInitialContactPoints();
final List<Node> nodes = new ArrayList<>();
initialContactPoints.forEach(
contactAddress -> {
final Node node = Node.builder().withAddress(Address.from(contactAddress)).build();
LOG.debug("Member {} will contact node: {}", localMemberId, node.address());
nodes.add(node);
});
return builder.withNodes(nodes).build();
}
}
(3). AtomixCluster构建器
public AtomixCluster(final ClusterConfig config, final Version version) {
this(config, version, buildMessagingService(config), buildUnicastService(config));
}
(4). AtomixCluster.buildMessagingService
protected static ManagedMessagingService buildMessagingService(final ClusterConfig config) {
// *************************************************************
// config.getNodeConfig().getAddress() = 0.0.0.0:26502
// NettyMessagingService主要用于TCP通信.
// *************************************************************
return new NettyMessagingService(config.getClusterId(), config.getNodeConfig().getAddress(), config.getMessagingConfig());
}
(5). AtomixCluster.buildUnicastService
protected static ManagedUnicastService buildUnicastService(final ClusterConfig config) {
// *************************************************************
// NettyUnicastService广播通信.
// *************************************************************
return new NettyUnicastService(config.getClusterId(), config.getNodeConfig().getAddress(), config.getMessagingConfig());
}
(6).
protected AtomixCluster(
final ClusterConfig config,
final Version version,
final ManagedMessagingService messagingService,
final ManagedUnicastService unicastService) {
this.messagingService =
messagingService != null ? messagingService : buildMessagingService(config);
this.unicastService = unicastService != null ? unicastService : buildUnicastService(config);
discoveryProvider = buildLocationProvider(config);
membershipProtocol = buildMembershipProtocol(config);
// ************************************************************************
// 构建集群成员服务
// ************************************************************************
membershipService = buildClusterMembershipService(config, this, discoveryProvider, membershipProtocol, version);
// ************************************************************************
// 构建管理集群通信的服务
// ************************************************************************
communicationService = buildClusterMessagingService(getMembershipService(), getMessagingService(), getUnicastService());
// ************************************************************************
// 构建集群事件通信服务
// ************************************************************************
eventService = buildClusterEventService(getMembershipService(), getMessagingService());
}
(7). AtomixCluster.buildClusterMessagingService
protected static ManagedClusterCommunicationService buildClusterMessagingService(
final ClusterMembershipService membershipService,
final MessagingService messagingService,
final UnicastService unicastService) {
return new DefaultClusterCommunicationService(membershipService, messagingService, unicastService);
}
(8). AtomixCluster.buildClusterEventService
protected static ManagedClusterEventService buildClusterEventService(
final ClusterMembershipService membershipService, final MessagingService messagingService) {
return new DefaultClusterEventService(membershipService, messagingService);
}
(9). 总结
AtomixCluster在创建时,会启动TCP和UDP端口监听.