(1). 概述

前面对Apollo与Spring整合部分的源码进行了剖析,Spring的PropertySource最终会委托给Apollo的Config.getProperty方法.
在这一小节,主要剖析Config,带着问题来看源码:
1) Config与Meta Server进行交互时,请求报文是什么?
2) Config与Config Service进行交互时,请求报文是什么?

(2). ConfigServiceLocator

Config与Meta Server进行交互时,请求报文是什么?
ConfigServiceLocator主要负责与Meta Server通信,获得:Config Service服务列表.
Meta Server主要是用来做负载均衡的,当某个Config Service不可用时,可以换另一个Config Service.

/**
 * Talks to the Meta Server and keeps the current list of Config Service instances in
 * {@code m_configServices}.
 *
 * <p>Example of the request sent to the Meta Server:
 * {@code GET http://127.0.0.1:8080/services/config?appId=<appId>&ip=<localIp>}
 *
 * <p>NOTE: this listing is an excerpt. Helpers referenced below but not shown here
 * ({@code getCustomizedConfigService}, {@code setConfigServices}, {@code schedulePeriodicRefresh},
 * {@code logConfigService}) live in the full class.
 */
public class ConfigServiceLocator {
  private static final Logger logger = LoggerFactory.getLogger(ConfigServiceLocator.class);
  private HttpClient m_httpClient;
  private ConfigUtil m_configUtil;
  // Final data holder: the list of Config Service instances.
  private AtomicReference<List<ServiceDTO>> m_configServices;
  private Type m_responseType;
  // Executor created for the periodic refresh task.
  private ScheduledExecutorService m_executorService;
  private static final Joiner.MapJoiner MAP_JOINER = Joiner.on("&").withKeyValueSeparator("=");
  private static final Escaper queryParamEscaper = UrlEscapers.urlFormParameterEscaper();

  /**
   * Step 1. Wires the collaborators, starts with an empty service list and then performs the
   * initial lookup (step 2) via {@link #initConfigServices()}.
   */
  public ConfigServiceLocator() {
    List<ServiceDTO> initial = Lists.newArrayList();
    m_configServices = new AtomicReference<>(initial);
    m_responseType = new TypeToken<List<ServiceDTO>>() {
    }.getType();
    m_httpClient = ApolloInjector.getInstance(HttpClient.class);
    m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
    this.m_executorService = Executors.newScheduledThreadPool(1, ApolloThreadFactory.create("ConfigServiceLocator", true));
    // ****************************************************************************
    // 2. Initialise the Config Service list.
    // ****************************************************************************
    initConfigServices();
  }

  /**
   * Step 3. Initialises the Config Service list.
   *
   * <p>If a customised list is available it is used as-is and no HTTP request is made; otherwise
   * the Meta Server is queried once and a periodic refresh is scheduled.
   */
  private void initConfigServices() {
    // 4. Read the run-time configuration (apollo.configService, per the original annotation;
    //    the helper is not shown in this excerpt) and convert it to ServiceDTO objects.
    List<ServiceDTO> customizedConfigServices = getCustomizedConfigService();

    // 5. A non-null result is stored in m_configServices and we return immediately:
    //    no HTTP request to the Meta Server is made in that case.
    if (customizedConfigServices != null) {
      setConfigServices(customizedConfigServices);
      return;
    }

    // *****************************************************************************
    // 6. Call the Meta Server to obtain the Config Service list.
    // *****************************************************************************
    this.tryUpdateConfigServices();

    // *****************************************************************************
    // Schedule the periodic task (apollo.refreshInterval=5, per the original annotation)
    // which keeps calling tryUpdateConfigServices().
    // *****************************************************************************
    this.schedulePeriodicRefresh();
  }

  /**
   * Best-effort wrapper around {@link #updateConfigServices()}.
   *
   * @return {@code true} when the list was refreshed, {@code false} when the refresh failed
   */
  private boolean tryUpdateConfigServices() {
    try {
      updateConfigServices();
      return true;
    } catch (Throwable ex) {
      // Deliberately not rethrown: a failed refresh must not break the caller.
      // Leave a trace instead of swallowing the failure silently.
      logger.debug("Refreshing config services from meta server failed: {}", ExceptionUtil.getDetailMessage(ex));
    }
    return false;
  }

  /**
   * Step 7. Fetches the Config Service list from the Meta Server, trying at most twice.
   *
   * @throws ApolloConfigException when no attempt produced a non-empty list; the cause is the
   *         last exception seen, or {@code null} when every response was merely empty
   */
  private synchronized void updateConfigServices() {
    // 8. Build the HTTP request, e.g.
    //    http://127.0.0.1:8080/services/config?appId=7BBB492B-62F8-453F-B50B-0D568308E87A&ip=172.16.95.197
    String url = assembleMetaServiceUrl();

    HttpRequest request = new HttpRequest(url);
    // Maximum number of attempts.
    int maxRetries = 2;
    Throwable exception = null;

    for (int i = 0; i < maxRetries; i++) {
      Transaction transaction = Tracer.newTransaction("Apollo.MetaService", "getConfigService");
      transaction.addData("Url", url);
      try {
        // 9. Send the HTTP request.
        HttpResponse<List<ServiceDTO>> response = m_httpClient.doGet(request, m_responseType);
        transaction.setStatus(Transaction.SUCCESS);

        // 10. Convert the response body to the business model (ServiceDTO).
        List<ServiceDTO> services = response.getBody();
        if (services == null || services.isEmpty()) {
          // Empty answer: skip straight to the next attempt (no back-off sleep).
          logConfigService("Empty response!");
          continue;
        }

        // 11. Publish the parsed list to m_configServices.
        setConfigServices(services);
        return;
      } catch (Throwable ex) {
        Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
        transaction.setStatus(ex);
        exception = ex;
      } finally {
        transaction.complete();
      }

      // Back off before the next attempt.
      try {
        m_configUtil.getOnErrorRetryIntervalTimeUnit().sleep(m_configUtil.getOnErrorRetryInterval());
      } catch (InterruptedException ex) {
        // Restore the interrupt status so the owner of this thread can still observe it.
        Thread.currentThread().interrupt();
      }
    }
    throw new ApolloConfigException(String.format("Get config services failed from %s", url), exception);
  }

  /**
   * Step 8.1. Builds the Meta Server URL, e.g.
   * {@code http://127.0.0.1:8080/services/config?appId=7BBB492B-62F8-453F-B50B-0D568308E87A&ip=172.16.95.197}.
   *
   * @return the meta server domain followed by {@code /services/config?} and the escaped query
   *         parameters ({@code appId} always, {@code ip} only when a local IP is known)
   */
  private String assembleMetaServiceUrl() {
    String domainName = m_configUtil.getMetaServerDomainName();
    String appId = m_configUtil.getAppId();
    String localIp = m_configUtil.getLocalIp();

    Map<String, String> queryParams = Maps.newHashMap();
    queryParams.put("appId", queryParamEscaper.escape(appId));
    if (!Strings.isNullOrEmpty(localIp)) {
      queryParams.put("ip", queryParamEscaper.escape(localIp));
    }
    return domainName + "/services/config?" + MAP_JOINER.join(queryParams);
  }
}

(3). RemoteConfigRepository

Config与Config Service进行交互时,请求报文是什么?
RemoteConfigRepository主要负责与Config Service进行交互,发起HTTP请求,获得所有的配置信息.

/**
 * Repository that loads the configuration of one namespace from a Config Service over HTTP
 * and caches the latest {@code ApolloConfig} in {@code m_configCache}.
 *
 * <p>NOTE(review): this listing is an excerpt. {@code logger}, the static initialiser of
 * {@code m_executorService}, and the helpers {@code trySync}, {@code schedulePeriodicRefresh},
 * {@code scheduleLongPollingRefresh}, {@code assembleQueryConfigUrl} and {@code getConfig} are
 * not shown here.
 */
public class RemoteConfigRepository extends AbstractConfigRepository {
	private static final Joiner STRING_JOINER = Joiner.on(ConfigConsts.CLUSTER_NAMESPACE_SEPARATOR);
	private static final Joiner.MapJoiner MAP_JOINER = Joiner.on("&").withKeyValueSeparator("=");
	private static final Escaper pathEscaper = UrlEscapers.urlPathSegmentEscaper();
	private static final Escaper queryParamEscaper = UrlEscapers.urlFormParameterEscaper();
	
	// Covered earlier: holds the list of Config Service instances.
	private final ConfigServiceLocator m_serviceLocator;
	private final HttpClient m_httpClient;
	private final ConfigUtil m_configUtil;
	
	
	private final RemoteConfigLongPollService remoteConfigLongPollService;
	// Most recently loaded config; null until the first successful load.
	private volatile AtomicReference<ApolloConfig> m_configCache;
	private final String m_namespace;
	private final static ScheduledExecutorService m_executorService;
	// Config Service that sent the latest long-poll notification; consumed by loadApolloConfig().
	private final AtomicReference<ServiceDTO> m_longPollServiceDto;
	private final AtomicReference<ApolloNotificationMessages> m_remoteMessages;
	private final RateLimiter m_loadConfigRateLimiter;
	// While true, loadApolloConfig() makes two passes over the service list instead of one.
	private final AtomicBoolean m_configNeedForceRefresh;
	private final SchedulePolicy m_loadConfigFailSchedulePolicy;
	private static final Gson GSON = new Gson();
	
	/**
	 * Wires the collaborators, then synchronises once and schedules both the periodic refresh
	 * and the long-polling refresh.
	 *
	 * @param namespace the namespace this repository serves
	 */
	public RemoteConfigRepository(String namespace) {
		m_namespace = namespace;
		m_configCache = new AtomicReference<>();
		m_configUtil = ApolloInjector.getInstance(ConfigUtil.class);
		m_httpClient = ApolloInjector.getInstance(HttpClient.class);
		m_serviceLocator = ApolloInjector.getInstance(ConfigServiceLocator.class);
		remoteConfigLongPollService = ApolloInjector.getInstance(RemoteConfigLongPollService.class);
		m_longPollServiceDto = new AtomicReference<>();
		m_remoteMessages = new AtomicReference<>();
		m_loadConfigRateLimiter = RateLimiter.create(m_configUtil.getLoadConfigQPS());
		m_configNeedForceRefresh = new AtomicBoolean(true);
		m_loadConfigFailSchedulePolicy = new ExponentialSchedulePolicy(m_configUtil.getOnErrorRetryInterval(), m_configUtil.getOnErrorRetryInterval() * 8);
		
		// *****************************************************************************
		// 1. Synchronise the configuration.
		// *****************************************************************************
		this.trySync();
		
		// *****************************************************************************
		// Schedule the periodic task (apollo.refreshInterval=5, per the original annotation);
		// internally it calls trySync() again (helper not shown in this excerpt).
		// *****************************************************************************
		this.schedulePeriodicRefresh();
		
		// Delegate long polling to RemoteConfigLongPollService; analysed later in the article.
		this.scheduleLongPollingRefresh();
	} // end Constructor
	
	
	/**
	 * Loads the current config and, when it is a different object than the cached one,
	 * replaces the cache and fires a repository-change event.
	 * Any failure is recorded on the transaction and rethrown.
	 */
	protected synchronized void sync() {
	    Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "syncRemoteConfig");
	    try {
		  // The previously cached config.
	      ApolloConfig previous = m_configCache.get();
		  
		  // *****************************************************************************
		  // 2. Load the latest config from the Config Service.
		  // *****************************************************************************
	      ApolloConfig current = loadApolloConfig();
	
	      //reference equals means HTTP 304
	      if (previous != current) {
	        logger.debug("Remote Config refreshed!");
	        m_configCache.set(current);
			// Fire the change event.
	        this.fireRepositoryChange(m_namespace, this.getConfig());
	      }
	
	      if (current != null) {
	        Tracer.logEvent(String.format("Apollo.Client.Configs.%s", current.getNamespaceName()), current.getReleaseKey());
	      }
	      transaction.setStatus(Transaction.SUCCESS);
	    } catch (Throwable ex) {
	      transaction.setStatus(ex);
	      throw ex;
	    } finally {
	      transaction.complete();
	    }
	}// end sync
	
	/**
	 * Returns the Config Service list held by the service locator.
	 *
	 * @throws ApolloConfigException when the list is empty
	 */
	private List<ServiceDTO> getConfigServices() {
	    List<ServiceDTO> services = m_serviceLocator.getConfigServices();
	    if (services.size() == 0) {
	      throw new ApolloConfigException("No available config service");
	    }
	    return services;
	}
	
	// *****************************************************************************
	// 2. Call a Config Service to obtain the latest config.
	//
	// Returns the cached config on HTTP 304, otherwise the parsed response body.
	// Throws ApolloConfigException when every attempt failed or a 404 was received.
	// *****************************************************************************
	private ApolloConfig loadApolloConfig() {

	  // Rate limiting: if no permit is obtained within 5 seconds, sleep before going on.
	  if (!m_loadConfigRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
	    //wait at most 5 seconds
	    try {
	      TimeUnit.SECONDS.sleep(5);
	    } catch (InterruptedException e) {
	      // NOTE(review): the interrupt is swallowed here and the interrupt status is lost.
	    }
	  }
	  
	  // Request parameters.
	  String appId = m_configUtil.getAppId();
	  String cluster = m_configUtil.getCluster();
	  // ******************************************************
	  // In some special cases an application needs different configs for different clusters,
	  // e.g. instances deployed in data centre A use a different ES server address than
	  // instances deployed in data centre B.
	  // This can be solved by creating different clusters in Apollo.
	  // Per the original annotation, the "idc" property of /opt/settings/server.properties (Linux)
	  // or C:\opt\settings\server.properties (Windows) is read as the cluster name.
	  // ******************************************************
	  String dataCenter = m_configUtil.getDataCenter();
	  String secret = m_configUtil.getAccessKeySecret();
	  Tracer.logEvent("Apollo.Client.ConfigMeta", STRING_JOINER.join(appId, cluster, m_namespace));
	  // Two passes over the service list while a forced refresh is pending, otherwise one.
	  int maxRetries = m_configNeedForceRefresh.get() ? 2 : 1;
	  long onErrorSleepTime = 0; // 0 means no sleep
	  Throwable exception = null;
	
	  // 3. Get the full Config Service list (analysed earlier).
	  List<ServiceDTO> configServices = getConfigServices();
	  String url = null;
	  retryLoopLabel:
	  for (int i = 0; i < maxRetries; i++) {
	    // Work on a shuffled copy so the order of services differs between passes.
	    List<ServiceDTO> randomConfigServices = Lists.newLinkedList(configServices);
	    Collections.shuffle(randomConfigServices);
	    
		//Access the server which notifies the client first
	    if (m_longPollServiceDto.get() != null) {
	      randomConfigServices.add(0, m_longPollServiceDto.getAndSet(null));
	    }
		
		// Try the services one by one in the (shuffled) order.
	    for (ServiceDTO configService : randomConfigServices) {
	      // ... ...

		  // 4. Build the HTTP request sent to the Config Service, e.g.
		  //    http://172.16.95.197:8080/configs/7BBB492B-62F8-453F-B50B-0D568308E87A/default/TEST1.jdbc?ip=172.16.95.197
	      url = assembleQueryConfigUrl(configService.getHomepageUrl(), appId, cluster, m_namespace,dataCenter, m_remoteMessages.get(), m_configCache.get());
	      logger.debug("Loading config from {}", url);
	      
		  HttpRequest request = new HttpRequest(url);
		  // *****************************************************************************
		  // When an access key secret is configured (apollo.accesskey.secret=xxx),
		  // url/appId/secret are signed and the result is put into the request headers,
		  // for example:
		  //    Authorization: Apollo ${appId} ${signature}
		  //    Timestamp: ${currentTimeMillis}  
		  // *****************************************************************************
	      if (!StringUtils.isBlank(secret)) { 
	        Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);
	        request.setHeaders(headers);
	      }
		  
	      Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "queryConfig");
	      transaction.addData("Url", url);
	      try {
			// 5. Send the HTTP request (GET).
	        HttpResponse<ApolloConfig> response = m_httpClient.doGet(request, ApolloConfig.class);
	        m_configNeedForceRefresh.set(false);
	        m_loadConfigFailSchedulePolicy.success();
	
	        transaction.addData("StatusCode", response.getStatusCode());
	        transaction.setStatus(Transaction.SUCCESS);
	
	        if (response.getStatusCode() == 304) { // HTTP 304: nothing changed, return the cached config
	          logger.debug("Config server responds with 304 HTTP status code.");
	          return m_configCache.get();
	        }
			
			// 6. Parse the HTTP response into an ApolloConfig.
	        ApolloConfig result = response.getBody();
	        logger.debug("Loaded config for {}: {}", m_namespace, result);
	        return result;
	      } catch (ApolloConfigStatusCodeException ex) {  // a 404 status here means: config not found
	        ApolloConfigStatusCodeException statusCodeException = ex;
	        //config not found
	        if (ex.getStatusCode() == 404) {
	          String message = String.format("Could not find config for namespace - appId: %s, cluster: %s, namespace: %s, " +"please check whether the configs are released in Apollo!",appId, cluster, m_namespace);
	          statusCodeException = new ApolloConfigStatusCodeException(ex.getStatusCode(),message);
	        }
			
	        Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(statusCodeException));
	        transaction.setStatus(statusCodeException);
	        exception = statusCodeException;
	        if(ex.getStatusCode() == 404) { // on 404 there is no point in retrying
	          break retryLoopLabel;
	        }
	      } catch (Throwable ex) {
	        Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
	        transaction.setStatus(ex);
	        exception = ex;
	      } finally {
	        transaction.complete();
	      }
		  //... ...
	    }
	  }
	  
	  // Every attempt failed (or a 404 ended the loop): report the last URL tried and the last error.
	  String message = String.format("Load Apollo Config failed - appId: %s, cluster: %s, namespace: %s, url: %s", appId, cluster, m_namespace, url);
	  throw new ApolloConfigException(message, exception);
	} // end loadApolloConfig
	
}

(4). RemoteConfigLongPollService

RemoteConfigLongPollService顾名思义,就是一个长轮询的操作,轮询的目的是为了尽可能实时地知道配置有变化(官网说是60秒,实际客户端读超时为:90秒),返回的结果有两种情况:
1) 返回协议头为:200,并且,有内容,但不是配置详细信息.
2) 返回协议头为:304,继续进行轮询.

/**
 * Registers a repository for long polling on the given namespace and makes sure the
 * long-polling loop has been started.
 *
 * @param namespace the namespace to watch
 * @param remoteConfigRepository the repository registered under that namespace
 * @return the value returned by {@code m_longPollNamespaces.put(...)}
 */
public boolean submit(String namespace, RemoteConfigRepository remoteConfigRepository) {
	final boolean registered = m_longPollNamespaces.put(namespace, remoteConfigRepository);
	// Seed the notification id for a namespace seen for the first time.
	m_notifications.putIfAbsent(namespace, INIT_NOTIFICATION_ID);

	// Long polling already running: nothing more to do.
	if (m_longPollStarted.get()) {
	  return registered;
	}

	// ****************************************************************
	// 1. Start long polling.
	// ****************************************************************
	startLongPolling();
	return registered;
} // end submit

/**
 * Starts the long-polling worker exactly once.
 *
 * <p>The first caller that flips {@code m_longPollStarted} from false to true submits the
 * polling task to {@code m_longPollingService}; later callers return immediately. If the
 * submission fails, the flag is reset so a later call can try again, and the error is logged.
 */
private void startLongPolling() {
    if (!m_longPollStarted.compareAndSet(false, true)) {
      //already started
      return;
    }
    try {

      // Parameters needed for the HTTP requests.
      final String appId = m_configUtil.getAppId();
      final String cluster = m_configUtil.getCluster();
      final String dataCenter = m_configUtil.getDataCenter();
      final String secret = m_configUtil.getAccessKeySecret();
      final long longPollingInitialDelayInMills = m_configUtil.getLongPollingInitialDelayInMills();

      // ****************************************************************
      // 2. Submit the polling task to the thread pool.
      // ****************************************************************
      m_longPollingService.submit(new Runnable() {
        @Override
        public void run() {

          if (longPollingInitialDelayInMills > 0) {
            try {
              logger.debug("Long polling will start in {} ms.", longPollingInitialDelayInMills);
              TimeUnit.MILLISECONDS.sleep(longPollingInitialDelayInMills);
            } catch (InterruptedException e) {
              // Restore the interrupt status and do not start polling: the polling loop itself
              // refuses to run on an interrupted thread, so swallowing the interrupt here
              // would silently defeat that check.
              Thread.currentThread().interrupt();
              return;
            }
          }

          // ****************************************************************
          // 3. Start polling.
          // ****************************************************************
          doLongPollingRefresh(appId, cluster, dataCenter, secret);
        }
      });
    } catch (Throwable ex) {
      // Submission failed: allow a later call to try again.
      m_longPollStarted.set(false);
      ApolloConfigException exception = new ApolloConfigException("Schedule long polling refresh failed", ex);
      Tracer.logError(exception);
      logger.warn(ExceptionUtil.getDetailMessage(exception));
    }
} // end startLongPolling


/**
 * Long-polling loop. Runs until {@code m_longPollingStopped} is set or the current thread is
 * interrupted.
 *
 * <p>Each round sends one request to {@code /notifications/v2} on a Config Service:
 * <ul>
 *   <li>HTTP 200 with a body: the notification ids are updated and the affected repositories
 *       are notified; the response carries notifications only, not the config content.</li>
 *   <li>HTTP 304: nothing changed; with 50% probability another Config Service is picked for
 *       the next round.</li>
 *   <li>Any exception: the current service is dropped and the loop sleeps for the time returned
 *       by the fail policy before trying again.</li>
 * </ul>
 *
 * @param appId      application id sent with every request
 * @param cluster    cluster name sent with every request
 * @param dataCenter data centre passed to the URL builder
 * @param secret     access key secret; when not blank the request is signed
 */
private void doLongPollingRefresh(String appId, String cluster, String dataCenter, String secret) {
    final Random random = new Random();
    ServiceDTO lastServiceDto = null;

    while (!m_longPollingStopped.get() && !Thread.currentThread().isInterrupted()) {

      // Rate-limit the requests to protect the server.
      if (!m_longPollRateLimiter.tryAcquire(5, TimeUnit.SECONDS)) {
        //wait at most 5 seconds
        try {
          TimeUnit.SECONDS.sleep(5);
        } catch (InterruptedException e) {
          // Restore the interrupt status and leave the loop instead of starting another
          // long request on an interrupted thread.
          Thread.currentThread().interrupt();
          break;
        }
      }

      Transaction transaction = Tracer.newTransaction("Apollo.ConfigService", "pollNotification");
      String url = null;
      try {

        // Reuse the Config Service from the previous round; if there is none, pick one at random.
        if (lastServiceDto == null) {
          List<ServiceDTO> configServices = getConfigServices();
          lastServiceDto = configServices.get(random.nextInt(configServices.size()));
        }

        // ****************************************************************
        // Build the HTTP request, e.g.
        // http://172.16.95.197:8080/notifications/v2?cluster=default&appId=7BBB492B-62F8-453F-B50B-0D568308E87A&ip=172.16.95.197&notifications=%5B%7B%22namespaceName%22%3A%22TEST1.jdbc%22%2C%22notificationId%22%3A-1%7D%5D
        // ****************************************************************
        url = assembleLongPollRefreshUrl(lastServiceDto.getHomepageUrl(), appId, cluster, dataCenter,m_notifications);
        logger.debug("Long polling from {}", url);

        HttpRequest request = new HttpRequest(url);
        // *******************************************************************
        // private static final int LONG_POLLING_READ_TIMEOUT = 90 * 1000;
        // The official site says 60 seconds, yet the client read timeout is 90 seconds.
        // Open questions from the original annotation: why not just 70 seconds? Is the
        // underlying transport blocking I/O, and does each client hold a file descriptor?
        // *******************************************************************
        request.setReadTimeout(LONG_POLLING_READ_TIMEOUT);

        // Request signing with the secret was analysed earlier in the article.
        if (!StringUtils.isBlank(secret)) {
          Map<String, String> headers = Signature.buildHttpHeaders(url, appId, secret);
          request.setHeaders(headers);
        }
        transaction.addData("Url", url);

        // Send the HTTP request.
        final HttpResponse<List<ApolloConfigNotification>> response = m_httpClient.doGet(request, m_responseType);
        logger.debug("Long polling response: {}, url: {}", response.getStatusCode(), url);

        if (response.getStatusCode() == 200 && response.getBody() != null) { // HTTP 200
          updateNotifications(response.getBody());
          updateRemoteNotifications(response.getBody());
          transaction.addData("Result", response.getBody().toString());
          // Only a notification is delivered; the response does not contain the config itself.
          notify(lastServiceDto, response.getBody());
        }

        //try to load balance
        if (response.getStatusCode() == 304 && random.nextBoolean()) { // HTTP 304
          lastServiceDto = null;
        }

        m_longPollFailSchedulePolicyInSecond.success();
        transaction.addData("StatusCode", response.getStatusCode());
        transaction.setStatus(Transaction.SUCCESS);
      } catch (Throwable ex) {  // failure handling
        // Drop the current service so the next round picks another one.
        lastServiceDto = null;
        Tracer.logEvent("ApolloConfigException", ExceptionUtil.getDetailMessage(ex));
        transaction.setStatus(ex);
        long sleepTimeInSecond = m_longPollFailSchedulePolicyInSecond.fail();
        logger.warn("Long polling failed, will retry in {} seconds. appId: {}, cluster: {}, namespaces: {}, long polling url: {}, reason: {}",
            sleepTimeInSecond, appId, cluster, assembleNamespaces(), url, ExceptionUtil.getDetailMessage(ex));
        try {
          TimeUnit.SECONDS.sleep(sleepTimeInSecond);
        } catch (InterruptedException ie) {
          // Restore the interrupt status; the loop condition then ends the loop.
          Thread.currentThread().interrupt();
        }
      } finally {
        transaction.complete();
      }
    }
} // end doLongPollingRefresh

(5). 总结

至此,Apollo的主流程代码已经分析完毕,后面的一些内容就不再分析了(除非业务上有需要).下面对Apollo做一个总结:
1) Apollo会先与Meta Server进行通信,获得config service列表.
2) 在RemoteConfigRepository内部,会随机取一个config service,获得最新的配置信息.
3) 创建长轮询任务,如果配置有更新,长轮询仅发出一个通知,再由RemoteConfigRepository去拉取最新的配置.