180 lines
7.8 KiB
Python
180 lines
7.8 KiB
Python
import io
|
|
import os
|
|
import subprocess
|
|
import threading
|
|
from _winapi import CREATE_NO_WINDOW
|
|
from pathlib import Path
|
|
from shutil import rmtree
|
|
from subprocess import Popen
|
|
|
|
import requests
|
|
|
|
from launcher import config
|
|
from launcher.constants import download_git_url, download_python_url, get_new_version_url, upgrade_script_name, \
|
|
mirror_list
|
|
from launcher.file.download import init_download, download_file
|
|
from launcher.file.extract import extract_7z_file
|
|
from launcher.file.utils import ensure_directory_exists, check_command_path
|
|
from launcher.log import logger
|
|
from launcher.sys_config import sys_config
|
|
from launcher.webview.events import custom_event, LogType
|
|
|
|
command_list = {
|
|
"download_git": lambda: init_download("git", download_git_url, os.getcwd()),
|
|
"download_python": lambda: init_download("python", download_python_url, os.getcwd()),
|
|
"lfs": "git\\bin\\git lfs install",
|
|
"ensurepip": "python\\python -m ensurepip --default-pip",
|
|
"clone": "git\\bin\\git -c lfs.concurrenttransfers=100 clone https://git.zhaozuohong.vip/mower-ng/mower-ng.git --branch slow",
|
|
"fetch": lambda: f"..\\git\\bin\\git fetch origin {config.conf.branch} --progress",
|
|
"switch": lambda: f"..\\git\\bin\\git -c lfs.concurrenttransfers=100 switch -f {config.conf.branch} --progress",
|
|
"reset": lambda: f"..\\git\\bin\\git -c lfs.concurrenttransfers=200 reset --hard origin/{config.conf.branch}",
|
|
"pip_tools_install": lambda: f"..\\python\\Scripts\\pip install --no-cache-dir -i {mirror_list[config.conf.mirror]} pip-tools --no-warn-script-location",
|
|
"pip_sync": lambda: f"..\\python\\Scripts\\pip-sync -i {mirror_list[config.conf.mirror]} requirements.txt",
|
|
"webview": "start ..\\python\\pythonw webview_ui.py",
|
|
"manager": "start ..\\python\\pythonw manager.py",
|
|
}
|
|
|
|
|
|
def parse_stderr(stderr_output):
|
|
error_keywords = {
|
|
"fatal: destination path 'mower-ng' already exists and is not an empty directory.": "mower-ng文件夹已存在并且非空。",
|
|
"index.lock': File exists": "上一个git命令正在执行,请等待执行结束或在任务管理器中杀掉git进程,并确保上方提示的index.lock文件删除后再次运行。",
|
|
"Could not resolve host": "网络出现错误,请检查网络是否通畅。",
|
|
"ReadTimeoutError": "网络连接超时,请检查网络连接或尝试更换镜像源。",
|
|
"No space left on device": "磁盘空间不足。",
|
|
}
|
|
for keyword, message in error_keywords.items():
|
|
if keyword in stderr_output:
|
|
return message
|
|
return "未定义的错误"
|
|
|
|
|
|
def read_stream(stream, log_type, output_list=None):
|
|
def process_lines(text_io):
|
|
for line in iter(text_io.readline, ''):
|
|
text = line.rstrip('\n').strip()
|
|
custom_event(log_type, text)
|
|
if output_list is not None:
|
|
output_list.append(text)
|
|
|
|
detected_encoding = 'utf-8'
|
|
text_io = io.TextIOWrapper(stream, encoding=detected_encoding, errors='replace')
|
|
try:
|
|
process_lines(text_io)
|
|
except UnicodeDecodeError:
|
|
stream.seek(0) # 重新将流指针重置到开头
|
|
text_io = io.TextIOWrapper(stream, encoding='gbk', errors='replace')
|
|
process_lines(text_io)
|
|
finally:
|
|
text_io.close()
|
|
|
|
|
|
class Api:
|
|
|
|
def load_config(self):
|
|
logger.info("读取配置文件")
|
|
return config.conf.model_dump()
|
|
|
|
def save_config(self, conf):
|
|
logger.info(f"更新配置文件{conf}")
|
|
config.conf = config.Conf(**conf)
|
|
config.save_conf()
|
|
|
|
def get_version(self):
|
|
return sys_config.get("version")
|
|
|
|
def get_new_version(self):
|
|
logger.info("获取最新版本号")
|
|
response = requests.get(get_new_version_url)
|
|
return response.json()
|
|
|
|
# 更新启动器本身
|
|
def update_self(self, download_url):
|
|
logger.info(f"开始更新启动器 {download_url}")
|
|
file_name = os.path.basename(download_url)
|
|
file_name = "launcher.7z"
|
|
current_path = os.getcwd()
|
|
download_tmp_folder = os.path.join(current_path, "download_tmp")
|
|
# 确保 download_tmp 文件夹存在
|
|
ensure_directory_exists(download_tmp_folder)
|
|
if not download_file("launcher", download_url, download_tmp_folder):
|
|
return "下载新版本失败"
|
|
|
|
download_path = os.path.join(download_tmp_folder, file_name)
|
|
if not extract_7z_file("launcher", download_path, download_tmp_folder, True):
|
|
return "解压新版本失败"
|
|
|
|
exe_path = os.path.join(current_path, "launcher.exe")
|
|
folder_path = os.path.join(current_path, "_internal")
|
|
new_exe_path = os.path.join(download_tmp_folder, "launcher", "launcher.exe")
|
|
new_folder_path = os.path.join(download_tmp_folder, "launcher", "_internal")
|
|
script_path = os.path.join(download_tmp_folder, upgrade_script_name)
|
|
with open(script_path, "w") as b:
|
|
temp_list = "@echo off\n"
|
|
temp_list += "timeout /t 3 /nobreak\n" # 等待进程退出
|
|
temp_list += f"rmdir {folder_path}\n" # 删除_internal
|
|
temp_list += f"del {exe_path}\n" # 删除exe
|
|
temp_list += f"xcopy /e /i /y {new_folder_path} {folder_path}\n"
|
|
temp_list += f"move {new_exe_path} {exe_path}\n"
|
|
temp_list += "timeout /t 1 /nobreak\n" # 等待操作完成
|
|
temp_list += f"start {exe_path}\n" # 启动新程序
|
|
temp_list += "exit"
|
|
b.write(temp_list)
|
|
# 不显示cmd窗口
|
|
Popen([script_path], creationflags=CREATE_NO_WINDOW)
|
|
os._exit(0)
|
|
|
|
def rm_site_packages(self):
|
|
site_packages_path = Path("./python/Lib/site-packages")
|
|
if site_packages_path.exists():
|
|
rmtree(site_packages_path)
|
|
return "site-packages目录移除成功"
|
|
return "python\\Lib\\site-packages目录不存在"
|
|
|
|
def rm_python_scripts(self):
|
|
python_scripts_path = Path("./python/Scripts")
|
|
if python_scripts_path.exists():
|
|
rmtree(python_scripts_path)
|
|
return "Scripts目录移除成功"
|
|
return "python\\Scripts目录不存在"
|
|
|
|
def run(self, command, cwd=None):
|
|
command = command_list[command]
|
|
if callable(command):
|
|
command = command()
|
|
if callable(command):
|
|
return "success" if command() else "failed"
|
|
if cwd is not None:
|
|
custom_event(LogType.info, f"命令执行目录:{cwd}")
|
|
custom_event(LogType.execute_command, command)
|
|
# 执行命令前先判断命令路径是否存在
|
|
exist, command_path = check_command_path(command, cwd)
|
|
if not exist:
|
|
custom_event(LogType.error, f"命令路径不存在:{command_path} 请尝试依赖修复并重新初始化。")
|
|
return "failed"
|
|
try:
|
|
stdout_stderr = []
|
|
with subprocess.Popen(
|
|
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, cwd=cwd, bufsize=0,
|
|
universal_newlines=False
|
|
) as p:
|
|
stdout_thread = threading.Thread(target=read_stream, args=(p.stdout, LogType.command_out))
|
|
stderr_thread = threading.Thread(target=read_stream,
|
|
args=(p.stderr, LogType.command_out, stdout_stderr))
|
|
|
|
stdout_thread.start()
|
|
stderr_thread.start()
|
|
|
|
stdout_thread.join()
|
|
stderr_thread.join()
|
|
|
|
if p.returncode == 0:
|
|
return "success"
|
|
else:
|
|
error_message = parse_stderr("\n".join(stdout_stderr))
|
|
custom_event(LogType.error, f"命令执行失败,{error_message}")
|
|
return "failed"
|
|
except Exception as e:
|
|
logger.exception(e)
|
|
custom_event(LogType.error, str(e))
|
|
return "failed"
|