Celery解析系列


在上一篇中,我们从一个 Consumer 的 Sample 中聊起,看了一些 Kombu 的内部实现,了解了一些消息是如何拿回来的。现在我们接着继续看看消息是如何发出去的,同样,我们还是从第一篇文章中提到的 Sample 出发,看看真实情况是如何的:

首先第一步还是先创建 Connection,这个毫无疑问,因为肯定要有连接才能发送消息。这个代码我们在上一篇文章中已经看过了,目前阶段就先不看了,继续看下面一个。

Line 29 我们明确得声明了一个 Producer,这个和我们在第二篇文章中看到的不太一样,第二篇中我们只是简单得构造了一个 SimpleQueue,而 SimpleQueue 会创建 Consumer 和 Producer,但是这里我们明确得声明了一个 Producer,所以我们可以探究一下发生了什么,代码的位置在:kombu/messageing.py Line 61:

从这里可以看到很多熟悉的名词,channel/exchange 等等,然而我们发现我们都没有传递这些参数,那么默认都是 None 了,只有 channel 我们传的是 connection,ok,记住这些,继续出发看看调用 publish 的时候发生了啥:

publish 的注释很长,所以我把他忽略了,看下我们调用的代码,我们也传了很多参数,先一一对应起来看下我们的参数都起到什么作用了,首先我们传递的参数有:

纵观一下,发现这里没做啥太多有价值的事情,只是对参数进行校验和规整,比较有用的估计也就在 Line 138-140 这三行了,这里对数据进行了压缩处理,然后 Line 148 也有点用,这里对重试进行了策略设置。最后就直接在 Line 149 调用内部的 _publish 方法了。

这里是比较有意思的地方啦,这里在 Line 188 就将消息封装成了 Message,然后 Line 194 对 mq 的 queue 进行声明,然后最后还是通过 Channel 来发送消息。

话说,看到这里,我们还是没看到 Exchange 和 Routing Key 是如何工作的,所以我们希望 basic_publish 能够给我们一点启发,跟踪 Channel 的基类,我们看到 virtual/base.py Line 600:

可以看到,这里的消息发送原来都是交给 Exchange 来做的,所以我们是时候看下 Exchange 的实现了,代码跳到 komu/transport/virtual/exchange.py,我们会发现很多有趣的东西,例如在 Line 155

我们会发现 Kombu 默认会支持 3 种不同的 Exchange,并且定义了它的 Exchange 实现,我们来看看 DirectExchange 的具体 deliver

所以关键是在于 _lookup 是如何实现的,其实 _lookup 具体实现我们不太需要关心,我们往上看一下,查看一下 lookup 心中就有一些了解了:

这里有一个问题就在于 table 是什么,我们找出 table 的实现事情就清楚多了,但是跟踪一下代码,你会发现这个 table 的获取比较复杂,这里抽取几块比较典型的代码,记录一下这个 table 的来源,位置其实都在 komu/transport/virtual/base.py

从这些代码片段中我们就大概知道了 Exchange routing 的原理了,关键的点应该是在每一个 Exchange 的 prepare_bind,然后我们再来看看 Direct Exchange 的 put 操作具体是怎样的,这个代码就在 kombu/transport/redis.py Line 762 这个位置:

没啥亮点了,因为我们都已经知道了 queue 是哪个,所以直接放进去就可以了,留个心眼,这里还做了一个 q_for_pri 的操作,以后可能会看到。

总结

OK,这就是一个简单的生产消息的流程,我们简单得走了一遍,又了解了不少的知识:

Reference

  1. Kombu Code