python Celery定时任务的示例

Floria ·
更新时间:2024-09-21
· 893 次阅读

本文介绍了python Celery定时任务的示例,分享给大家,具体如下:

配置

启用Celery的定时任务需要设置CELERYBEAT_SCHEDULE 。


Celery的定时任务都由celery beat来进行调度。celery beat默认按照settings.py之中的时区时间来调度定时任务。

创建定时任务

一种创建定时任务的方式是配置CELERYBEAT_SCHEDULE:

#每30秒调用task.add from datetime import timedelta CELERYBEAT_SCHEDULE = { 'add-every-30-seconds': { 'task': 'tasks.add', 'schedule': timedelta(seconds=30), 'args': (16, 16) }, } #crontab任务 #每周一7:30调用task.add from celery.schedules import crontab CELERYBEAT_SCHEDULE = { # Executes every Monday morning at 7:30 A.M 'add-every-monday-morning': { 'task': 'tasks.add', 'schedule': crontab(hour=7, minute=30, day_of_week=1), 'args': (16, 16), }, }

使用数据库存储定时任务

使用数据库存储定时任务需要设置CELERYBEAT_SCHEDULE如下:

import datetime import json from djcelery import models as celery_models from django.utils import timezone #创建任务 def create_task(name, task, task_args, crontab_time): ''' name # 任务名字 task # 执行的任务 "myapp.tasks.add" task_args # 任务参数 {"x":1, "Y":1} crontab_time # 定时任务时间 格式: { 'month_of_year': 9 # 月份 'day_of_month': 5 # 日期 'hour': 01 # 小时 'minute':05 # 分钟 } ''' # task任务, created是否定时创建 task, created = celery_models.PeriodicTask.objects. get_or_create(name=name,task=task) # 获取 crontab crontab = celery_models.CrontabSchedule.objects. filter(**crontab_time).first() if crontab is None: # 如果没有就创建,有的话就继续复用之前的crontab crontab = celery_models.CrontabSchedule.objects. create(**crontab_time) task.crontab = crontab # 设置crontab task.enabled = True # 开启task task.kwargs = json.dumps(task_args) # 传入task参数 expiration = timezone.now() + datetime.timedelta(day=1) task.expires = expiration # 设置任务过期时间为现在时间的一天以后 task.save() return True #关闭任务 def disable_task(name): ''' 关闭任务 ''' try: task = celery_models.PeriodicTask.objects.get(name=name) task.enabled = False # 设置关闭 task.save() return True except celery_models.PeriodicTask.DoesNotExist: return True

启动beat

执行定时任务时, Celery会通过celery beat进程来完成。Celery beat会保持运行, 一旦到了某一定时任务需要执行时, Celery beat便将其加入到queue中. 不像worker进程, Celery beat只需要一个即可。而且为了避免有重复的任务被发送出去,所以Celery beat仅能有一个。

启动:

python manage.py celery beat --loglevel=info

其实还有一种简单的启动方式worker和beat一起启动:

python manage.py celery worker --loglevel=info --beat

定时删除

由于很多任务都是一次执行完就不需要,留在数据库里就是垃圾数据了有没有办法清除。方法肯定有因为django-celery本身就有定时任务功能我们加个任务就解决了。好我们看代码:在django app目录中打开taske.py加入如下代码

from djcelery import models as celery_models from django.utils import timezone @task() def delete(): ''' 删除任务 从models中过滤出过期时间小于现在的时间然后删除 ''' return celery_models.PeriodicTask.objects.filter( expires__lt=timezone.now()).delete()

创建任务脚本里设置了 expires 1天以后过期,这样在filter的时候就能当做条件把过期的任务找到并且删除。

您可能感兴趣的文章:Python Django2.0集成Celery4.1教程python使用celery实现异步任务执行的例子python celery分布式任务队列的使用详解Python并行分布式框架Celery详解Python环境下安装使用异步任务队列包Celery的基础教程在RedHat系Linux上部署Python的Celery框架的教程Python实现定时执行任务的三种方式简单示例python基于celery实现异步任务周期任务定时任务



示例 celery Python

需要 登录 后方可回复, 如果你还没有账号请 注册新账号