分层时间轮算法 - python实现

黄鹏宇 2,901 2022-08-03

简单来说

  1. 把任务放在不同的格子上,然后每经过tick时间,都会去找最小时间轮上当前格子的任务列表,如果有任务,则执行。
  2. 小轮转一圈,会带动上一层轮子转一格,大轮转动时,如果它的格子上有任务,则需根据间隔时间,将其下放至小轮子对应的一周上。

举例

假设我们的任务需要在每天的 7:30:20 秒执行一次。任务首先添加于小时轮的第 7 号刻度上,当其轮询线程访问到第 20 号刻度时,就将此任务转移到分钟级别时钟轮的第 30 号刻度上。当分钟级别的时钟轮线程访问到第 30 号刻度,就将此任务转移到小时级别时钟轮的第 7 号刻度上。当小时级别时钟轮线程访问到第 7 号刻度时,最终会将任务交给异步线程负责执行,然后将任务再次注册到秒级别的时间轮中。

适用场景

定时器,任务调度等。

时间复杂度

查询,新增都为O(1)
image-1659558957678

import datetime
from time import sleep 
import threading
import signal
from notifyUtil import sendNotify,getNotifyContent

isExit = False

# 定义三个数组
hourTaskArray = [[] for i in range(24)]
minuteTaskArray = [[] for i in range(60)]
secondTaskArray = [[] for i in range(60)]

def getNowTime():
	# 获取当前的时间
	nowTime = datetime.datetime.now()
	# 给当前时间赋值curXXX
	return [nowTime.hour,nowTime.minute,nowTime.second]  


def taskDetect():
	(curHour,curMinute,curSecond) = getNowTime()
	# 每一秒,去检测
	global isExit
	while not isExit:
		curSecond = (curSecond+1) % len(secondTaskArray)
		if(curMinute == 0):
			curHour = (curHour+1) % len(hourTaskArray)
			if(len(hourTaskArray[curHour]) != 0):
				# 把该小时的数据,下放到分钟来
				for minutTask in hourTaskArray[curHour]:
					minuteTaskArray[minutTask.minute].append(minutTask)
				hourTaskArray[curHour] = []

		if(curSecond == 0):
			curMinute = (curMinute+1) % len(minuteTaskArray)
			# 将当前分钟任务列表,放到秒任务列表来
			if(len(minuteTaskArray[curMinute]) != 0):
				for secondTask in minuteTaskArray[curMinute]:
					secondTaskArray[secondTask.second].append(secondTask)
				minuteTaskArray[curMinute] = []
		# print("当前时间为: {0}:{1}:{2}".format(curHour,curMinute,curSecond))
		if(len(secondTaskArray[curSecond]) != 0):
			print("len(secondTaskArray[curSecond]) != 0")
			# 顺序执行
			for task in secondTaskArray[curSecond]:
				print("当前需要执行的任务为:{0}".format(str(task)))	
				task.func(task.taskName)
				# 删掉当前
				secondTaskArray[curSecond] = []
		sleep(1)
	print("taskDetect Exit")

def handleExit(signum,params):
	global isExit
	isExit = True
	print ("程序即将退出\nreceive a signal %d"%(signum))

class Task:
	second = 0
	minute = 0
	hour = 0
	taskID = 0
	taskName = "空"
	func = None

	def __init__(self, second, minute,hour):
		self.second = int(second)
		self.minute = int(minute)
		self.hour = int(hour)
		
	def __str__(self):
		return "任务名为:{3},hour:{0},minute:{1},second:{2},执行方法为:{4}".format(self.hour,self.minute,self.second,self.taskName,self.func)

	# todo: 校验时间
	@classmethod
	def validateTimeStr(cls,timeStr):
		if(not len(timeStr)==6 or not timeStr.isdigit() or int(timeStr[0:2]) > 23 or int(timeStr[2:4]) > 59 or int(timeStr[4:6]) > 59):
			raise Exception("时间格式应为hhmmss") 

	# 通过timeStr来初始化实例
	@classmethod
	def createFromTimeStr(cls,timeStr):
		try:
			# 校验输入是否合法
			Task.validateTimeStr(timeStr)
			hour =  timeStr[0:2]
			minute = timeStr[2:4]
			second = timeStr[4:6]
			obj = cls(second,minute,hour)
			return obj
		except Exception as e:
			print("=============【 错误:",e,"】 =============")
			return None

		# self.Task(taskTime[0:2],taskTime[0:2],taskTime[4:6])

def addTaskToTimeWheel(task):
	global hourTaskArray,secondTaskArray,minuteTaskArray
	# 判断小时和分钟,是否为当前
	(curHour,curMinute,curSecond) = getNowTime()
	if(task.hour == curHour and task.minute == curMinute):
		secondTaskArray[task.second].append(task)
		return
	if(task.hour == curHour and task.minute != curMinute):
		minuteTaskArray[task.minute].append(task)
		return
	if(task.hour != curHour):
		hourTaskArray[task.hour].append(task)
		return	


def taskAdd(taskTime,taskName):
	newTask = Task.createFromTimeStr(taskTime)
	newTask.taskName = taskName
	newTask.func = sendNotify
	# 添加任务到时间轮
	addTaskToTimeWheel(newTask)
	return True

def getAllTask():
	totalTaskList = hourTaskArray + minuteTaskArray + secondTaskArray
	return [[str(task) for task in taskList ] for taskList in totalTaskList if len(taskList) != 0 ]

def init():
	print("timingWheel init success")
	# 启动任务检测线程
	taskDetectThread = threading.Thread(name='taskDetect',target= taskDetect)
	# taskAddThread = threading.Thread(name='taskAdd',target= taskAdd)

	taskDetectThread.setDaemon(True)
	# taskAddThread.setDaemon(True)

	taskDetectThread.start()   	
	# taskAddThread.start()   	
	
	# 响应退出事件
	# signal.signal(signal.SIGINT, handleExit)
	# signal.signal(signal.SIGTERM, handleExit)
	# while True:
		# if(not isExit):
			# pass		
		# else:
			# break