Key Concepts

Topics and Partitions

Figure 1: Partition

Consumer

Figure 2: Consumer Group

Broker

Figure 3: Partition Replication

Retention

A key feature of Apache Kafka is that of retention, which is the durable storage of messages for some period of time. Kafka brokers are configured with a default retention setting for topics, either retaining messages for some period of time (e.g., 7 days) or until the partition reaches a certain size in bytes (e.g., 1 GB). Once these limits are reached, messages are expired and deleted. In this way, the retention configuration defines a minimum amount of data available at any time.
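
Concretely, these limits map to broker configuration. A minimal sketch of the relevant settings, using the illustrative 7-day/1 GB figures from the paragraph above rather than recommended values (retention can also be overridden per topic via retention.ms and retention.bytes):

  [root@liqiang.io]# cat server.properties
  # Default retention applied to topics on this broker:
  log.retention.hours=168          # expire messages older than 7 days
  log.retention.bytes=1073741824   # or once a partition grows past 1 GB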

Producer: Sending Messages

The Send Flow

We start producing messages to Kafka by creating a ProducerRecord, which must include the topic we want to send the record to and a value. Optionally, we can also specify a key, a partition, a timestamp, and/or a collection of headers. Once we send the ProducerRecord, the first thing the producer will do is serialize the key and value objects to byte arrays so they can be sent over the network.

Next, if we didn’t explicitly specify a partition, the data is sent to a partitioner. The partitioner will choose a partition for us, usually based on the ProducerRecord key. Once a partition is selected, the producer knows which topic and partition the record will go to. It then adds the record to a batch of records that will also be sent to the same topic and partition. A separate thread is responsible for sending those batches of records to the appropriate Kafka brokers.

When the broker receives the messages, it sends back a response. If the messages were successfully written to Kafka, it will return a RecordMetadata object with the topic, partition, and the offset of the record within the partition. If the broker failed to write the messages, it will return an error. When the producer receives an error, it may retry sending the message a few more times before giving up and returning an error.

Figure 4: The Message Production Flow

As the figure shows, most of this work is done inside the client SDK; only after all of these steps does the producer actually send the batch to the broker.
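
To make the flow concrete, here is a minimal sketch using the Java client; the broker address, topic name, key, and value are all placeholders:

  [root@liqiang.io]# cat ProducerDemo.java
  import java.util.Properties;
  import org.apache.kafka.clients.producer.KafkaProducer;
  import org.apache.kafka.clients.producer.Producer;
  import org.apache.kafka.clients.producer.ProducerRecord;

  public class ProducerDemo {
      public static void main(String[] args) {
          Properties props = new Properties();
          props.put("bootstrap.servers", "localhost:9092"); // placeholder address
          // Serializers turn the key and value objects into byte arrays.
          props.put("key.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");
          props.put("value.serializer",
                  "org.apache.kafka.common.serialization.StringSerializer");

          try (Producer<String, String> producer = new KafkaProducer<>(props)) {
              // Topic and value are required; key, partition, timestamp,
              // and headers are optional.
              ProducerRecord<String, String> record =
                      new ProducerRecord<>("demo-topic", "key", "value");
              producer.send(record, (metadata, exception) -> {
                  if (exception != null) {
                      // The broker failed to write the messages, even after retries.
                      exception.printStackTrace();
                  } else {
                      // RecordMetadata carries the topic, partition, and offset.
                      System.out.printf("partition=%d, offset=%d%n",
                              metadata.partition(), metadata.offset());
                  }
              });
          } // close() flushes any pending batches
      }
  }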

Send Modes

Kafka producers generally support (non-conforming implementations aside) the following three send modes, sketched in the example below:

  1. Fire-and-forget: send the message and neither wait for nor check the result;
  2. Synchronous send: block on the Future returned by send() until the broker responds;
  3. Asynchronous send: pass a callback to send() that is invoked when the broker responds.
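
A sketch of the three modes, reusing the producer and record from the previous example (a fragment: error handling is elided, and the synchronous variant throws checked exceptions):

  [root@liqiang.io]# cat SendModes.java
  // 1. Fire-and-forget: send and do not check the result.
  producer.send(record);

  // 2. Synchronous send: block on the returned Future until the broker replies.
  RecordMetadata metadata = producer.send(record).get();

  // 3. Asynchronous send: register a callback that runs when the send completes.
  producer.send(record, (md, exception) -> {
      if (exception != null) {
          exception.printStackTrace();
      }
  });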

Some Important Parameters

Partition

Headers

Quotas and Throttling

Consumer: Reading Messages

Reading data from Kafka is a bit different than reading data from other messaging systems, and there are a few unique concepts and ideas involved. It can be difficult to understand how to use the Consumer API without understanding these concepts first.

Consumer Groups

Partition Rebalance

Partition rebalancing is an important topic. A rebalance can be triggered in situations such as:

  1. A new consumer joins the group;
  2. An existing consumer leaves the group, whether through a clean shutdown or a crash detected via session timeout;
  3. Partitions are added to a subscribed topic.

Kafka supports two styles of rebalancing:

  1. Eager rebalances
    • When a rebalance is needed, every consumer in the group gives up all of the partitions it currently owns; once all of them have done so, the partitions are reassigned and consumption resumes;
    • The upside is that the rebalance converges quickly, but the drawback is equally obvious: it is stop-the-world;
  2. Cooperative rebalances
    • When a rebalance is needed, the group's leader tells the affected consumers to give up ownership of certain partitions and waits for them to do so; those partitions are then reassigned to other consumers (see the configuration sketch after this list);
    • The upside is that the rebalance proceeds without stopping the world, which suits groups with many consumers; the flip side is that it converges more slowly;
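
In the Java client, which of the two protocols is used follows from the partition assignment strategy. A minimal sketch of opting into cooperative rebalancing (consumer side; props is the consumer's configuration object):

  [root@liqiang.io]# cat main.java
  // The classic assignors (Range, RoundRobin, Sticky) use the eager protocol;
  // CooperativeStickyAssignor opts the consumer into cooperative rebalancing.
  props.put(ConsumerConfig.PARTITION_ASSIGNMENT_STRATEGY_CONFIG,
          CooperativeStickyAssignor.class.getName());
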
Static Membership
Listening for Rebalances

A consumer may need to commit its offsets when the group rebalances, so SDKs generally expose rebalance events the application can subscribe to. The Java SDK, for example, delivers them through ConsumerRebalanceListener, whose callbacks are:

  1. onPartitionsAssigned: invoked after partitions have been (re)assigned to the consumer;
  2. onPartitionsRevoked: invoked when the consumer must give up partitions, typically before the reassignment;
  3. onPartitionsLost: invoked when partitions were reassigned elsewhere without an orderly revocation, e.g., after a session timeout.
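
As a sketch of how this is wired up: commit the offsets tracked by the poll loop when partitions are about to be revoked. Here consumer, topics, and currentOffsets are assumed to come from the surrounding code, as in the commit examples below:

  [root@liqiang.io]# cat main.java
  class HandleRebalance implements ConsumerRebalanceListener {
      public void onPartitionsAssigned(Collection<TopicPartition> partitions) {
          // Nothing to do: newly assigned partitions resume from committed offsets.
      }

      public void onPartitionsRevoked(Collection<TopicPartition> partitions) {
          // Commit what has been processed so far before losing ownership.
          consumer.commitSync(currentOffsets);
      }
  }

  consumer.subscribe(topics, new HandleRebalance());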

Commits 和 Offsets

Figure 5: The committed offset is smaller than the last processed offset
Figure 6: The committed offset is larger than the last processed offset
Commit Strategies

Kafka supports the following commit strategies:

  1. Automatic commit: the consumer periodically commits the latest offsets returned by poll() (see the configuration sketch below);
  2. Synchronous commit: commitSync() blocks until the broker acknowledges the commit;
  3. Asynchronous commit: commitAsync() returns immediately and reports the outcome through an optional callback;
  4. Combining synchronous and asynchronous commits;
  5. Committing a specific offset.
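
Of these, automatic commit is purely configuration-driven. A minimal sketch (the values shown are the Java client's defaults):

  [root@liqiang.io]# cat main.java
  // With auto-commit enabled, the consumer commits the latest offsets
  // returned by poll() roughly every auto.commit.interval.ms.
  props.put("enable.auto.commit", "true");
  props.put("auto.commit.interval.ms", "5000");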

Recommended approach: mix synchronous and asynchronous commits:

  [root@liqiang.io]# cat main.java
  Duration timeout = Duration.ofMillis(100);
  try {
      while (!closing) {
          ConsumerRecords<String, String> records = consumer.poll(timeout);
          for (ConsumerRecord<String, String> record : records) {
              System.out.printf("topic = %s, partition = %s, offset = %d, " +
                      "customer = %s, country = %s\n",
                  record.topic(), record.partition(),
                  record.offset(), record.key(), record.value());
          }
          // Fast, non-blocking commit on every iteration of the poll loop.
          consumer.commitAsync();
      }
      // On shutdown, make one final blocking commit so no offsets are lost.
      consumer.commitSync();
  } catch (Exception e) {
      log.error("Unexpected error", e);
  } finally {
      consumer.close();
  }
Committing a Specific Offset
  [root@liqiang.io]# cat main.java
  private Map<TopicPartition, OffsetAndMetadata> currentOffsets =
      new HashMap<>();
  int count = 0;
  ....
  Duration timeout = Duration.ofMillis(100);
  while (true) {
      ConsumerRecords<String, String> records = consumer.poll(timeout);
      for (ConsumerRecord<String, String> record : records) {
          System.out.printf("topic = %s, partition = %s, offset = %d, " +
                  "customer = %s, country = %s\n",
              record.topic(), record.partition(), record.offset(),
              record.key(), record.value());
          // Track the offset of the *next* message to be read, hence +1.
          currentOffsets.put(
              new TopicPartition(record.topic(), record.partition()),
              new OffsetAndMetadata(record.offset() + 1, "no metadata"));
          // Commit the tracked offsets every 1,000 records.
          if (count % 1000 == 0)
              consumer.commitAsync(currentOffsets, null);
          count++;
      }
  }

Replicas

Observability

Monitoring

For a Kafka consumer, one of the most critical tasks is monitoring its consumption progress, that is, the gap between it and the rate at which producers are writing messages. This gap is usually called consumer lag, and it reflects how far the consumer's processing trails the producers. For example, if producers have successfully written 1,000,000 messages to a topic and your consumer has so far processed only 800,000 of them, the consumer is 200,000 messages behind, i.e., its lag is 200,000.
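
The stock tooling exposes this directly: per partition, lag is the difference between the log-end offset and the group's committed offset. A sketch using the bundled CLI (the group name is a placeholder):

  [root@liqiang.io]# kafka-consumer-groups.sh --bootstrap-server localhost:9092 \
      --describe --group my-group
  # The output includes CURRENT-OFFSET, LOG-END-OFFSET, and a LAG column,
  # where LAG = LOG-END-OFFSET - CURRENT-OFFSET for each partition.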

Original article