Celery解析系列
- 前传: Kombu 系列
- 正文: celery 系列
在上一篇中,我们从一个 Consumer 的 Sample 中聊起,看了一些 Kombu 的内部实现,了解了一些消息是如何拿回来的。现在我们接着继续看看消息是如何发出去的,同样,我们还是从第一篇文章中提到的 Sample 出发,看看真实情况是如何的:
首先第一步还是先创建 Connection,这个毫无疑问,因为肯定要有连接才能发送消息。这个代码我们在上一篇文章中已经看过了,目前阶段就先不看了,继续看下面一个。
Line 29 我们明确得声明了一个 Producer,这个和我们在第二篇文章中看到的不太一样,第二篇中我们只是简单得构造了一个 SimpleQueue,而 SimpleQueue 会创建 Consumer 和 Producer,但是这里我们明确得声明了一个 Producer,所以我们可以探究一下发生了什么,代码的位置在:kombu/messageing.py Line 61:
从这里可以看到很多熟悉的名词,channel/exchange 等等,然而我们发现我们都没有传递这些参数,那么默认都是 None 了,只有 channel 我们传的是 connection,ok,记住这些,继续出发看看调用 publish
的时候发生了啥:
publish 的注释很长,所以我把他忽略了,看下我们调用的代码,我们也传了很多参数,先一一对应起来看下我们的参数都起到什么作用了,首先我们传递的参数有:
- body:
{'hello': 'world'}
- exchange:
Exchange('kombu_demo', type='direct')
- routing_key:
'kombu_demo'
- serializer:
json
- compression:
zlib
纵观一下,发现这里没做啥太多有价值的事情,只是对参数进行校验和规整,比较有用的估计也就在 Line 138-140 这三行了,这里对数据进行了压缩处理,然后 Line 148 也有点用,这里对重试进行了策略设置。最后就直接在 Line 149 调用内部的 _publish
方法了。
这里是比较有意思的地方啦,这里在 Line 188 就将消息封装成了 Message,然后 Line 194 对 mq 的 queue 进行声明,然后最后还是通过 Channel 来发送消息。
话说,看到这里,我们还是没看到 Exchange 和 Routing Key 是如何工作的,所以我们希望 basic_publish
能够给我们一点启发,跟踪 Channel 的基类,我们看到 virtual/base.py
Line 600:
可以看到,这里的消息发送原来都是交给 Exchange 来做的,所以我们是时候看下 Exchange 的实现了,代码跳到 komu/transport/virtual/exchange.py
,我们会发现很多有趣的东西,例如在 Line 155:
我们会发现 Kombu 默认会支持 3 种不同的 Exchange,并且定义了它的 Exchange 实现,我们来看看 DirectExchange 的具体 deliver
:
所以关键是在于 _lookup
是如何实现的,其实 _lookup
具体实现我们不太需要关心,我们往上看一下,查看一下 lookup
心中就有一些了解了:
这里有一个问题就在于 table 是什么,我们找出 table 的实现事情就清楚多了,但是跟踪一下代码,你会发现这个 table 的获取比较复杂,这里抽取几块比较典型的代码,记录一下这个 table 的来源,位置其实都在 komu/transport/virtual/base.py:
从这些代码片段中我们就大概知道了 Exchange routing 的原理了,关键的点应该是在每一个 Exchange 的 prepare_bind
,然后我们再来看看 Direct Exchange 的 put
操作具体是怎样的,这个代码就在 kombu/transport/redis.py
Line 762 这个位置:
没啥亮点了,因为我们都已经知道了 queue 是哪个,所以直接放进去就可以了,留个心眼,这里还做了一个 q_for_pri
的操作,以后可能会看到。
总结
OK,这就是一个简单的生产消息的流程,我们简单得走了一遍,又了解了不少的知识:
- Producer 包含了很多东西,有 Exchange、routing_key 和 channel 等等
- Message 只是一些要素的封装,没有干什么事情
- Exchange 只是将发送的
routing_key
转化为queue
的名字 - 实际发送还是得 channel 来干活,每个不同的 Transport 都有对应的 Channel