Celery解析系列


概述

今天要聊的话题可能被大家关注得不过,但是对于 Celery 来说确实很有用的功能,曾经我在工作中遇到这类情况,就是我们将所有的任务都放在同一个队列里面,然后有一天突然某个同学的代码写得不对,导致大量的耗时任务被同时塞进了消息队列里面,这就悲剧了,这直接导致了其他服务长时间不可用,例如发送登录短信验证码无法使用了,还有支付信息无法同步了等等,反正就是造成了一些不小的影响。

当时我们的处理方式就很被动,只能手动连接上 MQ,然后把消息卸掉,其实也就手动将这些消息抛弃掉,从而让其他业务的消息可能正常运行。但是,这种方式也只适合当初作为少量流量的情况,对于搭建了大集群和大量任务的消息队列来说,这种方式是不可想象的,这么做是要死人的,不仅仅是被累垮,上头的口水都能把你淹了。所以,这个时候,我需要介绍一个 Celery 不太常被人使用的功能——远程控制。

远程控制功能

其实 Celery 很早之前就存在控制命令,例如可以使用 Python shell 的 shell 命令,可以查看任务状态的 status 命令等等,但是这些命令都是本地的,不能让人觉得有意思,但是,这里有两个系列的命令很厉害,它们分别是:

例如,我在机器 A 运行着一个 Celery,机器 B 也运行着一个 Celery,机器 C 没有运行 Celery,但是我可以在机器 C 上查询机器 A 或者机器 B 上的任务状态,甚至可以删除和停止任务,这些都是很简单可以实现的,但是本文不是讲解这些功能的文章,而是解析这些功能的文章,所以有兴趣的同学可以参考这份官方文档继续了解。

远程控制功能组件

要实现远程控制功能,我们需要从宏观上先看看 Celery 的设计思路,在 Celery 中,采用的是分布式的管理方式,其实没有太大秘密,就是每个节点之间都是通过广播/单播进行通信,从而达到协同效果,但是这过程还是有很多不好之处的,值得我们来思考一番。

Celery 每个运行的实例都维护着一个 Control Node,其实就是一个可以接收/发送消息的对象,这个对象的封装是 Kombu.pidbox.Mailbox.Node,我们就先来看看创建的实现吧。

还是回到第一篇,在 Consumer 的 Blueprint 中,有一个叫做 Control 的 Bootstep,这个就是用于节点管理和通信的,我们来看一下:

其实代码还是比较简单的,有两个地方值得我们关注,分别是:

所以这个 Pidbox 是什么就很重要了,在我们看之前,不放看下上面的注释,也许会更容易一些:

虽然这里关系说得很明确了,但是我们还是有必要看看的,毕竟有可能里面有设置什么特殊的东西:

ok,确实还好,很诚实得就是用 Kombu 的 mailbox,但是 kombu 的 mailbox 是什么,可能很多人都没试过,我之前也没试过,后来试了一下,感觉还挺有意思的,注意,下面这段可能是现在互联网上公开的为数不多的可以运行得 Kombu Demo 示例,甚至于讲解。

Kombu Mailbox

在 Kombu 中提供了 Mailbox 的实现,它的作用就是通过 Mailbox 我们可以实现不同实例之间的消息发送和处理,具体可以是单播广播,这个在 Celery 中是作为 Control 的功能使用的,但是,在其他的模型里面,例如 Celery 试图实现但是没有实现的 Actor 模型里面也是可以用的。

Anyway,下面还是讲讲 Kombu 中的 Mailbox 是怎么用的吧,当初找相关的资料费了老大力了,但是,并没有太大收获,所以自己总结了一番。在 Kombu 中,Mailbox 中只有一个概念,那就是 Node

但是,为了测试,我们还会引入一个 client 的概念,但是这个概念不是 Kombu 自己的,而是我为了演示效果添加进来的,所以现在我们应该有两个地方,分别是:NodeClient,其中可以认为 NodeServer 端,Client 是触发端,你会发现,Client 只是做了一件触发的工作,没有其他更多的事情:

这是 node 的代码,你会发现它底层其实还是依赖于 Kombu 的 Connection,所以可以看到依赖的还是我们 Celery 里面的 Broker,这点很重要。然后再看看我们是怎么触发的:

可以发现这里非常简单,还是通过 Connection 构建出 mailbox,有一点需要注意的就是,Broker 要一致,不然你让他们怎么通信?执行这段代码,然后你就会在 node 上看到执行效果了,具体怎样,体验之后就明了。

Celery 的远程控制

看过 Kombu 的实现例子之后,我们来看看 Celery 是怎么构建这些对象的,首先还是得从最开始的 control 开始说起,control 在 Celery 中也是有两处的,一处是 app/control.py,另外一处就是:woker/control.py,可以认为第一处的是对外的接口,而第二处的是初始化的入口,实现自然就是 Kombu 提供的了,这里只是用到他们而已。

所以,现在来看,我们的目标很简单了,无非是看

  1. 如何初始化 mailbox 和 Node 的
  2. 提供了那些对外接口可以使用的

下面就这两个问题进行一一解答

mailbox 的初始化

故事是从 control 这个 Bootstep 开始说起,这是是初始化的起源:

这段代码我们前面已经见过了,同时我们也已经知道了 self.box 是个啥了,但是,对于更进一步的 c.app.control.mailbox.Node 中的 c.app.control.mailbox 是啥还不知道,不妨来看看:

ok,这里也很清晰,因为这个 control_cls 就是我们后面要看的:

control_cls = 'celery.app.control:Control'

所以 Mailbox 也就是 Kombu 的 Mailbox 了,这里没做什么改动。除此之外,还有一个地方需要我们去关注的,那就是收到消息后怎么处理,这个得看到 Bootstep 的 start 操作,这里是初始化过程中会被调用到的:

start 的第一句(Line 50)没毛病,因为用的都是 Celery 的 Connection,然后是第二句,这里我们根据之前的例子,已经很清楚会发生什么事情了,所以关键就是 Line 37 中的 on_message,每当有其他消息过来的时候,这里都是处理点。

Line 42 来看,Celery 还是甩锅给了 Kombu,但是这也不是啥问题,所以我们得找 Kombu 问清楚它是怎么处理的:

ok,这里一直有个叫 clock 的东西,我先不看,后面再说一下,先看看 dispatch 是如何处理的:

Line 99 这里其实是通过是否有发送主体(参数传过来的 reply_to)来判断是 单播 还是 多播 的消息,然后选择不同的处理方式,分别是 Line 118 中的 handle_call 用来处理 单播,而 Line 121 中的 handler_cast 用来处理 多播。这里有一个点,那就是 Line 116 中的 handlers 里面放置了所有注册的函数的信息,这个我们稍后会看到。

接口的注册

前面说了,Celery 注册了很多管理接口给我们使用,我们就看看有那些注册接口以及这些接口是如何注册进去的,我们是否可以自定义管理接口。关于接口注册相关的代码,我们得走到 celery/worker/control.py 中,现在进来看一看:

这里有两个注册函数,其实也就是我们前面说过的对应的两类操作,分别是 查看设置 类,然后也可以发现,其实注册就是往 Panel 这个 Dict 里头写入一些 key 和 value 对,然而,这里有两个 Dict 是需要我们关注的,他们分别是:

ok,了解完这些我们就知道了远程命令的对象和处理函数的对应关系都放在 data 和 meta 里面,这有什么用?回想一下之前 Mailbox 的构造函数的地方:

注意看 Line 28,用的就是这里的 data,然后就直接用来构造 Node 了,现在和前面的关系对应起来,了解了吧?

下面我就找个复杂点的例子看看,是怎么讲一个函数注册进 data 这个 Dict 里面的:

这个功能很明确了,在注释中已经提及了,但是,我们并不 care 它的功能,我们更多的是关注它是怎么发生的,在 Line 227 这里就是调用注册函数进行注册,可以看到,args 分别对应到我们下面的几个命名参数,然后调用 control_command 之后其实就是直接挂在 Dict 上了,没有其他操作,需要注意的是 Line 51if args 这个条件,我在整个 Celery 中都没有看到有使用,所以应该这里是预留的。

重要:有一点值得注意的是,前面也有稍微提到,Celery 的分布式实现机制是广播,所以我们在单机上发送的命令,只要没有指定主机,那么都是以广播的形式发送出去,所有的实例都将受到这个消息,然后根据消息处理本机的事务,所以我们在看代码的时候需要着重关注这一理念。

远程控制客户端

关于控制消息接收和处理的逻辑我们已经看完了,那么我们来看看我们在命令行中敲下命令的时候,这一切是怎么运行起来的。要看这些逻辑,我推荐的入口是:celery/bin/control.py,这是一条典型的 Celery 命令类,这里的结构就比较复杂了,我不多说,直接看最后的结果,那就是调用的时候:

可以发现这里很简单得就直接用一个 广播 了事,还有比这更粗暴的么?