mower-ng/mower/solvers/operation.py
Elaina 3cf37f3ea1
All checks were successful
ci/woodpecker/push/check_format Pipeline was successful
替换导航选人战斗相关ctap
2024-12-11 19:52:27 +08:00

306 lines
11 KiB
Python

from datetime import datetime, timedelta
from typing import Optional
import cv2
from mower.solvers.navigation import NavigationSolver
from mower.solvers.navigation.utils import generate_name
from mower.utils import config
from mower.utils import typealias as tp
from mower.utils.datetime import get_server_weekday
from mower.utils.email import drop_template, notify, send_message
from mower.utils.image import cropimg, diff_ratio, loadimg
from mower.utils.log import logger
from mower.utils.path import get_path
from mower.utils.recognize import Scene
from mower.utils.solver import BaseSolver
from mower.utils.vector import overlap, sa, va
# Name of the supply item. Depot icons whose stem merely starts with this
# name (without being exactly this name) are not used as drop templates.
supplement = "罗德岛物资补给"

# Drop templates keyed by the depot icon's file stem. Each image is loaded
# with ``loadimg(path, True)`` (presumably grayscale -- drop_recog unpacks
# a 2-D ``shape`` from these images; confirm against loadimg).
drop_data = {
    icon.stem: loadimg(icon, True)
    for icon in get_path("@install/ui/public/depot").iterdir()
    # Skip every icon whose name contains "信物".
    if "信物" not in icon.stem
    # Keep the plain supply icon, drop its prefixed variants.
    and (icon.stem == supplement or not icon.stem.startswith(supplement))
}

# Rendered templates for the digits 0-9, indexed by digit value, used to
# read the quantity printed on each drop.
drop_digits = [
    generate_name(str(digit), font_size=28, style="dark") for digit in range(10)
]
class OperationSolver(BaseSolver):
    """Drive repeated auto-deploy operations and record what they drop.

    Not a standard Solver container; use OperationManager to plug it into
    the scheduler. OperationManager assigns ``scheduler_stop_time`` (a
    datetime or None) on the instance before calling ``run()``, and reads
    ``drop_list``, ``sanity_drain`` and ``timeout`` afterwards.
    """

    solver_name = "代理作战"

    def set_stop_time(self):
        """Compute the latest moment at which another operation may start.

        ``stop_time`` becomes None when the scheduler imposed no deadline.
        Otherwise it is the scheduler deadline minus the duration of the
        last operation; a duration of 3 minutes is assumed while no
        operation has been started yet.
        """
        if self.scheduler_stop_time is None:
            self.stop_time = None
            return

        if self.operation_start_time is None:
            operation_time = timedelta(minutes=3)
        else:
            operation_time = datetime.now() - self.operation_start_time
            logger.info(f"作战时间:{operation_time}")
        self.stop_time = self.scheduler_stop_time - operation_time

    def check_timeout(self):
        """Return True (and set ``self.timeout``) once ``stop_time`` has passed.

        Returns False when there is no stop time or it has not been reached.
        """
        if self.stop_time and datetime.now() > self.stop_time:
            self.timeout = True
            return True
        return False

    def run(self):
        """Reset the per-run state and hand control to the base solver loop."""
        # Operation duration and timeout bookkeeping.
        self.operation_start_time = None  # start time of the most recent operation
        self.set_stop_time()

        self.sanity_drain = False  # exit reason: sanity exhausted
        self.timeout = False  # exit reason: not enough time left

        self.drop_list: list[dict[str, int]] = []  # one dict of drops per operation
        self.drop_recog_complete = False  # whether drops of the current operation were read

        return super().run()

    def number(self, scope: tp.Scope, height: Optional[int] = None):
        """Read an integer from ``scope`` of the current screen.

        Delegates to ``config.recog.num.number_int`` with the
        "secret_front" setting and a single rect limit; the exact meaning
        of both is defined by that helper (not visible here).
        """
        rect_limits = [{"w": 5, "h": 5, "char": ""}]
        return config.recog.num.number_int(
            "secret_front", scope, height, rect_limits=rect_limits
        )

    def drop_animation(self) -> bool:
        """Compare the drop strip before and after one ``self.sleep()``.

        Returns the result of ``diff_ratio`` with ``ratio=0.03``;
        ``transition`` treats a truthy result as "the drop animation is
        still playing".
        """
        drop_scope = (100, 775), (1920, 945)
        img1 = cropimg(config.recog.gray, drop_scope)
        # NOTE(review): presumably sleep() refreshes config.recog so the
        # second crop comes from a newer screenshot -- confirm in BaseSolver.
        self.sleep()
        img2 = cropimg(config.recog.gray, drop_scope)
        return diff_ratio(img1, img2, ratio=0.03)

    def drop_recog(self):
        """Recognise the drops shown on the result screen.

        Steps:
          1. Match every template in ``drop_data`` against the drop strip,
             keeping matches scoring at least 0.5; when a match overlaps an
             already kept one, only the higher-scoring of the two survives.
          2. Sort the matches from left to right.
          3. Relabel a match as "家具" when the pixel probed for it is close
             to a fixed orange colour.
          4. Read the quantity of each match digit by digit.

        Appends a ``{name: quantity}`` dict to ``self.drop_list`` (items
        whose quantity reads as 0 are omitted) and sets
        ``self.drop_recog_complete``.
        """
        # The strip is the same for every template, so crop it once instead
        # of once per template.
        full = cropimg(config.recog.gray, ((95, 770), (1920, 953)))

        result = []
        for name, img in drop_data.items():
            _min_val, max_val, _min_loc, max_loc = cv2.minMaxLoc(
                cv2.matchTemplate(full, img, cv2.TM_CCOEFF_NORMED)
            )
            if max_val < 0.5:
                continue
            h, w = img.shape
            # Template-sized scope placed at the best match position
            # (coordinates are relative to ``full``).
            score, scope = max_val, sa(((0, 0), (w, h)), max_loc)
            replace = False
            for idx, (_max_name, max_score, max_scope) in enumerate(result):
                if overlap(max_scope, scope):
                    replace = True
                    if max_val > max_score:
                        result[idx] = name, score, scope
                    break
            if not replace:
                result.append((name, score, scope))

        # Order by the left edge of each match.
        result.sort(key=lambda x: x[2][0][0])

        orange = (182, 124, 84)
        for idx, (name, score, scope) in enumerate(result):
            color = self.get_color((scope[0][0] + 187, 955))
            if all(abs(color[i] - orange[i]) < 10 for i in range(3)):
                result[idx] = "家具", score, scope

        drop_result = {}
        for name, score, scope in result:
            target = cropimg(full, scope)
            # Band of the icon in which the quantity is looked for.
            target = cropimg(target, ((0, 125), (180, 155)))
            values = []
            # Repeatedly take the best-matching digit and blank it out,
            # up to 10 digits.
            for _ in range(10):
                max_score, max_pos, max_digit = 0, None, None
                for n in range(10):
                    _min_val, max_val, _min_loc, max_loc = cv2.minMaxLoc(
                        cv2.matchTemplate(target, drop_digits[n], cv2.TM_CCOEFF_NORMED)
                    )
                    if max_val > max_score:
                        max_score, max_pos, max_digit = max_val, max_loc, n
                if max_score < 0.75:
                    break
                logger.debug(f"{max_score=} {max_pos=} {max_digit=}")
                values.append((max_pos[0], max_digit))
                h, w = drop_digits[max_digit].shape
                # Paint over the digit so that it is not matched again.
                # NOTE(review): if cropimg returns a view, this also writes
                # into config.recog.gray -- confirm that is acceptable.
                cv2.rectangle(target, max_pos, va(max_pos, (w, h)), (0,), -1)
            logger.debug(f"{name=} {values=}")
            # Digits were found in score order; sort by x to read them
            # left to right.
            values = sorted(values)
            value = 0
            for v in values:
                value = value * 10 + v[1]
            if value != 0:
                drop_result[name] = value
        logger.debug(f"{drop_result=}")
        self.drop_list.append(drop_result)
        self.drop_recog_complete = True

    def ope_start(self):
        """Start an operation: record its start time and reset the drop flag."""
        self.operation_start_time = datetime.now()
        self.drop_recog_complete = False
        self.ctap("ope_start", 3)

    def transition(self):
        """Perform one step according to the current scene.

        Returns True when the solver should stop (timeout, sanity
        exhausted, failure, annihilation finished or unavailable, or an
        unhandled scene); otherwise returns None.
        """
        if (scene := self.scene()) == Scene.OPERATOR_BEFORE:
            if self.check_timeout():
                return True
            # NOTE(review): presumably the screen is not fully shown yet
            # while this pixel is dark -- confirm.
            if config.recog.gray[65][1333] < 200:
                self.sleep()
                return
            # NOTE(review): presumably toggles auto-deploy on when it is
            # off -- confirm what this pixel/button is.
            if config.recog.gray[907][1600] < 127:
                self.tap((1776, 908))
                return
            # NOTE(review): presumably resets the repeat count to 1 when a
            # larger number is shown -- confirm.
            if self.number(((1520, 890), (1545, 930)), 28) > 1:
                self.tap((1500, 910))
                self.tap((1500, 801))
                return
            self.ope_start()
        elif scene == Scene.OPERATOR_SELECT:
            self.tap((1655, 781))
        elif scene == Scene.OPERATOR_FINISH:
            if self.drop_animation():
                self.solver_update_before_transition = False
                return
            self.set_stop_time()
            self.tap((310, 330), update=False)
            # Drops are read at most once per operation; ope_start() clears
            # the flag again.
            if not self.drop_recog_complete:
                self.drop_recog()
        elif scene == Scene.OPERATOR_FAILED:
            notify("代理作战失败!", "ERROR")
            return True
        elif scene == Scene.OPERATOR_ONGOING:
            if self.find("ope_agency_fail"):
                self.tap((121, 79))
            else:
                self.sleep(10)
        elif scene == Scene.OPERATOR_GIVEUP:
            self.tap("fight/give_up")
        elif scene == Scene.OPERATOR_RECOVER_POTION:
            confirm = (1635, 865)
            if config.conf.use_all_medicine:
                if self.check_timeout():
                    return True
                logger.info("自动使用全部理智药")
                self.tap(confirm)
                return
            use_medicine = False
            # First check whether the settings allow using medicine.
            if config.conf.maa_expiring_medicine:
                if config.conf.exipring_medicine_on_weekend:
                    # NOTE(review): presumably 5 and 6 are the weekend days
                    # of get_server_weekday() -- confirm.
                    use_medicine = get_server_weekday() >= 5
                else:
                    use_medicine = True
            # Then check whether there is medicine about to expire.
            if use_medicine:
                img = cropimg(config.recog.hsv, ((1015, 515), (1170, 560)))
                img = cv2.inRange(img, (170, 0, 0), (174, 255, 255))
                count = cv2.countNonZero(img)
                logger.debug(f"{count=}")
                use_medicine = count > 3000
            if use_medicine:
                if self.check_timeout():
                    return True
                logger.info("使用即将过期的理智药")
                self.tap(confirm)
                return
            self.sanity_drain = True
            return True
        elif scene == Scene.OPERATOR_RECOVER_ORIGINITE:
            self.sanity_drain = True
            return True
        elif scene == Scene.OPERATOR_ELIMINATE:
            if self.find("ope_agency_lock"):
                notify("无法代理当期剿灭", "ERROR")
                return True
            if self.find("1800"):
                logger.info("本周剿灭已完成")
                return True
            if pos := self.find("ope_elimi_agency"):
                self.tap(pos)
                return
            self.ope_start()
        elif scene == Scene.OPERATOR_ELIMINATE_AGENCY:
            if self.find("can_not_use_ope"):
                notify("当期剿灭代理获取合成玉数量为0", "WARNING")
                return True
            self.tap("ope_elimi_agency_confirm")
        elif scene == Scene.UPGRADE:
            self.tap((960, 540))
        elif scene in self.waiting_scene:
            self.waiting_solver()
        else:
            return True
class OperationManager:
solver_name = "刷理智"
scheduler_stop_time: datetime | None = None
def stage_today(self) -> list[str]:
general_important = ["", "Annihilation"]
general_unimportant = [
"1-7",
"LS-6",
"CE-6",
"AP-5",
"SK-5",
"CA-5",
"PR-A-2",
"PR-A-1",
"PR-B-2",
"PR-B-1",
"PR-C-2",
"PR-C-1",
"PR-D-2",
"PR-D-1",
]
conf = config.conf
plan_today = conf.weekly_plan[get_server_weekday()]
result = []
for stage in general_important:
if stage in plan_today.general:
result.append(stage)
result += plan_today.custom
for stage in general_unimportant:
if stage in plan_today.general:
result.append(stage)
return result
def run(self) -> bool:
stage_today = self.stage_today()
if task_finish := len(stage_today) == 0:
send_message("今天没有要刷的关卡")
return True
nav_solver = NavigationSolver()
ope_solver = OperationSolver()
drop_list = []
for name in stage_today:
if self.scheduler_stop_time - datetime.now() < timedelta(minutes=3):
break
if nav_solver.run(name):
ope_solver.scheduler_stop_time = self.scheduler_stop_time
ope_solver.run()
drop_list += ope_solver.drop_list
if ope_solver.sanity_drain:
task_finish = True
break
if ope_solver.timeout:
break
if drop_list:
drop_total = {}
for drop in drop_list:
for name, value in drop.items():
drop_total[name] = drop_total.get(name, 0) + value
body = drop_template.render(drop_list=drop_list, drop_total=drop_total)
send_message(body, "刷理智掉落")
return task_finish