(1). 概述
在这一小节,主要学习Kakfa对消息的消费,并注意几个配置参数(offset提交/拉取的最大批次).
(2). 消费者案例
package help.lixin.kafka.example.producer;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.clients.consumer.ConsumerRecord;
import org.apache.kafka.clients.consumer.ConsumerRecords;
import org.apache.kafka.clients.consumer.KafkaConsumer;
import org.apache.kafka.common.TopicPartition;
import org.apache.kafka.common.serialization.StringDeserializer;
import org.junit.Before;
import org.junit.Test;
import java.time.Duration;
import java.util.Arrays;
import java.util.Properties;
public class TestConsumer {
private Properties kafkaProperties = null;
private String topic = "hello";
private String groupName = "test-service123";
@Before
public void initProperties() {
kafkaProperties = new Properties();
kafkaProperties.put(ConsumerConfig.BOOTSTRAP_SERVERS_CONFIG, "127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094");
kafkaProperties.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
kafkaProperties.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, StringDeserializer.class.getName());
// 定义消费者组
kafkaProperties.put(ConsumerConfig.GROUP_ID_CONFIG, groupName);
}
@Test
public void testConsumer() {
// earliest --> 从头消费
// latest(默认) --> 只消费自己启动之后的主题消息
kafkaProperties.put(ConsumerConfig.AUTO_OFFSET_RESET_CONFIG , "earliest");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaProperties);
// 消息订阅
consumer.subscribe(Arrays.asList(topic));
while (true) {
// 每隔1秒,拉取一批消息(ConsumerRecords).
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
// ConsumerRecord 为具体的消息
for (ConsumerRecord<String, String> record : records) {
String format = String.format("topic:[%s],partition:[%d],offset:[%d],value:[%s]", record.topic(), record.partition(), record.offset(), record.value());
System.out.println("RESULT:" + format);
}
}
}
/**
* 测试指定:主题/分区/offset/时间等进行消费
*/
@Test
public void testSettingOffsetConsumer() {
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaProperties);
// 1. 指定分区消费
// consumer.assign(Arrays.asList(new TopicPartition(topic,0)));
// 2. 消息回溯消息,相当于指定offset=0,类似于命令行指定了:--from-beginning
// consumer.assign(Arrays.asList(new TopicPartition(topic, 0)));
// consumer.seekToBeginning(Arrays.asList(new TopicPartition(topic, 0)));
// 3. 指定offset消费
consumer.assign(Arrays.asList(new TopicPartition(topic, 0)));
consumer.seek(new TopicPartition(topic, 0), 1);
// 根据topic,获得所有的:PartitionInfo
// consumer.partitionsFor(topic);
// 5. 指定时间进行消费
// TopicPartition : 主题和分区信息
// timestampsToSearch : 定位的时间点
// 5.1 consumer.offsetsForTimes(Map<TopicPartition, Long> timestampsToSearch)
// 指定要消费的主题/分区/offset
// 5.2 consumer.assign();
// 5.3 consumer.seek();
while (true) {
// 每隔1秒,拉取一批消息(ConsumerRecords).
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
// ConsumerRecord 为具体的消息
for (ConsumerRecord<String, String> record : records) {
String format = String.format("topic:[%s],partition:[%d],offset:[%d],value:[%s]", record.topic(), record.partition(), record.offset(), record.value());
System.out.println("RESULT:" + format);
}
}
}
/**
* 测试每次拉取消息的批次为2条记录
*/
@Test
public void testConsumerDefinMaxRecord() {
// 定义每次拉取消息的最大条数,默认一次poll500条
kafkaProperties.put(ConsumerConfig.MAX_POLL_RECORDS_CONFIG, 2);
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaProperties);
// 消息订阅
consumer.subscribe(Arrays.asList(topic));
while (true) {
// 每隔1秒,拉取一批消息(ConsumerRecords).
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
System.out.println("recods:" + records.count());
// ConsumerRecord 为具体的消息
for (ConsumerRecord<String, String> record : records) {
String format = String.format("topic:[%s],partition:[%d],offset:[%d],value:[%s]", record.topic(), record.partition(), record.offset(), record.value());
System.out.println("RESULT:" + format);
}
}
}
/**
* 测试简单消费,默认订阅的是topic的最后的offset,消费消息后自动提交offset
*/
@Test
public void testConsumerAutoCommitOffset() {
// *****************************消费者自动提交**************************************
// 是否自动提交offset,默认是:true
// kafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG,"true");
// 自动提交offset的间隔时间
// kafkaProperties.put(ConsumerConfig.AUTO_COMMIT_INTERVAL_MS_CONFIG,"1000");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaProperties);
// 消息订阅
consumer.subscribe(Arrays.asList(topic));
while (true) {
// 每隔1秒,拉取一批消息(ConsumerRecords).
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
// ConsumerRecord 为具体的消息
for (ConsumerRecord<String, String> record : records) {
String format = String.format("topic:[%s],partition:[%d],offset:[%d],value:[%s]", record.topic(), record.partition(), record.offset(), record.value());
System.out.println("RESULT:" + format);
}
}
}
/**
* 测试消费消息后,手动提交offset.
*/
@Test
public void testConsumerManualCommitOffset() {
// *****************************消费者手动提交**************************************
kafkaProperties.put(ConsumerConfig.ENABLE_AUTO_COMMIT_CONFIG, "false");
KafkaConsumer<String, String> consumer = new KafkaConsumer<String, String>(kafkaProperties);
// 消息订阅
consumer.subscribe(Arrays.asList(topic));
while (true) {
// 每隔1秒,拉取一批消息(ConsumerRecords).
ConsumerRecords<String, String> records = consumer.poll(Duration.ofMillis(1000));
// ConsumerRecord 为具体的消息
for (ConsumerRecord<String, String> record : records) {
String format = String.format("topic:[%s],partition:[%d],offset:[%d],value:[%s]", record.topic(), record.partition(), record.offset(), record.value());
System.out.println("RESULT:" + format);
}
if (records.count() > 0) {
// 手动同步提交offset,会一直阻塞代码往下执行.
consumer.commitSync();
// 手动异步提交offset
// consumer.commitAsync((offsets, ex) -> {
//
// });
}
}
}
}