Files
windfarm/gui.py

612 lines
24 KiB
Python
Raw Normal View History

import os
import sys
import io
import contextlib
import tempfile
import matplotlib.pyplot as plt
from nicegui import ui, events
from main import (
compare_design_methods,
export_to_dxf,
load_data_from_excel,
generate_wind_farm_data,
visualize_design,
export_all_scenarios_to_excel,
)
import pandas as pd
# 设置matplotlib支持中文显示
plt.rcParams["font.sans-serif"] = ["Microsoft YaHei", "SimHei", "Arial"]
plt.rcParams["axes.unicode_minus"] = False
class Logger(io.StringIO):
def __init__(self, log_element):
super().__init__()
self.log_element = log_element
def write(self, message):
if message.strip():
self.log_element.push(message.strip())
super().write(message)
# 状态变量
state = {
"excel_path": None,
"original_filename": None,
"results": [],
"substation": None,
"turbines": None,
"temp_dir": os.path.join(tempfile.gettempdir(), "windfarm_gui_uploads"),
}
# 确保临时目录存在
if not os.path.exists(state["temp_dir"]):
os.makedirs(state["temp_dir"], exist_ok=True)
@ui.page("/")
def index():
# 注入 CSS 隐藏表格自带的复选框列,并定义选中行的高亮背景色
ui.add_head_html("""
<style>
.hide-selection-column .q-table__selection { display: none; }
.hide-selection-column .q-table tr.selected { background-color: #e3f2fd !important; }
</style>
""")
ui.query("body").style(
'background-color: #f0f2f5; font-family: "Segoe UI", Tahoma, Geneva, Verdana, sans-serif;'
)
# 定义 UI 元素引用容器,方便在函数中更新
refs = {
"log_box": None,
"results_table": None,
"plot_container": None,
"export_row": None,
"export_selected_btn": None, # 新增按钮引用
"status_label": None,
"upload_widget": None,
"run_btn": None,
"current_file_label": None,
}
async def handle_upload(e: events.UploadEventArguments):
try:
filename = None
content = None
if hasattr(e, "name"):
filename = e.name
if hasattr(e, "content"):
content = e.content
if content is None and hasattr(e, "file"):
file_obj = e.file
if not filename:
filename = getattr(
file_obj, "name", getattr(file_obj, "filename", None)
)
if hasattr(file_obj, "file") and hasattr(file_obj.file, "read"):
content = file_obj.file
elif hasattr(file_obj, "read"):
content = file_obj
if not filename:
filename = "uploaded_file.xlsx"
if content is None:
ui.notify("上传失败: 无法解析文件内容", type="negative")
return
# 清理旧文件,确保目录中只有一个文件
if os.path.exists(state["temp_dir"]):
for f in os.listdir(state["temp_dir"]):
try:
os.remove(os.path.join(state["temp_dir"], f))
except:
pass
path = os.path.join(state["temp_dir"], filename)
if hasattr(content, "seek"):
try:
content.seek(0)
except Exception:
pass
data = content.read()
import inspect
if inspect.iscoroutine(data):
data = await data
with open(path, "wb") as f:
f.write(data)
state["excel_path"] = path
state["original_filename"] = filename
ui.notify(f"文件已上传: {filename}", type="positive")
if refs["current_file_label"]:
refs["current_file_label"].text = f"当前文件: {filename}"
# 加载数据
state["turbines"], state["substation"], _ = load_data_from_excel(path)
except Exception as ex:
ui.notify(f"上传处理失败: {ex}", type="negative")
def update_export_buttons():
if refs["export_row"]:
refs["export_row"].clear()
if not state["results"] or not refs["export_row"]:
return
# 获取带 _result 后缀的文件名
download_name = "wind_farm_design_result.xlsx"
if state.get("original_filename"):
name_no_ext = os.path.splitext(state["original_filename"])[0]
download_name = f"{name_no_ext}_result.xlsx"
# 寻找推荐方案:优先 Scenario 1 的最低成本,否则取全局最低成本
scenario1_results = [r for r in state["results"] if "Scenario 1" in r["name"]]
if scenario1_results:
best_res = min(scenario1_results, key=lambda x: x["cost"])
else:
best_res = min(state["results"], key=lambda x: x["cost"])
with refs["export_row"]:
ui.button(
"下载 Excel 对比表",
on_click=lambda: ui.download("wind_farm_design.xlsx", download_name),
).props("icon=download")
def export_best_dxf():
if state["substation"] is not None:
# 生成安全的文件名
safe_name = "".join(
[
c
for c in best_res["name"]
if c.isalnum() or c in (" ", "-", "_")
]
).strip()
dxf_name = f"design_best_{safe_name}.dxf"
export_to_dxf(
best_res["turbines"],
state["substation"],
best_res["eval"]["details"],
dxf_name,
)
ui.download(dxf_name)
ui.notify(f'已导出推荐方案: {best_res["name"]}', type="positive")
else:
ui.notify("缺少升压站数据,无法导出 DXF", type="negative")
ui.button(
f'导出推荐方案 DXF ({best_res["name"]})', on_click=export_best_dxf
).props("icon=architecture color=accent")
def export_selected_dxf():
if not refs["results_table"] or not refs["results_table"].selected:
ui.notify("请先在上方表格中选择一个方案", type="warning")
return
selected_row = refs["results_table"].selected[0]
row_name = selected_row.get("original_name", selected_row.get("name"))
selected_res = next(
(r for r in state["results"] if r["name"] == row_name), None
)
if selected_res and state["substation"] is not None:
safe_name = "".join(
[
c
for c in selected_res["name"]
if c.isalnum() or c in (" ", "-", "_")
]
).strip()
dxf_name = f"design_{safe_name}.dxf"
export_to_dxf(
selected_res["turbines"],
state["substation"],
selected_res["eval"]["details"],
dxf_name,
)
ui.download(dxf_name)
ui.notify(f'已导出选中方案: {selected_res["name"]}', type="positive")
else:
ui.notify("无法导出:未找到方案数据或缺少升压站信息", type="negative")
# 记录此按钮引用,以便 handle_row_click 更新文字
refs["export_selected_btn"] = ui.button(
"导出选中方案 DXF", on_click=export_selected_dxf
).props("icon=architecture color=primary")
# 初始化按钮文字
clean_name = best_res["name"].replace("(推荐) ", "")
refs["export_selected_btn"].set_text(f"导出选中方案 ({clean_name})")
def export_all_dxf():
if not state["results"] or state["substation"] is None:
ui.notify("无方案数据可导出", type="warning")
return
import zipfile
# 1. 确定文件名
zip_filename = "all_designs_result.zip"
excel_result_name = "wind_farm_design_result.xlsx"
# 推断 main.py 生成的原始 Excel 路径
generated_excel_path = "wind_farm_design.xlsx"
if state.get("original_filename"):
name_no_ext = os.path.splitext(state["original_filename"])[0]
zip_filename = f"{name_no_ext}_result.zip"
excel_result_name = f"{name_no_ext}_result.xlsx"
if state.get("excel_path"):
dir_name = os.path.dirname(state["excel_path"])
generated_excel_path = os.path.join(
dir_name, f"{name_no_ext}_design.xlsx"
)
try:
with zipfile.ZipFile(
zip_filename, "w", zipfile.ZIP_DEFLATED
) as zipf:
# 2. 添加 Excel 结果表
if os.path.exists(generated_excel_path):
zipf.write(generated_excel_path, arcname=excel_result_name)
else:
# 尝试重新生成
try:
temp_excel = "temp_export.xlsx"
export_all_scenarios_to_excel(
state["results"], temp_excel
)
zipf.write(temp_excel, arcname=excel_result_name)
os.remove(temp_excel)
except Exception as e:
print(f"生成Excel失败: {e}")
# 3. 添加所有 DXF
for res in state["results"]:
safe_name = "".join(
[
c
for c in res["name"]
if c.isalnum() or c in (" ", "-", "_")
]
).strip()
dxf_name = f"design_{safe_name}.dxf"
export_to_dxf(
res["turbines"],
state["substation"],
res["eval"]["details"],
dxf_name,
)
zipf.write(dxf_name)
try:
os.remove(dxf_name)
except:
pass
ui.download(zip_filename)
ui.notify("已导出所有方案 (含Excel)", type="positive")
except Exception as ex:
ui.notify(f"导出失败: {ex}", type="negative")
ui.button("导出全部方案 DXF (ZIP)", on_click=export_all_dxf).props(
"icon=folder_zip color=secondary"
)
def update_plot(result):
if refs["plot_container"]:
refs["plot_container"].clear()
with refs["plot_container"]:
# 使用 ui.pyplot 上下文自动管理 figure 生命周期
with ui.pyplot(figsize=(10, 8)) as plot:
title = f"{result['name']}\nCost: ¥{result['cost']/10000:.2f}万 | Loss: {result['loss']:.2f} kW"
# 显式获取当前 ui.pyplot 创建的 axes并传递给绘图函数
# 确保绘图发生在正确的 figure 上
ax = plt.gca()
visualize_design(
result["turbines"],
state["substation"],
result["eval"]["details"],
title,
ax=ax,
)
async def handle_row_click(e):
# 获取被点击行的数据
row = e.args[1] if len(e.args) > 1 else None
if not row:
return
# 识别方案名称
row_name = row.get("original_name", row.get("name"))
if not row_name:
return
selected_res = next(
(r for r in state["results"] if r["name"] == row_name), None
)
if selected_res:
# 1. 更新拓扑图
update_plot(selected_res)
ui.notify(f"已切换至方案: {selected_res['name']}")
# 2. 通过设置 table 的 selected 属性来实现背景高亮
if refs["results_table"]:
refs["results_table"].selected = [row]
# 3. 更新“导出选中方案”按钮的文本
if refs["export_selected_btn"]:
clean_name = row_name.replace("(推荐) ", "")
refs["export_selected_btn"].set_text(f"导出选中方案 ({clean_name})")
from nicegui import run
import queue
async def run_analysis():
if not state["excel_path"]:
ui.notify("请先上传 Excel 坐标文件!", type="warning")
return
if refs["log_box"]:
refs["log_box"].clear()
log_queue = queue.Queue()
class QueueLogger(io.StringIO):
def write(self, message):
if message and message.strip():
log_queue.put(message.strip())
super().write(message)
def process_log_queue():
if refs["log_box"]:
new_msg = False
while not log_queue.empty():
try:
msg = log_queue.get_nowait()
refs["log_box"].push(msg)
new_msg = True
if msg.startswith("--- Scenario"):
scenario_name = msg.replace("---", "").strip()
if refs["status_label"]:
refs["status_label"].text = (
f"正在计算: {scenario_name}..."
)
elif "开始比较电缆方案" in msg:
if refs["status_label"]:
refs["status_label"].text = "准备开始计算..."
except queue.Empty:
break
if new_msg:
# 强制日志框滚动到最底部,确保最后一行可见
ui.run_javascript(
'const el = document.querySelector(".analysis-log-box"); if (el) { el.scrollTop = el.scrollHeight; }'
)
log_timer = ui.timer(0.1, process_log_queue)
if refs["status_label"]:
refs["status_label"].text = "初始化中..."
processing_dialog.open()
try:
# 2. 定义在线程中运行的任务
def task():
# 捕获 stdout 到我们的 QueueLogger
# 禁止 main.py 中的后台绘图,避免线程安全问题
with contextlib.redirect_stdout(QueueLogger()):
return compare_design_methods(
excel_path=state["excel_path"],
n_clusters_override=None,
interactive=False,
plot_results=False,
)
# 在后台线程运行计算任务
results = await run.io_bound(task)
state["results"] = results
if not state["excel_path"] and results:
if state["substation"] is None:
_, state["substation"] = generate_wind_farm_data(
n_turbines=30, layout="grid", spacing=800
)
# 计算完成后,自动寻找并显示最佳方案的拓扑图
best_res = None
if results:
# 默认推荐 Scenario 1 中成本最低的方案
scenario1_results = [r for r in results if "Scenario 1" in r["name"]]
if scenario1_results:
best_res = min(scenario1_results, key=lambda x: x["cost"])
else:
# 如果没有 Scenario 1则回退到全局最优
best_res = min(results, key=lambda x: x["cost"])
update_plot(best_res)
ui.notify(
f'计算完成!已自动加载推荐方案: {best_res["name"]}', type="positive"
)
# 更新结果表格
if refs["results_table"]:
table_data = []
best_row = None
for res in results:
name_display = res["name"]
is_best = False
if best_res and res["name"] == best_res["name"]:
name_display = f"(推荐) {name_display}"
is_best = True
# 生成备注信息
note = ""
original_name = res["name"]
# 识别算法
if "MST Method" in original_name:
note += "最小生成树算法(无容量约束基准); "
elif "Base" in original_name:
note += "基础扇区扫描(单次扫描); "
elif "Rotational" in original_name:
note += "旋转扫描优化(全局最优角度); "
elif "Esau-Williams" in original_name:
note += "Esau-Williams启发式算法(权衡距离与容量); "
# 识别电缆策略
if "Standard" in original_name:
note += "不包含可选电缆型号。"
elif "With Optional" in original_name:
note += "含可选电缆型号。"
elif "No Max" in original_name:
note += "不包含可选电缆型号,且可使用的最大截面电缆降一档截面。"
row_dict = {
"name": name_display,
"cost_wan": f"{res['cost'] / 10000:.2f}",
"loss_kw": f"{res['loss']:.2f}",
"note": note,
"original_name": res["name"],
}
table_data.append(row_dict)
if is_best:
best_row = row_dict
refs["results_table"].rows = table_data
# 初始选中推荐方案,实现自动高亮
if best_row:
refs["results_table"].selected = [best_row]
update_export_buttons()
if refs["status_label"]:
refs["status_label"].text = "计算完成!"
except Exception as ex:
ui.notify(f"运行出错: {ex}", type="negative")
import traceback
traceback.print_exc()
finally:
log_timer.cancel()
process_log_queue()
processing_dialog.close()
with ui.dialog() as processing_dialog:
with ui.card().classes("w-96 items-center justify-center p-6"):
ui.label("正在计算方案...").classes("text-xl font-bold text-primary mb-2")
ui.spinner(size="lg", color="primary")
refs["status_label"] = ui.label("准备中...").classes(
"mt-4 text-sm text-gray-700 font-medium"
)
with ui.expansion("查看实时日志", icon="terminal", value=True).classes(
"w-full mt-4 text-sm"
):
refs["log_box"] = ui.log(max_lines=100).classes(
"w-full h-32 text-xs font-mono bg-black text-green-400 analysis-log-box"
)
processing_dialog.props("persistent")
with ui.header().classes("bg-primary text-white p-4 shadow-lg"):
ui.label("海上风电场集电线路设计优化系统").classes("text-2xl font-bold")
ui.label("Wind Farm Collector System Design Optimizer").classes(
"text-sm opacity-80"
)
with ui.row().classes("w-full p-4 gap-4"):
with ui.card().classes("w-1/4 p-4 shadow-md"):
ui.label("配置面板").classes("text-xl font-semibold mb-4 border-b pb-2")
def export_template():
from generate_template import create_template
try:
create_template()
ui.download("coordinates.xlsx")
ui.notify("Excel 模板导出成功", type="positive")
except Exception as ex:
ui.notify(f"模板导出失败: {ex}", type="negative")
ui.button("导出 Excel 模板", on_click=export_template).classes(
"w-full mb-4"
).props("icon=file_download outline color=primary")
ui.label("1. 上传坐标文件 (.xlsx)").classes("font-medium")
refs["upload_widget"] = ui.upload(
label="选择Excel文件", on_upload=handle_upload, auto_upload=True
).classes("w-full mb-2")
# refs['current_file_label'] = ui.label('未选择文件').classes('text-xs text-gray-500 mb-4 italic')
refs["run_btn"] = (
ui.button(
"运行方案对比",
on_click=run_analysis,
)
.classes("w-full mt-4 py-4")
.props("icon=play_arrow color=secondary")
)
with ui.column().classes("w-3/4 gap-4"):
with ui.card().classes("w-full p-4 shadow-md"):
ui.label("方案对比结果 (点击行查看拓扑详情)").classes(
"text-xl font-semibold mb-2"
)
columns = [
{
"name": "name",
"label": "方案名称",
"field": "name",
"required": True,
"align": "left",
},
{
"name": "cost_wan",
"label": "总投资 (万元)",
"field": "cost_wan",
"sortable": True,
},
{
"name": "loss_kw",
"label": "线损 (kW)",
"field": "loss_kw",
"sortable": True,
},
{
"name": "note",
"label": "备注",
"field": "note",
"align": "left",
},
]
# 使用内置的 selection='single' 结合行点击事件实现背景高亮
# 这样可以完全由 Python 事件逻辑控制,不依赖 CSS 伪类
refs["results_table"] = ui.table(
columns=columns,
rows=[],
selection="single",
row_key="original_name",
).classes("w-full hide-selection-column")
refs["results_table"].on("row-click", handle_row_click)
with ui.card().classes("w-full p-4 shadow-md"):
ui.label("拓扑可视化").classes("text-xl font-semibold mb-2")
refs["plot_container"] = ui.column().classes("w-full items-center")
with ui.card().classes("w-full p-4 shadow-md"):
ui.label("导出与下载").classes("text-xl font-semibold mb-2")
refs["export_row"] = ui.row().classes("gap-4")
def find_available_port(start_port=8080, max_attempts=10):
"""尝试寻找可用的端口"""
import socket
for port in range(start_port, start_port + max_attempts):
with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
try:
s.bind(("0.0.0.0", port))
return port
except OSError:
continue
return start_port # 默认返回起始端口,让 ui.run 报错
# 自动寻找可用端口,避免端口冲突
target_port = find_available_port(8080)
ui.run(title="海上风电场集电线路优化", port=target_port)