celery 介绍

按照官方说法 Celery 是一个简单,灵活可靠的分布式消息处理组件。至于这货性能如何,实际测试才知道。

首先 celery 不是mq,也不是生产者或者消费者,celery 只是一个库,对消息进行了封装。

Application

celery 的 Application是线程安全的,所以可以在一个进程内创建多个application.

1
2
3
4
5
6
7
from celery import Celery

app = Celery('tasks', broker='pyamqp://guest@localhost//')

@app.task
def add(x, y):
return x + y

application的创建是lazy模式,默认只

通过 application 设置参数

1
2
3
4
5
6
7
8
app.conf.enable_utc = True
app.conf.timezone = 'Europe/London'

# 批量设置
app.conf.update(
enable_utc=True,
timezone='Europe/London',
)

这里的 broker 指定消息中间件的地址,rabbitmq 有多种写法:

  • amqp://
  • pyamqp://
  • librabbitmq://

如果安装了librabbitmq,则 amqp 使用 librabbitmq,否则使用 py-amqp。当然也可直接指定。librabbitmq是python封装的c库,效率更高。

启动Celery

1
celery -A tasks worker --loglevel=info

调用task

调用 task 有多种方式,分别是

  • apply_async(args[, kwargs[, …]])

    发送消息,支持多种参数

  • delay(*args, **kwargs)

    发送消息,不支持执行参数

  • calling (__call__)

1
2
>>> from tasks import add
>>> add.delay(4, 4)

调用task会返回一个AsyncResult对象,可以用来获取task是否完成,返回值,异常信息。

保存结果

如果需要task的状态,需要通过backend参数设置状态的存放地址,可以是 SQLAlchemy/DjangoORM, Memcached, Redis, RPC(RabbitMQ/AMQP)

完成状态

1
2
result = add.delay(4, 4)
result.ready()

释放资源

每一个AsyncResult都需要显示调用get(),forget()来释放资源。

celery 配置

配置项 说明
accept_content 支持序列化类型,默认 json格式
1
2
# 控制task速率(每分钟最多执行200个)
app.control.rate_limit('myapp.mytask', '200/m')

消息持久化

默认情况下,消息是会写到硬盘的,当然也可以自己配置:

1
2
3
4
5
6
7
from kombu import Exchange, Queue

CELERY_QUEUES = (
Queue('celery', routing_key='celery'),
Queue('transient', routing_key='transient',
delivery_mode=1), # 1: 不持久化, 2: 持久化
)

celery 启动参数

1
celery -A tasks worker --loglevel=info

启动参数说明

  • -Q: 指定 queue
  • -n: 指定 hostname, eg: something.1@%h
  • —loglevel: 指定日志级别
  • -f: 指定日志文件
  • --without-heartbeat: 不使用心跳
  • --without-gossip:
  • --without-mingle:
  • --concurrency=1: 指定worker数量