Celery入门

目录

Celery是一个用Python写的分布式任务队列系统。
我想用Celery作任务调度器,并异步处理用户交互任务。

开始使用

介绍Celery最简单的应用,包括:

  • 选择并安装消息代理(broker)
  • 安装Celery并创建任务
  • 运行worker并调用任务
  • 跟踪任务执行状态,检查任务返回值

选择代理

Celery需要一个用来发送和接受消息的消息代理(message broker),有多种选择,官方推荐使用RabbitMQ。

RabbitMQ

安装RabbitMQ

vagrant@precise64:/vagrant/windroc$ sudo apt-get install rabbitmq-server

安装信息中显示

Starting rabbitmq-server: SUCCESS
rabbitmq-server.

表示RabbitMQ服务器已经运行。
其它选项暂不考虑。

安装Celery

Celery是Python包,所以可以使用pip安装。

pip install celery

应用

需要一个Celery示例,叫做celery应用或简称celery app。
最简单的一个示例:tasks.py

from celery import Celery
app = Celery('tasks', broker='amqp://guest@localhost//')
@app.task
def add(x, y):
    return x + y

这里的broker使用本机的RabbitMQ。

运行Celery worker服务器

前台启动worker服务器

celery -A tasks worker --loglevel=info

界面显示如下:

-------------- celery@precise64 v3.1.17 (Cipater)
---- **** -----
--- * ***  * -- Linux-3.2.0-23-generic-x86_64-with-Ubuntu-12.04-precise
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x2535290
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     disabled
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery
[tasks]
  . tasks.add
[2015-02-13 16:58:22,333: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2015-02-13 16:58:22,346: INFO/MainProcess] mingle: searching for neighbors
[2015-02-13 16:58:23,369: INFO/MainProcess] mingle: all alone
[2015-02-13 16:58:23,387: WARNING/MainProcess] celery@precise64 ready.

从信息中可以看到当前配置、消息队列、任务列表等。
还可以后台启动,守护进程启动,详情参见官方文档。

执行任务

使用{py}delay(){/py}方法异步执行任务

>>> from tasks import add
>>> add.delay(4, 4)

可以在之前启动的服务窗口中看到如下的日志信息:

[2015-02-13 16:59:02,051: INFO/MainProcess] Received task: tasks.add[6381399e-274e-4aa8-8e86-66627bfa0e64]
[2015-02-13 16:59:02,052: INFO/MainProcess] Task tasks.add[6381399e-274e-4aa8-8e86-66627bfa0e64] succeeded in 0.000748680000015s: 7

客户端返回一个AsyncResult实例,但我们需要配置result backend才能使用它的一些高级功能。

保存结果

如果想跟踪任务状态,Celery需要存储或发送状态到某地方,这就是Result backend,同样有多种选项。
创建Celery对象时设置backend属性,或者使用配置对象设置CELERY_RESULT_BACKEND。
修改创建app的代码:

app = Celery('tasks', backend='amqp', broker='amqp://guest@localhost//')

重新启动worker服务,屏幕显示:

-------------- celery@precise64 v3.1.17 (Cipater)
---- **** -----
--- * ***  * -- Linux-3.2.0-23-generic-x86_64-with-Ubuntu-12.04-precise
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         tasks:0x1da6290
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     amqp
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery
[tasks]
  . tasks.add
[2015-02-14 02:46:50,873: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2015-02-14 02:46:50,886: INFO/MainProcess] mingle: searching for neighbors
[2015-02-14 02:46:51,901: INFO/MainProcess] mingle: all alone
[2015-02-14 02:46:51,922: WARNING/MainProcess] celery@precise64 ready.

config下的results属性现在为amqp,表示使用RabbitMQ存储结果。
可以使用ready判断任务是否运行结束,使用get获取返回结果。

>>> result = add.delay(5,5)
>>> result.ready()
True
>>> result.get()
10

当出现异常时,get会重新抛出异常:

>>> result = add.delay(5,'a')
>>> result.ready()
True
>>> result.get()
Traceback (most recent call last):
  File "", line 1, in
  File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 162, in get
    self.maybe_reraise()
  File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 271, in maybe_reraise
    raise self.result
TypeError: unsupported operand type(s) for +: 'int' and 'str'

可以改变该行为

>>> result.get(propagate=False)
TypeError("unsupported operand type(s) for +: 'int' and 'str'",)

同样可以获取原来的异常

>>> result.traceback
'Traceback (most recent call last):\n  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 240, in trace_task\n    R = retval = fun(*args, **kwargs)\n  File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 438, in __protected_call__\n    return self.run(*args, **kwargs)\n  File "/vagrant/windroc/py/celery/tasks.py", line 7, in add\n    return x + y\nTypeError: unsupported operand type(s) for +: \'int\' and \'str\'\n'

配置

设置app或者使用专用的配置对象,类似flask config,是一个字典,并且某些属性值单独列出。
直接设置

app.conf.CELERY_TASK_SERIALIZER = 'json'

更新字典

app.conf.update(
    CELERY_TASK_SERIALIZER='json',
    CELERY_ACCEPT_CONTENT=['json'],  # Ignore other content
    CELERY_RESULT_SERIALIZER='json',
    CELERY_TIMEZONE='Europe/Oslo',
    CELERY_ENABLE_UTC=True,
)

从文件中读取

app.config_from_object('celeryconfig')

下面展示一下配置文件的强大力量:
创建celeryconfig.py文件

CELERY_ANNOTATIONS = {
    'tasks.add': {'rate_limit': '10/m'}
}

限制每分钟完成10个任务
如果使用RabbitMQ或Redis作为Broker,可以使用命令在线设置:

vagrant@precise64:/vagrant/windroc/py/celery$ celery -A tasks control rate_limit tasks.add 10/m
-> celery@precise64: OK
        new rate limit set successfully

服务端显示

[2015-02-14 03:37:33,948: INFO/MainProcess] New rate limit for tasks of type tasks.add: 10/m.

下一步

在应用中使用Celery

项目结构

proj/__init__.py
    /celery.py
    /tasks.py

文件
proj/celery.py

from __future__ import absolute_import
from celery import Celery
app = Celery('proj',
             broker='amqp://',
             backend='amqp://',
             include=['proj.tasks'])
# Optional configuration, see the application user guide.
app.conf.update(
    CELERY_TASK_RESULT_EXPIRES=3600,
)
if __name__ == '__main__':
    app.start()

broker: 代理
backend: 指定保存结果后端
可以设置没有返回结果,在任务中加入ignore_result属性

@task(ignore_result=True)

include:第一个例子中没有include,表示worker启动时需要导入的模块列表
proj/tasks.py

from __future__ import absolute_import
from proj.celery import app
@app.task
def add(x, y):
    return x + y
@app.task
def mul(x, y):
    return x * y
@app.task
def xsum(numbers):
    return sum(numbers)

启动worker

vagrant@precise64:~/project/py/celery$ celery worker --app=proj -l info

得到类似下面的信息:

-------------- celery@precise64 v3.1.17 (Cipater)
---- **** -----
--- * ***  * -- Linux-3.8.0-44-generic-x86_64-with-Ubuntu-12.04-precise
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         proj:0x22a2210
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     amqp://
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery
[tasks]
  . proj.tasks.add
  . proj.tasks.mul
  . proj.tasks.xsum
[2015-02-14 13:32:17,225: INFO/MainProcess] Connected to amqp://guest:**@127.0.0.1:5672//
[2015-02-14 13:32:17,239: INFO/MainProcess] mingle: searching for neighbors
[2015-02-14 13:32:18,251: INFO/MainProcess] mingle: all alone
[2015-02-14 13:32:18,264: WARNING/MainProcess] celery@precise64 ready.

transport: 表示broker? 可以用-b属性设置另外一个
concurrency:工作进程个数,默认为CPU核心数,使用-c指定
queues:获取任务的队列

关闭worker

Ctrl+C

后台运行

使用celery multi命令
启动 start

vagrant@precise64:~/project/py/celery$ celery multi start w1 -A proj -l info
celery multi v3.1.17 (Cipater)
> Starting nodes...
    > w1@precise64: OK

重启restart

vagrant@precise64:~/project/py/celery$ celery multi restart w1 -A proj -l info
celery multi v3.1.17 (Cipater)
> Stopping nodes...
    > w1@precise64: TERM -> 7292
> Waiting for 1 node -> 7292.....
    > w1@precise64: OK
> Restarting node w1@precise64: OK

停止stop

vagrant@precise64:/var/log/celery$ celery multi stop w1 -A proj -l info
celery multi v3.1.17 (Cipater)
> w1@precise64: DOWN

等待停止

vagrant@precise64:~/project/py/celery$ celery multi stopwait w1 -A proj -l info
celery multi v3.1.17 (Cipater)
> Stopping nodes...
    > w1@precise64: TERM -> 7359
> Waiting for 1 node -> 7359.....
    > w1@precise64: OK

命令后续参数必须一致才可以控制worker
默认在当前目录创建pid和log文件,可以指定位置

vagrant@precise64:~/project/py/celery$ sudo celery multi start w1 -A proj -l info --pidfile=/var/run/celery/%n.pid --logfile=/var/log/celery/%n.pid
celery multi v3.1.17 (Cipater)
> Starting nodes...
    > w1@precise64: OK

关于–app参数

module.path:attribute
可以简写:
–app=proj
搜索路径
proj.app
proj.celery
proj中是否有Celery应用,或者找子模块proj.celery
proj.celery.app
proj.celery.celery
proj.celery中是否有Celery应用
小项目可以用proj:app
大项目可以用proj.celery:app

调用任务

delay()函数是对apply_async()函数的封装,下面的两条语句等价。

add.delay(2, 2)
add.apply_async((2, 2))

applay_async函数可以设置选项:

tasks.add.apply_async((4,4), queue="lopri", countdown=10)

任务将被发送到lopri队列,最早在10秒后执行。
直接调用则直接运行

>>> add(2,2)
4

返回结果

每个任务有一个UUID标识
delay和applay_async方法都返回一个AsyncResult 对象,但必须制定一个result backend,否则不会保存。
result backend不用来控制tasks和workers,Cleery使用dedicated event messages
获取值

>>> res = tasks.add.delay(2,2)
>>> res.get(timeout=1)
4

任务id号

>>> res.id
'377f2eb0-c25e-4709-9ed7-ccb0d3f5bd7e'

get默认会传递任意错误:

>>> res = tasks.add.delay(3)
>>> res.get(timeout=1)
Traceback (most recent call last):
  File "", line 1, in
  File "/usr/local/lib/python2.7/dist-packages/celery/result.py", line 175, in get
    raise meta['result']
TypeError: add() takes exactly 2 arguments (1 given)

不希望传递(propagate)错误,需要制定propagate参数

>>> res.get(propagate=False)
TypeError('add() takes exactly 2 arguments (1 given)',)

检查任务是否运行成功

>>> res.failed()
True
>>> res.successful()
False

任务状态

>>> res.state
'FAILURE'

默认任务状态流程
PENDING -> STARTED -> SUCCESS
PENDING状态是默认任务的状态,包括未知任务

>>> from proj.celery import app
>>> res = app.AsyncResult('this-id-does-not-exist')
>>> res.state
'PENDING'

任务重试可能的状态
PENDING -> STARTED -> RETRY -> STARTED -> RETRY -> STARTED -> SUCCESS

Canvas: 设计工作流

创建子任务subtasks
使用subtask创建子任务

>>>tasks.add.subtask((2,2), countdown=10)
proj.tasks.add(2, 2)

也有简写形式

>>>s1 = tasks.add.s(2,2)
>>> type(s1)
<class 'celery.canvas.Signature'>

子任务调用与普通任务相同,可以使用delay和apply_async。
>>> res = s1.delay()
>>> res.get()
4
但可以创建partials,不完整的signatures

>>> s2 = tasks.add.s(2)
>>> res = s2.delay(8)
>>> res.get()
10

关键词参数也可以添加,并后续更新

>>> s3 = add.s(2, 2, debug=True)
>>> s3.delay(debug=False)   # debug is now False.

subtasks调用语法

subtask.apply_async(args=(), kwargs={}, **options)
subtask.delay(*args, **kwargs)

原语

本身为subtask,可以组合成复杂的工作流
group
chain
chord
map
starmap
chunks
示例

Groups

一组任务并行执行

>>> from celery import group
>>> from proj.tasks import add
>>> group(add.s(i,i) for i in xrange(10))().get()
[0, 2, 4, 6, 8, 10, 12, 14, 16, 18]

Partial group

>>> g = group(add.s(i) for i in xrange(10))
>>> g(10).get()
[10, 11, 12, 13, 14, 15, 16, 17, 18, 19]
Chains

连续执行

>>> from celery import chain
>>> from proj.tasks import add, mul
>>> chain(add.s(4,4) | mul.s(8))().get()
64

也可以写成这样

>>> (add.s(4, 4) | mul.s(8))().get()
64

patial chain

>>> g = chain(add.s(4,4) | mul.s(8))
>>> g().get()
64
>>> g = chain(add.s(4) | mul.s(8))
>>> g(4).get()
64
Chords

一组带回调函数的任务

>>> from celery import chord
>>> from proj.tasks import add, xsum
>>> chord( (add.s(i,i) for i in xrange(10)) , xsum.s() )().get()
90

group连接到另一个任务可以自动转换为chord

>>> (group(add.s(i, i) for i in xrange(10)) | xsum.s())().get()
90

Routing 路由

支持所有AMQP的路由特性,并提供发送给指定名字队列的简单路由。
CELERY_ROUTES属性
添加简单路由

app.conf.update(
    CELERY_ROUTES = {
        'proj.tasks.add': {'queue': 'hipri'},
    },
)

设置消费队列的worker,使用-Q指定队列,逗号分隔多个队列

celery worker --app=proj -l info -Q hipri,celery

显示

-------------- celery@precise64 v3.1.17 (Cipater)
---- **** -----
--- * ***  * -- Linux-3.8.0-44-generic-x86_64-with-Ubuntu-12.04-precise
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app:         proj:0x16bc210
- ** ---------- .> transport:   amqp://guest:**@localhost:5672//
- ** ---------- .> results:     amqp://
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ----
--- ***** ----- [queues]
 -------------- .> celery           exchange=celery(direct) key=celery
                .> hipri            exchange=hipri(direct) key=hipri

执行任务时设置queue属性

>>> from proj.tasks import add
>>> add.apply_async((2,2), queue='hipri')
<asyncResult: 3abe224a-60f9-44d1-b551-c54b68412489>

远程控制

RabbitMQ,Redis和MongoDB可以在线控制worker
查看哪些worker正在工作

vagrant@precise64:~/project/py/celery$ celery -A proj inspect active
-> celery@precise64: OK
    - empty -

指定workers响应,使用–destination选项,用逗号分隔

vagrant@precise64:~/project/py/celery$ celery -A proj inspect active --destination=celery@precise64
-> celery@precise64: OK
    - empty -

celery inspect 命令不修改worker的任何信息
celery control 修改woker
例如:
开启事件消息

vagrant@precise64:~/project/py/celery$ celery -A proj control enable_events
-> celery@precise64: OK
        task events enabled

服务器端响应

[2015-02-15 02:42:39,816: INFO/MainProcess] Events of group {task} enabled by remote.

开启事件后,可以查看worker在干什么

vagrant@precise64:~/project/py/celery$ celery -A proj events --dump
-> evdump: starting capture...
celery@precise64 [2015-02-15 02:43:57.587321] heartbeat: clock=1187, freq=5, local_received=1423968237.59, pid=7743, sw_ident=py-celery, sw_sys=Linux, sw_ver=3.1.17, utcoffset=0
celery@precise64 [2015-02-15 02:43:58.758686] heartbeat: active=0, clock=1189, freq=2.0, loadavg=[0.04, 0.03, 0.05], local_received=1423968238.76, pid=7743, processed=1, sw_ident=py-celery, sw_sys=Linux, sw_ver=3.1.17, utcoffset=0
celery@precise64 [2015-02-15 02:44:00.759378] heartbeat: active=0, clock=1191, freq=2.0, loadavg=[0.04, 0.03, 0.05], local_received=1423968240.76, pid=7743, processed=1, sw_ident=py-celery, sw_sys=Linux, sw_ver=3.1.17, utcoffset=0

或者直接开启curses interface

celery -A proj events

关闭事件

vagrant@precise64:~/project/py/celery$  celery -A proj control disable_events
-> celery@precise64: OK
        task events disabled

celery status 查看状态

vagrant@precise64:~/project/py/celery$ celery -A proj status
celery@precise64: OK
1 node online.

时区

默认使用UTC
可用CELERY_TIMEZONE配置

优化

默认没有优化
使用RabbitMQ则需要安装librabbitmq模块

pip install librabbitmq