Files

194 lines
6.7 KiB
Python
Raw Permalink Normal View History

import numpy as np
import pandas as pd
from scipy.spatial import distance_matrix
from scipy.sparse.csgraph import minimum_spanning_tree
from collections import defaultdict
import random
def design_with_ga(
    turbines,
    substation,
    cable_specs=None,
    voltage=66000,
    power_factor=0.95,
    system_params=None,
    pop_size=50,
    generations=50,
    evaluate_func=None,
    total_invest_func=None,
    get_max_capacity_func=None,
):
    """Optimise the collector-line layout with a genetic algorithm.

    A chromosome is a list with one gene per turbine; the gene value is the
    id of the cluster (feeder string) the turbine belongs to.  Each cluster is
    decoded into cables by taking the minimum spanning tree over its turbines
    and then linking the cluster's turbine nearest to the substation to the
    substation.  Lower fitness is better.

    :param turbines: DataFrame with one row per turbine and at least the
        columns ``"x"``, ``"y"`` and ``"power"``.  A ``"cluster"`` column is
        written to this same object (it is modified in place).
    :param substation: ``(x, y)`` coordinates of the substation.
    :param cable_specs: cable specifications; passed through unchanged to
        ``get_max_capacity_func`` and ``evaluate_func``.
    :param voltage: passed through to ``get_max_capacity_func`` and
        ``evaluate_func``.
    :param power_factor: passed through to ``get_max_capacity_func`` and
        ``evaluate_func``.
    :param system_params: system parameters for the NPV calculation.  The NPV
        objective is used only when both this and ``total_invest_func`` are
        truthy; otherwise ``eval_res["total_cost"]`` is minimised.
    :param pop_size: number of individuals in the population (>= 1).
    :param generations: number of generations to evaluate.
    :param evaluate_func: required callable, invoked as
        ``evaluate_func(turbines, connections, substation, cable_specs,
        is_offshore=False, method_name="GA", voltage=..., power_factor=...)``.
        Its result must support ``["total_cost"]`` and, when the NPV
        objective is active, ``["total_loss"]``.
    :param total_invest_func: optional callable, invoked as
        ``total_invest_func([{"cost": ..., "loss": ..., "eval": ...}],
        system_params)``; element 0 of its result must provide
        ``["total_cost_npv"]``.
    :param get_max_capacity_func: optional callable, invoked as
        ``get_max_capacity_func(cable_specs, voltage, power_factor)``, that
        returns the maximum total power of one cluster.  When omitted, 100.0
        is used (presumably MW, in the same unit as ``turbines["power"]`` --
        confirm against callers).
    :return: ``(connections, turbines)`` where ``connections`` is a list of
        ``(from_name, to_name, length)`` tuples with names of the form
        ``"turbine_<row position>"`` or ``"substation"``, and ``turbines`` is
        the input DataFrame with the added ``"cluster"`` column.
    :raises TypeError: if ``evaluate_func`` is not callable.
    :raises ValueError: if ``pop_size < 1`` or the per-cluster capacity is not
        positive.
    """
    if not callable(evaluate_func):
        raise TypeError("evaluate_func must be a callable")
    if pop_size < 1:
        raise ValueError("pop_size must be at least 1")

    n_turbines = len(turbines)
    if n_turbines == 0:
        # Nothing to connect; keep the return shape consistent.
        turbines["cluster"] = []
        return [], turbines

    if get_max_capacity_func:
        max_mw = get_max_capacity_func(cable_specs, voltage, power_factor)
    else:
        max_mw = 100.0  # default capacity per cluster
    if not max_mw > 0:
        raise ValueError("maximum cluster capacity must be positive")

    total_power = turbines["power"].sum()
    # At least one cluster is needed even when the total power is zero.
    max_clusters = max(1, int(np.ceil(total_power / max_mw)))

    # Hoisted out of the GA loops: per-turbine power and all pairwise
    # distances.  Row/column 0 of the full matrix is the substation.
    powers = np.asarray(turbines["power"], dtype=float)
    all_coords = np.vstack([substation, turbines[["x", "y"]].values])
    dist_matrix_full = distance_matrix(all_coords, all_coords)
    substation_dist = dist_matrix_full[0, 1:]
    turbine_dist = dist_matrix_full[1:, 1:]

    crossover_rate = 0.8
    mutation_rate = 0.1
    n_elites = int(0.1 * pop_size)
    # random.sample cannot draw more candidates than the population holds.
    tournament_size = min(3, pop_size)

    def _build_connections(assignment):
        # Decode a chromosome into (from, to, length) cable tuples.
        groups = defaultdict(list)
        for turbine_idx, cluster_id in enumerate(assignment):
            groups[cluster_id].append(turbine_idx)

        connections = []
        for members in groups.values():
            if len(members) > 1:
                sub_matrix = turbine_dist[np.ix_(members, members)]
                tree = minimum_spanning_tree(sub_matrix).toarray()
                # NOTE(review): zero-length edges (coincident turbines) are
                # not reported by the MST and are therefore skipped here.
                for i, j in zip(*np.nonzero(tree > 0)):
                    connections.append(
                        (
                            f"turbine_{members[i]}",
                            f"turbine_{members[j]}",
                            tree[i, j],
                        )
                    )
            # Link the cluster to the substation via its nearest turbine.
            feeder = substation_dist[members]
            nearest = int(np.argmin(feeder))
            connections.append(
                (f"turbine_{members[nearest]}", "substation", feeder[nearest])
            )
        return connections

    def _fitness(assignment):
        # Cost of one chromosome: NPV when configured, else raw total cost.
        eval_res = evaluate_func(
            turbines,
            _build_connections(assignment),
            substation,
            cable_specs,
            is_offshore=False,
            method_name="GA",
            voltage=voltage,
            power_factor=power_factor,
        )
        if system_params and total_invest_func:
            res_list = total_invest_func(
                [
                    {
                        "cost": eval_res["total_cost"],
                        "loss": eval_res["total_loss"],
                        "eval": eval_res,
                    }
                ],
                system_params,
            )
            return res_list[0]["total_cost_npv"]
        return eval_res["total_cost"]

    def _init_individual(n_clusters):
        # Random assignment that respects the capacity limit where possible.
        assignment = []
        loads = [0.0] * n_clusters
        for power in powers:
            feasible = [c for c in range(n_clusters) if loads[c] + power <= max_mw]
            if feasible:
                cluster_id = random.choice(feasible)
            else:
                # Every cluster is full: accept an overloaded assignment.
                cluster_id = random.randint(0, n_clusters - 1)
            assignment.append(cluster_id)
            loads[cluster_id] += power
        return assignment

    def _tournament(scored):
        # Return the fittest of a random sample of (individual, fitness).
        contenders = random.sample(scored, tournament_size)
        return min(contenders, key=lambda pair: pair[1])[0]

    population = [_init_individual(max_clusters) for _ in range(pop_size)]
    best = None
    best_fitness = float("inf")

    for gen in range(generations):
        fitnesses = [_fitness(ind) for ind in population]
        min_fit = min(fitnesses)
        if min_fit < best_fitness:
            best_fitness = min_fit
            best = list(population[fitnesses.index(min_fit)])

        if gen == generations - 1:
            # The next population would never be evaluated; skip breeding.
            break

        scored = list(zip(population, fitnesses))
        selected = [_tournament(scored) for _ in range(pop_size)]

        # Single-point crossover on consecutive pairs of selected parents.
        offspring = []
        for i in range(0, pop_size, 2):
            parent_a = selected[i]
            parent_b = selected[i + 1] if i + 1 < pop_size else selected[0]
            # With a single gene there is no interior cut point.
            if n_turbines > 1 and random.random() < crossover_rate:
                cut = random.randint(1, n_turbines - 1)
                child_a = parent_a[:cut] + parent_b[cut:]
                child_b = parent_b[:cut] + parent_a[cut:]
            else:
                child_a, child_b = list(parent_a), list(parent_b)
            offspring.extend([child_a, child_b])

        # Mutation: move one random turbine to a random existing cluster.
        for ind in offspring:
            if random.random() < mutation_rate:
                idx = random.randint(0, n_turbines - 1)
                ind[idx] = random.randint(0, max_clusters - 1)
                loads = defaultdict(float)
                for turbine_idx, cluster_id in enumerate(ind):
                    loads[cluster_id] += powers[turbine_idx]
                # NOTE(review): any overloaded cluster (not only the one just
                # modified) triggers the repair, and the cluster count only
                # ever grows.  Kept as in the original design.
                if max(loads.values()) > max_mw:
                    ind[idx] = max_clusters
                    max_clusters += 1

        # Elitism: the best individuals replace the first offspring.
        elites = sorted(scored, key=lambda pair: pair[1])[:n_elites]
        offspring[: len(elites)] = [list(elite[0]) for elite in elites]
        population = offspring[:pop_size]

    if best is None:
        # No generation produced a finite improvement (e.g. generations == 0):
        # fall back to the best individual of the current population.
        scores = [_fitness(ind) for ind in population]
        best = list(population[scores.index(min(scores))])

    # Decode the best individual.
    connections = _build_connections(best)
    turbines["cluster"] = best
    return connections, turbines