python Celery的基本应用
发表于:2019-10-16 18:24:01浏览:60次
介绍
Celery是一个简单,灵活且可靠的分布式系统,可以处理大量消息,同时为操作提供维护该系统所需的工具。
这是一个任务队列,着重于实时处理,同时还支持任务调度。
官方文档 https://docs.celeryproject.org/en/latest/
安装
消息中间件
一般是用Redis数据库来通讯更加方便,需要本地安装redis服务并启动,具体操作请参考其他博客和官方文档。
pip install redis
安装 celery
pip install celery[redis]
实现
项目布局
|- demo
|- celery_app # 包
|- __init__.py
|- conf.py # 配置文件
|- task.py # 任务文件
|- app.py # 执行文件
celery_app/__init__.py
# -*- coding: utf-8 -*-
from celery import Celery
app = Celery('celery_app')
# 官方文档 https://docs.celeryproject.org/en/latest/userguide/application.html#configuration
app.config_from_object('celery_app.conf')
celery_app/conf.py
# -*- coding: utf-8 -*-
# 官方文档 https://docs.celeryproject.org/en/latest/userguide/configuration.html#example-configuration-file
BROKER_URL = 'redis://127.0.0.1:6379/1'
CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'
# 时区设置
CELERY_TIMEZONE = 'Asia/Shanghai'
# 启动时要导入的模块列表
CELERY_IMPORTS = (
'celery_app.task',
)
celery_app/task.py
# -*- coding: utf-8 -*-
# 导入包
from celery_app import app
@app.task
def add(x, y):
return x + y
@app.task
def reduce(x, y):
return x - y
@app.task
def mul(x, y):
return x * y
app.py
# -*- coding: utf-8 -*-
from celery_app import task
task.add.delay(2, 4)
task.reduce.delay(7, 4)
task.mul.delay(7, 411)
task.add.delay(12, 40)
print('end...')
启动工作程序
# 可使用 celery worker --help 查看更详细的命令说明
# 官方文档 https://docs.celeryproject.org/en/latest/reference/celery.bin.worker.html
$ celery worker -A celery_app -l info
开始工作时,可以看到
---- **** -----
--- * *** * -- Linux-4.4.0-165-generic-x86_64-with-Ubuntu-16.04-xenial 2019-10-16 07:35:59
-- * - **** ---
- ** ---------- [config]
- ** ---------- .> app: celery_app:0x7f793d447048
- ** ---------- .> transport: redis://127.0.0.1:6379/1
- ** ---------- .> results: redis://127.0.0.1:6379/2
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** -----
-------------- [queues]
.> celery exchange=celery(direct) key=celery
[tasks]
. celery_app.task.add
. celery_app.task.mul
. celery_app.task.reduce
[2019-10-16 07:35:59,244: INFO/MainProcess] Connected to redis://127.0.0.1:6379/1
[2019-10-16 07:35:59,255: INFO/MainProcess] mingle: searching for neighbors
[2019-10-16 07:36:00,277: INFO/MainProcess] mingle: all alone
[2019-10-16 07:36:00,289: INFO/MainProcess] celery@ubuntu-xenial ready.
在后台运行, celery multi 命令
celery multi start worker1 -A celery_app -l info
# 停止
celery multi stop worker1 -A celery_app -l info
# 异步
celery multi stopwait worker1 -A celery_app -l info
呼叫任务, delay() 命令。在 app.py 文件中已经体现
res = task.add.delay(2, 4)
其他方法与属性
# 获取任务Id
print(res.id)
# 获取结果
print(res.get(timeout=1))
# 任务是否失败,失败则返回True
print(res.failed())
# 任务是否成功,成功则返回True
print(res.successful())
# 任务状态, 典型阶段 PENDING -> STARTED -> SUCCESS
print(res.state)
执行 app.py
$ python3 app.py
96facc03-5607-4694-9fe7-08a75a7d0988
6
False
True
SUCCESS
定时任务的实现
celery_app/conf.py 添加如下代码
form datetime import timedelta
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
'task1': {
'task': 'celery_app.task.add',
'schedule': timedelta(seconds=10), # 每10秒
'args': (2, 9)
},
'task2': {
'task': 'celery_app.task.mul',
'schedule': crontab(hour=12, minute=30), # 在12点30分定时执行
'args': (2, 9)
}
}
开启 celery beat 进程
celery beat -A celery_app -l INFO
开启 celery worker 进程
celery worker -A celery_app -l INFO
一条命令执行
celery -B -A worker celery -l INFO
注意 执行时间与时区的bug, 在4.1.0存在,4.0.2是没有这个问题的
pip uninstall celery
# 重新安装
pip install celery==4.0.2
Django 中配置使用 celery
安装依赖
pip install django-celery
安装 Django
# 官方文档 https://docs.celeryproject.org/en/latest/django/first-steps-with-django.html#using-celery-with-django
pip install django==1.8
使用django创建项目, 例如创建blog_app项目
django-admin.py startproject blog_app
启动项目
cd blog_app
python manage.py runserver
python3 manage.py celery worker -l info

