本文介紹了python Celery定時任務的示例,分享給大家,具體如下:
配置
啟用Celery的定時任務需要設置CELERYBEAT_SCHEDULE 。

Celery的定時任務都由celery beat來進行調度。celery beat默認按照settings.py之中的時區時間來調度定時任務。
創建定時任務
一種創建定時任務的方式是配置CELERYBEAT_SCHEDULE:
#每30秒調用task.add
from datetime import timedelta
CELERYBEAT_SCHEDULE = {
'add-every-30-seconds': {
'task': 'tasks.add',
'schedule': timedelta(seconds=30),
'args': (16, 16)
},
}
#crontab任務
#每周一7:30調用task.add
from celery.schedules import crontab
CELERYBEAT_SCHEDULE = {
# Executes every Monday morning at 7:30 A.M
'add-every-monday-morning': {
'task': 'tasks.add',
'schedule': crontab(hour=7, minute=30, day_of_week=1),
'args': (16, 16),
},
}
使用數據庫存儲定時任務
使用數據庫存儲定時任務需要設置CELERYBEAT_SCHEDULE如下:

import datetime
import json
from djcelery import models as celery_models
from django.utils import timezone
#創建任務
def create_task(name, task, task_args, crontab_time):
'''
name # 任務名字
task # 執行的任務 "myapp.tasks.add"
task_args # 任務參數 {"x":1, "Y":1}
crontab_time # 定時任務時間 格式:
{
'month_of_year': 9 # 月份
'day_of_month': 5 # 日期
'hour': 01 # 小時
'minute':05 # 分鐘
}
'''
# task任務, created是否定時創建
task, created = celery_models.PeriodicTask.objects.
get_or_create(name=name,task=task)
# 獲取 crontab
crontab = celery_models.CrontabSchedule.objects.
filter(**crontab_time).first()
if crontab is None:
# 如果沒有就創建,有的話就繼續復用之前的crontab
crontab = celery_models.CrontabSchedule.objects.
create(**crontab_time)
task.crontab = crontab # 設置crontab
task.enabled = True # 開啟task
task.kwargs = json.dumps(task_args) # 傳入task參數
expiration = timezone.now() + datetime.timedelta(day=1)
task.expires = expiration # 設置任務過期時間為現在時間的一天以后
task.save()
return True
#關閉任務
def disable_task(name):
'''
關閉任務
'''
try:
task = celery_models.PeriodicTask.objects.get(name=name)
task.enabled = False # 設置關閉
task.save()
return True
except celery_models.PeriodicTask.DoesNotExist:
return True
啟動beat
執行定時任務時, Celery會通過celery beat進程來完成。Celery beat會保持運行, 一旦到了某一定時任務需要執行時, Celery beat便將其加入到queue中. 不像worker進程, Celery beat只需要一個即可。而且為了避免有重復的任務被發送出去,所以Celery beat僅能有一個。
啟動:
python manage.py celery beat --loglevel=info
其實還有一種簡單的啟動方式worker和beat一起啟動:
python manage.py celery worker --loglevel=info --beat
定時刪除
由于很多任務都是一次執行完就不需要,留在數據庫里就是垃圾數據了有沒有辦法清除。方法肯定有因為django-celery本身就有定時任務功能我們加個任務就解決了。好我們看代碼:在django app目錄中打開taske.py加入如下代碼
from djcelery import models as celery_models
from django.utils import timezone
@task()
def delete():
'''
刪除任務
從models中過濾出過期時間小于現在的時間然后刪除
'''
return celery_models.PeriodicTask.objects.filter(
expires__lt=timezone.now()).delete()
創建任務腳本里設置了 expires 1天以后過期,這樣在filter的時候就能當做條件把過期的任務找到并且刪除。
以上就是本文的全部內容,希望對大家的學習有所幫助,也希望大家多多支持億速云。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。