Compare commits

..

20 Commits

Author SHA1 Message Date
dmy
2b3ab83d91 Fix substation coordinate handling for cable crossing detection 2026-01-08 16:05:06 +08:00
dmy
68df76702e Fix cable crossing detection coordinate unpacking error 2026-01-08 15:53:15 +08:00
dmy
b3a4513f94 Fix MIP solver variable duplication and function structure 2026-01-08 15:30:36 +08:00
dmy
04a5e19451 Improve MIP optimization and add log export feature 2026-01-08 15:08:04 +08:00
dmy
ebd5883dbf Fix unreachable code in design_with_pyomo function 2026-01-08 13:06:26 +08:00
dmy
41ac6f3963 Change MIP objective function to minimize total investment 2026-01-08 13:01:36 +08:00
dmy
09b2ada5df Add debug prints to check MIP toggle functionality 2026-01-08 12:58:15 +08:00
dmy
6441ddc059 Fix MIP import issue: move design_with_mst function outside __main__ protection block 2026-01-08 12:39:07 +08:00
dmy
2f095df12e Fix MIP algorithm: simplify model formulation and add detailed debugging 2026-01-08 10:28:35 +08:00
dmy
a3837a6707 Rewrite MIP model formulation and add comprehensive debugging 2026-01-08 10:22:39 +08:00
dmy
886fba4d15 Clear comparison results and topology visualization on new file upload 2026-01-08 10:10:46 +08:00
dmy
397ca8847e Fix MIP fallback return values: ensure consistent unpacking 2026-01-08 10:06:46 +08:00
dmy
6ad11a9b69 Fix MIP model: make objective function linear to avoid multiplication error 2026-01-08 10:03:49 +08:00
dmy
579f8866c4 Fix MIP toggle bug: handle PuLP import gracefully 2026-01-08 10:01:46 +08:00
dmy
4230d2221d Add MIP module for collector layout optimization 2026-01-08 09:54:40 +08:00
dmy
46e929bfce Implement genetic algorithm for collector layout optimization 2026-01-08 09:46:00 +08:00
dmy
f2a960e789 feat: 优化回路数计算逻辑,提升报表准确性 2026-01-07 16:55:11 +08:00
dmy
87cea6ed86 feat: 优化文件保存对话框并增强系统稳定性
- 重新启用run.io_bound()调用,使用后台线程执行PowerShell脚本
- 为所有文件保存操作添加按钮防重复点击功能
- 新增win32_helper模块,提供Win32 API和COM接口的文件对话框
- 简化导出最佳方案DXF的代码结构
- 改进异步操作和错误处理机制
2026-01-07 12:47:58 +08:00
dmy
e0b5b0c3dc feat: 在方案对比表格中增加损耗费用净现值列
- 新增'损耗费用净现值 (万元)'列,显示生命周期内损耗费用的净现值
- 使用npv_loss字段替代annual_loss_cost,考虑折现率对全生命周期成本的影响
- 支持按损耗费用净现值排序,便于方案经济性对比
2026-01-07 11:27:29 +08:00
dmy
7aef58de1e fix: 修正损耗计算单位从瓦特(W)转换为千瓦(kW)
- 将evaluate_design函数中的损耗计算结果从W转换为kW
- loss_w变量存储三相损耗(W),loss_kw转换为kW后累加
- 确保total_loss返回值单位为kW,与后续经济性分析计算一致
2026-01-07 10:01:32 +08:00
10 changed files with 1696 additions and 179 deletions

16
.vscode/launch.json vendored Normal file
View File

@@ -0,0 +1,16 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Current File with Arguments",
"type": "debugpy",
"request": "launch",
"program": "main.py",
"console": "integratedTerminal",
"args": "--excel abc.xlsx"
},
]
}

193
ga.py Normal file
View File

@@ -0,0 +1,193 @@
import numpy as np
import pandas as pd
from scipy.spatial import distance_matrix
from scipy.sparse.csgraph import minimum_spanning_tree
from collections import defaultdict
import random
def design_with_ga(
turbines,
substation,
cable_specs=None,
voltage=66000,
power_factor=0.95,
system_params=None,
pop_size=50,
generations=50,
evaluate_func=None,
total_invest_func=None,
get_max_capacity_func=None,
):
"""
使用遗传算法优化集电线路布局
:param turbines: 风机DataFrame
:param substation: 升压站坐标
:param cable_specs: 电缆规格
:param system_params: 系统参数用于NPV计算
:param pop_size: 种群大小
:param generations: 迭代代数
:param evaluate_func: 评估函数
:param total_invest_func: 总投资计算函数
:param get_max_capacity_func: 获取最大容量函数
:return: 连接列表和带有簇信息的turbines
"""
if get_max_capacity_func:
max_mw = get_max_capacity_func(cable_specs, voltage, power_factor)
else:
max_mw = 100.0 # 默认值
total_power = turbines["power"].sum()
max_clusters = int(np.ceil(total_power / max_mw))
n_turbines = len(turbines)
# 预计算距离矩阵
all_coords = np.vstack([substation, turbines[["x", "y"]].values])
dist_matrix_full = distance_matrix(all_coords, all_coords)
def fitness(chromosome):
cluster_assign = chromosome
clusters = defaultdict(list)
for i, c in enumerate(cluster_assign):
clusters[c].append(i)
connections = []
for c, members in clusters.items():
if len(members) == 0:
continue
coords = turbines.iloc[members][["x", "y"]].values
if len(members) > 1:
dm = distance_matrix(coords, coords)
mst = minimum_spanning_tree(dm).toarray()
for i in range(len(members)):
for j in range(len(members)):
if mst[i, j] > 0:
connections.append(
(
f"turbine_{members[i]}",
f"turbine_{members[j]}",
mst[i, j],
)
)
# 连接到升压站
dists = [dist_matrix_full[0, m + 1] for m in members]
closest = members[np.argmin(dists)]
connections.append((f"turbine_{closest}", "substation", min(dists)))
eval_res = evaluate_func(
turbines,
connections,
substation,
cable_specs,
is_offshore=False,
method_name="GA",
voltage=voltage,
power_factor=power_factor,
)
if system_params and total_invest_func:
res_list = total_invest_func(
[
{
"cost": eval_res["total_cost"],
"loss": eval_res["total_loss"],
"eval": eval_res,
}
],
system_params,
)
return res_list[0]["total_cost_npv"]
return eval_res["total_cost"]
def init_individual():
assign = np.zeros(n_turbines, dtype=int)
cluster_powers = np.zeros(max_clusters)
for i in range(n_turbines):
p = turbines.iloc[i]["power"]
possible = [
c for c in range(max_clusters) if cluster_powers[c] + p <= max_mw
]
if possible:
c = random.choice(possible)
else:
c = random.randint(0, max_clusters - 1)
assign[i] = c
cluster_powers[c] += p
return assign.tolist()
population = [init_individual() for _ in range(pop_size)]
best = None
best_fitness = float("inf")
for gen in range(generations):
fitnesses = [fitness(ind) for ind in population]
min_fit = min(fitnesses)
if min_fit < best_fitness:
best_fitness = min_fit
best = population[fitnesses.index(min_fit)].copy()
def tournament(size=3):
candidates = random.sample(list(zip(population, fitnesses)), size)
return min(candidates, key=lambda x: x[1])[0]
selected = [tournament() for _ in range(pop_size)]
new_pop = []
for i in range(0, pop_size, 2):
p1 = selected[i]
p2 = selected[i + 1] if i + 1 < pop_size else selected[0]
if random.random() < 0.8:
point = random.randint(1, n_turbines - 1)
child1 = p1[:point] + p2[point:]
child2 = p2[:point] + p1[point:]
else:
child1, child2 = p1.copy(), p2.copy()
new_pop.extend([child1, child2])
for ind in new_pop:
if random.random() < 0.1:
idx = random.randint(0, n_turbines - 1)
old_c = ind[idx]
new_c = random.randint(0, max_clusters - 1)
ind[idx] = new_c
cluster_powers = defaultdict(float)
for j, c in enumerate(ind):
cluster_powers[c] += turbines.iloc[j]["power"]
if max(cluster_powers.values()) > max_mw:
ind[idx] = max_clusters
max_clusters += 1
elites = sorted(zip(population, fitnesses), key=lambda x: x[1])[
: int(0.1 * pop_size)
]
new_pop[: len(elites)] = [e[0] for e in elites]
population = new_pop[:pop_size]
# 解码最佳个体
cluster_assign = best
clusters = defaultdict(list)
for i, c in enumerate(cluster_assign):
clusters[c].append(i)
connections = []
for c, members in clusters.items():
if len(members) == 0:
continue
coords = turbines.iloc[members][["x", "y"]].values
if len(members) > 1:
dm = distance_matrix(coords, coords)
mst = minimum_spanning_tree(dm).toarray()
for i in range(len(members)):
for j in range(len(members)):
if mst[i, j] > 0:
connections.append(
(
f"turbine_{members[i]}",
f"turbine_{members[j]}",
mst[i, j],
)
)
dists = [dist_matrix_full[0, m + 1] for m in members]
closest = members[np.argmin(dists)]
connections.append((f"turbine_{closest}", "substation", min(dists)))
turbines["cluster"] = cluster_assign
return connections, turbines

200
gui.py
View File

@@ -10,7 +10,7 @@ matplotlib.use("Agg")
import matplotlib.backends.backend_svg import matplotlib.backends.backend_svg
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import pandas as pd import pandas as pd
from nicegui import app, events, ui from nicegui import app, events, ui, run
from main import ( from main import (
compare_design_methods, compare_design_methods,
@@ -90,6 +90,9 @@ def index():
"run_btn": None, "run_btn": None,
"current_file_container": None, # 替换 label 为 container "current_file_container": None, # 替换 label 为 container
"info_container": None, # 新增信息展示容器 "info_container": None, # 新增信息展示容器
"ga_switch": None, # 遗传算法开关
"mip_switch": None, # MIP开关
"log_content": "", # 存储计算日志内容
} }
def update_info_panel(): def update_info_panel():
@@ -340,6 +343,16 @@ def index():
update_info_panel() update_info_panel()
# 清空方案对比结果和拓扑可视化
state["results"] = []
if refs["results_table"]:
refs["results_table"].rows = []
refs["results_table"].selected = []
if refs["plot_container"]:
refs["plot_container"].clear()
if refs["export_row"]:
refs["export_row"].clear()
# 清空上传组件列表,以便下次选择(配合 .no-list CSS 使用) # 清空上传组件列表,以便下次选择(配合 .no-list CSS 使用)
if refs["upload_widget"]: if refs["upload_widget"]:
refs["upload_widget"].reset() refs["upload_widget"].reset()
@@ -347,7 +360,9 @@ def index():
except Exception as ex: except Exception as ex:
ui.notify(f"上传处理失败: {ex}", type="negative") ui.notify(f"上传处理失败: {ex}", type="negative")
async def save_file_with_dialog(filename, callback, file_filter="All files (*.*)"): async def save_file_with_dialog(
filename, callback, file_filter="All files (*.*)", sender=None
):
""" """
跨平台文件保存助手。 跨平台文件保存助手。
如果是原生模式,弹出系统保存对话框。 如果是原生模式,弹出系统保存对话框。
@@ -356,9 +371,12 @@ def index():
:param filename: 默认文件名 :param filename: 默认文件名
:param callback: 接收文件路径并执行保存操作的函数 (filepath) -> None :param callback: 接收文件路径并执行保存操作的函数 (filepath) -> None
:param file_filter: 格式如 "Excel Files (*.xlsx)" :param file_filter: 格式如 "Excel Files (*.xlsx)"
:param sender: 触发该操作的 UI 组件,用于在操作期间禁用以防重复点击
""" """
if sender:
sender.disable()
try:
# 方案:使用 PowerShell 弹出原生保存对话框 (仅限 Windows) # 方案:使用 PowerShell 弹出原生保存对话框 (仅限 Windows)
# 这完全避免了 Python UI 库 (Tkinter/PyWebview) 的线程冲突和 PicklingError
import platform import platform
import subprocess import subprocess
@@ -399,15 +417,16 @@ def index():
print("DEBUG: invoking PowerShell SaveFileDialog...") print("DEBUG: invoking PowerShell SaveFileDialog...")
# 在 native 模式下直接同步执行,不使用 run.io_bound() # 使用 run.io_bound 在后台线程执行,避免阻塞主事件循环
result = subprocess.run( # 这样按钮的禁用状态可以立即同步到前端
result = await run.io_bound(
subprocess.run,
["powershell", "-Command", ps_script], ["powershell", "-Command", ps_script],
capture_output=True, capture_output=True,
text=True, text=True,
startupinfo=startupinfo startupinfo=startupinfo,
) )
save_path = result.stdout.strip() save_path = result.stdout.strip()
if save_path: if save_path:
print(f"DEBUG: PowerShell returned path: {save_path}") print(f"DEBUG: PowerShell returned path: {save_path}")
await callback(save_path) await callback(save_path)
@@ -427,6 +446,9 @@ def index():
temp_path = os.path.join(state["temp_dir"], filename) temp_path = os.path.join(state["temp_dir"], filename)
await callback(temp_path) await callback(temp_path)
ui.download(temp_path) ui.download(temp_path)
finally:
if sender:
sender.enable()
def update_export_buttons(): def update_export_buttons():
if refs["export_row"]: if refs["export_row"]:
@@ -468,9 +490,12 @@ def index():
# 如果不存在,重新生成 # 如果不存在,重新生成
export_all_scenarios_to_excel(state["results"], path) export_all_scenarios_to_excel(state["results"], path)
async def on_click_excel(): async def on_click_excel(e):
await save_file_with_dialog( await save_file_with_dialog(
default_excel_name, save_excel, "Excel Files (*.xlsx)" default_excel_name,
save_excel,
"Excel Files (*.xlsx)",
sender=e.sender,
) )
ui.button( ui.button(
@@ -479,41 +504,7 @@ def index():
).props("icon=download") ).props("icon=download")
# --- 导出推荐方案 DXF --- # --- 导出推荐方案 DXF ---
def export_best_dxf(): async def on_click_best_dxf(e):
if state["substation"] is not None:
safe_name = "".join(
[
c
for c in best_res["name"]
if c.isalnum() or c in (" ", "-", "_")
]
).strip()
default_name = f"{file_prefix}_best_{safe_name}.dxf"
async def save_dxf(path):
export_to_dxf(
best_res["turbines"],
state["substation"],
best_res["eval"]["details"],
path,
)
# 包装为 async 任务,并在 NiceGUI 事件循环中执行
async def run_save():
await save_file_with_dialog(
default_name, save_dxf, "DXF Files (*.dxf)"
)
# 这里的 export_best_dxf 本身是普通函数,绑定到 on_click
# 但我们需要它执行异步操作。最简单的是让 export_best_dxf 变为 async
# 或者在这里直接调用 run_save (但这在普通函数里不行)
# 更好的方法是将 export_best_dxf 定义为 async如下所示
return run_save()
else:
ui.notify("缺少升压站数据,无法导出 DXF", type="negative")
# 将 export_best_dxf 改为 async 并重命名,以便直接用作回调
async def on_click_best_dxf():
if state["substation"] is not None: if state["substation"] is not None:
safe_name = "".join( safe_name = "".join(
[ [
@@ -533,7 +524,7 @@ def index():
) )
await save_file_with_dialog( await save_file_with_dialog(
default_name, save_dxf, "DXF Files (*.dxf)" default_name, save_dxf, "DXF Files (*.dxf)", sender=e.sender
) )
else: else:
ui.notify("缺少升压站数据,无法导出 DXF", type="negative") ui.notify("缺少升压站数据,无法导出 DXF", type="negative")
@@ -543,7 +534,7 @@ def index():
).props("icon=architecture color=accent") ).props("icon=architecture color=accent")
# --- 导出选中方案 DXF --- # --- 导出选中方案 DXF ---
async def on_click_selected_dxf(): async def on_click_selected_dxf(e):
if not refs["results_table"] or not refs["results_table"].selected: if not refs["results_table"] or not refs["results_table"].selected:
ui.notify("请先在上方表格中选择一个方案", type="warning") ui.notify("请先在上方表格中选择一个方案", type="warning")
return return
@@ -573,7 +564,7 @@ def index():
) )
await save_file_with_dialog( await save_file_with_dialog(
default_name, save_dxf, "DXF Files (*.dxf)" default_name, save_dxf, "DXF Files (*.dxf)", sender=e.sender
) )
else: else:
ui.notify( ui.notify(
@@ -588,7 +579,7 @@ def index():
refs["export_selected_btn"].set_text(f"导出选中方案 ({clean_name})") refs["export_selected_btn"].set_text(f"导出选中方案 ({clean_name})")
# --- 导出全部 ZIP --- # --- 导出全部 ZIP ---
async def on_click_all_dxf(): async def on_click_all_dxf(e):
if not state["results"] or state["substation"] is None: if not state["results"] or state["substation"] is None:
ui.notify("无方案数据可导出", type="warning") ui.notify("无方案数据可导出", type="warning")
return return
@@ -634,12 +625,71 @@ def index():
except: except:
pass pass
await save_file_with_dialog(default_name, save_zip, "ZIP Files (*.zip)") await save_file_with_dialog(
default_name, save_zip, "ZIP Files (*.zip)", sender=e.sender
)
ui.button("导出全部方案 DXF (ZIP)", on_click=on_click_all_dxf).props( ui.button("导出全部方案 DXF (ZIP)", on_click=on_click_all_dxf).props(
"icon=folder_zip color=secondary" "icon=folder_zip color=secondary"
) )
# --- 导出计算日志 ---
async def on_click_export_log(e):
# 尝试多种方式获取日志内容
log_content = ""
method_used = "unknown"
# 方法1: 首先尝试从保存的日志内容获取
if refs.get("log_content") and refs["log_content"].strip():
log_content = refs["log_content"]
method_used = "saved_memory"
# 方法2: 如果保存的日志为空,尝试使用 JavaScript 获取 log 组件的内容
if not log_content.strip() and refs["log_box"]:
try:
log_id = refs["log_box"].id
js_code = f"""
(function() {{
const logElement = document.querySelector("#c{log_id}");
if (logElement) {{
console.log("Found log element:", logElement);
return logElement.innerText || logElement.textContent || "";
}}
console.log("Log element not found for ID: c{log_id}");
return "";
}})()
"""
result = await ui.run_javascript(js_code)
if result and result.strip():
log_content = result
method_used = "javascript"
except Exception as js_error:
print(f"JavaScript method failed: {js_error}")
if not log_content.strip():
ui.notify(
"没有可导出的日志内容。请先运行计算任务。", type="warning"
)
print(f"Log export failed. Method tried: {method_used}")
return
print(
f"Successfully exported log using method: {method_used}, length: {len(log_content)}"
)
default_name = f"{file_prefix}_calculation_log.txt"
async def save_log(path):
with open(path, "w", encoding="utf-8") as f:
f.write(log_content)
await save_file_with_dialog(
default_name, save_log, "Text Files (*.txt)", sender=e.sender
)
ui.button("导出计算日志", on_click=on_click_export_log).props(
"icon=description color=info"
)
def update_plot(result): def update_plot(result):
if refs["plot_container"]: if refs["plot_container"]:
refs["plot_container"].clear() refs["plot_container"].clear()
@@ -688,16 +738,21 @@ def index():
import queue import queue
from nicegui import run
async def run_analysis(): async def run_analysis():
if not state["excel_path"]: if not state["excel_path"]:
ui.notify("请先上传 Excel 坐标文件!", type="warning") ui.notify("请先上传 Excel 坐标文件!", type="warning")
return return
if refs["log_box"]: if refs["log_box"]:
refs["log_box"].clear() refs["log_box"].clear()
# 重置日志内容
refs["log_content"] = ""
log_queue = queue.Queue() log_queue = queue.Queue()
# 获取开关状态
use_ga = refs["ga_switch"].value if refs["ga_switch"] else False
use_mip = refs["mip_switch"].value if refs["mip_switch"] else False
print(f"Switch values: GA={use_ga}, MIP={use_mip}")
class QueueLogger(io.StringIO): class QueueLogger(io.StringIO):
def write(self, message): def write(self, message):
if message and message.strip(): if message and message.strip():
@@ -711,6 +766,8 @@ def index():
try: try:
msg = log_queue.get_nowait() msg = log_queue.get_nowait()
refs["log_box"].push(msg) refs["log_box"].push(msg)
# 同时保存到日志内容中
refs["log_content"] += msg + "\n"
new_msg = True new_msg = True
if msg.startswith("--- Scenario"): if msg.startswith("--- Scenario"):
scenario_name = msg.replace("---", "").strip() scenario_name = msg.replace("---", "").strip()
@@ -745,6 +802,8 @@ def index():
n_clusters_override=None, n_clusters_override=None,
interactive=False, interactive=False,
plot_results=False, plot_results=False,
use_ga=use_ga,
use_mip=use_mip,
) )
# 在后台线程运行计算任务 # 在后台线程运行计算任务
@@ -810,11 +869,20 @@ def index():
total_length_m = sum(d["length"] for d in res["eval"]["details"]) total_length_m = sum(d["length"] for d in res["eval"]["details"])
total_length_km = total_length_m / 1000 total_length_km = total_length_m / 1000
# 获取回路数 (通过统计从升压站发出的连接)
n_circuits = sum(
1
for d in res["eval"]["details"]
if d["source"] == "substation" or d["target"] == "substation"
)
row_dict = { row_dict = {
"name": name_display, "name": name_display,
"n_circuits": n_circuits,
"cost_wan": f"{res['cost'] / 10000:.2f}", "cost_wan": f"{res['cost'] / 10000:.2f}",
"loss_kw": f"{res['loss']:.2f}", "loss_kw": f"{res['loss']:.2f}",
"total_length": f"{total_length_km:.2f}", "total_length": f"{total_length_km:.2f}",
"npv_loss_wan": f"{res.get('npv_loss', 0) / 10000:.2f}",
"total_cost_npv_wan": f"{res.get('total_cost_npv', res['cost']) / 10000:.2f}", "total_cost_npv_wan": f"{res.get('total_cost_npv', res['cost']) / 10000:.2f}",
"note": note, "note": note,
"original_name": res["name"], "original_name": res["name"],
@@ -878,7 +946,7 @@ def index():
# 使用 items-stretch 确保所有子元素高度一致 # 使用 items-stretch 确保所有子元素高度一致
with ui.row().classes("w-full items-stretch gap-4"): with ui.row().classes("w-full items-stretch gap-4"):
# 1. 导出模板按钮 # 1. 导出模板按钮
async def export_template(): async def export_template(e):
import shutil import shutil
from generate_template import create_template from generate_template import create_template
@@ -899,7 +967,10 @@ def index():
raise FileNotFoundError("无法生成模板文件") raise FileNotFoundError("无法生成模板文件")
await save_file_with_dialog( await save_file_with_dialog(
"coordinates.xlsx", save_template, "Excel Files (*.xlsx)" "coordinates.xlsx",
save_template,
"Excel Files (*.xlsx)",
sender=e.sender,
) )
ui.button("导出 Excel 模板", on_click=export_template).classes( ui.button("导出 Excel 模板", on_click=export_template).classes(
@@ -934,6 +1005,17 @@ def index():
.classes("flex-1 py-4") .classes("flex-1 py-4")
.props("icon=play_arrow color=secondary") .props("icon=play_arrow color=secondary")
) )
# 4. 遗传算法开关
with ui.column().classes("flex-1 gap-0 justify-center items-center"):
refs["ga_switch"] = ui.switch("启用遗传算法", value=False).props(
"color=orange"
)
# 5. MIP开关
with ui.column().classes("flex-1 gap-0 justify-center items-center"):
refs["mip_switch"] = ui.switch("启用MIP", value=False).props(
"color=blue"
)
with ui.column().classes("w-full gap-4"): with ui.column().classes("w-full gap-4"):
# 新增:信息展示卡片 # 新增:信息展示卡片
@@ -966,6 +1048,12 @@ def index():
"required": True, "required": True,
"align": "left", "align": "left",
}, },
{
"name": "n_circuits",
"label": "回路数",
"field": "n_circuits",
"sortable": True,
},
{ {
"name": "cost_wan", "name": "cost_wan",
"label": "总投资 (万元)", "label": "总投资 (万元)",
@@ -984,6 +1072,12 @@ def index():
"field": "total_length", "field": "total_length",
"sortable": True, "sortable": True,
}, },
{
"name": "npv_loss_wan",
"label": "损耗费用净现值 (万元)",
"field": "npv_loss_wan",
"sortable": True,
},
{ {
"name": "total_cost_npv_wan", "name": "total_cost_npv_wan",
"label": "总费用 (万元)", "label": "总费用 (万元)",

394
main.py
View File

@@ -1,6 +1,7 @@
import argparse import argparse
import math import math
import os import os
import random
from collections import defaultdict from collections import defaultdict
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
@@ -12,6 +13,8 @@ from scipy.spatial import distance_matrix
from sklearn.cluster import KMeans from sklearn.cluster import KMeans
from esau_williams import design_with_esau_williams from esau_williams import design_with_esau_williams
from ga import design_with_ga
from mip import design_with_mip
# 设置matplotlib支持中文显示 # 设置matplotlib支持中文显示
plt.rcParams["font.sans-serif"] = ["Microsoft YaHei", "SimHei", "Arial"] plt.rcParams["font.sans-serif"] = ["Microsoft YaHei", "SimHei", "Arial"]
@@ -28,6 +31,7 @@ ANNUAL_LOSS_HOURS = 1400 # 年损耗小时数(小时)
# 1. 生成风电场数据(实际应用中替换为真实坐标) # 1. 生成风电场数据(实际应用中替换为真实坐标)
def generate_wind_farm_data(n_turbines=30, seed=42, layout="random", spacing=800): def generate_wind_farm_data(n_turbines=30, seed=42, layout="random", spacing=800):
pass # 实际应用中从Excel读取真实坐标此函数保留用于测试
""" """
生成模拟风电场数据 生成模拟风电场数据
:param layout: 'random' (随机) 或 'grid' (规则行列) :param layout: 'random' (随机) 或 'grid' (规则行列)
@@ -654,39 +658,175 @@ def design_with_rotational_sweep(
return final_connections, turbines return final_connections, turbines
# 预计算距离矩阵
all_coords = np.vstack([substation, turbines[["x", "y"]].values])
dist_matrix_full = distance_matrix(all_coords, all_coords)
def fitness(chromosome):
cluster_assign = chromosome
clusters = defaultdict(list)
for i, c in enumerate(cluster_assign):
clusters[c].append(i)
connections = []
for c, members in clusters.items():
if len(members) == 0:
continue
coords = turbines.iloc[members][["x", "y"]].values
if len(members) > 1:
dm = distance_matrix(coords, coords)
mst = minimum_spanning_tree(dm).toarray()
for i in range(len(members)):
for j in range(len(members)):
if mst[i, j] > 0:
connections.append(
(
f"turbine_{members[i]}",
f"turbine_{members[j]}",
mst[i, j],
)
)
# 连接到升压站
dists = [dist_matrix_full[0, m + 1] for m in members]
closest = members[np.argmin(dists)]
connections.append((f"turbine_{closest}", "substation", min(dists)))
eval_res = evaluate_design(
turbines,
connections,
substation,
cable_specs,
is_offshore=False,
method_name="GA",
voltage=voltage,
power_factor=power_factor,
)
if system_params:
res_list = total_investment(
[
{
"cost": eval_res["total_cost"],
"loss": eval_res["total_loss"],
"eval": eval_res,
}
],
system_params,
)
return res_list[0]["total_cost_npv"]
return eval_res["total_cost"]
def init_individual():
assign = np.zeros(n_turbines, dtype=int)
cluster_powers = np.zeros(max_clusters)
for i in range(n_turbines):
p = turbines.iloc[i]["power"]
possible = [
c for c in range(max_clusters) if cluster_powers[c] + p <= max_mw
]
if possible:
c = random.choice(possible)
else:
c = random.randint(0, max_clusters - 1)
assign[i] = c
cluster_powers[c] += p
return assign.tolist()
population = [init_individual() for _ in range(pop_size)]
best = None
best_fitness = float("inf")
for gen in range(generations):
fitnesses = [fitness(ind) for ind in population]
min_fit = min(fitnesses)
if min_fit < best_fitness:
best_fitness = min_fit
best = population[fitnesses.index(min_fit)].copy()
def tournament(size=3):
candidates = random.sample(list(zip(population, fitnesses)), size)
return min(candidates, key=lambda x: x[1])[0]
selected = [tournament() for _ in range(pop_size)]
new_pop = []
for i in range(0, pop_size, 2):
p1 = selected[i]
p2 = selected[i + 1] if i + 1 < pop_size else selected[0]
if random.random() < 0.8:
point = random.randint(1, n_turbines - 1)
child1 = p1[:point] + p2[point:]
child2 = p2[:point] + p1[point:]
else:
child1, child2 = p1.copy(), p2.copy()
new_pop.extend([child1, child2])
for ind in new_pop:
if random.random() < 0.1:
idx = random.randint(0, n_turbines - 1)
old_c = ind[idx]
new_c = random.randint(0, max_clusters - 1)
ind[idx] = new_c
cluster_powers = defaultdict(float)
for j, c in enumerate(ind):
cluster_powers[c] += turbines.iloc[j]["power"]
if max(cluster_powers.values()) > max_mw:
ind[idx] = max_clusters
max_clusters += 1
elites = sorted(zip(population, fitnesses), key=lambda x: x[1])[
: int(0.1 * pop_size)
]
new_pop[: len(elites)] = [e[0] for e in elites]
population = new_pop[:pop_size]
# 解码最佳个体
cluster_assign = best
clusters = defaultdict(list)
for i, c in enumerate(cluster_assign):
clusters[c].append(i)
connections = []
for c, members in clusters.items():
if len(members) == 0:
continue
coords = turbines.iloc[members][["x", "y"]].values
if len(members) > 1:
dm = distance_matrix(coords, coords)
mst = minimum_spanning_tree(dm).toarray()
for i in range(len(members)):
for j in range(len(members)):
if mst[i, j] > 0:
connections.append(
(
f"turbine_{members[i]}",
f"turbine_{members[j]}",
mst[i, j],
)
)
dists = [dist_matrix_full[0, m + 1] for m in members]
closest = members[np.argmin(dists)]
connections.append((f"turbine_{closest}", "substation", min(dists)))
turbines["cluster"] = cluster_assign
return connections, turbines
# 4. 获取电缆最大容量(MW)
def get_max_cable_capacity_mw( def get_max_cable_capacity_mw(
cable_specs=None, voltage=VOLTAGE_LEVEL, power_factor=POWER_FACTOR cable_specs, voltage=VOLTAGE_LEVEL, power_factor=POWER_FACTOR
): ):
""" """
计算给定电缆规格中能够承载的最大功率 (单位: MW)。 根据电缆规格计算最大承载功率
:param cable_specs: 电缆规格列表 list of tuples或者直接是最大功率数值(MW)
基于提供的电缆规格列表,选取最大载流量,结合系统电压和功率因数计算理论最大传输功率。
参数:
cable_specs (list, optional): 电缆规格列表。每个元素应包含 (截面积, 额定电流, 单价, 损耗系数)。
voltage (float): 系统电压 (V), 默认 66000
power_factor (float): 功率因数, 默认 0.95
返回:
float: 最大功率承载能力 (MW)。
异常:
Exception: 当未提供 cable_specs 时抛出,提示截面不满足。
""" """
if cable_specs is None: # 如果传入的已经是数值,直接返回
# Default cable specs if not provided (same as in evaluate_design) if isinstance(cable_specs, (int, float, np.number)):
cable_specs = [ return float(cable_specs)
(35, 150, 0.524, 80),
(70, 215, 0.268, 120), # 兼容性检查:如果列表为空
(95, 260, 0.193, 150), if not cable_specs:
(120, 295, 0.153, 180), print("Warning: 没有可用的电缆规格,使用默认最大容量 100MW")
(150, 330, 0.124, 220), return 100.0
(185, 370, 0.0991, 270),
(240, 425, 0.0754, 350),
(300, 500, 0.0601, 450),
(400, 580, 0.0470, 600),
]
# 从所有电缆规格中找到最大的额定电流容量 # 从所有电缆规格中找到最大的额定电流容量
max_current_capacity = max(spec[1] for spec in cable_specs) max_current_capacity = max(spec[1] for spec in cable_specs)
@@ -858,8 +998,9 @@ def evaluate_design(
total_cost += cable["cost"] total_cost += cable["cost"]
# 计算I²R损耗 (简化版) # 计算I²R损耗 (简化版)
loss = (cable["current"] ** 2) * cable["resistance"] * 3 # 三相 loss_w = (cable["current"] ** 2) * cable["resistance"] * 3 # 三相单位W
total_loss += loss loss_kw = loss_w / 1000 # 转换为 kW
total_loss += loss_kw
return { return {
"total_cost": total_cost, "total_cost": total_cost,
@@ -1008,10 +1149,16 @@ def export_to_excel(connections_details, filename):
df = pd.DataFrame(data) df = pd.DataFrame(data)
# 汇总统计 # 汇总统计
n_circuits = sum(
1
for conn in connections_details
if conn["source"] == "substation" or conn["target"] == "substation"
)
summary = { summary = {
"Total Cost (¥)": df["Cost (¥)"].sum(), "Total Cost (¥)": df["Cost (¥)"].sum(),
"Total Effective Length (m)": df["Effective Length (m)"].sum(), "Total Effective Length (m)": df["Effective Length (m)"].sum(),
"Total Vertical Length (m)": df["Vertical Length (m)"].sum(), "Total Vertical Length (m)": df["Vertical Length (m)"].sum(),
"Number of Circuits": n_circuits,
} }
summary_df = pd.DataFrame([summary]) summary_df = pd.DataFrame([summary])
@@ -1037,15 +1184,18 @@ def export_all_scenarios_to_excel(results, filename):
# 1. 总览 Sheet # 1. 总览 Sheet
summary_data = [] summary_data = []
for res in results: for res in results:
# 获取回路数 # 获取回路数 (通过统计从升压站发出的连接)
n_circuits = 0 n_circuits = sum(
if "turbines" in res and "cluster" in res["turbines"].columns: 1
n_circuits = res["turbines"]["cluster"].nunique() for conn in res["eval"]["details"]
if conn["source"] == "substation" or conn["target"] == "substation"
)
summary_data.append( summary_data.append(
{ {
"Scenario": res["name"], "Scenario": res["name"],
"Total Cost (¥)": res["cost"], "Total Cost (¥)": res["cost"],
"总费用(万元)": res.get("total_cost_npv", res["cost"]) / 10000,
"Total Loss (kW)": res["loss"], "Total Loss (kW)": res["loss"],
"Num Circuits": n_circuits, "Num Circuits": n_circuits,
# 计算电缆统计 # 计算电缆统计
@@ -1246,7 +1396,12 @@ def visualize_design(
# 7. 主函数:比较两种设计方法 # 7. 主函数:比较两种设计方法
def compare_design_methods( def compare_design_methods(
excel_path=None, n_clusters_override=None, interactive=True, plot_results=True excel_path=None,
n_clusters_override=None,
interactive=True,
plot_results=True,
use_ga=False,
use_mip=False,
): ):
""" """
比较MST和三种电缆方案下的K-means设计方法 比较MST和三种电缆方案下的K-means设计方法
@@ -1412,14 +1567,10 @@ def compare_design_methods(
print(f" 最大电缆容量: {max_cable_mw:.2f} MW") print(f" 最大电缆容量: {max_cable_mw:.2f} MW")
# --- Run 1: Base Algorithm (Capacitated Sweep) --- # --- Run 1: Base Sector Sweep ---
base_name = f"{name} (Base)" base_name = f"{name} (Base)"
conns_base, turbines_base = design_with_capacitated_sweep( conns_base, turbines_base = design_with_capacitated_sweep(
turbines.copy(), turbines.copy(), substation, max_cable_mw, voltage=voltage
substation,
cable_specs=current_specs,
voltage=voltage,
power_factor=power_factor,
) )
eval_base = evaluate_design( eval_base = evaluate_design(
turbines, turbines,
@@ -1431,7 +1582,11 @@ def compare_design_methods(
voltage=voltage, voltage=voltage,
power_factor=power_factor, power_factor=power_factor,
) )
n_circuits_base = sum(
1
for d in eval_base["details"]
if d["source"] == "substation" or d["target"] == "substation"
)
comparison_results.append( comparison_results.append(
{ {
"name": base_name, "name": base_name,
@@ -1443,17 +1598,13 @@ def compare_design_methods(
} }
) )
print( print(
f" [Base] Cost: ¥{eval_base['total_cost']:,.2f} | Loss: {eval_base['total_loss']:.2f} kW" f" [Base] Cost: ¥{eval_base['total_cost']:,.2f} | Loss: {eval_base['total_loss']:.2f} kW | Circuits: {n_circuits_base}"
) )
# --- Run 2: Rotational Algorithm (Optimization) --- # --- Run 2: Rotational Sweep (Optimization) ---
rot_name = f"{name} (Rotational)" rot_name = f"{name} (Rotational)"
conns_rot, turbines_rot = design_with_rotational_sweep( conns_rot, turbines_rot = design_with_rotational_sweep(
turbines.copy(), turbines.copy(), substation, max_cable_mw, voltage=voltage
substation,
cable_specs=current_specs,
voltage=voltage,
power_factor=power_factor,
) )
eval_rot = evaluate_design( eval_rot = evaluate_design(
turbines, turbines,
@@ -1465,7 +1616,11 @@ def compare_design_methods(
voltage=voltage, voltage=voltage,
power_factor=power_factor, power_factor=power_factor,
) )
n_circuits_rot = sum(
1
for d in eval_rot["details"]
if d["source"] == "substation" or d["target"] == "substation"
)
comparison_results.append( comparison_results.append(
{ {
"name": rot_name, "name": rot_name,
@@ -1477,7 +1632,7 @@ def compare_design_methods(
} }
) )
print( print(
f" [Rotational] Cost: ¥{eval_rot['total_cost']:,.2f} | Loss: {eval_rot['total_loss']:.2f} kW" f" [Rotational] Cost: ¥{eval_rot['total_cost']:,.2f} | Loss: {eval_rot['total_loss']:.2f} kW | Circuits: {n_circuits_rot}"
) )
# --- Run 3: Esau-Williams Algorithm --- # --- Run 3: Esau-Williams Algorithm ---
@@ -1495,7 +1650,11 @@ def compare_design_methods(
voltage=voltage, voltage=voltage,
power_factor=power_factor, power_factor=power_factor,
) )
n_circuits_ew = sum(
1
for d in eval_ew["details"]
if d["source"] == "substation" or d["target"] == "substation"
)
comparison_results.append( comparison_results.append(
{ {
"name": ew_name, "name": ew_name,
@@ -1507,7 +1666,94 @@ def compare_design_methods(
} }
) )
print( print(
f" [Esau-Williams] Cost: ¥{eval_ew['total_cost']:,.2f} | Loss: {eval_ew['total_loss']:.2f} kW" f" [Esau-Williams] Cost: ¥{eval_ew['total_cost']:,.2f} | Loss: {eval_ew['total_loss']:.2f} kW | Circuits: {n_circuits_ew}"
)
if use_ga:
# --- Run 4: Genetic Algorithm ---
ga_name = f"{name} (GA)"
conns_ga, turbines_ga = design_with_ga(
turbines.copy(),
substation,
current_specs,
voltage,
power_factor,
system_params,
evaluate_func=evaluate_design,
total_invest_func=total_investment,
get_max_capacity_func=get_max_cable_capacity_mw,
)
eval_ga = evaluate_design(
turbines,
conns_ga,
substation,
cable_specs=current_specs,
is_offshore=is_offshore,
method_name=ga_name,
voltage=voltage,
power_factor=power_factor,
)
n_circuits_ga = sum(
1
for d in eval_ga["details"]
if d["source"] == "substation" or d["target"] == "substation"
)
comparison_results.append(
{
"name": ga_name,
"cost": eval_ga["total_cost"],
"loss": eval_ga["total_loss"],
"eval": eval_ga,
"turbines": turbines_ga,
"specs": current_specs,
}
)
print(
f" [GA] Cost: ¥{eval_ga['total_cost']:,.2f} | Loss: {eval_ga['total_loss']:.2f} kW | Circuits: {n_circuits_ga}"
)
if use_mip:
print(f"Starting MIP optimization for {name}")
# --- Run 5: Mixed Integer Programming ---
mip_name = f"{name} (MIP)"
conns_mip, turbines_mip = design_with_mip(
turbines.copy(),
substation,
current_specs,
voltage,
power_factor,
system_params,
evaluate_func=evaluate_design,
total_invest_func=total_investment,
get_max_capacity_func=get_max_cable_capacity_mw,
)
eval_mip = evaluate_design(
turbines,
conns_mip,
substation,
cable_specs=current_specs,
is_offshore=is_offshore,
method_name=mip_name,
voltage=voltage,
power_factor=power_factor,
)
n_circuits_mip = sum(
1
for d in eval_mip["details"]
if d["source"] == "substation" or d["target"] == "substation"
)
comparison_results.append(
{
"name": mip_name,
"cost": eval_mip["total_cost"],
"loss": eval_mip["total_loss"],
"eval": eval_mip,
"turbines": turbines_mip,
"specs": current_specs,
}
)
print(
f" [MIP] Cost: ¥{eval_mip['total_cost']:,.2f} | Loss: {eval_mip['total_loss']:.2f} kW | Circuits: {n_circuits_mip}"
) )
# 记录最佳 # 记录最佳
@@ -1524,7 +1770,11 @@ def compare_design_methods(
# 可视化 (只画 Base 版本) # 可视化 (只画 Base 版本)
ax_idx = i + 1 ax_idx = i + 1
if plot_results and ax_idx < 4: if plot_results and ax_idx < 4:
n_circuits = turbines_base["cluster"].nunique() n_circuits = sum(
1
for d in eval_base["details"]
if d["source"] == "substation" or d["target"] == "substation"
)
title = f"{base_name} ({n_circuits} circuits)\nCost: ¥{eval_base['total_cost'] / 10000:.2f}" title = f"{base_name} ({n_circuits} circuits)\nCost: ¥{eval_base['total_cost'] / 10000:.2f}"
visualize_design( visualize_design(
turbines_base, substation, eval_base["details"], title, ax=axes[ax_idx] turbines_base, substation, eval_base["details"], title, ax=axes[ax_idx]
@@ -1564,7 +1814,17 @@ def compare_design_methods(
for i, res in enumerate(comparison_results): for i, res in enumerate(comparison_results):
if res["cost"] < comparison_results[best_idx]["cost"]: if res["cost"] < comparison_results[best_idx]["cost"]:
best_idx = i best_idx = i
print(f" {i + 1}. {res['name']} - Cost: ¥{res['cost']:,.2f}")
# 获取回路数 (通过统计从升压站发出的连接)
n_circuits = sum(
1
for conn in res["eval"]["details"]
if conn["source"] == "substation" or conn["target"] == "substation"
)
print(
f" {i + 1}. {res['name']} - Cost: ¥{res['cost']:,.2f} | Circuits: {n_circuits}"
)
print(f"推荐方案: {comparison_results[best_idx]['name']} (默认)") print(f"推荐方案: {comparison_results[best_idx]['name']} (默认)")
@@ -1680,21 +1940,23 @@ def total_investment(results, system_params):
更新后的results列表每个结果新增 'total_cost_npv' 字段(总费用净现值,元) 更新后的results列表每个结果新增 'total_cost_npv' 字段(总费用净现值,元)
""" """
# 获取系统参数,使用默认值 # 获取系统参数,使用默认值
discount_rate_percent = system_params.get('discount_rate', DISCOUNT_RATE) discount_rate_percent = system_params.get("discount_rate", DISCOUNT_RATE)
electricity_price = system_params.get('electricity_price', ELECTRICITY_PRICE) electricity_price = system_params.get("electricity_price", ELECTRICITY_PRICE)
project_lifetime = system_params.get('project_lifetime', PROJECT_LIFETIME) project_lifetime = system_params.get("project_lifetime", PROJECT_LIFETIME)
annual_loss_hours = system_params.get('annual_loss_hours', ANNUAL_LOSS_HOURS) annual_loss_hours = system_params.get("annual_loss_hours", ANNUAL_LOSS_HOURS)
# 将折现率转换为小数 # 将折现率转换为小数
r = discount_rate_percent / 100.0 r = discount_rate_percent / 100.0
for result in results: for result in results:
cable_cost = result['cost'] # 电缆总投资(元) cable_cost = result["cost"] # 电缆总投资(元)
loss_power = result['loss'] # 线损功率kW loss_power = result["loss"] # 线损功率kW
# 1. 计算电缆投资的净现值2年分期 # 1. 计算电缆投资的净现值2年分期
# 第1年支付50%第2年支付50% # 第1年支付50%第2年支付50%
npv_cable = (cable_cost * 0.5) / ((1 + r) ** 1) + (cable_cost * 0.5) / ((1 + r) ** 2) npv_cable = (cable_cost * 0.5) / ((1 + r) ** 1) + (cable_cost * 0.5) / (
(1 + r) ** 2
)
# 2. 计算电费损耗的净现值(生命周期内) # 2. 计算电费损耗的净现值(生命周期内)
# 年损耗费用 = 损耗功率(kW) * 年损耗小时数 * 电价(元/kWh) # 年损耗费用 = 损耗功率(kW) * 年损耗小时数 * 电价(元/kWh)
@@ -1713,10 +1975,10 @@ def total_investment(results, system_params):
total_cost_npv = npv_cable + npv_loss total_cost_npv = npv_cable + npv_loss
# 将结果添加到字典中 # 将结果添加到字典中
result['total_cost_npv'] = total_cost_npv result["total_cost_npv"] = total_cost_npv
result['npv_cable'] = npv_cable result["npv_cable"] = npv_cable
result['npv_loss'] = npv_loss result["npv_loss"] = npv_loss
result['annual_loss_cost'] = annual_loss_cost result["annual_loss_cost"] = annual_loss_cost
return results return results

472
mip.py Normal file
View File

@@ -0,0 +1,472 @@
import numpy as np
import pandas as pd
from scipy.spatial import distance_matrix
from scipy.sparse.csgraph import minimum_spanning_tree
from collections import defaultdict
import random
try:
import pulp
pulp_available = True
except ImportError:
pulp = None
pulp_available = False
try:
import pyomo.environ as pyo_env
pyomo_available = True
except (ImportError, AttributeError):
pyomo_available = False
print("Pyomo not available, falling back to PuLP")
def design_with_pyomo(
turbines,
substation,
cable_specs=None,
voltage=66000,
power_factor=0.95,
system_params=None,
max_clusters=None,
time_limit=300,
evaluate_func=None,
total_invest_func=None,
get_max_capacity_func=None,
):
"""
使用Pyomo求解器优化集电线路布局
:param turbines: 风机DataFrame
:param substation: 升压站坐标
:param cable_specs: 电缆规格
:param system_params: 系统参数用于NPV计算
:param max_clusters: 最大簇数,默认基于功率计算
:param time_limit: 求解时间限制(秒)
:param evaluate_func: 评估函数
:param total_invest_func: 总投资计算函数
:param get_max_capacity_func: 获取最大容量函数
:return: 连接列表和带有簇信息的turbines
"""
if get_max_capacity_func:
max_mw = get_max_capacity_func(cable_specs, voltage, power_factor)
else:
max_mw = 100.0
total_power = turbines["power"].sum()
if max_clusters is None:
max_clusters = int(np.ceil(total_power / max_mw))
n_turbines = len(turbines)
all_coords = np.vstack([substation, turbines[["x", "y"]].values])
dist_matrix_full = distance_matrix(all_coords, all_coords)
# Simple fallback for now - use PuLP instead
print("Pyomo not fully implemented, falling back to PuLP")
return design_with_mip(
turbines,
substation,
cable_specs,
voltage,
power_factor,
system_params,
max_clusters,
time_limit,
evaluate_func,
total_invest_func,
get_max_capacity_func,
)
def design_with_mip(
turbines,
substation,
cable_specs=None,
voltage=66000,
power_factor=0.95,
system_params=None,
max_clusters=None,
time_limit=300,
evaluate_func=None,
total_invest_func=None,
get_max_capacity_func=None,
):
"""
使用混合整数规划(MIP)优化集电线路布局
:param turbines: 风机DataFrame
:param substation: 升压站坐标
:param cable_specs: 电缆规格
:param system_params: 系统参数用于NPV计算
:param max_clusters: 最大簇数,默认基于功率计算
:param time_limit: 求解时间限制(秒)
:param evaluate_func: 评估函数
:param total_invest_func: 总投资计算函数
:param get_max_capacity_func: 获取最大容量函数
:return: 连接列表和带有簇信息的turbines
"""
if not pulp_available:
print(
"WARNING: PuLP library not available. MIP optimization skipped, falling back to MST."
)
from main import design_with_mst
connections = design_with_mst(turbines, substation)
return connections, turbines
if get_max_capacity_func:
max_mw = get_max_capacity_func(cable_specs, voltage, power_factor)
else:
max_mw = 100.0
if max_clusters is None:
max_clusters = int(np.ceil(turbines["power"].sum() / max_mw))
n_turbines = len(turbines)
print(
f"MIP Model Setup: n_turbines={n_turbines}, max_clusters={max_clusters}, max_mw={max_mw:.2f} MW"
)
all_coords = np.vstack([substation, turbines[["x", "y"]].values])
dist_matrix_full = distance_matrix(all_coords, all_coords)
prob = pulp.LpProblem("WindFarmCollectorMIP", pulp.LpMinimize)
# Create all decision variables upfront to avoid duplicates
assign_vars = {}
for i in range(n_turbines):
for k in range(max_clusters):
assign_vars[(i, k)] = pulp.LpVariable(f"assign_{i}_{k}", cat="Binary")
cluster_vars = {}
for k in range(max_clusters):
cluster_vars[k] = pulp.LpVariable(f"cluster_{k}", cat="Binary")
# Helper functions to access variables
def assign_var(i, k):
return assign_vars[(i, k)]
def cluster_var(k):
return cluster_vars[k]
# Simplified objective function: minimize total distance
prob += pulp.lpSum(
[
dist_matrix_full[0, i + 1] * assign_var(i, k)
for i in range(n_turbines)
for k in range(max_clusters)
]
)
for i in range(n_turbines):
prob += pulp.lpSum([assign_var(i, k) for k in range(max_clusters)]) == 1
for k in range(max_clusters):
cluster_power = pulp.lpSum(
[turbines.iloc[i]["power"] * assign_var(i, k) for i in range(n_turbines)]
)
prob += cluster_power <= max_mw * 1.2 * cluster_var(k)
for k in range(max_clusters):
for i in range(n_turbines):
prob += assign_var(i, k) <= cluster_var(k)
print(
f"MIP Model: {len(prob.variables())} variables, {len(prob.constraints)} constraints"
)
# Debug: Print model structure
print("MIP model structure check:")
print(f" Variables: {len(prob.variables())}")
print(f" Constraints: {len(prob.constraints)}")
print(f" Time limit: {time_limit}s")
print(f" Turbines: {n_turbines}, Clusters: {max_clusters}")
# Test solver availability
try:
import subprocess
test_solver = subprocess.run(
[
r"D:\code\windfarm\.venv\Lib\site-packages\pulp\apis\..\solverdir\cbc\win\i64\cbc.exe",
"-version",
],
capture_output=True,
text=True,
timeout=5,
)
print(
f"CBC solver test: {test_solver.stdout[:100] if test_solver.stdout else 'No output'}"
)
except Exception as solver_test_error:
print(f"CBC solver test failed: {solver_test_error}")
print("MIP: Starting to solve...")
try:
# Try to use CBC solver with different configurations
solver = pulp.PULP_CBC_CMD(
timeLimit=time_limit,
msg=False,
warmStart=False,
)
print(f"Using CBC solver with time limit: {time_limit}s")
status = prob.solve(solver)
print(
f"MIP: Solver status={pulp.LpStatus[prob.status]}, Objective value={pulp.value(prob.objective):.4f}"
)
except Exception as e:
print(f"MIP: CBC solver execution failed: {e}")
# Try alternative solver configurations
try:
print("MIP: Trying alternative solver configuration...")
solver = pulp.PULP_CBC_CMD(
msg=True, # Enable messages for debugging
threads=1, # Single thread
timeLimit=time_limit,
)
status = prob.solve(solver)
print(
f"MIP: Alternative solver status={pulp.LpStatus[prob.status]}, Objective value={pulp.value(prob.objective):.4f}"
)
except Exception as e2:
print(f"MIP: All solver attempts failed: {e2}, falling back to MST")
from main import design_with_mst
connections = design_with_mst(turbines, substation)
return connections, turbines
if pulp.LpStatus[prob.status] != "Optimal":
print(
f"MIP solver status: {pulp.LpStatus[prob.status]}, solution not found, falling back to MST"
)
print("Model feasibility check:")
print(f"Total power: {turbines['power'].sum():.2f} MW")
print(f"Max cluster capacity: {max_mw:.2f} MW")
print(f"Number of clusters: {max_clusters}, Number of turbines: {n_turbines}")
for k in range(max_clusters):
cluster_power = pulp.value(
pulp.lpSum(
[
turbines.iloc[i]["power"] * assign_var(i, k)
for i in range(n_turbines)
]
)
)
cluster_used = pulp.value(cluster_var(k))
print(
f"Cluster {k}: Power={cluster_power:.2f} MW (max {max_mw * 1.2:.2f}), Used={cluster_used}"
)
from main import design_with_mst
connections = design_with_mst(turbines, substation)
return connections, turbines
cluster_assign = [-1] * n_turbines
active_clusters = []
for k in range(max_clusters):
if pulp.value(cluster_var(k)) > 0.5:
active_clusters.append(k)
for i in range(n_turbines):
assigned = False
for k in active_clusters:
if pulp.value(assign_var(i, k)) > 0.5:
cluster_assign[i] = k
assigned = True
break
if not assigned:
dists = [dist_matrix_full[0, i + 1] for k in active_clusters]
cluster_assign[i] = active_clusters[np.argmin(dists)]
clusters = defaultdict(list)
for i, c in enumerate(cluster_assign):
clusters[c].append(i)
connections = []
for c, members in clusters.items():
if len(members) == 0:
continue
coords = turbines.iloc[members][["x", "y"]].values
if len(members) > 1:
dm = distance_matrix(coords, coords)
mst = minimum_spanning_tree(dm).toarray()
for i in range(len(members)):
for j in range(len(members)):
if mst[i, j] > 0:
connections.append(
(
f"turbine_{members[i]}",
f"turbine_{members[j]}",
mst[i, j],
)
)
dists = [dist_matrix_full[0, m + 1] for m in members]
closest = members[np.argmin(dists)]
connections.append((f"turbine_{closest}", "substation", min(dists)))
turbines["cluster"] = cluster_assign
# Check cluster distances
min_cluster_distance = check_cluster_distances(clusters, turbines)
if min_cluster_distance is not None:
print(
f"Cluster validation: Minimum distance between clusters = {min_cluster_distance:.2f} m"
)
if min_cluster_distance < 1000:
print(
f"WARNING: Clusters are very close to each other ({min_cluster_distance:.2f} m < 1000 m)"
)
elif min_cluster_distance < 2000:
print(
f"NOTICE: Clusters are relatively close ({min_cluster_distance:.2f} m)"
)
# Check for cable crossings
cable_crossings = check_cable_crossings(connections, turbines, substation)
if cable_crossings:
print(
f"WARNING: Found {len(cable_crossings)} cable crossing(s) in the solution"
)
for i, (idx1, idx2, p1, p2, p3, p4) in enumerate(cable_crossings):
conn1 = connections[idx1]
conn2 = connections[idx2]
print(
f" Crossing {i + 1}: Connection {conn1[0]}-{conn1[1]} crosses {conn2[0]}-{conn2[1]}"
)
else:
print("No cable crossings detected in the solution")
print(
f"MIP optimization completed successfully, {len(connections)} connections generated"
)
return connections, turbines
def calculate_cluster_centroids(clusters, turbines):
"""Calculate the centroid coordinates for each cluster."""
centroids = {}
for c, members in clusters.items():
if len(members) == 0:
centroids[c] = (0, 0)
else:
coords = turbines.iloc[members][["x", "y"]].values
centroid_x = np.mean(coords[:, 0])
centroid_y = np.mean(coords[:, 1])
centroids[c] = (centroid_x, centroid_y)
return centroids
def check_cluster_distances(clusters, turbines, min_distance_threshold=1000):
"""Check if any clusters are too close to each other."""
if len(clusters) < 2:
return None
centroids = calculate_cluster_centroids(clusters, turbines)
active_clusters = [c for c, members in clusters.items() if len(members) > 0]
min_distance = float("inf")
min_pair = None
for i in range(len(active_clusters)):
for j in range(i + 1, len(active_clusters)):
c1, c2 = active_clusters[i], active_clusters[j]
centroid1 = np.array(centroids[c1])
centroid2 = np.array(centroids[c2])
distance = np.linalg.norm(centroid1 - centroid2)
if distance < min_distance:
min_distance = distance
min_pair = (c1, c2)
return min_distance
def check_cable_crossings(connections, turbines, substation):
"""Check if there are cable crossings in the solution."""
crossings = []
def line_intersection(p1, p2, p3, p4):
"""Check if line segments (p1,p2) and (p3,p4) intersect."""
x1, y1 = p1
x2, y2 = p2
x3, y3 = p3
x4, y4 = p4
denom = (y4 - y3) * (x2 - x1) - (x4 - x3) * (y2 - y1)
if abs(denom) < 1e-10:
return False
ua = ((x4 - x3) * (y1 - y3) - (y4 - y3) * (x1 - x3)) / denom
ub = ((x2 - x1) * (y1 - y3) - (y2 - y1) * (x1 - x3)) / denom
return 0 <= ua <= 1 and 0 <= ub <= 1
def get_turbine_coord(connection_part):
"""Get coordinates from connection part (turbine_# or substation)."""
if connection_part == "substation":
# Handle different substation formats robustly
if isinstance(substation, np.ndarray):
if substation.ndim == 1:
# 1D array [x, y]
return (substation[0], substation[1])
elif substation.ndim == 2:
# 2D array [[x, y]] or shape (n, 2)
if substation.shape[0] == 1:
return (substation[0, 0], substation[0, 1])
else:
# Multiple points, use first one
return (substation[0, 0], substation[0, 1])
else:
# Unexpected dimension, try fallback
return (substation.flat[0], substation.flat[1])
elif isinstance(substation, (list, tuple)):
# List or tuple format
# Handle nested lists like [[x, y]]
if (
isinstance(substation[0], (list, tuple, np.ndarray))
and len(substation[0]) >= 2
):
return (substation[0][0], substation[0][1])
elif len(substation) >= 2:
return (substation[0], substation[1])
else:
return (float("inf"), float("inf"))
else:
# Unexpected format, try to convert
try:
sub_array = np.array(substation)
if sub_array.ndim == 1:
return (sub_array[0], sub_array[1])
else:
return (sub_array.flat[0], sub_array.flat[1])
except:
return (float("inf"), float("inf"))
else:
turbine_idx = int(connection_part.split("_")[1])
return (
turbines.iloc[turbine_idx]["x"],
turbines.iloc[turbine_idx]["y"],
)
for i in range(len(connections)):
for j in range(i + 1, len(connections)):
conn1 = connections[i]
conn2 = connections[j]
p1 = get_turbine_coord(conn1[0])
p2 = get_turbine_coord(conn1[1])
p3 = get_turbine_coord(conn2[0])
p4 = get_turbine_coord(conn2[1])
if (
np.array_equal(p1, p3)
or np.array_equal(p1, p4)
or np.array_equal(p2, p3)
or np.array_equal(p2, p4)
):
continue
if line_intersection(p1, p2, p3, p4):
crossings.append((i, j, p1, p2, p3, p4))
return crossings

View File

@@ -12,6 +12,8 @@ dependencies = [
"numpy>=2.4.0", "numpy>=2.4.0",
"openpyxl>=3.1.5", "openpyxl>=3.1.5",
"pandas>=2.3.3", "pandas>=2.3.3",
"pulp>=3.3.0",
"pyomo>=6.9.5",
"pywebview>=6.1", "pywebview>=6.1",
"scikit-learn>=1.8.0", "scikit-learn>=1.8.0",
"scipy>=1.16.3", "scipy>=1.16.3",

146
test_cbc_solver.py Normal file
View File

@@ -0,0 +1,146 @@
"""
Simple test to verify CBC solver functionality
"""
import pulp
import sys
import subprocess
import os
print("=== PuLP and CBC Solver Test ===")
print(f"Python version: {sys.version}")
print(f"PuLP version: {pulp.__version__}")
# Test 1: Check PuLP installation
print("\n1. Checking PuLP installation...")
try:
from pulp import LpProblem, LpVariable, LpMinimize, LpMaximize, lpSum, value
print("[OK] PuLP imported successfully")
except ImportError as e:
print(f"[FAIL] PuLP import failed: {e}")
sys.exit(1)
# Test 2: Check CBC solver file existence
print("\n2. Checking CBC solver file...")
solver_dir = os.path.join(
os.path.dirname(pulp.__file__), "apis", "..", "solverdir", "cbc", "win", "i64"
)
solver_path = os.path.join(solver_dir, "cbc.exe")
print(f"Looking for CBC at: {solver_path}")
if os.path.exists(solver_path):
print(f"[OK] CBC solver file found")
file_size = os.path.getsize(solver_path)
print(f" File size: {file_size:,} bytes ({file_size / 1024 / 1024:.2f} MB)")
else:
print(f"[FAIL] CBC solver file not found")
print(f" Checking directory contents:")
try:
parent_dir = os.path.dirname(solver_path)
if os.path.exists(parent_dir):
for item in os.listdir(parent_dir):
print(f" - {item}")
else:
print(f" Directory does not exist: {parent_dir}")
except Exception as e:
print(f" Error listing directory: {e}")
# Test 3: Try to run CBC solver directly
print("\n3. Testing CBC solver execution...")
if os.path.exists(solver_path):
try:
result = subprocess.run(
[solver_path, "-version"],
capture_output=True,
text=True,
timeout=10,
check=True,
)
print("[OK] CBC solver executed successfully")
print(f" Output: {result.stdout[:200]}")
except subprocess.CalledProcessError as e:
print(f"[FAIL] CBC solver execution failed (exit code {e.returncode})")
print(f" stdout: {e.stdout[:200]}")
print(f" stderr: {e.stderr[:200]}")
except subprocess.TimeoutExpired:
print("[FAIL] CBC solver execution timed out")
except Exception as e:
print(f"[FAIL] CBC solver execution error: {e}")
else:
print("[FAIL] Cannot test CBC execution - file not found")
# Test 4: Solve a simple linear programming problem
print("\n4. Testing simple LP problem...")
try:
# Simple problem: minimize x + y subject to x + y >= 5, x >= 0, y >= 0
prob = LpProblem("Simple_LP_Test", LpMinimize)
x = LpVariable("x", lowBound=0, cat="Continuous")
y = LpVariable("y", lowBound=0, cat="Continuous")
prob += x + y # Objective: minimize x + y
prob += x + y >= 5 # Constraint
print(" Created simple LP problem: minimize x + y subject to x + y >= 5")
# Try to solve with CBC
solver = pulp.PULP_CBC_CMD(msg=False, timeLimit=10)
print(" Attempting to solve with CBC...")
status = prob.solve(solver)
print(f"[OK] LP problem solved")
print(f" Status: {pulp.LpStatus[prob.status]}")
print(f" Objective value: {value(prob.objective)}")
print(f" x = {value(x)}, y = {value(y)}")
if abs(value(prob.objective) - 5.0) < 0.01:
print(" [OK] Correct solution found!")
else:
print(f" [FAIL] Unexpected solution (expected 5.0)")
except Exception as e:
print(f"[FAIL] LP problem solving failed: {e}")
import traceback
traceback.print_exc()
# Test 5: Solve a simple mixed integer programming problem
print("\n5. Testing simple MIP problem...")
try:
# Simple MIP: minimize x + y subject to x + y >= 5, x, y integers >= 0
prob = LpProblem("Simple_MIP_Test", LpMinimize)
x = LpVariable("x", lowBound=0, cat="Integer")
y = LpVariable("y", lowBound=0, cat="Integer")
prob += x + y # Objective
prob += x + y >= 5 # Constraint
print(
" Created simple MIP problem: minimize x + y subject to x + y >= 5, x,y integers"
)
solver = pulp.PULP_CBC_CMD(msg=False, timeLimit=10)
print(" Attempting to solve with CBC...")
status = prob.solve(solver)
print(f"[OK] MIP problem solved")
print(f" Status: {pulp.LpStatus[prob.status]}")
print(f" Objective value: {value(prob.objective)}")
print(f" x = {value(x)}, y = {value(y)}")
if abs(value(prob.objective) - 5.0) < 0.01:
print(" [OK] Correct solution found!")
else:
print(f" [FAIL] Unexpected solution (expected 5.0)")
except Exception as e:
print(f"[FAIL] MIP problem solving failed: {e}")
import traceback
traceback.print_exc()
print("\n=== Test Complete ===")

50
test_mip.py Normal file
View File

@@ -0,0 +1,50 @@
"""
Test script to verify MIP functionality
"""
import numpy as np
import pandas as pd
from mip import design_with_mip
# Create test data
np.random.seed(42)
n_turbines = 10
turbines = pd.DataFrame(
{
"x": np.random.uniform(0, 2000, n_turbines),
"y": np.random.uniform(0, 2000, n_turbines),
"power": np.random.uniform(5, 10, n_turbines),
}
)
substation = np.array([1000, 1000])
print("Test data created:")
print(f"Number of turbines: {n_turbines}")
print(f"Substation location: {substation}")
print(f"Total power: {turbines['power'].sum():.2f} MW")
# Test MIP function
print("\nTesting MIP design...")
try:
connections, turbines_with_clusters = design_with_mip(
turbines,
substation,
cable_specs=None,
voltage=66000,
power_factor=0.95,
system_params=None,
max_clusters=None,
time_limit=30,
evaluate_func=None,
total_invest_func=None,
get_max_capacity_func=None,
)
print(f"MIP test successful!")
print(f"Number of connections: {len(connections)}")
print(f"Clusters assigned: {turbines_with_clusters['cluster'].tolist()}")
except Exception as e:
print(f"MIP test failed with error: {e}")
import traceback
traceback.print_exc()

50
uv.lock generated
View File

@@ -1243,6 +1243,15 @@ wheels = [
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/fc/f5/68334c015eed9b5cff77814258717dec591ded209ab5b6fb70e2ae873d1d/pillow-12.1.0-cp314-cp314t-win_arm64.whl", hash = "sha256:f61333d817698bdcdd0f9d7793e365ac3d2a21c1f1eb02b32ad6aefb8d8ea831" }, { url = "https://mirrors.pku.edu.cn/pypi/web/packages/fc/f5/68334c015eed9b5cff77814258717dec591ded209ab5b6fb70e2ae873d1d/pillow-12.1.0-cp314-cp314t-win_arm64.whl", hash = "sha256:f61333d817698bdcdd0f9d7793e365ac3d2a21c1f1eb02b32ad6aefb8d8ea831" },
] ]
[[package]]
name = "ply"
version = "3.11"
source = { registry = "https://mirrors.pku.edu.cn/pypi/web/simple" }
sdist = { url = "https://mirrors.pku.edu.cn/pypi/web/packages/e5/69/882ee5c9d017149285cab114ebeab373308ef0f874fcdac9beb90e0ac4da/ply-3.11.tar.gz", hash = "sha256:00c7c1aaa88358b9c765b6d3000c6eec0ba42abca5351b095321aef446081da3" }
wheels = [
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/a3/58/35da89ee790598a0700ea49b2a66594140f44dec458c07e8e3d4979137fc/ply-3.11-py2.py3-none-any.whl", hash = "sha256:096f9b8350b65ebd2fd1346b12452efe5b9607f7482813ffca50c22722a807ce" },
]
[[package]] [[package]]
name = "propcache" name = "propcache"
version = "0.4.1" version = "0.4.1"
@@ -1333,6 +1342,15 @@ version = "0.1.0"
source = { registry = "https://mirrors.pku.edu.cn/pypi/web/simple" } source = { registry = "https://mirrors.pku.edu.cn/pypi/web/simple" }
sdist = { url = "https://mirrors.pku.edu.cn/pypi/web/packages/f2/cf/77d3e19b7fabd03895caca7857ef51e4c409e0ca6b37ee6e9f7daa50b642/proxy_tools-0.1.0.tar.gz", hash = "sha256:ccb3751f529c047e2d8a58440d86b205303cf0fe8146f784d1cbcd94f0a28010" } sdist = { url = "https://mirrors.pku.edu.cn/pypi/web/packages/f2/cf/77d3e19b7fabd03895caca7857ef51e4c409e0ca6b37ee6e9f7daa50b642/proxy_tools-0.1.0.tar.gz", hash = "sha256:ccb3751f529c047e2d8a58440d86b205303cf0fe8146f784d1cbcd94f0a28010" }
[[package]]
name = "pulp"
version = "3.3.0"
source = { registry = "https://mirrors.pku.edu.cn/pypi/web/simple" }
sdist = { url = "https://mirrors.pku.edu.cn/pypi/web/packages/16/1c/d880b739b841a8aa81143091c9bdda5e72e226a660aa13178cb312d4b27f/pulp-3.3.0.tar.gz", hash = "sha256:7eb99b9ce7beeb8bbb7ea9d1c919f02f003ab7867e0d1e322f2f2c26dd31c8ba" }
wheels = [
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/99/6c/64cafaceea3f99927e84b38a362ec6a8f24f33061c90bda77dfe1cd4c3c6/pulp-3.3.0-py3-none-any.whl", hash = "sha256:dd6ad2d63f196d1254eddf9dcff5cd224912c1f046120cb7c143c5b0eda63fae" },
]
[[package]] [[package]]
name = "pycparser" name = "pycparser"
version = "2.23" version = "2.23"
@@ -1571,6 +1589,34 @@ wheels = [
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/2d/86/637cda4983dc0936b73a385f3906256953ac434537b812814cb0b6d231a2/pyobjc_framework_webkit-12.1-cp314-cp314t-macosx_10_15_universal2.whl", hash = "sha256:1aaa3bf12c7b68e1a36c0b294d2728e06f2cc220775e6dc4541d5046290e4dc8" }, { url = "https://mirrors.pku.edu.cn/pypi/web/packages/2d/86/637cda4983dc0936b73a385f3906256953ac434537b812814cb0b6d231a2/pyobjc_framework_webkit-12.1-cp314-cp314t-macosx_10_15_universal2.whl", hash = "sha256:1aaa3bf12c7b68e1a36c0b294d2728e06f2cc220775e6dc4541d5046290e4dc8" },
] ]
[[package]]
name = "pyomo"
version = "6.9.5"
source = { registry = "https://mirrors.pku.edu.cn/pypi/web/simple" }
dependencies = [
{ name = "ply" },
]
sdist = { url = "https://mirrors.pku.edu.cn/pypi/web/packages/87/d8/f32e0dcacc8219694709200d4402c86a6e28d3af50380a5ccf7f7e15ffae/pyomo-6.9.5.tar.gz", hash = "sha256:0734020fcd5cc03ee200fd3f79d143fbfc14e6be116e0d16bab79f3f89609879" }
wheels = [
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/f8/63/5f163b231a924ba7a5f6c58466c751f70be88568fa446524b6e806c98e4b/pyomo-6.9.5-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:549ee4226cab6e2ff6efe5b3b9891ce1dfd866d38a024715315ea850fa1bf0ec" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/2d/bf/0cebcfce70be04d6d7aa19fbcbdeecdd5843caac617424f34ab3feb8e96e/pyomo-6.9.5-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:b382cc8c3728199c8332024d64eed8622dabb3f8aebe5874c86a036489064f7a" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/14/27/967545514a2d0f4ca5ac6b595661cb0927cdcd10c3bb2832c5aa0ee15990/pyomo-6.9.5-cp312-cp312-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:43c6e425ca5231b530cd23460e371b7ca9119224dd57237c34580e15f31e4d72" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/85/fe/691e5eb26f58ee4a072add6cc484756d9e3c367901ec6701d2c6789b394d/pyomo-6.9.5-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:a1923c358e1e8009a05ada911fc72e615c9e2ce6988f0979ec1ecc75880ee1f7" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/c7/b3/ae47340790f2f1f92f76b176acf475890717f0cb7def073e504b9857a057/pyomo-6.9.5-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:694262dc2eb53ca1ab245261f432a5ed1ec30cf3e651b5a6a1c276bc2dd81076" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/29/e9/7f782864afd28a9eb53057c9d046541be6535b2da35e11c2bcb80839c6bd/pyomo-6.9.5-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:1f99ce91f2710d60b380a3a519288282d2183c44e1d66c131909313a3b63e7a2" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/cd/4d/9ca17a602e31a1c3f3148c455a5739fcbe23c102b80a12ec3e6d3bf5e847/pyomo-6.9.5-cp313-cp313-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:d22f99e0ba8e2fb7d0e806bf630b8ce9b0a41d777c51f22711adbcb905f7486e" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/44/2e/78c3ac876791b59c836338b73dc49317b01cef574b01af061999a04a064a/pyomo-6.9.5-cp313-cp313-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:d5953e490b9e9ea42d28804dd0358a9d3ef82560022c2b538e70a638790bc392" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/3c/27/3eb3db8e9ed6a01dee63219389aec761d5cc29b6dc5015b32f826f2a9225/pyomo-6.9.5-cp314-cp314-macosx_10_15_x86_64.whl", hash = "sha256:058eddde05b4354307975f1ecd25cfda9f8a282ad2e3b4f168ff8fee3c3623a1" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/9a/31/7f4750fc9bb0ec18a9534549e4c80ea63f1267aa828d495a48bbf0018f49/pyomo-6.9.5-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:105a073c47a2d2d6e74e48ed6fc82c6f6d19027488d5003aabb7ed5d10271483" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/f8/67/639d0006eddab30cf415b0154763ccc51f3c15b934e866eb4fb07bc2b6ed/pyomo-6.9.5-cp314-cp314-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:f2c636c2c640b33dde3b119f6f0941a1bbde39397c392dba55351b0438d8600f" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/67/ef/023b74b8f161f15a51febdd160354f1e3fd7e1475abbe5ccfb3d7588cf1f/pyomo-6.9.5-cp314-cp314-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:e02813b4021eeed7214a1ca5d7daecbdc78d3db7059962553a57fd138d747c22" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/a0/ca/edab1b532fd5e2d146d0cb96836eb5ae387b8a5bd255213e306793f6168e/pyomo-6.9.5-cp314-cp314t-macosx_10_15_x86_64.whl", hash = "sha256:83789ce89271da31e0ff5bbef692af1621ab1747798183a5603b6577b7074277" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/a9/3c/2745386f57030bc60b626adba002b68db3f9538d5b52900f48026a4a17d7/pyomo-6.9.5-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:1f449aaceac5078daaecc21d19b96a15529f9ac8aa90f6472e8811cc07112ecc" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/f1/93/2058af0890b13f7e1a26e4925ff8d681c23d9cbdc2ecc9db17c744941617/pyomo-6.9.5-cp314-cp314t-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:f94d03f122fcf04a769c28ad48c423cd7b6d3d2c40da20bc8ea1a41bb20d0c36" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/de/30/c808931fc034851a16d3f8360d045b087ac743ea97bfe96cdb4b1df47c21/pyomo-6.9.5-cp314-cp314t-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:96ff300e96cdab75e2e6983c99e3a61eaff2a6d0f5ed83acd939e74e361de537" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/68/29/394967f7df51788cbdf1b4aedfb7c5a3a62e11b85b4c9d806b86cc576be4/pyomo-6.9.5-py3-none-any.whl", hash = "sha256:60326f7d3143ee7d0f5c5c4a3cbf871b53e08cc6c2b0c9e6d25568880233472f" },
]
[[package]] [[package]]
name = "pyparsing" name = "pyparsing"
version = "3.3.1" version = "3.3.1"
@@ -2106,6 +2152,8 @@ dependencies = [
{ name = "numpy" }, { name = "numpy" },
{ name = "openpyxl" }, { name = "openpyxl" },
{ name = "pandas" }, { name = "pandas" },
{ name = "pulp" },
{ name = "pyomo" },
{ name = "pywebview" }, { name = "pywebview" },
{ name = "scikit-learn" }, { name = "scikit-learn" },
{ name = "scipy" }, { name = "scipy" },
@@ -2125,6 +2173,8 @@ requires-dist = [
{ name = "numpy", specifier = ">=2.4.0" }, { name = "numpy", specifier = ">=2.4.0" },
{ name = "openpyxl", specifier = ">=3.1.5" }, { name = "openpyxl", specifier = ">=3.1.5" },
{ name = "pandas", specifier = ">=2.3.3" }, { name = "pandas", specifier = ">=2.3.3" },
{ name = "pulp", specifier = ">=3.3.0" },
{ name = "pyomo", specifier = ">=6.9.5" },
{ name = "pywebview", specifier = ">=6.1" }, { name = "pywebview", specifier = ">=6.1" },
{ name = "scikit-learn", specifier = ">=1.8.0" }, { name = "scikit-learn", specifier = ">=1.8.0" },
{ name = "scipy", specifier = ">=1.16.3" }, { name = "scipy", specifier = ">=1.16.3" },

232
win32_helper.py Normal file
View File

@@ -0,0 +1,232 @@
import ctypes
import ctypes.wintypes
import os
def show_save_dialog_win32():
"""
使用 ctypes 直接调用 Windows API (GetSaveFileNameW)
不需要子进程,可以在线程中运行 (run.io_bound)
"""
try:
# 定义 OPENFILENAME 结构体
class OPENFILENAME(ctypes.Structure):
_fields_ = [
("lStructSize", ctypes.wintypes.DWORD),
("hwndOwner", ctypes.wintypes.HWND),
("hInstance", ctypes.wintypes.HINSTANCE),
("lpstrFilter", ctypes.wintypes.LPCWSTR),
("lpstrCustomFilter", ctypes.wintypes.LPWSTR),
("nMaxCustFilter", ctypes.wintypes.DWORD),
("nFilterIndex", ctypes.wintypes.DWORD),
("lpstrFile", ctypes.wintypes.LPWSTR),
("nMaxFile", ctypes.wintypes.DWORD),
("lpstrFileTitle", ctypes.wintypes.LPWSTR),
("nMaxFileTitle", ctypes.wintypes.DWORD),
("lpstrInitialDir", ctypes.wintypes.LPCWSTR),
("lpstrTitle", ctypes.wintypes.LPCWSTR),
("Flags", ctypes.wintypes.DWORD),
("nFileOffset", ctypes.wintypes.WORD),
("nFileExtension", ctypes.wintypes.WORD),
("lpstrDefExt", ctypes.wintypes.LPCWSTR),
("lCustData", ctypes.wintypes.LPARAM),
("lpfnHook", ctypes.wintypes.LPVOID),
("lpTemplateName", ctypes.wintypes.LPCWSTR),
# 还有更多字段,但这通常足够了
# ("pvReserved", ctypes.wintypes.LPVOID),
# ("dwReserved", ctypes.wintypes.DWORD),
# ("FlagsEx", ctypes.wintypes.DWORD),
]
# 准备缓冲区
filename_buffer = ctypes.create_unicode_buffer(260) # MAX_PATH
# 设置初始文件名
filename_buffer.value = "win32_save.xlsx"
# 准备过滤器 (用 \0 分隔)
# 格式: "描述\0模式\0描述\0模式\0\0"
filter_str = "Excel Files (*.xlsx)\0*.xlsx\0All Files (*.*)\0*.*\0\0"
ofn = OPENFILENAME()
ofn.lStructSize = ctypes.sizeof(OPENFILENAME)
ofn.hwndOwner = 0 # NULL
ofn.lpstrFilter = filter_str
ofn.lpstrFile = ctypes.cast(filename_buffer, ctypes.wintypes.LPWSTR)
ofn.nMaxFile = 260
ofn.lpstrDefExt = "xlsx"
ofn.lpstrTitle = "保存文件 (Win32 API)"
# OFN_OVERWRITEPROMPT | OFN_PATHMUSTEXIST | OFN_NOCHANGEDIR
ofn.Flags = 0x00000002 | 0x00000800 | 0x00000008
comdlg32 = ctypes.windll.comdlg32
# 调用 API
# GetSaveFileNameW 返回非零值表示成功
if comdlg32.GetSaveFileNameW(ctypes.byref(ofn)):
return filename_buffer.value
else:
return None
except Exception as e:
print(f"Win32 API Error: {e}")
return None
def show_save_dialog_com():
"""
使用 COM 接口 IFileSaveDialog (Windows Vista+)
提供更现代化的文件保存对话框,支持更多功能
"""
try:
import ctypes
import ctypes.wintypes
import uuid
# 定义必要的常量
CLSCTX_INPROC_SERVER = 1
S_OK = 0
FOS_OVERWRITEPROMPT = 0x00000002
FOS_PATHMUSTEXIST = 0x00000800
FOS_NOCHANGEDIR = 0x00000008
SIGDN_FILESYSPATH = 0x80058000
# IFileSaveDialog 的 CLSID 和 IID
CLSID_FileSaveDialog = uuid.UUID("{C0B4E2F3-BA21-4773-8DBA-335EC946EB8B}")
IID_IFileSaveDialog = uuid.UUID("{84bccd23-5fde-4cdb-aea4-af64b83d78ab}")
IID_IShellItem = uuid.UUID("{43826d1e-e718-42ee-bc55-a1e261c37bfe}")
# 加载 ole32.dll
ole32 = ctypes.windll.ole32
# CoInitialize
ole32.CoInitialize(None)
# CoCreateInstance
p_dialog = ctypes.c_void_p()
hr = ole32.CoCreateInstance(
ctypes.byref(CLSID_FileSaveDialog),
None,
CLSCTX_INPROC_SERVER,
ctypes.byref(IID_IFileSaveDialog),
ctypes.byref(p_dialog)
)
if hr != S_OK:
print(f"CoCreateInstance failed: {hr}")
return None
# 定义 IFileSaveDialog 的 vtable 方法
# 我们只需要调用 Show, GetResult, SetOptions, SetFileName, SetDefaultExtension, SetFileTypeIndex
# 这些方法在 IFileOpenDialog 基类中定义
# SetOptions
class IFileSaveDialogVtbl(ctypes.Structure):
_fields_ = [
("QueryInterface", ctypes.c_void_p),
("AddRef", ctypes.c_void_p),
("Release", ctypes.c_void_p),
# IModalWindow
("Show", ctypes.c_void_p),
# IFileDialog
("SetFileTypes", ctypes.c_void_p),
("SetFileTypeIndex", ctypes.c_void_p),
("GetFileTypeIndex", ctypes.c_void_p),
("Advise", ctypes.c_void_p),
("Unadvise", ctypes.c_void_p),
("SetOptions", ctypes.CFUNCTYPE(ctypes.c_long, ctypes.c_void_p, ctypes.c_ulong)),
("GetOptions", ctypes.c_void_p),
("SetDefaultFolder", ctypes.c_void_p),
("SetFolder", ctypes.c_void_p),
("GetFolder", ctypes.c_void_p),
("GetCurrentSelection", ctypes.c_void_p),
("SetFileName", ctypes.CFUNCTYPE(ctypes.c_long, ctypes.c_void_p, ctypes.c_wchar_p)),
("GetFileName", ctypes.c_void_p),
("SetTitle", ctypes.c_void_p),
("SetOkButtonLabel", ctypes.c_void_p),
("SetFileNameLabel", ctypes.c_void_p),
("GetResult", ctypes.CFUNCTYPE(ctypes.c_long, ctypes.c_void_p, ctypes.POINTER(ctypes.c_void_p))),
("AddPlace", ctypes.c_void_p),
("SetDefaultExtension", ctypes.CFUNCTYPE(ctypes.c_long, ctypes.c_void_p, ctypes.c_wchar_p)),
("Close", ctypes.c_void_p),
("SetClientGuid", ctypes.c_void_p),
("ClearClientData", ctypes.c_void_p),
("SetFilter", ctypes.c_void_p),
]
# 获取 vtable
vtable = ctypes.cast(p_dialog, ctypes.POINTER(ctypes.POINTER(IFileSaveDialogVtbl))).contents.contents
# 调用 SetOptions
hr = vtable.SetOptions(p_dialog, FOS_OVERWRITEPROMPT | FOS_PATHMUSTEXIST | FOS_NOCHANGEDIR)
if hr != S_OK:
print(f"SetOptions failed: {hr}")
return None
# 调用 SetFileName
hr = vtable.SetFileName(p_dialog, "com_save.xlsx")
if hr != S_OK:
print(f"SetFileName failed: {hr}")
return None
# 调用 SetDefaultExtension
hr = vtable.SetDefaultExtension(p_dialog, "xlsx")
if hr != S_OK:
print(f"SetDefaultExtension failed: {hr}")
return None
# 调用 SetFileTypeIndex
hr = vtable.SetFileTypeIndex(p_dialog, 1)
if hr != S_OK:
print(f"SetFileTypeIndex failed: {hr}")
return None
# 调用 Show
hr = vtable.Show(p_dialog, 0) # 0 表示没有父窗口
if hr != S_OK:
# 用户取消
return None
# 调用 GetResult
p_result = ctypes.c_void_p()
hr = vtable.GetResult(p_dialog, ctypes.byref(p_result))
if hr != S_OK:
print(f"GetResult failed: {hr}")
return None
# 定义 IShellItem 接口
class IShellItemVtbl(ctypes.Structure):
_fields_ = [
("QueryInterface", ctypes.c_void_p),
("AddRef", ctypes.c_void_p),
("Release", ctypes.c_void_p),
("BindToHandler", ctypes.c_void_p),
("GetParent", ctypes.c_void_p),
("GetDisplayName", ctypes.CFUNCTYPE(ctypes.c_long, ctypes.c_void_p, ctypes.c_ulong, ctypes.POINTER(ctypes.c_wchar_p))),
("GetAttributes", ctypes.c_void_p),
("Compare", ctypes.c_void_p),
]
# 获取 IShellItem 的 vtable
result_vtable = ctypes.cast(p_result, ctypes.POINTER(ctypes.POINTER(IShellItemVtbl))).contents.contents
# 调用 GetDisplayName
p_display_name = ctypes.c_wchar_p()
hr = result_vtable.GetDisplayName(p_result, SIGDN_FILESYSPATH, ctypes.byref(p_display_name))
if hr != S_OK:
print(f"GetDisplayName failed: {hr}")
return None
filepath = p_display_name.value
# 清理
ole32.CoUninitialize()
return filepath
except ImportError as e:
print(f"COM Error: {e}")
return None
except Exception as e:
print(f"COM Error: {e}")
import traceback
traceback.print_exc()
return None