Files
multi_energy_complementarity/storage_optimization.py

475 lines
20 KiB
Python
Raw Normal View History

2025-12-25 18:06:12 +08:00
"""
多能互补系统储能容量优化计算程序
该程序计算多能互补系统中所需的储能容量确保系统在24小时内电能平衡
同时满足用户定义的弃风弃光率和上网电量比例约束
作者: iFlow CLI
创建日期: 2025-12-25
"""
import numpy as np
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass
@dataclass
class SystemParameters:
"""系统参数配置类"""
max_curtailment_wind: float = 0.1 # 最大允许弃风率 (0.0-1.0)
max_curtailment_solar: float = 0.1 # 最大允许弃光率 (0.0-1.0)
max_grid_ratio: float = 0.2 # 最大允许上网电量比例 (0.0-1.0)
storage_efficiency: float = 0.9 # 储能充放电效率 (0.0-1.0)
discharge_rate: float = 1.0 # 储能放电倍率 (C-rate)
charge_rate: float = 1.0 # 储能充电倍率 (C-rate)
2025-12-25 21:14:11 +08:00
max_storage_capacity: Optional[float] = None # 储能容量上限 (MWh)None表示无限制
2025-12-25 18:06:12 +08:00
def validate_inputs(
solar_output: List[float],
wind_output: List[float],
thermal_output: List[float],
load_demand: List[float],
params: SystemParameters
) -> None:
"""
验证输入数据的有效性
Args:
solar_output: 24小时光伏出力曲线 (MW)
wind_output: 24小时风电出力曲线 (MW)
thermal_output: 24小时火电出力曲线 (MW)
load_demand: 24小时负荷曲线 (MW)
params: 系统参数配置
Raises:
ValueError: 当输入数据无效时抛出异常
"""
# 检查数据长度支持24小时或8760小时
data_length = len(solar_output)
valid_lengths = [24, 8760]
if data_length not in valid_lengths:
raise ValueError(f"输入数据长度必须为24小时或8760小时当前长度为{data_length}")
if len(wind_output) != data_length or len(thermal_output) != data_length or len(load_demand) != data_length:
raise ValueError("所有输入数据长度必须一致")
# 检查数据类型和范围
for name, data in [
("光伏出力", solar_output), ("风电出力", wind_output),
("火电出力", thermal_output), ("负荷需求", load_demand)
]:
if not all(isinstance(x, (int, float)) for x in data):
raise ValueError(f"{name}必须包含数值数据")
if any(x < 0 for x in data):
raise ValueError(f"{name}不能包含负值")
# 检查参数范围
if not (0.0 <= params.max_curtailment_wind <= 1.0):
raise ValueError("弃风率必须在0.0-1.0之间")
if not (0.0 <= params.max_curtailment_solar <= 1.0):
raise ValueError("弃光率必须在0.0-1.0之间")
if not (-1.0 <= params.max_grid_ratio <= 1.0):
raise ValueError("上网电量比例必须在-1.0到1.0之间(负值表示购电)")
2025-12-25 18:06:12 +08:00
if not (0.0 < params.storage_efficiency <= 1.0):
raise ValueError("储能效率必须在0.0-1.0之间")
if params.discharge_rate <= 0 or params.charge_rate <= 0:
raise ValueError("充放电倍率必须大于0")
2025-12-25 21:14:11 +08:00
if params.max_storage_capacity is not None and params.max_storage_capacity <= 0:
raise ValueError("储能容量上限必须大于0")
2025-12-25 18:06:12 +08:00
def calculate_energy_balance(
solar_output: List[float],
wind_output: List[float],
thermal_output: List[float],
load_demand: List[float],
params: SystemParameters,
storage_capacity: float
) -> Dict[str, List[float]]:
"""
计算给定储能容量下的系统电能平衡
Args:
solar_output: 光伏出力曲线 (MW) - 支持24小时或8760小时
wind_output: 风电出力曲线 (MW) - 支持24小时或8760小时
thermal_output: 火电出力曲线 (MW) - 支持24小时或8760小时
load_demand: 负荷曲线 (MW) - 支持24小时或8760小时
params: 系统参数配置
storage_capacity: 储能容量 (MWh)
Returns:
包含各种功率曲线的字典
"""
# 转换为numpy数组便于计算
solar = np.array(solar_output)
wind = np.array(wind_output)
thermal = np.array(thermal_output)
load = np.array(load_demand)
# 初始化输出数组
hours = len(solar_output)
storage_soc = np.zeros(hours) # 储能状态 (MWh)
charge_power = np.zeros(hours) # 充电功率 (MW)
discharge_power = np.zeros(hours) # 放电功率 (MW)
curtailed_wind = np.zeros(hours) # 弃风量 (MW)
curtailed_solar = np.zeros(hours) # 弃光量 (MW)
grid_feed_in = np.zeros(hours) # 上网电量 (MW)
# 计算总发电潜力
total_potential_wind = np.sum(wind)
total_potential_solar = np.sum(solar)
# 计算允许的最大弃风弃光量
max_curtailed_wind_total = total_potential_wind * params.max_curtailment_wind
max_curtailed_solar_total = total_potential_solar * params.max_curtailment_solar
# 初始化累计弃风弃光量
accumulated_curtailed_wind = 0.0
accumulated_curtailed_solar = 0.0
# 逐小时计算
for hour in range(hours):
# 确保储能状态不为负
storage_soc[hour] = max(0, storage_soc[hour])
# 可用发电量(未考虑弃风弃光)
available_generation = thermal[hour] + wind[hour] + solar[hour]
# 需求电量(负荷)
demand = load[hour]
# 计算功率平衡
power_surplus = available_generation - demand
if power_surplus > 0:
# 有盈余电力,优先储能,然后上网
max_charge = min(
storage_capacity - storage_soc[hour], # 储能空间限制
storage_capacity * params.charge_rate, # 充电功率限制
power_surplus # 可用盈余电力
)
# 实际充电功率
actual_charge = min(max_charge, power_surplus)
charge_power[hour] = actual_charge
# 更新储能状态(考虑充电效率)
if hour < hours - 1:
storage_soc[hour + 1] = storage_soc[hour] + actual_charge * params.storage_efficiency
# 剩余电力考虑弃风弃光和上网
remaining_surplus = power_surplus - actual_charge
# 计算弃风弃光(优先弃风,然后弃光)
if remaining_surplus > 0:
# 计算当前可弃风量
available_wind_curtail = min(
wind[hour],
max_curtailed_wind_total - accumulated_curtailed_wind
)
if available_wind_curtail > 0:
curtailed_wind[hour] = min(available_wind_curtail, remaining_surplus)
remaining_surplus -= curtailed_wind[hour]
accumulated_curtailed_wind += curtailed_wind[hour]
# 如果还有剩余,弃光
if remaining_surplus > 0:
available_solar_curtail = min(
solar[hour],
max_curtailed_solar_total - accumulated_curtailed_solar
)
if available_solar_curtail > 0:
curtailed_solar[hour] = min(available_solar_curtail, remaining_surplus)
remaining_surplus -= curtailed_solar[hour]
accumulated_curtailed_solar += curtailed_solar[hour]
# 最终剩余电力上网
grid_feed_in[hour] = max(0, remaining_surplus)
else:
# 电力不足,优先放电
power_deficit = -power_surplus
grid_feed_in[hour] = 0 # 初始化购电为0
2025-12-25 18:06:12 +08:00
max_discharge = min(
storage_soc[hour], # 储能状态限制
storage_capacity * params.discharge_rate, # 放电功率限制
power_deficit # 缺电功率
)
# 实际放电功率
actual_discharge = min(max_discharge, power_deficit)
discharge_power[hour] = actual_discharge
# 更新储能状态(考虑放电效率)
if hour < hours - 1:
storage_soc[hour + 1] = storage_soc[hour] - actual_discharge / params.storage_efficiency
# 计算剩余缺电,需要从电网购电
remaining_deficit = power_deficit - actual_discharge
# 如果还有缺电且允许购电,则从电网购电
if remaining_deficit > 0:
# 检查是否允许购电max_grid_ratio为负值
if params.max_grid_ratio < 0:
# 购电功率为负值,表示从电网输入
grid_feed_in[hour] = -remaining_deficit
else:
# 不允许购电,缺电部分无法满足
# 在实际系统中可能需要削减负荷
grid_feed_in[hour] = 0 # 不购电
2025-12-25 18:06:12 +08:00
return {
'storage_profile': storage_soc.tolist(),
'charge_profile': charge_power.tolist(),
'discharge_profile': discharge_power.tolist(),
'curtailed_wind': curtailed_wind.tolist(),
'curtailed_solar': curtailed_solar.tolist(),
'grid_feed_in': grid_feed_in.tolist()
}
def check_constraints(
solar_output: List[float],
wind_output: List[float],
thermal_output: List[float],
balance_result: Dict[str, List[float]],
params: SystemParameters
) -> Dict[str, float]:
"""
检查约束条件是否满足
Args:
solar_output: 光伏出力曲线 (MW) - 支持24小时或8760小时
wind_output: 风电出力曲线 (MW) - 支持24小时或8760小时
thermal_output: 火电出力曲线 (MW) - 支持24小时或8760小时
balance_result: 电能平衡计算结果
params: 系统参数配置
Returns:
包含各约束实际比例的字典
"""
# 计算总量
total_wind_potential = sum(wind_output)
total_solar_potential = sum(solar_output)
total_thermal = sum(thermal_output)
total_curtailed_wind = sum(balance_result['curtailed_wind'])
total_curtailed_solar = sum(balance_result['curtailed_solar'])
total_grid_feed_in = sum(balance_result['grid_feed_in'])
# 实际发电量(考虑弃风弃光)
actual_wind_generation = total_wind_potential - total_curtailed_wind
actual_solar_generation = total_solar_potential - total_curtailed_solar
total_generation = total_thermal + actual_wind_generation + actual_solar_generation
# 计算比例
actual_curtailment_wind_ratio = total_curtailed_wind / total_wind_potential if total_wind_potential > 0 else 0
actual_curtailment_solar_ratio = total_curtailed_solar / total_solar_potential if total_solar_potential > 0 else 0
actual_grid_feed_in_ratio = total_grid_feed_in / total_generation if total_generation > 0 else 0
return {
'total_curtailment_wind_ratio': actual_curtailment_wind_ratio,
'total_curtailment_solar_ratio': actual_curtailment_solar_ratio,
'total_grid_feed_in_ratio': actual_grid_feed_in_ratio
}
def optimize_storage_capacity(
solar_output: List[float],
wind_output: List[float],
thermal_output: List[float],
load_demand: List[float],
params: SystemParameters,
max_iterations: int = 100,
tolerance: float = 0.01
) -> Dict:
"""
优化储能容量使用迭代方法寻找满足所有约束的最小储能容量
Args:
solar_output: 光伏出力曲线 (MW) - 支持24小时或8760小时
wind_output: 风电出力曲线 (MW) - 支持24小时或8760小时
thermal_output: 火电出力曲线 (MW) - 支持24小时或8760小时
load_demand: 负荷曲线 (MW) - 支持24小时或8760小时
params: 系统参数配置
max_iterations: 最大迭代次数
tolerance: 收敛容差
Returns:
包含优化结果的字典
"""
# 验证输入
validate_inputs(solar_output, wind_output, thermal_output, load_demand, params)
# 初始化搜索范围
lower_bound = 0.0
2025-12-25 21:14:11 +08:00
theoretical_max = max(sum(solar_output) + sum(wind_output) + sum(thermal_output), sum(load_demand))
# 应用储能容量上限限制
if params.max_storage_capacity is not None:
upper_bound = min(theoretical_max, params.max_storage_capacity)
else:
upper_bound = theoretical_max
2025-12-25 18:06:12 +08:00
# 二分搜索寻找最小储能容量
best_capacity = upper_bound
best_result = None
2025-12-25 21:14:11 +08:00
solution_found = False # 标记是否找到可行解
2025-12-25 18:06:12 +08:00
for iteration in range(max_iterations):
mid_capacity = (lower_bound + upper_bound) / 2
# 计算当前容量下的平衡
balance_result = calculate_energy_balance(
solar_output, wind_output, thermal_output, load_demand, params, mid_capacity
)
# 检查约束条件
constraint_results = check_constraints(solar_output, wind_output, thermal_output, balance_result, params)
# 检查是否满足所有约束
# 对于负的max_grid_ratio购电约束实际grid_feed_in_ratio应该大于等于约束值
grid_constraint_satisfied = (
constraint_results['total_grid_feed_in_ratio'] <= params.max_grid_ratio
if params.max_grid_ratio >= 0
else constraint_results['total_grid_feed_in_ratio'] >= params.max_grid_ratio
)
2025-12-25 18:06:12 +08:00
constraints_satisfied = (
constraint_results['total_curtailment_wind_ratio'] <= params.max_curtailment_wind and
constraint_results['total_curtailment_solar_ratio'] <= params.max_curtailment_solar and
grid_constraint_satisfied
2025-12-25 18:06:12 +08:00
)
# 检查储能日平衡(周期结束时储能状态应接近初始值)
storage_initial = balance_result['storage_profile'][0]
storage_final = balance_result['storage_profile'][-1]
daily_balance = abs(storage_final - storage_initial) < tolerance
if constraints_satisfied and daily_balance:
# 满足条件,尝试减小容量
best_capacity = mid_capacity
best_result = {**balance_result, **constraint_results}
2025-12-25 21:14:11 +08:00
solution_found = True
2025-12-25 18:06:12 +08:00
upper_bound = mid_capacity
else:
# 不满足条件,增大容量
lower_bound = mid_capacity
# 检查收敛
if upper_bound - lower_bound < tolerance:
break
2025-12-25 21:14:11 +08:00
# 处理储能容量上限限制的情况
if not solution_found and params.max_storage_capacity is not None:
print(f"警告:在储能容量上限 {params.max_storage_capacity:.2f} MWh 内无法找到满足所有约束的解")
print("使用最大允许容量进行计算,但某些约束条件可能无法满足")
# 使用最大允许容量计算结果
balance_result = calculate_energy_balance(
solar_output, wind_output, thermal_output, load_demand, params, params.max_storage_capacity
)
constraint_results = check_constraints(solar_output, wind_output, thermal_output, balance_result, params)
best_result = {**balance_result, **constraint_results}
best_capacity = params.max_storage_capacity
elif best_result is None:
# 如果没有找到可行解(且没有容量上限限制),使用最大容量
2025-12-25 18:06:12 +08:00
balance_result = calculate_energy_balance(
solar_output, wind_output, thermal_output, load_demand, params, upper_bound
)
constraint_results = check_constraints(solar_output, wind_output, thermal_output, balance_result, params)
best_result = {**balance_result, **constraint_results}
best_capacity = upper_bound
# 添加能量平衡校验
total_generation = sum(thermal_output) + sum(wind_output) + sum(solar_output)
total_consumption = sum(load_demand)
total_curtailed = sum(best_result['curtailed_wind']) + sum(best_result['curtailed_solar'])
total_grid = sum(best_result['grid_feed_in'])
total_charge = sum(best_result['charge_profile'])
total_discharge = sum(best_result['discharge_profile'])
storage_net_change = best_result['storage_profile'][-1] - best_result['storage_profile'][0]
# 能量平衡校验:发电量 + 放电量/效率 = 负荷 + 充电量*效率 + 弃风弃光 + 上网电量
# 考虑储能充放电效率的能量平衡
energy_from_storage = total_discharge / params.storage_efficiency # 储能提供的有效能量
energy_to_storage = total_charge * params.storage_efficiency # 储能消耗的电网能量
# 能量平衡校验应该接近0但允许一定误差
# 当total_grid为负时购电应该加到左侧供给侧
# 当total_grid为正时上网应该加到右侧需求侧
if total_grid < 0: # 购电情况
energy_balance_error = abs(
total_generation + energy_from_storage + abs(total_grid) - total_consumption - energy_to_storage - total_curtailed
)
else: # 上网情况
energy_balance_error = abs(
total_generation + energy_from_storage - total_consumption - energy_to_storage - total_curtailed - total_grid
)
2025-12-25 18:06:12 +08:00
# 使用更大的容差,考虑储能效率损失和数值误差
# 允许误差为总发电量的15%或10MW取较大者
# 储能效率损失可能达到总能量的10%以上
tolerance = max(10.0, total_generation * 0.15)
energy_balance_check = energy_balance_error < tolerance
# 返回最终结果
return {
'required_storage_capacity': best_capacity,
'storage_profile': best_result['storage_profile'],
'charge_profile': best_result['charge_profile'],
'discharge_profile': best_result['discharge_profile'],
'curtailed_wind': best_result['curtailed_wind'],
'curtailed_solar': best_result['curtailed_solar'],
'grid_feed_in': best_result['grid_feed_in'],
'total_curtailment_wind_ratio': best_result['total_curtailment_wind_ratio'],
'total_curtailment_solar_ratio': best_result['total_curtailment_solar_ratio'],
'total_grid_feed_in_ratio': best_result['total_grid_feed_in_ratio'],
2025-12-25 21:14:11 +08:00
'energy_balance_check': energy_balance_check,
'capacity_limit_reached': params.max_storage_capacity is not None and best_capacity >= params.max_storage_capacity,
'theoretical_optimal_capacity': best_capacity if solution_found else None,
'max_storage_limit': params.max_storage_capacity
2025-12-25 18:06:12 +08:00
}
def main():
"""主函数,提供示例使用"""
# 示例数据
solar_output = [0.0] * 6 + [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 5.0, 4.0, 3.0, 2.0, 1.0, 0.0] * 2
wind_output = [2.0, 3.0, 4.0, 3.0, 2.0, 1.0] * 4
thermal_output = [5.0] * 24
load_demand = [3.0, 4.0, 5.0, 6.0, 8.0, 10.0, 12.0, 14.0, 16.0, 18.0, 20.0, 18.0,
16.0, 14.0, 12.0, 10.0, 8.0, 6.0, 5.0, 4.0, 3.0, 2.0, 1.0, 2.0]
# 系统参数
params = SystemParameters(
max_curtailment_wind=0.1,
max_curtailment_solar=0.1,
max_grid_ratio=0.2,
storage_efficiency=0.9,
discharge_rate=1.0,
charge_rate=1.0
)
# 计算最优储能容量
result = optimize_storage_capacity(
solar_output, wind_output, thermal_output, load_demand, params
)
# 打印结果
print("多能互补系统储能容量优化结果:")
print(f"所需储能总容量: {result['required_storage_capacity']:.2f} MWh")
print(f"实际弃风率: {result['total_curtailment_wind_ratio']:.3f}")
print(f"实际弃光率: {result['total_curtailment_solar_ratio']:.3f}")
print(f"实际上网电量比例: {result['total_grid_feed_in_ratio']:.3f}")
print(f"能量平衡校验: {'通过' if result['energy_balance_check'] else '未通过'}")
return result
if __name__ == "__main__":
main()