mower-ng/mower/utils/solver.py
2024-11-18 09:51:08 +08:00

650 lines
21 KiB
Python

import random
import time
from abc import abstractmethod
from datetime import datetime, timedelta
from typing import Any, Callable, Literal, Optional, Tuple
import cv2
import numpy as np
from mower.data import scene_list
from mower.utils import config
from mower.utils import typealias as tp
from mower.utils.csleep import MowerExit, csleep
from mower.utils.device.adb_client.const import KeyCode
from mower.utils.device.adb_client.session import Session
from mower.utils.device.device import Device
from mower.utils.image import cropimg, thres2
from mower.utils.log import logger
from mower.utils.recognize import RecognizeError, Recognizer, Scene
from mower.utils.simulator import restart_simulator
from mower.utils.traceback import caller_info
class BaseSolver:
solver_name: str | None = None
tap_info = None, None
waiting_scene = [
Scene.CONNECTING,
Scene.UNKNOWN,
Scene.UNKNOWN_WITH_NAVBAR,
Scene.LOADING,
Scene.LOGIN_LOADING,
Scene.LOGIN_MAIN_NOENTRY,
Scene.OPERATOR_ONGOING,
]
# 超时强制退出
scheduler_stop_time: datetime | None = None
solver_max_duration: timedelta | None = None
# 每轮transition开始前是否清除截图
solver_update_before_transition = False
def init_device(self):
if config.device is None:
while True:
try:
config.device = Device()
config.device.client.check_server_alive()
Session().connect()
if not config.device.check_device_screen():
raise MowerExit
if config.conf.droidcast.enable:
config.device.start_droidcast()
break
except MowerExit:
raise
except Exception as e:
logger.exception(e)
restart_simulator()
if config.recog is None:
config.recog = Recognizer()
def run(self) -> bool:
"""
Returns:
若接入调度器,run方法不应接受额外参数,且应在超时后退出,并返回任务是否完成。
具体分为以下情况:
1. 运行时长超过任务的内置间隔,一般为程序出错,应终止此次运行,返回True;
2. 运行时间超过调度器设置的停止时间,应继续运行此次任务,返回False;
3. 时间不够时主动退出,应继续运行此次任务,返回False;
4. 连续多次遇到报错,一般为程序出错,应终止此次运行,返回True;
5. 任务正常结束,返回True。
不接入调度器,则无需遵守此约定。
"""
self.init_device()
self.check_current_focus()
exception_time = 10
solver_stop_time = (
datetime.now() + self.solver_max_duration
if self.solver_max_duration
else None
)
stop_time = min(
filter(None, [solver_stop_time, self.scheduler_stop_time]), default=None
)
while exception_time:
try:
if stop_time and datetime.now() > stop_time:
if stop_time == self.scheduler_stop_time:
logger.info("调度器安排的时间不够")
return False
elif stop_time == solver_stop_time:
logger.error("任务超时,请联系开发者修bug")
return True
if self.solver_update_before_transition:
config.recog.update()
else:
self.solver_update_before_transition = True
if self.transition():
return True
exception_time = 10
except MowerExit:
raise
except Exception as e:
exception_time -= 1
logger.exception(e)
logger.error("连续报错次数过多,请联系开发者修bug")
return True
@abstractmethod
def transition(self) -> bool:
# the change from one state to another is called transition
return True # means task completed
def get_color(self, pos: tp.Coordinate) -> tp.Pixel:
"""get the color of the pixel"""
return config.recog.color(pos[0], pos[1])
@staticmethod
def get_pos(
poly: tp.Location, x_rate: float = 0.5, y_rate: float = 0.5
) -> tp.Coordinate:
"""get the pos form tp.Location"""
if poly is None:
raise RecognizeError("poly is empty")
elif len(poly) == 4:
# tp.Rectangle
x = (
poly[0][0] * (1 - x_rate)
+ poly[1][0] * (1 - x_rate)
+ poly[2][0] * x_rate
+ poly[3][0] * x_rate
) / 2
y = (
poly[0][1] * (1 - y_rate)
+ poly[3][1] * (1 - y_rate)
+ poly[1][1] * y_rate
+ poly[2][1] * y_rate
) / 2
elif len(poly) == 2 and isinstance(poly[0], (list, tuple)):
# tp.Scope
x = poly[0][0] * (1 - x_rate) + poly[1][0] * x_rate
y = poly[0][1] * (1 - y_rate) + poly[1][1] * y_rate
else:
# tp.Coordinate
x, y = poly
return (int(x), int(y))
def sleep(self, interval: float = 1) -> None:
"""sleeping for a interval"""
csleep(interval)
config.recog.update()
def input(self, referent: str, input_area: tp.Scope, text: str = None) -> None:
"""input text"""
logger.debug(f"{referent=} {input_area=}")
config.device.tap(self.get_pos(input_area))
time.sleep(0.5)
if text is None:
text = input(referent).strip()
config.device.send_text(str(text))
config.device.tap((0, 0))
def find(
self,
res: tp.Res,
draw: bool = False,
scope: tp.Scope = None,
thres: int = None,
judge: bool = True,
strict: bool = False,
score=0.0,
) -> tp.Scope:
return config.recog.find(res, draw, scope, thres, judge, strict, score)
def tap(
self,
poly: tp.Location,
x_rate: float = 0.5,
y_rate: float = 0.5,
interval: float = 1,
) -> None:
"""tap"""
if config.stop_mower.is_set():
raise MowerExit
self.tap_info = None, None
pos = self.get_pos(poly, x_rate, y_rate)
config.device.tap(pos)
if interval > 0:
self.sleep(interval)
def ctap(
self,
pos: tp.Location,
max_seconds: float = 10,
interval: float = 1,
id: str | None = None,
) -> bool:
"""同一处代码多次调用ctap,在max_seconds时长内最多点击一次
Args:
pos: 点击位置
max_seconds: 等待网络连接建议设10,等待动画建议设5或3
interval: 点击后sleep的时长
id: 用于区分不同的点击,为None时为“文件名:行号”
Returns:
本次点击是否成功
"""
if id is None:
id = caller_info()
now = datetime.now()
lid, ltime = self.tap_info
if lid != id or (lid == id and now - ltime > timedelta(seconds=max_seconds)):
logger.debug(f"tap {id}")
self.tap(pos, interval=interval)
self.tap_info = id, now
return True
else:
self.sleep(interval)
logger.debug(f"skip {id}")
return False
def check_current_focus(self):
config.recog.check_current_focus()
def restart_game(self):
"重启游戏"
config.device.exit()
config.device.launch()
config.recog.update()
def tap_element(
self,
element_name: tp.Res,
x_rate: float = 0.5,
y_rate: float = 0.5,
interval: float = 1,
score: float = 0.0,
draw: bool = False,
scope: tp.Scope = None,
judge: bool = True,
detected: bool = False,
thres: Optional[int] = None,
) -> bool:
"""tap element"""
element = self.find(
element_name, draw, scope, judge=judge, score=score, thres=thres
)
if detected and element is None:
return False
self.tap(element, x_rate, y_rate, interval)
return True
def tap_index_element(
self,
name: Literal[
"friend",
"infrastructure",
"mission",
"recruit",
"shop",
"terminal",
"warehouse",
"headhunting",
"mail",
"operator",
],
):
pos = {
"friend": (544, 862), # 好友
"infrastructure": (1545, 948), # 基建
"mission": (1201, 904), # 任务
"recruit": (1507, 774), # 公开招募
"shop": (1251, 727), # 采购中心
"terminal": (1458, 297), # 终端
"warehouse": (1788, 973), # 仓库
"headhunting": (1749, 783), # 干员寻访
"mail": (292, 62), # 邮件
"operator": (1590, 534), # 干员(角色管理)
}
self.ctap(pos[name])
def tap_nav_element(
self,
name: Literal[
"index",
"terminal",
"infrastructure",
"recruit",
"headhunting",
"shop",
"mission",
"friend",
"operator",
],
):
pos = {
"index": (140, 365), # 首页
"terminal": (793, 163), # 终端
"infrastructure": (1030, 163), # 基建
"recruit": (1435, 365), # 公开招募
"headhunting": (1623, 364), # 干员寻访
"shop": (1727, 362), # 采购中心
"mission": (1631, 53), # 任务
"friend": (1727, 53), # 好友
"operator": (462, 408), # 干员
}
self.ctap(pos[name])
def tap_terminal_button(
self,
name: Literal[
"main",
"main_theme",
"intermezzi",
"biography",
"collection",
"regular",
"longterm",
"contract",
],
):
y = 1005
pos = {
"main": (115, y), # 首页
"main_theme": (356, y), # 主题曲
"intermezzi": (596, y), # 插曲
"biography": (836, y), # 别传
"collection": (1077, y), # 资源收集
"regular": (1317, y), # 常态事务
"longterm": (1556, y), # 长期探索
"contract": (1796, y), # 危机合约
}
self.ctap(pos[name])
def switch_shop(
self,
name: Literal[
"recommend", "originite", "bundle", "skin", "token", "furniture", "credit"
],
):
y = 165
pos = {
"recommend": (150, y), # 可露希尔推荐
"originite": (425, y), # 源石交易所
"bundle": (700, y), # 组合包
"skin": (970, y), # 时装商店
"token": (1250, y), # 凭证交易所
"furniture": (1520, y), # 家具商店
"credit": (1805, y), # 信用交易所
}
self.tap(pos[name])
def template_match(
self,
res: str,
scope: Optional[tp.Scope] = None,
method: int = cv2.TM_CCOEFF_NORMED,
) -> Tuple[float, tp.Scope]:
return config.recog.template_match(res, scope, method)
def swipe(
self,
start: tp.Coordinate,
movement: tp.Coordinate,
duration: int = 100,
interval: float = 1,
) -> None:
"""swipe"""
if config.stop_mower.is_set():
raise MowerExit
end = (start[0] + movement[0], start[1] + movement[1])
config.device.swipe(start, end, duration=duration)
if interval > 0:
self.sleep(interval)
def swipe_noinertia(
self,
start: tp.Coordinate,
movement: tp.Coordinate,
duration: int = 20,
interval: float = 0.2,
) -> None:
"""swipe with no inertia (movement should be vertical)"""
if config.stop_mower.is_set():
raise MowerExit
points = [start]
if movement[0] == 0:
dis = abs(movement[1])
points.append((start[0] + 100, start[1]))
points.append((start[0] + 100, start[1] + movement[1]))
points.append((start[0], start[1] + movement[1]))
else:
dis = abs(movement[0])
points.append((start[0], start[1] + 100))
points.append((start[0] + movement[0], start[1] + 100))
points.append((start[0] + movement[0], start[1]))
config.device.swipe_ext(
points, durations=[200, dis * duration // 100, 200], interval=interval
)
config.recog.update()
def swipe_ext(
self,
points: list[tp.Coordinate],
durations: list[int],
up_wait: int = 1000,
interval: float = 1,
):
"""复杂的拖动
拖动过程:
1. 拖动
2. 等待up_wait(防止抬手后按照惯性继续移动)
3. 抬手
4. 等待interval(回弹动画)
5. 清除截图缓存
+-lift +-recog.update()
| |
+------+---------+----------+
| drag | up_wait | interval |
+------+---------+----------+
Args:
points (list[tp.Coordinate]): 坐标列表
durations (list[int]): 每两个坐标之间拖动的时长
up_wait (int, optional): 抬手前的等待时间,单位毫秒. Defaults to 1000.
interval (float, optional): 抬手后等待回弹动画结束的时间. Defaults to 1.
Raises:
MowerExit: 停止运行
"""
if config.stop_mower.is_set():
raise MowerExit
if up_wait > 0:
points += points[-1:]
durations.append(up_wait)
config.device.swipe_ext(points, durations, False, interval)
config.recog.update()
def swipe_update(
self,
points: list[tp.Coordinate],
durations: list[int],
up_wait: int = 1000,
interval: float = 1,
func: Callable[[tp.Image], Any] = lambda _: None,
):
"""在抬手前截图的拖动
拖动过程:
1. 拖动
2. 截图,等待up_wait(防止抬手后按照惯性继续移动)
3. 抬手
4. 等待interval(回弹动画)
函数返回后,config.recog.img为抬手前的截图。需要额外调用config.recog.update()以获得回弹动画结束后的截图。
+-lift
|
+------+---------+----------+
| drag | up_wait | interval |
+------++--------+----------+
|
+-recog.update()
func(recog.img)
Args:
points (list[tp.Coordinate]): 坐标列表
durations (list[int]): 每两个坐标之间拖动的时长
up_wait (int, optional): 抬手前的等待时间,单位毫秒. Defaults to 1000.
interval (float, optional): 抬手后等待回弹动画结束的时间. Defaults to 1.
func (Callable[[tp.Image], Any], optional): 处理截图的函数. Defaults to lambda _: None.
Raises:
MowerExit: 停止运行
Returns:
Any: func的返回值
"""
if config.stop_mower.is_set():
raise MowerExit
if up_wait > 0:
points += points[-1:]
durations.append(up_wait)
return config.device.swipe_ext(points, durations, True, interval, func)
def back(self, interval: float = 1) -> None:
"""send back keyevent"""
self.tap_info = None, None
config.device.send_keyevent(KeyCode.KEYCODE_BACK)
self.sleep(interval)
def cback(
self,
max_seconds: float = 10,
interval: float = 1,
id: str | None = None,
) -> bool:
"""同一处代码多次调用cback,在max_seconds时长内最多返回一次
Args:
max_seconds: 等待网络连接建议设10,等待动画建议设5或3
interval: 点击后sleep的时长
Returns:
本次点击是否成功
"""
if id is None:
id = caller_info()
now = datetime.now()
lid, ltime = self.tap_info
if lid != id or (lid == id and now - ltime > timedelta(seconds=max_seconds)):
logger.debug(f"tap {id}")
self.back(interval=interval)
self.tap_info = id, now
return True
else:
self.sleep(interval)
logger.debug(f"skip {id}")
return False
def scene(self) -> int:
"""get the current scene in the game"""
return config.recog.get_scene()
def ra_scene(self) -> int:
"""
生息演算场景识别
"""
return config.recog.get_ra_scene()
def sf_scene(self) -> int:
"""
隐秘战线场景识别
"""
return config.recog.get_sf_scene()
def train_scene(self) -> int:
"""
训练室景识别
"""
return config.recog.get_train_scene()
def solve_captcha(self, refresh=False):
th = thres2(config.recog.gray, 254)
pos = np.nonzero(th)
offset_x = pos[1].min()
offset_y = pos[0].min()
img_scope = ((offset_x, offset_y), (pos[1].max(), pos[0].max()))
img = cropimg(config.recog.img, img_scope)
h = img.shape[0]
def _t(ratio):
return int(h * ratio)
def _p(ratio_x, ratio_y):
return _t(ratio_x), _t(ratio_y)
if refresh:
logger.info("刷新验证码")
self.tap((offset_x + _t(0.189), offset_y + _t(0.916)), interval=3)
img = cropimg(config.recog.img, img_scope)
left_part = cropimg(img, (_p(0.032, 0.032), _p(0.202, 0.591)))
hsv = cv2.cvtColor(left_part, cv2.COLOR_RGB2HSV)
mask = cv2.inRange(hsv, (25, 0, 0), (35, 255, 255))
tpl_width = _t(0.148)
tpl_height = _t(0.135)
tpl_border = _t(0.0056)
tpl_padding = _t(0.018)
tpl = np.zeros((tpl_height, tpl_width), np.uint8)
tpl[:] = (255,)
tpl[
tpl_border : tpl_height - tpl_border,
tpl_border : tpl_width - tpl_border,
] = (0,)
result = cv2.matchTemplate(mask, tpl, cv2.TM_SQDIFF, None, tpl)
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
x, y = min_loc
source_p = (
(x + tpl_padding, y + tpl_padding),
(x + tpl_width - tpl_padding, y + tpl_height - tpl_padding),
)
source = cropimg(left_part, source_p)
mask = cropimg(mask, source_p)
right_part = cropimg(
img,
(
(_t(0.201), _t(0.032) + source_p[0][1]),
(_t(0.94), _t(0.032) + source_p[1][1]),
),
)
for _y in range(source.shape[0]):
for _x in range(source.shape[1]):
for _c in range(source.shape[2]):
source[_y, _x, _c] = np.clip(source[_y, _x, _c] * 0.7 - 23, 0, 255)
mask = cv2.bitwise_not(mask)
result = cv2.matchTemplate(right_part, source, cv2.TM_SQDIFF_NORMED, None, mask)
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
x = _t(0.201) + min_loc[0] - _t(0.032) - x - tpl_padding + _t(0.128)
x += random.choice([-4, -3, -2, 2, 3, 4])
def _rb(R, r):
return random.random() * _t(R) + _t(r)
btn_x = _rb(0.1, 0.01)
start = offset_x + btn_x + _t(0.128), offset_y + _rb(0.1, 0.01) + _t(0.711)
end = offset_x + btn_x + x, offset_y + _rb(0.1, 0.01) + _t(0.711)
p1 = end[0] + _rb(0.1, 0.02), end[1] + _rb(0.05, 0.02)
p2 = end[0] + _rb(0.1, 0.02), end[1] + _rb(0.05, 0.02)
logger.info("滑动验证码")
config.device.swipe_ext(
(start, p1, p2, end, end),
durations=[
random.randint(400, 600),
random.randint(200, 300),
random.randint(200, 300),
random.randint(200, 300),
],
)
def waiting_solver(self):
"""需要等待的页面解决方法。UNKNOWN_WITH_NAVBAR超时直接返回False,其他超时重启返回False"""
scene = self.scene()
start_time = datetime.now()
sleep_time, wait_time = getattr(
config.conf.waiting_scene_v2, scene_list[str(scene)]["label"]
)
stop_time = start_time + timedelta(seconds=wait_time)
while datetime.now() < stop_time:
self.sleep(sleep_time / 1000)
if self.scene() != scene:
return True
if scene == Scene.UNKNOWN_WITH_NAVBAR:
logger.debug("有导航栏的未知场景超时")
return False
else:
logger.warning("相同场景等待超时")
config.device.exit()
csleep(3)
self.check_current_focus()
return False