关于 Bootstep 的这个问题比较难理解,在官方的说明文档 Extensions and Bootsteps中虽然用了不小的篇幅来阐述它,但是可能还是让人头晕。

这篇文章就以我粗浅的理解来尝试解释一下这个东西,可能并不能完全讲明白,但是,我觉得至少可以讲出一点点东西出来,这样,如果你想自己探究的话至少有个线索可以参考。

Bootsteps 是给 wroker 添加功能的一个技术。Bootstep 是一个自定义的类,hook 了 worker 在不同阶段会执行的自定义动作。

这是我根据官方介绍翻译过来的解释,乍一看不太懂,细细思考一番,有点理解了,然后再结合一下源码,有点思路了。其实所谓的 Bootstep 就是一个自定义的类,然后实现了一些需要覆盖的方法,之后将它注册到 Celery 中,当 Celery 的 Worker 执行到你设定的状态时,你自定义的 Bootstep 对应的方法将会被调用。不知道这样说会不会好理解一些。

Blueprints

Blueprint 这个概念在多个地方都有,例如 Flask 中,那说明作用也是差不多的。事实上,在 Celery 中, blueprint 也差不多是一个分组的概念,表示 worker 的不同阶段。

目前在 Celeryworker 中定义了两个 blueprint:Worker 和 Consumer。每个 Bootstep 属于一个 Blueprint,也就是说要么属于 Worker, 要么属于 Consumer

Worker

Worker 是第一个启动的 blueprint,它会启动主要的组件,例如 event loop, processing poll 还有 timer。此外,还将启动可选的组件,例如 autoscaler, 当 Worker 完全启动之后,Consumer blueprint 将会被启动。

Consumer

Consume blueprint 和 broker 之间会建立一个连接,并且在连接中断之后会自动重连。Consumer blueprint 包含 worker 心跳、consumer 的远程控制命令以及重要的任务consumer.

当你创建 consumer bootsteps 的时候,你应该注意的是它必须支持重启 blueprint,也就是说你应该定义一个 shutdown 方法,当 worker 被关闭的时候将会调用它。

WorkController

WorkController 是核心的 Worker 实现, 里面包含了一些可以在 bootstep 中的方法和属性,事实上,Blueprint 都是从这启动的,然后再调用一个个 Bootstep。

需要说明的是上面有两个 worker,不要搞混,小写的 worker 是说 Celery 启动的时候加 --worker 参数的 worker,而大写的 Worker 则是指两个 Blueprint 中的其中一个。

Show the code

讲那么多,不如给大家来一例子更实际,下面就是一个简单的 celery 代码:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
25
26
27
28
29
30
31
32
33
34
35
36
37
38
39
#!/usr/bin/env python
# encoding: utf-8
import time

from celery import Celery, bootsteps
from celery.utils.log import get_logger


class InfoStep(bootsteps.StartStopStep):
    requires = ('celery.worker.components:Pool', )

    def __init__(self, parent, **kwargs):
        # here we can prepare the Worker/Consumer object
        # in any way we want, set attribute defaults and so on.
        self.logger = get_logger(__name__)
        self.logger.info('{0!r} is in init'.format(parent))

    def create(self, worker):
        return self

    def start(self, parent):
        # our step is started together with all other Worker/Consumer
        # bootsteps.
        print('{0!r} is starting'.format(parent))

    def stop(self, parent):
        # the Consumer calls stop every time the consumer is restarted
        # (i.e. connection is lost) and also at shutdown.  The Worker
        # will call stop at shutdown only.
        print('{0!r} is stopping'.format(parent))

    def shutdown(self, parent):
        # shutdown is called by the Consumer at shutdown, it's not
        # called by Worker.
        print('{0!r} is shutting down'.format(parent))


app = Celery('worker', broker='redis://localhost:6379/0')
app.steps['worker'].add(InfoStep)

将这段代码保存为 bootstep_worker.py ,然后执行以下代码:

1
celery worker -A bootstep_worker -B -l debug

然后注意观察打印出来的日志:

 1
 2
 3
 4
 5
 6
 7
 8
 9
10
11
12
13
14
15
16
17
18
19
20
21
22
23
24
[tasks]
  . celery.backend_cleanup
  . celery.chain
  . celery.chord
  . celery.chord_unlock
  . celery.chunks
  . celery.group
  . celery.map
  . celery.starmap

[2016-10-17 15:58:58,418: DEBUG/MainProcess] | Worker: Starting Hub
[2016-10-17 15:58:58,418: DEBUG/MainProcess] ^-- substep ok
[2016-10-17 15:58:58,418: DEBUG/MainProcess] | Worker: Starting Pool
[2016-10-17 15:58:58,431: DEBUG/MainProcess] ^-- substep ok
[2016-10-17 15:58:58,432: DEBUG/MainProcess] | Worker: Starting InfoStep
[2016-10-17 15:58:58,433: WARNING/MainProcess] <Worker: celery@China (running)> is starting
... ...

[2016-10-17 15:59:00,554: DEBUG/MainProcess] | Consumer: Stopping Connection...
[2016-10-17 15:59:00,554: DEBUG/MainProcess] | Worker: Stopping Beat...
[2016-10-17 15:59:00,554: INFO/MainProcess] beat: Shutting down...
[2016-10-17 15:59:00,555: DEBUG/MainProcess] | Worker: Stopping InfoStep...
[2016-10-17 15:59:00,555: WARNING/MainProcess] <Worker: celery@China (closing)> is stopping
[2016-10-17 15:59:00,555: DEBUG/MainProcess] | Worker: Stopping Pool...

这里你可以发现我们自定义的 InfoStep 被执行了,而且,你会发现,我们的 InfoStep 实在 Pool 之后执行,并且在 Pool 之前结束,而且是刚刚好。

执行顺序

没错,这些执行顺序是可以控制的,但是,只能是依赖的顺序,也就是说我可以指定任务在哪些任务之后执行,如果没有指定的话,那么平等任务之间的顺序是随机的,无关紧要的。

那么如何指定顺序呢?

查看一下我们的 InfoStep 的代码,可以发现第一句是:

1
requires = ('celery.worker.components:Pool', )

其实这一句就是表示我们要求这个 Bootstep 在 Pool 之后执行,而且我们可以看到这里是一个元组,也就是说我们可以设定多个依赖,这样的话,就可以在多个 BootStep 之后执行。

事实上,Celery 的文档中已经有默认的执行依赖表,我们可以参考一下,箭头表示依赖的方向:

worker_graph_full (1).png

Reference