主要概念
Topic 和 Partition
- A message is simply an array of bytes as far as Kafka is concerned
- Messages in Kafka are categorized into topics.
- Topics are additionally broken down into a number of partitions.
- Note that as a topic typically has multiple partitions, there is no guarantee of message ordering across the entire topic, just within a single partition.
图 1:Partition |
---|
Consumer
- The consumer keeps track of which messages it has already consumed by keeping track of the offset of messages. The offset—an integer value that continually increases—is another piece of metadata that Kafka adds to each message as it is produced.
- Consumers work as part of a consumer group, which is one or more consumers that work together to consume a topic. The group ensures that each partition is only consumed by one member.
- The mapping of a consumer to a partition is often called ownership of the partition by the consumer.
图 2:Consumer Group |
---|
Broker
- A single Kafka server is called a broker. The broker receives messages from producers, assigns offsets to them, and writes the messages to storage on disk.
- Depending on the specific hardware and its performance characteristics, a single broker can easily handle thousands of partitions and millions of messages per second.
- Kafka brokers are designed to operate as part of a cluster. Within a cluster of brokers, one broker will also function as the cluster controller.
- The controller is responsible for administrative operations, including assigning partitions to brokers and monitoring for broker failures.
- A partition is owned by a single broker in the cluster, and that broker is called the leader of the partition.
- A replicated partition is assigned to additional brokers, called followers of the partition.
- All producers must connect to the leader in order to publish messages, but consumers may fetch from either the leader or one of the followers.(简单讲就是:主写,从读)
- 所以 cluter controller 和 partition leader 没有直接关系
图 3:Partition Redundant |
---|
Retention
A key feature of Apache Kafka is that of retention, which is the durable storage of messages for some period of time. Kafka brokers are configured with a default retention setting for topics, either retaining messages for some period of time (e.g., 7 days) or until the partition reaches a certain size in bytes (e.g., 1 GB). Once these limits are reached, messages are expired and deleted. In this way, the retention configuration defines a minimum amount of data available at any time.
生产者:发送消息
- 需要考虑的因素:
- is every message critical, or can we tolerate loss of messages?
- Are we OK with accidentally duplicating messages?
- Are there any strict latency or throughput requirements we need to support?
发送流程
We start producing messages to Kafka by creating a ProducerRecord, which must include the topic we want to send the record to and a value. Optionally, we can also specify a key, a partition, a timestamp, and/or a collection of headers. Once we send the ProducerRecord, the first thing the producer will do is serialize the key and value objects to byte arrays so they can be sent over the network.
Next, if we didn’t explicitly specify a partition, the data is sent to a partitioner. The partitioner will choose a partition for us, usually based on the ProducerRecord key. Once a partition is selected, the producer knows which topic and partition the record will go to. It then adds the record to a batch of records that will also be sent to the same topic and partition. A separate thread is responsible for sending those batches of records to the appropriate Kafka brokers.
When the broker receives the messages, it sends back a response. If the messages were successfully written to Kafka, it will return a RecordMetadata object with the topic, partition, and the offset of the record within the partition. If the broker failed to write the messages, it will return an error. When the producer receives an error, it may retry sending the message a few more times before giving up and returning an error.
图 4:生产消息的流程 |
---|
从中可以看到大部分的工作都是在 SDK 里面做的,完成之后再发送给 Broker Controller。
发送模式
Kafka 的生产者一般支持(有不规范的实现除外)以下 3 种消息发送模式:
Fire-and-forget
We send a message to the server and don’t really care if it arrives successfully or not. Most of the time, it will arrive successfully, since Kafka is highly available and the producer will retry sending messages automatically. However, in case of nonretriable errors or timeout, messages will get lost and the application will not get any information or exceptions about this.
放任不管,应用程序不需要知道发送结果,SDK 尽力而为地帮我发送。
Synchronous send
Technically, Kafka producer is always asynchronous—we send a message and the send() method returns a Future object. However, we use get() to wait on the Future and see if the send() was successful or not before sending the next record.
同步发送,一定要个结果生产者才继续后续的消息。
Asynchronous send
We call the send() method with a callback function, which gets triggered when it receives a response from the Kafka broker.
异步发送,结果好坏都行,但是你得通知我。
一些比较重要的参数
client.id
:客户端的标识,在 Broker 的日志中会使用;acks
:当有多少个 Broker 收到消息之后才认为是成功acks=0
:不需要 Leader Partition 确认就认为是成功,有丢数据的风险;acks=1
:只需要 Leader Partition 确认即认为成功,同样有丢数据的风险,同时,在 Leader 异常并且新 Leader 未选出时会出错;acks=all
:安全的选项,但是代价就是延迟会增高;
Partition
- Kafka messages are key-value pairs, and while it is possible to create a
ProducerRecord
with just a topic and a value, with the key set to null by default, most applications produce records with keys. - Keys serve two goals:
- they are additional information that gets stored with the message;
- and they are typically also used to decide which one of the topic partitions the message will be written to;
- When the key is null and the default partitioner is used, the record will be sent to one of the available partitions of the topic at random. A round-robin algorithm will be used to balance the messages among the partitions.
- Starting in the Apache Kafka 2.4 producer, the round-robin algorithm used in the default partitioner when handling null keys is sticky. This means that it will fill a batch of messages sent to a single partition before switching to the next partition. This allows sending the same number of messages to Kafka in fewer requests, leading to lower latency and reduced CPU utilization on the broker.
- 粘性路由,但是是针对请求级别的
- If a key exists and the default partitioner is used, Kafka will hash the key (using its own hash algorithm, so hash values will not change when Java is upgraded) and use the result to map the message to a specific partition.
- Since it is important that a key is always mapped to the same partition, we use all the partitions in the topic to calculate the mapping—not just the available partitions. This means that if a specific partition is unavailable when you write data to it, you might get an error.
- When the default partitioner is used, the mapping of keys to partitions is consistent only as long as the number of partitions in a topic does not change.
- However, the moment you add new partitions to the topic, this is no longer guaranteed—the old records will stay in partition 34 while new records may get written to a different partition.
- When partitioning keys is important, the easiest solution is to create topics with sufficient partitions and never add partitions.
Headers
- Records can, in addition to key and value, also include headers. Record headers give you the ability to add some metadata about the Kafka record, without adding any extra information to the key/value pair of the record itself.
- Headers are often used for lineage to indicate the source of the data in the record, and for routing or tracing messages based on header information without having to parse the message itself (perhaps the message is encrypted and the router doesn’t have permissions to access the data).
- Headers are implemented as an ordered collection of key/value pairs.
- The keys are always a String, and the values can be any serialized object—just like the message value.
配额和限流
- Kafka brokers have the ability to limit the rate at which messages are produced and consumed. This is done via the quota mechanism. Kafka has three quota types: produce, consume, and request.
- Produce and consume quotas limit the rate at which clients can send and receive data, measured in bytes per second.
- Request quotas limit the percentage of time the broker spends processing client requests.
- Quotas can be applied to all clients by setting default quotas, specific client-ids, specific users, or both. User-specific quotas are only meaningful in clusters where security is configured and clients authenticate.
消费者:读取消息
Reading data from Kafka is a bit different than reading data from other messaging systems, and there are a few unique concepts and ideas involved. It can be difficult to understand how to use the Consumer API without understanding these concepts first.
消费者组
- 单一消费者的问题:如果生产速度超过消费速度,消息将无法全部处理;
- 消费者组:订阅同一个 topic 的所有消费者组成的集合(主动的),消费者组中的每个消费者都将消费不同的 partition subset
- 问题:消费者数量超过 partition 数量怎么办?
- 解答:那么超过的消费者将会空闲,并不会获取到任何的消息(没有 partition 给它);
- 问题:消费者数量超过 partition 数量怎么办?
- 一个 topic 允许被多个消费者组使用,他们之间的消费消息互不相关;
Partition Rebalance
Partition Rebalance 是一个重要的主题,可能发生在以下情况:
- 消费者组加入了新的消费者;
- 消费者组中有消费者断开了;
- 管理员新增了 Partition;
Kafka 支持的两种 Rebalance 的方式:
- Eager rebalances
- 当需要 rebalance 时,消费者组让所有的消费者都放弃当前拥有的 partition,然后等待所有的消费者都放弃成功之后,重新分配 partition,再继续消费;
- 这种方式的好处就是 rebalance 收敛快,但是问题也很明显,他是 stop the world 的;
- Cooperative rebalances
- 当需要 rebalance 时,消费者组的 leader 通知当前所有消费者需要放弃对某些 partitions的控制权,然后等待这些消费者组完成放弃;然后再分配这些 partitions 给新的消费者;
- 这种方式的好处就是 rebalance 是不停机的,适合消费者多的场景,相反,问题就是收敛速度较慢;
静态成员
- 默认情况下,消费者组中的每个消费者的 ID 都是随机的,这也就意味着当消费者下线之后,它所拥有的 partition 会被 rebalance;然后重新回来之后,会被分配其他不同的 partition;
- Kafka 允许指定固定的 ID:
group.instance.id
,当一个消费者以固定的 ID 加入消费者组时,它将在该消费者组中拥有一个 session,当他下线之后,分配给他的 partition 将不会被立刻 rebalance,而是会保持住,这个时候这些 partition 将处于无消费者消费的状态;- 在 session 有效期(配置:
session.timeout.ms
)内,如果这个消费者可以正常回来,那么它将会从继续从之前的 partitions 中的最后未消费记录开始处理消息; - 如果在 session 过期前,消费者都不能回来,那么这个消费者所拥有的 partitions 将会被 rebalance。
- 在 session 有效期(配置:
监听 Rebalance
消费者可能需要在消费者组 Rebalance 的时候 Commit 一下 Offset,所以一般 SDK 都会提供一些事件给消费者监听,例如 Java 的 SDK 中就有以下监听事件:
public void onPartitionsAssigned(Collection<TopicPartition> partitions)
public void onPartitionsRevoked(Collection<TopicPartition> partitions)
public void onPartitionsLost(Collection<TopicPartition> partitions)
Commits 和 Offsets
- Kafka 和其他消息队列的不同之处在于它不会记录消息队列的位置,而是将这个工作交给应用程序负责;
offset commit
:消费者上报自己完成处理的最后一个消息再 partition 中 offset 的操作;- 实现方式:消费者通过往
__consumer_offsets
topic 中写入 partition 和 offset 的数据,从而达到记录 offset 的目的;- 这个提交的数据对当前的消费者没有影响(意思 offset 不准确也不会有影响),但是如果当前消费者下线了,对应的 partitions 被 rebalance 了,那么就会有比较大的影响;
- 如果提交的 offset 小了,那么对应的 offset 和真实处理 offset 之间的消息会被重复消费;
- 如果提交的 offset 大了,那么对应的 offset 和真是处理 offset 之间的消息将会丢失;
图 5:Commit 的 Offset 小了 |
---|
图 6:Commit 的 Offset 大了 |
---|
Commit 方式
Kafka 支持以下几种 Commit 方式:
- 自动提交:SDK 每隔 5秒自动提交一次最后一次 poll 的 offset
- 问题:如果消费者崩溃了,可能有些消息没处理也被当做处理了(offset 提交大了);也可能新的一轮
poll
的 offset 因为没到 5s 处理了,但是没有被提交 offset,导致重复处理;
- 问题:如果消费者崩溃了,可能有些消息没处理也被当做处理了(offset 提交大了);也可能新的一轮
- 同步提交:消费者主动提交 offset(不是提交具体的 offset,而是调用
consumer.SyncCommit
,内部会失败重试);- 好处是确保了处理完成了再提交;
- 问题是同步的,需要等待响应才继续下一轮的
poll
;
- 异步提交:消费者主动提交 offset,但是不等待结果(内部不会失败重试);
- 好处是异步的,不需要担心响应时间;
- 问题是可能没有提交成功,并且不会重试;
建议方式:同步 + 异步提交混合使用:
[root@liqiang.io]# cat main.java
Duration timeout = Duration.ofMillis(100);
try {
while (!closing) {
ConsumerRecords<String, String> records = consumer.poll(timeout);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = %d,
customer = %s, country = %s\n",
record.topic(), record.partition(),
record.offset(), record.key(), record.value());
}
consumer.commitAsync();
}
consumer.commitSync();
} catch (Exception e) {
log.error("Unexpected error", e);
} finally {
consumer.close();
}
Commit 具体的 offset
[root@liqiang.io]# cat main.java
private Map<TopicPartition, OffsetAndMetadata> currentOffsets =
new HashMap<>(); 1
int count = 0;
....
Duration timeout = Duration.ofMillis(100);
while (true) {
ConsumerRecords<String, String> records = consumer.poll(timeout);
for (ConsumerRecord<String, String> record : records) {
System.out.printf("topic = %s, partition = %s, offset = %d,
customer = %s, country = %s\n",
record.topic(), record.partition(), record.offset(),
record.key(), record.value()); 2
currentOffsets.put(
new TopicPartition(record.topic(), record.partition()),
new OffsetAndMetadata(record.offset()+1, "no metadata")); 3
if (count % 1000 == 0) 4
consumer.commitAsync(currentOffsets, null); 5
count++;
}
}
副本集
- ISR(In-Sync Replicas):副本同步列表【包含Leader和Follower】
- OSR(Outof-Sync Replicas):由于同步落后而被剔除的副本列表,阈值参数:replica.lag.time.max.ms
- AR(Assigned Replicas):所有副本集;AR = ISR + OSR
可观测性
监控
对于 Kafka 消费者来说,最关键的任务之一是监控它们的消费进度,或者说是监测它们与生产者的消息产生速度之间的差距。这个差距通常称为消费者滞后(Consumer Lag),它反映了消费者相对于生产者的消息处理进度。举例来说,如果 Kafka 生产者成功发送了100万条消息到某个主题,而你的消费者目前只处理了80万条消息,那么你的消费者滞后了20万条消息,也就是说 Lag 等于20万。