mower-ng/server.py

667 lines
18 KiB
Python
Executable file

#!/usr/bin/env python3
import json
import mimetypes
import os
import sys
from datetime import date, datetime, timedelta
from functools import wraps
from io import BytesIO
from pathlib import Path
from threading import Thread
from flask import Flask, abort, request, send_file, send_from_directory
from flask_cors import CORS
from flask_sock import Sock
from tzlocal import get_localzone
from werkzeug.exceptions import NotFound
from werkzeug.security import safe_join
from mower.utils import config
from mower.utils.log import folder as log_folder
from mower.utils.log import logger, screenshot_folder
from mower.utils.path import get_path
mimetypes.add_type("text/html", ".html")
mimetypes.add_type("text/css", ".css")
mimetypes.add_type("application/javascript", ".js")
app = Flask(__name__)
sock = Sock(app)
CORS(app)
mower_thread = None
@sock.route("/ws")
def log(ws):
config.ws_connections.append(ws)
ws.send(json.dumps({"type": "log", "data": "\n".join(config.log_lines)}))
from simple_websocket import ConnectionClosed
try:
while True:
ws.receive()
except ConnectionClosed:
config.ws_connections.remove(ws)
def post_require_token(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if (
request.method == "POST"
and hasattr(app, "token")
and request.headers.get("token", "") != app.token
):
abort(403)
return f(*args, **kwargs)
return decorated_function
def get_require_token(f):
@wraps(f)
def decorated_function(*args, **kwargs):
if (
request.method == "GET"
and hasattr(app, "token")
and request.headers.get("token", "") != app.token
):
abort(403)
return f(*args, **kwargs)
return decorated_function
@app.route("/")
def serve_index():
return send_from_directory("ui/dist", "index.html")
@app.route("/<path:path>")
def serve_static(path):
try:
return send_from_directory("ui/dist", path)
except NotFound:
pass
try:
return send_from_directory("ui/public", path)
except NotFound:
pass
return send_from_directory("ui/dist", "index.html")
@app.route("/conf", methods=["GET", "POST"])
@get_require_token
@post_require_token
def load_config():
if request.method == "GET":
return config.conf.model_dump()
else:
if config.timestamp < request.json["time"]:
config.timestamp = request.json["time"]
config.conf = config.Conf(**request.json["data"])
config.save_conf(update_timestamp=False)
return "New config saved!"
return "Old config ignored!"
@app.route("/plan", methods=["GET", "POST"])
@post_require_token
def load_plan_from_json():
if request.method == "GET":
return config.plan.model_dump(exclude_none=True)
else:
config.plan = config.PlanModel(**request.json)
config.save_plan()
return "New plan saved。"
@app.route("/operator")
def operator_list():
from mower.data import agent_list
return list(agent_list.keys())
@app.route("/shop")
def shop_list():
from mower.data import shop_items
return list(shop_items.keys())
@app.route("/activity")
def activity():
from mower.solvers.navigation.activity import ActivityNavigation
location = ActivityNavigation.location["normal"]
if isinstance(location, dict):
return list(location.keys())
elif isinstance(location, list):
return location
else:
return []
@app.route("/depot/readdepot")
def read_depot():
from mower.solvers.depot_reader import DepotManager
a = DepotManager()
return a.读取仓库()
@app.route("/running")
def running():
return "true" if mower_thread and mower_thread.is_alive() else "false"
@app.route("/start")
@get_require_token
def start():
global mower_thread
if mower_thread and mower_thread.is_alive():
return "false"
# 创建 tmp 文件夹
tmp_dir = get_path("@app/tmp")
tmp_dir.mkdir(exist_ok=True)
config.stop_mower.clear()
config.base_scheduler = None
config.global_plan = {}
config.tasks = []
from mower.utils.scheduler import scheduler
scheduler.priority_queue = []
scheduler.time_queue = []
scheduler.register_one_time(
solver="mower.solvers.base_manager.BaseManager",
time=datetime.now(),
)
scheduler.register_daily(
solver="mower.solvers.sign_in.SignInManager",
offset=timedelta(hours=4),
priority=15,
)
scheduler.register_daily(
solver="mower.solvers.credit.CreditSolver",
offset=timedelta(hours=4),
priority=16,
)
scheduler.register_daily(
solver="mower.solvers.infra.report.ReportSolver",
offset=timedelta(hours=4),
priority=17,
)
scheduler.register_daily(
solver="mower.solvers.infra.switch_assistants.SwitchAssistantsSolver",
offset=timedelta(hours=4),
priority=18,
)
scheduler.register_daily(
solver="mower.solvers.skland.SKLand",
offset=timedelta(hours=3),
priority=19,
)
scheduler.register_daily(
solver="mower.solvers.mail.MailSolver",
offset=timedelta(hours=4),
priority=20,
)
scheduler.register_daily(
solver="mower.solvers.fight.credit_fight.CreditFight",
offset=timedelta(hours=4),
priority=21,
)
scheduler.register_daily(
solver="mower.solvers.trade_token.TradeTokenSolver",
offset=timedelta(hours=4),
priority=22,
)
scheduler.register_periodic(
solver="mower.solvers.infra.clue.ClueManager",
interval=timedelta(hours=1),
priority=24,
)
scheduler.register_periodic(
solver="mower.solvers.recruit.RecruitSolver",
interval=timedelta(hours=config.conf.recruit_gap),
priority=25,
)
scheduler.register_periodic(
solver="mower.solvers.fight.copy_works.CopyWorksSolver",
interval=timedelta(hours=config.conf.maa_gap),
priority=26,
)
scheduler.register_periodic(
solver="mower.solvers.operation.OperationManager",
interval=timedelta(hours=config.conf.maa_gap),
priority=27,
)
scheduler.register_periodic(
solver="mower.solvers.depotREC.depotREC",
interval=timedelta(hours=config.conf.maa_gap),
priority=28,
)
scheduler.register_periodic(
solver="mower.solvers.mission.MissionSolver",
interval=timedelta(hours=config.conf.maa_gap),
priority=29,
)
scheduler.register_long(solver="mower.solvers.sss.SSSSolver", priority=40)
mower_thread = Thread(target=scheduler.schedule, daemon=True)
mower_thread.start()
config.idle = False
return "true"
@app.route("/stop")
@get_require_token
def stop():
global mower_thread
if mower_thread is None:
return "true"
config.stop_mower.set()
mower_thread.join(10)
if mower_thread.is_alive():
logger.error("mower-ng线程仍在运行")
return "false"
else:
logger.info("成功停止mower-ng线程")
mower_thread = None
config.idle = True
return "true"
@app.route("/stop-maa")
@get_require_token
def stop_maa():
global mower_thread
if mower_thread is None:
return "true"
config.stop_maa.set()
return "OK"
def conn_send(text):
if not config.webview_process.is_alive():
return ""
config.parent_conn.send(text)
return config.parent_conn.recv()
@app.route("/dialog/file")
@get_require_token
def open_file_dialog():
return conn_send("file")
@app.route("/dialog/folder")
@get_require_token
def open_folder_dialog():
return conn_send("folder")
@app.route("/import", methods=["POST"])
@post_require_token
def import_from_image():
img = request.files["img"]
if img.mimetype == "application/json":
data = json.load(img)
else:
try:
from PIL import Image
from mower.utils import qrcode
img = Image.open(img)
data = qrcode.decode(img)
except Exception as e:
msg = f"排班表导入失败:{e}"
logger.exception(msg)
return msg
if data:
config.plan = config.PlanModel(**data)
config.save_plan()
return "排班已加载"
else:
return "排班表导入失败!"
@app.route("/sss-copilot", methods=["GET", "POST"])
@post_require_token
def upload_sss_copilot():
copilot = get_path("@app/sss.json")
if request.method == "GET":
if copilot.is_file():
with copilot.open("r", encoding="utf-8") as f:
data = json.load(f)
else:
return {"exists": False}
else:
data = request.files["copilot"]
data = json.load(data)
with copilot.open("w", encoding="utf-8") as f:
json.dump(data, f, ensure_ascii=False, indent=2)
return {
"exists": True,
"title": data["doc"]["title"],
"details": data["doc"]["details"],
"operators": data["opers"],
}
@app.route("/dialog/save/img", methods=["POST"])
@post_require_token
def save_file_dialog():
img = request.files["img"]
from PIL import Image
from mower.utils import qrcode
upper = Image.open(img)
img = qrcode.export(
config.plan.model_dump(exclude_none=True), upper, config.conf.theme
)
buffer = BytesIO()
img.save(buffer, format="JPEG")
buffer.seek(0)
return send_file(buffer, "image/jpeg")
@app.route("/export-json")
def export_json():
return send_file(config.plan_path)
@app.route("/auto-get-path")
def auto_get_path():
from mower.utils.device.emulator.auto_get_path import auto_get_path
auto_get_path()
config.save_conf()
return "OK"
@app.route("/get-adb-serial")
def get_adb_serial():
from mower.utils.device.emulator.auto_get_path import get_adb_serial
try:
get_adb_serial()
config.save_conf()
return "OK"
except Exception:
return "ERROR"
@app.route("/check-maa")
@get_require_token
def get_maa_adb_version():
try:
asst_path = os.path.dirname(Path(config.conf.maa_path) / "Python" / "asst")
if asst_path not in sys.path:
sys.path.append(asst_path)
from asst.asst import Asst
Asst.load(config.conf.maa_path)
asst = Asst()
version = asst.get_version()
asst.set_instance_option(2, config.conf.maa_touch_option)
if asst.connect(config.conf.maa_adb_path, config.conf.adb):
maa_msg = f"Maa {version} 加载成功"
else:
maa_msg = "连接失败,请检查Maa日志!"
except Exception as e:
maa_msg = "Maa加载失败:" + str(e)
logger.exception(maa_msg)
return maa_msg
@app.route("/maa-conn-preset")
@get_require_token
def get_maa_conn_presets():
try:
with open(
os.path.join(config.conf.maa_path, "resource", "config.json"),
"r",
encoding="utf-8",
) as f:
presets = [i["configName"] for i in json.load(f)["connection"]]
except Exception as e:
logger.exception(e)
presets = []
return presets
@app.route("/record/getMoodRatios")
def get_mood_ratios():
from mower.solvers.infra import record
return record.get_mood_ratios()
@app.route("/getwatermark")
def getwatermark():
from mower.__init__ import __version__
return __version__
def str2date(target: str):
try:
return datetime.strptime(target, "%Y-%m-%d").date()
except ValueError:
return datetime.strptime(target, "%Y/%m/%d").date()
def date2str(target: date):
try:
return datetime.strftime(target, "%Y-%m-%d")
except ValueError:
return datetime.strftime(target, "%Y/%m/%d")
@app.route("/report/getReportData")
def get_report_data():
from mower.solvers.infra.report import ReportSolver
a = ReportSolver()
return a.get_report_data()
@app.route("/report/getOrundumData")
def get_orundum_data():
from mower.solvers.infra.report import ReportSolver
a = ReportSolver()
return a.get_orundum_data()
@app.route("/test-email")
@get_require_token
def test_email():
from mower.utils.email import Email
email = Email("mower测试邮件", config.conf.mail_subject + "测试邮件", None)
try:
email.send()
except Exception as e:
msg = "邮件发送失败!\n" + str(e)
logger.exception(msg)
return msg
return "邮件发送成功!"
@app.route("/test-screenshot")
@get_require_token
def test_screenshot():
import base64
from mower.utils.image import img2bytes
config.mount_device()
interval = config.conf.screenshot_interval
if config.conf.screencap_strategy != "scrcpy":
config.conf.screenshot_interval = 0
try:
if not config.device.check_device_screen():
return {
"success": False,
"reason": "mower-ng仅支持1920x1080分辨率、280DPI,请修改模拟器设置。",
}
for _ in range(3):
config.recog.update()
img = config.recog.img
config.screenshot_avg = None
for _ in range(5):
config.recog.update()
img = config.recog.img
data = base64.b64encode(img2bytes(img)).decode("ascii")
elapsed = round(config.screenshot_avg)
result = {"success": True, "elapsed": elapsed, "screenshot": data}
except Exception as e:
logger.exception(e)
result = {"success": False, "reason": str(e)}
config.conf.screenshot_interval = interval
return result
@app.route("/check-skland")
@get_require_token
def test_skland():
from mower.solvers.skland import SKLand
return SKLand().test_connect()
@app.route("/idle")
def get_idle_status():
return str(config.idle)
@app.route("/task", methods=["GET", "POST"])
@post_require_token
def get_count():
if request.method == "POST":
import pytz
from mower.data import agent_list
from mower.utils.operators import SkillUpgradeSupport
from mower.utils.scheduler_task import SchedulerTask, TaskTypes
base_scheduler = config.base_scheduler
try:
req = request.json
task = req["task"]
logger.debug(f"收到新增任务请求:{req}")
if base_scheduler and mower_thread.is_alive():
if task:
utc_time = datetime.strptime(task["time"], "%Y-%m-%dT%H:%M:%S.%f%z")
task_time = (
utc_time.replace(tzinfo=pytz.utc)
.astimezone(get_localzone())
.replace(tzinfo=None)
)
new_task = SchedulerTask(
time=task_time,
task_plan=task["plan"],
task_type=task["task_type"],
meta_data=task["meta_data"],
)
if base_scheduler.find_next_task(
compare_time=task_time, compare_type="="
):
raise Exception("找到同时间任务请勿重复添加")
if new_task.type == TaskTypes.SKILL_UPGRADE:
supports = []
for s in req["upgrade_support"]:
if s["name"] not in list(agent_list.keys()) or s[
"swap_name"
] not in list(agent_list.keys()):
raise Exception("干员名不正确")
supports.append(
SkillUpgradeSupport(
name=s["name"],
skill_level=s["skill_level"],
efficiency=s["efficiency"],
match=s["match"],
swap_name=s["swap_name"],
)
)
if len(supports) == 0:
raise Exception("请添加专精工具人")
base_scheduler.op_data.skill_upgrade_supports = supports
logger.error("更新专精工具人完毕")
config.tasks.append(new_task)
logger.debug(f"成功:{str(new_task)}")
return "添加任务成功!"
raise Exception("添加任务失败!!")
except Exception as e:
logger.exception(f"添加任务失败:{str(e)}")
return str(e)
else:
from jsonpickle import encode
return [
json.loads(
encode(
i,
unpicklable=False,
)
)
for i in config.tasks
]
@app.route("/debug/log")
@get_require_token
def list_log():
result = [i.name for i in get_path("@app/log").iterdir()]
result.sort(key=lambda i: (-len(i), i), reverse=True)
return result
@app.route("/debug/log/<path:filename>")
@get_require_token
def send_log_file(filename):
result = []
split_str = "save_screenshot"
logs = []
sc = None
logs = (
Path(safe_join(log_folder, filename)).read_text(encoding="utf-8").splitlines()
)
start = logs[0][:14]
for line in logs:
if split_str in line:
result.append({"screenshot": sc, "log": logs, "time": start})
sc = line.split(split_str)[-1].strip()
logs = []
start = line[:14]
logs.append(line)
result.append({"screenshot": sc, "log": logs, "time": start})
return result
@app.route("/debug/screenshot/<path:filename>")
def send_screenshot(filename):
return send_from_directory(screenshot_folder, filename)