"""NiceGUI front end for the offshore wind-farm collector-system optimizer.

The page lets the user upload an Excel coordinate file, run the scenario
comparison implemented in ``main``, inspect the resulting topologies and
export the results (Excel, DXF, ZIP, calculation log).
"""

import contextlib
import inspect
import io
import os
import platform
import queue
import shutil
import socket
import subprocess
import sys
import tempfile
import traceback
import zipfile

import matplotlib

matplotlib.use("Agg")
import matplotlib.backends.backend_svg
import matplotlib.pyplot as plt
import pandas as pd
from nicegui import app, events, ui, run

from main import (
    compare_design_methods,
    export_all_scenarios_to_excel,
    export_to_dxf,
    generate_wind_farm_data,
    load_data_from_excel,
    visualize_design,
)

# Try to import the auto-generated version number.
try:
    from version import VERSION
except ImportError:
    VERSION = "v1.0"

# Configure matplotlib so that Chinese text renders correctly.
plt.rcParams["font.sans-serif"] = ["Microsoft YaHei", "SimHei", "Arial"]
plt.rcParams["axes.unicode_minus"] = False


class Logger(io.StringIO):
    """StringIO that also pushes every non-blank message to a UI log element."""

    def __init__(self, log_element):
        super().__init__()
        self.log_element = log_element

    def write(self, message):
        """Push the stripped message to the log element and buffer the raw text.

        Returns the number of characters written, like ``io.StringIO.write``.
        """
        if message.strip():
            self.log_element.push(message.strip())
        return super().write(message)


# Application state.
# The upload directory lives under PROJECT_TEMP_DIR when that environment
# variable is set, otherwise under the system temporary directory.
_base_temp = os.environ.get("PROJECT_TEMP_DIR", tempfile.gettempdir())
state = {
    "excel_path": None,
    "original_filename": None,
    "results": [],
    "substation": None,
    "turbines": None,
    "cable_specs": None,
    "system_params": None,
    "temp_dir": os.path.join(_base_temp, "windfarm_gui_uploads"),
}

# Make sure the temporary directory exists.
os.makedirs(state["temp_dir"], exist_ok=True)

# (key in system_params, default value, formatter) for each displayed parameter.
_SYSTEM_PARAM_SPECS = (
    (
        "voltage",
        66000,
        lambda v: f"电压: {v / 1000:.1f} kV" if v >= 1000 else f"电压: {v} V",
    ),
    ("power_factor", 0.95, lambda v: f"功率因数: {v}"),
    ("electricity_price", 0.4, lambda v: f"电价: {v} 元/kWh"),
    ("project_lifetime", 25, lambda v: f"工程运行期限: {v} 年"),
    ("discount_rate", 8, lambda v: f"折现率: {v}%"),
    ("annual_loss_hours", 1400, lambda v: f"年损耗小时数: {v} 小时"),
)


def _format_system_params(system_params):
    """Return one display string per system parameter.

    A parameter missing from ``system_params`` (or an empty/None mapping) is
    shown with its default value and a " (默认)" suffix.
    """
    params = system_params or {}
    texts = []
    for key, default, fmt in _SYSTEM_PARAM_SPECS:
        is_default = key not in params
        text = fmt(default if is_default else params[key])
        if is_default:
            text += " (默认)"
        texts.append(text)
    return texts


def _safe_name(name):
    """Keep only alphanumerics, spaces, '-' and '_' so the name is file-safe."""
    return "".join(c for c in name if c.isalnum() or c in (" ", "-", "_")).strip()


def _pick_recommended(results):
    """Return the cheapest "Scenario 1" result, or the cheapest overall if none."""
    scenario1_results = [r for r in results if "Scenario 1" in r["name"]]
    return min(scenario1_results or results, key=lambda r: r["cost"])


def _scenario_note(original_name):
    """Build the remark column text from keywords found in the scenario name."""
    note = ""
    # Identify the algorithm.
    if "MST Method" in original_name:
        note += "最小生成树算法(无容量约束基准); "
    elif "Base" in original_name:
        note += "基础扇区扫描(单次扫描); "
    elif "Rotational" in original_name:
        note += "旋转扫描优化(全局最优角度); "
    elif "Esau-Williams" in original_name:
        note += "Esau-Williams启发式算法(权衡距离与容量); "
    # Identify the cable strategy.
    if "Standard" in original_name:
        note += "不包含可选电缆型号。"
    elif "With Optional" in original_name:
        note += "含可选电缆型号。"
    elif "No Max" in original_name:
        note += "不包含可选电缆型号,且可使用的最大截面电缆降一档截面。"
    return note


def _remove_quietly(path):
    """Best-effort removal of a temporary file; a failed delete is not fatal."""
    try:
        os.remove(path)
    except OSError:
        pass


@ui.page("/")
def index():
    """Build the single application page and wire up all event handlers."""
    # Inject CSS that hides the table's built-in checkbox column and defines the
    # highlight colour of the selected row.
    # NOTE(review): the HTML payload is blank in the source under review; the
    # original <style> content appears to have been lost - confirm and restore.
    ui.add_head_html(
        """ """
    )
    ui.query("body").style(
        'background-color: #f0f2f5; font-family: "Segoe UI", Tahoma, Geneva, Verdana, sans-serif;'
    )

    # References to UI elements so the nested handlers can update them.
    refs = {
        "log_box": None,
        "results_table": None,
        "plot_container": None,
        "export_row": None,
        "export_selected_btn": None,
        "status_label": None,
        "upload_widget": None,
        "run_btn": None,
        "current_file_container": None,
        "info_container": None,
        "ga_switch": None,  # genetic-algorithm switch
        "mip_switch": None,  # MIP switch
        "log_content": "",  # accumulated calculation log text
    }

    def update_info_panel():
        """Re-render the system-parameter chips and the cable-spec table."""
        if not refs["info_container"]:
            return
        refs["info_container"].clear()
        with refs["info_container"]:
            # System parameters are always shown (defaults when not loaded).
            with ui.row().classes("w-full items-center gap-4 mb-2"):
                ui.icon("settings", color="primary").classes("text-2xl")
                ui.label("系统参数").classes("text-lg font-bold")
                for p in _format_system_params(state.get("system_params")):
                    ui.chip(p, icon="bolt").props("outline color=primary")

            ui.separator().classes("my-2")

            # Cable specifications.
            if state.get("cable_specs"):
                with ui.row().classes("w-full items-center gap-2 mb-2"):
                    ui.icon("cable", color="secondary").classes("text-2xl")
                    ui.label("电缆规格参数").classes("text-lg font-bold")
                columns = [
                    {
                        "name": "section",
                        "label": "截面 (mm²)",
                        "field": "section",
                        "align": "center",
                    },
                    {
                        "name": "capacity",
                        "label": "载流量 (A)",
                        "field": "capacity",
                        "align": "center",
                    },
                    {
                        "name": "resistance",
                        "label": "电阻 (Ω/km)",
                        "field": "resistance",
                        "align": "center",
                    },
                    {
                        "name": "cost",
                        "label": "参考单价(万元/km)",
                        "field": "cost",
                        "align": "center",
                    },
                    {
                        "name": "is_optional",
                        "label": "是否为可选",
                        "field": "is_optional",
                        "align": "center",
                    },
                ]
                # Each spec is (section, capacity, resistance, cost, is_optional).
                rows = [
                    {
                        "section": spec[0],
                        "capacity": spec[1],
                        "resistance": spec[2],
                        "cost": f"{spec[3] / 10:.2f}"
                        if spec[3] is not None
                        else "0.00",
                        "is_optional": "Y" if len(spec) > 4 and spec[4] else "",
                    }
                    for spec in state["cable_specs"]
                ]
                ui.table(columns=columns, rows=rows).classes("w-full").props(
                    "dense flat bordered"
                )
            else:
                ui.label("未检测到电缆数据,将使用默认参数。").classes(
                    "text-gray-500 italic"
                )

    async def handle_upload(e: events.UploadEventArguments):
        """Store the uploaded Excel file, load its data and reset stale results."""
        try:
            # The event shape differs between versions: either name/content
            # directly on the event, or a nested file object.
            filename = getattr(e, "name", None)
            content = getattr(e, "content", None)
            if content is None and hasattr(e, "file"):
                file_obj = e.file
                if not filename:
                    filename = getattr(
                        file_obj, "name", getattr(file_obj, "filename", None)
                    )
                if hasattr(file_obj, "file") and hasattr(file_obj.file, "read"):
                    content = file_obj.file
                elif hasattr(file_obj, "read"):
                    content = file_obj

            # The name comes from the client: keep only its final component so
            # it cannot point outside the upload directory.
            filename = os.path.basename(str(filename or "").replace("\\", "/"))
            if not filename:
                filename = "uploaded_file.xlsx"

            if content is None:
                ui.notify("上传失败: 无法解析文件内容", type="negative")
                return

            # Remove old files so the directory only ever holds one upload.
            if os.path.exists(state["temp_dir"]):
                for f in os.listdir(state["temp_dir"]):
                    _remove_quietly(os.path.join(state["temp_dir"], f))

            path = os.path.join(state["temp_dir"], filename)
            if hasattr(content, "seek"):
                try:
                    pos = content.seek(0)
                    if inspect.isawaitable(pos):
                        await pos
                except Exception:
                    # Best effort: some upload objects are not seekable.
                    pass
            data = content.read()
            if inspect.isawaitable(data):
                data = await data
            with open(path, "wb") as f:
                f.write(data)

            state["excel_path"] = path
            state["original_filename"] = filename
            ui.notify(f"文件已上传: {filename}", type="positive")

            # Refresh the custom "current file" display.
            if refs["current_file_container"]:
                refs["current_file_container"].clear()
                with refs["current_file_container"]:
                    with ui.row().classes(
                        "items-center w-full bg-blue-50 p-2 rounded border border-blue-200"
                    ):
                        ui.icon("description", color="primary").classes("text-xl mr-2")
                        ui.label(filename).classes(
                            "font-medium text-gray-700 flex-grow"
                        )
                        ui.icon("check_circle", color="positive")

            # Load the data once. Newer main.py returns four values, older
            # versions three (without system parameters).
            turbines, substation, cable_specs, *extra = load_data_from_excel(path)
            state["turbines"] = turbines
            state["substation"] = substation
            state["cable_specs"] = cable_specs
            state["system_params"] = extra[0] if extra else {}

            update_info_panel()

            # Clear the previous comparison results and topology plot.
            state["results"] = []
            if refs["results_table"]:
                refs["results_table"].rows = []
                refs["results_table"].selected = []
            if refs["plot_container"]:
                refs["plot_container"].clear()
            if refs["export_row"]:
                refs["export_row"].clear()

            # Reset the upload widget so another file can be chosen.
            if refs["upload_widget"]:
                refs["upload_widget"].reset()
        except Exception as ex:
            ui.notify(f"上传处理失败: {ex}", type="negative")

    async def save_file_with_dialog(
        filename, callback, file_filter="All files (*.*)", sender=None
    ):
        """Save a file through a native dialog, falling back to a download.

        On Windows a PowerShell ``SaveFileDialog`` is shown; if the user
        cancels, nothing is saved. When the dialog cannot be shown (or on
        other platforms) the file is written to the temp dir and sent with
        ``ui.download``.

        :param filename: default file name
        :param callback: async function ``(filepath) -> None`` that writes the file
        :param file_filter: e.g. ``"Excel Files (*.xlsx)"``
        :param sender: UI element that triggered the action; it is disabled
            while the operation runs to prevent repeated clicks
        """
        if sender:
            sender.disable()
        try:
            if platform.system() == "Windows":
                try:
                    # PowerShell filter format: "description|*.ext|All Files|*.*".
                    if "Excel" in file_filter:
                        ps_filter = "Excel Files (*.xlsx)|*.xlsx|All Files (*.*)|*.*"
                    elif "DXF" in file_filter:
                        ps_filter = "DXF Files (*.dxf)|*.dxf|All Files (*.*)|*.*"
                    elif "ZIP" in file_filter:
                        ps_filter = "ZIP Archives (*.zip)|*.zip|All Files (*.*)|*.*"
                    else:
                        ps_filter = "All Files (*.*)|*.*"

                    # The default name derives from the uploaded file name, so
                    # it is passed through the environment instead of being
                    # interpolated into the script text.
                    ps_script = f"""
                    Add-Type -AssemblyName System.Windows.Forms
                    $d = New-Object System.Windows.Forms.SaveFileDialog
                    $d.Filter = "{ps_filter}"
                    $d.FileName = $env:WINDFARM_SAVE_NAME
                    $d.Title = "保存文件"
                    if ($d.ShowDialog() -eq [System.Windows.Forms.DialogResult]::OK) {{
                        Write-Output $d.FileName
                    }}
                    """
                    dialog_env = dict(os.environ, WINDFARM_SAVE_NAME=filename)

                    # Hide the console window so no black box flashes up.
                    startupinfo = subprocess.STARTUPINFO()
                    startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW

                    print("DEBUG: invoking PowerShell SaveFileDialog...")
                    # Run in a worker thread so the event loop is not blocked
                    # and the disabled button state reaches the browser.
                    result = await run.io_bound(
                        subprocess.run,
                        ["powershell", "-Command", ps_script],
                        capture_output=True,
                        text=True,
                        startupinfo=startupinfo,
                        env=dialog_env,
                    )
                    save_path = result.stdout.strip()
                    if save_path:
                        print(f"DEBUG: PowerShell returned path: {save_path}")
                        await callback(save_path)
                        ui.notify(f"文件已保存至: {save_path}", type="positive")
                    else:
                        # The user cancelled: return without falling back.
                        print("DEBUG: PowerShell dialog cancelled or empty result.")
                    return
                except Exception as e:
                    # Any failure falls through to the browser download.
                    print(f"PowerShell dialog failed: {e}")

            # Common fallback: browser download.
            print("DEBUG: Using ui.download fallback")
            temp_path = os.path.join(state["temp_dir"], filename)
            await callback(temp_path)
            ui.download(temp_path)
        finally:
            if sender:
                sender.enable()

    def update_export_buttons():
        """Rebuild the export button row for the current results."""
        if refs["export_row"]:
            refs["export_row"].clear()
        if not state["results"] or not refs["export_row"]:
            return

        # Base prefix for generated file names.
        file_prefix = "wind_farm"
        default_excel_name = "wind_farm_design_result.xlsx"
        # Path of the comparison workbook that main.py is expected to have
        # written; it is copied on export when present.
        if state.get("excel_path"):
            file_prefix = os.path.splitext(state["original_filename"])[0]
            default_excel_name = f"{file_prefix}_result.xlsx"
            source_excel_path = os.path.join(
                state["temp_dir"], f"{file_prefix}_design.xlsx"
            )
        else:
            source_excel_path = "wind_farm_design.xlsx"

        best_res = _pick_recommended(state["results"])

        with refs["export_row"]:
            # --- Download Excel ---
            async def save_excel(path):
                if os.path.exists(source_excel_path):
                    shutil.copy2(source_excel_path, path)
                else:
                    # Source workbook missing: regenerate it.
                    export_all_scenarios_to_excel(state["results"], path)

            async def on_click_excel(e):
                await save_file_with_dialog(
                    default_excel_name,
                    save_excel,
                    "Excel Files (*.xlsx)",
                    sender=e.sender,
                )

            ui.button(
                "下载 Excel 对比表",
                on_click=on_click_excel,
            ).props("icon=download")

            # --- Export recommended scenario as DXF ---
            async def on_click_best_dxf(e):
                if state["substation"] is None:
                    ui.notify("缺少升压站数据,无法导出 DXF", type="negative")
                    return
                default_name = f"{file_prefix}_best_{_safe_name(best_res['name'])}.dxf"

                async def save_dxf(path):
                    export_to_dxf(
                        best_res["turbines"],
                        state["substation"],
                        best_res["eval"]["details"],
                        path,
                    )

                await save_file_with_dialog(
                    default_name, save_dxf, "DXF Files (*.dxf)", sender=e.sender
                )

            ui.button(
                f"导出推荐方案 DXF ({best_res['name']})", on_click=on_click_best_dxf
            ).props("icon=architecture color=accent")

            # --- Export selected scenario as DXF ---
            async def on_click_selected_dxf(e):
                if not refs["results_table"] or not refs["results_table"].selected:
                    ui.notify("请先在上方表格中选择一个方案", type="warning")
                    return
                selected_row = refs["results_table"].selected[0]
                row_name = selected_row.get("original_name", selected_row.get("name"))
                selected_res = next(
                    (r for r in state["results"] if r["name"] == row_name), None
                )
                if not selected_res or state["substation"] is None:
                    ui.notify(
                        "无法导出:未找到方案数据或缺少升压站信息", type="negative"
                    )
                    return
                default_name = f"{file_prefix}_{_safe_name(selected_res['name'])}.dxf"

                async def save_dxf(path):
                    export_to_dxf(
                        selected_res["turbines"],
                        state["substation"],
                        selected_res["eval"]["details"],
                        path,
                    )

                await save_file_with_dialog(
                    default_name, save_dxf, "DXF Files (*.dxf)", sender=e.sender
                )

            refs["export_selected_btn"] = ui.button(
                "导出选中方案 DXF", on_click=on_click_selected_dxf
            ).props("icon=architecture color=primary")
            clean_name = best_res["name"].replace("(推荐) ", "")
            refs["export_selected_btn"].set_text(f"导出选中方案 ({clean_name})")

            # --- Export everything as ZIP ---
            async def on_click_all_dxf(e):
                if not state["results"] or state["substation"] is None:
                    ui.notify("无方案数据可导出", type="warning")
                    return
                default_name = f"{file_prefix}_all_results.zip"

                async def save_zip(path):
                    excel_result_name = f"{file_prefix}_summary.xlsx"
                    with zipfile.ZipFile(path, "w", zipfile.ZIP_DEFLATED) as zipf:
                        # 1. Excel summary
                        temp_excel = os.path.join(
                            state["temp_dir"], "temp_export.xlsx"
                        )
                        export_all_scenarios_to_excel(state["results"], temp_excel)
                        zipf.write(temp_excel, arcname=excel_result_name)
                        _remove_quietly(temp_excel)

                        # 2. One DXF per scenario
                        for res in state["results"]:
                            dxf_name = os.path.join(
                                state["temp_dir"],
                                f"{file_prefix}_{_safe_name(res['name'])}.dxf",
                            )
                            export_to_dxf(
                                res["turbines"],
                                state["substation"],
                                res["eval"]["details"],
                                dxf_name,
                            )
                            zipf.write(dxf_name, arcname=os.path.basename(dxf_name))
                            _remove_quietly(dxf_name)

                await save_file_with_dialog(
                    default_name, save_zip, "ZIP Files (*.zip)", sender=e.sender
                )

            ui.button("导出全部方案 DXF (ZIP)", on_click=on_click_all_dxf).props(
                "icon=folder_zip color=secondary"
            )

            # --- Export calculation log ---
            async def on_click_export_log(e):
                log_content = ""
                method_used = "unknown"

                # Method 1: the log text accumulated in memory.
                if refs.get("log_content") and refs["log_content"].strip():
                    log_content = refs["log_content"]
                    method_used = "saved_memory"

                # Method 2: read the log element's text through JavaScript.
                if not log_content.strip() and refs["log_box"]:
                    try:
                        log_id = refs["log_box"].id
                        js_code = f"""
                        (function() {{
                            const logElement = document.querySelector("#c{log_id}");
                            if (logElement) {{
                                console.log("Found log element:", logElement);
                                return logElement.innerText || logElement.textContent || "";
                            }}
                            console.log("Log element not found for ID: c{log_id}");
                            return "";
                        }})()
                        """
                        result = await ui.run_javascript(js_code)
                        if result and result.strip():
                            log_content = result
                            method_used = "javascript"
                    except Exception as js_error:
                        print(f"JavaScript method failed: {js_error}")

                if not log_content.strip():
                    ui.notify(
                        "没有可导出的日志内容。请先运行计算任务。", type="warning"
                    )
                    print(f"Log export failed. Method tried: {method_used}")
                    return

                print(
                    f"Successfully exported log using method: {method_used}, length: {len(log_content)}"
                )
                default_name = f"{file_prefix}_calculation_log.txt"

                async def save_log(path):
                    with open(path, "w", encoding="utf-8") as f:
                        f.write(log_content)

                await save_file_with_dialog(
                    default_name, save_log, "Text Files (*.txt)", sender=e.sender
                )

            ui.button("导出计算日志", on_click=on_click_export_log).props(
                "icon=description color=info"
            )

    def update_plot(result):
        """Draw the topology of ``result`` into the plot container."""
        if not refs["plot_container"]:
            return
        refs["plot_container"].clear()
        with refs["plot_container"]:
            # ui.pyplot manages the figure lifecycle as a context manager.
            with ui.pyplot(figsize=(10, 8)) as plot:
                title = f"{result['name']}\nCost: ¥{result['cost'] / 10000:.2f}万 | Loss: {result['loss']:.2f} kW"
                # Pass the current axes explicitly so drawing happens on the
                # figure created by ui.pyplot.
                ax = plt.gca()
                visualize_design(
                    result["turbines"],
                    state["substation"],
                    result["eval"]["details"],
                    title,
                    ax=ax,
                )

    async def handle_row_click(e):
        """Show the clicked scenario's topology and mark its row as selected."""
        # The clicked row's data is the second event argument.
        row = e.args[1] if len(e.args) > 1 else None
        if not row:
            return
        row_name = row.get("original_name", row.get("name"))
        if not row_name:
            return
        selected_res = next(
            (r for r in state["results"] if r["name"] == row_name), None
        )
        if not selected_res:
            return

        # 1. Update the topology plot.
        update_plot(selected_res)
        ui.notify(f"已切换至方案: {selected_res['name']}")
        # 2. Highlight the row by setting the table selection.
        if refs["results_table"]:
            refs["results_table"].selected = [row]
        # 3. Update the "export selected" button text.
        if refs["export_selected_btn"]:
            clean_name = row_name.replace("(推荐) ", "")
            refs["export_selected_btn"].set_text(f"导出选中方案 ({clean_name})")

    async def run_analysis():
        """Run the scenario comparison in a worker thread and show the results."""
        if not state["excel_path"]:
            ui.notify("请先上传 Excel 坐标文件!", type="warning")
            return

        if refs["log_box"]:
            refs["log_box"].clear()
        refs["log_content"] = ""
        log_queue = queue.Queue()

        use_ga = refs["ga_switch"].value if refs["ga_switch"] else False
        use_mip = refs["mip_switch"].value if refs["mip_switch"] else False
        print(f"Switch values: GA={use_ga}, MIP={use_mip}")

        class QueueLogger(io.StringIO):
            """Forward non-blank writes to ``log_queue`` for the UI thread."""

            def write(self, message):
                if message and message.strip():
                    log_queue.put(message.strip())
                return super().write(message)

        def process_log_queue():
            """Drain queued messages into the log box and status label."""
            if not refs["log_box"]:
                return
            new_msg = False
            while not log_queue.empty():
                try:
                    msg = log_queue.get_nowait()
                except queue.Empty:
                    break
                refs["log_box"].push(msg)
                # Keep a copy for the log export.
                refs["log_content"] += msg + "\n"
                new_msg = True
                if msg.startswith("--- Scenario"):
                    scenario_name = msg.replace("---", "").strip()
                    if refs["status_label"]:
                        refs["status_label"].text = f"正在计算: {scenario_name}..."
                elif "开始比较电缆方案" in msg:
                    if refs["status_label"]:
                        refs["status_label"].text = "准备开始计算..."
            if new_msg and refs["log_box"]:
                # Scroll the log element to the bottom; the short delay lets
                # the new content render first.
                ui.run_javascript(
                    f'var el = document.getElementById("c{refs["log_box"].id}"); if(el) {{ setTimeout(() => {{ el.scrollTop = el.scrollHeight; }}, 10); }}'
                )

        log_timer = ui.timer(0.1, process_log_queue)
        if refs["status_label"]:
            refs["status_label"].text = "初始化中..."
        processing_dialog.open()

        try:

            def task():
                # Capture stdout into the queue logger. Plotting inside
                # main.py is disabled to avoid thread-safety problems.
                with contextlib.redirect_stdout(QueueLogger()):
                    return compare_design_methods(
                        excel_path=state["excel_path"],
                        n_clusters_override=None,
                        interactive=False,
                        plot_results=False,
                        use_ga=use_ga,
                        use_mip=use_mip,
                    )

            # A None result is treated as "no scenarios" so the code below
            # can iterate safely.
            results = await run.io_bound(task) or []
            state["results"] = results

            if not state["excel_path"] and results:
                if state["substation"] is None:
                    _, state["substation"] = generate_wind_farm_data(
                        n_turbines=30, layout="grid", spacing=800
                    )

            # Automatically show the recommended scenario: the cheapest
            # "Scenario 1" result, or the global optimum if there is none.
            best_res = None
            if results:
                best_res = _pick_recommended(results)
                update_plot(best_res)
                ui.notify(
                    f"计算完成!已自动加载推荐方案: {best_res['name']}", type="positive"
                )

            # Fill the results table.
            if refs["results_table"]:
                table_data = []
                best_row = None
                for res in results:
                    is_best = bool(best_res) and res["name"] == best_res["name"]
                    name_display = f"(推荐) {res['name']}" if is_best else res["name"]
                    details = res["eval"]["details"]
                    # Total cable length in kilometres.
                    total_length_km = sum(d["length"] for d in details) / 1000
                    # Circuit count = connections that touch the substation.
                    n_circuits = sum(
                        1
                        for d in details
                        if d["source"] == "substation" or d["target"] == "substation"
                    )
                    row_dict = {
                        "name": name_display,
                        "n_circuits": n_circuits,
                        "cost_wan": f"{res['cost'] / 10000:.2f}",
                        "loss_kw": f"{res['loss']:.2f}",
                        "total_length": f"{total_length_km:.2f}",
                        "npv_loss_wan": f"{res.get('npv_loss', 0) / 10000:.2f}",
                        "total_cost_npv_wan": f"{res.get('total_cost_npv', res['cost']) / 10000:.2f}",
                        "note": _scenario_note(res["name"]),
                        "original_name": res["name"],
                    }
                    table_data.append(row_dict)
                    if is_best:
                        best_row = row_dict
                refs["results_table"].rows = table_data
                # Select the recommended row so it is highlighted initially.
                if best_row:
                    refs["results_table"].selected = [best_row]

            update_export_buttons()
            if refs["status_label"]:
                refs["status_label"].text = "计算完成!"
        except Exception as ex:
            ui.notify(f"运行出错: {ex}", type="negative")
            traceback.print_exc()
        finally:
            log_timer.cancel()
            # Flush whatever is still queued.
            process_log_queue()
            processing_dialog.close()

    # --- Progress dialog ---
    with ui.dialog() as processing_dialog:
        with ui.card().classes("w-96 items-center justify-center p-6"):
            ui.label("正在计算方案...").classes("text-xl font-bold text-primary mb-2")
            ui.spinner(size="lg", color="primary")
            refs["status_label"] = ui.label("准备中...").classes(
                "mt-4 text-sm text-gray-700 font-medium"
            )
            with ui.expansion("查看实时日志", icon="terminal", value=True).classes(
                "w-full mt-4 text-sm"
            ):
                # The log element scrolls itself; no scroll_area wrapper.
                refs["log_box"] = ui.log(max_lines=100).classes(
                    "w-full h-32 overflow-y-auto p-2 bg-black text-xs font-mono text-green-400 leading-snug"
                )
    processing_dialog.props("persistent")

    # --- Header ---
    with ui.header().classes(
        "bg-primary text-white p-4 shadow-lg items-center no-wrap"
    ):
        with ui.column().classes("gap-0"):
            ui.label(f"海上风电场集电线路设计优化系统 {VERSION}").classes(
                "text-2xl font-bold"
            )
        with ui.column().classes("gap-0"):
            ui.label("Wind Farm Collector System Design Optimizer").classes(
                "text-sm opacity-80"
            )
        ui.space()
        ui.label("中能建西北院海上能源业务开发部").classes("text-xl font-bold")

    # --- Main content ---
    with ui.row().classes("w-full p-4 gap-4"):
        with ui.card().classes("w-full p-4 shadow-md"):
            ui.label("配置面板").classes("text-xl font-semibold mb-4 border-b pb-2")
            # items-stretch keeps all children the same height.
            with ui.row().classes("w-full items-stretch gap-4"):
                # 1. Export-template button
                async def export_template(e):
                    from generate_template import create_template

                    async def save_template(path):
                        # Generate the template in the temp dir, then copy it.
                        temp_template = os.path.join(
                            state["temp_dir"], "coordinates_template.xlsx"
                        )
                        create_template(temp_template)
                        if not os.path.exists(temp_template):
                            raise FileNotFoundError("无法生成模板文件")
                        shutil.copy2(temp_template, path)
                        _remove_quietly(temp_template)

                    await save_file_with_dialog(
                        "coordinates.xlsx",
                        save_template,
                        "Excel Files (*.xlsx)",
                        sender=e.sender,
                    )

                ui.button("导出 Excel 模板", on_click=export_template).classes(
                    "flex-1 py-4"
                ).props("icon=file_download outline color=primary")

                # 2. Upload area (upload widget stacked above the file display)
                with ui.column().classes("flex-1 gap-0 justify-between"):
                    # The .no-list class is meant to hide Quasar's default file
                    # list so the file display is fully custom.
                    refs["upload_widget"] = (
                        ui.upload(
                            label="选择Excel文件",
                            on_upload=handle_upload,
                            auto_upload=True,
                        )
                        .classes("w-full no-list h-full")
                        .props("flat bordered color=primary")
                    )
                    # Empty until a file has been uploaded.
                    refs["current_file_container"] = ui.column().classes("w-full")

                # 3. Run button
                refs["run_btn"] = (
                    ui.button(
                        "运行方案对比",
                        on_click=run_analysis,
                    )
                    .classes("flex-1 py-4")
                    .props("icon=play_arrow color=secondary")
                )

                # 4. Genetic-algorithm switch
                with ui.column().classes("flex-1 gap-0 justify-center items-center"):
                    refs["ga_switch"] = ui.switch("启用遗传算法", value=False).props(
                        "color=orange"
                    )

                # 5. MIP switch
                with ui.column().classes("flex-1 gap-0 justify-center items-center"):
                    refs["mip_switch"] = ui.switch("启用MIP", value=False).props(
                        "color=blue"
                    )

        with ui.column().classes("w-full gap-4"):
            # Information card
            with (
                ui.card()
                .classes("w-full p-4 shadow-md")
                .style("max-height: 400px; overflow-y: auto;")
            ):
                refs["info_container"] = ui.column().classes("w-full")
                with refs["info_container"]:
                    ui.label("请上传 Excel 文件以查看系统参数和电缆规格...").classes(
                        "text-gray-500 italic"
                    )

            with ui.card().classes("w-full p-0 shadow-md overflow-hidden"):
                with (
                    ui.expansion(
                        "方案对比结果 (点击行查看拓扑详情)",
                        icon="analytics",
                        value=True,
                    )
                    .classes("w-full")
                    .props('header-class="text-xl font-semibold"')
                ):
                    columns = [
                        {
                            "name": "name",
                            "label": "方案名称",
                            "field": "name",
                            "required": True,
                            "align": "left",
                        },
                        {
                            "name": "n_circuits",
                            "label": "回路数",
                            "field": "n_circuits",
                            "sortable": True,
                        },
                        {
                            "name": "cost_wan",
                            "label": "总投资 (万元)",
                            "field": "cost_wan",
                            "sortable": True,
                        },
                        {
                            "name": "loss_kw",
                            "label": "线损 (kW)",
                            "field": "loss_kw",
                            "sortable": True,
                        },
                        {
                            "name": "total_length",
                            "label": "总长度/km",
                            "field": "total_length",
                            "sortable": True,
                        },
                        {
                            "name": "npv_loss_wan",
                            "label": "损耗费用净现值 (万元)",
                            "field": "npv_loss_wan",
                            "sortable": True,
                        },
                        {
                            "name": "total_cost_npv_wan",
                            "label": "总费用 (万元)",
                            "field": "total_cost_npv_wan",
                            "sortable": True,
                        },
                        {
                            "name": "note",
                            "label": "备注",
                            "field": "note",
                            "align": "left",
                        },
                    ]
                    # selection='single' plus the row-click handler drives the
                    # highlight from Python rather than from CSS pseudo-classes.
                    refs["results_table"] = ui.table(
                        columns=columns,
                        rows=[],
                        selection="single",
                        row_key="original_name",
                    ).classes("w-full hide-selection-column")
                    refs["results_table"].on("row-click", handle_row_click)

            with ui.card().classes("w-full p-4 shadow-md"):
                ui.label("拓扑可视化").classes("text-xl font-semibold mb-2")
                refs["plot_container"] = ui.column().classes("w-full items-center")

            with ui.card().classes("w-full p-4 shadow-md"):
                ui.label("导出与下载").classes("text-xl font-semibold mb-2")
                refs["export_row"] = ui.row().classes("gap-4")


def find_available_port(start_port=8080, max_attempts=100):
    """Return the first port in the scanned range that can be bound locally.

    Ports ``start_port`` .. ``start_port + max_attempts - 1`` are tried on
    127.0.0.1. If none can be bound, ``start_port`` is returned so that
    ``ui.run`` reports the error.
    """
    for port in range(start_port, start_port + max_attempts):
        with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
            try:
                s.bind(("127.0.0.1", port))
            except OSError:
                continue
            return port
    return start_port


# Pick a free port automatically, starting at 8082 to avoid the common 8080.
target_port = find_available_port(8082)

# Detect whether we are running as a packaged executable.
if getattr(sys, "frozen", False):
    # In windowed (no-console) mode stdout/stderr are None, which breaks
    # logging; substitute a writer that discards everything.
    class NullWriter:
        def write(self, text):
            pass

        def flush(self):
            pass

        def isatty(self):
            return False

    if sys.stdout is None:
        sys.stdout = NullWriter()
    if sys.stderr is None:
        sys.stderr = NullWriter()

    # Disable the existing loggers in the packaged build to avoid errors when
    # there is no console.
    import logging.config

    logging.config.dictConfig(
        {
            "version": 1,
            "disable_existing_loggers": True,
        }
    )
    ui.run(
        title="海上风电场集电线路优化",
        host="127.0.0.1",
        port=target_port,
        reload=False,
        window_size=(1280, 800),
        native=True,
    )
else:
    # Normal (development) run keeps logging and auto-reload enabled.
    ui.run(
        title="海上风电场集电线路优化",
        host="127.0.0.1",
        port=target_port,
        reload=True,
        window_size=(1280, 800),
        native=True,
    )