按功能拆分代码

This commit is contained in:
li-xiaochen 2024-12-14 19:54:21 +08:00
parent d687e0426a
commit effcc2874e
13 changed files with 382 additions and 341 deletions

View file

@ -1,351 +1,18 @@
import io
import json
import mimetypes import mimetypes
import os import os
import subprocess
import threading
import time
from pathlib import Path from pathlib import Path
from shutil import rmtree
from subprocess import CREATE_NO_WINDOW, Popen
import chardet from launcher.constants import upgrade_script_name
import py7zr from launcher.webview import start_webview
import requests
import webview
from py7zr.callbacks import ExtractCallback
from launcher import config
from launcher.sys_config import sys_config
from log import logger
mimetypes.add_type("text/html", ".html") mimetypes.add_type("text/html", ".html")
mimetypes.add_type("text/css", ".css") mimetypes.add_type("text/css", ".css")
mimetypes.add_type("application/javascript", ".js") mimetypes.add_type("application/javascript", ".js")
version = "v0.3" if __name__ == '__main__':
get_new_version_url = (
"https://git.zhaozuohong.vip/api/v1/repos/mower-ng/launcher/releases/latest"
)
download_git_url = "https://list.zhaozuohong.vip/mower-ng/git.7z"
download_python_url = "https://list.zhaozuohong.vip/mower-ng/python.7z"
upgrade_script_name = "upgrade.bat"
def custom_event(data):
logger.info(data)
data = json.dumps({"log": data + "\n"})
js = f"var event = new CustomEvent('log', {{detail: {data}}}); window.dispatchEvent(event);"
window.evaluate_js(js)
def format_size(size_bytes):
"""格式化文件大小为人类可读的形式"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if size_bytes < 1024:
return f"{size_bytes:.2f} {unit}"
size_bytes /= 1024
return f"{size_bytes:.2f} PB"
def download_file(download_name, download_url, destination_folder):
"""
下载文件到指定文件夹
:param download_name: 下载内容名
:param download_url: 文件的下载 URL
:param destination_folder: 保存下载文件的目标文件夹
:return: 下载是否成功
"""
if not os.path.exists(destination_folder):
os.makedirs(destination_folder)
filename = os.path.basename(download_url)
download_path = os.path.join(destination_folder, filename)
custom_event(f"开始下载: {download_name}")
response = requests.get(download_url, stream=True)
if response.status_code == 200:
total_size = int(response.headers.get('content-length', 0))
downloaded_size = 0
start_time = time.time()
last_update_time = time.time() # 记录上次更新时间
block_size = 4096 # 每次读取的数据块大小
with open(download_path, "wb") as file:
for data in response.iter_content(chunk_size=block_size):
file.write(data)
downloaded_size += len(data)
current_time = time.time()
elapsed_time = current_time - start_time
if elapsed_time > 0:
download_speed = downloaded_size / elapsed_time # 字节/秒
else:
download_speed = 0
progress_percent = (downloaded_size / total_size) * 100 if total_size != 0 else 0
# 检查是否需要更新进度信息,每1秒更新一次
if current_time - last_update_time >= 1:
# 格式化输出
formatted_downloaded_size = format_size(downloaded_size)
formatted_total_size = format_size(total_size)
formatted_speed = format_size(download_speed) + "/s"
custom_event(
f"下载进度: {progress_percent:.2f}% ({formatted_downloaded_size}/{formatted_total_size}), 下载速度: {formatted_speed}")
last_update_time = current_time # 更新上次更新时间
end_time = time.time()
total_elapsed_time = end_time - start_time
average_download_speed = downloaded_size / total_elapsed_time if total_elapsed_time != 0 else 0
# 格式化输出
formatted_total_elapsed_time = f"{total_elapsed_time:.2f}"
formatted_average_download_speed = format_size(average_download_speed) + "/s"
custom_event(
f"下载完成: {filename}, 耗时: {formatted_total_elapsed_time}, 下载速度: {formatted_average_download_speed}")
else:
logger.error(f"下载失败: {response.status_code}")
return False
return True
class MyExtractCallback(ExtractCallback):
def __init__(self):
super().__init__()
self.total_size = 0
self.last_print_time = time.time()
def report_start(self, archive_name, archive_format):
pass
def report_start_preparation(self):
pass
def report_end(self, processing_file_path, wrote_bytes):
self.total_size += int(wrote_bytes)
current_time = time.time()
if current_time - self.last_print_time >= 1.0: # 至少每隔1秒输出一次
custom_event(f"已解压: {format_size(self.total_size)}")
self.last_print_time = current_time
def report_postprocess(self):
pass
def report_update(self, message):
pass
def report_warning(self, message):
pass
def extract_7z_file(file_name, file_path, destination_folder, delete_after_extract=True):
"""
解压7z文件到指定文件夹
:param file_name: 7z文件的名称
:param file_path: 7z文件的路径
:param destination_folder: 解压目标文件夹
:param delete_after_extract: 解压后删除压缩文件默认删除
:return: 解压是否成功
"""
if not os.path.exists(destination_folder):
os.makedirs(destination_folder)
custom_event(f"开始解压文件: {file_name}")
try:
start_time = time.time()
with py7zr.SevenZipFile(file_path, mode='r') as z:
callback = MyExtractCallback()
z.extractall(path=destination_folder, callback=callback)
end_time = time.time()
total_elapsed_time = end_time - start_time
formatted_total_elapsed_time = f"{total_elapsed_time:.2f}"
custom_event(
f"解压完成: {file_name}, 总大小: {format_size(callback.total_size)}, 耗时: {formatted_total_elapsed_time}")
except Exception as e:
e.print_exc()
custom_event(f"解压失败: {str(e)}")
return False
if delete_after_extract:
try:
os.remove(file_path)
custom_event(f"删除{file_path}成功")
except OSError as e:
custom_event(f"删除{file_path}失败: {str(e)}")
return True
def init_download(download_name, download_url, destination_folder):
"""
初始化中的下载任务返回一个函数用于执行下载任务
:param download_name: 下载任务的名称用于在日志中记录
:param download_url: 下载文件的 URL
:param destination_folder: 下载文件的目标文件夹
:return: 一个函数用于执行下载任务
"""
def download():
target_folder = os.path.join(download_name)
if os.path.exists(target_folder):
custom_event(f"{download_name} 文件夹已存在,跳过下载")
return True
filename = os.path.basename(download_url)
if not download_file(filename, download_url, destination_folder):
return False
download_path = os.path.join(destination_folder, filename)
if not extract_7z_file(filename, download_path, destination_folder, True):
return False
return True
return download
mirror_list = {
"pypi": "https://pypi.org/simple",
"aliyun": "https://mirrors.aliyun.com/pypi/simple/",
"tuna": "https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple",
"sjtu": "https://mirror.sjtu.edu.cn/pypi/web/simple",
}
command_list = {
"download_git": lambda: init_download("git", download_git_url, os.getcwd()),
"download_python": lambda: init_download("python", download_python_url, os.getcwd()),
"lfs": "git\\bin\\git lfs install",
"ensurepip": "python\\python -m ensurepip --default-pip",
"clone": "git\\bin\\git -c lfs.concurrenttransfers=100 clone https://git.zhaozuohong.vip/mower-ng/mower-ng.git --branch slow",
"fetch": lambda: f"..\\git\\bin\\git fetch origin {config.conf.branch} --progress",
"switch": lambda: f"..\\git\\bin\\git -c lfs.concurrenttransfers=100 switch -f {config.conf.branch} --progress",
"reset": lambda: f"..\\git\\bin\\git -c lfs.concurrenttransfers=200 reset --hard origin/{config.conf.branch}",
"pip_install": lambda: f"..\\python\\Scripts\\pip install --no-cache-dir -i {mirror_list[config.conf.mirror]} -r requirements.txt --no-warn-script-location",
"webview": "start ..\\python\\pythonw webview_ui.py",
"manager": "start ..\\python\\pythonw manager.py",
}
def detect_encoding(stream):
# 读取一部分数据来检测编码
raw_data = stream.read(4096)
result = chardet.detect(raw_data)
stream.seek(0) # 将流指针重置到开头
return result['encoding']
def read_stream(stream, log_func):
detected_encoding = detect_encoding(stream)
text_io = io.TextIOWrapper(stream, encoding=detected_encoding, errors='replace')
try:
for line in iter(text_io.readline, ''):
text = line.rstrip('\n')
custom_event(text.strip() + "\n")
finally:
text_io.close()
class Api:
def load_config(self):
logger.info("读取配置文件")
return config.conf.model_dump()
def save_config(self, conf):
logger.info(f"更新配置文件{conf}")
config.conf = config.Conf(**conf)
config.save_conf()
def get_version(self):
return version
def get_new_version(self):
logger.info("获取最新版本号")
response = requests.get(get_new_version_url)
return response.json()
# 更新启动器本身
def update_self(self, download_url):
logger.info(f"开始更新启动器 {download_url}")
# 下载压缩包的全路径
file_name = os.path.basename(download_url)
current_path = os.getcwd()
if not download_file("launcher", download_url, current_path):
return "下载新版本失败"
download_path = os.path.join(current_path, file_name)
script_path = os.path.join(os.getcwd(), upgrade_script_name)
folder_path = os.path.join(os.getcwd(), "_internal")
exe_path = os.path.join(os.getcwd(), "launcher.exe")
with open(script_path, "w") as b:
temp_list = "@echo off\n"
temp_list += "timeout /t 3 /nobreak\n" # 等待进程退出
temp_list += f"rmdir {folder_path}\n" # 删除_internal
temp_list += f"del {exe_path}\n" # 删除exe
temp_list += f"tar -xf {download_path} -C ..\n" # 解压压缩包
temp_list += "timeout /t 1 /nobreak\n" # 等待解压
temp_list += f"start {exe_path}\n" # 启动新程序
temp_list += f"del {download_path}\n" # 删除压缩包
temp_list += "exit"
b.write(temp_list)
# 不显示cmd窗口
Popen([script_path], creationflags=CREATE_NO_WINDOW)
os._exit(0)
def rm_site_packages(self):
site_packages_path = Path("./python/Lib/site-packages")
if site_packages_path.exists():
rmtree(site_packages_path)
return "site-packages目录移除成功"
return "python\\Lib\\site-packages目录不存在"
def rm_python_scripts(self):
python_scripts_path = Path("./python/Scripts")
if python_scripts_path.exists():
rmtree(python_scripts_path)
return "Scripts目录移除成功"
return "python\\Scripts目录不存在"
def run(self, command, cwd=None):
command = command_list[command]
if callable(command):
command = command()
if callable(command):
return "success" if command() else "failed"
custom_event(command + "\n")
try:
with subprocess.Popen(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, cwd=cwd, bufsize=0,
universal_newlines=False
) as p:
stdout_thread = threading.Thread(target=read_stream, args=(p.stdout, logger.info))
stderr_thread = threading.Thread(target=read_stream, args=(p.stderr, logger.info))
stdout_thread.start()
stderr_thread.start()
stdout_thread.join()
stderr_thread.join()
if p.returncode == 0:
return "success"
except Exception as e:
logger.error(f"command {command} 执行异常{e}")
logger.exception(e)
custom_event(str(e))
return "failed"
# 如果当前路径存在更新脚本,则删除 # 如果当前路径存在更新脚本,则删除
if Path(upgrade_script_name).exists(): if Path(upgrade_script_name).exists():
os.remove(upgrade_script_name) os.remove(upgrade_script_name)
window = webview.create_window( start_webview()
f"mower-ng launcher {version}", sys_config.get('url'), js_api=Api()
)
webview.start()

24
launcher/constants.py Normal file
View file

@ -0,0 +1,24 @@
"""
constants.py
该模块定义了应用程序中使用的各种常量这些常量包括API URL和其他配置参数
"""
# 更新脚本名
upgrade_script_name = "upgrade.bat"
# 获取最新版本发布信息
get_new_version_url = "https://git.zhaozuohong.vip/api/v1/repos/mower-ng/launcher/releases/latest"
# 下载地址
download_git_url = "https://list.zhaozuohong.vip/mower-ng/git.7z"
download_python_url = "https://list.zhaozuohong.vip/mower-ng/python.7z"
# pip镜像地址
mirror_list = {
"pypi": "https://pypi.org/simple",
"aliyun": "https://mirrors.aliyun.com/pypi/simple/",
"tuna": "https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple",
"sjtu": "https://mirror.sjtu.edu.cn/pypi/web/simple",
}

View file

100
launcher/file/download.py Normal file
View file

@ -0,0 +1,100 @@
import os
import time
import requests
from launcher.file.extract import extract_7z_file
from launcher.file.utils import format_size
from launcher.webview.events import custom_event
def download_file(download_name, download_url, destination_folder):
"""
下载文件到指定文件夹
:param download_name: 下载内容名
:param download_url: 文件的下载 URL
:param destination_folder: 保存下载文件的目标文件夹
:return: 下载是否成功
"""
if not os.path.exists(destination_folder):
os.makedirs(destination_folder)
filename = os.path.basename(download_url)
download_path = os.path.join(destination_folder, filename)
custom_event(f"开始下载: {download_name}")
response = requests.get(download_url, stream=True)
if response.status_code == 200:
total_size = int(response.headers.get('content-length', 0))
downloaded_size = 0
start_time = time.time()
last_update_time = time.time() # 记录上次更新时间
block_size = 4096 # 每次读取的数据块大小
with open(download_path, "wb") as file:
for data in response.iter_content(chunk_size=block_size):
file.write(data)
downloaded_size += len(data)
current_time = time.time()
elapsed_time = current_time - start_time
if elapsed_time > 0:
download_speed = downloaded_size / elapsed_time # 字节/秒
else:
download_speed = 0
progress_percent = (downloaded_size / total_size) * 100 if total_size != 0 else 0
# 检查是否需要更新进度信息,每1秒更新一次
if current_time - last_update_time >= 1:
# 格式化输出
formatted_downloaded_size = format_size(downloaded_size)
formatted_total_size = format_size(total_size)
formatted_speed = format_size(download_speed) + "/s"
custom_event(
f"下载进度: {progress_percent:.2f}% ({formatted_downloaded_size}/{formatted_total_size}), 下载速度: {formatted_speed}")
last_update_time = current_time # 更新上次更新时间
end_time = time.time()
total_elapsed_time = end_time - start_time
average_download_speed = downloaded_size / total_elapsed_time if total_elapsed_time != 0 else 0
# 格式化输出
formatted_total_elapsed_time = f"{total_elapsed_time:.2f}"
formatted_average_download_speed = format_size(average_download_speed) + "/s"
custom_event(
f"下载完成: {filename}, 耗时: {formatted_total_elapsed_time}, 下载速度: {formatted_average_download_speed}")
else:
custom_event(f"下载失败: {response.status_code}")
return False
return True
def init_download(download_name, download_url, destination_folder):
"""
初始化中的下载任务返回一个函数用于执行下载任务
:param download_name: 下载任务的名称用于在日志中记录
:param download_url: 下载文件的 URL
:param destination_folder: 下载文件的目标文件夹
:return: 一个函数用于执行下载任务
"""
def download():
target_folder = os.path.join(download_name)
if os.path.exists(target_folder):
custom_event(f"{download_name} 文件夹已存在,跳过下载")
return True
filename = os.path.basename(download_url)
if not download_file(filename, download_url, destination_folder):
return False
download_path = os.path.join(destination_folder, filename)
if not extract_7z_file(filename, download_path, destination_folder, True):
return False
return True
return download

75
launcher/file/extract.py Normal file
View file

@ -0,0 +1,75 @@
import os
import time
from py7zr import py7zr
from py7zr.callbacks import ExtractCallback
from launcher.file.utils import format_size
from launcher.webview.events import custom_event
class MyExtractCallback(ExtractCallback):
def __init__(self):
super().__init__()
self.total_size = 0
self.last_print_time = time.time()
def report_start(self, archive_name, archive_format):
pass
def report_start_preparation(self):
pass
def report_end(self, processing_file_path, wrote_bytes):
self.total_size += int(wrote_bytes)
current_time = time.time()
if current_time - self.last_print_time >= 1.0: # 至少每隔1秒输出一次
custom_event(f"已解压: {format_size(self.total_size)}")
self.last_print_time = current_time
def report_postprocess(self):
pass
def report_update(self, message):
pass
def report_warning(self, message):
pass
def extract_7z_file(file_name, file_path, destination_folder, delete_after_extract=True):
"""
解压7z文件到指定文件夹
:param file_name: 7z文件的名称
:param file_path: 7z文件的路径
:param destination_folder: 解压目标文件夹
:param delete_after_extract: 解压后删除压缩文件默认删除
:return: 解压是否成功
"""
if not os.path.exists(destination_folder):
os.makedirs(destination_folder)
custom_event(f"开始解压文件: {file_name}")
try:
start_time = time.time()
with py7zr.SevenZipFile(file_path, mode='r') as z:
callback = MyExtractCallback()
z.extractall(path=destination_folder, callback=callback)
end_time = time.time()
total_elapsed_time = end_time - start_time
formatted_total_elapsed_time = f"{total_elapsed_time:.2f}"
custom_event(
f"解压完成: {file_name}, 总大小: {format_size(callback.total_size)}, 耗时: {formatted_total_elapsed_time}")
except Exception as e:
e.print_exc()
custom_event(f"解压失败: {str(e)}")
return False
if delete_after_extract:
try:
os.remove(file_path)
custom_event(f"删除{file_path}成功")
except OSError as e:
custom_event(f"删除{file_path}失败: {str(e)}")
return True

7
launcher/file/utils.py Normal file
View file

@ -0,0 +1,7 @@
def format_size(size_bytes):
"""格式化文件大小为人类可读的形式"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if size_bytes < 1024:
return f"{size_bytes:.2f} {unit}"
size_bytes /= 1024
return f"{size_bytes:.2f} PB"

View file

@ -22,7 +22,7 @@ def setup_logger():
# 文件输出 # 文件输出
file_handler = RotatingFileHandler( file_handler = RotatingFileHandler(
"launcher.log", maxBytes=5 * 1024 * 1024, backupCount=3, encoding='utf-8' "../launcher.log", maxBytes=5 * 1024 * 1024, backupCount=3, encoding='utf-8'
) )
file_handler.setLevel(logging.INFO) file_handler.setLevel(logging.INFO)

View file

@ -8,6 +8,8 @@ class SysConfig:
""" """
读取系统配置文件 读取系统配置文件
""" """
# 版本
version: str
# ui路径 # ui路径
url: str url: str
# 日志输出级别 # 日志输出级别

View file

@ -1,2 +1,3 @@
version: "v0.3"
url: "ui/dist/index.html" url: "ui/dist/index.html"
log_level: "ERROR" log_level: "ERROR"

View file

@ -1,2 +1,3 @@
version: "dev"
url: "http://localhost:5173/" url: "http://localhost:5173/"
log_level: "INFO" log_level: "INFO"

View file

@ -0,0 +1,12 @@
import webview
from launcher.webview.api import Api
from launcher.sys_config import sys_config
window = None
def start_webview():
global window
window = webview.create_window(f"mower-ng launcher {sys_config.get('version')}", sys_config.get('url'), js_api=Api())
webview.start()

141
launcher/webview/api.py Normal file
View file

@ -0,0 +1,141 @@
import io
import os
import subprocess
import threading
from _winapi import CREATE_NO_WINDOW
from pathlib import Path
from shutil import rmtree
from subprocess import Popen
import chardet
import requests
from launcher import config
from launcher.constants import download_git_url, download_python_url, get_new_version_url, upgrade_script_name, \
mirror_list
from launcher.file.download import init_download, download_file
from launcher.log import logger
from launcher.sys_config import sys_config
from launcher.webview.events import custom_event
command_list = {
"download_git": lambda: init_download("git", download_git_url, os.getcwd()),
"download_python": lambda: init_download("python", download_python_url, os.getcwd()),
"lfs": "git\\bin\\git lfs install",
"ensurepip": "python\\python -m ensurepip --default-pip",
"clone": "git\\bin\\git -c lfs.concurrenttransfers=100 clone https://git.zhaozuohong.vip/mower-ng/mower-ng.git --branch slow",
"fetch": lambda: f"..\\git\\bin\\git fetch origin {config.conf.branch} --progress",
"switch": lambda: f"..\\git\\bin\\git -c lfs.concurrenttransfers=100 switch -f {config.conf.branch} --progress",
"reset": lambda: f"..\\git\\bin\\git -c lfs.concurrenttransfers=200 reset --hard origin/{config.conf.branch}",
"pip_install": lambda: f"..\\python\\Scripts\\pip install --no-cache-dir -i {mirror_list[config.conf.mirror]} -r requirements.txt --no-warn-script-location",
"webview": "start ..\\python\\pythonw webview_ui.py",
"manager": "start ..\\python\\pythonw manager.py",
}
def detect_encoding(stream):
# 读取一部分数据来检测编码
raw_data = stream.read(4096)
result = chardet.detect(raw_data)
stream.seek(0) # 将流指针重置到开头
return result['encoding']
def read_stream(stream, log_func):
detected_encoding = detect_encoding(stream)
text_io = io.TextIOWrapper(stream, encoding=detected_encoding, errors='replace')
try:
for line in iter(text_io.readline, ''):
text = line.rstrip('\n')
custom_event(text.strip() + "\n")
finally:
text_io.close()
class Api:
def load_config(self):
logger.info("读取配置文件")
return config.conf.model_dump()
def save_config(self, conf):
logger.info(f"更新配置文件{conf}")
config.conf = config.Conf(**conf)
config.save_conf()
def get_version(self):
return sys_config.get("version")
def get_new_version(self):
logger.info("获取最新版本号")
response = requests.get(get_new_version_url)
return response.json()
# 更新启动器本身
def update_self(self, download_url):
logger.info(f"开始更新启动器 {download_url}")
# 下载压缩包的全路径
file_name = os.path.basename(download_url)
current_path = os.getcwd()
if not download_file("launcher", download_url, current_path):
return "下载新版本失败"
download_path = os.path.join(current_path, file_name)
script_path = os.path.join(os.getcwd(), upgrade_script_name)
folder_path = os.path.join(os.getcwd(), "_internal")
exe_path = os.path.join(os.getcwd(), "launcher.exe")
with open(script_path, "w") as b:
temp_list = "@echo off\n"
temp_list += "timeout /t 3 /nobreak\n" # 等待进程退出
temp_list += f"rmdir {folder_path}\n" # 删除_internal
temp_list += f"del {exe_path}\n" # 删除exe
temp_list += f"tar -xf {download_path} -C ..\n" # 解压压缩包
temp_list += "timeout /t 1 /nobreak\n" # 等待解压
temp_list += f"start {exe_path}\n" # 启动新程序
temp_list += f"del {download_path}\n" # 删除压缩包
temp_list += "exit"
b.write(temp_list)
# 不显示cmd窗口
Popen([script_path], creationflags=CREATE_NO_WINDOW)
os._exit(0)
def rm_site_packages(self):
site_packages_path = Path("./python/Lib/site-packages")
if site_packages_path.exists():
rmtree(site_packages_path)
return "site-packages目录移除成功"
return "python\\Lib\\site-packages目录不存在"
def rm_python_scripts(self):
python_scripts_path = Path("./python/Scripts")
if python_scripts_path.exists():
rmtree(python_scripts_path)
return "Scripts目录移除成功"
return "python\\Scripts目录不存在"
def run(self, command, cwd=None):
command = command_list[command]
if callable(command):
command = command()
if callable(command):
return "success" if command() else "failed"
custom_event(command + "\n")
try:
with subprocess.Popen(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, cwd=cwd, bufsize=0,
universal_newlines=False
) as p:
stdout_thread = threading.Thread(target=read_stream, args=(p.stdout, logger.info))
stderr_thread = threading.Thread(target=read_stream, args=(p.stderr, logger.info))
stdout_thread.start()
stderr_thread.start()
stdout_thread.join()
stderr_thread.join()
if p.returncode == 0:
return "success"
except Exception as e:
logger.exception(e)
custom_event(str(e))
return "failed"

View file

@ -0,0 +1,11 @@
import json
import launcher
from launcher.log import logger
def custom_event(data):
logger.info(data)
data = json.dumps({"log": data + "\n"})
js = f"var event = new CustomEvent('log', {{detail: {data}}}); window.dispatchEvent(event);"
launcher.webview.window.evaluate_js(js)