Files
windfarm/main.py

2006 lines
71 KiB
Python
Raw Permalink Normal View History

import argparse
import math
import os
import random
from collections import defaultdict
import matplotlib.pyplot as plt
import networkx as nx
import numpy as np
import pandas as pd
from scipy.sparse.csgraph import minimum_spanning_tree
from scipy.spatial import distance_matrix
from sklearn.cluster import KMeans
from esau_williams import design_with_esau_williams
from ga import design_with_ga
from mip import design_with_mip
# 设置matplotlib支持中文显示
plt.rcParams["font.sans-serif"] = ["Microsoft YaHei", "SimHei", "Arial"]
plt.rcParams["axes.unicode_minus"] = False
# 常量定义
VOLTAGE_LEVEL = 66000 # 66kV
POWER_FACTOR = 0.95
ELECTRICITY_PRICE = 0.4 # 元/kWh
PROJECT_LIFETIME = 25 # 工程运行期限(年)
DISCOUNT_RATE = 8 # 折现率(%
ANNUAL_LOSS_HOURS = 1400 # 年损耗小时数(小时)
# 1. 生成风电场数据(实际应用中替换为真实坐标)
def generate_wind_farm_data(n_turbines=30, seed=42, layout="random", spacing=800):
pass # 实际应用中从Excel读取真实坐标此函数保留用于测试
"""
生成模拟风电场数据
:param layout: 'random' (随机) 'grid' (规则行列)
:param spacing: 规则布局时的风机间距(m)
"""
np.random.seed(seed)
if layout == "grid":
# 计算行列数 (尽量接近方形)
n_cols = int(np.ceil(np.sqrt(n_turbines)))
n_rows = int(np.ceil(n_turbines / n_cols))
x_coords = []
y_coords = []
for i in range(n_turbines):
row = i // n_cols
col = i % n_cols
# 添加微小抖动模拟海上定位误差(±1%)
jitter = spacing * 0.01
x = col * spacing + np.random.uniform(-jitter, jitter)
y = row * spacing + np.random.uniform(-jitter, jitter)
x_coords.append(x)
y_coords.append(y)
x_coords = np.array(x_coords)
y_coords = np.array(y_coords)
# 升压站位置:通常位于风电场边缘或中心,这里设为离岸侧(y负方向)中心
substation = np.array([[np.mean(x_coords), -spacing]])
else:
# 随机生成风机位置(扩大范围以适应更大容量)
x_coords = np.random.uniform(0, 2000, n_turbines)
y_coords = np.random.uniform(0, 2000, n_turbines)
# 升压站位置
substation = np.array([[0, 0]])
# 随机生成风机额定功率海上风机通常更大6-10MW
power_ratings = (
np.random.uniform(6.0, 10.0, n_turbines)
if layout == "grid"
else np.random.uniform(2.0, 5.0, n_turbines)
)
# 创建DataFrame
turbines = pd.DataFrame(
{
"id": range(n_turbines),
"x": x_coords,
"y": y_coords,
"power": power_ratings,
"platform_height": np.zeros(n_turbines),
"cumulative_power": np.zeros(n_turbines), # 用于后续计算
}
)
return turbines, substation
# 1.5 从Excel加载数据
def load_data_from_excel(file_path):
"""
从Excel文件读取风机和升压站坐标以及可选的电缆规格
Excel格式要求:
- Sheet 'Coordinates' (或第一个Sheet): Type (Turbine/Substation), ID, X, Y, Power
- Sheet 'Cables' (可选): CrossSection, Capacity, Resistance, Cost
"""
try:
xl = pd.ExcelFile(file_path)
# 读取坐标数据
if "Coordinates" in xl.sheet_names:
df = pd.read_excel(xl, "Coordinates")
else:
df = pd.read_excel(xl, 0)
# 标准化列名(忽略大小写)
df.columns = [c.capitalize() for c in df.columns]
required_cols = {"Type", "X", "Y", "Power"}
if not required_cols.issubset(df.columns):
raise ValueError(f"Excel文件缺少必要列: {required_cols - set(df.columns)}")
# 提取升压站数据
substation_df = df[df["Type"].astype(str).str.lower() == "substation"]
if len(substation_df) == 0:
raise ValueError("未在文件中找到升压站(Substation)数据")
substation = substation_df[["X", "Y"]].values
# 提取风机数据
turbines_df = df[df["Type"].astype(str).str.lower() == "turbine"].copy()
if len(turbines_df) == 0:
raise ValueError("未在文件中找到风机(Turbine)数据")
# 尝试获取平台高度列 (兼容不同命名)
platform_height_col = None
for col in turbines_df.columns:
if col.lower().replace(" ", "") == "platformheight":
platform_height_col = col
break
platform_heights = (
turbines_df[platform_height_col].values
if platform_height_col
else np.zeros(len(turbines_df))
)
# 重置索引并整理格式
turbines = pd.DataFrame(
{
"id": range(len(turbines_df)),
"original_id": (
turbines_df["Id"].values
if "Id" in turbines_df.columns
else range(len(turbines_df))
),
"x": turbines_df["X"].values,
"y": turbines_df["Y"].values,
"power": turbines_df["Power"].values,
"platform_height": platform_heights,
"cumulative_power": np.zeros(len(turbines_df)),
}
)
# 读取电缆数据 (如果存在)
cable_specs = None
if "Cables" in xl.sheet_names:
cables_df = pd.read_excel(xl, "Cables")
# 标准化列名
cables_df.columns = [
c.replace(" ", "").capitalize() for c in cables_df.columns
] # Handle 'Cross Section' vs 'CrossSection'
# 尝试匹配列
# 目标格式: (截面mm², 载流量A, 电阻Ω/km, 基准价格元/m, 是否可选)
specs = []
for _, row in cables_df.iterrows():
# 容错处理列名
section = row.get("Crosssection", row.get("Section", 0))
capacity = row.get("Capacity", row.get("Current", 0))
resistance = row.get("Resistance", 0)
cost_wan_per_km = row.get("Cost", row.get("Price", 0))
# 转换1 万元/km = 10 元/m
cost = cost_wan_per_km * 10
optional_val = str(row.get("Optional", "")).strip().upper()
is_optional = optional_val == "Y"
if section > 0 and capacity > 0:
specs.append((section, capacity, resistance, cost, is_optional))
if specs:
# --- 输入数据校验:单调性 ---
for i in range(len(specs) - 1):
curr_s = specs[i]
next_s = specs[i + 1]
# 校验截面 (section)
if curr_s[0] >= next_s[0]:
raise ValueError(
f"电缆数据校验失败Excel中电缆顺序必须按截面从小到大排列。第{i + 1}行({curr_s[0]}mm²) >= 第{i + 2}行({next_s[0]}mm²)。"
)
# 校验载流量 (capacity)
if curr_s[1] >= next_s[1]:
raise ValueError(
f"电缆数据校验失败Excel中电缆载流量必须严格递增。第{i + 1}行({curr_s[1]}A) >= 第{i + 2}行({next_s[1]}A)。"
)
specs.sort(key=lambda x: x[1]) # 按载流量排序
# --- 输入数据校验 ---
# 筛选出所有标记为 Optional='Y' 的电缆
optional_cables = [s for s in specs if s[4]]
# 规则1: 最多只能有一条可选电缆
if len(optional_cables) > 1:
raise ValueError(
f"电缆数据校验失败:检测到 {len(optional_cables)} 条可选电缆(Optional='Y')。系统限制最多只能指定 1 条可选电缆。"
)
# 规则2: 如果存在可选电缆,它必须是所有电缆中截面最大的一条
if len(optional_cables) == 1:
opt_cable = optional_cables[0]
# s[0] 是截面积
max_section = max(s[0] for s in specs)
if opt_cable[0] < max_section:
raise ValueError(
f"电缆数据校验失败:可选电缆 ({opt_cable[0]}mm²) 必须是所有电缆中截面最大的一条 (当前最大为 {max_section}mm²)。"
)
# --------------------
cable_specs = specs
print(f"成功加载: {len(turbines)} 台风机, {len(substation)} 座升压站")
if cable_specs:
print(f"成功加载: {len(cable_specs)} 种电缆规格")
# 读取参数数据 (如果存在)
system_params = {}
param_sheet_name = None
if "Parameters" in xl.sheet_names:
param_sheet_name = "Parameters"
elif "参数" in xl.sheet_names:
param_sheet_name = "参数"
if param_sheet_name:
try:
params_df = pd.read_excel(xl, param_sheet_name)
# 假设格式为两列Parameter (参数名), Value (值)
if len(params_df.columns) >= 2:
for _, row in params_df.iterrows():
key = str(row[0]).strip().lower()
try:
val = float(row[1])
if "voltage" in key or "电压" in key:
# 检测是否为kV单位
if "kv" in key:
system_params["voltage"] = val * 1000
else:
system_params["voltage"] = val
elif "factor" in key or "功率因数" in key:
system_params["power_factor"] = val
elif "price" in key or "电价" in key:
system_params["electricity_price"] = val
elif "lifetime" in key or "期限" in key:
system_params["project_lifetime"] = val
elif "discount" in key or "折现率" in key:
system_params["discount_rate"] = val
elif "hours" in key or "小时" in key:
system_params["annual_loss_hours"] = val
except ValueError:
pass
# 设置默认值(如果参数表中未提供)
if "electricity_price" not in system_params:
system_params["electricity_price"] = ELECTRICITY_PRICE
if "project_lifetime" not in system_params:
system_params["project_lifetime"] = PROJECT_LIFETIME
if "discount_rate" not in system_params:
system_params["discount_rate"] = DISCOUNT_RATE
if "annual_loss_hours" not in system_params:
system_params["annual_loss_hours"] = ANNUAL_LOSS_HOURS
if system_params:
print(f"成功加载系统参数: {system_params}")
except Exception as e:
print(f"读取参数Sheet失败: {e}")
return turbines, substation, cable_specs, system_params
except Exception as e:
print(f"读取Excel文件失败: {str(e)}")
raise
# 2. 基于最小生成树(MST)的集电线路设计
def design_with_mst(turbines, substation):
"""使用最小生成树算法设计集电线路拓扑"""
# 合并风机和升压站数据
all_points = np.vstack([substation, turbines[["x", "y"]].values])
n_points = len(all_points)
# 计算距离矩阵
dist_matrix = distance_matrix(all_points, all_points)
# 计算最小生成树
mst = minimum_spanning_tree(dist_matrix).toarray()
# 提取连接关系
connections = []
for i in range(n_points):
for j in range(n_points):
if mst[i, j] > 0:
# 确定节点类型0是升压站1+是风机
source = "substation" if i == 0 else f"turbine_{i - 1}"
target = "substation" if j == 0 else f"turbine_{j - 1}"
connections.append((source, target, mst[i, j]))
return connections
# 3. 基于扇区聚类改进版K-means的集电线路设计
def design_with_kmeans(turbines, substation, n_clusters=3):
"""
使用基于角度的K-means聚类设计集电线路拓扑 (避免交叉)
原理将风机按相对于升压站的角度划分扇区确保出线不交叉
"""
# 准备风机坐标数据
turbine_coords = turbines[["x", "y"]].values
substation_coord = substation[0]
# 计算每台风机相对于升压站的角度和单位向量
# 使用 (cos, sin) 也就是单位向量进行聚类,可以完美处理 -180/+180 的周期性问题
dx = turbine_coords[:, 0] - substation_coord[0]
dy = turbine_coords[:, 1] - substation_coord[1]
# 归一化为单位向量 (对应角度特征)
magnitudes = np.sqrt(dx**2 + dy**2)
# 处理重合点避免除零
with np.errstate(divide="ignore", invalid="ignore"):
unit_x = dx / magnitudes
unit_y = dy / magnitudes
unit_x[magnitudes == 0] = 0
unit_y[magnitudes == 0] = 0
# 构建特征矩阵:主要基于角度(单位向量),可根据需要加入少量距离权重
# 如果纯粹按角度,设为单位向量即可
features = np.column_stack([unit_x, unit_y])
# 执行K-means聚类
kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
turbines["cluster"] = kmeans.fit_predict(features)
# 为每个簇找到最佳连接点(离升压站最近的风机)
cluster_connection_points = {}
for cluster_id in range(n_clusters):
cluster_turbines = turbines[turbines["cluster"] == cluster_id]
if len(cluster_turbines) == 0:
continue
distances_to_substation = np.sqrt(
(cluster_turbines["x"] - substation_coord[0]) ** 2
+ (cluster_turbines["y"] - substation_coord[1]) ** 2
)
closest_idx = distances_to_substation.idxmin()
cluster_connection_points[cluster_id] = closest_idx
# 为每个簇内构建MST
cluster_connections = []
for cluster_id in range(n_clusters):
# 获取当前簇的风机
cluster_turbines = turbines[turbines["cluster"] == cluster_id]
if len(cluster_turbines) == 0:
continue
cluster_indices = cluster_turbines.index.tolist()
# 计算簇内距离矩阵
coords = cluster_turbines[["x", "y"]].values
dist_matrix = distance_matrix(coords, coords)
# 计算簇内MST
mst = minimum_spanning_tree(dist_matrix).toarray()
# 添加簇内连接
for i in range(len(cluster_indices)):
for j in range(len(cluster_indices)):
if mst[i, j] > 0:
source = f"turbine_{cluster_indices[i]}"
target = f"turbine_{cluster_indices[j]}"
cluster_connections.append((source, target, mst[i, j]))
# 添加簇到升压站的连接
substation_connections = []
for cluster_id, turbine_idx in cluster_connection_points.items():
turbine_coord = turbines.loc[turbine_idx, ["x", "y"]].values
distance = np.sqrt(
(turbine_coord[0] - substation_coord[0]) ** 2
+ (turbine_coord[1] - substation_coord[1]) ** 2
)
substation_connections.append(
(f"turbine_{turbine_idx}", "substation", distance)
)
return cluster_connections + substation_connections, turbines
# 3.5 带容量约束的扇区扫描算法 (Capacitated Sweep) - 基础版
def design_with_capacitated_sweep(
turbines,
substation,
cable_specs=None,
voltage=VOLTAGE_LEVEL,
power_factor=POWER_FACTOR,
):
"""
使用带容量约束的扇区扫描算法设计集电线路 (基础版单次扫描)
原理
1. 计算所有风机相对于升压站的角度
2. 找到角度间隔最大的位置作为起始切割线以避免切断密集的风机群
3. 沿圆周方向扫描贪婪地将风机加入当前回路直到达到电缆容量上限
4. 满载后开启新回路
"""
# 1. 获取电缆最大容量
max_mw = get_max_cable_capacity_mw(
cable_specs, voltage=voltage, power_factor=power_factor
)
substation_coord = substation[0]
# 2. 计算角度 (使用 arctan2 返回 -pi 到 pi)
work_df = turbines.copy()
dx = work_df["x"] - substation_coord[0]
dy = work_df["y"] - substation_coord[1]
work_df["angle"] = np.arctan2(dy, dx)
# 3. 寻找最佳起始角度 (最大角度间隙)
work_df = work_df.sort_values("angle").reset_index(drop=True)
angles = work_df["angle"].values
n = len(angles)
if n > 1:
# 计算相邻角度差
diffs = np.diff(angles)
# 计算首尾角度差 (跨越 ±pi 处)
wrap_diff = (2 * np.pi) - (angles[-1] - angles[0])
diffs = np.append(diffs, wrap_diff)
# 找到最大间隙的索引
max_gap_idx = np.argmax(diffs)
# 旋转数组,使最大间隙成为新的起点
start_idx = (max_gap_idx + 1) % n
if start_idx != 0:
work_df = pd.concat(
[work_df.iloc[start_idx:], work_df.iloc[:start_idx]]
).reset_index(drop=True)
# 4. 贪婪分组 (Capacity Constrained Clustering)
work_df["cluster"] = -1
cluster_id = 0
current_power = 0
current_indices = []
for i, row in work_df.iterrows():
p = row["power"]
if len(current_indices) > 0 and (current_power + p > max_mw):
work_df.loc[current_indices, "cluster"] = cluster_id
cluster_id += 1
current_power = 0
current_indices = []
current_indices.append(i)
current_power += p
if current_indices:
work_df.loc[current_indices, "cluster"] = cluster_id
cluster_id += 1
# 建立 id -> cluster 的映射
id_to_cluster = dict(zip(work_df["id"], work_df["cluster"]))
turbines["cluster"] = turbines["id"].map(id_to_cluster)
# 5. 对每个簇内部进行MST连接
cluster_connections = []
substation_connections = []
n_clusters = cluster_id
for cid in range(n_clusters):
cluster_turbines = turbines[turbines["cluster"] == cid]
if len(cluster_turbines) == 0:
continue
cluster_indices = cluster_turbines.index.tolist()
coords = cluster_turbines[["x", "y"]].values
if len(cluster_indices) > 1:
dist_matrix = distance_matrix(coords, coords)
mst = minimum_spanning_tree(dist_matrix).toarray()
for i in range(len(cluster_indices)):
for j in range(len(cluster_indices)):
if mst[i, j] > 0:
source = f"turbine_{cluster_indices[i]}"
target = f"turbine_{cluster_indices[j]}"
cluster_connections.append((source, target, mst[i, j]))
# 连接到升压站
dists = np.sqrt(
(cluster_turbines["x"] - substation_coord[0]) ** 2
+ (cluster_turbines["y"] - substation_coord[1]) ** 2
)
closest_id = dists.idxmin()
min_dist = dists.min()
substation_connections.append((f"turbine_{closest_id}", "substation", min_dist))
return cluster_connections + substation_connections, turbines
# 3.6 旋转扫描算法 (Rotational Sweep) - 优化版
def design_with_rotational_sweep(
turbines,
substation,
cable_specs=None,
voltage=VOLTAGE_LEVEL,
power_factor=POWER_FACTOR,
):
"""
使用带容量约束的扇区扫描算法设计集电线路 (优化版旋转扫描)
原理
1. 计算所有风机相对于升压站的角度并排序
2. 遍历所有可能的起始角度即尝试以每一台风机作为扫描的起点
3. 对每种起始角度贪婪地将风机加入回路直到满载
4. 对每种分组方案计算MST成本选出总成本最低的方案
"""
# 1. 获取电缆最大容量
max_mw = get_max_cable_capacity_mw(
cable_specs, voltage=voltage, power_factor=power_factor
)
substation_coord = substation[0]
# 2. 计算角度 (使用 arctan2 返回 -pi 到 pi)
work_df = turbines.copy()
dx = work_df["x"] - substation_coord[0]
dy = work_df["y"] - substation_coord[1]
work_df["angle"] = np.arctan2(dy, dx)
# 按角度排序
work_df = work_df.sort_values("angle").reset_index(drop=True)
n_turbines = len(work_df)
best_cost = float("inf")
best_connections = []
best_turbines_state = None
best_start_idx = -1
best_id_to_cluster = {}
# 遍历所有可能的起始点
for start_idx in range(n_turbines):
# 构建当前旋转顺序的风机列表
if start_idx == 0:
current_df = work_df.copy()
else:
current_df = pd.concat(
[work_df.iloc[start_idx:], work_df.iloc[:start_idx]]
).reset_index(drop=True)
# --- 贪婪分组 ---
current_df["cluster"] = -1
cluster_id = 0
current_power = 0
current_indices_in_df = []
powers = current_df["power"].values
for i in range(n_turbines):
p = powers[i]
if len(current_indices_in_df) > 0 and (current_power + p > max_mw):
current_df.loc[current_indices_in_df, "cluster"] = cluster_id
cluster_id += 1
current_power = 0
current_indices_in_df = []
current_indices_in_df.append(i)
current_power += p
if current_indices_in_df:
current_df.loc[current_indices_in_df, "cluster"] = cluster_id
cluster_id += 1
# --- 计算该分组方案的成本 ---
current_total_length = 0
n_clusters = cluster_id
for cid in range(n_clusters):
cluster_rows = current_df[current_df["cluster"] == cid]
if len(cluster_rows) == 0:
continue
# 1. 簇内 MST 长度
coords = cluster_rows[["x", "y"]].values
if len(cluster_rows) > 1:
dm = distance_matrix(coords, coords)
mst = minimum_spanning_tree(dm).toarray()
mst_len = mst.sum()
current_total_length += mst_len
# 2. 连接升压站长度
dists = np.sqrt(
(cluster_rows["x"] - substation_coord[0]) ** 2
+ (cluster_rows["y"] - substation_coord[1]) ** 2
)
current_total_length += dists.min()
# --- 比较并保存最佳结果 ---
if current_total_length < best_cost:
best_cost = current_total_length
best_start_idx = start_idx
best_id_to_cluster = dict(zip(current_df["id"], current_df["cluster"]))
# --- 根据最佳方案重新生成详细连接 ---
turbines["cluster"] = turbines["id"].map(best_id_to_cluster)
final_connections = []
unique_clusters = turbines["cluster"].unique()
unique_clusters = [c for c in unique_clusters if not pd.isna(c) and c >= 0]
for cid in unique_clusters:
cluster_turbines = turbines[turbines["cluster"] == cid]
if len(cluster_turbines) == 0:
continue
cluster_indices = cluster_turbines.index.tolist()
coords = cluster_turbines[["x", "y"]].values
if len(cluster_indices) > 1:
dist_matrix_local = distance_matrix(coords, coords)
mst = minimum_spanning_tree(dist_matrix_local).toarray()
for i in range(len(cluster_indices)):
for j in range(len(cluster_indices)):
if mst[i, j] > 0:
source = f"turbine_{cluster_indices[i]}"
target = f"turbine_{cluster_indices[j]}"
final_connections.append((source, target, mst[i, j]))
dists = np.sqrt(
(cluster_turbines["x"] - substation_coord[0]) ** 2
+ (cluster_turbines["y"] - substation_coord[1]) ** 2
)
closest_idx_in_df = dists.idxmin()
min_dist = dists.min()
final_connections.append(
(f"turbine_{closest_idx_in_df}", "substation", min_dist)
)
return final_connections, turbines
# 预计算距离矩阵
all_coords = np.vstack([substation, turbines[["x", "y"]].values])
dist_matrix_full = distance_matrix(all_coords, all_coords)
def fitness(chromosome):
cluster_assign = chromosome
clusters = defaultdict(list)
for i, c in enumerate(cluster_assign):
clusters[c].append(i)
connections = []
for c, members in clusters.items():
if len(members) == 0:
continue
coords = turbines.iloc[members][["x", "y"]].values
if len(members) > 1:
dm = distance_matrix(coords, coords)
mst = minimum_spanning_tree(dm).toarray()
for i in range(len(members)):
for j in range(len(members)):
if mst[i, j] > 0:
connections.append(
(
f"turbine_{members[i]}",
f"turbine_{members[j]}",
mst[i, j],
)
)
# 连接到升压站
dists = [dist_matrix_full[0, m + 1] for m in members]
closest = members[np.argmin(dists)]
connections.append((f"turbine_{closest}", "substation", min(dists)))
eval_res = evaluate_design(
turbines,
connections,
substation,
cable_specs,
is_offshore=False,
method_name="GA",
voltage=voltage,
power_factor=power_factor,
)
if system_params:
res_list = total_investment(
[
{
"cost": eval_res["total_cost"],
"loss": eval_res["total_loss"],
"eval": eval_res,
}
],
system_params,
)
return res_list[0]["total_cost_npv"]
return eval_res["total_cost"]
def init_individual():
assign = np.zeros(n_turbines, dtype=int)
cluster_powers = np.zeros(max_clusters)
for i in range(n_turbines):
p = turbines.iloc[i]["power"]
possible = [
c for c in range(max_clusters) if cluster_powers[c] + p <= max_mw
]
if possible:
c = random.choice(possible)
else:
c = random.randint(0, max_clusters - 1)
assign[i] = c
cluster_powers[c] += p
return assign.tolist()
population = [init_individual() for _ in range(pop_size)]
best = None
best_fitness = float("inf")
for gen in range(generations):
fitnesses = [fitness(ind) for ind in population]
min_fit = min(fitnesses)
if min_fit < best_fitness:
best_fitness = min_fit
best = population[fitnesses.index(min_fit)].copy()
def tournament(size=3):
candidates = random.sample(list(zip(population, fitnesses)), size)
return min(candidates, key=lambda x: x[1])[0]
selected = [tournament() for _ in range(pop_size)]
new_pop = []
for i in range(0, pop_size, 2):
p1 = selected[i]
p2 = selected[i + 1] if i + 1 < pop_size else selected[0]
if random.random() < 0.8:
point = random.randint(1, n_turbines - 1)
child1 = p1[:point] + p2[point:]
child2 = p2[:point] + p1[point:]
else:
child1, child2 = p1.copy(), p2.copy()
new_pop.extend([child1, child2])
for ind in new_pop:
if random.random() < 0.1:
idx = random.randint(0, n_turbines - 1)
old_c = ind[idx]
new_c = random.randint(0, max_clusters - 1)
ind[idx] = new_c
cluster_powers = defaultdict(float)
for j, c in enumerate(ind):
cluster_powers[c] += turbines.iloc[j]["power"]
if max(cluster_powers.values()) > max_mw:
ind[idx] = max_clusters
max_clusters += 1
elites = sorted(zip(population, fitnesses), key=lambda x: x[1])[
: int(0.1 * pop_size)
]
new_pop[: len(elites)] = [e[0] for e in elites]
population = new_pop[:pop_size]
# 解码最佳个体
cluster_assign = best
clusters = defaultdict(list)
for i, c in enumerate(cluster_assign):
clusters[c].append(i)
connections = []
for c, members in clusters.items():
if len(members) == 0:
continue
coords = turbines.iloc[members][["x", "y"]].values
if len(members) > 1:
dm = distance_matrix(coords, coords)
mst = minimum_spanning_tree(dm).toarray()
for i in range(len(members)):
for j in range(len(members)):
if mst[i, j] > 0:
connections.append(
(
f"turbine_{members[i]}",
f"turbine_{members[j]}",
mst[i, j],
)
)
dists = [dist_matrix_full[0, m + 1] for m in members]
closest = members[np.argmin(dists)]
connections.append((f"turbine_{closest}", "substation", min(dists)))
turbines["cluster"] = cluster_assign
return connections, turbines
# 4. 获取电缆最大容量(MW)
def get_max_cable_capacity_mw(
cable_specs, voltage=VOLTAGE_LEVEL, power_factor=POWER_FACTOR
):
"""
根据电缆规格计算最大承载功率
:param cable_specs: 电缆规格列表 list of tuples或者直接是最大功率数值(MW)
"""
# 如果传入的已经是数值,直接返回
if isinstance(cable_specs, (int, float, np.number)):
return float(cable_specs)
# 兼容性检查:如果列表为空
if not cable_specs:
print("Warning: 没有可用的电缆规格,使用默认最大容量 100MW")
return 100.0
# 从所有电缆规格中找到最大的额定电流容量
max_current_capacity = max(spec[1] for spec in cable_specs)
# 计算最大功率P = √3 * U * I * cosφ
# 这里假设降额系数为 1 (不降额)
max_current = max_current_capacity * 1
max_power_w = np.sqrt(3) * voltage * max_current * power_factor
# 将单位从 W 转换为 MW
return max_power_w / 1e6
# 5. 计算集电线路方案成本
def evaluate_design(
turbines,
connections,
substation,
cable_specs=None,
is_offshore=False,
method_name="Unknown Method",
voltage=VOLTAGE_LEVEL,
power_factor=POWER_FACTOR,
):
"""评估设计方案的总成本和损耗"""
total_cost = 0
total_loss = 0
# 创建连接图
graph = nx.Graph()
graph.add_node("substation")
for i, row in turbines.iterrows():
graph.add_node(f"turbine_{i}")
# 添加边
for source, target, length in connections:
graph.add_edge(source, target, weight=length)
# 计算每台风机的累积功率(从叶到根)
power_flow = defaultdict(float)
# 按照后序遍历(从叶到根)
nodes = list(nx.dfs_postorder_nodes(graph, "substation"))
for node in nodes:
if node == "substation":
continue
# 1. 如果节点是风机,先加上自身的功率
if node.startswith("turbine_"):
turbine_id = int(node.split("_")[1])
power_flow[node] += turbines.loc[turbine_id, "power"]
# 2. 找到上游节点(父节点)并将累积功率传递上去
try:
# 找到通往升压站的最短路径上的下一个节点
path = nx.shortest_path(graph, source=node, target="substation")
if len(path) > 1: # path[0]是node自己path[1]是父节点
parent = path[1]
power_flow[parent] += power_flow[node]
except nx.NetworkXNoPath:
pass
# DEBUG: 打印最大功率流 (不含升压站本身)
node_powers = [v for k, v in power_flow.items() if k != "substation"]
max_power = max(node_powers) if node_powers else 0
print(f"DEBUG [{method_name}]: 最大线路功率 = {max_power:.2f} MW")
# 计算成本和损耗
detailed_connections = []
for source, target, length in connections:
# Determine vertical length (PlatformHeight)
vertical_length = 0
if source.startswith("turbine_"):
tid = int(source.split("_")[1])
vertical_length += turbines.loc[tid, "platform_height"]
if target.startswith("turbine_"):
tid = int(target.split("_")[1])
vertical_length += turbines.loc[tid, "platform_height"]
# Calculate effective length with margin
# Total Length = (Horizontal Distance + Vertical Up/Down) * 1.03
horizontal_length = length
effective_length = (horizontal_length + vertical_length) * 1.03
# 确定该段线路承载的总功率
if source.startswith("turbine_") and target.startswith("turbine_"):
# 风机间连接,取下游节点功率
if nx.shortest_path_length(
graph, "substation", source
) < nx.shortest_path_length(graph, "substation", target):
power = power_flow[target]
else:
power = power_flow[source]
elif source == "substation":
# 从升压站出发的连接
power = power_flow[target]
elif target == "substation":
# 连接到升压站
power = power_flow[source]
else:
power = 0
# 电缆选型
# 成本乘数如果Excel中已包含敷设费用则设为1.0
cost_multiplier = 1.0 if is_offshore else 1.0
# 默认电缆规格库 (如果未提供)
if cable_specs is None:
cable_specs_to_use = [
(35, 150, 0.524, 80),
(70, 215, 0.268, 120),
(95, 260, 0.193, 150),
(120, 295, 0.153, 180),
(150, 330, 0.124, 220),
(185, 370, 0.0991, 270),
(240, 425, 0.0754, 350),
(300, 500, 0.0601, 450),
(400, 580, 0.0470, 600),
]
else:
cable_specs_to_use = cable_specs
# 估算电流
current = (power * 1e6) / (np.sqrt(3) * voltage * power_factor)
# 选择满足载流量的最小电缆
selected_spec = None
for spec in cable_specs_to_use:
if current <= spec[1] * 1: # 100%负载率
selected_spec = spec
break
if selected_spec is None:
selected_spec = cable_specs_to_use[-1]
print(
f"WARNING [{method_name}]: Current {current:.2f} A (Power: {power:.2f} MW) exceeds max cable capacity {selected_spec[1]} A!"
)
resistance = selected_spec[2] * effective_length / 1000 # 电阻(Ω)
cost = selected_spec[3] * effective_length * cost_multiplier # 电缆成本(含敷设)
cable = {
"cross_section": selected_spec[0],
"current_capacity": selected_spec[1],
"resistance": resistance,
"cost": cost,
"current": current,
}
# 记录详细信息
detailed_connections.append(
{
"source": source,
"target": target,
"horizontal_length": horizontal_length,
"vertical_length": vertical_length,
"length": effective_length, # effective length used for stats
"power": power,
"cable": cable,
}
)
# 累计成本
total_cost += cable["cost"]
# 计算I²R损耗 (简化版)
loss_w = (cable["current"] ** 2) * cable["resistance"] * 3 # 三相单位W
loss_kw = loss_w / 1000 # 转换为 kW
total_loss += loss_kw
return {
"total_cost": total_cost,
"total_loss": total_loss,
"num_connections": len(connections),
"details": detailed_connections,
}
# 6.5 导出CAD图纸 (DXF格式)
def export_to_dxf(turbines, substation, connections_details, filename):
"""
将设计方案导出为DXF格式 (CAD通用格式)
:param connections_details: evaluate_design返回的'details'列表
"""
import ezdxf
from ezdxf.enums import TextEntityAlignment
doc = ezdxf.new(dxfversion="R2010")
msp = doc.modelspace()
# 1. 建立图层
doc.layers.add("Substation", color=1) # Red
doc.layers.add("Turbines", color=5) # Blue
# 动态确定电缆颜色
# 提取所有使用到的电缆截面
used_sections = sorted(
list(set(conn["cable"]["cross_section"] for conn in connections_details))
)
# 定义颜色映射规则 (AutoCAD Color Index)
# 1=Red, 2=Yellow, 3=Green, 4=Cyan, 5=Blue, 6=Magenta
color_rank = [
2, # 1st (Smallest): Yellow
3, # 2nd: Green
1, # 3rd: Red
5, # 4th: Blue
6, # 5th: Magenta
]
default_color = 3 # Others: Green
# 创建电缆图层
for i, section in enumerate(used_sections):
if i < len(color_rank):
c = color_rank[i]
else:
c = default_color
layer_name = f"Cable_{section}mm"
if layer_name not in doc.layers:
doc.layers.add(layer_name, color=c)
# 2. 绘制升压站
sx, sy = substation[0, 0], substation[0, 1]
# 绘制一个矩形代表升压站
size = 50
msp.add_lwpolyline(
[
(sx - size, sy - size),
(sx + size, sy - size),
(sx + size, sy + size),
(sx - size, sy + size),
(sx - size, sy - size),
],
close=True,
dxfattribs={"layer": "Substation"},
)
msp.add_text(
"Substation", dxfattribs={"layer": "Substation", "height": 20}
).set_placement((sx, sy + size + 10), align=TextEntityAlignment.CENTER)
# 3. 绘制风机
for i, row in turbines.iterrows():
tx, ty = row["x"], row["y"]
# 绘制圆形代表风机
msp.add_circle((tx, ty), radius=15, dxfattribs={"layer": "Turbines"})
# 添加文字标签
label_id = str(
int(row["original_id"]) if "original_id" in row else int(row["id"])
)
msp.add_text(
label_id, dxfattribs={"layer": "Turbines", "height": 15}
).set_placement((tx, ty - 30), align=TextEntityAlignment.CENTER)
# 4. 绘制连接电缆
for conn in connections_details:
source, target = conn["source"], conn["target"]
section = conn["cable"]["cross_section"]
if source == "substation":
p1 = (substation[0, 0], substation[0, 1])
else:
tid = int(source.split("_")[1])
p1 = (turbines.loc[tid, "x"], turbines.loc[tid, "y"])
if target == "substation":
p2 = (substation[0, 0], substation[0, 1])
else:
tid = int(target.split("_")[1])
p2 = (turbines.loc[tid, "x"], turbines.loc[tid, "y"])
# 确定图层
layer_name = f"Cable_{section}mm"
if layer_name not in doc.layers:
doc.layers.add(layer_name)
# 绘制二维多段线
msp.add_lwpolyline([p1, p2], dxfattribs={"layer": layer_name})
# 添加电缆型号文字(可选,在线的中点)
# mid_x = (p1[0] + p2[0]) / 2
# mid_y = (p1[1] + p2[1]) / 2
# msp.add_text(f"{section}mm", dxfattribs={'layer': layer_name, 'height': 10}).set_pos((mid_x, mid_y), align='CENTER')
try:
doc.saveas(filename)
print(f"成功导出DXF文件: {filename}")
except Exception as e:
print(f"导出DXF失败: {e}")
2026-01-01 16:25:55 +08:00
# 6.6 导出Excel报表
def export_to_excel(connections_details, filename):
"""
将设计方案详情导出为Excel文件
:param connections_details: evaluate_design返回的'details'列表
"""
data = []
for conn in connections_details:
data.append(
{
"Source": conn["source"],
"Target": conn["target"],
"Horizontal Length (m)": conn["horizontal_length"],
"Vertical Length (m)": conn["vertical_length"],
"Effective Length (m)": conn["length"],
"Cable Type (mm²)": conn["cable"]["cross_section"],
"Current (A)": conn["cable"]["current"],
"Power (MW)": conn["power"],
"Resistance (Ω)": conn["cable"]["resistance"],
"Cost (¥)": conn["cable"]["cost"],
}
)
2026-01-01 16:25:55 +08:00
df = pd.DataFrame(data)
2026-01-01 16:25:55 +08:00
# 汇总统计
n_circuits = sum(
1
for conn in connections_details
if conn["source"] == "substation" or conn["target"] == "substation"
)
2026-01-01 16:25:55 +08:00
summary = {
"Total Cost (¥)": df["Cost (¥)"].sum(),
"Total Effective Length (m)": df["Effective Length (m)"].sum(),
"Total Vertical Length (m)": df["Vertical Length (m)"].sum(),
"Number of Circuits": n_circuits,
2026-01-01 16:25:55 +08:00
}
summary_df = pd.DataFrame([summary])
2026-01-01 16:25:55 +08:00
try:
with pd.ExcelWriter(filename) as writer:
df.to_excel(writer, sheet_name="Cable Schedule", index=False)
summary_df.to_excel(writer, sheet_name="Summary", index=False)
2026-01-01 16:25:55 +08:00
print(f"成功导出Excel文件: {filename}")
except Exception as e:
print(f"导出Excel失败: {e}")
# 6.6 导出多方案对比Excel报表
def export_all_scenarios_to_excel(results, filename):
"""
导出所有方案的对比结果到 Excel
:param results: 包含各方案评估结果的列表
:param filename: 输出文件路径
"""
try:
# 使用 openpyxl 引擎以便后续写入单元格
with pd.ExcelWriter(filename, engine="openpyxl") as writer:
# 1. 总览 Sheet
summary_data = []
for res in results:
# 获取回路数 (通过统计从升压站发出的连接)
n_circuits = sum(
1
for conn in res["eval"]["details"]
if conn["source"] == "substation" or conn["target"] == "substation"
)
summary_data.append(
{
"Scenario": res["name"],
"Total Cost (¥)": res["cost"],
"总费用(万元)": res.get("total_cost_npv", res["cost"]) / 10000,
"Total Loss (kW)": res["loss"],
"Num Circuits": n_circuits,
# 计算电缆统计
"Total Cable Length (m)": sum(
d["length"] for d in res["eval"]["details"]
),
}
)
pd.DataFrame(summary_data).to_excel(
writer, sheet_name="Comparison Summary", index=False
)
# 2. 每个方案的详细 Sheet
for res in results:
# 清理 Sheet 名称
safe_name = (
res["name"]
.replace(":", "")
.replace("/", "-")
.replace("\\", "-")
.replace(" ", "_")
)
# 截断过长的名称 (Excel限制31字符)
if len(safe_name) > 25:
safe_name = safe_name[:25]
details = res["eval"]["details"]
data = []
for conn in details:
data.append(
{
"Source": conn["source"],
"Target": conn["target"],
"Horizontal Length (m)": conn["horizontal_length"],
"Vertical Length (m)": conn["vertical_length"],
"Effective Length (m)": conn["length"],
"Cable Type (mm²)": conn["cable"]["cross_section"],
"Current (A)": conn["cable"]["current"],
"Power (MW)": conn["power"],
"Resistance (Ω)": conn["cable"]["resistance"],
"Cost (¥)": conn["cable"]["cost"],
}
)
df = pd.DataFrame(data)
# 从第 2 行开始写入数据startrow=1Excel中为第2行留出第 1 行写标题
df.to_excel(writer, sheet_name=safe_name, index=False, startrow=1)
# 在第一行写入方案名称
ws = writer.sheets[safe_name]
ws.cell(row=1, column=1, value=f"Scenario: {res['name']}")
print(f"成功导出包含所有方案的Excel文件: {filename}")
except Exception as e:
print(f"导出Excel失败: {e}")
# 6. 可视化函数
def visualize_design(
turbines, substation, connections, title, ax=None, show_costs=True
):
"""可视化集电线路设计方案"""
if ax is None:
fig, ax = plt.subplots(figsize=(10, 8))
# 绘制风机
ax.scatter(
turbines["x"],
turbines["y"],
c="white",
edgecolors="#333333",
s=80,
linewidth=1.0,
zorder=3,
label="Turbines",
)
# 标记风机ID
for i, row in turbines.iterrows():
# 显示原始ID如果存在
label_id = int(row["original_id"]) if "original_id" in row else int(row["id"])
# 仅在风机数量较少时显示ID避免密集恐惧
if len(turbines) < 50:
ax.annotate(
str(label_id),
(row["x"], row["y"]),
ha="center",
va="center",
fontsize=6,
zorder=4,
)
# 绘制升压站
ax.scatter(
substation[0, 0],
substation[0, 1],
c="red",
s=250,
marker="s",
zorder=5,
label="Substation",
)
# 颜色映射:使用鲜艳且区分度高的颜色
color_map = {
35: "#00FF00", # 亮绿 (Lime) - 最小
70: "#008000", # 深绿 (Green)
95: "#00FFFF", # 青色 (Cyan)
120: "#0000FF", # 纯蓝 (Blue)
150: "#800080", # 紫色 (Purple)
185: "#FF00FF", # 洋红 (Magenta)
240: "#FFA500", # 橙色 (Orange)
300: "#FF4500", # 红橙 (OrangeRed)
400: "#FF0000", # 纯红 (Red) - 最大
}
# 统计电缆使用情况
cable_counts = defaultdict(int)
# 绘制连接
# 判断传入的是简单连接列表还是详细连接列表
is_detailed = len(connections) > 0 and isinstance(connections[0], dict)
# 收集图例句柄
legend_handles = {}
for conn in connections:
if is_detailed:
source, target, length = conn["source"], conn["target"], conn["length"]
section = conn["cable"]["cross_section"]
# 统计
cable_counts[section] += 1
# 获取颜色和线宽
color = color_map.get(section, "black")
# 线宽范围 1.0 ~ 3.5
linewidth = 1.0 + (section / 400.0) * 2.5
label_text = f"{section}mm² Cable"
else:
source, target, length = conn
section = 0
color = "black"
linewidth = 1.0
label_text = "Connection"
# 获取坐标
if source == "substation":
x1, y1 = substation[0, 0], substation[0, 1]
else:
turbine_id = int(source.split("_")[1])
x1, y1 = turbines.loc[turbine_id, "x"], turbines.loc[turbine_id, "y"]
if target == "substation":
x2, y2 = substation[0, 0], substation[0, 1]
else:
turbine_id = int(target.split("_")[1])
x2, y2 = turbines.loc[turbine_id, "x"], turbines.loc[turbine_id, "y"]
# 绘制线路
(line,) = ax.plot(
[x1, x2], [y1, y2], color=color, linewidth=linewidth, alpha=0.9, zorder=2
)
# 保存图例(去重)
if is_detailed and section not in legend_handles:
legend_handles[section] = line
# 打印统计信息
# if is_detailed:
# print(f"[{title.splitlines()[0]}] 电缆统计:")
# for section in sorted(cable_counts.keys()):
# print(f" {section}mm²: {cable_counts[section]} 条")
# 设置图形属性
ax.set_title(title, fontsize=10)
ax.set_xlabel("X (m)")
ax.set_ylabel("Y (m)")
# 构建图例
sorted_sections = sorted(legend_handles.keys())
handles = [legend_handles[s] for s in sorted_sections]
labels = [f"{s}mm²" for s in sorted_sections]
# 添加风机和升压站图例
handles.insert(0, ax.collections[0]) # Turbines
labels.insert(0, "Turbines")
handles.insert(1, ax.collections[1]) # Substation
labels.insert(1, "Substation")
ax.legend(handles, labels, loc="upper right", fontsize=8, framealpha=0.9)
ax.grid(True, linestyle="--", alpha=0.3)
ax.set_aspect("equal")
return ax
# 7. 主函数:比较两种设计方法
def compare_design_methods(
excel_path=None,
n_clusters_override=None,
interactive=True,
plot_results=True,
use_ga=False,
use_mip=False,
):
"""
比较MST和三种电缆方案下的K-means设计方法
:param excel_path: Excel文件路径
:param n_clusters_override: 可选手动指定簇的数量
:param interactive: 是否启用交互式导出 (CLI模式)
:param plot_results: 是否生成和保存对比图表
"""
cable_specs = None
system_params = {}
if excel_path:
print(f"正在从 {excel_path} 读取坐标数据...")
try:
turbines, substation, cable_specs, system_params = load_data_from_excel(
excel_path
)
scenario_title = "Offshore Wind Farm (Imported Data)"
except Exception:
print("回退到自动生成数据模式...")
return compare_design_methods(
excel_path=None,
n_clusters_override=n_clusters_override,
interactive=interactive,
plot_results=plot_results,
)
else:
print("正在生成海上风电场数据 (规则阵列布局)...")
turbines, substation = generate_wind_farm_data(
n_turbines=30, layout="grid", spacing=800
)
scenario_title = "Offshore Wind Farm (Grid Layout)"
is_offshore = True
voltage = system_params.get("voltage", VOLTAGE_LEVEL)
power_factor = system_params.get("power_factor", POWER_FACTOR)
electricity_price = system_params.get("electricity_price", ELECTRICITY_PRICE)
print(
f"使用的系统参数: 电压={voltage} V, 功率因数={power_factor}, 电价={electricity_price} 元/kWh"
)
# 准备三种电缆方案
# 原始 specs 是 5 元素元组: (section, capacity, resistance, cost, is_optional)
# 下游函数期望 4 元素元组: (section, capacity, resistance, cost)
has_optional_cables = False
if cable_specs:
# 检查是否存在 Optional 为 Y 的电缆
has_optional_cables = any(s[4] for s in cable_specs)
# 方案 1: 不含 Optional='Y' (Standard)
specs_1 = [s[:4] for s in cable_specs if not s[4]]
# 方案 2: 含 Optional='Y' (All)
specs_2 = [s[:4] for s in cable_specs]
# 方案 3: 基于方案 1删掉截面最大的一种
# cable_specs 已按 capacity 排序,假设 capacity 与 section 正相关
specs_3 = specs_1[:-1] if len(specs_1) > 1 else list(specs_1)
else:
# 默认电缆库
default_specs = [
(35, 150, 0.524, 80),
(70, 215, 0.268, 120),
(95, 260, 0.193, 150),
(120, 295, 0.153, 180),
(150, 330, 0.124, 220),
(185, 370, 0.0991, 270),
(240, 425, 0.0754, 350),
(300, 500, 0.0601, 450),
(400, 580, 0.0470, 600),
]
specs_1 = default_specs
specs_2 = default_specs
specs_3 = default_specs[:-1]
# 默认库视为没有 optional
has_optional_cables = False
scenarios = [("Scenario 1 (Standard)", specs_1)]
if has_optional_cables:
scenarios.append(("Scenario 2 (With Optional)", specs_2))
scenarios.append(("Scenario 3 (No Max)", specs_3))
else:
# 重新编号,保证连续性
scenarios.append(("Scenario 2 (No Max)", specs_3))
# 1. MST 方法作为基准 (使用 Scenario 1)
mst_connections = design_with_mst(turbines, substation)
mst_evaluation = evaluate_design(
turbines,
mst_connections,
substation,
cable_specs=specs_1,
is_offshore=is_offshore,
method_name="MST Method",
voltage=voltage,
power_factor=power_factor,
)
# 准备画布 2x2
fig = None
axes = []
if plot_results:
fig, axes = plt.subplots(2, 2, figsize=(20, 18))
axes = axes.flatten()
# 绘制 MST
visualize_design(
turbines,
substation,
mst_evaluation["details"],
f"MST Method (Standard Cables)\nTotal Cost: ¥{mst_evaluation['total_cost'] / 10000:.2f}",
ax=axes[0],
)
print(f"\n===== 开始比较电缆方案 =====")
best_cost = float("inf")
best_result = None
comparison_results = []
# 将 MST 结果也加入对比列表,方便查看
comparison_results.append(
{
"name": "MST Method",
"cost": mst_evaluation["total_cost"],
"loss": mst_evaluation["total_loss"],
"eval": mst_evaluation,
"turbines": turbines.copy(), # MST 不改变 turbines但为了统一格式
"specs": specs_1,
}
)
for i, (name, current_specs) in enumerate(scenarios):
print(f"\n--- {name} ---")
if not current_specs:
print(" 无可用电缆,跳过。")
continue
# 计算参数
total_power = turbines["power"].sum()
max_cable_mw = get_max_cable_capacity_mw(
cable_specs=current_specs, voltage=voltage, power_factor=power_factor
)
# 确定簇数 (针对 Base 算法)
if n_clusters_override is not None:
n_clusters = n_clusters_override
min_needed = int(np.ceil(total_power / max_cable_mw))
if n_clusters < min_needed:
print(
f" Warning: 指定簇数 {n_clusters} 小于理论最小需求 {min_needed}"
)
else:
min_needed = int(np.ceil(total_power / max_cable_mw))
n_cable_types = len(current_specs)
heuristic = int(np.ceil(len(turbines) / n_cable_types))
n_clusters = max(min_needed, heuristic)
if n_clusters > len(turbines):
n_clusters = len(turbines)
print(f" 最大电缆容量: {max_cable_mw:.2f} MW")
# --- Run 1: Base Sector Sweep ---
base_name = f"{name} (Base)"
conns_base, turbines_base = design_with_capacitated_sweep(
turbines.copy(), substation, max_cable_mw, voltage=voltage
)
eval_base = evaluate_design(
turbines,
conns_base,
substation,
cable_specs=current_specs,
is_offshore=is_offshore,
method_name=base_name,
voltage=voltage,
power_factor=power_factor,
)
n_circuits_base = sum(
1
for d in eval_base["details"]
if d["source"] == "substation" or d["target"] == "substation"
)
comparison_results.append(
{
"name": base_name,
"cost": eval_base["total_cost"],
"loss": eval_base["total_loss"],
"eval": eval_base,
"turbines": turbines_base,
"specs": current_specs,
}
)
print(
f" [Base] Cost: ¥{eval_base['total_cost']:,.2f} | Loss: {eval_base['total_loss']:.2f} kW | Circuits: {n_circuits_base}"
)
# --- Run 2: Rotational Sweep (Optimization) ---
rot_name = f"{name} (Rotational)"
conns_rot, turbines_rot = design_with_rotational_sweep(
turbines.copy(), substation, max_cable_mw, voltage=voltage
)
eval_rot = evaluate_design(
turbines,
conns_rot,
substation,
cable_specs=current_specs,
is_offshore=is_offshore,
method_name=rot_name,
voltage=voltage,
power_factor=power_factor,
)
n_circuits_rot = sum(
1
for d in eval_rot["details"]
if d["source"] == "substation" or d["target"] == "substation"
)
comparison_results.append(
{
"name": rot_name,
"cost": eval_rot["total_cost"],
"loss": eval_rot["total_loss"],
"eval": eval_rot,
"turbines": turbines_rot,
"specs": current_specs,
}
)
print(
f" [Rotational] Cost: ¥{eval_rot['total_cost']:,.2f} | Loss: {eval_rot['total_loss']:.2f} kW | Circuits: {n_circuits_rot}"
)
# --- Run 3: Esau-Williams Algorithm ---
ew_name = f"{name} (Esau-Williams)"
conns_ew, turbines_ew = design_with_esau_williams(
turbines.copy(), substation, max_cable_mw
)
eval_ew = evaluate_design(
turbines,
conns_ew,
substation,
cable_specs=current_specs,
is_offshore=is_offshore,
method_name=ew_name,
voltage=voltage,
power_factor=power_factor,
)
n_circuits_ew = sum(
1
for d in eval_ew["details"]
if d["source"] == "substation" or d["target"] == "substation"
)
comparison_results.append(
{
"name": ew_name,
"cost": eval_ew["total_cost"],
"loss": eval_ew["total_loss"],
"eval": eval_ew,
"turbines": turbines_ew,
"specs": current_specs,
}
)
print(
f" [Esau-Williams] Cost: ¥{eval_ew['total_cost']:,.2f} | Loss: {eval_ew['total_loss']:.2f} kW | Circuits: {n_circuits_ew}"
)
if use_ga:
# --- Run 4: Genetic Algorithm ---
ga_name = f"{name} (GA)"
conns_ga, turbines_ga = design_with_ga(
turbines.copy(),
substation,
current_specs,
voltage,
power_factor,
system_params,
evaluate_func=evaluate_design,
total_invest_func=total_investment,
get_max_capacity_func=get_max_cable_capacity_mw,
)
eval_ga = evaluate_design(
turbines,
conns_ga,
substation,
cable_specs=current_specs,
is_offshore=is_offshore,
method_name=ga_name,
voltage=voltage,
power_factor=power_factor,
)
n_circuits_ga = sum(
1
for d in eval_ga["details"]
if d["source"] == "substation" or d["target"] == "substation"
)
comparison_results.append(
{
"name": ga_name,
"cost": eval_ga["total_cost"],
"loss": eval_ga["total_loss"],
"eval": eval_ga,
"turbines": turbines_ga,
"specs": current_specs,
}
)
print(
f" [GA] Cost: ¥{eval_ga['total_cost']:,.2f} | Loss: {eval_ga['total_loss']:.2f} kW | Circuits: {n_circuits_ga}"
)
if use_mip:
print(f"Starting MIP optimization for {name}")
# --- Run 5: Mixed Integer Programming ---
mip_name = f"{name} (MIP)"
conns_mip, turbines_mip = design_with_mip(
turbines.copy(),
substation,
current_specs,
voltage,
power_factor,
system_params,
evaluate_func=evaluate_design,
total_invest_func=total_investment,
get_max_capacity_func=get_max_cable_capacity_mw,
)
eval_mip = evaluate_design(
turbines,
conns_mip,
substation,
cable_specs=current_specs,
is_offshore=is_offshore,
method_name=mip_name,
voltage=voltage,
power_factor=power_factor,
)
n_circuits_mip = sum(
1
for d in eval_mip["details"]
if d["source"] == "substation" or d["target"] == "substation"
)
comparison_results.append(
{
"name": mip_name,
"cost": eval_mip["total_cost"],
"loss": eval_mip["total_loss"],
"eval": eval_mip,
"turbines": turbines_mip,
"specs": current_specs,
}
)
print(
f" [MIP] Cost: ¥{eval_mip['total_cost']:,.2f} | Loss: {eval_mip['total_loss']:.2f} kW | Circuits: {n_circuits_mip}"
)
# 记录最佳
if eval_rot["total_cost"] < best_cost:
best_cost = eval_rot["total_cost"]
if eval_ew["total_cost"] < best_cost:
best_cost = eval_ew["total_cost"]
# best_result 不再需要单独维护,最后遍历 comparison_results 即可
if eval_base["total_cost"] < best_cost:
best_cost = eval_base["total_cost"]
# 可视化 (只画 Base 版本)
ax_idx = i + 1
if plot_results and ax_idx < 4:
n_circuits = sum(
1
for d in eval_base["details"]
if d["source"] == "substation" or d["target"] == "substation"
)
title = f"{base_name} ({n_circuits} circuits)\nCost: ¥{eval_base['total_cost'] / 10000:.2f}"
visualize_design(
turbines_base, substation, eval_base["details"], title, ax=axes[ax_idx]
)
if plot_results:
plt.tight_layout()
output_filename = "wind_farm_design_comparison.png"
plt.savefig(output_filename, dpi=300)
plt.close()
print(f"\n比较图(Base版)已保存至: {output_filename}")
# 准备文件路径
if excel_path:
base_name = os.path.splitext(os.path.basename(excel_path))[0]
dir_name = os.path.dirname(excel_path)
2026-01-01 16:25:55 +08:00
dxf_filename = os.path.join(dir_name, f"{base_name}_design.dxf")
excel_out_filename = os.path.join(dir_name, f"{base_name}_design.xlsx")
else:
dxf_filename = "wind_farm_design.dxf"
excel_out_filename = "wind_farm_design.xlsx"
# 导出所有方案到同一个 Excel
if comparison_results:
# 计算每个方案的总费用(净现值)
comparison_results = total_investment(comparison_results, system_params)
export_all_scenarios_to_excel(comparison_results, excel_out_filename)
if not interactive:
print(f"非交互模式:已自动导出 Excel 对比报表: {excel_out_filename}")
return comparison_results
# 交互式选择导出 DXF
print("\n===== 方案选择 =====")
best_idx = 0
for i, res in enumerate(comparison_results):
if res["cost"] < comparison_results[best_idx]["cost"]:
best_idx = i
# 获取回路数 (通过统计从升压站发出的连接)
n_circuits = sum(
1
for conn in res["eval"]["details"]
if conn["source"] == "substation" or conn["target"] == "substation"
)
print(
f" {i + 1}. {res['name']} - Cost: ¥{res['cost']:,.2f} | Circuits: {n_circuits}"
)
print(f"推荐方案: {comparison_results[best_idx]['name']} (默认)")
try:
choice_str = input(
f"请输入要导出DXF的方案编号 (1-{len(comparison_results)}),或输入 'A' 导出全部: "
).strip()
if choice_str.upper() == "A":
print("正在导出所有方案...")
base_dxf_name, ext = os.path.splitext(dxf_filename)
for res in comparison_results:
# 生成文件名安全后缀
safe_suffix = (
res["name"]
.replace(" ", "_")
.replace(":", "")
.replace("(", "")
.replace(")", "")
.replace("/", "-")
)
current_filename = f"{base_dxf_name}_{safe_suffix}{ext}"
print(f" 导出 '{res['name']}' -> {current_filename}")
export_to_dxf(
res["turbines"],
substation,
res["eval"]["details"],
current_filename,
)
else:
if not choice_str:
choice = best_idx
else:
choice = int(choice_str) - 1
if choice < 0 or choice >= len(comparison_results):
print("输入编号无效,将使用默认推荐方案。")
choice = best_idx
selected_res = comparison_results[choice]
# 生成带方案名称的文件名
base_dxf_name, ext = os.path.splitext(dxf_filename)
safe_suffix = (
selected_res["name"]
.replace(" ", "_")
.replace(":", "")
.replace("(", "")
.replace(")", "")
.replace("/", "-")
)
final_filename = f"{base_dxf_name}_{safe_suffix}{ext}"
print(f"正在导出 '{selected_res['name']}' 到 DXF: {final_filename} ...")
export_to_dxf(
selected_res["turbines"],
substation,
selected_res["eval"]["details"],
final_filename,
)
except Exception as e:
print(f"输入处理出错: {e},将使用默认推荐方案。")
selected_res = comparison_results[best_idx]
# 生成带方案名称的文件名
base_dxf_name, ext = os.path.splitext(dxf_filename)
safe_suffix = (
selected_res["name"]
.replace(" ", "_")
.replace(":", "")
.replace("(", "")
.replace(")", "")
.replace("/", "-")
)
final_filename = f"{base_dxf_name}_{safe_suffix}{ext}"
print(f"正在导出 '{selected_res['name']}' 到 DXF: {final_filename} ...")
export_to_dxf(
selected_res["turbines"],
substation,
selected_res["eval"]["details"],
final_filename,
)
return comparison_results
# total_investment这个函数计算每个比选方案的总投资
# 电缆费用按2年分期投资采用输入的折现率。电费为电能损耗乘以电费采用输入的折现率并计算生命周期内的净现值生命周期采用输入值。
#
# 计算方法:
# 1. 电缆投资总投资按2年平均分配第1年和第2年各支付50%,使用折现率计算净现值
# NPV_cable = (cost * 0.5) / (1 + r)^1 + (cost * 0.5) / (1 + r)^2
# 2. 电费损耗:年损耗费用 = 损耗功率(kW) * 8760小时 * 电价(元/kWh)
# 生命周期内的净现值 = 年损耗费用 * [(1 - (1 + r)^(-n)) / r]其中n为运行期限
# 3. 总费用 = 电缆投资净现值 + 电费损耗净现值
#
def total_investment(results, system_params):
"""
计算每个比选方案的总费用净现值
参数:
results: 比选结果列表每个元素包含:
- 'cost': 电缆总投资
- 'loss': 线损功率kW
- 其他字段...
system_params: 系统参数字典包含:
- 'discount_rate': 折现率%
- 'electricity_price': 电价/kWh
- 'project_lifetime': 工程运行期限
- 'annual_loss_hours': 年损耗小时数小时
返回:
更新后的results列表每个结果新增 'total_cost_npv' 字段总费用净现值
"""
# 获取系统参数,使用默认值
discount_rate_percent = system_params.get("discount_rate", DISCOUNT_RATE)
electricity_price = system_params.get("electricity_price", ELECTRICITY_PRICE)
project_lifetime = system_params.get("project_lifetime", PROJECT_LIFETIME)
annual_loss_hours = system_params.get("annual_loss_hours", ANNUAL_LOSS_HOURS)
# 将折现率转换为小数
r = discount_rate_percent / 100.0
for result in results:
cable_cost = result["cost"] # 电缆总投资(元)
loss_power = result["loss"] # 线损功率kW
# 1. 计算电缆投资的净现值2年分期
# 第1年支付50%第2年支付50%
npv_cable = (cable_cost * 0.5) / ((1 + r) ** 1) + (cable_cost * 0.5) / (
(1 + r) ** 2
)
# 2. 计算电费损耗的净现值(生命周期内)
# 年损耗费用 = 损耗功率(kW) * 年损耗小时数 * 电价(元/kWh)
annual_loss_cost = loss_power * annual_loss_hours * electricity_price
# 年金现值系数 = [(1 - (1 + r)^(-n)) / r]
if r > 0:
annuity_factor = (1 - (1 + r) ** (-project_lifetime)) / r
else:
# 如果折现率为0则净现值 = 年费用 * 年限
annuity_factor = project_lifetime
npv_loss = annual_loss_cost * annuity_factor
# 3. 计算总费用净现值
total_cost_npv = npv_cable + npv_loss
# 将结果添加到字典中
result["total_cost_npv"] = total_cost_npv
result["npv_cable"] = npv_cable
result["npv_loss"] = npv_loss
result["annual_loss_cost"] = annual_loss_cost
return results
# 8. 执行比较
if __name__ == "__main__":
# 解析命令行参数
parser = argparse.ArgumentParser(description="Wind Farm Collector System Design")
parser.add_argument(
"--excel", help="Path to the Excel coordinates file", default=None
)
parser.add_argument(
"--clusters",
type=int,
help="Specify the number of clusters (circuits) manually",
default=None,
)
args = parser.parse_args()
# 3. 运行比较
# 如果没有提供excel文件将自动回退到生成数据模式
compare_design_methods(
args.excel, n_clusters_override=args.clusters, interactive=True
)