Files
multi_energy_complementarity/storage_optimization.py
2025-12-26 09:22:56 +08:00

511 lines
22 KiB
Python
Raw Blame History

This file contains ambiguous Unicode characters
This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.
"""
多能互补系统储能容量优化计算程序
该程序计算多能互补系统中所需的储能容量确保系统在24小时内电能平衡
同时满足用户定义的弃风弃光率和上网电量比例约束。
作者: iFlow CLI
创建日期: 2025-12-25
"""
import numpy as np
from typing import List, Dict, Tuple, Optional
from dataclasses import dataclass
@dataclass
class SystemParameters:
"""系统参数配置类"""
max_curtailment_wind: float = 0.1 # 最大允许弃风率 (0.0-1.0)
max_curtailment_solar: float = 0.1 # 最大允许弃光率 (0.0-1.0)
max_grid_ratio: float = 0.2 # 最大允许上网电量比例 (0.0-∞,只限制上网电量,不限制购电)
storage_efficiency: float = 0.9 # 储能充放电效率 (0.0-1.0)
discharge_rate: float = 1.0 # 储能放电倍率 (C-rate)
charge_rate: float = 1.0 # 储能充电倍率 (C-rate)
max_storage_capacity: Optional[float] = None # 储能容量上限 (MWh)None表示无限制
# 新增额定装机容量参数
rated_thermal_capacity: float = 100.0 # 额定火电装机容量 (MW)
rated_solar_capacity: float = 100.0 # 额定光伏装机容量 (MW)
rated_wind_capacity: float = 100.0 # 额定风电装机容量 (MW)
# 新增可用发电量参数
available_thermal_energy: float = 2400.0 # 火电可用发电量 (MWh)
available_solar_energy: float = 600.0 # 光伏可用发电量 (MWh)
available_wind_energy: float = 1200.0 # 风电可用发电量 (MWh)
def validate_inputs(
solar_output: List[float],
wind_output: List[float],
thermal_output: List[float],
load_demand: List[float],
params: SystemParameters
) -> None:
"""
验证输入数据的有效性
Args:
solar_output: 24小时光伏出力曲线 (MW)
wind_output: 24小时风电出力曲线 (MW)
thermal_output: 24小时火电出力曲线 (MW)
load_demand: 24小时负荷曲线 (MW)
params: 系统参数配置
Raises:
ValueError: 当输入数据无效时抛出异常
"""
# 检查数据长度支持24小时或8760小时
data_length = len(solar_output)
valid_lengths = [24, 8760]
if data_length not in valid_lengths:
raise ValueError(f"输入数据长度必须为24小时或8760小时当前长度为{data_length}")
if len(wind_output) != data_length or len(thermal_output) != data_length or len(load_demand) != data_length:
raise ValueError("所有输入数据长度必须一致")
# 检查数据类型和范围
for name, data in [
("光伏出力", solar_output), ("风电出力", wind_output),
("火电出力", thermal_output), ("负荷需求", load_demand)
]:
if not all(isinstance(x, (int, float)) for x in data):
raise ValueError(f"{name}必须包含数值数据")
if any(x < 0 for x in data):
raise ValueError(f"{name}不能包含负值")
# 检查参数范围
if not (0.0 <= params.max_curtailment_wind <= 1.0):
raise ValueError("弃风率必须在0.0-1.0之间")
if not (0.0 <= params.max_curtailment_solar <= 1.0):
raise ValueError("弃光率必须在0.0-1.0之间")
# max_grid_ratio只限制上网电量比例必须为非负值
if not (0.0 <= params.max_grid_ratio):
raise ValueError("上网电量比例限制必须为非负值")
if not (0.0 < params.storage_efficiency <= 1.0):
raise ValueError("储能效率必须在0.0-1.0之间")
if params.discharge_rate <= 0 or params.charge_rate <= 0:
raise ValueError("充放电倍率必须大于0")
if params.max_storage_capacity is not None and params.max_storage_capacity <= 0:
raise ValueError("储能容量上限必须大于0")
# 验证新增的额定装机容量参数
if params.rated_thermal_capacity < 0:
raise ValueError("额定火电装机容量必须为非负值")
if params.rated_solar_capacity <= 0:
raise ValueError("额定光伏装机容量必须大于0")
if params.rated_wind_capacity <= 0:
raise ValueError("额定风电装机容量必须大于0")
# 验证新增的可用发电量参数
if params.available_thermal_energy < 0:
raise ValueError("火电可用发电量必须为非负值")
if params.available_solar_energy < 0:
raise ValueError("光伏可用发电量必须为非负值")
if params.available_wind_energy < 0:
raise ValueError("风电可用发电量必须为非负值")
def calculate_energy_balance(
solar_output: List[float],
wind_output: List[float],
thermal_output: List[float],
load_demand: List[float],
params: SystemParameters,
storage_capacity: float
) -> Dict[str, List[float]]:
"""
计算给定储能容量下的系统电能平衡
Args:
solar_output: 光伏出力曲线 (MW) - 支持24小时或8760小时
wind_output: 风电出力曲线 (MW) - 支持24小时或8760小时
thermal_output: 火电出力曲线 (MW) - 支持24小时或8760小时
load_demand: 负荷曲线 (MW) - 支持24小时或8760小时
params: 系统参数配置
storage_capacity: 储能容量 (MWh)
Returns:
包含各种功率曲线的字典
"""
# 转换为numpy数组便于计算
solar = np.array(solar_output)
wind = np.array(wind_output)
thermal = np.array(thermal_output)
load = np.array(load_demand)
# 初始化输出数组
hours = len(solar_output)
storage_soc = np.zeros(hours) # 储能状态 (MWh)
charge_power = np.zeros(hours) # 充电功率 (MW)
discharge_power = np.zeros(hours) # 放电功率 (MW)
curtailed_wind = np.zeros(hours) # 弃风量 (MW)
curtailed_solar = np.zeros(hours) # 弃光量 (MW)
grid_feed_in = np.zeros(hours) # 上网电量 (MW)
# 计算总发电潜力
total_potential_wind = np.sum(wind)
total_potential_solar = np.sum(solar)
# 计算允许的最大弃风弃光量
max_curtailed_wind_total = total_potential_wind * params.max_curtailment_wind
max_curtailed_solar_total = total_potential_solar * params.max_curtailment_solar
# 初始化累计弃风弃光量
accumulated_curtailed_wind = 0.0
accumulated_curtailed_solar = 0.0
# 计算总可用发电量上限(不考虑火电)
total_available_energy = params.available_solar_energy + params.available_wind_energy
max_total_grid_feed_in = total_available_energy * params.max_grid_ratio
# 初始化累计上网电量
cumulative_grid_feed_in = 0.0
# 逐小时计算
for hour in range(hours):
# 确保储能状态不为负
storage_soc[hour] = max(0, storage_soc[hour])
# 可用发电量(未考虑弃风弃光)
available_generation = thermal[hour] + wind[hour] + solar[hour]
# 需求电量(负荷)
demand = load[hour]
# 计算功率平衡
power_surplus = available_generation - demand
if power_surplus > 0:
# 有盈余电力,优先储能
max_charge = min(
storage_capacity - storage_soc[hour], # 储能空间限制
storage_capacity * params.charge_rate, # 充电功率限制
power_surplus # 可用盈余电力
)
# 实际充电功率
actual_charge = min(max_charge, power_surplus)
charge_power[hour] = actual_charge
# 更新储能状态(考虑充电效率)
if hour < hours - 1:
storage_soc[hour + 1] = storage_soc[hour] + actual_charge * params.storage_efficiency
# 剩余电力优先上网,超出上网电量比例限制时才弃风弃光
remaining_surplus = power_surplus - actual_charge
# 计算当前允许的最大上网电量
# 基于总可用发电量和已累计上网电量
remaining_grid_quota = max_total_grid_feed_in - cumulative_grid_feed_in
# 优先上网,但不超过剩余配额
grid_feed_allowed = min(remaining_surplus, max(0, remaining_grid_quota))
grid_feed_in[hour] = grid_feed_allowed
cumulative_grid_feed_in += grid_feed_allowed
# 剩余电力考虑弃风弃光
remaining_surplus -= grid_feed_allowed
# 计算弃风弃光(优先弃光,然后弃风)
if remaining_surplus > 0:
# 计算当前可弃光量
available_solar_curtail = min(
solar[hour],
max_curtailed_solar_total - accumulated_curtailed_solar
)
if available_solar_curtail > 0:
curtailed_solar[hour] = min(available_solar_curtail, remaining_surplus)
remaining_surplus -= curtailed_solar[hour]
accumulated_curtailed_solar += curtailed_solar[hour]
# 如果还有剩余,弃风
if remaining_surplus > 0:
available_wind_curtail = min(
wind[hour],
max_curtailed_wind_total - accumulated_curtailed_wind
)
if available_wind_curtail > 0:
curtailed_wind[hour] = min(available_wind_curtail, remaining_surplus)
remaining_surplus -= curtailed_wind[hour]
accumulated_curtailed_wind += curtailed_wind[hour]
else:
# 电力不足,优先放电
power_deficit = -power_surplus
grid_feed_in[hour] = 0 # 初始化购电为0
max_discharge = min(
storage_soc[hour], # 储能状态限制
storage_capacity * params.discharge_rate, # 放电功率限制
power_deficit # 缺电功率
)
# 实际放电功率
actual_discharge = min(max_discharge, power_deficit)
discharge_power[hour] = actual_discharge
# 更新储能状态(考虑放电效率)
if hour < hours - 1:
storage_soc[hour + 1] = storage_soc[hour] - actual_discharge / params.storage_efficiency
# 计算剩余缺电,需要从电网购电
remaining_deficit = power_deficit - actual_discharge
# 如果还有缺电,从电网购电
if remaining_deficit > 0:
# 购电功率为负值,表示从电网输入
grid_feed_in[hour] = -remaining_deficit
return {
'storage_profile': storage_soc.tolist(),
'charge_profile': charge_power.tolist(),
'discharge_profile': discharge_power.tolist(),
'curtailed_wind': curtailed_wind.tolist(),
'curtailed_solar': curtailed_solar.tolist(),
'grid_feed_in': grid_feed_in.tolist()
}
def check_constraints(
solar_output: List[float],
wind_output: List[float],
thermal_output: List[float],
balance_result: Dict[str, List[float]],
params: SystemParameters
) -> Dict[str, float]:
"""
检查约束条件是否满足
Args:
solar_output: 光伏出力曲线 (MW) - 支持24小时或8760小时
wind_output: 风电出力曲线 (MW) - 支持24小时或8760小时
thermal_output: 火电出力曲线 (MW) - 支持24小时或8760小时
balance_result: 电能平衡计算结果
params: 系统参数配置
Returns:
包含各约束实际比例的字典
"""
# 计算总量
total_wind_potential = sum(wind_output)
total_solar_potential = sum(solar_output)
total_thermal = sum(thermal_output)
total_curtailed_wind = sum(balance_result['curtailed_wind'])
total_curtailed_solar = sum(balance_result['curtailed_solar'])
total_grid_feed_in = sum(balance_result['grid_feed_in'])
# 实际发电量(考虑弃风弃光)
actual_wind_generation = total_wind_potential - total_curtailed_wind
actual_solar_generation = total_solar_potential - total_curtailed_solar
total_generation = total_thermal + actual_wind_generation + actual_solar_generation
# 计算比例
actual_curtailment_wind_ratio = total_curtailed_wind / total_wind_potential if total_wind_potential > 0 else 0
actual_curtailment_solar_ratio = total_curtailed_solar / total_solar_potential if total_solar_potential > 0 else 0
actual_grid_feed_in_ratio = total_grid_feed_in / total_generation if total_generation > 0 else 0
return {
'total_curtailment_wind_ratio': actual_curtailment_wind_ratio,
'total_curtailment_solar_ratio': actual_curtailment_solar_ratio,
'total_grid_feed_in_ratio': actual_grid_feed_in_ratio
}
def optimize_storage_capacity(
solar_output: List[float],
wind_output: List[float],
thermal_output: List[float],
load_demand: List[float],
params: SystemParameters,
max_iterations: int = 100,
tolerance: float = 0.01
) -> Dict:
"""
优化储能容量,使用迭代方法寻找满足所有约束的最小储能容量
Args:
solar_output: 光伏出力曲线 (MW) - 支持24小时或8760小时
wind_output: 风电出力曲线 (MW) - 支持24小时或8760小时
thermal_output: 火电出力曲线 (MW) - 支持24小时或8760小时
load_demand: 负荷曲线 (MW) - 支持24小时或8760小时
params: 系统参数配置
max_iterations: 最大迭代次数
tolerance: 收敛容差
Returns:
包含优化结果的字典
"""
# 验证输入
validate_inputs(solar_output, wind_output, thermal_output, load_demand, params)
# 初始化搜索范围
lower_bound = 0.0
theoretical_max = max(sum(solar_output) + sum(wind_output) + sum(thermal_output), sum(load_demand))
# 应用储能容量上限限制
if params.max_storage_capacity is not None:
upper_bound = min(theoretical_max, params.max_storage_capacity)
else:
upper_bound = theoretical_max
# 二分搜索寻找最小储能容量
best_capacity = upper_bound
best_result = None
solution_found = False # 标记是否找到可行解
for iteration in range(max_iterations):
mid_capacity = (lower_bound + upper_bound) / 2
# 计算当前容量下的平衡
balance_result = calculate_energy_balance(
solar_output, wind_output, thermal_output, load_demand, params, mid_capacity
)
# 检查约束条件
constraint_results = check_constraints(solar_output, wind_output, thermal_output, balance_result, params)
# 检查是否满足所有约束
# max_grid_ratio只限制上网电量比例不约束购电
# 只有当grid_feed_in为正时上网才需要检查约束
total_grid_feed_in = sum(balance_result['grid_feed_in'])
if total_grid_feed_in > 0:
# 有上网电量,检查是否超过限制
grid_constraint_satisfied = constraint_results['total_grid_feed_in_ratio'] <= params.max_grid_ratio
else:
# 没有上网电量或为负值(购电),总是满足约束
grid_constraint_satisfied = True
constraints_satisfied = (
constraint_results['total_curtailment_wind_ratio'] <= params.max_curtailment_wind and
constraint_results['total_curtailment_solar_ratio'] <= params.max_curtailment_solar and
grid_constraint_satisfied
)
# 检查储能日平衡(周期结束时储能状态应接近初始值)
storage_initial = balance_result['storage_profile'][0]
storage_final = balance_result['storage_profile'][-1]
daily_balance = abs(storage_final - storage_initial) < tolerance
if constraints_satisfied and daily_balance:
# 满足条件,尝试减小容量
best_capacity = mid_capacity
best_result = {**balance_result, **constraint_results}
solution_found = True
upper_bound = mid_capacity
else:
# 不满足条件,增大容量
lower_bound = mid_capacity
# 检查收敛
if upper_bound - lower_bound < tolerance:
break
# 处理储能容量上限限制的情况
if not solution_found and params.max_storage_capacity is not None:
print(f"警告:在储能容量上限 {params.max_storage_capacity:.2f} MWh 内无法找到满足所有约束的解")
print("使用最大允许容量进行计算,但某些约束条件可能无法满足")
# 使用最大允许容量计算结果
balance_result = calculate_energy_balance(
solar_output, wind_output, thermal_output, load_demand, params, params.max_storage_capacity
)
constraint_results = check_constraints(solar_output, wind_output, thermal_output, balance_result, params)
best_result = {**balance_result, **constraint_results}
best_capacity = params.max_storage_capacity
elif best_result is None:
# 如果没有找到可行解(且没有容量上限限制),使用最大容量
balance_result = calculate_energy_balance(
solar_output, wind_output, thermal_output, load_demand, params, upper_bound
)
constraint_results = check_constraints(solar_output, wind_output, thermal_output, balance_result, params)
best_result = {**balance_result, **constraint_results}
best_capacity = upper_bound
# 添加能量平衡校验
total_generation = sum(thermal_output) + sum(wind_output) + sum(solar_output)
total_consumption = sum(load_demand)
total_curtailed = sum(best_result['curtailed_wind']) + sum(best_result['curtailed_solar'])
total_grid = sum(best_result['grid_feed_in'])
total_charge = sum(best_result['charge_profile'])
total_discharge = sum(best_result['discharge_profile'])
storage_net_change = best_result['storage_profile'][-1] - best_result['storage_profile'][0]
# 能量平衡校验:发电量 + 放电量/效率 = 负荷 + 充电量*效率 + 弃风弃光 + 上网电量
# 考虑储能充放电效率的能量平衡
energy_from_storage = total_discharge / params.storage_efficiency # 储能提供的有效能量
energy_to_storage = total_charge * params.storage_efficiency # 储能消耗的电网能量
# 能量平衡校验应该接近0但允许一定误差
# 当total_grid为负时购电应该加到左侧供给侧
# 当total_grid为正时上网应该加到右侧需求侧
if total_grid < 0: # 购电情况
energy_balance_error = abs(
total_generation + energy_from_storage + abs(total_grid) - total_consumption - energy_to_storage - total_curtailed
)
else: # 上网情况
energy_balance_error = abs(
total_generation + energy_from_storage - total_consumption - energy_to_storage - total_curtailed - total_grid
)
# 使用更大的容差,考虑储能效率损失和数值误差
# 允许误差为总发电量的15%或10MW取较大者
# 储能效率损失可能达到总能量的10%以上
tolerance = max(10.0, total_generation * 0.15)
energy_balance_check = energy_balance_error < tolerance
# 返回最终结果
return {
'required_storage_capacity': best_capacity,
'storage_profile': best_result['storage_profile'],
'charge_profile': best_result['charge_profile'],
'discharge_profile': best_result['discharge_profile'],
'curtailed_wind': best_result['curtailed_wind'],
'curtailed_solar': best_result['curtailed_solar'],
'grid_feed_in': best_result['grid_feed_in'],
'total_curtailment_wind_ratio': best_result['total_curtailment_wind_ratio'],
'total_curtailment_solar_ratio': best_result['total_curtailment_solar_ratio'],
'total_grid_feed_in_ratio': best_result['total_grid_feed_in_ratio'],
'energy_balance_check': energy_balance_check,
'capacity_limit_reached': params.max_storage_capacity is not None and best_capacity >= params.max_storage_capacity,
'theoretical_optimal_capacity': best_capacity if solution_found else None,
'max_storage_limit': params.max_storage_capacity
}
def main():
"""主函数,提供示例使用"""
# 示例数据
solar_output = [0.0] * 6 + [1.0, 2.0, 3.0, 4.0, 5.0, 6.0, 5.0, 4.0, 3.0, 2.0, 1.0, 0.0] * 2
wind_output = [2.0, 3.0, 4.0, 3.0, 2.0, 1.0] * 4
thermal_output = [5.0] * 24
load_demand = [3.0, 4.0, 5.0, 6.0, 8.0, 10.0, 12.0, 14.0, 16.0, 18.0, 20.0, 18.0,
16.0, 14.0, 12.0, 10.0, 8.0, 6.0, 5.0, 4.0, 3.0, 2.0, 1.0, 2.0]
# 系统参数
params = SystemParameters(
max_curtailment_wind=0.1,
max_curtailment_solar=0.1,
max_grid_ratio=0.2,
storage_efficiency=0.9,
discharge_rate=1.0,
charge_rate=1.0
)
# 计算最优储能容量
result = optimize_storage_capacity(
solar_output, wind_output, thermal_output, load_demand, params
)
# 打印结果
print("多能互补系统储能容量优化结果:")
print(f"所需储能总容量: {result['required_storage_capacity']:.2f} MWh")
print(f"实际弃风率: {result['total_curtailment_wind_ratio']:.3f}")
print(f"实际弃光率: {result['total_curtailment_solar_ratio']:.3f}")
print(f"实际上网电量比例: {result['total_grid_feed_in_ratio']:.3f}")
print(f"能量平衡校验: {'通过' if result['energy_balance_check'] else '未通过'}")
return result
if __name__ == "__main__":
main()