mower-ng/mower/utils/scheduler.py
2024-11-24 10:56:53 +08:00

221 lines
7.6 KiB
Python

import heapq
from datetime import datetime, time, timedelta
from importlib import import_module
from pydantic import BaseModel
from mower.utils.csleep import MowerExit, csleep
from mower.utils.graph import SceneGraphSolver
from mower.utils.log import logger
def import_solver(full_name: str):
    """Resolve a dotted path such as ``"package.module.ClassName"`` to an object.

    Args:
        full_name: Module path and attribute name joined by dots; the part
            after the last dot is looked up on the imported module.

    Returns:
        The attribute (normally a solver class) named by ``full_name``.

    Raises:
        ImportError: If ``full_name`` has no module part or the module cannot
            be imported.
        AttributeError: If the module lacks the requested attribute.
    """
    module_name, _, class_name = full_name.rpartition(".")
    if not module_name or not class_name:
        # Without a dot there is no module to import; fail with a clear message
        # instead of slicing the name at index -1 and importing a bogus module.
        raise ImportError(
            f"solver path must look like 'module.ClassName', got {full_name!r}"
        )
    module = import_module(module_name)
    return getattr(module, class_name)
class BasicTask(BaseModel):
    """Base class shared by every task kind; for internal use."""

    solver: str
    """Dotted path of the solver class; the scheduler calls ``solver().run()``."""

    priority: int = 10
    """The smaller the number, the higher the priority."""

    _schedule_time: datetime | None = None
    """Execution time assigned by the scheduler, if any."""

    @property
    def _solver_name(self):
        """Display name of the solver: its ``solver_name``, else its qualified name."""
        cls = import_solver(self.solver)
        label = cls.solver_name
        return cls.__qualname__ if label is None else label

    def _get_next_execution(self, now: datetime):
        """Return the task's own next run time; the base task is always due."""
        return now

    def _next_execution(self, now: datetime):
        """Return the scheduler-assigned time when set, else the task's own time."""
        return self._schedule_time or self._get_next_execution(now)

    def __lt__(self, other: "BasicTask"):
        """Order tasks for the heaps: by priority when both are due or their
        times are less than one second apart, otherwise by execution time."""
        current = datetime.now()
        mine = self._next_execution(current)
        theirs = other._next_execution(current)
        both_due = mine <= current and theirs <= current
        gap_seconds = abs((mine - theirs).total_seconds())
        if not both_due and gap_seconds >= 1:
            # Distinct times with at least one still in the future: earlier wins.
            return mine < theirs
        # Both already due, or practically simultaneous: priority decides.
        return self.priority < other.priority
class OneTimeTask(BasicTask):
    """Task pinned to an exact time, e.g. infrastructure tasks."""

    time: datetime

    def __str__(self):
        """Describe the task with its solver name and configured run time."""
        name, when = self._solver_name, self.time
        return f"定时任务[{name}] 运行时间{when}"

    def _get_next_execution(self, now: datetime):
        """Return the configured time; ``now`` is not consulted."""
        return self.time
class PeriodicTask(BasicTask):
    """Repeating task with a minimum interval between runs, e.g. spending
    sanity or recruitment."""

    interval: timedelta
    priority: int = 20
    _last_execution: datetime | None = None

    def __str__(self):
        """Describe the task with its solver name and repeat interval."""
        name, period = self._solver_name, self.interval
        return f"周期任务[{name}] 运行周期{period}"

    def _get_next_execution(self, now: datetime):
        """Return ``now`` before the first run, else last run plus the interval."""
        previous = self._last_execution
        return now if previous is None else previous + self.interval
class DailyTask(BasicTask):
    """Once-a-day task, e.g. daily sign-in."""

    time_offset: timedelta
    """Offset from midnight marking the earliest moment the task runs each day."""

    priority: int = 30
    _last_execution: datetime | None = None

    def __str__(self):
        """Describe the task with its solver name and daily offset."""
        name, offset = self._solver_name, self.time_offset
        return f"每日任务[{name}] 每日{offset}后运行"

    def _get_next_execution(self, now: datetime):
        """Return ``now`` before the first run; afterwards the next daily slot.

        The slot is midnight of the last run's date plus ``time_offset``; when
        the last run happened after that slot, the next day's slot is used.
        """
        last_run = self._last_execution
        if last_run is None:
            return now
        midnight = datetime.combine(last_run.date(), time())
        slot = midnight + self.time_offset
        if last_run > slot:
            slot += timedelta(days=1)
        return slot
class LongTask(BasicTask):
    """Large, long-running task, e.g. roguelike or security-service runs."""

    priority: int = 40

    def __str__(self):
        """Describe the task with its solver name."""
        name = self._solver_name
        return f"大型任务[{name}]"
# Union of the concrete task types accepted by Scheduler.register.
Task = OneTimeTask | PeriodicTask | DailyTask | LongTask
class Scheduler(SceneGraphSolver):
    """Run registered tasks in order of due time and priority.

    Tasks that are already due wait in ``priority_queue``; tasks scheduled for
    a later time wait in ``time_queue``. Both lists are maintained with
    ``heapq`` and ordered by ``BasicTask.__lt__``.
    """

    solver_name = "任务调度"

    def __init__(self):
        # NOTE(review): the base-class initializer is not called here; confirm
        # that SceneGraphSolver needs no setup of its own.
        # Tasks due at or before the current time, ordered by priority.
        self.priority_queue: list[Task] = []
        # Tasks due after the current time, ordered by time.
        self.time_queue: list[Task] = []

    def register(self, task: Task):
        """Log the task and push it onto the queue matching its due time."""
        logger.info(f"注册任务:{task}")
        now = datetime.now()
        if task._next_execution(now) <= now:
            heapq.heappush(self.priority_queue, task)
        else:
            heapq.heappush(self.time_queue, task)

    def update_time_queue(self):
        """Move tasks that have become due from the time queue to the priority queue."""
        now = datetime.now()
        while self.time_queue:
            if self.time_queue[0]._next_execution(now) <= now:
                heapq.heappush(self.priority_queue, heapq.heappop(self.time_queue))
            else:
                break

    def register_one_time(self, solver: str, time: datetime, priority: int = 10):
        """Register a ``OneTimeTask`` for ``solver`` at ``time``."""
        self.register(OneTimeTask(solver=solver, time=time, priority=priority))

    def register_periodic(self, solver: str, interval: timedelta, priority: int = 20):
        """Register a ``PeriodicTask`` for ``solver`` repeating every ``interval``."""
        self.register(PeriodicTask(solver=solver, interval=interval, priority=priority))

    def register_daily(self, solver: str, offset: timedelta, priority: int = 30):
        """Register a ``DailyTask`` for ``solver`` with daily offset ``offset``."""
        self.register(DailyTask(solver=solver, time_offset=offset, priority=priority))

    def register_long(self, solver: str, priority: int = 40):
        """Register a ``LongTask`` for ``solver``."""
        self.register(LongTask(solver=solver, priority=priority))

    def list_tasks(self):
        """Log every queued task in sorted order, followed by the total count."""
        now = datetime.now()
        # A heap is not fully ordered (only the top element is guaranteed to be
        # the smallest). Sorting gives up the heap's complexity advantage, but
        # the queues are tiny and sorted output is easier to read.
        for i, task in enumerate(sorted(self.priority_queue)):
            logger.info(
                f"待执行队列:({i + 1}/{len(self.priority_queue)}) {task._next_execution(now)} {task.priority} {task._solver_name}"
            )
        for i, task in enumerate(sorted(self.time_queue)):
            logger.info(
                f"等待队列:({i + 1}/{len(self.time_queue)}) {task._next_execution(now)} {task.priority} {task._solver_name}"
            )
        logger.info(f"{len(self.priority_queue) + len(self.time_queue)}项任务")

    def _run_next_task(self, now: datetime):
        """Pop the most urgent due task, run its solver and requeue it if needed.

        Raises:
            MowerExit: Propagated unchanged from the solver.
        """
        next_task = heapq.heappop(self.priority_queue)
        solver = import_solver(next_task.solver)()
        # The solver should yield when a higher-priority task becomes due.
        # time_queue is a heap, so its iteration order is not chronological;
        # take the minimum over all higher-priority tasks rather than the
        # first match.
        stop_time = min(
            (
                task._next_execution(now)
                for task in self.time_queue
                if task.priority < next_task.priority
            ),
            default=None,
        )
        solver.scheduler_stop_time = stop_time
        logger.info(f"开始运行{next_task}")
        try:
            complete = solver.run()
        except MowerExit:
            raise
        except Exception as e:
            logger.exception(f"{next_task._solver_name}任务运行出错:{e}")
            # A failed run is treated as finished so it is not retried at once.
            complete = True
        if complete:
            # Only repeating tasks are queued again after finishing.
            if isinstance(next_task, (PeriodicTask, DailyTask)):
                next_task._last_execution = datetime.now()
                next_task._schedule_time = None
                heapq.heappush(self.time_queue, next_task)
        else:
            # Unfinished task: requeue it at the time it was asked to stop.
            next_task._schedule_time = stop_time
            heapq.heappush(self.time_queue, next_task)

    def _sleep_until_next_task(self, now: datetime):
        """Sleep until the task at the top of the time queue is due."""
        wake_time = self.time_queue[0]._next_execution(now)
        if wake_time - now > timedelta(minutes=3):
            self.idle_solver()
        # Recompute after idle_solver() so its running time is not slept twice,
        # and never pass a negative duration.
        remaining = (wake_time - datetime.now()).total_seconds()
        csleep(max(remaining, 0.0))

    def schedule(self):
        """Run tasks until both queues are empty or ``MowerExit`` is raised."""
        while True:
            try:
                if not self.priority_queue and not self.time_queue:
                    logger.info("任务列表为空,停止运行调度器")
                    return
                self.update_time_queue()
                now = datetime.now()
                self.list_tasks()
                if self.priority_queue:
                    self._run_next_task(now)
                else:
                    # Nothing needs to run right now: rest.
                    self._sleep_until_next_task(now)
            except MowerExit:
                logger.info("停止运行调度器")
                return
# Module-level Scheduler instance, created when this module is imported.
scheduler = Scheduler()