ruff格式化代码

This commit is contained in:
li-xiaochen 2025-01-25 17:49:17 +08:00
parent cae312a272
commit 25f9915fcc
10 changed files with 84 additions and 43 deletions

View file

@ -9,8 +9,7 @@ mimetypes.add_type("text/html", ".html")
mimetypes.add_type("text/css", ".css")
mimetypes.add_type("application/javascript", ".js")
if __name__ == '__main__':
if __name__ == "__main__":
# 如果当前路径存在临时文件夹,则删除
if Path(update_tmp_folder).exists():
shutil.rmtree(update_tmp_folder)

View file

@ -17,6 +17,7 @@ class ConfModel(BaseModel):
class Total(ConfModel):
"""整体"""
# 所在页面
page: str = "init"
# 是否已展示帮助文档
@ -25,6 +26,7 @@ class Total(ConfModel):
class UpdatePart(ConfModel):
"""更新代码"""
# mower-ng 代码分支
branch: str = "slow"
# PyPI 仓库镜像

View file

@ -10,7 +10,9 @@ update_tmp_folder = "download_tmp"
upgrade_script_name = "upgrade.bat"
# 获取最新版本发布信息
get_new_version_url = "https://git.zhaozuohong.vip/api/v1/repos/mower-ng/launcher/releases/latest"
get_new_version_url = (
"https://git.zhaozuohong.vip/api/v1/repos/mower-ng/launcher/releases/latest"
)
# 下载新版本压缩包名
file_name = "launcher.7z"

View file

@ -25,7 +25,7 @@ def download_file(download_name, download_url, destination_folder):
response = requests.get(download_url, stream=True)
if response.status_code == 200:
total_size = int(response.headers.get('content-length', 0))
total_size = int(response.headers.get("content-length", 0))
downloaded_size = 0
start_time = time.time()
last_update_time = time.time() # 记录上次更新时间
@ -43,7 +43,9 @@ def download_file(download_name, download_url, destination_folder):
else:
download_speed = 0
progress_percent = (downloaded_size / total_size) * 100 if total_size != 0 else 0
progress_percent = (
(downloaded_size / total_size) * 100 if total_size != 0 else 0
)
# 检查是否需要更新进度信息,每1秒更新一次
if current_time - last_update_time >= 1:
@ -52,19 +54,25 @@ def download_file(download_name, download_url, destination_folder):
formatted_total_size = format_size(total_size)
formatted_speed = format_size(download_speed) + "/s"
custom_event(LogType.info,
f"下载进度: {progress_percent:.2f}% ({formatted_downloaded_size}/{formatted_total_size}), 下载速度: {formatted_speed}")
custom_event(
LogType.info,
f"下载进度: {progress_percent:.2f}% ({formatted_downloaded_size}/{formatted_total_size}), 下载速度: {formatted_speed}",
)
last_update_time = current_time # 更新上次更新时间
end_time = time.time()
total_elapsed_time = end_time - start_time
average_download_speed = downloaded_size / total_elapsed_time if total_elapsed_time != 0 else 0
average_download_speed = (
downloaded_size / total_elapsed_time if total_elapsed_time != 0 else 0
)
# 格式化输出
formatted_total_elapsed_time = f"{total_elapsed_time:.2f}"
formatted_average_download_speed = format_size(average_download_speed) + "/s"
custom_event(LogType.info,
f"下载完成: {filename}, 耗时: {formatted_total_elapsed_time}, 下载速度: {formatted_average_download_speed}")
custom_event(
LogType.info,
f"下载完成: {filename}, 耗时: {formatted_total_elapsed_time}, 下载速度: {formatted_average_download_speed}",
)
else:
custom_event(LogType.error, f"下载失败: {response.status_code}")
return False

View file

@ -37,7 +37,9 @@ class MyExtractCallback(ExtractCallback):
pass
def extract_7z_file(file_name, file_path, destination_folder, delete_after_extract=True):
def extract_7z_file(
file_name, file_path, destination_folder, delete_after_extract=True
):
"""
解压7z文件到指定文件夹
:param file_name: 7z文件的名称
@ -52,14 +54,16 @@ def extract_7z_file(file_name, file_path, destination_folder, delete_after_extra
custom_event(LogType.info, f"开始解压文件: {file_name}")
try:
start_time = time.time()
with py7zr.SevenZipFile(file_path, mode='r') as z:
with py7zr.SevenZipFile(file_path, mode="r") as z:
callback = MyExtractCallback()
z.extractall(path=destination_folder, callback=callback)
end_time = time.time()
total_elapsed_time = end_time - start_time
formatted_total_elapsed_time = f"{total_elapsed_time:.2f}"
custom_event(LogType.info,
f"解压完成: {file_name}, 总大小: {format_size(callback.total_size)}, 耗时: {formatted_total_elapsed_time}")
custom_event(
LogType.info,
f"解压完成: {file_name}, 总大小: {format_size(callback.total_size)}, 耗时: {formatted_total_elapsed_time}",
)
except Exception as e:
custom_event(LogType.error, f"解压失败: {repr(e)}")
return False

View file

@ -3,7 +3,7 @@ import os
def format_size(size_bytes):
"""格式化文件大小为人类可读的形式"""
for unit in ['B', 'KB', 'MB', 'GB', 'TB']:
for unit in ["B", "KB", "MB", "GB", "TB"]:
if size_bytes < 1024:
return f"{size_bytes:.2f} {unit}"
size_bytes /= 1024
@ -26,5 +26,7 @@ def check_command_path(command, cwd=None):
exec_command_path = os.getcwd()
if cwd:
exec_command_path = os.path.join(exec_command_path, cwd)
full_command_path = os.path.abspath(os.path.join(exec_command_path, command_path)) + ".exe"
full_command_path = (
os.path.abspath(os.path.join(exec_command_path, command_path)) + ".exe"
)
return os.path.exists(full_command_path), full_command_path

View file

@ -9,13 +9,13 @@ from launcher.sys_config import sys_config
# 配置日志
def setup_logger():
log_level = sys_config.get('log_level')
log_level = sys_config.get("log_level")
logger = logging.getLogger("launcher.log")
logger.setLevel(log_level)
# 设置标准输出编码为 UTF-8
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding='utf-8')
sys.stdout = io.TextIOWrapper(sys.stdout.buffer, encoding="utf-8")
# 控制台输出
console_handler = logging.StreamHandler()
@ -24,7 +24,7 @@ def setup_logger():
# 文件输出
file_path = os.path.join(os.getcwd(), "launcher.log")
file_handler = RotatingFileHandler(
file_path, maxBytes=5 * 1024 * 1024, backupCount=3, encoding='utf-8'
file_path, maxBytes=5 * 1024 * 1024, backupCount=3, encoding="utf-8"
)
file_handler.setLevel(logging.INFO)

View file

@ -7,6 +7,7 @@ class SysConfig:
"""
读取系统配置文件
"""
# 版本
version: str
# ui路径
@ -22,25 +23,25 @@ class SysConfig:
self.load_config()
def get_config_path(self):
if getattr(sys, 'frozen', False):
if getattr(sys, "frozen", False):
# logger.error("打包配置")
# 如果是打包后的可执行文件
base_path = sys._MEIPASS
config_subdir = 'launcher/sys_config' # 添加子目录
config_filename = 'config_dist.json'
config_subdir = "launcher/sys_config" # 添加子目录
config_filename = "config_dist.json"
else:
# logger.error("本地配置")
# 如果是本地开发环境
base_path = os.path.dirname(__file__)
config_subdir = '' # 本地开发环境不需要子目录
config_filename = 'config_local.json'
config_subdir = "" # 本地开发环境不需要子目录
config_filename = "config_local.json"
config_path = os.path.join(base_path, config_subdir, config_filename)
return config_path
def load_config(self):
try:
with open(self.config_path, 'r', encoding='utf-8') as file:
with open(self.config_path, "r", encoding="utf-8") as file:
self.config = json.load(file)
except FileNotFoundError:
pass

View file

@ -8,6 +8,11 @@ window = None
def start_webview():
global window
window = webview.create_window(f"mower-ng launcher {sys_config.get('version')}", sys_config.get('url'),
js_api=Api(), width=850, height=600)
webview.start(debug=sys_config.get('debug'))
window = webview.create_window(
f"mower-ng launcher {sys_config.get('version')}",
sys_config.get("url"),
js_api=Api(),
width=850,
height=600,
)
webview.start(debug=sys_config.get("debug"))

View file

@ -9,8 +9,14 @@ from subprocess import Popen
import requests
from launcher import config
from launcher.constants import download_git_url, download_python_url, get_new_version_url, upgrade_script_name, \
mirror_list, file_name
from launcher.constants import (
download_git_url,
download_python_url,
get_new_version_url,
upgrade_script_name,
mirror_list,
file_name,
)
from launcher.file.download import init_download, download_file
from launcher.file.extract import extract_7z_file
from launcher.file.utils import ensure_directory_exists, check_command_path
@ -20,7 +26,9 @@ from launcher.webview.events import custom_event, LogType
command_list = {
"download_git": lambda: init_download("git", download_git_url, os.getcwd()),
"download_python": lambda: init_download("python", download_python_url, os.getcwd()),
"download_python": lambda: init_download(
"python", download_python_url, os.getcwd()
),
"lfs": "git\\bin\\git lfs install",
"ensurepip": "python\\python -m ensurepip --default-pip",
"clone": "git\\bin\\git -c lfs.concurrenttransfers=100 clone https://git.zhaozuohong.vip/mower-ng/mower-ng.git --branch slow",
@ -49,9 +57,7 @@ def parse_stderr(stderr_output):
def check_command_end(command_key, output):
end_keywords = {
"webview": {"WebSocket客户端建立连接": "mower_ng已成功运行"}
}
end_keywords = {"webview": {"WebSocket客户端建立连接": "mower_ng已成功运行"}}
if command_key in end_keywords:
keywords = end_keywords[command_key]
for keyword in keywords:
@ -62,7 +68,6 @@ def check_command_end(command_key, output):
class Api:
def load_config(self):
logger.info("读取配置文件")
return config.conf.model_dump()
@ -146,30 +151,43 @@ class Api:
# 执行命令前先判断命令路径是否存在
exist, command_path = check_command_path(command, cwd)
if not exist:
custom_event(LogType.error, f"命令路径不存在:{command_path} 请尝试依赖修复并重新初始化。")
custom_event(
LogType.error,
f"命令路径不存在:{command_path} 请尝试依赖修复并重新初始化。",
)
return "failed"
try:
stdout_stderr = []
with subprocess.Popen(
command, stdout=subprocess.PIPE, stderr=subprocess.STDOUT, shell=True, cwd=cwd, bufsize=0,
universal_newlines=False
command,
stdout=subprocess.PIPE,
stderr=subprocess.STDOUT,
shell=True,
cwd=cwd,
bufsize=0,
universal_newlines=False,
) as p:
def process_lines(text_io):
for line in iter(text_io.readline, ''):
text = line.rstrip('\n').strip()
for line in iter(text_io.readline, ""):
text = line.rstrip("\n").strip()
custom_event(LogType.command_out, text)
if stdout_stderr is not None:
stdout_stderr.append(text)
if check_command_end(command_key, text):
break
detected_encoding = 'utf-8'
text_io = io.TextIOWrapper(p.stdout, encoding=detected_encoding, errors='replace')
detected_encoding = "utf-8"
text_io = io.TextIOWrapper(
p.stdout, encoding=detected_encoding, errors="replace"
)
try:
process_lines(text_io)
except UnicodeDecodeError:
p.stdout.seek(0) # 重新将流指针重置到开头
text_io = io.TextIOWrapper(p.stdout, encoding='gbk', errors='replace')
text_io = io.TextIOWrapper(
p.stdout, encoding="gbk", errors="replace"
)
process_lines(text_io)
finally:
text_io.close()