简单来说
- 把任务放在不同的格子上,然后每经过tick时间,都会去找最小时间轮上当前格子的任务列表,如果有任务,则执行。
- 小轮转一圈,会带动上一层轮子转一格,大轮转动时,如果它的格子上有任务,则需根据间隔时间,将其下放至小轮子对应的一周上。
举例
假设我们的任务需要在每天的 7:30:20 秒执行一次。任务首先添加于小时轮的第 7 号刻度上,当其轮询线程访问到第 20 号刻度时,就将此任务转移到分钟级别时钟轮的第 30 号刻度上。当分钟级别的时钟轮线程访问到第 30 号刻度,就将此任务转移到小时级别时钟轮的第 7 号刻度上。当小时级别时钟轮线程访问到第 7 号刻度时,最终会将任务交给异步线程负责执行,然后将任务再次注册到秒级别的时间轮中。
适用场景
定时器,任务调度等。
时间复杂度
查询,新增都为O(1)
import datetime
from time import sleep
import threading
import signal
from notifyUtil import sendNotify,getNotifyContent
isExit = False
# 定义三个数组
hourTaskArray = [[] for i in range(24)]
minuteTaskArray = [[] for i in range(60)]
secondTaskArray = [[] for i in range(60)]
def getNowTime():
# 获取当前的时间
nowTime = datetime.datetime.now()
# 给当前时间赋值curXXX
return [nowTime.hour,nowTime.minute,nowTime.second]
def taskDetect():
(curHour,curMinute,curSecond) = getNowTime()
# 每一秒,去检测
global isExit
while not isExit:
curSecond = (curSecond+1) % len(secondTaskArray)
if(curMinute == 0):
curHour = (curHour+1) % len(hourTaskArray)
if(len(hourTaskArray[curHour]) != 0):
# 把该小时的数据,下放到分钟来
for minutTask in hourTaskArray[curHour]:
minuteTaskArray[minutTask.minute].append(minutTask)
hourTaskArray[curHour] = []
if(curSecond == 0):
curMinute = (curMinute+1) % len(minuteTaskArray)
# 将当前分钟任务列表,放到秒任务列表来
if(len(minuteTaskArray[curMinute]) != 0):
for secondTask in minuteTaskArray[curMinute]:
secondTaskArray[secondTask.second].append(secondTask)
minuteTaskArray[curMinute] = []
# print("当前时间为: {0}:{1}:{2}".format(curHour,curMinute,curSecond))
if(len(secondTaskArray[curSecond]) != 0):
print("len(secondTaskArray[curSecond]) != 0")
# 顺序执行
for task in secondTaskArray[curSecond]:
print("当前需要执行的任务为:{0}".format(str(task)))
task.func(task.taskName)
# 删掉当前
secondTaskArray[curSecond] = []
sleep(1)
print("taskDetect Exit")
def handleExit(signum,params):
global isExit
isExit = True
print ("程序即将退出\nreceive a signal %d"%(signum))
class Task:
second = 0
minute = 0
hour = 0
taskID = 0
taskName = "空"
func = None
def __init__(self, second, minute,hour):
self.second = int(second)
self.minute = int(minute)
self.hour = int(hour)
def __str__(self):
return "任务名为:{3},hour:{0},minute:{1},second:{2},执行方法为:{4}".format(self.hour,self.minute,self.second,self.taskName,self.func)
# todo: 校验时间
@classmethod
def validateTimeStr(cls,timeStr):
if(not len(timeStr)==6 or not timeStr.isdigit() or int(timeStr[0:2]) > 23 or int(timeStr[2:4]) > 59 or int(timeStr[4:6]) > 59):
raise Exception("时间格式应为hhmmss")
# 通过timeStr来初始化实例
@classmethod
def createFromTimeStr(cls,timeStr):
try:
# 校验输入是否合法
Task.validateTimeStr(timeStr)
hour = timeStr[0:2]
minute = timeStr[2:4]
second = timeStr[4:6]
obj = cls(second,minute,hour)
return obj
except Exception as e:
print("=============【 错误:",e,"】 =============")
return None
# self.Task(taskTime[0:2],taskTime[0:2],taskTime[4:6])
def addTaskToTimeWheel(task):
global hourTaskArray,secondTaskArray,minuteTaskArray
# 判断小时和分钟,是否为当前
(curHour,curMinute,curSecond) = getNowTime()
if(task.hour == curHour and task.minute == curMinute):
secondTaskArray[task.second].append(task)
return
if(task.hour == curHour and task.minute != curMinute):
minuteTaskArray[task.minute].append(task)
return
if(task.hour != curHour):
hourTaskArray[task.hour].append(task)
return
def taskAdd(taskTime,taskName):
newTask = Task.createFromTimeStr(taskTime)
newTask.taskName = taskName
newTask.func = sendNotify
# 添加任务到时间轮
addTaskToTimeWheel(newTask)
return True
def getAllTask():
totalTaskList = hourTaskArray + minuteTaskArray + secondTaskArray
return [[str(task) for task in taskList ] for taskList in totalTaskList if len(taskList) != 0 ]
def init():
print("timingWheel init success")
# 启动任务检测线程
taskDetectThread = threading.Thread(name='taskDetect',target= taskDetect)
# taskAddThread = threading.Thread(name='taskAdd',target= taskAdd)
taskDetectThread.setDaemon(True)
# taskAddThread.setDaemon(True)
taskDetectThread.start()
# taskAddThread.start()
# 响应退出事件
# signal.signal(signal.SIGINT, handleExit)
# signal.signal(signal.SIGTERM, handleExit)
# while True:
# if(not isExit):
# pass
# else:
# break