Celery解析系列
- 前传: Kombu 系列
- 正文: celery 系列
OK,现在真正进入到 Kombu 的源码解析部分,我们还是从 Kombu 源码解析一 中的例子讲起,我们来看下对于一个简单的使用,在 Kombu 内部是如何实现的,首先从最开始的这段代码说起:
我们先来关注一下,conn.SimpleQueue
发生了什么事情,然后再来看看 queue 是啥,最后应该看看 message 的内容。conn 的类型应该是 Connection
,而我们这里用的是 Redis,那么应该对应的是 Redis 的 Connection
才对,具体如何,我们跟进代码看一下 (kombu/connection.py line 49):
跟踪 Connection
的构造函数我们可以发现在 Line 174 和 Line 187 里面都只是将 transport
的类型记住,然后在 Line 189 那里做了一个参数初始化,我们查看一下代码可以发现在 Line 246 中又直接赋值给 transport_cls
,这在以后有大用处:
OK,小插曲一段,回来看看 SimpleQueue
,在 Line 712 我们可以看到很简单的一项:
我们可以发现,代码就这么简单,看看上面的注释,注意看下注释里面参数的意思,可以发现,这里提供了好几个默认参数:
- channel:就是这个 connection 了
- queue:就是我们传递的参数
- no_ack:None
- queue_opts:None
- exchange_opts:None
好,我们就跟进去看看这个 SimpleQueue
是啥,位置应该在:kombu/simple.py ,我们可以在 Line 118 上看到定义的代码,说实话,看到这个还是蛮欣喜的,因为感觉看到头了:
可以看到在这个 SimpleQueue 中,已经创建了 Consumer 和 Producer 了,这里我们暂时不关注他们的代码,而是根据我上一篇文章的描述理解先。我们可以关注一下 Consumer 和 Producer 的参数,他们都是默认帮我们设置的,然后这就告一段落了。
接下来,是时候看下获取消息的实现了,还是在这个文件,但是,我们可以发现 SimpleQueue 的代码真的很 Simple,它自己没有重载 get
方法,所以我们可以在它的父类 SimpleBase
中找到,应该在 Line 35:
在 Line 36 很庆幸,是 block 的,所以我们不需要看更多的代码了,然后这里有个很应景的 _consume
,进去看一下:
很简单,其实就是调用 Consumer 的 consume,然后返回,注意,这里是非阻塞的,那么我们要怎么拿到消息呢,继续看下去,可以看到,下面有个循环,然后着重看一下注释,如果有消息进来,那么就会被放到 self.buffer
里面,没有消息这里就会阻塞住了,同时,如果阻塞超过我们设定的超时时间,那么就会跑出异常啦。
那我们就按照正常的逻辑走吧,在 Line 45 就是正常拿到数据之后,然后返回去了,我们看看拿到的是啥,回到原来的代码,其实也就是我们自己写的 Sample 中:
返回的是一个 Message,我们在前面跟踪的代码里面没有体现这个 message 是什么类型,那么它是什么类型?其实你找遍 Kombu 的代码,发现其实只有一种在 kombu/message.py 中的 Message 类型,之前提过了,这是 Kombu 中生产/消费的基本单位,我们快速得看一下代码:
其实无非就是消息内容的封装,但是,内容还是比较丰富的,因为没啥讲究的必要,所以,内容就不讲了,我们回到后面一句对消息的处理:message.ack
看看这里发生了什么事情:
其实你会发现也没做啥,其实就是判断需不需要确认,确认过了没,然后这一段比较重要的是:self.channel.basic_ack
这里确认的方式是交给 channel(Connection) 来执行,然后就完了。
这里就是一段我们跟踪简单实现的一种方式,整个环节还是比较简单的,但是我们已经知道了一些东西。
- Connection 里面包含了 Transport,并且是抽象的,根据我们的参数确定具体的 Transport 是什么
- SimpleQueue 里面不仅仅只有 queue,还包含了 connection, consumer 和 producer
- Consumer 的 consume 是非阻塞的,真实是在
drain_events
中获取,然后塞到成员变量中的,但是具体怎么塞的,我们还没看到 - 消息的确认是通过 connection 来确认的,但是我们没看到有持久化之类的