概述
在这篇文章中,我将以 Go 语言为例,使用 Sarama SDK演示如何通过 Kafka 生产和消费消息。其中,消费消息我将演示两种模式的消费方式,分别是自动提交消息已被消费以及主动提交消息消费 ACK 的模式,实际上这也是不同的消息处理模型(即最多消费一次和最少消费一次)的实践。但是在这里我没有演示有且只消费一次的模式,因为这不是单单消息队列就能支持的功能,还需要一些额外的业务支持才可以,所以点到即止,没有进行深入地演示。
Producer
Producer 发送消息给 Kafka 可以分为异步和同步两种方式,同步很好理解,就是发送一个消息之后,我们可以等待结果(是否成功被消息队列接收并保存),异步的话就稍微麻烦一些了,我们不能直接在生产消息的地方等待 Kafka 的接收结果,而是需要监听一个 Channel 获取成功或者失败的消息结果,并且从结果中根据一些标识来判断我们的消息是否被成功处理。
这两种方式各有好坏:
模式 | 同步模式 | 异步模式 |
---|---|---|
优势 |
|
|
劣势 |
|
|
但是,可以发现不同模式各有好坏,所以实际使用中可以根据我们的需要进行选择,这里我分别以两种不同的模式为例介绍一下简单的代码实现,完整的代码我已经放到了 Github 上:Golang Kafka Producer
同步生产消息
[root@liqiang.io]# cat producer/standalone_sync.go
// 创建配置
config := sarama.NewConfig()
config.Producer.RequiredAcks = sarama.WaitForAll // 等待所有副本都响应后的响应模式
config.Producer.Partitioner = sarama.NewRandomPartitioner // 随机分区策略
config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回
// 使用给定代理地址和配置创建一个生产者
producer, err := sarama.NewSyncProducer(strings.Split(brokers, ","), config)
defer func() {
producer.Close()
}()
// 构建要发送的消息
msg := &sarama.ProducerMessage{
Topic: topic,
Key: sarama.StringEncoder("hello-world"),
Value: sarama.StringEncoder("Hello, Kafka!"),
}
// 发送消息
partition, offset, err := producer.SendMessage(msg)
fmt.Printf("Message %s sent to partition %d at offset %d\n", msg.Key, partition, offset)
异步生产消息
[root@liqiang.io]# cat producer/standalone_async.go
// 创建配置
config := sarama.NewConfig()
config.Producer.Return.Successes = true // 启用成功通知
config.Producer.Return.Errors = true // 启用错误通知
// 创建生产者
producer, err := sarama.NewAsyncProducer(strings.Split(brokers, ","), config)
// 异步发送消息
go func() {
for {
select {
case msg := <-producer.Successes():
fmt.Printf("Produced message to topic %s partition %d at offset %d\n", msg.Topic, msg.Partition, msg.Offset)
case err := <-producer.Errors():
log.Printf("Error producing message: %v\n", err)
}
}
}()
producer.Input() <- &sarama.ProducerMessage{
Topic: "test-topic",
Key: sarama.StringEncoder("hello-world"),
Value: sarama.StringEncoder("Hello, Kafka!"),
}
Consuemr
在消费 Kafka 方面,我们也有不同的模式,区分的方式为当我们拿到一个消息之后,我们怎么告诉消息队列我们已经成功地消费了这个消息,所以这里有两种场景,分别是当我们将消息分配给处理函数的时候,就告诉消息队列我们已经消费了这个消息,这种情况可能出现的问题就是处理函数可能会失败地处理这个消息(例如处理过程中程序异常了,或者处理函数当前不能处理成功处理这个消息),从而导致的问题就是这个消息最终不能被成功处理就从消息队列中被标记为已处理,我们称这种情况为最多处理一次。
另外一种情况就是我们将消息交给处理函数之后,在处理函数返回成功的情况下,我们才提交 Offset 到 Kafka 中。完整的示例代码我已经提交到了 Github 上:Golang Kafka Consumer
自动提交模式(at most once)
其实 Sarama 的自动提交模式并不是严格的 at most once 语义,因为 Sarama 并不是在将消息交给处理函数时就提交 Offset,而是间隔一段时间才提交 Offset,所以可能出现的情况是有些 Message 已经交给了处理函数,但是因为还没到间隔时间,没有提交 Offset 到 Kafka,如果这个时候业务应用异常了,可能会出现这些消息被多次消费的情况。
但是,一般认为这种自动提交的模式是 at most once 语义,示例代码为:
[root@liqiang.io]# cat consumer/at_most_once.go
// 创建配置
config := sarama.NewConfig()
config.Consumer.Return.Errors = true
config.Consumer.Offsets.AutoCommit.Enable = true // 开启自动提交, 默认也是 true
config.Version = sarama.V3_6_0_0 // 设置 Kafka 版本
consumer, err := sarama.NewConsumer(strings.Split(brokers, ","), config)
defer consumer.Close()
partitions, err := consumer.Partitions(topic)
for _, partition := range partitions {
partitionConsumer, err := consumer.ConsumePartition(topic, partition, sarama.OffsetOldest)
go func(pc sarama.PartitionConsumer) {
for {
msg := <-pc.Messages()
fmt.Printf("Partition: %d, Offset: %d, Key: %s, Value: %s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
}
}(partitionConsumer)
}
手动提交模式(at least once)
手动提交模式我们需要自定义个 ConsumerGroupHandler,这个 Handler 有三个方法接口:
type ConsumerGroupHandler interface {
Setup(ConsumerGroupSession) error
Cleanup(ConsumerGroupSession) error
ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
}
他们的调用时机和调用规范为:
Setup
:在真正地启动 Consumer 之前,会通过这个函数来让 Consumer 做一些配置,执行完之后就开始真正的消费;ConsumeClain
:这个处理函数必须内部运行一个循环用于消费ConsumerGroupClaim
的消息,如果这个 Channel 关闭了,那么应该退出循环并且返回函数;Cleanup
:在所有的ConsumeClaim
goroutine 都返回后,并且在最后一次的 offset 上报之前,这个函数会被调用;
下面就是一个使用这个自定义 Handler 的示例:
[root@liqiang.io]# cat consumer/at_least_once.go
// 创建配置
config := sarama.NewConfig()
config.Version = sarama.V3_6_0_0
config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{
sarama.NewBalanceStrategyRoundRobin(),
}
config.Consumer.Offsets.Initial = sarama.OffsetNewest
consumer := CustomCommitConsumer{
ready: make(chan bool),
}
client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
go func() {
for {
// 如果发生 rebalance 的时候,Consume 就会返回,此时我们需要重新创建一个新的 Consumer
client.Consume(ctx, strings.Split(topic, ","), &consumer)
}
}()
<-sigterm
}
type CustomCommitConsumer struct {
ready chan bool
}
func (c *CustomCommitConsumer) Setup(sarama.ConsumerGroupSession) error {
// Mark the consumer as ready
close(c.ready)
return nil
}
func (c *CustomCommitConsumer) Cleanup(sarama.ConsumerGroupSession) error {
return nil
}
func (c *CustomCommitConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
for {
message, ok := <-claim.Messages()
log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
session.MarkMessage(message, "") // 主动提交 Offset
}
}
Stream API
在 Java 的 SDK 中,是支持一种叫做 Stream 的 API 的,它的作用是我们应用可以定义一种针对消息的处理函数,然后指定 Input Topic 和 Output Topic,然后 SDK 即可帮助我们将 Input Topic 的消息经过处理之后直接放入 Output Topic 中,这样我们就无须自行定义 Consumer 和 Producer 了。
当然,这是粗浅的理解,实际上它还无状态的 map 和 reduce 以及有状态的 aggregate、join 等操作,此外还需要考虑处理的线程模型以及容错等,实际上我也只是简单了解了一下,没有深入地去尝试和学习它,如果感兴趣的话可以查看 Kafka 的官方文档:STREAMS DSL 。
同时,对于 Go 的 Sarama SDK 来说,它是没有支持 Stream 语义的,所以这里就不多看了。
Admin API
在我前面写过的一篇介绍安装和 Kafka 运维命令的文章(Kafka 安装和运维)中我介绍了很多关于 Kafka 运维的命令,那么如果我们希望基于 Kafka 进行二次开发的话,或者我们需要做一个可以管理 Kafka 的平台的话,可能就需要看我们的 SDK 是否能够支持类似的功能,可以让我们通过 API 来获取或者控制 Kafka 的状态以及配置信息。
不同于 Stream API,Sarama 是支持 Admin API 的,但是在初始化的时候,我们需要初始化的是 ClusterAdmin
对象,例如下面这个获取 Kafka 中所有 Topics 的示例:
[root@liqiang.io]# cat admin/topics.go
// 创建 Kafka 配置
config := sarama.NewConfig()
config.Version = sarama.V3_6_0_0
// 创建 Admin 客户端
admin, err := sarama.NewClusterAdmin(strings.Split(brokers, ","), config)
defer admin.Close()
// 获取 Topic 列表
topics, err := admin.ListTopics()
// 打印 Topic 列表
for name := range topics {
fmt.Println(name)
}
当然,ClusterAdmin 不仅仅提供查看和操作 Topic,还支持其他的例如 ACL、ConsumerGroup、分区等的信息查看和操作,具体的就不一一演示了,大概知道支持这些功能就可以了。
额外话题
exactly once
使用消息队列的时候,我们会关注消息的处理语义,在前面我们提过了 at most once 和 at lease once 的语义,但是,有时,我们需要的事 exactly once,就是刚刚好一次,不多不少。这个问题是一个比较复杂的问题,它涉及到多个环节:
- 生产者:如果要达到 exactly once,那么我们第一步就要保证生产者只生产了一次,Kafka 提供了消息的序列号来保证数据是不会重复的,也就是说如果发送多次具有相同序号的数据,Kafka 只会保存一次;
- Tolearn:如何实现的?有什么需要注意的地方?
- 消息队列:当消息被消息队列成功接收之后,我们还要保证消息队列自己的异常不会导致数据丢失,例如一个 Kafka 节点掉线了,通过 Replication 机制,我们可以保证其他节点还持有副本,从而保证数据不会丢失。(如果持有副本的所有节点都同时掉线并无法恢复了,那也没办法还是丢消息了)
- 消费者:消费者这边我们需要业务支持,既要保证消息不丢失,又要保证不重复,所以在实现的时候再 at least once 的方式下进行进一步优化实现:
- 因为一个消息什么情况下属于处理过从 Kafka 层面来说是不知道的,所以需要我们在业务上支撑;
- Consumer 在重复消费时可以判断出来,例如我们复用 Kafka 的序列号(能拿到吗?)之类的;
传递 Meta 信息
在使用消息队列的时候,我们可能还需要透传一些信息,例如比较常见的 Tracing 信息,当整条调用链路中间有两个服务使用 Kafka 进行通信时,我们肯定是不希望他们之间的 Tracing 信息就断了,所以一个很自然的想法就是 Kafka 是否可以直接透传我们的 Tracing 信息。
对比一下 Kafka 的 ProducerMessage
和 ConsumerMessage
:
type ProducerMessage struct {
Topic string // The Kafka topic for this message.
Key Encoder
Value Encoder
// The headers are key-value pairs that are transparently passed
// by Kafka between producers and consumers.
Headers []RecordHeader
Metadata interface{}
Offset int64
Partition int32
...
}
type ConsumerMessage struct {
Headers []*RecordHeader // only set if kafka is version 0.11+
Timestamp time.Time // only set if kafka is version 0.10+, inner message timestamp
BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp
Key, Value []byte
Topic string
Partition int32
Offset int64
}
我们可以发现,Kafka 在 Message 中提供了一个 Header 字段可以供我们传递元数据,(注意,这里的 ProducerMessage
中也有一个 Metadata
字段,但是它不是用于在 Consumer 和 Producer 中传递数据的,而是只给 Producer 用的),所以我们可以将我们想要传递的元数据放入 Header 中,然后在 Consumer 中读取出来再传递下去。
Sarama 库的缺点
在查找资料的时候,我们可以看到很多人都说使用 Sarama 客户端收发消息存在以下问题:
- Sarama 客户端无法感知分区变化,当 Topic 分区数增加时,需要重启客户端后才能正常消费。
- Sarama 客户端消息最大处理时间(MaxProcessingTime)默认值为 100ms,超过最大处理时间可能导致消费者无法消费。
- 当消费位点重置策略设置为 Oldest(earliest)时,当客户端重启时,偏移量重置后可能从最小位点开始重复消费所有消息。
- 消费者同时订阅多个 Topic 时,部分分区可能无法消费到消息。
并且提供的解决方案就是建议使用 Confluent-Kafka-go 作为 Kafka 客户端库(😂),并且很容易找到几个常用 Golang SDK 的对比:
客户端 | 优点 | 缺点 |
---|---|---|
Confluent-Kafka-go |
|
|
Kafka-go |
|
|
Sarama |
|
|
这部分内容摘抄自:为什么不建议使用Sarama客户端收发消息?
但是,实际上,当我学习 Sarama 的时候,我发现这个 SDK 的维护主体已经由 Shopify 迁移到 IBM,也就是以前我们的 import package 是 github.com/shopify/sarama
,但是你看我的示例代码里面,已经变成了 github.com/ibm/sarama
,所以对于这些问题:
- Sarama 客户端无法感知分区变化,当 Topic 分区数增加时,需要重启客户端后才能正常消费。
- 已经解决了,从主动上报 offset 的例子中我们可以看到,当服务端 rebalance 的时候,Consumer 是会重建的;
- Sarama 客户端消息最大处理时间(MaxProcessingTime)默认值为 100ms,超过最大处理时间可能导致消费者无法消费。
- 这个我不确定,待学习
- 当消费位点重置策略设置为 Oldest(earliest)时,当客户端重启时,偏移量重置后可能从最小位点开始重复消费所有消息。
- 这个我不确定,待学习
- 消费者同时订阅多个 Topic 时,部分分区可能无法消费到消息。
- 这个我不确定,待学习