From 00d480edbb9c9f92dfc992072215644d19931e6e Mon Sep 17 00:00:00 2001 From: dmy Date: Sun, 4 Jan 2026 14:01:16 +0800 Subject: [PATCH] =?UTF-8?q?refactor:=20=E4=BB=A3=E7=A0=81=E6=A0=BC?= =?UTF-8?q?=E5=BC=8F=E7=BB=9F=E4=B8=80=E5=92=8CExcel=E5=AF=BC=E5=87=BA?= =?UTF-8?q?=E4=BC=98=E5=8C=96?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- gui.py | 376 ++++++++++++++++++++++++++++++++++++-------------------- main.py | 11 +- 2 files changed, 254 insertions(+), 133 deletions(-) diff --git a/gui.py b/gui.py index 2994bb1..3aaffde 100644 --- a/gui.py +++ b/gui.py @@ -4,12 +4,19 @@ import io import contextlib import matplotlib.pyplot as plt from nicegui import ui, events -from main import compare_design_methods, export_to_dxf, load_data_from_excel, generate_wind_farm_data, visualize_design +from main import ( + compare_design_methods, + export_to_dxf, + load_data_from_excel, + generate_wind_farm_data, + visualize_design, +) import pandas as pd # 设置matplotlib支持中文显示 -plt.rcParams['font.sans-serif'] = ['Microsoft YaHei', 'SimHei', 'Arial'] -plt.rcParams['axes.unicode_minus'] = False +plt.rcParams["font.sans-serif"] = ["Microsoft YaHei", "SimHei", "Arial"] +plt.rcParams["axes.unicode_minus"] = False + class Logger(io.StringIO): def __init__(self, log_element): @@ -21,190 +28,255 @@ class Logger(io.StringIO): self.log_element.push(message.strip()) super().write(message) + # 状态变量 state = { - 'excel_path': None, - 'results': [], - 'substation': None, - 'turbines': None, - 'temp_dir': '.gemini/tmp/gui_uploads' + "excel_path": None, + "results": [], + "substation": None, + "turbines": None, + "temp_dir": ".gemini/tmp/gui_uploads", } # 确保临时目录存在 -if not os.path.exists(state['temp_dir']): - os.makedirs(state['temp_dir'], exist_ok=True) +if not os.path.exists(state["temp_dir"]): + os.makedirs(state["temp_dir"], exist_ok=True) -@ui.page('/') + +@ui.page("/") def index(): - ui.query('body').style('background-color: #f0f2f5; font-family: "Segoe UI", Tahoma, Geneva, Verdana, sans-serif;') - + ui.query("body").style( + 'background-color: #f0f2f5; font-family: "Segoe UI", Tahoma, Geneva, Verdana, sans-serif;' + ) + # 定义 UI 元素引用容器,方便在函数中更新 refs = { - 'log_box': None, - 'results_table': None, - 'plot_container': None, - 'export_row': None, - 'status_label': None, - 'upload_widget': None, - 'clusters_input': None, - 'run_btn': None + "log_box": None, + "results_table": None, + "plot_container": None, + "export_row": None, + "status_label": None, + "upload_widget": None, + "clusters_input": None, + "run_btn": None, + "current_file_label": None, } async def handle_upload(e: events.UploadEventArguments): try: filename = None content = None - if hasattr(e, 'name'): + if hasattr(e, "name"): filename = e.name - if hasattr(e, 'content'): + if hasattr(e, "content"): content = e.content - if content is None and hasattr(e, 'file'): + if content is None and hasattr(e, "file"): file_obj = e.file if not filename: - filename = getattr(file_obj, 'name', getattr(file_obj, 'filename', None)) - if hasattr(file_obj, 'file') and hasattr(file_obj.file, 'read'): - content = file_obj.file - elif hasattr(file_obj, 'read'): + filename = getattr( + file_obj, "name", getattr(file_obj, "filename", None) + ) + if hasattr(file_obj, "file") and hasattr(file_obj.file, "read"): + content = file_obj.file + elif hasattr(file_obj, "read"): content = file_obj if not filename: - filename = 'uploaded_file.xlsx' + filename = "uploaded_file.xlsx" if content is None: - ui.notify('上传失败: 无法解析文件内容', type='negative') + ui.notify("上传失败: 无法解析文件内容", type="negative") return - path = os.path.join(state['temp_dir'], filename) - if hasattr(content, 'seek'): - try: content.seek(0) - except Exception: pass + + # 清理旧文件,确保目录中只有一个文件 + if os.path.exists(state["temp_dir"]): + for f in os.listdir(state["temp_dir"]): + try: + os.remove(os.path.join(state["temp_dir"], f)) + except: + pass + + path = os.path.join(state["temp_dir"], filename) + if hasattr(content, "seek"): + try: + content.seek(0) + except Exception: + pass data = content.read() import inspect + if inspect.iscoroutine(data): data = await data - with open(path, 'wb') as f: + with open(path, "wb") as f: f.write(data) - state['excel_path'] = path - ui.notify(f'文件已上传: {filename}', type='positive') - state['turbines'], state['substation'], _ = load_data_from_excel(path) + + state["excel_path"] = path + ui.notify(f"文件已上传: {filename}", type="positive") + if refs["current_file_label"]: + refs["current_file_label"].text = f"当前文件: {filename}" + + # 加载数据 + state["turbines"], state["substation"], _ = load_data_from_excel(path) except Exception as ex: - ui.notify(f'上传处理失败: {ex}', type='negative') + ui.notify(f"上传处理失败: {ex}", type="negative") def update_export_buttons(): - if refs['export_row']: - refs['export_row'].clear() - if not state['results'] or not refs['export_row']: + if refs["export_row"]: + refs["export_row"].clear() + if not state["results"] or not refs["export_row"]: return - with refs['export_row']: - ui.button('下载 Excel 对比表', on_click=lambda: ui.download('wind_farm_design.xlsx')).props('icon=download') + with refs["export_row"]: + ui.button( + "下载 Excel 对比表", + on_click=lambda: ui.download("wind_farm_design.xlsx"), + ).props("icon=download") best_idx = 0 - for i, res in enumerate(state['results']): - if res['cost'] < state['results'][best_idx]['cost']: + for i, res in enumerate(state["results"]): + if res["cost"] < state["results"][best_idx]["cost"]: best_idx = i - best_res = state['results'][best_idx] + best_res = state["results"][best_idx] + def export_best_dxf(): - dxf_name = 'best_design.dxf' - if state['substation'] is not None: - export_to_dxf(best_res['turbines'], state['substation'], best_res['eval']['details'], dxf_name) + dxf_name = "best_design.dxf" + if state["substation"] is not None: + export_to_dxf( + best_res["turbines"], + state["substation"], + best_res["eval"]["details"], + dxf_name, + ) ui.download(dxf_name) - ui.notify(f'已导出推荐方案: {best_res["name"]}', type='positive') + ui.notify(f'已导出推荐方案: {best_res["name"]}', type="positive") else: - ui.notify('缺少升压站数据,无法导出 DXF', type='negative') - ui.button(f'导出推荐方案 DXF ({best_res["name"]})', on_click=export_best_dxf).props('icon=architecture color=accent') + ui.notify("缺少升压站数据,无法导出 DXF", type="negative") + + ui.button( + f'导出推荐方案 DXF ({best_res["name"]})', on_click=export_best_dxf + ).props("icon=architecture color=accent") def update_plot(result): - if refs['plot_container']: - refs['plot_container'].clear() - with refs['plot_container']: + if refs["plot_container"]: + refs["plot_container"].clear() + with refs["plot_container"]: # 使用 ui.pyplot 上下文自动管理 figure 生命周期 with ui.pyplot(figsize=(10, 8)) as plot: title = f"{result['name']}\nCost: ¥{result['cost']/10000:.2f}万 | Loss: {result['loss']:.2f} kW" # 显式获取当前 ui.pyplot 创建的 axes,并传递给绘图函数 # 确保绘图发生在正确的 figure 上 ax = plt.gca() - visualize_design(result['turbines'], state['substation'], result['eval']['details'], title, ax=ax) + visualize_design( + result["turbines"], + state["substation"], + result["eval"]["details"], + title, + ax=ax, + ) def handle_row_click(e): # ui.table row-click args: [evt, row, index] row = None if e.args and isinstance(e.args, list) and len(e.args) > 1: row = e.args[1] - - if not row or 'name' not in row: return - - row_name = row['name'] - selected_res = next((r for r in state['results'] if r['name'] == row_name), None) + + if not row or "name" not in row: + return + + row_name = row["name"] + selected_res = next( + (r for r in state["results"] if r["name"] == row_name), None + ) if selected_res: update_plot(selected_res) - ui.notify(f'已切换至方案: {row_name}') + ui.notify(f"已切换至方案: {row_name}") from nicegui import run import queue async def run_analysis(n_clusters): - if not state['excel_path']: - ui.notify('请先上传 Excel 坐标文件!', type='warning') - if refs['log_box']: - refs['log_box'].clear() + if not state["excel_path"]: + ui.notify("请先上传 Excel 坐标文件!", type="warning") + if refs["log_box"]: + refs["log_box"].clear() log_queue = queue.Queue() + class QueueLogger(io.StringIO): def write(self, message): if message and message.strip(): log_queue.put(message.strip()) super().write(message) + def process_log_queue(): - if refs['log_box']: + if refs["log_box"]: while not log_queue.empty(): try: msg = log_queue.get_nowait() - refs['log_box'].push(msg) - if msg.startswith('--- Scenario'): - scenario_name = msg.replace('---', '').strip() - if refs['status_label']: - refs['status_label'].text = f"正在计算: {scenario_name}..." - elif '开始比较电缆方案' in msg: - if refs['status_label']: refs['status_label'].text = "准备开始计算..." - except queue.Empty: break + refs["log_box"].push(msg) + if msg.startswith("--- Scenario"): + scenario_name = msg.replace("---", "").strip() + if refs["status_label"]: + refs["status_label"].text = ( + f"正在计算: {scenario_name}..." + ) + elif "开始比较电缆方案" in msg: + if refs["status_label"]: + refs["status_label"].text = "准备开始计算..." + except queue.Empty: + break + log_timer = ui.timer(0.1, process_log_queue) - if refs['status_label']: refs['status_label'].text = "初始化中..." + if refs["status_label"]: + refs["status_label"].text = "初始化中..." processing_dialog.open() - try: + try: # 2. 定义在线程中运行的任务 def task(): # 捕获 stdout 到我们的 QueueLogger # 禁止 main.py 中的后台绘图,避免线程安全问题 with contextlib.redirect_stdout(QueueLogger()): return compare_design_methods( - excel_path=state['excel_path'], - n_clusters_override=n_clusters, + excel_path=state["excel_path"], + n_clusters_override=n_clusters, interactive=False, - plot_results=False + plot_results=False, ) - + # 在后台线程运行计算任务 results = await run.io_bound(task) - - state['results'] = results - if not state['excel_path'] and results: - if state['substation'] is None: - _, state['substation'] = generate_wind_farm_data(n_turbines=30, layout='grid', spacing=800) - + + state["results"] = results + if not state["excel_path"] and results: + if state["substation"] is None: + _, state["substation"] = generate_wind_farm_data( + n_turbines=30, layout="grid", spacing=800 + ) + # 更新结果表格 - if refs['results_table']: + if refs["results_table"]: table_data = [] for res in results: - table_data.append({'name': res['name'], 'cost_wan': round(res['cost'] / 10000, 2), 'loss_kw': round(res['loss'], 2)}) - refs['results_table'].rows = table_data - refs['results_table'].update() - + table_data.append( + { + "name": res["name"], + "cost_wan": round(res["cost"] / 10000, 2), + "loss_kw": round(res["loss"], 2), + } + ) + refs["results_table"].rows = table_data + refs["results_table"].update() + # 计算完成后,自动寻找并显示最佳方案的拓扑图 if results: - best_res = min(results, key=lambda x: x['cost']) + best_res = min(results, key=lambda x: x["cost"]) update_plot(best_res) - ui.notify(f'计算完成!已自动加载推荐方案: {best_res["name"]}', type='positive') - + ui.notify( + f'计算完成!已自动加载推荐方案: {best_res["name"]}', type="positive" + ) + update_export_buttons() - if refs['status_label']: refs['status_label'].text = "计算完成!" + if refs["status_label"]: + refs["status_label"].text = "计算完成!" except Exception as ex: - ui.notify(f'运行出错: {ex}', type='negative') + ui.notify(f"运行出错: {ex}", type="negative") import traceback + traceback.print_exc() finally: log_timer.cancel() @@ -212,43 +284,85 @@ def index(): processing_dialog.close() with ui.dialog() as processing_dialog: - with ui.card().classes('w-96 items-center justify-center p-6'): - ui.label('正在计算方案...').classes('text-xl font-bold text-primary mb-2') - ui.spinner(size='lg', color='primary') - refs['status_label'] = ui.label('准备中...').classes('mt-4 text-sm text-gray-700 font-medium') - with ui.expansion('查看实时日志', icon='terminal', value=True).classes('w-full mt-4 text-sm'): - refs['log_box'] = ui.log(max_lines=100).classes('w-full h-32 text-xs font-mono bg-black text-green-400') - processing_dialog.props('persistent') + with ui.card().classes("w-96 items-center justify-center p-6"): + ui.label("正在计算方案...").classes("text-xl font-bold text-primary mb-2") + ui.spinner(size="lg", color="primary") + refs["status_label"] = ui.label("准备中...").classes( + "mt-4 text-sm text-gray-700 font-medium" + ) + with ui.expansion("查看实时日志", icon="terminal", value=True).classes( + "w-full mt-4 text-sm" + ): + refs["log_box"] = ui.log(max_lines=100).classes( + "w-full h-32 text-xs font-mono bg-black text-green-400" + ) + processing_dialog.props("persistent") - with ui.header().classes('bg-primary text-white p-4 shadow-lg'): - ui.label('海上风电场集电线路设计优化系统').classes('text-2xl font-bold') - ui.label('Wind Farm Collector System Design Optimizer').classes('text-sm opacity-80') + with ui.header().classes("bg-primary text-white p-4 shadow-lg"): + ui.label("海上风电场集电线路设计优化系统").classes("text-2xl font-bold") + ui.label("Wind Farm Collector System Design Optimizer").classes( + "text-sm opacity-80" + ) - with ui.row().classes('w-full p-4 gap-4'): - with ui.card().classes('w-1/4 p-4 shadow-md'): - ui.label('配置面板').classes('text-xl font-semibold mb-4 border-b pb-2') - ui.label('1. 上传坐标文件 (.xlsx)').classes('font-medium') - refs['upload_widget'] = ui.upload(label='选择Excel文件', on_upload=handle_upload, auto_upload=True).classes('w-full mb-4') - ui.label('2. 参数设置').classes('font-medium mt-4') - refs['clusters_input'] = ui.number('指定回路数 (可选)', value=None, format='%d', placeholder='自动计算').classes('w-full mb-4') - refs['run_btn'] = ui.button('运行方案对比', on_click=lambda: run_analysis(refs['clusters_input'].value)).classes('w-full mt-4 py-4').props('icon=play_arrow color=secondary') + with ui.row().classes("w-full p-4 gap-4"): + with ui.card().classes("w-1/4 p-4 shadow-md"): + ui.label("配置面板").classes("text-xl font-semibold mb-4 border-b pb-2") + ui.label("1. 上传坐标文件 (.xlsx)").classes("font-medium") + refs["upload_widget"] = ui.upload( + label="选择Excel文件", on_upload=handle_upload, auto_upload=True + ).classes("w-full mb-2") + # refs['current_file_label'] = ui.label('未选择文件').classes('text-xs text-gray-500 mb-4 italic') - with ui.column().classes('w-3/4 gap-4'): - with ui.card().classes('w-full p-4 shadow-md'): - ui.label('方案对比结果 (点击行查看拓扑详情)').classes('text-xl font-semibold mb-2') + ui.label("2. 参数设置").classes("font-medium mt-4") + refs["clusters_input"] = ui.number( + "指定回路数 (可选)", value=None, format="%d", placeholder="自动计算" + ).classes("w-full mb-4") + refs["run_btn"] = ( + ui.button( + "运行方案对比", + on_click=lambda: run_analysis(refs["clusters_input"].value), + ) + .classes("w-full mt-4 py-4") + .props("icon=play_arrow color=secondary") + ) + + with ui.column().classes("w-3/4 gap-4"): + with ui.card().classes("w-full p-4 shadow-md"): + ui.label("方案对比结果 (点击行查看拓扑详情)").classes( + "text-xl font-semibold mb-2" + ) columns = [ - {'name': 'name', 'label': '方案名称', 'field': 'name', 'required': True, 'align': 'left'}, - {'name': 'cost_wan', 'label': '总投资 (万元)', 'field': 'cost_wan', 'sortable': True}, - {'name': 'loss_kw', 'label': '线损 (kW)', 'field': 'loss_kw', 'sortable': True}, + { + "name": "name", + "label": "方案名称", + "field": "name", + "required": True, + "align": "left", + }, + { + "name": "cost_wan", + "label": "总投资 (万元)", + "field": "cost_wan", + "sortable": True, + }, + { + "name": "loss_kw", + "label": "线损 (kW)", + "field": "loss_kw", + "sortable": True, + }, ] # 移除 selection='single',改为纯行点击交互 - refs['results_table'] = ui.table(columns=columns, rows=[]).classes('w-full') - refs['results_table'].on('row-click', handle_row_click) - with ui.card().classes('w-full p-4 shadow-md'): - ui.label('拓扑可视化').classes('text-xl font-semibold mb-2') - refs['plot_container'] = ui.column().classes('w-full items-center') - with ui.card().classes('w-full p-4 shadow-md'): - ui.label('导出与下载').classes('text-xl font-semibold mb-2') - refs['export_row'] = ui.row().classes('gap-4') + refs["results_table"] = ui.table(columns=columns, rows=[]).classes( + "w-full" + ) + refs["results_table"].on("row-click", handle_row_click) + with ui.card().classes("w-full p-4 shadow-md"): + ui.label("拓扑可视化").classes("text-xl font-semibold mb-2") + refs["plot_container"] = ui.column().classes("w-full items-center") + with ui.card().classes("w-full p-4 shadow-md"): + ui.label("导出与下载").classes("text-xl font-semibold mb-2") + refs["export_row"] = ui.row().classes("gap-4") -ui.run(title='海上风电场集电线路优化', port=8080) \ No newline at end of file + +ui.run(title="海上风电场集电线路优化", port=8080) diff --git a/main.py b/main.py index 75a9f36..5f02fa7 100644 --- a/main.py +++ b/main.py @@ -838,7 +838,8 @@ def export_all_scenarios_to_excel(results, filename): :param filename: 输出文件路径 """ try: - with pd.ExcelWriter(filename) as writer: + # 使用 openpyxl 引擎以便后续写入单元格 + with pd.ExcelWriter(filename, engine='openpyxl') as writer: # 1. 总览 Sheet summary_data = [] for res in results: @@ -882,7 +883,13 @@ def export_all_scenarios_to_excel(results, filename): 'Cost (¥)': conn['cable']['cost'] }) df = pd.DataFrame(data) - df.to_excel(writer, sheet_name=safe_name, index=False) + + # 从第 2 行开始写入数据(startrow=1,Excel中为第2行),留出第 1 行写标题 + df.to_excel(writer, sheet_name=safe_name, index=False, startrow=1) + + # 在第一行写入方案名称 + ws = writer.sheets[safe_name] + ws.cell(row=1, column=1, value=f"Scenario: {res['name']}") print(f"成功导出包含所有方案的Excel文件: {filename}") except Exception as e: