mower-ng/mower/utils/device/method/mumu_ipc.py

372 lines
13 KiB
Python

import ctypes
import os
import threading
import time
from functools import wraps
from typing import Any, Callable
import numpy as np
from mower.utils import config
from mower.utils import typealias as tp
from mower.utils.csleep import MowerExit
from mower.utils.device import swipe_update
from mower.utils.device.emulator import restart_emulator
from mower.utils.device.exception import EmulatorError, GameError
from mower.utils.device.method.mumumanager import MuMuManager
from mower.utils.log import logger
from mower.utils.vector import sm, va
class NemuIpcIncompatible(Exception):
pass
def retry_mumuipc(func):
@wraps(func)
def retry_wrapper(self, *args, **kwargs):
for _ in range(3):
try:
return func(self, *args, **kwargs)
except MowerExit:
raise
except GameError as e:
logger.debug_exception(e)
config.device.check_current_focus()
except EmulatorError as e:
logger.debug_exception(e)
restart_emulator()
except Exception as e:
logger.debug_exception(e)
return retry_wrapper
class MuMu12IPC:
def __init__(self):
self.emulator_folder = config.conf.emulator.emulator_folder
self.instanse_index = int(config.conf.emulator.index)
self.connection = 0
self.display_id = -1
self.pixels = (ctypes.c_ubyte * 8294400)()
# 加载动态链接库
dll_path = os.path.join(
self.emulator_folder, "sdk", "external_renderer_ipc.dll"
)
try:
self.external_renderer = ctypes.CDLL(dll_path)
except OSError as e:
logger.error(e)
# OSError: [WinError 126] 找不到指定的模块。
if not os.path.exists(dll_path):
raise NemuIpcIncompatible(
f"文件不存在: {dll_path}, 请检查模拟器文件夹是否正确"
f"NemuIpc requires MuMu12 version >= 3.8.13, please check your version"
)
else:
raise NemuIpcIncompatible(
f"ipc_dll={dll_path} exists, but cannot be loaded"
)
# 定义函数原型
self.external_renderer.nemu_connect.argtypes = [ctypes.c_wchar_p, ctypes.c_int]
self.external_renderer.nemu_connect.restype = ctypes.c_int
self.external_renderer.nemu_disconnect.argtypes = [ctypes.c_int]
self.external_renderer.nemu_disconnect.restype = None
self.external_renderer.nemu_get_display_id.argtypes = [
ctypes.c_int,
ctypes.c_char_p,
ctypes.c_int,
]
self.external_renderer.nemu_get_display_id.restype = ctypes.c_int
self.external_renderer.nemu_capture_display.argtypes = [
ctypes.c_int,
ctypes.c_uint,
ctypes.c_int,
ctypes.POINTER(ctypes.c_int),
ctypes.POINTER(ctypes.c_int),
ctypes.POINTER(ctypes.c_ubyte),
]
self.external_renderer.nemu_capture_display.restype = ctypes.c_int
self.external_renderer.nemu_input_event_touch_down.argtypes = [
ctypes.c_int,
ctypes.c_int,
ctypes.c_int,
ctypes.c_int,
]
self.external_renderer.nemu_input_event_touch_down.restype = ctypes.c_int
self.external_renderer.nemu_input_event_touch_up.argtypes = [
ctypes.c_int,
ctypes.c_int,
]
self.external_renderer.nemu_input_event_touch_up.restype = ctypes.c_int
self.external_renderer.nemu_input_event_finger_touch_down.argtypes = [
ctypes.c_int,
ctypes.c_int,
ctypes.c_int,
ctypes.c_int,
ctypes.c_int,
]
self.external_renderer.nemu_input_event_finger_touch_down.restype = ctypes.c_int
self.external_renderer.nemu_input_event_finger_touch_up.argtypes = [
ctypes.c_int,
ctypes.c_int,
ctypes.c_int,
]
self.external_renderer.nemu_input_event_finger_touch_up.restype = ctypes.c_int
self.external_renderer.nemu_input_event_key_down.argtypes = [
ctypes.c_int,
ctypes.c_int,
ctypes.c_int,
]
self.external_renderer.nemu_input_event_key_down.restype = ctypes.c_int
self.external_renderer.nemu_input_event_key_up.argtypes = [
ctypes.c_int,
ctypes.c_int,
ctypes.c_int,
]
self.external_renderer.nemu_input_event_key_up.restype = ctypes.c_int
@property
def mumumanager(self) -> MuMuManager:
return config.device.get_service("mumumanager")
def connect(self):
"连接到 emulator"
if self.mumumanager.emulator_status() != "running":
raise EmulatorError("模拟器未启动,请启动模拟器")
self.connection = self.external_renderer.nemu_connect(
ctypes.c_wchar_p(os.path.dirname(self.emulator_folder)),
self.instanse_index,
)
if self.connection == 0:
raise EmulatorError("连接模拟器失败,请启动模拟器")
logger.info("连接模拟器成功")
def get_display_id(self) -> int:
config.device.check_current_focus()
pkg_name = config.conf.APPNAME.encode("utf-8") # 替换为实际包名
self.display_id = self.external_renderer.nemu_get_display_id(
self.connection, pkg_name, config.conf.mumu_multi_app.app_index
)
if self.display_id < 0:
logger.error(f"获取Display ID失败: {self.display_id}")
raise GameError("获取Display ID失败,请先打开游戏")
logger.info(f"获取Display ID成功: {self.display_id}")
@retry_mumuipc
def check_status(self):
if self.connection == 0:
self.connect()
if self.display_id < 0:
self.get_display_id()
def capture_display(self) -> np.ndarray:
self.check_status()
result = self.external_renderer.nemu_capture_display(
self.connection,
self.display_id,
8294400,
ctypes.byref(ctypes.c_int(1920)),
ctypes.byref(ctypes.c_int(1080)),
self.pixels,
)
if result != 0:
logger.error(f"获取截图失败: {result}")
self.connection = 0
self.display_id = -1
config.device.exit() # 可能是游戏卡死
return np.zeros((1080, 1920, 3), dtype=np.uint8)
image = np.frombuffer(self.pixels, dtype=np.uint8).reshape((1080, 1920, 4))[
:, :, :3
]
image = np.flipud(image) # 翻转
return image
def key_down(self, key_code: int):
"""按下键盘按键"""
self.check_status()
result = self.external_renderer.nemu_input_event_key_down(
self.connection, self.display_id, key_code
)
if result != 0:
self.connection = 0
self.display_id = -1
config.device.exit() # 可能是游戏卡死
def key_up(self, key_code: int):
"""释放键盘按键"""
self.check_status()
result = self.external_renderer.nemu_input_event_key_up(
self.connection, self.display_id, key_code
)
if result != 0:
self.connection = 0
self.display_id = -1
config.device.exit() # 可能是游戏卡死
# 单点触控
def touch_down(self, x: int, y: int):
self.check_status()
result = self.external_renderer.nemu_input_event_touch_down(
self.connection, self.display_id, int(1080 - y), int(x)
)
if result != 0:
self.connection = 0
self.display_id = -1
config.device.exit() # 可能是游戏卡死
def touch_up(self):
self.check_status()
result = self.external_renderer.nemu_input_event_touch_up(
self.connection, self.display_id
)
if result != 0:
self.connection = 0
self.display_id = -1
config.device.exit() # 可能是游戏卡死
# 多点触控
def finger_touch_down(self, finger_id: int, x: int, y: int):
self.check_status()
result = self.external_renderer.nemu_input_event_finger_touch_down(
self.connection, self.display_id, finger_id, int(1080 - y), int(x)
)
if result != 0:
self.connection = 0
self.display_id = -1
config.device.exit() # 可能是游戏卡死
def finger_touch_up(self, finger_id: int):
self.check_status()
result = self.external_renderer.nemu_input_event_finger_touch_up(
self.connection, self.display_id, finger_id
)
if result != 0:
self.connection = 0
self.display_id = -1
config.device.exit() # 可能是游戏卡死
def tap(self, x, y, hold_time: float = 0.07) -> None:
"""
Tap on screen
Args:
x: horizontal position
y: vertical position
hold_time: hold time
"""
self.touch_down(x, y)
time.sleep(hold_time)
self.touch_up()
def send_keyevent(self, key_code: int, hold_time: float = 0.1):
self.key_down(key_code)
time.sleep(hold_time)
self.key_up(key_code)
def back(self):
self.send_keyevent(1)
def swipe(
self,
x0: int,
y0: int,
x1: int,
y1: int,
duration: float = 1.0,
fall: bool = True,
lift: bool = True,
update: bool = False,
interval: float = 0,
func: Callable[[np.ndarray], Any] = lambda _: None,
):
"""
模拟滑动
Args:
x0 (int): 起点横坐标
y0 (int): 起点纵坐标
x1 (int): 终点横坐标
y1 (int): 终点纵坐标
duration (float): 滑动持续时间,单位为秒
fall (bool): 是否按下,默认 True
lift (bool): 是否抬起,默认 True
update (bool): 滑动前是否截图,默认 False
interval (float): 滑动完成后的等待时间,单位为秒,默认 0
func (Callable[[np.ndarray], Any]): 截图后处理的函数,默认空函数
"""
frame_time = 1 / 30
start_time = time.perf_counter()
fall and self.touch_down(x0, y0)
if update and config.recog:
thread = threading.Thread(target=swipe_update.start, args=(func,))
thread.start()
if (down_time := time.perf_counter()) < start_time + frame_time:
time.sleep(start_time + frame_time - down_time)
down_time = start_time + frame_time
while (progress := (time.perf_counter() - down_time) / duration) <= 1:
x, y = va(sm(1 - progress, (x0, y0)), sm(progress, (x1, y1)))
start_time = time.perf_counter()
self.touch_down(x, y)
if (move_time := time.perf_counter() - start_time) < frame_time:
time.sleep(frame_time - move_time)
self.touch_down(x1, y1)
lift and self.touch_up()
if update and config.recog:
start_time = time.perf_counter()
thread.join()
if (stop_time := time.perf_counter()) < start_time + interval:
time.sleep(start_time + interval - stop_time)
return swipe_update.result
if interval > 0:
time.sleep(interval)
def swipe_ext(
self,
points: list[tp.Coordinate],
durations: list[int],
update: bool = False,
interval: float = 0,
func: Callable[[tp.Image], Any] = lambda _: None,
):
total = len(durations)
for idx, (S, E, D) in enumerate(zip(points[:-1], points[1:], durations)):
first = idx == 0
last = idx == total - 1
result = self.swipe(
x0=S[0],
y0=S[1],
x1=E[0],
y1=E[1],
duration=D / 1000,
fall=first,
lift=last,
update=last and update,
interval=interval if last else 0,
func=func,
)
return result
def kill_server(self):
if self.connection != 0:
self.external_renderer.nemu_disconnect(self.connection)
logger.info(f"Disconnected from emulator: handle={self.connection}")
self.connection = 0
self.display_id = -1
else:
logger.warning("No active connection to disconnect.")