feat: 重构EGM计算核心并添加实时日志推送功能

将EGM计算逻辑从webview_app.py移到main.py中的run_egm函数
添加实时日志推送和计算结果回调机制
支持后台线程计算不阻塞前端
This commit is contained in:
dmy
2026-03-02 22:49:38 +08:00
parent 47d3b7b6b4
commit 6f0f8d02a8
3 changed files with 168 additions and 248 deletions

View File

@@ -7,6 +7,7 @@ import os
import sys
import json
import math
import threading
from pathlib import Path
from typing import Dict, Any, List
from datetime import datetime
@@ -17,14 +18,8 @@ from loguru import logger
project_root = Path(__file__).parent
sys.path.insert(0, str(project_root))
from core import (
Parameter, para,
func_ng, min_i, rs_fun, rc_fun, rg_fun,
bd_area, thunder_density, arc_possibility,
rg_line_function_factory, solve_circle_intersection,
solve_circle_line_intersection, Draw
)
import numpy as np
from core import para
from main import parameter_display, run_egm
class WebHandler:
@@ -45,6 +40,35 @@ class WebHandler:
self.callback(log_entry)
class LoguruWebHandler:
"""Loguru 日志处理器,将 loguru 日志转发到 Web 界面"""
def __init__(self, app: 'EGMWebApp'):
self.app = app
def write(self, message):
"""loguru handler 的写入方法"""
record = message.record
level = record['level'].name.lower()
# 映射 loguru 级别到前端级别
level_map = {
'trace': 'debug',
'debug': 'debug',
'info': 'info',
'success': 'info',
'warning': 'warning',
'error': 'error',
'critical': 'error'
}
frontend_level = level_map.get(level, 'info')
# 提取消息文本
msg = record['message']
if msg.strip():
self.app.add_log(frontend_level, msg)
class EGMWebApp:
"""EGM 计算程序的 Web 界面后端"""
@@ -52,9 +76,10 @@ class EGMWebApp:
self.window = None
self.web_handler = None
self.logs: List[Dict[str, str]] = []
self._loguru_handler_id = None
def add_log(self, level: str, message: str):
"""添加日志"""
"""添加日志并实时推送到前端"""
log_entry = {
"level": level,
"time": datetime.now().strftime("%H:%M:%S"),
@@ -62,23 +87,59 @@ class EGMWebApp:
}
self.logs.append(log_entry)
# 实时推送到前端
if self.window:
try:
import json
js_code = f'if(window.addLogFromBackend){{window.addLogFromBackend({json.dumps(log_entry)})}}'
self.window.evaluate_js(js_code)
except Exception as e:
logger.error(f"推送日志到前端失败: {e}")
def get_logs(self) -> List[Dict[str, str]]:
"""获取日志列表"""
logs = self.logs.copy()
self.logs = []
return logs
def _setup_loguru_handler(self):
"""设置 loguru 处理器,捕获所有 logger 调用"""
self._loguru_handler_id = logger.add(
LoguruWebHandler(self),
format="{message}",
level="DEBUG"
)
def _remove_loguru_handler(self):
"""移除 loguru 处理器"""
if self._loguru_handler_id is not None:
logger.remove(self._loguru_handler_id)
self._loguru_handler_id = None
def calculate(self, params: Dict[str, Any]) -> Dict[str, Any]:
"""
执行 EGM 计算
执行 EGM 计算(启动后台线程,立即返回)
Args:
params: 包含 parameter, advance, optional 的字典
Returns:
计算结果字典
计算状态字典
"""
self.logs = [] # 清空日志
# 启动后台线程执行计算
thread = threading.Thread(target=self._calculate_thread, args=(params,))
thread.daemon = True
thread.start()
return {"status": "started", "message": "计算已启动"}
def _calculate_thread(self, params: Dict[str, Any]):
"""后台线程中执行计算"""
# 设置 loguru 处理器,捕获所有 logger.info/debug 等调用
self._setup_loguru_handler()
self.add_log("info", "开始 EGM 计算...")
try:
@@ -112,233 +173,49 @@ class EGMWebApp:
para.voltage_n = int(optional_data.get('voltage_n', 3))
para.max_i = float(optional_data.get('max_i', 200))
self.add_log("info", f"参数: 额定电压={para.rated_voltage}kV, 雷暴日={para.td}d, 海拔={para.altitude}m")
# 设置 ac_or_dc 参数
ac_or_dc_value = str(parameter_data.get('ac_or_dc', 'AC'))
para.ac_or_dc = 'DC' if 'DC' in ac_or_dc_value.upper() else 'AC'
logger.info("开始执行 _do_calculate...")
# 调用 main.py 的参数显示函数,日志会被 loguru handler 捕获
parameter_display(para)
# 执行实际计算
self.add_log("info", "EGM 计算中...")
result = self._do_calculate()
logger.info("开始执行 EGM 计算...")
# 调试输出
logger.info(f"_do_calculate 返回 keys: {list(result.keys())}")
logger.info(f"日志列表: {self.logs}")
# 调用 main.py 的核心计算函数
result = run_egm()
# 将日志添加到结果中(在返回之前添加最后一条日志)
self.add_log("info", "EGM 计算完成")
# 创建一个新的返回值,确保 logs 字段被包含
final_result = {
"success": result.get("success", True),
"message": result.get("message", "计算完成"),
"data": result.get("data", {}),
"logs": self.logs,
"DEBUG_VERSION": "v2" # 标记版本
}
# 推送结果到前端
self._send_result_to_frontend(result)
logger.info(f"最终返回结果 keys: {list(final_result.keys())}")
logger.info(f"日志数量: {len(self.logs)}")
logger.info(f"DEBUG: self.logs = {self.logs}")
return final_result
# 移除 loguru 处理器
self._remove_loguru_handler()
except Exception as e:
self.add_log("error", f"计算失败: {str(e)}")
import traceback
traceback.print_exc()
return {
# 移除 loguru 处理器
self._remove_loguru_handler()
# 推送错误到前端
self._send_result_to_frontend({
"success": False,
"message": f"计算失败: {str(e)}",
"error": str(e),
"logs": self.logs
}
"error": str(e)
})
def _do_calculate(self) -> Dict[str, Any]:
"""执行实际的EGM计算"""
try:
h_whole = para.h_arm[0]
except Exception as e:
self.add_log("error", f"获取参数失败: {str(e)}")
raise
string_g_len = para.string_g_len
string_c_len = para.string_c_len
h_g_sag = para.h_g_sag
h_c_sag = para.h_c_sag
gc_x = para.gc_x.copy()
h_arm = para.h_arm
gc_y = [
h_whole - string_g_len - h_g_sag * 2 / 3,
]
if len(h_arm) > 1:
for hoo in h_arm[1:]:
gc_y.append(hoo - string_c_len - h_c_sag * 2 / 3)
if len(gc_y) > 2:
phase_n = 3
else:
phase_n = 1
td = para.td
ng = func_ng(td)
avr_n_sf = 0
ground_angels = para.ground_angels
voltage_n = para.voltage_n
n_sf_phases = np.zeros((phase_n, voltage_n))
results = []
for ground_angel in ground_angels:
self.add_log("info", f"地面倾角 {ground_angel / math.pi * 180:.3f}°")
rg_type = None
rg_x = None
rg_y = None
for phase_conductor_foo in range(phase_n):
rs_x = gc_x[phase_conductor_foo]
rs_y = gc_y[phase_conductor_foo]
rc_x = gc_x[phase_conductor_foo + 1]
rc_y = gc_y[phase_conductor_foo + 1]
if phase_n == 1:
rg_type = "g"
if phase_n > 1:
if phase_conductor_foo < 2:
rg_type = "c"
rg_x = gc_x[phase_conductor_foo + 2]
rg_y = gc_y[phase_conductor_foo + 2]
else:
rg_type = "g"
rated_voltage = para.rated_voltage
for u_bar in range(voltage_n):
u_ph = rated_voltage / 1.732
insulator_c_len = para.insulator_c_len
i_min = min_i(insulator_c_len, u_ph)
_min_i = i_min
_max_i = para.max_i
i_max = _min_i
for i_bar in np.linspace(_min_i, _max_i, int((_max_i - _min_i) / 1)):
rs = rs_fun(i_bar)
rc = rc_fun(i_bar, u_ph)
rg = rg_fun(i_bar, rc_y, u_ph, typ=rg_type)
rg_line_func = None
if rg_type == "g":
rg_line_func = rg_line_function_factory(rg, ground_angel)
rs_rc_circle_intersection = solve_circle_intersection(
rs, rc, rs_x, rs_y, rc_x, rc_y
)
i_max = i_bar
if not rs_rc_circle_intersection:
continue
circle_rc_or_rg_line_intersection = None
if rg_type == "g":
circle_rc_or_rg_line_intersection = solve_circle_line_intersection(
rc, rc_x, rc_y, rg_line_func
)
elif rg_type == "c":
circle_rc_or_rg_line_intersection = solve_circle_intersection(
rg, rc, rg_x, rg_y, rc_x, rc_y
)
if not circle_rc_or_rg_line_intersection:
if rg_type == "g":
if rg_line_func(rc_x) > rc_y:
i_min = i_bar
continue
else:
continue
min_distance_intersection = (
np.sum(
(
np.array(rs_rc_circle_intersection)
- np.array(circle_rc_or_rg_line_intersection)
)
** 2
)
** 0.5
)
if min_distance_intersection < 0.1:
break
self.add_log("info", f"最大电流为 {i_max:.2f}, 最小电流为 {i_min:.2f}")
curt_fineness = 0.1
if i_min > i_max or abs(i_min - i_max) < curt_fineness:
self.add_log("info", "最大电流小于等于最小电流,没有暴露弧。")
continue
curt_segment_n = int((i_max - i_min) / curt_fineness)
i_curt_samples, d_curt = np.linspace(
i_min, i_max, curt_segment_n + 1, retstep=True
)
bd_area_vec = np.vectorize(bd_area)
ip_a = para.Ip_a
ip_b = para.Ip_b
bd_area_vec_result = bd_area_vec(
i_curt_samples,
u_ph,
rc_x,
rc_y,
rs_x,
rs_y,
rg_x,
rg_y,
ground_angel,
rg_type,
)
thunder_density_result = thunder_density(
i_curt_samples, td, ip_a, ip_b
)
cal_bd_np = bd_area_vec_result * thunder_density_result
calculus = np.sum(cal_bd_np[:-1] + cal_bd_np[1:]) / 2 * d_curt
n_sf = (
2
* ng
/ 10
* calculus
* arc_possibility(rated_voltage, insulator_c_len)
)
avr_n_sf += n_sf / voltage_n
n_sf_phases[phase_conductor_foo][u_bar] = n_sf
self.add_log("info", f"{phase_conductor_foo + 1}, 跳闸率: {n_sf:.16f} 次/(100km·a)")
result = {
"ground_angle": f"{ground_angel / math.pi * 180:.3f}°",
"tripping_rate": avr_n_sf,
"phases": np.mean(n_sf_phases, axis=1).tolist()
}
results.append(result)
return {
"success": True,
"message": "计算完成",
"data": {
"tripping_rate": f"{avr_n_sf:.16f} 次/(100km·a)",
"results": results,
"parameters": {
"rated_voltage": para.rated_voltage,
"td": para.td,
"altitude": para.altitude,
"ground_angels": [a / math.pi * 180 for a in para.ground_angels],
"max_i": para.max_i
}
}
}
def _send_result_to_frontend(self, result: Dict[str, Any]):
"""将计算结果推送到前端"""
if self.window:
try:
js_code = f'if(window.receiveResult){{window.receiveResult({json.dumps(result)})}}'
self.window.evaluate_js(js_code)
except Exception as e:
logger.error(f"推送结果到前端失败: {e}")
def dict_to_toml(self, obj: Dict[str, Any], indent: str = '') -> str:
"""