Compare commits

...

20 Commits

Author SHA1 Message Date
dmy
2b3ab83d91 Fix substation coordinate handling for cable crossing detection 2026-01-08 16:05:06 +08:00
dmy
68df76702e Fix cable crossing detection coordinate unpacking error 2026-01-08 15:53:15 +08:00
dmy
b3a4513f94 Fix MIP solver variable duplication and function structure 2026-01-08 15:30:36 +08:00
dmy
04a5e19451 Improve MIP optimization and add log export feature 2026-01-08 15:08:04 +08:00
dmy
ebd5883dbf Fix unreachable code in design_with_pyomo function 2026-01-08 13:06:26 +08:00
dmy
41ac6f3963 Change MIP objective function to minimize total investment 2026-01-08 13:01:36 +08:00
dmy
09b2ada5df Add debug prints to check MIP toggle functionality 2026-01-08 12:58:15 +08:00
dmy
6441ddc059 Fix MIP import issue: move design_with_mst function outside __main__ protection block 2026-01-08 12:39:07 +08:00
dmy
2f095df12e Fix MIP algorithm: simplify model formulation and add detailed debugging 2026-01-08 10:28:35 +08:00
dmy
a3837a6707 Rewrite MIP model formulation and add comprehensive debugging 2026-01-08 10:22:39 +08:00
dmy
886fba4d15 Clear comparison results and topology visualization on new file upload 2026-01-08 10:10:46 +08:00
dmy
397ca8847e Fix MIP fallback return values: ensure consistent unpacking 2026-01-08 10:06:46 +08:00
dmy
6ad11a9b69 Fix MIP model: make objective function linear to avoid multiplication error 2026-01-08 10:03:49 +08:00
dmy
579f8866c4 Fix MIP toggle bug: handle PuLP import gracefully 2026-01-08 10:01:46 +08:00
dmy
4230d2221d Add MIP module for collector layout optimization 2026-01-08 09:54:40 +08:00
dmy
46e929bfce Implement genetic algorithm for collector layout optimization 2026-01-08 09:46:00 +08:00
dmy
f2a960e789 feat: 优化回路数计算逻辑,提升报表准确性 2026-01-07 16:55:11 +08:00
dmy
87cea6ed86 feat: 优化文件保存对话框并增强系统稳定性
- 重新启用run.io_bound()调用,使用后台线程执行PowerShell脚本
- 为所有文件保存操作添加按钮防重复点击功能
- 新增win32_helper模块,提供Win32 API和COM接口的文件对话框
- 简化导出最佳方案DXF的代码结构
- 改进异步操作和错误处理机制
2026-01-07 12:47:58 +08:00
dmy
e0b5b0c3dc feat: 在方案对比表格中增加损耗费用净现值列
- 新增'损耗费用净现值 (万元)'列,显示生命周期内损耗费用的净现值
- 使用npv_loss字段替代annual_loss_cost,考虑折现率对全生命周期成本的影响
- 支持按损耗费用净现值排序,便于方案经济性对比
2026-01-07 11:27:29 +08:00
dmy
7aef58de1e fix: 修正损耗计算单位从瓦特(W)转换为千瓦(kW)
- 将evaluate_design函数中的损耗计算结果从W转换为kW
- loss_w变量存储三相损耗(W),loss_kw转换为kW后累加
- 确保total_loss返回值单位为kW,与后续经济性分析计算一致
2026-01-07 10:01:32 +08:00
10 changed files with 1696 additions and 179 deletions

16
.vscode/launch.json vendored Normal file
View File

@@ -0,0 +1,16 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Current File with Arguments",
"type": "debugpy",
"request": "launch",
"program": "main.py",
"console": "integratedTerminal",
"args": "--excel abc.xlsx"
},
]
}

193
ga.py Normal file
View File

@@ -0,0 +1,193 @@
import numpy as np
import pandas as pd
from scipy.spatial import distance_matrix
from scipy.sparse.csgraph import minimum_spanning_tree
from collections import defaultdict
import random
def design_with_ga(
turbines,
substation,
cable_specs=None,
voltage=66000,
power_factor=0.95,
system_params=None,
pop_size=50,
generations=50,
evaluate_func=None,
total_invest_func=None,
get_max_capacity_func=None,
):
"""
使用遗传算法优化集电线路布局
:param turbines: 风机DataFrame
:param substation: 升压站坐标
:param cable_specs: 电缆规格
:param system_params: 系统参数用于NPV计算
:param pop_size: 种群大小
:param generations: 迭代代数
:param evaluate_func: 评估函数
:param total_invest_func: 总投资计算函数
:param get_max_capacity_func: 获取最大容量函数
:return: 连接列表和带有簇信息的turbines
"""
if get_max_capacity_func:
max_mw = get_max_capacity_func(cable_specs, voltage, power_factor)
else:
max_mw = 100.0 # 默认值
total_power = turbines["power"].sum()
max_clusters = int(np.ceil(total_power / max_mw))
n_turbines = len(turbines)
# 预计算距离矩阵
all_coords = np.vstack([substation, turbines[["x", "y"]].values])
dist_matrix_full = distance_matrix(all_coords, all_coords)
def fitness(chromosome):
cluster_assign = chromosome
clusters = defaultdict(list)
for i, c in enumerate(cluster_assign):
clusters[c].append(i)
connections = []
for c, members in clusters.items():
if len(members) == 0:
continue
coords = turbines.iloc[members][["x", "y"]].values
if len(members) > 1:
dm = distance_matrix(coords, coords)
mst = minimum_spanning_tree(dm).toarray()
for i in range(len(members)):
for j in range(len(members)):
if mst[i, j] > 0:
connections.append(
(
f"turbine_{members[i]}",
f"turbine_{members[j]}",
mst[i, j],
)
)
# 连接到升压站
dists = [dist_matrix_full[0, m + 1] for m in members]
closest = members[np.argmin(dists)]
connections.append((f"turbine_{closest}", "substation", min(dists)))
eval_res = evaluate_func(
turbines,
connections,
substation,
cable_specs,
is_offshore=False,
method_name="GA",
voltage=voltage,
power_factor=power_factor,
)
if system_params and total_invest_func:
res_list = total_invest_func(
[
{
"cost": eval_res["total_cost"],
"loss": eval_res["total_loss"],
"eval": eval_res,
}
],
system_params,
)
return res_list[0]["total_cost_npv"]
return eval_res["total_cost"]
def init_individual():
assign = np.zeros(n_turbines, dtype=int)
cluster_powers = np.zeros(max_clusters)
for i in range(n_turbines):
p = turbines.iloc[i]["power"]
possible = [
c for c in range(max_clusters) if cluster_powers[c] + p <= max_mw
]
if possible:
c = random.choice(possible)
else:
c = random.randint(0, max_clusters - 1)
assign[i] = c
cluster_powers[c] += p
return assign.tolist()
population = [init_individual() for _ in range(pop_size)]
best = None
best_fitness = float("inf")
for gen in range(generations):
fitnesses = [fitness(ind) for ind in population]
min_fit = min(fitnesses)
if min_fit < best_fitness:
best_fitness = min_fit
best = population[fitnesses.index(min_fit)].copy()
def tournament(size=3):
candidates = random.sample(list(zip(population, fitnesses)), size)
return min(candidates, key=lambda x: x[1])[0]
selected = [tournament() for _ in range(pop_size)]
new_pop = []
for i in range(0, pop_size, 2):
p1 = selected[i]
p2 = selected[i + 1] if i + 1 < pop_size else selected[0]
if random.random() < 0.8:
point = random.randint(1, n_turbines - 1)
child1 = p1[:point] + p2[point:]
child2 = p2[:point] + p1[point:]
else:
child1, child2 = p1.copy(), p2.copy()
new_pop.extend([child1, child2])
for ind in new_pop:
if random.random() < 0.1:
idx = random.randint(0, n_turbines - 1)
old_c = ind[idx]
new_c = random.randint(0, max_clusters - 1)
ind[idx] = new_c
cluster_powers = defaultdict(float)
for j, c in enumerate(ind):
cluster_powers[c] += turbines.iloc[j]["power"]
if max(cluster_powers.values()) > max_mw:
ind[idx] = max_clusters
max_clusters += 1
elites = sorted(zip(population, fitnesses), key=lambda x: x[1])[
: int(0.1 * pop_size)
]
new_pop[: len(elites)] = [e[0] for e in elites]
population = new_pop[:pop_size]
# 解码最佳个体
cluster_assign = best
clusters = defaultdict(list)
for i, c in enumerate(cluster_assign):
clusters[c].append(i)
connections = []
for c, members in clusters.items():
if len(members) == 0:
continue
coords = turbines.iloc[members][["x", "y"]].values
if len(members) > 1:
dm = distance_matrix(coords, coords)
mst = minimum_spanning_tree(dm).toarray()
for i in range(len(members)):
for j in range(len(members)):
if mst[i, j] > 0:
connections.append(
(
f"turbine_{members[i]}",
f"turbine_{members[j]}",
mst[i, j],
)
)
dists = [dist_matrix_full[0, m + 1] for m in members]
closest = members[np.argmin(dists)]
connections.append((f"turbine_{closest}", "substation", min(dists)))
turbines["cluster"] = cluster_assign
return connections, turbines

314
gui.py
View File

@@ -10,7 +10,7 @@ matplotlib.use("Agg")
import matplotlib.backends.backend_svg
import matplotlib.pyplot as plt
import pandas as pd
from nicegui import app, events, ui
from nicegui import app, events, ui, run
from main import (
compare_design_methods,
@@ -90,6 +90,9 @@ def index():
"run_btn": None,
"current_file_container": None, # 替换 label 为 container
"info_container": None, # 新增信息展示容器
"ga_switch": None, # 遗传算法开关
"mip_switch": None, # MIP开关
"log_content": "", # 存储计算日志内容
}
def update_info_panel():
@@ -340,6 +343,16 @@ def index():
update_info_panel()
# 清空方案对比结果和拓扑可视化
state["results"] = []
if refs["results_table"]:
refs["results_table"].rows = []
refs["results_table"].selected = []
if refs["plot_container"]:
refs["plot_container"].clear()
if refs["export_row"]:
refs["export_row"].clear()
# 清空上传组件列表,以便下次选择(配合 .no-list CSS 使用)
if refs["upload_widget"]:
refs["upload_widget"].reset()
@@ -347,7 +360,9 @@ def index():
except Exception as ex:
ui.notify(f"上传处理失败: {ex}", type="negative")
async def save_file_with_dialog(filename, callback, file_filter="All files (*.*)"):
async def save_file_with_dialog(
filename, callback, file_filter="All files (*.*)", sender=None
):
"""
跨平台文件保存助手。
如果是原生模式,弹出系统保存对话框。
@@ -356,77 +371,84 @@ def index():
:param filename: 默认文件名
:param callback: 接收文件路径并执行保存操作的函数 (filepath) -> None
:param file_filter: 格式如 "Excel Files (*.xlsx)"
:param sender: 触发该操作的 UI 组件,用于在操作期间禁用以防重复点击
"""
# 方案:使用 PowerShell 弹出原生保存对话框 (仅限 Windows)
# 这完全避免了 Python UI 库 (Tkinter/PyWebview) 的线程冲突和 PicklingError
import platform
import subprocess
if sender:
sender.disable()
try:
# 方案:使用 PowerShell 弹出原生保存对话框 (仅限 Windows)
import platform
import subprocess
if platform.system() == "Windows":
try:
# 构建 PowerShell 脚本
# 注意:过滤器格式为 "描述|*.ext|所有文件|*.*"
ps_filter = file_filter.replace("(", "|").replace(")", "")
if "|" not in ps_filter:
ps_filter += f"|{os.path.splitext(filename)[1] or '*.*'}"
if platform.system() == "Windows":
try:
# 构建 PowerShell 脚本
# 注意:过滤器格式为 "描述|*.ext|所有文件|*.*"
ps_filter = file_filter.replace("(", "|").replace(")", "")
if "|" not in ps_filter:
ps_filter += f"|{os.path.splitext(filename)[1] or '*.*'}"
# 简单清洗 filter 字符串以适应 PowerShell (e.g., "Excel Files *.xlsx" -> "Excel Files|*.xlsx")
# 这里做一个简化的映射,确保格式正确
if "Excel" in file_filter:
ps_filter = "Excel Files (*.xlsx)|*.xlsx|All Files (*.*)|*.*"
elif "DXF" in file_filter:
ps_filter = "DXF Files (*.dxf)|*.dxf|All Files (*.*)|*.*"
elif "ZIP" in file_filter:
ps_filter = "ZIP Archives (*.zip)|*.zip|All Files (*.*)|*.*"
else:
ps_filter = "All Files (*.*)|*.*"
# 简单清洗 filter 字符串以适应 PowerShell (e.g., "Excel Files *.xlsx" -> "Excel Files|*.xlsx")
# 这里做一个简化的映射,确保格式正确
if "Excel" in file_filter:
ps_filter = "Excel Files (*.xlsx)|*.xlsx|All Files (*.*)|*.*"
elif "DXF" in file_filter:
ps_filter = "DXF Files (*.dxf)|*.dxf|All Files (*.*)|*.*"
elif "ZIP" in file_filter:
ps_filter = "ZIP Archives (*.zip)|*.zip|All Files (*.*)|*.*"
else:
ps_filter = "All Files (*.*)|*.*"
ps_script = f"""
Add-Type -AssemblyName System.Windows.Forms
$d = New-Object System.Windows.Forms.SaveFileDialog
$d.Filter = "{ps_filter}"
$d.FileName = "{filename}"
$d.Title = "保存文件"
if ($d.ShowDialog() -eq [System.Windows.Forms.DialogResult]::OK) {{
Write-Output $d.FileName
}}
"""
ps_script = f"""
Add-Type -AssemblyName System.Windows.Forms
$d = New-Object System.Windows.Forms.SaveFileDialog
$d.Filter = "{ps_filter}"
$d.FileName = "{filename}"
$d.Title = "保存文件"
if ($d.ShowDialog() -eq [System.Windows.Forms.DialogResult]::OK) {{
Write-Output $d.FileName
}}
"""
# 运行 PowerShell
# 使用 startupinfo 隐藏控制台窗口 (防止黑框闪烁)
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
# 运行 PowerShell
# 使用 startupinfo 隐藏控制台窗口 (防止黑框闪烁)
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
print("DEBUG: invoking PowerShell SaveFileDialog...")
print("DEBUG: invoking PowerShell SaveFileDialog...")
# 在 native 模式下直接同步执行,不使用 run.io_bound()
result = subprocess.run(
["powershell", "-Command", ps_script],
capture_output=True,
text=True,
startupinfo=startupinfo
)
save_path = result.stdout.strip()
# 使用 run.io_bound 在后台线程执行,避免阻塞主事件循环
# 这样按钮的禁用状态可以立即同步到前端
result = await run.io_bound(
subprocess.run,
["powershell", "-Command", ps_script],
capture_output=True,
text=True,
startupinfo=startupinfo,
)
save_path = result.stdout.strip()
if save_path:
print(f"DEBUG: PowerShell returned path: {save_path}")
await callback(save_path)
ui.notify(f"文件已保存至: {save_path}", type="positive")
return
else:
print("DEBUG: PowerShell dialog cancelled or empty result.")
# 用户取消,直接返回,不回退
return
if save_path:
print(f"DEBUG: PowerShell returned path: {save_path}")
await callback(save_path)
ui.notify(f"文件已保存至: {save_path}", type="positive")
return
else:
print("DEBUG: PowerShell dialog cancelled or empty result.")
# 用户取消,直接返回,不回退
return
except Exception as e:
print(f"PowerShell dialog failed: {e}")
# 出错则回退到 ui.download
except Exception as e:
print(f"PowerShell dialog failed: {e}")
# 出错则回退到 ui.download
# 统一回退方案:浏览器下载
print("DEBUG: Using ui.download fallback")
temp_path = os.path.join(state["temp_dir"], filename)
await callback(temp_path)
ui.download(temp_path)
# 统一回退方案:浏览器下载
print("DEBUG: Using ui.download fallback")
temp_path = os.path.join(state["temp_dir"], filename)
await callback(temp_path)
ui.download(temp_path)
finally:
if sender:
sender.enable()
def update_export_buttons():
if refs["export_row"]:
@@ -468,9 +490,12 @@ def index():
# 如果不存在,重新生成
export_all_scenarios_to_excel(state["results"], path)
async def on_click_excel():
async def on_click_excel(e):
await save_file_with_dialog(
default_excel_name, save_excel, "Excel Files (*.xlsx)"
default_excel_name,
save_excel,
"Excel Files (*.xlsx)",
sender=e.sender,
)
ui.button(
@@ -479,41 +504,7 @@ def index():
).props("icon=download")
# --- 导出推荐方案 DXF ---
def export_best_dxf():
if state["substation"] is not None:
safe_name = "".join(
[
c
for c in best_res["name"]
if c.isalnum() or c in (" ", "-", "_")
]
).strip()
default_name = f"{file_prefix}_best_{safe_name}.dxf"
async def save_dxf(path):
export_to_dxf(
best_res["turbines"],
state["substation"],
best_res["eval"]["details"],
path,
)
# 包装为 async 任务,并在 NiceGUI 事件循环中执行
async def run_save():
await save_file_with_dialog(
default_name, save_dxf, "DXF Files (*.dxf)"
)
# 这里的 export_best_dxf 本身是普通函数,绑定到 on_click
# 但我们需要它执行异步操作。最简单的是让 export_best_dxf 变为 async
# 或者在这里直接调用 run_save (但这在普通函数里不行)
# 更好的方法是将 export_best_dxf 定义为 async如下所示
return run_save()
else:
ui.notify("缺少升压站数据,无法导出 DXF", type="negative")
# 将 export_best_dxf 改为 async 并重命名,以便直接用作回调
async def on_click_best_dxf():
async def on_click_best_dxf(e):
if state["substation"] is not None:
safe_name = "".join(
[
@@ -533,7 +524,7 @@ def index():
)
await save_file_with_dialog(
default_name, save_dxf, "DXF Files (*.dxf)"
default_name, save_dxf, "DXF Files (*.dxf)", sender=e.sender
)
else:
ui.notify("缺少升压站数据,无法导出 DXF", type="negative")
@@ -543,7 +534,7 @@ def index():
).props("icon=architecture color=accent")
# --- 导出选中方案 DXF ---
async def on_click_selected_dxf():
async def on_click_selected_dxf(e):
if not refs["results_table"] or not refs["results_table"].selected:
ui.notify("请先在上方表格中选择一个方案", type="warning")
return
@@ -573,7 +564,7 @@ def index():
)
await save_file_with_dialog(
default_name, save_dxf, "DXF Files (*.dxf)"
default_name, save_dxf, "DXF Files (*.dxf)", sender=e.sender
)
else:
ui.notify(
@@ -588,7 +579,7 @@ def index():
refs["export_selected_btn"].set_text(f"导出选中方案 ({clean_name})")
# --- 导出全部 ZIP ---
async def on_click_all_dxf():
async def on_click_all_dxf(e):
if not state["results"] or state["substation"] is None:
ui.notify("无方案数据可导出", type="warning")
return
@@ -634,12 +625,71 @@ def index():
except:
pass
await save_file_with_dialog(default_name, save_zip, "ZIP Files (*.zip)")
await save_file_with_dialog(
default_name, save_zip, "ZIP Files (*.zip)", sender=e.sender
)
ui.button("导出全部方案 DXF (ZIP)", on_click=on_click_all_dxf).props(
"icon=folder_zip color=secondary"
)
# --- 导出计算日志 ---
async def on_click_export_log(e):
# 尝试多种方式获取日志内容
log_content = ""
method_used = "unknown"
# 方法1: 首先尝试从保存的日志内容获取
if refs.get("log_content") and refs["log_content"].strip():
log_content = refs["log_content"]
method_used = "saved_memory"
# 方法2: 如果保存的日志为空,尝试使用 JavaScript 获取 log 组件的内容
if not log_content.strip() and refs["log_box"]:
try:
log_id = refs["log_box"].id
js_code = f"""
(function() {{
const logElement = document.querySelector("#c{log_id}");
if (logElement) {{
console.log("Found log element:", logElement);
return logElement.innerText || logElement.textContent || "";
}}
console.log("Log element not found for ID: c{log_id}");
return "";
}})()
"""
result = await ui.run_javascript(js_code)
if result and result.strip():
log_content = result
method_used = "javascript"
except Exception as js_error:
print(f"JavaScript method failed: {js_error}")
if not log_content.strip():
ui.notify(
"没有可导出的日志内容。请先运行计算任务。", type="warning"
)
print(f"Log export failed. Method tried: {method_used}")
return
print(
f"Successfully exported log using method: {method_used}, length: {len(log_content)}"
)
default_name = f"{file_prefix}_calculation_log.txt"
async def save_log(path):
with open(path, "w", encoding="utf-8") as f:
f.write(log_content)
await save_file_with_dialog(
default_name, save_log, "Text Files (*.txt)", sender=e.sender
)
ui.button("导出计算日志", on_click=on_click_export_log).props(
"icon=description color=info"
)
def update_plot(result):
if refs["plot_container"]:
refs["plot_container"].clear()
@@ -688,16 +738,21 @@ def index():
import queue
from nicegui import run
async def run_analysis():
if not state["excel_path"]:
ui.notify("请先上传 Excel 坐标文件!", type="warning")
return
if refs["log_box"]:
refs["log_box"].clear()
# 重置日志内容
refs["log_content"] = ""
log_queue = queue.Queue()
# 获取开关状态
use_ga = refs["ga_switch"].value if refs["ga_switch"] else False
use_mip = refs["mip_switch"].value if refs["mip_switch"] else False
print(f"Switch values: GA={use_ga}, MIP={use_mip}")
class QueueLogger(io.StringIO):
def write(self, message):
if message and message.strip():
@@ -711,6 +766,8 @@ def index():
try:
msg = log_queue.get_nowait()
refs["log_box"].push(msg)
# 同时保存到日志内容中
refs["log_content"] += msg + "\n"
new_msg = True
if msg.startswith("--- Scenario"):
scenario_name = msg.replace("---", "").strip()
@@ -745,6 +802,8 @@ def index():
n_clusters_override=None,
interactive=False,
plot_results=False,
use_ga=use_ga,
use_mip=use_mip,
)
# 在后台线程运行计算任务
@@ -810,11 +869,20 @@ def index():
total_length_m = sum(d["length"] for d in res["eval"]["details"])
total_length_km = total_length_m / 1000
# 获取回路数 (通过统计从升压站发出的连接)
n_circuits = sum(
1
for d in res["eval"]["details"]
if d["source"] == "substation" or d["target"] == "substation"
)
row_dict = {
"name": name_display,
"n_circuits": n_circuits,
"cost_wan": f"{res['cost'] / 10000:.2f}",
"loss_kw": f"{res['loss']:.2f}",
"total_length": f"{total_length_km:.2f}",
"npv_loss_wan": f"{res.get('npv_loss', 0) / 10000:.2f}",
"total_cost_npv_wan": f"{res.get('total_cost_npv', res['cost']) / 10000:.2f}",
"note": note,
"original_name": res["name"],
@@ -878,7 +946,7 @@ def index():
# 使用 items-stretch 确保所有子元素高度一致
with ui.row().classes("w-full items-stretch gap-4"):
# 1. 导出模板按钮
async def export_template():
async def export_template(e):
import shutil
from generate_template import create_template
@@ -899,7 +967,10 @@ def index():
raise FileNotFoundError("无法生成模板文件")
await save_file_with_dialog(
"coordinates.xlsx", save_template, "Excel Files (*.xlsx)"
"coordinates.xlsx",
save_template,
"Excel Files (*.xlsx)",
sender=e.sender,
)
ui.button("导出 Excel 模板", on_click=export_template).classes(
@@ -934,6 +1005,17 @@ def index():
.classes("flex-1 py-4")
.props("icon=play_arrow color=secondary")
)
# 4. 遗传算法开关
with ui.column().classes("flex-1 gap-0 justify-center items-center"):
refs["ga_switch"] = ui.switch("启用遗传算法", value=False).props(
"color=orange"
)
# 5. MIP开关
with ui.column().classes("flex-1 gap-0 justify-center items-center"):
refs["mip_switch"] = ui.switch("启用MIP", value=False).props(
"color=blue"
)
with ui.column().classes("w-full gap-4"):
# 新增:信息展示卡片
@@ -966,6 +1048,12 @@ def index():
"required": True,
"align": "left",
},
{
"name": "n_circuits",
"label": "回路数",
"field": "n_circuits",
"sortable": True,
},
{
"name": "cost_wan",
"label": "总投资 (万元)",
@@ -984,6 +1072,12 @@ def index():
"field": "total_length",
"sortable": True,
},
{
"name": "npv_loss_wan",
"label": "损耗费用净现值 (万元)",
"field": "npv_loss_wan",
"sortable": True,
},
{
"name": "total_cost_npv_wan",
"label": "总费用 (万元)",

394
main.py
View File

@@ -1,6 +1,7 @@
import argparse
import math
import os
import random
from collections import defaultdict
import matplotlib.pyplot as plt
@@ -12,6 +13,8 @@ from scipy.spatial import distance_matrix
from sklearn.cluster import KMeans
from esau_williams import design_with_esau_williams
from ga import design_with_ga
from mip import design_with_mip
# 设置matplotlib支持中文显示
plt.rcParams["font.sans-serif"] = ["Microsoft YaHei", "SimHei", "Arial"]
@@ -28,6 +31,7 @@ ANNUAL_LOSS_HOURS = 1400 # 年损耗小时数(小时)
# 1. 生成风电场数据(实际应用中替换为真实坐标)
def generate_wind_farm_data(n_turbines=30, seed=42, layout="random", spacing=800):
pass # 实际应用中从Excel读取真实坐标此函数保留用于测试
"""
生成模拟风电场数据
:param layout: 'random' (随机) 或 'grid' (规则行列)
@@ -654,39 +658,175 @@ def design_with_rotational_sweep(
return final_connections, turbines
# 预计算距离矩阵
all_coords = np.vstack([substation, turbines[["x", "y"]].values])
dist_matrix_full = distance_matrix(all_coords, all_coords)
def fitness(chromosome):
cluster_assign = chromosome
clusters = defaultdict(list)
for i, c in enumerate(cluster_assign):
clusters[c].append(i)
connections = []
for c, members in clusters.items():
if len(members) == 0:
continue
coords = turbines.iloc[members][["x", "y"]].values
if len(members) > 1:
dm = distance_matrix(coords, coords)
mst = minimum_spanning_tree(dm).toarray()
for i in range(len(members)):
for j in range(len(members)):
if mst[i, j] > 0:
connections.append(
(
f"turbine_{members[i]}",
f"turbine_{members[j]}",
mst[i, j],
)
)
# 连接到升压站
dists = [dist_matrix_full[0, m + 1] for m in members]
closest = members[np.argmin(dists)]
connections.append((f"turbine_{closest}", "substation", min(dists)))
eval_res = evaluate_design(
turbines,
connections,
substation,
cable_specs,
is_offshore=False,
method_name="GA",
voltage=voltage,
power_factor=power_factor,
)
if system_params:
res_list = total_investment(
[
{
"cost": eval_res["total_cost"],
"loss": eval_res["total_loss"],
"eval": eval_res,
}
],
system_params,
)
return res_list[0]["total_cost_npv"]
return eval_res["total_cost"]
def init_individual():
assign = np.zeros(n_turbines, dtype=int)
cluster_powers = np.zeros(max_clusters)
for i in range(n_turbines):
p = turbines.iloc[i]["power"]
possible = [
c for c in range(max_clusters) if cluster_powers[c] + p <= max_mw
]
if possible:
c = random.choice(possible)
else:
c = random.randint(0, max_clusters - 1)
assign[i] = c
cluster_powers[c] += p
return assign.tolist()
population = [init_individual() for _ in range(pop_size)]
best = None
best_fitness = float("inf")
for gen in range(generations):
fitnesses = [fitness(ind) for ind in population]
min_fit = min(fitnesses)
if min_fit < best_fitness:
best_fitness = min_fit
best = population[fitnesses.index(min_fit)].copy()
def tournament(size=3):
candidates = random.sample(list(zip(population, fitnesses)), size)
return min(candidates, key=lambda x: x[1])[0]
selected = [tournament() for _ in range(pop_size)]
new_pop = []
for i in range(0, pop_size, 2):
p1 = selected[i]
p2 = selected[i + 1] if i + 1 < pop_size else selected[0]
if random.random() < 0.8:
point = random.randint(1, n_turbines - 1)
child1 = p1[:point] + p2[point:]
child2 = p2[:point] + p1[point:]
else:
child1, child2 = p1.copy(), p2.copy()
new_pop.extend([child1, child2])
for ind in new_pop:
if random.random() < 0.1:
idx = random.randint(0, n_turbines - 1)
old_c = ind[idx]
new_c = random.randint(0, max_clusters - 1)
ind[idx] = new_c
cluster_powers = defaultdict(float)
for j, c in enumerate(ind):
cluster_powers[c] += turbines.iloc[j]["power"]
if max(cluster_powers.values()) > max_mw:
ind[idx] = max_clusters
max_clusters += 1
elites = sorted(zip(population, fitnesses), key=lambda x: x[1])[
: int(0.1 * pop_size)
]
new_pop[: len(elites)] = [e[0] for e in elites]
population = new_pop[:pop_size]
# 解码最佳个体
cluster_assign = best
clusters = defaultdict(list)
for i, c in enumerate(cluster_assign):
clusters[c].append(i)
connections = []
for c, members in clusters.items():
if len(members) == 0:
continue
coords = turbines.iloc[members][["x", "y"]].values
if len(members) > 1:
dm = distance_matrix(coords, coords)
mst = minimum_spanning_tree(dm).toarray()
for i in range(len(members)):
for j in range(len(members)):
if mst[i, j] > 0:
connections.append(
(
f"turbine_{members[i]}",
f"turbine_{members[j]}",
mst[i, j],
)
)
dists = [dist_matrix_full[0, m + 1] for m in members]
closest = members[np.argmin(dists)]
connections.append((f"turbine_{closest}", "substation", min(dists)))
turbines["cluster"] = cluster_assign
return connections, turbines
# 4. 获取电缆最大容量(MW)
def get_max_cable_capacity_mw(
cable_specs=None, voltage=VOLTAGE_LEVEL, power_factor=POWER_FACTOR
cable_specs, voltage=VOLTAGE_LEVEL, power_factor=POWER_FACTOR
):
"""
计算给定电缆规格中能够承载的最大功率 (单位: MW)。
基于提供的电缆规格列表,选取最大载流量,结合系统电压和功率因数计算理论最大传输功率。
参数:
cable_specs (list, optional): 电缆规格列表。每个元素应包含 (截面积, 额定电流, 单价, 损耗系数)。
voltage (float): 系统电压 (V), 默认 66000
power_factor (float): 功率因数, 默认 0.95
返回:
float: 最大功率承载能力 (MW)。
异常:
Exception: 当未提供 cable_specs 时抛出,提示截面不满足。
根据电缆规格计算最大承载功率
:param cable_specs: 电缆规格列表 list of tuples或者直接是最大功率数值(MW)
"""
if cable_specs is None:
# Default cable specs if not provided (same as in evaluate_design)
cable_specs = [
(35, 150, 0.524, 80),
(70, 215, 0.268, 120),
(95, 260, 0.193, 150),
(120, 295, 0.153, 180),
(150, 330, 0.124, 220),
(185, 370, 0.0991, 270),
(240, 425, 0.0754, 350),
(300, 500, 0.0601, 450),
(400, 580, 0.0470, 600),
]
# 如果传入的已经是数值,直接返回
if isinstance(cable_specs, (int, float, np.number)):
return float(cable_specs)
# 兼容性检查:如果列表为空
if not cable_specs:
print("Warning: 没有可用的电缆规格,使用默认最大容量 100MW")
return 100.0
# 从所有电缆规格中找到最大的额定电流容量
max_current_capacity = max(spec[1] for spec in cable_specs)
@@ -858,8 +998,9 @@ def evaluate_design(
total_cost += cable["cost"]
# 计算I²R损耗 (简化版)
loss = (cable["current"] ** 2) * cable["resistance"] * 3 # 三相
total_loss += loss
loss_w = (cable["current"] ** 2) * cable["resistance"] * 3 # 三相单位W
loss_kw = loss_w / 1000 # 转换为 kW
total_loss += loss_kw
return {
"total_cost": total_cost,
@@ -1008,10 +1149,16 @@ def export_to_excel(connections_details, filename):
df = pd.DataFrame(data)
# 汇总统计
n_circuits = sum(
1
for conn in connections_details
if conn["source"] == "substation" or conn["target"] == "substation"
)
summary = {
"Total Cost (¥)": df["Cost (¥)"].sum(),
"Total Effective Length (m)": df["Effective Length (m)"].sum(),
"Total Vertical Length (m)": df["Vertical Length (m)"].sum(),
"Number of Circuits": n_circuits,
}
summary_df = pd.DataFrame([summary])
@@ -1037,15 +1184,18 @@ def export_all_scenarios_to_excel(results, filename):
# 1. 总览 Sheet
summary_data = []
for res in results:
# 获取回路数
n_circuits = 0
if "turbines" in res and "cluster" in res["turbines"].columns:
n_circuits = res["turbines"]["cluster"].nunique()
# 获取回路数 (通过统计从升压站发出的连接)
n_circuits = sum(
1
for conn in res["eval"]["details"]
if conn["source"] == "substation" or conn["target"] == "substation"
)
summary_data.append(
{
"Scenario": res["name"],
"Total Cost (¥)": res["cost"],
"总费用(万元)": res.get("total_cost_npv", res["cost"]) / 10000,
"Total Loss (kW)": res["loss"],
"Num Circuits": n_circuits,
# 计算电缆统计
@@ -1246,7 +1396,12 @@ def visualize_design(
# 7. 主函数:比较两种设计方法
def compare_design_methods(
excel_path=None, n_clusters_override=None, interactive=True, plot_results=True
excel_path=None,
n_clusters_override=None,
interactive=True,
plot_results=True,
use_ga=False,
use_mip=False,
):
"""
比较MST和三种电缆方案下的K-means设计方法
@@ -1412,14 +1567,10 @@ def compare_design_methods(
print(f" 最大电缆容量: {max_cable_mw:.2f} MW")
# --- Run 1: Base Algorithm (Capacitated Sweep) ---
# --- Run 1: Base Sector Sweep ---
base_name = f"{name} (Base)"
conns_base, turbines_base = design_with_capacitated_sweep(
turbines.copy(),
substation,
cable_specs=current_specs,
voltage=voltage,
power_factor=power_factor,
turbines.copy(), substation, max_cable_mw, voltage=voltage
)
eval_base = evaluate_design(
turbines,
@@ -1431,7 +1582,11 @@ def compare_design_methods(
voltage=voltage,
power_factor=power_factor,
)
n_circuits_base = sum(
1
for d in eval_base["details"]
if d["source"] == "substation" or d["target"] == "substation"
)
comparison_results.append(
{
"name": base_name,
@@ -1443,17 +1598,13 @@ def compare_design_methods(
}
)
print(
f" [Base] Cost: ¥{eval_base['total_cost']:,.2f} | Loss: {eval_base['total_loss']:.2f} kW"
f" [Base] Cost: ¥{eval_base['total_cost']:,.2f} | Loss: {eval_base['total_loss']:.2f} kW | Circuits: {n_circuits_base}"
)
# --- Run 2: Rotational Algorithm (Optimization) ---
# --- Run 2: Rotational Sweep (Optimization) ---
rot_name = f"{name} (Rotational)"
conns_rot, turbines_rot = design_with_rotational_sweep(
turbines.copy(),
substation,
cable_specs=current_specs,
voltage=voltage,
power_factor=power_factor,
turbines.copy(), substation, max_cable_mw, voltage=voltage
)
eval_rot = evaluate_design(
turbines,
@@ -1465,7 +1616,11 @@ def compare_design_methods(
voltage=voltage,
power_factor=power_factor,
)
n_circuits_rot = sum(
1
for d in eval_rot["details"]
if d["source"] == "substation" or d["target"] == "substation"
)
comparison_results.append(
{
"name": rot_name,
@@ -1477,7 +1632,7 @@ def compare_design_methods(
}
)
print(
f" [Rotational] Cost: ¥{eval_rot['total_cost']:,.2f} | Loss: {eval_rot['total_loss']:.2f} kW"
f" [Rotational] Cost: ¥{eval_rot['total_cost']:,.2f} | Loss: {eval_rot['total_loss']:.2f} kW | Circuits: {n_circuits_rot}"
)
# --- Run 3: Esau-Williams Algorithm ---
@@ -1495,7 +1650,11 @@ def compare_design_methods(
voltage=voltage,
power_factor=power_factor,
)
n_circuits_ew = sum(
1
for d in eval_ew["details"]
if d["source"] == "substation" or d["target"] == "substation"
)
comparison_results.append(
{
"name": ew_name,
@@ -1507,9 +1666,96 @@ def compare_design_methods(
}
)
print(
f" [Esau-Williams] Cost: ¥{eval_ew['total_cost']:,.2f} | Loss: {eval_ew['total_loss']:.2f} kW"
f" [Esau-Williams] Cost: ¥{eval_ew['total_cost']:,.2f} | Loss: {eval_ew['total_loss']:.2f} kW | Circuits: {n_circuits_ew}"
)
if use_ga:
# --- Run 4: Genetic Algorithm ---
ga_name = f"{name} (GA)"
conns_ga, turbines_ga = design_with_ga(
turbines.copy(),
substation,
current_specs,
voltage,
power_factor,
system_params,
evaluate_func=evaluate_design,
total_invest_func=total_investment,
get_max_capacity_func=get_max_cable_capacity_mw,
)
eval_ga = evaluate_design(
turbines,
conns_ga,
substation,
cable_specs=current_specs,
is_offshore=is_offshore,
method_name=ga_name,
voltage=voltage,
power_factor=power_factor,
)
n_circuits_ga = sum(
1
for d in eval_ga["details"]
if d["source"] == "substation" or d["target"] == "substation"
)
comparison_results.append(
{
"name": ga_name,
"cost": eval_ga["total_cost"],
"loss": eval_ga["total_loss"],
"eval": eval_ga,
"turbines": turbines_ga,
"specs": current_specs,
}
)
print(
f" [GA] Cost: ¥{eval_ga['total_cost']:,.2f} | Loss: {eval_ga['total_loss']:.2f} kW | Circuits: {n_circuits_ga}"
)
if use_mip:
print(f"Starting MIP optimization for {name}")
# --- Run 5: Mixed Integer Programming ---
mip_name = f"{name} (MIP)"
conns_mip, turbines_mip = design_with_mip(
turbines.copy(),
substation,
current_specs,
voltage,
power_factor,
system_params,
evaluate_func=evaluate_design,
total_invest_func=total_investment,
get_max_capacity_func=get_max_cable_capacity_mw,
)
eval_mip = evaluate_design(
turbines,
conns_mip,
substation,
cable_specs=current_specs,
is_offshore=is_offshore,
method_name=mip_name,
voltage=voltage,
power_factor=power_factor,
)
n_circuits_mip = sum(
1
for d in eval_mip["details"]
if d["source"] == "substation" or d["target"] == "substation"
)
comparison_results.append(
{
"name": mip_name,
"cost": eval_mip["total_cost"],
"loss": eval_mip["total_loss"],
"eval": eval_mip,
"turbines": turbines_mip,
"specs": current_specs,
}
)
print(
f" [MIP] Cost: ¥{eval_mip['total_cost']:,.2f} | Loss: {eval_mip['total_loss']:.2f} kW | Circuits: {n_circuits_mip}"
)
# 记录最佳
if eval_rot["total_cost"] < best_cost:
best_cost = eval_rot["total_cost"]
@@ -1524,7 +1770,11 @@ def compare_design_methods(
# 可视化 (只画 Base 版本)
ax_idx = i + 1
if plot_results and ax_idx < 4:
n_circuits = turbines_base["cluster"].nunique()
n_circuits = sum(
1
for d in eval_base["details"]
if d["source"] == "substation" or d["target"] == "substation"
)
title = f"{base_name} ({n_circuits} circuits)\nCost: ¥{eval_base['total_cost'] / 10000:.2f}"
visualize_design(
turbines_base, substation, eval_base["details"], title, ax=axes[ax_idx]
@@ -1564,7 +1814,17 @@ def compare_design_methods(
for i, res in enumerate(comparison_results):
if res["cost"] < comparison_results[best_idx]["cost"]:
best_idx = i
print(f" {i + 1}. {res['name']} - Cost: ¥{res['cost']:,.2f}")
# 获取回路数 (通过统计从升压站发出的连接)
n_circuits = sum(
1
for conn in res["eval"]["details"]
if conn["source"] == "substation" or conn["target"] == "substation"
)
print(
f" {i + 1}. {res['name']} - Cost: ¥{res['cost']:,.2f} | Circuits: {n_circuits}"
)
print(f"推荐方案: {comparison_results[best_idx]['name']} (默认)")
@@ -1680,21 +1940,23 @@ def total_investment(results, system_params):
更新后的results列表每个结果新增 'total_cost_npv' 字段(总费用净现值,元)
"""
# 获取系统参数,使用默认值
discount_rate_percent = system_params.get('discount_rate', DISCOUNT_RATE)
electricity_price = system_params.get('electricity_price', ELECTRICITY_PRICE)
project_lifetime = system_params.get('project_lifetime', PROJECT_LIFETIME)
annual_loss_hours = system_params.get('annual_loss_hours', ANNUAL_LOSS_HOURS)
discount_rate_percent = system_params.get("discount_rate", DISCOUNT_RATE)
electricity_price = system_params.get("electricity_price", ELECTRICITY_PRICE)
project_lifetime = system_params.get("project_lifetime", PROJECT_LIFETIME)
annual_loss_hours = system_params.get("annual_loss_hours", ANNUAL_LOSS_HOURS)
# 将折现率转换为小数
r = discount_rate_percent / 100.0
for result in results:
cable_cost = result['cost'] # 电缆总投资(元)
loss_power = result['loss'] # 线损功率kW
cable_cost = result["cost"] # 电缆总投资(元)
loss_power = result["loss"] # 线损功率kW
# 1. 计算电缆投资的净现值2年分期
# 第1年支付50%第2年支付50%
npv_cable = (cable_cost * 0.5) / ((1 + r) ** 1) + (cable_cost * 0.5) / ((1 + r) ** 2)
npv_cable = (cable_cost * 0.5) / ((1 + r) ** 1) + (cable_cost * 0.5) / (
(1 + r) ** 2
)
# 2. 计算电费损耗的净现值(生命周期内)
# 年损耗费用 = 损耗功率(kW) * 年损耗小时数 * 电价(元/kWh)
@@ -1713,10 +1975,10 @@ def total_investment(results, system_params):
total_cost_npv = npv_cable + npv_loss
# 将结果添加到字典中
result['total_cost_npv'] = total_cost_npv
result['npv_cable'] = npv_cable
result['npv_loss'] = npv_loss
result['annual_loss_cost'] = annual_loss_cost
result["total_cost_npv"] = total_cost_npv
result["npv_cable"] = npv_cable
result["npv_loss"] = npv_loss
result["annual_loss_cost"] = annual_loss_cost
return results

472
mip.py Normal file
View File

@@ -0,0 +1,472 @@
import numpy as np
import pandas as pd
from scipy.spatial import distance_matrix
from scipy.sparse.csgraph import minimum_spanning_tree
from collections import defaultdict
import random
try:
import pulp
pulp_available = True
except ImportError:
pulp = None
pulp_available = False
try:
import pyomo.environ as pyo_env
pyomo_available = True
except (ImportError, AttributeError):
pyomo_available = False
print("Pyomo not available, falling back to PuLP")
def design_with_pyomo(
turbines,
substation,
cable_specs=None,
voltage=66000,
power_factor=0.95,
system_params=None,
max_clusters=None,
time_limit=300,
evaluate_func=None,
total_invest_func=None,
get_max_capacity_func=None,
):
"""
使用Pyomo求解器优化集电线路布局
:param turbines: 风机DataFrame
:param substation: 升压站坐标
:param cable_specs: 电缆规格
:param system_params: 系统参数用于NPV计算
:param max_clusters: 最大簇数,默认基于功率计算
:param time_limit: 求解时间限制(秒)
:param evaluate_func: 评估函数
:param total_invest_func: 总投资计算函数
:param get_max_capacity_func: 获取最大容量函数
:return: 连接列表和带有簇信息的turbines
"""
if get_max_capacity_func:
max_mw = get_max_capacity_func(cable_specs, voltage, power_factor)
else:
max_mw = 100.0
total_power = turbines["power"].sum()
if max_clusters is None:
max_clusters = int(np.ceil(total_power / max_mw))
n_turbines = len(turbines)
all_coords = np.vstack([substation, turbines[["x", "y"]].values])
dist_matrix_full = distance_matrix(all_coords, all_coords)
# Simple fallback for now - use PuLP instead
print("Pyomo not fully implemented, falling back to PuLP")
return design_with_mip(
turbines,
substation,
cable_specs,
voltage,
power_factor,
system_params,
max_clusters,
time_limit,
evaluate_func,
total_invest_func,
get_max_capacity_func,
)
def design_with_mip(
turbines,
substation,
cable_specs=None,
voltage=66000,
power_factor=0.95,
system_params=None,
max_clusters=None,
time_limit=300,
evaluate_func=None,
total_invest_func=None,
get_max_capacity_func=None,
):
"""
使用混合整数规划(MIP)优化集电线路布局
:param turbines: 风机DataFrame
:param substation: 升压站坐标
:param cable_specs: 电缆规格
:param system_params: 系统参数用于NPV计算
:param max_clusters: 最大簇数,默认基于功率计算
:param time_limit: 求解时间限制(秒)
:param evaluate_func: 评估函数
:param total_invest_func: 总投资计算函数
:param get_max_capacity_func: 获取最大容量函数
:return: 连接列表和带有簇信息的turbines
"""
if not pulp_available:
print(
"WARNING: PuLP library not available. MIP optimization skipped, falling back to MST."
)
from main import design_with_mst
connections = design_with_mst(turbines, substation)
return connections, turbines
if get_max_capacity_func:
max_mw = get_max_capacity_func(cable_specs, voltage, power_factor)
else:
max_mw = 100.0
if max_clusters is None:
max_clusters = int(np.ceil(turbines["power"].sum() / max_mw))
n_turbines = len(turbines)
print(
f"MIP Model Setup: n_turbines={n_turbines}, max_clusters={max_clusters}, max_mw={max_mw:.2f} MW"
)
all_coords = np.vstack([substation, turbines[["x", "y"]].values])
dist_matrix_full = distance_matrix(all_coords, all_coords)
prob = pulp.LpProblem("WindFarmCollectorMIP", pulp.LpMinimize)
# Create all decision variables upfront to avoid duplicates
assign_vars = {}
for i in range(n_turbines):
for k in range(max_clusters):
assign_vars[(i, k)] = pulp.LpVariable(f"assign_{i}_{k}", cat="Binary")
cluster_vars = {}
for k in range(max_clusters):
cluster_vars[k] = pulp.LpVariable(f"cluster_{k}", cat="Binary")
# Helper functions to access variables
def assign_var(i, k):
return assign_vars[(i, k)]
def cluster_var(k):
return cluster_vars[k]
# Simplified objective function: minimize total distance
prob += pulp.lpSum(
[
dist_matrix_full[0, i + 1] * assign_var(i, k)
for i in range(n_turbines)
for k in range(max_clusters)
]
)
for i in range(n_turbines):
prob += pulp.lpSum([assign_var(i, k) for k in range(max_clusters)]) == 1
for k in range(max_clusters):
cluster_power = pulp.lpSum(
[turbines.iloc[i]["power"] * assign_var(i, k) for i in range(n_turbines)]
)
prob += cluster_power <= max_mw * 1.2 * cluster_var(k)
for k in range(max_clusters):
for i in range(n_turbines):
prob += assign_var(i, k) <= cluster_var(k)
print(
f"MIP Model: {len(prob.variables())} variables, {len(prob.constraints)} constraints"
)
# Debug: Print model structure
print("MIP model structure check:")
print(f" Variables: {len(prob.variables())}")
print(f" Constraints: {len(prob.constraints)}")
print(f" Time limit: {time_limit}s")
print(f" Turbines: {n_turbines}, Clusters: {max_clusters}")
# Test solver availability
try:
import subprocess
test_solver = subprocess.run(
[
r"D:\code\windfarm\.venv\Lib\site-packages\pulp\apis\..\solverdir\cbc\win\i64\cbc.exe",
"-version",
],
capture_output=True,
text=True,
timeout=5,
)
print(
f"CBC solver test: {test_solver.stdout[:100] if test_solver.stdout else 'No output'}"
)
except Exception as solver_test_error:
print(f"CBC solver test failed: {solver_test_error}")
print("MIP: Starting to solve...")
try:
# Try to use CBC solver with different configurations
solver = pulp.PULP_CBC_CMD(
timeLimit=time_limit,
msg=False,
warmStart=False,
)
print(f"Using CBC solver with time limit: {time_limit}s")
status = prob.solve(solver)
print(
f"MIP: Solver status={pulp.LpStatus[prob.status]}, Objective value={pulp.value(prob.objective):.4f}"
)
except Exception as e:
print(f"MIP: CBC solver execution failed: {e}")
# Try alternative solver configurations
try:
print("MIP: Trying alternative solver configuration...")
solver = pulp.PULP_CBC_CMD(
msg=True, # Enable messages for debugging
threads=1, # Single thread
timeLimit=time_limit,
)
status = prob.solve(solver)
print(
f"MIP: Alternative solver status={pulp.LpStatus[prob.status]}, Objective value={pulp.value(prob.objective):.4f}"
)
except Exception as e2:
print(f"MIP: All solver attempts failed: {e2}, falling back to MST")
from main import design_with_mst
connections = design_with_mst(turbines, substation)
return connections, turbines
if pulp.LpStatus[prob.status] != "Optimal":
print(
f"MIP solver status: {pulp.LpStatus[prob.status]}, solution not found, falling back to MST"
)
print("Model feasibility check:")
print(f"Total power: {turbines['power'].sum():.2f} MW")
print(f"Max cluster capacity: {max_mw:.2f} MW")
print(f"Number of clusters: {max_clusters}, Number of turbines: {n_turbines}")
for k in range(max_clusters):
cluster_power = pulp.value(
pulp.lpSum(
[
turbines.iloc[i]["power"] * assign_var(i, k)
for i in range(n_turbines)
]
)
)
cluster_used = pulp.value(cluster_var(k))
print(
f"Cluster {k}: Power={cluster_power:.2f} MW (max {max_mw * 1.2:.2f}), Used={cluster_used}"
)
from main import design_with_mst
connections = design_with_mst(turbines, substation)
return connections, turbines
cluster_assign = [-1] * n_turbines
active_clusters = []
for k in range(max_clusters):
if pulp.value(cluster_var(k)) > 0.5:
active_clusters.append(k)
for i in range(n_turbines):
assigned = False
for k in active_clusters:
if pulp.value(assign_var(i, k)) > 0.5:
cluster_assign[i] = k
assigned = True
break
if not assigned:
dists = [dist_matrix_full[0, i + 1] for k in active_clusters]
cluster_assign[i] = active_clusters[np.argmin(dists)]
clusters = defaultdict(list)
for i, c in enumerate(cluster_assign):
clusters[c].append(i)
connections = []
for c, members in clusters.items():
if len(members) == 0:
continue
coords = turbines.iloc[members][["x", "y"]].values
if len(members) > 1:
dm = distance_matrix(coords, coords)
mst = minimum_spanning_tree(dm).toarray()
for i in range(len(members)):
for j in range(len(members)):
if mst[i, j] > 0:
connections.append(
(
f"turbine_{members[i]}",
f"turbine_{members[j]}",
mst[i, j],
)
)
dists = [dist_matrix_full[0, m + 1] for m in members]
closest = members[np.argmin(dists)]
connections.append((f"turbine_{closest}", "substation", min(dists)))
turbines["cluster"] = cluster_assign
# Check cluster distances
min_cluster_distance = check_cluster_distances(clusters, turbines)
if min_cluster_distance is not None:
print(
f"Cluster validation: Minimum distance between clusters = {min_cluster_distance:.2f} m"
)
if min_cluster_distance < 1000:
print(
f"WARNING: Clusters are very close to each other ({min_cluster_distance:.2f} m < 1000 m)"
)
elif min_cluster_distance < 2000:
print(
f"NOTICE: Clusters are relatively close ({min_cluster_distance:.2f} m)"
)
# Check for cable crossings
cable_crossings = check_cable_crossings(connections, turbines, substation)
if cable_crossings:
print(
f"WARNING: Found {len(cable_crossings)} cable crossing(s) in the solution"
)
for i, (idx1, idx2, p1, p2, p3, p4) in enumerate(cable_crossings):
conn1 = connections[idx1]
conn2 = connections[idx2]
print(
f" Crossing {i + 1}: Connection {conn1[0]}-{conn1[1]} crosses {conn2[0]}-{conn2[1]}"
)
else:
print("No cable crossings detected in the solution")
print(
f"MIP optimization completed successfully, {len(connections)} connections generated"
)
return connections, turbines
def calculate_cluster_centroids(clusters, turbines):
"""Calculate the centroid coordinates for each cluster."""
centroids = {}
for c, members in clusters.items():
if len(members) == 0:
centroids[c] = (0, 0)
else:
coords = turbines.iloc[members][["x", "y"]].values
centroid_x = np.mean(coords[:, 0])
centroid_y = np.mean(coords[:, 1])
centroids[c] = (centroid_x, centroid_y)
return centroids
def check_cluster_distances(clusters, turbines, min_distance_threshold=1000):
"""Check if any clusters are too close to each other."""
if len(clusters) < 2:
return None
centroids = calculate_cluster_centroids(clusters, turbines)
active_clusters = [c for c, members in clusters.items() if len(members) > 0]
min_distance = float("inf")
min_pair = None
for i in range(len(active_clusters)):
for j in range(i + 1, len(active_clusters)):
c1, c2 = active_clusters[i], active_clusters[j]
centroid1 = np.array(centroids[c1])
centroid2 = np.array(centroids[c2])
distance = np.linalg.norm(centroid1 - centroid2)
if distance < min_distance:
min_distance = distance
min_pair = (c1, c2)
return min_distance
def check_cable_crossings(connections, turbines, substation):
"""Check if there are cable crossings in the solution."""
crossings = []
def line_intersection(p1, p2, p3, p4):
"""Check if line segments (p1,p2) and (p3,p4) intersect."""
x1, y1 = p1
x2, y2 = p2
x3, y3 = p3
x4, y4 = p4
denom = (y4 - y3) * (x2 - x1) - (x4 - x3) * (y2 - y1)
if abs(denom) < 1e-10:
return False
ua = ((x4 - x3) * (y1 - y3) - (y4 - y3) * (x1 - x3)) / denom
ub = ((x2 - x1) * (y1 - y3) - (y2 - y1) * (x1 - x3)) / denom
return 0 <= ua <= 1 and 0 <= ub <= 1
def get_turbine_coord(connection_part):
"""Get coordinates from connection part (turbine_# or substation)."""
if connection_part == "substation":
# Handle different substation formats robustly
if isinstance(substation, np.ndarray):
if substation.ndim == 1:
# 1D array [x, y]
return (substation[0], substation[1])
elif substation.ndim == 2:
# 2D array [[x, y]] or shape (n, 2)
if substation.shape[0] == 1:
return (substation[0, 0], substation[0, 1])
else:
# Multiple points, use first one
return (substation[0, 0], substation[0, 1])
else:
# Unexpected dimension, try fallback
return (substation.flat[0], substation.flat[1])
elif isinstance(substation, (list, tuple)):
# List or tuple format
# Handle nested lists like [[x, y]]
if (
isinstance(substation[0], (list, tuple, np.ndarray))
and len(substation[0]) >= 2
):
return (substation[0][0], substation[0][1])
elif len(substation) >= 2:
return (substation[0], substation[1])
else:
return (float("inf"), float("inf"))
else:
# Unexpected format, try to convert
try:
sub_array = np.array(substation)
if sub_array.ndim == 1:
return (sub_array[0], sub_array[1])
else:
return (sub_array.flat[0], sub_array.flat[1])
except:
return (float("inf"), float("inf"))
else:
turbine_idx = int(connection_part.split("_")[1])
return (
turbines.iloc[turbine_idx]["x"],
turbines.iloc[turbine_idx]["y"],
)
for i in range(len(connections)):
for j in range(i + 1, len(connections)):
conn1 = connections[i]
conn2 = connections[j]
p1 = get_turbine_coord(conn1[0])
p2 = get_turbine_coord(conn1[1])
p3 = get_turbine_coord(conn2[0])
p4 = get_turbine_coord(conn2[1])
if (
np.array_equal(p1, p3)
or np.array_equal(p1, p4)
or np.array_equal(p2, p3)
or np.array_equal(p2, p4)
):
continue
if line_intersection(p1, p2, p3, p4):
crossings.append((i, j, p1, p2, p3, p4))
return crossings

View File

@@ -12,6 +12,8 @@ dependencies = [
"numpy>=2.4.0",
"openpyxl>=3.1.5",
"pandas>=2.3.3",
"pulp>=3.3.0",
"pyomo>=6.9.5",
"pywebview>=6.1",
"scikit-learn>=1.8.0",
"scipy>=1.16.3",

146
test_cbc_solver.py Normal file
View File

@@ -0,0 +1,146 @@
"""
Simple test to verify CBC solver functionality
"""
import pulp
import sys
import subprocess
import os
print("=== PuLP and CBC Solver Test ===")
print(f"Python version: {sys.version}")
print(f"PuLP version: {pulp.__version__}")
# Test 1: Check PuLP installation
print("\n1. Checking PuLP installation...")
try:
from pulp import LpProblem, LpVariable, LpMinimize, LpMaximize, lpSum, value
print("[OK] PuLP imported successfully")
except ImportError as e:
print(f"[FAIL] PuLP import failed: {e}")
sys.exit(1)
# Test 2: Check CBC solver file existence
print("\n2. Checking CBC solver file...")
solver_dir = os.path.join(
os.path.dirname(pulp.__file__), "apis", "..", "solverdir", "cbc", "win", "i64"
)
solver_path = os.path.join(solver_dir, "cbc.exe")
print(f"Looking for CBC at: {solver_path}")
if os.path.exists(solver_path):
print(f"[OK] CBC solver file found")
file_size = os.path.getsize(solver_path)
print(f" File size: {file_size:,} bytes ({file_size / 1024 / 1024:.2f} MB)")
else:
print(f"[FAIL] CBC solver file not found")
print(f" Checking directory contents:")
try:
parent_dir = os.path.dirname(solver_path)
if os.path.exists(parent_dir):
for item in os.listdir(parent_dir):
print(f" - {item}")
else:
print(f" Directory does not exist: {parent_dir}")
except Exception as e:
print(f" Error listing directory: {e}")
# Test 3: Try to run CBC solver directly
print("\n3. Testing CBC solver execution...")
if os.path.exists(solver_path):
try:
result = subprocess.run(
[solver_path, "-version"],
capture_output=True,
text=True,
timeout=10,
check=True,
)
print("[OK] CBC solver executed successfully")
print(f" Output: {result.stdout[:200]}")
except subprocess.CalledProcessError as e:
print(f"[FAIL] CBC solver execution failed (exit code {e.returncode})")
print(f" stdout: {e.stdout[:200]}")
print(f" stderr: {e.stderr[:200]}")
except subprocess.TimeoutExpired:
print("[FAIL] CBC solver execution timed out")
except Exception as e:
print(f"[FAIL] CBC solver execution error: {e}")
else:
print("[FAIL] Cannot test CBC execution - file not found")
# Test 4: Solve a simple linear programming problem
print("\n4. Testing simple LP problem...")
try:
# Simple problem: minimize x + y subject to x + y >= 5, x >= 0, y >= 0
prob = LpProblem("Simple_LP_Test", LpMinimize)
x = LpVariable("x", lowBound=0, cat="Continuous")
y = LpVariable("y", lowBound=0, cat="Continuous")
prob += x + y # Objective: minimize x + y
prob += x + y >= 5 # Constraint
print(" Created simple LP problem: minimize x + y subject to x + y >= 5")
# Try to solve with CBC
solver = pulp.PULP_CBC_CMD(msg=False, timeLimit=10)
print(" Attempting to solve with CBC...")
status = prob.solve(solver)
print(f"[OK] LP problem solved")
print(f" Status: {pulp.LpStatus[prob.status]}")
print(f" Objective value: {value(prob.objective)}")
print(f" x = {value(x)}, y = {value(y)}")
if abs(value(prob.objective) - 5.0) < 0.01:
print(" [OK] Correct solution found!")
else:
print(f" [FAIL] Unexpected solution (expected 5.0)")
except Exception as e:
print(f"[FAIL] LP problem solving failed: {e}")
import traceback
traceback.print_exc()
# Test 5: Solve a simple mixed integer programming problem
print("\n5. Testing simple MIP problem...")
try:
# Simple MIP: minimize x + y subject to x + y >= 5, x, y integers >= 0
prob = LpProblem("Simple_MIP_Test", LpMinimize)
x = LpVariable("x", lowBound=0, cat="Integer")
y = LpVariable("y", lowBound=0, cat="Integer")
prob += x + y # Objective
prob += x + y >= 5 # Constraint
print(
" Created simple MIP problem: minimize x + y subject to x + y >= 5, x,y integers"
)
solver = pulp.PULP_CBC_CMD(msg=False, timeLimit=10)
print(" Attempting to solve with CBC...")
status = prob.solve(solver)
print(f"[OK] MIP problem solved")
print(f" Status: {pulp.LpStatus[prob.status]}")
print(f" Objective value: {value(prob.objective)}")
print(f" x = {value(x)}, y = {value(y)}")
if abs(value(prob.objective) - 5.0) < 0.01:
print(" [OK] Correct solution found!")
else:
print(f" [FAIL] Unexpected solution (expected 5.0)")
except Exception as e:
print(f"[FAIL] MIP problem solving failed: {e}")
import traceback
traceback.print_exc()
print("\n=== Test Complete ===")

50
test_mip.py Normal file
View File

@@ -0,0 +1,50 @@
"""
Test script to verify MIP functionality
"""
import numpy as np
import pandas as pd
from mip import design_with_mip
# Create test data
np.random.seed(42)
n_turbines = 10
turbines = pd.DataFrame(
{
"x": np.random.uniform(0, 2000, n_turbines),
"y": np.random.uniform(0, 2000, n_turbines),
"power": np.random.uniform(5, 10, n_turbines),
}
)
substation = np.array([1000, 1000])
print("Test data created:")
print(f"Number of turbines: {n_turbines}")
print(f"Substation location: {substation}")
print(f"Total power: {turbines['power'].sum():.2f} MW")
# Test MIP function
print("\nTesting MIP design...")
try:
connections, turbines_with_clusters = design_with_mip(
turbines,
substation,
cable_specs=None,
voltage=66000,
power_factor=0.95,
system_params=None,
max_clusters=None,
time_limit=30,
evaluate_func=None,
total_invest_func=None,
get_max_capacity_func=None,
)
print(f"MIP test successful!")
print(f"Number of connections: {len(connections)}")
print(f"Clusters assigned: {turbines_with_clusters['cluster'].tolist()}")
except Exception as e:
print(f"MIP test failed with error: {e}")
import traceback
traceback.print_exc()

50
uv.lock generated
View File

@@ -1243,6 +1243,15 @@ wheels = [
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/fc/f5/68334c015eed9b5cff77814258717dec591ded209ab5b6fb70e2ae873d1d/pillow-12.1.0-cp314-cp314t-win_arm64.whl", hash = "sha256:f61333d817698bdcdd0f9d7793e365ac3d2a21c1f1eb02b32ad6aefb8d8ea831" },
]
[[package]]
name = "ply"
version = "3.11"
source = { registry = "https://mirrors.pku.edu.cn/pypi/web/simple" }
sdist = { url = "https://mirrors.pku.edu.cn/pypi/web/packages/e5/69/882ee5c9d017149285cab114ebeab373308ef0f874fcdac9beb90e0ac4da/ply-3.11.tar.gz", hash = "sha256:00c7c1aaa88358b9c765b6d3000c6eec0ba42abca5351b095321aef446081da3" }
wheels = [
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/a3/58/35da89ee790598a0700ea49b2a66594140f44dec458c07e8e3d4979137fc/ply-3.11-py2.py3-none-any.whl", hash = "sha256:096f9b8350b65ebd2fd1346b12452efe5b9607f7482813ffca50c22722a807ce" },
]
[[package]]
name = "propcache"
version = "0.4.1"
@@ -1333,6 +1342,15 @@ version = "0.1.0"
source = { registry = "https://mirrors.pku.edu.cn/pypi/web/simple" }
sdist = { url = "https://mirrors.pku.edu.cn/pypi/web/packages/f2/cf/77d3e19b7fabd03895caca7857ef51e4c409e0ca6b37ee6e9f7daa50b642/proxy_tools-0.1.0.tar.gz", hash = "sha256:ccb3751f529c047e2d8a58440d86b205303cf0fe8146f784d1cbcd94f0a28010" }
[[package]]
name = "pulp"
version = "3.3.0"
source = { registry = "https://mirrors.pku.edu.cn/pypi/web/simple" }
sdist = { url = "https://mirrors.pku.edu.cn/pypi/web/packages/16/1c/d880b739b841a8aa81143091c9bdda5e72e226a660aa13178cb312d4b27f/pulp-3.3.0.tar.gz", hash = "sha256:7eb99b9ce7beeb8bbb7ea9d1c919f02f003ab7867e0d1e322f2f2c26dd31c8ba" }
wheels = [
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/99/6c/64cafaceea3f99927e84b38a362ec6a8f24f33061c90bda77dfe1cd4c3c6/pulp-3.3.0-py3-none-any.whl", hash = "sha256:dd6ad2d63f196d1254eddf9dcff5cd224912c1f046120cb7c143c5b0eda63fae" },
]
[[package]]
name = "pycparser"
version = "2.23"
@@ -1571,6 +1589,34 @@ wheels = [
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/2d/86/637cda4983dc0936b73a385f3906256953ac434537b812814cb0b6d231a2/pyobjc_framework_webkit-12.1-cp314-cp314t-macosx_10_15_universal2.whl", hash = "sha256:1aaa3bf12c7b68e1a36c0b294d2728e06f2cc220775e6dc4541d5046290e4dc8" },
]
[[package]]
name = "pyomo"
version = "6.9.5"
source = { registry = "https://mirrors.pku.edu.cn/pypi/web/simple" }
dependencies = [
{ name = "ply" },
]
sdist = { url = "https://mirrors.pku.edu.cn/pypi/web/packages/87/d8/f32e0dcacc8219694709200d4402c86a6e28d3af50380a5ccf7f7e15ffae/pyomo-6.9.5.tar.gz", hash = "sha256:0734020fcd5cc03ee200fd3f79d143fbfc14e6be116e0d16bab79f3f89609879" }
wheels = [
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/f8/63/5f163b231a924ba7a5f6c58466c751f70be88568fa446524b6e806c98e4b/pyomo-6.9.5-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:549ee4226cab6e2ff6efe5b3b9891ce1dfd866d38a024715315ea850fa1bf0ec" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/2d/bf/0cebcfce70be04d6d7aa19fbcbdeecdd5843caac617424f34ab3feb8e96e/pyomo-6.9.5-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:b382cc8c3728199c8332024d64eed8622dabb3f8aebe5874c86a036489064f7a" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/14/27/967545514a2d0f4ca5ac6b595661cb0927cdcd10c3bb2832c5aa0ee15990/pyomo-6.9.5-cp312-cp312-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:43c6e425ca5231b530cd23460e371b7ca9119224dd57237c34580e15f31e4d72" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/85/fe/691e5eb26f58ee4a072add6cc484756d9e3c367901ec6701d2c6789b394d/pyomo-6.9.5-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:a1923c358e1e8009a05ada911fc72e615c9e2ce6988f0979ec1ecc75880ee1f7" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/c7/b3/ae47340790f2f1f92f76b176acf475890717f0cb7def073e504b9857a057/pyomo-6.9.5-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:694262dc2eb53ca1ab245261f432a5ed1ec30cf3e651b5a6a1c276bc2dd81076" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/29/e9/7f782864afd28a9eb53057c9d046541be6535b2da35e11c2bcb80839c6bd/pyomo-6.9.5-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:1f99ce91f2710d60b380a3a519288282d2183c44e1d66c131909313a3b63e7a2" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/cd/4d/9ca17a602e31a1c3f3148c455a5739fcbe23c102b80a12ec3e6d3bf5e847/pyomo-6.9.5-cp313-cp313-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:d22f99e0ba8e2fb7d0e806bf630b8ce9b0a41d777c51f22711adbcb905f7486e" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/44/2e/78c3ac876791b59c836338b73dc49317b01cef574b01af061999a04a064a/pyomo-6.9.5-cp313-cp313-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:d5953e490b9e9ea42d28804dd0358a9d3ef82560022c2b538e70a638790bc392" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/3c/27/3eb3db8e9ed6a01dee63219389aec761d5cc29b6dc5015b32f826f2a9225/pyomo-6.9.5-cp314-cp314-macosx_10_15_x86_64.whl", hash = "sha256:058eddde05b4354307975f1ecd25cfda9f8a282ad2e3b4f168ff8fee3c3623a1" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/9a/31/7f4750fc9bb0ec18a9534549e4c80ea63f1267aa828d495a48bbf0018f49/pyomo-6.9.5-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:105a073c47a2d2d6e74e48ed6fc82c6f6d19027488d5003aabb7ed5d10271483" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/f8/67/639d0006eddab30cf415b0154763ccc51f3c15b934e866eb4fb07bc2b6ed/pyomo-6.9.5-cp314-cp314-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:f2c636c2c640b33dde3b119f6f0941a1bbde39397c392dba55351b0438d8600f" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/67/ef/023b74b8f161f15a51febdd160354f1e3fd7e1475abbe5ccfb3d7588cf1f/pyomo-6.9.5-cp314-cp314-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:e02813b4021eeed7214a1ca5d7daecbdc78d3db7059962553a57fd138d747c22" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/a0/ca/edab1b532fd5e2d146d0cb96836eb5ae387b8a5bd255213e306793f6168e/pyomo-6.9.5-cp314-cp314t-macosx_10_15_x86_64.whl", hash = "sha256:83789ce89271da31e0ff5bbef692af1621ab1747798183a5603b6577b7074277" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/a9/3c/2745386f57030bc60b626adba002b68db3f9538d5b52900f48026a4a17d7/pyomo-6.9.5-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:1f449aaceac5078daaecc21d19b96a15529f9ac8aa90f6472e8811cc07112ecc" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/f1/93/2058af0890b13f7e1a26e4925ff8d681c23d9cbdc2ecc9db17c744941617/pyomo-6.9.5-cp314-cp314t-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:f94d03f122fcf04a769c28ad48c423cd7b6d3d2c40da20bc8ea1a41bb20d0c36" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/de/30/c808931fc034851a16d3f8360d045b087ac743ea97bfe96cdb4b1df47c21/pyomo-6.9.5-cp314-cp314t-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:96ff300e96cdab75e2e6983c99e3a61eaff2a6d0f5ed83acd939e74e361de537" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/68/29/394967f7df51788cbdf1b4aedfb7c5a3a62e11b85b4c9d806b86cc576be4/pyomo-6.9.5-py3-none-any.whl", hash = "sha256:60326f7d3143ee7d0f5c5c4a3cbf871b53e08cc6c2b0c9e6d25568880233472f" },
]
[[package]]
name = "pyparsing"
version = "3.3.1"
@@ -2106,6 +2152,8 @@ dependencies = [
{ name = "numpy" },
{ name = "openpyxl" },
{ name = "pandas" },
{ name = "pulp" },
{ name = "pyomo" },
{ name = "pywebview" },
{ name = "scikit-learn" },
{ name = "scipy" },
@@ -2125,6 +2173,8 @@ requires-dist = [
{ name = "numpy", specifier = ">=2.4.0" },
{ name = "openpyxl", specifier = ">=3.1.5" },
{ name = "pandas", specifier = ">=2.3.3" },
{ name = "pulp", specifier = ">=3.3.0" },
{ name = "pyomo", specifier = ">=6.9.5" },
{ name = "pywebview", specifier = ">=6.1" },
{ name = "scikit-learn", specifier = ">=1.8.0" },
{ name = "scipy", specifier = ">=1.16.3" },

232
win32_helper.py Normal file
View File

@@ -0,0 +1,232 @@
import ctypes
import ctypes.wintypes
import os
def show_save_dialog_win32():
"""
使用 ctypes 直接调用 Windows API (GetSaveFileNameW)
不需要子进程,可以在线程中运行 (run.io_bound)
"""
try:
# 定义 OPENFILENAME 结构体
class OPENFILENAME(ctypes.Structure):
_fields_ = [
("lStructSize", ctypes.wintypes.DWORD),
("hwndOwner", ctypes.wintypes.HWND),
("hInstance", ctypes.wintypes.HINSTANCE),
("lpstrFilter", ctypes.wintypes.LPCWSTR),
("lpstrCustomFilter", ctypes.wintypes.LPWSTR),
("nMaxCustFilter", ctypes.wintypes.DWORD),
("nFilterIndex", ctypes.wintypes.DWORD),
("lpstrFile", ctypes.wintypes.LPWSTR),
("nMaxFile", ctypes.wintypes.DWORD),
("lpstrFileTitle", ctypes.wintypes.LPWSTR),
("nMaxFileTitle", ctypes.wintypes.DWORD),
("lpstrInitialDir", ctypes.wintypes.LPCWSTR),
("lpstrTitle", ctypes.wintypes.LPCWSTR),
("Flags", ctypes.wintypes.DWORD),
("nFileOffset", ctypes.wintypes.WORD),
("nFileExtension", ctypes.wintypes.WORD),
("lpstrDefExt", ctypes.wintypes.LPCWSTR),
("lCustData", ctypes.wintypes.LPARAM),
("lpfnHook", ctypes.wintypes.LPVOID),
("lpTemplateName", ctypes.wintypes.LPCWSTR),
# 还有更多字段,但这通常足够了
# ("pvReserved", ctypes.wintypes.LPVOID),
# ("dwReserved", ctypes.wintypes.DWORD),
# ("FlagsEx", ctypes.wintypes.DWORD),
]
# 准备缓冲区
filename_buffer = ctypes.create_unicode_buffer(260) # MAX_PATH
# 设置初始文件名
filename_buffer.value = "win32_save.xlsx"
# 准备过滤器 (用 \0 分隔)
# 格式: "描述\0模式\0描述\0模式\0\0"
filter_str = "Excel Files (*.xlsx)\0*.xlsx\0All Files (*.*)\0*.*\0\0"
ofn = OPENFILENAME()
ofn.lStructSize = ctypes.sizeof(OPENFILENAME)
ofn.hwndOwner = 0 # NULL
ofn.lpstrFilter = filter_str
ofn.lpstrFile = ctypes.cast(filename_buffer, ctypes.wintypes.LPWSTR)
ofn.nMaxFile = 260
ofn.lpstrDefExt = "xlsx"
ofn.lpstrTitle = "保存文件 (Win32 API)"
# OFN_OVERWRITEPROMPT | OFN_PATHMUSTEXIST | OFN_NOCHANGEDIR
ofn.Flags = 0x00000002 | 0x00000800 | 0x00000008
comdlg32 = ctypes.windll.comdlg32
# 调用 API
# GetSaveFileNameW 返回非零值表示成功
if comdlg32.GetSaveFileNameW(ctypes.byref(ofn)):
return filename_buffer.value
else:
return None
except Exception as e:
print(f"Win32 API Error: {e}")
return None
def show_save_dialog_com():
"""
使用 COM 接口 IFileSaveDialog (Windows Vista+)
提供更现代化的文件保存对话框,支持更多功能
"""
try:
import ctypes
import ctypes.wintypes
import uuid
# 定义必要的常量
CLSCTX_INPROC_SERVER = 1
S_OK = 0
FOS_OVERWRITEPROMPT = 0x00000002
FOS_PATHMUSTEXIST = 0x00000800
FOS_NOCHANGEDIR = 0x00000008
SIGDN_FILESYSPATH = 0x80058000
# IFileSaveDialog 的 CLSID 和 IID
CLSID_FileSaveDialog = uuid.UUID("{C0B4E2F3-BA21-4773-8DBA-335EC946EB8B}")
IID_IFileSaveDialog = uuid.UUID("{84bccd23-5fde-4cdb-aea4-af64b83d78ab}")
IID_IShellItem = uuid.UUID("{43826d1e-e718-42ee-bc55-a1e261c37bfe}")
# 加载 ole32.dll
ole32 = ctypes.windll.ole32
# CoInitialize
ole32.CoInitialize(None)
# CoCreateInstance
p_dialog = ctypes.c_void_p()
hr = ole32.CoCreateInstance(
ctypes.byref(CLSID_FileSaveDialog),
None,
CLSCTX_INPROC_SERVER,
ctypes.byref(IID_IFileSaveDialog),
ctypes.byref(p_dialog)
)
if hr != S_OK:
print(f"CoCreateInstance failed: {hr}")
return None
# 定义 IFileSaveDialog 的 vtable 方法
# 我们只需要调用 Show, GetResult, SetOptions, SetFileName, SetDefaultExtension, SetFileTypeIndex
# 这些方法在 IFileOpenDialog 基类中定义
# SetOptions
class IFileSaveDialogVtbl(ctypes.Structure):
_fields_ = [
("QueryInterface", ctypes.c_void_p),
("AddRef", ctypes.c_void_p),
("Release", ctypes.c_void_p),
# IModalWindow
("Show", ctypes.c_void_p),
# IFileDialog
("SetFileTypes", ctypes.c_void_p),
("SetFileTypeIndex", ctypes.c_void_p),
("GetFileTypeIndex", ctypes.c_void_p),
("Advise", ctypes.c_void_p),
("Unadvise", ctypes.c_void_p),
("SetOptions", ctypes.CFUNCTYPE(ctypes.c_long, ctypes.c_void_p, ctypes.c_ulong)),
("GetOptions", ctypes.c_void_p),
("SetDefaultFolder", ctypes.c_void_p),
("SetFolder", ctypes.c_void_p),
("GetFolder", ctypes.c_void_p),
("GetCurrentSelection", ctypes.c_void_p),
("SetFileName", ctypes.CFUNCTYPE(ctypes.c_long, ctypes.c_void_p, ctypes.c_wchar_p)),
("GetFileName", ctypes.c_void_p),
("SetTitle", ctypes.c_void_p),
("SetOkButtonLabel", ctypes.c_void_p),
("SetFileNameLabel", ctypes.c_void_p),
("GetResult", ctypes.CFUNCTYPE(ctypes.c_long, ctypes.c_void_p, ctypes.POINTER(ctypes.c_void_p))),
("AddPlace", ctypes.c_void_p),
("SetDefaultExtension", ctypes.CFUNCTYPE(ctypes.c_long, ctypes.c_void_p, ctypes.c_wchar_p)),
("Close", ctypes.c_void_p),
("SetClientGuid", ctypes.c_void_p),
("ClearClientData", ctypes.c_void_p),
("SetFilter", ctypes.c_void_p),
]
# 获取 vtable
vtable = ctypes.cast(p_dialog, ctypes.POINTER(ctypes.POINTER(IFileSaveDialogVtbl))).contents.contents
# 调用 SetOptions
hr = vtable.SetOptions(p_dialog, FOS_OVERWRITEPROMPT | FOS_PATHMUSTEXIST | FOS_NOCHANGEDIR)
if hr != S_OK:
print(f"SetOptions failed: {hr}")
return None
# 调用 SetFileName
hr = vtable.SetFileName(p_dialog, "com_save.xlsx")
if hr != S_OK:
print(f"SetFileName failed: {hr}")
return None
# 调用 SetDefaultExtension
hr = vtable.SetDefaultExtension(p_dialog, "xlsx")
if hr != S_OK:
print(f"SetDefaultExtension failed: {hr}")
return None
# 调用 SetFileTypeIndex
hr = vtable.SetFileTypeIndex(p_dialog, 1)
if hr != S_OK:
print(f"SetFileTypeIndex failed: {hr}")
return None
# 调用 Show
hr = vtable.Show(p_dialog, 0) # 0 表示没有父窗口
if hr != S_OK:
# 用户取消
return None
# 调用 GetResult
p_result = ctypes.c_void_p()
hr = vtable.GetResult(p_dialog, ctypes.byref(p_result))
if hr != S_OK:
print(f"GetResult failed: {hr}")
return None
# 定义 IShellItem 接口
class IShellItemVtbl(ctypes.Structure):
_fields_ = [
("QueryInterface", ctypes.c_void_p),
("AddRef", ctypes.c_void_p),
("Release", ctypes.c_void_p),
("BindToHandler", ctypes.c_void_p),
("GetParent", ctypes.c_void_p),
("GetDisplayName", ctypes.CFUNCTYPE(ctypes.c_long, ctypes.c_void_p, ctypes.c_ulong, ctypes.POINTER(ctypes.c_wchar_p))),
("GetAttributes", ctypes.c_void_p),
("Compare", ctypes.c_void_p),
]
# 获取 IShellItem 的 vtable
result_vtable = ctypes.cast(p_result, ctypes.POINTER(ctypes.POINTER(IShellItemVtbl))).contents.contents
# 调用 GetDisplayName
p_display_name = ctypes.c_wchar_p()
hr = result_vtable.GetDisplayName(p_result, SIGDN_FILESYSPATH, ctypes.byref(p_display_name))
if hr != S_OK:
print(f"GetDisplayName failed: {hr}")
return None
filepath = p_display_name.value
# 清理
ole32.CoUninitialize()
return filepath
except ImportError as e:
print(f"COM Error: {e}")
return None
except Exception as e:
print(f"COM Error: {e}")
import traceback
traceback.print_exc()
return None