概述

在这篇文章中,我将以 Go 语言为例,使用 Sarama SDK演示如何通过 Kafka 生产和消费消息。其中,消费消息我将演示两种模式的消费方式,分别是自动提交消息已被消费以及主动提交消息消费 ACK 的模式,实际上这也是不同的消息处理模型(即最多消费一次和最少消费一次)的实践。但是在这里我没有演示有且只消费一次的模式,因为这不是单单消息队列就能支持的功能,还需要一些额外的业务支持才可以,所以点到即止,没有进行深入地演示。

Producer

Producer 发送消息给 Kafka 可以分为异步和同步两种方式,同步很好理解,就是发送一个消息之后,我们可以等待结果(是否成功被消息队列接收并保存),异步的话就稍微麻烦一些了,我们不能直接在生产消息的地方等待 Kafka 的接收结果,而是需要监听一个 Channel 获取成功或者失败的消息结果,并且从结果中根据一些标识来判断我们的消息是否被成功处理。

这两种方式各有好坏:

模式 同步模式 异步模式
优势
  • 可靠性高:可以即时反馈给业务事件的处理结果,从而方便业务及时做出对应的处理(重试/返回错误)
  • 可维护性高:容易理解的代码逻辑,都是串行化处理,一般无需缓存或者其他第三方依赖支撑
  • 性能较高:业务代码不会因为 Kafka 的通信以及处理延迟而被阻塞中,可以立即返回去处理其他内容
劣势
  • 性能较差:性能会受到影响,可能因为业务和 Kafka 之间的延迟或者 Kafka 不同 broker 之间的问题导致业务阻塞
  • 可维护性低:异步逻辑不那么直观,往往还需要缓存或者第三方依赖来保存异步消息的状态
  • 可靠性低:当发生 Producer 异常时,可能会出现业务代码无法处理消息是否成功处理的事件

但是,可以发现不同模式各有好坏,所以实际使用中可以根据我们的需要进行选择,这里我分别以两种不同的模式为例介绍一下简单的代码实现,完整的代码我已经放到了 Github 上:Golang Kafka Producer

同步生产消息

  1. [root@liqiang.io]# cat producer/standalone_sync.go
  2. // 创建配置
  3. config := sarama.NewConfig()
  4. config.Producer.RequiredAcks = sarama.WaitForAll // 等待所有副本都响应后的响应模式
  5. config.Producer.Partitioner = sarama.NewRandomPartitioner // 随机分区策略
  6. config.Producer.Return.Successes = true // 成功交付的消息将在success channel返回
  7. // 使用给定代理地址和配置创建一个生产者
  8. producer, err := sarama.NewSyncProducer(strings.Split(brokers, ","), config)
  9. defer func() {
  10. producer.Close()
  11. }()
  12. // 构建要发送的消息
  13. msg := &sarama.ProducerMessage{
  14. Topic: topic,
  15. Key: sarama.StringEncoder("hello-world"),
  16. Value: sarama.StringEncoder("Hello, Kafka!"),
  17. }
  18. // 发送消息
  19. partition, offset, err := producer.SendMessage(msg)
  20. fmt.Printf("Message %s sent to partition %d at offset %d\n", msg.Key, partition, offset)

异步生产消息

  1. [root@liqiang.io]# cat producer/standalone_async.go
  2. // 创建配置
  3. config := sarama.NewConfig()
  4. config.Producer.Return.Successes = true // 启用成功通知
  5. config.Producer.Return.Errors = true // 启用错误通知
  6. // 创建生产者
  7. producer, err := sarama.NewAsyncProducer(strings.Split(brokers, ","), config)
  8. // 异步发送消息
  9. go func() {
  10. for {
  11. select {
  12. case msg := <-producer.Successes():
  13. fmt.Printf("Produced message to topic %s partition %d at offset %d\n", msg.Topic, msg.Partition, msg.Offset)
  14. case err := <-producer.Errors():
  15. log.Printf("Error producing message: %v\n", err)
  16. }
  17. }
  18. }()
  19. producer.Input() <- &sarama.ProducerMessage{
  20. Topic: "test-topic",
  21. Key: sarama.StringEncoder("hello-world"),
  22. Value: sarama.StringEncoder("Hello, Kafka!"),
  23. }

Consuemr

在消费 Kafka 方面,我们也有不同的模式,区分的方式为当我们拿到一个消息之后,我们怎么告诉消息队列我们已经成功地消费了这个消息,所以这里有两种场景,分别是当我们将消息分配给处理函数的时候,就告诉消息队列我们已经消费了这个消息,这种情况可能出现的问题就是处理函数可能会失败地处理这个消息(例如处理过程中程序异常了,或者处理函数当前不能处理成功处理这个消息),从而导致的问题就是这个消息最终不能被成功处理就从消息队列中被标记为已处理,我们称这种情况为最多处理一次。

另外一种情况就是我们将消息交给处理函数之后,在处理函数返回成功的情况下,我们才提交 Offset 到 Kafka 中。完整的示例代码我已经提交到了 Github 上:Golang Kafka Consumer

自动提交模式(at most once)

其实 Sarama 的自动提交模式并不是严格的 at most once 语义,因为 Sarama 并不是在将消息交给处理函数时就提交 Offset,而是间隔一段时间才提交 Offset,所以可能出现的情况是有些 Message 已经交给了处理函数,但是因为还没到间隔时间,没有提交 Offset 到 Kafka,如果这个时候业务应用异常了,可能会出现这些消息被多次消费的情况。

但是,一般认为这种自动提交的模式是 at most once 语义,示例代码为:

  1. [root@liqiang.io]# cat consumer/at_most_once.go
  2. // 创建配置
  3. config := sarama.NewConfig()
  4. config.Consumer.Return.Errors = true
  5. config.Consumer.Offsets.AutoCommit.Enable = true // 开启自动提交, 默认也是 true
  6. config.Version = sarama.V3_6_0_0 // 设置 Kafka 版本
  7. consumer, err := sarama.NewConsumer(strings.Split(brokers, ","), config)
  8. defer consumer.Close()
  9. partitions, err := consumer.Partitions(topic)
  10. for _, partition := range partitions {
  11. partitionConsumer, err := consumer.ConsumePartition(topic, partition, sarama.OffsetOldest)
  12. go func(pc sarama.PartitionConsumer) {
  13. for {
  14. msg := <-pc.Messages()
  15. fmt.Printf("Partition: %d, Offset: %d, Key: %s, Value: %s\n", msg.Partition, msg.Offset, string(msg.Key), string(msg.Value))
  16. }
  17. }(partitionConsumer)
  18. }

手动提交模式(at least once)

手动提交模式我们需要自定义个 ConsumerGroupHandler,这个 Handler 有三个方法接口:

  1. type ConsumerGroupHandler interface {
  2. Setup(ConsumerGroupSession) error
  3. Cleanup(ConsumerGroupSession) error
  4. ConsumeClaim(ConsumerGroupSession, ConsumerGroupClaim) error
  5. }

他们的调用时机和调用规范为:

下面就是一个使用这个自定义 Handler 的示例:

  1. [root@liqiang.io]# cat consumer/at_least_once.go
  2. // 创建配置
  3. config := sarama.NewConfig()
  4. config.Version = sarama.V3_6_0_0
  5. config.Consumer.Group.Rebalance.GroupStrategies = []sarama.BalanceStrategy{
  6. sarama.NewBalanceStrategyRoundRobin(),
  7. }
  8. config.Consumer.Offsets.Initial = sarama.OffsetNewest
  9. consumer := CustomCommitConsumer{
  10. ready: make(chan bool),
  11. }
  12. client, err := sarama.NewConsumerGroup(strings.Split(brokers, ","), group, config)
  13. go func() {
  14. for {
  15. // 如果发生 rebalance 的时候,Consume 就会返回,此时我们需要重新创建一个新的 Consumer
  16. client.Consume(ctx, strings.Split(topic, ","), &consumer)
  17. }
  18. }()
  19. <-sigterm
  20. }
  21. type CustomCommitConsumer struct {
  22. ready chan bool
  23. }
  24. func (c *CustomCommitConsumer) Setup(sarama.ConsumerGroupSession) error {
  25. // Mark the consumer as ready
  26. close(c.ready)
  27. return nil
  28. }
  29. func (c *CustomCommitConsumer) Cleanup(sarama.ConsumerGroupSession) error {
  30. return nil
  31. }
  32. func (c *CustomCommitConsumer) ConsumeClaim(session sarama.ConsumerGroupSession, claim sarama.ConsumerGroupClaim) error {
  33. for {
  34. message, ok := <-claim.Messages()
  35. log.Printf("Message claimed: value = %s, timestamp = %v, topic = %s", string(message.Value), message.Timestamp, message.Topic)
  36. session.MarkMessage(message, "") // 主动提交 Offset
  37. }
  38. }

Stream API

在 Java 的 SDK 中,是支持一种叫做 Stream 的 API 的,它的作用是我们应用可以定义一种针对消息的处理函数,然后指定 Input Topic 和 Output Topic,然后 SDK 即可帮助我们将 Input Topic 的消息经过处理之后直接放入 Output Topic 中,这样我们就无须自行定义 Consumer 和 Producer 了。

当然,这是粗浅的理解,实际上它还无状态的 map 和 reduce 以及有状态的 aggregate、join 等操作,此外还需要考虑处理的线程模型以及容错等,实际上我也只是简单了解了一下,没有深入地去尝试和学习它,如果感兴趣的话可以查看 Kafka 的官方文档:STREAMS DSL

同时,对于 Go 的 Sarama SDK 来说,它是没有支持 Stream 语义的,所以这里就不多看了。

Admin API

在我前面写过的一篇介绍安装和 Kafka 运维命令的文章(Kafka 安装和运维)中我介绍了很多关于 Kafka 运维的命令,那么如果我们希望基于 Kafka 进行二次开发的话,或者我们需要做一个可以管理 Kafka 的平台的话,可能就需要看我们的 SDK 是否能够支持类似的功能,可以让我们通过 API 来获取或者控制 Kafka 的状态以及配置信息。

不同于 Stream API,Sarama 是支持 Admin API 的,但是在初始化的时候,我们需要初始化的是 ClusterAdmin 对象,例如下面这个获取 Kafka 中所有 Topics 的示例:

  1. [root@liqiang.io]# cat admin/topics.go
  2. // 创建 Kafka 配置
  3. config := sarama.NewConfig()
  4. config.Version = sarama.V3_6_0_0
  5. // 创建 Admin 客户端
  6. admin, err := sarama.NewClusterAdmin(strings.Split(brokers, ","), config)
  7. defer admin.Close()
  8. // 获取 Topic 列表
  9. topics, err := admin.ListTopics()
  10. // 打印 Topic 列表
  11. for name := range topics {
  12. fmt.Println(name)
  13. }

当然,ClusterAdmin 不仅仅提供查看和操作 Topic,还支持其他的例如 ACL、ConsumerGroup、分区等的信息查看和操作,具体的就不一一演示了,大概知道支持这些功能就可以了。

额外话题

exactly once

使用消息队列的时候,我们会关注消息的处理语义,在前面我们提过了 at most once 和 at lease once 的语义,但是,有时,我们需要的事 exactly once,就是刚刚好一次,不多不少。这个问题是一个比较复杂的问题,它涉及到多个环节:

传递 Meta 信息

在使用消息队列的时候,我们可能还需要透传一些信息,例如比较常见的 Tracing 信息,当整条调用链路中间有两个服务使用 Kafka 进行通信时,我们肯定是不希望他们之间的 Tracing 信息就断了,所以一个很自然的想法就是 Kafka 是否可以直接透传我们的 Tracing 信息。

对比一下 Kafka 的 ProducerMessageConsumerMessage

  1. type ProducerMessage struct {
  2. Topic string // The Kafka topic for this message.
  3. Key Encoder
  4. Value Encoder
  5. // The headers are key-value pairs that are transparently passed
  6. // by Kafka between producers and consumers.
  7. Headers []RecordHeader
  8. Metadata interface{}
  9. Offset int64
  10. Partition int32
  11. ...
  12. }
  13. type ConsumerMessage struct {
  14. Headers []*RecordHeader // only set if kafka is version 0.11+
  15. Timestamp time.Time // only set if kafka is version 0.10+, inner message timestamp
  16. BlockTimestamp time.Time // only set if kafka is version 0.10+, outer (compressed) block timestamp
  17. Key, Value []byte
  18. Topic string
  19. Partition int32
  20. Offset int64
  21. }

我们可以发现,Kafka 在 Message 中提供了一个 Header 字段可以供我们传递元数据,(注意,这里的 ProducerMessage 中也有一个 Metadata 字段,但是它不是用于在 Consumer 和 Producer 中传递数据的,而是只给 Producer 用的),所以我们可以将我们想要传递的元数据放入 Header 中,然后在 Consumer 中读取出来再传递下去。

Sarama 库的缺点

在查找资料的时候,我们可以看到很多人都说使用 Sarama 客户端收发消息存在以下问题:

并且提供的解决方案就是建议使用 Confluent-Kafka-go 作为 Kafka 客户端库(😂),并且很容易找到几个常用 Golang SDK 的对比:

客户端 优点 缺点
Confluent-Kafka-go
  • Confluent-Kafka-go是由Confluent提供的官方Kafka客户端库,与Kafka完全兼容,支持所有的Kafka特性。
  • 稳定性高,基于librdkafka,具有高性能和低延迟的特点。
  • 增加编译复杂度。由于导入C++库,Golang编译器需要引入额外编译配置,增加了编译依赖,提高编译复杂度。
Kafka-go
  • kafka-go是一个简单、轻量级的Kafka客户端库,易于学习和使用。
  • kafka-go的代码库相对较小,依赖较少,可以减少应用程序的体积和依赖关系。
  • Kafka-go相对于Confluent-Kafka-go来说,功能较为有限,不支持一些高级特性和复杂的配置选项。
  • 性能和吞吐量较低,适用于一些对性能要求不高的简单应用场景。
Sarama
  • Sarama采用原生Golang语言编写,对于异步以及高并发操作支持度较好。
  • 问题较多,文档相对较少。
  • Sarama在处理大量消息时,会占用较多的内存资源,可能会对应用程序的性能造成一定的影响。

这部分内容摘抄自:为什么不建议使用Sarama客户端收发消息?

但是,实际上,当我学习 Sarama 的时候,我发现这个 SDK 的维护主体已经由 Shopify 迁移到 IBM,也就是以前我们的 import package 是 github.com/shopify/sarama,但是你看我的示例代码里面,已经变成了 github.com/ibm/sarama,所以对于这些问题: