diff --git a/launcher.py b/launcher.py index 115cc4f..5e92a69 100644 --- a/launcher.py +++ b/launcher.py @@ -9,8 +9,7 @@ mimetypes.add_type("text/html", ".html") mimetypes.add_type("text/css", ".css") mimetypes.add_type("application/javascript", ".js") -if __name__ == '__main__': - +if __name__ == "__main__": # 如果当前路径存在临时文件夹,则删除 if Path(update_tmp_folder).exists(): shutil.rmtree(update_tmp_folder) diff --git a/launcher/config/conf.py b/launcher/config/conf.py index e411765..3f9eff2 100644 --- a/launcher/config/conf.py +++ b/launcher/config/conf.py @@ -17,6 +17,7 @@ class ConfModel(BaseModel): class Total(ConfModel): """整体""" + # 所在页面 page: str = "init" # 是否已展示帮助文档 @@ -25,6 +26,7 @@ class Total(ConfModel): class UpdatePart(ConfModel): """更新代码""" + # mower-ng 代码分支 branch: str = "slow" # PyPI 仓库镜像 diff --git a/launcher/constants.py b/launcher/constants.py index 213679a..8d9d0d4 100644 --- a/launcher/constants.py +++ b/launcher/constants.py @@ -10,7 +10,9 @@ update_tmp_folder = "download_tmp" upgrade_script_name = "upgrade.bat" # 获取最新版本发布信息 -get_new_version_url = "https://git.zhaozuohong.vip/api/v1/repos/mower-ng/launcher/releases/latest" +get_new_version_url = ( + "https://git.zhaozuohong.vip/api/v1/repos/mower-ng/launcher/releases/latest" +) # 下载新版本压缩包名 file_name = "launcher.7z" diff --git a/launcher/file/download.py b/launcher/file/download.py index 2acf26a..03825f8 100644 --- a/launcher/file/download.py +++ b/launcher/file/download.py @@ -25,7 +25,7 @@ def download_file(download_name, download_url, destination_folder): response = requests.get(download_url, stream=True) if response.status_code == 200: - total_size = int(response.headers.get('content-length', 0)) + total_size = int(response.headers.get("content-length", 0)) downloaded_size = 0 start_time = time.time() last_update_time = time.time() # 记录上次更新时间 @@ -43,7 +43,9 @@ def download_file(download_name, download_url, destination_folder): else: download_speed = 0 - progress_percent = (downloaded_size / total_size) * 100 if total_size != 0 else 0 + progress_percent = ( + (downloaded_size / total_size) * 100 if total_size != 0 else 0 + ) # 检查是否需要更新进度信息,每1秒更新一次 if current_time - last_update_time >= 1: @@ -52,19 +54,25 @@ def download_file(download_name, download_url, destination_folder): formatted_total_size = format_size(total_size) formatted_speed = format_size(download_speed) + "/s" - custom_event(LogType.info, - f"下载进度: {progress_percent:.2f}% ({formatted_downloaded_size}/{formatted_total_size}), 下载速度: {formatted_speed}") + custom_event( + LogType.info, + f"下载进度: {progress_percent:.2f}% ({formatted_downloaded_size}/{formatted_total_size}), 下载速度: {formatted_speed}", + ) last_update_time = current_time # 更新上次更新时间 end_time = time.time() total_elapsed_time = end_time - start_time - average_download_speed = downloaded_size / total_elapsed_time if total_elapsed_time != 0 else 0 + average_download_speed = ( + downloaded_size / total_elapsed_time if total_elapsed_time != 0 else 0 + ) # 格式化输出 formatted_total_elapsed_time = f"{total_elapsed_time:.2f} 秒" formatted_average_download_speed = format_size(average_download_speed) + "/s" - custom_event(LogType.info, - f"下载完成: {filename}, 耗时: {formatted_total_elapsed_time}, 下载速度: {formatted_average_download_speed}") + custom_event( + LogType.info, + f"下载完成: {filename}, 耗时: {formatted_total_elapsed_time}, 下载速度: {formatted_average_download_speed}", + ) else: custom_event(LogType.error, f"下载失败: {response.status_code}") return False diff --git a/launcher/file/extract.py b/launcher/file/extract.py index 1588c41..a1c523c 100644 --- a/launcher/file/extract.py +++ b/launcher/file/extract.py @@ -37,7 +37,9 @@ class MyExtractCallback(ExtractCallback): pass -def extract_7z_file(file_name, file_path, destination_folder, delete_after_extract=True): +def extract_7z_file( + file_name, file_path, destination_folder, delete_after_extract=True +): """ 解压7z文件到指定文件夹 :param file_name: 7z文件的名称 @@ -52,14 +54,16 @@ def extract_7z_file(file_name, file_path, destination_folder, delete_after_extra custom_event(LogType.info, f"开始解压文件: {file_name}") try: start_time = time.time() - with py7zr.SevenZipFile(file_path, mode='r') as z: + with py7zr.SevenZipFile(file_path, mode="r") as z: callback = MyExtractCallback() z.extractall(path=destination_folder, callback=callback) end_time = time.time() total_elapsed_time = end_time - start_time formatted_total_elapsed_time = f"{total_elapsed_time:.2f} 秒" - custom_event(LogType.info, - f"解压完成: {file_name}, 总大小: {format_size(callback.total_size)}, 耗时: {formatted_total_elapsed_time}") + custom_event( + LogType.info, + f"解压完成: {file_name}, 总大小: {format_size(callback.total_size)}, 耗时: {formatted_total_elapsed_time}", + ) except Exception as e: custom_event(LogType.error, f"解压失败: {repr(e)}") return False diff --git a/launcher/file/utils.py b/launcher/file/utils.py index 9bb040b..5ab58dc 100644 --- a/launcher/file/utils.py +++ b/launcher/file/utils.py @@ -3,7 +3,7 @@ import os def format_size(size_bytes): """格式化文件大小为人类可读的形式""" - for unit in ['B', 'KB', 'MB', 'GB', 'TB']: + for unit in ["B", "KB", "MB", "GB", "TB"]: if size_bytes < 1024: return f"{size_bytes:.2f} {unit}" size_bytes /= 1024 @@ -26,5 +26,7 @@ def check_command_path(command, cwd=None): exec_command_path = os.getcwd() if cwd: exec_command_path = os.path.join(exec_command_path, cwd) - full_command_path = os.path.abspath(os.path.join(exec_command_path, command_path)) + ".exe" + full_command_path = ( + os.path.abspath(os.path.join(exec_command_path, command_path)) + ".exe" + ) return os.path.exists(full_command_path), full_command_path diff --git a/launcher/log.py b/launcher/log.py index 1797c98..7588b05 100644 --- a/launcher/log.py +++ b/launcher/log.py @@ -9,13 +9,13 @@ from launcher.sys_config import sys_config # 配置日志 def setup_logger(): - log_level = sys_config.get('log_level') + log_level = sys_config.get("log_level") logger = logging.getLogger("launcher.log") logger.setLevel(log_level) # 设置标准输出编码为 UTF-8 - sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8') + sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8") # 控制台输出 console_handler = logging.StreamHandler() @@ -24,7 +24,7 @@ def setup_logger(): # 文件输出 file_path = os.path.join(os.getcwd(), "launcher.log") file_handler = RotatingFileHandler( - file_path, maxBytes=5 * 1024 * 1024, backupCount=3, encoding='utf-8' + file_path, maxBytes=5 * 1024 * 1024, backupCount=3, encoding="utf-8" ) file_handler.setLevel(logging.INFO) diff --git a/launcher/sys_config/__init__.py b/launcher/sys_config/__init__.py index 94853c0..b7be326 100644 --- a/launcher/sys_config/__init__.py +++ b/launcher/sys_config/__init__.py @@ -7,6 +7,7 @@ class SysConfig: """ 读取系统配置文件 """ + # 版本 version: str # ui路径 @@ -22,25 +23,25 @@ class SysConfig: self.load_config() def get_config_path(self): - if getattr(sys, 'frozen', False): + if getattr(sys, "frozen", False): # logger.error("打包配置") # 如果是打包后的可执行文件 base_path = sys._MEIPASS - config_subdir = 'launcher/sys_config' # 添加子目录 - config_filename = 'config_dist.json' + config_subdir = "launcher/sys_config" # 添加子目录 + config_filename = "config_dist.json" else: # logger.error("本地配置") # 如果是本地开发环境 base_path = os.path.dirname(__file__) - config_subdir = '' # 本地开发环境不需要子目录 - config_filename = 'config_local.json' + config_subdir = "" # 本地开发环境不需要子目录 + config_filename = "config_local.json" config_path = os.path.join(base_path, config_subdir, config_filename) return config_path def load_config(self): try: - with open(self.config_path, 'r', encoding='utf-8') as file: + with open(self.config_path, "r", encoding="utf-8") as file: self.config = json.load(file) except FileNotFoundError: pass diff --git a/launcher/webview/__init__.py b/launcher/webview/__init__.py index f450a28..09ae4d6 100644 --- a/launcher/webview/__init__.py +++ b/launcher/webview/__init__.py @@ -8,6 +8,11 @@ window = None def start_webview(): global window - window = webview.create_window(f"mower-ng launcher {sys_config.get('version')}", sys_config.get('url'), - js_api=Api(), width=850, height=600) - webview.start(debug=sys_config.get('debug')) + window = webview.create_window( + f"mower-ng launcher {sys_config.get('version')}", + sys_config.get("url"), + js_api=Api(), + width=850, + height=600, + ) + webview.start(debug=sys_config.get("debug")) diff --git a/launcher/webview/api.py b/launcher/webview/api.py index bee397a..1960b38 100644 --- a/launcher/webview/api.py +++ b/launcher/webview/api.py @@ -9,8 +9,14 @@ from subprocess import Popen import requests from launcher import config -from launcher.constants import download_git_url, download_python_url, get_new_version_url, upgrade_script_name, \ - mirror_list, file_name +from launcher.constants import ( + download_git_url, + download_python_url, + get_new_version_url, + upgrade_script_name, + mirror_list, + file_name, +) from launcher.file.download import init_download, download_file from launcher.file.extract import extract_7z_file from launcher.file.utils import ensure_directory_exists, check_command_path @@ -20,7 +26,9 @@ from launcher.webview.events import custom_event, LogType command_list = { "download_git": lambda: init_download("git", download_git_url, os.getcwd()), - "download_python": lambda: init_download("python", download_python_url, os.getcwd()), + "download_python": lambda: init_download( + "python", download_python_url, os.getcwd() + ), "lfs": "git\\bin\\git lfs install", "ensurepip": "python\\python -m ensurepip --default-pip", "clone": "git\\bin\\git -c lfs.concurrenttransfers=100 clone https://git.zhaozuohong.vip/mower-ng/mower-ng.git --branch slow", @@ -49,9 +57,7 @@ def parse_stderr(stderr_output): def check_command_end(command_key, output): - end_keywords = { - "webview": {"WebSocket客户端建立连接": "mower_ng已成功运行"} - } + end_keywords = {"webview": {"WebSocket客户端建立连接": "mower_ng已成功运行"}} if command_key in end_keywords: keywords = end_keywords[command_key] for keyword in keywords: @@ -62,7 +68,6 @@ def check_command_end(command_key, output): class Api: - def load_config(self): logger.info("读取配置文件") return config.conf.model_dump() @@ -146,30 +151,43 @@ class Api: # 执行命令前先判断命令路径是否存在 exist, command_path = check_command_path(command, cwd) if not exist: - custom_event(LogType.error, f"命令路径不存在:{command_path} 请尝试依赖修复并重新初始化。") + custom_event( + LogType.error, + f"命令路径不存在:{command_path} 请尝试依赖修复并重新初始化。", + ) return "failed" try: stdout_stderr = [] with subprocess.Popen( - command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True, cwd=cwd, bufsize=0, - universal_newlines=False + command, + stdout=subprocess.PIPE, + stderr=subprocess.STDOUT, + shell=True, + cwd=cwd, + bufsize=0, + universal_newlines=False, ) as p: + def process_lines(text_io): - for line in iter(text_io.readline, ''): - text = line.rstrip('\n').strip() + for line in iter(text_io.readline, ""): + text = line.rstrip("\n").strip() custom_event(LogType.command_out, text) if stdout_stderr is not None: stdout_stderr.append(text) if check_command_end(command_key, text): break - detected_encoding = 'utf-8' - text_io = io.TextIOWrapper(p.stdout, encoding=detected_encoding, errors='replace') + detected_encoding = "utf-8" + text_io = io.TextIOWrapper( + p.stdout, encoding=detected_encoding, errors="replace" + ) try: process_lines(text_io) except UnicodeDecodeError: p.stdout.seek(0) # 重新将流指针重置到开头 - text_io = io.TextIOWrapper(p.stdout, encoding='gbk', errors='replace') + text_io = io.TextIOWrapper( + p.stdout, encoding="gbk", errors="replace" + ) process_lines(text_io) finally: text_io.close()