mower-ng/mower/utils/solver.py
2025-02-15 11:02:55 +08:00

809 lines
28 KiB
Python

"""
Copyright (c) 2024 EightyDollars <eightydollars@163.com>
Copyright (c) 2024 Elaina <2901432375@qq.com>
Copyright (c) 2024 fuyn101 <fuynshile@outlook.com>
Copyright (c) 2024 zhbaor <zhbaor@zhaozuohong.vip>
This file is part of mower-ng (https://git.zhaozuohong.vip/mower-ng/mower-ng).
Mower-ng is free software: you may copy, redistribute and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, version 3 or later.
This file is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
This file incorporates work covered by the following copyright and
permission notice:
Copyright (c) 2023 BiologyHazard <3482991796@qq.com>
Copyright (c) 2023 minine <<xxz@ixoixi.com>>
Copyright (c) 2024 MuelNova <muel@nova.gal>
Copyright (c) 2021 Nano <nanoapezlk@gmail.com>
Copyright (c) 2023 Peace2F <https://github.com/Peace2F>
Copyright (c) 2022 rebelice <yangrebelice@gmail.com>
Copyright (c) 2023 Shawnsdaddy <wu2xx@dukes.jmu.edu>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import inspect
import random
import sys
from abc import abstractmethod
from datetime import datetime, timedelta
from pathlib import Path
from types import FrameType
from typing import Any, Callable, Literal, Optional, Tuple
import cv2
import numpy as np
from mower.utils import config
from mower.utils import typealias as tp
from mower.utils.csleep import MowerExit, csleep
from mower.utils.email import notify, send_message
from mower.utils.image import cropimg, diff_ratio, loadres, thres2
from mower.utils.log import logger
from mower.utils.matcher import GOOD_DISTANCE_LIMIT, flann, keypoints_scale_invariant
from mower.utils.path import get_path
from mower.utils.recognize import Scene
from mower.utils.traceback import caller_info
from mower.utils.vector import clamp_vector
config.mount_device()
class BaseSolver:
solver_name: str | None = None
waiting_scene = [
Scene.CONNECTING,
Scene.UNKNOWN,
Scene.UNKNOWN_WITH_NAVBAR,
Scene.UNKNOWN_ROGUE,
Scene.LOADING,
Scene.LOGIN_LOADING,
Scene.LOGIN_MAIN_NOENTRY,
Scene.OPERATOR_ONGOING,
Scene.DOUBLE_CONFIRM,
]
# 超时强制退出
scheduler_stop_time: datetime | None = None
solver_max_duration: timedelta | None = None
# 每轮transition开始前是否清除截图
solver_update_before_transition = False
transition_return: str = ""
game_stuck_detection: bool = True
game_stuck_begin: datetime | None = None
def run(self) -> bool:
"""
Returns:
若接入调度器,run方法不应接受额外参数,且应在超时后退出,并返回任务是否完成。
具体分为以下情况:
1. 运行时长超过任务的内置间隔,一般为程序出错,应终止此次运行,返回True;
2. 运行时间超过调度器设置的停止时间,应继续运行此次任务,返回False;
3. 时间不够时主动退出,应继续运行此次任务,返回False;
4. 连续多次遇到报错,一般为程序出错,应终止此次运行,返回True;
5. 任务正常结束,返回True。
不接入调度器,则无需遵守此约定。
"""
if self.solver_name:
logger.info(f"Start: {self.solver_name}")
if not config.check_screen and not config.device.check_device_screen():
raise MowerExit
self.check_current_focus()
exception_time = 10
solver_stop_time = (
datetime.now() + self.solver_max_duration
if self.solver_max_duration
else None
)
stop_time = min(
filter(None, [solver_stop_time, self.scheduler_stop_time]), default=None
)
while True:
try:
if stop_time and datetime.now() > stop_time:
if stop_time == self.scheduler_stop_time:
logger.info("调度器安排的时间不够")
return False
elif stop_time == solver_stop_time:
error_msg = "任务超时,请联系开发者修bug"
logger.error(error_msg)
send_message(f"{self.solver_name}{error_msg}", level="ERROR")
return True
if self.solver_update_before_transition:
config.recog.update()
self.solver_update_before_transition = True
self.game_stuck_detection = True
last_return = self.transition_return
old_trace = sys.gettrace()
sys.settrace(self.trace_callback)
result = self.transition()
sys.settrace(old_trace)
if result:
return True
if self.game_stuck_detection and self.transition_return == last_return:
if self.game_stuck_begin is None:
self.game_stuck_begin = datetime.now()
elif datetime.now() - self.game_stuck_begin > timedelta(minutes=3):
notify("游戏卡死,尝试重启游戏", level="ERROR")
self.game_stuck_begin = None
config.device.exit()
csleep(3)
self.check_current_focus()
else:
self.game_stuck_begin = None
exception_time = 10
except MowerExit:
raise
except Exception as e:
exception_time -= 1
if exception_time > 0:
logger.debug_exception(e)
else:
logger.exception(e)
error_msg = "连续报错次数过多,请联系开发者修bug"
logger.error(error_msg)
send_message(f"{self.solver_name}{error_msg}", level="ERROR")
return True
@abstractmethod
def transition(self) -> bool:
# the change from one state to another is called transition
return True # means task completed
def trace_callback(self, frame: FrameType, event, arg):
if event == "call":
if frame.f_code.co_name == "transition":
frame.f_trace_lines = False
return self.trace_callback
elif event == "return":
relative_path = Path(inspect.getsourcefile(frame))
try:
relative_path = relative_path.relative_to(get_path("@install"))
except ValueError:
pass
self.transition_return = f"{relative_path}:{frame.f_lineno}"
logger.debug(f"{self.transition_return=}")
def get_color(self, pos: tp.Coordinate) -> tp.Pixel:
"""get the color of the pixel"""
return config.recog.color(pos[0], pos[1])
@staticmethod
def get_pos(
poly: tp.Location, x_rate: float = 0.5, y_rate: float = 0.5
) -> tp.Coordinate:
"""get the pos form tp.Location"""
if poly is None:
raise Exception("poly is empty")
elif len(poly) == 4:
# tp.Rectangle
x = (
poly[0][0] * (1 - x_rate)
+ poly[1][0] * (1 - x_rate)
+ poly[2][0] * x_rate
+ poly[3][0] * x_rate
) / 2
y = (
poly[0][1] * (1 - y_rate)
+ poly[3][1] * (1 - y_rate)
+ poly[1][1] * y_rate
+ poly[2][1] * y_rate
) / 2
elif len(poly) == 2 and isinstance(poly[0], (list, tuple)):
# tp.Scope
x = poly[0][0] * (1 - x_rate) + poly[1][0] * x_rate
y = poly[0][1] * (1 - y_rate) + poly[1][1] * y_rate
else:
# tp.Coordinate
x, y = poly
x, y = clamp_vector((x, y))
return (int(x), int(y))
def sleep(self, interval: float = 1, update: bool = True) -> None:
"""sleeping for a interval"""
csleep(interval)
if update:
config.recog.update()
def find(
self,
res: tp.Res,
draw: bool = False,
scope: tp.Scope = None,
threshold: float = 0.9,
) -> tp.Scope | None:
return config.recog.find(res, draw, scope, threshold)
def tap(
self,
name: tp.Location | tp.Res,
x_rate: float = 0.5,
y_rate: float = 0.5,
update: bool = True,
):
"""点击元素或坐标
Args:
name (tp.Location | tp.Res): 元素名、区域或坐标
x_rate (float, optional): 区域中横坐标的比例. Defaults to 0.5.
y_rate (float, optional): 区域中纵坐标的比例. Defaults to 0.5.
update (bool, optional): 点击后更新截图缓存. Defaults to True.
Raises:
MowerExit: 停止运行
Exception: 未识别到元素
"""
if config.stop_mower.is_set():
raise MowerExit
if isinstance(name, str):
pos = self.find(name)
if pos is None:
raise Exception(f"未识别到{name}元素")
else:
pos = name
config.tap_info = None, None
config.device.tap(self.get_pos(pos, x_rate, y_rate))
update and config.recog.update()
def ctap(
self,
name: tp.Location | tp.Res,
max_seconds: float = 10,
id: Optional[str] = None,
x_rate: float = 0.5,
y_rate: float = 0.5,
) -> bool:
"""同一处代码多次调用ctap,在max_seconds时长内最多点击一次
Args:
name (tp.Location | tp.Res): 元素名、区域或坐标
max_seconds: 等待网络连接建议设10,等待动画建议设5或3
id: 用于区分不同的点击,为None时为“文件名:行号”
Returns:
本次点击是否成功
"""
if id is None:
id = caller_info()
now = datetime.now()
lid, ltime = config.tap_info
if lid != id or (lid == id and now - ltime > timedelta(seconds=max_seconds)):
logger.debug(f"tap {id}")
self.tap(name, x_rate, y_rate)
config.tap_info = id, now
return True
else:
logger.debug(f"skip {id}")
config.recog.update()
return False
def check_current_focus(self):
config.recog.check_current_focus()
def restart_game(self):
"重启游戏"
config.device.exit()
config.device.launch()
config.recog.update()
def tap_index_element(
self,
name: Literal[
"friend",
"infrastructure",
"mission",
"recruit",
"shop",
"terminal",
"warehouse",
"headhunting",
"mail",
"operator",
],
):
pos = {
"friend": (544, 862), # 好友
"infrastructure": (1545, 948), # 基建
"mission": (1201, 904), # 任务
"recruit": (1507, 774), # 公开招募
"shop": (1251, 727), # 采购中心
"terminal": (1458, 297), # 终端
"warehouse": (1788, 973), # 仓库
"headhunting": (1749, 783), # 干员寻访
"mail": (292, 62), # 邮件
"operator": (1590, 534), # 干员(角色管理)
}
self.ctap(pos[name])
def tap_nav_element(
self,
name: Literal[
"index",
"terminal",
"infrastructure",
"recruit",
"headhunting",
"shop",
"mission",
"friend",
"operator",
],
):
pos = {
"index": (140, 365), # 首页
"terminal": (793, 163), # 终端
"infrastructure": (1030, 163), # 基建
"recruit": (1435, 365), # 公开招募
"headhunting": (1623, 364), # 干员寻访
"shop": (1727, 362), # 采购中心
"mission": (1631, 53), # 任务
"friend": (1727, 53), # 好友
"operator": (462, 408), # 干员
}
self.ctap(pos[name])
def tap_terminal_button(
self,
name: Literal[
"main",
"main_theme",
"intermezzi",
"biography",
"collection",
"regular",
"longterm",
"contract",
],
):
y = 1005
pos = {
"main": (115, y), # 首页
"main_theme": (356, y), # 主题曲
"intermezzi": (596, y), # 插曲
"biography": (836, y), # 别传
"collection": (1077, y), # 资源收集
"regular": (1317, y), # 常态事务
"longterm": (1556, y), # 长期探索
"contract": (1796, y), # 危机合约
}
self.ctap(pos[name])
def switch_shop(
self,
name: Literal[
"recommend", "originite", "bundle", "skin", "token", "furniture", "credit"
],
):
y = 165
pos = {
"recommend": (150, y), # 可露希尔推荐
"originite": (425, y), # 源石交易所
"bundle": (700, y), # 组合包
"skin": (970, y), # 时装商店
"token": (1250, y), # 凭证交易所
"furniture": (1520, y), # 家具商店
"credit": (1805, y), # 信用交易所
}
self.tap(pos[name])
def template_match(
self,
res: str,
scope: Optional[tp.Scope] = None,
method: int = cv2.TM_CCOEFF_NORMED,
) -> Tuple[float, tp.Scope]:
return config.recog.template_match(res, scope, method)
def swipe(
self,
start: tp.Coordinate,
movement: tp.Coordinate,
duration: int = 100,
interval: float = 1,
) -> None:
"""swipe"""
if config.stop_mower.is_set():
raise MowerExit
end = (start[0] + movement[0], start[1] + movement[1])
config.device.swipe(start, end, duration=duration)
self.sleep(interval)
def swipe_noinertia(
self,
start: tp.Coordinate,
movement: tp.Coordinate,
duration: int = 20,
interval: float = 0.2,
) -> None:
"""swipe with no inertia (movement should be vertical)"""
if config.stop_mower.is_set():
raise MowerExit
points = [start]
if movement[0] == 0:
dis = abs(movement[1])
points.append((start[0] + 100, start[1]))
points.append((start[0] + 100, start[1] + movement[1]))
points.append((start[0], start[1] + movement[1]))
else:
dis = abs(movement[0])
points.append((start[0], start[1] + 100))
points.append((start[0] + movement[0], start[1] + 100))
points.append((start[0] + movement[0], start[1]))
config.device.swipe_ext(
points, durations=[200, dis * duration // 100, 200], interval=interval
)
config.recog.update()
def swipe_ext(
self,
points: list[tp.Coordinate],
durations: list[int],
up_wait: int = 1000,
interval: float = 1,
):
"""复杂的拖动
拖动过程:
1. 拖动
2. 等待up_wait(防止抬手后按照惯性继续移动)
3. 抬手
4. 等待interval(回弹动画)
5. 清除截图缓存
+-lift +-recog.update()
| |
+------+---------+----------+
| drag | up_wait | interval |
+------+---------+----------+
Args:
points (list[tp.Coordinate]): 坐标列表
durations (list[int]): 每两个坐标之间拖动的时长
up_wait (int, optional): 抬手前的等待时间,单位毫秒. Defaults to 1000.
interval (float, optional): 抬手后等待回弹动画结束的时间. Defaults to 1.
Raises:
MowerExit: 停止运行
"""
if config.stop_mower.is_set():
raise MowerExit
if up_wait > 0:
points += points[-1:]
durations.append(up_wait)
config.device.swipe_ext(points, durations, False, interval)
config.recog.update()
def swipe_update(
self,
points: list[tp.Coordinate],
durations: list[int],
up_wait: int = 1000,
interval: float = 1,
func: Callable[[tp.Image], Any] = lambda _: None,
):
"""在抬手前截图的拖动
拖动过程:
1. 拖动
2. 截图,等待up_wait(防止抬手后按照惯性继续移动)
3. 抬手
4. 等待interval(回弹动画)
函数返回后,config.recog.img为抬手前的截图。需要额外调用config.recog.update()以获得回弹动画结束后的截图。
+-lift
|
+------+---------+----------+
| drag | up_wait | interval |
+------++--------+----------+
|
+-recog.update()
func(recog.img)
Args:
points (list[tp.Coordinate]): 坐标列表
durations (list[int]): 每两个坐标之间拖动的时长
up_wait (int, optional): 抬手前的等待时间,单位毫秒. Defaults to 1000.
interval (float, optional): 抬手后等待回弹动画结束的时间. Defaults to 1.
func (Callable[[tp.Image], Any], optional): 处理截图的函数. Defaults to lambda _: None.
Raises:
MowerExit: 停止运行
Returns:
Any: func的返回值
"""
if config.stop_mower.is_set():
raise MowerExit
if up_wait > 0:
points += points[-1:]
durations.append(up_wait)
return config.device.swipe_ext(points, durations, True, interval, func)
def back(self, update: bool = True) -> None:
"""send back keyevent"""
config.tap_info = None, None
config.device.back()
update and config.recog.update()
def cback(
self,
max_seconds: float = 10,
id: str | None = None,
) -> bool:
"""同一处代码多次调用cback,在max_seconds时长内最多返回一次
Args:
max_seconds: 等待网络连接建议设10,等待动画建议设5或3
Returns:
本次点击是否成功
"""
if id is None:
id = caller_info()
now = datetime.now()
lid, ltime = config.tap_info
if lid != id or (lid == id and now - ltime > timedelta(seconds=max_seconds)):
logger.debug(f"back {id}")
self.back()
config.tap_info = id, now
return True
else:
logger.debug(f"skip {id}")
config.recog.update()
return False
def scene(self) -> int:
"""get the current scene in the game"""
return config.recog.get_scene()
def solve_captcha(self, refresh=False):
th = thres2(config.recog.gray, 254)
pos = np.nonzero(th)
offset_x = pos[1].min()
offset_y = pos[0].min()
img_scope = ((offset_x, offset_y), (pos[1].max(), pos[0].max()))
img = cropimg(config.recog.img, img_scope)
h = img.shape[0]
def _t(ratio):
return int(h * ratio)
def _p(ratio_x, ratio_y):
return _t(ratio_x), _t(ratio_y)
if refresh:
logger.info("刷新验证码")
self.tap((offset_x + _t(0.189), offset_y + _t(0.916)))
self.sleep(3)
img = cropimg(config.recog.img, img_scope)
left_part = cropimg(img, (_p(0.032, 0.032), _p(0.202, 0.591)))
hsv = cv2.cvtColor(left_part, cv2.COLOR_RGB2HSV)
mask = cv2.inRange(hsv, (25, 0, 0), (35, 255, 255))
tpl_width = _t(0.148)
tpl_height = _t(0.135)
tpl_border = _t(0.0056)
tpl_padding = _t(0.018)
tpl = np.zeros((tpl_height, tpl_width), np.uint8)
tpl[:] = (255,)
tpl[
tpl_border : tpl_height - tpl_border,
tpl_border : tpl_width - tpl_border,
] = (0,)
result = cv2.matchTemplate(mask, tpl, cv2.TM_SQDIFF, None, tpl)
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
x, y = min_loc
source_p = (
(x + tpl_padding, y + tpl_padding),
(x + tpl_width - tpl_padding, y + tpl_height - tpl_padding),
)
source = cropimg(left_part, source_p)
mask = cropimg(mask, source_p)
right_part = cropimg(
img,
(
(_t(0.201), _t(0.032) + source_p[0][1]),
(_t(0.94), _t(0.032) + source_p[1][1]),
),
)
for _y in range(source.shape[0]):
for _x in range(source.shape[1]):
for _c in range(source.shape[2]):
source[_y, _x, _c] = np.clip(source[_y, _x, _c] * 0.7 - 23, 0, 255)
mask = cv2.bitwise_not(mask)
result = cv2.matchTemplate(right_part, source, cv2.TM_SQDIFF_NORMED, None, mask)
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
x = _t(0.201) + min_loc[0] - _t(0.032) - x - tpl_padding + _t(0.128)
x += random.choice([-4, -3, -2, 2, 3, 4])
def _rb(R, r):
return random.random() * _t(R) + _t(r)
btn_x = _rb(0.1, 0.01)
start = offset_x + btn_x + _t(0.128), offset_y + _rb(0.1, 0.01) + _t(0.711)
end = offset_x + btn_x + x, offset_y + _rb(0.1, 0.01) + _t(0.711)
p1 = end[0] + _rb(0.1, 0.02), end[1] + _rb(0.05, 0.02)
p2 = end[0] + _rb(0.1, 0.02), end[1] + _rb(0.05, 0.02)
logger.info("滑动验证码")
config.device.swipe_ext(
(start, p1, p2, end, end),
durations=[
random.randint(400, 600),
random.randint(200, 300),
random.randint(200, 300),
random.randint(200, 300),
],
)
def waiting_solver(self):
"""需要等待的页面解决方法。UNKNOWN_WITH_NAVBAR超时直接返回False,其他超时重启返回False"""
self.game_stuck_detection = False
scene = self.scene()
start_time = datetime.now()
sleep_time, wait_time = config.conf.waiting_scene[scene]
stop_time = start_time + timedelta(seconds=wait_time)
while datetime.now() < stop_time:
self.sleep(sleep_time / 1000)
if self.scene() != scene:
self.solver_update_before_transition = False
return True
if scene == Scene.UNKNOWN_WITH_NAVBAR:
logger.debug("有导航栏的未知场景超时")
self.ctap("nav_button", 3, id="open_nav")
return False
elif scene == Scene.UNKNOWN_ROGUE:
logger.debug("未知肉鸽场景超时")
self.ctap("rogue/back", 3)
return False
elif scene == Scene.DOUBLE_CONFIRM:
logger.debug("未知二次确认超时")
self.ctap("double_confirm/main", 3, x_rate=0)
return False
else:
logger.warning("相同场景等待超时")
config.device.exit()
csleep(3)
self.check_current_focus()
return False
def terminal_entry(self, res: tp.Res):
img = loadres(res, True)
kp1, des1 = keypoints_scale_invariant(img)
kp2, des2 = config.recog.matcher.kp, config.recog.matcher.des
matches = flann.knnMatch(des1, des2, k=2)
good = []
for pair in matches:
if (len_pair := len(pair)) == 2:
x, y = pair
if x.distance < GOOD_DISTANCE_LIMIT * y.distance:
good.append(x)
elif len_pair == 1:
good.append(pair[0])
good = sorted(good, key=lambda x: x.distance)
self.tap(kp2[good[0].trainIdx].pt)
def main_theme(self, act: int, episode: int):
"""从主线关卡界面进入对应的章节
Args:
act (int): Act (0-2)
episode (int): 章节 (0-14)
"""
if self.animation():
return
img = cropimg(config.recog.gray, ((300, 315), (405, 370)))
_act, _score = 0, 0
for i in range(3):
result = cv2.matchTemplate(
img, loadres(f"navigation/act/{i}", True), cv2.TM_CCOEFF_NORMED
)
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
if max_val > _score:
_act, _score = i, max_val
if act > _act:
self.ctap((251, 846), 3)
return
elif act < _act:
self.ctap((230, 175), 3)
return
episode_scope = ((1150, 250), (1700, 780))
img = cropimg(config.recog.gray, episode_scope)
_episode, _score = 0, 0
for i in range(15):
result = cv2.matchTemplate(
img, loadres(f"navigation/main/{i}", True), cv2.TM_CCOEFF_NORMED
)
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
if max_val > _score:
_episode, _score = i, max_val
if episode > _episode:
self.swipe_ext([(1425, 554), (932, 554)], [300], 500)
elif episode < _episode:
self.swipe_ext([(932, 554), (1425, 554)], [300], 500)
else:
self.tap(episode_scope)
def animation(
self,
scope: tp.Scope = ((0, 0), (1920, 1080)),
interval: float = 0.5,
thresh: int = 20,
ratio: float = 0.05,
) -> bool:
"""最近一段时间画面是否有变化
Args:
scope (tp.Scope, optional): 用于对比的区域. Defaults to ((0, 0), (1920, 1080)).
interval (float, optional): 画面保持不变的时长. Defaults to 0.5.
thresh (int, optional): 二值化阈值. Defaults to 20.
ratio (float, optional): 像素不同的比例. Defaults to 0.05.
Returns:
bool: 是否正在播放动画
"""
if config.recog.animation_stop:
return False
interval = timedelta(seconds=interval)
last = None
while config.animation:
if config.animation[0][1] + interval <= config.screenshot_time:
last = config.animation.pop(0)[0]
else:
break
result = True
if last is not None:
result = diff_ratio(
img1=cropimg(last, scope),
img2=cropimg(config.recog.gray, scope),
thresh=thresh,
ratio=ratio,
)
if result:
logger.debug("等待动画结束")
config.animation.append((config.recog.gray, config.screenshot_time))
else:
config.recog.animation_stop = True
config.animation = []
return result