132 lines
4.6 KiB
Python
132 lines
4.6 KiB
Python
from datetime import datetime, timedelta
|
|
|
|
import cv2
|
|
|
|
from mower.models import Digtal
|
|
from mower.solvers.infra.base_mixin import BaseMixin
|
|
from mower.solvers.infra.enter_room import EnterRoomSolver
|
|
from mower.utils import config
|
|
from mower.utils import typealias as tp
|
|
from mower.utils.digit_reader import DigitReader
|
|
from mower.utils.graph import SceneGraphSolver
|
|
from mower.utils.image import cropimg, thres2
|
|
from mower.utils.log import logger
|
|
from mower.utils.recognize import Scene
|
|
|
|
|
|
class DroneSolver(SceneGraphSolver, BaseMixin):
|
|
def run(
|
|
self,
|
|
room: str,
|
|
count: int = None,
|
|
all_in: bool = False,
|
|
cur_count: int = None,
|
|
) -> bool:
|
|
"""
|
|
Args:
|
|
room: 房间名
|
|
count:消耗无人机数量
|
|
all_in:贸易站-加速完成一笔订单,制造站-消耗全部无人机消耗数量
|
|
cur_count:当前无人机数量
|
|
"""
|
|
logger.info("Start:无人机加速")
|
|
self.room = room
|
|
while cur_count is None and not all_in:
|
|
try:
|
|
cur_count = DigitReader().get_drone(config.recog.gray)
|
|
except Exception:
|
|
EnterRoomSolver().run(self.room, detail=False)
|
|
|
|
if count is None and not all_in:
|
|
count = cur_count - config.conf.drone_count_limit
|
|
if count <= 0:
|
|
return True
|
|
|
|
self.count = count
|
|
self.all_in = all_in
|
|
self.success = False
|
|
if cur_count < self.count and not self.all_in:
|
|
logger.info("无人机数量不足")
|
|
return False
|
|
if (
|
|
self.scene() in [Scene.ORDER_LIST, Scene.FACTORY_ROOMS]
|
|
and not self.detect_room_inside() == self.room
|
|
):
|
|
EnterRoomSolver().run(self.room, detail=False)
|
|
self.wait_start()
|
|
super().run()
|
|
return True
|
|
|
|
def number(self, scope: tp.Scope, height: int, thres: int) -> int:
|
|
"数字识别"
|
|
img = cropimg(config.recog.gray, scope)
|
|
default_height = 25
|
|
if height != default_height:
|
|
scale = 25 / height
|
|
img = cv2.resize(img, None, None, scale, scale)
|
|
img = thres2(img, thres)
|
|
contours, _ = cv2.findContours(img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
|
|
rect = [cv2.boundingRect(c) for c in contours]
|
|
rect.sort(key=lambda c: c[0])
|
|
|
|
value = 0
|
|
|
|
for x, y, w, h in rect:
|
|
digit = cropimg(img, ((x, y), (x + w, y + h)))
|
|
digit = cv2.copyMakeBorder(
|
|
digit, 10, 10, 10, 10, cv2.BORDER_CONSTANT, None, (0,)
|
|
)
|
|
score = []
|
|
for i in range(10):
|
|
im = Digtal().riic_base_digits[i]
|
|
result = cv2.matchTemplate(digit, im, cv2.TM_SQDIFF_NORMED)
|
|
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
|
|
score.append(min_val)
|
|
value = value * 10 + score.index(min(score))
|
|
|
|
return value
|
|
|
|
def timeout(self) -> bool:
|
|
return datetime.now() > self.start_time + timedelta(seconds=20)
|
|
|
|
def wait_start(self):
|
|
self.start_time = datetime.now()
|
|
|
|
def transition(self) -> bool:
|
|
if (scene := self.scene()) == Scene.INFRA_DETAILS:
|
|
self.ctap((200, 1000), 1, config.screenshot_avg / 1000)
|
|
elif scene in [Scene.ORDER_LIST, Scene.FACTORY_ROOMS]:
|
|
self.wait_start()
|
|
if self.success:
|
|
return True
|
|
elif pos := self.find("factory_accelerate"):
|
|
self.tap(pos)
|
|
elif pos := self.find("bill_accelerate"):
|
|
self.tap(pos)
|
|
|
|
elif scene == Scene.DRONE_ACCELERATE:
|
|
if self.all_in:
|
|
self.tap((1450, 500))
|
|
self.tap((1450, 900))
|
|
self.success = True
|
|
elif self.timeout():
|
|
logger.info("加速时间超过订单剩余时间")
|
|
self.tap((1450, 900))
|
|
self.success = True
|
|
elif (
|
|
tap_count := self.count - self.number(((240, 650), (350, 720)), 40, 150)
|
|
) == 0:
|
|
self.tap((1450, 900))
|
|
self.success = True
|
|
elif tap_count > 0:
|
|
for _ in range(tap_count):
|
|
self.tap((1300, 500), interval=0.07)
|
|
self.sleep(0.2)
|
|
elif tap_count < 0:
|
|
for _ in range(-tap_count):
|
|
self.tap((900, 500), interval=0.07)
|
|
self.sleep(0.2)
|
|
elif scene in self.waiting_scene:
|
|
self.waiting_solver()
|
|
else:
|
|
EnterRoomSolver().run(self.room, detail=False)
|