(1). 概述
在前面,把Spring Cloud Stream与Kafka进行了集成,在这里,开始要深入了解下,集成后有哪些配置项是可以配置的,也就是要找到业务模型.
(2). 源码切入点在哪?
一年前有把Spring Cloud Stream与RocketMQ进行集成,稍微的分析了下原理,在这里也稍微的提一下,切入点代码在:spring-cloud-stream工程里,它会读取classpath下所有的:spring.binders,并初始化.
@Configuration(proxyBeanMethods = false)
@Role(BeanDefinition.ROLE_INFRASTRUCTURE)
@EnableConfigurationProperties({ BindingServiceProperties.class })
@Import(ContentTypeConfiguration.class)
public class BinderFactoryAutoConfiguration {
@Bean
public BinderTypeRegistry binderTypeRegistry(
ConfigurableApplicationContext configurableApplicationContext) {
Map<String, BinderType> binderTypes = new HashMap<>();
ClassLoader classLoader = configurableApplicationContext.getClassLoader();
try {
// **********************************************************************
// spring cloud stream在初始化的时候,会去读取:META-INF/spring.binders文件,
// **********************************************************************
Enumeration<URL> resources = classLoader.getResources("META-INF/spring.binders");
// ... ...
if (binderTypes.isEmpty() && !Boolean.valueOf(this.selfContained)
&& (resources == null || !resources.hasMoreElements())) {
this.logger.debug(
"Failed to locate 'META-INF/spring.binders' resources on the classpath."
+ " Assuming standard boot 'META-INF/spring.factories' configuration is used");
} else {
// ********************************************************************
// 解析:META-INF/spring.binders的内容,并,保存到:Map容器里.
// ********************************************************************
while (resources.hasMoreElements()) {
URL url = resources.nextElement();
UrlResource resource = new UrlResource(url);
for (BinderType binderType : parseBinderConfigurations(classLoader, resource)) {
binderTypes.put(binderType.getDefaultName(), binderType);
}
}
} //end else
} catch (IOException | ClassNotFoundException e) {
throw new BeanCreationException("Cannot create binder factory:", e);
}
return new DefaultBinderTypeRegistry(binderTypes);
} // end binderTypeRegistry
}
(3). 查看kafka的配置(spring.binders)
# spring-cloud-stream-binder-kafka-2.1.0.RELEASE.jar/META-INF/spring.binders
kafka:\
org.springframework.cloud.stream.binder.kafka.config.KafkaBinderConfiguration
(4). KafkaBinderConfiguration
@Bean
KafkaBinderConfigurationProperties configurationProperties(KafkaProperties kafkaProperties) {
return new KafkaBinderConfigurationProperties(kafkaProperties);
}
(5). KafkaBinderConfigurationProperties
KafkaBinderConfigurationProperties主要负责Kafka的相关信息配置,那么,另一部份配置又在哪呢(spring.cloud.stream.bindings)?
// *******************************************************************************
// KafkaBinderConfigurationProperties主要负责如下配置,从这些配置能看出,主要是配置MQ的相关信息.
// # spring.cloud.stream.kafka.binder.brokers=127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094
// # spring.cloud.stream.kafka.binder.zk-nodes=127.0.0.1:2181,127.0.0.1:2182,127.0.0.1:2183
// # spring.cloud.stream.kafka.binder.auto-create-topics=true
// # spring.cloud.stream.kafka.binder.required-acks=-1
// *******************************************************************************
@ConfigurationProperties(prefix = "spring.cloud.stream.kafka.binder")
public class KafkaBinderConfigurationProperties {
// kafka brokder地址
private static final String DEFAULT_KAFKA_CONNECTION_STRING = "localhost:9092";
// kafka支持事务管理
private final Transaction transaction = new Transaction();
// 内部再持有一个:org.springframework.boot.autoconfigure.kafka.KafkaProperties
// 会从KafkaProperties中解析出来一些信息,填充到当前模型里.
private final KafkaProperties kafkaProperties;
// zk地址
private String[] zkNodes = new String[] { "localhost" };
// 应用于生产者和消费者的属性
private Map<String, String> configuration = new HashMap<>();
// consumer属性配置
private Map<String, String> consumerProperties = new HashMap<>();
// producer属性配置
private Map<String, String> producerProperties = new HashMap<>();
// zk端口
private String defaultZkPort = "2181";
// kafka broker地址
private String[] brokers = new String[] { "localhost" };
// brokder port
private String defaultBrokerPort = "9092";
private String[] headers = new String[] {};
// 更新offset时间窗口
private int offsetUpdateTimeWindow = 10000;
//
private int offsetUpdateCount;
private int offsetUpdateShutdownTimeout = 2000;
//
private int maxWait = 100;
// 自动创建topic
private boolean autoCreateTopics = true;
// 自动添加praition
private boolean autoAddPartitions;
// socket缓冲区
private int socketBufferSize = 2097152;
// zk会话超时时间
private int zkSessionTimeout = 10000;
// zk连接超时时间
private int zkConnectionTimeout = 10000;
// product ack模式(0/1/-1)
private String requiredAcks = "1";
// 副本数
private short replicationFactor = 1;
// 每次fetch大小
private int fetchSize = 1024 * 1024;
// 最小分区数
private int minPartitionCount = 1;
//
private int queueSize = 8192;
// 心跳
private int healthTimeout = 60;
private JaasLoginModuleConfiguration jaas;
private String headerMapperBeanName;
}
(6). BindingServiceProperties
BindingServiceProperties属于spring colud stream的代码,它主要负责:spring.cloud.stream.bindings的配置.
@ConfigurationProperties("spring.cloud.stream")
@JsonInclude(Include.NON_DEFAULT)
public class BindingServiceProperties
implements ApplicationContextAware, InitializingBean {
// 重试间隔
private static final int DEFAULT_BINDING_RETRY_INTERVAL = 30;
private String source;
@Value("${INSTANCE_INDEX:${CF_INSTANCE_INDEX:0}}")
private int instanceIndex;
private List<Integer> instanceIndexList = new ArrayList<>();
private int instanceCount = 1;
// **********************************************************************
//
// spring.cloud.stream.bindings.output.destination=hello-world
// spring.cloud.stream.bindings.output.content-type=text/plain
// **********************************************************************
private Map<String, BindingProperties> bindings = new TreeMap<>(String.CASE_INSENSITIVE_ORDER);
private Map<String, BinderProperties> binders = new HashMap<>();
private String defaultBinder;
private String[] dynamicDestinations = new String[0];
private int dynamicDestinationCacheSize = 10;
// 绑定重试间隔
private int bindingRetryInterval = DEFAULT_BINDING_RETRY_INTERVAL;
private ConfigurableApplicationContext applicationContext = new GenericApplicationContext();
private ConversionService conversionService;
}
(7). BindingProperties
package org.springframework.cloud.stream.config;
@JsonInclude(Include.NON_DEFAULT)
@Validated
public class BindingProperties {
//
public static final MimeType DEFAULT_CONTENT_TYPE = MimeTypeUtils.APPLICATION_JSON;
private static final String COMMA = ",";
// *************************************************************
// *************************************************************
private String destination;
// 所属组
private String group;
// 持久化时的数据类型
private String contentType = DEFAULT_CONTENT_TYPE.toString();
private String binder;
// *************************************************************
// 消费者信息指定
// spring.cloud.stream.bindings.XXX.consumer.concurrency=2
// *************************************************************
private ConsumerProperties consumer;
// *************************************************************
// 生产者信息指定
// *************************************************************
private ProducerProperties producer;
}
(8). 总结
通过源码分析,我们能知道,Spring Cloud Stream与Kafka集成后,支持哪些属性了.