(1). 概述
前面把SpringBoot与Kafka进行了集成,配置都是从官网去拉下来的,但是,为了能够深入到底有哪些配置项是可以配的,需要找到,Kafka的业务模型.
(2). KafkaProperties
@ConfigurationProperties(prefix = "spring.kafka")
public class KafkaProperties {
// kafka服务器列表,可配置多个
private List<String> bootstrapServers = new ArrayList<>(Collections.singletonList("localhost:9092"));
private String clientId;
private final Map<String, String> properties = new HashMap<>();
// 消费者配置
private final Consumer consumer = new Consumer();
// 生产者配置
private final Producer producer = new Producer();
// admin配置
private final Admin admin = new Admin();
// 流配置
private final Streams streams = new Streams();
// 监听器配置
private final Listener listener = new Listener();
// SSL
private final Ssl ssl = new Ssl();
private final Jaas jaas = new Jaas();
private final Template template = new Template();
}
(3). Consumer
public static class Consumer {
private final Ssl ssl = new Ssl();
/**
* Frequency with which the consumer offsets are auto-committed to Kafka if
* 'enable.auto.commit' is set to true.
*/
private Duration autoCommitInterval;
/**
*
* What to do when there is no initial offset in Kafka or if the current offset no
* longer exists on the server.
*/
private String autoOffsetReset;
/**
* Comma-delimited list of host:port pairs to use for establishing the initial
* connections to the Kafka cluster. Overrides the global property, for consumers.
*/
private List<String> bootstrapServers;
/**
* ID to pass to the server when making requests. Used for server-side logging.
*/
private String clientId;
/**
* 启动用自动提交offset
* Whether the consumer's offset is periodically committed in the background.
*/
private Boolean enableAutoCommit;
/**
* Maximum amount of time the server blocks before answering the fetch request if
* there isn't sufficient data to immediately satisfy the requirement given by
* "fetch-min-size".
*/
private Duration fetchMaxWait;
/**
*
* Minimum amount of data the server should return for a fetch request.
*/
private DataSize fetchMinSize;
/**
* 消费者组id
* Unique string that identifies the consumer group to which this consumer
* belongs.
*/
private String groupId;
/**
* Expected time between heartbeats to the consumer coordinator.
*/
private Duration heartbeatInterval;
/**
* Deserializer class for keys.
*/
private Class<?> keyDeserializer = StringDeserializer.class;
/**
* Deserializer class for values.
*/
private Class<?> valueDeserializer = StringDeserializer.class;
/**
* 最大poll的条数
* Maximum number of records returned in a single call to poll().
*/
private Integer maxPollRecords;
/**
* Additional consumer-specific properties used to configure the client.
*/
private final Map<String, String> properties = new HashMap<>();
}
(4). Producer
public static class Producer {
private final Ssl ssl = new Ssl();
/**
* 生产者ack
* Number of acknowledgments the producer requires the leader to have received
* before considering a request complete.
*/
private String acks;
/**
* Default batch size. A small batch size will make batching less common and may
* reduce throughput (a batch size of zero disables batching entirely).
*/
private DataSize batchSize;
/**
* Comma-delimited list of host:port pairs to use for establishing the initial
* connections to the Kafka cluster. Overrides the global property, for producers.
*/
private List<String> bootstrapServers;
/**
* 生产者缓冲区大小
* Total memory size the producer can use to buffer records waiting to be sent to
* the server.
*/
private DataSize bufferMemory;
/**
*
* ID to pass to the server when making requests. Used for server-side logging.
*/
private String clientId;
/**
* 压缩类型?
* Compression type for all data generated by the producer.
*/
private String compressionType;
/**
* Serializer class for keys.
*/
private Class<?> keySerializer = StringSerializer.class;
/**
* Serializer class for values.
*/
private Class<?> valueSerializer = StringSerializer.class;
/**
* 重试次数
* When greater than zero, enables retrying of failed sends.
*/
private Integer retries;
/**
*
* When non empty, enables transaction support for producer.
*/
private String transactionIdPrefix;
/**
* Additional producer-specific properties used to configure the client.
*/
private final Map<String, String> properties = new HashMap<>();
}
(5).
public static class Listener {
public enum Type {
/**
* Invokes the endpoint with one ConsumerRecord at a time.
*/
SINGLE,
/**
* Invokes the endpoint with a batch of ConsumerRecords.
*/
BATCH
}
/**
* 配置监听的类型(ConsumerRecord/ConsumerRecords)
* Listener type.
*/
private Type type = Type.SINGLE;
/**
*
* Listener AckMode. See the spring-kafka documentation.
*/
private AckMode ackMode;
/**
* Prefix for the listener's consumer client.id property.
*/
private String clientId;
/**
* Number of threads to run in the listener containers.
*/
private Integer concurrency;
/**
* Timeout to use when polling the consumer.
*/
private Duration pollTimeout;
/**
* Multiplier applied to "pollTimeout" to determine if a consumer is
* non-responsive.
*/
private Float noPollThreshold;
/**
* Number of records between offset commits when ackMode is "COUNT" or
* "COUNT_TIME".
*/
private Integer ackCount;
/**
* Time between offset commits when ackMode is "TIME" or "COUNT_TIME".
*/
private Duration ackTime;
/**
* Time between publishing idle consumer events (no data received).
*/
private Duration idleEventInterval;
/**
* Time between checks for non-responsive consumers. If a duration suffix is not
* specified, seconds will be used.
*/
@DurationUnit(ChronoUnit.SECONDS)
private Duration monitorInterval;
/**
* Whether to log the container configuration during initialization (INFO level).
*/
private Boolean logContainerConfig;
}
(6). 总结
通过源码,我们能更加清楚知道有哪些配置项是可以配置的,后面会对注解@KafkaListener进行深入剖析.