369 lines
14 KiB
Python
369 lines
14 KiB
Python
import os
|
||
import sys
|
||
import io
|
||
import contextlib
|
||
import matplotlib.pyplot as plt
|
||
from nicegui import ui, events
|
||
from main import (
|
||
compare_design_methods,
|
||
export_to_dxf,
|
||
load_data_from_excel,
|
||
generate_wind_farm_data,
|
||
visualize_design,
|
||
)
|
||
import pandas as pd
|
||
|
||
# Configure matplotlib so that Chinese labels render: try the listed
# sans-serif fonts in order and disable the Unicode minus glyph.
plt.rcParams.update(
    {
        "font.sans-serif": ["Microsoft YaHei", "SimHei", "Arial"],
        "axes.unicode_minus": False,
    }
)
|
||
|
||
|
||
class Logger(io.StringIO):
    """In-memory text stream that mirrors non-blank writes to a log widget.

    Everything written is still accumulated in the underlying ``StringIO``
    buffer; in addition, each message that contains non-whitespace text is
    stripped and forwarded to ``log_element.push``.
    """

    def __init__(self, log_element):
        """Create the stream.

        Args:
            log_element: Object exposing a ``push(text)`` method that
                receives each non-blank message.
        """
        super().__init__()
        self.log_element = log_element

    def write(self, message):
        """Write *message* to the buffer and mirror it to the log element.

        Args:
            message: Text to write.

        Returns:
            The number of characters written, as reported by
            ``io.StringIO.write`` (the text-stream contract that the
            previous override silently dropped).
        """
        text = message.strip()
        if text:
            self.log_element.push(text)
        return super().write(message)
|
||
|
||
|
||
# Module-level application state: the uploaded Excel path, the latest
# comparison results, the loaded substation/turbine data, and the directory
# that uploaded files are written to.
state = dict(
    excel_path=None,
    results=[],
    substation=None,
    turbines=None,
    temp_dir=".gemini/tmp/gui_uploads",
)

# Make sure the temporary upload directory exists before the UI starts.
if not os.path.exists(state["temp_dir"]):
    os.makedirs(state["temp_dir"], exist_ok=True)
|
||
|
||
|
||
@ui.page("/")
def index():
    """Build the main page: upload panel, results table, plot and exports.

    All event handlers are closures over ``refs`` (handles to the UI elements
    created further down) and over the module-level ``state`` dictionary.
    """
    # Function-scope imports, gathered in one place.
    import inspect
    import queue
    import traceback

    from nicegui import run

    ui.query("body").style(
        'background-color: #f0f2f5; font-family: "Segoe UI", Tahoma, Geneva, Verdana, sans-serif;'
    )

    # Container for UI element references so the handlers below can update
    # widgets that are only created later in this function.
    refs = {
        "log_box": None,
        "results_table": None,
        "plot_container": None,
        "export_row": None,
        "status_label": None,
        "upload_widget": None,
        "clusters_input": None,
        "run_btn": None,
        "current_file_label": None,
    }

    def _resolve_upload(e):
        """Return ``(filename, content)`` extracted from an upload event.

        Supports events exposing ``name``/``content`` directly as well as
        events carrying a ``file`` object (optionally wrapping another
        ``.file`` stream). Either element is ``None`` when it cannot be found.
        """
        filename = getattr(e, "name", None)
        content = getattr(e, "content", None)
        if content is None and hasattr(e, "file"):
            file_obj = e.file
            if not filename:
                filename = getattr(
                    file_obj, "name", getattr(file_obj, "filename", None)
                )
            if hasattr(file_obj, "file") and hasattr(file_obj.file, "read"):
                content = file_obj.file
            elif hasattr(file_obj, "read"):
                content = file_obj
        return filename, content

    def _safe_upload_name(filename):
        """Reduce a client-supplied filename to a bare, safe file name.

        The name comes from the browser, so directory components (with either
        slash style) are stripped to keep the file inside the temp directory.
        Falls back to ``uploaded_file.xlsx`` when nothing usable remains.
        """
        name = os.path.basename(str(filename or "").replace("\\", "/"))
        if name in ("", ".", ".."):
            return "uploaded_file.xlsx"
        return name

    async def handle_upload(e: events.UploadEventArguments):
        """Store the uploaded file in the temp directory and load its data.

        On success ``state["excel_path"]``, ``state["turbines"]`` and
        ``state["substation"]`` are updated. Any failure is reported to the
        user through a negative notification instead of being raised.
        """
        try:
            raw_name, content = _resolve_upload(e)
            if content is None:
                ui.notify("上传失败: 无法解析文件内容", type="negative")
                return
            filename = _safe_upload_name(raw_name)

            # Remove previously uploaded files so the directory only ever
            # holds the current one.
            temp_dir = state["temp_dir"]
            if os.path.exists(temp_dir):
                for stale in os.listdir(temp_dir):
                    try:
                        os.remove(os.path.join(temp_dir, stale))
                    except OSError:
                        # Best-effort cleanup: a leftover entry must not
                        # block the new upload.
                        pass

            path = os.path.join(temp_dir, filename)
            if hasattr(content, "seek"):
                try:
                    content.seek(0)
                except Exception:
                    # Rewinding is optional; not every stream supports it.
                    pass
            data = content.read()
            # Some upload objects expose an async ``read``.
            if inspect.iscoroutine(data):
                data = await data
            with open(path, "wb") as f:
                f.write(data)

            state["excel_path"] = path
            ui.notify(f"文件已上传: {filename}", type="positive")
            if refs["current_file_label"]:
                refs["current_file_label"].text = f"当前文件: {filename}"

            # Load the turbine/substation data from the stored workbook.
            state["turbines"], state["substation"], _ = load_data_from_excel(path)
        except Exception as ex:
            ui.notify(f"上传处理失败: {ex}", type="negative")

    def update_export_buttons():
        """Rebuild the export row from the current ``state["results"]``.

        Clears the row, then (if there are results) adds the Excel download
        button and a DXF export button for the cheapest result.
        """
        export_row = refs["export_row"]
        if export_row:
            export_row.clear()
        if not state["results"] or not export_row:
            return

        with export_row:
            ui.button(
                "下载 Excel 对比表",
                on_click=lambda: ui.download("wind_farm_design.xlsx"),
            ).props("icon=download")

            # Cheapest result; ties resolve to the first entry, the same as
            # the selection made in run_analysis.
            best_res = min(state["results"], key=lambda res: res["cost"])

            def export_best_dxf():
                """Export the cheapest design to DXF and offer it for download."""
                dxf_name = "best_design.dxf"
                if state["substation"] is None:
                    ui.notify("缺少升压站数据,无法导出 DXF", type="negative")
                    return
                export_to_dxf(
                    best_res["turbines"],
                    state["substation"],
                    best_res["eval"]["details"],
                    dxf_name,
                )
                ui.download(dxf_name)
                ui.notify(f'已导出推荐方案: {best_res["name"]}', type="positive")

            ui.button(
                f'导出推荐方案 DXF ({best_res["name"]})', on_click=export_best_dxf
            ).props("icon=architecture color=accent")

    def update_plot(result):
        """Replace the plot container's content with the plot for *result*.

        Args:
            result: Result dict providing ``name``, ``cost``, ``loss``,
                ``turbines`` and ``eval["details"]``.
        """
        container = refs["plot_container"]
        if not container:
            return
        container.clear()
        with container:
            # Draw inside a ui.pyplot context and hand the current axes to
            # the plotting function so it draws on this figure.
            with ui.pyplot(figsize=(10, 8)):
                title = f"{result['name']}\nCost: ¥{result['cost']/10000:.2f}万 | Loss: {result['loss']:.2f} kW"
                ax = plt.gca()
                visualize_design(
                    result["turbines"],
                    state["substation"],
                    result["eval"]["details"],
                    title,
                    ax=ax,
                )

    def handle_row_click(e):
        """Show the plot of the result whose table row was clicked."""
        # ui.table row-click args: [evt, row, index]
        row = None
        if e.args and isinstance(e.args, list) and len(e.args) > 1:
            row = e.args[1]

        if not row or "name" not in row:
            return

        row_name = row["name"]
        selected_res = next(
            (r for r in state["results"] if r["name"] == row_name), None
        )
        if selected_res:
            update_plot(selected_res)
            ui.notify(f"已切换至方案: {row_name}")

    async def run_analysis(n_clusters):
        """Run the design comparison in a worker thread and refresh the UI.

        Args:
            n_clusters: Value of the "clusters" number input (``None`` when
                empty); forwarded unchanged as ``n_clusters_override``.
                NOTE(review): presumably expected to be an int by
                ``compare_design_methods`` - confirm whether a cast is needed.
        """
        if not state["excel_path"]:
            # NOTE(review): this warning does not return; execution continues
            # with excel_path=None and the fallback further down generates a
            # substation. Presumably intentional - confirm.
            ui.notify("请先上传 Excel 坐标文件!", type="warning")
        if refs["log_box"]:
            refs["log_box"].clear()
        log_queue = queue.Queue()

        class QueueLogger(io.StringIO):
            """Stream that forwards non-blank writes to ``log_queue``."""

            def write(self, message):
                if message and message.strip():
                    log_queue.put(message.strip())
                # Return the character count as the stream API requires.
                return super().write(message)

        def process_log_queue():
            """Drain queued log lines into the log box and status label."""
            if not refs["log_box"]:
                return
            while not log_queue.empty():
                try:
                    msg = log_queue.get_nowait()
                except queue.Empty:
                    break
                refs["log_box"].push(msg)
                if msg.startswith("--- Scenario"):
                    scenario_name = msg.replace("---", "").strip()
                    if refs["status_label"]:
                        refs["status_label"].text = (
                            f"正在计算: {scenario_name}..."
                        )
                elif "开始比较电缆方案" in msg:
                    if refs["status_label"]:
                        refs["status_label"].text = "准备开始计算..."

        # Poll the queue from the UI side while the worker thread is running.
        log_timer = ui.timer(0.1, process_log_queue)
        if refs["status_label"]:
            refs["status_label"].text = "初始化中..."
        processing_dialog.open()
        try:

            def task():
                """Run the comparison with stdout redirected to the queue."""
                # Plotting is disabled here; the result plot is drawn
                # afterwards by update_plot.
                with contextlib.redirect_stdout(QueueLogger()):
                    return compare_design_methods(
                        excel_path=state["excel_path"],
                        n_clusters_override=n_clusters,
                        interactive=False,
                        plot_results=False,
                    )

            # Execute the computation in a background thread.
            results = await run.io_bound(task)

            state["results"] = results
            if not state["excel_path"] and results:
                if state["substation"] is None:
                    _, state["substation"] = generate_wind_farm_data(
                        n_turbines=30, layout="grid", spacing=800
                    )

            # Refresh the results table.
            if refs["results_table"]:
                refs["results_table"].rows = [
                    {
                        "name": res["name"],
                        "cost_wan": round(res["cost"] / 10000, 2),
                        "loss_kw": round(res["loss"], 2),
                    }
                    for res in results
                ]
                refs["results_table"].update()

            # Automatically show the topology of the cheapest design.
            if results:
                best_res = min(results, key=lambda x: x["cost"])
                update_plot(best_res)
                ui.notify(
                    f'计算完成!已自动加载推荐方案: {best_res["name"]}', type="positive"
                )

            update_export_buttons()
            if refs["status_label"]:
                refs["status_label"].text = "计算完成!"
        except Exception as ex:
            ui.notify(f"运行出错: {ex}", type="negative")
            traceback.print_exc()
        finally:
            log_timer.cancel()
            # Flush anything logged after the last timer tick.
            process_log_queue()
            processing_dialog.close()

    # Progress dialog shown while a comparison is running.
    with ui.dialog() as processing_dialog:
        with ui.card().classes("w-96 items-center justify-center p-6"):
            ui.label("正在计算方案...").classes("text-xl font-bold text-primary mb-2")
            ui.spinner(size="lg", color="primary")
            refs["status_label"] = ui.label("准备中...").classes(
                "mt-4 text-sm text-gray-700 font-medium"
            )
            with ui.expansion("查看实时日志", icon="terminal", value=True).classes(
                "w-full mt-4 text-sm"
            ):
                refs["log_box"] = ui.log(max_lines=100).classes(
                    "w-full h-32 text-xs font-mono bg-black text-green-400"
                )
    processing_dialog.props("persistent")

    with ui.header().classes("bg-primary text-white p-4 shadow-lg"):
        ui.label("海上风电场集电线路设计优化系统").classes("text-2xl font-bold")
        ui.label("Wind Farm Collector System Design Optimizer").classes(
            "text-sm opacity-80"
        )

    with ui.row().classes("w-full p-4 gap-4"):
        # Left column: configuration panel.
        with ui.card().classes("w-1/4 p-4 shadow-md"):
            ui.label("配置面板").classes("text-xl font-semibold mb-4 border-b pb-2")
            ui.label("1. 上传坐标文件 (.xlsx)").classes("font-medium")
            refs["upload_widget"] = ui.upload(
                label="选择Excel文件", on_upload=handle_upload, auto_upload=True
            ).classes("w-full mb-2")
            # No "current file" label is created, so
            # refs["current_file_label"] stays None and handle_upload skips it.

            ui.label("2. 参数设置").classes("font-medium mt-4")
            refs["clusters_input"] = ui.number(
                "指定回路数 (可选)", value=None, format="%d", placeholder="自动计算"
            ).classes("w-full mb-4")
            refs["run_btn"] = (
                ui.button(
                    "运行方案对比",
                    on_click=lambda: run_analysis(refs["clusters_input"].value),
                )
                .classes("w-full mt-4 py-4")
                .props("icon=play_arrow color=secondary")
            )

        # Right column: results table, plot and export buttons.
        with ui.column().classes("w-3/4 gap-4"):
            with ui.card().classes("w-full p-4 shadow-md"):
                ui.label("方案对比结果 (点击行查看拓扑详情)").classes(
                    "text-xl font-semibold mb-2"
                )
                columns = [
                    {
                        "name": "name",
                        "label": "方案名称",
                        "field": "name",
                        "required": True,
                        "align": "left",
                    },
                    {
                        "name": "cost_wan",
                        "label": "总投资 (万元)",
                        "field": "cost_wan",
                        "sortable": True,
                    },
                    {
                        "name": "loss_kw",
                        "label": "线损 (kW)",
                        "field": "loss_kw",
                        "sortable": True,
                    },
                ]
                # No row selection mode: interaction is via plain row clicks.
                refs["results_table"] = ui.table(columns=columns, rows=[]).classes(
                    "w-full"
                )
                refs["results_table"].on("row-click", handle_row_click)
            with ui.card().classes("w-full p-4 shadow-md"):
                ui.label("拓扑可视化").classes("text-xl font-semibold mb-2")
                refs["plot_container"] = ui.column().classes("w-full items-center")
            with ui.card().classes("w-full p-4 shadow-md"):
                ui.label("导出与下载").classes("text-xl font-semibold mb-2")
                refs["export_row"] = ui.row().classes("gap-4")
|
||
|
||
|
||
# Start the server only when executed as a script. "__mp_main__" is included
# so that NiceGUI's multiprocessing-based reload still reaches ui.run, while a
# plain import of this module no longer launches the server.
if __name__ in {"__main__", "__mp_main__"}:
    ui.run(title="海上风电场集电线路优化", port=8080)
|