# Source listing metadata: Python, 194 lines, 6.7 KiB.
import random
from collections import defaultdict

import numpy as np
import pandas as pd
from scipy.sparse.csgraph import minimum_spanning_tree
from scipy.spatial import distance_matrix
def _build_connections(cluster_assign, coords, substation_dists):
    """Turn a cluster assignment into a list of cable connections.

    Turbines sharing a cluster id are linked by the minimum spanning tree of
    their pairwise Euclidean distances; the member closest to the substation
    is then connected to it.

    :param cluster_assign: sequence with one cluster id per turbine (positional)
    :param coords: array of shape (n_turbines, 2) holding the x/y coordinates
    :param substation_dists: 1-D array, distance of every turbine to the substation
    :return: list of ``(from_name, to_name, length)`` tuples, where names are
        ``"turbine_<position>"`` or ``"substation"``
    """
    clusters = defaultdict(list)
    for position, cluster_id in enumerate(cluster_assign):
        clusters[cluster_id].append(position)

    connections = []
    for members in clusters.values():
        if len(members) > 1:
            member_coords = coords[members]
            dm = distance_matrix(member_coords, member_coords)
            # NOTE(review): scipy treats zero entries as "no edge", so turbines
            # at identical coordinates are not linked to each other here.
            mst = minimum_spanning_tree(dm).toarray()
            # np.nonzero walks the matrix row by row, i.e. in the same order
            # as a nested ``for i / for j`` scan.
            for i, j in zip(*np.nonzero(mst > 0)):
                connections.append(
                    (
                        f"turbine_{members[i]}",
                        f"turbine_{members[j]}",
                        mst[i, j],
                    )
                )
        # Feeder from the cluster to the substation via its nearest member.
        dists = substation_dists[members]
        nearest = int(np.argmin(dists))
        connections.append(
            (f"turbine_{members[nearest]}", "substation", dists[nearest])
        )
    return connections


def design_with_ga(
    turbines,
    substation,
    cable_specs=None,
    voltage=66000,
    power_factor=0.95,
    system_params=None,
    pop_size=50,
    generations=50,
    evaluate_func=None,
    total_invest_func=None,
    get_max_capacity_func=None,
):
    """
    Optimise the collection-line layout with a genetic algorithm.

    A chromosome assigns each turbine (by position) to a cluster id. Every
    cluster is wired as a minimum spanning tree plus one feeder from its
    closest turbine to the substation; the layout is scored by
    ``evaluate_func`` (and, when both ``system_params`` and
    ``total_invest_func`` are given, by the ``"total_cost_npv"`` value that
    ``total_invest_func`` returns). Lower scores are better.

    :param turbines: turbine DataFrame; the columns "x", "y" and "power" are
        read, and a "cluster" column is written to it in place
    :param substation: substation coordinates (x, y)
    :param cable_specs: cable specifications, passed through to the callbacks
    :param voltage: passed to ``get_max_capacity_func`` and ``evaluate_func``
    :param power_factor: passed to ``get_max_capacity_func`` and ``evaluate_func``
    :param system_params: system parameters (used for the NPV calculation)
    :param pop_size: population size (must be >= 1)
    :param generations: number of generations to evaluate; values below 1 are
        treated as 1 so that a best individual always exists
    :param evaluate_func: required evaluation callback; must return a mapping
        with "total_cost" (and "total_loss" when the NPV path is used)
    :param total_invest_func: total-investment callback, called as
        ``total_invest_func([{"cost", "loss", "eval"}], system_params)``
    :param get_max_capacity_func: callback returning the maximum power per
        cluster, called as ``(cable_specs, voltage, power_factor)``;
        100.0 is used when it is not supplied
    :return: tuple ``(connections, turbines)`` - the connection list of the
        best layout and the same DataFrame with the "cluster" column set
    :raises ValueError: if ``evaluate_func`` is missing, ``pop_size`` < 1, or
        the maximum cluster capacity is not positive
    """
    if evaluate_func is None:
        raise ValueError("evaluate_func is required to score candidate layouts")
    if pop_size < 1:
        raise ValueError("pop_size must be at least 1")

    n_turbines = len(turbines)
    if n_turbines == 0:
        # Nothing to connect; avoid the empty-range errors further down.
        turbines["cluster"] = []
        return [], turbines

    if get_max_capacity_func:
        max_mw = get_max_capacity_func(cable_specs, voltage, power_factor)
    else:
        max_mw = 100.0  # default value
    if max_mw <= 0:
        raise ValueError("maximum cluster capacity must be positive")

    total_power = turbines["power"].sum()
    # At least one cluster, even when the total power is zero.
    max_clusters = max(1, int(np.ceil(total_power / max_mw)))

    # Pre-compute everything the inner loops need as plain arrays.
    powers = turbines["power"].to_numpy()
    coords = turbines[["x", "y"]].to_numpy()
    substation_dists = distance_matrix(np.atleast_2d(substation), coords)[0]

    def fitness(chromosome):
        """Score one cluster assignment (lower is better)."""
        connections = _build_connections(chromosome, coords, substation_dists)
        eval_res = evaluate_func(
            turbines,
            connections,
            substation,
            cable_specs,
            is_offshore=False,
            method_name="GA",
            voltage=voltage,
            power_factor=power_factor,
        )
        if system_params and total_invest_func:
            res_list = total_invest_func(
                [
                    {
                        "cost": eval_res["total_cost"],
                        "loss": eval_res["total_loss"],
                        "eval": eval_res,
                    }
                ],
                system_params,
            )
            return res_list[0]["total_cost_npv"]
        return eval_res["total_cost"]

    def init_individual():
        """Randomly assign turbines to clusters, preferring ones with spare capacity."""
        assign = []
        cluster_powers = [0.0] * max_clusters
        for p in powers:
            possible = [
                c for c in range(max_clusters) if cluster_powers[c] + p <= max_mw
            ]
            if possible:
                c = random.choice(possible)
            else:
                c = random.randint(0, max_clusters - 1)
            assign.append(c)
            cluster_powers[c] += p
        return assign

    def tournament(scored, size=3):
        """Return the fittest of up to ``size`` randomly drawn individuals."""
        candidates = random.sample(scored, min(size, len(scored)))
        return min(candidates, key=lambda pair: pair[1])[0]

    population = [init_individual() for _ in range(pop_size)]
    best = None
    best_fitness = float("inf")
    n_generations = max(1, generations)
    n_elites = int(0.1 * pop_size)

    for gen in range(n_generations):
        fitnesses = [fitness(ind) for ind in population]
        min_fit = min(fitnesses)
        # ``best is None`` guarantees a result even if every score is NaN/inf.
        if best is None or min_fit < best_fitness:
            best_fitness = min_fit
            best = population[fitnesses.index(min_fit)].copy()

        if gen == n_generations - 1:
            # Offspring of the last generation would never be evaluated.
            break

        scored = list(zip(population, fitnesses))
        selected = [tournament(scored) for _ in range(pop_size)]

        # Single-point crossover on consecutive pairs of selected parents.
        new_pop = []
        for i in range(0, pop_size, 2):
            p1 = selected[i]
            p2 = selected[i + 1] if i + 1 < pop_size else selected[0]
            # A cut point needs at least two genes.
            if n_turbines > 1 and random.random() < 0.8:
                point = random.randint(1, n_turbines - 1)
                child1 = p1[:point] + p2[point:]
                child2 = p2[:point] + p1[point:]
            else:
                child1, child2 = p1.copy(), p2.copy()
            new_pop.extend([child1, child2])

        # Mutation: move one random turbine to a random cluster.
        for ind in new_pop:
            if random.random() < 0.1:
                idx = random.randint(0, n_turbines - 1)
                ind[idx] = random.randint(0, max_clusters - 1)
                cluster_powers = defaultdict(float)
                for c, p in zip(ind, powers):
                    cluster_powers[c] += p
                # NOTE(review): the check covers every cluster, not only the
                # one just mutated; on any overload the mutated turbine is
                # moved into a brand-new cluster.
                if max(cluster_powers.values()) > max_mw:
                    ind[idx] = max_clusters
                    max_clusters += 1

        # Elitism: the top 10% of the current generation survive unchanged.
        elites = sorted(scored, key=lambda pair: pair[1])[:n_elites]
        new_pop[: len(elites)] = [e[0] for e in elites]
        population = new_pop[:pop_size]

    # Decode the best individual.
    connections = _build_connections(best, coords, substation_dists)
    turbines["cluster"] = best
    return connections, turbines
|