(1). 概述

在前面,把BusAutoConfiguration初始化的一部份代码给剖析了一篇,在这一篇,主要剖析:BusAutoConfiguration.

(2). BusAutoConfiguration

@Configuration
@ConditionalOnBusEnabled
@EnableBinding(SpringCloudBusClient.class)
@EnableConfigurationProperties(BusProperties.class)
@AutoConfigureBefore(BindingServiceConfiguration.class) // so stream bindings work properly
@AutoConfigureAfter(LifecycleMvcEndpointAutoConfiguration.class) // so actuator endpoints have needed dependencies
public class BusAutoConfiguration implements ApplicationEventPublisherAware {

	public static final String BUS_PATH_MATCHER_NAME = "busPathMatcher";

	public static final String CLOUD_CONFIG_NAME_PROPERTY = "spring.cloud.config.name";
	
	// *************************************************************************************
	// MessageChannel(消息发送)
	// *************************************************************************************
	private MessageChannel cloudBusOutboundChannel;

	private ApplicationEventPublisher applicationEventPublisher;

	private final ServiceMatcher serviceMatcher;

	private final BindingServiceProperties bindings;

	private final BusProperties bus;

	// *************************************************************************************
	// 1. 构建器注入:ServiceMatcher/BindingServiceProperties/BusProperties
	//   注意:BindingServiceProperties属于(stream): 
	//   org.springframework.cloud.stream.config.BindingServiceProperties
	// *************************************************************************************
	public BusAutoConfiguration(ServiceMatcher serviceMatcher, BindingServiceProperties bindings, BusProperties bus) {
		this.serviceMatcher = serviceMatcher;
		this.bindings = bindings;
		this.bus = bus;
	}

	// *************************************************************************************
	// 2. 额,spring cloud bus实际就是配置:spring cloud stream
	// *************************************************************************************
	@PostConstruct
	public void init() {
		
		// bindings是一个map集合(springCloudBusInput),从map中获取:BindingProperties
		// SpringCloudBusClient.INPUT = springCloudBusInput
		BindingProperties inputBinding = this.bindings.getBindings().get(SpringCloudBusClient.INPUT);
		// 如果不存在,则创建一个
		if (inputBinding == null) { // false
			this.bindings.getBindings().put(SpringCloudBusClient.INPUT, new BindingProperties());
		}
		
		BindingProperties input = this.bindings.getBindings().get(SpringCloudBusClient.INPUT);
		if (input.getDestination() == null || input.getDestination().equals(SpringCloudBusClient.INPUT)) {
			// *****************************************************************************************
			// 2.1 设置destination=springCloudBus
			// *****************************************************************************************
			input.setDestination(this.bus.getDestination());
		}
		
		// bindings是一个map集合(springCloudBusOutput),从map中获取:BindingProperties
		// SpringCloudBusClient.OUTPUT = springCloudBusOutput
		BindingProperties outputBinding = this.bindings.getBindings().get(SpringCloudBusClient.OUTPUT);
		if (outputBinding == null) {
			this.bindings.getBindings().put(SpringCloudBusClient.OUTPUT,new BindingProperties());
		}
		
		BindingProperties output = this.bindings.getBindings().get(SpringCloudBusClient.OUTPUT);
		if (output.getDestination() == null || output.getDestination().equals(SpringCloudBusClient.OUTPUT)) {
			// *****************************************************************************************
			// 2.2 设置destination=springCloudBus
			// *****************************************************************************************
			output.setDestination(this.bus.getDestination());
		}
	} // end init
	
	
	// *****************************************************************************************
	// 3. 注入MessageChannel
	// *****************************************************************************************
	@Autowired
	@Output(SpringCloudBusClient.OUTPUT)
	public void setCloudBusOutboundChannel(MessageChannel cloudBusOutboundChannel) {
		this.cloudBusOutboundChannel = cloudBusOutboundChannel;
	}// end setCloudBusOutboundChannel
}

(3). 事件发布与监听

// 监听:RemoteApplicationEvent事件,往MessageChannel里发送数据
@EventListener(classes = RemoteApplicationEvent.class)
public void acceptLocal(RemoteApplicationEvent event) {
	if (this.serviceMatcher.isFromSelf(event)
			&& !(event instanceof AckRemoteApplicationEvent)) {
		this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(event).build());
	}
}

// 订阅消息处理
@StreamListener(SpringCloudBusClient.INPUT)
public void acceptRemote(RemoteApplicationEvent event) {
	// ***********************************************************************************************
	// 消费者会监听这两个Event事件.
	// EnvironmentChangeRemoteApplicationEvent
	// AckRemoteApplicationEvent
	// ***********************************************************************************************
	if (event instanceof AckRemoteApplicationEvent) {
		if (this.bus.getTrace().isEnabled() && !this.serviceMatcher.isFromSelf(event) && this.applicationEventPublisher != null) {
			this.applicationEventPublisher.publishEvent(event);
		}
		return;
	}
	
	if (this.serviceMatcher.isForSelf(event) && this.applicationEventPublisher != null) {
		if (!this.serviceMatcher.isFromSelf(event)) {
			this.applicationEventPublisher.publishEvent(event);
		}
		
		if (this.bus.getAck().isEnabled()) {
			AckRemoteApplicationEvent ack = new AckRemoteApplicationEvent(this,
					this.serviceMatcher.getServiceId(),
					this.bus.getAck().getDestinationService(),
					event.getDestinationService(), event.getId(), event.getClass());
             // 发送ack事件
			this.cloudBusOutboundChannel.send(MessageBuilder.withPayload(ack).build());
			this.applicationEventPublisher.publishEvent(ack);
		}
	}
	
	if (this.bus.getTrace().isEnabled() && this.applicationEventPublisher != null) {
		this.applicationEventPublisher.publishEvent(new SentApplicationEvent(this, event.getOriginService(), event.getDestinationService(),event.getId(), event.getClass()));
	}
}

(4). 总结

Spring Cloud Bus的底层实际是组装配置(BindingServiceProperties),余下的事情交给了:Spring Cloud Stream做处理.