Files
windfarm/mip.py

175 lines
6.0 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import numpy as np
import pandas as pd
from scipy.spatial import distance_matrix
from scipy.sparse.csgraph import minimum_spanning_tree
from collections import defaultdict
import random
try:
import pulp
except ImportError:
pulp = None
def design_with_mip(
turbines,
substation,
cable_specs=None,
voltage=66000,
power_factor=0.95,
system_params=None,
max_clusters=None,
time_limit=300, # seconds
evaluate_func=None,
total_invest_func=None,
get_max_capacity_func=None,
):
"""
使用混合整数规划(MIP)优化集电线路布局
:param turbines: 风机DataFrame
:param substation: 升压站坐标
:param cable_specs: 电缆规格
:param system_params: 系统参数用于NPV计算
:param max_clusters: 最大簇数,默认基于功率计算
:param time_limit: 求解时间限制(秒)
:param evaluate_func: 评估函数
:param total_invest_func: 总投资计算函数
:param get_max_capacity_func: 获取最大容量函数
:return: 连接列表和带有簇信息的turbines
"""
if pulp is None:
print(
"WARNING: PuLP library not available. MIP optimization skipped, falling back to MST."
)
from main import design_with_mst
connections = design_with_mst(turbines, substation)
return connections, turbines
if get_max_capacity_func:
max_mw = get_max_capacity_func(cable_specs, voltage, power_factor)
else:
max_mw = 100.0 # 默认值
total_power = turbines["power"].sum()
if max_clusters is None:
max_clusters = int(np.ceil(total_power / max_mw))
n_turbines = len(turbines)
print(
f"MIP Model Setup: n_turbines={n_turbines}, max_clusters={max_clusters}, max_mw={max_mw:.2f} MW"
)
# 预计算距离矩阵
all_coords = np.vstack([substation, turbines[["x", "y"]].values])
dist_matrix_full = distance_matrix(all_coords, all_coords)
# MIP 模型
prob = pulp.LpProblem("WindFarmCollectorMIP", pulp.LpMinimize)
# 创建所有变量和约束的辅助函数
def assign_var(i, k):
return pulp.LpVariable(f"assign_{i}_{k}", cat="Binary")
def cluster_var(k):
return pulp.LpVariable(f"cluster_{k}", cat="Binary")
# 目标函数:最小化使用的簇数(越少回路,成本越低)
prob += pulp.lpSum([cluster_var(k) for k in range(max_clusters)])
# 约束1每个风机分配到一个簇
for i in range(n_turbines):
prob += pulp.lpSum([assign_var(i, k) for k in range(max_clusters)]) == 1
# 约束2簇功率约束允许20%过载,增加求解器灵活性)
for k in range(max_clusters):
cluster_power = pulp.lpSum(
[turbines.iloc[i]["power"] * assign_var(i, k) for i in range(n_turbines)]
)
prob += cluster_power <= max_mw * 1.2 * cluster_var(k)
# 约束3如果簇未使用则无分配
for k in range(max_clusters):
for i in range(n_turbines):
prob += assign_var(i, k) <= cluster_var(k)
print(
f"MIP Model: {len(prob.variables())} variables, {len(prob.constraints)} constraints"
)
# 求解
solver = pulp.PULP_CBC_CMD(timeLimit=time_limit, msg=0, warmStart=False)
print("MIP: Starting to solve...")
status = prob.solve(solver)
print(
f"MIP: Solver status={pulp.LpStatus[prob.status]}, Objective value={pulp.value(prob.objective):.4f}"
)
if pulp.LpStatus[prob.status] != "Optimal":
print(
f"MIP solver status: {pulp.LpStatus[prob.status]}, solution not found, falling back to MST"
)
print(f"Model infeasibility check:")
print(
f"Total power: {turbines['power'].sum():.2f} MW, Max cluster capacity: {max_mw:.2f} MW"
)
print(f"Number of clusters: {max_clusters}, Number of turbines: {n_turbines}")
from main import design_with_mst
connections = design_with_mst(turbines, substation)
return connections, turbines
# 提取结果:确定每个簇
cluster_assign = [-1] * n_turbines
active_clusters = []
for k in range(max_clusters):
if pulp.value(cluster_var(k)) > 0.5:
active_clusters.append(k)
# 将每个未分配的风机分配到最近的活跃簇
for i in range(n_turbines):
assigned = False
for k in active_clusters:
if pulp.value(assign_var(i, k)) > 0.5:
cluster_assign[i] = k
assigned = True
break
if not assigned and active_clusters:
# 如果没有分配,分配到距离最近的活跃簇
dists = [dist_matrix_full[0, i + 1] for k in active_clusters]
cluster_assign[i] = active_clusters[np.argmin(dists)]
# 构建连接
clusters = defaultdict(list)
for i, c in enumerate(cluster_assign):
clusters[c].append(i)
connections = []
for c, members in clusters.items():
if len(members) == 0:
continue
coords = turbines.iloc[members][["x", "y"]].values
if len(members) > 1:
dm = distance_matrix(coords, coords)
mst = minimum_spanning_tree(dm).toarray()
for i in range(len(members)):
for j in range(len(members)):
if mst[i, j] > 0:
connections.append(
(
f"turbine_{members[i]}",
f"turbine_{members[j]}",
mst[i, j],
)
)
# 连接到升压站:选择簇中距离升压站最近的风机
dists = [dist_matrix_full[0, m + 1] for m in members]
closest = members[np.argmin(dists)]
connections.append((f"turbine_{closest}", "substation", min(dists)))
turbines["cluster"] = cluster_assign
print(
f"MIP optimization completed successfully, {len(connections)} connections generated"
)
return connections, turbines