mower-ng/mower/utils/device/method/scrcpy/core.py
Elaina a79715dbde
All checks were successful
ci/woodpecker/push/check_format Pipeline was successful
优化kill_server
2024-12-10 17:08:05 +08:00

335 lines
11 KiB
Python

import socket
import struct
import threading
import time
from datetime import datetime
from functools import cached_property, wraps
from queue import Queue
from typing import Any, Callable, Optional
import numpy as np
from adbutils import AdbConnection, Network
from mower.utils import config
from mower.utils import typealias as tp
from mower.utils.csleep import MowerExit, csleep
from mower.utils.device import swipe_update
from mower.utils.device.method.adb import ADB
from mower.utils.device.method.adb.const import KeyCode
from mower.utils.log import logger
from mower.utils.path import get_path
from mower.utils.vector import sm, va
from . import const
from .control import ControlSender
SCR_PATH = "/data/local/tmp/scrcpy-server.jar"
SCR_VER = "3.1"
def retry_scrcpy(func):
@wraps(func)
def retry_wrapper(self, *args, **kwargs):
for _ in range(3):
try:
return func(self, *args, **kwargs)
except MowerExit:
raise
except Exception as e:
logger.debug_exception(e)
self.stop()
time.sleep(1)
self.start()
return retry_wrapper
class Client:
def __init__(self):
# User accessible
self.last_frame: Optional[np.ndarray] = None
self.resolution = (1920, 1080)
self.control = ControlSender(self)
# Need to destroy
self.__server_stream: Optional[AdbConnection] = None
self.control_socket: Optional[socket.socket] = None
self.control_socket_lock = threading.Lock()
self.video_socket: Optional[socket.socket] = None
self.frame_queue = Queue()
self.start()
def receive_frame(self):
from av.codec.context import CodecContext
codec = CodecContext.create("h264", "r")
while True:
header = self.video_socket.recv(12)
_pts, size = struct.unpack("!QL", header)
chunks = []
bytes_read = 0
while bytes_read < size:
chunk = self.video_socket.recv(min(size - bytes_read, 4096))
chunks.append(chunk)
bytes_read += len(chunk)
data = b"".join(chunks)
packets = codec.parse(data)
for packet in packets:
frames = codec.decode(packet)
for frame in frames:
self.frame_queue.put(frame)
@retry_scrcpy
def capture_display(self):
while not self.frame_queue.empty():
self.frame_queue.get()
frame = self.frame_queue.get(timeout=5)
return frame.to_ndarray(format="rgb24")
@cached_property
def adb(self) -> ADB:
return config.device.get_service("adb")
def __del__(self) -> None:
self.stop()
def __start_server(self) -> None:
"""
Start server and get the connection
"""
cmdline = f"CLASSPATH={SCR_PATH} app_process /"
cmdline += f" com.genymobile.scrcpy.Server {SCR_VER} audio=false"
cmdline += " control=true tunnel_forward=true send_device_meta=false"
if config.conf.screencap_strategy == "scrcpy":
cmdline += f" send_codec_meta=false max_fps={1000 / config.conf.screenshot_interval}"
cmdline += " video_bit_rate=128000000 video_codec=h264"
if config.conf.app_control_strategy == "scrcpy":
cmdline += " new_display=1920x1080/280 vd_system_decorations=false"
else:
cmdline += " video=false"
self.__server_stream: AdbConnection = self.adb.adb_shell(cmdline, True)
# Wait for server to start
self.__server_stream.conn.settimeout(3)
logger.info("Create server stream")
response = self.__server_stream.read(10)
logger.debug(response)
if b"[server]" not in response:
raise ConnectionError(
"Failed to start scrcpy-server: " + response.decode("utf-8", "ignore")
)
def __deploy_server(self) -> None:
"""
Deploy server to android device
"""
server_file_path = get_path(f"@install/mower/vendor/scrcpy-server-v{SCR_VER}")
self.adb.adb_push(str(server_file_path), SCR_PATH)
self.__start_server()
def __init_server_connection(self) -> None:
try:
first_socket = self.adb.adb.create_connection(
Network.LOCAL_ABSTRACT, "scrcpy"
)
first_socket.settimeout(3)
dummy_byte = first_socket.recv(1)
if not len(dummy_byte) or dummy_byte != b"\x00":
raise ConnectionError("Did not receive Dummy Byte!")
if config.conf.screencap_strategy == "scrcpy":
self.video_socket = first_socket
self.frame_thread = threading.Thread(
target=self.receive_frame, daemon=True
)
self.frame_thread.start()
self.control_socket = self.adb.adb.create_connection(
Network.LOCAL_ABSTRACT, "scrcpy"
)
self.control_socket.settimeout(3)
else:
self.control_socket = first_socket
if config.conf.app_control_strategy == "scrcpy":
self.launch()
except socket.timeout:
raise ConnectionError("Failed to connect scrcpy-server")
def start(self) -> None:
"""
Start listening video stream
"""
try_count = 0
while try_count < 3:
try:
self.__deploy_server()
time.sleep(0.5)
self.__init_server_connection()
break
except ConnectionError as e:
logger.debug_exception(f"Failed to connect scrcpy-server: {e}")
self.stop()
logger.warning("Try again in 10 seconds...")
time.sleep(10)
try_count += 1
else:
raise RuntimeError("Failed to connect scrcpy-server.")
def stop(self) -> None:
"""
Stop listening (both threaded and blocked)
"""
for i in [self.__server_stream, self.control_socket, self.video_socket]:
if i:
i.close()
i = None
@retry_scrcpy
def tap(self, x: int, y: int) -> None:
self.control.tap(x, y)
@retry_scrcpy
def back(self):
self.control.send_keyevent(KeyCode.KEYCODE_BACK)
@retry_scrcpy
def swipe(
self,
x0: int,
y0: int,
x1: int,
y1: int,
duration: float = 1,
fall: bool = True,
lift: bool = True,
update: bool = False,
interval: float = 0,
func: Callable[[tp.Image], Any] = lambda _: None,
):
"""滑动
Args:
x0 (int): 起点横坐标
y0 (int): 起点纵坐标
x1 (int): 终点横坐标
y1 (int): 终点纵坐标
duration (float, optional): 拖动时长. Defaults to 1.
fall (bool, optional): 按下. Defaults to True.
lift (bool, optional): 抬起. Defaults to True.
update (bool, optional): 滑动前截图. Defaults to False.
interval (float, optional): 拖动后的等待时间. Defaults to 0.
func (Callable[[tp.Image], Any], optional): 处理截图的函数. Defaults to lambda _: None.
"""
frame_time = 1 / 30
start_time = time.perf_counter()
fall and self.control.touch(x0, y0, const.ACTION_DOWN)
if update and config.recog:
thread = threading.Thread(target=swipe_update.start, args=(func,))
thread.start()
if (down_time := time.perf_counter()) < start_time + frame_time:
time.sleep(start_time + frame_time - down_time)
down_time = start_time + frame_time
while (progress := (time.perf_counter() - down_time) / duration) <= 1:
x, y = va(sm(1 - progress, (x0, y0)), sm(progress, (x1, y1)))
start_time = time.perf_counter()
self.control.touch(x, y, const.ACTION_MOVE)
if (move_time := time.perf_counter() - start_time) < frame_time:
time.sleep(frame_time - move_time)
self.control.touch(x1, y1, const.ACTION_MOVE)
lift and self.control.touch(x1, y1, const.ACTION_UP)
if update and config.recog:
start_time = time.perf_counter()
thread.join()
if (stop_time := time.perf_counter()) < start_time + interval:
time.sleep(start_time + interval - stop_time)
return swipe_update.result
if interval > 0:
time.sleep(interval)
@retry_scrcpy
def swipe_ext(
self,
points: list[tp.Coordinate],
durations: list[int],
update: bool = False,
interval: float = 0,
func: Callable[[tp.Image], Any] = lambda _: None,
):
total = len(durations)
for idx, (S, E, D) in enumerate(zip(points[:-1], points[1:], durations)):
first = idx == 0
last = idx == total - 1
result = self.swipe(
x0=S[0],
y0=S[1],
x1=E[0],
y1=E[1],
duration=D / 1000,
fall=first,
lift=last,
update=last and update,
interval=interval if last else 0,
func=func,
)
return result
@retry_scrcpy
def send_keyevent(self, keycode: int) -> None:
"""send a key event"""
logger.debug(keycode)
self.control.send_keyevent(keycode)
@retry_scrcpy
def launch(self):
self.control.start_app()
@retry_scrcpy
def exit(self):
self.adb.exit()
@retry_scrcpy
def home(self):
self.adb.exit()
@retry_scrcpy
def check_device_screen(self):
return True
@retry_scrcpy
def check_current_focus(self) -> bool:
"""check if the application is in the foreground"""
update = False
start_time = datetime.now()
while True:
try:
focus = self.adb.adb_shell("dumpsys window | grep Window")
if (
f"{config.conf.APPNAME}/{config.APP_ACTIVITY_NAME}" not in focus
and "com.hypergryph.arknights.bilibili/com.gsc.welcome.WelcomeActivity"
not in focus
):
if (datetime.now() - start_time).total_seconds() > 40:
self.exit() # 应用卡死
start_time = datetime.now()
self.launch()
update = True
csleep(1)
continue
return update
except MowerExit:
raise
except Exception as e:
logger.exception(e)
update = True
def kill_server(self):
pass