142 lines
4 KiB
Python
142 lines
4 KiB
Python
import logging
|
|
import shutil
|
|
import sys
|
|
import time
|
|
import traceback
|
|
from datetime import datetime, timedelta
|
|
from logging.handlers import TimedRotatingFileHandler
|
|
from pathlib import Path
|
|
from queue import Queue
|
|
from threading import Thread
|
|
|
|
import colorlog
|
|
|
|
from mower.utils import typealias as tp
|
|
from mower.utils.path import get_path
|
|
from mower.utils.traceback import log_info
|
|
|
|
# Record layout shared by the handlers: timestamp, level name, message.
BASIC_FORMAT = "%(asctime)s %(levelname)s %(message)s"
# Terminal variant: the same layout with colorlog's colour field in front.
COLOR_FORMAT = "%(log_color)s" + BASIC_FORMAT
# Timestamps are rendered as month-day hour:minute:second (no year).
DATE_FORMAT = "%m-%d %H:%M:%S"

basic_formatter = logging.Formatter(fmt=BASIC_FORMAT, datefmt=DATE_FORMAT)
color_formatter = colorlog.ColoredFormatter(fmt=COLOR_FORMAT, datefmt=DATE_FORMAT)

# Module-private logger; it accepts everything from DEBUG upwards and leaves
# any further filtering to the level set on each attached handler.
_logger = logging.getLogger(__name__)
_logger.setLevel(logging.DEBUG)
|
|
|
|
|
|
class logger:
    """Static logging helpers that prefix each message with call-site details.

    Every helper unpacks ``(caller_class, relative_path, func_name, lineno)``
    from ``log_info()`` and forwards the decorated text to the module-level
    ``_logger``.

    NOTE(review): ``log_info()`` presumably inspects the call stack, so each
    helper calls it directly instead of going through a shared wrapper --
    confirm before refactoring.
    """

    @staticmethod
    def info(msg):
        """Log ``msg`` at INFO with a short prefix, then at DEBUG with the call site."""
        owner, source, func, line = log_info()
        brief = f"[{owner}]{msg}"
        _logger.info(brief)
        detailed = f"[{owner}] {source}:{line} {func} {msg}"
        _logger.debug(detailed)

    @staticmethod
    def debug(msg):
        """Log ``msg`` at DEBUG together with the call site."""
        owner, source, func, line = log_info()
        detailed = f"[{owner}] {source}:{line} {func} {msg}"
        _logger.debug(detailed)

    @staticmethod
    def warning(msg):
        """Log ``msg`` at WARNING together with the call site."""
        owner, source, func, line = log_info()
        detailed = f"[{owner}] {source}:{line} {func} {msg}"
        _logger.warning(detailed)

    @staticmethod
    def error(msg):
        """Log ``msg`` at ERROR together with the call site."""
        owner, source, func, line = log_info()
        detailed = f"[{owner}] {source}:{line} {func} {msg}"
        _logger.error(detailed)

    @staticmethod
    def exception(msg):
        """Log ``msg`` with the call site via ``Logger.exception`` (traceback attached)."""
        owner, source, func, line = log_info()
        detailed = f"[{owner}] {source}:{line} {func} {msg}"
        _logger.exception(detailed)

    @staticmethod
    def debug_exception(msg):
        """Log ``msg`` at DEBUG with the call site and ``exc_info=True``."""
        owner, source, func, line = log_info()
        detailed = f"[{owner}] {source}:{line} {func} {msg}"
        _logger.debug(detailed, exc_info=True)
|
|
|
|
|
|
# dhlr (debug handler): coloured output on the terminal (stdout).
dhlr = logging.StreamHandler(stream=sys.stdout)
dhlr.setLevel(logging.DEBUG)
dhlr.setFormatter(color_formatter)
_logger.addHandler(dhlr)

# fhlr (file handler): plain-text records written to <log folder>/runtime.log.
# The rotation schedule is left at the handler's defaults (hourly in the
# standard library); at most 168 rotated files are kept.
folder = Path(get_path("@app/log"))
folder.mkdir(parents=True, exist_ok=True)
fhlr = TimedRotatingFileHandler(
    filename=folder / "runtime.log",
    backupCount=168,
    encoding="utf8",
)
fhlr.setLevel(logging.DEBUG)
fhlr.setFormatter(basic_formatter)
_logger.addHandler(fhlr)
|
|
|
|
|
|
class Handler(logging.StreamHandler):
    """Logging handler that pushes rendered records onto ``config.log_queue``.

    Although it derives from ``StreamHandler``, ``emit`` is fully overridden
    and nothing is written to the handler's stream.
    """

    def emit(self, record: logging.LogRecord):
        """Render ``record`` as ``"<asctime> <levelname> <message>"`` and enqueue it.

        If the record carries exception information, the formatted traceback
        is appended after a newline.

        Args:
            record: The log record being handled.
        """
        # Imported lazily, as in the rest of this module.
        # NOTE(review): presumably to avoid a circular import -- confirm.
        from mower.utils import config

        try:
            # ``record.message`` and ``record.asctime`` are only populated once
            # some Formatter has formatted the record, so relying on them makes
            # this handler depend on being registered after another handler.
            # Derive both here so the handler works in any position.
            message = record.getMessage()
            asctime = getattr(record, "asctime", None)
            if asctime is None:
                asctime = basic_formatter.formatTime(record, DATE_FORMAT)

            msg = f"{asctime} {record.levelname} {message}"
            if record.exc_info:
                msg += "\n" + "".join(traceback.format_exception(*record.exc_info))
            config.log_queue.put(msg)
        except Exception:
            # Standard logging convention: a failing handler reports through
            # handleError() instead of raising into the code that logged.
            self.handleError(record)
|
|
|
|
|
|
# whlr (WebSocket handler): an instance of Handler, so records at INFO level
# and above are pushed onto config.log_queue by Handler.emit.
whlr = Handler()
whlr.setLevel(logging.INFO)
_logger.addHandler(whlr)
|
|
|
|
|
|
# Folder that screenshot files are written to; created at import time if missing.
screenshot_folder = get_path("@app/screenshot")
screenshot_folder.mkdir(exist_ok=True, parents=True)
# (image, filename) pairs queued by save_screenshot() and consumed by
# screenshot_worker().
screenshot_queue = Queue()
# Time of the most recent cleanup; refreshed at the end of screenshot_cleanup().
cleanup_time = datetime.now()
|
|
|
|
|
|
def screenshot_cleanup():
    """Remove stale entries from ``screenshot_folder`` and record the cleanup time.

    An entry is removed when it is a directory, when its stem is not a plain
    decimal number, or when that number (a ``time.time_ns()`` timestamp, as
    produced by ``save_screenshot``) is older than the retention window
    derived from ``config.conf.screenshot``.

    NOTE(review): the ``* 3600 * 10**9`` factor implies ``config.conf.screenshot``
    is a number of hours -- confirm against the config schema.

    A failure to remove one entry is logged and does not stop the pass.
    """
    from mower.utils import config

    global cleanup_time

    logger.info("清理过期截图")
    start_time_ns = time.time_ns() - config.conf.screenshot * 3600 * 10**9
    for entry in screenshot_folder.iterdir():
        try:
            if entry.is_dir():
                shutil.rmtree(entry)
            # isdecimal() (unlike isnumeric()) only accepts stems int() can
            # parse; characters such as "½" or "²" would raise ValueError.
            elif not entry.stem.isdecimal() or int(entry.stem) < start_time_ns:
                entry.unlink()
        except OSError as exc:
            # For example a file that is still open elsewhere: skip it and
            # keep cleaning the rest of the folder.
            logger.warning(f"无法删除 {entry}: {exc}")
    cleanup_time = datetime.now()
|
|
|
|
|
|
def screenshot_worker():
    """Write queued screenshots to ``screenshot_folder`` forever.

    Runs a cleanup once on start, then loops: when more than an hour has
    passed since ``cleanup_time`` it cleans up again, then blocks on
    ``screenshot_queue`` for the next ``(img, filename)`` pair and writes
    ``img2bytes(img)`` to ``screenshot_folder / filename``.

    Failures are logged instead of propagating. Otherwise a single error
    would end this thread while ``save_screenshot`` keeps enqueuing images
    that nothing consumes.
    """
    from mower.utils.image import img2bytes

    _cleanup_or_postpone()
    while True:
        if datetime.now() - cleanup_time > timedelta(hours=1):
            _cleanup_or_postpone()
        img, filename = screenshot_queue.get()
        try:
            with screenshot_folder.joinpath(filename).open("wb") as f:
                f.write(img2bytes(img))
        except Exception:
            # Thread boundary: log the failure and keep serving the queue.
            logger.exception(f"保存截图失败: {filename}")


def _cleanup_or_postpone():
    """Run ``screenshot_cleanup()``; on failure, log it and postpone the retry."""
    global cleanup_time

    try:
        screenshot_cleanup()
    except Exception:
        logger.exception("清理过期截图失败")
        # Pretend a cleanup just happened so the next attempt is an hour away
        # rather than on every queued screenshot.
        cleanup_time = datetime.now()
|
|
|
|
|
|
# Start the screenshot writer when this module is imported. It is a daemon
# thread, so it does not keep the interpreter alive at exit.
Thread(target=screenshot_worker, daemon=True).start()
|
|
|
|
|
|
def save_screenshot(img: tp.Image) -> None:
    """Queue ``img`` to be written to disk under a nanosecond-timestamp name.

    The file name is ``<time.time_ns()>.jpg``. It is logged at debug level,
    and the ``(img, filename)`` pair is placed on ``screenshot_queue``.

    Args:
        img: The image to save.
    """
    stamp = time.time_ns()
    filename = f"{stamp}.jpg"
    logger.debug(filename)
    job = (img, filename)
    screenshot_queue.put(job)
|