launcher/launcher/file/download.py
2025-01-25 17:49:17 +08:00

108 lines
4.1 KiB
Python

import os
import time
import requests
from launcher.file.extract import extract_7z_file
from launcher.file.utils import format_size
from launcher.webview.events import custom_event, LogType
def download_file(download_name, download_url, destination_folder):
"""
下载文件到指定文件夹
:param download_name: 下载内容名
:param download_url: 文件的下载 URL
:param destination_folder: 保存下载文件的目标文件夹
:return: 下载是否成功
"""
if not os.path.exists(destination_folder):
os.makedirs(destination_folder)
filename = os.path.basename(download_url)
download_path = os.path.join(destination_folder, filename)
custom_event(LogType.info, f"开始下载: {download_name}")
response = requests.get(download_url, stream=True)
if response.status_code == 200:
total_size = int(response.headers.get("content-length", 0))
downloaded_size = 0
start_time = time.time()
last_update_time = time.time() # 记录上次更新时间
block_size = 4096 # 每次读取的数据块大小
with open(download_path, "wb") as file:
for data in response.iter_content(chunk_size=block_size):
file.write(data)
downloaded_size += len(data)
current_time = time.time()
elapsed_time = current_time - start_time
if elapsed_time > 0:
download_speed = downloaded_size / elapsed_time # 字节/秒
else:
download_speed = 0
progress_percent = (
(downloaded_size / total_size) * 100 if total_size != 0 else 0
)
# 检查是否需要更新进度信息,每1秒更新一次
if current_time - last_update_time >= 1:
# 格式化输出
formatted_downloaded_size = format_size(downloaded_size)
formatted_total_size = format_size(total_size)
formatted_speed = format_size(download_speed) + "/s"
custom_event(
LogType.info,
f"下载进度: {progress_percent:.2f}% ({formatted_downloaded_size}/{formatted_total_size}), 下载速度: {formatted_speed}",
)
last_update_time = current_time # 更新上次更新时间
end_time = time.time()
total_elapsed_time = end_time - start_time
average_download_speed = (
downloaded_size / total_elapsed_time if total_elapsed_time != 0 else 0
)
# 格式化输出
formatted_total_elapsed_time = f"{total_elapsed_time:.2f}"
formatted_average_download_speed = format_size(average_download_speed) + "/s"
custom_event(
LogType.info,
f"下载完成: {filename}, 耗时: {formatted_total_elapsed_time}, 下载速度: {formatted_average_download_speed}",
)
else:
custom_event(LogType.error, f"下载失败: {response.status_code}")
return False
return True
def init_download(download_name, download_url, destination_folder):
"""
初始化中的下载任务,返回一个函数,用于执行下载任务
:param download_name: 下载任务的名称,用于在日志中记录
:param download_url: 下载文件的 URL
:param destination_folder: 下载文件的目标文件夹
:return: 一个函数,用于执行下载任务
"""
def download():
target_folder = os.path.join(download_name)
if os.path.exists(target_folder):
custom_event(LogType.info, f"{download_name} 文件夹已存在,跳过下载")
return True
filename = os.path.basename(download_url)
if not download_file(filename, download_url, destination_folder):
return False
download_path = os.path.join(destination_folder, filename)
if not extract_7z_file(filename, download_path, destination_folder, True):
return False
return True
return download