Celery解析系列
概述
要想全盘了解 Celery,首先肯定不能直接上来就看代码细节,也不能看 Scheduler 和 Worker 的源码细节,而我推荐的是,你得先了解 Celery 的 Worker 的组成。虽然有很多同学可能会想 Celery 的源码不就是 Scheduler 和 Worker 两部分嘛,可以直接上手;虽然,从简单意义上讲,是这样的,但是,Worker 的实现比较复杂,所以我觉得在讲解 Worker 的代码之前,首先要讲解的是 Blueprint 这个东西,在 Flask 中它很重要,在 Celery 中也是如此(很久之前我写过一篇文章也是将这个的,不过是介绍概念)。
DAG 的构建
其实 Blueprint 决定了 Worker 的启动顺序,在 Celery 中,有两个 Blueprint,分别是 Worker 和 Consumer,如图:
在启动的时候,启动顺序是从下而上的,也就是说,Timer 是最先被启动的,然后根据依赖箭头逐步往上启动,到了 Consumer 之后,就会使用 Connection 真正获取消息进行执行,但是,Connection 又是依赖于很多东西,都一一列在图中了。
这里需要注意的是,虽然在图中我们看得还算清楚,有两个 Blueprint,但是,你在看代码的时候可能会被迷惑,会搞混谁是 Consumer,谁是 Worker,因为代码实现可没这么明确。根据我的 Review,Consumer 的实现应该是worker/worker/consumer.Consumer
,而 Worker 的实现是:worker/worker/WorkerController
。
在代码中都可以很简单得看到 bootstep 的列表,但是需要注意的是这里只是列表而已,并没有体现依赖关系,那么依赖关系又是如何体现的呢?这个就要看 Celery 的另外一个地方了,这里 Celery 自己实现了一套 DAG 的代码:
这里就不详细得描述代码了,因为这个似乎关系不大,我们了解他们之间是有 DAG 的依赖关系即可,那么我们是在哪里申明这种图的关系的呢,随便找一个 BootStep 来看一下就知道了:
这里就存在了依赖关系,这里的 requires 只是其中一种声明方式,除此之外,还有:
- conditional
- last
使用这些简单的声明,就构建出了一个简单的 DAG,然后根据这些 DAG 就可以开始进行初始化的操作了。
各个 Step 的作用
在真正讲解代码之前,很有必要从宏观的角度来看看每个 Step 的具体的功能是啥,这样看起代码来事半功倍
Worker
- 【2】Timer:用于执行定时任务的 Timer,和 Consumer 那里的 timer 不同
- 【1】Hub:Eventloop 的封装对象(回顾一下 Kombu 的)
- 【1】Pool:构造各种执行池(线程/进程/协程)的
- 【3】Autoscaler:用于自动增长或者 pool 中工作单元
- 【3】StateDB:持久化 worker 重启区间的数据(只是重启)
- 【3】Autoreloader:用于自动加载修改过的代码
- 【2】Beat:创建 Beat 进程,不过是以子进程的形式运行(不同于命令行中以 beat 参数运行)
Consumer
- 【1】Connection:管理和 broker 的 Connection 连接
- 【3】Events:用于发送监控事件
- 【2】Agent:
cell
actor - 【2】Mingle:不同 worker 之间同步状态用的
- 【1】Tasks:启动消息 Consumer
- 【3】Gossip:消费来自其他 worker 的事件
- 【1】Heart:发送心跳事件(consumer 的心跳)
- 【3】Control:远程命令管理服务
这里我对所有的 Bootstep 都做了标号处理,标号的大小说明了这些服务对于我们代码阅读的重要程序,1 最重要,3 最不紧要。对于 Consumer 来说,1 是基本功能,这些功能组成了一个简单的非强壮的消息队列框架;2 一般重要,可以实现一个高级一点的功能;3 属于附加功能,同时也属于一点分布式的功能。
小结
OK,今天就先介绍这么多,把一些基本的重要点都讲了,后面进入代码实例就会相对轻松很多了,同时,后面的代码也会跟着这里的编号前进。最后补一句,本文讲解的 Celery 的版本是 V4.1.0!