From 1b09e4299cf9c57e367fc75b8d7daaf58fd2ee1d Mon Sep 17 00:00:00 2001 From: dmy Date: Sat, 27 Dec 2025 20:40:18 +0800 Subject: [PATCH] =?UTF-8?q?=E4=BF=AE=E6=AD=A3=E6=B2=A1=E6=9C=89=E5=88=A9?= =?UTF-8?q?=E7=94=A8--excel=E5=8F=82=E6=95=B0=E7=9A=84bug=E3=80=82?= MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit --- .gitignore | 1 - .vscode/launch.json | 12 +- pyproject.toml | 8 + src/multi_scenario.py | 476 ++++++++++++++++-- src/storage_optimization.py | 66 ++- .../test_multi_scenario.py | 0 .../test_scenario_storage_optimization.py | 0 7 files changed, 511 insertions(+), 52 deletions(-) rename test_multi_scenario.py => tests/test_multi_scenario.py (100%) rename test_scenario_storage_optimization.py => tests/test_scenario_storage_optimization.py (100%) diff --git a/.gitignore b/.gitignore index 8208f43..5017448 100644 --- a/.gitignore +++ b/.gitignore @@ -8,6 +8,5 @@ wheels/ # Virtual environments .venv -test_* *.xls* *.png \ No newline at end of file diff --git a/.vscode/launch.json b/.vscode/launch.json index e86e2b2..a8f847c 100644 --- a/.vscode/launch.json +++ b/.vscode/launch.json @@ -9,9 +9,17 @@ "name": "Main", "type": "debugpy", "request": "launch", - "program": "main.py", + "program": "src\\main.py", "console": "integratedTerminal", - "args": "--excel .\\data_template_8760-in-use.xlsx" + "args": "--excel .\\templates\\data_template_8760-in-use2.xlsx" + }, + { + "name": "Multi_Scenarios", + "type": "debugpy", + "request": "launch", + "program": "src\\multi_scenario.py", + "console": "integratedTerminal", + "args": "--excel .\\templates\\data_template_8760-in-use2.xlsx" } ] } \ No newline at end of file diff --git a/pyproject.toml b/pyproject.toml index adb377f..44b3553 100644 --- a/pyproject.toml +++ b/pyproject.toml @@ -9,4 +9,12 @@ dependencies = [ "numpy>=1.19.0", "openpyxl>=3.1.5", "pandas>=2.3.3", + "pyinstaller>=6.17.0", + "scikit-learn>=1.8.0", + "seaborn>=0.13.2", +] + +[dependency-groups] +dev = [ + "mypy>=1.19.1", ] diff --git a/src/multi_scenario.py b/src/multi_scenario.py index c0a6757..f889ee5 100644 --- a/src/multi_scenario.py +++ b/src/multi_scenario.py @@ -9,17 +9,19 @@ """ import numpy as np -import pandas as pd +import pandas as pd # type: ignore[import-untyped] import matplotlib.pyplot as plt -from sklearn.cluster import KMeans -from sklearn.preprocessing import StandardScaler -from sklearn.metrics import silhouette_score -from typing import List, Dict, Tuple, Optional, Union +from sklearn.cluster import KMeans # type: ignore[import-untyped] +from sklearn.preprocessing import StandardScaler # type: ignore[import-untyped] +from sklearn.metrics import silhouette_score # type: ignore[import-untyped] +from typing import List, Dict, Tuple, Optional, Union, Any from storage_optimization import optimize_storage_capacity, SystemParameters from dataclasses import dataclass -import seaborn as sns +import seaborn as sns # type: ignore[import-untyped] from datetime import datetime, timedelta import warnings +import os +from excel_reader import read_excel_data, read_system_parameters, get_optimization_settings warnings.filterwarnings('ignore') # 设置中文字体 @@ -57,6 +59,147 @@ class MultiScenarioAnalyzer: self.kmeans: KMeans = KMeans(n_clusters=n_clusters, random_state=random_state, n_init=10) self.scenario_names: List[str] = [] + @staticmethod + def validate_excel_data(df: pd.DataFrame, data_type: str = "8760") -> bool: + """ + 验证Excel数据格式是否正确 + + Args: + df: pandas DataFrame对象 + data_type: 数据类型,"24"或"8760" + + Returns: + bool: 验证是否通过 + """ + expected_length = 8760 if data_type == "8760" else 24 + + # 检查行数 + if len(df) != expected_length: + print(f"错误:数据行数应为{expected_length},实际为{len(df)}") + return False + + # 检查必需的列(支持多种列名格式) + required_columns = [ + ['光伏出力(MW)', '风电出力(MW)', '负荷需求(MW)'], # 标准格式 + ['光伏', '风电', '负荷'], # 简化格式 + ['solar', 'wind', 'load'], # 英文格式 + ['Solar', 'Wind', 'Load'] # 英文首字母大写 + ] + + # 检查是否存在任何一组必需列 + columns_found = False + for columns in required_columns: + if all(col in df.columns for col in columns): + columns_found = True + break + + if not columns_found: + print(f"错误:缺少必需的列。支持的列名格式:{required_columns}") + return False + + # 检查数据类型和非负值 + # 找到实际使用的列名 + actual_columns = None + for columns in required_columns: + if all(col in df.columns for col in columns): + actual_columns = columns + break + + if actual_columns: + for col in actual_columns: + if not pd.api.types.is_numeric_dtype(df[col]): + print(f"错误:列'{col}'必须为数值类型") + return False + + if (df[col] < 0).any(): + print(f"错误:列'{col}'包含负值") + return False + + return True + + def read_excel_data(self, file_path: str, sheet_name: str = 0, include_parameters: bool = True) -> Dict[str, Any]: + """ + 从Excel文件读取数据用于多场景分析 + + Args: + file_path: Excel文件路径 + sheet_name: 工作表名称或索引,默认为第一个工作表 + include_parameters: 是否同时读取系统参数和经济参数 + + Returns: + 包含光伏、风电、负荷数据和可选参数的字典 + + Raises: + FileNotFoundError: 文件不存在 + ValueError: 数据格式错误 + """ + try: + # 使用 excel_reader 模块读取数据 + excel_data = read_excel_data(file_path, sheet_name, include_parameters) + + # 提取基本数据 + result = { + 'solar_output': excel_data['solar_output'], + 'wind_output': excel_data['wind_output'], + 'load_demand': excel_data['load_demand'], + 'data_type': excel_data['data_type'], + 'original_length': excel_data['original_length'] + } + + # 如果包含参数,则添加到结果中 + if include_parameters: + if 'system_parameters' in excel_data: + result['system_parameters'] = excel_data['system_parameters'] + if 'optimization_settings' in excel_data: + result['optimization_settings'] = excel_data['optimization_settings'] + + print(f"成功从Excel读取数据:") + print(f" - 数据类型:{result['data_type']}小时") + print(f" - 数据长度:{len(result['solar_output'])} 小时") + print(f" - 光伏出力范围:{min(result['solar_output']):.2f} - {max(result['solar_output']):.2f} MW") + print(f" - 风电出力范围:{min(result['wind_output']):.2f} - {max(result['wind_output']):.2f} MW") + print(f" - 负荷需求范围:{min(result['load_demand']):.2f} - {max(result['load_demand']):.2f} MW") + + if include_parameters and 'system_parameters' in result: + print(f" - 系统参数:已读取") + + return result + + except Exception as e: + raise ValueError(f"读取Excel文件失败:{str(e)}") + + def fit_predict_from_excel(self, file_path: str, sheet_name: str = 0, + find_optimal_k: bool = False, include_parameters: bool = True) -> Tuple[ScenarioResult, Optional[SystemParameters]]: + """ + 从Excel文件读取数据并执行多场景聚类分析 + + Args: + file_path: Excel文件路径 + sheet_name: 工作表名称或索引,默认为第一个工作表 + find_optimal_k: 是否自动寻找最优聚类数 + include_parameters: 是否同时读取系统参数 + + Returns: + 场景聚类结果,系统参数(可选) + """ + print("开始从Excel文件读取数据...") + + # 读取Excel数据 + excel_data = self.read_excel_data(file_path, sheet_name, include_parameters) + + # 执行多场景聚类分析 + result = self.fit_predict( + excel_data['solar_output'], + excel_data['wind_output'], + excel_data['load_demand'], + find_optimal_k=find_optimal_k + ) + + # 提取参数 + system_params = excel_data.get('system_parameters') if include_parameters else None + + return result, system_params + def prepare_multivariate_data(self, solar_output: List[float], wind_output: List[float], load_demand: List[float]) -> np.ndarray: @@ -216,11 +359,11 @@ class MultiScenarioAnalyzer: typical_load = load_demand[typical_day_start:end_hour] typical_days[f'scenario_{cluster_id}'] = { - 'day_start_hour': typical_day_start, - 'solar_profile': typical_solar, - 'wind_profile': typical_wind, - 'load_profile': typical_load, - 'day_of_year': (typical_day_start // 24) + 1 + 'day_start_hour': float(typical_day_start), + 'solar_profile': float(typical_solar[0]) if len(typical_solar) == 1 else typical_solar, # type: ignore + 'wind_profile': float(typical_wind[0]) if len(typical_wind) == 1 else typical_wind, # type: ignore + 'load_profile': float(typical_load[0]) if len(typical_load) == 1 else typical_load, # type: ignore + 'day_of_year': float((typical_day_start // 24) + 1) } return typical_days @@ -295,7 +438,7 @@ class MultiScenarioAnalyzer: def plot_scenario_analysis(self, result: ScenarioResult, solar_output: List[float], wind_output: List[float], load_demand: List[float], - save_path: str = None, show_plot: bool = False): + save_path: Optional[str] = None, show_plot: bool = False) -> None: """ 绘制场景分析图表 @@ -312,7 +455,7 @@ class MultiScenarioAnalyzer: # 1. 场景时间分布 ax1 = plt.subplot(3, 3, 1) hours = np.arange(len(result.cluster_labels)) - colors = plt.cm.Set3(np.linspace(0, 1, result.n_scenarios)) + colors = plt.cm.Set3(np.linspace(0, 1, result.n_scenarios)) # type: ignore[attr-defined] for i in range(result.n_scenarios): mask = result.cluster_labels == i @@ -432,8 +575,8 @@ class MultiScenarioAnalyzer: def optimize_storage_for_scenarios(self, result: ScenarioResult, solar_output: List[float], wind_output: List[float], load_demand: List[float], - system_params: SystemParameters = None, - safety_factor: float = 1.2) -> Dict[str, Dict[str, float]]: + system_params: Optional[SystemParameters] = None, + safety_factor: float = 1.2) -> Dict[str, Dict[str, Any]]: """ 对聚类后的场景进行储能配置优化 @@ -460,7 +603,7 @@ class MultiScenarioAnalyzer: print("开始对各场景进行储能配置优化...") - scenario_optimization_results = {} + scenario_optimization_results: Dict[Union[int, str], Dict[str, Any]] = {} weighted_storage_need = 0.0 max_storage_need = 0.0 @@ -549,7 +692,179 @@ class MultiScenarioAnalyzer: print(f"最大储能需求: {max_storage_need:.2f} MWh") print(f"推荐储能容量: {max_storage_need * safety_factor:.2f} MWh") - return scenario_optimization_results + # 转换为字符串键的字典以匹配返回类型 + result_dict: Dict[str, Dict[str, Any]] = {} + for key, value in scenario_optimization_results.items(): + result_dict[str(key)] = value + + return result_dict + + def optimize_storage_from_excel(self, file_path: str, sheet_name: str = 0, + find_optimal_k: bool = False, safety_factor: float = 1.2) -> Dict[str, Any]: + """ + 从Excel文件读取数据并执行多场景聚类分析和储能优化 + + Args: + file_path: Excel文件路径 + sheet_name: 工作表名称或索引,默认为第一个工作表 + find_optimal_k: 是否自动寻找最优聚类数 + safety_factor: 安全系数,用于极端场景配置 + + Returns: + 包含聚类结果、储能优化结果和参数的完整结果字典 + """ + print("开始从Excel文件读取数据并执行分析...") + + # 读取Excel数据并执行聚类分析 + result, system_params = self.fit_predict_from_excel( + file_path, sheet_name, find_optimal_k, include_parameters=True + ) + + # 重新读取数据以获取原始数据 + excel_data = self.read_excel_data(file_path, sheet_name, include_parameters=False) + + # 执行储能优化 + optimization_results = self.optimize_storage_for_scenarios( + result, + excel_data['solar_output'], + excel_data['wind_output'], + excel_data['load_demand'], + system_params=system_params, + safety_factor=safety_factor + ) + + # 返回完整结果 + return { + 'scenario_result': result, + 'optimization_results': optimization_results, + 'system_parameters': system_params, + 'data_info': { + 'data_type': excel_data['data_type'], + 'original_length': excel_data['original_length'], + 'data_length': len(excel_data['solar_output']) + } + } + """ + 对聚类后的场景进行储能配置优化 + + Args: + result: 聚类结果 + solar_output: 光伏出力曲线 (MW) + wind_output: 风电出力曲线 (MW) + load_demand: 负荷曲线 (MW) + system_params: 系统参数,默认为标准参数 + safety_factor: 安全系数,用于极端场景配置 + + Returns: + 各场景的储能优化结果 + """ + if system_params is None: + system_params = SystemParameters( + max_curtailment_wind=0.1, + max_curtailment_solar=0.1, + max_grid_ratio=0.2, + storage_efficiency=0.9, + discharge_rate=1.0, + charge_rate=1.0 + ) + + print("开始对各场景进行储能配置优化...") + + scenario_optimization_results: Dict[Union[int, str], Dict[str, Any]] = {} + weighted_storage_need = 0.0 + max_storage_need = 0.0 + + for scenario_id in range(result.n_scenarios): + print(f"\n优化场景 {scenario_id + 1}: {result.scenario_names[scenario_id]}") + + # 提取场景数据 + scenario_solar, scenario_wind, scenario_load, duration = self._extract_scenario_data( + solar_output, wind_output, load_demand, result.cluster_labels, scenario_id + ) + + if duration == 0: + print(f" 警告:场景 {scenario_id + 1} 没有数据点") + continue + + print(f" 场景持续时间: {duration} 小时") + + # 确保数据长度符合储能优化要求(24小时或8760小时) + if duration < 24: + # 短于24小时的数据扩展到24小时 + repeats = (24 // duration) + 1 + scenario_solar = (scenario_solar * repeats)[:24] + scenario_wind = (scenario_wind * repeats)[:24] + scenario_load = (scenario_load * repeats)[:24] + duration = 24 + print(f" 扩展为24小时数据进行优化") + elif duration == 8760: + # 如果已经是8760小时,保持不变 + print(f" 使用完整的年度数据进行优化") + else: + # 介于24和8760之间的情况,需要截断或扩展到24小时(更合理的处理) + if duration > 24: + scenario_solar = scenario_solar[:24] + scenario_wind = scenario_wind[:24] + scenario_load = scenario_load[:24] + duration = 24 + print(f" 截取24小时数据进行优化") + else: + # 如果介于两者之间,确保是24小时 + scenario_solar = scenario_solar[:duration] + scenario_wind = scenario_wind[:duration] + scenario_load = scenario_load[:duration] + + # 执行储能优化 + # 创建对应的热电输出(这里设置为0,表示纯电热系统) + thermal_output = [0.0] * duration + + optimization_result = optimize_storage_capacity( + scenario_solar, scenario_wind, thermal_output, scenario_load, system_params + ) + + # 存储结果 + scenario_optimization_results[scenario_id] = { + 'scenario_name': result.scenario_names[scenario_id], + 'duration': duration, + 'required_storage': optimization_result['required_storage_capacity'], + 'curtailment_wind': optimization_result['total_curtailment_wind_ratio'], + 'curtailment_solar': optimization_result['total_curtailment_solar_ratio'], + 'grid_ratio': optimization_result['total_grid_feed_in_ratio'], + 'frequency': result.scenario_frequencies[scenario_id], + 'energy_balance_check': optimization_result['energy_balance_check'] + } + + # 累计计算 + frequency = result.scenario_frequencies[scenario_id] + storage_need = optimization_result['required_storage_capacity'] + weighted_storage_need += storage_need * frequency + max_storage_need = max(max_storage_need, storage_need) + + print(f" 所需储能容量: {storage_need:.2f} MWh") + print(f" 弃风率: {optimization_result['total_curtailment_wind_ratio']:.3f}") + print(f" 弃光率: {optimization_result['total_curtailment_solar_ratio']:.3f}") + print(f" 上网比例: {optimization_result['total_grid_feed_in_ratio']:.3f}") + + # 添加汇总信息 + scenario_optimization_results['summary'] = { + 'weighted_average_storage': weighted_storage_need, + 'maximum_storage_need': max_storage_need, + 'recommended_capacity': max_storage_need * safety_factor, + 'safety_factor': safety_factor, + 'n_scenarios': result.n_scenarios + } + + print(f"\n储能配置优化完成!") + print(f"加权平均储能需求: {weighted_storage_need:.2f} MWh") + print(f"最大储能需求: {max_storage_need:.2f} MWh") + print(f"推荐储能容量: {max_storage_need * safety_factor:.2f} MWh") + + # 转换为字符串键的字典以匹配返回类型 + result_dict: Dict[str, Dict[str, Any]] = {} + for key, value in scenario_optimization_results.items(): + result_dict[str(key)] = value + + return result_dict def _extract_scenario_data(self, solar_output: List[float], wind_output: List[float], load_demand: List[float], cluster_labels: np.ndarray, @@ -575,7 +890,7 @@ class MultiScenarioAnalyzer: return scenario_solar, scenario_wind, scenario_load, duration - def print_storage_optimization_summary(self, optimization_results: Dict[str, Dict[str, float]]): + def print_storage_optimization_summary(self, optimization_results: Dict[str, Dict[str, Any]]) -> None: """ 打印储能配置优化汇总结果 @@ -591,7 +906,7 @@ class MultiScenarioAnalyzer: print("-" * 80) # 各场景数据 - n_scenarios = optimization_results['summary']['n_scenarios'] + n_scenarios = int(optimization_results['summary']['n_scenarios']) for scenario_id in range(n_scenarios): if str(scenario_id) in optimization_results: result = optimization_results[str(scenario_id)] @@ -622,7 +937,7 @@ class MultiScenarioAnalyzer: def export_scenario_results(self, result: ScenarioResult, solar_output: List[float], wind_output: List[float], load_demand: List[float], - filename: str = None) -> str: + filename: Optional[str] = None) -> str: """ 导出场景分析结果到Excel @@ -656,7 +971,7 @@ class MultiScenarioAnalyzer: }) # 场景统计汇总 - summary_data: List[Dict[str, str]] = [] + summary_data: List[Dict[str, Any]] = [] for i in range(result.n_scenarios): stats = result.scenario_stats[f'scenario_{i}'] summary_data.append({ @@ -674,18 +989,26 @@ class MultiScenarioAnalyzer: summary_df = pd.DataFrame(summary_data) # 典型日数据 - typical_data: List[Dict[str, float]] = [] + typical_data: List[Dict[str, Any]] = [] for i in range(result.n_scenarios): if f'scenario_{i}' in result.typical_days: typical = result.typical_days[f'scenario_{i}'] - for hour in range(len(typical['solar_profile'])): + solar_profile = typical['solar_profile'] + if isinstance(solar_profile, (list, tuple)): + profile_len = len(solar_profile) + else: + profile_len = 24 # 默认24小时 + for hour in range(profile_len): + solar_val = typical['solar_profile'][hour] if isinstance(typical['solar_profile'], (list, tuple)) else typical['solar_profile'] # type: ignore + wind_val = typical['wind_profile'][hour] if isinstance(typical['wind_profile'], (list, tuple)) else typical['wind_profile'] # type: ignore + load_val = typical['load_profile'][hour] if isinstance(typical['load_profile'], (list, tuple)) else typical['load_profile'] # type: ignore typical_data.append({ '场景': result.scenario_names[i], - '典型日': f"第{typical['day_of_year']}天", + '典型日': f"第{int(typical['day_of_year'])}天", '小时': hour + 1, - '光伏典型出力(MW)': typical['solar_profile'][hour], - '风电典型出力(MW)': typical['wind_profile'][hour], - '负荷典型需求(MW)': typical['load_profile'][hour] + '光伏典型出力(MW)': solar_val, + '风电典型出力(MW)': wind_val, + '负荷典型需求(MW)': load_val }) typical_df = pd.DataFrame(typical_data) @@ -786,6 +1109,101 @@ def demo_multi_scenario_analysis(): return result + + + if __name__ == "__main__": - # 运行演示 - demo_multi_scenario_analysis() \ No newline at end of file + import sys + + # 检查命令行参数 + if len(sys.argv) > 1 and sys.argv[1] == "--excel": + if len(sys.argv) > 2 and sys.argv[2]: + # 使用指定的Excel文件 + excel_file = sys.argv[2] + if not os.path.exists(excel_file): + print(f"错误: Excel文件 '{excel_file}' 不存在") + sys.exit(1) + + print(f"=== 使用指定Excel文件进行多场景分析 ===\n") + print(f"Excel文件: {excel_file}") + + # 使用MultiScenarioAnalyzer从Excel读取数据并分析 + analyzer = MultiScenarioAnalyzer(n_clusters=8) + + # 执行完整分析(聚类+储能优化) + complete_results = analyzer.optimize_storage_from_excel( + excel_file, + find_optimal_k=True, + safety_factor=1.2 + ) + + # 提取结果 + scenario_result = complete_results['scenario_result'] + optimization_results = complete_results['optimization_results'] + system_params = complete_results['system_parameters'] + data_info = complete_results['data_info'] + + # 输出聚类分析结果 + print(f"\n=== 聚类分析结果 ===") + print(f"数据类型: {data_info['data_type']}小时") + print(f"数据长度: {data_info['data_length']}小时") + print(f"识别场景数: {scenario_result.n_scenarios}") + print(f"轮廓系数: {scenario_result.silhouette_score:.3f}") + + # 输出系统参数 + if system_params: + print(f"\n=== 系统参数 ===") + print(f"最大弃风率: {system_params.max_curtailment_wind}") + print(f"最大弃光率: {system_params.max_curtailment_solar}") + print(f"最大上网电量比例: {system_params.max_grid_ratio}") + print(f"储能效率: {system_params.storage_efficiency}") + print(f"放电倍率: {system_params.discharge_rate}") + print(f"充电倍率: {system_params.charge_rate}") + if system_params.max_storage_capacity: + print(f"最大储能容量: {system_params.max_storage_capacity} MWh") + else: + print(f"最大储能容量: 无限制") + + # 输出储能优化结果 + print(f"\n=== 储能优化结果 ===") + analyzer.print_storage_optimization_summary(optimization_results) + + # 重新读取数据以生成图表 + excel_data = analyzer.read_excel_data(excel_file, include_parameters=False) + + # 生成分析图表 + print(f"\n生成场景分析图表...") + base_filename = os.path.splitext(os.path.basename(excel_file))[0] + analyzer.plot_scenario_analysis( + scenario_result, + excel_data['solar_output'], + excel_data['wind_output'], + excel_data['load_demand'], + save_path=f"images/{base_filename}_analysis.png" + ) + + # 导出场景分析结果 + print(f"\n导出分析结果...") + excel_file_output = analyzer.export_scenario_results( + scenario_result, + excel_data['solar_output'], + excel_data['wind_output'], + excel_data['load_demand'], + filename=f"{base_filename}_results.xlsx" + ) + + print(f"\n=== 分析完成 ===") + print(f"输入文件: {excel_file}") + print(f"分析图表: images/{base_filename}_analysis.png") + print(f"结果文件: {excel_file_output}") + else: + print("错误: 请指定Excel文件路径") + print("用法: python multi_scenario.py --excel <文件路径>") + sys.exit(1) + else: + # 运行基本演示 + demo_multi_scenario_analysis() + + print("\n" + "="*60) + print("提示: 使用 'python multi_scenario.py --excel <文件路径>' 可以分析指定的Excel文件") + print("="*60) \ No newline at end of file diff --git a/src/storage_optimization.py b/src/storage_optimization.py index 47037ea..77ce96d 100644 --- a/src/storage_optimization.py +++ b/src/storage_optimization.py @@ -10,7 +10,7 @@ import numpy as np import math -from typing import List, Dict, Tuple, Optional +from typing import List, Dict, Tuple, Optional, Any from dataclasses import dataclass @@ -351,7 +351,7 @@ def check_constraints( solar_output: List[float], wind_output: List[float], thermal_output: List[float], - balance_result: Dict[str, List[float]], + balance_result: Dict[str, Any], params: SystemParameters ) -> Dict[str, float]: """ @@ -373,9 +373,25 @@ def check_constraints( total_solar_potential = sum(solar_output) total_thermal = sum(thermal_output) - total_curtailed_wind = sum(balance_result['curtailed_wind']) - total_curtailed_solar = sum(balance_result['curtailed_solar']) - total_grid_feed_in = sum(balance_result['grid_feed_in']) + # 确保数据是列表类型 + curtailed_wind = balance_result['curtailed_wind'] + curtailed_solar = balance_result['curtailed_solar'] + grid_feed_in = balance_result['grid_feed_in'] + + if isinstance(curtailed_wind, (list, tuple)): + total_curtailed_wind = sum(curtailed_wind) + else: + total_curtailed_wind = float(curtailed_wind) if isinstance(curtailed_wind, (int, float)) else 0.0 + + if isinstance(curtailed_solar, (list, tuple)): + total_curtailed_solar = sum(curtailed_solar) + else: + total_curtailed_solar = float(curtailed_solar) if isinstance(curtailed_solar, (int, float)) else 0.0 + + if isinstance(grid_feed_in, (list, tuple)): + total_grid_feed_in = sum(grid_feed_in) + else: + total_grid_feed_in = float(grid_feed_in) if isinstance(grid_feed_in, (int, float)) else 0.0 # 实际发电量(考虑弃风弃光) actual_wind_generation = total_wind_potential - total_curtailed_wind @@ -545,7 +561,8 @@ def optimize_storage_capacity( base_curtailed = sum(base_result['curtailed_wind']) + sum(base_result['curtailed_solar']) best_curtailed = base_curtailed - best_result = {**base_result, **check_constraints(solar_output, wind_output, thermal_output, base_result, params)} + constraint_results = check_constraints(solar_output, wind_output, thermal_output, base_result, params) + best_result: Dict[str, Any] = {**base_result, **constraint_results} print(f"基准容量 {best_capacity:.2f} MWh 的弃电量: {best_curtailed:.2f} MWh") @@ -609,7 +626,7 @@ def optimize_storage_capacity( if curtailed < best_curtailed: best_curtailed = curtailed best_capacity = capacity - best_result = {**result, **constraint_results} + best_result = {**result, **constraint_results} # type: ignore print(f" 发现更优容量: {best_capacity} MWh, 弃电量: {best_curtailed:.2f} MWh") # 每处理10个容量值输出一次进度 @@ -647,7 +664,7 @@ def optimize_storage_capacity( adjusted_constraint_results = check_constraints(solar_output, wind_output, thermal_output, adjusted_result, params) # 更新结果 - best_result = {**adjusted_result, **adjusted_constraint_results} + best_result = {**adjusted_result, **adjusted_constraint_results} # type: ignore best_capacity = rounded_capacity best_curtailed = sum(adjusted_result['curtailed_wind']) + sum(adjusted_result['curtailed_solar']) @@ -658,11 +675,20 @@ def optimize_storage_capacity( # 添加能量平衡校验 total_generation = sum(thermal_output) + sum(wind_output) + sum(solar_output) total_consumption = sum(load_demand) - total_curtailed = sum(best_result['curtailed_wind']) + sum(best_result['curtailed_solar']) - total_grid = sum(best_result['grid_feed_in']) - total_charge = sum(best_result['charge_profile']) - total_discharge = sum(best_result['discharge_profile']) - storage_net_change = best_result['storage_profile'][-1] - best_result['storage_profile'][0] + + # 确保best_result中的数据是列表类型 + curtailed_wind_list = best_result['curtailed_wind'] if isinstance(best_result['curtailed_wind'], list) else list(best_result['curtailed_wind']) + curtailed_solar_list = best_result['curtailed_solar'] if isinstance(best_result['curtailed_solar'], list) else list(best_result['curtailed_solar']) + grid_feed_in_list = best_result['grid_feed_in'] if isinstance(best_result['grid_feed_in'], list) else list(best_result['grid_feed_in']) + charge_profile_list = best_result['charge_profile'] if isinstance(best_result['charge_profile'], list) else list(best_result['charge_profile']) + discharge_profile_list = best_result['discharge_profile'] if isinstance(best_result['discharge_profile'], list) else list(best_result['discharge_profile']) + storage_profile_list = best_result['storage_profile'] if isinstance(best_result['storage_profile'], list) else list(best_result['storage_profile']) + + total_curtailed = sum(curtailed_wind_list) + sum(curtailed_solar_list) + total_grid = sum(grid_feed_in_list) + total_charge = sum(charge_profile_list) + total_discharge = sum(discharge_profile_list) + storage_net_change = storage_profile_list[-1] - storage_profile_list[0] # 能量平衡校验:发电量 + 放电量/效率 = 负荷 + 充电量*效率 + 弃风弃光 + 上网电量 # 考虑储能充放电效率的能量平衡 @@ -695,14 +721,14 @@ def optimize_storage_capacity( print(f" SOC差值: {soc_initial_final_diff:.4f} MWh") # 最终结果 - result = { + result: Dict[str, Any] = { 'required_storage_capacity': best_capacity, - 'storage_profile': best_result['storage_profile'], - 'charge_profile': best_result['charge_profile'], - 'discharge_profile': best_result['discharge_profile'], - 'curtailed_wind': best_result['curtailed_wind'], - 'curtailed_solar': best_result['curtailed_solar'], - 'grid_feed_in': best_result['grid_feed_in'], + 'storage_profile': storage_profile_list, + 'charge_profile': charge_profile_list, + 'discharge_profile': discharge_profile_list, + 'curtailed_wind': curtailed_wind_list, + 'curtailed_solar': curtailed_solar_list, + 'grid_feed_in': grid_feed_in_list, 'total_curtailment_wind_ratio': best_result['total_curtailment_wind_ratio'], 'total_curtailment_solar_ratio': best_result['total_curtailment_solar_ratio'], 'total_grid_feed_in_ratio': best_result['total_grid_feed_in_ratio'], diff --git a/test_multi_scenario.py b/tests/test_multi_scenario.py similarity index 100% rename from test_multi_scenario.py rename to tests/test_multi_scenario.py diff --git a/test_scenario_storage_optimization.py b/tests/test_scenario_storage_optimization.py similarity index 100% rename from test_scenario_storage_optimization.py rename to tests/test_scenario_storage_optimization.py