(1). 概述
前面剖析到MessageListenerContainer接口,这个接口主要负责转化业务模型,并真正与Kafka进行通信,实现订阅功能,它有两个实现类,分别是:ConcurrentMessageListenerContainer和KafkaMessageListenerContainer.
而,我比较好奇的一件事情就是:注解(@KafkaListener(concurrency = “2”))上配置了并发数,那么底层是线程池呢?还是怎么实现的?
(2). ConcurrentMessageListenerContainer
public class ConcurrentMessageListenerContainer<K, V> extends AbstractMessageListenerContainer<K, V> {
// 内部持有N个KafkaMessageListenerContainer
private final List<KafkaMessageListenerContainer<K, V>> containers = new ArrayList<>();
// 并发数
private int concurrency = 1;
// 为了偷懒,我只看这个变量在哪些地方有使用即可.
@Override
protected void doStart() {
if (!isRunning()) {
checkTopics();
ContainerProperties containerProperties = getContainerProperties();
TopicPartitionInitialOffset[] topicPartitions = containerProperties.getTopicPartitions();
if (topicPartitions != null
&& this.concurrency > topicPartitions.length) {
this.logger.warn("When specific partitions are provided, the concurrency must be less than or "
+ "equal to the number of partitions; reduced from " + this.concurrency + " to "
+ topicPartitions.length);
this.concurrency = topicPartitions.length;
}
setRunning(true);
// ******************************************************************
// 哈哈哈,出乎我的猜测:底层没用线程池,而是直接new:KafkaMessageListenerContainer
// 也就是说:ConcurrentMessageListenerContainer内部包裹着:KafkaMessageListenerContainer,典型的组合模式哈.
// ******************************************************************
for (int i = 0; i < this.concurrency; i++) {
KafkaMessageListenerContainer<K, V> container;
if (topicPartitions == null) {
container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,
containerProperties);
} else {
container = new KafkaMessageListenerContainer<>(this, this.consumerFactory,
containerProperties, partitionSubset(containerProperties, i));
}
String beanName = getBeanName();
container.setBeanName((beanName != null ? beanName : "consumer") + "-" + i);
if (getApplicationEventPublisher() != null) {
container.setApplicationEventPublisher(getApplicationEventPublisher());
}
container.setClientIdSuffix("-" + i);
container.setGenericErrorHandler(getGenericErrorHandler());
container.setAfterRollbackProcessor(getAfterRollbackProcessor());
// 委托给:KafkaMessageListenerContainer
container.start();
// 包裹着所有的:KafkaMessageListenerContainer
this.containers.add(container);
}
}
}
}
(3). 总结
Spring的设计还是挺巧妙的,通过组合模式来实现:并发数的消费,而不是通过一个线程池去配置.