5
0
Fork 0
stats/main.py

105 lines
2.7 KiB
Python

from contextlib import asynccontextmanager
from datetime import datetime, timedelta, timezone
from typing import Annotated
from fastapi import Depends, FastAPI
from sqlmodel import Field, Session, SQLModel, create_engine, select
class UserData(SQLModel):
    # Non-table base model holding the user-identifying part of a payload.
    # It is mixed into both the `User` table and the `PostData` request body.

    # Name the uploader is known by; `upload` looks existing users up by it.
    name: str
class User(UserData, table=True):
    # Database table of users: the inherited `name` plus a primary key.

    # None until a value is assigned (default=None); `upload` reads it back
    # after persisting the row to link a System to its user.
    id: int | None = Field(primary_key=True, default=None)
class SystemData(SQLModel):
    # Non-table base model with the fields describing one uploaded system
    # report. Mixed into both the `System` table and the `PostData` body.
    # NOTE(review): field meanings below are inferred from names unless a
    # usage in this file is mentioned -- confirm against the client.

    processor: str  # presumably the CPU model string
    cpu_count: int  # presumably the number of CPUs/cores
    emulator: str
    screencap: str
    # Compared between uploads in `upload`: a stored value greater than the
    # submitted one is counted as "slower". Units are not visible here.
    avg_time: int
    branch: str  # presumably the VCS branch of the build under test
    commit: str  # presumably the VCS commit of the build under test
class System(SystemData, table=True):
    # Database table of uploaded system reports: every SystemData field plus
    # a primary key, an upload timestamp and a link to the uploading user.

    id: int | None = Field(primary_key=True, default=None)

    # Filled with the current UTC time when the object is created, unless a
    # value is supplied explicitly.
    upload_time: datetime | None = Field(
        default_factory=lambda: datetime.now(tz=timezone.utc),
    )

    # References the `id` column of the `user` table; None until assigned.
    user_id: int | None = Field(foreign_key="user.id", default=None)
class PostData(UserData, SystemData):
    # Request body accepted by POST /stat: the union of the user fields
    # (`name`) and the system fields, with nothing added of its own.
    pass
# SQLite database file; a relative path in a sqlite:/// URL is resolved
# against the process working directory.
sqlite_file_name = "database.db"
sqlite_url = f"sqlite:///{sqlite_file_name}"

# Disables SQLite's same-thread check so one connection may be used from
# more than one thread.
# NOTE(review): presumably needed because FastAPI can run sync endpoints on
# worker threads -- confirm.
connect_args = {"check_same_thread": False}

# Module-wide engine shared by every Session created in this file.
engine = create_engine(
    sqlite_url,
    connect_args=connect_args,
)
def create_db_and_tables():
    """Create every table registered on ``SQLModel.metadata`` using ``engine``."""
    metadata = SQLModel.metadata
    metadata.create_all(bind=engine)
def get_session():
    """Yield a ``Session`` bound to ``engine``.

    The session is opened in a ``with`` block, so it is closed once the
    consumer finishes with the generator.
    """
    with Session(engine) as db_session:
        yield db_session
# Dependency alias: a parameter annotated with SessionDep is typed as Session
# and is supplied through Depends(get_session).
SessionDep = Annotated[Session, Depends(get_session)]
@asynccontextmanager
async def lifespan(app: FastAPI):
    """Lifespan context for the application: set up the database, then yield.

    ``app`` is accepted but not used. No code follows the ``yield``, so
    there is no teardown step.
    """
    # Make sure the tables exist before control is handed back.
    create_db_and_tables()
    yield
# Application instance; `lifespan` above is registered as its lifespan handler.
app = FastAPI(lifespan=lifespan)
@app.post("/stat")
def upload(data: PostData, session: SessionDep) -> int:
    """Store an uploaded report and return how it ranks against recent ones.

    Returns the rounded percentage of systems uploaded in the last 24 hours
    whose ``avg_time`` is strictly greater than the submitted one, or 0 when
    there are no recent uploads. The ranking is computed before the new row
    is stored, so an upload is never compared with itself.
    """
    yesterday = datetime.now(timezone.utc) - timedelta(days=1)

    # Only avg_time is needed for the ranking, so fetch just that column
    # instead of loading complete System rows.
    recent_times = session.exec(
        select(System.avg_time).where(System.upload_time > yesterday)
    ).all()
    total = len(recent_times)
    slower = sum(1 for avg_time in recent_times if avg_time > data.avg_time)
    ratio = round(slower / total * 100) if total > 0 else 0

    # `name` carries no unique constraint and this lookup-then-insert is not
    # atomic, so two rows with the same name can exist. first() tolerates
    # that, whereas one_or_none() would raise MultipleResultsFound and break
    # every later upload for that name. Ordering by id keeps the choice
    # deterministic (the oldest row wins).
    user_db = session.exec(
        select(User).where(User.name == data.name).order_by(User.id)
    ).first()
    if user_db is None:
        user_db = User.model_validate(data)
        session.add(user_db)
        # flush() sends the INSERT and populates user_db.id without ending
        # the transaction, so the user and the system row are committed
        # together or not at all.
        session.flush()

    system_db = System.model_validate(data)
    system_db.user_id = user_db.id
    session.add(system_db)
    session.commit()
    return ratio
def _count_active_users(session: Session, window: timedelta) -> int:
    """Count distinct ``user_id`` values among systems uploaded within *window*.

    Args:
        session: Open database session to query with.
        window: How far back from the current UTC time to look.

    Returns:
        Number of distinct ``user_id`` values. A missing (NULL) ``user_id``
        counts as one value, exactly as ``None`` would in a Python set.
    """
    since = datetime.now(timezone.utc) - window
    # Select only the user_id column and let the database de-duplicate,
    # rather than loading every System row to build a set in Python.
    user_ids = session.exec(
        select(System.user_id).where(System.upload_time > since).distinct()
    ).all()
    return len(user_ids)


@app.get("/user/today")
def user_today(session: SessionDep) -> int:
    """Return the number of distinct users with an upload in the last 24 hours."""
    return _count_active_users(session, timedelta(days=1))


@app.get("/user/week")
def user_week(session: SessionDep) -> int:
    """Return the number of distinct users with an upload in the last 7 days."""
    return _count_active_users(session, timedelta(weeks=1))