新增功能:初始化时下载python和git

This commit is contained in:
li-xiaochen 2024-12-12 07:46:08 +08:00
parent 00212e447b
commit 8a9b8d1c47
4 changed files with 209 additions and 21 deletions

1
.gitignore vendored
View file

@ -1,4 +1,5 @@
/conf.yml /conf.yml
/conf.json
/dist /dist
# Byte-compiled / optimized / DLL files # Byte-compiled / optimized / DLL files

View file

@ -2,15 +2,18 @@ import io
import json import json
import mimetypes import mimetypes
import os import os
import shutil
import subprocess import subprocess
import threading import threading
import time
from pathlib import Path from pathlib import Path
from shutil import rmtree from shutil import rmtree
from subprocess import CREATE_NO_WINDOW, Popen from subprocess import CREATE_NO_WINDOW, Popen
import chardet
import py7zr
import requests import requests
import webview import webview
from py7zr.callbacks import ExtractCallback
from launcher import config from launcher import config
from log import logger from log import logger
@ -25,24 +28,197 @@ get_new_version_url = (
"https://git.zhaozuohong.vip/api/v1/repos/mower-ng/launcher/releases/latest" "https://git.zhaozuohong.vip/api/v1/repos/mower-ng/launcher/releases/latest"
) )
download_git_url = "https://list.zhaozuohong.vip/mower-ng/git.7z"
download_python_url = "https://list.zhaozuohong.vip/mower-ng/python.7z"
upgrade_script_name = "upgrade.bat" upgrade_script_name = "upgrade.bat"
def custom_event(data): def custom_event(data):
data = json.dumps({"log": data}) logger.info(data)
data = json.dumps({"log": data + "\n"})
js = f"var event = new CustomEvent('log', {{detail: {data}}}); window.dispatchEvent(event);" js = f"var event = new CustomEvent('log', {{detail: {data}}}); window.dispatchEvent(event);"
window.evaluate_js(js) window.evaluate_js(js)
def format_size(size_bytes):
"""格式化文件大小为人类可读的形式"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
if size_bytes < 1024:
return f"{size_bytes:.2f} {unit}"
size_bytes /= 1024
return f"{size_bytes:.2f} PB"
def download_file(download_name, download_url, destination_folder):
"""
下载文件到指定文件夹
:param download_name: 下载内容名
:param download_url: 文件的下载 URL
:param destination_folder: 保存下载文件的目标文件夹
:return: 下载是否成功
"""
if not os.path.exists(destination_folder):
os.makedirs(destination_folder)
filename = os.path.basename(download_url)
download_path = os.path.join(destination_folder, filename)
custom_event(f"开始下载: {download_name}")
response = requests.get(download_url, stream=True)
if response.status_code == 200:
total_size = int(response.headers.get('content-length', 0))
downloaded_size = 0
start_time = time.time()
last_update_time = time.time() # 记录上次更新时间
block_size = 4096 # 每次读取的数据块大小
with open(download_path, "wb") as file:
for data in response.iter_content(chunk_size=block_size):
file.write(data)
downloaded_size += len(data)
current_time = time.time()
elapsed_time = current_time - start_time
if elapsed_time > 0:
download_speed = downloaded_size / elapsed_time # 字节/秒
else:
download_speed = 0
progress_percent = (downloaded_size / total_size) * 100 if total_size != 0 else 0
# 检查是否需要更新进度信息,每1秒更新一次
if current_time - last_update_time >= 1:
# 格式化输出
formatted_downloaded_size = format_size(downloaded_size)
formatted_total_size = format_size(total_size)
formatted_speed = format_size(download_speed) + "/s"
custom_event(
f"下载进度: {progress_percent:.2f}% ({formatted_downloaded_size}/{formatted_total_size}), 下载速度: {formatted_speed}")
last_update_time = current_time # 更新上次更新时间
end_time = time.time()
total_elapsed_time = end_time - start_time
average_download_speed = downloaded_size / total_elapsed_time if total_elapsed_time != 0 else 0
# 格式化输出
formatted_total_elapsed_time = f"{total_elapsed_time:.2f}"
formatted_average_download_speed = format_size(average_download_speed) + "/s"
custom_event(
f"下载完成: {filename}, 耗时: {formatted_total_elapsed_time}, 下载速度: {formatted_average_download_speed}")
else:
logger.error(f"下载失败: {response.status_code}")
return False
return True
class MyExtractCallback(ExtractCallback):
def __init__(self):
super().__init__()
self.total_size = 0
self.last_print_time = time.time()
def report_start(self, archive_name, archive_format):
pass
def report_start_preparation(self):
pass
def report_end(self, processing_file_path, wrote_bytes):
self.total_size += int(wrote_bytes)
current_time = time.time()
if current_time - self.last_print_time >= 1.0: # 至少每隔1秒输出一次
custom_event(f"已解压: {format_size(self.total_size)}")
self.last_print_time = current_time
def report_postprocess(self):
pass
def report_update(self, message):
pass
def report_warning(self, message):
pass
def extract_7z_file(file_name, file_path, destination_folder, delete_after_extract=True):
"""
解压7z文件到指定文件夹
:param file_name: 7z文件的名称
:param file_path: 7z文件的路径
:param destination_folder: 解压目标文件夹
:param delete_after_extract: 解压后删除压缩文件默认删除
:return: 解压是否成功
"""
if not os.path.exists(destination_folder):
os.makedirs(destination_folder)
custom_event(f"开始解压文件: {file_name}")
try:
start_time = time.time()
with py7zr.SevenZipFile(file_path, mode='r') as z:
callback = MyExtractCallback()
z.extractall(path=destination_folder, callback=callback)
end_time = time.time()
total_elapsed_time = end_time - start_time
formatted_total_elapsed_time = f"{total_elapsed_time:.2f}"
custom_event(
f"解压完成: {file_name}, 总大小: {format_size(callback.total_size)}, 耗时: {formatted_total_elapsed_time}")
except Exception as e:
e.print_exc()
custom_event(f"解压失败: {str(e)}")
return False
if delete_after_extract:
try:
os.remove(file_path)
custom_event(f"删除{file_path}成功")
except OSError as e:
custom_event(f"删除{file_path}失败: {str(e)}")
return True
def init_download(download_name, download_url, destination_folder):
"""
初始化中的下载任务返回一个函数用于执行下载任务
:param download_name: 下载任务的名称用于在日志中记录
:param download_url: 下载文件的 URL
:param destination_folder: 下载文件的目标文件夹
:return: 一个函数用于执行下载任务
"""
def download():
target_folder = os.path.join(download_name)
if os.path.exists(target_folder):
custom_event(f"{download_name} 文件夹已存在,跳过下载")
return True
filename = os.path.basename(download_url)
if not download_file(filename, download_url, destination_folder):
return False
download_path = os.path.join(destination_folder, filename)
if not extract_7z_file(filename, download_path, destination_folder, True):
return False
return True
return download
mirror_list = { mirror_list = {
"pypi": "https://pypi.org/simple", "pypi": "https://pypi.org/simple",
"aliyun": "https://mirrors.aliyun.com/pypi/simple/", "aliyun": "https://mirrors.aliyun.com/pypi/simple/",
"tuna": "https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple", "tuna": "https://mirrors.tuna.tsinghua.edu.cn/pypi/web/simple",
"sjtu": "https://mirror.sjtu.edu.cn/pypi/web/simple", "sjtu": "https://mirror.sjtu.edu.cn/pypi/web/simple",
} }
command_list = { command_list = {
"download_git": lambda: init_download("git", download_git_url, os.getcwd()),
"download_python": lambda: init_download("python", download_python_url, os.getcwd()),
"lfs": "git\\bin\\git lfs install", "lfs": "git\\bin\\git lfs install",
"ensurepip": "python\\python -m ensurepip --default-pip", "ensurepip": "python\\python -m ensurepip --default-pip",
"clone": "git\\bin\\git -c lfs.concurrenttransfers=100 clone https://git.zhaozuohong.vip/mower-ng/mower-ng.git --branch slow", "clone": "git\\bin\\git -c lfs.concurrenttransfers=100 clone https://git.zhaozuohong.vip/mower-ng/mower-ng.git --branch slow",
@ -55,12 +231,20 @@ command_list = {
} }
def detect_encoding(stream):
# 读取一部分数据来检测编码
raw_data = stream.read(4096)
result = chardet.detect(raw_data)
stream.seek(0) # 将流指针重置到开头
return result['encoding']
def read_stream(stream, log_func): def read_stream(stream, log_func):
text_io = io.TextIOWrapper(stream, encoding='utf-8', errors='replace') detected_encoding = detect_encoding(stream)
text_io = io.TextIOWrapper(stream, encoding=detected_encoding, errors='replace')
try: try:
for line in iter(text_io.readline, ''): for line in iter(text_io.readline, ''):
text = line.rstrip('\n') text = line.rstrip('\n')
log_func(f"log: {text}")
custom_event(text.strip() + "\n") custom_event(text.strip() + "\n")
finally: finally:
text_io.close() text_io.close()
@ -87,18 +271,14 @@ class Api:
# 更新启动器本身 # 更新启动器本身
def update_self(self, download_url): def update_self(self, download_url):
logger.info(f"开始更新启动器 {download_url}")
# 下载压缩包的全路径 # 下载压缩包的全路径
download_path = os.path.join(os.getcwd(), os.path.basename(download_url)) file_name = os.path.basename(download_url)
logger.info(f"下载新版本: {download_url}{download_path}") current_path = os.getcwd()
response = requests.get(download_url, stream=True) if not download_file("launcher", download_url, current_path):
if response.status_code == 200: return "下载新版本失败"
with open(download_path, "wb") as file:
shutil.copyfileobj(response.raw, file)
logger.info("下载完成")
else:
logger.error(f"下载新版本失败: {response.status_code}")
return f"下载新版本失败: {response.status_code}"
download_path = os.path.join(current_path, file_name)
script_path = os.path.join(os.getcwd(), upgrade_script_name) script_path = os.path.join(os.getcwd(), upgrade_script_name)
folder_path = os.path.join(os.getcwd(), "_internal") folder_path = os.path.join(os.getcwd(), "_internal")
exe_path = os.path.join(os.getcwd(), "launcher.exe") exe_path = os.path.join(os.getcwd(), "launcher.exe")
@ -135,9 +315,9 @@ class Api:
command = command_list[command] command = command_list[command]
if callable(command): if callable(command):
command = command() command = command()
if callable(command):
return "success" if command() else "failed"
custom_event(command + "\n") custom_event(command + "\n")
logger.info(f"cwd {cwd}")
logger.info(f"command {command}")
try: try:
with subprocess.Popen( with subprocess.Popen(
command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, cwd=cwd, bufsize=0, command, stdout=subprocess.PIPE, stderr=subprocess.PIPE, shell=True, cwd=cwd, bufsize=0,

8
log.py
View file

@ -1,8 +1,9 @@
import io
import logging import logging
import sys
from logging.handlers import RotatingFileHandler from logging.handlers import RotatingFileHandler
log_level = logging.INFO
log_level = logging.ERROR
# 配置日志 # 配置日志
@ -10,6 +11,9 @@ def setup_logger():
logger = logging.getLogger("launcher.log") logger = logging.getLogger("launcher.log")
logger.setLevel(log_level) logger.setLevel(log_level)
# 设置标准输出编码为 UTF-8
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
# 控制台输出 # 控制台输出
console_handler = logging.StreamHandler() console_handler = logging.StreamHandler()
console_handler.setLevel(log_level) console_handler.setLevel(log_level)

View file

@ -1,13 +1,16 @@
<script setup> <script setup>
const steps = ref([ const steps = ref([
{ title: '设置 Git LFS', command: ['lfs'] }, {
title: '下载 git、python',
command: ['download_git', 'download_python']
},
{ {
title: '安装 pip', title: '安装 pip',
command: ['ensurepip'] command: ['ensurepip']
}, },
{ {
title: '下载 mower-ng 代码', title: '下载 mower-ng 代码',
command: ['clone'] command: ['lfs', 'clone']
} }
]) ])
provide('steps', steps) provide('steps', steps)