读取基建报表使用数据库/使用ORM简化操作

This commit is contained in:
fuyn101 2024-10-08 11:47:20 +08:00
parent f89d535156
commit 03b6f8134f
4 changed files with 378 additions and 440 deletions

View file

@ -17,7 +17,6 @@ from mower.utils import config
from mower.utils.graph import SceneGraphSolver
from mower.utils.image import loadimg
from mower.utils.log import logger
from mower.utils.path import get_path
from mower.utils.recognize import Scene
# 向下x变大 = 0
@ -162,16 +161,10 @@ class depotREC(SceneGraphSolver):
def __init__(self) -> None:
start_time = datetime.now()
# sift = cv2.SIFT_create()
orb = cv2.ORB_create()
bf = cv2.BFMatcher(cv2.NORM_HAMMING2, crossCheck=True)
# flann = cv2.FlannBasedMatcher(
# dict(algorithm=1, trees=2), dict(checks=50))
self.detector = orb
self.matcher = bf
self.仓库输出 = get_path("@app/tmp/depotresult.csv")
with lzma.open(f"{__rootdir__}/models/CONSUME.pkl", "rb") as pkl:
self.knn模型_CONSUME = pickle.load(pkl)
with lzma.open(f"{__rootdir__}/models/MATERIAL.pkl", "rb") as pkl:
@ -262,15 +255,6 @@ class depotREC(SceneGraphSolver):
else:
logger.info("仓库扫描: 这个分类下没有物品")
logger.info(f"仓库扫描: {self.结果字典}")
result = [
int(datetime.now().timestamp()),
json.dumps(self.结果字典, ensure_ascii=False),
{"森空岛输出仅占位": ""},
]
depotinfo = pd.DataFrame([result], columns=["Timestamp", "Data", "json"])
depotinfo.to_csv(
self.仓库输出, mode="a", index=False, header=False, encoding="utf-8"
)
depot_manager = DepotManager()
depot_manager.CV导入(self.结果字典, int(datetime.now().timestamp()))

View file

@ -1,257 +1,196 @@
import sqlite3
from datetime import datetime
from sqlalchemy import Column, ForeignKey, Integer, String, Text, create_engine
from sqlalchemy.orm import declarative_base, relationship, sessionmaker
from mower.data import key_mapping
from mower.utils.path import get_path
Base = declarative_base()
class Item(Base):
    # One depot item, keyed by the game's itemId string.
    __tablename__ = "items"

    itemId = Column(String, primary_key=True)
    # All recorded count snapshots for this item (reverse side: Count.item).
    counts = relationship("Count", back_populates="item")
class Count(Base):
    # A single depot count snapshot: how many of an item existed at a time.
    __tablename__ = "counts"

    id = Column(Integer, primary_key=True, autoincrement=True)
    itemId = Column(String, ForeignKey("items.itemId"))
    # Stored as text; readers cast with int(). TODO confirm Text is intended
    # rather than Integer.
    count = Column(Text)
    # Unix timestamp stored as text (callers pass int(datetime.now().timestamp())).
    time = Column(Text)
    # Source tag of the snapshot; values used in this file are "SK" and "CV".
    type = Column(String)
    item = relationship("Item", back_populates="counts")
class Translation(Base):
    # Display metadata for an item: icon, localized name, classification and
    # UI sort order.  `category` groups items into the buckets of `sort`.
    __tablename__ = "translations"

    itemId = Column(String, ForeignKey("items.itemId"), primary_key=True)
    iconId = Column(String)
    name = Column(String)
    classifyType = Column(String)
    sortId = Column(Integer)
    # Bucket key such as "A常用"; defaults to the catch-all "K未分类".
    category = Column(String, default="K未分类")
    # NOTE(review): one-way relationship — Item declares no back_populates
    # for this attribute; confirm that is intentional.
    item = relationship("Item", uselist=False)
# Category buckets for depot items, keyed by display name.  The leading
# letter (A, B, C, ...) forces the rendering order of the buckets;
# "K未分类" is the catch-all for anything not listed here.
#
# Fix: "F稀有度2" contained an empty string "" in the slot where the
# tier-2 sugar material "糖" belongs (every other column lines up with
# the tier-1 list "G稀有度1": 固源岩/源岩, 装置/破损装置, 聚酸酯/酯原料,
# 糖/代糖, 异铁/异铁碎片, 酮凝集/双酮).  An empty name can never match a
# translation row, so that item was silently left uncategorized.
sort = {
    "A常用": [
        "至纯源石",
        "合成玉",
        "寻访凭证",
        "十连寻访凭证",
        "龙门币",
        "高级凭证",
        "资质凭证",
        "招聘许可",
    ],
    "B经验卡": ["基础作战记录", "初级作战记录", "中级作战记录", "高级作战记录"],
    "C稀有度5": ["烧结核凝晶", "晶体电子单元", "D32钢", "双极纳米片", "聚合剂"],
    "D稀有度4": [
        "提纯源岩",
        "改量装置",
        "聚酸酯块",
        "糖聚块",
        "异铁块",
        "酮阵列",
        "转质盐聚块",
        "切削原液",
        "精炼溶剂",
        "晶体电路",
        "炽合金块",
        "聚合凝胶",
        "白马醇",
        "三水锰矿",
        "五水研磨石",
        "RMA70-24",
        "环烃预制体",
        "固化纤维板",
    ],
    "E稀有度3": [
        "固源岩组",
        "全新装置",
        "聚酸酯组",
        "糖组",
        "异铁组",
        "酮凝集组",
        "转质盐组",
        "化合切削液",
        "半自然溶剂",
        "晶体元件",
        "炽合金",
        "凝胶",
        "扭转醇",
        "轻锰矿",
        "研磨石",
        "RMA70-12",
        "环烃聚质",
        "褐素纤维",
    ],
    "F稀有度2": ["固源岩", "装置", "聚酸酯", "糖", "异铁", "酮凝集"],
    "G稀有度1": ["源岩", "破损装置", "酯原料", "代糖", "异铁碎片", "双酮"],
    "H模组": ["模组数据块", "数据增补仪", "数据增补条"],
    "I技能书": ["技巧概要·卷3", "技巧概要·卷2", "技巧概要·卷1"],
    "J芯片相关": [
        "重装双芯片",
        "重装芯片组",
        "重装芯片",
        "狙击双芯片",
        "狙击芯片组",
        "狙击芯片",
        "医疗双芯片",
        "医疗芯片组",
        "医疗芯片",
        "术师双芯片",
        "术师芯片组",
        "术师芯片",
        "先锋双芯片",
        "先锋芯片组",
        "先锋芯片",
        "近卫双芯片",
        "近卫芯片组",
        "近卫芯片",
        "辅助双芯片",
        "辅助芯片组",
        "辅助芯片",
        "特种双芯片",
        "特种芯片组",
        "特种芯片",
        "采购凭证",
        "芯片助剂",
    ],
    "K未分类": [],
}
class DepotManager:
def __init__(self):
self.sort = {
"A常用": [
"至纯源石",
"合成玉",
"寻访凭证",
"十连寻访凭证",
"龙门币",
"高级凭证",
"资质凭证",
"招聘许可",
],
"B经验卡": ["基础作战记录", "初级作战记录", "中级作战记录", "高级作战记录"],
"C稀有度5": ["烧结核凝晶", "晶体电子单元", "D32钢", "双极纳米片", "聚合剂"],
"D稀有度4": [
"提纯源岩",
"改量装置",
"聚酸酯块",
"糖聚块",
"异铁块",
"酮阵列",
"转质盐聚块",
"切削原液",
"精炼溶剂",
"晶体电路",
"炽合金块",
"聚合凝胶",
"白马醇",
"三水锰矿",
"五水研磨石",
"RMA70-24",
"环烃预制体",
"固化纤维板",
],
"E稀有度3": [
"固源岩组",
"全新装置",
"聚酸酯组",
"糖组",
"异铁组",
"酮凝集组",
"转质盐组",
"化合切削液",
"半自然溶剂",
"晶体元件",
"炽合金",
"凝胶",
"扭转醇",
"轻锰矿",
"研磨石",
"RMA70-12",
"环烃聚质",
"褐素纤维",
],
"F稀有度2": ["固源岩", "装置", "聚酸酯", "", "异铁", "酮凝集"],
"G稀有度1": ["源岩", "破损装置", "酯原料", "代糖", "异铁碎片", "双酮"],
"H模组": ["模组数据块", "数据增补仪", "数据增补条"],
"I技能书": ["技巧概要·卷3", "技巧概要·卷2", "技巧概要·卷1"],
"J芯片相关": [
"重装双芯片",
"重装芯片组",
"重装芯片",
"狙击双芯片",
"狙击芯片组",
"狙击芯片",
"医疗双芯片",
"医疗芯片组",
"医疗芯片",
"术师双芯片",
"术师芯片组",
"术师芯片",
"先锋双芯片",
"先锋芯片组",
"先锋芯片",
"近卫双芯片",
"近卫芯片组",
"近卫芯片",
"辅助双芯片",
"辅助芯片组",
"辅助芯片",
"特种双芯片",
"特种芯片组",
"特种芯片",
"采购凭证",
"芯片助剂",
],
"K未分类": [],
}
self.sort = sort
self.read_time = 0
self.path = get_path("@app/tmp/depot_data.db")
self.conn = self.初始化数据库()
self.engine = create_engine(f"sqlite:///{self.path}", echo=True)
Base.metadata.create_all(self.engine)
self.Session = sessionmaker(bind=self.engine)
self.insert_translations()
def 初始化数据库(self):
conn = sqlite3.connect(self.path)
def create_tables(conn):
expected_items_columns = [("itemId", "TEXT")]
expected_counts_columns = [
("id", "INTEGER"),
("itemId", "TEXT"),
("count", "TEXT"),
("time", "TEXT"),
("type", "TEXT"),
]
expected_translations_columns = [
("itemId", "TEXT"),
("iconId", "TEXT"),
("name", "TEXT"),
("classifyType", "TEXT"),
("sortId", "INTEGER"),
("category", "TEXT"),
]
create_items_sql = """
CREATE TABLE IF NOT EXISTS items (
itemId TEXT PRIMARY KEY
def insert_translations(self):
with self.Session() as session:
for itemId, translation in key_mapping.items():
iconId, name, classifyType, sortId = (
translation[1],
translation[2],
translation[3],
translation[4],
)
"""
create_counts_sql = """
CREATE TABLE IF NOT EXISTS counts (
id INTEGER PRIMARY KEY AUTOINCREMENT,
itemId TEXT,
count TEXT,
time TEXT,
type TEXT,
FOREIGN KEY (itemId) REFERENCES items (itemId)
translation_obj = (
session.query(Translation).filter_by(itemId=itemId).first()
)
"""
create_translations_sql = """
CREATE TABLE IF NOT EXISTS translations (
itemId TEXT PRIMARY KEY,
iconId TEXT,
name TEXT,
classifyType TEXT,
sortId INTEGER,
category TEXT DEFAULT 'K未分类'
)
"""
def validate_table_structure(conn, table_name, expected_columns):
cursor = conn.cursor()
cursor.execute(f"PRAGMA table_info({table_name})")
columns = cursor.fetchall()
current_columns = [(col[1], col[2]) for col in columns]
return current_columns == expected_columns
def recreate_table(conn, table_name, create_sql):
cursor = conn.cursor()
cursor.execute(f"DROP TABLE IF EXISTS {table_name}")
cursor.execute(create_sql)
conn.commit()
print(f"Table {table_name} recreated.")
def insert_translations(conn):
cursor = conn.cursor()
for itemId, translation in key_mapping.items():
iconId, name, classifyType, sortId = (
translation[1],
translation[2],
translation[3],
translation[4],
if not translation_obj:
translation_obj = Translation(
itemId=itemId,
iconId=iconId,
name=name,
classifyType=classifyType,
sortId=sortId,
category="K未分类",
)
cursor.execute(
"""
INSERT OR REPLACE INTO translations (itemId, iconId, name, classifyType, sortId, category)
VALUES (?, ?, ?, ?, ?, ?)
""",
(itemId, iconId, name, classifyType, sortId, "K未分类"),
session.add(translation_obj)
for category, items in self.sort.items():
for name in items:
session.query(Translation).filter_by(name=name).update(
{Translation.category: category}
)
for category, items in self.sort.items():
for name in items:
cursor.execute(
"""
UPDATE translations
SET category = ?
WHERE name = ?
""",
(category, name),
)
print(f"Updated category for {name} to {category}.")
conn.commit()
if not validate_table_structure(conn, "items", expected_items_columns):
recreate_table(conn, "items", create_items_sql)
print("重建 items 表")
if not validate_table_structure(conn, "counts", expected_counts_columns):
recreate_table(conn, "counts", create_counts_sql)
print("重建 counts 表")
if not validate_table_structure(
conn, "translations", expected_translations_columns
):
recreate_table(conn, "translations", create_translations_sql)
insert_translations(conn)
print("重建 translations 表并插入数据")
create_tables(conn)
return conn
session.commit()
def 森空岛导入(self, items, time):
cursor = self.conn.cursor()
for item in items:
itemId = item["id"]
cursor.execute(
"""
INSERT OR REPLACE INTO items (itemId)
VALUES (?)
""",
(itemId,),
)
cursor.execute(
"""
INSERT OR REPLACE INTO counts (itemId, count, time, type)
VALUES (?, ?, ?, ?)
""",
(itemId, item["count"], time, "SK"),
)
self.conn.commit()
with self.Session() as session: # 使用上下文管理器
for item in items:
itemId = item["id"]
if not session.query(Item).filter_by(itemId=itemId).first():
session.add(Item(itemId=itemId))
session.merge(
Count(itemId=itemId, count=item["count"], time=time, type="SK")
)
session.commit()
def CV导入(self, items, time):
cursor = self.conn.cursor()
for name, count in items.items():
cursor.execute(
"""
SELECT itemId FROM translations WHERE name = ?
""",
(name,),
)
result = cursor.fetchone()
if result:
itemId = result[0]
cursor.execute(
"""
INSERT OR REPLACE INTO counts (itemId, count, time, type)
VALUES (?, ?, ?, ?)
""",
(itemId, count, time, "CV"),
)
else:
print(f"Item with name '{name}' not found in translations.")
self.conn.commit()
with self.Session() as session: # 使用上下文管理器
for name, count in items.items():
result = session.query(Translation).filter_by(name=name).first()
if result:
itemId = result.itemId
session.merge(
Count(itemId=itemId, count=count, time=time, type="CV")
)
else:
print(f"Item with name '{name}' not found in translations.")
session.commit()
def 读取仓库(self):
def _add_custom_fields(classified_data):
@ -301,47 +240,38 @@ class DepotManager:
return classified_data
def get_latest_counts_as_json():
cursor = self.conn.cursor()
cursor.execute(
"SELECT itemId, category, sortId, name, iconId FROM translations"
)
translations = cursor.fetchall()
result = {}
for itemId, category, sortId, name, iconId in translations:
cursor.execute(
"""
SELECT count, MAX(time)
FROM counts
WHERE itemId = ?
""",
(itemId,),
with self.Session() as session: # 使用上下文管理器
# 你的读取逻辑
counts = (
session.query(Translation, Count)
.outerjoin(Count, Translation.itemId == Count.itemId)
.all()
)
count_result = cursor.fetchone()
if count_result and count_result[0]:
latest_count = int(count_result[0])
self.read_time = int(count_result[1])
else:
latest_count = 0
result = {}
for translation, count in counts:
category = translation.category
if category not in result:
result[category] = {}
if category not in result:
result[category] = {}
if not count or count.count is None:
latest_count = 0
else:
latest_count = int(count.count)
self.read_time = (
int(count.time) if count.time else self.read_time
)
if category == "K未分类" and latest_count == 0:
continue
if category == "K未分类" and latest_count == 0:
continue
result[category][name] = {
"number": latest_count,
"sort": sortId,
"icon": name,
}
self.conn.close()
return _add_custom_fields(result)
result[category][translation.name] = {
"number": latest_count,
"sort": translation.sortId,
"icon": translation.name,
}
print(result)
return _add_custom_fields(result)
classified_data = get_latest_counts_as_json()
@ -351,3 +281,9 @@ class DepotManager:
str(datetime.fromtimestamp(int(self.read_time))),
self.read_time,
]
def close_engine(self):
    # Explicitly release the SQLAlchemy engine's connection pool.
    self.engine.dispose()

def __del__(self):
    # Best-effort cleanup at garbage collection.
    # NOTE(review): raises AttributeError if __init__ failed before
    # self.engine was assigned — confirm acceptable.
    self.close_engine()

View file

@ -1,8 +1,9 @@
import datetime
import os
import cv2
import pandas as pd
from sqlalchemy import Column, Date, Integer, create_engine
from sqlalchemy.ext.declarative import declarative_base
from sqlalchemy.orm import sessionmaker
from mower.models import noto_sans
from mower.utils import config
@ -14,26 +15,30 @@ from mower.utils.log import logger
from mower.utils.path import get_path
from mower.utils.recognize import Scene, tp
Base = declarative_base()
def remove_blank(target: str) -> str:
    """Return *target* with ASCII and full-width (U+3000) spaces removed.

    ``None`` and the empty string are returned unchanged.

    Bug fix: the previous implementation called ``str.strip()`` and
    ``str.replace()`` without using their return values — strings are
    immutable, so the function always returned its input untouched.
    """
    if target is None or target == "":
        return target
    return target.strip().replace(" ", "").replace("\u3000", "")
# Database model for reports
class Report(Base):
    """One day's infrastructure (基建) report; one row per date."""

    __tablename__ = "reports"

    id = Column(Integer, primary_key=True, autoincrement=True)
    # Report date; unique=True enforces a single row per day.
    # NOTE(review): record_report() upserts via session.merge(), which keys
    # on the autoincrement id, not on date — a second insert for the same
    # date would hit this unique constraint instead of updating. Confirm.
    date = Column(Date, nullable=False, unique=True)
    作战录像 = Column(Integer)  # battle records produced
    赤金 = Column(Integer)  # pure gold produced
    龙门币订单 = Column(Integer)  # LMD earned from orders
    龙门币订单数 = Column(Integer)  # number of LMD orders
    合成玉 = Column(Integer)  # orundum produced
    合成玉订单数量 = Column(Integer)  # number of orundum orders
class ReportSolver(SceneGraphSolver):
def __init__(self) -> None:
super().__init__()
self.record_path = get_path("@app/tmp/report.csv")
self.db_path = get_path("@app/tmp/report.db")
self.low_range_gray = (100, 100, 100)
self.high_range_gray = (255, 255, 255)
self.date = (
(datetime.datetime.now() - datetime.timedelta(hours=4)).date().__str__()
)
self.date = (datetime.datetime.now() - datetime.timedelta(hours=4)).date()
self.digitReader = DigitReader()
self.report_res = {
"作战录像": None,
@ -45,6 +50,12 @@ class ReportSolver(SceneGraphSolver):
}
self.reload_time = 0
# Setup SQLAlchemy with dynamic DATABASE_URL
self.DATABASE_URL = f"sqlite:///{self.db_path}" # Dynamic DB URL
self.engine = create_engine(self.DATABASE_URL)
Base.metadata.create_all(self.engine)
self.Session = sessionmaker(bind=self.engine)
def run(self):
if self.has_record():
logger.info("今天的基报看过了")
@ -60,7 +71,6 @@ class ReportSolver(SceneGraphSolver):
def transition(self) -> bool:
if self.scene() == Scene.RIIC_REPORT:
return self.read_report()
else:
self.scene_graph_step(Scene.RIIC_REPORT)
@ -79,18 +89,23 @@ class ReportSolver(SceneGraphSolver):
return True
self.reload_time += 1
self.sleep(1)
return
def record_report(self):
logger.info(f"存入{self.date}的数据{self.report_res}")
try:
res_df = pd.DataFrame(self.report_res, index=[self.date])
res_df.to_csv(
self.record_path,
mode="a",
header=not os.path.exists(self.record_path),
encoding="gbk",
)
with self.Session() as session:
report = Report(
date=self.date,
作战录像=self.report_res["作战录像"],
赤金=self.report_res["赤金"],
龙门币订单=self.report_res["龙门币订单"],
龙门币订单数=self.report_res["龙门币订单数"],
合成玉=self.report_res["合成玉"],
合成玉订单数量=self.report_res["合成玉订单数量"],
)
session.merge(report)
session.commit()
except Exception as e:
logger.exception(f"存入数据失败:{e}")
self.tap((1253, 81), interval=2)
@ -109,17 +124,11 @@ class ReportSolver(SceneGraphSolver):
def has_record(self):
try:
if os.path.exists(self.record_path) is False:
logger.debug("基报不存在")
return False
df = pd.read_csv(self.record_path, encoding="gbk", on_bad_lines="skip")
for item in df.iloc:
if item[0] == self.date:
return True
return False
except PermissionError:
logger.info("report.csv正在被占用")
except pd.errors.EmptyDataError:
with self.Session() as session: # 使用上下文管理器
record_exists = session.query(Report).filter_by(date=self.date).first()
return record_exists is not None
except Exception as e:
logger.exception(f"查询数据库失败:{e}")
return False
def crop_report(self):
@ -194,16 +203,129 @@ class ReportSolver(SceneGraphSolver):
return value
def get_report_data(self):
# 连接数据库
try:
with self.Session() as session:
format_data = []
def get_report_data():
record_path = get_path("@app/tmp/report.csv")
try:
data = {}
if os.path.exists(record_path) is False:
logger.debug("基报不存在")
return False
df = pd.read_csv(record_path, encoding="gbk")
data = df.to_dict("dict")
print(data)
except PermissionError:
logger.info("report.csv正在被占用")
# 查询所有报告数据
records = session.query(Report).all()
# 将记录转化为所需格式
for record in records:
format_data.append(
{
"日期": record.date.strftime(
"%Y-%m-%d"
), # 日期格式化为字符串
"作战录像": record.作战录像,
"赤金": record.赤金,
"制造总数": int(record.赤金 + record.作战录像),
"龙门币订单": record.龙门币订单,
"反向作战录像": -record.作战录像,
"龙门币订单数": record.龙门币订单数,
"每单获取龙门币": int(
record.龙门币订单 / record.龙门币订单数
)
if record.龙门币订单数
else 0,
}
)
# 如果格式化后的数据少于15条则添加缺失的日期
earliest_date = (
min(record.date for record in records)
if records
else datetime.date.today()
)
if len(format_data) < 15:
for i in range(1, 16 - len(format_data)):
format_data.insert(
0,
{
"日期": (
earliest_date - datetime.timedelta(days=i + 1)
).strftime("%Y-%m-%d"),
"作战录像": "-",
"赤金": "-",
"龙门币订单": "-",
"龙门币订单数": "-",
"每单获取龙门币": "-",
},
)
logger.debug(format_data)
return format_data
except Exception as e:
logger.exception(f"读取数据库失败: {e}")
def get_orundum_data(self):
    """Build orundum (合成玉) chart data from the reports table.

    Returns a list of dicts — date, orundum produced, order count,
    pull-equivalent (合成玉 / 600) and a running total — padded at the
    front with "-" placeholder rows so at least 15 entries are returned.
    Returns an empty list when orundum production has not started, or on
    any error (logged).
    """
    try:
        format_data = []
        with self.Session() as session:
            # Fetch every report row.
            # NOTE(review): no order_by() — SQL row order is unspecified,
            # but the window logic below assumes chronological order.
            # Confirm insertion order is reliable here.
            records = session.query(Report).all()
            earliest_date = datetime.datetime.now()
            # Date orundum production began; initialised to tomorrow so
            # "never produced" compares greater than today below.
            begin_make_orundum = (earliest_date + datetime.timedelta(days=1)).date()
            if len(records) >= 15:
                for i in range(len(records) - 1, -1, -1):
                    record = records[i]
                    # Only inspect the newest 15 rows (and index 0);
                    # everything in between is skipped.
                    if 0 < i < len(records) - 15:
                        continue
                    if record.合成玉 > 0:
                        begin_make_orundum = record.date
            else:
                for record in records:
                    if record.合成玉 > 0:
                        begin_make_orundum = record.date
            # No orundum ever produced: nothing to chart.
            if begin_make_orundum > earliest_date.date():
                return format_data
            total_orundum = 0
            for record in records:
                total_orundum += record.合成玉
                format_data.append(
                    {
                        "日期": record.date.strftime("%Y-%m-%d"),
                        "合成玉": record.合成玉,
                        "合成玉订单数量": record.合成玉订单数量,
                        "抽数": round((record.合成玉 / 600), 1),
                        "累计制造合成玉": total_orundum,
                    }
                )
            if len(format_data) < 15:
                # Pad the front with placeholder rows up to 15 entries.
                # Safe: records is non-empty here (early return above).
                earliest_date = records[0].date
                for i in range(1, 16 - len(format_data)):
                    format_data.insert(
                        0,
                        {
                            "日期": (
                                earliest_date - datetime.timedelta(days=i + 1)
                            ).strftime("%Y-%m-%d"),
                            "合成玉": "-",
                            "合成玉订单数量": "-",
                            "抽数": "-",
                            "累计制造合成玉": 0,
                        },
                    )
        logger.debug(format_data)
        return format_data
    except Exception as e:
        logger.exception(f"获取合成玉数据失败:{e}")
        return []
def close_engine(self):
    # Explicitly release the SQLAlchemy engine's connection pool.
    self.engine.dispose()

def __del__(self):
    # Best-effort cleanup at garbage collection.
    # NOTE(review): raises AttributeError if __init__ failed before
    # self.engine was assigned — confirm acceptable.
    self.close_engine()

114
server.py
View file

@ -18,6 +18,7 @@ from tzlocal import get_localzone
from werkzeug.exceptions import NotFound
from mower import __system__
from mower.solvers.infra.report import ReportSolver
from mower.utils import config
from mower.utils.log import logger
from mower.utils.path import get_path
@ -402,119 +403,14 @@ def date2str(target: datetime.date):
@app.route("/report/getReportData")
def get_report_data():
import pandas as pd
record_path = get_path("@app/tmp/report.csv")
try:
format_data = []
if os.path.exists(record_path) is False:
logger.debug("基报不存在")
return False
df = pd.read_csv(record_path, encoding="gbk")
data = df.to_dict("records")
earliest_date = str2date(data[0]["Unnamed: 0"])
for item in data:
format_data.append(
{
"日期": date2str(
str2date(item["Unnamed: 0"]) - datetime.timedelta(days=1)
),
"作战录像": item["作战录像"],
"赤金": item["赤金"],
"制造总数": int(item["赤金"] + item["作战录像"]),
"龙门币订单": item["龙门币订单"],
"反向作战录像": -item["作战录像"],
"龙门币订单数": item["龙门币订单数"],
"每单获取龙门币": int(item["龙门币订单"] / item["龙门币订单数"]),
}
)
if len(format_data) < 15:
for i in range(1, 16 - len(format_data)):
format_data.insert(
0,
{
"日期": date2str(
earliest_date - datetime.timedelta(days=i + 1)
),
"作战录像": "-",
"赤金": "-",
"龙门币订单": "-",
"龙门币订单数": "-",
"每单获取龙门币": "-",
},
)
logger.debug(format_data)
return format_data
except PermissionError:
logger.info("report.csv正在被占用")
a = ReportSolver()
return a.get_report_data()
@app.route("/report/getOrundumData")
def get_orundum_data():
import pandas as pd
record_path = get_path("@app/tmp/report.csv")
try:
format_data = []
if os.path.exists(record_path) is False:
logger.debug("基报不存在")
return False
df = pd.read_csv(record_path, encoding="gbk")
data = df.to_dict("records")
earliest_date = datetime.datetime.now()
begin_make_orundum = (earliest_date + datetime.timedelta(days=1)).date()
print(begin_make_orundum)
if len(data) >= 15:
for i in range(len(data) - 1, -1, -1):
if 0 < i < len(data) - 15:
data.pop(i)
else:
logger.debug("合成玉{}".format(data[i]["合成玉"]))
if data[i]["合成玉"] > 0:
begin_make_orundum = str2date(data[i]["Unnamed: 0"])
else:
for item in data:
if item["合成玉"] > 0:
begin_make_orundum = str2date(item["Unnamed: 0"])
if begin_make_orundum > earliest_date.date():
return format_data
total_orundum = 0
for item in data:
total_orundum = total_orundum + item["合成玉"]
format_data.append(
{
"日期": date2str(
str2date(item["Unnamed: 0"]) - datetime.timedelta(days=1)
),
"合成玉": item["合成玉"],
"合成玉订单数量": item["合成玉订单数量"],
"抽数": round((item["合成玉"] / 600), 1),
"累计制造合成玉": total_orundum,
}
)
if len(format_data) < 15:
earliest_date = str2date(data[0]["Unnamed: 0"])
for i in range(1, 16 - len(format_data)):
format_data.insert(
0,
{
"日期": date2str(
earliest_date - datetime.timedelta(days=i + 1)
),
"合成玉": "-",
"合成玉订单数量": "-",
"抽数": "-",
"累计制造合成玉": 0,
},
)
logger.debug(format_data)
return format_data
except PermissionError:
logger.info("report.csv正在被占用")
a = ReportSolver()
return a.get_orundum_data()
@app.route("/test-email")