有使用过 Celery 的同学应该都知道我们通常会用 Celery 主要是因为两个原因:异步任务和定时任务。这其实也差不多是 Celery 最主要的用法了,关于异步任务的实现原理也相对简单,因为中间 broker 的存在,其实也就是将客户端和服务器分开了。那么定时任务又是怎么个道理呢?其实定时任务是一种特别的异步任务,原理和异步任务一样,但是,有一样不相同的就是定时任务的客户端是定时器,我们设置定时任务的执行周期,然后就交给 Celery 定期得帮我们触发异步任务。

从刚才的分析我们就可以看出 Celery 定时任务的关键就是定时器了,这也就是我在这篇文章要着重介绍的内容,Celery 是如何实现定时功能的。

Celery 定时任务的使用

一开始,我不准备一下子就讲底层原理,先说说在 Celery 中如何使用定时任务吧,首先需要知道要想使用 Celery 的定时任务,我们需要启动两个东西,分别是:

所以,我们先写一个 Celery 的作业 tasks.py

#!/usr/bin/env python
# encoding: utf-8
from datetime import timedelta
from celery import Celery

app = Celery('tasks')

app.conf.update(
    CELERYBEAT_SCHEDULE={
        'perminute': {
            'task': 'tasks.add',
            'schedule': timedelta(seconds=3),
            'args': (1, 1)
        }
    }
)

@app.task
def add(x, y):
    return x + y

这是一个非常简单的 Celery 定时任务,我们要想跑起这个任务,根据之前说的,需要运行两个任务,所以我们需要打开两个控制台,然后分别输入以下命令:

pip install celery 
Console1: celery -A tasks -l INFO worker
Console2: celery -A tasks -l INFO beat

这里需要先说下,我们必须安装 celery 库才能运行 celery 命令,不然是会出现 Command not found 的。

ok,相信到这里你会发现在 worker 的那个控制台会一直在执行任务了,而且还会把结果打印出来,这就是一个简单的定时任务啦。

定时任务实现解析

在上面一节中,其实真正是和定时有关的代码是这一段:

app.conf.update(
    CELERYBEAT_SCHEDULE={
        'perminute': {
            'task': 'tasks.add',
            'schedule': timedelta(seconds=3),
            'args': (1, 1)
        }
    }
)

我们这里添加了一个名字叫做 perminute 的任务,然后里面设置这个任务的详情,这里的意思就是说每隔3秒钟就执行一次 tasks.add 这个任务,并且使用的参数是 1, 1。这里只要求了一个任务,我们可以多加几个任务,例如:

app.conf.update(
    CELERYBEAT_SCHEDULE={
        'add_task': {
            'task': 'tasks.add',
            'schedule': timedelta(seconds=3),
            'args': (1, 1)
        },
        'sub_task': {
            'task': 'tasks.sub',
            'schedule': timedelta(seconds=3),
            'args': (1, 1)
        }
    }
)

这里我就多加了一个任务叫做 sub_task,每隔 3 秒钟就执行一次 tasks.sub 任务,参数是 1, 1,接着我们就分析一下,假如我们又两个任务:add_tasksub_task,那么我们要如何实现定时的效果呢?以前我实现的就原始的伪代码是这样的:

tasks = [add_task, sub_task]
while True:
    for task in tasks:
        if task.last_run_at + task.schedule > current_time:
            task.last_run_at = current_time
            run_task(task)
    time.sleep(1)

就是每隔 1 秒钟就遍历一遍所有的任务,然后看这个任务是否需要执行了,如果到点了,那么就跑,不到点就等下一秒再查看一下。细心的同学可能会发现这个代码的问题,那就是当任务变多的时候,我们每秒钟的任务就会很多,然后依旧每个任务都判断一遍的话,那么我们的 CPU 就会很忙,所以这个方案很差。基于这个考虑,我就思考另外一个方案,既然都计算了时间了,为什么不充分利用一下计算结果呢?于是,方案二就出来了:

while True:
    min_time = 30
    for task in tasks:
        wait_time = current_time - task.last_run_at
        if wait_time > task.schedule:
            task.last_run_at = current_time
            run_task(task)
        else:
            # 计算任务还差多久才能执行
            remain_time = task.schedule - wait_time
            min_time = min_time if min_time < remain_time else remain_time
    time.sleep(min_time)

这个方案就能较充分得利用 CPU 的资源了,因为我们不再每秒钟都运行一次了,而是有效得利用计算结果,当有任务需要执行的时候才计算,而且也会等待下次有任务执行的时候才会再次执行。因为我们这里 sleep 的时间是最近可能被执行任务的时间。

这个方案有一个值得说的地方就是我们的 min_time 初始值是 30,为什么呢?假设我们的任务都是每天执行一次的,那么很可能这个 sleep 会 sleep 上好几个小时,那么可能有些系统中会任务这个进程死了或者因为系统资源不足而被kill掉,所以,如果我们将 min_time 设置得小一些,那么及时所有任务都需要等上几小时,我们的进程也会在一段时间内活跃一次。而且,还有一个好处是,后面会说到的动态添加任务的特性,如果在这种场景下,我们可以保证最多 30 秒内,我们新动态添加的任务就可能被执行,否则的话,可能要等上几小时。

这个方案已经很不错了,但是,还是有点浪费,为什么呢?因为我们每次还是对每个任务都进行了计算,有些其实根本就没有计算的必要了,所以,为了更加节省资源,我们决定引入一个新的数据结构:堆。思考一下,堆有什么特性,不知道你听说过最小堆么?没错,我们这里就使用最小堆来优化方案:

heap = build_min_heap(tasks)

while True:
    min_time = 30
    task = heap.top()
    remain_time = current_time - task.last_run_at
    if remain_time < task.schedule:
        heap.pop()
        task.last_run_at = current_time
        heap.push(task)
        min_time = min(min_time, current_time - heap.top().last_run_at - heap.top().schedule)
    time.sleep(min_time)

这里可以看到,每个周期我都只需要判断一次是否需要执行任务,这样的话我们的时间复杂度就减少到了 O(log(n)),漂亮地方案。

好吧,关于定时任务的说明就这么多了,最后说一下,在 Celery 中上面提到的方案 2 和方案 3 都使用过,其中 Celery 3 之前使用的是方案 2,从 Celery 4 开始,就变成了方案三种的方式了。很荣幸的是,虽然我在 Celery4 出来之前没有想到最小堆的方案,但是我确实发现了方案二中方式的缺点,并且在我编写 RedisBeat 的时候就用 Redis 中的 OrderSet 实现了方案三中的效果。

有件有意思的事情值得分享一下的就是有一次我去一家技术性很强的公司面试的时候提到这个问题,我和面试者的交流中了解到方案三中的这种方式(事实上那时我还不知道Celery已经优化了这一点),而面试者表示方案三种的实现是 Linux 实现 Crontab 的方式,这让我有点感触了,让我更加清晰得知道如果想在技术上发展得好,可以说数据结构和算法是基石,而 Linux 操作系统的低层实现是不可错过的实践经典案例。

总结

在本文中,我简要介绍了一下 Celery 中定时任务的使用,以及由浅及深得介绍了复杂度为每秒 O(n) 到一段时间 O(n) 到最后一段时间 O(log(n)) 复杂度的进化方案。

感谢

update at: 2017-1-4 10:08:43

感谢 @sillyboy 同学指出本文的错误,我将堆的 pushpop 的复杂度写成了 O(1),事实上 pop 的过程是有个重新选举新的最小元素的过程,所以应该是 O(log(n))

update at: 2018-01-15 21:53:48

感谢 @kongtianyi 同学的邮件,指出本文关于 heap 的错误。

update @: 2019-04-09

感谢 @yiran 同学的提醒,修改了其中一段代码的缩进 BUG。