import numpy as np import pandas as pd import matplotlib.pyplot as plt from scipy.spatial import distance_matrix from scipy.sparse.csgraph import minimum_spanning_tree from sklearn.cluster import KMeans from collections import defaultdict import networkx as nx import math import argparse import os # 设置matplotlib支持中文显示 plt.rcParams['font.sans-serif'] = ['Microsoft YaHei', 'SimHei', 'Arial'] plt.rcParams['axes.unicode_minus'] = False # 常量定义 VOLTAGE_LEVEL = 66000 # 66kV POWER_FACTOR = 0.95 # 1. 生成风电场数据(实际应用中替换为真实坐标) def generate_wind_farm_data(n_turbines=30, seed=42, layout='random', spacing=800): """ 生成模拟风电场数据 :param layout: 'random' (随机) 或 'grid' (规则行列) :param spacing: 规则布局时的风机间距(m) """ np.random.seed(seed) if layout == 'grid': # 计算行列数 (尽量接近方形) n_cols = int(np.ceil(np.sqrt(n_turbines))) n_rows = int(np.ceil(n_turbines / n_cols)) x_coords = [] y_coords = [] for i in range(n_turbines): row = i // n_cols col = i % n_cols # 添加微小抖动模拟海上定位误差(±1%) jitter = spacing * 0.01 x = col * spacing + np.random.uniform(-jitter, jitter) y = row * spacing + np.random.uniform(-jitter, jitter) x_coords.append(x) y_coords.append(y) x_coords = np.array(x_coords) y_coords = np.array(y_coords) # 升压站位置:通常位于风电场边缘或中心,这里设为离岸侧(y负方向)中心 substation = np.array([[np.mean(x_coords), -spacing]]) else: # 随机生成风机位置(扩大范围以适应更大容量) x_coords = np.random.uniform(0, 2000, n_turbines) y_coords = np.random.uniform(0, 2000, n_turbines) # 升压站位置 substation = np.array([[0, 0]]) # 随机生成风机额定功率(海上风机通常更大,6-10MW) power_ratings = np.random.uniform(6.0, 10.0, n_turbines) if layout == 'grid' else np.random.uniform(2.0, 5.0, n_turbines) # 创建DataFrame turbines = pd.DataFrame({ 'id': range(n_turbines), 'x': x_coords, 'y': y_coords, 'power': power_ratings, 'platform_height': np.zeros(n_turbines), 'cumulative_power': np.zeros(n_turbines) # 用于后续计算 }) return turbines, substation # 1.5 从Excel加载数据 def load_data_from_excel(file_path): """ 从Excel文件读取风机和升压站坐标,以及可选的电缆规格 Excel格式要求: - Sheet 'Coordinates' (或第一个Sheet): Type (Turbine/Substation), ID, X, Y, Power - Sheet 'Cables' (可选): CrossSection, Capacity, Resistance, Cost """ try: xl = pd.ExcelFile(file_path) # 读取坐标数据 if 'Coordinates' in xl.sheet_names: df = pd.read_excel(xl, 'Coordinates') else: df = pd.read_excel(xl, 0) # 标准化列名(忽略大小写) df.columns = [c.capitalize() for c in df.columns] required_cols = {'Type', 'X', 'Y', 'Power'} if not required_cols.issubset(df.columns): raise ValueError(f"Excel文件缺少必要列: {required_cols - set(df.columns)}") # 提取升压站数据 substation_df = df[df['Type'].astype(str).str.lower() == 'substation'] if len(substation_df) == 0: raise ValueError("未在文件中找到升压站(Substation)数据") substation = substation_df[['X', 'Y']].values # 提取风机数据 turbines_df = df[df['Type'].astype(str).str.lower() == 'turbine'].copy() if len(turbines_df) == 0: raise ValueError("未在文件中找到风机(Turbine)数据") # 尝试获取平台高度列 (兼容不同命名) platform_height_col = None for col in turbines_df.columns: if col.lower().replace(' ', '') == 'platformheight': platform_height_col = col break platform_heights = turbines_df[platform_height_col].values if platform_height_col else np.zeros(len(turbines_df)) # 重置索引并整理格式 turbines = pd.DataFrame({ 'id': range(len(turbines_df)), 'original_id': (turbines_df['Id'].values if 'Id' in turbines_df.columns else range(len(turbines_df))), 'x': turbines_df['X'].values, 'y': turbines_df['Y'].values, 'power': turbines_df['Power'].values, 'platform_height': platform_heights, 'cumulative_power': np.zeros(len(turbines_df)) }) # 读取电缆数据 (如果存在) cable_specs = None if 'Cables' in xl.sheet_names: cables_df = pd.read_excel(xl, 'Cables') # 标准化列名 cables_df.columns = [c.replace(' ', '').capitalize() for c in cables_df.columns] # Handle 'Cross Section' vs 'CrossSection' # 尝试匹配列 # 目标格式: (截面mm², 载流量A, 电阻Ω/km, 基准价格元/m, 是否可选) specs = [] for _, row in cables_df.iterrows(): # 容错处理列名 section = row.get('Crosssection', row.get('Section', 0)) capacity = row.get('Capacity', row.get('Current', 0)) resistance = row.get('Resistance', 0) cost = row.get('Cost', row.get('Price', 0)) optional_val = str(row.get('Optional', '')).strip().upper() is_optional = (optional_val == 'Y') if section > 0 and capacity > 0: specs.append((section, capacity, resistance, cost, is_optional)) if specs: specs.sort(key=lambda x: x[1]) # 按载流量排序 cable_specs = specs print(f"成功加载: {len(turbines)} 台风机, {len(substation)} 座升压站") if cable_specs: print(f"成功加载: {len(cable_specs)} 种电缆规格") return turbines, substation, cable_specs except Exception as e: print(f"读取Excel文件失败: {str(e)}") raise # 2. 基于最小生成树(MST)的集电线路设计 def design_with_mst(turbines, substation): """使用最小生成树算法设计集电线路拓扑""" # 合并风机和升压站数据 all_points = np.vstack([substation, turbines[['x', 'y']].values]) n_points = len(all_points) # 计算距离矩阵 dist_matrix = distance_matrix(all_points, all_points) # 计算最小生成树 mst = minimum_spanning_tree(dist_matrix).toarray() # 提取连接关系 connections = [] for i in range(n_points): for j in range(n_points): if mst[i, j] > 0: # 确定节点类型:0是升压站,1+是风机 source = 'substation' if i == 0 else f'turbine_{i-1}' target = 'substation' if j == 0 else f'turbine_{j-1}' connections.append((source, target, mst[i, j])) return connections # 3. 基于扇区聚类(改进版K-means)的集电线路设计 def design_with_kmeans(turbines, substation, n_clusters=3): """ 使用基于角度的K-means聚类设计集电线路拓扑 (避免交叉) 原理:将风机按相对于升压站的角度划分扇区,确保出线不交叉 """ # 准备风机坐标数据 turbine_coords = turbines[['x', 'y']].values substation_coord = substation[0] # 计算每台风机相对于升压站的角度和单位向量 # 使用 (cos, sin) 也就是单位向量进行聚类,可以完美处理 -180/+180 的周期性问题 dx = turbine_coords[:, 0] - substation_coord[0] dy = turbine_coords[:, 1] - substation_coord[1] # 归一化为单位向量 (对应角度特征) magnitudes = np.sqrt(dx**2 + dy**2) # 处理重合点避免除零 with np.errstate(divide='ignore', invalid='ignore'): unit_x = dx / magnitudes unit_y = dy / magnitudes unit_x[magnitudes == 0] = 0 unit_y[magnitudes == 0] = 0 # 构建特征矩阵:主要基于角度(单位向量),可根据需要加入少量距离权重 # 如果纯粹按角度,设为单位向量即可 features = np.column_stack([unit_x, unit_y]) # 执行K-means聚类 kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10) turbines['cluster'] = kmeans.fit_predict(features) # 为每个簇找到最佳连接点(离升压站最近的风机) cluster_connection_points = {} for cluster_id in range(n_clusters): cluster_turbines = turbines[turbines['cluster'] == cluster_id] if len(cluster_turbines) == 0: continue distances_to_substation = np.sqrt( (cluster_turbines['x'] - substation_coord[0])**2 + (cluster_turbines['y'] - substation_coord[1])**2 ) closest_idx = distances_to_substation.idxmin() cluster_connection_points[cluster_id] = closest_idx # 为每个簇内构建MST cluster_connections = [] for cluster_id in range(n_clusters): # 获取当前簇的风机 cluster_turbines = turbines[turbines['cluster'] == cluster_id] if len(cluster_turbines) == 0: continue cluster_indices = cluster_turbines.index.tolist() # 计算簇内距离矩阵 coords = cluster_turbines[['x', 'y']].values dist_matrix = distance_matrix(coords, coords) # 计算簇内MST mst = minimum_spanning_tree(dist_matrix).toarray() # 添加簇内连接 for i in range(len(cluster_indices)): for j in range(len(cluster_indices)): if mst[i, j] > 0: source = f'turbine_{cluster_indices[i]}' target = f'turbine_{cluster_indices[j]}' cluster_connections.append((source, target, mst[i, j])) # 添加簇到升压站的连接 substation_connections = [] for cluster_id, turbine_idx in cluster_connection_points.items(): turbine_coord = turbines.loc[turbine_idx, ['x', 'y']].values distance = np.sqrt( (turbine_coord[0] - substation_coord[0])**2 + (turbine_coord[1] - substation_coord[1])**2 ) substation_connections.append((f'turbine_{turbine_idx}', 'substation', distance)) return cluster_connections + substation_connections, turbines # 3.5 带容量约束的扇区扫描算法 (Capacitated Angular Sweep) def design_with_capacitated_sweep(turbines, substation, cable_specs=None): """ 使用带容量约束的扇区扫描算法设计集电线路 原理: 1. 计算所有风机相对于升压站的角度。 2. 找到角度间隔最大的位置作为起始“切割线”,以避免切断密集的风机群。 3. 沿圆周方向扫描,贪婪地将风机加入当前回路,直到达到电缆容量上限。 4. 满载后开启新回路。 """ # 1. 获取电缆最大容量 max_mw = get_max_cable_capacity_mw(cable_specs) print(f"DEBUG: 扇区扫描算法启动 - 单回路容量限制: {max_mw:.2f} MW") substation_coord = substation[0] # 2. 计算角度 (使用 arctan2 返回 -pi 到 pi) # 避免直接修改原始DataFrame,使用副本 work_df = turbines.copy() dx = work_df['x'] - substation_coord[0] dy = work_df['y'] - substation_coord[1] work_df['angle'] = np.arctan2(dy, dx) # 3. 寻找最佳起始角度 (最大角度间隙) # 按角度排序 work_df = work_df.sort_values('angle').reset_index(drop=True) # 重置索引方便切片 angles = work_df['angle'].values n = len(angles) if n > 1: # 计算相邻角度差 diffs = np.diff(angles) # 计算首尾角度差 (跨越 ±pi 处) wrap_diff = (2 * np.pi) - (angles[-1] - angles[0]) diffs = np.append(diffs, wrap_diff) # 找到最大间隙的索引 (该索引对应元素的 *后面* 是间隙) max_gap_idx = np.argmax(diffs) # 如果最大间隙不是在队尾,则需要旋转数组,使最大间隙成为新的起点(即队尾) # 实际上我们希望扫描的起点是最大间隙的“右侧” # 例如: [0, 10, 100, 110]. Gaps: 10, 90, 10, 250. Max gap is wrap (250). Start at 0 is fine. # 例如: [0, 10, 200, 210]. Gaps: 10, 190, 10, 150. Max gap is 190 (idx 1). # 我们应该从 idx 2 (200) 开始扫描。 start_idx = (max_gap_idx + 1) % n # 重新排序风机列表 if start_idx != 0: work_df = pd.concat([work_df.iloc[start_idx:], work_df.iloc[:start_idx]]).reset_index(drop=True) # 4. 贪婪分组 (Capacity Constrained Clustering) work_df['cluster'] = -1 cluster_id = 0 current_power = 0 current_indices = [] for i, row in work_df.iterrows(): p = row['power'] # 检查是否超载 # 注意:如果是空簇,必须加入至少一个(即使单个风机超载也得加,但在风电中单机不会超载) if len(current_indices) > 0 and (current_power + p > max_mw): # 当前簇已满,结束当前簇,开启新簇 work_df.loc[current_indices, 'cluster'] = cluster_id cluster_id += 1 current_power = 0 current_indices = [] current_indices.append(i) current_power += p # 处理最后一个簇 if current_indices: work_df.loc[current_indices, 'cluster'] = cluster_id cluster_id += 1 # 计数用 print(f"DEBUG: 生成了 {cluster_id} 个回路 (簇)") # 将 cluster 标记映射回原始 turbines DataFrame (通过 original_id 或 索引匹配) # 这里我们简单地重建 turbines,因为 work_df 包含了所有信息且顺序变了 # 为了保持外部一致性,我们把 cluster 列 map 回去 # 建立 id -> cluster 的映射 id_to_cluster = dict(zip(work_df['id'], work_df['cluster'])) turbines['cluster'] = turbines['id'].map(id_to_cluster) # 5. 对每个簇内部进行MST连接 (复用现有逻辑) cluster_connections = [] substation_connections = [] n_clusters = cluster_id for cid in range(n_clusters): cluster_turbines = turbines[turbines['cluster'] == cid] if len(cluster_turbines) == 0: continue # --- 簇内 MST --- cluster_indices = cluster_turbines.index.tolist() coords = cluster_turbines[['x', 'y']].values if len(cluster_indices) > 1: dist_matrix = distance_matrix(coords, coords) mst = minimum_spanning_tree(dist_matrix).toarray() for i in range(len(cluster_indices)): for j in range(len(cluster_indices)): if mst[i, j] > 0: source = f'turbine_{cluster_indices[i]}' target = f'turbine_{cluster_indices[j]}' cluster_connections.append((source, target, mst[i, j])) # --- 连接到升压站 --- # 找到簇内离升压站最近的风机 dists = np.sqrt((cluster_turbines['x'] - substation_coord[0])**2 + (cluster_turbines['y'] - substation_coord[1])**2) closest_id = dists.idxmin() # 返回的是原始DataFrame的index # 添加升压站连接 min_dist = dists.min() substation_connections.append((f'turbine_{closest_id}', 'substation', min_dist)) return cluster_connections + substation_connections, turbines def get_max_cable_capacity_mw(cable_specs=None): """ 计算给定电缆规格中能够承载的最大功率 (单位: MW)。 基于提供的电缆规格列表,选取最大载流量,结合系统电压和功率因数计算理论最大传输功率。 参数: cable_specs (list, optional): 电缆规格列表。每个元素应包含 (截面积, 额定电流, 单价, 损耗系数)。 返回: float: 最大功率承载能力 (MW)。 异常: Exception: 当未提供 cable_specs 时抛出,提示截面不满足。 """ if cable_specs is None: # Default cable specs if not provided (same as in evaluate_design) cable_specs = [ (35, 150, 0.524, 80), (70, 215, 0.268, 120), (95, 260, 0.193, 150), (120, 295, 0.153, 180), (150, 330, 0.124, 220), (185, 370, 0.0991, 270), (240, 425, 0.0754, 350), (300, 500, 0.0601, 450), (400, 580, 0.0470, 600) ] # 从所有电缆规格中找到最大的额定电流容量 max_current_capacity = max(spec[1] for spec in cable_specs) # 计算最大功率:P = √3 * U * I * cosφ # 这里假设降额系数为 1 (不降额) max_current = max_current_capacity * 1 max_power_w = np.sqrt(3) * VOLTAGE_LEVEL * max_current * POWER_FACTOR # 将单位从 W 转换为 MW return max_power_w / 1e6 # 5. 计算集电线路方案成本 def evaluate_design(turbines, connections, substation, cable_specs=None, is_offshore=False, method_name="Unknown Method"): """评估设计方案的总成本和损耗""" total_cost = 0 total_loss = 0 # 创建连接图 graph = nx.Graph() graph.add_node('substation') for i, row in turbines.iterrows(): graph.add_node(f'turbine_{i}') # 添加边 for source, target, length in connections: graph.add_edge(source, target, weight=length) # 计算每台风机的累积功率(从叶到根) power_flow = defaultdict(float) # 按照后序遍历(从叶到根) nodes = list(nx.dfs_postorder_nodes(graph, 'substation')) for node in nodes: if node == 'substation': continue # 1. 如果节点是风机,先加上自身的功率 if node.startswith('turbine_'): turbine_id = int(node.split('_')[1]) power_flow[node] += turbines.loc[turbine_id, 'power'] # 2. 找到上游节点(父节点)并将累积功率传递上去 try: # 找到通往升压站的最短路径上的下一个节点 path = nx.shortest_path(graph, source=node, target='substation') if len(path) > 1: parent = path[1] # path[0]是node自己,path[1]是父节点 power_flow[parent] += power_flow[node] except nx.NetworkXNoPath: pass # DEBUG: 打印最大功率流 (不含升压站本身) node_powers = [v for k, v in power_flow.items() if k != 'substation'] max_power = max(node_powers) if node_powers else 0 print(f"DEBUG [{method_name}]: 最大线路功率 = {max_power:.2f} MW") # 计算成本和损耗 detailed_connections = [] for source, target, length in connections: # Determine vertical length (PlatformHeight) vertical_length = 0 if source.startswith('turbine_'): tid = int(source.split('_')[1]) vertical_length += turbines.loc[tid, 'platform_height'] if target.startswith('turbine_'): tid = int(target.split('_')[1]) vertical_length += turbines.loc[tid, 'platform_height'] # Calculate effective length with margin # Total Length = (Horizontal Distance + Vertical Up/Down) * 1.03 horizontal_length = length effective_length = (horizontal_length + vertical_length) * 1.03 # 确定该段线路承载的总功率 if source.startswith('turbine_') and target.startswith('turbine_'): # 风机间连接,取下游节点功率 if nx.shortest_path_length(graph, 'substation', source) < nx.shortest_path_length(graph, 'substation', target): power = power_flow[target] else: power = power_flow[source] elif source == 'substation': # 从升压站出发的连接 power = power_flow[target] elif target == 'substation': # 连接到升压站 power = power_flow[source] else: power = 0 # 电缆选型 # 成本乘数:如果Excel中已包含敷设费用,则设为1.0 cost_multiplier = 1.0 if is_offshore else 1.0 # 默认电缆规格库 (如果未提供) if cable_specs is None: cable_specs_to_use = [ (35, 150, 0.524, 80), (70, 215, 0.268, 120), (95, 260, 0.193, 150), (120, 295, 0.153, 180), (150, 330, 0.124, 220), (185, 370, 0.0991, 270), (240, 425, 0.0754, 350), (300, 500, 0.0601, 450), (400, 580, 0.0470, 600) ] else: cable_specs_to_use = cable_specs # 估算电流 current = (power * 1e6) / (np.sqrt(3) * VOLTAGE_LEVEL * POWER_FACTOR) # 选择满足载流量的最小电缆 selected_spec = None for spec in cable_specs_to_use: if current <= spec[1] * 1: # 100%负载率 selected_spec = spec break if selected_spec is None: selected_spec = cable_specs_to_use[-1] print(f"WARNING [{method_name}]: Current {current:.2f} A (Power: {power:.2f} MW) exceeds max cable capacity {selected_spec[1]} A!") resistance = selected_spec[2] * effective_length / 1000 # 电阻(Ω) cost = selected_spec[3] * effective_length * cost_multiplier # 电缆成本(含敷设) cable = { 'cross_section': selected_spec[0], 'current_capacity': selected_spec[1], 'resistance': resistance, 'cost': cost, 'current': current } # 记录详细信息 detailed_connections.append({ 'source': source, 'target': target, 'horizontal_length': horizontal_length, 'vertical_length': vertical_length, 'length': effective_length, # effective length used for stats 'power': power, 'cable': cable }) # 累计成本 total_cost += cable['cost'] # 计算I²R损耗 (简化版) loss = (cable['current'] ** 2) * cable['resistance'] * 3 # 三相 total_loss += loss return { 'total_cost': total_cost, 'total_loss': total_loss, 'num_connections': len(connections), 'details': detailed_connections } # 6.5 导出CAD图纸 (DXF格式) def export_to_dxf(turbines, substation, connections_details, filename): """ 将设计方案导出为DXF格式 (CAD通用格式) :param connections_details: evaluate_design返回的'details'列表 """ import ezdxf from ezdxf.enums import TextEntityAlignment doc = ezdxf.new(dxfversion='R2010') msp = doc.modelspace() # 1. 建立图层 doc.layers.add('Substation', color=1) # Red doc.layers.add('Turbines', color=5) # Blue # 动态确定电缆颜色 # 提取所有使用到的电缆截面 used_sections = sorted(list(set(conn['cable']['cross_section'] for conn in connections_details))) # 定义颜色映射规则 (AutoCAD Color Index) # 1=Red, 2=Yellow, 3=Green, 4=Cyan, 5=Blue, 6=Magenta color_rank = [ 2, # 1st (Smallest): Yellow 3, # 2nd: Green 1, # 3rd: Red 5, # 4th: Blue 6 # 5th: Magenta ] default_color = 3 # Others: Green # 创建电缆图层 for i, section in enumerate(used_sections): if i < len(color_rank): c = color_rank[i] else: c = default_color layer_name = f'Cable_{section}mm' if layer_name not in doc.layers: doc.layers.add(layer_name, color=c) # 2. 绘制升压站 sx, sy = substation[0, 0], substation[0, 1] # 绘制一个矩形代表升压站 size = 50 msp.add_lwpolyline([(sx-size, sy-size), (sx+size, sy-size), (sx+size, sy+size), (sx-size, sy+size), (sx-size, sy-size)], close=True, dxfattribs={'layer': 'Substation'}) msp.add_text("Substation", dxfattribs={'layer': 'Substation', 'height': 20}).set_placement((sx, sy+size+10), align=TextEntityAlignment.CENTER) # 3. 绘制风机 for i, row in turbines.iterrows(): tx, ty = row['x'], row['y'] # 绘制圆形代表风机 msp.add_circle((tx, ty), radius=15, dxfattribs={'layer': 'Turbines'}) # 添加文字标签 label_id = str(int(row['original_id']) if 'original_id' in row else int(row['id'])) msp.add_text(label_id, dxfattribs={'layer': 'Turbines', 'height': 15}).set_placement((tx, ty-30), align=TextEntityAlignment.CENTER) # 4. 绘制连接电缆 for conn in connections_details: source, target = conn['source'], conn['target'] section = conn['cable']['cross_section'] if source == 'substation': p1 = (substation[0, 0], substation[0, 1]) else: tid = int(source.split('_')[1]) p1 = (turbines.loc[tid, 'x'], turbines.loc[tid, 'y']) if target == 'substation': p2 = (substation[0, 0], substation[0, 1]) else: tid = int(target.split('_')[1]) p2 = (turbines.loc[tid, 'x'], turbines.loc[tid, 'y']) # 确定图层 layer_name = f'Cable_{section}mm' if layer_name not in doc.layers: doc.layers.add(layer_name) # 绘制二维多段线 msp.add_lwpolyline([p1, p2], dxfattribs={'layer': layer_name}) # 添加电缆型号文字(可选,在线的中点) # mid_x = (p1[0] + p2[0]) / 2 # mid_y = (p1[1] + p2[1]) / 2 # msp.add_text(f"{section}mm", dxfattribs={'layer': layer_name, 'height': 10}).set_pos((mid_x, mid_y), align='CENTER') try: doc.saveas(filename) print(f"成功导出DXF文件: {filename}") except Exception as e: print(f"导出DXF失败: {e}") # 6.6 导出Excel报表 def export_to_excel(connections_details, filename): """ 将设计方案详情导出为Excel文件 :param connections_details: evaluate_design返回的'details'列表 """ data = [] for conn in connections_details: data.append({ 'Source': conn['source'], 'Target': conn['target'], 'Horizontal Length (m)': conn['horizontal_length'], 'Vertical Length (m)': conn['vertical_length'], 'Effective Length (m)': conn['length'], 'Cable Type (mm²)': conn['cable']['cross_section'], 'Current (A)': conn['cable']['current'], 'Power (MW)': conn['power'], 'Resistance (Ω)': conn['cable']['resistance'], 'Cost (¥)': conn['cable']['cost'] }) df = pd.DataFrame(data) # 汇总统计 summary = { 'Total Cost (¥)': df['Cost (¥)'].sum(), 'Total Effective Length (m)': df['Effective Length (m)'].sum(), 'Total Vertical Length (m)': df['Vertical Length (m)'].sum() } summary_df = pd.DataFrame([summary]) try: with pd.ExcelWriter(filename) as writer: df.to_excel(writer, sheet_name='Cable Schedule', index=False) summary_df.to_excel(writer, sheet_name='Summary', index=False) print(f"成功导出Excel文件: {filename}") except Exception as e: print(f"导出Excel失败: {e}") # 6.6 导出多方案对比Excel报表 def export_all_scenarios_to_excel(results, filename): """ 导出所有方案的对比结果到 Excel :param results: 包含各方案评估结果的列表 :param filename: 输出文件路径 """ try: with pd.ExcelWriter(filename) as writer: # 1. 总览 Sheet summary_data = [] for res in results: # 获取回路数 n_circuits = 0 if 'turbines' in res and 'cluster' in res['turbines'].columns: n_circuits = res['turbines']['cluster'].nunique() summary_data.append({ 'Scenario': res['name'], 'Total Cost (¥)': res['cost'], 'Total Loss (kW)': res['loss'], 'Num Circuits': n_circuits, # 计算电缆统计 'Total Cable Length (m)': sum(d['length'] for d in res['eval']['details']) }) pd.DataFrame(summary_data).to_excel(writer, sheet_name='Comparison Summary', index=False) # 2. 每个方案的详细 Sheet for res in results: # 清理 Sheet 名称 safe_name = res['name'].replace(':', '').replace('/', '-').replace('\\', '-') # 截断过长的名称 (Excel限制31字符) if len(safe_name) > 25: safe_name = safe_name[:25] details = res['eval']['details'] data = [] for conn in details: data.append({ 'Source': conn['source'], 'Target': conn['target'], 'Horizontal Length (m)': conn['horizontal_length'], 'Vertical Length (m)': conn['vertical_length'], 'Effective Length (m)': conn['length'], 'Cable Type (mm²)': conn['cable']['cross_section'], 'Current (A)': conn['cable']['current'], 'Power (MW)': conn['power'], 'Resistance (Ω)': conn['cable']['resistance'], 'Cost (¥)': conn['cable']['cost'] }) df = pd.DataFrame(data) df.to_excel(writer, sheet_name=safe_name, index=False) print(f"成功导出包含所有方案的Excel文件: {filename}") except Exception as e: print(f"导出Excel失败: {e}") # 6. 可视化函数 def visualize_design(turbines, substation, connections, title, ax=None, show_costs=True): """可视化集电线路设计方案""" if ax is None: fig, ax = plt.subplots(figsize=(10, 8)) # 绘制风机 ax.scatter(turbines['x'], turbines['y'], c='white', edgecolors='#333333', s=80, linewidth=1.0, zorder=3, label='Turbines') # 标记风机ID for i, row in turbines.iterrows(): # 显示原始ID(如果存在) label_id = int(row['original_id']) if 'original_id' in row else int(row['id']) # 仅在风机数量较少时显示ID,避免密集恐惧 if len(turbines) < 50: ax.annotate(str(label_id), (row['x'], row['y']), ha='center', va='center', fontsize=6, zorder=4) # 绘制升压站 ax.scatter(substation[0, 0], substation[0, 1], c='red', s=250, marker='s', zorder=5, label='Substation') # 颜色映射:使用鲜艳且区分度高的颜色 color_map = { 35: '#00FF00', # 亮绿 (Lime) - 最小 70: '#008000', # 深绿 (Green) 95: '#00FFFF', # 青色 (Cyan) 120: '#0000FF', # 纯蓝 (Blue) 150: '#800080', # 紫色 (Purple) 185: '#FF00FF', # 洋红 (Magenta) 240: '#FFA500', # 橙色 (Orange) 300: '#FF4500', # 红橙 (OrangeRed) 400: '#FF0000' # 纯红 (Red) - 最大 } # 统计电缆使用情况 cable_counts = defaultdict(int) # 绘制连接 # 判断传入的是简单连接列表还是详细连接列表 is_detailed = len(connections) > 0 and isinstance(connections[0], dict) # 收集图例句柄 legend_handles = {} for conn in connections: if is_detailed: source, target, length = conn['source'], conn['target'], conn['length'] section = conn['cable']['cross_section'] # 统计 cable_counts[section] += 1 # 获取颜色和线宽 color = color_map.get(section, 'black') # 线宽范围 1.0 ~ 3.5 linewidth = 1.0 + (section / 400.0) * 2.5 label_text = f'{section}mm² Cable' else: source, target, length = conn section = 0 color = 'black' linewidth = 1.0 label_text = 'Connection' # 获取坐标 if source == 'substation': x1, y1 = substation[0, 0], substation[0, 1] else: turbine_id = int(source.split('_')[1]) x1, y1 = turbines.loc[turbine_id, 'x'], turbines.loc[turbine_id, 'y'] if target == 'substation': x2, y2 = substation[0, 0], substation[0, 1] else: turbine_id = int(target.split('_')[1]) x2, y2 = turbines.loc[turbine_id, 'x'], turbines.loc[turbine_id, 'y'] # 绘制线路 line, = ax.plot([x1, x2], [y1, y2], color=color, linewidth=linewidth, alpha=0.9, zorder=2) # 保存图例(去重) if is_detailed and section not in legend_handles: legend_handles[section] = line # 打印统计信息 if is_detailed: print(f"[{title.splitlines()[0]}] 电缆统计:") for section in sorted(cable_counts.keys()): print(f" {section}mm²: {cable_counts[section]} 条") # 设置图形属性 ax.set_title(title, fontsize=10) ax.set_xlabel('X (m)') ax.set_ylabel('Y (m)') # 构建图例 sorted_sections = sorted(legend_handles.keys()) handles = [legend_handles[s] for s in sorted_sections] labels = [f'{s}mm²' for s in sorted_sections] # 添加风机和升压站图例 handles.insert(0, ax.collections[0]) # Turbines labels.insert(0, 'Turbines') handles.insert(1, ax.collections[1]) # Substation labels.insert(1, 'Substation') ax.legend(handles, labels, loc='upper right', fontsize=8, framealpha=0.9) ax.grid(True, linestyle='--', alpha=0.3) ax.set_aspect('equal') return ax # 7. 主函数:比较两种设计方法 def compare_design_methods(excel_path=None, n_clusters_override=None): """ 比较MST和三种电缆方案下的K-means设计方法 :param excel_path: Excel文件路径 :param n_clusters_override: 可选,手动指定簇的数量 """ cable_specs = None if excel_path: print(f"正在从 {excel_path} 读取坐标数据...") try: turbines, substation, cable_specs = load_data_from_excel(excel_path) scenario_title = "Offshore Wind Farm (Imported Data)" except Exception: print("回退到自动生成数据模式...") return compare_design_methods(excel_path=None, n_clusters_override=n_clusters_override) else: print("正在生成海上风电场数据 (规则阵列布局)...") turbines, substation = generate_wind_farm_data(n_turbines=30, layout='grid', spacing=800) scenario_title = "Offshore Wind Farm (Grid Layout)" is_offshore = True # 准备三种电缆方案 # 原始 specs 是 5 元素元组: (section, capacity, resistance, cost, is_optional) # 下游函数期望 4 元素元组: (section, capacity, resistance, cost) if cable_specs: # 方案 1: 不含 Optional='Y' (Standard) specs_1 = [s[:4] for s in cable_specs if not s[4]] # 方案 2: 含 Optional='Y' (All) specs_2 = [s[:4] for s in cable_specs] # 方案 3: 基于方案 1,删掉截面最大的一种 # cable_specs 已按 capacity 排序,假设 capacity 与 section 正相关 specs_3 = specs_1[:-1] if len(specs_1) > 1 else list(specs_1) else: # 默认电缆库 default_specs = [ (35, 150, 0.524, 80), (70, 215, 0.268, 120), (95, 260, 0.193, 150), (120, 295, 0.153, 180), (150, 330, 0.124, 220), (185, 370, 0.0991, 270), (240, 425, 0.0754, 350), (300, 500, 0.0601, 450), (400, 580, 0.0470, 600) ] specs_1 = default_specs specs_2 = default_specs specs_3 = default_specs[:-1] scenarios = [ ("Scenario 1 (Standard)", specs_1), ("Scenario 2 (With Optional)", specs_2), ("Scenario 3 (No Max)", specs_3) ] # 1. MST 方法作为基准 (使用 Scenario 1) mst_connections = design_with_mst(turbines, substation) mst_evaluation = evaluate_design(turbines, mst_connections, substation, cable_specs=specs_1, is_offshore=is_offshore, method_name="MST Method") # 准备画布 2x2 fig, axes = plt.subplots(2, 2, figsize=(20, 18)) axes = axes.flatten() # 绘制 MST visualize_design(turbines, substation, mst_evaluation['details'], f"MST Method (Standard Cables)\nTotal Cost: ¥{mst_evaluation['total_cost']/10000:.2f}万", ax=axes[0]) print(f"\n===== 开始比较电缆方案 (基于 Capacitated Sweep) =====") best_cost = float('inf') best_result = None comparison_results = [] for i, (name, current_specs) in enumerate(scenarios): print(f"\n--- {name} ---") if not current_specs: print(" 无可用电缆,跳过。") continue # 计算参数 total_power = turbines['power'].sum() max_cable_mw = get_max_cable_capacity_mw(cable_specs=current_specs) n_cable_types = len(current_specs) # 确定簇数 if n_clusters_override is not None: n_clusters = n_clusters_override min_needed = int(np.ceil(total_power / max_cable_mw)) if n_clusters < min_needed: print(f" Warning: 指定簇数 {n_clusters} 小于理论最小需求 {min_needed}。设计可能会失败或严重过载。") else: # 自动计算:取 (理论最小需求) 和 (基于电缆型号分级估算) 的较大值 min_needed = int(np.ceil(total_power / max_cable_mw)) # 这里的逻辑是:如果电缆级差很密,可能不需要那么多回路;如果电缆很大,回路可以少。 # 原有的逻辑是 len(turbines)/n_cable_types,这只是一个经验值。 # 我们主要保证满足容量: n_clusters = min_needed # 稍微增加一点裕度,避免因为离散分布导致最后一点功率塞不进 # 或者保持原有的启发式逻辑,取最大值 heuristic = int(np.ceil(len(turbines) / n_cable_types)) n_clusters = max(min_needed, heuristic) if n_clusters > len(turbines): n_clusters = len(turbines) print(f" 最大电缆容量: {max_cable_mw:.2f} MW") print(f" 设计回路数: {n_clusters}") # 运行设计 conns, clustered_turbines = design_with_capacitated_sweep( turbines.copy(), substation, cable_specs=current_specs ) # 评估 evaluation = evaluate_design( turbines, conns, substation, cable_specs=current_specs, is_offshore=is_offshore, method_name=name ) # 记录结果 comparison_results.append({ 'name': name, 'cost': evaluation['total_cost'], 'loss': evaluation['total_loss'], 'eval': evaluation, 'turbines': clustered_turbines, 'specs': current_specs }) # 输出简报 print(f" 总成本: ¥{evaluation['total_cost']:,.2f}") print(f" 总损耗: {evaluation['total_loss']:.2f} kW") # 记录最佳 if evaluation['total_cost'] < best_cost: best_cost = evaluation['total_cost'] best_result = comparison_results[-1] # 可视化 (axes 1, 2, 3) ax_idx = i + 1 if ax_idx < 4: n_actual_clusters = clustered_turbines['cluster'].nunique() title = f"{name} ({n_actual_clusters} circuits)\nCost: ¥{evaluation['total_cost']/10000:.2f}万 | Loss: {evaluation['total_loss']:.2f} kW" visualize_design(clustered_turbines, substation, evaluation['details'], title, ax=axes[ax_idx]) plt.tight_layout() output_filename = 'wind_farm_design_comparison.png' plt.savefig(output_filename, dpi=300) print(f"\n比较图已保存至: {output_filename}") # 准备文件路径 if excel_path: base_name = os.path.splitext(os.path.basename(excel_path))[0] dir_name = os.path.dirname(excel_path) dxf_filename = os.path.join(dir_name, f"{base_name}_design.dxf") excel_out_filename = os.path.join(dir_name, f"{base_name}_design.xlsx") else: dxf_filename = 'wind_farm_design.dxf' excel_out_filename = 'wind_farm_design.xlsx' # 导出所有方案到同一个 Excel if comparison_results: export_all_scenarios_to_excel(comparison_results, excel_out_filename) # 交互式选择导出 DXF print("\n===== 方案选择 =====") best_idx = 0 for i, res in enumerate(comparison_results): if res['cost'] < comparison_results[best_idx]['cost']: best_idx = i print(f" {i+1}. {res['name']} - Cost: ¥{res['cost']:,.2f}") print(f"推荐方案: {comparison_results[best_idx]['name']} (默认)") try: choice_str = input(f"请输入要导出DXF的方案编号 (1-{len(comparison_results)}),或输入 'A' 导出全部: ").strip() if choice_str.upper() == 'A': print("正在导出所有方案...") base_dxf_name, ext = os.path.splitext(dxf_filename) for res in comparison_results: # 生成文件名安全后缀 safe_suffix = res['name'].replace(' ', '_').replace(':', '').replace('(', '').replace(')', '').replace('/', '-') current_filename = f"{base_dxf_name}_{safe_suffix}{ext}" print(f" 导出 '{res['name']}' -> {current_filename}") export_to_dxf(res['turbines'], substation, res['eval']['details'], current_filename) else: if not choice_str: choice = best_idx else: choice = int(choice_str) - 1 if choice < 0 or choice >= len(comparison_results): print("输入编号无效,将使用默认推荐方案。") choice = best_idx selected_res = comparison_results[choice] print(f"正在导出 '{selected_res['name']}' 到 DXF: {dxf_filename} ...") export_to_dxf(selected_res['turbines'], substation, selected_res['eval']['details'], dxf_filename) except Exception as e: print(f"输入处理出错: {e},将使用默认推荐方案。") selected_res = comparison_results[best_idx] print(f"正在导出 '{selected_res['name']}' 到 DXF: {dxf_filename} ...") export_to_dxf(selected_res['turbines'], substation, selected_res['eval']['details'], dxf_filename) return comparison_results # 8. 执行比较 if __name__ == "__main__": # 解析命令行参数 parser = argparse.ArgumentParser(description='Wind Farm Collector System Design') parser.add_argument('--excel', help='Path to the Excel coordinates file', default=None) parser.add_argument('--clusters', type=int, help='Specify the number of clusters (circuits) manually', default=None) args = parser.parse_args() # 3. 运行比较 # 如果没有提供excel文件,将自动回退到生成数据模式 compare_design_methods(args.excel, n_clusters_override=args.clusters)