一. 如何調用
def f1(arg1, arg2):
print('f1', arg1, arg2)
def f2(arg1):
print('f2', arg1)
def f3():
print('f3')
def f4():
print('周期任務', int(time.time()))
timer = TaskTimer()
# 把任務加入任務隊列
timer.join_task(f1, [1, 2], timing=15.5) # 每天15:30執行
timer.join_task(f2, [3], timing=14) # 每天14:00執行
timer.join_task(f3, [], timing=15) # 每天15:00執行
timer.join_task(f4, [], interval=10) # 每10秒執行1次
# 開始執行(此時才會創建線程)
timer.start()
f1~f4是我們需要定時執行的函數。
首先創建TaskTimer對象(TaskTimer的代碼在下面)。調用join_task函數,把需要執行的函數加入到任務隊列。最后調用start,任務就開始執行了。
join_task參數:
fun:需要執行的函數
arg:fun的參數,如果沒有就傳一個空列表
interval:如果有此參數,說明任務是周期任務,單位為秒(注意interval最少5秒)
timing:如果有此參數,說明任務是定時任務,單位為時
注意:interval和timing只能選填1個
二. 源碼
import datetime
import time
from threading import Thread
from time import sleep
class TaskTimer:
__instance = None
def __new__(cls, *args, **kwargs):
"""
單例模式
"""
if not cls.__instance:
cls.__instance = object.__new__(cls)
return cls.__instance
def __init__(self):
if not hasattr(self, 'task_queue'):
setattr(self, 'task_queue', [])
if not hasattr(self, 'is_running'):
setattr(self, 'is_running', False)
def write_log(self, level, msg):
cur_time = datetime.datetime.now()
with open('./task.log', mode='a+', encoding='utf8') as file:
s = "[" + str(cur_time) + "][" + level + "] " + msg
print(s)
file.write(s + "\n")
def work(self):
"""
處理任務隊列
"""
while True:
for task in self.task_queue:
if task['interval']:
self.cycle_task(task)
elif task['timing']:
self.timing_task(task)
sleep(5)
def cycle_task(self, task):
"""
周期任務
"""
if task['next_sec'] <= int(time.time()):
try:
task['fun'](*task['arg'])
self.write_log("正常", "周期任務:" + task['fun'].__name__ + " 已執行")
except Exception as e:
self.write_log("異常", "周期任務:" + task['fun'].__name__ + " 函數內部異常:" + str(e))
finally:
task['next_sec'] = int(time.time()) + task['interval']
def timing_task(self, task):
"""
定時任務
"""
# 今天已過秒數
today_sec = self.get_today_until_now()
# 到了第二天,就重置任務狀態
if task['today'] != self.get_today():
task['today'] = self.get_today()
task['today_done'] = False
# 第一次執行
if task['first_work']:
if today_sec >= task['task_sec']:
task['today_done'] = True
task['first_work'] = False
else:
task['first_work'] = False
# 今天還沒有執行
if not task['today_done']:
if today_sec >= task['task_sec']: # 到點了,開始執行任務
try:
task['fun'](*task['arg'])
self.write_log("正常", "定時任務:" + task['fun'].__name__ + " 已執行")
except Exception as e:
self.write_log("異常", "定時任務:" + task['fun'].__name__ + " 函數內部異常:" + str(e))
finally:
task['today_done'] = True
if task['first_work']:
task['first_work'] = False
def get_today_until_now(self):
"""
獲取今天凌晨到現在的秒數
"""
i = datetime.datetime.now()
return i.hour * 3600 + i.minute * 60 + i.second
def get_today(self):
"""
獲取今天的日期
"""
i = datetime.datetime.now()
return i.day
def join_task(self, fun, arg, interval=None, timing=None):
"""
interval和timing只能存在1個
:param fun: 你要調用的任務
:param arg: fun的參數
:param interval: 周期任務,單位秒
:param timing: 定時任務,取值:[0,24)
"""
# 參數校驗
if (interval != None and timing != None) or (interval == None and timing == None):
raise Exception('interval和timing只能選填1個')
if timing and not 0 <= timing < 24:
raise Exception('timing的取值范圍為[0,24)')
if interval and interval < 5:
raise Exception('interval最少為5')
# 封裝一個task
task = {
'fun': fun,
'arg': arg,
'interval': interval,
'timing': timing,
}
# 封裝周期或定時任務相應的參數
if timing:
task['task_sec'] = timing * 3600
task['today_done'] = False
task['first_work'] = True
task['today'] = self.get_today()
elif interval:
task['next_sec'] = int(time.time()) + interval
# 把task加入任務隊列
self.task_queue.append(task)
self.write_log("正常", "新增任務:" + fun.__name__)
def start(self):
"""
開始執行任務
返回線程標識符
"""
if not self.is_running:
thread = Thread(target=self.work)
thread.start()
self.is_running = True
self.write_log("正常", "TaskTimer已開始運行!")
return thread.ident
self.write_log("警告", "TaskTimer已運行,請勿重復啟動!")
以上這篇python異步實現定時任務和周期任務的方法就是小編分享給大家的全部內容了,希望能給大家一個參考,也希望大家多多支持億速云。
免責聲明:本站發布的內容(圖片、視頻和文字)以原創、轉載和分享為主,文章觀點不代表本網站立場,如果涉及侵權請聯系站長郵箱:is@yisu.com進行舉報,并提供相關證據,一經查實,將立刻刪除涉嫌侵權內容。