- 重新启用run.io_bound()调用,使用后台线程执行PowerShell脚本 - 为所有文件保存操作添加按钮防重复点击功能 - 新增win32_helper模块,提供Win32 API和COM接口的文件对话框 - 简化导出最佳方案DXF的代码结构 - 改进异步操作和错误处理机制
1067 lines
43 KiB
Python
1067 lines
43 KiB
Python
import contextlib
|
||
import io
|
||
import os
|
||
import sys
|
||
import tempfile
|
||
|
||
import matplotlib
|
||
|
||
matplotlib.use("Agg")
|
||
import matplotlib.backends.backend_svg
|
||
import matplotlib.pyplot as plt
|
||
import pandas as pd
|
||
from nicegui import app, events, ui, run
|
||
|
||
from main import (
|
||
compare_design_methods,
|
||
export_all_scenarios_to_excel,
|
||
export_to_dxf,
|
||
generate_wind_farm_data,
|
||
load_data_from_excel,
|
||
visualize_design,
|
||
)
|
||
|
||
# 尝试导入自动生成的版本号
|
||
try:
|
||
from version import VERSION
|
||
except ImportError:
|
||
VERSION = "v1.0"
|
||
|
||
# 设置matplotlib支持中文显示
|
||
plt.rcParams["font.sans-serif"] = ["Microsoft YaHei", "SimHei", "Arial"]
|
||
plt.rcParams["axes.unicode_minus"] = False
|
||
|
||
|
||
class Logger(io.StringIO):
|
||
def __init__(self, log_element):
|
||
super().__init__()
|
||
self.log_element = log_element
|
||
|
||
def write(self, message):
|
||
if message.strip():
|
||
self.log_element.push(message.strip())
|
||
super().write(message)
|
||
|
||
|
||
# 状态变量
|
||
# 优先从环境变量 PROJECT_TEMP_DIR 读取,否则使用系统默认临时目录
|
||
_base_temp = os.environ.get("PROJECT_TEMP_DIR", tempfile.gettempdir())
|
||
state = {
|
||
"excel_path": None,
|
||
"original_filename": None,
|
||
"results": [],
|
||
"substation": None,
|
||
"turbines": None,
|
||
"cable_specs": None,
|
||
"system_params": None,
|
||
"temp_dir": os.path.join(_base_temp, "windfarm_gui_uploads"),
|
||
}
|
||
|
||
# 确保临时目录存在
|
||
if not os.path.exists(state["temp_dir"]):
|
||
os.makedirs(state["temp_dir"], exist_ok=True)
|
||
|
||
|
||
@ui.page("/")
|
||
def index():
|
||
# 注入 CSS 隐藏表格自带的复选框列,并定义选中行的高亮背景色
|
||
ui.add_head_html(
|
||
"""
|
||
<style>
|
||
.hide-selection-column .q-table__selection { display: none; }
|
||
.hide-selection-column .q-table tr.selected { background-color: #e3f2fd !important; }
|
||
.no-list .q-uploader__list { display: none !important; }
|
||
</style>
|
||
"""
|
||
)
|
||
ui.query("body").style(
|
||
'background-color: #f0f2f5; font-family: "Segoe UI", Tahoma, Geneva, Verdana, sans-serif;'
|
||
)
|
||
|
||
# 定义 UI 元素引用容器,方便在函数中更新
|
||
refs = {
|
||
"log_box": None,
|
||
"results_table": None,
|
||
"plot_container": None,
|
||
"export_row": None,
|
||
"export_selected_btn": None, # 新增按钮引用
|
||
"status_label": None,
|
||
"upload_widget": None,
|
||
"run_btn": None,
|
||
"current_file_container": None, # 替换 label 为 container
|
||
"info_container": None, # 新增信息展示容器
|
||
}
|
||
|
||
def update_info_panel():
|
||
if refs["info_container"]:
|
||
refs["info_container"].clear()
|
||
with refs["info_container"]:
|
||
# System Params - Always show
|
||
with ui.row().classes("w-full items-center gap-4 mb-2"):
|
||
ui.icon("settings", color="primary").classes("text-2xl")
|
||
ui.label("系统参数").classes("text-lg font-bold")
|
||
|
||
params_text = []
|
||
|
||
# 获取电压
|
||
v = 66000 # Default
|
||
is_default_v = True
|
||
if (
|
||
state.get("system_params")
|
||
and "voltage" in state["system_params"]
|
||
):
|
||
v = state["system_params"]["voltage"]
|
||
is_default_v = False
|
||
|
||
v_str = f"电压: {v / 1000:.1f} kV" if v >= 1000 else f"电压: {v} V"
|
||
if is_default_v:
|
||
v_str += " (默认)"
|
||
params_text.append(v_str)
|
||
|
||
# 获取功率因数
|
||
pf = 0.95 # Default
|
||
is_default_pf = True
|
||
if (
|
||
state.get("system_params")
|
||
and "power_factor" in state["system_params"]
|
||
):
|
||
pf = state["system_params"]["power_factor"]
|
||
is_default_pf = False
|
||
|
||
pf_str = f"功率因数: {pf}"
|
||
if is_default_pf:
|
||
pf_str += " (默认)"
|
||
params_text.append(pf_str)
|
||
|
||
# 获取电价
|
||
ep = 0.4 # Default
|
||
is_default_ep = True
|
||
if (
|
||
state.get("system_params")
|
||
and "electricity_price" in state["system_params"]
|
||
):
|
||
ep = state["system_params"]["electricity_price"]
|
||
is_default_ep = False
|
||
|
||
ep_str = f"电价: {ep} 元/kWh"
|
||
if is_default_ep:
|
||
ep_str += " (默认)"
|
||
params_text.append(ep_str)
|
||
|
||
# 获取工程运行期限
|
||
lifetime = 25 # Default
|
||
is_default_lifetime = True
|
||
if (
|
||
state.get("system_params")
|
||
and "project_lifetime" in state["system_params"]
|
||
):
|
||
lifetime = state["system_params"]["project_lifetime"]
|
||
is_default_lifetime = False
|
||
|
||
lifetime_str = f"工程运行期限: {lifetime} 年"
|
||
if is_default_lifetime:
|
||
lifetime_str += " (默认)"
|
||
params_text.append(lifetime_str)
|
||
|
||
# 获取折现率
|
||
discount_rate = 8 # Default
|
||
is_default_discount = True
|
||
if (
|
||
state.get("system_params")
|
||
and "discount_rate" in state["system_params"]
|
||
):
|
||
discount_rate = state["system_params"]["discount_rate"]
|
||
is_default_discount = False
|
||
|
||
discount_str = f"折现率: {discount_rate}%"
|
||
if is_default_discount:
|
||
discount_str += " (默认)"
|
||
params_text.append(discount_str)
|
||
|
||
# 获取年损耗小时数
|
||
annual_hours = 1400 # Default
|
||
is_default_hours = True
|
||
if (
|
||
state.get("system_params")
|
||
and "annual_loss_hours" in state["system_params"]
|
||
):
|
||
annual_hours = state["system_params"]["annual_loss_hours"]
|
||
is_default_hours = False
|
||
|
||
hours_str = f"年损耗小时数: {annual_hours} 小时"
|
||
if is_default_hours:
|
||
hours_str += " (默认)"
|
||
params_text.append(hours_str)
|
||
|
||
for p in params_text:
|
||
ui.chip(p, icon="bolt").props("outline color=primary")
|
||
|
||
ui.separator().classes("my-2")
|
||
|
||
# Cables
|
||
if state.get("cable_specs"):
|
||
with ui.row().classes("w-full items-center gap-2 mb-2"):
|
||
ui.icon("cable", color="secondary").classes("text-2xl")
|
||
ui.label("电缆规格参数").classes("text-lg font-bold")
|
||
|
||
columns = [
|
||
{
|
||
"name": "section",
|
||
"label": "截面 (mm²)",
|
||
"field": "section",
|
||
"align": "center",
|
||
},
|
||
{
|
||
"name": "capacity",
|
||
"label": "载流量 (A)",
|
||
"field": "capacity",
|
||
"align": "center",
|
||
},
|
||
{
|
||
"name": "resistance",
|
||
"label": "电阻 (Ω/km)",
|
||
"field": "resistance",
|
||
"align": "center",
|
||
},
|
||
{
|
||
"name": "cost",
|
||
"label": "参考单价(万元/km)",
|
||
"field": "cost",
|
||
"align": "center",
|
||
},
|
||
{
|
||
"name": "is_optional",
|
||
"label": "是否为可选",
|
||
"field": "is_optional",
|
||
"align": "center",
|
||
},
|
||
]
|
||
rows = []
|
||
for spec in state["cable_specs"]:
|
||
# spec is (section, capacity, resistance, cost, is_optional)
|
||
rows.append(
|
||
{
|
||
"section": spec[0],
|
||
"capacity": spec[1],
|
||
"resistance": spec[2],
|
||
"cost": f"{spec[3] / 10:.2f}"
|
||
if spec[3] is not None
|
||
else "0.00",
|
||
"is_optional": "Y" if len(spec) > 4 and spec[4] else "",
|
||
}
|
||
)
|
||
ui.table(columns=columns, rows=rows).classes("w-full").props(
|
||
"dense flat bordered"
|
||
)
|
||
else:
|
||
ui.label("未检测到电缆数据,将使用默认参数。").classes(
|
||
"text-gray-500 italic"
|
||
)
|
||
|
||
async def handle_upload(e: events.UploadEventArguments):
|
||
try:
|
||
filename = None
|
||
content = None
|
||
if hasattr(e, "name"):
|
||
filename = e.name
|
||
if hasattr(e, "content"):
|
||
content = e.content
|
||
if content is None and hasattr(e, "file"):
|
||
file_obj = e.file
|
||
if not filename:
|
||
filename = getattr(
|
||
file_obj, "name", getattr(file_obj, "filename", None)
|
||
)
|
||
if hasattr(file_obj, "file") and hasattr(file_obj.file, "read"):
|
||
content = file_obj.file
|
||
elif hasattr(file_obj, "read"):
|
||
content = file_obj
|
||
if not filename:
|
||
filename = "uploaded_file.xlsx"
|
||
if content is None:
|
||
ui.notify("上传失败: 无法解析文件内容", type="negative")
|
||
return
|
||
|
||
# 清理旧文件,确保目录中只有一个文件
|
||
if os.path.exists(state["temp_dir"]):
|
||
for f in os.listdir(state["temp_dir"]):
|
||
try:
|
||
os.remove(os.path.join(state["temp_dir"], f))
|
||
except:
|
||
pass
|
||
|
||
path = os.path.join(state["temp_dir"], filename)
|
||
if hasattr(content, "seek"):
|
||
try:
|
||
content.seek(0)
|
||
except Exception:
|
||
pass
|
||
data = content.read()
|
||
import inspect
|
||
|
||
if inspect.iscoroutine(data):
|
||
data = await data
|
||
with open(path, "wb") as f:
|
||
f.write(data)
|
||
|
||
state["excel_path"] = path
|
||
state["original_filename"] = filename
|
||
ui.notify(f"文件已上传: {filename}", type="positive")
|
||
|
||
# 更新文件显示区域
|
||
if refs["current_file_container"]:
|
||
# refs["current_file_container"].set_visibility(True)
|
||
refs["current_file_container"].clear()
|
||
with refs["current_file_container"]:
|
||
with ui.row().classes(
|
||
"items-center w-full bg-blue-50 p-2 rounded border border-blue-200"
|
||
):
|
||
ui.icon("description", color="primary").classes("text-xl mr-2")
|
||
ui.label(filename).classes(
|
||
"font-medium text-gray-700 flex-grow"
|
||
)
|
||
ui.icon("check_circle", color="positive")
|
||
|
||
# 加载数据
|
||
try:
|
||
# 尝试解包 4 个返回值 (新版 main.py)
|
||
(
|
||
state["turbines"],
|
||
state["substation"],
|
||
state["cable_specs"],
|
||
state["system_params"],
|
||
) = load_data_from_excel(path)
|
||
except ValueError:
|
||
# 兼容旧版 (如果是 3 个返回值)
|
||
state["turbines"], state["substation"], state["cable_specs"] = (
|
||
load_data_from_excel(path)
|
||
)
|
||
state["system_params"] = {}
|
||
|
||
update_info_panel()
|
||
|
||
# 清空上传组件列表,以便下次选择(配合 .no-list CSS 使用)
|
||
if refs["upload_widget"]:
|
||
refs["upload_widget"].reset()
|
||
|
||
except Exception as ex:
|
||
ui.notify(f"上传处理失败: {ex}", type="negative")
|
||
|
||
async def save_file_with_dialog(filename, callback, file_filter="All files (*.*)", sender=None):
|
||
"""
|
||
跨平台文件保存助手。
|
||
如果是原生模式,弹出系统保存对话框。
|
||
如果是浏览器模式(但在本地运行),尝试使用 tkinter 弹出对话框。
|
||
最后回退到使用 nicegui ui.download。
|
||
:param filename: 默认文件名
|
||
:param callback: 接收文件路径并执行保存操作的函数 (filepath) -> None
|
||
:param file_filter: 格式如 "Excel Files (*.xlsx)"
|
||
:param sender: 触发该操作的 UI 组件,用于在操作期间禁用以防重复点击
|
||
"""
|
||
if sender:
|
||
sender.disable()
|
||
try:
|
||
# 方案:使用 PowerShell 弹出原生保存对话框 (仅限 Windows)
|
||
import platform
|
||
import subprocess
|
||
|
||
if platform.system() == "Windows":
|
||
try:
|
||
# 构建 PowerShell 脚本
|
||
# 注意:过滤器格式为 "描述|*.ext|所有文件|*.*"
|
||
ps_filter = file_filter.replace("(", "|").replace(")", "")
|
||
if "|" not in ps_filter:
|
||
ps_filter += f"|{os.path.splitext(filename)[1] or '*.*'}"
|
||
|
||
# 简单清洗 filter 字符串以适应 PowerShell (e.g., "Excel Files *.xlsx" -> "Excel Files|*.xlsx")
|
||
# 这里做一个简化的映射,确保格式正确
|
||
if "Excel" in file_filter:
|
||
ps_filter = "Excel Files (*.xlsx)|*.xlsx|All Files (*.*)|*.*"
|
||
elif "DXF" in file_filter:
|
||
ps_filter = "DXF Files (*.dxf)|*.dxf|All Files (*.*)|*.*"
|
||
elif "ZIP" in file_filter:
|
||
ps_filter = "ZIP Archives (*.zip)|*.zip|All Files (*.*)|*.*"
|
||
else:
|
||
ps_filter = "All Files (*.*)|*.*"
|
||
|
||
ps_script = f"""
|
||
Add-Type -AssemblyName System.Windows.Forms
|
||
$d = New-Object System.Windows.Forms.SaveFileDialog
|
||
$d.Filter = "{ps_filter}"
|
||
$d.FileName = "{filename}"
|
||
$d.Title = "保存文件"
|
||
if ($d.ShowDialog() -eq [System.Windows.Forms.DialogResult]::OK) {{
|
||
Write-Output $d.FileName
|
||
}}
|
||
"""
|
||
|
||
# 运行 PowerShell
|
||
# 使用 startupinfo 隐藏控制台窗口 (防止黑框闪烁)
|
||
startupinfo = subprocess.STARTUPINFO()
|
||
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
|
||
|
||
print("DEBUG: invoking PowerShell SaveFileDialog...")
|
||
|
||
# 使用 run.io_bound 在后台线程执行,避免阻塞主事件循环
|
||
# 这样按钮的禁用状态可以立即同步到前端
|
||
result = await run.io_bound(
|
||
subprocess.run,
|
||
["powershell", "-Command", ps_script],
|
||
capture_output=True,
|
||
text=True,
|
||
startupinfo=startupinfo
|
||
)
|
||
save_path = result.stdout.strip()
|
||
if save_path:
|
||
print(f"DEBUG: PowerShell returned path: {save_path}")
|
||
await callback(save_path)
|
||
ui.notify(f"文件已保存至: {save_path}", type="positive")
|
||
return
|
||
else:
|
||
print("DEBUG: PowerShell dialog cancelled or empty result.")
|
||
# 用户取消,直接返回,不回退
|
||
return
|
||
|
||
except Exception as e:
|
||
print(f"PowerShell dialog failed: {e}")
|
||
# 出错则回退到 ui.download
|
||
|
||
# 统一回退方案:浏览器下载
|
||
print("DEBUG: Using ui.download fallback")
|
||
temp_path = os.path.join(state["temp_dir"], filename)
|
||
await callback(temp_path)
|
||
ui.download(temp_path)
|
||
finally:
|
||
if sender:
|
||
sender.enable()
|
||
|
||
def update_export_buttons():
|
||
if refs["export_row"]:
|
||
refs["export_row"].clear()
|
||
if not state["results"] or not refs["export_row"]:
|
||
return
|
||
|
||
# 获取文件名基础前缀
|
||
file_prefix = "wind_farm"
|
||
default_excel_name = "wind_farm_design_result.xlsx"
|
||
|
||
# 确定主对比 Excel 的生成路径 (对应 main.py 中的生成逻辑)
|
||
if state.get("excel_path"):
|
||
file_prefix = os.path.splitext(state["original_filename"])[0]
|
||
default_excel_name = f"{file_prefix}_result.xlsx"
|
||
# 这里的路径是 main.py 中生成的源文件路径,用于复制
|
||
source_excel_path = os.path.join(
|
||
state["temp_dir"], f"{file_prefix}_design.xlsx"
|
||
)
|
||
else:
|
||
source_excel_path = "wind_farm_design.xlsx"
|
||
|
||
# 寻找推荐方案
|
||
scenario1_results = [r for r in state["results"] if "Scenario 1" in r["name"]]
|
||
if scenario1_results:
|
||
best_res = min(scenario1_results, key=lambda x: x["cost"])
|
||
else:
|
||
best_res = min(state["results"], key=lambda x: x["cost"])
|
||
|
||
with refs["export_row"]:
|
||
# --- 下载 Excel ---
|
||
async def save_excel(path):
|
||
import shutil
|
||
|
||
# 如果源文件存在,则复制到目标路径
|
||
if os.path.exists(source_excel_path):
|
||
shutil.copy2(source_excel_path, path)
|
||
else:
|
||
# 如果不存在,重新生成
|
||
export_all_scenarios_to_excel(state["results"], path)
|
||
|
||
async def on_click_excel(e):
|
||
await save_file_with_dialog(
|
||
default_excel_name, save_excel, "Excel Files (*.xlsx)", sender=e.sender
|
||
)
|
||
|
||
ui.button(
|
||
"下载 Excel 对比表",
|
||
on_click=on_click_excel,
|
||
).props("icon=download")
|
||
|
||
# --- 导出推荐方案 DXF ---
|
||
async def on_click_best_dxf(e):
|
||
if state["substation"] is not None:
|
||
safe_name = "".join(
|
||
[
|
||
c
|
||
for c in best_res["name"]
|
||
if c.isalnum() or c in (" ", "-", "_")
|
||
]
|
||
).strip()
|
||
default_name = f"{file_prefix}_best_{safe_name}.dxf"
|
||
|
||
async def save_dxf(path):
|
||
export_to_dxf(
|
||
best_res["turbines"],
|
||
state["substation"],
|
||
best_res["eval"]["details"],
|
||
path,
|
||
)
|
||
|
||
await save_file_with_dialog(
|
||
default_name, save_dxf, "DXF Files (*.dxf)", sender=e.sender
|
||
)
|
||
else:
|
||
ui.notify("缺少升压站数据,无法导出 DXF", type="negative")
|
||
|
||
ui.button(
|
||
f"导出推荐方案 DXF ({best_res['name']})", on_click=on_click_best_dxf
|
||
).props("icon=architecture color=accent")
|
||
|
||
# --- 导出选中方案 DXF ---
|
||
async def on_click_selected_dxf(e):
|
||
if not refs["results_table"] or not refs["results_table"].selected:
|
||
ui.notify("请先在上方表格中选择一个方案", type="warning")
|
||
return
|
||
|
||
selected_row = refs["results_table"].selected[0]
|
||
row_name = selected_row.get("original_name", selected_row.get("name"))
|
||
selected_res = next(
|
||
(r for r in state["results"] if r["name"] == row_name), None
|
||
)
|
||
|
||
if selected_res and state["substation"] is not None:
|
||
safe_name = "".join(
|
||
[
|
||
c
|
||
for c in selected_res["name"]
|
||
if c.isalnum() or c in (" ", "-", "_")
|
||
]
|
||
).strip()
|
||
default_name = f"{file_prefix}_{safe_name}.dxf"
|
||
|
||
async def save_dxf(path):
|
||
export_to_dxf(
|
||
selected_res["turbines"],
|
||
state["substation"],
|
||
selected_res["eval"]["details"],
|
||
path,
|
||
)
|
||
|
||
await save_file_with_dialog(
|
||
default_name, save_dxf, "DXF Files (*.dxf)", sender=e.sender
|
||
)
|
||
else:
|
||
ui.notify(
|
||
"无法导出:未找到方案数据或缺少升压站信息", type="negative"
|
||
)
|
||
|
||
refs["export_selected_btn"] = ui.button(
|
||
"导出选中方案 DXF", on_click=on_click_selected_dxf
|
||
).props("icon=architecture color=primary")
|
||
|
||
clean_name = best_res["name"].replace("(推荐) ", "")
|
||
refs["export_selected_btn"].set_text(f"导出选中方案 ({clean_name})")
|
||
|
||
# --- 导出全部 ZIP ---
|
||
async def on_click_all_dxf(e):
|
||
if not state["results"] or state["substation"] is None:
|
||
ui.notify("无方案数据可导出", type="warning")
|
||
return
|
||
|
||
default_name = f"{file_prefix}_all_results.zip"
|
||
|
||
async def save_zip(path):
|
||
import zipfile
|
||
|
||
excel_result_name = f"{file_prefix}_summary.xlsx"
|
||
|
||
with zipfile.ZipFile(path, "w", zipfile.ZIP_DEFLATED) as zipf:
|
||
# 1. Excel
|
||
temp_excel = os.path.join(state["temp_dir"], "temp_export.xlsx")
|
||
export_all_scenarios_to_excel(state["results"], temp_excel)
|
||
zipf.write(temp_excel, arcname=excel_result_name)
|
||
try:
|
||
os.remove(temp_excel)
|
||
except:
|
||
pass
|
||
|
||
# 2. DXFs
|
||
for res in state["results"]:
|
||
safe_name = "".join(
|
||
[
|
||
c
|
||
for c in res["name"]
|
||
if c.isalnum() or c in (" ", "-", "_")
|
||
]
|
||
).strip()
|
||
dxf_name = os.path.join(
|
||
state["temp_dir"], f"{file_prefix}_{safe_name}.dxf"
|
||
)
|
||
export_to_dxf(
|
||
res["turbines"],
|
||
state["substation"],
|
||
res["eval"]["details"],
|
||
dxf_name,
|
||
)
|
||
zipf.write(dxf_name, arcname=os.path.basename(dxf_name))
|
||
try:
|
||
os.remove(dxf_name)
|
||
except:
|
||
pass
|
||
|
||
await save_file_with_dialog(default_name, save_zip, "ZIP Files (*.zip)", sender=e.sender)
|
||
|
||
ui.button("导出全部方案 DXF (ZIP)", on_click=on_click_all_dxf).props(
|
||
"icon=folder_zip color=secondary"
|
||
)
|
||
|
||
def update_plot(result):
|
||
if refs["plot_container"]:
|
||
refs["plot_container"].clear()
|
||
with refs["plot_container"]:
|
||
# 使用 ui.pyplot 上下文自动管理 figure 生命周期
|
||
with ui.pyplot(figsize=(10, 8)) as plot:
|
||
title = f"{result['name']}\nCost: ¥{result['cost'] / 10000:.2f}万 | Loss: {result['loss']:.2f} kW"
|
||
# 显式获取当前 ui.pyplot 创建的 axes,并传递给绘图函数
|
||
# 确保绘图发生在正确的 figure 上
|
||
ax = plt.gca()
|
||
visualize_design(
|
||
result["turbines"],
|
||
state["substation"],
|
||
result["eval"]["details"],
|
||
title,
|
||
ax=ax,
|
||
)
|
||
|
||
async def handle_row_click(e):
|
||
# 获取被点击行的数据
|
||
row = e.args[1] if len(e.args) > 1 else None
|
||
if not row:
|
||
return
|
||
|
||
# 识别方案名称
|
||
row_name = row.get("original_name", row.get("name"))
|
||
if not row_name:
|
||
return
|
||
|
||
selected_res = next(
|
||
(r for r in state["results"] if r["name"] == row_name), None
|
||
)
|
||
if selected_res:
|
||
# 1. 更新拓扑图
|
||
update_plot(selected_res)
|
||
ui.notify(f"已切换至方案: {selected_res['name']}")
|
||
|
||
# 2. 通过设置 table 的 selected 属性来实现背景高亮
|
||
if refs["results_table"]:
|
||
refs["results_table"].selected = [row]
|
||
|
||
# 3. 更新“导出选中方案”按钮的文本
|
||
if refs["export_selected_btn"]:
|
||
clean_name = row_name.replace("(推荐) ", "")
|
||
refs["export_selected_btn"].set_text(f"导出选中方案 ({clean_name})")
|
||
|
||
import queue
|
||
|
||
async def run_analysis():
|
||
if not state["excel_path"]:
|
||
ui.notify("请先上传 Excel 坐标文件!", type="warning")
|
||
return
|
||
if refs["log_box"]:
|
||
refs["log_box"].clear()
|
||
log_queue = queue.Queue()
|
||
|
||
class QueueLogger(io.StringIO):
|
||
def write(self, message):
|
||
if message and message.strip():
|
||
log_queue.put(message.strip())
|
||
super().write(message)
|
||
|
||
def process_log_queue():
|
||
if refs["log_box"]:
|
||
new_msg = False
|
||
while not log_queue.empty():
|
||
try:
|
||
msg = log_queue.get_nowait()
|
||
refs["log_box"].push(msg)
|
||
new_msg = True
|
||
if msg.startswith("--- Scenario"):
|
||
scenario_name = msg.replace("---", "").strip()
|
||
if refs["status_label"]:
|
||
refs[
|
||
"status_label"
|
||
].text = f"正在计算: {scenario_name}..."
|
||
elif "开始比较电缆方案" in msg:
|
||
if refs["status_label"]:
|
||
refs["status_label"].text = "准备开始计算..."
|
||
except queue.Empty:
|
||
break
|
||
if new_msg and refs["log_box"]:
|
||
# 使用 JS 直接滚动 log 元素到最底部
|
||
# 增加一个小延时确保内容渲染完成
|
||
ui.run_javascript(
|
||
f'var el = document.getElementById("c{refs["log_box"].id}"); if(el) {{ setTimeout(() => {{ el.scrollTop = el.scrollHeight; }}, 10); }}'
|
||
)
|
||
|
||
log_timer = ui.timer(0.1, process_log_queue)
|
||
if refs["status_label"]:
|
||
refs["status_label"].text = "初始化中..."
|
||
processing_dialog.open()
|
||
try:
|
||
# 2. 定义在线程中运行的任务
|
||
def task():
|
||
# 捕获 stdout 到我们的 QueueLogger
|
||
# 禁止 main.py 中的后台绘图,避免线程安全问题
|
||
with contextlib.redirect_stdout(QueueLogger()):
|
||
return compare_design_methods(
|
||
excel_path=state["excel_path"],
|
||
n_clusters_override=None,
|
||
interactive=False,
|
||
plot_results=False,
|
||
)
|
||
|
||
# 在后台线程运行计算任务
|
||
results = await run.io_bound(task)
|
||
|
||
state["results"] = results
|
||
if not state["excel_path"] and results:
|
||
if state["substation"] is None:
|
||
_, state["substation"] = generate_wind_farm_data(
|
||
n_turbines=30, layout="grid", spacing=800
|
||
)
|
||
|
||
# 计算完成后,自动寻找并显示最佳方案的拓扑图
|
||
best_res = None
|
||
if results:
|
||
# 默认推荐 Scenario 1 中成本最低的方案
|
||
scenario1_results = [r for r in results if "Scenario 1" in r["name"]]
|
||
if scenario1_results:
|
||
best_res = min(scenario1_results, key=lambda x: x["cost"])
|
||
else:
|
||
# 如果没有 Scenario 1,则回退到全局最优
|
||
best_res = min(results, key=lambda x: x["cost"])
|
||
|
||
update_plot(best_res)
|
||
ui.notify(
|
||
f"计算完成!已自动加载推荐方案: {best_res['name']}", type="positive"
|
||
)
|
||
|
||
# 更新结果表格
|
||
if refs["results_table"]:
|
||
table_data = []
|
||
best_row = None
|
||
for res in results:
|
||
name_display = res["name"]
|
||
is_best = False
|
||
if best_res and res["name"] == best_res["name"]:
|
||
name_display = f"(推荐) {name_display}"
|
||
is_best = True
|
||
|
||
# 生成备注信息
|
||
note = ""
|
||
original_name = res["name"]
|
||
|
||
# 识别算法
|
||
if "MST Method" in original_name:
|
||
note += "最小生成树算法(无容量约束基准); "
|
||
elif "Base" in original_name:
|
||
note += "基础扇区扫描(单次扫描); "
|
||
elif "Rotational" in original_name:
|
||
note += "旋转扫描优化(全局最优角度); "
|
||
elif "Esau-Williams" in original_name:
|
||
note += "Esau-Williams启发式算法(权衡距离与容量); "
|
||
|
||
# 识别电缆策略
|
||
if "Standard" in original_name:
|
||
note += "不包含可选电缆型号。"
|
||
elif "With Optional" in original_name:
|
||
note += "含可选电缆型号。"
|
||
elif "No Max" in original_name:
|
||
note += "不包含可选电缆型号,且可使用的最大截面电缆降一档截面。"
|
||
|
||
# 计算总长度(转换为公里)
|
||
total_length_m = sum(d["length"] for d in res["eval"]["details"])
|
||
total_length_km = total_length_m / 1000
|
||
|
||
row_dict = {
|
||
"name": name_display,
|
||
"cost_wan": f"{res['cost'] / 10000:.2f}",
|
||
"loss_kw": f"{res['loss']:.2f}",
|
||
"total_length": f"{total_length_km:.2f}",
|
||
"npv_loss_wan": f"{res.get('npv_loss', 0) / 10000:.2f}",
|
||
"total_cost_npv_wan": f"{res.get('total_cost_npv', res['cost']) / 10000:.2f}",
|
||
"note": note,
|
||
"original_name": res["name"],
|
||
}
|
||
table_data.append(row_dict)
|
||
if is_best:
|
||
best_row = row_dict
|
||
|
||
refs["results_table"].rows = table_data
|
||
# 初始选中推荐方案,实现自动高亮
|
||
if best_row:
|
||
refs["results_table"].selected = [best_row]
|
||
|
||
update_export_buttons()
|
||
if refs["status_label"]:
|
||
refs["status_label"].text = "计算完成!"
|
||
except Exception as ex:
|
||
ui.notify(f"运行出错: {ex}", type="negative")
|
||
import traceback
|
||
|
||
traceback.print_exc()
|
||
finally:
|
||
log_timer.cancel()
|
||
process_log_queue()
|
||
processing_dialog.close()
|
||
|
||
with ui.dialog() as processing_dialog:
|
||
with ui.card().classes("w-96 items-center justify-center p-6"):
|
||
ui.label("正在计算方案...").classes("text-xl font-bold text-primary mb-2")
|
||
ui.spinner(size="lg", color="primary")
|
||
refs["status_label"] = ui.label("准备中...").classes(
|
||
"mt-4 text-sm text-gray-700 font-medium"
|
||
)
|
||
with ui.expansion("查看实时日志", icon="terminal", value=True).classes(
|
||
"w-full mt-4 text-sm"
|
||
):
|
||
# 直接控制 log 组件的样式和滚动,去除 scroll_area 中间层
|
||
refs["log_box"] = ui.log(max_lines=100).classes(
|
||
"w-full h-32 overflow-y-auto p-2 bg-black text-xs font-mono text-green-400 leading-snug"
|
||
)
|
||
processing_dialog.props("persistent")
|
||
|
||
with ui.header().classes(
|
||
"bg-primary text-white p-4 shadow-lg items-center no-wrap"
|
||
):
|
||
with ui.column().classes("gap-0"):
|
||
ui.label(f"海上风电场集电线路设计优化系统 {VERSION}").classes(
|
||
"text-2xl font-bold"
|
||
)
|
||
with ui.column().classes("gap-0"):
|
||
ui.label("Wind Farm Collector System Design Optimizer").classes(
|
||
"text-sm opacity-80"
|
||
)
|
||
ui.space()
|
||
ui.label("中能建西北院海上能源业务开发部").classes("text-xl font-bold")
|
||
|
||
with ui.row().classes("w-full p-4 gap-4"):
|
||
with ui.card().classes("w-full p-4 shadow-md"):
|
||
ui.label("配置面板").classes("text-xl font-semibold mb-4 border-b pb-2")
|
||
|
||
# 使用 items-stretch 确保所有子元素高度一致
|
||
with ui.row().classes("w-full items-stretch gap-4"):
|
||
# 1. 导出模板按钮
|
||
async def export_template(e):
|
||
import shutil
|
||
|
||
from generate_template import create_template
|
||
|
||
async def save_template(path):
|
||
# 生成模板到系统临时目录
|
||
temp_template = os.path.join(
|
||
state["temp_dir"], "coordinates_template.xlsx"
|
||
)
|
||
create_template(temp_template)
|
||
if os.path.exists(temp_template):
|
||
shutil.copy2(temp_template, path)
|
||
try:
|
||
os.remove(temp_template)
|
||
except:
|
||
pass
|
||
else:
|
||
raise FileNotFoundError("无法生成模板文件")
|
||
|
||
await save_file_with_dialog(
|
||
"coordinates.xlsx", save_template, "Excel Files (*.xlsx)", sender=e.sender
|
||
)
|
||
|
||
ui.button("导出 Excel 模板", on_click=export_template).classes(
|
||
"flex-1 py-4"
|
||
).props("icon=file_download outline color=primary")
|
||
|
||
# 2. 上传文件区域 (垂直堆叠 Label 和 Upload 组件)
|
||
with ui.column().classes("flex-1 gap-0 justify-between"):
|
||
# 使用 .no-list CSS 隐藏 Quasar 默认列表,完全自定义文件显示
|
||
refs["upload_widget"] = (
|
||
ui.upload(
|
||
label="选择Excel文件",
|
||
on_upload=handle_upload,
|
||
auto_upload=True,
|
||
)
|
||
.classes("w-full no-list h-full")
|
||
.props("flat bordered color=primary")
|
||
)
|
||
|
||
# 自定义文件显示容器
|
||
refs["current_file_container"] = ui.column().classes("w-full")
|
||
# 初始状态不显示任何内容,直到选择文件后才显示
|
||
# with refs["current_file_container"]:
|
||
# ui.label("未选择文件").classes("text-xs text-gray-500 italic ml-1")
|
||
|
||
# 3. 运行按钮
|
||
refs["run_btn"] = (
|
||
ui.button(
|
||
"运行方案对比",
|
||
on_click=run_analysis,
|
||
)
|
||
.classes("flex-1 py-4")
|
||
.props("icon=play_arrow color=secondary")
|
||
)
|
||
|
||
with ui.column().classes("w-full gap-4"):
|
||
# 新增:信息展示卡片
|
||
with (
|
||
ui.card()
|
||
.classes("w-full p-4 shadow-md")
|
||
.style("max-height: 400px; overflow-y: auto;")
|
||
):
|
||
refs["info_container"] = ui.column().classes("w-full")
|
||
with refs["info_container"]:
|
||
ui.label("请上传 Excel 文件以查看系统参数和电缆规格...").classes(
|
||
"text-gray-500 italic"
|
||
)
|
||
|
||
with ui.card().classes("w-full p-0 shadow-md overflow-hidden"):
|
||
with (
|
||
ui.expansion(
|
||
"方案对比结果 (点击行查看拓扑详情)",
|
||
icon="analytics",
|
||
value=True,
|
||
)
|
||
.classes("w-full")
|
||
.props('header-class="text-xl font-semibold"')
|
||
):
|
||
columns = [
|
||
{
|
||
"name": "name",
|
||
"label": "方案名称",
|
||
"field": "name",
|
||
"required": True,
|
||
"align": "left",
|
||
},
|
||
{
|
||
"name": "cost_wan",
|
||
"label": "总投资 (万元)",
|
||
"field": "cost_wan",
|
||
"sortable": True,
|
||
},
|
||
{
|
||
"name": "loss_kw",
|
||
"label": "线损 (kW)",
|
||
"field": "loss_kw",
|
||
"sortable": True,
|
||
},
|
||
{
|
||
"name": "total_length",
|
||
"label": "总长度/km",
|
||
"field": "total_length",
|
||
"sortable": True,
|
||
},
|
||
{
|
||
"name": "npv_loss_wan",
|
||
"label": "损耗费用净现值 (万元)",
|
||
"field": "npv_loss_wan",
|
||
"sortable": True,
|
||
},
|
||
{
|
||
"name": "total_cost_npv_wan",
|
||
"label": "总费用 (万元)",
|
||
"field": "total_cost_npv_wan",
|
||
"sortable": True,
|
||
},
|
||
{
|
||
"name": "note",
|
||
"label": "备注",
|
||
"field": "note",
|
||
"align": "left",
|
||
},
|
||
]
|
||
# 使用内置的 selection='single' 结合行点击事件实现背景高亮
|
||
# 这样可以完全由 Python 事件逻辑控制,不依赖 CSS 伪类
|
||
refs["results_table"] = ui.table(
|
||
columns=columns,
|
||
rows=[],
|
||
selection="single",
|
||
row_key="original_name",
|
||
).classes("w-full hide-selection-column")
|
||
refs["results_table"].on("row-click", handle_row_click)
|
||
with ui.card().classes("w-full p-4 shadow-md"):
|
||
ui.label("拓扑可视化").classes("text-xl font-semibold mb-2")
|
||
refs["plot_container"] = ui.column().classes("w-full items-center")
|
||
with ui.card().classes("w-full p-4 shadow-md"):
|
||
ui.label("导出与下载").classes("text-xl font-semibold mb-2")
|
||
refs["export_row"] = ui.row().classes("gap-4")
|
||
|
||
|
||
def find_available_port(start_port=8080, max_attempts=100):
|
||
"""尝试寻找可用的端口"""
|
||
import socket
|
||
|
||
for port in range(start_port, start_port + max_attempts):
|
||
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
|
||
try:
|
||
# 尝试绑定到 127.0.0.1,这是最常用的本地开发地址
|
||
s.bind(("127.0.0.1", port))
|
||
return port
|
||
except OSError:
|
||
continue
|
||
return start_port # 默认返回起始端口,让 ui.run 报错
|
||
|
||
|
||
# 自动寻找可用端口,避免端口冲突
|
||
target_port = find_available_port(8082) # 从 8082 开始,避开常用的 8080
|
||
|
||
# 检测是否为打包后的exe程序
|
||
import sys
|
||
|
||
if getattr(sys, "frozen", False):
|
||
# 修复无控制台模式下 stdout/stderr 为 None 导致的 logging 错误
|
||
class NullWriter:
|
||
def write(self, text):
|
||
pass
|
||
|
||
def flush(self):
|
||
pass
|
||
|
||
def isatty(self):
|
||
return False
|
||
|
||
if sys.stdout is None:
|
||
sys.stdout = NullWriter()
|
||
if sys.stderr is None:
|
||
sys.stderr = NullWriter()
|
||
|
||
# 打包环境下禁用日志配置,避免在无控制台模式下出现错误
|
||
import logging.config
|
||
|
||
logging.config.dictConfig(
|
||
{
|
||
"version": 1,
|
||
"disable_existing_loggers": True,
|
||
}
|
||
)
|
||
ui.run(
|
||
title="海上风电场集电线路优化",
|
||
host="127.0.0.1",
|
||
port=target_port,
|
||
reload=False,
|
||
window_size=(1280, 800),
|
||
native=True,
|
||
)
|
||
else:
|
||
# 普通使用环境保留日志功能
|
||
# ui.run(
|
||
# title="海上风电场集电线路优化",
|
||
# host="127.0.0.1",
|
||
# reload=True,
|
||
# port=target_port,
|
||
# native=False,
|
||
# )
|
||
ui.run(
|
||
title="海上风电场集电线路优化",
|
||
host="127.0.0.1",
|
||
port=target_port,
|
||
reload=True,
|
||
window_size=(1280, 800),
|
||
native=True,
|
||
)
|