Celery 是 Python 中最流行的定时任务,不仅强大而且复杂,在我的之前的一篇文章Celery 快速入门中已经介绍了如何入门使用 Celery,但是,这还很基础,Celery 还有很多强大的功能等着我们去发掘,在我的使用过程中,我意识到有两个功能很有必要,那就是定时任务和优先级,本文将以个人的理解进行分享。

序言

在开始正式介绍之前,有必要说一下的是读者最好是有一点点使用 Celery 的基础,如果没有的话,我之前写过一篇入门的文章 Celery 快速入门 可以快速学习一下,以便方便后面的介绍。在我使用和探索 Celery 的过程中,发现有两个需求非常重要,那就是 定时任务优先级

首先是 定时任务,这里所谓的定时任务不仅仅是代码写死的定时任务,也就是说一跑起来就固定的定时任务,还有随着代码的运行而动态加载的定时任务。例如,举个简单点的例子,假如我的博客需要定时发布一篇文章,现在是下午 3 点 15 分,我希望这篇文章是晚上 8 点 17 分准时发,这里就需要定时任务了,而且还不能是代码写死的,因为这是我的动态需求;

然后是一个优先级,优先级在很多地方都有应用,例如操作系统,各个进程运行中有个优先级吧,重要的任务肯定要先跑,那么对于 Celery 这样的队列任务肯定也需要优先级,一些比较紧急的任务要先跑,例如用户的短信验证码比较紧急,需要及时发送;一些宣传的短信不那么急,可以让让位置,缓点再发。这里就是优先级的作用,要分清事情的轻重缓急,才能做出更好的产品和用户体验,然而,后面会提到的是,Celery 其实是不支持优先级的哦,虽然在官方文档中声明并非所有 Broker 都支持优先级,只有两个队列 RedisBeanstalk 支持,但是,经试验和查资料,发现真不支持。

定时任务

最近爬虫很是凶猛,标注下文章的原文地址: https://liqiang.io/post/celery-advanced-topic-scheduler-and-priority

定时任务其实和普通的异步任务差不多,都是需要定义执行体的,所谓的执行体就是真正定时运行的代码,所以,首先,我就先给出定义 worker 的代码,这里就不多做解释了,因为代码和 Celery 快速入门 中的是一致的:

  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. from celery import Celery
  4. app = Celery('worker', broker='redis://localhost:6379/1')
  5. @app.task
  6. def add(x, y):
  7. result = x + y
  8. return result

然后将 worker 运行起来,代码都很简单的:

  1. celery -A worker worker -l info

然后就会很自然得看到这些输出,说明 worker 启动正常了:

图 1:Celery Worker 工作

然后先来看一下简单的写死的定时任务,也就是说是在代码中固定的,随着代码运行的设置:

  1. #!/usr/bin/env python
  2. # encoding: utf-8
  3. from datetime import timedelta
  4. from celery import Celery
  5. app = Celery('tasks', broker='redis://localhost:6379/1')
  6. app.conf.update(
  7. CELERYBEAT_SCHEDULE={
  8. 'perminute': {
  9. 'task': 'worker.add',
  10. 'schedule': timedelta(seconds=5),
  11. 'args': (1, 2)
  12. }
  13. }
  14. )

这里的核心其实就是 app.conf.update 这里添加了一个定时任务,就是 每5秒执行一个 worker.add 任务,然后传递的参数是 (1, 2),就是这么简单。

动态修改定时任务

然而,可以发现的是这显然是太过于呆板了,也就是说我们的任务是固定在代码中的,我们只能够在代码运行之前将它写在代码中,但是,并不能在运行过程中动态得更新它,有没有什么办法来动态更新呢?

很遗憾,对于官方版本的 Celery 是没有的,但是,幸运的是在 Celery 的官网介绍中有提及一下相关的信息,我们看一下:

Using django-celery‘s scheduler you can add, modify and remove periodic tasks from the Django Admin.

既然这样,我们不妨尝试一下 Django 的任务,看下效果是怎样的。

Django Celery Scheduler

因为我对 Django 不是很熟悉,所以我就以别人的项目为基础,以我残留的 Django 知识进行了些许修改,最终将项目跑起来了,然后可以看到动态添加任务的 Admin 页面,我看到的第一眼是很震撼!!真的很震撼,非常佩服这个作者,我一定要将他开源的代码都扫一遍,因为真的太有思想了,上一张 Admin 的图。

图 2:Django Admin 截图

这是添加定时动态任务的图,我知道但你看这篇文章的时候肯定也想自己跑一遍,所以下面我就说下怎么跑起来的步骤:

1. clone 项目

因为原项目只是说明性的,所以不能直接用,因此我修改后放在 github 上了,第一步你需要做的就是将它 clone 下来:

  1. git clone https://github.com/yetship/django_celery_redis.git

2. django 套路

将代码拷贝下来之后,后面的工作就是套路了,第一步肯定是安装依赖的:

  1. pip install -r requirements.txt

然后是迁移数据库:

  1. python manage.py migrate

接着还不能跑应用,还有一步需要做,那就是创建 admin 用户:

  1. python manage.py createsuperuser --username=admin --email=liqianglau@outlook.com

后面就是输入密码了,随便输点 admin 什么的就行了。

最后亮点来了,你需要跑两个东西,一个是 worker,一个是网页应用,需要注意的是,这两个都是阻塞式的命令,所以最好打开两个 shell 来跑最好:

运行 worker

  1. celery -A proj worker -l info

运行 app

  1. python manage.py runserver 0.0.0.0:8000

然后你打开浏览器的地址: http://localhost:8000/admin,输入刚才的账号密码就可以了。

其他应用怎么办

按照 Django 的套路应该你是见识到了动态添加任务的甜头了,可是,如果我用的不是 Django 该怎么办呢?总不能说特定为了动态修改任务而特地加一个 Django 用户管理任务吧,好像有点不值得啊。

我在使用 Celery 的时候也遇到了这个问题,在查阅了一些资料之后,还是看到了官网的说明:

Custom scheduler classes can be specified on the command-line (the -S argument). The default scheduler is celery.beat.PersistentScheduler, which is simply keeping track of the last run times in a local database file (a shelve).

原来类似于 Django 的方式是自定义了一个调度器,然后我就寻找有没有方便的调度器可以使用呢,找了好几圈之后发现没有满足我需求的调度器,其实我的需求很简单,就两个:

所以,为了自己的需求,我自己开发了一个 celerybeatredis,开源在 github, 下面就以 celerybeatredis 为 Scheduler 介绍一下如何动态得修改定时任务。

优先级

最近爬虫很是凶猛,标注下文章的原文地址: https://liuliqiang.info

谈完第一个问题,我们接下来再说说第二个问题,那就是优先级,其实就以优先级这个概念来说,是不好实现的,因为要想完全安排任务的优先顺序是有困难的,这依赖于异步任务的调度算法以及异步任务的编写,为什么这么说,这里就介绍两个场景来说明一下优先级的问题。

关于优先级设计的两个场景

异步任务调度算法

我要描述的第一个场景是关于调度器的调度算法的,为了方便描述,我将优先级缩小到只有两个级别:高优先级和低优先级。那么,按照我们的思维逻辑,当同时有高优先级的任务和低优先级的任务到来是,肯定是先执行高优先级的任务,将低优先级的任务放一边。

那么,问题是,假如我们的低优先级和高优先级的处理程序是同一个进程,但调度器空闲的时候,也就是低优先级和高优先级都没来的时候,突然来了一个很耗时间的低优先级任务,然后跑起来了,一会会之后,高优先级的任务来了,但是,很不巧,我们在执行着低优先级的,这时该怎么办?中断低优先级的?还是怎么办好?

针对上面的问题,也许解决方案可以是将低优先级和高优先级的处理进程分开,也就是说各跑一个,低优先级的进程只执行低优先级的,高优先级的只执行高优先级的,好像很合理。但是,这样的话就不得不面对一个问题,那就是当只有一档优先级的任务的时候,两个进程就只有一个很忙,一个很闲,势必就浪费了一半的资源,这时又该怎么办?

异步任务的设计

刚才说到当高优先级和低优先级放到同一进程会有问题,分开进程也会有问题,不知道怎么办。这里放到同一个进程我们所的问题就是冗长低优先级先运行的时候会阻塞高优先级的任务,那么我们就像为什么低优先级任务要那么冗长?不能小一些吗?

其实,在实际环境中低优先级之所以放低优先级,就是因为他们需要占用大量的资源,并且这些任务完成的时间没有约束,也就是我没期望它能在一两个小时内跑完,我允许它跑个大半晚上,这都是可能的。

在这种情况下,那我们能不能重构一些低优先级的任务,让它尽可能可中断,给高优先级的让让位?讲道理大多数情况下是允许的,例如:

但是,更理想的情况下是将耗时的任务分解成多个子任务,毕竟 Celery 还是提供 Chain 供链式调用的。

Celery 的处理方式

在 Celery 中,前面提高的两个点都可以实现,因此在我们的处理过程中,使用哪种解决方式就看我们自身的需要和能力了。因为在 Celery 中本身是不支持优先级的,但是,为什么我说可以实现呢?那是因为 Celery 提供其他特性,可以帮助我们实现。这里就以 Celery 的路由机制为核心,介绍一种处理优先级的方式。

要使用优先级,我们肯定是至少有两个任务要竞争资源的,所以,这里还是假设有两个优先级的任务,高优先级的 add 和 低优先级的 sub,但两个任务都过来的时候,我们需要保证的就是 add 肯定可以得到执行,sub 可能能执行,也可能不执行。

这里使用 Celery 的解决方案就是 add 使用的是 queueA, 然后 sub 使用的是 queueB,然后在启动 Celery 的时候,我们给 queueA 启动两个 worker,然后给 queueB 启动一个 worker,并且是和其中一个 queueA 共用的,这样的话,我们就能保证 add 肯定能得到执行,sub 可能执行也可能不执行。

这里的思想其实就是将之前讨论的场景一的两个考虑结合起来,但是,这不是最好的解决办法,毕竟 Celery 本身是不支持优先级的。但至少是一种折中的办法,因为 Celery 的重点不在这。

这里给出一个简单的实现,我已经将代码放到 gist(不知道为什么,gist 需要梯子) 上,有需要尝试的同学可以 clone 代码尝试一下:

  1. # 克隆代码
  2. git clone https://gist.github.com/16757bc9915bbc15b73853945bfca89c.git celery-demo
  3. cd celery-demo
  4. celery worker -A route_worker -E -l INFO -n workerA -Q for_task_A
  5. celery worker -A route_worker -E -l INFO -n workerB -Q for_task_A,for_task_B

然后按照你的想法修改一下 route_client.py,直接运行

  1. python route_client.py

最近爬虫很是凶猛,标注下文章的原文地址: https://liqiang.io/post/celery-advanced-topic-scheduler-and-priority

Reference