MuMumanager初始化时关掉日志打印
All checks were successful
ci/woodpecker/push/check_format Pipeline was successful

This commit is contained in:
Elaina 2024-12-11 15:50:47 +08:00
parent a324ad6b65
commit 1f002bd5dc
6 changed files with 38 additions and 21 deletions

View file

@ -169,9 +169,7 @@ def simulate():
if len(config.tasks) > 0:
(config.tasks.sort(key=lambda x: x.time, reverse=False))
logger.info("||".join([str(t) for t in config.tasks]))
remaining_time = (
config.tasks[0].time - datetime.now()
).total_seconds()
remaining_time = (config.tasks[0].time - datetime.now()).total_seconds()
if remaining_time > 540:
# 刷新时间以鹰历为准
@ -257,15 +255,14 @@ def simulate():
f"仓库扫描未到时间,将在 {config.conf.maa_gap - dt // 3600}小时之内开始扫描"
)
if config.conf.maa_enable == 1:
subject = f"下次任务在{config.tasks[0].time.strftime('%H:%M:%S')}"
subject = (
f"下次任务在{config.tasks[0].time.strftime('%H:%M:%S')}"
)
context = f"下一次任务:{config.tasks[0].plan}"
logger.info(context)
logger.info(subject)
body = task_template.render(
tasks=[
obj.format(timezone_offset)
for obj in config.tasks
],
tasks=[obj.format(timezone_offset) for obj in config.tasks],
base_scheduler=base_scheduler,
)
send_message(body, subject)
@ -285,8 +282,7 @@ def simulate():
base_scheduler.idle_solver()
body = task_template.render(
tasks=[
obj.format(timezone_offset)
for obj in config.tasks
obj.format(timezone_offset) for obj in config.tasks
],
base_scheduler=base_scheduler,
)
@ -338,9 +334,7 @@ def simulate():
if remaining_time > 300:
base_scheduler.idle_solver()
body = task_template.render(
tasks=[
obj.format(timezone_offset) for obj in config.tasks
],
tasks=[obj.format(timezone_offset) for obj in config.tasks],
base_scheduler=base_scheduler,
)
send_message(body, subject)

View file

@ -2,7 +2,7 @@ from __future__ import annotations
from datetime import datetime, timedelta
from importlib import import_module
from typing import Any, Callable
from typing import Any, Callable, Literal
from mower.utils import config
from mower.utils import typealias as tp
@ -33,7 +33,19 @@ class Device:
solver_name = "设备"
services = {}
def get_service(self, method: str):
def get_service(
self,
method: Literal[
"adb",
"droidcast",
"droidcast_raw",
"scrcpy",
"mumuipc",
"diy",
"mumumanager",
"adb_multiuser",
],
) -> Any:
if method not in self.services:
self.services[method] = import_method(method_map[method])()
return self.services[method]

View file

@ -41,9 +41,9 @@ def restart_emulator(stop: bool = True, start: bool = True) -> bool:
csleep(10)
return False
if emulator_type == Emulator_Type.MuMu12.value:
from mower.utils.device.method.mumumanager import MuMuManager
mumumanager = config.device.get_service("mumumanager")
return MuMuManager().restart_emulator(stop, start)
return mumumanager.restart_emulator(stop, start)
elif emulator_type == Emulator_Type.Nox.value:
cmd = "Nox.exe"

View file

@ -271,12 +271,12 @@ class ADB:
config.conf.emulator.name == "MuMu12"
and config.conf.screencap_strategy.startswith("droidcast")
):
from mower.utils.device.method.mumumanager import MuMuManager
mumumanager = config.device.get_service("mumumanager")
if MuMuManager().app_kept_alive():
if mumumanager.app_kept_alive():
logger.error("MuMu12的droidcast截图策略下,需要关闭应用后台保活")
try:
MuMuManager().set_app_kept_alive_false()
mumumanager.set_app_kept_alive_false()
logger.info("关闭应用后台保活成功")
restart_emulator()
self._adb_device = None

View file

@ -134,9 +134,13 @@ class MuMu12IPC:
]
self.external_renderer.nemu_input_event_key_up.restype = ctypes.c_int
@property
def mumumanager(self) -> MuMuManager:
return config.device.get_service("mumumanager")
def connect(self):
"连接到 emulator"
if MuMuManager().emulator_status() != "running":
if self.mumumanager.emulator_status() != "running":
raise EmulatorError("模拟器未启动,请启动模拟器")
self.connection = self.external_renderer.nemu_connect(
ctypes.c_wchar_p(os.path.dirname(self.emulator_folder)),

View file

@ -36,6 +36,13 @@ class MuMuManager:
manager_path = config.conf.emulator.emulator_folder + "\\MuMuManager.exe"
index = config.conf.emulator.index
def __init__(self):
self.disable_log()
def disable_log(self):
cmd = [self.manager_path, "log", "off"]
subprocess_run(cmd)
def load_json(self, data: str) -> dict:
try:
return json.loads(data)