Compare commits

...

4 Commits

Author SHA1 Message Date
dmy
f2a960e789 feat: 优化回路数计算逻辑,提升报表准确性 2026-01-07 16:55:11 +08:00
dmy
87cea6ed86 feat: 优化文件保存对话框并增强系统稳定性
- 重新启用run.io_bound()调用,使用后台线程执行PowerShell脚本
- 为所有文件保存操作添加按钮防重复点击功能
- 新增win32_helper模块,提供Win32 API和COM接口的文件对话框
- 简化导出最佳方案DXF的代码结构
- 改进异步操作和错误处理机制
2026-01-07 12:47:58 +08:00
dmy
e0b5b0c3dc feat: 在方案对比表格中增加损耗费用净现值列
- 新增'损耗费用净现值 (万元)'列,显示生命周期内损耗费用的净现值
- 使用npv_loss字段替代annual_loss_cost,考虑折现率对全生命周期成本的影响
- 支持按损耗费用净现值排序,便于方案经济性对比
2026-01-07 11:27:29 +08:00
dmy
7aef58de1e fix: 修正损耗计算单位从瓦特(W)转换为千瓦(kW)
- 将evaluate_design函数中的损耗计算结果从W转换为kW
- loss_w变量存储三相损耗(W),loss_kw转换为kW后累加
- 确保total_loss返回值单位为kW,与后续经济性分析计算一致
2026-01-07 10:01:32 +08:00
4 changed files with 399 additions and 181 deletions

16
.vscode/launch.json vendored Normal file
View File

@@ -0,0 +1,16 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Current File with Arguments",
"type": "debugpy",
"request": "launch",
"program": "main.py",
"console": "integratedTerminal",
"args": "--excel abc.xlsx"
},
]
}

208
gui.py
View File

@@ -10,7 +10,7 @@ matplotlib.use("Agg")
import matplotlib.backends.backend_svg import matplotlib.backends.backend_svg
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import pandas as pd import pandas as pd
from nicegui import app, events, ui from nicegui import app, events, ui, run
from main import ( from main import (
compare_design_methods, compare_design_methods,
@@ -347,7 +347,7 @@ def index():
except Exception as ex: except Exception as ex:
ui.notify(f"上传处理失败: {ex}", type="negative") ui.notify(f"上传处理失败: {ex}", type="negative")
async def save_file_with_dialog(filename, callback, file_filter="All files (*.*)"): async def save_file_with_dialog(filename, callback, file_filter="All files (*.*)", sender=None):
""" """
跨平台文件保存助手。 跨平台文件保存助手。
如果是原生模式,弹出系统保存对话框。 如果是原生模式,弹出系统保存对话框。
@@ -356,77 +356,84 @@ def index():
:param filename: 默认文件名 :param filename: 默认文件名
:param callback: 接收文件路径并执行保存操作的函数 (filepath) -> None :param callback: 接收文件路径并执行保存操作的函数 (filepath) -> None
:param file_filter: 格式如 "Excel Files (*.xlsx)" :param file_filter: 格式如 "Excel Files (*.xlsx)"
:param sender: 触发该操作的 UI 组件,用于在操作期间禁用以防重复点击
""" """
# 方案:使用 PowerShell 弹出原生保存对话框 (仅限 Windows) if sender:
# 这完全避免了 Python UI 库 (Tkinter/PyWebview) 的线程冲突和 PicklingError sender.disable()
import platform try:
import subprocess # 方案:使用 PowerShell 弹出原生保存对话框 (仅限 Windows)
import platform
import subprocess
if platform.system() == "Windows": if platform.system() == "Windows":
try: try:
# 构建 PowerShell 脚本 # 构建 PowerShell 脚本
# 注意:过滤器格式为 "描述|*.ext|所有文件|*.*" # 注意:过滤器格式为 "描述|*.ext|所有文件|*.*"
ps_filter = file_filter.replace("(", "|").replace(")", "") ps_filter = file_filter.replace("(", "|").replace(")", "")
if "|" not in ps_filter: if "|" not in ps_filter:
ps_filter += f"|{os.path.splitext(filename)[1] or '*.*'}" ps_filter += f"|{os.path.splitext(filename)[1] or '*.*'}"
# 简单清洗 filter 字符串以适应 PowerShell (e.g., "Excel Files *.xlsx" -> "Excel Files|*.xlsx") # 简单清洗 filter 字符串以适应 PowerShell (e.g., "Excel Files *.xlsx" -> "Excel Files|*.xlsx")
# 这里做一个简化的映射,确保格式正确 # 这里做一个简化的映射,确保格式正确
if "Excel" in file_filter: if "Excel" in file_filter:
ps_filter = "Excel Files (*.xlsx)|*.xlsx|All Files (*.*)|*.*" ps_filter = "Excel Files (*.xlsx)|*.xlsx|All Files (*.*)|*.*"
elif "DXF" in file_filter: elif "DXF" in file_filter:
ps_filter = "DXF Files (*.dxf)|*.dxf|All Files (*.*)|*.*" ps_filter = "DXF Files (*.dxf)|*.dxf|All Files (*.*)|*.*"
elif "ZIP" in file_filter: elif "ZIP" in file_filter:
ps_filter = "ZIP Archives (*.zip)|*.zip|All Files (*.*)|*.*" ps_filter = "ZIP Archives (*.zip)|*.zip|All Files (*.*)|*.*"
else: else:
ps_filter = "All Files (*.*)|*.*" ps_filter = "All Files (*.*)|*.*"
ps_script = f""" ps_script = f"""
Add-Type -AssemblyName System.Windows.Forms Add-Type -AssemblyName System.Windows.Forms
$d = New-Object System.Windows.Forms.SaveFileDialog $d = New-Object System.Windows.Forms.SaveFileDialog
$d.Filter = "{ps_filter}" $d.Filter = "{ps_filter}"
$d.FileName = "{filename}" $d.FileName = "{filename}"
$d.Title = "保存文件" $d.Title = "保存文件"
if ($d.ShowDialog() -eq [System.Windows.Forms.DialogResult]::OK) {{ if ($d.ShowDialog() -eq [System.Windows.Forms.DialogResult]::OK) {{
Write-Output $d.FileName Write-Output $d.FileName
}} }}
""" """
# 运行 PowerShell # 运行 PowerShell
# 使用 startupinfo 隐藏控制台窗口 (防止黑框闪烁) # 使用 startupinfo 隐藏控制台窗口 (防止黑框闪烁)
startupinfo = subprocess.STARTUPINFO() startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
print("DEBUG: invoking PowerShell SaveFileDialog...") print("DEBUG: invoking PowerShell SaveFileDialog...")
# 在 native 模式下直接同步执行,不使用 run.io_bound() # 使用 run.io_bound 在后台线程执行,避免阻塞主事件循环
result = subprocess.run( # 这样按钮的禁用状态可以立即同步到前端
["powershell", "-Command", ps_script], result = await run.io_bound(
capture_output=True, subprocess.run,
text=True, ["powershell", "-Command", ps_script],
startupinfo=startupinfo capture_output=True,
) text=True,
save_path = result.stdout.strip() startupinfo=startupinfo
)
save_path = result.stdout.strip()
if save_path:
print(f"DEBUG: PowerShell returned path: {save_path}")
await callback(save_path)
ui.notify(f"文件已保存至: {save_path}", type="positive")
return
else:
print("DEBUG: PowerShell dialog cancelled or empty result.")
# 用户取消,直接返回,不回退
return
if save_path: except Exception as e:
print(f"DEBUG: PowerShell returned path: {save_path}") print(f"PowerShell dialog failed: {e}")
await callback(save_path) # 出错则回退到 ui.download
ui.notify(f"文件已保存至: {save_path}", type="positive")
return
else:
print("DEBUG: PowerShell dialog cancelled or empty result.")
# 用户取消,直接返回,不回退
return
except Exception as e: # 统一回退方案:浏览器下载
print(f"PowerShell dialog failed: {e}") print("DEBUG: Using ui.download fallback")
# 出错则回退到 ui.download temp_path = os.path.join(state["temp_dir"], filename)
await callback(temp_path)
# 统一回退方案:浏览器下载 ui.download(temp_path)
print("DEBUG: Using ui.download fallback") finally:
temp_path = os.path.join(state["temp_dir"], filename) if sender:
await callback(temp_path) sender.enable()
ui.download(temp_path)
def update_export_buttons(): def update_export_buttons():
if refs["export_row"]: if refs["export_row"]:
@@ -468,9 +475,9 @@ def index():
# 如果不存在,重新生成 # 如果不存在,重新生成
export_all_scenarios_to_excel(state["results"], path) export_all_scenarios_to_excel(state["results"], path)
async def on_click_excel(): async def on_click_excel(e):
await save_file_with_dialog( await save_file_with_dialog(
default_excel_name, save_excel, "Excel Files (*.xlsx)" default_excel_name, save_excel, "Excel Files (*.xlsx)", sender=e.sender
) )
ui.button( ui.button(
@@ -479,41 +486,7 @@ def index():
).props("icon=download") ).props("icon=download")
# --- 导出推荐方案 DXF --- # --- 导出推荐方案 DXF ---
def export_best_dxf(): async def on_click_best_dxf(e):
if state["substation"] is not None:
safe_name = "".join(
[
c
for c in best_res["name"]
if c.isalnum() or c in (" ", "-", "_")
]
).strip()
default_name = f"{file_prefix}_best_{safe_name}.dxf"
async def save_dxf(path):
export_to_dxf(
best_res["turbines"],
state["substation"],
best_res["eval"]["details"],
path,
)
# 包装为 async 任务,并在 NiceGUI 事件循环中执行
async def run_save():
await save_file_with_dialog(
default_name, save_dxf, "DXF Files (*.dxf)"
)
# 这里的 export_best_dxf 本身是普通函数,绑定到 on_click
# 但我们需要它执行异步操作。最简单的是让 export_best_dxf 变为 async
# 或者在这里直接调用 run_save (但这在普通函数里不行)
# 更好的方法是将 export_best_dxf 定义为 async如下所示
return run_save()
else:
ui.notify("缺少升压站数据,无法导出 DXF", type="negative")
# 将 export_best_dxf 改为 async 并重命名,以便直接用作回调
async def on_click_best_dxf():
if state["substation"] is not None: if state["substation"] is not None:
safe_name = "".join( safe_name = "".join(
[ [
@@ -533,7 +506,7 @@ def index():
) )
await save_file_with_dialog( await save_file_with_dialog(
default_name, save_dxf, "DXF Files (*.dxf)" default_name, save_dxf, "DXF Files (*.dxf)", sender=e.sender
) )
else: else:
ui.notify("缺少升压站数据,无法导出 DXF", type="negative") ui.notify("缺少升压站数据,无法导出 DXF", type="negative")
@@ -543,7 +516,7 @@ def index():
).props("icon=architecture color=accent") ).props("icon=architecture color=accent")
# --- 导出选中方案 DXF --- # --- 导出选中方案 DXF ---
async def on_click_selected_dxf(): async def on_click_selected_dxf(e):
if not refs["results_table"] or not refs["results_table"].selected: if not refs["results_table"] or not refs["results_table"].selected:
ui.notify("请先在上方表格中选择一个方案", type="warning") ui.notify("请先在上方表格中选择一个方案", type="warning")
return return
@@ -573,7 +546,7 @@ def index():
) )
await save_file_with_dialog( await save_file_with_dialog(
default_name, save_dxf, "DXF Files (*.dxf)" default_name, save_dxf, "DXF Files (*.dxf)", sender=e.sender
) )
else: else:
ui.notify( ui.notify(
@@ -588,7 +561,7 @@ def index():
refs["export_selected_btn"].set_text(f"导出选中方案 ({clean_name})") refs["export_selected_btn"].set_text(f"导出选中方案 ({clean_name})")
# --- 导出全部 ZIP --- # --- 导出全部 ZIP ---
async def on_click_all_dxf(): async def on_click_all_dxf(e):
if not state["results"] or state["substation"] is None: if not state["results"] or state["substation"] is None:
ui.notify("无方案数据可导出", type="warning") ui.notify("无方案数据可导出", type="warning")
return return
@@ -634,7 +607,7 @@ def index():
except: except:
pass pass
await save_file_with_dialog(default_name, save_zip, "ZIP Files (*.zip)") await save_file_with_dialog(default_name, save_zip, "ZIP Files (*.zip)", sender=e.sender)
ui.button("导出全部方案 DXF (ZIP)", on_click=on_click_all_dxf).props( ui.button("导出全部方案 DXF (ZIP)", on_click=on_click_all_dxf).props(
"icon=folder_zip color=secondary" "icon=folder_zip color=secondary"
@@ -688,8 +661,6 @@ def index():
import queue import queue
from nicegui import run
async def run_analysis(): async def run_analysis():
if not state["excel_path"]: if not state["excel_path"]:
ui.notify("请先上传 Excel 坐标文件!", type="warning") ui.notify("请先上传 Excel 坐标文件!", type="warning")
@@ -810,11 +781,16 @@ def index():
total_length_m = sum(d["length"] for d in res["eval"]["details"]) total_length_m = sum(d["length"] for d in res["eval"]["details"])
total_length_km = total_length_m / 1000 total_length_km = total_length_m / 1000
# 获取回路数 (通过统计从升压站发出的连接)
n_circuits = sum(1 for d in res["eval"]["details"] if d["source"] == "substation" or d["target"] == "substation")
row_dict = { row_dict = {
"name": name_display, "name": name_display,
"n_circuits": n_circuits,
"cost_wan": f"{res['cost'] / 10000:.2f}", "cost_wan": f"{res['cost'] / 10000:.2f}",
"loss_kw": f"{res['loss']:.2f}", "loss_kw": f"{res['loss']:.2f}",
"total_length": f"{total_length_km:.2f}", "total_length": f"{total_length_km:.2f}",
"npv_loss_wan": f"{res.get('npv_loss', 0) / 10000:.2f}",
"total_cost_npv_wan": f"{res.get('total_cost_npv', res['cost']) / 10000:.2f}", "total_cost_npv_wan": f"{res.get('total_cost_npv', res['cost']) / 10000:.2f}",
"note": note, "note": note,
"original_name": res["name"], "original_name": res["name"],
@@ -878,7 +854,7 @@ def index():
# 使用 items-stretch 确保所有子元素高度一致 # 使用 items-stretch 确保所有子元素高度一致
with ui.row().classes("w-full items-stretch gap-4"): with ui.row().classes("w-full items-stretch gap-4"):
# 1. 导出模板按钮 # 1. 导出模板按钮
async def export_template(): async def export_template(e):
import shutil import shutil
from generate_template import create_template from generate_template import create_template
@@ -899,7 +875,7 @@ def index():
raise FileNotFoundError("无法生成模板文件") raise FileNotFoundError("无法生成模板文件")
await save_file_with_dialog( await save_file_with_dialog(
"coordinates.xlsx", save_template, "Excel Files (*.xlsx)" "coordinates.xlsx", save_template, "Excel Files (*.xlsx)", sender=e.sender
) )
ui.button("导出 Excel 模板", on_click=export_template).classes( ui.button("导出 Excel 模板", on_click=export_template).classes(
@@ -966,6 +942,12 @@ def index():
"required": True, "required": True,
"align": "left", "align": "left",
}, },
{
"name": "n_circuits",
"label": "回路数",
"field": "n_circuits",
"sortable": True,
},
{ {
"name": "cost_wan", "name": "cost_wan",
"label": "总投资 (万元)", "label": "总投资 (万元)",
@@ -984,6 +966,12 @@ def index():
"field": "total_length", "field": "total_length",
"sortable": True, "sortable": True,
}, },
{
"name": "npv_loss_wan",
"label": "损耗费用净现值 (万元)",
"field": "npv_loss_wan",
"sortable": True,
},
{ {
"name": "total_cost_npv_wan", "name": "total_cost_npv_wan",
"label": "总费用 (万元)", "label": "总费用 (万元)",

116
main.py
View File

@@ -655,38 +655,20 @@ def design_with_rotational_sweep(
return final_connections, turbines return final_connections, turbines
def get_max_cable_capacity_mw( # 4. 获取电缆最大容量(MW)
cable_specs=None, voltage=VOLTAGE_LEVEL, power_factor=POWER_FACTOR def get_max_cable_capacity_mw(cable_specs, voltage=VOLTAGE_LEVEL, power_factor=POWER_FACTOR):
):
""" """
计算给定电缆规格中能够承载的最大功率 (单位: MW)。 根据电缆规格计算最大承载功率
:param cable_specs: 电缆规格列表 list of tuples或者直接是最大功率数值(MW)
基于提供的电缆规格列表,选取最大载流量,结合系统电压和功率因数计算理论最大传输功率。
参数:
cable_specs (list, optional): 电缆规格列表。每个元素应包含 (截面积, 额定电流, 单价, 损耗系数)。
voltage (float): 系统电压 (V), 默认 66000
power_factor (float): 功率因数, 默认 0.95
返回:
float: 最大功率承载能力 (MW)。
异常:
Exception: 当未提供 cable_specs 时抛出,提示截面不满足。
""" """
if cable_specs is None: # 如果传入的已经是数值,直接返回
# Default cable specs if not provided (same as in evaluate_design) if isinstance(cable_specs, (int, float, np.number)):
cable_specs = [ return float(cable_specs)
(35, 150, 0.524, 80),
(70, 215, 0.268, 120), # 兼容性检查:如果列表为空
(95, 260, 0.193, 150), if not cable_specs:
(120, 295, 0.153, 180), print("Warning: 没有可用的电缆规格,使用默认最大容量 100MW")
(150, 330, 0.124, 220), return 100.0
(185, 370, 0.0991, 270),
(240, 425, 0.0754, 350),
(300, 500, 0.0601, 450),
(400, 580, 0.0470, 600),
]
# 从所有电缆规格中找到最大的额定电流容量 # 从所有电缆规格中找到最大的额定电流容量
max_current_capacity = max(spec[1] for spec in cable_specs) max_current_capacity = max(spec[1] for spec in cable_specs)
@@ -858,8 +840,9 @@ def evaluate_design(
total_cost += cable["cost"] total_cost += cable["cost"]
# 计算I²R损耗 (简化版) # 计算I²R损耗 (简化版)
loss = (cable["current"] ** 2) * cable["resistance"] * 3 # 三相 loss_w = (cable["current"] ** 2) * cable["resistance"] * 3 # 三相单位W
total_loss += loss loss_kw = loss_w / 1000 # 转换为 kW
total_loss += loss_kw
return { return {
"total_cost": total_cost, "total_cost": total_cost,
@@ -1008,10 +991,12 @@ def export_to_excel(connections_details, filename):
df = pd.DataFrame(data) df = pd.DataFrame(data)
# 汇总统计 # 汇总统计
n_circuits = sum(1 for conn in connections_details if conn["source"] == "substation" or conn["target"] == "substation")
summary = { summary = {
"Total Cost (¥)": df["Cost (¥)"].sum(), "Total Cost (¥)": df["Cost (¥)"].sum(),
"Total Effective Length (m)": df["Effective Length (m)"].sum(), "Total Effective Length (m)": df["Effective Length (m)"].sum(),
"Total Vertical Length (m)": df["Vertical Length (m)"].sum(), "Total Vertical Length (m)": df["Vertical Length (m)"].sum(),
"Number of Circuits": n_circuits,
} }
summary_df = pd.DataFrame([summary]) summary_df = pd.DataFrame([summary])
@@ -1037,15 +1022,14 @@ def export_all_scenarios_to_excel(results, filename):
# 1. 总览 Sheet # 1. 总览 Sheet
summary_data = [] summary_data = []
for res in results: for res in results:
# 获取回路数 # 获取回路数 (通过统计从升压站发出的连接)
n_circuits = 0 n_circuits = sum(1 for conn in res["eval"]["details"] if conn["source"] == "substation" or conn["target"] == "substation")
if "turbines" in res and "cluster" in res["turbines"].columns:
n_circuits = res["turbines"]["cluster"].nunique()
summary_data.append( summary_data.append(
{ {
"Scenario": res["name"], "Scenario": res["name"],
"Total Cost (¥)": res["cost"], "Total Cost (¥)": res["cost"],
"总费用(万元)": res.get("total_cost_npv", res["cost"]) / 10000,
"Total Loss (kW)": res["loss"], "Total Loss (kW)": res["loss"],
"Num Circuits": n_circuits, "Num Circuits": n_circuits,
# 计算电缆统计 # 计算电缆统计
@@ -1412,14 +1396,10 @@ def compare_design_methods(
print(f" 最大电缆容量: {max_cable_mw:.2f} MW") print(f" 最大电缆容量: {max_cable_mw:.2f} MW")
# --- Run 1: Base Algorithm (Capacitated Sweep) --- # --- Run 1: Base Sector Sweep ---
base_name = f"{name} (Base)" base_name = f"{name} (Base)"
conns_base, turbines_base = design_with_capacitated_sweep( conns_base, turbines_base = design_with_capacitated_sweep(
turbines.copy(), turbines.copy(), substation, max_cable_mw, voltage=voltage
substation,
cable_specs=current_specs,
voltage=voltage,
power_factor=power_factor,
) )
eval_base = evaluate_design( eval_base = evaluate_design(
turbines, turbines,
@@ -1431,7 +1411,7 @@ def compare_design_methods(
voltage=voltage, voltage=voltage,
power_factor=power_factor, power_factor=power_factor,
) )
n_circuits_base = sum(1 for d in eval_base["details"] if d["source"] == "substation" or d["target"] == "substation")
comparison_results.append( comparison_results.append(
{ {
"name": base_name, "name": base_name,
@@ -1443,17 +1423,13 @@ def compare_design_methods(
} }
) )
print( print(
f" [Base] Cost: ¥{eval_base['total_cost']:,.2f} | Loss: {eval_base['total_loss']:.2f} kW" f" [Base] Cost: ¥{eval_base['total_cost']:,.2f} | Loss: {eval_base['total_loss']:.2f} kW | Circuits: {n_circuits_base}"
) )
# --- Run 2: Rotational Algorithm (Optimization) --- # --- Run 2: Rotational Sweep (Optimization) ---
rot_name = f"{name} (Rotational)" rot_name = f"{name} (Rotational)"
conns_rot, turbines_rot = design_with_rotational_sweep( conns_rot, turbines_rot = design_with_rotational_sweep(
turbines.copy(), turbines.copy(), substation, max_cable_mw, voltage=voltage
substation,
cable_specs=current_specs,
voltage=voltage,
power_factor=power_factor,
) )
eval_rot = evaluate_design( eval_rot = evaluate_design(
turbines, turbines,
@@ -1465,7 +1441,7 @@ def compare_design_methods(
voltage=voltage, voltage=voltage,
power_factor=power_factor, power_factor=power_factor,
) )
n_circuits_rot = sum(1 for d in eval_rot["details"] if d["source"] == "substation" or d["target"] == "substation")
comparison_results.append( comparison_results.append(
{ {
"name": rot_name, "name": rot_name,
@@ -1477,7 +1453,7 @@ def compare_design_methods(
} }
) )
print( print(
f" [Rotational] Cost: ¥{eval_rot['total_cost']:,.2f} | Loss: {eval_rot['total_loss']:.2f} kW" f" [Rotational] Cost: ¥{eval_rot['total_cost']:,.2f} | Loss: {eval_rot['total_loss']:.2f} kW | Circuits: {n_circuits_rot}"
) )
# --- Run 3: Esau-Williams Algorithm --- # --- Run 3: Esau-Williams Algorithm ---
@@ -1495,7 +1471,7 @@ def compare_design_methods(
voltage=voltage, voltage=voltage,
power_factor=power_factor, power_factor=power_factor,
) )
n_circuits_ew = sum(1 for d in eval_ew["details"] if d["source"] == "substation" or d["target"] == "substation")
comparison_results.append( comparison_results.append(
{ {
"name": ew_name, "name": ew_name,
@@ -1507,7 +1483,7 @@ def compare_design_methods(
} }
) )
print( print(
f" [Esau-Williams] Cost: ¥{eval_ew['total_cost']:,.2f} | Loss: {eval_ew['total_loss']:.2f} kW" f" [Esau-Williams] Cost: ¥{eval_ew['total_cost']:,.2f} | Loss: {eval_ew['total_loss']:.2f} kW | Circuits: {n_circuits_ew}"
) )
# 记录最佳 # 记录最佳
@@ -1524,7 +1500,7 @@ def compare_design_methods(
# 可视化 (只画 Base 版本) # 可视化 (只画 Base 版本)
ax_idx = i + 1 ax_idx = i + 1
if plot_results and ax_idx < 4: if plot_results and ax_idx < 4:
n_circuits = turbines_base["cluster"].nunique() n_circuits = sum(1 for d in eval_base["details"] if d["source"] == "substation" or d["target"] == "substation")
title = f"{base_name} ({n_circuits} circuits)\nCost: ¥{eval_base['total_cost'] / 10000:.2f}" title = f"{base_name} ({n_circuits} circuits)\nCost: ¥{eval_base['total_cost'] / 10000:.2f}"
visualize_design( visualize_design(
turbines_base, substation, eval_base["details"], title, ax=axes[ax_idx] turbines_base, substation, eval_base["details"], title, ax=axes[ax_idx]
@@ -1564,7 +1540,11 @@ def compare_design_methods(
for i, res in enumerate(comparison_results): for i, res in enumerate(comparison_results):
if res["cost"] < comparison_results[best_idx]["cost"]: if res["cost"] < comparison_results[best_idx]["cost"]:
best_idx = i best_idx = i
print(f" {i + 1}. {res['name']} - Cost: ¥{res['cost']:,.2f}")
# 获取回路数 (通过统计从升压站发出的连接)
n_circuits = sum(1 for conn in res["eval"]["details"] if conn["source"] == "substation" or conn["target"] == "substation")
print(f" {i + 1}. {res['name']} - Cost: ¥{res['cost']:,.2f} | Circuits: {n_circuits}")
print(f"推荐方案: {comparison_results[best_idx]['name']} (默认)") print(f"推荐方案: {comparison_results[best_idx]['name']} (默认)")
@@ -1680,21 +1660,23 @@ def total_investment(results, system_params):
更新后的results列表每个结果新增 'total_cost_npv' 字段(总费用净现值,元) 更新后的results列表每个结果新增 'total_cost_npv' 字段(总费用净现值,元)
""" """
# 获取系统参数,使用默认值 # 获取系统参数,使用默认值
discount_rate_percent = system_params.get('discount_rate', DISCOUNT_RATE) discount_rate_percent = system_params.get("discount_rate", DISCOUNT_RATE)
electricity_price = system_params.get('electricity_price', ELECTRICITY_PRICE) electricity_price = system_params.get("electricity_price", ELECTRICITY_PRICE)
project_lifetime = system_params.get('project_lifetime', PROJECT_LIFETIME) project_lifetime = system_params.get("project_lifetime", PROJECT_LIFETIME)
annual_loss_hours = system_params.get('annual_loss_hours', ANNUAL_LOSS_HOURS) annual_loss_hours = system_params.get("annual_loss_hours", ANNUAL_LOSS_HOURS)
# 将折现率转换为小数 # 将折现率转换为小数
r = discount_rate_percent / 100.0 r = discount_rate_percent / 100.0
for result in results: for result in results:
cable_cost = result['cost'] # 电缆总投资(元) cable_cost = result["cost"] # 电缆总投资(元)
loss_power = result['loss'] # 线损功率kW loss_power = result["loss"] # 线损功率kW
# 1. 计算电缆投资的净现值2年分期 # 1. 计算电缆投资的净现值2年分期
# 第1年支付50%第2年支付50% # 第1年支付50%第2年支付50%
npv_cable = (cable_cost * 0.5) / ((1 + r) ** 1) + (cable_cost * 0.5) / ((1 + r) ** 2) npv_cable = (cable_cost * 0.5) / ((1 + r) ** 1) + (cable_cost * 0.5) / (
(1 + r) ** 2
)
# 2. 计算电费损耗的净现值(生命周期内) # 2. 计算电费损耗的净现值(生命周期内)
# 年损耗费用 = 损耗功率(kW) * 年损耗小时数 * 电价(元/kWh) # 年损耗费用 = 损耗功率(kW) * 年损耗小时数 * 电价(元/kWh)
@@ -1713,10 +1695,10 @@ def total_investment(results, system_params):
total_cost_npv = npv_cable + npv_loss total_cost_npv = npv_cable + npv_loss
# 将结果添加到字典中 # 将结果添加到字典中
result['total_cost_npv'] = total_cost_npv result["total_cost_npv"] = total_cost_npv
result['npv_cable'] = npv_cable result["npv_cable"] = npv_cable
result['npv_loss'] = npv_loss result["npv_loss"] = npv_loss
result['annual_loss_cost'] = annual_loss_cost result["annual_loss_cost"] = annual_loss_cost
return results return results

232
win32_helper.py Normal file
View File

@@ -0,0 +1,232 @@
import ctypes
import ctypes.wintypes
import os
def show_save_dialog_win32():
"""
使用 ctypes 直接调用 Windows API (GetSaveFileNameW)
不需要子进程,可以在线程中运行 (run.io_bound)
"""
try:
# 定义 OPENFILENAME 结构体
class OPENFILENAME(ctypes.Structure):
_fields_ = [
("lStructSize", ctypes.wintypes.DWORD),
("hwndOwner", ctypes.wintypes.HWND),
("hInstance", ctypes.wintypes.HINSTANCE),
("lpstrFilter", ctypes.wintypes.LPCWSTR),
("lpstrCustomFilter", ctypes.wintypes.LPWSTR),
("nMaxCustFilter", ctypes.wintypes.DWORD),
("nFilterIndex", ctypes.wintypes.DWORD),
("lpstrFile", ctypes.wintypes.LPWSTR),
("nMaxFile", ctypes.wintypes.DWORD),
("lpstrFileTitle", ctypes.wintypes.LPWSTR),
("nMaxFileTitle", ctypes.wintypes.DWORD),
("lpstrInitialDir", ctypes.wintypes.LPCWSTR),
("lpstrTitle", ctypes.wintypes.LPCWSTR),
("Flags", ctypes.wintypes.DWORD),
("nFileOffset", ctypes.wintypes.WORD),
("nFileExtension", ctypes.wintypes.WORD),
("lpstrDefExt", ctypes.wintypes.LPCWSTR),
("lCustData", ctypes.wintypes.LPARAM),
("lpfnHook", ctypes.wintypes.LPVOID),
("lpTemplateName", ctypes.wintypes.LPCWSTR),
# 还有更多字段,但这通常足够了
# ("pvReserved", ctypes.wintypes.LPVOID),
# ("dwReserved", ctypes.wintypes.DWORD),
# ("FlagsEx", ctypes.wintypes.DWORD),
]
# 准备缓冲区
filename_buffer = ctypes.create_unicode_buffer(260) # MAX_PATH
# 设置初始文件名
filename_buffer.value = "win32_save.xlsx"
# 准备过滤器 (用 \0 分隔)
# 格式: "描述\0模式\0描述\0模式\0\0"
filter_str = "Excel Files (*.xlsx)\0*.xlsx\0All Files (*.*)\0*.*\0\0"
ofn = OPENFILENAME()
ofn.lStructSize = ctypes.sizeof(OPENFILENAME)
ofn.hwndOwner = 0 # NULL
ofn.lpstrFilter = filter_str
ofn.lpstrFile = ctypes.cast(filename_buffer, ctypes.wintypes.LPWSTR)
ofn.nMaxFile = 260
ofn.lpstrDefExt = "xlsx"
ofn.lpstrTitle = "保存文件 (Win32 API)"
# OFN_OVERWRITEPROMPT | OFN_PATHMUSTEXIST | OFN_NOCHANGEDIR
ofn.Flags = 0x00000002 | 0x00000800 | 0x00000008
comdlg32 = ctypes.windll.comdlg32
# 调用 API
# GetSaveFileNameW 返回非零值表示成功
if comdlg32.GetSaveFileNameW(ctypes.byref(ofn)):
return filename_buffer.value
else:
return None
except Exception as e:
print(f"Win32 API Error: {e}")
return None
def show_save_dialog_com():
"""
使用 COM 接口 IFileSaveDialog (Windows Vista+)
提供更现代化的文件保存对话框,支持更多功能
"""
try:
import ctypes
import ctypes.wintypes
import uuid
# 定义必要的常量
CLSCTX_INPROC_SERVER = 1
S_OK = 0
FOS_OVERWRITEPROMPT = 0x00000002
FOS_PATHMUSTEXIST = 0x00000800
FOS_NOCHANGEDIR = 0x00000008
SIGDN_FILESYSPATH = 0x80058000
# IFileSaveDialog 的 CLSID 和 IID
CLSID_FileSaveDialog = uuid.UUID("{C0B4E2F3-BA21-4773-8DBA-335EC946EB8B}")
IID_IFileSaveDialog = uuid.UUID("{84bccd23-5fde-4cdb-aea4-af64b83d78ab}")
IID_IShellItem = uuid.UUID("{43826d1e-e718-42ee-bc55-a1e261c37bfe}")
# 加载 ole32.dll
ole32 = ctypes.windll.ole32
# CoInitialize
ole32.CoInitialize(None)
# CoCreateInstance
p_dialog = ctypes.c_void_p()
hr = ole32.CoCreateInstance(
ctypes.byref(CLSID_FileSaveDialog),
None,
CLSCTX_INPROC_SERVER,
ctypes.byref(IID_IFileSaveDialog),
ctypes.byref(p_dialog)
)
if hr != S_OK:
print(f"CoCreateInstance failed: {hr}")
return None
# 定义 IFileSaveDialog 的 vtable 方法
# 我们只需要调用 Show, GetResult, SetOptions, SetFileName, SetDefaultExtension, SetFileTypeIndex
# 这些方法在 IFileOpenDialog 基类中定义
# SetOptions
class IFileSaveDialogVtbl(ctypes.Structure):
_fields_ = [
("QueryInterface", ctypes.c_void_p),
("AddRef", ctypes.c_void_p),
("Release", ctypes.c_void_p),
# IModalWindow
("Show", ctypes.c_void_p),
# IFileDialog
("SetFileTypes", ctypes.c_void_p),
("SetFileTypeIndex", ctypes.c_void_p),
("GetFileTypeIndex", ctypes.c_void_p),
("Advise", ctypes.c_void_p),
("Unadvise", ctypes.c_void_p),
("SetOptions", ctypes.CFUNCTYPE(ctypes.c_long, ctypes.c_void_p, ctypes.c_ulong)),
("GetOptions", ctypes.c_void_p),
("SetDefaultFolder", ctypes.c_void_p),
("SetFolder", ctypes.c_void_p),
("GetFolder", ctypes.c_void_p),
("GetCurrentSelection", ctypes.c_void_p),
("SetFileName", ctypes.CFUNCTYPE(ctypes.c_long, ctypes.c_void_p, ctypes.c_wchar_p)),
("GetFileName", ctypes.c_void_p),
("SetTitle", ctypes.c_void_p),
("SetOkButtonLabel", ctypes.c_void_p),
("SetFileNameLabel", ctypes.c_void_p),
("GetResult", ctypes.CFUNCTYPE(ctypes.c_long, ctypes.c_void_p, ctypes.POINTER(ctypes.c_void_p))),
("AddPlace", ctypes.c_void_p),
("SetDefaultExtension", ctypes.CFUNCTYPE(ctypes.c_long, ctypes.c_void_p, ctypes.c_wchar_p)),
("Close", ctypes.c_void_p),
("SetClientGuid", ctypes.c_void_p),
("ClearClientData", ctypes.c_void_p),
("SetFilter", ctypes.c_void_p),
]
# 获取 vtable
vtable = ctypes.cast(p_dialog, ctypes.POINTER(ctypes.POINTER(IFileSaveDialogVtbl))).contents.contents
# 调用 SetOptions
hr = vtable.SetOptions(p_dialog, FOS_OVERWRITEPROMPT | FOS_PATHMUSTEXIST | FOS_NOCHANGEDIR)
if hr != S_OK:
print(f"SetOptions failed: {hr}")
return None
# 调用 SetFileName
hr = vtable.SetFileName(p_dialog, "com_save.xlsx")
if hr != S_OK:
print(f"SetFileName failed: {hr}")
return None
# 调用 SetDefaultExtension
hr = vtable.SetDefaultExtension(p_dialog, "xlsx")
if hr != S_OK:
print(f"SetDefaultExtension failed: {hr}")
return None
# 调用 SetFileTypeIndex
hr = vtable.SetFileTypeIndex(p_dialog, 1)
if hr != S_OK:
print(f"SetFileTypeIndex failed: {hr}")
return None
# 调用 Show
hr = vtable.Show(p_dialog, 0) # 0 表示没有父窗口
if hr != S_OK:
# 用户取消
return None
# 调用 GetResult
p_result = ctypes.c_void_p()
hr = vtable.GetResult(p_dialog, ctypes.byref(p_result))
if hr != S_OK:
print(f"GetResult failed: {hr}")
return None
# 定义 IShellItem 接口
class IShellItemVtbl(ctypes.Structure):
_fields_ = [
("QueryInterface", ctypes.c_void_p),
("AddRef", ctypes.c_void_p),
("Release", ctypes.c_void_p),
("BindToHandler", ctypes.c_void_p),
("GetParent", ctypes.c_void_p),
("GetDisplayName", ctypes.CFUNCTYPE(ctypes.c_long, ctypes.c_void_p, ctypes.c_ulong, ctypes.POINTER(ctypes.c_wchar_p))),
("GetAttributes", ctypes.c_void_p),
("Compare", ctypes.c_void_p),
]
# 获取 IShellItem 的 vtable
result_vtable = ctypes.cast(p_result, ctypes.POINTER(ctypes.POINTER(IShellItemVtbl))).contents.contents
# 调用 GetDisplayName
p_display_name = ctypes.c_wchar_p()
hr = result_vtable.GetDisplayName(p_result, SIGDN_FILESYSPATH, ctypes.byref(p_display_name))
if hr != S_OK:
print(f"GetDisplayName failed: {hr}")
return None
filepath = p_display_name.value
# 清理
ole32.CoUninitialize()
return filepath
except ImportError as e:
print(f"COM Error: {e}")
return None
except Exception as e:
print(f"COM Error: {e}")
import traceback
traceback.print_exc()
return None