Celery入门

目录

Celery是一个用Python写的分布式任务队列系统。
我想用Celery作任务调度器,并异步处理用户交互任务。

开始使用

介绍Celery最简单的应用,包括:

  • 选择并安装消息代理(broker)
  • 安装Celery并创建任务
  • 运行worker并调用任务
  • 跟踪任务执行状态,检查任务返回值

选择代理

Celery需要一个用来发送和接受消息的消息代理(message broker),有多种选择,官方推荐使用RabbitMQ。

RabbitMQ

安装RabbitMQ

安装信息中显示

表示RabbitMQ服务器已经运行。
其它选项暂不考虑。

安装Celery

Celery是Python包,所以可以使用pip安装。

应用

需要一个Celery示例,叫做celery应用或简称celery app。
最简单的一个示例:tasks.py

这里的broker使用本机的RabbitMQ。

运行Celery worker服务器

前台启动worker服务器

界面显示如下:

从信息中可以看到当前配置、消息队列、任务列表等。
还可以后台启动,守护进程启动,详情参见官方文档。

执行任务

使用{py}delay(){/py}方法异步执行任务

可以在之前启动的服务窗口中看到如下的日志信息:

客户端返回一个AsyncResult实例,但我们需要配置result backend才能使用它的一些高级功能。

保存结果

如果想跟踪任务状态,Celery需要存储或发送状态到某地方,这就是Result backend,同样有多种选项。
创建Celery对象时设置backend属性,或者使用配置对象设置CELERY_RESULT_BACKEND。
修改创建app的代码:

重新启动worker服务,屏幕显示:

config下的results属性现在为amqp,表示使用RabbitMQ存储结果。
可以使用ready判断任务是否运行结束,使用get获取返回结果。

当出现异常时,get会重新抛出异常:

可以改变该行为

同样可以获取原来的异常

配置

设置app或者使用专用的配置对象,类似flask config,是一个字典,并且某些属性值单独列出。
直接设置

更新字典

从文件中读取

下面展示一下配置文件的强大力量:
创建celeryconfig.py文件

限制每分钟完成10个任务
如果使用RabbitMQ或Redis作为Broker,可以使用命令在线设置:

服务端显示

下一步

在应用中使用Celery

项目结构

文件
proj/celery.py

broker: 代理
backend: 指定保存结果后端
可以设置没有返回结果,在任务中加入ignore_result属性

include:第一个例子中没有include,表示worker启动时需要导入的模块列表
proj/tasks.py

启动worker

得到类似下面的信息:

transport: 表示broker? 可以用-b属性设置另外一个
concurrency:工作进程个数,默认为CPU核心数,使用-c指定
queues:获取任务的队列

关闭worker

Ctrl+C

后台运行

使用celery multi命令
启动 start

重启restart

停止stop

等待停止

命令后续参数必须一致才可以控制worker
默认在当前目录创建pid和log文件,可以指定位置

关于–app参数

module.path:attribute
可以简写:
–app=proj
搜索路径
proj.app
proj.celery
proj中是否有Celery应用,或者找子模块proj.celery
proj.celery.app
proj.celery.celery
proj.celery中是否有Celery应用
小项目可以用proj:app
大项目可以用proj.celery:app

调用任务

delay()函数是对apply_async()函数的封装,下面的两条语句等价。

applay_async函数可以设置选项:

任务将被发送到lopri队列,最早在10秒后执行。
直接调用则直接运行

返回结果

每个任务有一个UUID标识
delay和applay_async方法都返回一个AsyncResult 对象,但必须制定一个result backend,否则不会保存。
result backend不用来控制tasks和workers,Cleery使用dedicated event messages
获取值

任务id号

get默认会传递任意错误:

不希望传递(propagate)错误,需要制定propagate参数

检查任务是否运行成功

任务状态

默认任务状态流程
PENDING -> STARTED -> SUCCESS
PENDING状态是默认任务的状态,包括未知任务

任务重试可能的状态
PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS

Canvas: 设计工作流

创建子任务subtasks
使用subtask创建子任务

也有简写形式

子任务调用与普通任务相同,可以使用delay和apply_async。

res = s1.delay()
res.get()
4
但可以创建partials,不完整的signatures

关键词参数也可以添加,并后续更新

subtasks调用语法

原语

本身为subtask,可以组合成复杂的工作流
group
chain
chord
map
starmap
chunks
示例

Groups

一组任务并行执行

Partial group

Chains

连续执行

也可以写成这样

patial chain

Chords
一组带回调函数的任务

group连接到另一个任务可以自动转换为chord

Routing 路由

支持所有AMQP的路由特性,并提供发送给指定名字队列的简单路由。
CELERY_ROUTES属性
添加简单路由

设置消费队列的worker,使用-Q指定队列,逗号分隔多个队列

显示

执行任务时设置queue属性

远程控制

RabbitMQ,Redis和MongoDB可以在线控制worker
查看哪些worker正在工作

指定workers响应,使用–destination选项,用逗号分隔

celery inspect 命令不修改worker的任何信息
celery control 修改woker
例如:
开启事件消息

服务器端响应

开启事件后,可以查看worker在干什么

或者直接开启curses interface

关闭事件

celery status 查看状态

时区

默认使用UTC
可用CELERY_TIMEZONE配置

优化

默认没有优化
使用RabbitMQ则需要安装librabbitmq模块