(1). 概述

在前面对BrokerClient进行了剖析,它相当于Client SDK,用于向Broker发送请求,在这里分析比较核心的类:Gateway.

(2). StandaloneGateway.run

Gateway gateway = new Gateway(
         configuration, 
		 // 1. 传递了一个Function,在使用时再调用.
		 this::createBrokerClient, 
		 actorScheduler);

(3). Gateway构造器

public final class Gateway {
		
	// 2. 默认Server构造工厂	
	private static final Function<GatewayCfg, ServerBuilder> DEFAULT_SERVER_BUILDER_FACTORY = cfg -> setNetworkConfig(cfg.getNetwork());

	public Gateway(
		  final GatewayCfg gatewayCfg,
		  final Function<GatewayCfg, BrokerClient> brokerClientFactory,
		  final ActorSchedulingService actorSchedulingService) {
		// ************************************************************************	  
		// 1. 设置一个默认构建:io.grpc.Server类的工厂.
		// ************************************************************************	  
		this(gatewayCfg, brokerClientFactory, DEFAULT_SERVER_BUILDER_FACTORY, actorSchedulingService);
	}// end Gateway
	
	
	private static NettyServerBuilder setNetworkConfig(final NetworkCfg cfg) {
		final Duration minKeepAliveInterval = cfg.getMinKeepAliveInterval();

		if (minKeepAliveInterval.isNegative() || minKeepAliveInterval.isZero()) {
		  throw new IllegalArgumentException("Minimum keep alive interval must be positive.");
		}

		// **************************************************************
		// 也就是说:Gateway监听的是26500端口
        // gateway:26500
		// **************************************************************
		return NettyServerBuilder.forAddress(new InetSocketAddress(cfg.getHost(), cfg.getPort()))
			.permitKeepAliveTime(minKeepAliveInterval.toMillis(), TimeUnit.MILLISECONDS)
			.permitKeepAliveWithoutCalls(false);
    } // end setNetworkConfig
}

(4). Gateway.start

public void start() throws IOException {
    healthManager.setStatus(Status.STARTING);
    brokerClient = buildBrokerClient();

    final ActivateJobsHandler activateJobsHandler;
	// 判断是否开启了轮询
    if (gatewayCfg.getLongPolling().isEnabled()) { // true
      final LongPollingActivateJobsHandler longPollingHandler = buildLongPollingHandler(brokerClient);
      actorSchedulingService.submitActor(longPollingHandler);
      activateJobsHandler = longPollingHandler;
    } else {
      activateJobsHandler = new RoundRobinActivateJobsHandler(brokerClient);
    }

    // *******************************************************************************
	// 1. 实际上分析到这里就大概知道了,Netty(GRPC)接收请求后,会把请求通过:EndpointManager进行转发.
    // EndpointManager类内部持有BrokerClient,通过BrokerClient与Broker进行通信来着的.
	// 比如:completeJob/cancelProcessInstance/createProcessInstance/createProcessInstanceWithResult
	// *******************************************************************************
    final EndpointManager endpointManager = new EndpointManager(brokerClient, activateJobsHandler);
	
	// *******************************************************************************
	// 2. GatewayGrpcService包裹了EndpointManager,代表在EndpointManager的方法上进行了增强.
	// *******************************************************************************
    final GatewayGrpcService gatewayGrpcService = new GatewayGrpcService(endpointManager);
    final ServerBuilder<?> serverBuilder = serverBuilderFactory.apply(gatewayCfg);

    final SecurityCfg securityCfg = gatewayCfg.getSecurity();
    if (securityCfg.isEnabled()) {
      setSecurityConfig(serverBuilder, securityCfg);
    }

    server =
        serverBuilder
            .addService(applyInterceptors(gatewayGrpcService))
            .addService(
                ServerInterceptors.intercept(
                    healthManager.getHealthService(), MONITORING_SERVER_INTERCEPTOR))
            .build();
			
    server.start();
    healthManager.setStatus(Status.RUNNING);
} // end 

(5). EndpointManager

EndpointManager的方法签名,从签名上基本上是能看出来,它大概在做什么,我就不深入去研究了,毕竟,时间太赶了,Broker部份的代码才是核心值得深究的代码,对于Gateway部份的源码,我只要能找到入口在哪即可.

(6). 总结

Gateway启动时,会绑定三个端口:

  • 9600(web端口,用于收集监控数据).
  • 26500(Gateway对外提供的端口,主要用于JobWoker的通信).
  • 26502(Gateway与Broker进行通信的端口,即Atomix集群监听端口).