Files
windfarm/main.py

657 lines
25 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
import numpy as np
import pandas as pd
import matplotlib.pyplot as plt
from scipy.spatial import distance_matrix
from scipy.sparse.csgraph import minimum_spanning_tree
from sklearn.cluster import KMeans
from collections import defaultdict
import networkx as nx
import math
# 设置matplotlib支持中文显示
plt.rcParams['font.sans-serif'] = ['Microsoft YaHei', 'SimHei', 'Arial']
plt.rcParams['axes.unicode_minus'] = False
# 1. 生成风电场数据(实际应用中替换为真实坐标)
def generate_wind_farm_data(n_turbines=30, seed=42, layout='random', spacing=800):
"""
生成模拟风电场数据
:param layout: 'random' (随机) 或 'grid' (规则行列)
:param spacing: 规则布局时的风机间距(m)
"""
np.random.seed(seed)
if layout == 'grid':
# 计算行列数 (尽量接近方形)
n_cols = int(np.ceil(np.sqrt(n_turbines)))
n_rows = int(np.ceil(n_turbines / n_cols))
x_coords = []
y_coords = []
for i in range(n_turbines):
row = i // n_cols
col = i % n_cols
# 添加微小抖动模拟海上定位误差(±1%)
jitter = spacing * 0.01
x = col * spacing + np.random.uniform(-jitter, jitter)
y = row * spacing + np.random.uniform(-jitter, jitter)
x_coords.append(x)
y_coords.append(y)
x_coords = np.array(x_coords)
y_coords = np.array(y_coords)
# 升压站位置:通常位于风电场边缘或中心,这里设为离岸侧(y负方向)中心
substation = np.array([[np.mean(x_coords), -spacing]])
else:
# 随机生成风机位置(扩大范围以适应更大容量)
x_coords = np.random.uniform(0, 2000, n_turbines)
y_coords = np.random.uniform(0, 2000, n_turbines)
# 升压站位置
substation = np.array([[0, 0]])
# 随机生成风机额定功率海上风机通常更大6-10MW
power_ratings = np.random.uniform(6.0, 10.0, n_turbines) if layout == 'grid' else np.random.uniform(2.0, 5.0, n_turbines)
# 创建DataFrame
turbines = pd.DataFrame({
'id': range(n_turbines),
'x': x_coords,
'y': y_coords,
'power': power_ratings,
'cumulative_power': np.zeros(n_turbines) # 用于后续计算
})
return turbines, substation
# 1.5 从Excel加载数据
def load_data_from_excel(file_path):
"""
从Excel文件读取风机和升压站坐标
Excel格式要求包含列: Type (Turbine/Substation), ID, X, Y, Power
"""
try:
df = pd.read_excel(file_path)
# 标准化列名(忽略大小写)
df.columns = [c.capitalize() for c in df.columns]
required_cols = {'Type', 'X', 'Y', 'Power'}
if not required_cols.issubset(df.columns):
raise ValueError(f"Excel文件缺少必要列: {required_cols - set(df.columns)}")
# 提取升压站数据
substation_df = df[df['Type'].astype(str).str.lower() == 'substation']
if len(substation_df) == 0:
raise ValueError("未在文件中找到升压站(Substation)数据")
substation = substation_df[['X', 'Y']].values
# 提取风机数据
turbines_df = df[df['Type'].astype(str).str.lower() == 'turbine'].copy()
if len(turbines_df) == 0:
raise ValueError("未在文件中找到风机(Turbine)数据")
# 重置索引并整理格式
turbines = pd.DataFrame({
'id': range(len(turbines_df)),
'original_id': (turbines_df['Id'].values if 'Id' in turbines_df.columns else range(len(turbines_df))),
'x': turbines_df['X'].values,
'y': turbines_df['Y'].values,
'power': turbines_df['Power'].values,
'cumulative_power': np.zeros(len(turbines_df))
})
print(f"成功加载: {len(turbines)} 台风机, {len(substation)} 座升压站")
return turbines, substation
except Exception as e:
print(f"读取Excel文件失败: {str(e)}")
raise
# 2. 基于最小生成树(MST)的集电线路设计
def design_with_mst(turbines, substation):
"""使用最小生成树算法设计集电线路拓扑"""
# 合并风机和升压站数据
all_points = np.vstack([substation, turbines[['x', 'y']].values])
n_points = len(all_points)
# 计算距离矩阵
dist_matrix = distance_matrix(all_points, all_points)
# 计算最小生成树
mst = minimum_spanning_tree(dist_matrix).toarray()
# 提取连接关系
connections = []
for i in range(n_points):
for j in range(n_points):
if mst[i, j] > 0:
# 确定节点类型0是升压站1+是风机
source = 'substation' if i == 0 else f'turbine_{i-1}'
target = 'substation' if j == 0 else f'turbine_{j-1}'
connections.append((source, target, mst[i, j]))
return connections
# 3. 基于扇区聚类改进版K-means的集电线路设计
def design_with_kmeans(turbines, substation, n_clusters=3):
"""
使用基于角度的K-means聚类设计集电线路拓扑 (避免交叉)
原理:将风机按相对于升压站的角度划分扇区,确保出线不交叉
"""
# 准备风机坐标数据
turbine_coords = turbines[['x', 'y']].values
substation_coord = substation[0]
# 计算每台风机相对于升压站的角度和单位向量
# 使用 (cos, sin) 也就是单位向量进行聚类,可以完美处理 -180/+180 的周期性问题
dx = turbine_coords[:, 0] - substation_coord[0]
dy = turbine_coords[:, 1] - substation_coord[1]
# 归一化为单位向量 (对应角度特征)
magnitudes = np.sqrt(dx**2 + dy**2)
# 处理重合点避免除零
with np.errstate(divide='ignore', invalid='ignore'):
unit_x = dx / magnitudes
unit_y = dy / magnitudes
unit_x[magnitudes == 0] = 0
unit_y[magnitudes == 0] = 0
# 构建特征矩阵:主要基于角度(单位向量),可根据需要加入少量距离权重
# 如果纯粹按角度,设为单位向量即可
features = np.column_stack([unit_x, unit_y])
# 执行K-means聚类
kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10)
turbines['cluster'] = kmeans.fit_predict(features)
# 为每个簇找到最佳连接点(离升压站最近的风机)
cluster_connection_points = {}
for cluster_id in range(n_clusters):
cluster_turbines = turbines[turbines['cluster'] == cluster_id]
if len(cluster_turbines) == 0:
continue
distances_to_substation = np.sqrt(
(cluster_turbines['x'] - substation_coord[0])**2 +
(cluster_turbines['y'] - substation_coord[1])**2
)
closest_idx = distances_to_substation.idxmin()
cluster_connection_points[cluster_id] = closest_idx
# 为每个簇内构建MST
cluster_connections = []
for cluster_id in range(n_clusters):
# 获取当前簇的风机
cluster_turbines = turbines[turbines['cluster'] == cluster_id]
if len(cluster_turbines) == 0:
continue
cluster_indices = cluster_turbines.index.tolist()
# 计算簇内距离矩阵
coords = cluster_turbines[['x', 'y']].values
dist_matrix = distance_matrix(coords, coords)
# 计算簇内MST
mst = minimum_spanning_tree(dist_matrix).toarray()
# 添加簇内连接
for i in range(len(cluster_indices)):
for j in range(len(cluster_indices)):
if mst[i, j] > 0:
source = f'turbine_{cluster_indices[i]}'
target = f'turbine_{cluster_indices[j]}'
cluster_connections.append((source, target, mst[i, j]))
# 添加簇到升压站的连接
substation_connections = []
for cluster_id, turbine_idx in cluster_connection_points.items():
turbine_coord = turbines.loc[turbine_idx, ['x', 'y']].values
distance = np.sqrt(
(turbine_coord[0] - substation_coord[0])**2 +
(turbine_coord[1] - substation_coord[1])**2
)
substation_connections.append((f'turbine_{turbine_idx}', 'substation', distance))
return cluster_connections + substation_connections, turbines
# 常量定义
VOLTAGE_LEVEL = 66000 # 66kV
POWER_FACTOR = 0.95
# 4. 电缆选型函数(简化版)
def select_cable(power, length, is_offshore=False):
"""
基于功率和长度选择合适的电缆截面
:param is_offshore: 是否为海上环境(成本更高)
"""
# 成本乘数:海缆材料+敷设成本通常是陆缆的4-6倍
cost_multiplier = 5.0 if is_offshore else 1.0
# 电缆规格库: (截面mm², 载流量A, 电阻Ω/km, 基准价格元/m)
cable_specs = [
(35, 150, 0.524, 80),
(70, 215, 0.268, 120),
(95, 260, 0.193, 150),
(120, 295, 0.153, 180),
(150, 330, 0.124, 220),
(185, 370, 0.0991, 270),
(240, 425, 0.0754, 350),
(300, 500, 0.0601, 450), # 增加大截面适应海上大功率
(400, 580, 0.0470, 600)
]
# 估算电流
# power是MW, 换算成W需要 * 1e6
current = (power * 1e6) / (np.sqrt(3) * VOLTAGE_LEVEL * POWER_FACTOR)
# 选择满足载流量的最小电缆
selected_spec = None
for spec in cable_specs:
if current <= spec[1] * 0.8: # 80%负载率
selected_spec = spec
break
if selected_spec is None:
selected_spec = cable_specs[-1]
resistance = selected_spec[2] * length / 1000 # 电阻(Ω)
cost = selected_spec[3] * length * cost_multiplier # 电缆成本(含敷设)
return {
'cross_section': selected_spec[0],
'current_capacity': selected_spec[1],
'resistance': resistance,
'cost': cost,
'current': current
}
def get_max_cable_capacity_mw():
"""计算最大电缆(400mm2)能承载的最大功率(MW)"""
# 400mm2载流量580A
max_current = 580 * 0.8 # 80%降额
max_power_w = np.sqrt(3) * VOLTAGE_LEVEL * max_current * POWER_FACTOR
return max_power_w / 1e6 # MW
# 5. 计算集电线路方案成本
def evaluate_design(turbines, connections, substation, is_offshore=False):
"""评估设计方案的总成本和损耗"""
total_cost = 0
total_loss = 0
# 创建连接图
graph = nx.Graph()
graph.add_node('substation')
for i, row in turbines.iterrows():
graph.add_node(f'turbine_{i}')
# 添加边
for source, target, length in connections:
graph.add_edge(source, target, weight=length)
# 计算每台风机的累积功率(从叶到根)
power_flow = defaultdict(float)
# 按照后序遍历(从叶到根)
nodes = list(nx.dfs_postorder_nodes(graph, 'substation'))
for node in nodes:
if node == 'substation':
continue
# 1. 如果节点是风机,先加上自身的功率
if node.startswith('turbine_'):
turbine_id = int(node.split('_')[1])
power_flow[node] += turbines.loc[turbine_id, 'power']
# 2. 找到上游节点(父节点)并将累积功率传递上去
try:
# 找到通往升压站的最短路径上的下一个节点
path = nx.shortest_path(graph, source=node, target='substation')
if len(path) > 1:
parent = path[1] # path[0]是node自己path[1]是父节点
power_flow[parent] += power_flow[node]
except nx.NetworkXNoPath:
pass
# DEBUG: 打印最大功率流
max_power = max(power_flow.values()) if power_flow else 0
print(f"DEBUG: 最大线路功率 = {max_power:.2f} MW")
# 计算成本和损耗
detailed_connections = []
for source, target, length in connections:
# 确定该段线路承载的总功率
if source.startswith('turbine_') and target.startswith('turbine_'):
# 风机间连接,取下游节点功率
if nx.shortest_path_length(graph, 'substation', source) < nx.shortest_path_length(graph, 'substation', target):
power = power_flow[target]
else:
power = power_flow[source]
elif source == 'substation':
# 从升压站出发的连接
power = power_flow[target]
elif target == 'substation':
# 连接到升压站
power = power_flow[source]
else:
power = 0
# 电缆选型
cable = select_cable(power, length, is_offshore=is_offshore)
# 记录详细信息
detailed_connections.append({
'source': source,
'target': target,
'length': length,
'power': power,
'cable': cable
})
# 累计成本
total_cost += cable['cost']
# 计算I²R损耗 (简化版)
loss = (cable['current'] ** 2) * cable['resistance'] * 3 # 三相
total_loss += loss
return {
'total_cost': total_cost,
'total_loss': total_loss,
'num_connections': len(connections),
'details': detailed_connections
}
# 6.5 导出CAD图纸 (DXF格式)
def export_to_dxf(turbines, substation, connections_details, filename):
"""
将设计方案导出为DXF格式 (CAD通用格式)
:param connections_details: evaluate_design返回的'details'列表
"""
import ezdxf
from ezdxf.enums import TextEntityAlignment
doc = ezdxf.new(dxfversion='R2010')
msp = doc.modelspace()
# 1. 建立图层
doc.layers.add('Substation', color=1) # Red
doc.layers.add('Turbines', color=5) # Blue
doc.layers.add('Cable_35mm', color=3) # Green
doc.layers.add('Cable_70mm', color=130)
doc.layers.add('Cable_95mm', color=150)
doc.layers.add('Cable_120mm', color=4) # Cyan
doc.layers.add('Cable_150mm', color=6) # Magenta
doc.layers.add('Cable_185mm', color=30) # Orange
doc.layers.add('Cable_240mm', color=1) # Red
doc.layers.add('Cable_300mm', color=250)
doc.layers.add('Cable_400mm', color=7) # White/Black
# 2. 绘制升压站
sx, sy = substation[0, 0], substation[0, 1]
# 绘制一个矩形代表升压站
size = 50
msp.add_lwpolyline([(sx-size, sy-size), (sx+size, sy-size),
(sx+size, sy+size), (sx-size, sy+size), (sx-size, sy-size)],
close=True, dxfattribs={'layer': 'Substation'})
msp.add_text("Substation", dxfattribs={'layer': 'Substation', 'height': 20}).set_placement((sx, sy+size+10), align=TextEntityAlignment.CENTER)
# 3. 绘制风机
for i, row in turbines.iterrows():
tx, ty = row['x'], row['y']
# 绘制圆形代表风机
msp.add_circle((tx, ty), radius=15, dxfattribs={'layer': 'Turbines'})
# 添加文字标签
label_id = str(int(row['original_id']) if 'original_id' in row else int(row['id']))
msp.add_text(label_id, dxfattribs={'layer': 'Turbines', 'height': 15}).set_placement((tx, ty-30), align=TextEntityAlignment.CENTER)
# 4. 绘制连接电缆
for conn in connections_details:
source, target = conn['source'], conn['target']
section = conn['cable']['cross_section']
# 获取坐标
if source == 'substation':
p1 = (substation[0, 0], substation[0, 1])
else:
tid = int(source.split('_')[1])
p1 = (turbines.loc[tid, 'x'], turbines.loc[tid, 'y'])
if target == 'substation':
p2 = (substation[0, 0], substation[0, 1])
else:
tid = int(target.split('_')[1])
p2 = (turbines.loc[tid, 'x'], turbines.loc[tid, 'y'])
# 确定图层
layer_name = f'Cable_{section}mm'
if layer_name not in doc.layers:
doc.layers.add(layer_name)
# 绘制直线
msp.add_line(p1, p2, dxfattribs={'layer': layer_name})
# 添加电缆型号文字(可选,在线的中点)
# mid_x = (p1[0] + p2[0]) / 2
# mid_y = (p1[1] + p2[1]) / 2
# msp.add_text(f"{section}mm", dxfattribs={'layer': layer_name, 'height': 10}).set_pos((mid_x, mid_y), align='CENTER')
try:
doc.saveas(filename)
print(f"成功导出DXF文件: {filename}")
except Exception as e:
print(f"导出DXF失败: {e}")
# 6. 可视化函数
def visualize_design(turbines, substation, connections, title, ax=None, show_costs=True):
"""可视化集电线路设计方案"""
if ax is None:
fig, ax = plt.subplots(figsize=(10, 8))
# 绘制风机
ax.scatter(turbines['x'], turbines['y'], c='white', edgecolors='#333333', s=80, linewidth=1.0, zorder=3, label='Turbines')
# 标记风机ID
for i, row in turbines.iterrows():
# 显示原始ID如果存在
label_id = int(row['original_id']) if 'original_id' in row else int(row['id'])
# 仅在风机数量较少时显示ID避免密集恐惧
if len(turbines) < 50:
ax.annotate(str(label_id), (row['x'], row['y']), ha='center', va='center', fontsize=6, zorder=4)
# 绘制升压站
ax.scatter(substation[0, 0], substation[0, 1], c='red', s=250, marker='s', zorder=5, label='Substation')
# 颜色映射:使用鲜艳且区分度高的颜色
color_map = {
35: '#00FF00', # 亮绿 (Lime) - 最小
70: '#008000', # 深绿 (Green)
95: '#00FFFF', # 青色 (Cyan)
120: '#0000FF', # 纯蓝 (Blue)
150: '#800080', # 紫色 (Purple)
185: '#FF00FF', # 洋红 (Magenta)
240: '#FFA500', # 橙色 (Orange)
300: '#FF4500', # 红橙 (OrangeRed)
400: '#FF0000' # 纯红 (Red) - 最大
}
# 统计电缆使用情况
cable_counts = defaultdict(int)
# 绘制连接
# 判断传入的是简单连接列表还是详细连接列表
is_detailed = len(connections) > 0 and isinstance(connections[0], dict)
# 收集图例句柄
legend_handles = {}
for conn in connections:
if is_detailed:
source, target, length = conn['source'], conn['target'], conn['length']
section = conn['cable']['cross_section']
# 统计
cable_counts[section] += 1
# 获取颜色和线宽
color = color_map.get(section, 'black')
# 线宽范围 1.0 ~ 3.5
linewidth = 1.0 + (section / 400.0) * 2.5
label_text = f'{section}mm² Cable'
else:
source, target, length = conn
section = 0
color = 'black'
linewidth = 1.0
label_text = 'Connection'
# 获取坐标
if source == 'substation':
x1, y1 = substation[0, 0], substation[0, 1]
else:
turbine_id = int(source.split('_')[1])
x1, y1 = turbines.loc[turbine_id, 'x'], turbines.loc[turbine_id, 'y']
if target == 'substation':
x2, y2 = substation[0, 0], substation[0, 1]
else:
turbine_id = int(target.split('_')[1])
x2, y2 = turbines.loc[turbine_id, 'x'], turbines.loc[turbine_id, 'y']
# 绘制线路
line, = ax.plot([x1, x2], [y1, y2], color=color, linewidth=linewidth, alpha=0.9, zorder=2)
# 保存图例(去重)
if is_detailed and section not in legend_handles:
legend_handles[section] = line
# 打印统计信息
if is_detailed:
print(f"[{title.splitlines()[0]}] 电缆统计:")
for section in sorted(cable_counts.keys()):
print(f" {section}mm²: {cable_counts[section]}")
# 设置图形属性
ax.set_title(title, fontsize=10)
ax.set_xlabel('X (m)')
ax.set_ylabel('Y (m)')
# 构建图例
sorted_sections = sorted(legend_handles.keys())
handles = [legend_handles[s] for s in sorted_sections]
labels = [f'{s}mm²' for s in sorted_sections]
# 添加风机和升压站图例
handles.insert(0, ax.collections[0]) # Turbines
labels.insert(0, 'Turbines')
handles.insert(1, ax.collections[1]) # Substation
labels.insert(1, 'Substation')
ax.legend(handles, labels, loc='upper right', fontsize=8, framealpha=0.9)
ax.grid(True, linestyle='--', alpha=0.3)
ax.set_aspect('equal')
return ax
# 7. 主函数:比较两种设计方法
def compare_design_methods(excel_path=None):
"""
比较MST和K-means两种设计方法 (海上风电场场景)
:param excel_path: Excel文件路径如果提供则从文件读取数据
"""
if excel_path:
print(f"正在从 {excel_path} 读取坐标数据...")
try:
turbines, substation = load_data_from_excel(excel_path)
scenario_title = "Offshore Wind Farm (Imported Data)"
except Exception:
print("回退到自动生成数据模式...")
return compare_design_methods(excel_path=None)
else:
print("正在生成海上风电场数据 (规则阵列布局)...")
# 使用规则布局间距800m
turbines, substation = generate_wind_farm_data(n_turbines=30, layout='grid', spacing=800)
scenario_title = "Offshore Wind Farm (Grid Layout)"
is_offshore = True
# 方法1最小生成树
# 注意MST方法在不考虑容量约束时可能会导致根部线路严重过载
mst_connections = design_with_mst(turbines, substation)
mst_evaluation = evaluate_design(turbines, mst_connections, substation, is_offshore=is_offshore)
# 方法2K-means聚类 (容量受限聚类)
# 计算总功率和所需的最小回路数
total_power = turbines['power'].sum()
max_cable_mw = get_max_cable_capacity_mw()
min_clusters_needed = int(np.ceil(total_power / max_cable_mw))
# 增加一定的安全裕度 (1.2倍) 并确保至少有一定数量的簇
n_clusters = max(int(min_clusters_needed * 1.2), 4)
if len(turbines) < n_clusters: # 避免簇数多于风机数
n_clusters = len(turbines)
print(f"系统设计参数: 总功率 {total_power:.1f} MW, 单回路最大容量 {max_cable_mw:.1f} MW")
print(f"计算建议回路数(簇数): {n_clusters} (最小需求 {min_clusters_needed})")
kmeans_connections, clustered_turbines = design_with_kmeans(turbines.copy(), substation, n_clusters=n_clusters)
kmeans_evaluation = evaluate_design(turbines, kmeans_connections, substation, is_offshore=is_offshore)
# 创建结果比较
results = {
'MST Method': mst_evaluation,
'K-means Method': kmeans_evaluation
}
# 可视化
fig, axes = plt.subplots(1, 2, figsize=(20, 10))
# 可视化MST方法
visualize_design(turbines, substation, mst_evaluation['details'],
f"MST Design - {scenario_title}\nTotal Cost: ¥{mst_evaluation['total_cost']/10000:.2f}\nTotal Loss: {mst_evaluation['total_loss']:.2f} kW",
ax=axes[0])
# 可视化K-means方法
visualize_design(clustered_turbines, substation, kmeans_evaluation['details'],
f"Sector Clustering (Angular) ({n_clusters} clusters) - {scenario_title}\nTotal Cost: ¥{kmeans_evaluation['total_cost']/10000:.2f}\nTotal Loss: {kmeans_evaluation['total_loss']:.2f} kW",
ax=axes[1])
plt.tight_layout()
output_filename = 'wind_farm_design_imported.png' if excel_path else 'offshore_wind_farm_design.png'
plt.savefig(output_filename, dpi=300)
# 导出DXF
dxf_filename = 'wind_farm_design.dxf'
# 默认导出更优的方案通常K-means扇区聚类在海上更合理或者成本更低者
# 这里我们导出Sector Clustering的结果
export_to_dxf(clustered_turbines, substation, kmeans_evaluation['details'], dxf_filename)
plt.show()
# 打印详细结果
print(f"\n===== 海上风电场设计方案比较 ({'导入数据' if excel_path else '自动生成'}) =====")
for method, eval_data in results.items():
print(f"\n{method}:")
print(f" 总成本: ¥{eval_data['total_cost']:,.2f} ({eval_data['total_cost']/10000:.2f}万元)")
print(f" 预估总损耗: {eval_data['total_loss']:.2f} kW")
print(f" 连接数量: {eval_data['num_connections']}")
return results
# 8. 执行比较
if __name__ == "__main__":
import os
# 检查是否存在 coordinates.xlsx存在则优先使用
default_excel = 'coordinates.xlsx'
if os.path.exists(default_excel):
results = compare_design_methods(excel_path=default_excel)
else:
results = compare_design_methods()