#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Forward Bilibili live-room events to a Minecraft server through ServerTap.

The script connects to a Bilibili live room, listens for chat messages
(``DANMU_MSG``) and room-entry notices (``INTERACT_WORD``), and relays each
one as a chat broadcast by POSTing to ServerTap's ``/v1/chat/broadcast``
endpoint.
"""

from typing import Optional
import argparse

import httpx
from bilibili_api import live, sync

DEFAULT_HOST = "127.0.0.1"
DEFAULT_PORT = 4567

# Section-sign formatting codes used to colour the relayed chat text,
# keyed by colour name.
MC_COLOR = {
    "dark_red": "§4",
    "red": "§c",
    "gold": "§6",
    "yellow": "§e",
    "dark_green": "§2",
    "green": "§a",
    "aqua": "§b",
    "dark_aqua": "§3",
    "dark_blue": "§1",
    "blue": "§9",
    "light_purple": "§d",
    "dark_purple": "§5",
    "white": "§f",
    "gray": "§7",
    "dark_gray": "§8",
    "black": "§0",
}

# Tag prepended to every relayed message.
MSG_PREFIX = f"[{MC_COLOR['light_purple']}BiliBili{MC_COLOR['white']}]"


class Broadcaster:
    """Send chat broadcasts to a ServerTap instance over HTTP."""

    def __init__(
        self,
        key: Optional[str] = "",
        host: Optional[str] = "127.0.0.1",
        port: Optional[int] = 4567,
    ):
        """Build the broadcast URL and request headers.

        Args:
            key: Value sent in the ``key`` request header. When empty or
                ``None`` no header is sent.
            host: Host name or IP of the ServerTap instance. ``None`` falls
                back to ``127.0.0.1``.
            port: Port of the ServerTap instance. ``None`` falls back to
                ``4567``.
        """
        # The signature allows None; without this fallback the URL would
        # literally contain "None".
        if host is None:
            host = DEFAULT_HOST
        if port is None:
            port = DEFAULT_PORT
        self.url = f"http://{host}:{port}/v1/chat/broadcast"
        self.headers = {"key": key} if key else {}

    async def broadcast_msg(self, msg: str) -> None:
        """POST ``msg`` to the broadcast endpoint as form field ``message``.

        Delivery is best-effort: a transport failure or a non-200 response is
        printed together with the message and is never raised to the caller,
        so one failed relay does not break the event handler that issued it.

        Args:
            msg: The fully formatted chat line to broadcast.
        """
        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    self.url,
                    headers=self.headers,
                    data={"message": msg},
                )
        except httpx.HTTPError as exc:
            # Connection refused, timeout, etc.: report it the same way as a
            # bad status instead of propagating into the event dispatcher.
            print(msg)
            print(f"broadcast failed: {exc!r}")
            return

        if response.status_code != 200:
            print(msg)
            print(response.text)


def main() -> None:
    """Parse command-line options, register event handlers, and connect."""
    parser = argparse.ArgumentParser(
        description="将 B 站直播间弹幕通过 ServerTap 转发到 Spigot 服务器"
    )
    parser.add_argument("--room", type=int, required=True, help="直播房间号")
    parser.add_argument(
        "--host", type=str, default=DEFAULT_HOST, help="ServerTap 的 IP"
    )
    # Parsed as int so an invalid port is rejected at startup rather than
    # being interpolated into the URL.
    parser.add_argument(
        "--port", type=int, default=DEFAULT_PORT, help="ServerTap 的端口"
    )
    parser.add_argument("--key", type=str, default="", help="ServerTap 的认证信息")
    args = parser.parse_args()

    room = live.LiveDanmaku(args.room)
    # An explicitly empty --host still falls back to the default host.
    bc = Broadcaster(args.key, args.host or DEFAULT_HOST, args.port)

    yellow = MC_COLOR["yellow"]
    white = MC_COLOR["white"]

    @room.on("DANMU_MSG")
    async def on_danmaku(event):
        """Relay a chat message as ``<prefix><user>: <text>``."""
        info = event["data"]["info"]
        danmu = info[1]
        user = info[2][1]
        msg = f"{MSG_PREFIX}{yellow}{user}{white}: {danmu}"
        await bc.broadcast_msg(msg)

    @room.on("INTERACT_WORD")
    async def on_enter(event):
        """Relay a notice naming the user from the event's ``uname`` field."""
        user = event["data"]["data"]["uname"]
        msg = f"{MSG_PREFIX}{yellow}{user}进入了直播间"
        await bc.broadcast_msg(msg)

    sync(room.connect())


if __name__ == "__main__":
    main()