概述

最近因为有项目使用到了 Kafka,所以需要了解一下 Kafka 的情况,在这篇文章中,我将介绍如何安装 Kafka 和平时运维 Kafka 使用的一些命令,这些命令在开发和定位过程中应该会经常使用到。

部署 Kafka

VM 部署 Kafka

机器 IP:

因为部署既要安装 JRE、Zookeeper 和 Kafka,所以略显复杂,于是乎我就找了一个 Ansible Playbook,但是这个也不好用(它是基于 CentOS 的),所以就稍微改造了一下,让他可以同时支持 CentOS 和 Ubuntu,用起来就方便了,我把它放在我的仓库了:Ansible-Kafka.

修改 Ansible Hosts 文件

例如我有 3 台机器,那我就将 hosts 文件修改为:

  1. [root@liqiang.io]# cat hosts
  2. # These are your kafka cluster nodes
  3. [kafka_servers]
  4. 192.168.77.81 kafka_broker_id=1 ansible_user=root
  5. 192.168.77.82 kafka_broker_id=2 ansible_user=root
  6. 192.168.77.83 kafka_broker_id=3 ansible_user=root
  7. #
  8. # These are your zookeeper cluster nodes
  9. [zk_servers]
  10. 192.168.77.81 zk_id=1 ansible_user=root
  11. 192.168.77.82 zk_id=2 ansible_user=root
  12. 192.168.77.83 zk_id=3 ansible_user=root

部署服务

修改完 hosts 文件之后,就可以直接部署啦,一条 ansible 命令就可以部署完一个完成的 kafka 环境了:

  1. [root@liqiang.io]# ansible-playbook kafka.yml -i hosts --tags "java,zookeeper,kafka"

Docker 运行 Kafka

  1. [root@liqiang.io]# cat docker-compose.yml
  2. # Copyright VMware, Inc.
  3. # SPDX-License-Identifier: APACHE-2.0
  4. version: "2"
  5. services:
  6. kafka:
  7. image: docker.io/bitnami/kafka:3.5
  8. ports:
  9. - "9092:9092"
  10. volumes:
  11. - "kafka_data:/bitnami"
  12. environment:
  13. # KRaft settings
  14. - KAFKA_CFG_NODE_ID=0
  15. - KAFKA_CFG_PROCESS_ROLES=controller,broker
  16. - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
  17. # Listeners
  18. - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
  19. - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
  20. - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
  21. - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
  22. - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
  23. volumes:
  24. kafka_data:
  25. driver: local
  26. [root@liqiang.io]# docker compose up -d

Consumer Group

我没有找到可以直接从工具直接创建 Consumer Group 的方式(可能会有),但是,每个 Consumer 都会有属于一个 Consumer Group,通常在创建 Consumer 时指定,如果指定的 Consumer Group 不存在,那么会自动创建一个。

默认的,如果你用命令行的 consumer(./bin/kafka-console-consumer.sh),可以通过配置文件来指定 consumer 配置,Kafka 存在默认的配置 config/consumer.properties,可以通过参数 --consumer.config 来指定配置文件,从而使用配置文件里面的 Consumer Group。

查看所有的 consumer group

  1. [root@liqiang.io]# bin/kafka-consumer-groups.sh --bootstrap-server <address:port> --list

demo:

  1. [root@liqiang.io]# ./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.77.81:9092 --list
  2. test-consumer-group

创建 consumer group

除了用 Command 创建一个 Consumer Group 之外,SDK 也可以直接指定 Consumer Group 进行自动创建:

查看 consumer group 的详情

  1. [root@liqiang.io]# ./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.77.81:9092 --describe --group test
  2. GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
  3. test liuliqiang-test 0 - 0 - sarama-737da37c-0db0-4311-ba12-ffb958e1ad26 /192.168.77.1 sarama
  4. test liuliqiang-test 1 - 0 - sarama-737da37c-0db0-4311-ba12-ffb958e1ad26 /192.168.77.1 sarama
  5. [root@liqiang.io]# ./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.77.81:9092 --describe --all-groups

查看 consumer group 下各 Topic 对应 partition 的实际消费情况

  1. [root@liqiang.io]# bin/kafka-consumer-groups.sh --bootstrap-server <address:port> --group <name> --describe

通过该命令可以了解到给定Group下每个Topic各Partition的最大Offset值,当前所处的消费位置;还显示出当前Partition所操作的客户端信息,LAG表示消费滞后的情况;执行结果如下所示:

  1. [root@liqiang.io]# ./kafka-consumer-groups.sh --bootstrap-server 192.168.77.81:9092 --group test --describe
  2. Consumer group 'test' has no active members.
  3. GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
  4. test liuliqiang-test 0 146427 530106 383679 - - -
  5. test liuliqiang-test 1 141234 530106 388872 - - -

Topic

新建 Topic

  1. [root@liqiang.io]# bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic <string> --replication-factor <integer> --partitions <integer>

参数说明:

demo:

  1. [root@liqiang.io]# ./kafka-topics.sh --bootstrap-server 192.168.77.83:9092 --create --topic quickstart-events --replication-factor 2 --partitions 3

删除 Topic

  1. [root@liqiang.io]# bin/kafka-topics.sh --zookeeper <zookeeper connect> --delete --topic <string>

demo:

  1. [root@liqiang.io]# ./kafka-topics.sh --bootstrap-server 192.168.77.83:9092 --create --topic quickstart-events --replication-factor 2 --partitions 3

查看 Topic 列表

  1. [root@liqiang.io]# bin/kafka-topics.sh --zookeeper <zookeeper connect> --list

demo:

  1. [root@liqiang.io]# ./kafka-topics.sh --bootstrap-server 192.168.77.83:9092 --list

查看所有 Topic 的详细信息

  1. [root@liqiang.io]# bin/kafka-topics.sh --zookeeper <zookeeper connect> --describe

demo:

  1. [root@liqiang.io]# ./kafka-topics.sh --bootstrap-server 192.168.77.81:9092 --describe
  2. Topic: quickstart-events TopicId: sIpWxwkeTvGrIV8jpUyjfw PartitionCount: 3 ReplicationFactor: 2 Configs: segment.bytes=1073741824
  3. Topic: quickstart-events Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
  4. Topic: quickstart-events Partition: 1 Leader: 3 Replicas: 3,2 Isr: 3,2
  5. Topic: quickstart-events Partition: 2 Leader: 1 Replicas: 1,3 Isr: 1,3
  6. Topic: liuliqiang-test TopicId: C2cd2Ua0TO6MvbiHrx7ZFg PartitionCount: 2 ReplicationFactor: 1 Configs: segment.bytes=1073741824
  7. Topic: liuliqiang-test Partition: 0 Leader: 3 Replicas: 3 Isr: 3
  8. Topic: liuliqiang-test Partition: 1 Leader: 1 Replicas: 1 Isr: 1

增加 Partition 个数

  1. [root@liqiang.io]# bin/kafka-topics.sh --zookeeper <zookeeper connect> --alter --topic <string> --partitions <integer>

查看指定 Topic 的详细信息

  1. [root@liqiang.io]# bin/kafka-topics.sh --zookeeper <zookeeper connect> --describe --topic <string>

相比于上一条查看所有 topic 的信息命令,查看指定 topic 的信息具有更切实的实际作用,更有利于在实际工作中快速定位和发现问题(推荐使用)。通过该命令可以查看给定 Topic 的分区、副本集个数;以及各分区、副本集的实际分布情况,同时还可以看到每个分区的 ISR 列表信息(有关 ISR 介绍可详见 ISR。通过该命令可以根据各 Partition 的 ISR 情况分析 Broker 状况。

执行结果如下所示:

  1. [root@liqiang.io]# ./kafka-topics.sh --bootstrap-server 192.168.77.83:9092 --describe --topic quickstart-events
  2. Topic: quickstart-events TopicId: sIpWxwkeTvGrIV8jpUyjfw PartitionCount: 3 ReplicationFactor: 2 Configs: segment.bytes=1073741824
  3. Topic: quickstart-events Partition: 0 Leader: 2 Replicas: 2,1 Isr: 2,1
  4. Topic: quickstart-events Partition: 1 Leader: 3 Replicas: 3,2 Isr: 3,2
  5. Topic: quickstart-events Partition: 2 Leader: 1 Replicas: 1,3 Isr: 1,3

查看副本集同步出现异常的分区

  1. [root@liqiang.io]# bin/kafka-topics.sh --zookeeper <zookeeper connect> --describe --under-replicated-partitions

查看缺失leader的分区

  1. [root@liqiang.io]# bin/kafka-topics.sh --zookeeper <zookeeper connect> --describe --unavailable-partitions

查看 Topic 各 Partition 的 offset 极值

  1. [root@liqiang.io]# bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <address:port> --topic <string> --time <param>

time 参数详解

-1:查看 Partition 的最大 Offset,即就是 latest
-2:查看 Partition 的最小 Offset,即就是 earlist
timestamp:查询满足时间戳的消息所在 Segment 的最早 Offset 值;详见 Kafka Timestamp

因为 Consumer 的版本有新旧之分,且旧版本(Scala)的 Consumer Client 是依赖于 Zookeeper 来保存 Offset 的,而新版本(Java)的则基于内置的 topic(__consumer_offsets )来保存 offset,所以在使用 Group 相关命令时,必须根据实际情况确定 Consumer 的版本,即就是:旧版本的查询只需要指定 Zookeeper 参数即可,而新版本的查询需要指定 –new-consumer 参数的同时,指定 bootstrap-server 参数

demo:

  1. [root@liqiang.io]# ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.77.81:9092
  2. liuliqiang-test:0:0
  3. liuliqiang-test:1:0
  4. quickstart-events:0:0
  5. quickstart-events:1:0
  6. quickstart-events:2:0

命令行生产和消费

控制台消费数据

旧版本消费

  1. [root@liqiang.io]# bin/kafka-console-consumer.sh --zookeeper <zookeeper connect> --topic <string> --from-beginning

新版本消费

  1. [root@liqiang.io]# bin/kafka-console-consumer.sh --bootstrap-server <address:port> --topic <string> --from-beginning

eg:

  1. [root@liqiang.io]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.77.81:9092 --topic liuliqiang-test --from-beginning

通过配置文件指定 consumer group

如果直接使用上面的示例命令,那么我们会发现 consumer group 为:

  1. [root@liqiang.io]# ./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.77.83:9092 --list
  2. console-consumer-83066

我们可以通过 consumer config 来指定 consumer group:

  1. [root@liqiang.io]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.77.81:9092 --topic liuliqiang-test --from-beginning --consumer.config config/consumer.properties
  2. [root@liqiang.io]# cat config/consumer.properties | grep "group\.id"
  3. group.id=test-consumer-group
  4. [root@liqiang.io]# ./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.77.83:9092 --list
  5. console-consumer-83066
  6. test-consumer-group

控制台生产数据

  1. [root@liqiang.io]# bin/kafka-console-producer.sh --broker-list <address:port> --topic <string>

eg:

  1. [root@liqiang.io]# ./bin/kafka-console-producer.sh --broker-list 192.168.77.81:9092 --topic liuliqiang-test
  2. >