Files
windfarm/main.py
dmy b5718a0cc2 优化风电场设计方案对比算法:添加旋转算法和双模式对比
- 新增design_with_rotational_sweep函数:实现旋转优化算法
- 修改compare_design_methods函数:
  * 将MST结果纳入对比列表
  * 每个电缆场景运行Base和Rotational两种算法
  * 添加成本和损耗对比显示
  * 优化可视化展示和文件输出
- 改进算法选择逻辑:增强簇数计算的智能化
- 更新输出格式:区分不同算法结果并优化显示
2026-01-02 00:25:33 +08:00

1240 lines
48 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.spatial import distance_matrix
from scipy.sparse.csgraph import minimum_spanning_tree
from sklearn.cluster import KMeans
from collections import defaultdict
import networkx as nx
import math
import argparse
import os
# 设置matplotlib支持中文显示
plt.rcParams['font.sans-serif'] = ['Microsoft YaHei', 'SimHei', 'Arial']
plt.rcParams['axes.unicode_minus'] = False
# 常量定义
VOLTAGE_LEVEL = 66000 # 66kV
POWER_FACTOR = 0.95
# 1. 生成风电场数据(实际应用中替换为真实坐标)
def generate_wind_farm_data(n_turbines=30, seed=42, layout='random', spacing=800):
"""
生成模拟风电场数据
:param layout: 'random' (随机) 或 'grid' (规则行列)
:param spacing: 规则布局时的风机间距(m)
"""
np.random.seed(seed)
if layout == 'grid':
# 计算行列数 (尽量接近方形)
n_cols = int(np.ceil(np.sqrt(n_turbines)))
n_rows = int(np.ceil(n_turbines / n_cols))
x_coords = []
y_coords = []
for i in range(n_turbines):
row = i // n_cols
col = i % n_cols
# 添加微小抖动模拟海上定位误差(±1%)
jitter = spacing * 0.01
x = col * spacing + np.random.uniform(-jitter, jitter)
y = row * spacing + np.random.uniform(-jitter, jitter)
x_coords.append(x)
y_coords.append(y)
x_coords = np.array(x_coords)
y_coords = np.array(y_coords)
# 升压站位置:通常位于风电场边缘或中心,这里设为离岸侧(y负方向)中心
substation = np.array([[np.mean(x_coords), -spacing]])
else:
# 随机生成风机位置(扩大范围以适应更大容量)
x_coords = np.random.uniform(0, 2000, n_turbines)
y_coords = np.random.uniform(0, 2000, n_turbines)
# 升压站位置
substation = np.array([[0, 0]])
# 随机生成风机额定功率海上风机通常更大6-10MW
power_ratings = np.random.uniform(6.0, 10.0, n_turbines) if layout == 'grid' else np.random.uniform(2.0, 5.0, n_turbines)
# 创建DataFrame
turbines = pd.DataFrame({
'id': range(n_turbines),
'x': x_coords,
'y': y_coords,
'power': power_ratings,
'platform_height': np.zeros(n_turbines),
'cumulative_power': np.zeros(n_turbines) # 用于后续计算
})
return turbines, substation
# 1.5 从Excel加载数据
def load_data_from_excel(file_path):
"""
从Excel文件读取风机和升压站坐标以及可选的电缆规格
Excel格式要求:
- Sheet 'Coordinates' (或第一个Sheet): Type (Turbine/Substation), ID, X, Y, Power
- Sheet 'Cables' (可选): CrossSection, Capacity, Resistance, Cost
"""
try:
xl = pd.ExcelFile(file_path)
# 读取坐标数据
if 'Coordinates' in xl.sheet_names:
df = pd.read_excel(xl, 'Coordinates')
else:
df = pd.read_excel(xl, 0)
# 标准化列名(忽略大小写)
df.columns = [c.capitalize() for c in df.columns]
required_cols = {'Type', 'X', 'Y', 'Power'}
if not required_cols.issubset(df.columns):
raise ValueError(f"Excel文件缺少必要列: {required_cols - set(df.columns)}")
# 提取升压站数据
substation_df = df[df['Type'].astype(str).str.lower() == 'substation']
if len(substation_df) == 0:
raise ValueError("未在文件中找到升压站(Substation)数据")
substation = substation_df[['X', 'Y']].values
# 提取风机数据
turbines_df = df[df['Type'].astype(str).str.lower() == 'turbine'].copy()
if len(turbines_df) == 0:
raise ValueError("未在文件中找到风机(Turbine)数据")
# 尝试获取平台高度列 (兼容不同命名)
platform_height_col = None
for col in turbines_df.columns:
if col.lower().replace(' ', '') == 'platformheight':
platform_height_col = col
break
platform_heights = turbines_df[platform_height_col].values if platform_height_col else np.zeros(len(turbines_df))
# 重置索引并整理格式
turbines = pd.DataFrame({
'id': range(len(turbines_df)),
'original_id': (turbines_df['Id'].values if 'Id' in turbines_df.columns else range(len(turbines_df))),
'x': turbines_df['X'].values,
'y': turbines_df['Y'].values,
'power': turbines_df['Power'].values,
'platform_height': platform_heights,
'cumulative_power': np.zeros(len(turbines_df))
})
# 读取电缆数据 (如果存在)
cable_specs = None
if 'Cables' in xl.sheet_names:
cables_df = pd.read_excel(xl, 'Cables')
# 标准化列名
cables_df.columns = [c.replace(' ', '').capitalize() for c in cables_df.columns] # Handle 'Cross Section' vs 'CrossSection'
# 尝试匹配列
# 目标格式: (截面mm², 载流量A, 电阻Ω/km, 基准价格元/m, 是否可选)
specs = []
for _, row in cables_df.iterrows():
# 容错处理列名
section = row.get('Crosssection', row.get('Section', 0))
capacity = row.get('Capacity', row.get('Current', 0))
resistance = row.get('Resistance', 0)
cost = row.get('Cost', row.get('Price', 0))
optional_val = str(row.get('Optional', '')).strip().upper()
is_optional = (optional_val == 'Y')
if section > 0 and capacity > 0:
specs.append((section, capacity, resistance, cost, is_optional))
if specs:
specs.sort(key=lambda x: x[1]) # 按载流量排序
cable_specs = specs
print(f"成功加载: {len(turbines)} 台风机, {len(substation)} 座升压站")
if cable_specs:
print(f"成功加载: {len(cable_specs)} 种电缆规格")
return turbines, substation, cable_specs
except Exception as e:
print(f"读取Excel文件失败: {str(e)}")
raise
# 2. 基于最小生成树(MST)的集电线路设计
def design_with_mst(turbines, substation):
"""使用最小生成树算法设计集电线路拓扑"""
# 合并风机和升压站数据
all_points = np.vstack([substation, turbines[['x', 'y']].values])
n_points = len(all_points)
# 计算距离矩阵
dist_matrix = distance_matrix(all_points, all_points)
# 计算最小生成树
mst = minimum_spanning_tree(dist_matrix).toarray()
# 提取连接关系
connections = []
for i in range(n_points):
for j in range(n_points):
if mst[i, j] > 0:
# 确定节点类型0是升压站1+是风机
source = 'substation' if i == 0 else f'turbine_{i-1}'
target = 'substation' if j == 0 else f'turbine_{j-1}'
connections.append((source, target, mst[i, j]))
return connections
# 3. 基于扇区聚类改进版K-means的集电线路设计
def design_with_kmeans(turbines, substation, n_clusters=3):
"""
使用基于角度的K-means聚类设计集电线路拓扑 (避免交叉)
原理:将风机按相对于升压站的角度划分扇区,确保出线不交叉
"""
# 准备风机坐标数据
turbine_coords = turbines[['x', 'y']].values
substation_coord = substation[0]
# 计算每台风机相对于升压站的角度和单位向量
# 使用 (cos, sin) 也就是单位向量进行聚类,可以完美处理 -180/+180 的周期性问题
dx = turbine_coords[:, 0] - substation_coord[0]
dy = turbine_coords[:, 1] - substation_coord[1]
# 归一化为单位向量 (对应角度特征)
magnitudes = np.sqrt(dx**2 + dy**2)
# 处理重合点避免除零
with np.errstate(divide='ignore', invalid='ignore'):
unit_x = dx / magnitudes
unit_y = dy / magnitudes
unit_x[magnitudes == 0] = 0
unit_y[magnitudes == 0] = 0
# 构建特征矩阵:主要基于角度(单位向量),可根据需要加入少量距离权重
# 如果纯粹按角度,设为单位向量即可
features = np.column_stack([unit_x, unit_y])
# 执行K-means聚类
kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
turbines['cluster'] = kmeans.fit_predict(features)
# 为每个簇找到最佳连接点(离升压站最近的风机)
cluster_connection_points = {}
for cluster_id in range(n_clusters):
cluster_turbines = turbines[turbines['cluster'] == cluster_id]
if len(cluster_turbines) == 0:
continue
distances_to_substation = np.sqrt(
(cluster_turbines['x'] - substation_coord[0])**2 +
(cluster_turbines['y'] - substation_coord[1])**2
)
closest_idx = distances_to_substation.idxmin()
cluster_connection_points[cluster_id] = closest_idx
# 为每个簇内构建MST
cluster_connections = []
for cluster_id in range(n_clusters):
# 获取当前簇的风机
cluster_turbines = turbines[turbines['cluster'] == cluster_id]
if len(cluster_turbines) == 0:
continue
cluster_indices = cluster_turbines.index.tolist()
# 计算簇内距离矩阵
coords = cluster_turbines[['x', 'y']].values
dist_matrix = distance_matrix(coords, coords)
# 计算簇内MST
mst = minimum_spanning_tree(dist_matrix).toarray()
# 添加簇内连接
for i in range(len(cluster_indices)):
for j in range(len(cluster_indices)):
if mst[i, j] > 0:
source = f'turbine_{cluster_indices[i]}'
target = f'turbine_{cluster_indices[j]}'
cluster_connections.append((source, target, mst[i, j]))
# 添加簇到升压站的连接
substation_connections = []
for cluster_id, turbine_idx in cluster_connection_points.items():
turbine_coord = turbines.loc[turbine_idx, ['x', 'y']].values
distance = np.sqrt(
(turbine_coord[0] - substation_coord[0])**2 +
(turbine_coord[1] - substation_coord[1])**2
)
substation_connections.append((f'turbine_{turbine_idx}', 'substation', distance))
return cluster_connections + substation_connections, turbines
# 3.5 带容量约束的扇区扫描算法 (Capacitated Sweep) - 基础版
def design_with_capacitated_sweep(turbines, substation, cable_specs=None):
"""
使用带容量约束的扇区扫描算法设计集电线路 (基础版:单次扫描)
原理:
1. 计算所有风机相对于升压站的角度。
2. 找到角度间隔最大的位置作为起始“切割线”,以避免切断密集的风机群。
3. 沿圆周方向扫描,贪婪地将风机加入当前回路,直到达到电缆容量上限。
4. 满载后开启新回路。
"""
# 1. 获取电缆最大容量
max_mw = get_max_cable_capacity_mw(cable_specs)
# print(f"DEBUG: 扇区扫描算法启动 - 单回路容量限制: {max_mw:.2f} MW")
substation_coord = substation[0]
# 2. 计算角度 (使用 arctan2 返回 -pi 到 pi)
# 避免直接修改原始DataFrame使用副本
work_df = turbines.copy()
dx = work_df['x'] - substation_coord[0]
dy = work_df['y'] - substation_coord[1]
work_df['angle'] = np.arctan2(dy, dx)
# 3. 寻找最佳起始角度 (最大角度间隙)
# 按角度排序
work_df = work_df.sort_values('angle').reset_index(drop=True) # 重置索引方便切片
angles = work_df['angle'].values
n = len(angles)
if n > 1:
# 计算相邻角度差
diffs = np.diff(angles)
# 计算首尾角度差 (跨越 ±pi 处)
wrap_diff = (2 * np.pi) - (angles[-1] - angles[0])
diffs = np.append(diffs, wrap_diff)
# 找到最大间隙的索引
max_gap_idx = np.argmax(diffs)
# 旋转数组,使最大间隙成为新的起点
start_idx = (max_gap_idx + 1) % n
if start_idx != 0:
work_df = pd.concat([work_df.iloc[start_idx:], work_df.iloc[:start_idx]]).reset_index(drop=True)
# 4. 贪婪分组 (Capacity Constrained Clustering)
work_df['cluster'] = -1
cluster_id = 0
current_power = 0
current_indices = []
for i, row in work_df.iterrows():
p = row['power']
if len(current_indices) > 0 and (current_power + p > max_mw):
work_df.loc[current_indices, 'cluster'] = cluster_id
cluster_id += 1
current_power = 0
current_indices = []
current_indices.append(i)
current_power += p
if current_indices:
work_df.loc[current_indices, 'cluster'] = cluster_id
cluster_id += 1
# 建立 id -> cluster 的映射
id_to_cluster = dict(zip(work_df['id'], work_df['cluster']))
turbines['cluster'] = turbines['id'].map(id_to_cluster)
# 5. 对每个簇内部进行MST连接
cluster_connections = []
substation_connections = []
n_clusters = cluster_id
for cid in range(n_clusters):
cluster_turbines = turbines[turbines['cluster'] == cid]
if len(cluster_turbines) == 0:
continue
cluster_indices = cluster_turbines.index.tolist()
coords = cluster_turbines[['x', 'y']].values
if len(cluster_indices) > 1:
dist_matrix = distance_matrix(coords, coords)
mst = minimum_spanning_tree(dist_matrix).toarray()
for i in range(len(cluster_indices)):
for j in range(len(cluster_indices)):
if mst[i, j] > 0:
source = f'turbine_{cluster_indices[i]}'
target = f'turbine_{cluster_indices[j]}'
cluster_connections.append((source, target, mst[i, j]))
# 连接到升压站
dists = np.sqrt((cluster_turbines['x'] - substation_coord[0])**2 +
(cluster_turbines['y'] - substation_coord[1])**2)
closest_id = dists.idxmin()
min_dist = dists.min()
substation_connections.append((f'turbine_{closest_id}', 'substation', min_dist))
return cluster_connections + substation_connections, turbines
# 3.6 旋转扫描算法 (Rotational Sweep) - 优化版
def design_with_rotational_sweep(turbines, substation, cable_specs=None):
"""
使用带容量约束的扇区扫描算法设计集电线路 (优化版:旋转扫描)
原理:
1. 计算所有风机相对于升压站的角度并排序。
2. 遍历所有可能的起始角度(即尝试以每一台风机作为扫描的起点)。
3. 对每种起始角度,贪婪地将风机加入回路直到满载。
4. 对每种分组方案计算MST成本选出总成本最低的方案。
"""
# 1. 获取电缆最大容量
max_mw = get_max_cable_capacity_mw(cable_specs)
# print(f"DEBUG: 扇区扫描算法启动 - 单回路容量限制: {max_mw:.2f} MW")
substation_coord = substation[0]
# 2. 计算角度 (使用 arctan2 返回 -pi 到 pi)
work_df = turbines.copy()
dx = work_df['x'] - substation_coord[0]
dy = work_df['y'] - substation_coord[1]
work_df['angle'] = np.arctan2(dy, dx)
# 按角度排序
work_df = work_df.sort_values('angle').reset_index(drop=True)
n_turbines = len(work_df)
best_cost = float('inf')
best_connections = []
best_turbines_state = None
best_start_idx = -1
# 遍历所有可能的起始点
for start_idx in range(n_turbines):
# 构建当前旋转顺序的风机列表
if start_idx == 0:
current_df = work_df.copy()
else:
current_df = pd.concat([work_df.iloc[start_idx:], work_df.iloc[:start_idx]]).reset_index(drop=True)
# --- 贪婪分组 ---
current_df['cluster'] = -1
cluster_id = 0
current_power = 0
current_indices_in_df = []
powers = current_df['power'].values
for i in range(n_turbines):
p = powers[i]
if len(current_indices_in_df) > 0 and (current_power + p > max_mw):
current_df.loc[current_indices_in_df, 'cluster'] = cluster_id
cluster_id += 1
current_power = 0
current_indices_in_df = []
current_indices_in_df.append(i)
current_power += p
if current_indices_in_df:
current_df.loc[current_indices_in_df, 'cluster'] = cluster_id
cluster_id += 1
# --- 计算该分组方案的成本 ---
current_total_length = 0
n_clusters = cluster_id
for cid in range(n_clusters):
cluster_rows = current_df[current_df['cluster'] == cid]
if len(cluster_rows) == 0: continue
# 1. 簇内 MST 长度
coords = cluster_rows[['x', 'y']].values
if len(cluster_rows) > 1:
dm = distance_matrix(coords, coords)
mst = minimum_spanning_tree(dm).toarray()
mst_len = mst.sum()
current_total_length += mst_len
# 2. 连接升压站长度
dists = np.sqrt((cluster_rows['x'] - substation_coord[0])**2 +
(cluster_rows['y'] - substation_coord[1])**2)
min_dist = dists.min()
current_total_length += min_dist
# --- 比较并保存最佳结果 ---
if current_total_length < best_cost:
best_cost = current_total_length
best_start_idx = start_idx
best_id_to_cluster = dict(zip(current_df['id'], current_df['cluster']))
# --- 根据最佳方案重新生成详细连接 ---
turbines['cluster'] = turbines['id'].map(best_id_to_cluster)
final_connections = []
unique_clusters = turbines['cluster'].unique()
unique_clusters = [c for c in unique_clusters if not pd.isna(c) and c >= 0]
for cid in unique_clusters:
cluster_turbines = turbines[turbines['cluster'] == cid]
if len(cluster_turbines) == 0: continue
cluster_indices = cluster_turbines.index.tolist()
coords = cluster_turbines[['x', 'y']].values
if len(cluster_indices) > 1:
dist_matrix_local = distance_matrix(coords, coords)
mst = minimum_spanning_tree(dist_matrix_local).toarray()
for i in range(len(cluster_indices)):
for j in range(len(cluster_indices)):
if mst[i, j] > 0:
source = f'turbine_{cluster_indices[i]}'
target = f'turbine_{cluster_indices[j]}'
final_connections.append((source, target, mst[i, j]))
dists = np.sqrt((cluster_turbines['x'] - substation_coord[0])**2 +
(cluster_turbines['y'] - substation_coord[1])**2)
closest_idx_in_df = dists.idxmin()
min_dist = dists.min()
final_connections.append((f'turbine_{closest_idx_in_df}', 'substation', min_dist))
return final_connections, turbines
def get_max_cable_capacity_mw(cable_specs=None):
"""
计算给定电缆规格中能够承载的最大功率 (单位: MW)。
基于提供的电缆规格列表,选取最大载流量,结合系统电压和功率因数计算理论最大传输功率。
参数:
cable_specs (list, optional): 电缆规格列表。每个元素应包含 (截面积, 额定电流, 单价, 损耗系数)。
返回:
float: 最大功率承载能力 (MW)。
异常:
Exception: 当未提供 cable_specs 时抛出,提示截面不满足。
"""
if cable_specs is None:
# Default cable specs if not provided (same as in evaluate_design)
cable_specs = [
(35, 150, 0.524, 80),
(70, 215, 0.268, 120),
(95, 260, 0.193, 150),
(120, 295, 0.153, 180),
(150, 330, 0.124, 220),
(185, 370, 0.0991, 270),
(240, 425, 0.0754, 350),
(300, 500, 0.0601, 450),
(400, 580, 0.0470, 600)
]
# 从所有电缆规格中找到最大的额定电流容量
max_current_capacity = max(spec[1] for spec in cable_specs)
# 计算最大功率P = √3 * U * I * cosφ
# 这里假设降额系数为 1 (不降额)
max_current = max_current_capacity * 1
max_power_w = np.sqrt(3) * VOLTAGE_LEVEL * max_current * POWER_FACTOR
# 将单位从 W 转换为 MW
return max_power_w / 1e6
# 5. 计算集电线路方案成本
def evaluate_design(turbines, connections, substation, cable_specs=None, is_offshore=False, method_name="Unknown Method"):
"""评估设计方案的总成本和损耗"""
total_cost = 0
total_loss = 0
# 创建连接图
graph = nx.Graph()
graph.add_node('substation')
for i, row in turbines.iterrows():
graph.add_node(f'turbine_{i}')
# 添加边
for source, target, length in connections:
graph.add_edge(source, target, weight=length)
# 计算每台风机的累积功率(从叶到根)
power_flow = defaultdict(float)
# 按照后序遍历(从叶到根)
nodes = list(nx.dfs_postorder_nodes(graph, 'substation'))
for node in nodes:
if node == 'substation':
continue
# 1. 如果节点是风机,先加上自身的功率
if node.startswith('turbine_'):
turbine_id = int(node.split('_')[1])
power_flow[node] += turbines.loc[turbine_id, 'power']
# 2. 找到上游节点(父节点)并将累积功率传递上去
try:
# 找到通往升压站的最短路径上的下一个节点
path = nx.shortest_path(graph, source=node, target='substation')
if len(path) > 1:
parent = path[1] # path[0]是node自己path[1]是父节点
power_flow[parent] += power_flow[node]
except nx.NetworkXNoPath:
pass
# DEBUG: 打印最大功率流 (不含升压站本身)
node_powers = [v for k, v in power_flow.items() if k != 'substation']
max_power = max(node_powers) if node_powers else 0
print(f"DEBUG [{method_name}]: 最大线路功率 = {max_power:.2f} MW")
# 计算成本和损耗
detailed_connections = []
for source, target, length in connections:
# Determine vertical length (PlatformHeight)
vertical_length = 0
if source.startswith('turbine_'):
tid = int(source.split('_')[1])
vertical_length += turbines.loc[tid, 'platform_height']
if target.startswith('turbine_'):
tid = int(target.split('_')[1])
vertical_length += turbines.loc[tid, 'platform_height']
# Calculate effective length with margin
# Total Length = (Horizontal Distance + Vertical Up/Down) * 1.03
horizontal_length = length
effective_length = (horizontal_length + vertical_length) * 1.03
# 确定该段线路承载的总功率
if source.startswith('turbine_') and target.startswith('turbine_'):
# 风机间连接,取下游节点功率
if nx.shortest_path_length(graph, 'substation', source) < nx.shortest_path_length(graph, 'substation', target):
power = power_flow[target]
else:
power = power_flow[source]
elif source == 'substation':
# 从升压站出发的连接
power = power_flow[target]
elif target == 'substation':
# 连接到升压站
power = power_flow[source]
else:
power = 0
# 电缆选型
# 成本乘数如果Excel中已包含敷设费用则设为1.0
cost_multiplier = 1.0 if is_offshore else 1.0
# 默认电缆规格库 (如果未提供)
if cable_specs is None:
cable_specs_to_use = [
(35, 150, 0.524, 80),
(70, 215, 0.268, 120),
(95, 260, 0.193, 150),
(120, 295, 0.153, 180),
(150, 330, 0.124, 220),
(185, 370, 0.0991, 270),
(240, 425, 0.0754, 350),
(300, 500, 0.0601, 450),
(400, 580, 0.0470, 600)
]
else:
cable_specs_to_use = cable_specs
# 估算电流
current = (power * 1e6) / (np.sqrt(3) * VOLTAGE_LEVEL * POWER_FACTOR)
# 选择满足载流量的最小电缆
selected_spec = None
for spec in cable_specs_to_use:
if current <= spec[1] * 1: # 100%负载率
selected_spec = spec
break
if selected_spec is None:
selected_spec = cable_specs_to_use[-1]
print(f"WARNING [{method_name}]: Current {current:.2f} A (Power: {power:.2f} MW) exceeds max cable capacity {selected_spec[1]} A!")
resistance = selected_spec[2] * effective_length / 1000 # 电阻(Ω)
cost = selected_spec[3] * effective_length * cost_multiplier # 电缆成本(含敷设)
cable = {
'cross_section': selected_spec[0],
'current_capacity': selected_spec[1],
'resistance': resistance,
'cost': cost,
'current': current
}
# 记录详细信息
detailed_connections.append({
'source': source,
'target': target,
'horizontal_length': horizontal_length,
'vertical_length': vertical_length,
'length': effective_length, # effective length used for stats
'power': power,
'cable': cable
})
# 累计成本
total_cost += cable['cost']
# 计算I²R损耗 (简化版)
loss = (cable['current'] ** 2) * cable['resistance'] * 3 # 三相
total_loss += loss
return {
'total_cost': total_cost,
'total_loss': total_loss,
'num_connections': len(connections),
'details': detailed_connections
}
# 6.5 导出CAD图纸 (DXF格式)
def export_to_dxf(turbines, substation, connections_details, filename):
"""
将设计方案导出为DXF格式 (CAD通用格式)
:param connections_details: evaluate_design返回的'details'列表
"""
import ezdxf
from ezdxf.enums import TextEntityAlignment
doc = ezdxf.new(dxfversion='R2010')
msp = doc.modelspace()
# 1. 建立图层
doc.layers.add('Substation', color=1) # Red
doc.layers.add('Turbines', color=5) # Blue
# 动态确定电缆颜色
# 提取所有使用到的电缆截面
used_sections = sorted(list(set(conn['cable']['cross_section'] for conn in connections_details)))
# 定义颜色映射规则 (AutoCAD Color Index)
# 1=Red, 2=Yellow, 3=Green, 4=Cyan, 5=Blue, 6=Magenta
color_rank = [
2, # 1st (Smallest): Yellow
3, # 2nd: Green
1, # 3rd: Red
5, # 4th: Blue
6 # 5th: Magenta
]
default_color = 3 # Others: Green
# 创建电缆图层
for i, section in enumerate(used_sections):
if i < len(color_rank):
c = color_rank[i]
else:
c = default_color
layer_name = f'Cable_{section}mm'
if layer_name not in doc.layers:
doc.layers.add(layer_name, color=c)
# 2. 绘制升压站
sx, sy = substation[0, 0], substation[0, 1]
# 绘制一个矩形代表升压站
size = 50
msp.add_lwpolyline([(sx-size, sy-size), (sx+size, sy-size),
(sx+size, sy+size), (sx-size, sy+size), (sx-size, sy-size)],
close=True, dxfattribs={'layer': 'Substation'})
msp.add_text("Substation", dxfattribs={'layer': 'Substation', 'height': 20}).set_placement((sx, sy+size+10), align=TextEntityAlignment.CENTER)
# 3. 绘制风机
for i, row in turbines.iterrows():
tx, ty = row['x'], row['y']
# 绘制圆形代表风机
msp.add_circle((tx, ty), radius=15, dxfattribs={'layer': 'Turbines'})
# 添加文字标签
label_id = str(int(row['original_id']) if 'original_id' in row else int(row['id']))
msp.add_text(label_id, dxfattribs={'layer': 'Turbines', 'height': 15}).set_placement((tx, ty-30), align=TextEntityAlignment.CENTER)
# 4. 绘制连接电缆
for conn in connections_details:
source, target = conn['source'], conn['target']
section = conn['cable']['cross_section']
if source == 'substation':
p1 = (substation[0, 0], substation[0, 1])
else:
tid = int(source.split('_')[1])
p1 = (turbines.loc[tid, 'x'], turbines.loc[tid, 'y'])
if target == 'substation':
p2 = (substation[0, 0], substation[0, 1])
else:
tid = int(target.split('_')[1])
p2 = (turbines.loc[tid, 'x'], turbines.loc[tid, 'y'])
# 确定图层
layer_name = f'Cable_{section}mm'
if layer_name not in doc.layers:
doc.layers.add(layer_name)
# 绘制二维多段线
msp.add_lwpolyline([p1, p2], dxfattribs={'layer': layer_name})
# 添加电缆型号文字(可选,在线的中点)
# mid_x = (p1[0] + p2[0]) / 2
# mid_y = (p1[1] + p2[1]) / 2
# msp.add_text(f"{section}mm", dxfattribs={'layer': layer_name, 'height': 10}).set_pos((mid_x, mid_y), align='CENTER')
try:
doc.saveas(filename)
print(f"成功导出DXF文件: {filename}")
except Exception as e:
print(f"导出DXF失败: {e}")
# 6.6 导出Excel报表
def export_to_excel(connections_details, filename):
"""
将设计方案详情导出为Excel文件
:param connections_details: evaluate_design返回的'details'列表
"""
data = []
for conn in connections_details:
data.append({
'Source': conn['source'],
'Target': conn['target'],
'Horizontal Length (m)': conn['horizontal_length'],
'Vertical Length (m)': conn['vertical_length'],
'Effective Length (m)': conn['length'],
'Cable Type (mm²)': conn['cable']['cross_section'],
'Current (A)': conn['cable']['current'],
'Power (MW)': conn['power'],
'Resistance (Ω)': conn['cable']['resistance'],
'Cost (¥)': conn['cable']['cost']
})
df = pd.DataFrame(data)
# 汇总统计
summary = {
'Total Cost (¥)': df['Cost (¥)'].sum(),
'Total Effective Length (m)': df['Effective Length (m)'].sum(),
'Total Vertical Length (m)': df['Vertical Length (m)'].sum()
}
summary_df = pd.DataFrame([summary])
try:
with pd.ExcelWriter(filename) as writer:
df.to_excel(writer, sheet_name='Cable Schedule', index=False)
summary_df.to_excel(writer, sheet_name='Summary', index=False)
print(f"成功导出Excel文件: {filename}")
except Exception as e:
print(f"导出Excel失败: {e}")
# 6.6 导出多方案对比Excel报表
def export_all_scenarios_to_excel(results, filename):
"""
导出所有方案的对比结果到 Excel
:param results: 包含各方案评估结果的列表
:param filename: 输出文件路径
"""
try:
with pd.ExcelWriter(filename) as writer:
# 1. 总览 Sheet
summary_data = []
for res in results:
# 获取回路数
n_circuits = 0
if 'turbines' in res and 'cluster' in res['turbines'].columns:
n_circuits = res['turbines']['cluster'].nunique()
summary_data.append({
'Scenario': res['name'],
'Total Cost (¥)': res['cost'],
'Total Loss (kW)': res['loss'],
'Num Circuits': n_circuits,
# 计算电缆统计
'Total Cable Length (m)': sum(d['length'] for d in res['eval']['details'])
})
pd.DataFrame(summary_data).to_excel(writer, sheet_name='Comparison Summary', index=False)
# 2. 每个方案的详细 Sheet
for res in results:
# 清理 Sheet 名称
safe_name = res['name'].replace(':', '').replace('/', '-').replace('\\', '-')
# 截断过长的名称 (Excel限制31字符)
if len(safe_name) > 25:
safe_name = safe_name[:25]
details = res['eval']['details']
data = []
for conn in details:
data.append({
'Source': conn['source'],
'Target': conn['target'],
'Horizontal Length (m)': conn['horizontal_length'],
'Vertical Length (m)': conn['vertical_length'],
'Effective Length (m)': conn['length'],
'Cable Type (mm²)': conn['cable']['cross_section'],
'Current (A)': conn['cable']['current'],
'Power (MW)': conn['power'],
'Resistance (Ω)': conn['cable']['resistance'],
'Cost (¥)': conn['cable']['cost']
})
df = pd.DataFrame(data)
df.to_excel(writer, sheet_name=safe_name, index=False)
print(f"成功导出包含所有方案的Excel文件: {filename}")
except Exception as e:
print(f"导出Excel失败: {e}")
# 6. 可视化函数
def visualize_design(turbines, substation, connections, title, ax=None, show_costs=True):
"""可视化集电线路设计方案"""
if ax is None:
fig, ax = plt.subplots(figsize=(10, 8))
# 绘制风机
ax.scatter(turbines['x'], turbines['y'], c='white', edgecolors='#333333', s=80, linewidth=1.0, zorder=3, label='Turbines')
# 标记风机ID
for i, row in turbines.iterrows():
# 显示原始ID如果存在
label_id = int(row['original_id']) if 'original_id' in row else int(row['id'])
# 仅在风机数量较少时显示ID避免密集恐惧
if len(turbines) < 50:
ax.annotate(str(label_id), (row['x'], row['y']), ha='center', va='center', fontsize=6, zorder=4)
# 绘制升压站
ax.scatter(substation[0, 0], substation[0, 1], c='red', s=250, marker='s', zorder=5, label='Substation')
# 颜色映射:使用鲜艳且区分度高的颜色
color_map = {
35: '#00FF00', # 亮绿 (Lime) - 最小
70: '#008000', # 深绿 (Green)
95: '#00FFFF', # 青色 (Cyan)
120: '#0000FF', # 纯蓝 (Blue)
150: '#800080', # 紫色 (Purple)
185: '#FF00FF', # 洋红 (Magenta)
240: '#FFA500', # 橙色 (Orange)
300: '#FF4500', # 红橙 (OrangeRed)
400: '#FF0000' # 纯红 (Red) - 最大
}
# 统计电缆使用情况
cable_counts = defaultdict(int)
# 绘制连接
# 判断传入的是简单连接列表还是详细连接列表
is_detailed = len(connections) > 0 and isinstance(connections[0], dict)
# 收集图例句柄
legend_handles = {}
for conn in connections:
if is_detailed:
source, target, length = conn['source'], conn['target'], conn['length']
section = conn['cable']['cross_section']
# 统计
cable_counts[section] += 1
# 获取颜色和线宽
color = color_map.get(section, 'black')
# 线宽范围 1.0 ~ 3.5
linewidth = 1.0 + (section / 400.0) * 2.5
label_text = f'{section}mm² Cable'
else:
source, target, length = conn
section = 0
color = 'black'
linewidth = 1.0
label_text = 'Connection'
# 获取坐标
if source == 'substation':
x1, y1 = substation[0, 0], substation[0, 1]
else:
turbine_id = int(source.split('_')[1])
x1, y1 = turbines.loc[turbine_id, 'x'], turbines.loc[turbine_id, 'y']
if target == 'substation':
x2, y2 = substation[0, 0], substation[0, 1]
else:
turbine_id = int(target.split('_')[1])
x2, y2 = turbines.loc[turbine_id, 'x'], turbines.loc[turbine_id, 'y']
# 绘制线路
line, = ax.plot([x1, x2], [y1, y2], color=color, linewidth=linewidth, alpha=0.9, zorder=2)
# 保存图例(去重)
if is_detailed and section not in legend_handles:
legend_handles[section] = line
# 打印统计信息
if is_detailed:
print(f"[{title.splitlines()[0]}] 电缆统计:")
for section in sorted(cable_counts.keys()):
print(f" {section}mm²: {cable_counts[section]}")
# 设置图形属性
ax.set_title(title, fontsize=10)
ax.set_xlabel('X (m)')
ax.set_ylabel('Y (m)')
# 构建图例
sorted_sections = sorted(legend_handles.keys())
handles = [legend_handles[s] for s in sorted_sections]
labels = [f'{s}mm²' for s in sorted_sections]
# 添加风机和升压站图例
handles.insert(0, ax.collections[0]) # Turbines
labels.insert(0, 'Turbines')
handles.insert(1, ax.collections[1]) # Substation
labels.insert(1, 'Substation')
ax.legend(handles, labels, loc='upper right', fontsize=8, framealpha=0.9)
ax.grid(True, linestyle='--', alpha=0.3)
ax.set_aspect('equal')
return ax
# 7. 主函数:比较两种设计方法
def compare_design_methods(excel_path=None, n_clusters_override=None):
"""
比较MST和三种电缆方案下的K-means设计方法
:param excel_path: Excel文件路径
:param n_clusters_override: 可选,手动指定簇的数量
"""
cable_specs = None
if excel_path:
print(f"正在从 {excel_path} 读取坐标数据...")
try:
turbines, substation, cable_specs = load_data_from_excel(excel_path)
scenario_title = "Offshore Wind Farm (Imported Data)"
except Exception:
print("回退到自动生成数据模式...")
return compare_design_methods(excel_path=None, n_clusters_override=n_clusters_override)
else:
print("正在生成海上风电场数据 (规则阵列布局)...")
turbines, substation = generate_wind_farm_data(n_turbines=30, layout='grid', spacing=800)
scenario_title = "Offshore Wind Farm (Grid Layout)"
is_offshore = True
# 准备三种电缆方案
# 原始 specs 是 5 元素元组: (section, capacity, resistance, cost, is_optional)
# 下游函数期望 4 元素元组: (section, capacity, resistance, cost)
if cable_specs:
# 方案 1: 不含 Optional='Y' (Standard)
specs_1 = [s[:4] for s in cable_specs if not s[4]]
# 方案 2: 含 Optional='Y' (All)
specs_2 = [s[:4] for s in cable_specs]
# 方案 3: 基于方案 1删掉截面最大的一种
# cable_specs 已按 capacity 排序,假设 capacity 与 section 正相关
specs_3 = specs_1[:-1] if len(specs_1) > 1 else list(specs_1)
else:
# 默认电缆库
default_specs = [
(35, 150, 0.524, 80), (70, 215, 0.268, 120), (95, 260, 0.193, 150),
(120, 295, 0.153, 180), (150, 330, 0.124, 220), (185, 370, 0.0991, 270),
(240, 425, 0.0754, 350), (300, 500, 0.0601, 450), (400, 580, 0.0470, 600)
]
specs_1 = default_specs
specs_2 = default_specs
specs_3 = default_specs[:-1]
scenarios = [
("Scenario 1 (Standard)", specs_1),
("Scenario 2 (With Optional)", specs_2),
("Scenario 3 (No Max)", specs_3)
]
# 1. MST 方法作为基准 (使用 Scenario 1)
mst_connections = design_with_mst(turbines, substation)
mst_evaluation = evaluate_design(turbines, mst_connections, substation, cable_specs=specs_1, is_offshore=is_offshore, method_name="MST Method")
# 准备画布 2x2
fig, axes = plt.subplots(2, 2, figsize=(20, 18))
axes = axes.flatten()
# 绘制 MST
visualize_design(turbines, substation, mst_evaluation['details'],
f"MST Method (Standard Cables)\nTotal Cost: ¥{mst_evaluation['total_cost']/10000:.2f}",
ax=axes[0])
print(f"\n===== 开始比较电缆方案 =====")
best_cost = float('inf')
best_result = None
comparison_results = []
# 将 MST 结果也加入对比列表,方便查看
comparison_results.append({
'name': 'MST Method',
'cost': mst_evaluation['total_cost'],
'loss': mst_evaluation['total_loss'],
'eval': mst_evaluation,
'turbines': turbines.copy(), # MST 不改变 turbines但为了统一格式
'specs': specs_1
})
for i, (name, current_specs) in enumerate(scenarios):
print(f"\n--- {name} ---")
if not current_specs:
print(" 无可用电缆,跳过。")
continue
# 计算参数
total_power = turbines['power'].sum()
max_cable_mw = get_max_cable_capacity_mw(cable_specs=current_specs)
# 确定簇数 (针对 Base 算法)
if n_clusters_override is not None:
n_clusters = n_clusters_override
min_needed = int(np.ceil(total_power / max_cable_mw))
if n_clusters < min_needed:
print(f" Warning: 指定簇数 {n_clusters} 小于理论最小需求 {min_needed}")
else:
min_needed = int(np.ceil(total_power / max_cable_mw))
n_cable_types = len(current_specs)
heuristic = int(np.ceil(len(turbines) / n_cable_types))
n_clusters = max(min_needed, heuristic)
if n_clusters > len(turbines): n_clusters = len(turbines)
print(f" 最大电缆容量: {max_cable_mw:.2f} MW")
# --- Run 1: Base Algorithm (Capacitated Sweep) ---
base_name = f"{name} (Base)"
conns_base, turbines_base = design_with_capacitated_sweep(
turbines.copy(), substation, cable_specs=current_specs
)
eval_base = evaluate_design(
turbines, conns_base, substation, cable_specs=current_specs,
is_offshore=is_offshore, method_name=base_name
)
comparison_results.append({
'name': base_name,
'cost': eval_base['total_cost'],
'loss': eval_base['total_loss'],
'eval': eval_base,
'turbines': turbines_base,
'specs': current_specs
})
print(f" [Base] Cost: ¥{eval_base['total_cost']:,.2f} | Loss: {eval_base['total_loss']:.2f} kW")
# --- Run 2: Rotational Algorithm (Optimization) ---
rot_name = f"{name} (Rotational)"
conns_rot, turbines_rot = design_with_rotational_sweep(
turbines.copy(), substation, cable_specs=current_specs
)
eval_rot = evaluate_design(
turbines, conns_rot, substation, cable_specs=current_specs,
is_offshore=is_offshore, method_name=rot_name
)
comparison_results.append({
'name': rot_name,
'cost': eval_rot['total_cost'],
'loss': eval_rot['total_loss'],
'eval': eval_rot,
'turbines': turbines_rot,
'specs': current_specs
})
print(f" [Rotational] Cost: ¥{eval_rot['total_cost']:,.2f} | Loss: {eval_rot['total_loss']:.2f} kW")
# 记录最佳
if eval_rot['total_cost'] < best_cost:
best_cost = eval_rot['total_cost']
# best_result 不再需要单独维护,最后遍历 comparison_results 即可
if eval_base['total_cost'] < best_cost:
best_cost = eval_base['total_cost']
# 可视化 (只画 Base 版本)
ax_idx = i + 1
if ax_idx < 4:
n_circuits = turbines_base['cluster'].nunique()
title = f"{base_name} ({n_circuits} circuits)\nCost: ¥{eval_base['total_cost']/10000:.2f}"
visualize_design(turbines_base, substation, eval_base['details'], title, ax=axes[ax_idx])
plt.tight_layout()
output_filename = 'wind_farm_design_comparison.png'
plt.savefig(output_filename, dpi=300)
print(f"\n比较图(Base版)已保存至: {output_filename}")
# 准备文件路径
if excel_path:
base_name = os.path.splitext(os.path.basename(excel_path))[0]
dir_name = os.path.dirname(excel_path)
dxf_filename = os.path.join(dir_name, f"{base_name}_design.dxf")
excel_out_filename = os.path.join(dir_name, f"{base_name}_design.xlsx")
else:
dxf_filename = 'wind_farm_design.dxf'
excel_out_filename = 'wind_farm_design.xlsx'
# 导出所有方案到同一个 Excel
if comparison_results:
export_all_scenarios_to_excel(comparison_results, excel_out_filename)
# 交互式选择导出 DXF
print("\n===== 方案选择 =====")
best_idx = 0
for i, res in enumerate(comparison_results):
if res['cost'] < comparison_results[best_idx]['cost']:
best_idx = i
print(f" {i+1}. {res['name']} - Cost: ¥{res['cost']:,.2f}")
print(f"推荐方案: {comparison_results[best_idx]['name']} (默认)")
try:
choice_str = input(f"请输入要导出DXF的方案编号 (1-{len(comparison_results)}),或输入 'A' 导出全部: ").strip()
if choice_str.upper() == 'A':
print("正在导出所有方案...")
base_dxf_name, ext = os.path.splitext(dxf_filename)
for res in comparison_results:
# 生成文件名安全后缀
safe_suffix = res['name'].replace(' ', '_').replace(':', '').replace('(', '').replace(')', '').replace('/', '-')
current_filename = f"{base_dxf_name}_{safe_suffix}{ext}"
print(f" 导出 '{res['name']}' -> {current_filename}")
export_to_dxf(res['turbines'], substation, res['eval']['details'], current_filename)
else:
if not choice_str:
choice = best_idx
else:
choice = int(choice_str) - 1
if choice < 0 or choice >= len(comparison_results):
print("输入编号无效,将使用默认推荐方案。")
choice = best_idx
selected_res = comparison_results[choice]
print(f"正在导出 '{selected_res['name']}' 到 DXF: {dxf_filename} ...")
export_to_dxf(selected_res['turbines'], substation, selected_res['eval']['details'], dxf_filename)
except Exception as e:
print(f"输入处理出错: {e},将使用默认推荐方案。")
selected_res = comparison_results[best_idx]
print(f"正在导出 '{selected_res['name']}' 到 DXF: {dxf_filename} ...")
export_to_dxf(selected_res['turbines'], substation, selected_res['eval']['details'], dxf_filename)
return comparison_results
# 8. 执行比较
if __name__ == "__main__":
# 解析命令行参数
parser = argparse.ArgumentParser(description='Wind Farm Collector System Design')
parser.add_argument('--excel', help='Path to the Excel coordinates file', default=None)
parser.add_argument('--clusters', type=int, help='Specify the number of clusters (circuits) manually', default=None)
args = parser.parse_args()
# 3. 运行比较
# 如果没有提供excel文件将自动回退到生成数据模式
compare_design_methods(args.excel, n_clusters_override=args.clusters)