import numpy as np import pandas as pd from scipy.spatial import distance_matrix from scipy.sparse.csgraph import minimum_spanning_tree from collections import defaultdict import random try: import pulp except ImportError: pulp = None def design_with_mip( turbines, substation, cable_specs=None, voltage=66000, power_factor=0.95, system_params=None, max_clusters=None, time_limit=300, # seconds evaluate_func=None, total_invest_func=None, get_max_capacity_func=None, ): """ 使用混合整数规划(MIP)优化集电线路布局 :param turbines: 风机DataFrame :param substation: 升压站坐标 :param cable_specs: 电缆规格 :param system_params: 系统参数(用于NPV计算) :param max_clusters: 最大簇数,默认基于功率计算 :param time_limit: 求解时间限制(秒) :param evaluate_func: 评估函数 :param total_invest_func: 总投资计算函数 :param get_max_capacity_func: 获取最大容量函数 :return: 连接列表和带有簇信息的turbines """ if pulp is None: print( "WARNING: PuLP library not available. MIP optimization skipped, falling back to MST." ) from main import design_with_mst return design_with_mst(turbines, substation) if get_max_capacity_func: max_mw = get_max_capacity_func(cable_specs, voltage, power_factor) else: max_mw = 100.0 # 默认值 total_power = turbines["power"].sum() max_clusters = int(np.ceil(total_power / max_mw)) n_turbines = len(turbines) # 预计算距离矩阵 all_coords = np.vstack([substation, turbines[["x", "y"]].values]) dist_matrix_full = distance_matrix(all_coords, all_coords) # MIP 模型 prob = pulp.LpProblem("WindFarmCollectorMIP", pulp.LpMinimize) # 决策变量:风机分配到簇 (binary) x = pulp.LpVariable.dicts( "assign", (range(n_turbines), range(max_clusters)), cat="Binary" ) # 簇使用变量 (binary) y = pulp.LpVariable.dicts("use_cluster", range(max_clusters), cat="Binary") # 目标函数:最小化总成本 (简化版:距离成本) # 这里使用简化成本:簇内距离 + 到升压站距离 prob += pulp.lpSum( [ dist_matrix_full[i + 1, j + 1] * x[i][k] * x[j][k] for i in range(n_turbines) for j in range(n_turbines) for k in range(max_clusters) if i < j ] ) + pulp.lpSum( [ dist_matrix_full[0, i + 1] * y[k] # 假设每个簇连接到升压站 for i in range(n_turbines) for k in range(max_clusters) ] ) # 约束:每个风机分配到一个簇 for i in range(n_turbines): prob += pulp.lpSum([x[i][k] for k in range(max_clusters)]) == 1 # 簇功率约束 for k in range(max_clusters): prob += ( pulp.lpSum([turbines.iloc[i]["power"] * x[i][k] for i in range(n_turbines)]) <= max_mw * y[k] ) # 如果簇未使用,则无分配 for k in range(max_clusters): for i in range(n_turbines): prob += x[i][k] <= y[k] # 求解 solver = pulp.PULP_CBC_CMD(timeLimit=time_limit) status = prob.solve(solver) if pulp.LpStatus[prob.status] != "Optimal": print(f"MIP not optimal: {pulp.LpStatus[prob.status]}") # 返回默认方案,如 MST from main import design_with_mst return design_with_mst(turbines, substation) # 提取结果 cluster_assign = [-1] * n_turbines for i in range(n_turbines): for k in range(max_clusters): if pulp.value(x[i][k]) > 0.5: cluster_assign[i] = k break # 构建连接 clusters = defaultdict(list) for i, c in enumerate(cluster_assign): clusters[c].append(i) connections = [] for c, members in clusters.items(): if len(members) == 0: continue coords = turbines.iloc[members][["x", "y"]].values if len(members) > 1: dm = distance_matrix(coords, coords) mst = minimum_spanning_tree(dm).toarray() for i in range(len(members)): for j in range(len(members)): if mst[i, j] > 0: connections.append( ( f"turbine_{members[i]}", f"turbine_{members[j]}", mst[i, j], ) ) # 连接到升压站 dists = [dist_matrix_full[0, m + 1] for m in members] closest = members[np.argmin(dists)] connections.append((f"turbine_{closest}", "substation", min(dists))) turbines["cluster"] = cluster_assign return connections, turbines