#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""Forward Bilibili live-room danmaku to a Spigot server through ServerTap.

Every ``DANMU_MSG`` event received from the live room is formatted as a chat
line and POSTed to ServerTap's ``/v1/chat/broadcast`` endpoint.
"""
import argparse
import asyncio

import httpx
from bilibili_api import live, sync

if __name__ == "__main__":
    parser = argparse.ArgumentParser(
        description="将 B 站直播间弹幕通过 ServerTap 转发到 Spigot 服务器"
    )
    parser.add_argument("--room", type=int, required=True, help="直播房间号")
    parser.add_argument("--host", type=str, help="ServerTap 的 IP")
    parser.add_argument("--port", type=str, help="ServerTap 的端口")
    parser.add_argument("--key", type=str, help="ServerTap 的认证信息")
    args = parser.parse_args()

    room = live.LiveDanmaku(args.room)

    # The "key" header is only sent when a key was supplied on the command line.
    headers = {"key": args.key} if args.key else {}
    # Missing (or empty) host/port fall back to these defaults.
    host = args.host or "127.0.0.1"
    port = args.port or "4567"

    @room.on("DANMU_MSG")
    async def on_danmaku(event):
        """Relay one danmaku event to the ServerTap broadcast endpoint.

        The danmaku text is read from ``event["data"]["info"][1]`` and the
        sender name from ``event["data"]["info"][2][1]``. When the request
        cannot be completed, or the response status is not 200, the message
        and the failure details are printed instead of being lost.
        """
        danmu = event["data"]["info"][1]
        user = event["data"]["info"][2][1]
        msg = f"[直播间弹幕]<{user}>{danmu}"

        try:
            async with httpx.AsyncClient() as client:
                response = await client.post(
                    f"http://{host}:{port}/v1/chat/broadcast",
                    headers=headers,
                    data={"message": msg},
                )
        except httpx.HTTPError as exc:
            # ServerTap unreachable / timed out: report it like the non-200
            # path rather than letting the exception escape the handler.
            print(msg)
            print(repr(exc))
            return

        if response.status_code != 200:
            print(msg)
            print(response.text)

    sync(room.connect())