(1).叙述
在前面已经对@EnableBinding的配置进行了详解.最终,会向Spring容器中注册一个bean: beanName为org.springframework.cloud.stream.messaging.Source, 对应的value(bean的类型)为org.springframework.cloud.stream.binding.BindableProxyFactory.
(2).BindableProxyFactory 详解
package org.springframework.cloud.stream.binding;
// 大体的理解为.Spring会根据:type产生动态字节码.并向Spring容器中注册.
/**
 * Factory bean that exposes a proxy for a binding interface (for example
 * {@code org.springframework.cloud.stream.messaging.Source}).
 *
 * <p>Walkthrough of what the visible code does:
 * <ol>
 *   <li>{@link #afterPropertiesSet()} scans the methods of {@code type} for {@code @Input} /
 *       {@code @Output} and stores one {@code BoundTargetHolder} per binding name.</li>
 *   <li>{@link #getObject()} lazily builds a proxy of {@code type} whose interceptor is this
 *       instance.</li>
 *   <li>{@link #invoke(MethodInvocation)} answers calls on the proxy by returning the bound
 *       target that was stored for the annotated method.</li>
 * </ol>
 *
 * <p>NOTE(review): this is an abridged excerpt. {@code locateSharedBindingTarget(..)} and the
 * code that assigns {@code type} are not shown here and must exist in the full class.
 */
public class BindableProxyFactory
        implements
        // **********************************************
        // Method interceptor
        // **********************************************
        MethodInterceptor,
        // **********************************************
        // Spring callback: factory that creates the bean
        // **********************************************
        FactoryBean<Object>,
        // Business interface
        Bindable,
        // **********************************************
        // Spring callback: runs after the bean has been instantiated
        // **********************************************
        InitializingBean {

    // Bound targets already handed out, keyed by the invoked interface method.
    // invoke() reads and fills this cache; the declaration was missing from the excerpt.
    private final Map<Method, Object> targetCache = new HashMap<>();

    // Binding interface to proxy.
    // Trace for the example: type = org.springframework.cloud.stream.messaging.Source
    private Class<?> type;

    // Optional dependency. Trace for the example: null
    @Autowired(required = false)
    private SharedBindingTargetRegistry sharedBindingTargetRegistry;

    // Trace for the example:
    // channelFactory = org.springframework.cloud.stream.binding.SubscribableChannelBindingTargetFactory
    // messageSourceFactory = org.springframework.cloud.stream.binding.MessageSourceBindingTargetFactory
    @Autowired
    private Map<String, BindingTargetFactory> bindingTargetFactories;

    // Holders for @Input targets, keyed by binding name.
    private Map<String, BoundTargetHolder> inputHolders = new HashMap<>();

    // Holders for @Output targets, keyed by binding name.
    private Map<String, BoundTargetHolder> outputHolders = new HashMap<>();

    // Lazily created proxy returned by getObject().
    private Object proxy;

    // ****************************************************
    // 1. Spring callback
    // ****************************************************
    /**
     * Populates {@code inputHolders} and {@code outputHolders} from the {@code @Input} and
     * {@code @Output} methods of {@code type}.
     *
     * <p>For each annotated method a shared binding target is looked up first; when none is
     * found, a new target is created through the factory matching the method's return type.
     * The boolean passed to {@code BoundTargetHolder} is {@code false} for shared targets and
     * {@code true} for newly created ones.
     *
     * @throws Exception declared by the callback contract; the visible code itself raises
     *         {@link IllegalArgumentException} (empty factory map) or
     *         {@link IllegalStateException} (no unique factory)
     */
    public void afterPropertiesSet() throws Exception {
        // Trace for the example: type = org.springframework.cloud.stream.messaging.Source
        Assert.notEmpty(BindableProxyFactory.this.bindingTargetFactories, "'bindingTargetFactories' cannot be empty");

        // Pass 1: methods annotated with @Input.
        ReflectionUtils.doWithMethods(this.type, new ReflectionUtils.MethodCallback() {
            @Override
            public void doWith(Method method) throws IllegalArgumentException {
                Input input = AnnotationUtils.findAnnotation(method, Input.class);
                if (input != null) {
                    String name = BindingBeanDefinitionRegistryUtils.getBindingTargetName(input, method);
                    Class<?> returnType = method.getReturnType();
                    Object sharedBindingTarget = locateSharedBindingTarget(name, returnType);
                    if (sharedBindingTarget != null) {
                        BindableProxyFactory.this.inputHolders.put(name,
                                new BoundTargetHolder(sharedBindingTarget, false));
                    } else {
                        BindableProxyFactory.this.inputHolders.put(name,
                                new BoundTargetHolder(getBindingTargetFactory(returnType).createInput(name), true));
                    } // end else
                } // end if
            } // end doWith
        });

        // Pass 2: methods annotated with @Output.
        ReflectionUtils.doWithMethods(this.type, new ReflectionUtils.MethodCallback() {
            @Override
            public void doWith(Method method) throws IllegalArgumentException {
                Output output = AnnotationUtils.findAnnotation(method, Output.class);
                if (output != null) { // trace: true
                    // trace: name = "output"
                    String name = BindingBeanDefinitionRegistryUtils.getBindingTargetName(output, method);
                    // trace: org.springframework.messaging.MessageChannel
                    Class<?> returnType = method.getReturnType();
                    // trace: null
                    Object sharedBindingTarget = locateSharedBindingTarget(name, returnType);
                    if (sharedBindingTarget != null) { // trace: false
                        BindableProxyFactory.this.outputHolders.put(name,
                                new BoundTargetHolder(sharedBindingTarget, false));
                    } else { // trace: true
                        // *********************************************************
                        // 2. Create the MessageChannel and store it in outputHolders.
                        //    getBindingTargetFactory(MessageChannel) selects the factory, and
                        //    createOutput(name) builds the channel (in this walkthrough a
                        //    DirectWithAttributesChannel, configured with its name and the
                        //    outbound interceptor).
                        //    The channel is wrapped in a BoundTargetHolder and kept under
                        //    its binding name ("output").
                        // *********************************************************
                        BindableProxyFactory.this.outputHolders.put(name,
                                new BoundTargetHolder(getBindingTargetFactory(returnType).createOutput(name), true));
                    } // end else
                } // end if
            } // end doWith
        });
    } // end afterPropertiesSet

    /**
     * Finds the single registered factory able to create the given binding target type.
     *
     * @param bindingTargetType type to create (trace: org.springframework.messaging.MessageChannel)
     * @return the only factory whose {@code canCreate} accepts the type
     *         (trace: channelFactory = SubscribableChannelBindingTargetFactory)
     * @throws IllegalStateException when no factory, or more than one factory, accepts the type
     */
    private BindingTargetFactory getBindingTargetFactory(Class<?> bindingTargetType) {
        // Collect the bean names of every factory that can create the type (trace: [channelFactory]).
        List<String> candidateBindingTargetFactories = new ArrayList<>();
        for (Map.Entry<String, BindingTargetFactory> bindingTargetFactoryEntry : this.bindingTargetFactories.entrySet()) {
            if (bindingTargetFactoryEntry.getValue().canCreate(bindingTargetType)) {
                candidateBindingTargetFactories.add(bindingTargetFactoryEntry.getKey());
            } // end if
        } // end for

        if (candidateBindingTargetFactories.size() == 1) {
            return this.bindingTargetFactories.get(candidateBindingTargetFactories.get(0));
        }
        if (candidateBindingTargetFactories.isEmpty()) {
            throw new IllegalStateException("No factory found for binding target type: "
                    + bindingTargetType.getName() + " among registered factories: "
                    + StringUtils.collectionToCommaDelimitedString(bindingTargetFactories.keySet()));
        }
        throw new IllegalStateException(
                "Multiple factories found for binding target type: " + bindingTargetType.getName() + ": "
                        + StringUtils.collectionToCommaDelimitedString(candidateBindingTargetFactories));
    } // end getBindingTargetFactory

    // 4. Whether the produced object is a singleton
    /**
     * @return always {@code true}
     */
    public boolean isSingleton() {
        return true;
    } // end isSingleton

    // 5. Type of the produced object
    /**
     * @return the binding interface held in {@code type}
     *         (trace: org.springframework.cloud.stream.messaging.Source)
     */
    public Class<?> getObjectType() {
        return this.type;
    } // end getObjectType

    // *************************************************************
    // 6. Create the dynamic proxy object
    // *************************************************************
    /**
     * Returns the proxy, creating it on first use.
     *
     * <p>The proxy is built with {@code new ProxyFactory(this.type, this)}: {@code type} is the
     * proxied interface and this instance is the interceptor. The result is stored in
     * {@code proxy} so later calls return the same object. The method is {@code synchronized}.
     *
     * @return the proxy for {@code type}
     * @throws Exception declared by the factory contract
     */
    public synchronized Object getObject() throws Exception {
        if (this.proxy == null) { // trace: true on the first call
            // Signature used:
            // org.springframework.aop.framework.ProxyFactory.ProxyFactory(Class<?> proxyInterface, Interceptor interceptor)
            ProxyFactory factory = new ProxyFactory(this.type, this);
            this.proxy = factory.getProxy();
        } // end if
        return this.proxy;
    } // end getObject

    // 7. MethodInterceptor
    /**
     * Answers a call on the proxy with the bound target registered for the invoked method.
     *
     * <p>Lookup order: the per-method cache, then {@code inputHolders} when the method carries
     * {@code @Input}, then {@code outputHolders} when it carries {@code @Output}. A target found
     * through a holder is cached before it is returned.
     *
     * @param invocation the intercepted call
     * @return the bound target, or {@code null} when the method has neither annotation
     * @throws Throwable declared by the interceptor contract
     */
    public synchronized Object invoke(MethodInvocation invocation)
            throws Throwable {
        Method method = invocation.getMethod();

        // 7.1 Try the cache first.
        Object boundTarget = targetCache.get(method);
        if (boundTarget != null) {
            return boundTarget;
        } // end if

        // Is the invoked method annotated with @Input?
        Input input = AnnotationUtils.findAnnotation(method, Input.class);
        if (input != null) {
            String name = BindingBeanDefinitionRegistryUtils.getBindingTargetName(input, method);
            boundTarget = this.inputHolders.get(name).getBoundTarget();
            targetCache.put(method, boundTarget);
            return boundTarget;
        } else {
            // Otherwise, is it annotated with @Output?
            Output output = AnnotationUtils.findAnnotation(method, Output.class);
            if (output != null) {
                // Binding name taken from the annotation, e.g. @Output(value="output").
                String name = BindingBeanDefinitionRegistryUtils.getBindingTargetName(output, method);
                // ************************************************
                // Step 2 of afterPropertiesSet stored a BoundTargetHolder
                // under this name; fetch the output target from outputHolders.
                // ************************************************
                boundTarget = this.outputHolders.get(name).getBoundTarget();
                targetCache.put(method, boundTarget);
                return boundTarget;
            }
        } // end else
        return null;
    } // end invoke
}
(3).SubscribableChannelBindingTargetFactory
/**
 * Binding target factory specialised for {@code SubscribableChannel}.
 *
 * <p>NOTE(review): abridged excerpt; the {@code messageChannelConfigurer} field used below is
 * declared outside the lines shown here.
 */
public class SubscribableChannelBindingTargetFactory
        extends AbstractBindingTargetFactory<SubscribableChannel> {

    // *********************************************************
    // 3. Create the SubscribableChannel
    // *********************************************************
    /**
     * Builds the output channel for a binding.
     *
     * <p>A new {@code DirectWithAttributesChannel} is created, tagged with the attribute
     * {@code "type"} = {@code Source.OUTPUT}, and handed to the channel configurer before it
     * is returned.
     *
     * @param name binding name (trace for the example: "output")
     * @return the newly created and configured channel
     */
    public SubscribableChannel createOutput(String name) {
        final DirectWithAttributesChannel outputChannel = new DirectWithAttributesChannel();
        outputChannel.setAttribute("type", Source.OUTPUT);

        // Hand the channel to the configurer (per these notes a MessageConverterConfigurer),
        // which installs the inbound or outbound message interceptors for this binding.
        this.messageChannelConfigurer.configureOutputChannel(outputChannel, name);

        return outputChannel;
    } // end createOutput
}
(4).MessageConverterConfigurer
/**
 * Configures message channels with partitioning and content-type interceptors.
 *
 * <p>NOTE(review): abridged excerpt; {@code bindingServiceProperties},
 * {@code compositeMessageConverterFactory} and the helper methods called below are declared
 * outside the lines shown here.
 */
public class MessageConverterConfigurer
        implements MessageChannelAndSourceConfigurer, BeanFactoryAware {

    // 4. Configure the channel
    /**
     * Adds interceptors to one channel according to its binding properties.
     *
     * <p>Steps, in order:
     * <ol>
     *   <li>Require the channel to be an {@code AbstractMessageChannel}.</li>
     *   <li>Load the binding properties registered under {@code channelName}
     *       (e.g. {@code spring.cloud.stream.bindings.output.destination=test-topic}).</li>
     *   <li>For an outbound channel whose producer properties are partitioned, add a
     *       {@code PartitioningInterceptor}.</li>
     *   <li>When {@code isNativeEncodingNotSet(..)} holds, add the inbound or outbound
     *       content-type interceptor.</li>
     * </ol>
     *
     * @param channel     channel to configure
     * @param channelName binding name (trace for the example: "output")
     * @param inbound     {@code true} for an input channel, {@code false} for an output channel
     */
    private void configureMessageChannel(
            MessageChannel channel,
            String channelName,
            boolean inbound) {
        Assert.isAssignable(AbstractMessageChannel.class, channel.getClass());
        AbstractMessageChannel target = (AbstractMessageChannel) channel;

        // Settings configured for this binding name.
        BindingProperties props = this.bindingServiceProperties.getBindingProperties(channelName);
        String contentType = props.getContentType();
        ProducerProperties producer = props.getProducer();

        // Outbound channel with a partitioned producer: install the partitioning interceptor.
        boolean partitionedOutbound = !inbound && producer != null && producer.isPartitioned();
        if (partitionedOutbound) {
            target.addInterceptor(new PartitioningInterceptor(props,
                    getPartitionKeyExtractorStrategy(producer),
                    getPartitionSelectorStrategy(producer)));
        }

        // Content-type interceptors are added only when the native-encoding check passes.
        ConsumerProperties consumer = props.getConsumer();
        if (!this.isNativeEncodingNotSet(producer, consumer, inbound)) {
            return;
        }

        if (inbound) { // trace for the example: false
            // Inbound message interceptor
            target.addInterceptor(new InboundContentTypeEnhancingInterceptor(contentType));
            return;
        }

        // ************************************************************
        // Outbound message interceptor
        // ************************************************************
        target.addInterceptor(new OutboundContentTypeConvertingInterceptor(contentType, this.compositeMessageConverterFactory.getMessageConverterForAllRegistered()));
    } // end configureMessageChannel
}