您的当前位置:首页>全部文章>文章详情

python Celery的基本应用

发表于:2019-10-16 18:24:01浏览:60次TAG: #python

介绍

Celery是一个简单,灵活且可靠的分布式系统,可以处理大量消息,同时为操作提供维护该系统所需的工具。

这是一个任务队列,着重于实时处理,同时还支持任务调度。

官方文档 https://docs.celeryproject.org/en/latest/

安装

消息中间件

一般是用Redis数据库来通讯更加方便,需要本地安装redis服务并启动,具体操作请参考其他博客和官方文档。

pip install redis

安装 celery

pip install celery[redis]

实现

项目布局

|- demo
    |- celery_app                 # 包
        |- __init__.py
        |- conf.py                 # 配置文件
        |- task.py                # 任务文件
    |- app.py                     # 执行文件

celery_app/__init__.py

# -*- coding: utf-8 -*-
from celery import Celery

app = Celery('celery_app')
# 官方文档 https://docs.celeryproject.org/en/latest/userguide/application.html#configuration
app.config_from_object('celery_app.conf')

celery_app/conf.py

# -*- coding: utf-8 -*-
# 官方文档 https://docs.celeryproject.org/en/latest/userguide/configuration.html#example-configuration-file

BROKER_URL = 'redis://127.0.0.1:6379/1'

CELERY_RESULT_BACKEND = 'redis://127.0.0.1:6379/2'

# 时区设置
CELERY_TIMEZONE = 'Asia/Shanghai'

# 启动时要导入的模块列表
CELERY_IMPORTS = (
    'celery_app.task',
)

celery_app/task.py

# -*- coding: utf-8 -*-
# 导入包
from celery_app import app

@app.task
def add(x, y):
    return x + y

@app.task
def reduce(x, y):
    return x - y

@app.task
def mul(x, y):
    return x * y

app.py

# -*- coding: utf-8 -*-
from celery_app import task

task.add.delay(2, 4)

task.reduce.delay(7, 4)

task.mul.delay(7, 411)

task.add.delay(12, 40)

print('end...')

启动工作程序

# 可使用 celery worker --help 查看更详细的命令说明
# 官方文档 https://docs.celeryproject.org/en/latest/reference/celery.bin.worker.html
$ celery worker -A celery_app -l info

开始工作时,可以看到

---- **** ----- 
--- * ***  * -- Linux-4.4.0-165-generic-x86_64-with-Ubuntu-16.04-xenial 2019-10-16 07:35:59
-- * - **** --- 
- ** ---------- [config]
- ** ---------- .> app:         celery_app:0x7f793d447048
- ** ---------- .> transport:   redis://127.0.0.1:6379/1
- ** ---------- .> results:     redis://127.0.0.1:6379/2
- *** --- * --- .> concurrency: 2 (prefork)
-- ******* ---- .> task events: OFF (enable -E to monitor tasks in this worker)
--- ***** ----- 
 -------------- [queues]
                .> celery           exchange=celery(direct) key=celery

[tasks]
  . celery_app.task.add
  . celery_app.task.mul
  . celery_app.task.reduce
[2019-10-16 07:35:59,244: INFO/MainProcess] Connected to redis://127.0.0.1:6379/1
[2019-10-16 07:35:59,255: INFO/MainProcess] mingle: searching for neighbors
[2019-10-16 07:36:00,277: INFO/MainProcess] mingle: all alone
[2019-10-16 07:36:00,289: INFO/MainProcess] celery@ubuntu-xenial ready.

在后台运行, celery multi 命令

celery multi start worker1 -A celery_app -l info

# 停止
celery multi stop worker1 -A celery_app -l info

# 异步
celery multi stopwait worker1 -A celery_app -l info

呼叫任务, delay() 命令。在 app.py 文件中已经体现

res = task.add.delay(2, 4)

其他方法与属性

# 获取任务Id
print(res.id)
# 获取结果
print(res.get(timeout=1))
# 任务是否失败,失败则返回True
print(res.failed())
# 任务是否成功,成功则返回True
print(res.successful())
# 任务状态, 典型阶段 PENDING -> STARTED -> SUCCESS
print(res.state)

执行 app.py

$ python3 app.py
96facc03-5607-4694-9fe7-08a75a7d0988
6
False
True
SUCCESS

定时任务的实现

celery_app/conf.py 添加如下代码

form datetime import timedelta
from celery.schedules import crontab

CELERYBEAT_SCHEDULE = {
    'task1': {
        'task': 'celery_app.task.add',
        'schedule': timedelta(seconds=10),         # 每10秒
        'args': (2, 9)
    },
    'task2': {
        'task': 'celery_app.task.mul',
        'schedule': crontab(hour=12, minute=30),         # 在12点30分定时执行
        'args': (2, 9)
    }
}

开启 celery beat 进程

celery beat -A celery_app -l INFO

开启 celery worker 进程

celery worker -A celery_app -l INFO

一条命令执行

celery -B -A worker celery -l INFO

注意 执行时间与时区的bug, 在4.1.0存在,4.0.2是没有这个问题的

pip uninstall celery

# 重新安装
pip install celery==4.0.2

Django 中配置使用 celery

安装依赖

pip install django-celery

安装 Django

# 官方文档 https://docs.celeryproject.org/en/latest/django/first-steps-with-django.html#using-celery-with-django
pip install django==1.8

使用django创建项目, 例如创建blog_app项目

django-admin.py startproject blog_app

启动项目

cd blog_app
python manage.py runserver

python3 manage.py celery worker -l info

栏目分类全部>
腾讯云采购季云服务器一折促销