Files
windfarm/ga.py

194 lines
6.7 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import numpy as np
import pandas as pd
from scipy.spatial import distance_matrix
from scipy.sparse.csgraph import minimum_spanning_tree
from collections import defaultdict
import random
def _decode_connections(assignment, dist_matrix_full):
    """Translate a cluster assignment into a list of cable connections.

    Every cluster is wired internally along the minimum spanning tree of its
    turbines, and the turbine of the cluster that is nearest to the
    substation is connected to the substation.

    :param assignment: sequence where ``assignment[i]`` is the cluster id of
        turbine ``i`` (positional index).
    :param dist_matrix_full: square distance matrix whose row/column 0 is the
        substation and whose row/column ``i + 1`` is turbine ``i``.
    :return: list of ``(from_name, to_name, length)`` tuples using the names
        ``"turbine_<i>"`` and ``"substation"``.
    """
    clusters = defaultdict(list)
    for turbine_idx, cluster_id in enumerate(assignment):
        clusters[cluster_id].append(turbine_idx)

    connections = []
    for members in clusters.values():
        # Shift by one because index 0 of the full matrix is the substation.
        node_idx = np.asarray(members) + 1

        if len(members) > 1:
            sub_dm = dist_matrix_full[np.ix_(node_idx, node_idx)]
            mst = minimum_spanning_tree(sub_dm).toarray()
            # NOTE: only strictly positive weights are kept, so turbines with
            # identical coordinates (distance 0) get no edge between them.
            for i, j in np.argwhere(mst > 0):
                connections.append(
                    (
                        f"turbine_{members[i]}",
                        f"turbine_{members[j]}",
                        mst[i, j],
                    )
                )

        # Feed the cluster into the substation through its nearest turbine.
        to_substation = dist_matrix_full[0, node_idx]
        nearest = int(np.argmin(to_substation))
        connections.append(
            (
                f"turbine_{members[nearest]}",
                "substation",
                to_substation[nearest],
            )
        )
    return connections


def design_with_ga(
    turbines,
    substation,
    cable_specs=None,
    voltage=66000,
    power_factor=0.95,
    system_params=None,
    pop_size=50,
    generations=50,
    evaluate_func=None,
    total_invest_func=None,
    get_max_capacity_func=None,
):
    """
    Optimise the collector-line layout with a genetic algorithm.

    A chromosome is a list with one cluster id per turbine. Each cluster is
    wired along its minimum spanning tree and connected to the substation via
    its nearest turbine; the resulting layout is scored with
    ``evaluate_func`` (lower is better).

    :param turbines: DataFrame with at least the columns ``x``, ``y`` and
        ``power``. A ``cluster`` column is written to it in place.
    :param substation: substation coordinates ``(x, y)``.
    :param cable_specs: cable specifications, forwarded to
        ``get_max_capacity_func`` and ``evaluate_func``.
    :param voltage: voltage forwarded to the helper callables.
    :param power_factor: power factor forwarded to the helper callables.
    :param system_params: system parameters; when given together with
        ``total_invest_func`` the fitness is the ``"total_cost_npv"`` value
        returned by ``total_invest_func`` instead of the plain total cost.
    :param pop_size: population size (must be >= 1).
    :param generations: number of generations. With ``generations <= 0`` the
        best individual of the initial random population is returned.
    :param evaluate_func: required callable
        ``evaluate_func(turbines, connections, substation, cable_specs,
        is_offshore=..., method_name=..., voltage=..., power_factor=...)``
        returning a mapping with ``"total_cost"`` and ``"total_loss"``.
    :param total_invest_func: optional callable
        ``total_invest_func(results, system_params)`` returning a list of
        mappings that contain ``"total_cost_npv"``.
    :param get_max_capacity_func: optional callable
        ``get_max_capacity_func(cable_specs, voltage, power_factor)`` giving
        the maximum power per cluster; 100.0 is used when omitted.
    :return: ``(connections, turbines)`` where ``connections`` is a list of
        ``(from_name, to_name, length)`` tuples and ``turbines`` is the input
        frame with the ``cluster`` column added.
    :raises TypeError: if ``evaluate_func`` is not callable.
    :raises ValueError: if ``pop_size < 1`` or the maximum cluster capacity
        is not positive.
    """
    if not callable(evaluate_func):
        raise TypeError(
            "evaluate_func must be a callable that scores a list of connections"
        )
    if pop_size < 1:
        raise ValueError("pop_size must be at least 1")

    if get_max_capacity_func:
        max_mw = get_max_capacity_func(cable_specs, voltage, power_factor)
    else:
        max_mw = 100.0  # default capacity when no lookup is supplied
    if not max_mw > 0:
        raise ValueError("maximum cluster capacity must be positive")

    n_turbines = len(turbines)
    if n_turbines == 0:
        # Nothing to connect; keep the return contract intact.
        turbines["cluster"] = []
        return [], turbines

    # Pull the power column out once instead of doing a per-row .iloc lookup
    # inside every initialisation / mutation loop.
    powers = turbines["power"].to_numpy(dtype=float)
    # At least one cluster is needed even when the total power is zero.
    max_clusters = max(1, int(np.ceil(powers.sum() / max_mw)))

    # Pre-compute all pairwise distances; index 0 is the substation.
    all_coords = np.vstack([substation, turbines[["x", "y"]].values])
    dist_matrix_full = distance_matrix(all_coords, all_coords)

    def fitness(chromosome):
        """Score one cluster assignment (lower is better)."""
        connections = _decode_connections(chromosome, dist_matrix_full)
        eval_res = evaluate_func(
            turbines,
            connections,
            substation,
            cable_specs,
            is_offshore=False,
            method_name="GA",
            voltage=voltage,
            power_factor=power_factor,
        )
        if system_params and total_invest_func:
            res_list = total_invest_func(
                [
                    {
                        "cost": eval_res["total_cost"],
                        "loss": eval_res["total_loss"],
                        "eval": eval_res,
                    }
                ],
                system_params,
            )
            return res_list[0]["total_cost_npv"]
        return eval_res["total_cost"]

    def init_individual(n_clusters):
        """Randomly assign turbines to clusters, preferring ones with spare capacity."""
        assign = []
        loads = np.zeros(n_clusters)
        for p in powers:
            feasible = np.flatnonzero(loads + p <= max_mw).tolist()
            if feasible:
                c = random.choice(feasible)
            else:
                c = random.randint(0, n_clusters - 1)
            assign.append(c)
            loads[c] += p
        return assign

    population = [init_individual(max_clusters) for _ in range(pop_size)]
    best = None
    best_fitness = float("inf")

    for _ in range(generations):
        fitnesses = [fitness(ind) for ind in population]
        gen_best = min(fitnesses)
        if gen_best < best_fitness:
            best_fitness = gen_best
            best = list(population[fitnesses.index(gen_best)])

        # Tournament selection; the tournament cannot be larger than the
        # population, otherwise random.sample raises.
        scored = list(zip(population, fitnesses))
        tour_size = min(3, len(scored))
        selected = [
            min(random.sample(scored, tour_size), key=lambda pf: pf[1])[0]
            for _ in range(pop_size)
        ]

        # Single-point crossover (needs at least two genes to pick a cut).
        offspring = []
        for i in range(0, pop_size, 2):
            p1 = selected[i]
            p2 = selected[i + 1] if i + 1 < pop_size else selected[0]
            if n_turbines > 1 and random.random() < 0.8:
                cut = random.randint(1, n_turbines - 1)
                offspring.append(p1[:cut] + p2[cut:])
                offspring.append(p2[:cut] + p1[cut:])
            else:
                offspring.append(list(p1))
                offspring.append(list(p2))

        # Mutation: move one random turbine to a random cluster. If any
        # cluster exceeds the capacity afterwards, the moved turbine is put
        # into a brand-new cluster instead.
        for child in offspring:
            if random.random() >= 0.1:
                continue
            gene = random.randint(0, n_turbines - 1)
            child[gene] = random.randint(0, max_clusters - 1)
            loads = np.bincount(child, weights=powers)
            if loads.max() > max_mw:
                child[gene] = max_clusters
                max_clusters += 1

        # Elitism: the best 10 % of the current generation survive unchanged.
        n_elite = int(0.1 * pop_size)
        elites = sorted(scored, key=lambda pf: pf[1])[:n_elite]
        offspring[: len(elites)] = [list(ind) for ind, _ in elites]
        population = offspring[:pop_size]

    if best is None:
        # No generation ran (generations <= 0) or no fitness was ever below
        # infinity: fall back to the best individual of the current population.
        fitnesses = [fitness(ind) for ind in population]
        best = list(population[fitnesses.index(min(fitnesses))])

    # Decode the best individual into the final layout.
    connections = _decode_connections(best, dist_matrix_full)
    turbines["cluster"] = best
    return connections, turbines