Overview
I recently used Kafka in a project and wanted to learn more about it. This article will cover the installation of Kafka and some commonly used commands for operating it. These commands are essential for development and troubleshooting.
Deploying Kafka
VM Deployment of Kafka
Machine IPs:
- 192.168.77.81
- 192.168.77.82
- 192.168.77.83
As the deployment involves installing a JRE, Zookeeper, and Kafka, which is fairly involved, I looked for an Ansible Playbook to do it. The one I found was not very convenient (it only targeted CentOS), so I modified it slightly to support both CentOS and Ubuntu, and I have placed the result in my repository: Ansible-Kafka.
Modify Ansible Hosts File
For example, if I have 3 machines, I would modify the hosts file as follows:
[root@liqiang.io]# cat hosts
# These are your kafka cluster nodes
[kafka_servers]
192.168.77.81 kafka_broker_id=1 ansible_user=root
192.168.77.82 kafka_broker_id=2 ansible_user=root
192.168.77.83 kafka_broker_id=3 ansible_user=root
#
# These are your zookeeper cluster nodes
[zk_servers]
192.168.77.81 zk_id=1 ansible_user=root
192.168.77.82 zk_id=2 ansible_user=root
192.168.77.83 zk_id=3 ansible_user=root
Deploy Services
After modifying the hosts file, you can deploy directly; a single ansible-playbook command brings up the complete Kafka environment:
[root@liqiang.io]# ansible-playbook kafka.yml -i hosts --tags "java,zookeeper,kafka"
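Once the playbook finishes, a quick sanity check (a sketch; it assumes the Kafka CLI scripts are available on one of the nodes or on your workstation) is to list topics against any broker:
[root@liqiang.io]# ./bin/kafka-topics.sh --bootstrap-server 192.168.77.81:9092 --list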
Running Kafka with Docker
[root@liqiang.io]# cat docker-compose.yml
# Copyright VMware, Inc.
# SPDX-License-Identifier: APACHE-2.0
version: "2"
services:
kafka:
image: docker.io/bitnami/kafka:3.5
ports:
- "9092:9092"
volumes:
- "kafka_data:/bitnami"
environment:
# KRaft settings
- KAFKA_CFG_NODE_ID=0
- KAFKA_CFG_PROCESS_ROLES=controller,broker
- KAFKA_CFG_CONTROLLER_QUORUM_VOTERS=0@kafka:9093
# Listeners
- KAFKA_CFG_LISTENERS=PLAINTEXT://:9092,CONTROLLER://:9093
- KAFKA_CFG_ADVERTISED_LISTENERS=PLAINTEXT://:9092
- KAFKA_CFG_LISTENER_SECURITY_PROTOCOL_MAP=CONTROLLER:PLAINTEXT,PLAINTEXT:PLAINTEXT
- KAFKA_CFG_CONTROLLER_LISTENER_NAMES=CONTROLLER
- KAFKA_CFG_INTER_BROKER_LISTENER_NAME=PLAINTEXT
volumes:
kafka_data:
driver: local
[root@liqiang.io]# docker compose up -d
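To verify the container is serving requests, you can run the bundled CLI inside it (a sketch; this assumes the Bitnami image keeps the Kafka scripts on its PATH, under /opt/bitnami/kafka/bin):
[root@liqiang.io]# docker compose exec kafka kafka-topics.sh --bootstrap-server localhost:9092 --list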
Consumer Group
I did not find a way to directly create a Consumer Group from a tool (there may be one), but every Consumer belongs to a Consumer Group, which is usually specified when the Consumer is created; if the specified Consumer Group does not exist, it is created automatically.
By default, when using the command-line consumer (./bin/kafka-console-consumer.sh), you can pass consumer settings through a configuration file: Kafka ships with a default one at config/consumer.properties, which you can hand to the --consumer.config parameter to set the Consumer Group for the consumer.
Viewing All Consumer Groups
[root@liqiang.io]# bin/kafka-consumer-groups.sh --bootstrap-server <address:port> --list
Example:
[root@liqiang.io]# ./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.77.81:9092 --list
test-consumer-group
Creating a Consumer Group
In addition to having a command-line consumer create a Consumer Group (shown later via --consumer.config), an SDK can specify a Consumer Group that is created automatically on first use, as in the sketch below:
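A minimal sketch, assuming the Go sarama client (the sarama CONSUMER-ID in the describe output below hints at it); the import path github.com/IBM/sarama is an assumption (older code uses github.com/Shopify/sarama), while the broker address, group name test, and topic liuliqiang-test are taken from the examples in this article:

package main

import (
	"context"
	"log"

	"github.com/IBM/sarama" // assumed import path; adjust to the sarama fork you use
)

// handler implements sarama.ConsumerGroupHandler.
type handler struct{}

func (handler) Setup(sarama.ConsumerGroupSession) error   { return nil }
func (handler) Cleanup(sarama.ConsumerGroupSession) error { return nil }
func (handler) ConsumeClaim(sess sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
	for msg := range claim.Messages() {
		log.Printf("topic=%s partition=%d offset=%d value=%s", msg.Topic, msg.Partition, msg.Offset, msg.Value)
		sess.MarkMessage(msg, "") // commit the offset to __consumer_offsets
	}
	return nil
}

func main() {
	cfg := sarama.NewConfig()
	cfg.Version = sarama.V2_1_0_0                      // consumer groups require at least V0_10_2_0
	cfg.Consumer.Offsets.Initial = sarama.OffsetOldest // new groups start from the earliest offset

	// Joining with the group ID "test" creates the Consumer Group if it does not exist yet.
	group, err := sarama.NewConsumerGroup([]string{"192.168.77.81:9092"}, "test", cfg)
	if err != nil {
		log.Fatal(err)
	}
	defer group.Close()

	for {
		if err := group.Consume(context.Background(), []string{"liuliqiang-test"}, handler{}); err != nil {
			log.Fatal(err)
		}
	}
}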
Viewing Details of a Consumer Group
[root@liqiang.io]# ./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.77.81:9092 --describe --group test
GROUP  TOPIC            PARTITION  CURRENT-OFFSET  LOG-END-OFFSET  LAG  CONSUMER-ID                                  HOST           CLIENT-ID
test   liuliqiang-test  0          -               0               -    sarama-737da37c-0db0-4311-ba12-ffb958e1ad26  /192.168.77.1  sarama
test   liuliqiang-test  1          -               0               -    sarama-737da37c-0db0-4311-ba12-ffb958e1ad26  /192.168.77.1  sarama
Details of All Consumer Groups
[root@liqiang.io]# ./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.77.81:9092 --describe --all-groups
Topic
Creating a New Topic
[root@liqiang.io]# bin/kafka-topics.sh --bootstrap-server localhost:9092 --create --topic <string> --replication-factor <integer> --partitions <integer>
Parameters:
- --bootstrap-server: Kafka address
- --partitions: Number of partitions
- --replication-factor: Replication factor
Example:
[root@liqiang.io]# ./kafka-topics.sh --bootstrap-server 192.168.77.83:9092 --create --topic quickstart-events --replication-factor 2 --partitions 3
Deleting a Topic
[root@liqiang.io]# bin/kafka-topics.sh --bootstrap-server <address:port> --delete --topic <string>
Example:
[root@liqiang.io]# ./kafka-topics.sh --bootstrap-server 192.168.77.83:9092 --delete --topic quickstart-events
Listing Topics
[root@liqiang.io]# bin/kafka-topics.sh --bootstrap-server <address:port> --list
Example:
[root@liqiang.io]# ./kafka-topics.sh --bootstrap-server 192.168.77.83:9092 --list
Describing All Topics
[root@liqiang.io]# bin/kafka-topics.sh --bootstrap-server <address:port> --describe
Example:
[root@liqiang.io]# ./kafka-topics.sh --bootstrap-server 192.168.77.81:9092 --describe
Increasing the Number of Partitions
[root@liqiang.io]# bin/kafka-topics.sh --bootstrap-server <address:port> --alter --topic <string> --partitions <integer>
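For example, raising the quickstart-events topic created above from 3 to 6 partitions (note that the partition count can only be increased, never decreased):
[root@liqiang.io]# ./kafka-topics.sh --bootstrap-server 192.168.77.83:9092 --alter --topic quickstart-events --partitions 6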
Describing a Specific Topic
[root@liqiang.io]# bin/kafka-topics.sh --bootstrap-server <address:port> --describe --topic <string>
Example:
[root@liqiang.io]# ./kafka-topics.sh --bootstrap-server 192.168.77.83:9092 --describe --topic quickstart-events
Viewing Partitions with Syncing Issues
[root@liqiang.io]# bin/kafka-topics.sh --bootstrap-server <address:port> --describe --under-replicated-partitions
Viewing Partitions with Missing Leaders
[root@liqiang.io]# bin/kafka-topics.sh --bootstrap-server <address:port> --describe --unavailable-partitions
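For example, checking the test cluster for both conditions (an empty result means no partition is currently under-replicated or without a leader):
[root@liqiang.io]# ./kafka-topics.sh --bootstrap-server 192.168.77.81:9092 --describe --under-replicated-partitions
[root@liqiang.io]# ./kafka-topics.sh --bootstrap-server 192.168.77.81:9092 --describe --unavailable-partitions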
Viewing the Offset Extremes of Partitions in a Topic
[root@liqiang.io]# bin/kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list <address:port> --topic <string> --time <param>
Time parameter explanation:
- -1: View the maximum offset of the Partition, i.e., latest
- -2: View the minimum offset of the Partition, i.e., earliest
- timestamp: Query the earliest Offset value in the Segment that satisfies the timestamp of the message; see Kafka Timestamp
Since there are different Consumer versions, and the old (Scala) Consumer client stores offsets in Zookeeper while the new (Java) client stores them in the built-in topic (__consumer_offsets), the Consumer version must be taken into account when using Group-related commands: for old-version consumers you only need to specify the Zookeeper parameter, while for new-version consumers you specify the --new-consumer parameter together with the --bootstrap-server parameter.
Example:
[root@liqiang.io]# ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.77.81:9092
liuliqiang-test:0:0
liuliqiang-test:1:0
quickstart-events:0:0
quickstart-events:1:0
quickstart-events:2:0
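You can also limit the query to a single topic and pass --time; for example, to get the earliest offset of each partition of liuliqiang-test (a sketch using the same broker as above):
[root@liqiang.io]# ./kafka-run-class.sh kafka.tools.GetOffsetShell --broker-list 192.168.77.81:9092 --topic liuliqiang-test --time -2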
Command Line Production and Consumption
Consuming Data from Console
For old versions of consumers:
[root@liqiang.io]# bin/kafka-console-consumer.sh --zookeeper <zookeeper connect> --topic <string> --from-beginning
For new versions of consumers:
[root@liqiang.io]# bin/kafka-console-consumer.sh --bootstrap-server <address:port> --topic <string> --from-beginning
Example:
[root@liqiang.io]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.77.81:9092 --topic liuliqiang-test --from-beginning
Specifying Consumer Group via Configuration File
If you run the example command above as-is, you will find that an auto-generated consumer group was created:
[root@liqiang.io]# ./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.77.83:9092 --list
console-consumer-83066
You can specify the consumer group via consumer config:
[root@liqiang.io]# ./bin/kafka-console-consumer.sh --bootstrap-server 192.168.77.81:9092 --topic liuliqiang-test --from-beginning --consumer.config config/consumer.properties
[root@liqiang.io]# cat config/consumer.properties | grep "group\.id"
group.id=test-consumer-group
[root@liqiang.io]# ./bin/kafka-consumer-groups.sh --bootstrap-server 192.168.77.83:9092 --list
console-consumer-83066
test-consumer-group
Producing Data from Console
[root@liqiang.io]# bin/kafka-console-producer.sh --broker-list <address:port> --topic <string>
Example:
[root@liqiang.io]# ./bin/kafka-console-producer.sh --broker-list 192.168.77.81:9092 --topic liuliqiang-test
>
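Each line typed at the > prompt is sent as a single message to the topic, so after entering a few lines here you can re-run the console consumer above on liuliqiang-test to read them back. Newer Kafka releases also accept --bootstrap-server for the console producer in place of the deprecated --broker-list.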