Celery入门
Celery是一个用Python写的分布式任务队列系统。
我想用Celery作任务调度器,并异步处理用户交互任务。
开始使用
介绍Celery最简单的应用,包括:
- 选择并安装消息代理(broker)
- 安装Celery并创建任务
- 运行worker并调用任务
- 跟踪任务执行状态,检查任务返回值
选择代理
Celery需要一个用来发送和接受消息的消息代理(message broker),有多种选择,官方推荐使用RabbitMQ。
RabbitMQ
安装RabbitMQ
vagrant@precise64:/vagrant/windroc$ sudo apt-get install rabbitmq-server
安装信息中显示
Starting rabbitmq-server: SUCCESS rabbitmq-server.
表示RabbitMQ服务器已经运行。
其它选项暂不考虑。
安装Celery
Celery是Python包,所以可以使用pip安装。
pip install celery
应用
需要一个Celery示例,叫做celery应用或简称celery app。
最简单的一个示例:tasks.py
from celery import Celery
app = Celery('tasks', broker='amqp://guest@localhost//')
@app.task
def add(x, y):
return x + y这里的broker使用本机的RabbitMQ。
运行Celery worker服务器
前台启动worker服务器
celery -A tasks worker --loglevel=info
界面显示如下:
-------------- celery@precise64 v3.1.17 (Cipater) ---- **** ----- --- * *** * -- Linux-3.2.0-23-generic-x86_64-with-Ubuntu-12.04-precise -- * - **** --- - ** ---------- [config] - ** ---------- .> app: tasks:0x2535290 - ** ---------- .> transport: amqp://guest:**@localhost:5672// - ** ---------- .> results: disabled - *** --- * --- .> concurrency: 2 (prefork) -- ******* ---- --- ***** ----- [queues] -------------- .> celery exchange=celery(direct) key=celery [tasks] . tasks.add [2015-02-13 16:58:22,333: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672// [2015-02-13 16:58:22,346: INFO/MainProcess] mingle: searching for neighbors [2015-02-13 16:58:23,369: INFO/MainProcess] mingle: all alone [2015-02-13 16:58:23,387: WARNING/MainProcess] celery@precise64 ready.
从信息中可以看到当前配置、消息队列、任务列表等。
还可以后台启动,守护进程启动,详情参见官方文档。
执行任务
使用{py}delay(){/py}方法异步执行任务
>>> from tasks import add >>> add.delay(4, 4)
可以在之前启动的服务窗口中看到如下的日志信息:
[2015-02-13 16:59:02,051: INFO/MainProcess] Received task: tasks.add[6381399e-274e-4aa8-8e86-66627bfa0e64] [2015-02-13 16:59:02,052: INFO/MainProcess] Task tasks.add[6381399e-274e-4aa8-8e86-66627bfa0e64] succeeded in 0.000748680000015s: 7
客户端返回一个AsyncResult实例,但我们需要配置result backend才能使用它的一些高级功能。
保存结果
如果想跟踪任务状态,Celery需要存储或发送状态到某地方,这就是Result backend,同样有多种选项。
创建Celery对象时设置backend属性,或者使用配置对象设置CELERY_RESULT_BACKEND。
修改创建app的代码:
app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')重新启动worker服务,屏幕显示:
-------------- celery@precise64 v3.1.17 (Cipater) ---- **** ----- --- * *** * -- Linux-3.2.0-23-generic-x86_64-with-Ubuntu-12.04-precise -- * - **** --- - ** ---------- [config] - ** ---------- .> app: tasks:0x1da6290 - ** ---------- .> transport: amqp://guest:**@localhost:5672// - ** ---------- .> results: amqp - *** --- * --- .> concurrency: 2 (prefork) -- ******* ---- --- ***** ----- [queues] -------------- .> celery exchange=celery(direct) key=celery [tasks] . tasks.add [2015-02-14 02:46:50,873: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672// [2015-02-14 02:46:50,886: INFO/MainProcess] mingle: searching for neighbors [2015-02-14 02:46:51,901: INFO/MainProcess] mingle: all alone [2015-02-14 02:46:51,922: WARNING/MainProcess] celery@precise64 ready.
config下的results属性现在为amqp,表示使用RabbitMQ存储结果。
可以使用ready判断任务是否运行结束,使用get获取返回结果。
>>> result = add.delay(5,5) >>> result.ready() True >>> result.get() 10
当出现异常时,get会重新抛出异常:
>>> result = add.delay(5,'a')
>>> result.ready()
True
>>> result.get()
Traceback (most recent call last):
File "", line 1, in
File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 162, in get
self.maybe_reraise()
File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 271, in maybe_reraise
raise self.result
TypeError: unsupported operand type(s) for +: 'int' and 'str'可以改变该行为
>>> result.get(propagate=False)
TypeError("unsupported operand type(s) for +: 'int' and 'str'",)同样可以获取原来的异常
>>> result.traceback 'Traceback (most recent call last):\n File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 240, in trace_task\n R = retval = fun(*args, **kwargs)\n File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 438, in __protected_call__\n return self.run(*args, **kwargs)\n File "/vagrant/windroc/py/celery/tasks.py", line 7, in add\n return x + y\nTypeError: unsupported operand type(s) for +: \'int\' and \'str\'\n'
配置
设置app或者使用专用的配置对象,类似flask config,是一个字典,并且某些属性值单独列出。
直接设置
app.conf.CELERY_TASK_SERIALIZER = 'json'
更新字典
app.conf.update(
CELERY_TASK_SERIALIZER='json',
CELERY_ACCEPT_CONTENT=['json'], # Ignore other content
CELERY_RESULT_SERIALIZER='json',
CELERY_TIMEZONE='Europe/Oslo',
CELERY_ENABLE_UTC=True,
)从文件中读取
app.config_from_object('celeryconfig')下面展示一下配置文件的强大力量:
创建celeryconfig.py文件
CELERY_ANNOTATIONS = {
'tasks.add': {'rate_limit': '10/m'}
}限制每分钟完成10个任务
如果使用RabbitMQ或Redis作为Broker,可以使用命令在线设置:
vagrant@precise64:/vagrant/windroc/py/celery$ celery -A tasks control rate_limit tasks.add 10/m
-> celery@precise64: OK
new rate limit set successfully服务端显示
[2015-02-14 03:37:33,948: INFO/MainProcess] New rate limit for tasks of type tasks.add: 10/m.
下一步
在应用中使用Celery
项目结构
proj/__init__.py
/celery.py
/tasks.py文件
proj/celery.py
from __future__ import absolute_import
from celery import Celery
app = Celery('proj',
broker='amqp://',
backend='amqp://',
include=['proj.tasks'])
# Optional configuration, see the application user guide.
app.conf.update(
CELERY_TASK_RESULT_EXPIRES=3600,
)
if __name__ == '__main__':
app.start()broker: 代理
backend: 指定保存结果后端
可以设置没有返回结果,在任务中加入ignore_result属性
@task(ignore_result=True)
include:第一个例子中没有include,表示worker启动时需要导入的模块列表
proj/tasks.py
from __future__ import absolute_import
from proj.celery import app
@app.task
def add(x, y):
return x + y
@app.task
def mul(x, y):
return x * y
@app.task
def xsum(numbers):
return sum(numbers)启动worker
vagrant@precise64:~/project/py/celery$ celery worker --app=proj -l info
得到类似下面的信息:
-------------- celery@precise64 v3.1.17 (Cipater) ---- **** ----- --- * *** * -- Linux-3.8.0-44-generic-x86_64-with-Ubuntu-12.04-precise -- * - **** --- - ** ---------- [config] - ** ---------- .> app: proj:0x22a2210 - ** ---------- .> transport: amqp://guest:**@localhost:5672// - ** ---------- .> results: amqp:// - *** --- * --- .> concurrency: 2 (prefork) -- ******* ---- --- ***** ----- [queues] -------------- .> celery exchange=celery(direct) key=celery [tasks] . proj.tasks.add . proj.tasks.mul . proj.tasks.xsum [2015-02-14 13:32:17,225: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672// [2015-02-14 13:32:17,239: INFO/MainProcess] mingle: searching for neighbors [2015-02-14 13:32:18,251: INFO/MainProcess] mingle: all alone [2015-02-14 13:32:18,264: WARNING/MainProcess] celery@precise64 ready.
transport: 表示broker? 可以用-b属性设置另外一个
concurrency:工作进程个数,默认为CPU核心数,使用-c指定
queues:获取任务的队列
关闭worker
Ctrl+C
后台运行
使用celery multi命令
启动 start
vagrant@precise64:~/project/py/celery$ celery multi start w1 -A proj -l info celery multi v3.1.17 (Cipater) > Starting nodes... > w1@precise64: OK
重启restart
vagrant@precise64:~/project/py/celery$ celery multi restart w1 -A proj -l info celery multi v3.1.17 (Cipater) > Stopping nodes... > w1@precise64: TERM -> 7292 > Waiting for 1 node -> 7292..... > w1@precise64: OK > Restarting node w1@precise64: OK
停止stop
vagrant@precise64:/var/log/celery$ celery multi stop w1 -A proj -l info celery multi v3.1.17 (Cipater) > w1@precise64: DOWN
等待停止
vagrant@precise64:~/project/py/celery$ celery multi stopwait w1 -A proj -l info celery multi v3.1.17 (Cipater) > Stopping nodes... > w1@precise64: TERM -> 7359 > Waiting for 1 node -> 7359..... > w1@precise64: OK
命令后续参数必须一致才可以控制worker
默认在当前目录创建pid和log文件,可以指定位置
vagrant@precise64:~/project/py/celery$ sudo celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid --logfile=/var/log/celery/%n.pid celery multi v3.1.17 (Cipater) > Starting nodes... > w1@precise64: OK
关于–app参数
module.path:attribute
可以简写:
–app=proj
搜索路径
proj.app
proj.celery
proj中是否有Celery应用,或者找子模块proj.celery
proj.celery.app
proj.celery.celery
proj.celery中是否有Celery应用
小项目可以用proj:app
大项目可以用proj.celery:app
调用任务
delay()函数是对apply_async()函数的封装,下面的两条语句等价。
add.delay(2, 2) add.apply_async((2, 2))
applay_async函数可以设置选项:
tasks.add.apply_async((4,4), queue="lopri", countdown=10)
任务将被发送到lopri队列,最早在10秒后执行。
直接调用则直接运行
>>> add(2,2) 4
返回结果
每个任务有一个UUID标识
delay和applay_async方法都返回一个AsyncResult 对象,但必须制定一个result backend,否则不会保存。
result backend不用来控制tasks和workers,Cleery使用dedicated event messages
获取值
>>> res = tasks.add.delay(2,2) >>> res.get(timeout=1) 4
任务id号
>>> res.id '377f2eb0-c25e-4709-9ed7-ccb0d3f5bd7e'
get默认会传递任意错误:
>>> res = tasks.add.delay(3)
>>> res.get(timeout=1)
Traceback (most recent call last):
File "", line 1, in
File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 175, in get
raise meta['result']
TypeError: add() takes exactly 2 arguments (1 given)不希望传递(propagate)错误,需要制定propagate参数
>>> res.get(propagate=False)
TypeError('add() takes exactly 2 arguments (1 given)',)检查任务是否运行成功
>>> res.failed() True >>> res.successful() False
任务状态
>>> res.state 'FAILURE'
默认任务状态流程
PENDING -> STARTED -> SUCCESS
PENDING状态是默认任务的状态,包括未知任务
>>> from proj.celery import app
>>> res = app.AsyncResult('this-id-does-not-exist')
>>> res.state
'PENDING'任务重试可能的状态
PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS
Canvas: 设计工作流
创建子任务subtasks
使用subtask创建子任务
>>>tasks.add.subtask((2,2), countdown=10) proj.tasks.add(2, 2)
也有简写形式
>>>s1 = tasks.add.s(2,2) >>> type(s1) <class 'celery.canvas.Signature'>
子任务调用与普通任务相同,可以使用delay和apply_async。
res = s1.delay()
res.get()
4
但可以创建partials,不完整的signatures
>>> s2 = tasks.add.s(2) >>> res = s2.delay(8) >>> res.get() 10
关键词参数也可以添加,并后续更新
>>> s3 = add.s(2, 2, debug=True) >>> s3.delay(debug=False) # debug is now False.
subtasks调用语法
subtask.apply_async(args=(), kwargs={}, **options)
subtask.delay(*args, **kwargs)原语
本身为subtask,可以组合成复杂的工作流
group
chain
chord
map
starmap
chunks
示例
Groups
一组任务并行执行
>>> from celery import group >>> from proj.tasks import add >>> group(add.s(i,i) for i in xrange(10))().get() [0, 2, 4, 6, 8, 10, 12, 14, 16, 18]
Partial group
>>> g = group(add.s(i) for i in xrange(10)) >>> g(10).get() [10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
Chains
连续执行
>>> from celery import chain >>> from proj.tasks import add, mul >>> chain(add.s(4,4) | mul.s(8))().get() 64
也可以写成这样
>>> (add.s(4, 4) | mul.s(8))().get() 64
patial chain
>>> g = chain(add.s(4,4) | mul.s(8)) >>> g().get() 64 >>> g = chain(add.s(4) | mul.s(8)) >>> g(4).get() 64
Chords
一组带回调函数的任务
>>> from celery import chord >>> from proj.tasks import add, xsum >>> chord( (add.s(i,i) for i in xrange(10)) , xsum.s() )().get() 90
group连接到另一个任务可以自动转换为chord
>>> (group(add.s(i, i) for i in xrange(10)) | xsum.s())().get() 90
Routing 路由
支持所有AMQP的路由特性,并提供发送给指定名字队列的简单路由。
CELERY_ROUTES属性
添加简单路由
app.conf.update(
CELERY_ROUTES = {
'proj.tasks.add': {'queue': 'hipri'},
},
)设置消费队列的worker,使用-Q指定队列,逗号分隔多个队列
celery worker --app=proj -l info -Q hipri,celery
显示
-------------- celery@precise64 v3.1.17 (Cipater)
---- **** -----
--- * *** * -- Linux-3.8.0-44-generic-x86_64-with-Ubuntu-12.04-precise
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: proj:0x16bc210
- ** ---------- .> transport: amqp://guest:**@localhost:5672//
- ** ---------- .> results: amqp://
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ----
--- ***** ----- [queues]
-------------- .> celery exchange=celery(direct) key=celery
.> hipri exchange=hipri(direct) key=hipri执行任务时设置queue属性
>>> from proj.tasks import add >>> add.apply_async((2,2), queue='hipri') <asyncResult: 3abe224a-60f9-44d1-b551-c54b68412489>
远程控制
RabbitMQ,Redis和MongoDB可以在线控制worker
查看哪些worker正在工作
vagrant@precise64:~/project/py/celery$ celery -A proj inspect active
-> celery@precise64: OK
- empty -指定workers响应,使用–destination选项,用逗号分隔
vagrant@precise64:~/project/py/celery$ celery -A proj inspect active --destination=celery@precise64
-> celery@precise64: OK
- empty -celery inspect 命令不修改worker的任何信息
celery control 修改woker
例如:
开启事件消息
vagrant@precise64:~/project/py/celery$ celery -A proj control enable_events
-> celery@precise64: OK
task events enabled服务器端响应
[2015-02-15 02:42:39,816: INFO/MainProcess] Events of group {task} enabled by remote.开启事件后,可以查看worker在干什么
vagrant@precise64:~/project/py/celery$ celery -A proj events --dump -> evdump: starting capture... celery@precise64 [2015-02-15 02:43:57.587321] heartbeat: clock=1187, freq=5, local_received=1423968237.59, pid=7743, sw_ident=py-celery, sw_sys=Linux, sw_ver=3.1.17, utcoffset=0 celery@precise64 [2015-02-15 02:43:58.758686] heartbeat: active=0, clock=1189, freq=2.0, loadavg=[0.04, 0.03, 0.05], local_received=1423968238.76, pid=7743, processed=1, sw_ident=py-celery, sw_sys=Linux, sw_ver=3.1.17, utcoffset=0 celery@precise64 [2015-02-15 02:44:00.759378] heartbeat: active=0, clock=1191, freq=2.0, loadavg=[0.04, 0.03, 0.05], local_received=1423968240.76, pid=7743, processed=1, sw_ident=py-celery, sw_sys=Linux, sw_ver=3.1.17, utcoffset=0
或者直接开启curses interface
celery -A proj events
关闭事件
vagrant@precise64:~/project/py/celery$ celery -A proj control disable_events
-> celery@precise64: OK
task events disabledcelery status 查看状态
vagrant@precise64:~/project/py/celery$ celery -A proj status celery@precise64: OK 1 node online.
时区
默认使用UTC
可用CELERY_TIMEZONE配置
优化
默认没有优化
使用RabbitMQ则需要安装librabbitmq模块
pip install librabbitmq
