mower-ng/server.py
zhbaor 7989350824
All checks were successful
ci/woodpecker/push/check_format Pipeline was successful
每12小时关闭模拟器或退出游戏
2025-01-29 22:18:37 +08:00

791 lines
22 KiB
Python
Executable file

#!/usr/bin/env python3
"""
Copyright (c) 2023 fuyn101 <fuynshile@outlook.com>
Copyright (c) 2024 Elaina <2901432375@qq.com>
Copyright (c) 2023 EightyDollars <eightydollars@163.com>
Copyright (c) 2024 zhbaor <zhbaor@zhaozuohong.vip>
This file is part of mower-ng (https://git.zhaozuohong.vip/mower-ng/mower-ng).
Mower-ng is free software: you may copy, redistribute and/or modify it
under the terms of the GNU General Public License as published by the
Free Software Foundation, version 3 or later.
This file is distributed in the hope that it will be useful, but
WITHOUT ANY WARRANTY; without even the implied warranty of
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the GNU
General Public License for more details.
You should have received a copy of the GNU General Public License
along with this program. If not, see <http://www.gnu.org/licenses/>.
This file incorporates work covered by the following copyright and
permission notice:
Copyright (c) 2023 Ks-luow <1048879349@qq.com>
Copyright (c) 2024 MuelNova <muel@nova.gal>
Copyright (c) 2023 Peace2F <https://github.com/Peace2F>
Copyright (c) 2023 Shawnsdaddy <wu2xx@dukes.jmu.edu>
Permission is hereby granted, free of charge, to any person obtaining a copy
of this software and associated documentation files (the "Software"), to deal
in the Software without restriction, including without limitation the rights
to use, copy, modify, merge, publish, distribute, sublicense, and/or sell
copies of the Software, and to permit persons to whom the Software is
furnished to do so, subject to the following conditions:
The above copyright notice and this permission notice shall be included in all
copies or substantial portions of the Software.
THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL THE
AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING FROM,
OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER DEALINGS IN THE
SOFTWARE.
"""
import json
import mimetypes
import os
import sys
from datetime import date, datetime, timedelta
from functools import wraps
from io import BytesIO
from pathlib import Path
from threading import Thread
from flask import Flask, abort, request, send_file, send_from_directory
from flask_cors import CORS
from flask_sock import Sock
from tzlocal import get_localzone
from werkzeug.exceptions import NotFound
from werkzeug.security import safe_join
from mower.utils import config
from mower.utils.log import folder as log_folder
from mower.utils.log import logger, screenshot_folder
from mower.utils.path import get_path
# Register the MIME types for the file extensions used by the bundled UI.
mimetypes.add_type("text/html", ".html")
mimetypes.add_type("text/css", ".css")
mimetypes.add_type("application/javascript", ".js")

# NOTE(review): presumably has to be set before OpenCV is first imported.
os.environ.update(OPENCV_OPENCL_RUNTIME="disabled")

app = Flask(__name__)
app.config.update(SOCK_SERVER_OPTIONS={"ping_interval": 5})
sock = Sock(app)
CORS(app)

# Handle of the scheduler thread started by run_in_thread(); None when the
# scheduler has not been started or has been stopped.
mower_thread = None
@sock.route("/ws")
def log(ws):
    """Serve one WebSocket client: push the buffered log, then idle.

    The socket is added to ``config.ws_connections`` for the lifetime of the
    connection. The client first receives the buffered log lines and, when
    one is set, the current screenshot notification. After that the handler
    blocks on ``ws.receive()`` until the connection fails.

    Args:
        ws: WebSocket connection object supplied by flask-sock.
    """
    config.ws_connections.append(ws)
    logger.debug(f"WebSocket客户端建立连接,共{len(config.ws_connections)}条连接")
    try:
        # The initial sends are inside the try block so that a client which
        # drops during the handshake is still removed from ws_connections.
        ws.send(json.dumps({"type": "log", "data": "\n".join(config.log_lines)}))
        if config.screenshot_notification:
            ws.send(json.dumps(config.screenshot_notification))
        while True:
            # Incoming messages are discarded; receive() only serves to
            # detect the end of the connection.
            ws.receive()
    except Exception:
        # Any failure on the socket is treated as a disconnect.
        pass
    finally:
        config.ws_connections.remove(ws)
        logger.debug(f"WebSocket客户端断开连接,共{len(config.ws_connections)}条连接")
def post_require_token(f):
    """Decorate a view so that POST requests must carry the app token.

    The check is only active when ``app`` has a ``token`` attribute. A POST
    request whose ``token`` header differs from it is rejected with HTTP 403.
    Requests using other methods pass through unchecked.
    """

    @wraps(f)
    def decorated_function(*args, **kwargs):
        guarded = request.method == "POST" and hasattr(app, "token")
        if guarded and request.headers.get("token", "") != app.token:
            abort(403)
        return f(*args, **kwargs)

    return decorated_function
@app.route("/update/depot_skland", methods=["POST"])
def depot_skland():
    """Manually start the ``cultivate`` solver and return ``"OK"``."""
    from mower.solvers.cultivate_depot import cultivate

    logger.info("手动启动森空岛读取仓库")
    cultivate().start()
    return "OK"
@app.route("/update/depot_opencv", methods=["POST"])
def depot_opencv():
    """Manually run the ``depotREC`` solver and return ``"OK"``."""
    from mower.solvers.depotREC import depotREC

    logger.info("手动启动图像识别读取仓库")
    depotREC().run()
    return "OK"
def get_require_token(f):
    """Decorate a view so that GET requests must carry the app token.

    The check is only active when ``app`` has a ``token`` attribute. A GET
    request whose ``token`` header differs from it is rejected with HTTP 403.
    Requests using other methods pass through unchecked.
    """

    @wraps(f)
    def decorated_function(*args, **kwargs):
        guarded = request.method == "GET" and hasattr(app, "token")
        if guarded and request.headers.get("token", "") != app.token:
            abort(403)
        return f(*args, **kwargs)

    return decorated_function
@app.route("/")
def serve_index():
    """Serve ``index.html`` from the ``ui/dist`` directory."""
    dist_dir = "ui/dist"
    return send_from_directory(dist_dir, "index.html")
@app.route("/<path:path>")
def serve_static(path):
    """Serve a static file, falling back to ``index.html``.

    ``path`` is looked up in ``ui/dist`` first and then in ``ui/public``.
    When neither directory contains it, ``ui/dist/index.html`` is returned.
    """
    for directory in ("ui/dist", "ui/public"):
        try:
            return send_from_directory(directory, path)
        except NotFound:
            continue
    return send_from_directory("ui/dist", "index.html")
@app.route("/conf", methods=["GET", "POST"])
@get_require_token
@post_require_token
def load_config():
    """Read (GET) or replace (POST) the configuration.

    A POST body carries ``time`` and ``data``. The new configuration is
    applied and saved only when ``time`` is newer than ``config.timestamp``;
    otherwise it is ignored.
    """
    if request.method == "GET":
        return config.conf.model_dump()

    payload = request.json
    incoming_time = payload["time"]
    if not config.timestamp < incoming_time:
        return "Old config ignored!"

    config.timestamp = incoming_time
    config.conf = config.Conf(**payload["data"])
    config.save_conf(update_timestamp=False)
    return "New config saved!"
@app.route("/plan", methods=["GET", "POST"])
@post_require_token
def load_plan_from_json():
    """Read (GET) or replace and save (POST) the plan."""
    if request.method != "GET":
        config.plan = config.PlanModel(**request.json)
        config.save_plan()
        return "New plan saved。"
    return config.plan.model_dump(exclude_none=True)
@app.route("/operator")
def operator_list():
    """Return the keys of ``mower.data.agent_list`` as a list."""
    from mower.data import agent_list

    names = agent_list.keys()
    return list(names)
@app.route("/shop")
def shop_list():
    """Return the keys of ``mower.data.shop_items`` as a list."""
    from mower.data import shop_items

    names = shop_items.keys()
    return list(names)
@app.route("/activity")
def activity():
    """Return the entries of ``ActivityNavigation.location["normal"]``.

    A dict yields its keys, a list is returned as-is, and any other type
    yields an empty list.
    """
    from mower.solvers.navigation.activity import ActivityNavigation

    location = ActivityNavigation.location["normal"]
    if isinstance(location, dict):
        return list(location.keys())
    if isinstance(location, list):
        return location
    return []
@app.route("/depot/readdepot")
def read_depot():
    """Return the result of ``DepotManager().读取仓库()``."""
    from mower.solvers.depot_reader import DepotManager

    return DepotManager().读取仓库()
@app.route("/running")
def running():
    """Return ``"true"`` when the scheduler thread is alive, else ``"false"``."""
    if mower_thread and mower_thread.is_alive():
        return "true"
    return "false"
def run_in_thread():
    """Start the scheduler in a daemon thread unless one is already alive.

    Resets the shared scheduler state in ``config``, clears the scheduler
    queues, registers every solver, and then starts ``scheduler.schedule``
    in a new thread stored in the module-level ``mower_thread``.
    """
    global mower_thread
    from mower.utils.config import 受监控的基建任务

    if mower_thread and mower_thread.is_alive():
        return

    get_path("@app/tmp").mkdir(exist_ok=True)

    # Reset the state shared with the scheduler thread.
    config.stop_mower.clear()
    config.base_scheduler = None
    config.global_plan = {}
    config.tasks = 受监控的基建任务()
    config.task_count = 0

    from mower.utils.scheduler import scheduler

    scheduler.priority_queue = []
    scheduler.time_queue = []

    scheduler.register_one_time(
        solver="mower.solvers.base_manager.BaseManager",
        time=datetime.now(),
    )

    # (solver path, offset in hours, priority)
    daily_jobs = (
        ("mower.solvers.sign_in.SignInManager", 4, 15),
        ("mower.solvers.credit.CreditSolver", 4, 16),
        ("mower.solvers.infra.report.ReportSolver", 4, 17),
        ("mower.solvers.infra.switch_assistants.SwitchAssistantsSolver", 4, 18),
        (
            "mower.solvers.infra.switch_activity_users.SwitchActivityUsersSolver",
            4,
            19,
        ),
        ("mower.solvers.skland.SKLand", 3, 20),
        ("mower.solvers.mail.MailSolver", 4, 21),
        ("mower.solvers.fight.credit_fight.CreditFight", 4, 22),
        ("mower.solvers.trade_token.TradeTokenSolver", 4, 23),
    )
    for solver_path, offset_hours, job_priority in daily_jobs:
        scheduler.register_daily(
            solver=solver_path,
            offset=timedelta(hours=offset_hours),
            priority=job_priority,
        )

    # (solver path, interval, priority)
    recruit_interval = timedelta(hours=config.conf.recruit_gap)
    maa_interval = timedelta(hours=config.conf.maa_gap)
    periodic_jobs = (
        ("mower.solvers.infra.clue.ClueManager", timedelta(hours=1), 24),
        ("mower.solvers.recruit.RecruitSolver", recruit_interval, 25),
        ("mower.solvers.fight.copy_works.CopyWorksSolver", maa_interval, 26),
        ("mower.solvers.operation.OperationManager", maa_interval, 27),
        ("mower.solvers.depotREC.depotREC", maa_interval, 28),
        ("mower.solvers.mission.MissionSolver", maa_interval, 29),
        ("mower.solvers.exit.ExitSolver", timedelta(hours=12), 40),
    )
    for solver_path, job_interval, job_priority in periodic_jobs:
        scheduler.register_periodic(
            solver=solver_path,
            interval=job_interval,
            priority=job_priority,
        )

    # (solver path, priority)
    long_jobs = (
        ("mower.solvers.sss.SSSSolver", 50),
        ("mower.solvers.rogue.RogueSolver", 51),
    )
    for solver_path, job_priority in long_jobs:
        scheduler.register_long(solver=solver_path, priority=job_priority)

    mower_thread = Thread(target=scheduler.schedule, daemon=True)
    mower_thread.start()
    config.idle = False
@app.route("/start")
@get_require_token
def start():
    """Start the scheduler thread via ``run_in_thread`` and return ``"OK"``."""
    run_in_thread()
    return "OK"
@app.route("/stop")
@get_require_token
def stop():
    """Ask the scheduler thread to stop and wait up to 10 seconds for it.

    Returns ``"true"`` when no thread exists or it stopped in time, and
    ``"false"`` when the thread is still alive after the wait.
    """
    global mower_thread

    worker = mower_thread
    if worker is None:
        return "true"

    config.stop_mower.set()
    worker.join(10)
    if worker.is_alive():
        logger.error("mower-ng线程仍在运行")
        return "false"

    logger.info("成功停止mower-ng线程")
    mower_thread = None
    config.idle = True
    if config.device:
        config.device.idle_solver(strategy=False)
    return "true"
@app.route("/stop-maa")
@get_require_token
def stop_maa():
    """Set ``config.stop_maa`` when a scheduler thread exists.

    Returns ``"true"`` when there is no scheduler thread, otherwise ``"OK"``.
    """
    if mower_thread is not None:
        config.stop_maa.set()
        return "OK"
    return "true"
def conn_send(text):
    """Send ``text`` over ``config.parent_conn`` and return the reply.

    Returns an empty string without sending anything when
    ``config.webview_process`` is not alive.
    """
    if config.webview_process.is_alive():
        config.parent_conn.send(text)
        return config.parent_conn.recv()
    return ""
@app.route("/dialog/file")
@get_require_token
def open_file_dialog():
    """Send the ``"file"`` request through ``conn_send`` and return its reply."""
    reply = conn_send("file")
    return reply
@app.route("/dialog/folder")
@get_require_token
def open_folder_dialog():
    """Send the ``"folder"`` request through ``conn_send`` and return its reply."""
    reply = conn_send("folder")
    return reply
@app.route("/import", methods=["POST"])
@post_require_token
def import_from_image():
    """Import a plan from the uploaded ``img`` file.

    An upload with MIME type ``application/json`` is parsed as JSON. Any
    other upload is opened as an image and decoded with
    ``mower.utils.qrcode``. Returns a status message in every case.
    """
    upload = request.files["img"]
    if upload.mimetype == "application/json":
        data = json.load(upload)
    else:
        try:
            from PIL import Image

            from mower.utils import qrcode

            data = qrcode.decode(Image.open(upload))
        except Exception as err:
            msg = f"排班表导入失败:{err}"
            logger.exception(msg)
            return msg

    if not data:
        return "排班表导入失败!"

    config.plan = config.PlanModel(**data)
    config.save_plan()
    return "排班已加载"
@app.route("/sss-copilot", methods=["GET", "POST"])
@post_require_token
def upload_sss_copilot():
    """Read (GET) or store (POST) the copilot file ``@app/sss.json``.

    GET returns ``{"exists": False}`` when the file is missing. POST writes
    the uploaded ``copilot`` JSON to disk. Otherwise both methods return a
    summary with the document title, details and operator list.
    """
    copilot = get_path("@app/sss.json")

    if request.method == "GET":
        if not copilot.is_file():
            return {"exists": False}
        with copilot.open("r", encoding="utf-8") as f:
            data = json.load(f)
    else:
        data = json.load(request.files["copilot"])
        with copilot.open("w", encoding="utf-8") as f:
            json.dump(data, f, ensure_ascii=False, indent=2)

    doc = data["doc"]
    summary = {"exists": True}
    summary["title"] = doc["title"]
    summary["details"] = doc["details"]
    summary["operators"] = data["opers"]
    return summary
@app.route("/dialog/save/img", methods=["POST"])
@post_require_token
def save_file_dialog():
    """Export the current plan onto the uploaded image and return a JPEG.

    The uploaded ``img`` file, the dumped plan and ``config.conf.theme`` are
    passed to ``qrcode.export``; the resulting image is sent back as
    ``image/jpeg``.
    """
    upload = request.files["img"]

    from PIL import Image

    from mower.utils import qrcode

    upper = Image.open(upload)
    plan_data = config.plan.model_dump(exclude_none=True)
    exported = qrcode.export(plan_data, upper, config.conf.theme)

    buffer = BytesIO()
    exported.save(buffer, format="JPEG")
    buffer.seek(0)
    return send_file(buffer, "image/jpeg")
@app.route("/export-json")
def export_json():
    """Send the file at ``config.plan_path`` to the client."""
    plan_file = config.plan_path
    return send_file(plan_file)
@app.route("/auto-get-path")
def auto_get_path():
    """Run the emulator ``auto_get_path`` helper, then save the configuration."""
    from mower.utils.device.emulator.auto_get_path import (
        auto_get_path as detect_path,
    )

    detect_path()
    config.save_conf()
    return "OK"
@app.route("/get-adb-serial")
def get_adb_serial():
    """Run the emulator ``get_adb_serial`` helper, then save the configuration.

    Returns ``"OK"`` on success and ``"ERROR"`` when either step raises.
    """
    from mower.utils.device.emulator.auto_get_path import (
        get_adb_serial as detect_serial,
    )

    try:
        detect_serial()
        config.save_conf()
    except Exception:
        return "ERROR"
    return "OK"
@app.route("/check-maa")
@get_require_token
def get_maa_adb_version():
    """Load Maa, try to connect, and return a status message.

    The ``Python`` directory under ``config.conf.maa_path`` is appended to
    ``sys.path`` when absent so that ``asst`` can be imported. Any exception
    is logged and reported in the returned message.
    """
    try:
        asst_package = Path(config.conf.maa_path) / "Python" / "asst"
        asst_path = os.path.dirname(asst_package)
        if asst_path not in sys.path:
            sys.path.append(asst_path)

        from asst.asst import Asst

        Asst.load(config.conf.maa_path)
        asst = Asst()
        version = asst.get_version()
        asst.set_instance_option(2, config.conf.maa_touch_option)
        connected = asst.connect(config.conf.maa_adb_path, config.conf.adb)
        maa_msg = (
            f"Maa {version} 加载成功" if connected else "连接失败,请检查Maa日志!"
        )
    except Exception as err:
        maa_msg = "Maa加载失败:" + str(err)
        logger.exception(maa_msg)
    return maa_msg
@app.route("/maa-conn-preset")
@get_require_token
def get_maa_conn_presets():
    """Return the ``configName`` of every connection in Maa's ``config.json``.

    The file is read from ``<maa_path>/resource/config.json``. Any failure
    is logged and results in an empty list.
    """
    presets = []
    try:
        config_file = os.path.join(config.conf.maa_path, "resource", "config.json")
        with open(config_file, "r", encoding="utf-8") as f:
            connections = json.load(f)["connection"]
            for entry in connections:
                presets.append(entry["configName"])
    except Exception as err:
        logger.exception(err)
        # Discard names collected before the failure.
        presets = []
    return presets
@app.route("/record/getMoodRatios")
def get_mood_ratios():
    """Return the result of ``mower.solvers.infra.record.get_mood_ratios()``."""
    from mower.solvers.infra import record

    ratios = record.get_mood_ratios()
    return ratios
@app.route("/getwatermark")
def getwatermark():
    """Return the ``__version__`` string of the ``mower`` package."""
    from mower.__init__ import __version__ as version

    return version
def str2date(target: str):
    """Parse ``YYYY-MM-DD`` or ``YYYY/MM/DD`` text into a ``date``.

    Raises:
        ValueError: If ``target`` matches neither format.
    """
    try:
        parsed = datetime.strptime(target, "%Y-%m-%d")
    except ValueError:
        # Fall back to the slash-separated form.
        parsed = datetime.strptime(target, "%Y/%m/%d")
    return parsed.date()
def date2str(target: date):
    """Format ``target`` as a ``YYYY-MM-DD`` string.

    The previous ``except ValueError`` fallback to ``YYYY/MM/DD`` has been
    removed: a date that cannot be formatted with one pattern cannot be
    formatted with the other, and the fallback would have produced a
    differently shaped string.

    Args:
        target: A ``date`` (or ``datetime``) instance.

    Returns:
        The date formatted with ``%Y-%m-%d``.
    """
    return target.strftime("%Y-%m-%d")
@app.route("/report/getReportData")
def get_report_data():
    """Return ``ReportSolver().get_report_data()``."""
    from mower.solvers.infra.report import ReportSolver

    return ReportSolver().get_report_data()
@app.route("/report/getOrundumData")
def get_orundum_data():
    """Return ``ReportSolver().get_orundum_data()``."""
    from mower.solvers.infra.report import ReportSolver

    return ReportSolver().get_orundum_data()
@app.route("/test-email")
@get_require_token
def test_email():
    """Send a test e-mail and return a success or failure message."""
    from mower.utils.email import Email

    subject = config.conf.mail_subject + "测试邮件"
    email = Email("mower-ng测试邮件", subject, None)
    try:
        email.send()
    except Exception as err:
        msg = "邮件发送失败!\n" + str(err)
        logger.exception(msg)
        return msg
    else:
        return "邮件发送成功!"
@app.route("/test-screenshot")
@get_require_token
def test_screenshot():
    """Take test screenshots and report the measured average.

    Unless the screencap strategy is ``"scrcpy"``,
    ``config.conf.screenshot_interval`` is set to 0 for the duration of the
    test. It is restored on every exit path, including the early return when
    the device screen check fails, which previously left it at 0.

    Returns:
        A dict with ``success``. On success it also carries ``elapsed``
        (rounded ``config.screenshot_avg``) and ``screenshot`` (base64 text
        of the last image). On failure it carries ``reason``.
    """
    import base64

    from mower.utils.image import img2bytes

    config.mount_device()
    interval = config.conf.screenshot_interval
    if config.conf.screencap_strategy != "scrcpy":
        config.conf.screenshot_interval = 0
    try:
        if not config.device.check_device_screen():
            return {
                "success": False,
                "reason": "mower-ng仅支持1920x1080分辨率、280DPI,请修改模拟器设置。",
            }
        # NOTE(review): presumably warm-up captures; reading ``recog.img`` is
        # kept because the attribute access may be what triggers the capture.
        for _ in range(3):
            config.recog.update()
            img = config.recog.img
        # Reset the average so it covers only the five captures below.
        config.screenshot_avg = None
        for _ in range(5):
            config.recog.update()
            img = config.recog.img
        data = base64.b64encode(img2bytes(img)).decode("ascii")
        elapsed = round(config.screenshot_avg)
        return {"success": True, "elapsed": elapsed, "screenshot": data}
    except Exception as e:
        logger.exception(e)
        return {"success": False, "reason": str(e)}
    finally:
        # Always restore the user's configured interval.
        config.conf.screenshot_interval = interval
@app.route("/check-skland")
@get_require_token
def test_skland():
    """Return the result of ``SKLand().test_connect()``."""
    from mower.solvers.skland import SKLand

    solver = SKLand()
    return solver.test_connect()
@app.route("/idle")
def get_idle_status():
    """Return ``config.idle`` converted to a string."""
    idle = config.idle
    return str(idle)
@app.route("/scheduler")
def get_scheduler_tasks():
    """List the scheduler's waiting tasks.

    Each entry holds the task's solver name, its priority, and its next
    execution time in ISO format.
    """
    from mower.utils import scheduler

    # Refresh the scheduler module's clock before asking for execution times.
    scheduler.now = datetime.now()

    tasks = []
    for waiting in scheduler.scheduler.waiting_tasks:
        entry = {
            "name": waiting._solver_name,
            "priority": waiting.priority,
            "time": waiting.next_execution().isoformat(),
        }
        tasks.append(entry)
    return tasks
@app.route("/task", methods=["GET", "POST"])
@post_require_token
def get_count():
    """List the monitored tasks (GET) or add a new one (POST).

    POST expects a JSON body with a ``task`` object (``time``, ``plan``,
    ``task_type``, ``meta_data``) and, for skill-upgrade tasks, an
    ``upgrade_support`` list. The task is added only while a base scheduler
    exists and the scheduler thread is alive. Every failure is logged and its
    message is returned as the response body.

    GET returns the tasks in ``config.tasks`` serialised to plain JSON data,
    with the room names in each task's ``plan`` translated.
    """
    if request.method == "POST":
        import pytz

        from mower.data import agent_list
        from mower.utils.operators import SkillUpgradeSupport
        from mower.utils.scheduler_task import SchedulerTask, TaskTypes

        base_scheduler = config.base_scheduler
        try:
            req = request.json
            task = req["task"]
            logger.debug(f"收到新增任务请求:{req}")
            # mower_thread is None before /start and after /stop. Checking it
            # explicitly yields the regular failure message instead of an
            # AttributeError text.
            scheduler_running = (
                base_scheduler
                and mower_thread is not None
                and mower_thread.is_alive()
            )
            if scheduler_running and task:
                utc_time = datetime.strptime(task["time"], "%Y-%m-%dT%H:%M:%S.%f%z")
                # NOTE(review): the parsed offset is overwritten with UTC, so
                # the client is assumed to always send UTC timestamps.
                task_time = (
                    utc_time.replace(tzinfo=pytz.utc)
                    .astimezone(get_localzone())
                    .replace(tzinfo=None)
                )
                new_task = SchedulerTask(
                    time=task_time,
                    task_plan=task["plan"],
                    task_type=task["task_type"],
                    meta_data=task["meta_data"],
                )
                if base_scheduler.find_next_task(
                    compare_time=task_time, compare_type="="
                ):
                    raise Exception("找到同时间任务请勿重复添加")
                if new_task.type == TaskTypes.SKILL_UPGRADE:
                    supports = []
                    for s in req["upgrade_support"]:
                        # Direct membership test; no list is rebuilt per item.
                        if (
                            s["name"] not in agent_list
                            or s["swap_name"] not in agent_list
                        ):
                            raise Exception("干员名不正确")
                        supports.append(
                            SkillUpgradeSupport(
                                name=s["name"],
                                skill_level=s["skill_level"],
                                efficiency=s["efficiency"],
                                match=s["match"],
                                swap_name=s["swap_name"],
                            )
                        )
                    if not supports:
                        raise Exception("请添加专精工具人")
                    base_scheduler.op_data.skill_upgrade_supports = supports
                    logger.error("更新专精工具人完毕")
                config.tasks.append(new_task)
                logger.debug(f"成功:{str(new_task)}")
                return "添加任务成功!"
            raise Exception("添加任务失败!!")
        except Exception as e:
            logger.exception(f"添加任务失败:{str(e)}")
            return str(e)
    else:
        from jsonpickle import encode

        from mower.utils.translate import translate_room

        # Round-trip through jsonpickle to obtain plain JSON-compatible data.
        message = [json.loads(encode(i, unpicklable=False)) for i in config.tasks]
        for task in message:
            if task.get("plan"):  # only when a non-empty plan is present
                # Replace every room key with its translated name.
                task["plan"] = {
                    translate_room(room): value
                    for room, value in task["plan"].items()
                }
        return message
@app.route("/debug/log")
@get_require_token
def list_log():
    """Return the names of the entries in ``@app/log``.

    Names are sorted by the key ``(-len(name), name)`` in reverse order, so
    shorter names come first and equally long names are in descending order.
    """
    names = [entry.name for entry in get_path("@app/log").iterdir()]
    return sorted(names, key=lambda name: (-len(name), name), reverse=True)
@app.route("/debug/log/<path:filename>")
@get_require_token
def send_log_file(filename):
    """Return a log file split into sections at each screenshot marker.

    Every line containing ``save_screenshot`` starts a new section. Each
    section is a dict with the screenshot name taken from the marker line
    (``None`` for the leading section), the section's log lines, and the
    first 14 characters of the section's first line as ``time``.

    Responds with HTTP 404 when ``filename`` escapes the log folder or does
    not name an existing file, and with an empty list for an empty file.
    Previously these cases raised ``TypeError``, ``FileNotFoundError`` or
    ``IndexError`` and surfaced as HTTP 500.
    """
    split_str = "save_screenshot"

    # safe_join returns None when the joined path would leave log_folder.
    log_path = safe_join(log_folder, filename)
    if log_path is None or not Path(log_path).is_file():
        abort(404)

    log_lines = Path(log_path).read_text(encoding="utf-8").splitlines()
    if not log_lines:
        return []

    result = []
    sc = None
    logs = []
    start = log_lines[0][:14]
    for line in log_lines:
        if split_str in line:
            # Close the section collected so far and open a new one.
            result.append({"screenshot": sc, "log": logs, "time": start})
            sc = line.split(split_str)[-1].strip()
            logs = []
            start = line[:14]
        logs.append(line)
    result.append({"screenshot": sc, "log": logs, "time": start})
    return result
@app.route("/debug/screenshot/<path:filename>")
def send_screenshot(filename):
    """Serve ``filename`` from the screenshot folder."""
    folder = screenshot_folder
    return send_from_directory(folder, filename)
# Start the scheduler at import time when the configuration asks for it.
autostart_enabled = config.conf.start_automatically
if autostart_enabled:
    run_in_thread()