68 lines
2.6 KiB
Python
68 lines
2.6 KiB
Python
|
from exceptions import *
|
||
|
from bilibili_api import video, sync, HEADERS
|
||
|
import httpx
|
||
|
|
||
|
|
||
|
class Video:
|
||
|
def __init__(self, video_number: str):
|
||
|
if len(video_number) <= 2:
|
||
|
raise InvalidVideoNumberException(
|
||
|
f"The video number {video_number} is too short!"
|
||
|
)
|
||
|
if video_number[:2].upper() == "AV":
|
||
|
if not video_number[2:].isnumeric():
|
||
|
raise InvalidVideoNumberException(
|
||
|
f"Invalid AV video number {video_number}!"
|
||
|
)
|
||
|
else:
|
||
|
self.v = video.Video(aid="AV" + video_number[2:])
|
||
|
if video_number[:2].upper() != "BV":
|
||
|
raise InvalidVideoNumberException(f"Invalid video number {video_number}!")
|
||
|
else:
|
||
|
self.v = video.Video(bvid="BV" + video_number[2:])
|
||
|
|
||
|
try:
|
||
|
download_url_data = sync(self.v.get_download_url(0))
|
||
|
detecter = video.VideoDownloadURLDataDetecter(data=download_url_data)
|
||
|
streams = detecter.detect_best_streams()
|
||
|
self.url = streams[1].url
|
||
|
except:
|
||
|
raise BiliBiliAPIException("Error happens with bilibili-api-python.")
|
||
|
if detecter.check_flv_stream() == True:
|
||
|
raise BadVideoException(
|
||
|
f"This video ({self.title})) is only available in flv format."
|
||
|
)
|
||
|
|
||
|
def get_info(self):
|
||
|
try:
|
||
|
info = sync(self.v.get_info())
|
||
|
except:
|
||
|
raise BiliBiliAPIException("Error happens with bilibili-api-python.")
|
||
|
self.title = info["title"]
|
||
|
self.author = info["owner"]
|
||
|
|
||
|
def get_url(self):
|
||
|
try:
|
||
|
download_url_data = sync(self.v.get_download_url(0))
|
||
|
detecter = video.VideoDownloadURLDataDetecter(data=download_url_data)
|
||
|
streams = detecter.detect_best_streams()
|
||
|
except:
|
||
|
raise BiliBiliAPIException("Error happens with bilibili-api-python.")
|
||
|
if detecter.check_flv_stream() == True:
|
||
|
raise BadVideoException(
|
||
|
f"This video ({self.title})) is only available in flv format."
|
||
|
)
|
||
|
self.url = streams[1].url
|
||
|
|
||
|
def download_video(self):
|
||
|
with httpx.stream("GET", self.url, headers=HEADERS) as r:
|
||
|
self.length = int(r.headers["content-length"])
|
||
|
self.received_bytes = 0
|
||
|
with open("demo.m4a", "wb") as f:
|
||
|
for chunk in r.iter_bytes(1024):
|
||
|
if not chunk:
|
||
|
break
|
||
|
self.received_bytes += len(chunk)
|
||
|
f.write(chunk)
|
||
|
print(f"downloaded {int(self.received_bytes / self.length * 100)}%")
|