import random
from collections import defaultdict

import numpy as np
import pandas as pd
from scipy.sparse.csgraph import minimum_spanning_tree
from scipy.spatial import distance_matrix


def _build_connections(cluster_assign, dist_matrix_full):
    """Decode a cluster assignment into a list of cable connections.

    Turbines sharing a cluster id are joined by a minimum spanning tree, and
    the member closest to the substation is linked to it.

    :param cluster_assign: sequence whose i-th entry is the cluster id of
        turbine i (positional index)
    :param dist_matrix_full: pairwise distance matrix in which row/column 0
        is the substation and row/column i + 1 is turbine i
    :return: list of ``(from_name, to_name, length)`` tuples, where names are
        ``"turbine_<i>"`` or ``"substation"``
    """
    clusters = defaultdict(list)
    for turbine_idx, cluster_id in enumerate(cluster_assign):
        clusters[cluster_id].append(turbine_idx)

    connections = []
    for members in clusters.values():
        # Shift by one because index 0 of the full matrix is the substation.
        nodes = np.asarray(members) + 1

        if len(members) > 1:
            sub_dist = dist_matrix_full[np.ix_(nodes, nodes)]
            # NOTE(review): a dense zero entry is presumably read as "no
            # edge" by minimum_spanning_tree, so coincident turbines may end
            # up unconnected - confirm against the scipy documentation.
            mst = minimum_spanning_tree(sub_dist).toarray()
            # np.nonzero walks the matrix in row-major order, matching a
            # nested "for i / for j" scan over the dense tree.
            for a, b in zip(*np.nonzero(mst > 0)):
                connections.append(
                    (
                        f"turbine_{members[a]}",
                        f"turbine_{members[b]}",
                        mst[a, b],
                    )
                )

        # Feed the cluster into the substation through its nearest member.
        to_substation = dist_matrix_full[0, nodes]
        nearest = int(np.argmin(to_substation))
        connections.append(
            (f"turbine_{members[nearest]}", "substation", to_substation[nearest])
        )
    return connections


def _random_individual(powers, max_clusters, max_mw):
    """Create one random chromosome, preferring clusters with spare capacity.

    Each turbine is placed in a random cluster that can still absorb its
    power; if none can, a cluster is picked uniformly at random.
    """
    assign = [0] * len(powers)
    loads = np.zeros(max_clusters)
    for i, power in enumerate(powers):
        feasible = [c for c in range(max_clusters) if loads[c] + power <= max_mw]
        if feasible:
            chosen = random.choice(feasible)
        else:
            chosen = random.randint(0, max_clusters - 1)
        assign[i] = chosen
        loads[chosen] += power
    return assign


def design_with_ga(
    turbines,
    substation,
    cable_specs=None,
    voltage=66000,
    power_factor=0.95,
    system_params=None,
    pop_size=50,
    generations=50,
    evaluate_func=None,
    total_invest_func=None,
    get_max_capacity_func=None,
):
    """
    Optimise the collector-line layout with a genetic algorithm.

    A chromosome assigns every turbine to a cluster id. Each cluster is wired
    as a minimum spanning tree and connected to the substation through its
    nearest turbine; the resulting layout is scored by ``evaluate_func``.

    :param turbines: turbine DataFrame with ``x``, ``y`` and ``power``
        columns; a ``cluster`` column is written to it in place
    :param substation: substation coordinates (stacked on top of the turbine
        ``x``/``y`` rows)
    :param cable_specs: cable specifications, forwarded to the callbacks
    :param voltage: forwarded to ``get_max_capacity_func`` / ``evaluate_func``
    :param power_factor: forwarded to ``get_max_capacity_func`` /
        ``evaluate_func``
    :param system_params: system parameters; when truthy together with
        ``total_invest_func`` the fitness is ``total_cost_npv``
    :param pop_size: population size (must be >= 1)
    :param generations: number of generations
    :param evaluate_func: layout evaluation callback (required); must return
        a mapping with ``total_cost`` and ``total_loss``
    :param total_invest_func: total-investment callback
    :param get_max_capacity_func: callback returning the per-cluster capacity;
        100.0 is used when it is not supplied
    :return: ``(connections, turbines)`` - the connection list of the best
        layout and ``turbines`` with the ``cluster`` column filled in
    :raises TypeError: if ``evaluate_func`` is not supplied
    :raises ValueError: if ``pop_size`` < 1 or the cluster capacity is not
        positive
    """
    if evaluate_func is None:
        raise TypeError("design_with_ga() requires an evaluate_func callable")
    if pop_size < 1:
        raise ValueError("pop_size must be at least 1")

    if get_max_capacity_func:
        max_mw = get_max_capacity_func(cable_specs, voltage, power_factor)
    else:
        max_mw = 100.0  # default capacity
    if not max_mw > 0:
        raise ValueError(f"cluster capacity must be positive, got {max_mw!r}")

    n_turbines = len(turbines)
    if n_turbines == 0:
        # Nothing to connect.
        turbines["cluster"] = []
        return [], turbines

    # Hoisted once: per-gene DataFrame lookups are very slow in the GA loops.
    powers = np.asarray(turbines["power"], dtype=float)
    total_power = turbines["power"].sum()
    # At least one cluster, even when the total power is zero.
    max_clusters = max(1, int(np.ceil(total_power / max_mw)))

    # Precompute distances; row/column 0 is the substation.
    all_coords = np.vstack([substation, turbines[["x", "y"]].values])
    dist_matrix_full = distance_matrix(all_coords, all_coords)

    def fitness(chromosome):
        """Return the cost (lower is better) of one cluster assignment."""
        connections = _build_connections(chromosome, dist_matrix_full)
        eval_res = evaluate_func(
            turbines,
            connections,
            substation,
            cable_specs,
            is_offshore=False,
            method_name="GA",
            voltage=voltage,
            power_factor=power_factor,
        )
        if system_params and total_invest_func:
            res_list = total_invest_func(
                [
                    {
                        "cost": eval_res["total_cost"],
                        "loss": eval_res["total_loss"],
                        "eval": eval_res,
                    }
                ],
                system_params,
            )
            return res_list[0]["total_cost_npv"]
        return eval_res["total_cost"]

    def tournament(scored, size=3):
        """Pick the fittest of ``size`` random (individual, fitness) pairs."""
        # Clamp so that populations smaller than the tournament still work.
        contenders = random.sample(scored, min(size, len(scored)))
        return min(contenders, key=lambda pair: pair[1])[0]

    population = [
        _random_individual(powers, max_clusters, max_mw) for _ in range(pop_size)
    ]
    best = None
    best_fitness = float("inf")

    for _ in range(generations):
        fitnesses = [fitness(ind) for ind in population]
        min_fit = min(fitnesses)
        if min_fit < best_fitness:
            best_fitness = min_fit
            best = population[fitnesses.index(min_fit)].copy()

        scored = list(zip(population, fitnesses))
        selected = [tournament(scored) for _ in range(pop_size)]

        # Single-point crossover (needs at least two genes to have a cut).
        new_pop = []
        for i in range(0, pop_size, 2):
            p1 = selected[i]
            p2 = selected[i + 1] if i + 1 < pop_size else selected[0]
            if n_turbines > 1 and random.random() < 0.8:
                point = random.randint(1, n_turbines - 1)
                child1 = p1[:point] + p2[point:]
                child2 = p2[:point] + p1[point:]
            else:
                child1, child2 = p1.copy(), p2.copy()
            new_pop.extend([child1, child2])

        # Mutation: move one turbine to a random cluster; if any cluster is
        # then over capacity, park the turbine in a brand-new cluster.
        for ind in new_pop:
            if random.random() < 0.1:
                idx = random.randint(0, n_turbines - 1)
                ind[idx] = random.randint(0, max_clusters - 1)
                cluster_loads = np.bincount(ind, weights=powers)
                if cluster_loads.max() > max_mw:
                    ind[idx] = max_clusters
                    max_clusters += 1

        # Elitism: the top 10% of the current generation survive unchanged.
        elites = sorted(scored, key=lambda pair: pair[1])[: int(0.1 * pop_size)]
        new_pop[: len(elites)] = [ind for ind, _ in elites]
        population = new_pop[:pop_size]

    if best is None:
        # No generation produced a usable best (e.g. generations == 0):
        # fall back to the best individual of the current population.
        fitnesses = [fitness(ind) for ind in population]
        best = population[fitnesses.index(min(fitnesses))].copy()

    # Decode the best individual.
    connections = _build_connections(best, dist_matrix_full)
    turbines["cluster"] = best
    return connections, turbines