按照官方说法 Celery 是一个简单,灵活可靠的分布式消息处理组件。至于这货性能如何,实际测试才知道。
首先 celery 不是mq,也不是生产者或者消费者,celery 只是一个库,对消息进行了封装。
Application
celery 的 Application是线程安全的,所以可以在一个进程内创建多个application.
1 | from celery import Celery |
application的创建是lazy模式,默认只
通过 application 设置参数
1 | app.conf.enable_utc = True |
这里的 broker 指定消息中间件的地址,rabbitmq 有多种写法:
- amqp://
- pyamqp://
- librabbitmq://
如果安装了librabbitmq,则 amqp 使用 librabbitmq,否则使用 py-amqp。当然也可直接指定。librabbitmq是python封装的c库,效率更高。
启动Celery
1 | celery -A tasks worker --loglevel=info |
调用task
调用 task 有多种方式,分别是
apply_async(args[, kwargs[, …]])
发送消息,支持多种参数
delay(*args, **kwargs)
发送消息,不支持执行参数
calling (
__call__
)
1 | from tasks import add |
调用task
会返回一个AsyncResult
对象,可以用来获取task
是否完成,返回值,异常信息。
保存结果
如果需要task
的状态,需要通过backend
参数设置状态的存放地址,可以是 SQLAlchemy/DjangoORM, Memcached, Redis, RPC(RabbitMQ/AMQP)
完成状态
1 | result = add.delay(4, 4) |
释放资源
每一个AsyncResult
都需要显示调用get()
,forget()
来释放资源。
celery 配置
配置项 | 说明 |
---|---|
accept_content | 支持序列化类型,默认 json格式 |
1 | # 控制task速率(每分钟最多执行200个) |
消息持久化
默认情况下,消息是会写到硬盘的,当然也可以自己配置:
1 | from kombu import Exchange, Queue |
celery 启动参数
1 | celery -A tasks worker --loglevel=info |
启动参数说明
-Q
: 指定 queue-n
: 指定 hostname, eg:something.1@%h
—loglevel
: 指定日志级别-f
: 指定日志文件--without-heartbeat
: 不使用心跳--without-gossip
:--without-mingle
:--concurrency=1
: 指定worker数量