Overview

I recently used Kafka in a project and wanted to learn more about it. This article will cover the installation of Kafka and some commonly used commands for operating it. These commands are essential for development and troubleshooting.

Deploying Kafka

VM Deployment of Kafka

Machine IPs: 192.168.77.81, 192.168.77.82, 192.168.77.83

As the deployment involves installing the JRE, ZooKeeper, and Kafka, which can be tedious, I found an Ansible playbook for it. However, it was not very user-friendly (it assumed CentOS), so I modified it slightly to support both CentOS and Ubuntu. This made it more convenient to use, and I have placed it in my repository: Ansible-Kafka.

Modify Ansible Hosts File

For example, if I have 3 machines, I would modify the hosts file as follows:

  [root@liqiang.io]# cat hosts
  # These are your kafka cluster nodes
  [kafka_servers]
  192.168.77.81 kafka_broker_id=1 ansible_user=root
  192.168.77.82 kafka_broker_id=2 ansible_user=root
  192.168.77.83 kafka_broker_id=3 ansible_user=root
  #
  # These are your zookeeper cluster nodes
  [zk_servers]
  192.168.77.81 zk_id=1 ansible_user=root
  192.168.77.82 zk_id=2 ansible_user=root
  192.168.77.83 zk_id=3 ansible_user=root

Deploy Services

After modifying the hosts file, you can directly deploy. One ansible command can deploy a complete Kafka environment:

  [root@liqiang.io]# ansible-playbook kafka.yml -i hosts --tags "java,zookeeper,kafka"

Running Kafka with Docker

  [root@liqiang.io]# cat docker-compose.yml
  # Copyright VMware, Inc.
  # SPDX-License-Identifier: APACHE-2.0
  version: "2"
  services:
    kafka:
      image: docker.io/bitnami/kafka:3.5
      ports:
        - "9092:9092"
      volumes:
        - "kafka_data:/bitnami"
      environment:
        # KRaft settings
        - KAFKA_CFG_NODE_ID=0
        - KAFKA_CFG_PROCESS_ROLES=controller,broker
        - KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
        # Listeners
        - KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
        - KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
        - KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
        - KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
        - KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
  volumes:
    kafka_data:
      driver: local
  [root@liqiang.io]# docker compose up -d

Consumer Group

I did not find a tool that directly creates a Consumer Group (there may be one), but every Consumer belongs to a Consumer Group, which is usually specified when the Consumer is created. If the specified Consumer Group does not exist, it is created automatically.

By default, when using the command-line consumer (./bin/kafka-console-consumer.sh), you can pass consumer settings through a configuration file. Kafka ships with a sample file, config/consumer.properties, which can be supplied via the --consumer.config parameter to set, among other things, the Consumer Group.
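For reference, the relevant part of the shipped sample looks roughly like this (exact contents may vary across Kafka versions; the group.id below is the one used by the examples later in this article):

```properties
# broker(s) to bootstrap from
bootstrap.servers=localhost:9092

# consumer group id; the group is created automatically if it does not exist
group.id=test-consumer-group
```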

Viewing All Consumer Groups

  [root@liqiang.io]# bin/kafka-consumer-groups.sh --bootstrap-server <address:port> --list

Example:

  [root@liqiang.io]# ./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.77.81:9092 --list
  test-consumer-group

Creating a Consumer Group

In addition to being created implicitly by the command-line tools, a Consumer Group can be specified through an SDK, in which case it is likewise created automatically on first use.

Viewing Details of a Consumer Group

  [root@liqiang.io]# ./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.77.81:9092 --describe --group test
  GROUP TOPIC PARTITION CURRENT-OFFSET LOG-END-OFFSET LAG CONSUMER-ID HOST CLIENT-ID
  test liuliqiang-test 0 - 0 - sarama-737da37c-0db0-4311-ba12-ffb958e1ad26 /192.168.77.1 sarama
  test liuliqiang-test 1 - 0 - sarama-737da37c-0db0-4311-ba12-ffb958e1ad26 /192.168.77.1 sarama
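The LAG column above is simply LOG-END-OFFSET minus CURRENT-OFFSET (a "-" means the value is not yet known, e.g. no offset has been committed). A small Python sketch, a hypothetical helper assuming the whitespace-separated layout shown above, that parses such --describe output and computes per-partition lag:

```python
def parse_describe(output: str) -> list[dict]:
    """Parse `kafka-consumer-groups.sh --describe` output into row dicts.

    Assumes the whitespace-separated column layout shown above;
    '-' marks an unknown value (e.g. no committed offset yet).
    """
    lines = [l for l in output.strip().splitlines() if l.strip()]
    header = lines[0].split()
    rows = []
    for line in lines[1:]:
        row = dict(zip(header, line.split()))
        cur, end = row.get("CURRENT-OFFSET"), row.get("LOG-END-OFFSET")
        # Lag is log-end-offset minus committed offset, when both are known.
        if cur not in (None, "-") and end not in (None, "-"):
            row["LAG"] = int(end) - int(cur)
        rows.append(row)
    return rows
```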

Details of All Consumer Groups

  [root@liqiang.io]# ./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.77.81:9092 --describe --all-groups

Topic

Creating a New Topic

  [root@liqiang.io]# bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic <string> --replication-factor <integer> --partitions <integer>

Parameters:

- --topic: the name of the topic to create
- --replication-factor: how many replicas each partition has (cannot exceed the number of brokers)
- --partitions: how many partitions the topic is split into

Example:

  [root@liqiang.io]# ./kafka-topics.sh --bootstrap-server 192.168.77.83:9092 --create --topic quickstart-events --replication-factor 2 --partitions 3

Deleting a Topic

  [root@liqiang.io]# bin/kafka-topics.sh --bootstrap-server <address:port> --delete --topic <string>

Example:

  [root@liqiang.io]# ./kafka-topics.sh --bootstrap-server 192.168.77.83:9092 --delete --topic quickstart-events

Listing Topics

  [root@liqiang.io]# bin/kafka-topics.sh --bootstrap-server <address:port> --list

Example:

  [root@liqiang.io]# ./kafka-topics.sh --bootstrap-server 192.168.77.83:9092 --list

Describing All Topics

  [root@liqiang.io]# bin/kafka-topics.sh --bootstrap-server <address:port> --describe

Example:

  [root@liqiang.io]# ./kafka-topics.sh --bootstrap-server 192.168.77.81:9092 --describe

Increasing the Number of Partitions

  [root@liqiang.io]# bin/kafka-topics.sh --bootstrap-server <address:port> --alter --topic <string> --partitions <integer>

Describing a Specific Topic

  [root@liqiang.io]# bin/kafka-topics.sh --bootstrap-server <address:port> --describe --topic <string>

Example:

  [root@liqiang.io]# ./kafka-topics.sh --bootstrap-server 192.168.77.83:9092 --describe --topic quickstart-events

Viewing Partitions with Syncing Issues

  [root@liqiang.io]# bin/kafka-topics.sh --bootstrap-server <address:port> --describe --under-replicated-partitions

Viewing Partitions with Missing Leaders

  [root@liqiang.io]# bin/kafka-topics.sh --bootstrap-server <address:port> --describe --unavailable-partitions

Viewing the Offset Extremes of Partitions in a Topic

  [root@liqiang.io]# bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <address:port> --topic <string> --time <param>

Time parameter explanation: --time -1 (the default) returns the latest offset of each partition, --time -2 returns the earliest offset, and a concrete timestamp (in milliseconds) returns the offsets as of that point in time.

A note on consumer versions: the old (Scala) consumer client stores offsets in ZooKeeper, while the new (Java) consumer stores them in the internal __consumer_offsets topic. Group-related commands therefore depend on which consumer is actually in use: for the old consumer, specify only the --zookeeper parameter; for the new consumer, specify --bootstrap-server (older versions of the tools also required the --new-consumer flag, which has since been removed).

Example:

  [root@liqiang.io]# ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.77.81:9092
  liuliqiang-test:0:0
  liuliqiang-test:1:0
  quickstart-events:0:0
  quickstart-events:1:0
  quickstart-events:2:0
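Each output line above has the form topic:partition:offset. When monitoring offsets from a script, it can be handy to group them by topic; a small Python sketch (hypothetical helper, assuming exactly that line format):

```python
def parse_offsets(output: str) -> dict[str, dict[int, int]]:
    """Group GetOffsetShell output lines (topic:partition:offset) by topic."""
    offsets: dict[str, dict[int, int]] = {}
    for line in output.strip().splitlines():
        # Split from the right so a ':' in the topic name cannot confuse us.
        topic, partition, offset = line.rsplit(":", 2)
        offsets.setdefault(topic, {})[int(partition)] = int(offset)
    return offsets
```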

Command Line Production and Consumption

Consuming Data from Console

For old versions of consumers:

  [root@liqiang.io]# bin/kafka-console-consumer.sh --zookeeper <zookeeper connect> --topic <string> --from-beginning

For new versions of consumers:

  [root@liqiang.io]# bin/kafka-console-consumer.sh --bootstrap-server <address:port> --topic <string> --from-beginning

Example:

  [root@liqiang.io]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.77.81:9092 --topic liuliqiang-test --from-beginning

Specifying Consumer Group via Configuration File

If you directly use the example command above, you will find that an auto-generated consumer group has been created:

  [root@liqiang.io]# ./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.77.83:9092 --list
  console-consumer-83066

You can specify the consumer group via consumer config:

  [root@liqiang.io]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.77.81:9092 --topic liuliqiang-test --from-beginning --consumer.config config/consumer.properties
  [root@liqiang.io]# cat config/consumer.properties | grep "group\.id"
  group.id=test-consumer-group
  [root@liqiang.io]# ./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.77.83:9092 --list
  console-consumer-83066
  test-consumer-group

Producing Data from Console

  [root@liqiang.io]# bin/kafka-console-producer.sh --broker-list <address:port> --topic <string>

Example:

  [root@liqiang.io]# ./bin/kafka-console-producer.sh --broker-list 192.168.77.81:9092 --topic liuliqiang-test
  >