(1). 概述
前面自己搭了一个Kafka集群,在这里对Kafka的常用命令进行一个学习
(2). 创建主题
# 1. 创建主题,实际上是往zk进行注册,然后,kafka内部监听ZK,去实现主题和分区的创建.
# --create --> 创建主题
# --zookeeper 127.0.0.1:2181 --> zk地址
# --partitions 2 --> 指定创建的主题的分区个数(hello-0/hello-1)
# --replication-factor 3 --> 主题对应的副本数()
lixin-macbook:Developer lixin$ ./kafka-2.8.1/kafka_0/bin/kafka-topics.sh --create --zookeeper 127.0.0.1:2181 --replication-factor 3 --partitions 2 --topic hello
Created topic hello.
# 2. 查看创建的主题在磁盘上的信息.
lixin-macbook:Developer lixin$ tree ./kafka-2.8.1/kafka_0/data/kafka-logs/
./kafka-2.8.1/kafka_0/data/kafka-logs/
├── cleaner-offset-checkpoint
├── hello-0
│ ├── 00000000000000000000.index
│ ├── 00000000000000000000.log
│ ├── 00000000000000000000.timeindex
│ └── leader-epoch-checkpoint
├── hello-1
│ ├── 00000000000000000000.index
│ ├── 00000000000000000000.log
│ ├── 00000000000000000000.timeindex
│ └── leader-epoch-checkpoint
├── log-start-offset-checkpoint
├── meta.properties
├── recovery-point-offset-checkpoint
└── replication-offset-checkpoint
lixin-macbook:Developer lixin$ tree ./kafka-2.8.1/kafka_1/data/kafka-logs/
./kafka-2.8.1/kafka_1/data/kafka-logs/
├── cleaner-offset-checkpoint
├── hello-0
│ ├── 00000000000000000000.index
│ ├── 00000000000000000000.log
│ ├── 00000000000000000000.timeindex
│ └── leader-epoch-checkpoint
├── hello-1
│ ├── 00000000000000000000.index
│ ├── 00000000000000000000.log
│ ├── 00000000000000000000.timeindex
│ └── leader-epoch-checkpoint
├── log-start-offset-checkpoint
├── meta.properties
├── recovery-point-offset-checkpoint
└── replication-offset-checkpoint
lixin-macbook:Developer lixin$ tree ./kafka-2.8.1/kafka_2/data/kafka-logs/
./kafka-2.8.1/kafka_2/data/kafka-logs/
├── cleaner-offset-checkpoint
├── hello-0
│ ├── 00000000000000000000.index
│ ├── 00000000000000000000.log
│ ├── 00000000000000000000.timeindex
│ └── leader-epoch-checkpoint
├── hello-1
│ ├── 00000000000000000000.index
│ ├── 00000000000000000000.log
│ ├── 00000000000000000000.timeindex
│ └── leader-epoch-checkpoint
├── log-start-offset-checkpoint
├── meta.properties
├── recovery-point-offset-checkpoint
└── replication-offset-checkpoint
# 3. 通过ZK,查看有哪些分区
[zk: localhost:2181(CONNECTED) 4] ls /brokers/topics/hello/partitions
[0, 1]
# 4. 通过ZK,查看leader信息以及isr
[zk: localhost:2181(CONNECTED) 7] get /brokers/topics/hello/partitions/0/state
{"controller_epoch":3,"leader":2,"version":1,"leader_epoch":0,"isr":[2,0,1]}
(3). 列出所有的主题
# 直接查找ZK中的数据就好了
lixin-macbook:Developer lixin$ ./kafka-2.8.1/kafka_0/bin/kafka-topics.sh --list --zookeeper localhost:2181
hello
(4). 查看主题的详细
# --describe --> 查看主题信息
# Leader --> 主题下的分区属于哪个broker进行管理(读写数据)
# Replicas: 2,0,1 --> 主题下的分区在哪些brokder有(冗余,仅同步数据到本地)
# Isr --> Isr代表:可以进行数据同步或者已同步的brokder节点,并且,当Leader故障时,会从Isr(brokder)列表里进行选择一个成为Leader
lixin-macbook:Developer lixin$ ./kafka-2.8.1/kafka_0/bin/kafka-topics.sh --describe --zookeeper localhost:2181 --topic hello
Topic: hello TopicId: 56XQ5CJ2SyS3oVVtIz5i8w PartitionCount: 2 ReplicationFactor: 3 Configs:
Topic: hello Partition: 0 Leader: 2 Replicas: 2,0,1 Isr: 2,0,1
Topic: hello Partition: 1 Leader: 0 Replicas: 0,1,2 Isr: 0,1,2
(5). 通过Kafka控制台进行消息发送
# 指定brokder列表
lixin-macbook:Developer lixin$ ./kafka-2.8.1/kafka_0/bin/kafka-console-producer.sh --broker-list 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 --topic hello
>hello
>world
(6). 通过Kafka控制台进行消息订阅
# --from-beginning --> 指定从头开始消费.
# --consumer-property group.id=testGroupa --> 指定消费组
lixin-macbook:Developer lixin$ ./kafka-2.8.1/kafka_0/bin/kafka-console-consumer.sh --bootstrap-server 127.0.0.1:9092,127.0.0.1:9093,127.0.0.1:9094 --from-beginning --consumer-property group.id=testGroupa --topic hello
world
hello