Python分布式异步任务框架Celery使用教程

Gilana ·
更新时间:2024-11-13
· 368 次阅读

目录

一、Celery架构介绍

二、Celery简单使用

三、Celery包结构

四、Celery延迟任务

五、Celery定时任务

六、Django中集成Celery

一、Celery架构介绍

Celery:芹菜?(跟翻译没有任何关系),分布式异步任务框架(跟其他web框架无关)

Celery is a project with minimal funding, so we don’t support Microsoft Windows. Please don’t open any issues related to that platform.(不支持windows)

celery服务为其他项目服务提供异步解决任务需求的。

架构:

分为三部分

broker:任务中间件,用户提交的任务,存在这个里面(redis,rabbitmq)

worker:任务执行者,消费者,真正执行任务的进程(真正干活的人)

backend:任务结果存储,任务执行后的结果(redis,rabbitmq)

celery能够做的事:

异步任务(区分同步任务)

延迟任务

定时任务(其他框架做)

怎么更好的理解celery?

会有两个服务同时运行,一个是项目服务(django服务),一个是celery服务,项目服务将需要异步处理的任务交给celery服务,celery就会在需要时异步完成项目的需求。打个比方,人是一个独立运行的服务(django) | 医院也是一个独立运行的服务(celery)。正常情况下,人可以完成所有健康情况的动作,不需要医院的参与;但当人生病时,就会被医院接收,解决人生病问题,人生病的处理方案交给医院来解决,所有人不生病时,医院独立运行,人生病时,医院就来解决人生病的需求。

注:python有自己的定时任务,感兴趣的了解下apscheduler

二、Celery简单使用

安装:pip install celery==5.1.2

使用:

1.配置celery

from celery import Celery # app=Celery('test',) # backend='redis://:密码@127.0.0.1:6379/1' 如果有密码,这么写 broker = 'redis://127.0.0.1:6379/1' # redis地址 backend = 'redis://127.0.0.1:6379/2' # redis地址 # 1 实例化得到celery对象 app = Celery(__name__, backend=backend, broker=broker) # 2 写一堆任务(计算a+b,挖井,砍树),函数 # 使用装饰器包裹任务(函数) @app.task() def add(a, b): import time time.sleep(2) return a + b

2.提交任务

# from celery_task import app import celery_task # 1 同步执行 # res = celery_task.add(2, 3) # 普通的同步任务,同步执行任务 # print(res)

2 异步任务:

第一步:提交(使用任务名.apply_async(参数))

结果是任务id号,唯一标识这个任务

# res = celery_task.add.apply_async(args=[2, 3]) res = celery_task.add.apply_async(kwargs={'a':2,'b':3}) print(res) # abab1ad3-0e58-4faa-bc05-14d157dc8217

第二步:让worker执行—>结果存到redis

通过命令启动,非windows:

5.x之前这么启动

命令:celery worker -A celery_task -l info

5.x以后

命令:celery -A celery_task worker -l info

windows:

pip3 install eventlet

5.x之前这么启动

命令:celery worker -A celery_task -l info -P eventlet

5.x以后

命令:celery -A celery_task worker -l info -P eventlet

3.查看任务执行结果

from celery_task import app from celery.result import AsyncResult id = 'abab1ad3-0e58-4faa-bc05-14d157dc8217' if __name__ == '__main__': a = AsyncResult(id=id, app=app) if a.successful(): print('任务执行成功了') result = a.get() # 异步任务执行的结果 print(result) elif a.failed(): print('任务失败') elif a.status == 'PENDING': print('任务等待中被执行') elif a.status == 'RETRY': print('任务异常后正在重试') elif a.status == 'STARTED': print('任务已经开始被执行') 三、Celery包结构

目录结构:

    -celery_task               # 包名
        __init__.py
        celery.py             # app所在py文件
        course_task.py        # 任务
        order_task.py         # 任务
        user_task.py          # 任务
    提交任务.py                # 提交任务
    查看结果.py                # 查看结果

创建多个任务:

celery_task /celery.py

from celery import Celery broker = 'redis://127.0.0.1:6379/1' backend = 'redis://127.0.0.1:6379/2' # include 是一个列表,放被管理的task 的py文件 app = Celery(__name__, backend=backend, broker=broker,include=[ 'celery_task.course_task', 'celery_task.order_task', 'celery_task.user_task', ]) # 原来,任务写在这个py文件中 # 后期任务非常多,可能有用户相关任务,课程相关任务,订单相关任务。。。

celery_task /任务.py

user_task.py

import time from .celery import app # 发送短信任务 @app.task() def send_sms(phone, code): time.sleep(3) # 模拟发送短信延迟 print('短信发送成功,手机号是:%s,验证码是:%s' % (phone, code)) return '短信发送成功'

order_task.py

from .celery import app # 生成订单任务 @app.task() def make_order(): with open(r'D:\py18\luffy_api\script\2 celery的包结构\celery_task\order.txt', 'a', encoding='utf-8') as f: f.write('生成一条订单\n') return True

course_task.py

from .celery import app @app.task() def add(a,b): return a+b

提交多个任务:

from celery_task import user_task,order_task # 提交一个发送短信任务 # res = user_task.send_sms.apply_async(args=['18972374345', '8888']) # print(res) # 提交一个生成订单任务 # res=order_task.make_order.apply_async() # print(res)

查看结果:

from celery_task.celery import app from celery.result import AsyncResult id = '0f283e22-e8d0-40a6-a8ed-8998038bc7a3' if __name__ == '__main__': a = AsyncResult(id=id, app=app) print(app.conf) if a.successful(): print('任务执行成功了') result = a.get() # 异步任务执行的结果 print(result) elif a.failed(): print('任务失败') elif a.status == 'PENDING': print('任务等待中被执行') elif a.status == 'RETRY': print('任务异常后正在重试') elif a.status == 'STARTED': print('任务已经开始被执行') 四、Celery延迟任务 # 添加延迟任务方式一: # from datetime import datetime, timedelta # datetime.utcnow() 获取当前的utc时间 # eta=datetime.utcnow() + timedelta(seconds=50) # 50s后的utc时间 # 10s后,发送短信 res=user_task.send_sms.apply_async(args=('12345566677', '8888'), eta=eta) print(res) # 使用第二种方式执行异步任务(两者传参不同;不写时间,就表示立即执行): res=user_task.send_sms.delay('12345566677', '8888') print(res) 五、Celery定时任务

第一步:celery.py中写入

# 第一步,在包(celery_task)下的celey.py中写入 ###修改celery的配置信息 app.conf整个celery的配置信息 # 时区 app.conf.timezone = 'Asia/Shanghai' # 是否使用UTC app.conf.enable_utc = False ####配置定时任务 from datetime import timedelta from celery.schedules import crontab app.conf.beat_schedule = { 'send_sms_every_3_seconds': { 'task': 'celery_task.user_task.send_sms', # 指定执行的是哪个任务 'schedule': timedelta(seconds=3), # 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点 'args': ('18953675221', '8888'), }, 'make_order_every_5_seconds': { 'task': 'celery_task.order_task.make_order', # 指定执行的是哪个任务 'schedule': timedelta(seconds=5), }, 'add_every_1_seconds': { 'task': 'celery_task.course_task.add', # 指定执行的是哪个任务 'schedule': crontab(hour=8, day_of_week=1), # 每周一早八点 'args': (3, 5), }, }

第二步:启动worker

# celery worker -A 包名 -l info -P eventlet
celery worker -A celery_task -l info -P eventlet

如果beat没有启动,worker是没有活干的,需要启动beat,worker才能干活,和beat启动顺序无先后

第三步:启动beat

# celery beat -A celery_task -l 
celery -A celery_task beat -l info

六、Django中集成Celery

第一种方式使用django-celery(了解):

第三方把django和celery集成起来,方便我们使用,但是,第三方写的包的版本,跟celery和django版本完全对应。

我们自己使用包结构集成到django中:

第一步,把写好的包,直接复制到项目根路径

第二步,在视图类中(函数中)

from celery_task.user_task import send_sms def test(request): mobile = request.GET.get('mobile') code = '9999' res = send_sms.delay(mobile, code) # 同步发送假设3分支钟,异步发送,直接就返回id了,是否成功不知道,后期通过id查询 print(res) return HttpResponse(res)

到此这篇关于Python分布式异步任务框架Celery使用教程的文章就介绍到这了,更多相关Python Celery内容请搜索软件开发网以前的文章或继续浏览下面的相关文章希望大家以后多多支持软件开发网!



celery 异步 Python 教程

需要 登录 后方可回复, 如果你还没有账号请 注册新账号