mower-ng/mower/solvers/fight/mixin.py
Elaina 7ec0694151
All checks were successful
ci/woodpecker/push/check_format Pipeline was successful
fix:第一段滑动参数写错
2024-12-07 15:22:13 +08:00

543 lines
19 KiB
Python

from copy import deepcopy
from datetime import datetime, timedelta
import cv2
import numpy as np
from scipy.signal import argrelmax
from mower.data import agent_list
from mower.utils import config
from mower.utils import typealias as tp
from mower.utils.character_recognize import match_avatar
from mower.utils.generate_image import generate_image
from mower.utils.image import cropimg, loadres, thres2
from mower.utils.log import logger
from mower.utils.scene import Scene
from mower.utils.vector import sa, va
# Per-operator overrides of the minimum template-match score that
# FightMixin.check_name() requires; operators not listed here use
# check_name's built-in default of 0.7.
name_score = {"维娜·维多利亚": 0.6}
# Mixin with shared in-battle helpers: action queue handling, precondition
# checks, deploy / retreat / skill actions and on-screen recognition.
# It calls tap(), ctap(), find(), scene(), sleep(), swipe_ext(), get_pos()
# and reads self.calc, none of which are defined here, so the host class is
# expected to provide them.
class FightMixin:
def change_group(self):
    """Resolve every operator group to one concrete operator name.

    Each member of each group in ``self.groups`` is deep-copied into
    ``self.watching``.  The first member of a group that is present in
    ``self.operators`` is recorded in ``self.group_to_name`` under the
    group's name; later members of the same group never replace it.
    """
    for group in self.groups:
        group_name = group["name"]
        resolved = False
        for member in group["opers"]:
            name = member["name"]
            self.watching[name] = deepcopy(member)
            if resolved or name not in self.operators:
                continue
            self.group_to_name[group_name] = name
            resolved = True
            logger.debug(f"{group_name}替换为{name}")
def update_watching(self):
    """Refresh the skill-usage table from ``self.opers``.

    Every entry of ``self.opers`` is stored in ``self.watching`` under its
    ``"name"`` as an independent deep copy, overwriting any existing entry
    with the same name.
    """
    self.watching.update({entry["name"]: deepcopy(entry) for entry in self.opers})
def reset(self):
    """Restore all per-battle state to its initial values.

    The strategy and action lists are rebuilt as deep copies of their
    ``*_copy`` templates, so the templates themselves are never consumed.
    """
    # Fresh working copies of the strategy list and the action queue.
    self.strategies = deepcopy(self.strategies_copy)
    self.actions = deepcopy(self.actions_copy)

    self.speed = 1  # current game speed
    # loading: still in the initial load; playing: running (True) vs paused.
    self.loading = self.playing = True

    # Forget everything recognised in the deployment bar.
    self.clear_op()

    # location: (x, y) tile -> name of the operator deployed there
    # watching: operator name -> skill usage settings
    # group_to_name: group name -> operator chosen to stand in for the group
    self.location, self.watching, self.group_to_name = {}, {}, {}

    self.post_delay = 0  # delay to apply after the next completed action
    # End time of the previous action; check_condition measures pre_delay from it.
    self.action_time = datetime.now()
    # True while an operator is selected (the game runs at 1/5 speed).
    self.Bullet_Time = False
def fight_init(self):
    """Re-initialise the recognised state once the battle has started.

    The game is paused first and resumed at the end, so the recognition
    steps in between run while it is paused.
    """
    self.pause()
    # Locate the cards in the deployment bar, then identify them.
    self.check_operators()
    self.update_operators()
    # Resolve group names to concrete operators, then load skill settings.
    self.change_group()
    self.update_watching()
    # Baseline cost used by the "cost_changes" condition.
    self.pre_cost = self.cost()
    self.play()
    # Initial loading is finished.
    self.loading = False
@property
def action(self):
    """The next pending action (head of ``self.actions``), or ``None`` if the queue is empty."""
    return self.actions[0] if self.actions else None
def complete_action(self):
    """Mark the current action as done.

    Removes the head of ``self.actions``, records the completion time and
    the current deployment cost (the baselines for the ``pre_delay`` and
    ``cost_changes`` conditions), then applies a pending post-delay if one
    is set.
    """
    del self.actions[0]
    self.action_time = datetime.now()
    self.pre_cost = self.cost()
    has_pending_delay = self.post_delay > 0
    if has_pending_delay:
        self.sleep_post_delay()
def sleep_post_delay(self):
    """Sleep for the pending post-action delay, then clear it."""
    self.sleep(self.post_delay)
    # Reset only after sleeping so the delay is applied exactly once.
    self.post_delay = 0
def number(self, scope: tp.Scope, height: int, thres: int) -> int:
    """Recognise the integer shown inside ``scope``.

    Thin wrapper around ``config.recog.num.number_int`` that always passes
    ``"secret_front"`` as the first argument.

    Args:
        scope: screen region to read.
        height: forwarded unchanged to ``number_int`` (presumably the glyph
            height - confirm against ``number_int``).
        thres: forwarded unchanged to ``number_int`` (presumably a
            binarisation threshold - confirm against ``number_int``).

    Returns:
        The value returned by ``number_int``.
    """
    return config.recog.num.number_int("secret_front", scope, height, thres)
def kills(self, scope) -> int:
    """Read the kill count displayed inside ``scope``.

    The region is binarised at 127 and the ``fight/kills_separator``
    template is located in it by squared-difference matching.  Only the part
    of ``scope`` to the left of the separator is then passed to
    :meth:`number`.

    Args:
        scope: ``((left, top), (right, bottom))`` region of the kill counter.

    Returns:
        The recognised number.
    """
    binary = thres2(cropimg(config.recog.gray, scope), 127)
    separator = loadres("fight/kills_separator", True)
    diff_map = cv2.matchTemplate(binary, separator, cv2.TM_SQDIFF_NORMED)
    # With TM_SQDIFF_NORMED the best match is the minimum location.
    best_loc = cv2.minMaxLoc(diff_map)[2]
    # Convert back to screen coordinates and stop one pixel short of the separator.
    right_edge = best_loc[0] + scope[0][0] - 1
    count = self.number((scope[0], (right_edge, scope[1][1])), 28, 127)
    logger.debug(count)
    return count
def cost(self) -> int:
    """Read the current deployment cost from its fixed screen region."""
    cost_scope = ((1800, 745), (1920, 805))
    value = self.number(cost_scope, 52, 200)
    logger.debug(value)
    return value
def check_condition(self) -> bool:
    """Tell whether the current action may be executed now.

    Each optional key of the action is a precondition; the first one that is
    not met makes the method return ``False``:

    * ``kills``        - minimum kill count
    * ``costs``        - minimum deployment cost
    * ``cooling``      - minimum number of operators on cooldown
    * ``cost_changes`` - minimum cost gained since the previous action
    * ``pre_delay``    - minimum milliseconds since the previous action

    When every precondition holds, ``post_delay`` (milliseconds) is stored in
    ``self.post_delay`` as seconds, and a group name in ``name`` is replaced
    in place by the operator recorded in ``self.group_to_name``.
    """
    action = self.action

    if "kills" in action:
        # NOTE(review): kills() as defined in this file requires a ``scope``
        # argument but none is passed here - presumably the host class
        # overrides kills(); confirm.
        if action["kills"] > self.kills():
            return False

    if "costs" in action:
        if action["costs"] > self.cost():
            return False

    if "cooling" in action:
        if action["cooling"] > self.CD():
            return False

    if "cost_changes" in action:
        if action["cost_changes"] + self.pre_cost > self.cost():
            return False

    if "pre_delay" in action:
        elapsed = datetime.now() - self.action_time
        if elapsed < timedelta(seconds=action["pre_delay"] / 1000):
            return False

    # All preconditions passed: prepare the action for execution.
    if "post_delay" in action:
        self.post_delay = action["post_delay"] / 1000
    if "name" in action and action["name"] in self.group_to_name:
        action["name"] = self.group_to_name[action["name"]]
    return True
def play(self):
    """Resume the battle.

    Taps the button at (1800, 80) repeatedly while the scene is
    ``Scene.OPERATOR_FIGHT``, ``fight/pause`` is found and
    ``sss/start_directly`` is not found (``fight/pause`` presumably marks
    the paused state - confirm against the resource).
    """
    logger.info("继续")
    while self.scene() == Scene.OPERATOR_FIGHT:
        if not self.find("fight/pause"):
            break
        if self.find("sss/start_directly"):
            break
        self.tap((1800, 80), interval=0.5)
def pause(self):
    """Pause the battle.

    Taps the button at (1800, 80) repeatedly while the scene is
    ``Scene.OPERATOR_FIGHT`` and neither ``fight/pause`` nor
    ``sss/start_directly`` is found (``fight/pause`` presumably marks the
    paused state - confirm against the resource).
    """
    logger.info("暂停")
    while self.scene() == Scene.OPERATOR_FIGHT:
        if self.find("fight/pause"):
            break
        if self.find("sss/start_directly"):
            break
        self.tap((1800, 80), interval=0.5)
def toggle_speed(self):
    """Switch between 1x and 2x speed and complete the current action.

    Any recorded speed other than 2 is treated as 1x, so the target becomes 2.
    """
    if self.speed == 2:
        target = 1
    else:
        target = 2
    logger.info(f"切换至{target}倍速")
    self.tap((1650, 80))
    self.speed = target
    self.complete_action()
def measure_time(func):
    """Decorator that logs how long each call of ``func`` takes.

    The duration is written to the debug log in milliseconds after ``func``
    returns; nothing is logged if ``func`` raises.  The wrapper keeps the
    wrapped function's name and docstring.

    Args:
        func: method taking ``self`` as its first argument.

    Returns:
        The wrapping function, which returns whatever ``func`` returns.
    """
    # Imported locally so this block does not depend on the file's import list.
    import time
    from functools import wraps

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        # perf_counter is monotonic, so system clock adjustments cannot
        # distort (or make negative) the measured duration.
        start = time.perf_counter()
        result = func(self, *args, **kwargs)
        elapsed_ms = (time.perf_counter() - start) * 1000
        logger.debug(f"{func.__name__}耗时:{elapsed_ms} ms")
        return result

    return wrapper
def avatar_recog_pause_decorator(func):
    """Decorator that pauses the game around ``func`` when configured.

    If ``config.conf.avatar_recog_pause`` is truthy, ``self.pause()`` is
    called before ``func`` and ``self.play()`` afterwards, even when ``func``
    raises.  The wrapper keeps the wrapped function's name and docstring.

    Args:
        func: method taking ``self`` as its first argument.

    Returns:
        The wrapping function, which returns whatever ``func`` returns.
    """
    # Imported locally so this block does not depend on the file's import list.
    from functools import wraps

    @wraps(func)
    def wrapper(self, *args, **kwargs):
        # Read the switch once: if it changed while func was running, the
        # game could otherwise be paused without being resumed (or resumed
        # without having been paused).
        should_pause = config.conf.avatar_recog_pause
        if should_pause:
            self.pause()
        try:
            return func(self, *args, **kwargs)
        finally:
            if should_pause:
                self.play()

    return wrapper
@measure_time
def update_operators_cost_pos(self):
    """Locate the operator cards in the deployment bar.

    The ``fight/c`` template is matched (with ``fight/c_mask``) against the
    screen strip between y=887 and y=905.  Local maxima of the first result
    row that score above 0.7 are candidate card positions.  A candidate is
    kept only if the thin bar below it has fewer than 50 pixels with hue in
    1..3 - the same colour test that ``CD`` uses to count operators on
    cooldown.

    Returns:
        List of x positions (indices into the match row) of the kept cards.
    """
    strip_top = 887
    strip = cropimg(config.recog.gray, ((0, strip_top), (1920, 905)))
    min_score = 0.7
    template = loadres("fight/c", True)
    template_mask = loadres("fight/c_mask", True)
    # Only the first row of the match result is used.
    scores = cv2.matchTemplate(
        strip, template, cv2.TM_CCOEFF_NORMED, None, template_mask
    )[0]

    def cooldown_pixels(x):
        # Count the pixels of the bar below the card whose hue is in 1..3.
        bar = cropimg(config.recog.hsv, sa(((-20, 187), (10, 190)), (x, strip_top)))
        return cv2.countNonZero(cv2.inRange(bar, (1, 0, 0), (3, 255, 255)))

    return [
        x
        for x in argrelmax(scores, order=50)[0]
        if scores[x] > min_score and cooldown_pixels(x) < 50
    ]
@avatar_recog_pause_decorator
@measure_time
def update_operators(self):
    """Recognise the operators that can currently be deployed.

    For every card position in ``self.op`` the avatar region is queued for
    ``match_avatar`` and the cost printed on the card is read.  The results
    are stored as ``self.operators[name] = {"scope": ..., "cost": ...}`` and
    their count as ``self.operators_num``.

    ``self.tool_men`` maps each profession to the cheapest recognised
    operator that is listed in ``agent_list`` but not in ``self.watching``.

    Any exception is logged and swallowed; ``self.operators`` and
    ``self.tool_men`` may then be left partially filled.
    """
    try:
        y = 887
        self.operators = {}
        self.tool_men = {}
        avatar_scopes = [sa(((-84, 58), (56, 153)), (x, y)) for x in self.op]
        costs = [
            self.number(sa(((-13, 19), (30, 44)), (x, y)), 25, 80) for x in self.op
        ]
        matched = match_avatar(config.recog.gray, avatar_scopes)
        self.operators_num = len(matched)
        for idx, (name, scope) in enumerate(matched):
            op_cost = costs[idx]
            self.operators[name] = {"scope": scope, "cost": op_cost}
            if name in self.watching or name not in agent_list:
                continue
            profession = agent_list[name]["profession"]
            current = self.tool_men.get(profession)
            # Keep the cheapest candidate per profession.
            if current is None or op_cost < self.operators[current]["cost"]:
                self.tool_men[profession] = name
        logger.debug(f"{self.operators=} {self.tool_men=}")
    except Exception as e:
        logger.exception(e)
def check_operators(self):
    """Re-detect the card positions and compare their count with the last recognition.

    ``self.op`` is always refreshed.

    Returns:
        ``True`` if the number of detected cards equals
        ``self.operators_num``, ``False`` otherwise.
    """
    self.op = self.update_operators_cost_pos()
    logger.debug(f"{len(self.op)=} {self.operators_num=}")
    return len(self.op) == self.operators_num
def clear_op(self):
    """Forget everything recognised in the deployment bar."""
    self.operators, self.op, self.operators_num = {}, [], 0
def select(self, x: int, y: int):
    """Select the operator standing on map tile ``(x, y)``.

    The tile is converted to screen coordinates with
    ``self.calc.get_character_screen_pos`` and tapped at the truncated
    integer position.
    """
    screen = self.calc.get_character_screen_pos(x, y, False, False)
    self.tap((int(screen.x), int(screen.y)))
def withdraw(self):
    """Retreat the operator targeted by the current action.

    The tile is taken from the action's ``location`` if present, otherwise
    it is looked up in ``self.location`` by the action's ``name``.  Unless
    an operator is already selected (bullet time), the tile is selected
    first.  Then the retreat button is tapped, the deployment bar state is
    cleared and the action is completed.

    Raises:
        ValueError: the action has no ``location`` and no deployed operator
            with the given name is recorded (the original code failed here
            with ``UnboundLocalError``).
    """
    if "location" in self.action:
        x, y = self.action["location"]
    else:
        target = self.action["name"]
        for loc, name in self.location.items():
            if name == target:
                x, y = loc
                break
        else:
            raise ValueError(f"operator {target!r} is not deployed, cannot retreat")

    if self.Bullet_Time:
        # The operator is already selected; retreating ends bullet time.
        self.Bullet_Time = False
    else:
        self.select(x, y)
        self.sleep(0.5)

    pos = self.calc.get_with_draw_screen_pos(x, y)
    self.tap((int(pos.x), int(pos.y)))
    self.sleep(0.5)
    self.clear_op()
    # The tile is now empty.  Without removing it, later name lookups and
    # travel_watching_skills() would keep targeting the stale tile.
    self.location.pop((x, y), None)
    self.complete_action()
def use_skill(self, x, y):
    """Try to activate the skill of the operator on tile ``(x, y)``.

    Unless an operator is already selected (bullet time), the tile is
    selected first; the time before the skill button is tapped is used to
    refresh the deployable operators, and the remainder of a 0.4 s wait is
    slept.

    After tapping the skill button, finding ``fight/attack`` or
    ``fight/defend`` is treated as failure (presumably the operator panel is
    still open - confirm); the screen is then tapped at (20, 20).

    Returns:
        ``True`` if the skill was used, ``False`` otherwise.
    """
    began = datetime.now()
    if self.Bullet_Time:
        # Already selected: just leave bullet time.
        self.Bullet_Time = False
    else:
        self.select(x, y)
        # Use this idle gap to refresh the deployable operators.
        if not self.check_operators():
            self.update_operators()
        spent = datetime.now() - began
        wait = max(timedelta(seconds=0.4) - spent, timedelta(0))
        self.sleep(wait.total_seconds())

    skill_pos = self.calc.get_skill_screen_pos(x, y)
    self.tap((int(skill_pos.x), int(skill_pos.y)))
    self.sleep(0.4)

    still_open = self.find("fight/attack") or self.find("fight/defend")
    if not still_open:
        return True
    self.tap((20, 20))
    return False
def action_skill(self):
    """Use the skill of the operator targeted by the current action.

    The tile is taken from the action's ``location`` if present, otherwise
    it is looked up in ``self.location`` by the action's ``name``.  The
    action is completed only if ``use_skill`` reports success, so a failed
    attempt is retried on the next call.

    Raises:
        ValueError: the action has no ``location`` and no deployed operator
            with the given name is recorded (the original code failed here
            with ``UnboundLocalError``).
    """
    if "location" in self.action:
        x, y = self.action["location"]
    else:
        target = self.action["name"]
        for loc, name in self.location.items():
            if name == target:
                x, y = loc
                break
        else:
            raise ValueError(
                f"operator {target!r} is not deployed, cannot use its skill"
            )
    if self.use_skill(x, y):
        self.complete_action()
def change_skill_usage(self):
    """Apply the current action's skill usage to the watched operator.

    ``skill_usage`` is copied from the action into the operator's entry in
    ``self.watching``; when it equals 2, ``skill_times`` is copied as well.
    The action is then completed.
    """
    action = self.action
    usage = action["skill_usage"]
    entry = self.watching[action["name"]]
    entry["skill_usage"] = usage
    if usage == 2:
        entry["skill_times"] = action["skill_times"]
    self.complete_action()
def bullet_time(self):
    """Select an operator so that the game enters 1/5 speed.

    If the action names an operator that is currently in the deployment bar
    (``self.operators``), its card is tapped.  Otherwise the deployed
    operator is selected on the map, using the action's ``location`` or, if
    absent, the tile recorded for the action's ``name`` in ``self.location``.
    ``self.Bullet_Time`` is then set and the action is completed.

    Raises:
        ValueError: the operator is not in the deployment bar, the action
            has no ``location`` and no deployed operator with the given name
            is recorded (the original code failed here with
            ``UnboundLocalError``).
    """
    # "name" is optional when "location" is given, so it must not be indexed
    # unconditionally.  Formatting a local also avoids reusing double quotes
    # inside the f-string, which only parses on Python 3.12+.
    target = self.action.get("name")
    logger.info(f"{target}")

    if target is not None and target in self.operators:
        # Still in hand: tap the card in the deployment bar.
        self.tap(self.operators[target]["scope"])
    else:
        if "location" in self.action:
            x, y = self.action["location"]
        else:
            for loc, name in self.location.items():
                if name == target:
                    x, y = loc
                    break
            else:
                raise ValueError(
                    f"operator {target!r} is neither deployable nor deployed"
                )
        self.select(x, y)

    self.Bullet_Time = True
    self.complete_action()
@measure_time
def check_name(self, name: str, scope: tp.Scope) -> bool:
    """Check that the operator name shown in ``scope`` is ``name``.

    An image of ``name`` is rendered with ``generate_image`` and matched
    against the grayscale screen region.  The best score must exceed the
    threshold for that name in ``name_score``, or 0.7 if it has none.

    NOTE(review): ``deploy`` in this file calls ``check_name(name)`` without
    a ``scope`` - presumably the host class overrides this method; confirm.
    """
    rendered = generate_image(name, 33)
    region = cropimg(config.recog.gray, scope)
    _, max_val, _, _ = cv2.minMaxLoc(
        cv2.matchTemplate(region, rendered, cv2.TM_CCOEFF_NORMED)
    )
    logger.debug(f"{name=} {max_val=}")
    return max_val > name_score.get(name, 0.7)
def drag_success(self):
    """Tell whether the first drag of a deployment succeeded.

    The grayscale screen is binarised at 190 and the bounding boxes of its
    external contours are collected; boxes that are not more than 10 px in
    both width and height are ignored.  The drag counts as successful when
    the union of the remaining boxes is less than 950 px wide and less than
    850 px high.

    Returns:
        ``False`` when no box qualifies or when any exception occurs (the
        exception is logged).
    """
    try:
        binary = thres2(config.recog.gray, 190)
        contours, _ = cv2.findContours(
            binary, cv2.RETR_EXTERNAL, cv2.CHAIN_APPROX_SIMPLE
        )
        # Each box is (x, y, w, h); keep only those big enough to matter.
        boxes = [
            box
            for box in map(cv2.boundingRect, contours)
            if box[2] > 10 and box[3] > 10
        ]
        if not boxes:
            return False
        width = max(bx + bw for bx, _, bw, _ in boxes) - min(bx for bx, _, _, _ in boxes)
        height = max(by + bh for _, by, _, bh in boxes) - min(by for _, by, _, _ in boxes)
        logger.debug(f"{width=} {height=}")
        return width < 950 and height < 850
    except Exception as e:
        logger.exception(e)
    return False
def CD(self) -> int:
    """Count the operators in the deployment bar that are on cooldown.

    Card positions are found as in ``update_operators_cost_pos``: local
    maxima above 0.7 of the ``fight/c`` template match along the strip
    between y=887 and y=905.  A card counts as cooling down when the thin
    bar below it has more than 50 pixels with hue in 1..3.
    """
    y = 887
    strip = cropimg(config.recog.gray, ((0, y), (1920, 905)))
    min_score = 0.7
    template = loadres("fight/c", True)
    template_mask = loadres("fight/c_mask", True)
    # Only the first row of the match result is used.
    scores = cv2.matchTemplate(
        strip, template, cv2.TM_CCOEFF_NORMED, None, template_mask
    )[0]
    card_positions = [
        i for i in argrelmax(scores, order=50)[0] if scores[i] > min_score
    ]

    cooling = 0
    for x in card_positions:
        # The colour of the bar at the very bottom tells whether the card is
        # on cooldown.
        bar = cropimg(config.recog.hsv, sa(((-20, 187), (10, 190)), (x, y)))
        pixels = cv2.countNonZero(cv2.inRange(bar, (1, 0, 0), (3, 255, 255)))
        logger.debug(pixels)
        if pixels > 50:
            cooling += 1
    return cooling
def skill_ready(self, x: int, y: int) -> bool:
    """Tell whether the skill of the operator on tile ``(x, y)`` can be activated.

    The ``fight/skill_ready`` template is matched inside a 50x50 grayscale
    region above the operator's screen position.  Template rows 8..16 are
    weighted 0.3 in the match mask, all other rows 1.0.

    Args:
        x: tile column.
        y: tile row.

    Returns:
        ``True`` if the best match score is at least 0.65; ``False``
        otherwise or when any exception occurs (the exception is logged).
    """
    try:
        template = loadres("fight/skill_ready", True)
        anchor = self.calc.get_character_screen_pos(x, y, False, False)
        anchor = int(anchor.x), int(anchor.y)
        region = cropimg(config.recog.gray, sa(((-25, -168), (25, -118)), anchor))
        weights = np.ones_like(template, dtype=np.float32)
        weights[8:17, :] = 0.3
        scores = cv2.matchTemplate(
            region, template, cv2.TM_CCOEFF_NORMED, mask=weights
        )
        _, max_val, _, max_loc = cv2.minMaxLoc(scores)
        logger.debug(f"{max_val=} {max_loc=} {x=} {y=}")
        return max_val >= 0.65
    except Exception as e:
        logger.exception(e)
    return False
def skill_stop(self, x: int, y: int) -> bool:
    """Tell whether the skill of the operator on tile ``(x, y)`` can be turned off.

    The ``fight/skill_stop`` template is matched by squared difference
    inside a 30x30 region of the screen image above the operator's screen
    position.

    Args:
        x: tile column.
        y: tile row.

    Returns:
        ``True`` if the best (lowest) difference is at most 0.2; ``False``
        otherwise or when any exception occurs (the exception is logged).
    """
    try:
        template = loadres("fight/skill_stop")
        # Unpacking fails (and the method returns False) unless the template
        # has exactly three dimensions.
        _height, _width, _ = template.shape
        anchor = self.calc.get_character_screen_pos(x, y, False, False)
        anchor = int(anchor.x), int(anchor.y)
        region = cropimg(config.recog.img, sa(((-15, -168), (15, -138)), anchor))
        diffs = cv2.matchTemplate(region, template, cv2.TM_SQDIFF_NORMED)
        min_val, _, min_loc, _ = cv2.minMaxLoc(diffs)
        logger.debug(f"{min_val=} {min_loc=}")
        return min_val <= 0.2
    except Exception as e:
        logger.exception(e)
    return False
def deploy(self, name: str, x: int, y: int, direction: str):
    """Deploy operator ``name`` on tile ``(x, y)`` facing ``direction``.

    The deployment is two swipes: card -> tile, then tile -> a point 400 px
    away in the facing direction.  With ``direction == "None"`` only the
    first swipe is made.  On success the deployment bar state is cleared and
    the tile is recorded in ``self.location``.

    A started deployment is abandoned (returning ``False``) when
    ``check_name`` fails, when ``drag_success`` does not report the first
    drag as successful within five tries, or when ``drag_success`` is still
    true after the second swipe.  Abandoning taps 300 px up-left of the tile
    (presumably to cancel the pending placement - confirm) and clears the
    deployment bar state.

    Args:
        name: operator to deploy.
        x: tile column.
        y: tile row.
        direction: ``"Left"``, ``"Right"``, ``"Up"`` or ``"None"``; any
            other value is treated as facing down.

    Returns:
        ``True`` if the operator was deployed.  ``False`` if it is not in
        the deployment bar (the bar is re-scanned for the next attempt), if
        the current cost is too low, or if the deployment was abandoned.
    """
    # Deploying ends bullet time.
    if self.Bullet_Time:
        self.Bullet_Time = False

    if name not in self.operators:
        if not self.check_operators():
            self.update_operators()
        return False

    card = self.operators[name]
    if self.cost() < card["cost"]:
        return False

    start = self.get_pos(card["scope"])
    tile = self.calc.get_character_screen_pos(x, y, True, False)
    pos = int(tile.x), int(tile.y + 20)
    logger.info(f"在({x}, {y})部署{name},方向为{direction}")

    facing_offsets = {"Left": (-400, 0), "Right": (400, 0), "Up": (0, -400)}
    facing_target = va(pos, facing_offsets.get(direction, (0, 400)))

    def abandon():
        # Tap away from the tile, forget the bar state and report failure.
        self.ctap(va(pos, (-300, -300)), 0.5, config.screenshot_avg / 1000)
        self.clear_op()
        return False

    # First swipe: drag the card onto the tile.
    self.swipe_ext([start, pos], [config.conf.first_swipe_duration], 100, 0.05)
    if direction == "None":
        self.clear_op()
        self.location[(x, y)] = name
        return True

    # NOTE(review): check_name() as defined in this file also requires a
    # ``scope`` argument - presumably the host class overrides it; confirm.
    if not self.check_name(name):
        return abandon()

    # Wait for the first drag to register; each failed try is followed by a
    # 0.1 s sleep, including the last one.
    dragged = False
    for _ in range(5):
        dragged = self.drag_success()
        if dragged:
            break
        self.sleep(0.1)
    if not dragged:
        return abandon()

    # Second swipe: choose the facing direction.
    self.swipe_ext([pos, facing_target], [config.conf.second_swipe_duration], 0, 0.4)
    if self.drag_success():
        # Still in the state that follows the first drag: direction not set.
        return abandon()

    self.clear_op()
    self.location[(x, y)] = name
    return True
def travel_watching_skills(self):
    """Activate the skill of every deployed operator that is ready.

    Operators listed in ``self.watching`` are skipped when their
    ``skill_usage`` is 0, or when it is 2 and ``skill_times`` is used up.
    Operators not listed are always tried.  For ``skill_usage == 2`` each
    successful activation decrements ``skill_times``.
    """
    for (x, y), name in self.location.items():
        settings = self.watching.get(name)
        if settings is not None:
            usage = settings["skill_usage"]
            if usage == 0 or (usage == 2 and settings["skill_times"] <= 0):
                continue

        if not (self.skill_ready(x, y) and self.use_skill(x, y)):
            continue

        # Read the entry again after the skill was used, as the original did.
        settings = self.watching.get(name)
        if settings is not None and settings["skill_usage"] == 2:
            settings["skill_times"] -= 1
            logger.info(f"{name}技能剩余次数:{settings['skill_times']}")
# Execute the next queued action if its preconditions hold.
#
# Nothing happens when the queue is empty (self.action is None) or when
# check_condition() returns False. Otherwise the action is logged and
# dispatched on self.action["type"]:
#   "SpeedUp"    -> toggle_speed()
#   "Deploy"     -> deploy(name, x, y, direction); the action is completed
#                   only when deploy() returns a truthy value
#   "Retreat"    -> withdraw()
#   "Skill"      -> action_skill()
#   "SkillUsage" -> change_skill_usage()
#   "BulletTime" -> bullet_time()
#   "调配干员"    -> taps (1805, 684) when draw_as_possible is falsy and
#                   "sss/add_agent" is found
#   other types  -> the action is completed without doing anything
#
# NOTE(review): this listing has lost its indentation, so it cannot be told
# from here whether the complete_action() call right after the tap belongs
# inside the preceding `if` (the action would then stay queued until the tap
# happens) or follows it (the action would always be completed). The code
# below is therefore left byte-identical; confirm against the repository
# before re-indenting.
def execute_action(self):
if self.action is None:
return
if not self.check_condition():
return
logger.debug(self.action)
if self.action["type"] == "SpeedUp":
self.toggle_speed()
elif self.action["type"] == "Deploy":
name = self.action["name"]
x, y = self.action["location"]
direction = self.action["direction"]
if self.deploy(name, x, y, direction):
self.complete_action()
elif self.action["type"] == "Retreat":
self.withdraw()
elif self.action["type"] == "Skill":
self.action_skill()
elif self.action["type"] == "SkillUsage":
self.change_skill_usage()
elif self.action["type"] == "BulletTime":
self.bullet_time()
elif self.action["type"] == "调配干员":
if not self.draw_as_possible and self.find("sss/add_agent"):
self.tap((1805, 684), interval=0.05)
self.complete_action()
else:
self.complete_action()