import argparse import math import os from collections import defaultdict import matplotlib.pyplot as plt import networkx as nx import numpy as np import pandas as pd from scipy.sparse.csgraph import minimum_spanning_tree from scipy.spatial import distance_matrix from sklearn.cluster import KMeans from esau_williams import design_with_esau_williams # 设置matplotlib支持中文显示 plt.rcParams["font.sans-serif"] = ["Microsoft YaHei", "SimHei", "Arial"] plt.rcParams["axes.unicode_minus"] = False # 常量定义 VOLTAGE_LEVEL = 66000 # 66kV POWER_FACTOR = 0.95 ELECTRICITY_PRICE = 0.4 # 元/kWh PROJECT_LIFETIME = 25 # 工程运行期限(年) DISCOUNT_RATE = 8 # 折现率(%) ANNUAL_LOSS_HOURS = 1400 # 年损耗小时数(小时) # 1. 生成风电场数据(实际应用中替换为真实坐标) def generate_wind_farm_data(n_turbines=30, seed=42, layout="random", spacing=800): """ 生成模拟风电场数据 :param layout: 'random' (随机) 或 'grid' (规则行列) :param spacing: 规则布局时的风机间距(m) """ np.random.seed(seed) if layout == "grid": # 计算行列数 (尽量接近方形) n_cols = int(np.ceil(np.sqrt(n_turbines))) n_rows = int(np.ceil(n_turbines / n_cols)) x_coords = [] y_coords = [] for i in range(n_turbines): row = i // n_cols col = i % n_cols # 添加微小抖动模拟海上定位误差(±1%) jitter = spacing * 0.01 x = col * spacing + np.random.uniform(-jitter, jitter) y = row * spacing + np.random.uniform(-jitter, jitter) x_coords.append(x) y_coords.append(y) x_coords = np.array(x_coords) y_coords = np.array(y_coords) # 升压站位置:通常位于风电场边缘或中心,这里设为离岸侧(y负方向)中心 substation = np.array([[np.mean(x_coords), -spacing]]) else: # 随机生成风机位置(扩大范围以适应更大容量) x_coords = np.random.uniform(0, 2000, n_turbines) y_coords = np.random.uniform(0, 2000, n_turbines) # 升压站位置 substation = np.array([[0, 0]]) # 随机生成风机额定功率(海上风机通常更大,6-10MW) power_ratings = ( np.random.uniform(6.0, 10.0, n_turbines) if layout == "grid" else np.random.uniform(2.0, 5.0, n_turbines) ) # 创建DataFrame turbines = pd.DataFrame( { "id": range(n_turbines), "x": x_coords, "y": y_coords, "power": power_ratings, "platform_height": np.zeros(n_turbines), "cumulative_power": np.zeros(n_turbines), # 用于后续计算 } ) return turbines, substation # 1.5 从Excel加载数据 def load_data_from_excel(file_path): """ 从Excel文件读取风机和升压站坐标,以及可选的电缆规格 Excel格式要求: - Sheet 'Coordinates' (或第一个Sheet): Type (Turbine/Substation), ID, X, Y, Power - Sheet 'Cables' (可选): CrossSection, Capacity, Resistance, Cost """ try: xl = pd.ExcelFile(file_path) # 读取坐标数据 if "Coordinates" in xl.sheet_names: df = pd.read_excel(xl, "Coordinates") else: df = pd.read_excel(xl, 0) # 标准化列名(忽略大小写) df.columns = [c.capitalize() for c in df.columns] required_cols = {"Type", "X", "Y", "Power"} if not required_cols.issubset(df.columns): raise ValueError(f"Excel文件缺少必要列: {required_cols - set(df.columns)}") # 提取升压站数据 substation_df = df[df["Type"].astype(str).str.lower() == "substation"] if len(substation_df) == 0: raise ValueError("未在文件中找到升压站(Substation)数据") substation = substation_df[["X", "Y"]].values # 提取风机数据 turbines_df = df[df["Type"].astype(str).str.lower() == "turbine"].copy() if len(turbines_df) == 0: raise ValueError("未在文件中找到风机(Turbine)数据") # 尝试获取平台高度列 (兼容不同命名) platform_height_col = None for col in turbines_df.columns: if col.lower().replace(" ", "") == "platformheight": platform_height_col = col break platform_heights = ( turbines_df[platform_height_col].values if platform_height_col else np.zeros(len(turbines_df)) ) # 重置索引并整理格式 turbines = pd.DataFrame( { "id": range(len(turbines_df)), "original_id": ( turbines_df["Id"].values if "Id" in turbines_df.columns else range(len(turbines_df)) ), "x": turbines_df["X"].values, "y": turbines_df["Y"].values, "power": turbines_df["Power"].values, "platform_height": platform_heights, "cumulative_power": np.zeros(len(turbines_df)), } ) # 读取电缆数据 (如果存在) cable_specs = None if "Cables" in xl.sheet_names: cables_df = pd.read_excel(xl, "Cables") # 标准化列名 cables_df.columns = [ c.replace(" ", "").capitalize() for c in cables_df.columns ] # Handle 'Cross Section' vs 'CrossSection' # 尝试匹配列 # 目标格式: (截面mm², 载流量A, 电阻Ω/km, 基准价格元/m, 是否可选) specs = [] for _, row in cables_df.iterrows(): # 容错处理列名 section = row.get("Crosssection", row.get("Section", 0)) capacity = row.get("Capacity", row.get("Current", 0)) resistance = row.get("Resistance", 0) cost_wan_per_km = row.get("Cost", row.get("Price", 0)) # 转换:1 万元/km = 10 元/m cost = cost_wan_per_km * 10 optional_val = str(row.get("Optional", "")).strip().upper() is_optional = optional_val == "Y" if section > 0 and capacity > 0: specs.append((section, capacity, resistance, cost, is_optional)) if specs: # --- 输入数据校验:单调性 --- for i in range(len(specs) - 1): curr_s = specs[i] next_s = specs[i + 1] # 校验截面 (section) if curr_s[0] >= next_s[0]: raise ValueError( f"电缆数据校验失败:Excel中电缆顺序必须按截面从小到大排列。第{i + 1}行({curr_s[0]}mm²) >= 第{i + 2}行({next_s[0]}mm²)。" ) # 校验载流量 (capacity) if curr_s[1] >= next_s[1]: raise ValueError( f"电缆数据校验失败:Excel中电缆载流量必须严格递增。第{i + 1}行({curr_s[1]}A) >= 第{i + 2}行({next_s[1]}A)。" ) specs.sort(key=lambda x: x[1]) # 按载流量排序 # --- 输入数据校验 --- # 筛选出所有标记为 Optional='Y' 的电缆 optional_cables = [s for s in specs if s[4]] # 规则1: 最多只能有一条可选电缆 if len(optional_cables) > 1: raise ValueError( f"电缆数据校验失败:检测到 {len(optional_cables)} 条可选电缆(Optional='Y')。系统限制最多只能指定 1 条可选电缆。" ) # 规则2: 如果存在可选电缆,它必须是所有电缆中截面最大的一条 if len(optional_cables) == 1: opt_cable = optional_cables[0] # s[0] 是截面积 max_section = max(s[0] for s in specs) if opt_cable[0] < max_section: raise ValueError( f"电缆数据校验失败:可选电缆 ({opt_cable[0]}mm²) 必须是所有电缆中截面最大的一条 (当前最大为 {max_section}mm²)。" ) # -------------------- cable_specs = specs print(f"成功加载: {len(turbines)} 台风机, {len(substation)} 座升压站") if cable_specs: print(f"成功加载: {len(cable_specs)} 种电缆规格") # 读取参数数据 (如果存在) system_params = {} param_sheet_name = None if "Parameters" in xl.sheet_names: param_sheet_name = "Parameters" elif "参数" in xl.sheet_names: param_sheet_name = "参数" if param_sheet_name: try: params_df = pd.read_excel(xl, param_sheet_name) # 假设格式为两列:Parameter (参数名), Value (值) if len(params_df.columns) >= 2: for _, row in params_df.iterrows(): key = str(row[0]).strip().lower() try: val = float(row[1]) if "voltage" in key or "电压" in key: # 检测是否为kV单位 if "kv" in key: system_params["voltage"] = val * 1000 else: system_params["voltage"] = val elif "factor" in key or "功率因数" in key: system_params["power_factor"] = val elif "price" in key or "电价" in key: system_params["electricity_price"] = val elif "lifetime" in key or "期限" in key: system_params["project_lifetime"] = val elif "discount" in key or "折现率" in key: system_params["discount_rate"] = val elif "hours" in key or "小时" in key: system_params["annual_loss_hours"] = val except ValueError: pass # 设置默认值(如果参数表中未提供) if "electricity_price" not in system_params: system_params["electricity_price"] = ELECTRICITY_PRICE if "project_lifetime" not in system_params: system_params["project_lifetime"] = PROJECT_LIFETIME if "discount_rate" not in system_params: system_params["discount_rate"] = DISCOUNT_RATE if "annual_loss_hours" not in system_params: system_params["annual_loss_hours"] = ANNUAL_LOSS_HOURS if system_params: print(f"成功加载系统参数: {system_params}") except Exception as e: print(f"读取参数Sheet失败: {e}") return turbines, substation, cable_specs, system_params except Exception as e: print(f"读取Excel文件失败: {str(e)}") raise # 2. 基于最小生成树(MST)的集电线路设计 def design_with_mst(turbines, substation): """使用最小生成树算法设计集电线路拓扑""" # 合并风机和升压站数据 all_points = np.vstack([substation, turbines[["x", "y"]].values]) n_points = len(all_points) # 计算距离矩阵 dist_matrix = distance_matrix(all_points, all_points) # 计算最小生成树 mst = minimum_spanning_tree(dist_matrix).toarray() # 提取连接关系 connections = [] for i in range(n_points): for j in range(n_points): if mst[i, j] > 0: # 确定节点类型:0是升压站,1+是风机 source = "substation" if i == 0 else f"turbine_{i - 1}" target = "substation" if j == 0 else f"turbine_{j - 1}" connections.append((source, target, mst[i, j])) return connections # 3. 基于扇区聚类(改进版K-means)的集电线路设计 def design_with_kmeans(turbines, substation, n_clusters=3): """ 使用基于角度的K-means聚类设计集电线路拓扑 (避免交叉) 原理:将风机按相对于升压站的角度划分扇区,确保出线不交叉 """ # 准备风机坐标数据 turbine_coords = turbines[["x", "y"]].values substation_coord = substation[0] # 计算每台风机相对于升压站的角度和单位向量 # 使用 (cos, sin) 也就是单位向量进行聚类,可以完美处理 -180/+180 的周期性问题 dx = turbine_coords[:, 0] - substation_coord[0] dy = turbine_coords[:, 1] - substation_coord[1] # 归一化为单位向量 (对应角度特征) magnitudes = np.sqrt(dx**2 + dy**2) # 处理重合点避免除零 with np.errstate(divide="ignore", invalid="ignore"): unit_x = dx / magnitudes unit_y = dy / magnitudes unit_x[magnitudes == 0] = 0 unit_y[magnitudes == 0] = 0 # 构建特征矩阵:主要基于角度(单位向量),可根据需要加入少量距离权重 # 如果纯粹按角度,设为单位向量即可 features = np.column_stack([unit_x, unit_y]) # 执行K-means聚类 kmeans = KMeans(n_clusters=n_clusters, random_state=42, n_init=10) turbines["cluster"] = kmeans.fit_predict(features) # 为每个簇找到最佳连接点(离升压站最近的风机) cluster_connection_points = {} for cluster_id in range(n_clusters): cluster_turbines = turbines[turbines["cluster"] == cluster_id] if len(cluster_turbines) == 0: continue distances_to_substation = np.sqrt( (cluster_turbines["x"] - substation_coord[0]) ** 2 + (cluster_turbines["y"] - substation_coord[1]) ** 2 ) closest_idx = distances_to_substation.idxmin() cluster_connection_points[cluster_id] = closest_idx # 为每个簇内构建MST cluster_connections = [] for cluster_id in range(n_clusters): # 获取当前簇的风机 cluster_turbines = turbines[turbines["cluster"] == cluster_id] if len(cluster_turbines) == 0: continue cluster_indices = cluster_turbines.index.tolist() # 计算簇内距离矩阵 coords = cluster_turbines[["x", "y"]].values dist_matrix = distance_matrix(coords, coords) # 计算簇内MST mst = minimum_spanning_tree(dist_matrix).toarray() # 添加簇内连接 for i in range(len(cluster_indices)): for j in range(len(cluster_indices)): if mst[i, j] > 0: source = f"turbine_{cluster_indices[i]}" target = f"turbine_{cluster_indices[j]}" cluster_connections.append((source, target, mst[i, j])) # 添加簇到升压站的连接 substation_connections = [] for cluster_id, turbine_idx in cluster_connection_points.items(): turbine_coord = turbines.loc[turbine_idx, ["x", "y"]].values distance = np.sqrt( (turbine_coord[0] - substation_coord[0]) ** 2 + (turbine_coord[1] - substation_coord[1]) ** 2 ) substation_connections.append( (f"turbine_{turbine_idx}", "substation", distance) ) return cluster_connections + substation_connections, turbines # 3.5 带容量约束的扇区扫描算法 (Capacitated Sweep) - 基础版 def design_with_capacitated_sweep( turbines, substation, cable_specs=None, voltage=VOLTAGE_LEVEL, power_factor=POWER_FACTOR, ): """ 使用带容量约束的扇区扫描算法设计集电线路 (基础版:单次扫描) 原理: 1. 计算所有风机相对于升压站的角度。 2. 找到角度间隔最大的位置作为起始“切割线”,以避免切断密集的风机群。 3. 沿圆周方向扫描,贪婪地将风机加入当前回路,直到达到电缆容量上限。 4. 满载后开启新回路。 """ # 1. 获取电缆最大容量 max_mw = get_max_cable_capacity_mw( cable_specs, voltage=voltage, power_factor=power_factor ) substation_coord = substation[0] # 2. 计算角度 (使用 arctan2 返回 -pi 到 pi) work_df = turbines.copy() dx = work_df["x"] - substation_coord[0] dy = work_df["y"] - substation_coord[1] work_df["angle"] = np.arctan2(dy, dx) # 3. 寻找最佳起始角度 (最大角度间隙) work_df = work_df.sort_values("angle").reset_index(drop=True) angles = work_df["angle"].values n = len(angles) if n > 1: # 计算相邻角度差 diffs = np.diff(angles) # 计算首尾角度差 (跨越 ±pi 处) wrap_diff = (2 * np.pi) - (angles[-1] - angles[0]) diffs = np.append(diffs, wrap_diff) # 找到最大间隙的索引 max_gap_idx = np.argmax(diffs) # 旋转数组,使最大间隙成为新的起点 start_idx = (max_gap_idx + 1) % n if start_idx != 0: work_df = pd.concat( [work_df.iloc[start_idx:], work_df.iloc[:start_idx]] ).reset_index(drop=True) # 4. 贪婪分组 (Capacity Constrained Clustering) work_df["cluster"] = -1 cluster_id = 0 current_power = 0 current_indices = [] for i, row in work_df.iterrows(): p = row["power"] if len(current_indices) > 0 and (current_power + p > max_mw): work_df.loc[current_indices, "cluster"] = cluster_id cluster_id += 1 current_power = 0 current_indices = [] current_indices.append(i) current_power += p if current_indices: work_df.loc[current_indices, "cluster"] = cluster_id cluster_id += 1 # 建立 id -> cluster 的映射 id_to_cluster = dict(zip(work_df["id"], work_df["cluster"])) turbines["cluster"] = turbines["id"].map(id_to_cluster) # 5. 对每个簇内部进行MST连接 cluster_connections = [] substation_connections = [] n_clusters = cluster_id for cid in range(n_clusters): cluster_turbines = turbines[turbines["cluster"] == cid] if len(cluster_turbines) == 0: continue cluster_indices = cluster_turbines.index.tolist() coords = cluster_turbines[["x", "y"]].values if len(cluster_indices) > 1: dist_matrix = distance_matrix(coords, coords) mst = minimum_spanning_tree(dist_matrix).toarray() for i in range(len(cluster_indices)): for j in range(len(cluster_indices)): if mst[i, j] > 0: source = f"turbine_{cluster_indices[i]}" target = f"turbine_{cluster_indices[j]}" cluster_connections.append((source, target, mst[i, j])) # 连接到升压站 dists = np.sqrt( (cluster_turbines["x"] - substation_coord[0]) ** 2 + (cluster_turbines["y"] - substation_coord[1]) ** 2 ) closest_id = dists.idxmin() min_dist = dists.min() substation_connections.append((f"turbine_{closest_id}", "substation", min_dist)) return cluster_connections + substation_connections, turbines # 3.6 旋转扫描算法 (Rotational Sweep) - 优化版 def design_with_rotational_sweep( turbines, substation, cable_specs=None, voltage=VOLTAGE_LEVEL, power_factor=POWER_FACTOR, ): """ 使用带容量约束的扇区扫描算法设计集电线路 (优化版:旋转扫描) 原理: 1. 计算所有风机相对于升压站的角度并排序。 2. 遍历所有可能的起始角度(即尝试以每一台风机作为扫描的起点)。 3. 对每种起始角度,贪婪地将风机加入回路直到满载。 4. 对每种分组方案计算MST成本,选出总成本最低的方案。 """ # 1. 获取电缆最大容量 max_mw = get_max_cable_capacity_mw( cable_specs, voltage=voltage, power_factor=power_factor ) substation_coord = substation[0] # 2. 计算角度 (使用 arctan2 返回 -pi 到 pi) work_df = turbines.copy() dx = work_df["x"] - substation_coord[0] dy = work_df["y"] - substation_coord[1] work_df["angle"] = np.arctan2(dy, dx) # 按角度排序 work_df = work_df.sort_values("angle").reset_index(drop=True) n_turbines = len(work_df) best_cost = float("inf") best_connections = [] best_turbines_state = None best_start_idx = -1 best_id_to_cluster = {} # 遍历所有可能的起始点 for start_idx in range(n_turbines): # 构建当前旋转顺序的风机列表 if start_idx == 0: current_df = work_df.copy() else: current_df = pd.concat( [work_df.iloc[start_idx:], work_df.iloc[:start_idx]] ).reset_index(drop=True) # --- 贪婪分组 --- current_df["cluster"] = -1 cluster_id = 0 current_power = 0 current_indices_in_df = [] powers = current_df["power"].values for i in range(n_turbines): p = powers[i] if len(current_indices_in_df) > 0 and (current_power + p > max_mw): current_df.loc[current_indices_in_df, "cluster"] = cluster_id cluster_id += 1 current_power = 0 current_indices_in_df = [] current_indices_in_df.append(i) current_power += p if current_indices_in_df: current_df.loc[current_indices_in_df, "cluster"] = cluster_id cluster_id += 1 # --- 计算该分组方案的成本 --- current_total_length = 0 n_clusters = cluster_id for cid in range(n_clusters): cluster_rows = current_df[current_df["cluster"] == cid] if len(cluster_rows) == 0: continue # 1. 簇内 MST 长度 coords = cluster_rows[["x", "y"]].values if len(cluster_rows) > 1: dm = distance_matrix(coords, coords) mst = minimum_spanning_tree(dm).toarray() mst_len = mst.sum() current_total_length += mst_len # 2. 连接升压站长度 dists = np.sqrt( (cluster_rows["x"] - substation_coord[0]) ** 2 + (cluster_rows["y"] - substation_coord[1]) ** 2 ) current_total_length += dists.min() # --- 比较并保存最佳结果 --- if current_total_length < best_cost: best_cost = current_total_length best_start_idx = start_idx best_id_to_cluster = dict(zip(current_df["id"], current_df["cluster"])) # --- 根据最佳方案重新生成详细连接 --- turbines["cluster"] = turbines["id"].map(best_id_to_cluster) final_connections = [] unique_clusters = turbines["cluster"].unique() unique_clusters = [c for c in unique_clusters if not pd.isna(c) and c >= 0] for cid in unique_clusters: cluster_turbines = turbines[turbines["cluster"] == cid] if len(cluster_turbines) == 0: continue cluster_indices = cluster_turbines.index.tolist() coords = cluster_turbines[["x", "y"]].values if len(cluster_indices) > 1: dist_matrix_local = distance_matrix(coords, coords) mst = minimum_spanning_tree(dist_matrix_local).toarray() for i in range(len(cluster_indices)): for j in range(len(cluster_indices)): if mst[i, j] > 0: source = f"turbine_{cluster_indices[i]}" target = f"turbine_{cluster_indices[j]}" final_connections.append((source, target, mst[i, j])) dists = np.sqrt( (cluster_turbines["x"] - substation_coord[0]) ** 2 + (cluster_turbines["y"] - substation_coord[1]) ** 2 ) closest_idx_in_df = dists.idxmin() min_dist = dists.min() final_connections.append( (f"turbine_{closest_idx_in_df}", "substation", min_dist) ) return final_connections, turbines # 4. 获取电缆最大容量(MW) def get_max_cable_capacity_mw(cable_specs, voltage=VOLTAGE_LEVEL, power_factor=POWER_FACTOR): """ 根据电缆规格计算最大承载功率 :param cable_specs: 电缆规格列表 list of tuples,或者直接是最大功率数值(MW) """ # 如果传入的已经是数值,直接返回 if isinstance(cable_specs, (int, float, np.number)): return float(cable_specs) # 兼容性检查:如果列表为空 if not cable_specs: print("Warning: 没有可用的电缆规格,使用默认最大容量 100MW") return 100.0 # 从所有电缆规格中找到最大的额定电流容量 max_current_capacity = max(spec[1] for spec in cable_specs) # 计算最大功率:P = √3 * U * I * cosφ # 这里假设降额系数为 1 (不降额) max_current = max_current_capacity * 1 max_power_w = np.sqrt(3) * voltage * max_current * power_factor # 将单位从 W 转换为 MW return max_power_w / 1e6 # 5. 计算集电线路方案成本 def evaluate_design( turbines, connections, substation, cable_specs=None, is_offshore=False, method_name="Unknown Method", voltage=VOLTAGE_LEVEL, power_factor=POWER_FACTOR, ): """评估设计方案的总成本和损耗""" total_cost = 0 total_loss = 0 # 创建连接图 graph = nx.Graph() graph.add_node("substation") for i, row in turbines.iterrows(): graph.add_node(f"turbine_{i}") # 添加边 for source, target, length in connections: graph.add_edge(source, target, weight=length) # 计算每台风机的累积功率(从叶到根) power_flow = defaultdict(float) # 按照后序遍历(从叶到根) nodes = list(nx.dfs_postorder_nodes(graph, "substation")) for node in nodes: if node == "substation": continue # 1. 如果节点是风机,先加上自身的功率 if node.startswith("turbine_"): turbine_id = int(node.split("_")[1]) power_flow[node] += turbines.loc[turbine_id, "power"] # 2. 找到上游节点(父节点)并将累积功率传递上去 try: # 找到通往升压站的最短路径上的下一个节点 path = nx.shortest_path(graph, source=node, target="substation") if len(path) > 1: # path[0]是node自己,path[1]是父节点 parent = path[1] power_flow[parent] += power_flow[node] except nx.NetworkXNoPath: pass # DEBUG: 打印最大功率流 (不含升压站本身) node_powers = [v for k, v in power_flow.items() if k != "substation"] max_power = max(node_powers) if node_powers else 0 print(f"DEBUG [{method_name}]: 最大线路功率 = {max_power:.2f} MW") # 计算成本和损耗 detailed_connections = [] for source, target, length in connections: # Determine vertical length (PlatformHeight) vertical_length = 0 if source.startswith("turbine_"): tid = int(source.split("_")[1]) vertical_length += turbines.loc[tid, "platform_height"] if target.startswith("turbine_"): tid = int(target.split("_")[1]) vertical_length += turbines.loc[tid, "platform_height"] # Calculate effective length with margin # Total Length = (Horizontal Distance + Vertical Up/Down) * 1.03 horizontal_length = length effective_length = (horizontal_length + vertical_length) * 1.03 # 确定该段线路承载的总功率 if source.startswith("turbine_") and target.startswith("turbine_"): # 风机间连接,取下游节点功率 if nx.shortest_path_length( graph, "substation", source ) < nx.shortest_path_length(graph, "substation", target): power = power_flow[target] else: power = power_flow[source] elif source == "substation": # 从升压站出发的连接 power = power_flow[target] elif target == "substation": # 连接到升压站 power = power_flow[source] else: power = 0 # 电缆选型 # 成本乘数:如果Excel中已包含敷设费用,则设为1.0 cost_multiplier = 1.0 if is_offshore else 1.0 # 默认电缆规格库 (如果未提供) if cable_specs is None: cable_specs_to_use = [ (35, 150, 0.524, 80), (70, 215, 0.268, 120), (95, 260, 0.193, 150), (120, 295, 0.153, 180), (150, 330, 0.124, 220), (185, 370, 0.0991, 270), (240, 425, 0.0754, 350), (300, 500, 0.0601, 450), (400, 580, 0.0470, 600), ] else: cable_specs_to_use = cable_specs # 估算电流 current = (power * 1e6) / (np.sqrt(3) * voltage * power_factor) # 选择满足载流量的最小电缆 selected_spec = None for spec in cable_specs_to_use: if current <= spec[1] * 1: # 100%负载率 selected_spec = spec break if selected_spec is None: selected_spec = cable_specs_to_use[-1] print( f"WARNING [{method_name}]: Current {current:.2f} A (Power: {power:.2f} MW) exceeds max cable capacity {selected_spec[1]} A!" ) resistance = selected_spec[2] * effective_length / 1000 # 电阻(Ω) cost = selected_spec[3] * effective_length * cost_multiplier # 电缆成本(含敷设) cable = { "cross_section": selected_spec[0], "current_capacity": selected_spec[1], "resistance": resistance, "cost": cost, "current": current, } # 记录详细信息 detailed_connections.append( { "source": source, "target": target, "horizontal_length": horizontal_length, "vertical_length": vertical_length, "length": effective_length, # effective length used for stats "power": power, "cable": cable, } ) # 累计成本 total_cost += cable["cost"] # 计算I²R损耗 (简化版) loss_w = (cable["current"] ** 2) * cable["resistance"] * 3 # 三相,单位:W loss_kw = loss_w / 1000 # 转换为 kW total_loss += loss_kw return { "total_cost": total_cost, "total_loss": total_loss, "num_connections": len(connections), "details": detailed_connections, } # 6.5 导出CAD图纸 (DXF格式) def export_to_dxf(turbines, substation, connections_details, filename): """ 将设计方案导出为DXF格式 (CAD通用格式) :param connections_details: evaluate_design返回的'details'列表 """ import ezdxf from ezdxf.enums import TextEntityAlignment doc = ezdxf.new(dxfversion="R2010") msp = doc.modelspace() # 1. 建立图层 doc.layers.add("Substation", color=1) # Red doc.layers.add("Turbines", color=5) # Blue # 动态确定电缆颜色 # 提取所有使用到的电缆截面 used_sections = sorted( list(set(conn["cable"]["cross_section"] for conn in connections_details)) ) # 定义颜色映射规则 (AutoCAD Color Index) # 1=Red, 2=Yellow, 3=Green, 4=Cyan, 5=Blue, 6=Magenta color_rank = [ 2, # 1st (Smallest): Yellow 3, # 2nd: Green 1, # 3rd: Red 5, # 4th: Blue 6, # 5th: Magenta ] default_color = 3 # Others: Green # 创建电缆图层 for i, section in enumerate(used_sections): if i < len(color_rank): c = color_rank[i] else: c = default_color layer_name = f"Cable_{section}mm" if layer_name not in doc.layers: doc.layers.add(layer_name, color=c) # 2. 绘制升压站 sx, sy = substation[0, 0], substation[0, 1] # 绘制一个矩形代表升压站 size = 50 msp.add_lwpolyline( [ (sx - size, sy - size), (sx + size, sy - size), (sx + size, sy + size), (sx - size, sy + size), (sx - size, sy - size), ], close=True, dxfattribs={"layer": "Substation"}, ) msp.add_text( "Substation", dxfattribs={"layer": "Substation", "height": 20} ).set_placement((sx, sy + size + 10), align=TextEntityAlignment.CENTER) # 3. 绘制风机 for i, row in turbines.iterrows(): tx, ty = row["x"], row["y"] # 绘制圆形代表风机 msp.add_circle((tx, ty), radius=15, dxfattribs={"layer": "Turbines"}) # 添加文字标签 label_id = str( int(row["original_id"]) if "original_id" in row else int(row["id"]) ) msp.add_text( label_id, dxfattribs={"layer": "Turbines", "height": 15} ).set_placement((tx, ty - 30), align=TextEntityAlignment.CENTER) # 4. 绘制连接电缆 for conn in connections_details: source, target = conn["source"], conn["target"] section = conn["cable"]["cross_section"] if source == "substation": p1 = (substation[0, 0], substation[0, 1]) else: tid = int(source.split("_")[1]) p1 = (turbines.loc[tid, "x"], turbines.loc[tid, "y"]) if target == "substation": p2 = (substation[0, 0], substation[0, 1]) else: tid = int(target.split("_")[1]) p2 = (turbines.loc[tid, "x"], turbines.loc[tid, "y"]) # 确定图层 layer_name = f"Cable_{section}mm" if layer_name not in doc.layers: doc.layers.add(layer_name) # 绘制二维多段线 msp.add_lwpolyline([p1, p2], dxfattribs={"layer": layer_name}) # 添加电缆型号文字(可选,在线的中点) # mid_x = (p1[0] + p2[0]) / 2 # mid_y = (p1[1] + p2[1]) / 2 # msp.add_text(f"{section}mm", dxfattribs={'layer': layer_name, 'height': 10}).set_pos((mid_x, mid_y), align='CENTER') try: doc.saveas(filename) print(f"成功导出DXF文件: {filename}") except Exception as e: print(f"导出DXF失败: {e}") # 6.6 导出Excel报表 def export_to_excel(connections_details, filename): """ 将设计方案详情导出为Excel文件 :param connections_details: evaluate_design返回的'details'列表 """ data = [] for conn in connections_details: data.append( { "Source": conn["source"], "Target": conn["target"], "Horizontal Length (m)": conn["horizontal_length"], "Vertical Length (m)": conn["vertical_length"], "Effective Length (m)": conn["length"], "Cable Type (mm²)": conn["cable"]["cross_section"], "Current (A)": conn["cable"]["current"], "Power (MW)": conn["power"], "Resistance (Ω)": conn["cable"]["resistance"], "Cost (¥)": conn["cable"]["cost"], } ) df = pd.DataFrame(data) # 汇总统计 n_circuits = sum(1 for conn in connections_details if conn["source"] == "substation" or conn["target"] == "substation") summary = { "Total Cost (¥)": df["Cost (¥)"].sum(), "Total Effective Length (m)": df["Effective Length (m)"].sum(), "Total Vertical Length (m)": df["Vertical Length (m)"].sum(), "Number of Circuits": n_circuits, } summary_df = pd.DataFrame([summary]) try: with pd.ExcelWriter(filename) as writer: df.to_excel(writer, sheet_name="Cable Schedule", index=False) summary_df.to_excel(writer, sheet_name="Summary", index=False) print(f"成功导出Excel文件: {filename}") except Exception as e: print(f"导出Excel失败: {e}") # 6.6 导出多方案对比Excel报表 def export_all_scenarios_to_excel(results, filename): """ 导出所有方案的对比结果到 Excel :param results: 包含各方案评估结果的列表 :param filename: 输出文件路径 """ try: # 使用 openpyxl 引擎以便后续写入单元格 with pd.ExcelWriter(filename, engine="openpyxl") as writer: # 1. 总览 Sheet summary_data = [] for res in results: # 获取回路数 (通过统计从升压站发出的连接) n_circuits = sum(1 for conn in res["eval"]["details"] if conn["source"] == "substation" or conn["target"] == "substation") summary_data.append( { "Scenario": res["name"], "Total Cost (¥)": res["cost"], "总费用(万元)": res.get("total_cost_npv", res["cost"]) / 10000, "Total Loss (kW)": res["loss"], "Num Circuits": n_circuits, # 计算电缆统计 "Total Cable Length (m)": sum( d["length"] for d in res["eval"]["details"] ), } ) pd.DataFrame(summary_data).to_excel( writer, sheet_name="Comparison Summary", index=False ) # 2. 每个方案的详细 Sheet for res in results: # 清理 Sheet 名称 safe_name = ( res["name"] .replace(":", "") .replace("/", "-") .replace("\\", "-") .replace(" ", "_") ) # 截断过长的名称 (Excel限制31字符) if len(safe_name) > 25: safe_name = safe_name[:25] details = res["eval"]["details"] data = [] for conn in details: data.append( { "Source": conn["source"], "Target": conn["target"], "Horizontal Length (m)": conn["horizontal_length"], "Vertical Length (m)": conn["vertical_length"], "Effective Length (m)": conn["length"], "Cable Type (mm²)": conn["cable"]["cross_section"], "Current (A)": conn["cable"]["current"], "Power (MW)": conn["power"], "Resistance (Ω)": conn["cable"]["resistance"], "Cost (¥)": conn["cable"]["cost"], } ) df = pd.DataFrame(data) # 从第 2 行开始写入数据(startrow=1,Excel中为第2行),留出第 1 行写标题 df.to_excel(writer, sheet_name=safe_name, index=False, startrow=1) # 在第一行写入方案名称 ws = writer.sheets[safe_name] ws.cell(row=1, column=1, value=f"Scenario: {res['name']}") print(f"成功导出包含所有方案的Excel文件: {filename}") except Exception as e: print(f"导出Excel失败: {e}") # 6. 可视化函数 def visualize_design( turbines, substation, connections, title, ax=None, show_costs=True ): """可视化集电线路设计方案""" if ax is None: fig, ax = plt.subplots(figsize=(10, 8)) # 绘制风机 ax.scatter( turbines["x"], turbines["y"], c="white", edgecolors="#333333", s=80, linewidth=1.0, zorder=3, label="Turbines", ) # 标记风机ID for i, row in turbines.iterrows(): # 显示原始ID(如果存在) label_id = int(row["original_id"]) if "original_id" in row else int(row["id"]) # 仅在风机数量较少时显示ID,避免密集恐惧 if len(turbines) < 50: ax.annotate( str(label_id), (row["x"], row["y"]), ha="center", va="center", fontsize=6, zorder=4, ) # 绘制升压站 ax.scatter( substation[0, 0], substation[0, 1], c="red", s=250, marker="s", zorder=5, label="Substation", ) # 颜色映射:使用鲜艳且区分度高的颜色 color_map = { 35: "#00FF00", # 亮绿 (Lime) - 最小 70: "#008000", # 深绿 (Green) 95: "#00FFFF", # 青色 (Cyan) 120: "#0000FF", # 纯蓝 (Blue) 150: "#800080", # 紫色 (Purple) 185: "#FF00FF", # 洋红 (Magenta) 240: "#FFA500", # 橙色 (Orange) 300: "#FF4500", # 红橙 (OrangeRed) 400: "#FF0000", # 纯红 (Red) - 最大 } # 统计电缆使用情况 cable_counts = defaultdict(int) # 绘制连接 # 判断传入的是简单连接列表还是详细连接列表 is_detailed = len(connections) > 0 and isinstance(connections[0], dict) # 收集图例句柄 legend_handles = {} for conn in connections: if is_detailed: source, target, length = conn["source"], conn["target"], conn["length"] section = conn["cable"]["cross_section"] # 统计 cable_counts[section] += 1 # 获取颜色和线宽 color = color_map.get(section, "black") # 线宽范围 1.0 ~ 3.5 linewidth = 1.0 + (section / 400.0) * 2.5 label_text = f"{section}mm² Cable" else: source, target, length = conn section = 0 color = "black" linewidth = 1.0 label_text = "Connection" # 获取坐标 if source == "substation": x1, y1 = substation[0, 0], substation[0, 1] else: turbine_id = int(source.split("_")[1]) x1, y1 = turbines.loc[turbine_id, "x"], turbines.loc[turbine_id, "y"] if target == "substation": x2, y2 = substation[0, 0], substation[0, 1] else: turbine_id = int(target.split("_")[1]) x2, y2 = turbines.loc[turbine_id, "x"], turbines.loc[turbine_id, "y"] # 绘制线路 (line,) = ax.plot( [x1, x2], [y1, y2], color=color, linewidth=linewidth, alpha=0.9, zorder=2 ) # 保存图例(去重) if is_detailed and section not in legend_handles: legend_handles[section] = line # 打印统计信息 # if is_detailed: # print(f"[{title.splitlines()[0]}] 电缆统计:") # for section in sorted(cable_counts.keys()): # print(f" {section}mm²: {cable_counts[section]} 条") # 设置图形属性 ax.set_title(title, fontsize=10) ax.set_xlabel("X (m)") ax.set_ylabel("Y (m)") # 构建图例 sorted_sections = sorted(legend_handles.keys()) handles = [legend_handles[s] for s in sorted_sections] labels = [f"{s}mm²" for s in sorted_sections] # 添加风机和升压站图例 handles.insert(0, ax.collections[0]) # Turbines labels.insert(0, "Turbines") handles.insert(1, ax.collections[1]) # Substation labels.insert(1, "Substation") ax.legend(handles, labels, loc="upper right", fontsize=8, framealpha=0.9) ax.grid(True, linestyle="--", alpha=0.3) ax.set_aspect("equal") return ax # 7. 主函数:比较两种设计方法 def compare_design_methods( excel_path=None, n_clusters_override=None, interactive=True, plot_results=True ): """ 比较MST和三种电缆方案下的K-means设计方法 :param excel_path: Excel文件路径 :param n_clusters_override: 可选,手动指定簇的数量 :param interactive: 是否启用交互式导出 (CLI模式) :param plot_results: 是否生成和保存对比图表 """ cable_specs = None system_params = {} if excel_path: print(f"正在从 {excel_path} 读取坐标数据...") try: turbines, substation, cable_specs, system_params = load_data_from_excel( excel_path ) scenario_title = "Offshore Wind Farm (Imported Data)" except Exception: print("回退到自动生成数据模式...") return compare_design_methods( excel_path=None, n_clusters_override=n_clusters_override, interactive=interactive, plot_results=plot_results, ) else: print("正在生成海上风电场数据 (规则阵列布局)...") turbines, substation = generate_wind_farm_data( n_turbines=30, layout="grid", spacing=800 ) scenario_title = "Offshore Wind Farm (Grid Layout)" is_offshore = True voltage = system_params.get("voltage", VOLTAGE_LEVEL) power_factor = system_params.get("power_factor", POWER_FACTOR) electricity_price = system_params.get("electricity_price", ELECTRICITY_PRICE) print( f"使用的系统参数: 电压={voltage} V, 功率因数={power_factor}, 电价={electricity_price} 元/kWh" ) # 准备三种电缆方案 # 原始 specs 是 5 元素元组: (section, capacity, resistance, cost, is_optional) # 下游函数期望 4 元素元组: (section, capacity, resistance, cost) has_optional_cables = False if cable_specs: # 检查是否存在 Optional 为 Y 的电缆 has_optional_cables = any(s[4] for s in cable_specs) # 方案 1: 不含 Optional='Y' (Standard) specs_1 = [s[:4] for s in cable_specs if not s[4]] # 方案 2: 含 Optional='Y' (All) specs_2 = [s[:4] for s in cable_specs] # 方案 3: 基于方案 1,删掉截面最大的一种 # cable_specs 已按 capacity 排序,假设 capacity 与 section 正相关 specs_3 = specs_1[:-1] if len(specs_1) > 1 else list(specs_1) else: # 默认电缆库 default_specs = [ (35, 150, 0.524, 80), (70, 215, 0.268, 120), (95, 260, 0.193, 150), (120, 295, 0.153, 180), (150, 330, 0.124, 220), (185, 370, 0.0991, 270), (240, 425, 0.0754, 350), (300, 500, 0.0601, 450), (400, 580, 0.0470, 600), ] specs_1 = default_specs specs_2 = default_specs specs_3 = default_specs[:-1] # 默认库视为没有 optional has_optional_cables = False scenarios = [("Scenario 1 (Standard)", specs_1)] if has_optional_cables: scenarios.append(("Scenario 2 (With Optional)", specs_2)) scenarios.append(("Scenario 3 (No Max)", specs_3)) else: # 重新编号,保证连续性 scenarios.append(("Scenario 2 (No Max)", specs_3)) # 1. MST 方法作为基准 (使用 Scenario 1) mst_connections = design_with_mst(turbines, substation) mst_evaluation = evaluate_design( turbines, mst_connections, substation, cable_specs=specs_1, is_offshore=is_offshore, method_name="MST Method", voltage=voltage, power_factor=power_factor, ) # 准备画布 2x2 fig = None axes = [] if plot_results: fig, axes = plt.subplots(2, 2, figsize=(20, 18)) axes = axes.flatten() # 绘制 MST visualize_design( turbines, substation, mst_evaluation["details"], f"MST Method (Standard Cables)\nTotal Cost: ¥{mst_evaluation['total_cost'] / 10000:.2f}万", ax=axes[0], ) print(f"\n===== 开始比较电缆方案 =====") best_cost = float("inf") best_result = None comparison_results = [] # 将 MST 结果也加入对比列表,方便查看 comparison_results.append( { "name": "MST Method", "cost": mst_evaluation["total_cost"], "loss": mst_evaluation["total_loss"], "eval": mst_evaluation, "turbines": turbines.copy(), # MST 不改变 turbines,但为了统一格式 "specs": specs_1, } ) for i, (name, current_specs) in enumerate(scenarios): print(f"\n--- {name} ---") if not current_specs: print(" 无可用电缆,跳过。") continue # 计算参数 total_power = turbines["power"].sum() max_cable_mw = get_max_cable_capacity_mw( cable_specs=current_specs, voltage=voltage, power_factor=power_factor ) # 确定簇数 (针对 Base 算法) if n_clusters_override is not None: n_clusters = n_clusters_override min_needed = int(np.ceil(total_power / max_cable_mw)) if n_clusters < min_needed: print( f" Warning: 指定簇数 {n_clusters} 小于理论最小需求 {min_needed}。" ) else: min_needed = int(np.ceil(total_power / max_cable_mw)) n_cable_types = len(current_specs) heuristic = int(np.ceil(len(turbines) / n_cable_types)) n_clusters = max(min_needed, heuristic) if n_clusters > len(turbines): n_clusters = len(turbines) print(f" 最大电缆容量: {max_cable_mw:.2f} MW") # --- Run 1: Base Sector Sweep --- base_name = f"{name} (Base)" conns_base, turbines_base = design_with_capacitated_sweep( turbines.copy(), substation, max_cable_mw, voltage=voltage ) eval_base = evaluate_design( turbines, conns_base, substation, cable_specs=current_specs, is_offshore=is_offshore, method_name=base_name, voltage=voltage, power_factor=power_factor, ) n_circuits_base = sum(1 for d in eval_base["details"] if d["source"] == "substation" or d["target"] == "substation") comparison_results.append( { "name": base_name, "cost": eval_base["total_cost"], "loss": eval_base["total_loss"], "eval": eval_base, "turbines": turbines_base, "specs": current_specs, } ) print( f" [Base] Cost: ¥{eval_base['total_cost']:,.2f} | Loss: {eval_base['total_loss']:.2f} kW | Circuits: {n_circuits_base}" ) # --- Run 2: Rotational Sweep (Optimization) --- rot_name = f"{name} (Rotational)" conns_rot, turbines_rot = design_with_rotational_sweep( turbines.copy(), substation, max_cable_mw, voltage=voltage ) eval_rot = evaluate_design( turbines, conns_rot, substation, cable_specs=current_specs, is_offshore=is_offshore, method_name=rot_name, voltage=voltage, power_factor=power_factor, ) n_circuits_rot = sum(1 for d in eval_rot["details"] if d["source"] == "substation" or d["target"] == "substation") comparison_results.append( { "name": rot_name, "cost": eval_rot["total_cost"], "loss": eval_rot["total_loss"], "eval": eval_rot, "turbines": turbines_rot, "specs": current_specs, } ) print( f" [Rotational] Cost: ¥{eval_rot['total_cost']:,.2f} | Loss: {eval_rot['total_loss']:.2f} kW | Circuits: {n_circuits_rot}" ) # --- Run 3: Esau-Williams Algorithm --- ew_name = f"{name} (Esau-Williams)" conns_ew, turbines_ew = design_with_esau_williams( turbines.copy(), substation, max_cable_mw ) eval_ew = evaluate_design( turbines, conns_ew, substation, cable_specs=current_specs, is_offshore=is_offshore, method_name=ew_name, voltage=voltage, power_factor=power_factor, ) n_circuits_ew = sum(1 for d in eval_ew["details"] if d["source"] == "substation" or d["target"] == "substation") comparison_results.append( { "name": ew_name, "cost": eval_ew["total_cost"], "loss": eval_ew["total_loss"], "eval": eval_ew, "turbines": turbines_ew, "specs": current_specs, } ) print( f" [Esau-Williams] Cost: ¥{eval_ew['total_cost']:,.2f} | Loss: {eval_ew['total_loss']:.2f} kW | Circuits: {n_circuits_ew}" ) # 记录最佳 if eval_rot["total_cost"] < best_cost: best_cost = eval_rot["total_cost"] if eval_ew["total_cost"] < best_cost: best_cost = eval_ew["total_cost"] # best_result 不再需要单独维护,最后遍历 comparison_results 即可 if eval_base["total_cost"] < best_cost: best_cost = eval_base["total_cost"] # 可视化 (只画 Base 版本) ax_idx = i + 1 if plot_results and ax_idx < 4: n_circuits = sum(1 for d in eval_base["details"] if d["source"] == "substation" or d["target"] == "substation") title = f"{base_name} ({n_circuits} circuits)\nCost: ¥{eval_base['total_cost'] / 10000:.2f}万" visualize_design( turbines_base, substation, eval_base["details"], title, ax=axes[ax_idx] ) if plot_results: plt.tight_layout() output_filename = "wind_farm_design_comparison.png" plt.savefig(output_filename, dpi=300) plt.close() print(f"\n比较图(Base版)已保存至: {output_filename}") # 准备文件路径 if excel_path: base_name = os.path.splitext(os.path.basename(excel_path))[0] dir_name = os.path.dirname(excel_path) dxf_filename = os.path.join(dir_name, f"{base_name}_design.dxf") excel_out_filename = os.path.join(dir_name, f"{base_name}_design.xlsx") else: dxf_filename = "wind_farm_design.dxf" excel_out_filename = "wind_farm_design.xlsx" # 导出所有方案到同一个 Excel if comparison_results: # 计算每个方案的总费用(净现值) comparison_results = total_investment(comparison_results, system_params) export_all_scenarios_to_excel(comparison_results, excel_out_filename) if not interactive: print(f"非交互模式:已自动导出 Excel 对比报表: {excel_out_filename}") return comparison_results # 交互式选择导出 DXF print("\n===== 方案选择 =====") best_idx = 0 for i, res in enumerate(comparison_results): if res["cost"] < comparison_results[best_idx]["cost"]: best_idx = i # 获取回路数 (通过统计从升压站发出的连接) n_circuits = sum(1 for conn in res["eval"]["details"] if conn["source"] == "substation" or conn["target"] == "substation") print(f" {i + 1}. {res['name']} - Cost: ¥{res['cost']:,.2f} | Circuits: {n_circuits}") print(f"推荐方案: {comparison_results[best_idx]['name']} (默认)") try: choice_str = input( f"请输入要导出DXF的方案编号 (1-{len(comparison_results)}),或输入 'A' 导出全部: " ).strip() if choice_str.upper() == "A": print("正在导出所有方案...") base_dxf_name, ext = os.path.splitext(dxf_filename) for res in comparison_results: # 生成文件名安全后缀 safe_suffix = ( res["name"] .replace(" ", "_") .replace(":", "") .replace("(", "") .replace(")", "") .replace("/", "-") ) current_filename = f"{base_dxf_name}_{safe_suffix}{ext}" print(f" 导出 '{res['name']}' -> {current_filename}") export_to_dxf( res["turbines"], substation, res["eval"]["details"], current_filename, ) else: if not choice_str: choice = best_idx else: choice = int(choice_str) - 1 if choice < 0 or choice >= len(comparison_results): print("输入编号无效,将使用默认推荐方案。") choice = best_idx selected_res = comparison_results[choice] # 生成带方案名称的文件名 base_dxf_name, ext = os.path.splitext(dxf_filename) safe_suffix = ( selected_res["name"] .replace(" ", "_") .replace(":", "") .replace("(", "") .replace(")", "") .replace("/", "-") ) final_filename = f"{base_dxf_name}_{safe_suffix}{ext}" print(f"正在导出 '{selected_res['name']}' 到 DXF: {final_filename} ...") export_to_dxf( selected_res["turbines"], substation, selected_res["eval"]["details"], final_filename, ) except Exception as e: print(f"输入处理出错: {e},将使用默认推荐方案。") selected_res = comparison_results[best_idx] # 生成带方案名称的文件名 base_dxf_name, ext = os.path.splitext(dxf_filename) safe_suffix = ( selected_res["name"] .replace(" ", "_") .replace(":", "") .replace("(", "") .replace(")", "") .replace("/", "-") ) final_filename = f"{base_dxf_name}_{safe_suffix}{ext}" print(f"正在导出 '{selected_res['name']}' 到 DXF: {final_filename} ...") export_to_dxf( selected_res["turbines"], substation, selected_res["eval"]["details"], final_filename, ) return comparison_results # total_investment这个函数计算每个比选方案的总投资 # 电缆费用按2年分期投资,采用输入的折现率。电费为电能损耗乘以电费,采用输入的折现率,并计算生命周期内的净现值,生命周期采用输入值。 # # 计算方法: # 1. 电缆投资:总投资按2年平均分配,第1年和第2年各支付50%,使用折现率计算净现值 # NPV_cable = (cost * 0.5) / (1 + r)^1 + (cost * 0.5) / (1 + r)^2 # 2. 电费损耗:年损耗费用 = 损耗功率(kW) * 8760小时 * 电价(元/kWh) # 生命周期内的净现值 = 年损耗费用 * [(1 - (1 + r)^(-n)) / r],其中n为运行期限 # 3. 总费用 = 电缆投资净现值 + 电费损耗净现值 # def total_investment(results, system_params): """ 计算每个比选方案的总费用(净现值) 参数: results: 比选结果列表,每个元素包含: - 'cost': 电缆总投资(元) - 'loss': 线损功率(kW) - 其他字段... system_params: 系统参数字典,包含: - 'discount_rate': 折现率(%) - 'electricity_price': 电价(元/kWh) - 'project_lifetime': 工程运行期限(年) - 'annual_loss_hours': 年损耗小时数(小时) 返回: 更新后的results列表,每个结果新增 'total_cost_npv' 字段(总费用净现值,元) """ # 获取系统参数,使用默认值 discount_rate_percent = system_params.get("discount_rate", DISCOUNT_RATE) electricity_price = system_params.get("electricity_price", ELECTRICITY_PRICE) project_lifetime = system_params.get("project_lifetime", PROJECT_LIFETIME) annual_loss_hours = system_params.get("annual_loss_hours", ANNUAL_LOSS_HOURS) # 将折现率转换为小数 r = discount_rate_percent / 100.0 for result in results: cable_cost = result["cost"] # 电缆总投资(元) loss_power = result["loss"] # 线损功率(kW) # 1. 计算电缆投资的净现值(2年分期) # 第1年支付50%,第2年支付50% npv_cable = (cable_cost * 0.5) / ((1 + r) ** 1) + (cable_cost * 0.5) / ( (1 + r) ** 2 ) # 2. 计算电费损耗的净现值(生命周期内) # 年损耗费用 = 损耗功率(kW) * 年损耗小时数 * 电价(元/kWh) annual_loss_cost = loss_power * annual_loss_hours * electricity_price # 年金现值系数 = [(1 - (1 + r)^(-n)) / r] if r > 0: annuity_factor = (1 - (1 + r) ** (-project_lifetime)) / r else: # 如果折现率为0,则净现值 = 年费用 * 年限 annuity_factor = project_lifetime npv_loss = annual_loss_cost * annuity_factor # 3. 计算总费用净现值 total_cost_npv = npv_cable + npv_loss # 将结果添加到字典中 result["total_cost_npv"] = total_cost_npv result["npv_cable"] = npv_cable result["npv_loss"] = npv_loss result["annual_loss_cost"] = annual_loss_cost return results # 8. 执行比较 if __name__ == "__main__": # 解析命令行参数 parser = argparse.ArgumentParser(description="Wind Farm Collector System Design") parser.add_argument( "--excel", help="Path to the Excel coordinates file", default=None ) parser.add_argument( "--clusters", type=int, help="Specify the number of clusters (circuits) manually", default=None, ) args = parser.parse_args() # 3. 运行比较 # 如果没有提供excel文件,将自动回退到生成数据模式 compare_design_methods( args.excel, n_clusters_override=args.clusters, interactive=True )