Download manager

This commit is contained in:
zhbaor 2023-02-21 17:50:53 +08:00
parent 8077378510
commit 23c8f3884d
6 changed files with 76 additions and 94 deletions

42
manager/__init__.py Normal file
View file

@ -0,0 +1,42 @@
from video import Video
from threading import Thread
from sanitize_filename import sanitize
from pathlib import Path
class Manager:
def __init__(self):
self.video_list: list[Video] = []
def add_videos_by_number(self, video_numbers: str):
for video_number in video_numbers.split():
try:
video = Video(video_number)
except:
continue
Thread(target=video.get_info).start()
self.video_list.append(video)
def download(self, id: int, parent_dir: str | Path = Path(".")):
if isinstance(parent_dir, str):
parent_dir = Path(parent_dir)
if not 0 <= id < len(self.video_list):
raise Exception(f"id ({id}) out of range!")
video = self.video_list[id]
if not hasattr(video, "title"):
raise Exception(f"No information for video {video.number}")
filename = sanitize(video.title) + ".m4a"
t = Thread(target=video.download, args=(parent_dir / filename,))
t.start()
def get_progress(self):
return [
{
"number": v.number,
"title": v.title,
"author": v.author,
"received": v.received_bytes,
"total": v.length,
}
for v in self.video_list
]

19
poetry.lock generated
View file

@ -1360,6 +1360,23 @@ type = "legacy"
url = "https://pypi.tuna.tsinghua.edu.cn/simple" url = "https://pypi.tuna.tsinghua.edu.cn/simple"
reference = "mirrors" reference = "mirrors"
[[package]]
name = "sanitize-filename"
version = "1.2.0"
description = "A permissive filename sanitizer."
category = "main"
optional = false
python-versions = "~=3.7"
files = [
{file = "sanitize_filename-1.2.0-py3-none-any.whl", hash = "sha256:a5be41a4371c84cb4a666a9c3baa70e1b2086a3e50b86c7ba5dd579f5ad2f330"},
{file = "sanitize_filename-1.2.0.tar.gz", hash = "sha256:e75933e96d426e306eef8c270cc24c3e1971d8715288c9776d801d3d8e7b941a"},
]
[package.source]
type = "legacy"
url = "https://pypi.tuna.tsinghua.edu.cn/simple"
reference = "mirrors"
[[package]] [[package]]
name = "setuptools" name = "setuptools"
version = "67.3.2" version = "67.3.2"
@ -1636,4 +1653,4 @@ reference = "mirrors"
[metadata] [metadata]
lock-version = "2.0" lock-version = "2.0"
python-versions = "^3.11" python-versions = "^3.11"
content-hash = "2425af85d515b4c0218da49241baf50abfeb6ac1b82dd315092716cd4668cfc0" content-hash = "b6c2ea94f40c44e176ac7606d5a83e0629e41489541d8677ad5fc898de9d89d2"

View file

@ -8,6 +8,7 @@ license = "GPL-3.0-only"
[tool.poetry.dependencies] [tool.poetry.dependencies]
python = "^3.11" python = "^3.11"
bilibili-api-python = "^15.1.0" bilibili-api-python = "^15.1.0"
sanitize-filename = "^1.2.0"
[tool.poetry.group.dev.dependencies] [tool.poetry.group.dev.dependencies]
black = "^23.1.0" black = "^23.1.0"

View file

@ -1,15 +0,0 @@
from bilibili_api import HEADERS
import httpx
def download_video(url: str):
with httpx.stream("GET", url, headers=HEADERS) as r:
length = int(r.headers["content-length"])
received_bytes = 0
with open("demo.m4a", "wb") as f:
for chunk in r.iter_bytes(1024):
if not chunk:
break
received_bytes += len(chunk)
f.write(chunk)
print(f"downloaded {int(received_bytes / length * 100)}%")

View file

@ -1,57 +0,0 @@
from bilibili_api import video, sync
def geturl(vn: str) -> str:
"""Get audio download URL from AV or BV number with bilibili-api-python.
Parameters
----------
vn : str
AV or BV number.
Returns
-------
str
URL of audio stream.
Raises
------
InvalidVideoNumberException
If the format of AV or BV video number is invalid.
BiliBiliAPIException
If bilibili-api-python raises exception.
BadVideoException
If video is in flv format.
"""
class InvalidVideoNumberException(Exception):
pass
class BiliBiliAPIException(Exception):
pass
class BadVideoException(Exception):
pass
if len(vn) <= 2:
raise InvalidVideoNumberException("Video number too short!")
if vn[:2].upper() == "AV":
if not vn[2:].isnumeric():
raise InvalidVideoNumberException("Invalid AV video number!")
else:
v = video.Video(aid="AV" + vn[2:])
if vn[:2].upper() != "BV":
raise InvalidVideoNumberException("Invalid video number!")
else:
v = video.Video(bvid="BV" + vn[2:])
try:
download_url_data = sync(v.get_download_url(0))
detecter = video.VideoDownloadURLDataDetecter(data=download_url_data)
streams = detecter.detect_best_streams()
except:
raise BiliBiliAPIException("Error happens with bilibili-api-python.")
if detecter.check_flv_stream() == True:
raise BadVideoException("Video is only available in flv format.")
return streams[1].url

View file

@ -1,11 +1,15 @@
from exceptions import * from .exceptions import *
from bilibili_api import video, sync, HEADERS from bilibili_api import video, sync, HEADERS
import httpx from httpx import stream
import os from os import PathLike
class Video: class Video:
def __init__(self, video_number: str): def __init__(self, video_number: str):
self.title = None
self.author = None
self.received_bytes = 0
self.length = 0
video_number = video_number.strip() video_number = video_number.strip()
if len(video_number) <= 2: if len(video_number) <= 2:
raise InvalidVideoNumberException( raise InvalidVideoNumberException(
@ -17,23 +21,13 @@ class Video:
f"Invalid AV video number {video_number}!" f"Invalid AV video number {video_number}!"
) )
else: else:
self.v = video.Video(aid="AV" + video_number[2:]) self.number = "AV" + video_number[2:]
self.v = video.Video(aid=self.number)
if video_number[:2].upper() != "BV": if video_number[:2].upper() != "BV":
raise InvalidVideoNumberException(f"Invalid video number {video_number}!") raise InvalidVideoNumberException(f"Invalid video number {video_number}!")
else: else:
self.v = video.Video(bvid="BV" + video_number[2:]) self.number = "BV" + video_number[2:]
self.v = video.Video(bvid=self.number)
try:
download_url_data = sync(self.v.get_download_url(0))
detecter = video.VideoDownloadURLDataDetecter(data=download_url_data)
streams = detecter.detect_best_streams()
self.url = streams[1].url
except:
raise BiliBiliAPIException("Error happens with bilibili-api-python.")
if detecter.check_flv_stream() == True:
raise BadVideoException(
f"This video ({self.title})) is only available in flv format."
)
def get_info(self): def get_info(self):
try: try:
@ -41,7 +35,7 @@ class Video:
except: except:
raise BiliBiliAPIException("Error happens with bilibili-api-python.") raise BiliBiliAPIException("Error happens with bilibili-api-python.")
self.title = info["title"] self.title = info["title"]
self.author = info["owner"] self.author = info["owner"]["name"]
def get_url(self): def get_url(self):
try: try:
@ -54,10 +48,10 @@ class Video:
raise BadVideoException( raise BadVideoException(
f"This video ({self.title})) is only available in flv format." f"This video ({self.title})) is only available in flv format."
) )
self.url = streams[1].url return streams[1].url
def download(self, file: int | str | bytes | os.PathLike): def download(self, file: int | str | bytes | PathLike):
with httpx.stream("GET", self.url, headers=HEADERS) as r: with stream("GET", self.get_url(), headers=HEADERS) as r:
self.length = int(r.headers["content-length"]) self.length = int(r.headers["content-length"])
self.received_bytes = 0 self.received_bytes = 0
with open(file, "wb") as f: with open(file, "wb") as f: