Files
windfarm/gui.py
dmy 837158270e fix: 优化文件保存对话框并启用原生窗口模式
- 添加 matplotlib.use('Agg') 设置非交互式后端
- 重构 save_file_with_dialog 函数,使用 PowerShell 原生对话框替代 Tkinter
- 解决 PyWebview/Tkinter 线程冲突导致的 PicklingError 问题
- 启用 native=True 原生窗口模式,提供更好的用户体验
2026-01-07 01:03:46 +08:00

1093 lines
44 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import contextlib
import io
import os
import sys
import tempfile
import matplotlib
matplotlib.use("Agg")
import matplotlib.backends.backend_svg
import matplotlib.pyplot as plt
import pandas as pd
from nicegui import app, events, ui
from main import (
compare_design_methods,
export_all_scenarios_to_excel,
export_to_dxf,
generate_wind_farm_data,
load_data_from_excel,
visualize_design,
)
# 尝试导入自动生成的版本号
try:
from version import VERSION
except ImportError:
VERSION = "v1.0"
# 设置matplotlib支持中文显示
plt.rcParams["font.sans-serif"] = ["Microsoft YaHei", "SimHei", "Arial"]
plt.rcParams["axes.unicode_minus"] = False
class Logger(io.StringIO):
def __init__(self, log_element):
super().__init__()
self.log_element = log_element
def write(self, message):
if message.strip():
self.log_element.push(message.strip())
super().write(message)
# 状态变量
# 优先从环境变量 PROJECT_TEMP_DIR 读取,否则使用系统默认临时目录
_base_temp = os.environ.get("PROJECT_TEMP_DIR", tempfile.gettempdir())
state = {
"excel_path": None,
"original_filename": None,
"results": [],
"substation": None,
"turbines": None,
"cable_specs": None,
"system_params": None,
"temp_dir": os.path.join(_base_temp, "windfarm_gui_uploads"),
}
# 确保临时目录存在
if not os.path.exists(state["temp_dir"]):
os.makedirs(state["temp_dir"], exist_ok=True)
@ui.page("/")
def index():
# 注入 CSS 隐藏表格自带的复选框列,并定义选中行的高亮背景色
ui.add_head_html(
"""
<style>
.hide-selection-column .q-table__selection { display: none; }
.hide-selection-column .q-table tr.selected { background-color: #e3f2fd !important; }
.no-list .q-uploader__list { display: none !important; }
</style>
"""
)
ui.query("body").style(
'background-color: #f0f2f5; font-family: "Segoe UI", Tahoma, Geneva, Verdana, sans-serif;'
)
# 定义 UI 元素引用容器,方便在函数中更新
refs = {
"log_box": None,
"results_table": None,
"plot_container": None,
"export_row": None,
"export_selected_btn": None, # 新增按钮引用
"status_label": None,
"upload_widget": None,
"run_btn": None,
"current_file_container": None, # 替换 label 为 container
"info_container": None, # 新增信息展示容器
}
def update_info_panel():
if refs["info_container"]:
refs["info_container"].clear()
with refs["info_container"]:
# System Params - Always show
with ui.row().classes("w-full items-center gap-4 mb-2"):
ui.icon("settings", color="primary").classes("text-2xl")
ui.label("系统参数").classes("text-lg font-bold")
params_text = []
# 获取电压
v = 66000 # Default
is_default_v = True
if (
state.get("system_params")
and "voltage" in state["system_params"]
):
v = state["system_params"]["voltage"]
is_default_v = False
v_str = f"电压: {v / 1000:.1f} kV" if v >= 1000 else f"电压: {v} V"
if is_default_v:
v_str += " (默认)"
params_text.append(v_str)
# 获取功率因数
pf = 0.95 # Default
is_default_pf = True
if (
state.get("system_params")
and "power_factor" in state["system_params"]
):
pf = state["system_params"]["power_factor"]
is_default_pf = False
pf_str = f"功率因数: {pf}"
if is_default_pf:
pf_str += " (默认)"
params_text.append(pf_str)
# 获取电价
ep = 0.4 # Default
is_default_ep = True
if (
state.get("system_params")
and "electricity_price" in state["system_params"]
):
ep = state["system_params"]["electricity_price"]
is_default_ep = False
ep_str = f"电价: {ep} 元/kWh"
if is_default_ep:
ep_str += " (默认)"
params_text.append(ep_str)
# 获取工程运行期限
lifetime = 25 # Default
is_default_lifetime = True
if (
state.get("system_params")
and "project_lifetime" in state["system_params"]
):
lifetime = state["system_params"]["project_lifetime"]
is_default_lifetime = False
lifetime_str = f"工程运行期限: {lifetime}"
if is_default_lifetime:
lifetime_str += " (默认)"
params_text.append(lifetime_str)
# 获取折现率
discount_rate = 8 # Default
is_default_discount = True
if (
state.get("system_params")
and "discount_rate" in state["system_params"]
):
discount_rate = state["system_params"]["discount_rate"]
is_default_discount = False
discount_str = f"折现率: {discount_rate}%"
if is_default_discount:
discount_str += " (默认)"
params_text.append(discount_str)
# 获取年损耗小时数
annual_hours = 1400 # Default
is_default_hours = True
if (
state.get("system_params")
and "annual_loss_hours" in state["system_params"]
):
annual_hours = state["system_params"]["annual_loss_hours"]
is_default_hours = False
hours_str = f"年损耗小时数: {annual_hours} 小时"
if is_default_hours:
hours_str += " (默认)"
params_text.append(hours_str)
for p in params_text:
ui.chip(p, icon="bolt").props("outline color=primary")
ui.separator().classes("my-2")
# Cables
if state.get("cable_specs"):
with ui.row().classes("w-full items-center gap-2 mb-2"):
ui.icon("cable", color="secondary").classes("text-2xl")
ui.label("电缆规格参数").classes("text-lg font-bold")
columns = [
{
"name": "section",
"label": "截面 (mm²)",
"field": "section",
"align": "center",
},
{
"name": "capacity",
"label": "载流量 (A)",
"field": "capacity",
"align": "center",
},
{
"name": "resistance",
"label": "电阻 (Ω/km)",
"field": "resistance",
"align": "center",
},
{
"name": "cost",
"label": "参考单价(万元/km)",
"field": "cost",
"align": "center",
},
{
"name": "is_optional",
"label": "是否为可选",
"field": "is_optional",
"align": "center",
},
]
rows = []
for spec in state["cable_specs"]:
# spec is (section, capacity, resistance, cost, is_optional)
rows.append(
{
"section": spec[0],
"capacity": spec[1],
"resistance": spec[2],
"cost": f"{spec[3] / 10:.2f}"
if spec[3] is not None
else "0.00",
"is_optional": "Y" if len(spec) > 4 and spec[4] else "",
}
)
ui.table(columns=columns, rows=rows).classes("w-full").props(
"dense flat bordered"
)
else:
ui.label("未检测到电缆数据,将使用默认参数。").classes(
"text-gray-500 italic"
)
async def handle_upload(e: events.UploadEventArguments):
try:
filename = None
content = None
if hasattr(e, "name"):
filename = e.name
if hasattr(e, "content"):
content = e.content
if content is None and hasattr(e, "file"):
file_obj = e.file
if not filename:
filename = getattr(
file_obj, "name", getattr(file_obj, "filename", None)
)
if hasattr(file_obj, "file") and hasattr(file_obj.file, "read"):
content = file_obj.file
elif hasattr(file_obj, "read"):
content = file_obj
if not filename:
filename = "uploaded_file.xlsx"
if content is None:
ui.notify("上传失败: 无法解析文件内容", type="negative")
return
# 清理旧文件,确保目录中只有一个文件
if os.path.exists(state["temp_dir"]):
for f in os.listdir(state["temp_dir"]):
try:
os.remove(os.path.join(state["temp_dir"], f))
except:
pass
path = os.path.join(state["temp_dir"], filename)
if hasattr(content, "seek"):
try:
content.seek(0)
except Exception:
pass
data = content.read()
import inspect
if inspect.iscoroutine(data):
data = await data
with open(path, "wb") as f:
f.write(data)
state["excel_path"] = path
state["original_filename"] = filename
ui.notify(f"文件已上传: {filename}", type="positive")
# 更新文件显示区域
if refs["current_file_container"]:
# refs["current_file_container"].set_visibility(True)
refs["current_file_container"].clear()
with refs["current_file_container"]:
with ui.row().classes(
"items-center w-full bg-blue-50 p-2 rounded border border-blue-200"
):
ui.icon("description", color="primary").classes("text-xl mr-2")
ui.label(filename).classes(
"font-medium text-gray-700 flex-grow"
)
ui.icon("check_circle", color="positive")
# 加载数据
try:
# 尝试解包 4 个返回值 (新版 main.py)
(
state["turbines"],
state["substation"],
state["cable_specs"],
state["system_params"],
) = load_data_from_excel(path)
except ValueError:
# 兼容旧版 (如果是 3 个返回值)
state["turbines"], state["substation"], state["cable_specs"] = (
load_data_from_excel(path)
)
state["system_params"] = {}
update_info_panel()
# 清空上传组件列表,以便下次选择(配合 .no-list CSS 使用)
if refs["upload_widget"]:
refs["upload_widget"].reset()
except Exception as ex:
ui.notify(f"上传处理失败: {ex}", type="negative")
async def save_file_with_dialog(filename, callback, file_filter="All files (*.*)"):
"""
跨平台文件保存助手。
如果是原生模式,弹出系统保存对话框。
如果是浏览器模式(但在本地运行),尝试使用 tkinter 弹出对话框。
最后回退到使用 nicegui ui.download。
:param filename: 默认文件名
:param callback: 接收文件路径并执行保存操作的函数 (filepath) -> None
:param file_filter: 格式如 "Excel Files (*.xlsx)"
"""
# 方案:使用 PowerShell 弹出原生保存对话框 (仅限 Windows)
# 这完全避免了 Python UI 库 (Tkinter/PyWebview) 的线程冲突和 PicklingError
import platform
import subprocess
if platform.system() == "Windows":
try:
# 构建 PowerShell 脚本
# 注意:过滤器格式为 "描述|*.ext|所有文件|*.*"
ps_filter = file_filter.replace("(", "|").replace(")", "")
if "|" not in ps_filter:
ps_filter += f"|{os.path.splitext(filename)[1] or '*.*'}"
# 简单清洗 filter 字符串以适应 PowerShell (e.g., "Excel Files *.xlsx" -> "Excel Files|*.xlsx")
# 这里做一个简化的映射,确保格式正确
if "Excel" in file_filter:
ps_filter = "Excel Files (*.xlsx)|*.xlsx|All Files (*.*)|*.*"
elif "DXF" in file_filter:
ps_filter = "DXF Files (*.dxf)|*.dxf|All Files (*.*)|*.*"
elif "ZIP" in file_filter:
ps_filter = "ZIP Archives (*.zip)|*.zip|All Files (*.*)|*.*"
else:
ps_filter = "All Files (*.*)|*.*"
ps_script = f"""
Add-Type -AssemblyName System.Windows.Forms
$d = New-Object System.Windows.Forms.SaveFileDialog
$d.Filter = "{ps_filter}"
$d.FileName = "{filename}"
$d.Title = "保存文件"
if ($d.ShowDialog() -eq [System.Windows.Forms.DialogResult]::OK) {{
Write-Output $d.FileName
}}
"""
# 运行 PowerShell
# 使用 startupinfo 隐藏控制台窗口 (防止黑框闪烁)
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
print("DEBUG: invoking PowerShell SaveFileDialog...")
# 使用 run.io_bound 在线程中执行,避免阻塞 UI
def run_ps():
result = subprocess.run(
["powershell", "-Command", ps_script],
capture_output=True,
text=True,
startupinfo=startupinfo
)
return result.stdout.strip()
from nicegui import run
save_path = await run.io_bound(run_ps)
if save_path:
print(f"DEBUG: PowerShell returned path: {save_path}")
await callback(save_path)
ui.notify(f"文件已保存至: {save_path}", type="positive")
return
else:
print("DEBUG: PowerShell dialog cancelled or empty result.")
# 用户取消,直接返回,不回退
return
except Exception as e:
print(f"PowerShell dialog failed: {e}")
# 出错则回退到 ui.download
# 统一回退方案:浏览器下载
print("DEBUG: Using ui.download fallback")
temp_path = os.path.join(state["temp_dir"], filename)
await callback(temp_path)
ui.download(temp_path)
def update_export_buttons():
if refs["export_row"]:
refs["export_row"].clear()
if not state["results"] or not refs["export_row"]:
return
# 获取文件名基础前缀
file_prefix = "wind_farm"
default_excel_name = "wind_farm_design_result.xlsx"
# 确定主对比 Excel 的生成路径 (对应 main.py 中的生成逻辑)
if state.get("excel_path"):
file_prefix = os.path.splitext(state["original_filename"])[0]
default_excel_name = f"{file_prefix}_result.xlsx"
# 这里的路径是 main.py 中生成的源文件路径,用于复制
source_excel_path = os.path.join(
state["temp_dir"], f"{file_prefix}_design.xlsx"
)
else:
source_excel_path = "wind_farm_design.xlsx"
# 寻找推荐方案
scenario1_results = [r for r in state["results"] if "Scenario 1" in r["name"]]
if scenario1_results:
best_res = min(scenario1_results, key=lambda x: x["cost"])
else:
best_res = min(state["results"], key=lambda x: x["cost"])
with refs["export_row"]:
# --- 下载 Excel ---
async def save_excel(path):
import shutil
# 如果源文件存在,则复制到目标路径
if os.path.exists(source_excel_path):
shutil.copy2(source_excel_path, path)
else:
# 如果不存在,重新生成
export_all_scenarios_to_excel(state["results"], path)
async def on_click_excel():
await save_file_with_dialog(
default_excel_name, save_excel, "Excel Files (*.xlsx)"
)
ui.button(
"下载 Excel 对比表",
on_click=on_click_excel,
).props("icon=download")
# --- 导出推荐方案 DXF ---
def export_best_dxf():
if state["substation"] is not None:
safe_name = "".join(
[
c
for c in best_res["name"]
if c.isalnum() or c in (" ", "-", "_")
]
).strip()
default_name = f"{file_prefix}_best_{safe_name}.dxf"
async def save_dxf(path):
export_to_dxf(
best_res["turbines"],
state["substation"],
best_res["eval"]["details"],
path,
)
# 包装为 async 任务,并在 NiceGUI 事件循环中执行
async def run_save():
await save_file_with_dialog(
default_name, save_dxf, "DXF Files (*.dxf)"
)
# 这里的 export_best_dxf 本身是普通函数,绑定到 on_click
# 但我们需要它执行异步操作。最简单的是让 export_best_dxf 变为 async
# 或者在这里直接调用 run_save (但这在普通函数里不行)
# 更好的方法是将 export_best_dxf 定义为 async如下所示
return run_save()
else:
ui.notify("缺少升压站数据,无法导出 DXF", type="negative")
# 将 export_best_dxf 改为 async 并重命名,以便直接用作回调
async def on_click_best_dxf():
if state["substation"] is not None:
safe_name = "".join(
[
c
for c in best_res["name"]
if c.isalnum() or c in (" ", "-", "_")
]
).strip()
default_name = f"{file_prefix}_best_{safe_name}.dxf"
async def save_dxf(path):
export_to_dxf(
best_res["turbines"],
state["substation"],
best_res["eval"]["details"],
path,
)
await save_file_with_dialog(
default_name, save_dxf, "DXF Files (*.dxf)"
)
else:
ui.notify("缺少升压站数据,无法导出 DXF", type="negative")
ui.button(
f"导出推荐方案 DXF ({best_res['name']})", on_click=on_click_best_dxf
).props("icon=architecture color=accent")
# --- 导出选中方案 DXF ---
async def on_click_selected_dxf():
if not refs["results_table"] or not refs["results_table"].selected:
ui.notify("请先在上方表格中选择一个方案", type="warning")
return
selected_row = refs["results_table"].selected[0]
row_name = selected_row.get("original_name", selected_row.get("name"))
selected_res = next(
(r for r in state["results"] if r["name"] == row_name), None
)
if selected_res and state["substation"] is not None:
safe_name = "".join(
[
c
for c in selected_res["name"]
if c.isalnum() or c in (" ", "-", "_")
]
).strip()
default_name = f"{file_prefix}_{safe_name}.dxf"
async def save_dxf(path):
export_to_dxf(
selected_res["turbines"],
state["substation"],
selected_res["eval"]["details"],
path,
)
await save_file_with_dialog(
default_name, save_dxf, "DXF Files (*.dxf)"
)
else:
ui.notify(
"无法导出:未找到方案数据或缺少升压站信息", type="negative"
)
refs["export_selected_btn"] = ui.button(
"导出选中方案 DXF", on_click=on_click_selected_dxf
).props("icon=architecture color=primary")
clean_name = best_res["name"].replace("(推荐) ", "")
refs["export_selected_btn"].set_text(f"导出选中方案 ({clean_name})")
# --- 导出全部 ZIP ---
async def on_click_all_dxf():
if not state["results"] or state["substation"] is None:
ui.notify("无方案数据可导出", type="warning")
return
default_name = f"{file_prefix}_all_results.zip"
async def save_zip(path):
import zipfile
excel_result_name = f"{file_prefix}_summary.xlsx"
with zipfile.ZipFile(path, "w", zipfile.ZIP_DEFLATED) as zipf:
# 1. Excel
temp_excel = os.path.join(state["temp_dir"], "temp_export.xlsx")
export_all_scenarios_to_excel(state["results"], temp_excel)
zipf.write(temp_excel, arcname=excel_result_name)
try:
os.remove(temp_excel)
except:
pass
# 2. DXFs
for res in state["results"]:
safe_name = "".join(
[
c
for c in res["name"]
if c.isalnum() or c in (" ", "-", "_")
]
).strip()
dxf_name = os.path.join(
state["temp_dir"], f"{file_prefix}_{safe_name}.dxf"
)
export_to_dxf(
res["turbines"],
state["substation"],
res["eval"]["details"],
dxf_name,
)
zipf.write(dxf_name, arcname=os.path.basename(dxf_name))
try:
os.remove(dxf_name)
except:
pass
await save_file_with_dialog(default_name, save_zip, "ZIP Files (*.zip)")
ui.button("导出全部方案 DXF (ZIP)", on_click=on_click_all_dxf).props(
"icon=folder_zip color=secondary"
)
def update_plot(result):
if refs["plot_container"]:
refs["plot_container"].clear()
with refs["plot_container"]:
# 使用 ui.pyplot 上下文自动管理 figure 生命周期
with ui.pyplot(figsize=(10, 8)) as plot:
title = f"{result['name']}\nCost: ¥{result['cost'] / 10000:.2f}万 | Loss: {result['loss']:.2f} kW"
# 显式获取当前 ui.pyplot 创建的 axes并传递给绘图函数
# 确保绘图发生在正确的 figure 上
ax = plt.gca()
visualize_design(
result["turbines"],
state["substation"],
result["eval"]["details"],
title,
ax=ax,
)
async def handle_row_click(e):
# 获取被点击行的数据
row = e.args[1] if len(e.args) > 1 else None
if not row:
return
# 识别方案名称
row_name = row.get("original_name", row.get("name"))
if not row_name:
return
selected_res = next(
(r for r in state["results"] if r["name"] == row_name), None
)
if selected_res:
# 1. 更新拓扑图
update_plot(selected_res)
ui.notify(f"已切换至方案: {selected_res['name']}")
# 2. 通过设置 table 的 selected 属性来实现背景高亮
if refs["results_table"]:
refs["results_table"].selected = [row]
# 3. 更新“导出选中方案”按钮的文本
if refs["export_selected_btn"]:
clean_name = row_name.replace("(推荐) ", "")
refs["export_selected_btn"].set_text(f"导出选中方案 ({clean_name})")
import queue
from nicegui import run
async def run_analysis():
if not state["excel_path"]:
ui.notify("请先上传 Excel 坐标文件!", type="warning")
return
if refs["log_box"]:
refs["log_box"].clear()
log_queue = queue.Queue()
class QueueLogger(io.StringIO):
def write(self, message):
if message and message.strip():
log_queue.put(message.strip())
super().write(message)
def process_log_queue():
if refs["log_box"]:
new_msg = False
while not log_queue.empty():
try:
msg = log_queue.get_nowait()
refs["log_box"].push(msg)
new_msg = True
if msg.startswith("--- Scenario"):
scenario_name = msg.replace("---", "").strip()
if refs["status_label"]:
refs[
"status_label"
].text = f"正在计算: {scenario_name}..."
elif "开始比较电缆方案" in msg:
if refs["status_label"]:
refs["status_label"].text = "准备开始计算..."
except queue.Empty:
break
if new_msg and refs["log_box"]:
# 使用 JS 直接滚动 log 元素到最底部
# 增加一个小延时确保内容渲染完成
ui.run_javascript(
f'var el = document.getElementById("c{refs["log_box"].id}"); if(el) {{ setTimeout(() => {{ el.scrollTop = el.scrollHeight; }}, 10); }}'
)
log_timer = ui.timer(0.1, process_log_queue)
if refs["status_label"]:
refs["status_label"].text = "初始化中..."
processing_dialog.open()
try:
# 2. 定义在线程中运行的任务
def task():
# 捕获 stdout 到我们的 QueueLogger
# 禁止 main.py 中的后台绘图,避免线程安全问题
with contextlib.redirect_stdout(QueueLogger()):
return compare_design_methods(
excel_path=state["excel_path"],
n_clusters_override=None,
interactive=False,
plot_results=False,
)
# 在后台线程运行计算任务
results = await run.io_bound(task)
state["results"] = results
if not state["excel_path"] and results:
if state["substation"] is None:
_, state["substation"] = generate_wind_farm_data(
n_turbines=30, layout="grid", spacing=800
)
# 计算完成后,自动寻找并显示最佳方案的拓扑图
best_res = None
if results:
# 默认推荐 Scenario 1 中成本最低的方案
scenario1_results = [r for r in results if "Scenario 1" in r["name"]]
if scenario1_results:
best_res = min(scenario1_results, key=lambda x: x["cost"])
else:
# 如果没有 Scenario 1则回退到全局最优
best_res = min(results, key=lambda x: x["cost"])
update_plot(best_res)
ui.notify(
f"计算完成!已自动加载推荐方案: {best_res['name']}", type="positive"
)
# 更新结果表格
if refs["results_table"]:
table_data = []
best_row = None
for res in results:
name_display = res["name"]
is_best = False
if best_res and res["name"] == best_res["name"]:
name_display = f"(推荐) {name_display}"
is_best = True
# 生成备注信息
note = ""
original_name = res["name"]
# 识别算法
if "MST Method" in original_name:
note += "最小生成树算法(无容量约束基准); "
elif "Base" in original_name:
note += "基础扇区扫描(单次扫描); "
elif "Rotational" in original_name:
note += "旋转扫描优化(全局最优角度); "
elif "Esau-Williams" in original_name:
note += "Esau-Williams启发式算法(权衡距离与容量); "
# 识别电缆策略
if "Standard" in original_name:
note += "不包含可选电缆型号。"
elif "With Optional" in original_name:
note += "含可选电缆型号。"
elif "No Max" in original_name:
note += "不包含可选电缆型号,且可使用的最大截面电缆降一档截面。"
# 计算总长度(转换为公里)
total_length_m = sum(d["length"] for d in res["eval"]["details"])
total_length_km = total_length_m / 1000
row_dict = {
"name": name_display,
"cost_wan": f"{res['cost'] / 10000:.2f}",
"loss_kw": f"{res['loss']:.2f}",
"total_length": f"{total_length_km:.2f}",
"total_cost_npv_wan": f"{res.get('total_cost_npv', res['cost']) / 10000:.2f}",
"note": note,
"original_name": res["name"],
}
table_data.append(row_dict)
if is_best:
best_row = row_dict
refs["results_table"].rows = table_data
# 初始选中推荐方案,实现自动高亮
if best_row:
refs["results_table"].selected = [best_row]
update_export_buttons()
if refs["status_label"]:
refs["status_label"].text = "计算完成!"
except Exception as ex:
ui.notify(f"运行出错: {ex}", type="negative")
import traceback
traceback.print_exc()
finally:
log_timer.cancel()
process_log_queue()
processing_dialog.close()
with ui.dialog() as processing_dialog:
with ui.card().classes("w-96 items-center justify-center p-6"):
ui.label("正在计算方案...").classes("text-xl font-bold text-primary mb-2")
ui.spinner(size="lg", color="primary")
refs["status_label"] = ui.label("准备中...").classes(
"mt-4 text-sm text-gray-700 font-medium"
)
with ui.expansion("查看实时日志", icon="terminal", value=True).classes(
"w-full mt-4 text-sm"
):
# 直接控制 log 组件的样式和滚动,去除 scroll_area 中间层
refs["log_box"] = ui.log(max_lines=100).classes(
"w-full h-32 overflow-y-auto p-2 bg-black text-xs font-mono text-green-400 leading-snug"
)
processing_dialog.props("persistent")
with ui.header().classes(
"bg-primary text-white p-4 shadow-lg items-center no-wrap"
):
with ui.column().classes("gap-0"):
ui.label(f"海上风电场集电线路设计优化系统 {VERSION}").classes(
"text-2xl font-bold"
)
with ui.column().classes("gap-0"):
ui.label("Wind Farm Collector System Design Optimizer").classes(
"text-sm opacity-80"
)
ui.space()
ui.label("中能建西北院海上能源业务开发部").classes("text-xl font-bold")
with ui.row().classes("w-full p-4 gap-4"):
with ui.card().classes("w-full p-4 shadow-md"):
ui.label("配置面板").classes("text-xl font-semibold mb-4 border-b pb-2")
# 使用 items-stretch 确保所有子元素高度一致
with ui.row().classes("w-full items-stretch gap-4"):
# 1. 导出模板按钮
async def export_template():
import shutil
from generate_template import create_template
async def save_template(path):
# 生成模板到系统临时目录
temp_template = os.path.join(
state["temp_dir"], "coordinates_template.xlsx"
)
create_template(temp_template)
if os.path.exists(temp_template):
shutil.copy2(temp_template, path)
try:
os.remove(temp_template)
except:
pass
else:
raise FileNotFoundError("无法生成模板文件")
await save_file_with_dialog(
"coordinates.xlsx", save_template, "Excel Files (*.xlsx)"
)
ui.button("导出 Excel 模板", on_click=export_template).classes(
"flex-1 py-4"
).props("icon=file_download outline color=primary")
# 2. 上传文件区域 (垂直堆叠 Label 和 Upload 组件)
with ui.column().classes("flex-1 gap-0 justify-between"):
# 使用 .no-list CSS 隐藏 Quasar 默认列表,完全自定义文件显示
refs["upload_widget"] = (
ui.upload(
label="选择Excel文件",
on_upload=handle_upload,
auto_upload=True,
)
.classes("w-full no-list h-full")
.props("flat bordered color=primary")
)
# 自定义文件显示容器
refs["current_file_container"] = ui.column().classes("w-full")
# 初始状态不显示任何内容,直到选择文件后才显示
# with refs["current_file_container"]:
# ui.label("未选择文件").classes("text-xs text-gray-500 italic ml-1")
# 3. 运行按钮
refs["run_btn"] = (
ui.button(
"运行方案对比",
on_click=run_analysis,
)
.classes("flex-1 py-4")
.props("icon=play_arrow color=secondary")
)
with ui.column().classes("w-full gap-4"):
# 新增:信息展示卡片
with (
ui.card()
.classes("w-full p-4 shadow-md")
.style("max-height: 400px; overflow-y: auto;")
):
refs["info_container"] = ui.column().classes("w-full")
with refs["info_container"]:
ui.label("请上传 Excel 文件以查看系统参数和电缆规格...").classes(
"text-gray-500 italic"
)
with ui.card().classes("w-full p-0 shadow-md overflow-hidden"):
with (
ui.expansion(
"方案对比结果 (点击行查看拓扑详情)",
icon="analytics",
value=True,
)
.classes("w-full")
.props('header-class="text-xl font-semibold"')
):
columns = [
{
"name": "name",
"label": "方案名称",
"field": "name",
"required": True,
"align": "left",
},
{
"name": "cost_wan",
"label": "总投资 (万元)",
"field": "cost_wan",
"sortable": True,
},
{
"name": "loss_kw",
"label": "线损 (kW)",
"field": "loss_kw",
"sortable": True,
},
{
"name": "total_length",
"label": "总长度/km",
"field": "total_length",
"sortable": True,
},
{
"name": "total_cost_npv_wan",
"label": "总费用 (万元)",
"field": "total_cost_npv_wan",
"sortable": True,
},
{
"name": "note",
"label": "备注",
"field": "note",
"align": "left",
},
]
# 使用内置的 selection='single' 结合行点击事件实现背景高亮
# 这样可以完全由 Python 事件逻辑控制,不依赖 CSS 伪类
refs["results_table"] = ui.table(
columns=columns,
rows=[],
selection="single",
row_key="original_name",
).classes("w-full hide-selection-column")
refs["results_table"].on("row-click", handle_row_click)
with ui.card().classes("w-full p-4 shadow-md"):
ui.label("拓扑可视化").classes("text-xl font-semibold mb-2")
refs["plot_container"] = ui.column().classes("w-full items-center")
with ui.card().classes("w-full p-4 shadow-md"):
ui.label("导出与下载").classes("text-xl font-semibold mb-2")
refs["export_row"] = ui.row().classes("gap-4")
def find_available_port(start_port=8080, max_attempts=100):
"""尝试寻找可用的端口"""
import socket
for port in range(start_port, start_port + max_attempts):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
# 尝试绑定到 127.0.0.1,这是最常用的本地开发地址
s.bind(("127.0.0.1", port))
return port
except OSError:
continue
return start_port # 默认返回起始端口,让 ui.run 报错
# 自动寻找可用端口,避免端口冲突
target_port = find_available_port(8082) # 从 8082 开始,避开常用的 8080
# 检测是否为打包后的exe程序
import sys
if getattr(sys, "frozen", False):
# 修复无控制台模式下 stdout/stderr 为 None 导致的 logging 错误
class NullWriter:
def write(self, text):
pass
def flush(self):
pass
def isatty(self):
return False
if sys.stdout is None:
sys.stdout = NullWriter()
if sys.stderr is None:
sys.stderr = NullWriter()
# 打包环境下禁用日志配置,避免在无控制台模式下出现错误
import logging.config
logging.config.dictConfig(
{
"version": 1,
"disable_existing_loggers": True,
}
)
ui.run(
title="海上风电场集电线路优化",
host="127.0.0.1",
port=target_port,
reload=False,
window_size=(1280, 800),
native=True,
)
else:
# 普通使用环境保留日志功能
# ui.run(
# title="海上风电场集电线路优化",
# host="127.0.0.1",
# reload=True,
# port=target_port,
# native=False,
# )
ui.run(
title="海上风电场集电线路优化",
host="127.0.0.1",
port=target_port,
reload=True,
window_size=(1280, 800),
native=True,
)