feat: 使用队列优化日志处理并添加线程安全机制

This commit is contained in:
dmy
2026-03-03 11:48:35 +08:00
parent 9557e18fd1
commit 02bfcc18e4

View File

@@ -8,6 +8,7 @@ import sys
import json
import math
import threading
import queue
from pathlib import Path
from typing import Dict, Any, List
from datetime import datetime
@@ -43,8 +44,8 @@ class WebHandler:
class LoguruWebHandler:
"""Loguru 日志处理器,将 loguru 日志转发到 Web 界面"""
def __init__(self, app: 'EGMWebApp'):
self.app = app
def __init__(self, log_queue: queue.Queue):
self.log_queue = log_queue
def write(self, message):
"""loguru handler 的写入方法"""
@@ -66,7 +67,13 @@ class LoguruWebHandler:
# 提取消息文本
msg = record['message']
if msg.strip():
self.app.add_log(frontend_level, msg)
log_entry = {
"level": frontend_level,
"time": datetime.now().strftime("%H:%M:%S"),
"message": msg
}
# 将日志放入队列,由主线程处理
self.log_queue.put(log_entry)
class EGMWebApp:
@@ -77,6 +84,37 @@ class EGMWebApp:
self.web_handler = None
self.logs: List[Dict[str, str]] = []
self._loguru_handler_id = None
self._log_queue: queue.Queue = queue.Queue()
self._running = False
def _process_log_queue(self):
"""处理日志队列,在主线程中定时调用"""
if not self._running:
return
try:
# 处理队列中的所有日志
while not self._log_queue.empty():
try:
log_entry = self._log_queue.get_nowait()
self._push_log_to_frontend(log_entry)
except queue.Empty:
break
except Exception as e:
print(f"处理日志队列失败: {e}")
# 继续定时检查
if self._running:
threading.Timer(0.05, self._process_log_queue).start()
def _push_log_to_frontend(self, log_entry: Dict[str, str]):
"""推送单条日志到前端"""
if self.window:
try:
js_code = f'if(window.addLogFromBackend){{window.addLogFromBackend({json.dumps(log_entry)})}}'
self.window.evaluate_js(js_code)
except Exception as e:
print(f"推送日志到前端失败: {e}")
def add_log(self, level: str, message: str):
"""添加日志并实时推送到前端"""
@@ -87,14 +125,8 @@ class EGMWebApp:
}
self.logs.append(log_entry)
# 实时推送到前端
if self.window:
try:
import json
js_code = f'if(window.addLogFromBackend){{window.addLogFromBackend({json.dumps(log_entry)})}}'
self.window.evaluate_js(js_code)
except Exception as e:
logger.error(f"推送日志到前端失败: {e}")
# 将日志放入队列,由主线程处理
self._log_queue.put(log_entry)
def get_logs(self) -> List[Dict[str, str]]:
"""获取日志列表"""
@@ -105,7 +137,7 @@ class EGMWebApp:
def _setup_loguru_handler(self):
"""设置 loguru 处理器,捕获所有 logger 调用"""
self._loguru_handler_id = logger.add(
LoguruWebHandler(self),
LoguruWebHandler(self._log_queue),
format="{message}",
level="DEBUG"
)
@@ -127,6 +159,11 @@ class EGMWebApp:
计算状态字典
"""
self.logs = [] # 清空日志
self._log_queue = queue.Queue() # 重置队列
# 启动日志队列处理器
self._running = True
self._process_log_queue()
# 启动后台线程执行计算
thread = threading.Thread(target=self._calculate_thread, args=(params,))
@@ -179,7 +216,7 @@ class EGMWebApp:
para.ac_or_dc = 'DC' if 'DC' in ac_or_dc_value.upper() else 'AC'
# 调用 main.py 的参数显示函数,日志会被 loguru handler 捕获
parameter_display(para)
# parameter_display(para)
logger.info("开始执行 EGM 计算...")
@@ -191,6 +228,15 @@ class EGMWebApp:
# 推送结果到前端
self._send_result_to_frontend(result)
# 等待队列中的日志处理完毕
import time
time.sleep(0.1)
while not self._log_queue.empty():
time.sleep(0.05)
# 停止日志队列处理器
self._running = False
# 移除 loguru 处理器
self._remove_loguru_handler()
@@ -199,6 +245,9 @@ class EGMWebApp:
import traceback
traceback.print_exc()
# 停止日志队列处理器
self._running = False
# 移除 loguru 处理器
self._remove_loguru_handler()