修正没有利用--excel参数的bug。

This commit is contained in:
dmy
2025-12-27 20:40:18 +08:00
parent b55f083be8
commit 1b09e4299c
7 changed files with 511 additions and 52 deletions

1
.gitignore vendored
View File

@@ -8,6 +8,5 @@ wheels/
# Virtual environments
.venv
test_*
*.xls*
*.png

12
.vscode/launch.json vendored
View File

@@ -9,9 +9,17 @@
"name": "Main",
"type": "debugpy",
"request": "launch",
"program": "main.py",
"program": "src\\main.py",
"console": "integratedTerminal",
"args": "--excel .\\data_template_8760-in-use.xlsx"
"args": "--excel .\\templates\\data_template_8760-in-use2.xlsx"
},
{
"name": "Multi_Scenarios",
"type": "debugpy",
"request": "launch",
"program": "src\\multi_scenario.py",
"console": "integratedTerminal",
"args": "--excel .\\templates\\data_template_8760-in-use2.xlsx"
}
]
}

View File

@@ -9,4 +9,12 @@ dependencies = [
"numpy>=1.19.0",
"openpyxl>=3.1.5",
"pandas>=2.3.3",
"pyinstaller>=6.17.0",
"scikit-learn>=1.8.0",
"seaborn>=0.13.2",
]
[dependency-groups]
dev = [
"mypy>=1.19.1",
]

View File

@@ -9,17 +9,19 @@
"""
import numpy as np
import pandas as pd
import pandas as pd # type: ignore[import-untyped]
import matplotlib.pyplot as plt
from sklearn.cluster import KMeans
from sklearn.preprocessing import StandardScaler
from sklearn.metrics import silhouette_score
from typing import List, Dict, Tuple, Optional, Union
from sklearn.cluster import KMeans # type: ignore[import-untyped]
from sklearn.preprocessing import StandardScaler # type: ignore[import-untyped]
from sklearn.metrics import silhouette_score # type: ignore[import-untyped]
from typing import List, Dict, Tuple, Optional, Union, Any
from storage_optimization import optimize_storage_capacity, SystemParameters
from dataclasses import dataclass
import seaborn as sns
import seaborn as sns # type: ignore[import-untyped]
from datetime import datetime, timedelta
import warnings
import os
from excel_reader import read_excel_data, read_system_parameters, get_optimization_settings
warnings.filterwarnings('ignore')
# 设置中文字体
@@ -57,6 +59,147 @@ class MultiScenarioAnalyzer:
self.kmeans: KMeans = KMeans(n_clusters=n_clusters, random_state=random_state, n_init=10)
self.scenario_names: List[str] = []
@staticmethod
def validate_excel_data(df: pd.DataFrame, data_type: str = "8760") -> bool:
"""
验证Excel数据格式是否正确
Args:
df: pandas DataFrame对象
data_type: 数据类型,"24""8760"
Returns:
bool: 验证是否通过
"""
expected_length = 8760 if data_type == "8760" else 24
# 检查行数
if len(df) != expected_length:
print(f"错误:数据行数应为{expected_length},实际为{len(df)}")
return False
# 检查必需的列(支持多种列名格式)
required_columns = [
['光伏出力(MW)', '风电出力(MW)', '负荷需求(MW)'], # 标准格式
['光伏', '风电', '负荷'], # 简化格式
['solar', 'wind', 'load'], # 英文格式
['Solar', 'Wind', 'Load'] # 英文首字母大写
]
# 检查是否存在任何一组必需列
columns_found = False
for columns in required_columns:
if all(col in df.columns for col in columns):
columns_found = True
break
if not columns_found:
print(f"错误:缺少必需的列。支持的列名格式:{required_columns}")
return False
# 检查数据类型和非负值
# 找到实际使用的列名
actual_columns = None
for columns in required_columns:
if all(col in df.columns for col in columns):
actual_columns = columns
break
if actual_columns:
for col in actual_columns:
if not pd.api.types.is_numeric_dtype(df[col]):
print(f"错误:列'{col}'必须为数值类型")
return False
if (df[col] < 0).any():
print(f"错误:列'{col}'包含负值")
return False
return True
def read_excel_data(self, file_path: str, sheet_name: str = 0, include_parameters: bool = True) -> Dict[str, Any]:
"""
从Excel文件读取数据用于多场景分析
Args:
file_path: Excel文件路径
sheet_name: 工作表名称或索引,默认为第一个工作表
include_parameters: 是否同时读取系统参数和经济参数
Returns:
包含光伏、风电、负荷数据和可选参数的字典
Raises:
FileNotFoundError: 文件不存在
ValueError: 数据格式错误
"""
try:
# 使用 excel_reader 模块读取数据
excel_data = read_excel_data(file_path, sheet_name, include_parameters)
# 提取基本数据
result = {
'solar_output': excel_data['solar_output'],
'wind_output': excel_data['wind_output'],
'load_demand': excel_data['load_demand'],
'data_type': excel_data['data_type'],
'original_length': excel_data['original_length']
}
# 如果包含参数,则添加到结果中
if include_parameters:
if 'system_parameters' in excel_data:
result['system_parameters'] = excel_data['system_parameters']
if 'optimization_settings' in excel_data:
result['optimization_settings'] = excel_data['optimization_settings']
print(f"成功从Excel读取数据")
print(f" - 数据类型:{result['data_type']}小时")
print(f" - 数据长度:{len(result['solar_output'])} 小时")
print(f" - 光伏出力范围:{min(result['solar_output']):.2f} - {max(result['solar_output']):.2f} MW")
print(f" - 风电出力范围:{min(result['wind_output']):.2f} - {max(result['wind_output']):.2f} MW")
print(f" - 负荷需求范围:{min(result['load_demand']):.2f} - {max(result['load_demand']):.2f} MW")
if include_parameters and 'system_parameters' in result:
print(f" - 系统参数:已读取")
return result
except Exception as e:
raise ValueError(f"读取Excel文件失败{str(e)}")
def fit_predict_from_excel(self, file_path: str, sheet_name: str = 0,
find_optimal_k: bool = False, include_parameters: bool = True) -> Tuple[ScenarioResult, Optional[SystemParameters]]:
"""
从Excel文件读取数据并执行多场景聚类分析
Args:
file_path: Excel文件路径
sheet_name: 工作表名称或索引,默认为第一个工作表
find_optimal_k: 是否自动寻找最优聚类数
include_parameters: 是否同时读取系统参数
Returns:
场景聚类结果,系统参数(可选)
"""
print("开始从Excel文件读取数据...")
# 读取Excel数据
excel_data = self.read_excel_data(file_path, sheet_name, include_parameters)
# 执行多场景聚类分析
result = self.fit_predict(
excel_data['solar_output'],
excel_data['wind_output'],
excel_data['load_demand'],
find_optimal_k=find_optimal_k
)
# 提取参数
system_params = excel_data.get('system_parameters') if include_parameters else None
return result, system_params
def prepare_multivariate_data(self, solar_output: List[float],
wind_output: List[float],
load_demand: List[float]) -> np.ndarray:
@@ -216,11 +359,11 @@ class MultiScenarioAnalyzer:
typical_load = load_demand[typical_day_start:end_hour]
typical_days[f'scenario_{cluster_id}'] = {
'day_start_hour': typical_day_start,
'solar_profile': typical_solar,
'wind_profile': typical_wind,
'load_profile': typical_load,
'day_of_year': (typical_day_start // 24) + 1
'day_start_hour': float(typical_day_start),
'solar_profile': float(typical_solar[0]) if len(typical_solar) == 1 else typical_solar, # type: ignore
'wind_profile': float(typical_wind[0]) if len(typical_wind) == 1 else typical_wind, # type: ignore
'load_profile': float(typical_load[0]) if len(typical_load) == 1 else typical_load, # type: ignore
'day_of_year': float((typical_day_start // 24) + 1)
}
return typical_days
@@ -295,7 +438,7 @@ class MultiScenarioAnalyzer:
def plot_scenario_analysis(self, result: ScenarioResult, solar_output: List[float],
wind_output: List[float], load_demand: List[float],
save_path: str = None, show_plot: bool = False):
save_path: Optional[str] = None, show_plot: bool = False) -> None:
"""
绘制场景分析图表
@@ -312,7 +455,7 @@ class MultiScenarioAnalyzer:
# 1. 场景时间分布
ax1 = plt.subplot(3, 3, 1)
hours = np.arange(len(result.cluster_labels))
colors = plt.cm.Set3(np.linspace(0, 1, result.n_scenarios))
colors = plt.cm.Set3(np.linspace(0, 1, result.n_scenarios)) # type: ignore[attr-defined]
for i in range(result.n_scenarios):
mask = result.cluster_labels == i
@@ -432,8 +575,8 @@ class MultiScenarioAnalyzer:
def optimize_storage_for_scenarios(self, result: ScenarioResult, solar_output: List[float],
wind_output: List[float], load_demand: List[float],
system_params: SystemParameters = None,
safety_factor: float = 1.2) -> Dict[str, Dict[str, float]]:
system_params: Optional[SystemParameters] = None,
safety_factor: float = 1.2) -> Dict[str, Dict[str, Any]]:
"""
对聚类后的场景进行储能配置优化
@@ -460,7 +603,7 @@ class MultiScenarioAnalyzer:
print("开始对各场景进行储能配置优化...")
scenario_optimization_results = {}
scenario_optimization_results: Dict[Union[int, str], Dict[str, Any]] = {}
weighted_storage_need = 0.0
max_storage_need = 0.0
@@ -549,7 +692,179 @@ class MultiScenarioAnalyzer:
print(f"最大储能需求: {max_storage_need:.2f} MWh")
print(f"推荐储能容量: {max_storage_need * safety_factor:.2f} MWh")
return scenario_optimization_results
# 转换为字符串键的字典以匹配返回类型
result_dict: Dict[str, Dict[str, Any]] = {}
for key, value in scenario_optimization_results.items():
result_dict[str(key)] = value
return result_dict
def optimize_storage_from_excel(self, file_path: str, sheet_name: str = 0,
find_optimal_k: bool = False, safety_factor: float = 1.2) -> Dict[str, Any]:
"""
从Excel文件读取数据并执行多场景聚类分析和储能优化
Args:
file_path: Excel文件路径
sheet_name: 工作表名称或索引,默认为第一个工作表
find_optimal_k: 是否自动寻找最优聚类数
safety_factor: 安全系数,用于极端场景配置
Returns:
包含聚类结果、储能优化结果和参数的完整结果字典
"""
print("开始从Excel文件读取数据并执行分析...")
# 读取Excel数据并执行聚类分析
result, system_params = self.fit_predict_from_excel(
file_path, sheet_name, find_optimal_k, include_parameters=True
)
# 重新读取数据以获取原始数据
excel_data = self.read_excel_data(file_path, sheet_name, include_parameters=False)
# 执行储能优化
optimization_results = self.optimize_storage_for_scenarios(
result,
excel_data['solar_output'],
excel_data['wind_output'],
excel_data['load_demand'],
system_params=system_params,
safety_factor=safety_factor
)
# 返回完整结果
return {
'scenario_result': result,
'optimization_results': optimization_results,
'system_parameters': system_params,
'data_info': {
'data_type': excel_data['data_type'],
'original_length': excel_data['original_length'],
'data_length': len(excel_data['solar_output'])
}
}
"""
对聚类后的场景进行储能配置优化
Args:
result: 聚类结果
solar_output: 光伏出力曲线 (MW)
wind_output: 风电出力曲线 (MW)
load_demand: 负荷曲线 (MW)
system_params: 系统参数,默认为标准参数
safety_factor: 安全系数,用于极端场景配置
Returns:
各场景的储能优化结果
"""
if system_params is None:
system_params = SystemParameters(
max_curtailment_wind=0.1,
max_curtailment_solar=0.1,
max_grid_ratio=0.2,
storage_efficiency=0.9,
discharge_rate=1.0,
charge_rate=1.0
)
print("开始对各场景进行储能配置优化...")
scenario_optimization_results: Dict[Union[int, str], Dict[str, Any]] = {}
weighted_storage_need = 0.0
max_storage_need = 0.0
for scenario_id in range(result.n_scenarios):
print(f"\n优化场景 {scenario_id + 1}: {result.scenario_names[scenario_id]}")
# 提取场景数据
scenario_solar, scenario_wind, scenario_load, duration = self._extract_scenario_data(
solar_output, wind_output, load_demand, result.cluster_labels, scenario_id
)
if duration == 0:
print(f" 警告:场景 {scenario_id + 1} 没有数据点")
continue
print(f" 场景持续时间: {duration} 小时")
# 确保数据长度符合储能优化要求24小时或8760小时
if duration < 24:
# 短于24小时的数据扩展到24小时
repeats = (24 // duration) + 1
scenario_solar = (scenario_solar * repeats)[:24]
scenario_wind = (scenario_wind * repeats)[:24]
scenario_load = (scenario_load * repeats)[:24]
duration = 24
print(f" 扩展为24小时数据进行优化")
elif duration == 8760:
# 如果已经是8760小时保持不变
print(f" 使用完整的年度数据进行优化")
else:
# 介于24和8760之间的情况需要截断或扩展到24小时更合理的处理
if duration > 24:
scenario_solar = scenario_solar[:24]
scenario_wind = scenario_wind[:24]
scenario_load = scenario_load[:24]
duration = 24
print(f" 截取24小时数据进行优化")
else:
# 如果介于两者之间确保是24小时
scenario_solar = scenario_solar[:duration]
scenario_wind = scenario_wind[:duration]
scenario_load = scenario_load[:duration]
# 执行储能优化
# 创建对应的热电输出这里设置为0表示纯电热系统
thermal_output = [0.0] * duration
optimization_result = optimize_storage_capacity(
scenario_solar, scenario_wind, thermal_output, scenario_load, system_params
)
# 存储结果
scenario_optimization_results[scenario_id] = {
'scenario_name': result.scenario_names[scenario_id],
'duration': duration,
'required_storage': optimization_result['required_storage_capacity'],
'curtailment_wind': optimization_result['total_curtailment_wind_ratio'],
'curtailment_solar': optimization_result['total_curtailment_solar_ratio'],
'grid_ratio': optimization_result['total_grid_feed_in_ratio'],
'frequency': result.scenario_frequencies[scenario_id],
'energy_balance_check': optimization_result['energy_balance_check']
}
# 累计计算
frequency = result.scenario_frequencies[scenario_id]
storage_need = optimization_result['required_storage_capacity']
weighted_storage_need += storage_need * frequency
max_storage_need = max(max_storage_need, storage_need)
print(f" 所需储能容量: {storage_need:.2f} MWh")
print(f" 弃风率: {optimization_result['total_curtailment_wind_ratio']:.3f}")
print(f" 弃光率: {optimization_result['total_curtailment_solar_ratio']:.3f}")
print(f" 上网比例: {optimization_result['total_grid_feed_in_ratio']:.3f}")
# 添加汇总信息
scenario_optimization_results['summary'] = {
'weighted_average_storage': weighted_storage_need,
'maximum_storage_need': max_storage_need,
'recommended_capacity': max_storage_need * safety_factor,
'safety_factor': safety_factor,
'n_scenarios': result.n_scenarios
}
print(f"\n储能配置优化完成!")
print(f"加权平均储能需求: {weighted_storage_need:.2f} MWh")
print(f"最大储能需求: {max_storage_need:.2f} MWh")
print(f"推荐储能容量: {max_storage_need * safety_factor:.2f} MWh")
# 转换为字符串键的字典以匹配返回类型
result_dict: Dict[str, Dict[str, Any]] = {}
for key, value in scenario_optimization_results.items():
result_dict[str(key)] = value
return result_dict
def _extract_scenario_data(self, solar_output: List[float], wind_output: List[float],
load_demand: List[float], cluster_labels: np.ndarray,
@@ -575,7 +890,7 @@ class MultiScenarioAnalyzer:
return scenario_solar, scenario_wind, scenario_load, duration
def print_storage_optimization_summary(self, optimization_results: Dict[str, Dict[str, float]]):
def print_storage_optimization_summary(self, optimization_results: Dict[str, Dict[str, Any]]) -> None:
"""
打印储能配置优化汇总结果
@@ -591,7 +906,7 @@ class MultiScenarioAnalyzer:
print("-" * 80)
# 各场景数据
n_scenarios = optimization_results['summary']['n_scenarios']
n_scenarios = int(optimization_results['summary']['n_scenarios'])
for scenario_id in range(n_scenarios):
if str(scenario_id) in optimization_results:
result = optimization_results[str(scenario_id)]
@@ -622,7 +937,7 @@ class MultiScenarioAnalyzer:
def export_scenario_results(self, result: ScenarioResult, solar_output: List[float],
wind_output: List[float], load_demand: List[float],
filename: str = None) -> str:
filename: Optional[str] = None) -> str:
"""
导出场景分析结果到Excel
@@ -656,7 +971,7 @@ class MultiScenarioAnalyzer:
})
# 场景统计汇总
summary_data: List[Dict[str, str]] = []
summary_data: List[Dict[str, Any]] = []
for i in range(result.n_scenarios):
stats = result.scenario_stats[f'scenario_{i}']
summary_data.append({
@@ -674,18 +989,26 @@ class MultiScenarioAnalyzer:
summary_df = pd.DataFrame(summary_data)
# 典型日数据
typical_data: List[Dict[str, float]] = []
typical_data: List[Dict[str, Any]] = []
for i in range(result.n_scenarios):
if f'scenario_{i}' in result.typical_days:
typical = result.typical_days[f'scenario_{i}']
for hour in range(len(typical['solar_profile'])):
solar_profile = typical['solar_profile']
if isinstance(solar_profile, (list, tuple)):
profile_len = len(solar_profile)
else:
profile_len = 24 # 默认24小时
for hour in range(profile_len):
solar_val = typical['solar_profile'][hour] if isinstance(typical['solar_profile'], (list, tuple)) else typical['solar_profile'] # type: ignore
wind_val = typical['wind_profile'][hour] if isinstance(typical['wind_profile'], (list, tuple)) else typical['wind_profile'] # type: ignore
load_val = typical['load_profile'][hour] if isinstance(typical['load_profile'], (list, tuple)) else typical['load_profile'] # type: ignore
typical_data.append({
'场景': result.scenario_names[i],
'典型日': f"{typical['day_of_year']}",
'典型日': f"{int(typical['day_of_year'])}",
'小时': hour + 1,
'光伏典型出力(MW)': typical['solar_profile'][hour],
'风电典型出力(MW)': typical['wind_profile'][hour],
'负荷典型需求(MW)': typical['load_profile'][hour]
'光伏典型出力(MW)': solar_val,
'风电典型出力(MW)': wind_val,
'负荷典型需求(MW)': load_val
})
typical_df = pd.DataFrame(typical_data)
@@ -786,6 +1109,101 @@ def demo_multi_scenario_analysis():
return result
if __name__ == "__main__":
# 运行演示
import sys
# 检查命令行参数
if len(sys.argv) > 1 and sys.argv[1] == "--excel":
if len(sys.argv) > 2 and sys.argv[2]:
# 使用指定的Excel文件
excel_file = sys.argv[2]
if not os.path.exists(excel_file):
print(f"错误: Excel文件 '{excel_file}' 不存在")
sys.exit(1)
print(f"=== 使用指定Excel文件进行多场景分析 ===\n")
print(f"Excel文件: {excel_file}")
# 使用MultiScenarioAnalyzer从Excel读取数据并分析
analyzer = MultiScenarioAnalyzer(n_clusters=8)
# 执行完整分析(聚类+储能优化)
complete_results = analyzer.optimize_storage_from_excel(
excel_file,
find_optimal_k=True,
safety_factor=1.2
)
# 提取结果
scenario_result = complete_results['scenario_result']
optimization_results = complete_results['optimization_results']
system_params = complete_results['system_parameters']
data_info = complete_results['data_info']
# 输出聚类分析结果
print(f"\n=== 聚类分析结果 ===")
print(f"数据类型: {data_info['data_type']}小时")
print(f"数据长度: {data_info['data_length']}小时")
print(f"识别场景数: {scenario_result.n_scenarios}")
print(f"轮廓系数: {scenario_result.silhouette_score:.3f}")
# 输出系统参数
if system_params:
print(f"\n=== 系统参数 ===")
print(f"最大弃风率: {system_params.max_curtailment_wind}")
print(f"最大弃光率: {system_params.max_curtailment_solar}")
print(f"最大上网电量比例: {system_params.max_grid_ratio}")
print(f"储能效率: {system_params.storage_efficiency}")
print(f"放电倍率: {system_params.discharge_rate}")
print(f"充电倍率: {system_params.charge_rate}")
if system_params.max_storage_capacity:
print(f"最大储能容量: {system_params.max_storage_capacity} MWh")
else:
print(f"最大储能容量: 无限制")
# 输出储能优化结果
print(f"\n=== 储能优化结果 ===")
analyzer.print_storage_optimization_summary(optimization_results)
# 重新读取数据以生成图表
excel_data = analyzer.read_excel_data(excel_file, include_parameters=False)
# 生成分析图表
print(f"\n生成场景分析图表...")
base_filename = os.path.splitext(os.path.basename(excel_file))[0]
analyzer.plot_scenario_analysis(
scenario_result,
excel_data['solar_output'],
excel_data['wind_output'],
excel_data['load_demand'],
save_path=f"images/{base_filename}_analysis.png"
)
# 导出场景分析结果
print(f"\n导出分析结果...")
excel_file_output = analyzer.export_scenario_results(
scenario_result,
excel_data['solar_output'],
excel_data['wind_output'],
excel_data['load_demand'],
filename=f"{base_filename}_results.xlsx"
)
print(f"\n=== 分析完成 ===")
print(f"输入文件: {excel_file}")
print(f"分析图表: images/{base_filename}_analysis.png")
print(f"结果文件: {excel_file_output}")
else:
print("错误: 请指定Excel文件路径")
print("用法: python multi_scenario.py --excel <文件路径>")
sys.exit(1)
else:
# 运行基本演示
demo_multi_scenario_analysis()
print("\n" + "="*60)
print("提示: 使用 'python multi_scenario.py --excel <文件路径>' 可以分析指定的Excel文件")
print("="*60)

View File

@@ -10,7 +10,7 @@
import numpy as np
import math
from typing import List, Dict, Tuple, Optional
from typing import List, Dict, Tuple, Optional, Any
from dataclasses import dataclass
@@ -351,7 +351,7 @@ def check_constraints(
solar_output: List[float],
wind_output: List[float],
thermal_output: List[float],
balance_result: Dict[str, List[float]],
balance_result: Dict[str, Any],
params: SystemParameters
) -> Dict[str, float]:
"""
@@ -373,9 +373,25 @@ def check_constraints(
total_solar_potential = sum(solar_output)
total_thermal = sum(thermal_output)
total_curtailed_wind = sum(balance_result['curtailed_wind'])
total_curtailed_solar = sum(balance_result['curtailed_solar'])
total_grid_feed_in = sum(balance_result['grid_feed_in'])
# 确保数据是列表类型
curtailed_wind = balance_result['curtailed_wind']
curtailed_solar = balance_result['curtailed_solar']
grid_feed_in = balance_result['grid_feed_in']
if isinstance(curtailed_wind, (list, tuple)):
total_curtailed_wind = sum(curtailed_wind)
else:
total_curtailed_wind = float(curtailed_wind) if isinstance(curtailed_wind, (int, float)) else 0.0
if isinstance(curtailed_solar, (list, tuple)):
total_curtailed_solar = sum(curtailed_solar)
else:
total_curtailed_solar = float(curtailed_solar) if isinstance(curtailed_solar, (int, float)) else 0.0
if isinstance(grid_feed_in, (list, tuple)):
total_grid_feed_in = sum(grid_feed_in)
else:
total_grid_feed_in = float(grid_feed_in) if isinstance(grid_feed_in, (int, float)) else 0.0
# 实际发电量(考虑弃风弃光)
actual_wind_generation = total_wind_potential - total_curtailed_wind
@@ -545,7 +561,8 @@ def optimize_storage_capacity(
base_curtailed = sum(base_result['curtailed_wind']) + sum(base_result['curtailed_solar'])
best_curtailed = base_curtailed
best_result = {**base_result, **check_constraints(solar_output, wind_output, thermal_output, base_result, params)}
constraint_results = check_constraints(solar_output, wind_output, thermal_output, base_result, params)
best_result: Dict[str, Any] = {**base_result, **constraint_results}
print(f"基准容量 {best_capacity:.2f} MWh 的弃电量: {best_curtailed:.2f} MWh")
@@ -609,7 +626,7 @@ def optimize_storage_capacity(
if curtailed < best_curtailed:
best_curtailed = curtailed
best_capacity = capacity
best_result = {**result, **constraint_results}
best_result = {**result, **constraint_results} # type: ignore
print(f" 发现更优容量: {best_capacity} MWh, 弃电量: {best_curtailed:.2f} MWh")
# 每处理10个容量值输出一次进度
@@ -647,7 +664,7 @@ def optimize_storage_capacity(
adjusted_constraint_results = check_constraints(solar_output, wind_output, thermal_output, adjusted_result, params)
# 更新结果
best_result = {**adjusted_result, **adjusted_constraint_results}
best_result = {**adjusted_result, **adjusted_constraint_results} # type: ignore
best_capacity = rounded_capacity
best_curtailed = sum(adjusted_result['curtailed_wind']) + sum(adjusted_result['curtailed_solar'])
@@ -658,11 +675,20 @@ def optimize_storage_capacity(
# 添加能量平衡校验
total_generation = sum(thermal_output) + sum(wind_output) + sum(solar_output)
total_consumption = sum(load_demand)
total_curtailed = sum(best_result['curtailed_wind']) + sum(best_result['curtailed_solar'])
total_grid = sum(best_result['grid_feed_in'])
total_charge = sum(best_result['charge_profile'])
total_discharge = sum(best_result['discharge_profile'])
storage_net_change = best_result['storage_profile'][-1] - best_result['storage_profile'][0]
# 确保best_result中的数据是列表类型
curtailed_wind_list = best_result['curtailed_wind'] if isinstance(best_result['curtailed_wind'], list) else list(best_result['curtailed_wind'])
curtailed_solar_list = best_result['curtailed_solar'] if isinstance(best_result['curtailed_solar'], list) else list(best_result['curtailed_solar'])
grid_feed_in_list = best_result['grid_feed_in'] if isinstance(best_result['grid_feed_in'], list) else list(best_result['grid_feed_in'])
charge_profile_list = best_result['charge_profile'] if isinstance(best_result['charge_profile'], list) else list(best_result['charge_profile'])
discharge_profile_list = best_result['discharge_profile'] if isinstance(best_result['discharge_profile'], list) else list(best_result['discharge_profile'])
storage_profile_list = best_result['storage_profile'] if isinstance(best_result['storage_profile'], list) else list(best_result['storage_profile'])
total_curtailed = sum(curtailed_wind_list) + sum(curtailed_solar_list)
total_grid = sum(grid_feed_in_list)
total_charge = sum(charge_profile_list)
total_discharge = sum(discharge_profile_list)
storage_net_change = storage_profile_list[-1] - storage_profile_list[0]
# 能量平衡校验:发电量 + 放电量/效率 = 负荷 + 充电量*效率 + 弃风弃光 + 上网电量
# 考虑储能充放电效率的能量平衡
@@ -695,14 +721,14 @@ def optimize_storage_capacity(
print(f" SOC差值: {soc_initial_final_diff:.4f} MWh")
# 最终结果
result = {
result: Dict[str, Any] = {
'required_storage_capacity': best_capacity,
'storage_profile': best_result['storage_profile'],
'charge_profile': best_result['charge_profile'],
'discharge_profile': best_result['discharge_profile'],
'curtailed_wind': best_result['curtailed_wind'],
'curtailed_solar': best_result['curtailed_solar'],
'grid_feed_in': best_result['grid_feed_in'],
'storage_profile': storage_profile_list,
'charge_profile': charge_profile_list,
'discharge_profile': discharge_profile_list,
'curtailed_wind': curtailed_wind_list,
'curtailed_solar': curtailed_solar_list,
'grid_feed_in': grid_feed_in_list,
'total_curtailment_wind_ratio': best_result['total_curtailment_wind_ratio'],
'total_curtailment_solar_ratio': best_result['total_curtailment_solar_ratio'],
'total_grid_feed_in_ratio': best_result['total_grid_feed_in_ratio'],