Celery解析系列


OK,现在真正进入到 Kombu 的源码解析部分,我们还是从 Kombu 源码解析一 中的例子讲起,我们来看下对于一个简单的使用,在 Kombu 内部是如何实现的,首先从最开始的这段代码说起:

我们先来关注一下,conn.SimpleQueue 发生了什么事情,然后再来看看 queue 是啥,最后应该看看 message 的内容。conn 的类型应该是 Connection,而我们这里用的是 Redis,那么应该对应的是 RedisConnection 才对,具体如何,我们跟进代码看一下 (kombu/connection.py line 49):

跟踪 Connection 的构造函数我们可以发现在 Line 174 和 Line 187 里面都只是将 transport 的类型记住,然后在 Line 189 那里做了一个参数初始化,我们查看一下代码可以发现在 Line 246 中又直接赋值给 transport_cls,这在以后有大用处:

OK,小插曲一段,回来看看 SimpleQueue,在 Line 712 我们可以看到很简单的一项:

我们可以发现,代码就这么简单,看看上面的注释,注意看下注释里面参数的意思,可以发现,这里提供了好几个默认参数:

好,我们就跟进去看看这个 SimpleQueue 是啥,位置应该在:kombu/simple.py ,我们可以在 Line 118 上看到定义的代码,说实话,看到这个还是蛮欣喜的,因为感觉看到头了:

可以看到在这个 SimpleQueue 中,已经创建了 Consumer 和 Producer 了,这里我们暂时不关注他们的代码,而是根据我上一篇文章的描述理解先。我们可以关注一下 Consumer 和 Producer 的参数,他们都是默认帮我们设置的,然后这就告一段落了。

接下来,是时候看下获取消息的实现了,还是在这个文件,但是,我们可以发现 SimpleQueue 的代码真的很 Simple,它自己没有重载 get 方法,所以我们可以在它的父类 SimpleBase 中找到,应该在 Line 35

Line 36 很庆幸,是 block 的,所以我们不需要看更多的代码了,然后这里有个很应景的 _consume,进去看一下:

很简单,其实就是调用 Consumer 的 consume,然后返回,注意,这里是非阻塞的,那么我们要怎么拿到消息呢,继续看下去,可以看到,下面有个循环,然后着重看一下注释,如果有消息进来,那么就会被放到 self.buffer 里面,没有消息这里就会阻塞住了,同时,如果阻塞超过我们设定的超时时间,那么就会跑出异常啦。

那我们就按照正常的逻辑走吧,在 Line 45 就是正常拿到数据之后,然后返回去了,我们看看拿到的是啥,回到原来的代码,其实也就是我们自己写的 Sample 中:

返回的是一个 Message,我们在前面跟踪的代码里面没有体现这个 message 是什么类型,那么它是什么类型?其实你找遍 Kombu 的代码,发现其实只有一种在 kombu/message.py 中的 Message 类型,之前提过了,这是 Kombu 中生产/消费的基本单位,我们快速得看一下代码:

其实无非就是消息内容的封装,但是,内容还是比较丰富的,因为没啥讲究的必要,所以,内容就不讲了,我们回到后面一句对消息的处理:message.ack 看看这里发生了什么事情:

其实你会发现也没做啥,其实就是判断需不需要确认,确认过了没,然后这一段比较重要的是:self.channel.basic_ack 这里确认的方式是交给 channel(Connection) 来执行,然后就完了。

这里就是一段我们跟踪简单实现的一种方式,整个环节还是比较简单的,但是我们已经知道了一些东西。

Reference

  1. Kombu Code