mower-ng/mower/utils/device/method/adb/__init__.py
2024-12-01 14:50:39 +08:00

146 lines
4.5 KiB
Python

import gzip
import subprocess
from functools import cached_property, wraps
import cv2
import numpy as np
from adbutils import AdbClient, AdbDevice
from mower import __system__
from mower.utils import config
from mower.utils.csleep import MowerExit
from mower.utils.device.exception import ADBError, SimulatorError
from mower.utils.log import logger
from .utils import subprocess_run
def retry_adb(func):
@wraps(func)
def retry_wrapper(self, *args, **kwargs):
for attempt in range(2):
try:
return func(self, *args, **kwargs)
except MowerExit:
raise
except ADBError as e:
logger.warning(f"ADB error, retrying... ({attempt + 1}/2)")
logger.exception(e)
self.adb_reconnect()
# After adb retries fail, escalate to simulator error
raise SimulatorError("ADB failed after 3 attempts.")
return retry_wrapper
class ADB:
def __init__(self) -> None:
self.adb_bin = config.conf.maa_adb_path
self.check_server_status()
@cached_property
@retry_adb
def adb(self) -> AdbDevice:
if len(self.adb_client.list()) == 0:
self.adb_client.connect(config.conf.adb, 5)
return AdbDevice(self.adb_client, config.conf.adb)
@cached_property
@retry_adb
def adb_client(self) -> AdbClient:
try:
return AdbClient("127.0.0.1", config.adb_server_port, 5)
except Exception as e:
raise e
@retry_adb
def adb_command(self, cmd, timeout=10):
"""
Execute ADB commands in a subprocess,
usually to be used when pulling or pushing large files.
Args:
cmd (list):
timeout (int):
Returns:
str:
"""
cmd = list(map(str, cmd))
cmd = [self.adb_bin, "-P", str(config.adb_server_port)] + cmd
return subprocess_run(cmd, timeout=timeout)
def check_server_status(self) -> str:
"""Use `adb devices` as `adb start-server`, result is actually useless"""
stdout = self.adb_command(["devices"])
logger.info(stdout)
return stdout
def clear_cached_property(self, property_name: str):
if property_name in self.__dict__:
del self.__dict__[property_name]
def kill_server(self) -> None:
self.adb_command(["kill-server"])
self.clear_cached_property("adb_client")
self.clear_cached_property("adb")
def restart_server(self) -> None:
self.kill_server()
self.check_server_status()
def adb_disconnect(self):
self.adb_client.disconnect(config.conf.adb, raise_error=False)
self.clear_cached_property("adb")
def adb_reconnect(self):
status = self.check_server_status()
if "offline" in status:
self.adb_disconnect()
else:
self.restart_server()
def adb_shell(
self, cmd, stream=False, timeout=10, encoding: str | None = "utf-8", rstrip=True
):
"""
Equivalent to `adb -P <port> shell <*cmd>`
Args:
cmd (list, str):
stream (bool): Return stream instead of string output (Default: False)
timeout (int): (Default: 10)
rstrip (bool): Strip the last empty line (Default: True)
Returns:
str if stream=False
socket if stream=True
"""
if not isinstance(cmd, str):
cmd = list(map(str, cmd))
return self.adb.shell(
cmd, stream=stream, timeout=timeout, encoding=encoding, rstrip=rstrip
)
def adb_push(self, file_path: str, target: str):
"""push file into device with adb_bin"""
self.adb.push(file_path, target)
def process(
self, path: str, args: list[str] = [], stderr: int = subprocess.DEVNULL
) -> subprocess.Popen:
logger.debug(f"{path=} {args=}")
cmd = [self.adb_bin, "-P", str(config.adb_server_port), "shell", path] + args
return subprocess.Popen(
cmd,
stdout=subprocess.DEVNULL,
stderr=stderr,
creationflags=subprocess.CREATE_NO_WINDOW if __system__ == "windows" else 0,
)
def capture_display_with_gzip(self) -> np.ndarray:
command = "screencap 2>/dev/null | gzip -1"
resp = self.adb_shell(command, encoding=None)
data = gzip.decompress(resp)
array = np.frombuffer(data[-1920 * 1080 * 4 :], np.uint8).reshape(1080, 1920, 4)
return cv2.cvtColor(array, cv2.COLOR_RGBA2RGB)