Compare commits

..

2 commits

Author SHA1 Message Date
9276c3a1cb SwitchProductSolver优化
All checks were successful
ci/woodpecker/push/check_format Pipeline was successful
2024-09-28 19:50:05 +08:00
91643bb2f4 ruff鸡扒的代码 2024-09-28 19:49:23 +08:00
6 changed files with 168 additions and 66 deletions

View file

@ -20,24 +20,31 @@ class DroneSolver(SceneGraphSolver, BaseMixin):
room: str,
count: int = None,
all_in: bool = False,
cur_count: int = None,
) -> bool:
"""
Args:
room: 房间名
count:消耗无人机数量
all_in:贸易站-加速完成一笔订单制造站-消耗全部无人机消耗数量
cur_count:当前无人机数量
"""
logger.info("Start:无人机加速")
self.room = room
while cur_count is None:
try:
cur_count = DigitReader().get_drone(config.recog.gray)
except Exception as e:
raise e
finally:
EnterRoomSolver().run(self.room, detail=False)
if count is None:
count = (
DigitReader().get_drone(config.recog.gray)
- config.conf.drone_count_limit
)
count = cur_count - config.conf.drone_count_limit
self.count = count
self.all_in = all_in
self.success = False
if DigitReader().get_drone(config.recog.gray) < self.count and not self.all_in:
if cur_count < self.count and not self.all_in:
logger.info("无人机数量不足")
return False
if (
@ -45,6 +52,7 @@ class DroneSolver(SceneGraphSolver, BaseMixin):
and not self.detect_room_inside() == self.room
):
EnterRoomSolver().run(self.room, detail=False)
self.wait_start()
super().run()
return True
@ -78,7 +86,7 @@ class DroneSolver(SceneGraphSolver, BaseMixin):
return value
def timeout(self) -> bool:
return datetime.now() > self.start_time + timedelta(seconds=15)
return datetime.now() > self.start_time + timedelta(seconds=20)
def wait_start(self):
self.start_time = datetime.now()
@ -97,26 +105,26 @@ class DroneSolver(SceneGraphSolver, BaseMixin):
elif scene == Scene.DRONE_ACCELERATE:
if self.all_in:
self.tap((1450, 500), interval=0)
self.tap((1450, 900), interval=0)
self.tap((1450, 500))
self.tap((1450, 900))
self.success = True
elif self.timeout():
logger.info("加速时间超过订单剩余时间")
self.tap((1450, 900), interval=0)
self.tap((1450, 900))
self.success = True
elif (
tap_count := self.count - self.number(((240, 650), (350, 720)), 40, 150)
) == 0:
self.tap((1450, 900), interval=0)
self.tap((1450, 900))
self.success = True
elif tap_count > 0:
for _ in range(tap_count):
self.tap((1300, 500), interval=0)
self.sleep(0.3)
self.tap((1300, 500), interval=0.07)
self.sleep(0.2)
elif tap_count < 0:
for _ in range(-tap_count):
self.tap((900, 500), interval=0)
self.sleep(0.3)
self.tap((900, 500), interval=0.07)
self.sleep(0.2)
elif scene in self.waiting_scene:
self.waiting_solver()
else:

View file

@ -1,28 +1,35 @@
from datetime import datetime
import cv2
from arknights_mower.models import riic_base_digits
from arknights_mower.solvers.infra.base_mixin import BaseMixin
from arknights_mower.solvers.infra.drone import DroneSolver
from arknights_mower.solvers.infra.enter_room import EnterRoomSolver
from arknights_mower.solvers.infra.reload import ReloadSolver
from arknights_mower.utils import config
from arknights_mower.utils import typealias as tp
from arknights_mower.utils.digit_reader import DigitReader
from arknights_mower.utils.graph import SceneGraphSolver
from arknights_mower.utils.image import cropimg, thres2
from arknights_mower.utils.log import logger
from .check_current_product import CheckCurrentProductSolver
from .choose_product import ChooseProductSolver
from .get_remain_time import GetRemainTimeSolver
from .utils import product
from .wait_for_product import WaitForProductSolver
production_time = {
"经验": 180,
"赤金": 72,
"源石碎片": 60,
"先锋双芯片": 60,
"狙击双芯片": 60,
"医疗双芯片": 60,
"术师双芯片": 60,
"近卫双芯片": 60,
"重装双芯片": 60,
"辅助双芯片": 60,
"特种双芯片": 60,
"经验": 180 * 60,
"赤金": 72 * 60,
"源石碎片": 60 * 60,
"先锋双芯片": 60 * 60,
"狙击双芯片": 60 * 60,
"医疗双芯片": 60 * 60,
"术师双芯片": 60 * 60,
"近卫双芯片": 60 * 60,
"重装双芯片": 60 * 60,
"辅助双芯片": 60 * 60,
"特种双芯片": 60 * 60,
}
@ -30,41 +37,76 @@ class SwitchProductSolver(SceneGraphSolver, BaseMixin):
def run(self, room, tar_product):
logger.info(f"Start:切换产物 房间:{room} 目标产物:{tar_product}")
EnterRoomSolver().run(room, detail=False)
# 检查当前产物
cur_product = CheckCurrentProductSolver().run(room)
logger.info(f"当前产物:{cur_product}")
if cur_product == tar_product:
logger.info("当前产物已为目标产物")
return False
return True
# 读取当前无人机数量避免重复开关
cur_drone_count = DigitReader().get_drone(config.recog.gray)
# 获取无人机加速的剩余时间
remain_time = GetRemainTimeSolver().run(room)
# 使用无人机加速
drone_count = remain_time % (production_time[cur_product] * 60) // 180
max_wait_time = remain_time % (production_time[cur_product] * 60) % 180
logger.info(f"应使用无人机数量:{drone_count} 最长等待时间:{max_wait_time}")
if not DroneSolver().run(room, drone_count):
start_time = datetime.now()
drone_count = remain_time % production_time[cur_product] // 180
logger.info(f"应使用无人机数量:{drone_count}")
if not DroneSolver().run(room, drone_count, cur_count=cur_drone_count):
return False
# 加速过程中产物恰好完成
if (production_time[cur_product] * 60) // 180 - drone_count < 2:
self.tap((1750, 950))
self.sleep(0.5)
while self.detect_product_complete_double():
self.sleep()
# 等待产物完成
speed = self.read_speed()
wait_time = (
remain_time % production_time[cur_product] % 180 / speed
- (datetime.now() - start_time).total_seconds()
)
WaitForProductSolver().run(room, max_wait_time)
logger.info(f"等待时间:{wait_time}")
WaitForProductSolver().run(room, wait_time)
# 选择目标产物
if not ChooseProductSolver().run(room, tar_product):
return False
# yj的网络加载
self.sleep()
while self.find("reload_check"):
self.sleep()
ReloadSolver().run(room)
return True
def detect_product_complete_double(self) -> str:
"""
仅用于识别制造站无人机加速后收取产物
"""
for p in product:
if self.find(f"infra_complete/{p}", scope=((1440, 130), (1920, 380))):
return p
def number(self, scope: tp.Scope, height: int, thres: int) -> int:
"数字识别"
img = cropimg(config.recog.gray, scope)
default_height = 25
if height != default_height:
scale = 25 / height
img = cv2.resize(img, None, None, scale, scale)
img = thres2(img, thres)
img = cv2.bitwise_not(img)
contours, _ = cv2.findContours(img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
rect = [cv2.boundingRect(c) for c in contours]
rect.sort(key=lambda c: c[0])
value = ""
for x, y, w, h in rect:
if h < 8 and w < 8:
value += "."
continue
elif h < 20 and w < 20:
continue
digit = cropimg(img, ((x, y), (x + w, y + h)))
digit = cv2.copyMakeBorder(
digit, 10, 10, 10, 10, cv2.BORDER_CONSTANT, None, (0,)
)
score = []
for i in range(10):
im = riic_base_digits[i]
result = cv2.matchTemplate(digit, im, cv2.TM_SQDIFF_NORMED)
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
score.append(min_val)
value += str(score.index(min(score)))
return value
def read_speed(self) -> float:
speed1 = self.number(((1185, 955), (1255, 977)), 17, 120)
speed2 = self.number(((1285, 955), (1355, 977)), 17, 150)
return float(speed1) + float(speed2) + 1.0

View file

@ -1,7 +1,14 @@
from datetime import datetime, timedelta
import cv2
from arknights_mower.models import riic_base_digits
from arknights_mower.solvers.infra.enter_room import EnterRoomSolver
from arknights_mower.utils import config
from arknights_mower.utils import typealias as tp
from arknights_mower.utils.email import send_message
from arknights_mower.utils.graph import SceneGraphSolver
from arknights_mower.utils.image import cmatch, cropimg, loadres
from arknights_mower.utils.image import cmatch, cropimg, loadres, thres2
from arknights_mower.utils.log import logger
from arknights_mower.utils.recognize import Scene
@ -30,6 +37,8 @@ class ChooseProductSolver(SceneGraphSolver):
self.first_pos = first_pos[production_index[self.tar_product][0]]
self.second_pos = second_pos[production_index[self.tar_product][1]]
self.success = False
self.num_flag = False
self.wait_start()
super().run()
return self.success
@ -38,6 +47,41 @@ class ChooseProductSolver(SceneGraphSolver):
res = loadres("product_be_choosen")
return cmatch(img, res, 60)
def timeout(self) -> bool:
return datetime.now() > self.start_time + timedelta(seconds=10)
def wait_start(self):
self.start_time = datetime.now()
def number(self, scope: tp.Scope, height: int, thres: int) -> int:
"数字识别"
img = cropimg(config.recog.gray, scope)
default_height = 25
if height != default_height:
scale = 25 / height
img = cv2.resize(img, None, None, scale, scale)
img = thres2(img, thres)
contours, _ = cv2.findContours(img, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE)
rect = [cv2.boundingRect(c) for c in contours]
rect.sort(key=lambda c: c[0])
value = 0
for x, y, w, h in rect:
digit = cropimg(img, ((x, y), (x + w, y + h)))
digit = cv2.copyMakeBorder(
digit, 10, 10, 10, 10, cv2.BORDER_CONSTANT, None, (0,)
)
score = []
for i in range(10):
im = riic_base_digits[i]
result = cv2.matchTemplate(digit, im, cv2.TM_SQDIFF_NORMED)
min_val, max_val, min_loc, max_loc = cv2.minMaxLoc(result)
score.append(min_val)
value = value * 10 + score.index(min(score))
return value
def transition(self) -> bool:
if (scene := self.scene()) == Scene.INFRA_DETAILS:
self.ctap((200, 1000), 1, config.screenshot_avg / 1000)
@ -45,12 +89,22 @@ class ChooseProductSolver(SceneGraphSolver):
if self.success:
return True
elif self.find("reload_check"):
self.ctap((1400, 850), 1, config.screenshot_avg / 1000)
if self.num_flag:
self.ctap((1400, 850), 1, config.screenshot_avg / 1000)
elif self.timeout():
logger.warning("切换成功,剩余材料不足1")
send_message(f"切换成功,剩余材料不足1:{self.room}", level="WARNING")
self.num_flag = True
else:
self.tap((1450, 300))
if self.number(((1306, 462), (1391, 521)), 50, 200) != 1:
self.num_flag = True
else:
self.tap((1750, 400))
elif scene == Scene.CHOOSE_PRODUCT:
if self.find("icon_notification_black"):
logger.warning("切换产物材料不足")
send_message(f"切换产物材料不足:{self.room}", level="WARNING")
return True
elif not self.be_choosen():
self.tap(self.get_pos(self.first_pos))

View file

@ -25,7 +25,6 @@ class GetRemainTimeSolver(SceneGraphSolver, BaseMixin):
and not self.detect_room_inside() == self.room
):
EnterRoomSolver().run(self.room, detail=False)
self.success = False
super().run()
return self.res
@ -68,16 +67,13 @@ class GetRemainTimeSolver(SceneGraphSolver, BaseMixin):
if (scene := self.scene()) == Scene.INFRA_DETAILS:
self.ctap((200, 1000), 1, config.screenshot_avg / 1000)
elif scene == Scene.FACTORY_ROOMS:
if self.success:
return True
if pos := self.find("factory_accelerate"):
self.tap(pos)
elif scene == Scene.DRONE_ACCELERATE:
if res := self.read_remain_time():
self.res = res
self.success = True
self.tap((500, 850))
return True
elif scene in self.waiting_scene:
self.waiting_solver()
else:

View file

@ -8,14 +8,16 @@ from arknights_mower.utils.recognize import Scene
class WaitForProductSolver(SceneGraphSolver, BaseMixin):
def run(self, room, max_wait_time) -> None:
def run(self, room, wait_time) -> None:
self.room = room
self.max_wait_time = max_wait_time
self.wait_start()
if wait_time < 0:
wait_time = 0
self.sleep(wait_time)
super().run()
def timeout(self) -> bool:
return datetime.now() > self.start_time + timedelta(seconds=self.max_wait_time)
return datetime.now() > self.start_time + timedelta(seconds=180)
def wait_start(self):
self.start_time = datetime.now()

View file

@ -63,7 +63,7 @@ class RecruitSolver(SceneGraphSolver):
# 默认要支援机械
self.recruit_order_index = 2
self.recruit_order = [6, 5, 1, 4, 3, 2]
self.refresh=False
self.refresh = False
self.result_agent = {}
self.tags = {}
self.ticket_number = None
@ -177,25 +177,25 @@ class RecruitSolver(SceneGraphSolver):
elif scene == Scene.RECRUIT_TAGS:
self.ticket_number = self.get_ticket_number()
if self.recruit_index not in self.tags.keys() or self.refresh:
tmp_tags = self.get_recruit_tag()
if tmp_tags is False:
self.back()
return
self.tags[self.recruit_index] = tmp_tags
self.refresh=False
self.refresh = False
logger.info(
f"{self.recruit_index}号位置的tag识别结果{self.tags[self.recruit_index]}"
)
if self.recruit_index in self.agent_choose.keys():
if self.agent_choose[self.recruit_index]["level"] == 3:
if pos := self.find("recruit/refresh"):
self.tap(pos)
del self.tags[self.recruit_index]
del self.agent_choose[self.recruit_index]
self.refresh=True
self.refresh = True
return
choose = self.agent_choose[self.recruit_index]["tags"]