All checks were successful
ci/woodpecker/push/check_format Pipeline was successful
324 lines
11 KiB
Python
324 lines
11 KiB
Python
"""
|
|
Copyright (c) 2025 zhbaor <zhbaor@zhaozuohong.vip>
|
|
|
|
This file is part of mower-ng (https://git.zhaozuohong.vip/mower-ng/mower-ng).
|
|
|
|
Mower-ng is free software: you may copy, redistribute and/or modify it
|
|
under the terms of the GNU General Public License as published by the
|
|
Free Software Foundation, version 3 or later.
|
|
|
|
This file is distributed in the hope that it will be useful, but
|
|
WITHOUT ANY WARRANTY; without even the implied warranty of
|
|
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
|
|
General Public License for more details.
|
|
|
|
You should have received a copy of the GNU General Public License
|
|
along with this program. If not, see <http://www.gnu.org/licenses/>.
|
|
"""
|
|
|
|
from copy import copy
|
|
from datetime import timedelta
|
|
from functools import cache
|
|
from itertools import combinations
|
|
|
|
import cv2
|
|
|
|
from mower.data import recruit_agent
|
|
from mower.static import recruit, recruit_result_knn
|
|
from mower.utils import config
|
|
from mower.utils import typealias as tp
|
|
from mower.utils.image import cmatch, cropimg, thres2
|
|
from mower.utils.log import logger
|
|
from mower.utils.path import get_path
|
|
from mower.utils.scene import Scene
|
|
from mower.utils.solver import BaseSolver
|
|
from mower.utils.vector import sa, va
|
|
|
|
|
|
def calculate(tags: set[str]) -> tuple[int, tuple[str, ...], list[str]]:
    """Compute the public-recruitment outcome of a 9:00 recruitment from tags.

    Every combination of one to three usable tags is evaluated; the one whose
    (lowest rarity, highest rarity) pair is greatest wins.

    Args:
        tags (set[str]): Recognized tags. The set is left unmodified.

    Returns:
        tuple[int, tuple[str, ...], list[str]]: The lowest rarity among the
            operators of the best combination, that tag combination (empty
            when no combination beats the unfiltered pool), and the matching
            operators ordered from highest to lowest rarity.
    """
    star_ops = {i: [] for i in range(1, 7)}  # operators grouped by rarity
    op_tags: dict[str, list[str]] = {}  # tags of each operator
    op_star: dict[str, int] = {}  # rarity of each operator
    for data in recruit_agent.values():
        name = data["name"]
        star = data["stars"]
        star_ops[star].append(name)
        op_tags[name] = data["tags"]
        op_star[name] = star

    # Candidate pool: 3- to 5-star operators; 6-star operators are only
    # added when the "高级资深干员" tag is present.
    operators = star_ops[5] + star_ops[4] + star_ops[3]
    if "高级资深干员" in tags:
        operators += star_ops[6]
    operators.sort(key=lambda op: op_star[op], reverse=True)

    # Filter into a new, sorted list: the caller's set stays untouched and
    # ties between equally good combinations are broken reproducibly
    # instead of depending on set iteration order.
    usable_tags = sorted(tags - {"新手", "支援机械"})
    tag_ops: dict[str, set[str]] = {  # operators matching each tag
        t: {op for op in operators if t in op_tags[op]} for t in usable_tags
    }

    best_comb: tuple[str, ...] = ()
    best_range = op_star[operators[-1]], op_star[operators[0]]
    for n in range(1, 4):
        for comb in combinations(tag_ops, n):
            # Operators that carry every tag of this combination.
            matched = set.intersection(*(tag_ops[c] for c in comb))
            if not matched:
                continue
            result = sorted(matched, key=lambda op: op_star[op], reverse=True)
            lower, upper = op_star[result[-1]], op_star[result[0]]
            logger.debug(f"({lower}, {upper}) {comb} {result}")
            # Tuple comparison: a higher floor wins first, then a higher cap.
            if (lower, upper) > best_range:
                best_range, best_comb, operators = (lower, upper), comb, result
    logger.debug(f"best: {best_range}, {best_comb}, {operators}")
    return best_range[0], best_comb, operators
|
|
|
|
|
|
class RecruitSolver(BaseSolver):
    """Solver that goes through the four public-recruitment slots."""

    solver_name = "公开招募"
    solver_max_duration = timedelta(minutes=3)

    def run(self):
        """Reset the per-run slot state and delegate to ``super().run()``.

        Returns:
            Whatever ``super().run()`` returns.
        """
        # Set to True only after this solver itself taps "begin recruit",
        # i.e. when the open tag screen is known to belong to slot_index.
        self.index_known: bool = False
        self.slot_index: int = 0  # index of the slot currently handled
        self.info = {}  # slot index -> recorded info for finished slots
        result = super().run()

        if result:
            logger.debug(self.info)

        return result

    @property
    def slot_info(self):
        """Recorded info of the current slot, or None when nothing is stored."""
        return self.info.get(self.slot_index)

    @slot_info.setter
    def slot_info(self, value):
        # Storing any value (even None) creates the key, which is what
        # ``transition`` counts to detect that all slots are done.
        self.info[self.slot_index] = value

    def tags(self) -> dict[str, tp.Scope]:
        """Recognize the recruitment tags.

        Every tag template is matched against the tag area of the grayscale
        screenshot and the five best-scoring matches are kept.

        Returns:
            dict[str, tp.Scope]: Tags and their corresponding regions,
                ordered by the position of each region's first corner.
        """
        top_left = 557, 534
        scope = top_left, (1286, 725)
        img = cropimg(config.recog.gray, scope)

        results = []

        def match_tag(name: str, tpl: tp.GrayImage):
            # Record the best match of one template as [score, name, scope].
            h, w = tpl.shape
            result = cv2.matchTemplate(img, tpl, cv2.TM_CCOEFF_NORMED)
            _, max_val, _, max_loc = cv2.minMaxLoc(result)
            # max_loc is relative to the cropped image, hence the offset by
            # top_left (presumably va adds vectors and sa shifts a scope).
            tag_scope = sa(((0, 0), (w, h)), va(top_left, max_loc))
            results.append([max_val, name, tag_scope])

        for name, tpl in recruit().items():
            tpl = cv2.cvtColor(tpl, cv2.COLOR_RGB2GRAY)
            tpl = thres2(tpl, 127)
            match_tag(name, tpl)
            if "资深干员" in name:
                # These tags are additionally tried with inverted colors.
                match_tag(name, cv2.bitwise_not(tpl))

        results.sort(reverse=True)  # best score first
        logger.debug(results)
        results = [i[1:] for i in results[:5]]
        # Order by the first corner of each scope: second component, then
        # first (presumably y then x, i.e. reading order).
        results.sort(key=lambda r: (r[1][0][1], r[1][0][0]))
        results = dict(results)
        logger.debug(results)
        return results

    def tag_chosen(self, scope: tp.Scope) -> bool:
        """Tell whether the tag in the given region is selected.

        Args:
            scope (tp.Scope): Region of the tag.

        Returns:
            bool: Whether the tag is selected.
        """
        img1 = cropimg(config.recog.img, scope)
        h, w, _ = img1.shape
        img2 = img1.copy()
        # NOTE(review): channel order follows config.recog.img — confirm.
        BLUE = 0, 151, 219
        YELLOW = 255, 216, 2
        for color in [BLUE, YELLOW]:
            # Compare the crop with a same-sized image filled with one color.
            cv2.rectangle(img2, (0, 0), (w, h), color, -1)
            if cmatch(img1, img2, thresh=150):
                return True
        return False

    def ticket_count(self) -> int:
        """Recognize the number of recruitment permits.

        Returns:
            int: Number of remaining recruitment permits.
        """
        # The number is read between the two icons: from the second corner
        # of "recruit/ticket" to the first corner of "recruit/stone".
        begin = self.find("recruit/ticket")[1][0]
        end = self.find("recruit/stone")[0][0]
        scope = (begin, 43), (end, 76)
        ticket_count = config.recog.num.number(font="riic_base", scope=scope, thres=200)
        ticket_count = int(ticket_count)
        logger.debug(ticket_count)
        return ticket_count

    def number(self, scope: tp.Scope) -> int:
        """Recognize the number inside a region of the recruitment timer.

        Args:
            scope (tp.Scope): Region containing the number.

        Returns:
            int: Recognition result.
        """
        img = cropimg(config.recog.gray, scope)
        img = thres2(img, 127)
        img = cv2.bitwise_not(img)
        # Shrink to 35% before handing the image to the "noto" recognizer.
        img = cv2.resize(img, None, None, 0.35, 0.35)
        number = int(config.recog.num.number(font="noto", img=img))
        return number

    def read_time(self) -> timedelta:
        """Recognize the recruitment duration.

        Returns:
            timedelta: Recruitment duration.
        """
        hour_scope = (676, 290), (740, 385)
        minute_scope = (855, 290), (920, 385)
        hour = self.number(hour_scope)
        # The value read from the minute region is scaled by ten.
        minute = self.number(minute_scope) * 10
        duration = timedelta(hours=hour, minutes=minute)
        logger.debug(duration)
        return duration

    def change_time(self) -> bool:
        """Adjust the recruitment duration towards 9:00 by one tap.

        Returns:
            bool: Whether the duration was changed (False when it already
                equals the target).
        """
        target_time = timedelta(hours=9)
        tap_m = 923  # x of the minute column
        tap_h = 676  # x of the hour column
        tap_inc = 226  # y of the upper button
        tap_dec = 441  # y of the lower button

        delta = target_time - self.read_time()
        delta_m = (delta % timedelta(hours=1)) // timedelta(minutes=10)
        delta_h = delta // timedelta(hours=1)
        # Minutes are fixed first, then hours. The XOR flips the direction
        # for large offsets (presumably because the selector wraps around).
        if delta_m != 0:
            x = tap_m
            y = tap_inc if ((delta_m > 0) ^ (abs(delta_m) > 3)) else tap_dec
        elif delta_h != 0:
            x = tap_h
            y = tap_inc if ((delta_h > 0) ^ (abs(delta_h) > 5)) else tap_dec
        else:
            return False
        self.ctap((x, y), 2)
        return True

    @staticmethod
    @cache
    def _load_mobilenet():
        # Load the feature extractor once per process. Caching here instead
        # of on the instance property avoids keying the cache on ``self``,
        # which would keep every solver instance alive.
        net_name = "mobilenet_v3_small_feature_extractor.onnx"
        net_path = get_path(f"@install/mower/static/{net_name}")
        return cv2.dnn.readNetFromONNX(str(net_path))

    @property
    def mobilenet(self):
        """The ONNX feature-extractor network, loaded on first access."""
        return self._load_mobilenet()

    def recruit_result(self) -> "str | None":
        """Classify the image shown in the recruitment result area.

        For the first profession whose template is found on screen, the
        region (800, 100)-(1300, 600) is run through ``mobilenet`` and the
        features are classified with that profession's KNN classifier.

        Returns:
            str | None: The predicted label, or None when no profession
                template is found.
        """
        # NOTE(review): SOURCE lost its indentation; the whole body is
        # assumed to sit under the ``if`` — confirm against the repository.
        for profession, knn_classifier in recruit_result_knn().items():
            if config.recog.find(f"recruit/profession/{profession}"):
                img = cropimg(config.recog.img, ((800, 100), (1300, 600)))
                blob = cv2.dnn.blobFromImage(
                    img,
                    scalefactor=1 / 255,
                    size=(224, 224),
                    mean=(0, 0, 0),
                    swapRB=False,
                    crop=False,
                )
                net = self.mobilenet
                net.setInput(blob)
                features = net.forward()
                result = str(knn_classifier.predict(features)[0])
                logger.debug(result)
                return result
        return None

    def transition(self):
        """Perform one step of the recruitment flow for the current scene.

        Returns:
            True once all four slots have an entry in ``self.info``;
            otherwise None.
        """
        if len(self.info) == 4:
            return True
        if (scene := self.scene()) == Scene.RECRUIT_TAGS:
            # Only act on a tag screen that this solver opened itself for a
            # slot that is not finished yet; otherwise go back to the list.
            if not (self.index_known and self.slot_info is None):
                self.scene_graph_step(Scene.RECRUIT_MAIN)
                return
            if self.animation():
                return
            tags = self.tags()

            if ("支援机械" in tags) and config.conf.recruit_robot:
                logger.info("保留支援机械")
                self.slot_info = None
                self.slot_index += 1
                self.scene_graph_navigation(Scene.RECRUIT_MAIN)
                return

            # calculate() gets its own set, not the recognized dict.
            star, comb, _ = calculate(set(tags))

            # NOTE(review): SOURCE lost its indentation; the permit check is
            # assumed to apply only to results below four stars — confirm.
            if star < 4:
                if pos := self.find("recruit/refresh"):
                    self.ctap(pos, 3)
                    return
                ticket_count = self.ticket_count()
                if ticket_count <= config.conf.recruitment_permit:
                    self.slot_info = None
                    self.slot_index += 1
                    self.scene_graph_step(Scene.RECRUIT_MAIN)
                    return

            if self.change_time():
                return

            # Toggle one mismatching tag per call, confirm when all match.
            for tag, scope in tags.items():
                if self.tag_chosen(scope) != (tag in comb):
                    self.ctap(scope, 3, tag)
                    return
            self.ctap("recruit/confirm", 3)
        elif scene == Scene.RECRUIT_MAIN:
            if self.animation():
                return
            x = 27, 974
            y = 274, 691
            scope = (0, 0), (916, 373)
            slots = [(x[0], y[0]), (x[1], y[0]), (x[0], y[1]), (x[1], y[1])]
            slots = [sa(scope, slot) for slot in slots]
            slot = slots[self.slot_index]
            # Slots showing either of these templates are recorded and
            # skipped; the lock is checked first.
            for res in ("recruit/recruit_lock", "recruit/job_requirements"):
                if self.find(res, scope=slot):
                    self.slot_info = None
                    self.slot_index += 1
                    return
            if pos := self.find("recruit/recruit_done", scope=slot):
                self.tap(pos)
                return
            if self.ctap(self.find("recruit/begin_recruit", scope=slot), 3):
                self.index_known = True
        elif scene == Scene.REFRESH_TAGS:
            self.tap("double_confirm/main", x_rate=1)
        else:
            self.scene_graph_step(Scene.RECRUIT_MAIN)
|