mower-ng/mower/solvers/infra/clue/place.py
zhbaor c86e7ac651
All checks were successful
ci/woodpecker/push/check_format Pipeline was successful
场景图导航改写为solver
2025-01-30 10:14:54 +08:00

152 lines
5.7 KiB
Python

import cv2
from mower.solvers.infra.base_mixin import BaseMixin
from mower.solvers.infra.enter_room import EnterRoomSolver
from mower.utils import config
from mower.utils.image import crop2content, cropimg, loadres, thres2
from mower.utils.log import logger
from mower.utils.rapidocr import ocr_rec
from mower.utils.recognize import Scene
from mower.utils.solver import BaseSolver
from mower.utils.vector import va
from .utils import (
clue_cls,
clue_scope,
exit_pos,
is_orange,
main_dots,
main_scope,
main_time,
tl2p,
tm_thres,
)
clue_pos = ((1305, 208), (1305, 503), (1305, 797))
filter_receive = (1900, 45)
filter_self = (1610, 70)
class PlaceSolver(BaseSolver, BaseMixin):
solver_name = "放置线索"
def run(self) -> None:
self.clue_status = {}
super().run()
def detect_unlock(self):
unlock_pos = self.find("clue/button_unlock")
if unlock_pos is None:
return None
color = self.get_color(self.get_pos(unlock_pos))
if all(color > [252] * 3):
return unlock_pos
return None
def place_index(self):
for cl, st in self.clue_status.items():
if st in ["available", "self", "available_self_only"]:
return cl, st
return None, None
def transition(self) -> bool:
if (
scene := self.scene()
) == Scene.INFRA_DETAILS and self.detect_room() == "meeting":
self.tap((500, 1000))
elif scene == Scene.INFRA_CONFIDENTIAL:
if self.animation(((1410, 660), (1920, 1080))):
return
if unlock_pos := self.detect_unlock():
self.tap(unlock_pos)
return
for i in range(1, 8):
if is_orange(self.get_color(main_dots[i]).tolist()):
self.clue_status[i] = "available"
elif clue_cls(i):
hsv = config.recog.hsv
if 160 < hsv[main_time[i][1]][main_time[i][0]][0] < 180:
self.clue_status[i] = "friend"
else:
self.clue_status[i] = "self"
else:
self.clue_status[i] = None
cl, st = self.place_index()
if st in ["available", "self", "available_self_only"]:
self.ctap(main_scope[cl], 1)
return
else:
return True
elif scene == Scene.CLUE_PLACE:
cl, st = self.place_index()
if cl is None:
if unlock_pos := self.detect_unlock():
self.tap(unlock_pos)
return
else:
self.tap(exit_pos)
return True
if self.get_color((1328 + 77 * cl, 114))[0] < 150:
# 右上角 1-7
self.tap(clue_scope[cl])
return
receive = st in ["available", "self"]
filter_pos = filter_receive if receive else filter_self
if not all(self.get_color(filter_pos) > [252] * 3):
self.tap(filter_pos)
return
self.clue_list = []
for cp in clue_pos:
clue_img = cropimg(config.recog.img, tl2p(cp))
res = loadres(f"clue/{cl}")
result = cv2.matchTemplate(clue_img, res, cv2.TM_CCOEFF_NORMED)
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
if max_val > tm_thres:
name_scope = (va(cp, (274, 99)), va(cp, (580, 134)))
name_img = cropimg(config.recog.gray, name_scope)
name = ocr_rec(name_img)
time_scope = (va(cp, (45, 222)), va(cp, (168, 255)))
time_hsv = cropimg(config.recog.hsv, time_scope)
if 165 < time_hsv[0][0][0] < 175:
time_img = thres2(cropimg(config.recog.gray, time_scope), 180)
time_img = crop2content(time_img)
time = ocr_rec(time_img)
else:
time = None
self.clue_list.append(
{"name": name, "time": time, "scope": tl2p(cp)}
)
else:
break
if self.clue_list:
list_name = "接收库" if receive else "自有库"
logger.info(f"{cl}号线索{list_name}{self.clue_list}")
selected = None
for c in self.clue_list:
if c["time"]:
selected = c
break
selected = selected or self.clue_list[0]
self.tap(selected["scope"])
if self.clue_status[cl] == "available":
self.clue_status[cl] = "friend"
elif self.clue_status[cl] == "available_self_only":
self.clue_status[cl] = "self_only"
elif self.clue_status[cl] == "self":
self.clue_status[cl] = "friend"
else:
self.clue_status[cl] = None
else:
if self.clue_status[cl] == "available":
self.clue_status[cl] = "available_self_only"
elif self.clue_status[cl] == "available_self_only":
self.clue_status[cl] = None
elif self.clue_status[cl] == "self":
self.clue_status[cl] = "self_only"
else:
self.clue_status[cl] = None
elif scene in self.waiting_scene:
self.waiting_solver()
else:
EnterRoomSolver().run("meeting", detail=False)