Files
windfarm/gui.py
dmy 15d8f4881d feat: 改进文件保存对话框,支持跨平台系统原生保存
主要改进:
1. 新增 save_file_with_dialog 函数
   - 优先使用 PyWebview 原生模式保存对话框
   - 回退到 Tkinter 对话框(本地环境)
   - 最终回退到浏览器下载方式

2. 优化所有导出功能
   - Excel 对比表导出支持系统保存对话框
   - DXF 文件导出支持系统保存对话框
   - ZIP 批量导出支持系统保存对话框
   - 模板导出支持系统保存对话框

3. 代码质量改进
   - 统一异步函数命名规范(on_click_*)
   - 改进代码格式化和缩进
   - 添加详细的调试日志

4. 用户体验提升
   - 用户可以自由选择保存位置
   - 支持文件类型过滤
   - 自动处理文件名后缀
2026-01-05 21:32:46 +08:00

1033 lines
41 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import os
import sys
import io
import contextlib
import tempfile
import matplotlib.pyplot as plt
import matplotlib.backends.backend_svg
from nicegui import ui, events, app
from main import (
compare_design_methods,
export_to_dxf,
load_data_from_excel,
generate_wind_farm_data,
visualize_design,
export_all_scenarios_to_excel,
)
import pandas as pd
# 设置matplotlib支持中文显示
plt.rcParams["font.sans-serif"] = ["Microsoft YaHei", "SimHei", "Arial"]
plt.rcParams["axes.unicode_minus"] = False
class Logger(io.StringIO):
def __init__(self, log_element):
super().__init__()
self.log_element = log_element
def write(self, message):
if message.strip():
self.log_element.push(message.strip())
super().write(message)
# 状态变量
# 优先从环境变量 PROJECT_TEMP_DIR 读取,否则使用系统默认临时目录
_base_temp = os.environ.get("PROJECT_TEMP_DIR", tempfile.gettempdir())
state = {
"excel_path": None,
"original_filename": None,
"results": [],
"substation": None,
"turbines": None,
"cable_specs": None,
"system_params": None,
"temp_dir": os.path.join(_base_temp, "windfarm_gui_uploads"),
}
# 确保临时目录存在
if not os.path.exists(state["temp_dir"]):
os.makedirs(state["temp_dir"], exist_ok=True)
@ui.page("/")
def index():
# 注入 CSS 隐藏表格自带的复选框列,并定义选中行的高亮背景色
ui.add_head_html(
"""
<style>
.hide-selection-column .q-table__selection { display: none; }
.hide-selection-column .q-table tr.selected { background-color: #e3f2fd !important; }
.no-list .q-uploader__list { display: none !important; }
</style>
"""
)
ui.query("body").style(
'background-color: #f0f2f5; font-family: "Segoe UI", Tahoma, Geneva, Verdana, sans-serif;'
)
# 定义 UI 元素引用容器,方便在函数中更新
refs = {
"log_box": None,
"results_table": None,
"plot_container": None,
"export_row": None,
"export_selected_btn": None, # 新增按钮引用
"status_label": None,
"upload_widget": None,
"run_btn": None,
"current_file_container": None, # 替换 label 为 container
"info_container": None, # 新增信息展示容器
}
def update_info_panel():
if refs["info_container"]:
refs["info_container"].clear()
with refs["info_container"]:
# System Params - Always show
with ui.row().classes("w-full items-center gap-4 mb-2"):
ui.icon("settings", color="primary").classes("text-2xl")
ui.label("系统参数").classes("text-lg font-bold")
params_text = []
# 获取电压
v = 66000 # Default
is_default_v = True
if (
state.get("system_params")
and "voltage" in state["system_params"]
):
v = state["system_params"]["voltage"]
is_default_v = False
v_str = f"电压: {v/1000:.1f} kV" if v >= 1000 else f"电压: {v} V"
if is_default_v:
v_str += " (默认)"
params_text.append(v_str)
# 获取功率因数
pf = 0.95 # Default
is_default_pf = True
if (
state.get("system_params")
and "power_factor" in state["system_params"]
):
pf = state["system_params"]["power_factor"]
is_default_pf = False
pf_str = f"功率因数: {pf}"
if is_default_pf:
pf_str += " (默认)"
params_text.append(pf_str)
for p in params_text:
ui.chip(p, icon="bolt").props("outline color=primary")
ui.separator().classes("my-2")
# Cables
if state.get("cable_specs"):
with ui.row().classes("w-full items-center gap-2 mb-2"):
ui.icon("cable", color="secondary").classes("text-2xl")
ui.label("电缆规格参数").classes("text-lg font-bold")
columns = [
{
"name": "section",
"label": "截面 (mm²)",
"field": "section",
"align": "center",
},
{
"name": "capacity",
"label": "载流量 (A)",
"field": "capacity",
"align": "center",
},
{
"name": "resistance",
"label": "电阻 (Ω/km)",
"field": "resistance",
"align": "center",
},
{
"name": "cost",
"label": "参考单价 (元/m)",
"field": "cost",
"align": "center",
},
]
rows = []
for spec in state["cable_specs"]:
# spec is (section, capacity, resistance, cost, is_optional)
rows.append(
{
"section": spec[0],
"capacity": spec[1],
"resistance": spec[2],
"cost": spec[3],
}
)
ui.table(columns=columns, rows=rows).classes("w-full").props(
"dense flat bordered"
)
else:
ui.label("未检测到电缆数据,将使用默认参数。").classes(
"text-gray-500 italic"
)
async def handle_upload(e: events.UploadEventArguments):
try:
filename = None
content = None
if hasattr(e, "name"):
filename = e.name
if hasattr(e, "content"):
content = e.content
if content is None and hasattr(e, "file"):
file_obj = e.file
if not filename:
filename = getattr(
file_obj, "name", getattr(file_obj, "filename", None)
)
if hasattr(file_obj, "file") and hasattr(file_obj.file, "read"):
content = file_obj.file
elif hasattr(file_obj, "read"):
content = file_obj
if not filename:
filename = "uploaded_file.xlsx"
if content is None:
ui.notify("上传失败: 无法解析文件内容", type="negative")
return
# 清理旧文件,确保目录中只有一个文件
if os.path.exists(state["temp_dir"]):
for f in os.listdir(state["temp_dir"]):
try:
os.remove(os.path.join(state["temp_dir"], f))
except:
pass
path = os.path.join(state["temp_dir"], filename)
if hasattr(content, "seek"):
try:
content.seek(0)
except Exception:
pass
data = content.read()
import inspect
if inspect.iscoroutine(data):
data = await data
with open(path, "wb") as f:
f.write(data)
state["excel_path"] = path
state["original_filename"] = filename
ui.notify(f"文件已上传: {filename}", type="positive")
# 更新文件显示区域
if refs["current_file_container"]:
refs["current_file_container"].clear()
with refs["current_file_container"]:
with ui.row().classes(
"items-center w-full bg-blue-50 p-2 rounded border border-blue-200"
):
ui.icon("description", color="primary").classes("text-xl mr-2")
ui.label(filename).classes(
"font-medium text-gray-700 flex-grow"
)
ui.icon("check_circle", color="positive")
# 加载数据
try:
# 尝试解包 4 个返回值 (新版 main.py)
(
state["turbines"],
state["substation"],
state["cable_specs"],
state["system_params"],
) = load_data_from_excel(path)
except ValueError:
# 兼容旧版 (如果是 3 个返回值)
state["turbines"], state["substation"], state["cable_specs"] = (
load_data_from_excel(path)
)
state["system_params"] = {}
update_info_panel()
# 清空上传组件列表,以便下次选择(配合 .no-list CSS 使用)
if refs["upload_widget"]:
refs["upload_widget"].reset()
except Exception as ex:
ui.notify(f"上传处理失败: {ex}", type="negative")
async def save_file_with_dialog(filename, callback, file_filter="All files (*.*)"):
"""
跨平台文件保存助手。
如果是原生模式,弹出系统保存对话框。
如果是浏览器模式(但在本地运行),尝试使用 tkinter 弹出对话框。
最后回退到使用 nicegui ui.download。
:param filename: 默认文件名
:param callback: 接收文件路径并执行保存操作的函数 (filepath) -> None
:param file_filter: 格式如 "Excel Files (*.xlsx)"
"""
# 检测是否为原生模式 (PyWebview)
is_native = False
native_window = None
try:
# 使用 getattr 安全获取 app.native避免属性不存在错误
# 并在 reload=True 时 native 可能未能正确初始化
n_obj = getattr(app, "native", None)
if n_obj and getattr(n_obj, "main_window", None):
is_native = True
native_window = n_obj.main_window
except Exception as e:
print(f"DEBUG: Native check error: {e}")
print(
f"DEBUG: save_file_with_dialog called. is_native={is_native}, filename={filename}"
)
if is_native and native_window:
try:
# PyWebview 的 create_file_dialog 的 file_types 参数期望一个字符串元组
# 格式如: ('Description (*.ext)', 'All files (*.*)')
file_types = (file_filter,)
print(f"DEBUG: calling create_file_dialog with types={file_types}")
# 在 Native 模式下create_file_dialog 是同步阻塞的
# 注意:必须使用 app.native.SAVE_DIALOG
save_path = native_window.create_file_dialog(
app.native.SAVE_DIALOG,
directory="",
save_filename=filename,
file_types=file_types,
)
print(f"DEBUG: save_path result: {save_path}")
# 用户取消
if not save_path:
return
# 处理返回类型 (PyWebview 可能返回字符串或列表)
if isinstance(save_path, (list, tuple)):
if not save_path:
return
save_path = save_path[0]
# 确保文件名后缀正确
if not save_path.lower().endswith(
os.path.splitext(filename)[1].lower()
):
save_path += os.path.splitext(filename)[1]
await callback(save_path)
ui.notify(f"文件已保存至: {save_path}", type="positive")
return # 成功处理,退出
except Exception as e:
import traceback
traceback.print_exc()
print(f"ERROR in save_file_with_dialog (native): {e}")
# ui.notify(f"原生保存失败,尝试其他方式: {e}", type="warning")
print(f"原生保存失败,尝试其他方式: {e}")
# 继续向下执行,尝试 fallback
# 非 Native 模式 (或 Native 失败),尝试使用 Tkinter (仅限本地环境)
try:
import tkinter as tk
from tkinter import filedialog
from nicegui import run
print("DEBUG: Attempting Tkinter dialog...")
def get_save_path_tk(default_name, f_filter):
try:
# 创建隐藏的根窗口
root = tk.Tk()
root.withdraw()
root.attributes("-topmost", True) # 尝试置顶
# 转换 filter 格式: "Excel Files (*.xlsx)" -> [("Excel Files", "*.xlsx")]
filetypes = []
if "(" in f_filter and ")" in f_filter:
desc = f_filter.split("(")[0].strip()
ext = f_filter.split("(")[1].split(")")[0]
filetypes.append((desc, ext))
filetypes.append(("All files", "*.*"))
path = filedialog.asksaveasfilename(
initialfile=default_name, filetypes=filetypes, title="保存文件"
)
root.destroy()
return path
except Exception as ex:
print(f"Tkinter inner error: {ex}")
return None
# 在线程中运行 tkinter避免阻塞 asyncio 事件循环
save_path = await run.io_bound(get_save_path_tk, filename, file_filter)
if save_path:
print(f"DEBUG: Tkinter save_path: {save_path}")
# 确保文件名后缀正确
if not save_path.lower().endswith(
os.path.splitext(filename)[1].lower()
):
save_path += os.path.splitext(filename)[1]
await callback(save_path)
ui.notify(f"文件已保存至: {save_path}", type="positive")
return # 成功处理
elif save_path is None:
print("DEBUG: Tkinter dialog cancelled or failed silently.")
# 如果是用户取消(返回空字符串),通常不需要回退到下载。
# 但这里如果 Tkinter 彻底失败返回 None可能需要回退。
# askopenfilename 返回空字符串表示取消。我们假设 None 是异常。
# 这里简化处理:只要没拿到路径且没报错,就认为是取消。
if save_path == "":
return
except Exception as e:
print(f"Tkinter dialog failed: {e}")
# Fallback to ui.download if tkinter fails
# 最后的回退方案:浏览器下载
print("DEBUG: Falling back to ui.download")
temp_path = os.path.join(state["temp_dir"], filename)
await callback(temp_path)
ui.download(temp_path)
def update_export_buttons():
if refs["export_row"]:
refs["export_row"].clear()
if not state["results"] or not refs["export_row"]:
return
# 获取文件名基础前缀
file_prefix = "wind_farm"
default_excel_name = "wind_farm_design_result.xlsx"
# 确定主对比 Excel 的生成路径 (对应 main.py 中的生成逻辑)
if state.get("excel_path"):
file_prefix = os.path.splitext(state["original_filename"])[0]
default_excel_name = f"{file_prefix}_result.xlsx"
# 这里的路径是 main.py 中生成的源文件路径,用于复制
source_excel_path = os.path.join(
state["temp_dir"], f"{file_prefix}_design.xlsx"
)
else:
source_excel_path = "wind_farm_design.xlsx"
# 寻找推荐方案
scenario1_results = [r for r in state["results"] if "Scenario 1" in r["name"]]
if scenario1_results:
best_res = min(scenario1_results, key=lambda x: x["cost"])
else:
best_res = min(state["results"], key=lambda x: x["cost"])
with refs["export_row"]:
# --- 下载 Excel ---
async def save_excel(path):
import shutil
# 如果源文件存在,则复制到目标路径
if os.path.exists(source_excel_path):
shutil.copy2(source_excel_path, path)
else:
# 如果不存在,重新生成
export_all_scenarios_to_excel(state["results"], path)
async def on_click_excel():
await save_file_with_dialog(
default_excel_name, save_excel, "Excel Files (*.xlsx)"
)
ui.button(
"下载 Excel 对比表",
on_click=on_click_excel,
).props("icon=download")
# --- 导出推荐方案 DXF ---
def export_best_dxf():
if state["substation"] is not None:
safe_name = "".join(
[
c
for c in best_res["name"]
if c.isalnum() or c in (" ", "-", "_")
]
).strip()
default_name = f"{file_prefix}_best_{safe_name}.dxf"
async def save_dxf(path):
export_to_dxf(
best_res["turbines"],
state["substation"],
best_res["eval"]["details"],
path,
)
# 包装为 async 任务,并在 NiceGUI 事件循环中执行
async def run_save():
await save_file_with_dialog(
default_name, save_dxf, "DXF Files (*.dxf)"
)
# 这里的 export_best_dxf 本身是普通函数,绑定到 on_click
# 但我们需要它执行异步操作。最简单的是让 export_best_dxf 变为 async
# 或者在这里直接调用 run_save (但这在普通函数里不行)
# 更好的方法是将 export_best_dxf 定义为 async如下所示
return run_save()
else:
ui.notify("缺少升压站数据,无法导出 DXF", type="negative")
# 将 export_best_dxf 改为 async 并重命名,以便直接用作回调
async def on_click_best_dxf():
if state["substation"] is not None:
safe_name = "".join(
[
c
for c in best_res["name"]
if c.isalnum() or c in (" ", "-", "_")
]
).strip()
default_name = f"{file_prefix}_best_{safe_name}.dxf"
async def save_dxf(path):
export_to_dxf(
best_res["turbines"],
state["substation"],
best_res["eval"]["details"],
path,
)
await save_file_with_dialog(
default_name, save_dxf, "DXF Files (*.dxf)"
)
else:
ui.notify("缺少升压站数据,无法导出 DXF", type="negative")
ui.button(
f'导出推荐方案 DXF ({best_res["name"]})', on_click=on_click_best_dxf
).props("icon=architecture color=accent")
# --- 导出选中方案 DXF ---
async def on_click_selected_dxf():
if not refs["results_table"] or not refs["results_table"].selected:
ui.notify("请先在上方表格中选择一个方案", type="warning")
return
selected_row = refs["results_table"].selected[0]
row_name = selected_row.get("original_name", selected_row.get("name"))
selected_res = next(
(r for r in state["results"] if r["name"] == row_name), None
)
if selected_res and state["substation"] is not None:
safe_name = "".join(
[
c
for c in selected_res["name"]
if c.isalnum() or c in (" ", "-", "_")
]
).strip()
default_name = f"{file_prefix}_{safe_name}.dxf"
async def save_dxf(path):
export_to_dxf(
selected_res["turbines"],
state["substation"],
selected_res["eval"]["details"],
path,
)
await save_file_with_dialog(
default_name, save_dxf, "DXF Files (*.dxf)"
)
else:
ui.notify(
"无法导出:未找到方案数据或缺少升压站信息", type="negative"
)
refs["export_selected_btn"] = ui.button(
"导出选中方案 DXF", on_click=on_click_selected_dxf
).props("icon=architecture color=primary")
clean_name = best_res["name"].replace("(推荐) ", "")
refs["export_selected_btn"].set_text(f"导出选中方案 ({clean_name})")
# --- 导出全部 ZIP ---
async def on_click_all_dxf():
if not state["results"] or state["substation"] is None:
ui.notify("无方案数据可导出", type="warning")
return
default_name = f"{file_prefix}_all_results.zip"
async def save_zip(path):
import zipfile
excel_result_name = f"{file_prefix}_summary.xlsx"
with zipfile.ZipFile(path, "w", zipfile.ZIP_DEFLATED) as zipf:
# 1. Excel
temp_excel = os.path.join(state["temp_dir"], "temp_export.xlsx")
export_all_scenarios_to_excel(state["results"], temp_excel)
zipf.write(temp_excel, arcname=excel_result_name)
try:
os.remove(temp_excel)
except:
pass
# 2. DXFs
for res in state["results"]:
safe_name = "".join(
[
c
for c in res["name"]
if c.isalnum() or c in (" ", "-", "_")
]
).strip()
dxf_name = os.path.join(
state["temp_dir"], f"{file_prefix}_{safe_name}.dxf"
)
export_to_dxf(
res["turbines"],
state["substation"],
res["eval"]["details"],
dxf_name,
)
zipf.write(dxf_name, arcname=os.path.basename(dxf_name))
try:
os.remove(dxf_name)
except:
pass
await save_file_with_dialog(default_name, save_zip, "ZIP Files (*.zip)")
ui.button("导出全部方案 DXF (ZIP)", on_click=on_click_all_dxf).props(
"icon=folder_zip color=secondary"
)
def update_plot(result):
if refs["plot_container"]:
refs["plot_container"].clear()
with refs["plot_container"]:
# 使用 ui.pyplot 上下文自动管理 figure 生命周期
with ui.pyplot(figsize=(10, 8)) as plot:
title = f"{result['name']}\nCost: ¥{result['cost']/10000:.2f}万 | Loss: {result['loss']:.2f} kW"
# 显式获取当前 ui.pyplot 创建的 axes并传递给绘图函数
# 确保绘图发生在正确的 figure 上
ax = plt.gca()
visualize_design(
result["turbines"],
state["substation"],
result["eval"]["details"],
title,
ax=ax,
)
async def handle_row_click(e):
# 获取被点击行的数据
row = e.args[1] if len(e.args) > 1 else None
if not row:
return
# 识别方案名称
row_name = row.get("original_name", row.get("name"))
if not row_name:
return
selected_res = next(
(r for r in state["results"] if r["name"] == row_name), None
)
if selected_res:
# 1. 更新拓扑图
update_plot(selected_res)
ui.notify(f"已切换至方案: {selected_res['name']}")
# 2. 通过设置 table 的 selected 属性来实现背景高亮
if refs["results_table"]:
refs["results_table"].selected = [row]
# 3. 更新“导出选中方案”按钮的文本
if refs["export_selected_btn"]:
clean_name = row_name.replace("(推荐) ", "")
refs["export_selected_btn"].set_text(f"导出选中方案 ({clean_name})")
from nicegui import run
import queue
async def run_analysis():
if not state["excel_path"]:
ui.notify("请先上传 Excel 坐标文件!", type="warning")
return
if refs["log_box"]:
refs["log_box"].clear()
log_queue = queue.Queue()
class QueueLogger(io.StringIO):
def write(self, message):
if message and message.strip():
log_queue.put(message.strip())
super().write(message)
def process_log_queue():
if refs["log_box"]:
new_msg = False
while not log_queue.empty():
try:
msg = log_queue.get_nowait()
refs["log_box"].push(msg)
new_msg = True
if msg.startswith("--- Scenario"):
scenario_name = msg.replace("---", "").strip()
if refs["status_label"]:
refs["status_label"].text = (
f"正在计算: {scenario_name}..."
)
elif "开始比较电缆方案" in msg:
if refs["status_label"]:
refs["status_label"].text = "准备开始计算..."
except queue.Empty:
break
if new_msg and refs["log_box"]:
# 使用 JS 直接滚动 log 元素到最底部
# 增加一个小延时确保内容渲染完成
ui.run_javascript(
f'var el = document.getElementById("c{refs["log_box"].id}"); if(el) {{ setTimeout(() => {{ el.scrollTop = el.scrollHeight; }}, 10); }}'
)
log_timer = ui.timer(0.1, process_log_queue)
if refs["status_label"]:
refs["status_label"].text = "初始化中..."
processing_dialog.open()
try:
# 2. 定义在线程中运行的任务
def task():
# 捕获 stdout 到我们的 QueueLogger
# 禁止 main.py 中的后台绘图,避免线程安全问题
with contextlib.redirect_stdout(QueueLogger()):
return compare_design_methods(
excel_path=state["excel_path"],
n_clusters_override=None,
interactive=False,
plot_results=False,
)
# 在后台线程运行计算任务
results = await run.io_bound(task)
state["results"] = results
if not state["excel_path"] and results:
if state["substation"] is None:
_, state["substation"] = generate_wind_farm_data(
n_turbines=30, layout="grid", spacing=800
)
# 计算完成后,自动寻找并显示最佳方案的拓扑图
best_res = None
if results:
# 默认推荐 Scenario 1 中成本最低的方案
scenario1_results = [r for r in results if "Scenario 1" in r["name"]]
if scenario1_results:
best_res = min(scenario1_results, key=lambda x: x["cost"])
else:
# 如果没有 Scenario 1则回退到全局最优
best_res = min(results, key=lambda x: x["cost"])
update_plot(best_res)
ui.notify(
f'计算完成!已自动加载推荐方案: {best_res["name"]}', type="positive"
)
# 更新结果表格
if refs["results_table"]:
table_data = []
best_row = None
for res in results:
name_display = res["name"]
is_best = False
if best_res and res["name"] == best_res["name"]:
name_display = f"(推荐) {name_display}"
is_best = True
# 生成备注信息
note = ""
original_name = res["name"]
# 识别算法
if "MST Method" in original_name:
note += "最小生成树算法(无容量约束基准); "
elif "Base" in original_name:
note += "基础扇区扫描(单次扫描); "
elif "Rotational" in original_name:
note += "旋转扫描优化(全局最优角度); "
elif "Esau-Williams" in original_name:
note += "Esau-Williams启发式算法(权衡距离与容量); "
# 识别电缆策略
if "Standard" in original_name:
note += "不包含可选电缆型号。"
elif "With Optional" in original_name:
note += "含可选电缆型号。"
elif "No Max" in original_name:
note += "不包含可选电缆型号,且可使用的最大截面电缆降一档截面。"
row_dict = {
"name": name_display,
"cost_wan": f"{res['cost'] / 10000:.2f}",
"loss_kw": f"{res['loss']:.2f}",
"note": note,
"original_name": res["name"],
}
table_data.append(row_dict)
if is_best:
best_row = row_dict
refs["results_table"].rows = table_data
# 初始选中推荐方案,实现自动高亮
if best_row:
refs["results_table"].selected = [best_row]
update_export_buttons()
if refs["status_label"]:
refs["status_label"].text = "计算完成!"
except Exception as ex:
ui.notify(f"运行出错: {ex}", type="negative")
import traceback
traceback.print_exc()
finally:
log_timer.cancel()
process_log_queue()
processing_dialog.close()
with ui.dialog() as processing_dialog:
with ui.card().classes("w-96 items-center justify-center p-6"):
ui.label("正在计算方案...").classes("text-xl font-bold text-primary mb-2")
ui.spinner(size="lg", color="primary")
refs["status_label"] = ui.label("准备中...").classes(
"mt-4 text-sm text-gray-700 font-medium"
)
with ui.expansion("查看实时日志", icon="terminal", value=True).classes(
"w-full mt-4 text-sm"
):
# 直接控制 log 组件的样式和滚动,去除 scroll_area 中间层
refs["log_box"] = ui.log(max_lines=100).classes(
"w-full h-32 overflow-y-auto p-2 bg-black text-xs font-mono text-green-400 leading-snug"
)
processing_dialog.props("persistent")
with ui.header().classes(
"bg-primary text-white p-4 shadow-lg items-center no-wrap"
):
with ui.column().classes("gap-0"):
ui.label("海上风电场集电线路设计优化系统 v1.0").classes(
"text-2xl font-bold"
)
with ui.column().classes("gap-0"):
ui.label("Wind Farm Collector System Design Optimizer").classes(
"text-sm opacity-80"
)
ui.space()
ui.label("中能建西北院海上能源业务开发部").classes("text-xl font-bold")
with ui.row().classes("w-full p-4 gap-4"):
with ui.card().classes("w-1/4 p-4 shadow-md"):
ui.label("配置面板").classes("text-xl font-semibold mb-4 border-b pb-2")
async def export_template():
from generate_template import create_template
import shutil
async def save_template(path):
# 生成模板到当前目录
create_template()
source = "coordinates.xlsx"
if os.path.exists(source):
shutil.copy2(source, path)
else:
raise FileNotFoundError("无法生成模板文件")
await save_file_with_dialog(
"coordinates.xlsx",
save_template,
"Excel Files (*.xlsx)"
)
ui.button("导出 Excel 模板", on_click=export_template).classes(
"w-full mb-4"
).props("icon=file_download outline color=primary")
async def test_save_dialog():
async def dummy_callback(path):
# 仅作为测试,实际不写入文件,只弹出通知
ui.notify(f"测试成功!选定路径: {path}", type="info")
await save_file_with_dialog(
"test_save_dialog.txt", dummy_callback, "Text Files (*.txt)"
)
ui.button("测试对话框", on_click=test_save_dialog).classes(
"w-full mb-4"
).props("icon=bug_report outline color=orange")
ui.label("1. 上传坐标文件 (.xlsx)").classes("font-medium")
# 使用 .no-list CSS 隐藏 Quasar 默认列表,完全自定义文件显示
refs["upload_widget"] = ui.upload(
label="选择Excel文件", on_upload=handle_upload, auto_upload=True
).classes("w-full mb-2 no-list")
# 自定义文件显示容器
refs["current_file_container"] = ui.column().classes("w-full mb-4")
with refs["current_file_container"]:
ui.label("未选择文件").classes("text-xs text-gray-500 italic ml-1")
refs["run_btn"] = (
ui.button(
"运行方案对比",
on_click=run_analysis,
)
.classes("w-full mt-4 py-4")
.props("icon=play_arrow color=secondary")
)
with ui.column().classes("w-3/4 gap-4"):
# 新增:信息展示卡片
with (
ui.card()
.classes("w-full p-4 shadow-md")
.style("max-height: 400px; overflow-y: auto;")
):
refs["info_container"] = ui.column().classes("w-full")
ui.label("请上传 Excel 文件以查看系统参数和电缆规格...").classes(
"text-gray-500 italic"
)
with ui.card().classes("w-full p-4 shadow-md"):
ui.label("方案对比结果 (点击行查看拓扑详情)").classes(
"text-xl font-semibold mb-2"
)
columns = [
{
"name": "name",
"label": "方案名称",
"field": "name",
"required": True,
"align": "left",
},
{
"name": "cost_wan",
"label": "总投资 (万元)",
"field": "cost_wan",
"sortable": True,
},
{
"name": "loss_kw",
"label": "线损 (kW)",
"field": "loss_kw",
"sortable": True,
},
{
"name": "note",
"label": "备注",
"field": "note",
"align": "left",
},
]
# 使用内置的 selection='single' 结合行点击事件实现背景高亮
# 这样可以完全由 Python 事件逻辑控制,不依赖 CSS 伪类
refs["results_table"] = ui.table(
columns=columns,
rows=[],
selection="single",
row_key="original_name",
).classes("w-full hide-selection-column")
refs["results_table"].on("row-click", handle_row_click)
with ui.card().classes("w-full p-4 shadow-md"):
ui.label("拓扑可视化").classes("text-xl font-semibold mb-2")
refs["plot_container"] = ui.column().classes("w-full items-center")
with ui.card().classes("w-full p-4 shadow-md"):
ui.label("导出与下载").classes("text-xl font-semibold mb-2")
refs["export_row"] = ui.row().classes("gap-4")
def find_available_port(start_port=8080, max_attempts=100):
"""尝试寻找可用的端口"""
import socket
for port in range(start_port, start_port + max_attempts):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
# 尝试绑定到 127.0.0.1,这是最常用的本地开发地址
s.bind(("127.0.0.1", port))
return port
except OSError:
continue
return start_port # 默认返回起始端口,让 ui.run 报错
# 自动寻找可用端口,避免端口冲突
target_port = find_available_port(8082) # 从 8082 开始,避开常用的 8080
# 检测是否为打包后的exe程序
import sys
if getattr(sys, "frozen", False):
# 修复无控制台模式下 stdout/stderr 为 None 导致的 logging 错误
class NullWriter:
def write(self, text):
pass
def flush(self):
pass
def isatty(self):
return False
if sys.stdout is None:
sys.stdout = NullWriter()
if sys.stderr is None:
sys.stderr = NullWriter()
# 打包环境下禁用日志配置,避免在无控制台模式下出现错误
import logging.config
logging.config.dictConfig(
{
"version": 1,
"disable_existing_loggers": True,
}
)
ui.run(
title="海上风电场集电线路优化",
host="127.0.0.1",
port=target_port,
reload=False,
window_size=(1280, 800),
native=True,
)
else:
# 普通使用环境保留日志功能
# ui.run(title="海上风电场集电线路优化", host='127.0.0.1', reload=True, port=target_port, native=False)
ui.run(
title="海上风电场集电线路优化",
host="127.0.0.1",
port=target_port,
reload=False,
window_size=(1280, 800),
native=True,
)