Celery解析系列


在前面的几篇文章中,我们已经将 Kombu 的收发消息都讲完了,似乎已经没有什么内容。但是,事实上,在我们快速走读这些代码的时候,我们忽略了一些很重要的东西,其中给一个就是获取消息的时候,我们说过 Consumer 的 consume 是非阻塞的,然后就进行到了一个循环,等待一个 buffer 被填满,然后再返回这个 buffer。但是,我们却是不知道这是如何实现的,所以,这里我们就来八一八后面的实现。

Consumer

最开始我们肯定是先看看 consume 非阻塞是怎么实现的啦,经过几篇代码走读我们应该可以很简单得找到 Consumer 的代码了:kombu/messaging.py,然后看看 consume 操作:

可以看到,好像没啥特别亮点的,如果有的话,那就应该有两处:

  1. 不能取消监听的 queue,除非调用 cancel_by_queue
  2. 这里对所有的 queue 都是 no_wait=True,只留了一个设置了 no_wait=False,有什么用呢?继续看看

这里多了一个 tag 的概念,通过查看 Kombu 的文档我们可以发现:

Consumer_tag: Unique identifier for the consumer. The consumer tag is local to a connection, so two clients can use the same consumer tags. If this field is empty the server will generate a unique tag.

ok,就这么理解着先吧,然后看下真正 consume 的代码,这里传递了几个参数:

先不看具体的 callback 是啥,继续往深处看 consume,这里就设计到 queue 了,这个结构我们还没在之前看过,这里顺带一看:kombu/entity.py

代码很简单,直接交给 channel 了,已经是老朋友了,直接跟过去:

很简单的一段,直接设置一下各种值,然后关键在于最后一句:_reset_cycle,里面是啥,看看就了然了:

然后似乎就完了,调用就 over 了,啥事都没有,回家。

消息回调

等等等,似乎还有事情没完啊,这里只是将任务消费者声明出去了,但是,消息收过来之后放哪还不知道啊,怎么放进去的也不知道啊,这怎么办?那就继续看下我们的 sample 后面的代码呀:

self.channel.connection.client.drain_events(timeout=remaining)

这一句就重要啦,我们找出来看看这里的代码是啥,位置在 kombu/connection.py

又转化到 Transport 上了,那没办法咯,避不开的嘛。。。

这里不久衔接上咯,看看这里的实现,关键在于 Line 961 中的 get,其实就对应到 kombu/utils/scheduling.py

fun 就是我们前边设置得咯,实际值应该是 kombu/transport/virtual/base.py

然后就调到 kombu/transport/redis.py

到此就真的结束了,到此,我们可以认识到所谓的异步其实就是轮训,Kombu 对应所有需要监听的 queues 都查询一遍,直到查询完毕或者遇到一个可以使用的 queue,然后就获取内容,回调对应的 callback!

回顾

在这个跟踪过程,我们可以发现兜兜转转,但是历经的都是这么几个文件,它们分别是:

Reference

  1. Kombu Code