"""HTTP API that stores uploaded system statistics in SQLite and reports on them.

Endpoints:
    POST /stat                          store one upload (user name + system data)
    GET  /user/today                    distinct uploading users, last 24 hours
    GET  /user/week                     distinct uploading users, last 7 days
    GET  /screenshot/{screenshot_time}  fraction of last-24h uploads with a
                                        larger ``avg_time`` than the given value
"""

from contextlib import asynccontextmanager
from datetime import datetime, timedelta, timezone
from typing import Annotated

from fastapi import Depends, FastAPI
from sqlmodel import Field, Session, SQLModel, create_engine, select


# Client-supplied user fields (shared by the table model and the POST payload).
class UserData(SQLModel):
    name: str


# Persisted user row; ``id`` is assigned by the database on insert.
class User(UserData, table=True):
    id: int | None = Field(default=None, primary_key=True)


# Client-supplied description of one uploaded system/result.
# NOTE(review): the unit of ``avg_time`` is not visible here; it is only ever
# compared against the ``screenshot_time`` path parameter -- confirm with clients.
class SystemData(SQLModel):
    processor: str
    cpu_count: int
    emulator: str
    screencap: str
    avg_time: int
    branch: str
    commit: str


# Persisted upload row, linked to the uploading user.
class System(SystemData, table=True):
    id: int | None = Field(default=None, primary_key=True)
    # Stamped server-side (UTC) when the row object is created.
    upload_time: datetime | None = Field(
        default_factory=lambda: datetime.now(timezone.utc)
    )
    user_id: int | None = Field(default=None, foreign_key="user.id")


# Request body of POST /stat: user fields and system fields in one flat object.
class PostData(UserData, SystemData):
    pass


sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"

# Allow the SQLite connection to be used from a thread other than the one that
# created it.
connect_args = {"check_same_thread": False}
engine = create_engine(sqlite_url, connect_args=connect_args)


def create_db_and_tables():
    """Create every table registered on ``SQLModel.metadata`` on the engine."""
    SQLModel.metadata.create_all(engine)


def get_session():
    """Yield a database session that is closed once the caller is done with it."""
    with Session(engine) as session:
        yield session


SessionDep = Annotated[Session, Depends(get_session)]


@asynccontextmanager
async def lifespan(app: FastAPI):
    """Application lifespan hook: make sure the tables exist before serving."""
    create_db_and_tables()
    yield


app = FastAPI(lifespan=lifespan)


def _count_distinct_users(session: Session, since: datetime) -> int:
    """Return the number of distinct ``user_id`` values uploaded after ``since``.

    Only the ``user_id`` column is selected and de-duplicated by the database,
    so full ``System`` rows are never loaded just to be counted.
    """
    statement = (
        select(System.user_id).where(System.upload_time > since).distinct()
    )
    return len(session.exec(statement).all())


@app.post("/stat", response_model=SystemData)
def upload(data: PostData, session: SessionDep):
    """Store one upload, creating the user on first sight of its name.

    Returns the submitted payload; ``response_model`` restricts the response
    to the ``SystemData`` fields.
    """
    # ``User.name`` carries no unique constraint, so duplicate rows for one
    # name can exist (e.g. two concurrent first uploads). ``first()`` on an
    # id-ordered query always resolves to the oldest such row, where
    # ``one_or_none()`` would raise MultipleResultsFound on every later upload.
    user_db = session.exec(
        select(User).where(User.name == data.name).order_by(User.id)
    ).first()
    if user_db is None:
        user_db = User.model_validate(data)
        session.add(user_db)
        session.commit()
        # Reload so the database-assigned primary key is available below.
        session.refresh(user_db)

    system_db = System.model_validate(data)
    system_db.user_id = user_db.id
    session.add(system_db)
    session.commit()
    return data


@app.get("/user/today")
def user_today(session: SessionDep) -> int:
    """Return the number of distinct users who uploaded in the last 24 hours."""
    yesterday = datetime.now(timezone.utc) - timedelta(days=1)
    return _count_distinct_users(session, yesterday)


@app.get("/user/week")
def user_week(session: SessionDep) -> int:
    """Return the number of distinct users who uploaded in the last 7 days."""
    last_week = datetime.now(timezone.utc) - timedelta(weeks=1)
    return _count_distinct_users(session, last_week)


@app.get("/screenshot/{screenshot_time}")
def screenshot_rank(screenshot_time: int, session: SessionDep) -> float:
    """Return the fraction of last-24h uploads whose ``avg_time`` is larger.

    The result is in the range 0.0-1.0; it is 0.0 when there were no uploads
    in the window.
    """
    yesterday = datetime.now(timezone.utc) - timedelta(days=1)
    # Only ``avg_time`` is needed for the ranking, so fetch just that column.
    recent_times = session.exec(
        select(System.avg_time).where(System.upload_time > yesterday)
    ).all()
    if not recent_times:
        return 0.0
    slower = sum(1 for avg_time in recent_times if avg_time > screenshot_time)
    return slower / len(recent_times)