Merge remote-tracking branch 'origin/main' into ff0000

This commit is contained in:
Elaina 2024-08-19 18:52:04 +08:00
commit 8b909931e0
12 changed files with 289 additions and 242 deletions

BIN
arknights_mower/resources/infra_overview_top_right.png (Stored with Git LFS) Normal file

Binary file not shown.

BIN
arknights_mower/resources/room_detail.png (Stored with Git LFS)

Binary file not shown.

View file

@ -6,10 +6,10 @@ import cv2
import numpy as np
from arknights_mower import __rootdir__
from arknights_mower.utils import config, rapidocr, segment
from arknights_mower.utils import config, rapidocr
from arknights_mower.utils.character_recognize import operator_room_select
from arknights_mower.utils.csleep import MowerExit
from arknights_mower.utils.image import cropimg, loadres, thres2
from arknights_mower.utils.image import cropimg, thres2
from arknights_mower.utils.log import logger
with lzma.open(f"{__rootdir__}/models/operator_room.model", "rb") as f:
@ -172,88 +172,6 @@ class BaseMixin:
if err_cnt > 3:
raise Exception("筛选确认失败")
def detect_room_number(self, img) -> int:
score = []
for i in range(1, 5):
digit = loadres(f"room/{i}")
result = cv2.matchTemplate(img, digit, cv2.TM_CCOEFF_NORMED)
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
score.append(max_val)
return score.index(max(score)) + 1
def detect_room(self) -> str:
color_map = {
"制造站": 25,
"贸易站": 99,
"发电站": 36,
"训练室": 178,
"加工站": 32,
}
img = cropimg(config.recog.img, ((568, 18), (957, 95)))
hsv = cv2.cvtColor(img, cv2.COLOR_RGB2HSV)
colored_room = None
for room, color in color_map.items():
mask = cv2.inRange(hsv, (color - 1, 0, 0), (color + 2, 255, 255))
if cv2.countNonZero(mask) > 1000:
colored_room = room
break
if colored_room in ["制造站", "贸易站", "发电站"]:
digit_1 = cropimg(img, ((211, 24), (232, 54)))
digit_2 = cropimg(img, ((253, 24), (274, 54)))
digit_1 = self.detect_room_number(digit_1)
digit_2 = self.detect_room_number(digit_2)
logger.debug(f"{colored_room}B{digit_1}0{digit_2}")
return f"room_{digit_1}_{digit_2}"
elif colored_room == "训练室":
logger.debug("训练室B305")
return "train"
elif colored_room == "加工站":
logger.debug("加工站B105")
return "factory"
white_room = ["central", "dormitory", "meeting", "contact"]
score = []
for room in white_room:
tpl = loadres(f"room/{room}")
result = cv2.matchTemplate(img, tpl, cv2.TM_CCOEFF_NORMED)
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
score.append(max_val)
room = white_room[score.index(max(score))]
if room == "central":
logger.debug("控制中枢")
elif room == "dormitory":
digit = cropimg(img, ((174, 24), (195, 54)))
digit = self.detect_room_number(digit)
if digit == 4:
logger.debug("宿舍B401")
else:
logger.debug(f"宿舍B{digit}04")
return f"dormitory_{digit}"
elif room == "meeting":
logger.debug("会客室1F02")
else:
logger.debug("办公室B205")
return room
def enter_room(self, room):
"""从基建首页进入房间"""
for enter_times in range(3):
for retry_times in range(10):
if pos := self.find("control_central"):
_room = segment.base(config.recog.img, pos)[room]
for i in range(4):
_room[i, 0] = max(_room[i, 0], 0)
_room[i, 0] = min(_room[i, 0], config.recog.w)
_room[i, 1] = max(_room[i, 1], 0)
_room[i, 1] = min(_room[i, 1], config.recog.h)
self.tap(_room)
elif self.detect_room() == room:
return
else:
self.sleep()
if not pos:
self.back_to_infrastructure()
raise Exception("未成功进入房间")
def double_read_time(self, cord, upperLimit=None, use_digit_reader=False):
config.recog.update()
time_in_seconds = self.read_time(cord, upperLimit, use_digit_reader)

View file

@ -15,6 +15,7 @@ from arknights_mower.solvers.credit import CreditSolver
from arknights_mower.solvers.credit_fight import CreditFight
from arknights_mower.solvers.cultivate_depot import cultivate as cultivateDepotSolver
from arknights_mower.solvers.depotREC import depotREC as DepotSolver
from arknights_mower.solvers.enter_room import EnterRoomSolver
from arknights_mower.solvers.mail import MailSolver
from arknights_mower.solvers.mission import MissionSolver
from arknights_mower.solvers.navigation import NavigationSolver
@ -168,6 +169,9 @@ class BaseSchedulerSolver(SceneGraphSolver, BaseMixin):
self.last_room = ""
logger.info("重设上次房间为空")
def enter_room(self, room):
EnterRoomSolver().run(room)
def overtake_room(self):
candidates = self.task.meta_data.split(",")
if len(candidates) == 0:
@ -2592,26 +2596,7 @@ class BaseSchedulerSolver(SceneGraphSolver, BaseMixin):
self.op_data.operators[_operator].time_stamp = None
def turn_on_room_detail(self, room):
for enter_times in range(3):
for retry_times in range(10):
if pos := self.find("room_detail"):
if all(self.get_color((1233, 1)) > [252] * 3):
return
logger.info("等待动画")
self.sleep(interval=0.5)
elif pos := self.find("arrange_check_in"):
self.tap(pos, interval=0.7)
else:
self.sleep()
for back_time in range(3):
if pos := self.find("control_central"):
break
self.back()
if not pos:
self.back_to_infrastructure()
self.enter_room(room)
self.reset_room_time(room)
raise Exception("未成功进入房间")
EnterRoomSolver().run(room)
def get_agent_from_room(self, room, read_time_index=None):
if read_time_index is None:

View file

@ -0,0 +1,220 @@
from datetime import datetime, timedelta
import cv2
from arknights_mower.utils import config
from arknights_mower.utils import typealias as tp
from arknights_mower.utils.graph import SceneGraphSolver
from arknights_mower.utils.image import cropimg, loadres
from arknights_mower.utils.log import logger
from arknights_mower.utils.recognize import Scene
from arknights_mower.utils.vector import sm, va
facility = {
"central": (
(-0.6551724137931034, -0.26436781609195403),
(2.3524904214559386, 1.1340996168582376),
),
"dormitory_1": (
(-0.6615384615384616, 0.9692307692307693),
(1.7538461538461538, 0.5115384615384616),
),
"dormitory_2": (
(-0.046153846153846156, 1.580769230769231),
(1.7461538461538462, 0.5115384615384616),
),
"dormitory_3": (
(-0.6615384615384616, 2.1884615384615387),
(1.7538461538461538, 0.5192307692307693),
),
"dormitory_4": (
(-0.046153846153846156, 2.7961538461538464),
(1.7461538461538462, 0.5230769230769231),
),
"factory": (
(2.689655172413793, 0.9540229885057471),
(1.1379310344827585, 0.5172413793103448),
),
"meeting": (
(2.0804597701149423, 0.3486590038314176),
(1.747126436781609, 0.5172413793103448),
),
"contact": (
(2.689655172413793, 1.5632183908045976),
(1.1379310344827585, 0.5172413793103448),
),
"room_1_1": (
(-4.626923076923077, 0.9692307692307693),
(1.126923076923077, 0.5115384615384616),
),
"room_1_2": (
(-3.403846153846154, 0.9692307692307693),
(1.123076923076923, 0.5115384615384616),
),
"room_1_3": (
(-2.184615384615385, 0.9692307692307693),
(1.1307692307692307, 0.5115384615384616),
),
"room_2_1": (
(-5.2384615384615385, 1.580769230769231),
(1.1346153846153846, 0.5115384615384616),
),
"room_2_2": (
(-4.015384615384615, 1.580769230769231),
(1.1307692307692307, 0.5115384615384616),
),
"room_2_3": (
(-2.7923076923076926, 1.580769230769231),
(1.126923076923077, 0.5115384615384616),
),
"room_3_1": (
(-4.6230769230769235, 2.1884615384615387),
(1.123076923076923, 0.5192307692307693),
),
"room_3_2": (
(-3.4000000000000004, 2.1884615384615387),
(1.123076923076923, 0.5192307692307693),
),
"room_3_3": (
(-2.180769230769231, 2.1884615384615387),
(1.126923076923077, 0.5192307692307693),
),
"train": (
(2.689655172413793, 2.18007662835249),
(1.1379310344827585, 0.5172413793103448),
),
}
class EnterRoomSolver(SceneGraphSolver):
def run(self, room: str, detail: bool = True):
"""
Args:
room: 房间名
detail: 打开进驻信息
"""
logger.info(f"进入房间:{room}")
self.room = room
self.detail = detail
self.wait_start()
super().run()
def timeout(self) -> bool:
return datetime.now() > self.start_time + timedelta(seconds=5)
def wait_start(self):
self.start_time = datetime.now()
def detect_room_number(self, img) -> int:
score = []
for i in range(1, 5):
digit = loadres(f"room/{i}")
result = cv2.matchTemplate(img, digit, cv2.TM_CCOEFF_NORMED)
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
score.append(max_val)
return score.index(max(score)) + 1
def detect_room(self) -> str:
color_map = {
"制造站": 25,
"贸易站": 99,
"发电站": 36,
"训练室": 178,
"加工站": 32,
}
img = cropimg(config.recog.img, ((568, 18), (957, 95)))
hsv = cv2.cvtColor(img, cv2.COLOR_RGB2HSV)
colored_room = None
for room, color in color_map.items():
mask = cv2.inRange(hsv, (color - 1, 0, 0), (color + 2, 255, 255))
if cv2.countNonZero(mask) > 1000:
colored_room = room
break
if colored_room in ["制造站", "贸易站", "发电站"]:
digit_1 = cropimg(img, ((211, 24), (232, 54)))
digit_2 = cropimg(img, ((253, 24), (274, 54)))
digit_1 = self.detect_room_number(digit_1)
digit_2 = self.detect_room_number(digit_2)
logger.debug(f"{colored_room}B{digit_1}0{digit_2}")
return f"room_{digit_1}_{digit_2}"
elif colored_room == "训练室":
logger.debug("训练室B305")
return "train"
elif colored_room == "加工站":
logger.debug("加工站B105")
return "factory"
white_room = ["central", "dormitory", "meeting", "contact"]
score = []
for room in white_room:
tpl = loadres(f"room/{room}")
result = cv2.matchTemplate(img, tpl, cv2.TM_CCOEFF_NORMED)
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
score.append(max_val)
room = white_room[score.index(max(score))]
if room == "central":
logger.debug("控制中枢")
elif room == "dormitory":
digit = cropimg(img, ((174, 24), (195, 54)))
digit = self.detect_room_number(digit)
if digit == 4:
logger.debug("宿舍B401")
else:
logger.debug(f"宿舍B{digit}04")
return f"dormitory_{digit}"
elif room == "meeting":
logger.debug("会客室1F02")
else:
logger.debug("办公室B205")
return room
@staticmethod
def segment(central: tp.Scope) -> dict[str, tp.Rectangle]:
top_left = central[0]
width = central[1][0] - central[0][0]
result = {}
for name, (position, size) in facility.items():
facility_top_left = va(top_left, sm(width, position))
scope = facility_top_left, va(facility_top_left, sm(width, size))
result[name] = (
(round(max(scope[0][0], 0)), round(scope[0][1])),
(round(min(scope[1][0], 1920)), round(scope[1][1])),
)
logger.debug(result)
return result
def transition(self) -> bool:
if (scene := self.scene()) == Scene.INFRA_MAIN:
if pos := self.find("control_central"):
pos = self.segment(self.find("control_central"))[self.room]
self.wait_start()
self.ctap(pos, max_seconds=1, interval=config.screenshot_avg / 1000)
else:
config.recog.update()
elif scene in [Scene.CTRLCENTER_ASSISTANT, Scene.INFRA_DETAILS]:
if self.detect_room() == self.room:
if not self.detail:
return True
if self.find("room_detail"):
return True
if pos := self.find("arrange_check_in"):
if self.find("infra_overview_top_right"):
self.wait_start()
self.tap(pos, interval=config.screenshot_avg / 1000)
else:
config.recog.update()
elif self.timeout():
self.scene_graph_navigation(Scene.INFRA_MAIN)
else:
config.recog.update()
elif self.timeout():
self.scene_graph_navigation(Scene.INFRA_MAIN)
else:
config.recog.update()
elif scene == Scene.UNKNOWN_WITH_NAVBAR:
config.recog.update()
elif scene in self.waiting_scene:
self.waiting_solver()
else:
self.scene_graph_navigation(Scene.INFRA_MAIN)

View file

@ -149,5 +149,4 @@ def cmatch(
result = diff <= thresh
msg += f"{diff=} {'<=' if result else '>'} {thresh=}"
logger.debug(msg)
return result

View file

@ -3,8 +3,10 @@ import shutil
import sys
import time
import traceback
from logging.handlers import TimedRotatingFileHandler
from datetime import datetime, timedelta
from logging.handlers import QueueHandler, QueueListener, TimedRotatingFileHandler
from pathlib import Path
from queue import Queue
from threading import Thread
import colorlog
@ -34,16 +36,16 @@ class PackagePathFilter(logging.Filter):
filter = PackagePathFilter()
logger = logging.getLogger(__name__)
logger.setLevel("DEBUG")
logger.setLevel(logging.DEBUG)
# d(ebug)hlr: 终端输出
dhlr = logging.StreamHandler(stream=sys.stdout)
dhlr.setFormatter(color_formatter)
dhlr.setLevel("DEBUG")
dhlr.setLevel(logging.DEBUG)
dhlr.addFilter(filter)
logger.addHandler(dhlr)
# f(ile)hlr: 文件记录
folder = Path(get_path("@app/log"))
folder.mkdir(exist_ok=True, parents=True)
fhlr = TimedRotatingFileHandler(
@ -52,7 +54,6 @@ fhlr = TimedRotatingFileHandler(
fhlr.setFormatter(basic_formatter)
fhlr.setLevel("DEBUG")
fhlr.addFilter(filter)
logger.addHandler(fhlr)
class Handler(logging.StreamHandler):
@ -63,34 +64,51 @@ class Handler(logging.StreamHandler):
config.log_queue.put(msg)
# w(ebsocket)hlr: WebSocket
whlr = Handler()
whlr.setLevel(logging.INFO)
logger.addHandler(whlr)
log_queue = Queue()
queue_handler = QueueHandler(log_queue)
logger.addHandler(queue_handler)
listener = QueueListener(log_queue, dhlr, fhlr, whlr, respect_handler_level=True)
listener.start()
screenshot_folder = get_path("@app/screenshot")
screenshot_folder.mkdir(exist_ok=True, parents=True)
screenshot_queue = Queue()
cleanup_time = datetime.now()
def screenshot_cleanup():
logger.info("清理过期截图")
start_time_ns = time.time_ns() - config.conf.screenshot * 3600 * 10**9
for i in screenshot_folder.iterdir():
if i.is_dir():
shutil.rmtree(i)
elif not i.stem.isnumeric():
i.unlink()
elif int(i.stem) < start_time_ns:
i.unlink()
global cleanup_time
cleanup_time = datetime.now()
def screenshot_worker():
screenshot_cleanup()
while True:
logger.info("清理过期截图")
start_time_ns = time.time_ns() - config.conf.screenshot * 3600 * 10**9
for i in screenshot_folder.iterdir():
if i.is_dir():
shutil.rmtree(i)
elif not i.stem.isnumeric():
i.unlink()
elif int(i.stem) < start_time_ns:
i.unlink()
time.sleep(60 * 60)
now = datetime.now()
if now - cleanup_time > timedelta(hours=1):
screenshot_cleanup()
img, filename = screenshot_queue.get()
with screenshot_folder.joinpath(filename).open("wb") as f:
f.write(img)
Thread(target=screenshot_cleanup, daemon=True).start()
Thread(target=screenshot_worker, daemon=True).start()
def save_screenshot(img: bytes) -> None:
filename = f"{time.time_ns()}.jpg"
with screenshot_folder.joinpath(filename).open("wb") as f:
f.write(img)
logger.debug(filename)
screenshot_queue.put((img, filename))

View file

@ -619,7 +619,6 @@ class Recognizer:
:return ret: 若匹配成功则返回元素在游戏界面中出现的位置否则返回 None
"""
logger.debug(res)
color = {
"1800": (158, 958),
@ -651,6 +650,7 @@ class Recognizer:
"hypergryph": (0, 961),
"infra_overview": (54, 135),
"infra_overview_in": (64, 705),
"infra_overview_top_right": (1820, 0),
"infra_todo": (13, 1013),
"loading2": (620, 247),
"loading7": (106, 635),
@ -693,6 +693,7 @@ class Recognizer:
"riic/exp": (1385, 239),
"riic/manufacture": (1328, 126),
"riic/report_title": (1712, 25),
"room_detail": (1291, 33),
"spent_credit": (332, 264),
"shop_cart": (1252, 842),
"shop_credit_2": (1657, 135),
@ -722,8 +723,8 @@ class Recognizer:
gray = cropimg(self.gray, scope)
res_img = cv2.cvtColor(res_img, cv2.COLOR_RGB2GRAY)
ssim = structural_similarity(gray, res_img)
logger.debug(f"{ssim=:.2f}")
if ssim >= 0.9:
logger.debug(f"cmatch+SSIM: {res=} {scope=}")
return scope
return None
@ -829,8 +830,8 @@ class Recognizer:
threshold = template_matching_score[res]
pos = template_matching[res]
res = loadres(res, True)
h, w = res.shape
res_img = loadres(res, True)
h, w = res_img.shape
if isinstance(pos[0], tuple):
scope = pos
@ -838,12 +839,13 @@ class Recognizer:
scope = pos, va(pos, (w, h))
img = cropimg(self.gray, scope)
result = cv2.matchTemplate(img, res, cv2.TM_CCOEFF_NORMED)
result = cv2.matchTemplate(img, res_img, cv2.TM_CCOEFF_NORMED)
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
top_left = va(max_loc, scope[0])
logger.debug(f"{top_left=} {max_val=}")
if max_val >= threshold:
return top_left, va(top_left, (w, h))
scope = top_left, va(top_left, (w, h))
logger.debug(f"template matching: {res=} {scope=}")
return scope
return None
dpi_aware = res in [
@ -870,6 +872,7 @@ class Recognizer:
scope = ((550, 900), (800, 1080))
threshold = 0.45
logger.debug(f"feature matching: {res=}")
res_img = loadres(res, True)
if thres is not None:
# 对图像二值化处理

View file

@ -1,105 +0,0 @@
import cv2
import numpy as np
from arknights_mower.utils import typealias as tp
from arknights_mower.utils.log import logger
from arknights_mower.utils.recognize import RecognizeError
class FloodCheckFailed(Exception):
pass
def get_poly(x1: int, x2: int, y1: int, y2: int) -> tp.Rectangle:
x1, x2 = int(x1), int(x2)
y1, y2 = int(y1), int(y2)
return np.array([[x1, y1], [x1, y2], [x2, y2], [x2, y1]])
def base(
img: tp.Image, central: tp.Scope, draw: bool = False
) -> dict[str, tp.Rectangle]:
"""
基建布局的图像分割算法
"""
try:
ret = {}
x1, y1 = central[0]
x2, y2 = central[1]
alpha = (y2 - y1) / 160
x1 -= 170 * alpha
x2 += 182 * alpha
y1 -= 67 * alpha
y2 += 67 * alpha
central = get_poly(x1, x2, y1, y2)
ret["central"] = central
for i in range(1, 5):
y1 = y2 + 25 * alpha
y2 = y1 + 134 * alpha
if i & 1:
dormitory = get_poly(x1, x2 - 158 * alpha, y1, y2)
else:
dormitory = get_poly(x1 + 158 * alpha, x2, y1, y2)
ret[f"dormitory_{i}"] = dormitory
x1, y1 = ret["dormitory_1"][0]
x2, y2 = ret["dormitory_1"][2]
x1 = x2 + 419 * alpha
x2 = x1 + 297 * alpha
factory = get_poly(x1, x2, y1, y2)
ret["factory"] = factory
y2 = y1 - 25 * alpha
y1 = y2 - 134 * alpha
meeting = get_poly(x1 - 158 * alpha, x2, y1, y2)
ret["meeting"] = meeting
y1 = y2 + 25 * alpha
y2 = y1 + 134 * alpha
y1 = y2 + 25 * alpha
y2 = y1 + 134 * alpha
contact = get_poly(x1, x2, y1, y2)
ret["contact"] = contact
y1 = y2 + 25 * alpha
y2 = y1 + 134 * alpha
train = get_poly(x1, x2, y1, y2)
ret["train"] = train
for floor in range(1, 4):
x1, y1 = ret[f"dormitory_{floor}"][0]
x2, y2 = ret[f"dormitory_{floor}"][2]
x2 = x1 - 102 * alpha
x1 = x2 - 295 * alpha
if floor & 1 == 0:
x2 = x1 - 24 * alpha
x1 = x2 - 295 * alpha
room = get_poly(x1, x2, y1, y2)
ret[f"room_{floor}_3"] = room
x2 = x1 - 24 * alpha
x1 = x2 - 295 * alpha
room = get_poly(x1, x2, y1, y2)
ret[f"room_{floor}_2"] = room
x2 = x1 - 24 * alpha
x1 = x2 - 295 * alpha
room = get_poly(x1, x2, y1, y2)
ret[f"room_{floor}_1"] = room
if draw:
polys = list(ret.values())
cv2.polylines(img, polys, True, (255, 0, 0), 10, cv2.LINE_AA)
from matplotlib import pyplot as plt
plt.imshow(img)
plt.show()
logger.debug({key: value.tolist() for key, value in ret.items()})
return ret
except Exception as e:
logger.exception(e)
raise RecognizeError(e)

View file

@ -166,16 +166,16 @@ class BaseSolver:
if interval > 0:
self.sleep(interval)
def ctap(self, pos: tp.Location, max_seconds: int = 10):
def ctap(self, pos: tp.Location, max_seconds: int = 10, interval: float = 1):
id = caller_info()
logger.debug(id)
now = datetime.now()
lid, ltime = self.tap_info
if lid != id or (lid == id and now - ltime > timedelta(seconds=max_seconds)):
self.tap_info = id, now
self.tap(pos)
self.tap(pos, interval=interval)
else:
self.sleep()
self.sleep(interval)
def check_current_focus(self):
config.recog.check_current_focus()

View file

@ -132,6 +132,7 @@ Res = Literal[
"ope_select_start_empty",
"next_step",
"notice",
"infra_overview_top_right",
"agent_name/char_009_12fce",
"agent_name/char_010_chen",
"agent_name/char_017_huang",

View file

@ -14,3 +14,8 @@ def vs(a: tp.Coordinate, b: tp.Coordinate) -> tp.Coordinate:
def sa(scope: tp.Scope, vector: tp.Coordinate) -> tp.Scope:
"""区域偏移scope add"""
return va(scope[0], vector), va(scope[1], vector)
def sm(a: float, v: tp.Coordinate) -> tp.Coordinate:
"""数乘向量scalar multiply"""
return a * v[0], a * v[1]