概述
最近因为有项目使用到了 Kafka,所以需要了解一下 Kafka 的情况,在这篇文章中,我将介绍如何安装 Kafka 和平时运维 Kafka 使用的一些命令,这些命令在开发和定位过程中应该会经常使用到。
部署 Kafka
VM 部署 Kafka
机器 IP:
- 192.168.77.81
- 192.168.77.82
- 192.168.77.83
因为部署既要安装 JRE、Zookeeper 和 Kafka,所以略显复杂,于是乎我就找了一个 Ansible Playbook,但是这个也不好用(它是基于 CentOS 的),所以就稍微改造了一下,让他可以同时支持 CentOS 和 Ubuntu,用起来就方便了,我把它放在我的仓库了:Ansible-Kafka.
修改 Ansible Hosts 文件
例如我有 3 台机器,那我就将 hosts 文件修改为:
[root@liqiang.io]# cat hosts
# These are your kafka cluster nodes
[kafka_servers]
192.168.77.81 kafka_broker_id=1 ansible_user=root
192.168.77.82 kafka_broker_id=2 ansible_user=root
192.168.77.83 kafka_broker_id=3 ansible_user=root
#
# These are your zookeeper cluster nodes
[zk_servers]
192.168.77.81 zk_id=1 ansible_user=root
192.168.77.82 zk_id=2 ansible_user=root
192.168.77.83 zk_id=3 ansible_user=root
部署服务
修改完 hosts 文件之后,就可以直接部署啦,一条 ansible 命令就可以部署完一个完成的 kafka 环境了:
[root@liqiang.io]# ansible-playbook kafka.yml -i hosts --tags "java,zookeeper,kafka"
Docker 运行 Kafka
[root@liqiang.io]# cat docker-compose.yml
# Copyright VMware, Inc.
# SPDX-License-Identifier: APACHE-2.0
version: "2"
services:
kafka:
image: docker.io/bitnami/kafka:3.5
ports:
- "9092:9092"
volumes:
- "kafka_data:/bitnami"
environment:
# KRaft settings
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
# Listeners
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
volumes:
kafka_data:
driver: local
[root@liqiang.io]# docker compose up -d
Consumer Group
我没有找到可以直接从工具直接创建 Consumer Group 的方式(可能会有),但是,每个 Consumer 都会有属于一个 Consumer Group,通常在创建 Consumer 时指定,如果指定的 Consumer Group 不存在,那么会自动创建一个。
默认的,如果你用命令行的 consumer(./bin/kafka-console-consumer.sh
),可以通过配置文件来指定 consumer 配置,Kafka 存在默认的配置 config/consumer.properties
,可以通过参数 --consumer.config
来指定配置文件,从而使用配置文件里面的 Consumer Group。
查看所有的 consumer group
[root@liqiang.io]# bin/kafka-consumer-groups.sh --bootstrap-server <address:port> --list
demo:
[root@liqiang.io]# ./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.77.81:9092 --list
test-consumer-group
创建 consumer group
除了用 Command 创建一个 Consumer Group 之外,SDK 也可以直接指定 Consumer Group 进行自动创建:
查看 consumer group 的详情
[root@liqiang.io]# ./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.77.81:9092 --describe --group test
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test liuliqiang-test 0 - 0 - sarama-737da37c-0db0-4311-ba12-ffb958e1ad26 /192.168.77.1 sarama
test liuliqiang-test 1 - 0 - sarama-737da37c-0db0-4311-ba12-ffb958e1ad26 /192.168.77.1 sarama
[root@liqiang.io]# ./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.77.81:9092 --describe --all-groups
查看 consumer group 下各 Topic 对应 partition 的实际消费情况
[root@liqiang.io]# bin/kafka-consumer-groups.sh --bootstrap-server <address:port> --group <name> --describe
通过该命令可以了解到给定Group下每个Topic各Partition的最大Offset值,当前所处的消费位置;还显示出当前Partition所操作的客户端信息,LAG表示消费滞后的情况;执行结果如下所示:
[root@liqiang.io]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.77.81:9092 --group test --describe
Consumer group 'test' has no active members.
GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
test liuliqiang-test 0 146427 530106 383679 - - -
test liuliqiang-test 1 141234 530106 388872 - - -
Topic
新建 Topic
[root@liqiang.io]# bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic <string> --replication-factor <integer> --partitions <integer>
参数说明:
--bootstrap-server
:kafka 地址--partitions
:partition 数量--replication-factor 3
:replication 数量
demo:
[root@liqiang.io]# ./kafka-topics.sh --bootstrap-server 192.168.77.83:9092 --create --topic quickstart-events --replication-factor 2 --partitions 3
删除 Topic
[root@liqiang.io]# bin/kafka-topics.sh --zookeeper <zookeeper connect> --delete --topic <string>
demo:
[root@liqiang.io]# ./kafka-topics.sh --bootstrap-server 192.168.77.83:9092 --create --topic quickstart-events --replication-factor 2 --partitions 3
查看 Topic 列表
[root@liqiang.io]# bin/kafka-topics.sh --zookeeper <zookeeper connect> --list
demo:
[root@liqiang.io]# ./kafka-topics.sh --bootstrap-server 192.168.77.83:9092 --list
查看所有 Topic 的详细信息
[root@liqiang.io]# bin/kafka-topics.sh --zookeeper <zookeeper connect> --describe
demo:
[root@liqiang.io]# ./kafka-topics.sh --bootstrap-server 192.168.77.81:9092 --describe
Topic: quickstart-events TopicId: sIpWxwkeTvGrIV8jpUyjfw PartitionCount: 3 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: quickstart-events Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: quickstart-events Partition: 1 Leader: 3 Replicas: 3,2 Isr: 3,2
Topic: quickstart-events Partition: 2 Leader: 1 Replicas: 1,3 Isr: 1,3
Topic: liuliqiang-test TopicId: C2cd2Ua0TO6MvbiHrx7ZFg PartitionCount: 2 ReplicationFactor: 1 Configs: segment.bytes=1073741824
Topic: liuliqiang-test Partition: 0 Leader: 3 Replicas: 3 Isr: 3
Topic: liuliqiang-test Partition: 1 Leader: 1 Replicas: 1 Isr: 1
增加 Partition 个数
[root@liqiang.io]# bin/kafka-topics.sh --zookeeper <zookeeper connect> --alter --topic <string> --partitions <integer>
查看指定 Topic 的详细信息
[root@liqiang.io]# bin/kafka-topics.sh --zookeeper <zookeeper connect> --describe --topic <string>
相比于上一条查看所有 topic 的信息命令,查看指定 topic 的信息具有更切实的实际作用,更有利于在实际工作中快速定位和发现问题(推荐使用)。通过该命令可以查看给定 Topic 的分区、副本集个数;以及各分区、副本集的实际分布情况,同时还可以看到每个分区的 ISR 列表信息(有关 ISR 介绍可详见 ISR。通过该命令可以根据各 Partition 的 ISR 情况分析 Broker 状况。
执行结果如下所示:
[root@liqiang.io]# ./kafka-topics.sh --bootstrap-server 192.168.77.83:9092 --describe --topic quickstart-events
Topic: quickstart-events TopicId: sIpWxwkeTvGrIV8jpUyjfw PartitionCount: 3 ReplicationFactor: 2 Configs: segment.bytes=1073741824
Topic: quickstart-events Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
Topic: quickstart-events Partition: 1 Leader: 3 Replicas: 3,2 Isr: 3,2
Topic: quickstart-events Partition: 2 Leader: 1 Replicas: 1,3 Isr: 1,3
查看副本集同步出现异常的分区
[root@liqiang.io]# bin/kafka-topics.sh --zookeeper <zookeeper connect> --describe --under-replicated-partitions
查看缺失leader的分区
[root@liqiang.io]# bin/kafka-topics.sh --zookeeper <zookeeper connect> --describe --unavailable-partitions
查看 Topic 各 Partition 的 offset 极值
[root@liqiang.io]# bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <address:port> --topic <string> --time <param>
time 参数详解
-1:查看 Partition 的最大 Offset,即就是 latest
-2:查看 Partition 的最小 Offset,即就是 earlist
timestamp:查询满足时间戳的消息所在 Segment 的最早 Offset 值;详见 Kafka Timestamp
因为 Consumer 的版本有新旧之分,且旧版本(Scala)的 Consumer Client 是依赖于 Zookeeper 来保存 Offset 的,而新版本(Java)的则基于内置的 topic(__consumer_offsets
)来保存 offset,所以在使用 Group 相关命令时,必须根据实际情况确定 Consumer 的版本,即就是:旧版本的查询只需要指定 Zookeeper 参数即可,而新版本的查询需要指定 –new-consumer
参数的同时,指定 bootstrap-server
参数
demo:
[root@liqiang.io]# ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.77.81:9092
liuliqiang-test:0:0
liuliqiang-test:1:0
quickstart-events:0:0
quickstart-events:1:0
quickstart-events:2:0
命令行生产和消费
控制台消费数据
旧版本消费
[root@liqiang.io]# bin/kafka-console-consumer.sh --zookeeper <zookeeper connect> --topic <string> --from-beginning
新版本消费
[root@liqiang.io]# bin/kafka-console-consumer.sh --bootstrap-server <address:port> --topic <string> --from-beginning
eg:
[root@liqiang.io]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.77.81:9092 --topic liuliqiang-test --from-beginning
通过配置文件指定 consumer group
如果直接使用上面的示例命令,那么我们会发现 consumer group 为:
[root@liqiang.io]# ./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.77.83:9092 --list
console-consumer-83066
我们可以通过 consumer config 来指定 consumer group:
[root@liqiang.io]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.77.81:9092 --topic liuliqiang-test --from-beginning --consumer.config config/consumer.properties
[root@liqiang.io]# cat config/consumer.properties | grep "group\.id"
group.id=test-consumer-group
[root@liqiang.io]# ./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.77.83:9092 --list
console-consumer-83066
test-consumer-group
控制台生产数据
[root@liqiang.io]# bin/kafka-console-producer.sh --broker-list <address:port> --topic <string>
eg:
[root@liqiang.io]# ./bin/kafka-console-producer.sh --broker-list 192.168.77.81:9092 --topic liuliqiang-test
>