使用堆并分为两个队列优化调度器写法

This commit is contained in:
Elaina 2024-11-03 20:02:34 +08:00
parent 3269bf0adf
commit 3f126a35ff

View file

@ -1,3 +1,4 @@
import heapq
from datetime import datetime, time, timedelta
from importlib import import_module
@ -44,6 +45,18 @@ class BasicTask(BaseModel):
return self._schedule_time
return self._get_next_execution(now)
def __lt__(self, other: "BasicTask"):
now = datetime.now()
self_time = self._next_execution(now)
other_time = other._next_execution(now)
time_difference = abs((self_time - other_time).total_seconds())
if (self_time <= now and other_time <= now) or time_difference < 1:
return (
self.priority < other.priority
) # 两边都小于当前时间或时间相同时按优先级比较
return self_time < other_time # 按时间比较
class OneTimeTask(BasicTask):
"精确的任务,如基建任务"
@ -112,11 +125,25 @@ class Scheduler:
solver_name = "任务调度"
def __init__(self):
self.task_list: list[Task] = []
self.priority_queue: list[Task] = [] # 小于当前时间的待执行队列,按优先级排序
self.time_queue: list[Task] = [] # 大于当前时间的等待队列,按时间排序
def register(self, task: Task):
logger.info(f"注册任务:{task}")
self.task_list.append(task)
now = datetime.now()
if task._next_execution(now) <= now:
heapq.heappush(self.priority_queue, task)
else:
heapq.heappush(self.time_queue, task)
def update_time_queue(self):
"将时间队列中的任务移动到优先队列中"
now = datetime.now()
while self.time_queue:
if self.time_queue[0]._next_execution(now) <= now:
heapq.heappush(self.priority_queue, heapq.heappop(self.time_queue))
else:
break
def register_one_time(self, solver: str, time: datetime, priority: int = 10):
self.register(OneTimeTask(solver=solver, time=time, priority=priority))
@ -131,42 +158,37 @@ class Scheduler:
self.register(LongTask(solver=solver, priority=priority))
def list_tasks(self):
for i, task in enumerate(self.task_list):
logger.info(f"({i + 1}/{len(self.task_list)}) {task}")
logger.info(f"{self.task_list}项任务")
now = datetime.now()
for i, task in enumerate(
sorted(self.priority_queue)
): # 堆里元素是无序的,只保证堆顶元素最小,排序后会失去时间复杂度优势,但反正数据量很小,排序后看着舒服点
logger.info(
f"待执行队列:({i + 1}/{len(self.priority_queue)}) {task._next_execution(now)} {task.priority} {task._solver_name}"
)
for i, task in enumerate(sorted(self.time_queue)):
logger.info(
f"等待队列:({i + 1}/{len(self.time_queue)}) {task._next_execution(now)} {task.priority} {task._solver_name}"
)
logger.info(f"{len(self.priority_queue+self.time_queue)}项任务")
def schedule(self):
while True:
try:
if len(self.task_list) == 0:
if len(self.priority_queue + self.time_queue) == 0:
logger.info("任务列表为空,停止运行调度器")
return
self.update_time_queue()
now = datetime.now()
execution_list: list[tuple[datetime, Task]] = []
for task in self.task_list:
execution_list.append((task._next_execution(now), task))
execution_list.sort(key=lambda t: t[0])
for _time, task in execution_list:
logger.info(f"{_time} {task.priority} {task._solver_name}")
if (earliest_time := execution_list[0][0]) <= now:
# 如果有需要立即执行的任务,选出优先级最高的运行,截止时间为优先级更高的任务
next_task = min(
[task for _time, task in execution_list if _time <= now],
key=lambda task: task.priority,
)
self.list_tasks()
if self.priority_queue:
next_task = heapq.heappop(self.priority_queue)
solver = import_solver(next_task.solver)()
try:
stop_time = min(
[
_time
for _time, task in execution_list
if _time > now and task.priority < next_task.priority
]
)
except ValueError:
stop_time = None
for task in self.time_queue:
if task.priority < next_task.priority:
stop_time = task._next_execution(now)
break
solver.scheduler_stop_time = stop_time
logger.info(f"开始运行{next_task}")
@ -174,13 +196,13 @@ class Scheduler:
if isinstance(next_task, (PeriodicTask, DailyTask)):
next_task._last_execution = datetime.now()
next_task._schedule_time = None
else:
self.task_list.remove(next_task)
heapq.heappush(self.time_queue, next_task)
else:
next_task._schedule_time = stop_time
heapq.heappush(self.time_queue, next_task)
else:
# 如果没有需要立即执行的任务,休息
sleep_time = earliest_time - now
sleep_time = self.time_queue[0]._next_execution(now) - now
if sleep_time > timedelta(minutes=3):
if config.conf.close_simulator_when_idle:
restart_simulator(start=False)