mower-ng/mower/solvers/depot_reader.py

336 lines
11 KiB
Python

from datetime import datetime
from sqlalchemy import Column, ForeignKey, Integer, String, Text, create_engine, func
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
from mower.data import key_mapping
from mower.utils.log import logger # noqa: F401
from mower.utils.path import get_path
Base = declarative_base()
class Item(Base):
__tablename__ = "items"
itemId = Column(String, primary_key=True)
counts = relationship("Count", back_populates="item")
class Count(Base):
__tablename__ = "counts"
id = Column(Integer, primary_key=True, autoincrement=True)
itemId = Column(String, ForeignKey("items.itemId"))
count = Column(Text)
time = Column(Text)
type = Column(String)
item = relationship("Item", back_populates="counts")
class Translation(Base):
__tablename__ = "translations"
itemId = Column(String, ForeignKey("items.itemId"), primary_key=True)
iconId = Column(String)
name = Column(String)
classifyType = Column(String)
sortId = Column(Integer)
category = Column(String, default="K未分类")
item = relationship("Item", uselist=False)
sort = {
"A常用": [
"至纯源石",
"合成玉",
"寻访凭证",
"十连寻访凭证",
"龙门币",
"高级凭证",
"资质凭证",
"招聘许可",
],
"B经验卡": ["基础作战记录", "初级作战记录", "中级作战记录", "高级作战记录"],
"C稀有度5": ["烧结核凝晶", "晶体电子单元", "D32钢", "双极纳米片", "聚合剂"],
"D稀有度4": [
"提纯源岩",
"改量装置",
"聚酸酯块",
"糖聚块",
"异铁块",
"酮阵列",
"转质盐聚块",
"切削原液",
"精炼溶剂",
"晶体电路",
"炽合金块",
"聚合凝胶",
"白马醇",
"三水锰矿",
"五水研磨石",
"RMA70-24",
"环烃预制体",
"固化纤维板",
],
"E稀有度3": [
"固源岩组",
"全新装置",
"聚酸酯组",
"糖组",
"异铁组",
"酮凝集组",
"转质盐组",
"化合切削液",
"半自然溶剂",
"晶体元件",
"炽合金",
"凝胶",
"扭转醇",
"轻锰矿",
"研磨石",
"RMA70-12",
"环烃聚质",
"褐素纤维",
],
"F稀有度2": ["固源岩", "装置", "聚酸酯", "", "异铁", "酮凝集"],
"G稀有度1": ["源岩", "破损装置", "酯原料", "代糖", "异铁碎片", "双酮"],
"H模组": ["模组数据块", "数据增补仪", "数据增补条"],
"I技能书": ["技巧概要·卷3", "技巧概要·卷2", "技巧概要·卷1"],
"J芯片相关": [
"重装双芯片",
"重装芯片组",
"重装芯片",
"狙击双芯片",
"狙击芯片组",
"狙击芯片",
"医疗双芯片",
"医疗芯片组",
"医疗芯片",
"术师双芯片",
"术师芯片组",
"术师芯片",
"先锋双芯片",
"先锋芯片组",
"先锋芯片",
"近卫双芯片",
"近卫芯片组",
"近卫芯片",
"辅助双芯片",
"辅助芯片组",
"辅助芯片",
"特种双芯片",
"特种芯片组",
"特种芯片",
"采购凭证",
"芯片助剂",
],
"K未分类": [],
}
class DepotManager:
def __init__(self):
self.sort = sort
self.read_time = 0
get_path("@app/tmp").mkdir(exist_ok=True)
self.path = get_path("@app/tmp/depot_data.db")
self.engine = create_engine(f"sqlite:///{self.path}")
Base.metadata.create_all(self.engine)
self.Session = sessionmaker(bind=self.engine)
self.insert_translations()
def insert_translations(self):
with self.Session() as session:
for itemId, translation in key_mapping.items():
iconId, name, classifyType, sortId = (
translation[1],
translation[2],
translation[3],
translation[4],
)
translation_obj = (
session.query(Translation).filter_by(itemId=itemId).first()
)
if not translation_obj:
translation_obj = Translation(
itemId=itemId,
iconId=iconId,
name=name,
classifyType=classifyType,
sortId=sortId,
category="K未分类",
)
session.add(translation_obj)
for category, items in self.sort.items():
for name in items:
session.query(Translation).filter_by(name=name).update(
{Translation.category: category}
)
session.commit()
def 森空岛导入(self, items, time):
with self.Session() as session:
for item in items:
itemId = item["id"]
if not session.query(Item).filter_by(itemId=itemId).first():
session.add(Item(itemId=itemId))
session.merge(
Count(itemId=itemId, count=item["count"], time=time, type="SK")
)
session.commit()
def CV导入(self, items, time):
with self.Session() as session:
for name, count in items.items():
result = session.query(Translation).filter_by(name=name).first()
if result:
itemId = result.itemId
session.merge(
Count(itemId=itemId, count=count, time=time, type="CV")
)
else:
print(f"Item with name '{name}' not found in translations.")
session.commit()
def 读取仓库(self):
def _add_custom_fields(classified_data):
classified_data["B经验卡"]["全部经验(计算)"] = {
"number": (
classified_data.get("B经验卡", {})
.get("基础作战记录", {})
.get("number", 0)
* 200
+ classified_data.get("B经验卡", {})
.get("初级作战记录", {})
.get("number", 0)
* 400
+ classified_data.get("B经验卡", {})
.get("中级作战记录", {})
.get("number", 0)
* 1000
+ classified_data.get("B经验卡", {})
.get("高级作战记录", {})
.get("number", 0)
* 2000
),
"sort": 9999999,
"icon": "EXP",
}
合成玉数量 = classified_data["A常用"].get("合成玉", {"number": 0})["number"]
寻访凭证数量 = (
classified_data["A常用"].get("寻访凭证", {"number": 0})["number"]
+ classified_data["A常用"].get("十连寻访凭证", {"number": 0})["number"]
* 10
)
源石数量 = classified_data["A常用"].get("至纯源石", {"number": 0})["number"]
源石碎片 = classified_data["K未分类"].get("源石碎片", {"number": 0})[
"number"
]
classified_data["A常用"]["玉+卷"] = {
"number": round(合成玉数量 / 600 + 寻访凭证数量, 1),
"sort": 9999999,
"icon": "寻访凭证",
}
classified_data["A常用"]["玉+卷+石"] = {
"number": round((合成玉数量 + 源石数量 * 180) / 600 + 寻访凭证数量, 1),
"sort": 9999999,
"icon": "寻访凭证",
}
classified_data["A常用"]["额外+碎片"] = {
"number": round(
(合成玉数量 + 源石数量 * 180 + int(源石碎片 / 2) * 20) / 600
+ 寻访凭证数量,
1,
),
"sort": 9999999,
"icon": "寻访凭证",
}
return classified_data
def get_latest_counts_as_json():
with self.Session() as session:
overall_max_time = (session.query(func.max(Count.time))).scalar()
self.read_time = (
int(overall_max_time) if overall_max_time is not None else 0
)
top_two_times = (
session.query(Count.time)
.distinct()
.order_by(Count.time.desc())
.limit(2)
).all()
top_two_times = [time[0] for time in top_two_times]
counts = (
session.query(Translation, Count)
.filter(Count.time.in_(top_two_times))
.outerjoin(Count, Translation.itemId == Count.itemId)
.all()
)
result = {}
# 查询所有 category 不等于 'K未分类' 的行
translations = (
session.query(Translation)
.filter(Translation.category != "K未分类")
.all()
)
# 遍历查询结果并填充字典
for translation in translations:
category = translation.category
name = translation.name
# 如果 category 不在字典中,初始化该类别
if category not in result:
result[category] = {}
# 填充字典的结构
result[category][name] = {
"number": 0, # 默认值为0
"sort": translation.sortId, # 使用 sortId 作为排序值
"icon": translation.name, # 使用 iconId 作为图标
}
for translation, count in counts:
category = translation.category
if category not in result:
result[category] = {}
if not count or count.count is None:
latest_count = 0
else:
latest_count = int(count.count)
if category == "K未分类" and latest_count == 0:
continue
result[category][translation.name] = {
"number": latest_count,
"sort": translation.sortId,
"icon": translation.name,
}
if "K未分类" not in result:
result["K未分类"] = {}
return _add_custom_fields(result)
classified_data = get_latest_counts_as_json()
return [
classified_data,
"",
str(datetime.fromtimestamp(int(self.read_time))),
self.read_time,
]
def close_engine(self):
self.engine.dispose()
def __del__(self):
self.close_engine()