Implement genetic algorithm for collector layout optimization

This commit is contained in:
dmy
2026-01-08 09:46:00 +08:00
parent f2a960e789
commit 46e929bfce
3 changed files with 474 additions and 22 deletions

193
ga.py Normal file
View File

@@ -0,0 +1,193 @@
import numpy as np
import pandas as pd
from scipy.spatial import distance_matrix
from scipy.sparse.csgraph import minimum_spanning_tree
from collections import defaultdict
import random
def design_with_ga(
turbines,
substation,
cable_specs=None,
voltage=66000,
power_factor=0.95,
system_params=None,
pop_size=50,
generations=50,
evaluate_func=None,
total_invest_func=None,
get_max_capacity_func=None,
):
"""
使用遗传算法优化集电线路布局
:param turbines: 风机DataFrame
:param substation: 升压站坐标
:param cable_specs: 电缆规格
:param system_params: 系统参数用于NPV计算
:param pop_size: 种群大小
:param generations: 迭代代数
:param evaluate_func: 评估函数
:param total_invest_func: 总投资计算函数
:param get_max_capacity_func: 获取最大容量函数
:return: 连接列表和带有簇信息的turbines
"""
if get_max_capacity_func:
max_mw = get_max_capacity_func(cable_specs, voltage, power_factor)
else:
max_mw = 100.0 # 默认值
total_power = turbines["power"].sum()
max_clusters = int(np.ceil(total_power / max_mw))
n_turbines = len(turbines)
# 预计算距离矩阵
all_coords = np.vstack([substation, turbines[["x", "y"]].values])
dist_matrix_full = distance_matrix(all_coords, all_coords)
def fitness(chromosome):
cluster_assign = chromosome
clusters = defaultdict(list)
for i, c in enumerate(cluster_assign):
clusters[c].append(i)
connections = []
for c, members in clusters.items():
if len(members) == 0:
continue
coords = turbines.iloc[members][["x", "y"]].values
if len(members) > 1:
dm = distance_matrix(coords, coords)
mst = minimum_spanning_tree(dm).toarray()
for i in range(len(members)):
for j in range(len(members)):
if mst[i, j] > 0:
connections.append(
(
f"turbine_{members[i]}",
f"turbine_{members[j]}",
mst[i, j],
)
)
# 连接到升压站
dists = [dist_matrix_full[0, m + 1] for m in members]
closest = members[np.argmin(dists)]
connections.append((f"turbine_{closest}", "substation", min(dists)))
eval_res = evaluate_func(
turbines,
connections,
substation,
cable_specs,
is_offshore=False,
method_name="GA",
voltage=voltage,
power_factor=power_factor,
)
if system_params and total_invest_func:
res_list = total_invest_func(
[
{
"cost": eval_res["total_cost"],
"loss": eval_res["total_loss"],
"eval": eval_res,
}
],
system_params,
)
return res_list[0]["total_cost_npv"]
return eval_res["total_cost"]
def init_individual():
assign = np.zeros(n_turbines, dtype=int)
cluster_powers = np.zeros(max_clusters)
for i in range(n_turbines):
p = turbines.iloc[i]["power"]
possible = [
c for c in range(max_clusters) if cluster_powers[c] + p <= max_mw
]
if possible:
c = random.choice(possible)
else:
c = random.randint(0, max_clusters - 1)
assign[i] = c
cluster_powers[c] += p
return assign.tolist()
population = [init_individual() for _ in range(pop_size)]
best = None
best_fitness = float("inf")
for gen in range(generations):
fitnesses = [fitness(ind) for ind in population]
min_fit = min(fitnesses)
if min_fit < best_fitness:
best_fitness = min_fit
best = population[fitnesses.index(min_fit)].copy()
def tournament(size=3):
candidates = random.sample(list(zip(population, fitnesses)), size)
return min(candidates, key=lambda x: x[1])[0]
selected = [tournament() for _ in range(pop_size)]
new_pop = []
for i in range(0, pop_size, 2):
p1 = selected[i]
p2 = selected[i + 1] if i + 1 < pop_size else selected[0]
if random.random() < 0.8:
point = random.randint(1, n_turbines - 1)
child1 = p1[:point] + p2[point:]
child2 = p2[:point] + p1[point:]
else:
child1, child2 = p1.copy(), p2.copy()
new_pop.extend([child1, child2])
for ind in new_pop:
if random.random() < 0.1:
idx = random.randint(0, n_turbines - 1)
old_c = ind[idx]
new_c = random.randint(0, max_clusters - 1)
ind[idx] = new_c
cluster_powers = defaultdict(float)
for j, c in enumerate(ind):
cluster_powers[c] += turbines.iloc[j]["power"]
if max(cluster_powers.values()) > max_mw:
ind[idx] = max_clusters
max_clusters += 1
elites = sorted(zip(population, fitnesses), key=lambda x: x[1])[
: int(0.1 * pop_size)
]
new_pop[: len(elites)] = [e[0] for e in elites]
population = new_pop[:pop_size]
# 解码最佳个体
cluster_assign = best
clusters = defaultdict(list)
for i, c in enumerate(cluster_assign):
clusters[c].append(i)
connections = []
for c, members in clusters.items():
if len(members) == 0:
continue
coords = turbines.iloc[members][["x", "y"]].values
if len(members) > 1:
dm = distance_matrix(coords, coords)
mst = minimum_spanning_tree(dm).toarray()
for i in range(len(members)):
for j in range(len(members)):
if mst[i, j] > 0:
connections.append(
(
f"turbine_{members[i]}",
f"turbine_{members[j]}",
mst[i, j],
)
)
dists = [dist_matrix_full[0, m + 1] for m in members]
closest = members[np.argmin(dists)]
connections.append((f"turbine_{closest}", "substation", min(dists)))
turbines["cluster"] = cluster_assign
return connections, turbines

46
gui.py
View File

@@ -90,6 +90,7 @@ def index():
"run_btn": None,
"current_file_container": None, # 替换 label 为 container
"info_container": None, # 新增信息展示容器
"ga_switch": None, # 遗传算法开关
}
def update_info_panel():
@@ -347,7 +348,9 @@ def index():
except Exception as ex:
ui.notify(f"上传处理失败: {ex}", type="negative")
async def save_file_with_dialog(filename, callback, file_filter="All files (*.*)", sender=None):
async def save_file_with_dialog(
filename, callback, file_filter="All files (*.*)", sender=None
):
"""
跨平台文件保存助手。
如果是原生模式,弹出系统保存对话框。
@@ -372,7 +375,7 @@ def index():
ps_filter = file_filter.replace("(", "|").replace(")", "")
if "|" not in ps_filter:
ps_filter += f"|{os.path.splitext(filename)[1] or '*.*'}"
# 简单清洗 filter 字符串以适应 PowerShell (e.g., "Excel Files *.xlsx" -> "Excel Files|*.xlsx")
# 这里做一个简化的映射,确保格式正确
if "Excel" in file_filter:
@@ -399,9 +402,9 @@ def index():
# 使用 startupinfo 隐藏控制台窗口 (防止黑框闪烁)
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
print("DEBUG: invoking PowerShell SaveFileDialog...")
# 使用 run.io_bound 在后台线程执行,避免阻塞主事件循环
# 这样按钮的禁用状态可以立即同步到前端
result = await run.io_bound(
@@ -409,7 +412,7 @@ def index():
["powershell", "-Command", ps_script],
capture_output=True,
text=True,
startupinfo=startupinfo
startupinfo=startupinfo,
)
save_path = result.stdout.strip()
if save_path:
@@ -420,7 +423,7 @@ def index():
else:
print("DEBUG: PowerShell dialog cancelled or empty result.")
# 用户取消,直接返回,不回退
return
return
except Exception as e:
print(f"PowerShell dialog failed: {e}")
@@ -477,7 +480,10 @@ def index():
async def on_click_excel(e):
await save_file_with_dialog(
default_excel_name, save_excel, "Excel Files (*.xlsx)", sender=e.sender
default_excel_name,
save_excel,
"Excel Files (*.xlsx)",
sender=e.sender,
)
ui.button(
@@ -607,7 +613,9 @@ def index():
except:
pass
await save_file_with_dialog(default_name, save_zip, "ZIP Files (*.zip)", sender=e.sender)
await save_file_with_dialog(
default_name, save_zip, "ZIP Files (*.zip)", sender=e.sender
)
ui.button("导出全部方案 DXF (ZIP)", on_click=on_click_all_dxf).props(
"icon=folder_zip color=secondary"
@@ -669,6 +677,9 @@ def index():
refs["log_box"].clear()
log_queue = queue.Queue()
# 获取遗传算法开关状态
use_ga = refs["ga_switch"].value if refs["ga_switch"] else False
class QueueLogger(io.StringIO):
def write(self, message):
if message and message.strip():
@@ -716,6 +727,7 @@ def index():
n_clusters_override=None,
interactive=False,
plot_results=False,
use_ga=use_ga,
)
# 在后台线程运行计算任务
@@ -782,7 +794,11 @@ def index():
total_length_km = total_length_m / 1000
# 获取回路数 (通过统计从升压站发出的连接)
n_circuits = sum(1 for d in res["eval"]["details"] if d["source"] == "substation" or d["target"] == "substation")
n_circuits = sum(
1
for d in res["eval"]["details"]
if d["source"] == "substation" or d["target"] == "substation"
)
row_dict = {
"name": name_display,
@@ -875,7 +891,10 @@ def index():
raise FileNotFoundError("无法生成模板文件")
await save_file_with_dialog(
"coordinates.xlsx", save_template, "Excel Files (*.xlsx)", sender=e.sender
"coordinates.xlsx",
save_template,
"Excel Files (*.xlsx)",
sender=e.sender,
)
ui.button("导出 Excel 模板", on_click=export_template).classes(
@@ -901,6 +920,8 @@ def index():
# with refs["current_file_container"]:
# ui.label("未选择文件").classes("text-xs text-gray-500 italic ml-1")
# 3. 运行按钮
refs["run_btn"] = (
ui.button(
@@ -910,6 +931,11 @@ def index():
.classes("flex-1 py-4")
.props("icon=play_arrow color=secondary")
)
# 4. 遗传算法开关
with ui.column().classes("flex-1 gap-0 justify-center items-center"):
refs["ga_switch"] = ui.switch("启用遗传算法", value=False).props(
"color=orange"
)
with ui.column().classes("w-full gap-4"):
# 新增:信息展示卡片

257
main.py
View File

@@ -1,6 +1,7 @@
import argparse
import math
import os
import random
from collections import defaultdict
import matplotlib.pyplot as plt
@@ -12,6 +13,7 @@ from scipy.spatial import distance_matrix
from sklearn.cluster import KMeans
from esau_williams import design_with_esau_williams
from ga import design_with_ga
# 设置matplotlib支持中文显示
plt.rcParams["font.sans-serif"] = ["Microsoft YaHei", "SimHei", "Arial"]
@@ -654,9 +656,163 @@ def design_with_rotational_sweep(
return final_connections, turbines
# 预计算距离矩阵
all_coords = np.vstack([substation, turbines[["x", "y"]].values])
dist_matrix_full = distance_matrix(all_coords, all_coords)
def fitness(chromosome):
cluster_assign = chromosome
clusters = defaultdict(list)
for i, c in enumerate(cluster_assign):
clusters[c].append(i)
connections = []
for c, members in clusters.items():
if len(members) == 0:
continue
coords = turbines.iloc[members][["x", "y"]].values
if len(members) > 1:
dm = distance_matrix(coords, coords)
mst = minimum_spanning_tree(dm).toarray()
for i in range(len(members)):
for j in range(len(members)):
if mst[i, j] > 0:
connections.append(
(
f"turbine_{members[i]}",
f"turbine_{members[j]}",
mst[i, j],
)
)
# 连接到升压站
dists = [dist_matrix_full[0, m + 1] for m in members]
closest = members[np.argmin(dists)]
connections.append((f"turbine_{closest}", "substation", min(dists)))
eval_res = evaluate_design(
turbines,
connections,
substation,
cable_specs,
is_offshore=False,
method_name="GA",
voltage=voltage,
power_factor=power_factor,
)
if system_params:
res_list = total_investment(
[
{
"cost": eval_res["total_cost"],
"loss": eval_res["total_loss"],
"eval": eval_res,
}
],
system_params,
)
return res_list[0]["total_cost_npv"]
return eval_res["total_cost"]
def init_individual():
assign = np.zeros(n_turbines, dtype=int)
cluster_powers = np.zeros(max_clusters)
for i in range(n_turbines):
p = turbines.iloc[i]["power"]
possible = [
c for c in range(max_clusters) if cluster_powers[c] + p <= max_mw
]
if possible:
c = random.choice(possible)
else:
c = random.randint(0, max_clusters - 1)
assign[i] = c
cluster_powers[c] += p
return assign.tolist()
population = [init_individual() for _ in range(pop_size)]
best = None
best_fitness = float("inf")
for gen in range(generations):
fitnesses = [fitness(ind) for ind in population]
min_fit = min(fitnesses)
if min_fit < best_fitness:
best_fitness = min_fit
best = population[fitnesses.index(min_fit)].copy()
def tournament(size=3):
candidates = random.sample(list(zip(population, fitnesses)), size)
return min(candidates, key=lambda x: x[1])[0]
selected = [tournament() for _ in range(pop_size)]
new_pop = []
for i in range(0, pop_size, 2):
p1 = selected[i]
p2 = selected[i + 1] if i + 1 < pop_size else selected[0]
if random.random() < 0.8:
point = random.randint(1, n_turbines - 1)
child1 = p1[:point] + p2[point:]
child2 = p2[:point] + p1[point:]
else:
child1, child2 = p1.copy(), p2.copy()
new_pop.extend([child1, child2])
for ind in new_pop:
if random.random() < 0.1:
idx = random.randint(0, n_turbines - 1)
old_c = ind[idx]
new_c = random.randint(0, max_clusters - 1)
ind[idx] = new_c
cluster_powers = defaultdict(float)
for j, c in enumerate(ind):
cluster_powers[c] += turbines.iloc[j]["power"]
if max(cluster_powers.values()) > max_mw:
ind[idx] = max_clusters
max_clusters += 1
elites = sorted(zip(population, fitnesses), key=lambda x: x[1])[
: int(0.1 * pop_size)
]
new_pop[: len(elites)] = [e[0] for e in elites]
population = new_pop[:pop_size]
# 解码最佳个体
cluster_assign = best
clusters = defaultdict(list)
for i, c in enumerate(cluster_assign):
clusters[c].append(i)
connections = []
for c, members in clusters.items():
if len(members) == 0:
continue
coords = turbines.iloc[members][["x", "y"]].values
if len(members) > 1:
dm = distance_matrix(coords, coords)
mst = minimum_spanning_tree(dm).toarray()
for i in range(len(members)):
for j in range(len(members)):
if mst[i, j] > 0:
connections.append(
(
f"turbine_{members[i]}",
f"turbine_{members[j]}",
mst[i, j],
)
)
dists = [dist_matrix_full[0, m + 1] for m in members]
closest = members[np.argmin(dists)]
connections.append((f"turbine_{closest}", "substation", min(dists)))
turbines["cluster"] = cluster_assign
return connections, turbines
# 4. 获取电缆最大容量(MW)
def get_max_cable_capacity_mw(cable_specs, voltage=VOLTAGE_LEVEL, power_factor=POWER_FACTOR):
def get_max_cable_capacity_mw(
cable_specs, voltage=VOLTAGE_LEVEL, power_factor=POWER_FACTOR
):
"""
根据电缆规格计算最大承载功率
:param cable_specs: 电缆规格列表 list of tuples或者直接是最大功率数值(MW)
@@ -991,7 +1147,11 @@ def export_to_excel(connections_details, filename):
df = pd.DataFrame(data)
# 汇总统计
n_circuits = sum(1 for conn in connections_details if conn["source"] == "substation" or conn["target"] == "substation")
n_circuits = sum(
1
for conn in connections_details
if conn["source"] == "substation" or conn["target"] == "substation"
)
summary = {
"Total Cost (¥)": df["Cost (¥)"].sum(),
"Total Effective Length (m)": df["Effective Length (m)"].sum(),
@@ -1023,7 +1183,11 @@ def export_all_scenarios_to_excel(results, filename):
summary_data = []
for res in results:
# 获取回路数 (通过统计从升压站发出的连接)
n_circuits = sum(1 for conn in res["eval"]["details"] if conn["source"] == "substation" or conn["target"] == "substation")
n_circuits = sum(
1
for conn in res["eval"]["details"]
if conn["source"] == "substation" or conn["target"] == "substation"
)
summary_data.append(
{
@@ -1230,7 +1394,11 @@ def visualize_design(
# 7. 主函数:比较两种设计方法
def compare_design_methods(
excel_path=None, n_clusters_override=None, interactive=True, plot_results=True
excel_path=None,
n_clusters_override=None,
interactive=True,
plot_results=True,
use_ga=False,
):
"""
比较MST和三种电缆方案下的K-means设计方法
@@ -1411,7 +1579,11 @@ def compare_design_methods(
voltage=voltage,
power_factor=power_factor,
)
n_circuits_base = sum(1 for d in eval_base["details"] if d["source"] == "substation" or d["target"] == "substation")
n_circuits_base = sum(
1
for d in eval_base["details"]
if d["source"] == "substation" or d["target"] == "substation"
)
comparison_results.append(
{
"name": base_name,
@@ -1441,7 +1613,11 @@ def compare_design_methods(
voltage=voltage,
power_factor=power_factor,
)
n_circuits_rot = sum(1 for d in eval_rot["details"] if d["source"] == "substation" or d["target"] == "substation")
n_circuits_rot = sum(
1
for d in eval_rot["details"]
if d["source"] == "substation" or d["target"] == "substation"
)
comparison_results.append(
{
"name": rot_name,
@@ -1471,7 +1647,11 @@ def compare_design_methods(
voltage=voltage,
power_factor=power_factor,
)
n_circuits_ew = sum(1 for d in eval_ew["details"] if d["source"] == "substation" or d["target"] == "substation")
n_circuits_ew = sum(
1
for d in eval_ew["details"]
if d["source"] == "substation" or d["target"] == "substation"
)
comparison_results.append(
{
"name": ew_name,
@@ -1486,6 +1666,49 @@ def compare_design_methods(
f" [Esau-Williams] Cost: ¥{eval_ew['total_cost']:,.2f} | Loss: {eval_ew['total_loss']:.2f} kW | Circuits: {n_circuits_ew}"
)
if use_ga:
# --- Run 4: Genetic Algorithm ---
ga_name = f"{name} (GA)"
conns_ga, turbines_ga = design_with_ga(
turbines.copy(),
substation,
current_specs,
voltage,
power_factor,
system_params,
evaluate_func=evaluate_design,
total_invest_func=total_investment,
get_max_capacity_func=get_max_cable_capacity_mw,
)
eval_ga = evaluate_design(
turbines,
conns_ga,
substation,
cable_specs=current_specs,
is_offshore=is_offshore,
method_name=ga_name,
voltage=voltage,
power_factor=power_factor,
)
n_circuits_ga = sum(
1
for d in eval_ga["details"]
if d["source"] == "substation" or d["target"] == "substation"
)
comparison_results.append(
{
"name": ga_name,
"cost": eval_ga["total_cost"],
"loss": eval_ga["total_loss"],
"eval": eval_ga,
"turbines": turbines_ga,
"specs": current_specs,
}
)
print(
f" [GA] Cost: ¥{eval_ga['total_cost']:,.2f} | Loss: {eval_ga['total_loss']:.2f} kW | Circuits: {n_circuits_ga}"
)
# 记录最佳
if eval_rot["total_cost"] < best_cost:
best_cost = eval_rot["total_cost"]
@@ -1500,7 +1723,11 @@ def compare_design_methods(
# 可视化 (只画 Base 版本)
ax_idx = i + 1
if plot_results and ax_idx < 4:
n_circuits = sum(1 for d in eval_base["details"] if d["source"] == "substation" or d["target"] == "substation")
n_circuits = sum(
1
for d in eval_base["details"]
if d["source"] == "substation" or d["target"] == "substation"
)
title = f"{base_name} ({n_circuits} circuits)\nCost: ¥{eval_base['total_cost'] / 10000:.2f}"
visualize_design(
turbines_base, substation, eval_base["details"], title, ax=axes[ax_idx]
@@ -1540,11 +1767,17 @@ def compare_design_methods(
for i, res in enumerate(comparison_results):
if res["cost"] < comparison_results[best_idx]["cost"]:
best_idx = i
# 获取回路数 (通过统计从升压站发出的连接)
n_circuits = sum(1 for conn in res["eval"]["details"] if conn["source"] == "substation" or conn["target"] == "substation")
print(f" {i + 1}. {res['name']} - Cost: ¥{res['cost']:,.2f} | Circuits: {n_circuits}")
n_circuits = sum(
1
for conn in res["eval"]["details"]
if conn["source"] == "substation" or conn["target"] == "substation"
)
print(
f" {i + 1}. {res['name']} - Cost: ¥{res['cost']:,.2f} | Circuits: {n_circuits}"
)
print(f"推荐方案: {comparison_results[best_idx]['name']} (默认)")