使用多个队列处理Celery任务

目录

当 RabbitMQ 作为后端时,Celery 默认使用名为 celery 的队列处理所有任务。 去年 NMP 项目中也采用这样的方式。 但今年任务量翻倍后,一次 beat 批量任务执行的总时间已超过 beat 的间隔,批量执行顺序会发生混乱,之前的方式无法处理增多的任务。

尝试使用多个 worker 处理任务,发现依然存在问题:group 的某个任务会莫名丢失。

一篇博文《使用Celery踩过的坑》中指出单个队列执行大量任务可能会出现问题,建议为不同类型的任务设置不同的队列,并使用不同的 worker 执行每个队列中的任务。

我没有仔细研究任务丢失的原因,确定采纳博文中的建议,使用多个任务处理 Celery 任务。

配置

Celery 使用路由 (routing) 实现任务向不同队列的分发,默认会自动创建需要的队列,所以最简单的使用方法就是在配置中设置路由项目,类似:

task_routes = ([
    ('feed.tasks.*', {'queue': 'feeds'}),
    ('web.tasks.*', {'queue': 'web'}),
    (re.compile(r'(video|image)\.tasks\..*'), {'queue': 'media'}),
],)

未配置路由的任务会发送给默认队列,即 celery。 可以通过修改 task_default_queue 配置项改变默认队列的名称。

为 worker 设置队列

使用命令行启动 worker 可以通过 -Q 设置多个队列,例如

celery -A proj worker -Q feeds,celery

使用 API 接口启动 worker 则稍微复杂些,需要使用 Celery.select_queues 函数,例如:

app.select_queues(['feeds', 'celery'])
app.Worker().start()

应用效果

使用不同的队列处理任务后,不再出现任务丢失的情况。