#!/usr/bin/env python3 import base64 import json import mimetypes import os import sys from datetime import date, datetime from functools import wraps from io import BytesIO from pathlib import Path from threading import Thread from flask import Flask, abort, request, send_file, send_from_directory from flask_cors import CORS from flask_sock import Sock from tzlocal import get_localzone from werkzeug.exceptions import NotFound from werkzeug.security import safe_join from mower.utils import config from mower.utils.log import folder as log_folder from mower.utils.log import logger, screenshot_folder from mower.utils.path import get_path mimetypes.add_type("text/html", ".html") mimetypes.add_type("text/css", ".css") mimetypes.add_type("application/javascript", ".js") app = Flask(__name__) sock = Sock(app) CORS(app) mower_thread = None log_lines = [] ws_connections = [] def ws_push(): import cv2 from mower.utils.image import img2bytes while True: try: item = config.ws_queue.get() if not ws_connections: continue if item["type"] == "log": global log_lines log_lines.append(item["data"]) log_lines = log_lines[-100:] elif item["type"] == "sc": img = item["data"] img = cv2.resize(img, (480, 270)) img = img2bytes(img) img = base64.b64encode(img).decode("utf-8") img = f"data:image/jpeg;base64,{img}" item["data"] = img data = json.dumps(item) for ws in ws_connections: ws.send(data) except Exception as e: logger.debug_exception(e) Thread(target=ws_push, daemon=True).start() @sock.route("/ws") def log(ws): global ws_connections global log_lines ws.send(json.dumps({"type": "log", "data": "\n".join(log_lines)})) ws_connections.append(ws) from simple_websocket import ConnectionClosed try: while True: ws.receive() except ConnectionClosed: ws_connections.remove(ws) def post_require_token(f): @wraps(f) def decorated_function(*args, **kwargs): if ( request.method == "POST" and hasattr(app, "token") and request.headers.get("token", "") != app.token ): abort(403) return f(*args, **kwargs) return decorated_function def get_require_token(f): @wraps(f) def decorated_function(*args, **kwargs): if ( request.method == "GET" and hasattr(app, "token") and request.headers.get("token", "") != app.token ): abort(403) return f(*args, **kwargs) return decorated_function @app.route("/") def serve_index(): return send_from_directory("ui/dist", "index.html") @app.route("/") def serve_static(path): try: return send_from_directory("ui/dist", path) except NotFound: pass try: return send_from_directory("ui/public", path) except NotFound: pass return send_from_directory("ui/dist", "index.html") @app.route("/conf", methods=["GET", "POST"]) @get_require_token @post_require_token def load_config(): if request.method == "GET": return config.conf.model_dump() else: if config.timestamp < request.json["time"]: config.timestamp = request.json["time"] config.conf = config.Conf(**request.json["data"]) config.save_conf(update_timestamp=False) return "New config saved!" return "Old config ignored!" @app.route("/plan", methods=["GET", "POST"]) @post_require_token def load_plan_from_json(): if request.method == "GET": return config.plan.model_dump(exclude_none=True) else: config.plan = config.PlanModel(**request.json) config.save_plan() return "New plan saved。" @app.route("/operator") def operator_list(): from mower.data import agent_list return list(agent_list.keys()) @app.route("/shop") def shop_list(): from mower.data import shop_items return list(shop_items.keys()) @app.route("/activity") def activity(): from mower.solvers.navigation.activity import ActivityNavigation location = ActivityNavigation.location if isinstance(location, dict): return list(location.keys()) elif isinstance(location, list): return location else: return [] @app.route("/depot/readdepot") def read_depot(): from mower.solvers.depot_reader import DepotManager a = DepotManager() return a.读取仓库() @app.route("/running") def running(): return "true" if mower_thread and mower_thread.is_alive() else "false" @app.route("/start") @get_require_token def start(): global mower_thread global log_lines if mower_thread and mower_thread.is_alive(): return "false" # 创建 tmp 文件夹 tmp_dir = get_path("@app/tmp") tmp_dir.mkdir(exist_ok=True) config.stop_mower.clear() config.operators = {} from mower.__main__ import main mower_thread = Thread(target=main, daemon=True) mower_thread.start() config.idle = False log_lines = [] return "true" @app.route("/stop") @get_require_token def stop(): global mower_thread if mower_thread is None: return "true" config.stop_mower.set() mower_thread.join(10) if mower_thread.is_alive(): logger.error("mower-ng线程仍在运行") return "false" else: logger.info("成功停止mower-ng线程") mower_thread = None config.idle = True return "true" @app.route("/stop-maa") @get_require_token def stop_maa(): global mower_thread if mower_thread is None: return "true" config.stop_maa.set() return "OK" def conn_send(text): if not config.webview_process.is_alive(): return "" config.parent_conn.send(text) return config.parent_conn.recv() @app.route("/dialog/file") @get_require_token def open_file_dialog(): return conn_send("file") @app.route("/dialog/folder") @get_require_token def open_folder_dialog(): return conn_send("folder") @app.route("/import", methods=["POST"]) @post_require_token def import_from_image(): img = request.files["img"] if img.mimetype == "application/json": data = json.load(img) else: try: from PIL import Image from mower.utils import qrcode img = Image.open(img) data = qrcode.decode(img) except Exception as e: msg = f"排班表导入失败:{e}" logger.exception(msg) return msg if data: config.plan = config.PlanModel(**data) config.save_plan() return "排班已加载" else: return "排班表导入失败!" @app.route("/sss-copilot", methods=["GET", "POST"]) @post_require_token def upload_sss_copilot(): copilot = get_path("@app/sss.json") if request.method == "GET": if copilot.is_file(): with copilot.open("r", encoding="utf-8") as f: data = json.load(f) else: return {"exists": False} else: data = request.files["copilot"] data = json.load(data) with copilot.open("w", encoding="utf-8") as f: json.dump(data, f, ensure_ascii=False, indent=2) return { "exists": True, "title": data["doc"]["title"], "details": data["doc"]["details"], "operators": data["opers"], } @app.route("/dialog/save/img", methods=["POST"]) @post_require_token def save_file_dialog(): img = request.files["img"] from PIL import Image from mower.utils import qrcode upper = Image.open(img) img = qrcode.export( config.plan.model_dump(exclude_none=True), upper, config.conf.theme ) buffer = BytesIO() img.save(buffer, format="JPEG") buffer.seek(0) return send_file(buffer, "image/jpeg") @app.route("/export-json") def export_json(): return send_file(config.plan_path) @app.route("/auto-get-path") def auto_get_path(): from mower.utils.device.emulator.auto_get_path import auto_get_path auto_get_path() config.save_conf() return "OK" @app.route("/get-adb-serial") def get_adb_serial(): from mower.utils.device.emulator.auto_get_path import get_adb_serial try: get_adb_serial() config.save_conf() return "OK" except Exception: return "ERROR" @app.route("/check-maa") @get_require_token def get_maa_adb_version(): try: asst_path = os.path.dirname(Path(config.conf.maa_path) / "Python" / "asst") if asst_path not in sys.path: sys.path.append(asst_path) from asst.asst import Asst Asst.load(config.conf.maa_path) asst = Asst() version = asst.get_version() asst.set_instance_option(2, config.conf.maa_touch_option) if asst.connect(config.conf.maa_adb_path, config.conf.adb): maa_msg = f"Maa {version} 加载成功" else: maa_msg = "连接失败,请检查Maa日志!" except Exception as e: maa_msg = "Maa加载失败:" + str(e) logger.exception(maa_msg) return maa_msg @app.route("/maa-conn-preset") @get_require_token def get_maa_conn_presets(): try: with open( os.path.join(config.conf.maa_path, "resource", "config.json"), "r", encoding="utf-8", ) as f: presets = [i["configName"] for i in json.load(f)["connection"]] except Exception as e: logger.exception(e) presets = [] return presets @app.route("/record/getMoodRatios") def get_mood_ratios(): from mower.solvers.infra import record return record.get_mood_ratios() @app.route("/getwatermark") def getwatermark(): from mower.__init__ import __version__ return __version__ def str2date(target: str): try: return datetime.strptime(target, "%Y-%m-%d").date() except ValueError: return datetime.strptime(target, "%Y/%m/%d").date() def date2str(target: date): try: return datetime.strftime(target, "%Y-%m-%d") except ValueError: return datetime.strftime(target, "%Y/%m/%d") @app.route("/report/getReportData") def get_report_data(): from mower.solvers.infra.report import ReportSolver a = ReportSolver() return a.get_report_data() @app.route("/report/getOrundumData") def get_orundum_data(): from mower.solvers.infra.report import ReportSolver a = ReportSolver() return a.get_orundum_data() @app.route("/test-email") @get_require_token def test_email(): from mower.utils.email import Email email = Email("mower测试邮件", config.conf.mail_subject + "测试邮件", None) try: email.send() except Exception as e: msg = "邮件发送失败!\n" + str(e) logger.exception(msg) return msg return "邮件发送成功!" @app.route("/test-screenshot") @get_require_token def test_screenshot(): import base64 from mower.utils.image import img2bytes interval = config.conf.screenshot_interval if config.conf.screencap_strategy != "scrcpy": config.conf.screenshot_interval = 0 try: if not config.device.check_device_screen(): return { "success": False, "reason": "mower-ng仅支持1920x1080分辨率、280DPI,请修改模拟器设置。", } for _ in range(3): config.recog.update() img = config.recog.img config.screenshot_avg = None for _ in range(5): config.recog.update() img = config.recog.img data = base64.b64encode(img2bytes(img)).decode("ascii") elapsed = round(config.screenshot_avg) result = {"success": True, "elapsed": elapsed, "screenshot": data} except Exception as e: logger.exception(e) result = {"success": False, "reason": str(e)} config.conf.screenshot_interval = interval return result @app.route("/check-skland") @get_require_token def test_skland(): from mower.solvers.skland import SKLand return SKLand().test_connect() @app.route("/idle") def get_idle_status(): return str(config.idle) @app.route("/task", methods=["GET", "POST"]) @post_require_token def get_count(): import pytz from mower import __main__ base_scheduler = __main__.base_scheduler if request.method == "POST": from mower.data import agent_list from mower.utils.operators import SkillUpgradeSupport from mower.utils.scheduler_task import SchedulerTask, TaskTypes try: req = request.json task = req["task"] logger.debug(f"收到新增任务请求:{req}") if base_scheduler and mower_thread.is_alive(): if task: utc_time = datetime.strptime(task["time"], "%Y-%m-%dT%H:%M:%S.%f%z") task_time = ( utc_time.replace(tzinfo=pytz.utc) .astimezone(get_localzone()) .replace(tzinfo=None) ) new_task = SchedulerTask( time=task_time, task_plan=task["plan"], task_type=task["task_type"], meta_data=task["meta_data"], ) if base_scheduler.find_next_task( compare_time=task_time, compare_type="=" ): raise Exception("找到同时间任务请勿重复添加") if new_task.type == TaskTypes.SKILL_UPGRADE: supports = [] for s in req["upgrade_support"]: if s["name"] not in list(agent_list.keys()) or s[ "swap_name" ] not in list(agent_list.keys()): raise Exception("干员名不正确") supports.append( SkillUpgradeSupport( name=s["name"], skill_level=s["skill_level"], efficiency=s["efficiency"], match=s["match"], swap_name=s["swap_name"], ) ) if len(supports) == 0: raise Exception("请添加专精工具人") base_scheduler.op_data.skill_upgrade_supports = supports logger.error("更新专精工具人完毕") base_scheduler.tasks.append(new_task) logger.debug(f"成功:{str(new_task)}") return "添加任务成功!" raise Exception("添加任务失败!!") except Exception as e: logger.exception(f"添加任务失败:{str(e)}") return str(e) else: if base_scheduler and mower_thread and mower_thread.is_alive(): from jsonpickle import encode return [ json.loads( encode( i, unpicklable=False, ) ) for i in base_scheduler.tasks ] else: return [] @app.route("/debug/log") @get_require_token def list_log(): result = [i.name for i in get_path("@app/log").iterdir()] result.sort(key=lambda i: (-len(i), i), reverse=True) return result @app.route("/debug/log/") @get_require_token def send_log_file(filename): result = [] split_str = "save_screenshot" log_lines = [] sc = None lines = ( Path(safe_join(log_folder, filename)).read_text(encoding="utf-8").splitlines() ) start = lines[0][:14] for line in lines: if split_str in line: result.append({"screenshot": sc, "log": log_lines, "time": start}) sc = line.split(split_str)[-1].strip() log_lines = [] start = line[:14] log_lines.append(line) result.append({"screenshot": sc, "log": log_lines, "time": start}) return result @app.route("/debug/screenshot/") def send_screenshot(filename): return send_from_directory(screenshot_folder, filename)