使用多个队列处理Celery任务

目录

当Rabbitmq作为后端时,Celery默认使用名为celery的队列处理所有任务。去年NMP项目中也采用这样的方式。但今年任务量翻倍后,一次beat批量任务执行的总时间已超过beat的间隔,批量执行顺序会发生混乱,之前的方式无法处理增多的任务。

尝试使用多个worker处理任务,发现依然存在问题:group的某个任务会莫名丢失。一篇博文《使用Celery踩过的坑》中指出单个队列执行大量任务可能会出现问题,建议为不同类型的任务设置不同的队列,并使用不同的worker执行每个队列中的任务。我没有仔细研究任务丢失的原因,确定采纳博文中的建议,使用多个任务处理Celery任务。

配置

Celery使用路由(routing)实现任务向不同队列的分发,默认会自动创建需要的队列,所以最简单的使用方法就是在配置中设置路由项目,类似:

task_routes = ([
    ('feed.tasks.*', {'queue': 'feeds'}),
    ('web.tasks.*', {'queue': 'web'}),
    (re.compile(r'(video|image)\.tasks\..*'), {'queue': 'media'}),
],)

未配置路由的任务会发送给默认队列,即celery。可以通过修改task_default_queue配置项改变默认队列的名称。

为worker设置队列

使用命令行启动 worker 可以通过 -Q 设置多个队列,例如

celery -A proj worker -Q feeds,celery

使用API接口启动 worker 则稍微复杂些,需要使用 Celery.select_queues 函数,例如:

app.select_queues(['feeds', 'celery'])
app.Worker().start()

应用效果

使用不同的队列处理任务后,不再出现任务丢失的情况。