""" EGM 输电线路绕击跳闸率计算程序 - Pywebview 界面 使用 Vue 3 + Quasar + TypeScript + Tailwind CSS 作为前端 """ import os import sys import json import math import threading import queue from pathlib import Path from typing import Dict, Any, List from datetime import datetime import webview from loguru import logger # 添加项目根目录到路径 project_root = Path(__file__).parent sys.path.insert(0, str(project_root)) from core import Parameter from main import parameter_display, run_egm class WebHandler: """Web日志处理器""" def __init__(self, callback=None): self.callback = callback self.logs: List[Dict[str, str]] = [] def write(self, message): if message.strip(): log_entry = { "level": "info", "time": datetime.now().strftime("%H:%M:%S"), "message": message.strip() } self.logs.append(log_entry) if self.callback: self.callback(log_entry) class LoguruWebHandler: """Loguru 日志处理器,将 loguru 日志转发到 Web 界面""" def __init__(self, log_queue: queue.Queue): self.log_queue = log_queue def write(self, message): """loguru handler 的写入方法""" record = message.record level = record['level'].name.lower() # 映射 loguru 级别到前端级别 level_map = { 'trace': 'debug', 'debug': 'debug', 'info': 'info', 'success': 'info', 'warning': 'warning', 'error': 'error', 'critical': 'error' } frontend_level = level_map.get(level, 'info') # 提取消息文本 msg = record['message'] if msg.strip(): log_entry = { "level": frontend_level, "time": datetime.now().strftime("%H:%M:%S"), "message": msg } # 将日志放入队列,由主线程处理 self.log_queue.put(log_entry) class EGMWebApp: """EGM 计算程序的 Web 界面后端""" def __init__(self): self.window = None self.web_handler = None self.logs: List[Dict[str, str]] = [] self._loguru_handler_id = None self._log_queue: queue.Queue = queue.Queue() self._running = False def _process_log_queue(self): """处理日志队列,在主线程中定时调用""" if not self._running: return try: # 处理队列中的所有日志 while not self._log_queue.empty(): try: log_entry = self._log_queue.get_nowait() self._push_log_to_frontend(log_entry) except queue.Empty: break except Exception as e: print(f"处理日志队列失败: {e}") # 继续定时检查 if self._running: threading.Timer(0.05, self._process_log_queue).start() def _push_log_to_frontend(self, log_entry: Dict[str, str]): """推送单条日志到前端""" if self.window: try: js_code = f'if(window.addLogFromBackend){{window.addLogFromBackend({json.dumps(log_entry)})}}' self.window.evaluate_js(js_code) except Exception as e: print(f"推送日志到前端失败: {e}") def add_log(self, level: str, message: str): """添加日志并实时推送到前端""" log_entry = { "level": level, "time": datetime.now().strftime("%H:%M:%S"), "message": message } self.logs.append(log_entry) # 将日志放入队列,由主线程处理 self._log_queue.put(log_entry) def get_logs(self) -> List[Dict[str, str]]: """获取日志列表""" logs = self.logs.copy() self.logs = [] return logs def _setup_loguru_handler(self): """设置 loguru 处理器,捕获所有 logger 调用""" self._loguru_handler_id = logger.add( LoguruWebHandler(self._log_queue), format="{message}", level="DEBUG" ) def _remove_loguru_handler(self): """移除 loguru 处理器""" if self._loguru_handler_id is not None: logger.remove(self._loguru_handler_id) self._loguru_handler_id = None def calculate(self, params: Dict[str, Any]) -> Dict[str, Any]: """ 执行 EGM 计算(启动后台线程,立即返回) Args: params: 包含 parameter, advance, optional 的字典 Returns: 计算状态字典 """ self.logs = [] # 清空日志 self._log_queue = queue.Queue() # 重置队列 # 启动日志队列处理器 self._running = True self._process_log_queue() # 启动后台线程执行计算 thread = threading.Thread(target=self._calculate_thread, args=(params,)) thread.daemon = True thread.start() return {"status": "started", "message": "计算已启动"} def _calculate_thread(self, params: Dict[str, Any]): """后台线程中执行计算""" # 设置 loguru 处理器,捕获所有 logger.info/debug 等调用 self._setup_loguru_handler() self.add_log("info", "开始 EGM 计算...") try: # 解析参数 parameter_data = params.get('parameter', {}) advance_data = params.get('advance', {}) optional_data = params.get('optional', {}) # 创建局部参数对象 para = Parameter() para.h_g_sag = float(parameter_data.get('h_g_sag', 11.67)) para.h_c_sag = float(parameter_data.get('h_c_sag', 14.43)) para.td = int(parameter_data.get('td', 20)) para.insulator_c_len = float(parameter_data.get('insulator_c_len', 7.02)) para.string_c_len = float(parameter_data.get('string_c_len', 9.2)) para.string_g_len = float(parameter_data.get('string_g_len', 0.5)) para.gc_x = list(parameter_data.get('gc_x', [17.9, 17])) para.ground_angels = [ angel / 180 * math.pi for angel in parameter_data.get('ground_angels', [0]) ] para.h_arm = list(parameter_data.get('h_arm', [150, 130])) para.altitude = int(parameter_data.get('altitude', 1000)) # 解析电压等级字符串,如 "500kV" -> 500 rated_voltage_str = str(parameter_data.get('rated_voltage', '500kV')) para.rated_voltage = float(rated_voltage_str.replace('kV', '').replace('±', '')) para.ng = float(advance_data.get('ng', -1)) para.Ip_a = float(advance_data.get('Ip_a', -1)) para.Ip_b = float(advance_data.get('Ip_b', -1)) para.voltage_n = int(optional_data.get('voltage_n', 3)) para.max_i = float(optional_data.get('max_i', 200)) # 设置 ac_or_dc 参数 ac_or_dc_value = str(parameter_data.get('ac_or_dc', 'AC')) para.ac_or_dc = 'DC' if 'DC' in ac_or_dc_value.upper() else 'AC' # 调用 main.py 的参数显示函数,日志会被 loguru handler 捕获 # parameter_display(para) logger.info("开始执行 EGM 计算...") # 调用 main.py 的核心计算函数 result = run_egm(para) self.add_log("info", "EGM 计算完成") # 推送结果到前端 self._send_result_to_frontend(result) # 等待队列中的日志处理完毕 import time time.sleep(0.1) while not self._log_queue.empty(): time.sleep(0.05) # 停止日志队列处理器 self._running = False # 移除 loguru 处理器 self._remove_loguru_handler() except Exception as e: self.add_log("error", f"计算失败: {str(e)}") import traceback traceback.print_exc() # 停止日志队列处理器 self._running = False # 移除 loguru 处理器 self._remove_loguru_handler() # 推送错误到前端 self._send_result_to_frontend({ "success": False, "message": f"计算失败: {str(e)}", "error": str(e) }) def _send_result_to_frontend(self, result: Dict[str, Any]): """将计算结果推送到前端""" if self.window: try: js_code = f'if(window.receiveResult){{window.receiveResult({json.dumps(result)})}}' self.window.evaluate_js(js_code) except Exception as e: logger.error(f"推送结果到前端失败: {e}") def dict_to_toml(self, obj: Dict[str, Any], indent: str = '') -> str: """ 将字典转换为 TOML 格式字符串 Args: obj: 参数字典 indent: 缩进字符串 Returns: TOML 格式字符串 """ result = '' for key, value in obj.items(): if value is None: continue if isinstance(value, list): result += f'{indent}{key} = [{", ".join(str(v) for v in value)}]\n' elif isinstance(value, dict): result += f'\n{indent}[{key}]\n' result += self.dict_to_toml(value, indent) elif isinstance(value, str): result += f'{indent}{key} = "{value}"\n' elif isinstance(value, bool): result += f'{indent}{key} = {str(value).lower()}\n' else: result += f'{indent}{key} = {value}\n' return result def export_config(self, params: Dict[str, Any]) -> Dict[str, Any]: """ 导出配置为 TOML 文件,弹出保存对话框 Args: params: 参数字典 Returns: 包含保存状态和路径的字典 """ try: # 生成默认文件名 timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') default_filename = f'egm_config_{timestamp}.toml' # 打开保存文件对话框 result = self.window.create_file_dialog( webview.SAVE_DIALOG, directory='', save_filename=default_filename, file_types=('TOML Files (*.toml)', 'All files (*.*)') ) if result and len(result) > 0: file_path = result[0] # 转换为 TOML 格式 toml_content = self.dict_to_toml(params) # 写入文件 with open(file_path, 'w', encoding='utf-8') as f: f.write(toml_content) return { "success": True, "message": f"配置已保存到: {file_path}", "file_path": file_path } else: return { "success": False, "message": "用户取消了保存操作" } except Exception as e: logger.error(f"导出配置失败: {str(e)}") return { "success": False, "message": f"保存失败: {str(e)}" } def export_log(self, log_text: str) -> Dict[str, Any]: """ 导出日志为 TXT 文件,弹出保存对话框 Args: log_text: 日志文本内容 Returns: 包含保存状态和路径的字典 """ try: # 生成默认文件名 timestamp = datetime.now().strftime('%Y%m%d_%H%M%S') default_filename = f'egm_log_{timestamp}.txt' # 打开保存文件对话框 result = self.window.create_file_dialog( webview.SAVE_DIALOG, directory='', save_filename=default_filename, file_types=('Text Files (*.txt)', 'All files (*.*)') ) if result and len(result) > 0: file_path = result[0] # 写入文件 with open(file_path, 'w', encoding='utf-8') as f: f.write(log_text) return { "success": True, "message": f"日志已保存到: {file_path}", "file_path": file_path } else: return { "success": False, "message": "用户取消了保存操作" } except Exception as e: logger.error(f"导出日志失败: {str(e)}") return { "success": False, "message": f"保存失败: {str(e)}" } def get_default_config(self) -> Dict[str, Any]: """ 获取默认配置 Returns: 默认配置字典 """ return { "parameter": { "rated_voltage": 750, "h_c_sag": 14.43, "h_g_sag": 11.67, "insulator_c_len": 7.02, "string_c_len": 9.2, "string_g_len": 0.5, "h_arm": [150, 130], "gc_x": [17.9, 17], "ground_angels": [0], "altitude": 1000, "td": 20 }, "advance": { "ng": -1, "Ip_a": -1, "Ip_b": -1 }, "optional": { "voltage_n": 3, "max_i": 200 } } def start_webview(): """启动 pywebview 界面""" # 确定前端 URL # 在开发环境中使用 Vite 开发服务器 # 在生产环境中使用构建后的文件 dev_mode = os.getenv('EGM_DEV_MODE', 'true').lower() == 'true' if dev_mode: # 开发模式:使用 Vite 开发服务器 url = 'http://localhost:5173' logger.info(f"开发模式:使用 Vite 开发服务器 {url}") logger.info("请先在 webui 目录中运行: npm install && npm run dev") else: # 生产模式:使用构建后的文件 dist_path = project_root / 'webui' / 'dist' if not dist_path.exists(): logger.error(f"构建目录不存在: {dist_path}") logger.error("请先运行: cd webui && npm run build") sys.exit(1) url = f'file://{dist_path / "index.html"}' logger.info(f"生产模式:使用构建文件 {url}") # 创建 API 实例 api = EGMWebApp() # 创建窗口 window = webview.create_window( title='EGM 输电线路绕击跳闸率计算', url=url, js_api=api, width=1200, height=900, resizable=True, min_size=(800, 600) ) # 将窗口对象传递给 API api.window = window # 启动 logger.info("启动 EGM Web 界面...") webview.start(debug=dev_mode) if __name__ == '__main__': # 配置日志 logger.remove() logger.add(sys.stderr, level="INFO") logger.add("egm_webui.log", rotation="10 MB", retention="7 days") # 启动界面 start_webview()