manage_tool/manage.py
2021-11-04 21:06:18 +08:00

254 lines
8.2 KiB
Python

from configparser import ConfigParser
from pathlib import Path
from json import dump, dumps
from subprocess import STDOUT, Popen, PIPE
from threading import Thread
from queue import Empty, SimpleQueue
import tkinter as tk
import tkinter.ttk as ttk
from tkinter import filedialog
from platform import system
# Parsed contents of config.ini; populated later via config.read() or
# set_default_config().
config = ConfigParser()
# Sections (keys) and option names (values) that a valid config.ini must have.
options = {"ftp": ["host", "user", "pass", "lftp"], "file": ["local", "remote"]}
# Log lines produced by the sync worker thread and consumed by display_output().
output_queue = SimpleQueue()
# Set to True by sync_func() when a sync run ends; display_output() keeps
# rescheduling itself while this is False.
finished = False
# True when the script runs on Windows.
win_os = system() == "Windows"
if win_os:
    # Imported only on Windows; passed as Popen creationflags in sync_func().
    from subprocess import CREATE_NO_WINDOW
def check_config_valid(config: ConfigParser, options: dict[str, list[str]]) -> bool:
    """Tell whether *config* has every required section and option.

    Args:
        config: The parser to inspect.
        options: Mapping of required section name to the option names that
            section must provide.

    Returns:
        True when each section in *options* exists in *config* and offers all
        of its listed options; False as soon as one is missing.
    """
    return all(
        config.has_section(section)
        and all(config.has_option(section, key) for key in keys)
        for section, keys in options.items()
    )
def set_default_config(config: ConfigParser, options: dict[str, list[str]]) -> None:
    """Reset *config* to empty defaults and persist them to ``config.ini``.

    Every existing section is dropped, then each section named in *options*
    is recreated with all of its options set to the empty string. The result
    is written to ``config.ini`` in the current working directory.

    Args:
        config: The parser to reset (modified in place).
        options: Mapping of section name to the option names to create.
    """
    config.clear()
    for section, keys in options.items():
        config.add_section(section)
        for key in keys:
            config.set(section, key, "")
    with open("config.ini", "w") as ini_file:
        config.write(ini_file)
def generate_db(config: ConfigParser) -> str:
    """Index the category folders, write ``db.json`` and return it as JSON text.

    Looks at ``<local>/categories`` (``local`` comes from the ``[file]``
    section of *config*). Each sub-directory becomes one entry of the form
    ``{"name": <directory name>, "files": [<entry names>]}``. Non-directory
    entries directly inside ``categories`` (including ``db.json`` itself) are
    skipped.

    Categories and the file names inside them are sorted by name so that the
    generated index is the same on every run, regardless of the order in
    which the filesystem happens to list directory entries.

    Args:
        config: Parser providing ``config["file"]["local"]``.

    Returns:
        The JSON document that was written to ``categories/db.json``.

    Raises:
        OSError: If the ``categories`` directory cannot be listed or
            ``db.json`` cannot be written.
    """
    categories = Path(config["file"]["local"], "categories")
    output = [
        {
            "name": category.name,
            "files": sorted(entry.name for entry in category.iterdir()),
        }
        for category in sorted(categories.iterdir(), key=lambda p: p.name)
        if category.is_dir()
    ]
    # json.dump escapes non-ASCII by default, so the file content is pure
    # ASCII; the explicit encoding just removes the locale dependency.
    with open(categories / "db.json", "w", encoding="utf-8") as f:
        dump(output, f)
    return dumps(output)
def sync_all_command() -> list[str]:
    """Build the lftp argument list that uploads the whole local directory.

    Reads the lftp executable, credentials, host and local/remote paths from
    the module-level ``config``. The ``-e`` script enables lftp logging,
    disables permission syncing and runs ``mirror -R -e --parallel=10`` from
    the local path to the remote path, then exits.

    On Windows a local path that starts with a drive letter (``C:\\...``) is
    rewritten to ``/cygdrive/C/...`` (presumably because the lftp build in
    use is a Cygwin one — confirm). Paths without a drive letter, including
    an empty path, are passed through unchanged instead of raising
    ``IndexError`` or being mangled.

    Returns:
        The argv list to hand to ``subprocess.Popen``.
    """
    file_path = config["file"]["local"]
    if win_os:
        local = Path(file_path)
        drive = local.drive
        # Only "X:"-style drives can be mapped to /cygdrive/X; UNC shares,
        # relative paths and empty input have no such form.
        if len(drive) == 2 and drive.endswith(":"):
            file_path = f"/cygdrive/{drive[0]}{local.as_posix()[2:]}"
    return [
        config["ftp"]["lftp"],
        f'-e set log:enabled true; set mirror:set-permissions false; mirror -R -e --parallel=10 {file_path} {config["file"]["remote"]}; exit',
        "-u",
        f'{config["ftp"]["user"]},{config["ftp"]["pass"]}',
        config["ftp"]["host"],
    ]
def sync_pdf_command() -> list[str]:
    """Build the lftp argument list that uploads only the ``categories`` folder.

    Reads the lftp executable, credentials, host and local/remote paths from
    the module-level ``config``. The ``-e`` script enables lftp logging,
    disables permission syncing and runs ``mirror -R -e --parallel=10`` from
    ``<local>/categories`` to ``<remote>/categories``, then exits.

    On Windows a local path that starts with a drive letter (``C:\\...``) is
    rewritten to ``/cygdrive/C/...`` (presumably because the lftp build in
    use is a Cygwin one — confirm). Paths without a drive letter, including
    an empty path, are passed through unchanged instead of raising
    ``IndexError`` or being mangled.

    Returns:
        The argv list to hand to ``subprocess.Popen``.
    """
    file_path = config["file"]["local"]
    if win_os:
        local = Path(file_path)
        drive = local.drive
        # Only "X:"-style drives can be mapped to /cygdrive/X; UNC shares,
        # relative paths and empty input have no such form.
        if len(drive) == 2 and drive.endswith(":"):
            file_path = f"/cygdrive/{drive[0]}{local.as_posix()[2:]}"
    return [
        config["ftp"]["lftp"],
        f'-e set log:enabled true; set mirror:set-permissions false; mirror -R -e --parallel=10 {file_path}/categories {config["file"]["remote"]}/categories; exit',
        "-u",
        f'{config["ftp"]["user"]},{config["ftp"]["pass"]}',
        config["ftp"]["host"],
    ]
def sync_func(command: list[str]) -> None:
    """Regenerate ``db.json``, run lftp and stream its output to the queue.

    Intended to run in a worker thread. Progress is reported through
    ``output_queue``: the generated db.json text, the command line, every
    line lftp prints (stdout and stderr merged, as ``bytes`` because the pipe
    is not opened in text mode) and finally the exit code. The module-level
    ``finished`` flag is cleared at the start and always set when the
    function ends, including on failure, so the GUI poller can stop.

    Args:
        command: argv list for lftp, as built by the ``sync_*_command``
            helpers.
    """
    global finished
    finished = False
    try:
        db_log = generate_db(config)
        output_queue.put_nowait(f"db.json: {db_log}\n")
        # NOTE(review): the command list contains the FTP password, which is
        # therefore shown in the output widget.
        output_queue.put_nowait(f"command: {command}\n")
        output_queue.put_nowait("Started lftp...\n")
        # CREATE_NO_WINDOW is only imported (and only meaningful) on Windows.
        popen_kwargs = {"creationflags": CREATE_NO_WINDOW} if win_os else {}
        with Popen(command, stdout=PIPE, stderr=STDOUT, **popen_kwargs) as proc:
            # Read until EOF so that no output is lost: polling for the exit
            # code between reads dropped everything still buffered in the
            # pipe once the process had terminated.
            for line in proc.stdout:
                output_queue.put_nowait(line)
            ret_val = proc.wait()
        output_queue.put_nowait(f"lftp exited with code {ret_val}.\n")
    except (OSError, ValueError) as err:
        # E.g. missing categories directory or wrong lftp path. Report it in
        # the GUI instead of letting the thread die silently.
        output_queue.put_nowait(f"Sync failed: {err}\n")
    finally:
        finished = True
def display_output() -> None:
    """Move queued worker output into the text widget and keep polling.

    Drains ``output_queue`` into ``lftp_output``, scrolls to the end and
    reschedules itself every 200 ms until the worker has set ``finished``.
    """
    # Snapshot the flag BEFORE draining. The worker enqueues its last line
    # and then sets ``finished``; checking the flag after the drain could see
    # it set while that last line was still unread, and it would never be
    # displayed.
    done = finished
    while True:
        try:
            line = output_queue.get_nowait()
        except Empty:
            break
        lftp_output.insert(tk.END, line)
    lftp_output.see(tk.END)
    if not done:
        root.after(200, display_output)
def sync_all_files(_: tk.Event) -> None:
    """Button handler: upload the whole local directory.

    Copies the entry fields into ``config``, clears the output widget, starts
    ``sync_func`` with the full-mirror command on a daemon thread and
    schedules the first ``display_output`` poll after 200 ms.
    """
    get_config()
    lftp_output.delete("1.0", tk.END)
    full_command = sync_all_command()
    worker = Thread(target=sync_func, args=(full_command,), daemon=True)
    worker.start()
    root.after(200, display_output)
def sync_pdf_files(_: tk.Event) -> None:
    """Button handler: upload only the ``categories`` folder.

    Copies the entry fields into ``config``, clears the output widget, starts
    ``sync_func`` with the categories-only command on a daemon thread and
    schedules the first ``display_output`` poll after 200 ms.
    """
    get_config()
    lftp_output.delete("1.0", tk.END)
    pdf_command = sync_pdf_command()
    worker = Thread(target=sync_func, args=(pdf_command,), daemon=True)
    worker.start()
    root.after(200, display_output)
def select_lftp(_: tk.Event) -> None:
    """Button handler: choose the lftp executable with a file dialog.

    The chosen path is stored in ``config["ftp"]["lftp"]`` and shown in the
    lftp entry field. If the dialog is cancelled (empty result), the current
    value is left untouched instead of being wiped.
    """
    file_path = filedialog.askopenfilename()
    if not file_path:
        return
    # "%" starts an interpolation in ConfigParser; store it escaped so the
    # path reads back unchanged and setting it cannot raise ValueError.
    config["ftp"]["lftp"] = file_path.replace("%", "%%")
    lftp_entry.delete(0, tk.END)
    lftp_entry.insert(0, file_path)
def select_local(_: tk.Event) -> None:
    """Button handler: choose the local directory with a directory dialog.

    The chosen path is stored in ``config["file"]["local"]`` and shown in the
    local-path entry field. If the dialog is cancelled (empty result), the
    current value is left untouched instead of being wiped.
    """
    file_path = filedialog.askdirectory()
    if not file_path:
        return
    # "%" starts an interpolation in ConfigParser; store it escaped so the
    # path reads back unchanged and setting it cannot raise ValueError.
    config["file"]["local"] = file_path.replace("%", "%%")
    local_entry.delete(0, tk.END)
    local_entry.insert(0, file_path)
def load_config(_: tk.Event) -> None:
    """Button handler: reload ``config.ini`` and refresh every entry field.

    When the file lacks a required section or option, the configuration is
    reset to empty defaults first. Each entry widget is then cleared and
    filled with its value from ``config``.
    """
    config.read("config.ini")
    if not check_config_valid(config, options):
        set_default_config(config, options)
    # (widget, section, option) in the order the fields are refreshed.
    field_map = (
        (host_entry, "ftp", "host"),
        (user_entry, "ftp", "user"),
        (pass_entry, "ftp", "pass"),
        (lftp_entry, "ftp", "lftp"),
        (local_entry, "file", "local"),
        (remote_entry, "file", "remote"),
    )
    for widget, section, option in field_map:
        widget.delete(0, tk.END)
        widget.insert(0, config[section][option])
def get_config() -> None:
    """Copy the current contents of all entry fields into ``config``.

    Values are stored with ``%`` escaped as ``%%``. ConfigParser treats ``%``
    as the start of an interpolation, so a raw ``%`` (for instance in a
    password) would raise ``ValueError`` on assignment, and text such as
    ``%(user)s`` would be expanded when read. Escaped values read back via
    ``config[section][option]`` exactly as typed.
    """
    fields = (
        ("ftp", "host", host_entry),
        ("ftp", "user", user_entry),
        ("ftp", "pass", pass_entry),
        ("ftp", "lftp", lftp_entry),
        ("file", "local", local_entry),
        ("file", "remote", remote_entry),
    )
    for section, option, widget in fields:
        config[section][option] = widget.get().replace("%", "%%")
def save_config(_: tk.Event) -> None:
    """Button handler: store the entry field values in ``config.ini``.

    Pulls the current field contents into ``config`` and writes the parser
    to ``config.ini`` in the current working directory.
    """
    get_config()
    with Path("config.ini").open("w") as ini_file:
        config.write(ini_file)
# Startup: load config.ini; when a required section or option is missing,
# reset to empty defaults (set_default_config also rewrites config.ini). The
# widgets created below read their initial text from ``config``.
config.read("config.ini")
if not check_config_valid(config, options):
    set_default_config(config, options)
# Main application window.
root = tk.Tk()
# FTP settings frame (row 0 of the window): title, host, user, password and
# lftp executable path, each pre-filled from ``config``.
ftp_frm = ttk.Frame(root, border=1)
ftp_frm.grid(row=0, column=0, pady=10, padx=10)
ttk.Label(ftp_frm, text="FTP Settings").grid(row=0, column=0, columnspan=3)
ttk.Label(ftp_frm, text="Hostname", width=10).grid(row=1, column=0)
host_entry = ttk.Entry(ftp_frm, width=40)
host_entry.insert(0, config["ftp"]["host"])
host_entry.grid(row=1, column=1, columnspan=2)
ttk.Label(ftp_frm, text="Username", width=10).grid(row=2, column=0)
user_entry = ttk.Entry(ftp_frm, width=40)
user_entry.insert(0, config["ftp"]["user"])
user_entry.grid(row=2, column=1, columnspan=2)
ttk.Label(ftp_frm, text="Password", width=10).grid(row=3, column=0)
# show="*" masks the typed password in the widget.
pass_entry = ttk.Entry(ftp_frm, width=40, show="*")
pass_entry.insert(0, config["ftp"]["pass"])
pass_entry.grid(row=3, column=1, columnspan=2)
ttk.Label(ftp_frm, text="lftp Path", width=10).grid(row=4, column=0)
# Narrower entry: it shares row 4 with the "Select..." button.
lftp_entry = ttk.Entry(ftp_frm, width=29)
lftp_entry.insert(0, config["ftp"]["lftp"])
lftp_entry.grid(row=4, column=1)
lftp_select_btn = ttk.Button(ftp_frm, text="Select...", width=10)
lftp_select_btn.grid(row=4, column=2)
# The handler is bound to the left mouse button press, so it receives a
# tk.Event argument.
lftp_select_btn.bind("<Button-1>", select_lftp)
# File path settings frame (row 1 of the window): local directory (with a
# "Select..." button) and remote path, each pre-filled from ``config``.
file_frm = ttk.Frame(root)
file_frm.grid(row=1, column=0, pady=10, padx=10)
ttk.Label(file_frm, text="File Path Settings").grid(row=0, column=0, columnspan=3)
ttk.Label(file_frm, text="Local Path", width=12).grid(row=1, column=0)
local_entry = ttk.Entry(file_frm, width=27)
local_entry.insert(0, config["file"]["local"])
local_entry.grid(row=1, column=1)
local_select_btn = ttk.Button(file_frm, text="Select...", width=10)
local_select_btn.grid(row=1, column=2)
# The handler is bound to the left mouse button press, so it receives a
# tk.Event argument.
local_select_btn.bind("<Button-1>", select_local)
ttk.Label(file_frm, text="Remote Path", width=12).grid(row=2, column=0)
remote_entry = ttk.Entry(file_frm, width=38)
remote_entry.insert(0, config["file"]["remote"])
remote_entry.grid(row=2, column=1, columnspan=2)
# Actions frame (row 2 of the window): load/save/sync buttons in one row and
# the lftp output text box below them.
actions_frm = ttk.Frame(root)
actions_frm.grid(row=2, column=0, pady=10, padx=10)
# All handlers are bound to the left mouse button press rather than passed
# as ``command=``, which is why they take a tk.Event argument.
# NOTE(review): keyboard activation of these buttons presumably does not
# trigger the handlers — confirm whether that is intended.
load_btn = ttk.Button(actions_frm, text="Load Config", width=12)
load_btn.grid(row=0, column=0)
load_btn.bind("<Button-1>", load_config)
save_btn = ttk.Button(actions_frm, text="Save Config", width=12)
save_btn.grid(row=0, column=1)
save_btn.bind("<Button-1>", save_config)
sync_all_btn = ttk.Button(actions_frm, text="Sync all Files", width=12)
sync_all_btn.grid(row=0, column=2)
sync_all_btn.bind("<Button-1>", sync_all_files)
sync_pdf_btn = ttk.Button(actions_frm, text="Sync PDF Files", width=12)
sync_pdf_btn.grid(row=0, column=3)
sync_pdf_btn.bind("<Button-1>", sync_pdf_files)
# Text box that display_output() fills with the worker thread's log lines.
lftp_output = tk.Text(actions_frm, width=53, height=12)
lftp_output.grid(row=1, column=0, columnspan=4)
# Enter the Tk event loop; blocks until the window is closed.
root.mainloop()