修复了储能电量不平衡的问题。

This commit is contained in:
dmy
2025-12-27 12:25:01 +08:00
parent a522132ede
commit 164b9da026
2 changed files with 251 additions and 101 deletions

89
main.py
View File

@@ -25,11 +25,11 @@ plt.rcParams['font.sans-serif'] = ['SimHei', 'Microsoft YaHei', 'DejaVu Sans']
plt.rcParams['axes.unicode_minus'] = False plt.rcParams['axes.unicode_minus'] = False
def plot_system_curves(solar_output, wind_output, thermal_output, load_demand, result, storage_efficiency=0.9, show_window=False, display_only=False): def plot_system_curves(solar_output, wind_output, thermal_output, load_demand, result, storage_efficiency=0.9, show_window=False, display_only=False, output_dir=None):
""" """
绘制系统运行曲线 绘制系统运行曲线
Args: Args:
solar_output: 光伏出力曲线 (MW) - 支持24小时或8760小时 solar_output: 光伏出力曲线 (MW) - 支持24小时或8760小时
wind_output: 风电出力曲线 (MW) - 支持24小时或8760小时 wind_output: 风电出力曲线 (MW) - 支持24小时或8760小时
thermal_output: 火电出力曲线 (MW) - 支持24小时或8760小时 thermal_output: 火电出力曲线 (MW) - 支持24小时或8760小时
@@ -37,6 +37,7 @@ def plot_system_curves(solar_output, wind_output, thermal_output, load_demand, r
result: 优化结果字典 result: 优化结果字典
show_window: 是否显示图形窗口 show_window: 是否显示图形窗口
display_only: 是否只显示不保存文件 display_only: 是否只显示不保存文件
output_dir: 输出目录路径,默认为 None当前目录
""" """
import matplotlib.pyplot as plt import matplotlib.pyplot as plt
import numpy as np import numpy as np
@@ -148,15 +149,17 @@ def plot_system_curves(solar_output, wind_output, thermal_output, load_demand, r
plt.tight_layout() plt.tight_layout()
# 根据参数决定是否保存和显示图形 # 根据参数决定是否保存和显示图形
if display_only: if not display_only:
# 只显示,不保存 # 确定输出目录
try: if output_dir is None:
plt.show() output_dir = 'results'
except Exception as e:
print(f"无法显示图形窗口:{str(e)}") # 创建输出目录(如果不存在)
else: os.makedirs(output_dir, exist_ok=True)
# 保存图片
plt.savefig('system_curves.png', dpi=300, bbox_inches='tight') # 保存图片到指定目录
output_path = os.path.join(output_dir, 'system_curves.png')
plt.savefig(output_path, dpi=300, bbox_inches='tight')
# 根据参数决定是否显示图形窗口 # 根据参数决定是否显示图形窗口
if show_window: if show_window:
@@ -164,7 +167,7 @@ def plot_system_curves(solar_output, wind_output, thermal_output, load_demand, r
plt.show() plt.show()
except Exception as e: except Exception as e:
print(f"无法显示图形窗口:{str(e)}") print(f"无法显示图形窗口:{str(e)}")
print("图形已保存为 'system_curves.png'") print(f"图形已保存为 '{output_path}'")
else: else:
plt.close() # 关闭图形,不显示窗口 plt.close() # 关闭图形,不显示窗口
@@ -238,7 +241,7 @@ def plot_system_curves(solar_output, wind_output, thermal_output, load_demand, r
print(f"储能损耗率: {(storage_loss/total_charge*100) if total_charge > 0 else 0:.2f}%") print(f"储能损耗率: {(storage_loss/total_charge*100) if total_charge > 0 else 0:.2f}%")
def export_results_to_excel(solar_output, wind_output, thermal_output, load_demand, result, params, filename=None): def export_results_to_excel(solar_output, wind_output, thermal_output, load_demand, result, params, filename=None, output_dir=None):
""" """
将多能互补系统储能优化结果导出到Excel文件包含运行数据、统计结果和系统参数。 将多能互补系统储能优化结果导出到Excel文件包含运行数据、统计结果和系统参数。
@@ -262,6 +265,7 @@ def export_results_to_excel(solar_output, wind_output, thermal_output, load_dema
- capacity_limit_reached: 容量限制是否达到 - capacity_limit_reached: 容量限制是否达到
params (object): 系统参数对象,包含各种技术参数 params (object): 系统参数对象,包含各种技术参数
filename (str, optional): 输出文件名,如未提供则自动生成 filename (str, optional): 输出文件名,如未提供则自动生成
output_dir (str, optional): 输出目录路径,默认为 None使用 results 目录)
Returns: Returns:
str: 生成的Excel文件路径 str: 生成的Excel文件路径
@@ -287,8 +291,18 @@ def export_results_to_excel(solar_output, wind_output, thermal_output, load_dema
if filename is None: if filename is None:
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S") timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
filename = f"storage_optimization_results_{timestamp}.xlsx" filename = f"storage_optimization_results_{timestamp}.xlsx"
# 确定输出目录
if output_dir is None:
output_dir = 'results'
# 创建输出目录(如果不存在)
os.makedirs(output_dir, exist_ok=True)
# 构建完整的输出路径
output_path = os.path.join(output_dir, filename)
print(f"\n正在导出结果到Excel文件: {filename}") print(f"\n正在导出结果到Excel文件: {output_path}")
# 准备数据 # 准备数据
hours = list(range(1, len(solar_output) + 1)) hours = list(range(1, len(solar_output) + 1))
@@ -410,7 +424,7 @@ def export_results_to_excel(solar_output, wind_output, thermal_output, load_dema
}) })
# 写入Excel文件 # 写入Excel文件
with pd.ExcelWriter(filename, engine='openpyxl') as writer: with pd.ExcelWriter(output_path, engine='openpyxl') as writer:
# 写入主要数据 # 写入主要数据
data_df.to_excel(writer, sheet_name='运行数据', index=False) data_df.to_excel(writer, sheet_name='运行数据', index=False)
@@ -439,8 +453,8 @@ def export_results_to_excel(solar_output, wind_output, thermal_output, load_dema
}) })
description_df.to_excel(writer, sheet_name='说明', index=False) description_df.to_excel(writer, sheet_name='说明', index=False)
print(f"结果已成功导出到: {filename}") print(f"结果已成功导出到: {output_path}")
return filename return output_path
def generate_yearly_data(): def generate_yearly_data():
@@ -491,6 +505,17 @@ def main():
command = sys.argv[1] command = sys.argv[1]
show_window = '--show' in sys.argv # 检查是否包含--show参数 show_window = '--show' in sys.argv # 检查是否包含--show参数
display_only = '--display-only' in sys.argv # 检查是否只显示不保存 display_only = '--display-only' in sys.argv # 检查是否只显示不保存
# 解析输出目录参数
output_dir = None
if '--output' in sys.argv:
output_index = sys.argv.index('--output')
if output_index + 1 < len(sys.argv):
output_dir = sys.argv[output_index + 1]
else:
print("错误:--output 参数需要指定目录路径")
print("用法python main.py --output <目录路径>")
return
if command == '--excel': if command == '--excel':
if len(sys.argv) < 3: if len(sys.argv) < 3:
@@ -619,38 +644,46 @@ def main():
# 绘制曲线 # 绘制曲线
print("正在绘制系统运行曲线...") print("正在绘制系统运行曲线...")
plot_system_curves(solar_output, wind_output, thermal_output, load_demand, result, params.storage_efficiency, show_window, display_only) plot_system_curves(solar_output, wind_output, thermal_output, load_demand, result, params.storage_efficiency, show_window, display_only, output_dir)
# 导出结果到Excel # 导出结果到Excel
try: try:
export_results_to_excel(solar_output, wind_output, thermal_output, load_demand, result, params) export_results_to_excel(solar_output, wind_output, thermal_output, load_demand, result, params, output_dir=output_dir)
except Exception as e: except Exception as e:
print(f"导出Excel文件失败{str(e)}") print(f"导出Excel文件失败{str(e)}")
if display_only: if display_only:
print("\n正在显示图形窗口...") print("\n正在显示图形窗口...")
elif show_window: elif show_window:
print("\n曲线图已保存为 'system_curves.png' 并显示图形窗口") output_path = os.path.join(output_dir if output_dir else 'results', 'system_curves.png')
print(f"\n曲线图已保存为 '{output_path}' 并显示图形窗口")
else: else:
print("\n曲线图已保存为 'system_curves.png'") output_path = os.path.join(output_dir if output_dir else 'results', 'system_curves.png')
print(f"\n曲线图已保存为 '{output_path}'")
def print_usage(): def print_usage():
"""打印使用说明""" """打印使用说明"""
print("多能互补系统储能容量优化程序") print("多能互补系统储能容量优化程序")
print("\n使用方法:") print("\n使用方法:")
print(" python main.py --excel <文件路径> # 从Excel文件读取数据") print(" python main.py --excel <文件路径> # 从Excel文件读取数据")
print(" python main.py --output <目录路径> # 指定输出目录默认results")
print(" python main.py --create-template [类型] # 创建Excel模板(24或8760)") print(" python main.py --create-template [类型] # 创建Excel模板(24或8760)")
print(" python main.py # 使用24小时示例数据") print(" python main.py # 使用24小时示例数据")
print(" python main.py --show # 显示图形窗口(可与其他参数组合使用)") print(" python main.py --show # 显示图形窗口(可与其他参数组合使用)")
print(" python main.py --display-only # 只显示图形窗口,不保存文件") print(" python main.py --display-only # 只显示图形窗口,不保存文件")
print("\n示例:") print("\n示例:")
print(" python main.py --excel data.xlsx") print(" python main.py --excel data.xlsx")
print(" python main.py --excel data.xlsx --show") print(" python main.py --excel data.xlsx --show")
print(" python main.py --excel data.xlsx --output my_results")
print(" python main.py --excel data.xlsx --display-only") print(" python main.py --excel data.xlsx --display-only")
print(" python main.py --create-template 8760") print(" python main.py --create-template 8760")
print(" python main.py --create-template 24") print(" python main.py --create-template 24")
print(" python main.py --display-only # 使用示例数据并只显示图形窗口") print(" python main.py --display-only # 使用示例数据并只显示图形窗口")
print(" python main.py --output custom_results # 使用示例数据,输出到 custom_results 目录")
print("\n说明:")
print(" - 结果文件默认保存到 results 目录")
print(" - 使用 --output 参数可指定自定义输出目录")
print(" - 如果输出目录不存在,程序会自动创建")
if __name__ == "__main__": if __name__ == "__main__":

View File

@@ -109,11 +109,12 @@ def calculate_energy_balance(
thermal_output: List[float], thermal_output: List[float],
load_demand: List[float], load_demand: List[float],
params: SystemParameters, params: SystemParameters,
storage_capacity: float storage_capacity: float,
initial_soc: float = 0.0
) -> Dict[str, List[float]]: ) -> Dict[str, List[float]]:
""" """
计算给定储能容量下的系统电能平衡 计算给定储能容量下的系统电能平衡
Args: Args:
solar_output: 光伏出力曲线 (MW) - 支持24小时或8760小时 solar_output: 光伏出力曲线 (MW) - 支持24小时或8760小时
wind_output: 风电出力曲线 (MW) - 支持24小时或8760小时 wind_output: 风电出力曲线 (MW) - 支持24小时或8760小时
@@ -121,7 +122,8 @@ def calculate_energy_balance(
load_demand: 负荷曲线 (MW) - 支持24小时或8760小时 load_demand: 负荷曲线 (MW) - 支持24小时或8760小时
params: 系统参数配置 params: 系统参数配置
storage_capacity: 储能容量 (MWh) storage_capacity: 储能容量 (MWh)
initial_soc: 初始储能状态 (MWh)默认为0.0
Returns: Returns:
包含各种功率曲线的字典 包含各种功率曲线的字典
""" """
@@ -130,7 +132,7 @@ def calculate_energy_balance(
wind = np.array(wind_output) wind = np.array(wind_output)
thermal = np.array(thermal_output) thermal = np.array(thermal_output)
load = np.array(load_demand) load = np.array(load_demand)
# 初始化输出数组 # 初始化输出数组
hours = len(solar_output) hours = len(solar_output)
storage_soc = np.zeros(hours) # 储能状态 (MWh) storage_soc = np.zeros(hours) # 储能状态 (MWh)
@@ -139,16 +141,19 @@ def calculate_energy_balance(
curtailed_wind = np.zeros(hours) # 弃风量 (MW) curtailed_wind = np.zeros(hours) # 弃风量 (MW)
curtailed_solar = np.zeros(hours) # 弃光量 (MW) curtailed_solar = np.zeros(hours) # 弃光量 (MW)
grid_feed_in = np.zeros(hours) # 上网电量 (MW) grid_feed_in = np.zeros(hours) # 上网电量 (MW)
# 设置初始储能状态
storage_soc[0] = initial_soc
# 计算总发电潜力 # 计算总发电潜力
total_potential_wind = np.sum(wind) total_potential_wind = np.sum(wind)
total_potential_solar = np.sum(solar) total_potential_solar = np.sum(solar)
# 判断是否只有一种可再生能源 # 判断是否只有一种可再生能源
has_wind = total_potential_wind > 0 has_wind = total_potential_wind > 0
has_solar = total_potential_solar > 0 has_solar = total_potential_solar > 0
single_renewable = (has_wind and not has_solar) or (has_solar and not has_wind) single_renewable = (has_wind and not has_solar) or (has_solar and not has_wind)
# 计算允许的最大弃风弃光量 # 计算允许的最大弃风弃光量
if single_renewable: if single_renewable:
# 只有一种可再生能源时,弃电量不受限制 # 只有一种可再生能源时,弃电量不受限制
@@ -162,32 +167,32 @@ def calculate_energy_balance(
# 有多种可再生能源且上网电量限制不为0时应用弃风弃光限制 # 有多种可再生能源且上网电量限制不为0时应用弃风弃光限制
max_curtailed_wind_total = total_potential_wind * params.max_curtailment_wind max_curtailed_wind_total = total_potential_wind * params.max_curtailment_wind
max_curtailed_solar_total = total_potential_solar * params.max_curtailment_solar max_curtailed_solar_total = total_potential_solar * params.max_curtailment_solar
# 初始化累计弃风弃光量 # 初始化累计弃风弃光量
accumulated_curtailed_wind = 0.0 accumulated_curtailed_wind = 0.0
accumulated_curtailed_solar = 0.0 accumulated_curtailed_solar = 0.0
# 计算总可用发电量上限(不考虑火电) # 计算总可用发电量上限(不考虑火电)
total_available_energy = params.available_solar_energy + params.available_wind_energy total_available_energy = params.available_solar_energy + params.available_wind_energy
max_total_grid_feed_in = total_available_energy * params.max_grid_ratio max_total_grid_feed_in = total_available_energy * params.max_grid_ratio
# 初始化累计上网电量 # 初始化累计上网电量
cumulative_grid_feed_in = 0.0 cumulative_grid_feed_in = 0.0
# 逐小时计算 # 逐小时计算
for hour in range(hours): for hour in range(hours):
# 确保储能状态不为负 # 确保储能状态不为负且不超过容量
storage_soc[hour] = max(0, storage_soc[hour]) storage_soc[hour] = max(0, min(storage_capacity, storage_soc[hour]))
# 可用发电量(未考虑弃风弃光) # 可用发电量(未考虑弃风弃光)
available_generation = thermal[hour] + wind[hour] + solar[hour] available_generation = thermal[hour] + wind[hour] + solar[hour]
# 需求电量(负荷) # 需求电量(负荷)
demand = load[hour] demand = load[hour]
# 计算功率平衡 # 计算功率平衡
power_surplus = available_generation - demand power_surplus = available_generation - demand
if power_surplus > 0: if power_surplus > 0:
# 有盈余电力,优先储能 # 有盈余电力,优先储能
max_charge = min( max_charge = min(
@@ -195,30 +200,30 @@ def calculate_energy_balance(
storage_capacity * params.charge_rate, # 充电功率限制 storage_capacity * params.charge_rate, # 充电功率限制
power_surplus # 可用盈余电力 power_surplus # 可用盈余电力
) )
# 实际充电功率 # 实际充电功率
actual_charge = min(max_charge, power_surplus) actual_charge = min(max_charge, power_surplus)
charge_power[hour] = actual_charge charge_power[hour] = actual_charge
# 更新储能状态(考虑充电效率) # 更新储能状态(考虑充电效率)
if hour < hours - 1: if hour < hours - 1:
storage_soc[hour + 1] = storage_soc[hour] + actual_charge * params.storage_efficiency storage_soc[hour + 1] = storage_soc[hour] + actual_charge * params.storage_efficiency
# 剩余电力优先上网,超出上网电量比例限制时才弃风弃光 # 剩余电力优先上网,超出上网电量比例限制时才弃风弃光
remaining_surplus = power_surplus - actual_charge remaining_surplus = power_surplus - actual_charge
# 计算当前允许的最大上网电量 # 计算当前允许的最大上网电量
# 基于总可用发电量和已累计上网电量 # 基于总可用发电量和已累计上网电量
remaining_grid_quota = max_total_grid_feed_in - cumulative_grid_feed_in remaining_grid_quota = max_total_grid_feed_in - cumulative_grid_feed_in
# 优先上网,但不超过剩余配额 # 优先上网,但不超过剩余配额
grid_feed_allowed = min(remaining_surplus, max(0, remaining_grid_quota)) grid_feed_allowed = min(remaining_surplus, max(0, remaining_grid_quota))
grid_feed_in[hour] = grid_feed_allowed grid_feed_in[hour] = grid_feed_allowed
cumulative_grid_feed_in += grid_feed_allowed cumulative_grid_feed_in += grid_feed_allowed
# 剩余电力考虑弃风弃光 # 剩余电力考虑弃风弃光
remaining_surplus -= grid_feed_allowed remaining_surplus -= grid_feed_allowed
# 计算弃风弃光(优先弃光,然后弃风) # 计算弃风弃光(优先弃光,然后弃风)
if remaining_surplus > 0: if remaining_surplus > 0:
# 在单一可再生能源场景下,弃风弃光不受限制 # 在单一可再生能源场景下,弃风弃光不受限制
@@ -228,7 +233,7 @@ def calculate_energy_balance(
curtailed_solar[hour] = min(solar[hour], remaining_surplus) curtailed_solar[hour] = min(solar[hour], remaining_surplus)
remaining_surplus -= curtailed_solar[hour] remaining_surplus -= curtailed_solar[hour]
accumulated_curtailed_solar += curtailed_solar[hour] accumulated_curtailed_solar += curtailed_solar[hour]
# 如果还有剩余,弃风 # 如果还有剩余,弃风
if remaining_surplus > 0 and wind[hour] > 0: if remaining_surplus > 0 and wind[hour] > 0:
curtailed_wind[hour] = min(wind[hour], remaining_surplus) curtailed_wind[hour] = min(wind[hour], remaining_surplus)
@@ -246,12 +251,12 @@ def calculate_energy_balance(
solar[hour], solar[hour],
max_curtailed_solar_total - accumulated_curtailed_solar max_curtailed_solar_total - accumulated_curtailed_solar
) )
if available_solar_curtail > 0: if available_solar_curtail > 0:
curtailed_solar[hour] = min(available_solar_curtail, remaining_surplus) curtailed_solar[hour] = min(available_solar_curtail, remaining_surplus)
remaining_surplus -= curtailed_solar[hour] remaining_surplus -= curtailed_solar[hour]
accumulated_curtailed_solar += curtailed_solar[hour] accumulated_curtailed_solar += curtailed_solar[hour]
# 如果还有剩余,弃风 # 如果还有剩余,弃风
if remaining_surplus > 0: if remaining_surplus > 0:
if max_curtailed_wind_total == float('inf'): if max_curtailed_wind_total == float('inf'):
@@ -263,45 +268,45 @@ def calculate_energy_balance(
wind[hour], wind[hour],
max_curtailed_wind_total - accumulated_curtailed_wind max_curtailed_wind_total - accumulated_curtailed_wind
) )
if available_wind_curtail > 0: if available_wind_curtail > 0:
curtailed_wind[hour] = min(available_wind_curtail, remaining_surplus) curtailed_wind[hour] = min(available_wind_curtail, remaining_surplus)
remaining_surplus -= curtailed_wind[hour] remaining_surplus -= curtailed_wind[hour]
accumulated_curtailed_wind += curtailed_wind[hour] accumulated_curtailed_wind += curtailed_wind[hour]
# 确保电力平衡:如果仍有剩余电力,强制弃掉(安全机制) # 确保电力平衡:如果仍有剩余电力,强制弃掉(安全机制)
if remaining_surplus > 0: if remaining_surplus > 0:
# 记录警告但不影响计算 # 记录警告但不影响计算
# 在实际系统中,这种情况不应该发生,但作为安全保护 # 在实际系统中,这种情况不应该发生,但作为安全保护
pass pass
else: else:
# 电力不足,优先放电 # 电力不足,优先放电
power_deficit = -power_surplus power_deficit = -power_surplus
grid_feed_in[hour] = 0 # 初始化购电为0 grid_feed_in[hour] = 0 # 初始化购电为0
max_discharge = min( max_discharge = min(
storage_soc[hour], # 储能状态限制 storage_soc[hour], # 储能状态限制
storage_capacity * params.discharge_rate, # 放电功率限制 storage_capacity * params.discharge_rate, # 放电功率限制
power_deficit # 缺电功率 power_deficit # 缺电功率
) )
# 实际放电功率 # 实际放电功率
actual_discharge = min(max_discharge, power_deficit) actual_discharge = min(max_discharge, power_deficit)
discharge_power[hour] = actual_discharge discharge_power[hour] = actual_discharge
# 更新储能状态(考虑放电效率) # 更新储能状态(考虑放电效率)
if hour < hours - 1: if hour < hours - 1:
storage_soc[hour + 1] = storage_soc[hour] - actual_discharge / params.storage_efficiency storage_soc[hour + 1] = storage_soc[hour] - actual_discharge / params.storage_efficiency
# 计算剩余缺电,需要从电网购电 # 计算剩余缺电,需要从电网购电
remaining_deficit = power_deficit - actual_discharge remaining_deficit = power_deficit - actual_discharge
# 如果还有缺电,从电网购电 # 如果还有缺电,从电网购电
if remaining_deficit > 0: if remaining_deficit > 0:
# 购电功率为负值,表示从电网输入 # 购电功率为负值,表示从电网输入
grid_feed_in[hour] = -remaining_deficit grid_feed_in[hour] = -remaining_deficit
return { return {
'storage_profile': storage_soc.tolist(), 'storage_profile': storage_soc.tolist(),
'charge_profile': charge_power.tolist(), 'charge_profile': charge_power.tolist(),
@@ -312,6 +317,83 @@ def calculate_energy_balance(
} }
def find_periodic_steady_state(
solar_output: List[float],
wind_output: List[float],
thermal_output: List[float],
load_demand: List[float],
params: SystemParameters,
storage_capacity: float,
soc_convergence_threshold: float = 0.001,
max_iterations: int = 100
) -> Dict[str, List[float]]:
"""
通过迭代找到满足周期性平衡的储能初始状态
步骤:
1. 从初始SOC=0开始运行一次全年仿真记录最后一小时的SOC值
2. 将这个SOC值作为新的"初始SOC",再次运行仿真
3. 重复上述过程直到首尾SOC的差值小于设定的阈值
Args:
solar_output: 光伏出力曲线 (MW) - 支持24小时或8760小时
wind_output: 风电出力曲线 (MW) - 支持24小时或8760小时
thermal_output: 火电出力曲线 (MW) - 支持24小时或8760小时
load_demand: 负荷曲线 (MW) - 支持24小时或8760小时
params: 系统参数配置
storage_capacity: 储能容量 (MWh)
soc_convergence_threshold: SOC收敛阈值相对于容量的比例默认0.1%
max_iterations: 最大迭代次数
Returns:
包含各种功率曲线的字典,且满足周期性平衡条件
"""
# 计算收敛阈值的绝对值
absolute_threshold = storage_capacity * soc_convergence_threshold
# 初始SOC从0开始
initial_soc = 0.0
iteration = 0
soc_diff = float('inf')
print(f"正在寻找周期性平衡状态SOC收敛阈值: {absolute_threshold:.4f} MWh...")
while iteration < max_iterations and soc_diff > absolute_threshold:
# 运行仿真
balance_result = calculate_energy_balance(
solar_output, wind_output, thermal_output, load_demand,
params, storage_capacity, initial_soc
)
# 获取初始和最终的SOC
storage_initial = balance_result['storage_profile'][0]
storage_final = balance_result['storage_profile'][-1]
# 计算SOC差值
soc_diff = abs(storage_final - storage_initial)
# 更新初始SOC使用最终SOC作为下一次的初始SOC
initial_soc = storage_final
# 确保SOC在合理范围内
initial_soc = max(0, min(storage_capacity, initial_soc))
iteration += 1
# 输出迭代信息每10次迭代或最后一次
if iteration % 10 == 0 or iteration == 1 or soc_diff <= absolute_threshold:
print(f" 迭代 {iteration}: 初始SOC={storage_initial:.4f} MWh, "
f"最终SOC={storage_final:.4f} MWh, 差值={soc_diff:.4f} MWh")
# 输出收敛结果
if soc_diff <= absolute_threshold:
print(f"✓ 周期性平衡收敛成功(迭代{iteration}SOC差值={soc_diff:.4f} MWh")
else:
print(f"⚠ 未达到收敛条件(迭代{iteration}SOC差值={soc_diff:.4f} MWh")
return balance_result
def check_constraints( def check_constraints(
solar_output: List[float], solar_output: List[float],
wind_output: List[float], wind_output: List[float],
@@ -321,14 +403,14 @@ def check_constraints(
) -> Dict[str, float]: ) -> Dict[str, float]:
""" """
检查约束条件是否满足 检查约束条件是否满足
Args: Args:
solar_output: 光伏出力曲线 (MW) - 支持24小时或8760小时 solar_output: 光伏出力曲线 (MW) - 支持24小时或8760小时
wind_output: 风电出力曲线 (MW) - 支持24小时或8760小时 wind_output: 风电出力曲线 (MW) - 支持24小时或8760小时
thermal_output: 火电出力曲线 (MW) - 支持24小时或8760小时 thermal_output: 火电出力曲线 (MW) - 支持24小时或8760小时
balance_result: 电能平衡计算结果 balance_result: 电能平衡计算结果
params: 系统参数配置 params: 系统参数配置
Returns: Returns:
包含各约束实际比例的字典 包含各约束实际比例的字典
""" """
@@ -336,21 +418,21 @@ def check_constraints(
total_wind_potential = sum(wind_output) total_wind_potential = sum(wind_output)
total_solar_potential = sum(solar_output) total_solar_potential = sum(solar_output)
total_thermal = sum(thermal_output) total_thermal = sum(thermal_output)
total_curtailed_wind = sum(balance_result['curtailed_wind']) total_curtailed_wind = sum(balance_result['curtailed_wind'])
total_curtailed_solar = sum(balance_result['curtailed_solar']) total_curtailed_solar = sum(balance_result['curtailed_solar'])
total_grid_feed_in = sum(balance_result['grid_feed_in']) total_grid_feed_in = sum(balance_result['grid_feed_in'])
# 实际发电量(考虑弃风弃光) # 实际发电量(考虑弃风弃光)
actual_wind_generation = total_wind_potential - total_curtailed_wind actual_wind_generation = total_wind_potential - total_curtailed_wind
actual_solar_generation = total_solar_potential - total_curtailed_solar actual_solar_generation = total_solar_potential - total_curtailed_solar
total_generation = total_thermal + actual_wind_generation + actual_solar_generation total_generation = total_thermal + actual_wind_generation + actual_solar_generation
# 计算比例 # 计算比例
actual_curtailment_wind_ratio = total_curtailed_wind / total_wind_potential if total_wind_potential > 0 else 0 actual_curtailment_wind_ratio = total_curtailed_wind / total_wind_potential if total_wind_potential > 0 else 0
actual_curtailment_solar_ratio = total_curtailed_solar / total_solar_potential if total_solar_potential > 0 else 0 actual_curtailment_solar_ratio = total_curtailed_solar / total_solar_potential if total_solar_potential > 0 else 0
actual_grid_feed_in_ratio = total_grid_feed_in / total_generation if total_generation > 0 else 0 actual_grid_feed_in_ratio = total_grid_feed_in / total_generation if total_generation > 0 else 0
return { return {
'total_curtailment_wind_ratio': actual_curtailment_wind_ratio, 'total_curtailment_wind_ratio': actual_curtailment_wind_ratio,
'total_curtailment_solar_ratio': actual_curtailment_solar_ratio, 'total_curtailment_solar_ratio': actual_curtailment_solar_ratio,
@@ -369,7 +451,7 @@ def optimize_storage_capacity(
) -> Dict: ) -> Dict:
""" """
优化储能容量,使用迭代方法寻找满足所有约束的最小储能容量 优化储能容量,使用迭代方法寻找满足所有约束的最小储能容量
Args: Args:
solar_output: 光伏出力曲线 (MW) - 支持24小时或8760小时 solar_output: 光伏出力曲线 (MW) - 支持24小时或8760小时
wind_output: 风电出力曲线 (MW) - 支持24小时或8760小时 wind_output: 风电出力曲线 (MW) - 支持24小时或8760小时
@@ -378,39 +460,54 @@ def optimize_storage_capacity(
params: 系统参数配置 params: 系统参数配置
max_iterations: 最大迭代次数 max_iterations: 最大迭代次数
tolerance: 收敛容差 tolerance: 收敛容差
Returns: Returns:
包含优化结果的字典 包含优化结果的字典
""" """
# 验证输入 # 验证输入
validate_inputs(solar_output, wind_output, thermal_output, load_demand, params) validate_inputs(solar_output, wind_output, thermal_output, load_demand, params)
# 初始化搜索范围 # 初始化搜索范围
lower_bound = 0.0 lower_bound = 0.0
theoretical_max = max(sum(solar_output) + sum(wind_output) + sum(thermal_output), sum(load_demand)) theoretical_max = max(sum(solar_output) + sum(wind_output) + sum(thermal_output), sum(load_demand))
# 应用储能容量上限限制 # 应用储能容量上限限制
if params.max_storage_capacity is not None: if params.max_storage_capacity is not None:
upper_bound = min(theoretical_max, params.max_storage_capacity) upper_bound = min(theoretical_max, params.max_storage_capacity)
else: else:
upper_bound = theoretical_max upper_bound = theoretical_max
# 判断数据类型24小时或8760小时
data_length = len(solar_output)
is_yearly_data = data_length == 8760
if is_yearly_data:
print(f"处理8760小时全年数据启用周期性平衡优化...")
# 二分搜索寻找最小储能容量 # 二分搜索寻找最小储能容量
best_capacity = upper_bound best_capacity = upper_bound
best_result = None best_result = None
solution_found = False # 标记是否找到可行解 solution_found = False # 标记是否找到可行解
for iteration in range(max_iterations): for iteration in range(max_iterations):
mid_capacity = (lower_bound + upper_bound) / 2 mid_capacity = (lower_bound + upper_bound) / 2
# 计算当前容量下的平衡 # 计算当前容量下的平衡
balance_result = calculate_energy_balance( # 对于8760小时数据使用周期性平衡函数
solar_output, wind_output, thermal_output, load_demand, params, mid_capacity # 对于24小时数据使用普通平衡函数初始SOC=0
) if is_yearly_data:
balance_result = find_periodic_steady_state(
solar_output, wind_output, thermal_output, load_demand,
params, mid_capacity
)
else:
balance_result = calculate_energy_balance(
solar_output, wind_output, thermal_output, load_demand, params, mid_capacity
)
# 检查约束条件 # 检查约束条件
constraint_results = check_constraints(solar_output, wind_output, thermal_output, balance_result, params) constraint_results = check_constraints(solar_output, wind_output, thermal_output, balance_result, params)
# 检查是否满足所有约束 # 检查是否满足所有约束
# max_grid_ratio只限制上网电量比例不约束购电 # max_grid_ratio只限制上网电量比例不约束购电
# 只有当grid_feed_in为正时上网才需要检查约束 # 只有当grid_feed_in为正时上网才需要检查约束
@@ -421,16 +518,16 @@ def optimize_storage_capacity(
else: else:
# 没有上网电量或为负值(购电),总是满足约束 # 没有上网电量或为负值(购电),总是满足约束
grid_constraint_satisfied = True grid_constraint_satisfied = True
# 判断是否只有一种可再生能源 # 判断是否只有一种可再生能源
has_wind = sum(wind_output) > 0 has_wind = sum(wind_output) > 0
has_solar = sum(solar_output) > 0 has_solar = sum(solar_output) > 0
single_renewable = (has_wind and not has_solar) or (has_solar and not has_wind) single_renewable = (has_wind and not has_solar) or (has_solar and not has_wind)
# 特殊情况当上网电量限制为0时所有超额电力都必须被弃掉 # 特殊情况当上网电量限制为0时所有超额电力都必须被弃掉
# 此时应该允许无限制弃风弃光 # 此时应该允许无限制弃风弃光
grid_quota_zero = params.max_grid_ratio == 0 grid_quota_zero = params.max_grid_ratio == 0
if single_renewable or grid_quota_zero: if single_renewable or grid_quota_zero:
# 只有一种可再生能源时或上网电量限制为0时跳过弃风弃光约束检查 # 只有一种可再生能源时或上网电量限制为0时跳过弃风弃光约束检查
constraints_satisfied = grid_constraint_satisfied constraints_satisfied = grid_constraint_satisfied
@@ -441,12 +538,12 @@ def optimize_storage_capacity(
constraint_results['total_curtailment_solar_ratio'] <= params.max_curtailment_solar and constraint_results['total_curtailment_solar_ratio'] <= params.max_curtailment_solar and
grid_constraint_satisfied grid_constraint_satisfied
) )
# 检查储能日平衡(周期结束时储能状态应接近初始值) # 检查储能日平衡(周期结束时储能状态应接近初始值)
storage_initial = balance_result['storage_profile'][0] storage_initial = balance_result['storage_profile'][0]
storage_final = balance_result['storage_profile'][-1] storage_final = balance_result['storage_profile'][-1]
daily_balance = abs(storage_final - storage_initial) < tolerance daily_balance = abs(storage_final - storage_initial) < tolerance
if constraints_satisfied and daily_balance: if constraints_satisfied and daily_balance:
# 满足条件,尝试减小容量 # 满足条件,尝试减小容量
best_capacity = mid_capacity best_capacity = mid_capacity
@@ -456,32 +553,44 @@ def optimize_storage_capacity(
else: else:
# 不满足条件,增大容量 # 不满足条件,增大容量
lower_bound = mid_capacity lower_bound = mid_capacity
# 检查收敛 # 检查收敛
if upper_bound - lower_bound < tolerance: if upper_bound - lower_bound < tolerance:
break break
# 处理储能容量上限限制的情况 # 处理储能容量上限限制的情况
if not solution_found and params.max_storage_capacity is not None: if not solution_found and params.max_storage_capacity is not None:
print(f"警告:在储能容量上限 {params.max_storage_capacity:.2f} MWh 内无法找到满足所有约束的解") print(f"警告:在储能容量上限 {params.max_storage_capacity:.2f} MWh 内无法找到满足所有约束的解")
print("使用最大允许容量进行计算,但某些约束条件可能无法满足") print("使用最大允许容量进行计算,但某些约束条件可能无法满足")
# 使用最大允许容量计算结果 # 使用最大允许容量计算结果
balance_result = calculate_energy_balance( if is_yearly_data:
solar_output, wind_output, thermal_output, load_demand, params, params.max_storage_capacity balance_result = find_periodic_steady_state(
) solar_output, wind_output, thermal_output, load_demand,
params, params.max_storage_capacity
)
else:
balance_result = calculate_energy_balance(
solar_output, wind_output, thermal_output, load_demand, params, params.max_storage_capacity
)
constraint_results = check_constraints(solar_output, wind_output, thermal_output, balance_result, params) constraint_results = check_constraints(solar_output, wind_output, thermal_output, balance_result, params)
best_result = {**balance_result, **constraint_results} best_result = {**balance_result, **constraint_results}
best_capacity = params.max_storage_capacity best_capacity = params.max_storage_capacity
elif best_result is None: elif best_result is None:
# 如果没有找到可行解(且没有容量上限限制),使用最大容量 # 如果没有找到可行解(且没有容量上限限制),使用最大容量
balance_result = calculate_energy_balance( if is_yearly_data:
solar_output, wind_output, thermal_output, load_demand, params, upper_bound balance_result = find_periodic_steady_state(
) solar_output, wind_output, thermal_output, load_demand,
params, upper_bound
)
else:
balance_result = calculate_energy_balance(
solar_output, wind_output, thermal_output, load_demand, params, upper_bound
)
constraint_results = check_constraints(solar_output, wind_output, thermal_output, balance_result, params) constraint_results = check_constraints(solar_output, wind_output, thermal_output, balance_result, params)
best_result = {**balance_result, **constraint_results} best_result = {**balance_result, **constraint_results}
best_capacity = upper_bound best_capacity = upper_bound
# 添加能量平衡校验 # 添加能量平衡校验
total_generation = sum(thermal_output) + sum(wind_output) + sum(solar_output) total_generation = sum(thermal_output) + sum(wind_output) + sum(solar_output)
total_consumption = sum(load_demand) total_consumption = sum(load_demand)
@@ -490,12 +599,12 @@ def optimize_storage_capacity(
total_charge = sum(best_result['charge_profile']) total_charge = sum(best_result['charge_profile'])
total_discharge = sum(best_result['discharge_profile']) total_discharge = sum(best_result['discharge_profile'])
storage_net_change = best_result['storage_profile'][-1] - best_result['storage_profile'][0] storage_net_change = best_result['storage_profile'][-1] - best_result['storage_profile'][0]
# 能量平衡校验:发电量 + 放电量/效率 = 负荷 + 充电量*效率 + 弃风弃光 + 上网电量 # 能量平衡校验:发电量 + 放电量/效率 = 负荷 + 充电量*效率 + 弃风弃光 + 上网电量
# 考虑储能充放电效率的能量平衡 # 考虑储能充放电效率的能量平衡
energy_from_storage = total_discharge / params.storage_efficiency # 储能提供的有效能量 energy_from_storage = total_discharge / params.storage_efficiency # 储能提供的有效能量
energy_to_storage = total_charge * params.storage_efficiency # 储能消耗的电网能量 energy_to_storage = total_charge * params.storage_efficiency # 储能消耗的电网能量
# 能量平衡校验应该接近0但允许一定误差 # 能量平衡校验应该接近0但允许一定误差
# 当total_grid为负时购电应该加到左侧供给侧 # 当total_grid为负时购电应该加到左侧供给侧
# 当total_grid为正时上网应该加到右侧需求侧 # 当total_grid为正时上网应该加到右侧需求侧
@@ -512,7 +621,15 @@ def optimize_storage_capacity(
# 储能效率损失可能达到总能量的10%以上 # 储能效率损失可能达到总能量的10%以上
tolerance = max(10.0, total_generation * 0.15) tolerance = max(10.0, total_generation * 0.15)
energy_balance_check = energy_balance_error < tolerance energy_balance_check = energy_balance_error < tolerance
# 输出周期性平衡信息
if is_yearly_data:
soc_initial_final_diff = abs(best_result['storage_profile'][-1] - best_result['storage_profile'][0])
print(f"\n周期性平衡信息:")
print(f" 初始SOC: {best_result['storage_profile'][0]:.4f} MWh")
print(f" 最终SOC: {best_result['storage_profile'][-1]:.4f} MWh")
print(f" SOC差值: {soc_initial_final_diff:.4f} MWh")
# 返回最终结果 # 返回最终结果
return { return {
'required_storage_capacity': best_capacity, 'required_storage_capacity': best_capacity,