使用多个队列处理Celery任务
目录
当 RabbitMQ 作为后端时,Celery 默认使用名为 celery 的队列处理所有任务。 去年 NMP 项目中也采用这样的方式。 但今年任务量翻倍后,一次 beat 批量任务执行的总时间已超过 beat 的间隔,批量执行顺序会发生混乱,之前的方式无法处理增多的任务。
尝试使用多个 worker 处理任务,发现依然存在问题:group 的某个任务会莫名丢失。
一篇博文《使用Celery踩过的坑》中指出单个队列执行大量任务可能会出现问题,建议为不同类型的任务设置不同的队列,并使用不同的 worker 执行每个队列中的任务。
我没有仔细研究任务丢失的原因,确定采纳博文中的建议,使用多个任务处理 Celery 任务。
配置
Celery 使用路由 (routing) 实现任务向不同队列的分发,默认会自动创建需要的队列,所以最简单的使用方法就是在配置中设置路由项目,类似:
task_routes = ([
('feed.tasks.*', {'queue': 'feeds'}),
('web.tasks.*', {'queue': 'web'}),
(re.compile(r'(video|image)\.tasks\..*'), {'queue': 'media'}),
],)
未配置路由的任务会发送给默认队列,即 celery。
可以通过修改 task_default_queue
配置项改变默认队列的名称。
为 worker 设置队列
使用命令行启动 worker 可以通过 -Q
设置多个队列,例如
celery -A proj worker -Q feeds,celery
使用 API 接口启动 worker 则稍微复杂些,需要使用 Celery.select_queues
函数,例如:
app.select_queues(['feeds', 'celery'])
app.Worker().start()
应用效果
使用不同的队列处理任务后,不再出现任务丢失的情况。