import logging
from pathlib import Path
from queue import Empty, SimpleQueue
from threading import Thread
from time import sleep

from sanitize_filename import sanitize

from video import Video

logger = logging.getLogger(__name__)


class Manager:
    """Track a de-duplicated list of videos and process them in background threads.

    Two kinds of background work are supported: fetching each video's info
    (``Video.get_info``) and downloading it (``Video.download``). Each kind has
    its own queue so that a download started while info fetching is still in
    progress cannot have its jobs consumed by the wrong worker.
    """

    def __init__(self, *, delay: float = 5):
        """Create an empty manager.

        Args:
            delay: Seconds a worker sleeps after handling each video.
                Defaults to the previously hard-coded 5 seconds.
        """
        # Every accepted video, in insertion order.
        self.video_list: list[Video] = []
        # Identifiers (as returned by ``video.v.get_bvid()``) already accepted.
        self.bvid_set: set = set()
        # Videos waiting for ``get_info``.
        self.queue: SimpleQueue = SimpleQueue()
        # Videos waiting for ``download``; kept apart from ``queue`` so the
        # info worker and the download worker never steal each other's jobs.
        self.download_queue: SimpleQueue = SimpleQueue()
        self.delay = delay

    def _drain(self, pending: SimpleQueue, handle) -> None:
        """Apply *handle* to every video in *pending* until it is empty.

        ``get_nowait`` is used instead of an ``empty()`` check followed by a
        blocking ``get()``: with several workers on one queue, another worker
        may take the last item between the check and the get, which would
        leave this thread blocked forever.
        """
        while True:
            try:
                video = pending.get_nowait()
            except Empty:
                return
            try:
                handle(video)
            except Exception:
                # One failing video must not kill the worker and strand the
                # remaining queued items.
                logger.exception("Failed to process video %s", video)
            sleep(self.delay)

    def get_info_worker(self):
        """Call ``get_info()`` on each queued video, pausing between items."""
        self._drain(self.queue, lambda video: video.get_info())

    def add_videos_by_number(self, video_numbers: list[str]):
        """Register videos and fetch their info in a background thread.

        Numbers for which ``Video(...)`` raises are logged and skipped.
        Videos whose ``get_bvid()`` value was already registered are ignored.

        Args:
            video_numbers: Identifiers accepted by the ``Video`` constructor.
        """
        enqueued = False
        for video_number in video_numbers:
            try:
                video = Video(video_number)
            except Exception:
                # Best-effort: an unusable number is skipped, not fatal.
                logger.warning(
                    "Skipping video number %r", video_number, exc_info=True
                )
                continue
            bvid = video.v.get_bvid()
            if bvid in self.bvid_set:
                continue
            self.bvid_set.add(bvid)
            self.video_list.append(video)
            self.queue.put(video)
            enqueued = True
        # Only spawn a worker when there is actually something to process.
        if enqueued:
            Thread(target=self.get_info_worker).start()

    def download_worker(self, parent_dir: Path):
        """Download each queued video into *parent_dir*, pausing between items.

        The file name is the sanitized video title with an ``.m4a`` suffix.

        Args:
            parent_dir: Directory the files are written into.
        """

        def fetch(video: Video) -> None:
            filename = sanitize(video.title) + ".m4a"
            video.download(parent_dir / filename)

        self._drain(self.download_queue, fetch)

    def download(self, parent_dir: str | Path = Path(".")):
        """Queue every registered video for download and start a worker thread.

        Args:
            parent_dir: Target directory, as a string or ``Path``. Defaults to
                the current working directory.
        """
        parent_dir = Path(parent_dir)
        # NOTE(review): nothing here waits for get_info to finish; presumably
        # ``video.title`` is populated by get_info - confirm against Video.
        queued = False
        for video in self.video_list:
            print(video)
            self.download_queue.put(video)
            queued = True
        if queued:
            Thread(target=self.download_worker, args=(parent_dir,)).start()

    def get_progress(self):
        """Return one progress dict per registered video.

        Returns:
            A list of dicts with the keys ``number``, ``title``, ``received``
            and ``total``, read from the corresponding ``Video`` attributes.
        """
        return [
            {
                "number": v.number,
                "title": v.title,
                "received": v.received_bytes,
                "total": v.length,
            }
            for v in self.video_list
        ]