(1). 概述
在这小一节,将对Gateway的源码进行剖析,Gateway的源码入口在哪?像这样的框架源码,我们首要的任务是先运行起来,然后,进行一个简单的HelloWorld入门后,去Linux进程查看下Gateway进程信息,所以,Gateway的入口代码在gateway的脚本程序里.
(2). /bin/gateway
// ... ...
// StandaloneGateway就是程序的入口
exec "$JAVACMD" $JAVA_OPTS -Xms128m -XX:+ExitOnOutOfMemoryError -Dfile.encoding=UTF-8 \
-classpath "$CLASSPATH" \
-Dapp.name="gateway" \
-Dapp.pid="$$" \
-Dapp.repo="$REPO" \
-Dapp.home="$BASEDIR" \
-Dbasedir="$BASEDIR" \
io.camunda.zeebe.gateway.StandaloneGateway
// ... ...
(3). StandaloneGateway
// **********************************************************
// 配置了sprig自动扫描如下package,咱先不管它,先看main函数
// **********************************************************
@SpringBootApplication(
scanBasePackages = {
"io.camunda.zeebe.gateway",
"io.camunda.zeebe.shared",
"io.camunda.zeebe.util.liveness"
})
// **********************************************************
// 配置了ConfigurationPropertie的扫描
// **********************************************************
@ConfigurationPropertiesScan(basePackages = {"io.camunda.zeebe.gateway", "io.camunda.zeebe.shared"})
public class StandaloneGateway
// **********************************************************
// 要稍微注意下,实现了哪些接口,其中:CommandLineRunner是Spring容器启动完成之后的一个Callback函数.
// **********************************************************
implements CommandLineRunner, ApplicationListener<ContextClosedEvent>, CloseableSilently {
// ... ...
}
(4). StandaloneGateway.main
public static void main(final String[] args) {
// 1. 为线程配置当出现异常时的处理
Thread.setDefaultUncaughtExceptionHandler(FatalErrorHandler.uncaughtExceptionHandler(Loggers.GATEWAY_LOGGER));
// 2. 配置banner
System.setProperty("spring.banner.location", "classpath:/assets/zeebe_gateway_banner.txt");
// 3. 配置容器为servlet/profiles
final var application = new SpringApplicationBuilder(StandaloneGateway.class)
.web(WebApplicationType.SERVLET)
.logStartupInfo(true)
.profiles(Profile.GATEWAY.getId())
.build(args);
// 4. 运行容器
application.run();
}
(5). StandaloneGateway构造器
对main函数进行分析,好像啥也没干,这时候,要把目光放到:scanBasePackages上,或者,其它上面了,我用Spring一般不喜欢用@Component注解,喜欢用@Configuration去配置所有Bean之间的依赖关系,这样有利于其它开发人员,到一个集中的地方进行配置Bean.我习惯于把:@Configuration注解当成一个xml,当然,在我的世界里,我理解注解确实只是XML的一种变种而已.
@Autowired
public StandaloneGateway(
// 依赖注入:GatewayCfg/SpringGatewayBridge/ActorClockConfiguration
// ActorClockConfiguration是一个时钟管理,我们不分析,直接扔掉不管理.
final GatewayCfg configuration,
final SpringGatewayBridge springGatewayBridge,
final ActorClockConfiguration clockConfig) {
this.configuration = configuration;
this.springGatewayBridge = springGatewayBridge;
this.clockConfig = clockConfig;
}
(6). GatewayCfg
咦,GatewayCfg是一个纯配置,这个配置对照application.yaml看一下就好了.
@Component
@ConfigurationProperties(prefix = "zeebe.gateway")
public class GatewayCfg {
private NetworkCfg network = new NetworkCfg();
private ClusterCfg cluster = new ClusterCfg();
private ThreadsCfg threads = new ThreadsCfg();
private SecurityCfg security = new SecurityCfg();
private LongPollingCfg longPolling = new LongPollingCfg();
private List<InterceptorCfg> interceptors = new ArrayList<>();
private boolean initialized = false;
// ... ...
}
(7). SpringGatewayBridge
从代码上分析SpringGatewayBridge好像就是一个状态管理以及BrokerClient(与Broker通信的Client SDK),这是一个典型的桥梁模式哈.
@Component
public class SpringGatewayBridge {
private Supplier<Status> gatewayStatusSupplier;
private Supplier<Optional<BrokerClusterState>> clusterStateSupplier;
private Supplier<BrokerClient> brokerClientSupplier;
// ... ...
// ************************************************
// 注册:BrokerClient
// ************************************************
public void registerBrokerClientSupplier(final Supplier<BrokerClient> brokerClientSupplier) {
this.brokerClientSupplier = brokerClientSupplier;
}
// ************************************************
// 获取:BrokerClient
// ************************************************
public Optional<BrokerClient> getBrokerClient() {
return Optional.ofNullable(brokerClientSupplier).map(Supplier::get);
}
}
(8). StandaloneGateway.run
通过前几步的分析,好像还是没有找到我想要的东西,这时候,要开始怀疑是不是忽略了什么,再细看下:StandaloneGateway发现有实现几个接口,其中有一个接口CommandLineRunner里有做一些处理,不妨看下这个接口的细节.
// 通过Spring容器注入的GatewayCfg,主要用于对配置文件的管理.
private final GatewayCfg configuration;
private ActorScheduler actorScheduler;
public void run(final String... args) throws Exception {
// ... ...
// ****************************************************************
// 看到了我们重点:AtomixCluster
// Atomix是RAFT和Gossip的实现,咱先不去详细分析RAFT和Gossip.
// ****************************************************************
atomixCluster = createAtomixCluster(configuration.getCluster());
// ... ...
atomixCluster.start();
}
(9). StandaloneGateway.createAtomixCluster
createAtomixCluster方法的目的在于创建:AtomixCluster,监听着:26502,并与node节点保持联系.
// ********************************************************
private AtomixCluster createAtomixCluster(final ClusterCfg config) {
// ********************************************************
// 创建成员协议,这个协议实际是一个Gossip协议
// createMembershipProtocol函数的内容,请参考下面(10)小点,大部份参数是默认的,只是配置了一个:BroadcastDispute
// ********************************************************
final var membershipProtocol = createMembershipProtocol(config.getMembership());
final var builder =
AtomixCluster.builder()
// gateway
.withMemberId(config.getMemberId())
// gateway:26502
.withAddress(Address.from(config.getHost(), config.getPort()))
// zeebe-cluster
.withClusterId(config.getClusterName())
// ****************************************************************
// 配置node节点(broker)的成员列表
// ****************************************************************
.withMembershipProvider(
BootstrapDiscoveryProvider.builder()
// node-1:26502
.withNodes(Address.from(config.getContactPoint()))
.build())
// 配置上面的成员协议信息
.withMembershipProtocol(membershipProtocol)
// 配置消息压缩信息
.withMessageCompression(config.getMessageCompression());
// 判断Security是否开启
if (config.getSecurity().isEnabled()) {
applyClusterSecurityConfig(config, builder);
}
// 构建AtomixCluster
return builder.build();
}
(10). StandaloneGateway.createMembershipProtocol
private GroupMembershipProtocol createMembershipProtocol(final MembershipCfg config) {
return SwimMembershipProtocol.builder()
// PT10S
.withFailureTimeout(config.getFailureTimeout())
// PT0.25S
.withGossipInterval(config.getGossipInterval())
// PT1S
.withProbeInterval(config.getProbeInterval())
// PT0.1S
.withProbeTimeout(config.getProbeTimeout())
// true
.withBroadcastDisputes(config.isBroadcastDisputes())
// false
.withBroadcastUpdates(config.isBroadcastUpdates())
// 2
.withGossipFanout(config.getGossipFanout())
// false
.withNotifySuspect(config.isNotifySuspect())
// 3
.withSuspectProbes(config.getSuspectProbes())
// PT10S
.withSyncInterval(config.getSyncInterval())
.build();
}
(11). 总结
在这里仅分析到AtomixCluster的创建过程,暂时先分析到这里,毕竟内容太多,要拆开来写文章,从源码分析上能看出来:Zeebe有一部份内容(配置/Metrics)是向Spring靠扰了,会监听到9600端口,而AtomixCluster会监听26502端口,并且与node节点(node-1:26502)进行通信.