import io import json import mimetypes import os import subprocess import threading import time from pathlib import Path from shutil import rmtree from subprocess import CREATE_NO_WINDOW, Popen import chardet import py7zr import requests import webview from py7zr.callbacks import ExtractCallback from launcher import config from log import logger mimetypes.add_type("text/html", ".html") mimetypes.add_type("text/css", ".css") mimetypes.add_type("application/javascript", ".js") version = "v0.3" get_new_version_url = ( "https://git.zhaozuohong.vip/api/v1/repos/mower-ng/launcher/releases/latest" ) download_git_url = "https://list.zhaozuohong.vip/mower-ng/git.7z" download_python_url = "https://list.zhaozuohong.vip/mower-ng/python.7z" upgrade_script_name = "upgrade.bat" def custom_event(data): logger.info(data) data = json.dumps({"log": data + "\n"}) js = f"var event = new CustomEvent('log', {{detail: {data}}}); window.dispatchEvent(event);" window.evaluate_js(js) def format_size(size_bytes): """格式化文件大小为人类可读的形式""" for unit in ['B', 'KB', 'MB', 'GB', 'TB']: if size_bytes < 1024: return f"{size_bytes:.2f} {unit}" size_bytes /= 1024 return f"{size_bytes:.2f} PB" def download_file(download_name, download_url, destination_folder): """ 下载文件到指定文件夹 :param download_name: 下载内容名 :param download_url: 文件的下载 URL :param destination_folder: 保存下载文件的目标文件夹 :return: 下载是否成功 """ if not os.path.exists(destination_folder): os.makedirs(destination_folder) filename = os.path.basename(download_url) download_path = os.path.join(destination_folder, filename) custom_event(f"开始下载: {download_name}") response = requests.get(download_url, stream=True) if response.status_code == 200: total_size = int(response.headers.get('content-length', 0)) downloaded_size = 0 start_time = time.time() last_update_time = time.time() # 记录上次更新时间 block_size = 4096 # 每次读取的数据块大小 with open(download_path, "wb") as file: for data in response.iter_content(chunk_size=block_size): file.write(data) downloaded_size += len(data) current_time = time.time() elapsed_time = current_time - start_time if elapsed_time > 0: download_speed = downloaded_size / elapsed_time # 字节/秒 else: download_speed = 0 progress_percent = (downloaded_size / total_size) * 100 if total_size != 0 else 0 # 检查是否需要更新进度信息,每1秒更新一次 if current_time - last_update_time >= 1: # 格式化输出 formatted_downloaded_size = format_size(downloaded_size) formatted_total_size = format_size(total_size) formatted_speed = format_size(download_speed) + "/s" custom_event( f"下载进度: {progress_percent:.2f}% ({formatted_downloaded_size}/{formatted_total_size}), 下载速度: {formatted_speed}") last_update_time = current_time # 更新上次更新时间 end_time = time.time() total_elapsed_time = end_time - start_time average_download_speed = downloaded_size / total_elapsed_time if total_elapsed_time != 0 else 0 # 格式化输出 formatted_total_elapsed_time = f"{total_elapsed_time:.2f} 秒" formatted_average_download_speed = format_size(average_download_speed) + "/s" custom_event( f"下载完成: {filename}, 耗时: {formatted_total_elapsed_time}, 下载速度: {formatted_average_download_speed}") else: logger.error(f"下载失败: {response.status_code}") return False return True class MyExtractCallback(ExtractCallback): def __init__(self): super().__init__() self.total_size = 0 self.last_print_time = time.time() def report_start(self, archive_name, archive_format): pass def report_start_preparation(self): pass def report_end(self, processing_file_path, wrote_bytes): self.total_size += int(wrote_bytes) current_time = time.time() if current_time - self.last_print_time >= 1.0: # 至少每隔1秒输出一次 custom_event(f"已解压: {format_size(self.total_size)}") self.last_print_time = current_time def report_postprocess(self): pass def report_update(self, message): pass def report_warning(self, message): pass def extract_7z_file(file_name, file_path, destination_folder, delete_after_extract=True): """ 解压7z文件到指定文件夹 :param file_name: 7z文件的名称 :param file_path: 7z文件的路径 :param destination_folder: 解压目标文件夹 :param delete_after_extract: 解压后删除压缩文件,默认删除 :return: 解压是否成功 """ if not os.path.exists(destination_folder): os.makedirs(destination_folder) custom_event(f"开始解压文件: {file_name}") try: start_time = time.time() with py7zr.SevenZipFile(file_path, mode='r') as z: callback = MyExtractCallback() z.extractall(path=destination_folder, callback=callback) end_time = time.time() total_elapsed_time = end_time - start_time formatted_total_elapsed_time = f"{total_elapsed_time:.2f} 秒" custom_event( f"解压完成: {file_name}, 总大小: {format_size(callback.total_size)}, 耗时: {formatted_total_elapsed_time}") except Exception as e: e.print_exc() custom_event(f"解压失败: {str(e)}") return False if delete_after_extract: try: os.remove(file_path) custom_event(f"删除{file_path}成功") except OSError as e: custom_event(f"删除{file_path}失败: {str(e)}") return True def init_download(download_name, download_url, destination_folder): """ 初始化中的下载任务,返回一个函数,用于执行下载任务 :param download_name: 下载任务的名称,用于在日志中记录 :param download_url: 下载文件的 URL :param destination_folder: 下载文件的目标文件夹 :return: 一个函数,用于执行下载任务 """ def download(): target_folder = os.path.join(download_name) if os.path.exists(target_folder): custom_event(f"{download_name} 文件夹已存在,跳过下载") return True filename = os.path.basename(download_url) if not download_file(filename, download_url, destination_folder): return False download_path = os.path.join(destination_folder, filename) if not extract_7z_file(filename, download_path, destination_folder, True): return False return True return download mirror_list = { "pypi": "https://pypi.org/simple", "aliyun": "https://mirrors.aliyun.com/pypi/simple/", "tuna": "https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple", "sjtu": "https://mirror.sjtu.edu.cn/pypi/web/simple", } command_list = { "download_git": lambda: init_download("git", download_git_url, os.getcwd()), "download_python": lambda: init_download("python", download_python_url, os.getcwd()), "lfs": "git\\bin\\git lfs install", "ensurepip": "python\\python -m ensurepip --default-pip", "clone": "git\\bin\\git -c lfs.concurrenttransfers=100 clone https://git.zhaozuohong.vip/mower-ng/mower-ng.git --branch slow", "fetch": lambda: f"..\\git\\bin\\git fetch origin {config.conf.branch} --progress", "switch": lambda: f"..\\git\\bin\\git -c lfs.concurrenttransfers=100 switch -f {config.conf.branch} --progress", "reset": lambda: f"..\\git\\bin\\git -c lfs.concurrenttransfers=200 reset --hard origin/{config.conf.branch}", "pip_install": lambda: f"..\\python\\Scripts\\pip install --no-cache-dir -i {mirror_list[config.conf.mirror]} -r requirements.txt --no-warn-script-location", "webview": "start ..\\python\\pythonw webview_ui.py", "manager": "start ..\\python\\pythonw manager.py", } def detect_encoding(stream): # 读取一部分数据来检测编码 raw_data = stream.read(4096) result = chardet.detect(raw_data) stream.seek(0) # 将流指针重置到开头 return result['encoding'] def read_stream(stream, log_func): detected_encoding = detect_encoding(stream) text_io = io.TextIOWrapper(stream, encoding=detected_encoding, errors='replace') try: for line in iter(text_io.readline, ''): text = line.rstrip('\n') custom_event(text.strip() + "\n") finally: text_io.close() class Api: def load_config(self): logger.info("读取配置文件") return config.conf.model_dump() def save_config(self, conf): logger.info(f"更新配置文件{conf}") config.conf = config.Conf(**conf) config.save_conf() def get_version(self): return version def get_new_version(self): logger.info("获取最新版本号") response = requests.get(get_new_version_url) return response.json() # 更新启动器本身 def update_self(self, download_url): logger.info(f"开始更新启动器 {download_url}") # 下载压缩包的全路径 file_name = os.path.basename(download_url) current_path = os.getcwd() if not download_file("launcher", download_url, current_path): return "下载新版本失败" download_path = os.path.join(current_path, file_name) script_path = os.path.join(os.getcwd(), upgrade_script_name) folder_path = os.path.join(os.getcwd(), "_internal") exe_path = os.path.join(os.getcwd(), "launcher.exe") with open(script_path, "w") as b: temp_list = "@echo off\n" temp_list += "timeout /t 3 /nobreak\n" # 等待进程退出 temp_list += f"rmdir {folder_path}\n" # 删除_internal temp_list += f"del {exe_path}\n" # 删除exe temp_list += f"tar -xf {download_path} -C ..\n" # 解压压缩包 temp_list += "timeout /t 1 /nobreak\n" # 等待解压 temp_list += f"start {exe_path}\n" # 启动新程序 temp_list += f"del {download_path}\n" # 删除压缩包 temp_list += "exit" b.write(temp_list) # 不显示cmd窗口 Popen([script_path], creationflags=CREATE_NO_WINDOW) os._exit(0) def rm_site_packages(self): site_packages_path = Path("./python/Lib/site-packages") if site_packages_path.exists(): rmtree(site_packages_path) return "site-packages目录移除成功" return "python\\Lib\\site-packages目录不存在" def rm_python_scripts(self): python_scripts_path = Path("./python/Scripts") if python_scripts_path.exists(): rmtree(python_scripts_path) return "Scripts目录移除成功" return "python\\Scripts目录不存在" def run(self, command, cwd=None): command = command_list[command] if callable(command): command = command() if callable(command): return "success" if command() else "failed" custom_event(command + "\n") try: with subprocess.Popen( command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, cwd=cwd, bufsize=0, universal_newlines=False ) as p: stdout_thread = threading.Thread(target=read_stream, args=(p.stdout, logger.info)) stderr_thread = threading.Thread(target=read_stream, args=(p.stderr, logger.info)) stdout_thread.start() stderr_thread.start() stdout_thread.join() stderr_thread.join() if p.returncode == 0: return "success" except Exception as e: logger.error(f"command {command} 执行异常{e}") logger.exception(e) custom_event(str(e)) return "failed" # 如果当前路径存在更新脚本,则删除 if Path(upgrade_script_name).exists(): os.remove(upgrade_script_name) # url = "ui/dist/index.html" url = "http://localhost:5173/" window = webview.create_window( f"mower-ng launcher {version}", url, js_api=Api() ) webview.start()