Compare commits

..

23 Commits

Author SHA1 Message Date
dmy
2b3ab83d91 Fix substation coordinate handling for cable crossing detection 2026-01-08 16:05:06 +08:00
dmy
68df76702e Fix cable crossing detection coordinate unpacking error 2026-01-08 15:53:15 +08:00
dmy
b3a4513f94 Fix MIP solver variable duplication and function structure 2026-01-08 15:30:36 +08:00
dmy
04a5e19451 Improve MIP optimization and add log export feature 2026-01-08 15:08:04 +08:00
dmy
ebd5883dbf Fix unreachable code in design_with_pyomo function 2026-01-08 13:06:26 +08:00
dmy
41ac6f3963 Change MIP objective function to minimize total investment 2026-01-08 13:01:36 +08:00
dmy
09b2ada5df Add debug prints to check MIP toggle functionality 2026-01-08 12:58:15 +08:00
dmy
6441ddc059 Fix MIP import issue: move design_with_mst function outside __main__ protection block 2026-01-08 12:39:07 +08:00
dmy
2f095df12e Fix MIP algorithm: simplify model formulation and add detailed debugging 2026-01-08 10:28:35 +08:00
dmy
a3837a6707 Rewrite MIP model formulation and add comprehensive debugging 2026-01-08 10:22:39 +08:00
dmy
886fba4d15 Clear comparison results and topology visualization on new file upload 2026-01-08 10:10:46 +08:00
dmy
397ca8847e Fix MIP fallback return values: ensure consistent unpacking 2026-01-08 10:06:46 +08:00
dmy
6ad11a9b69 Fix MIP model: make objective function linear to avoid multiplication error 2026-01-08 10:03:49 +08:00
dmy
579f8866c4 Fix MIP toggle bug: handle PuLP import gracefully 2026-01-08 10:01:46 +08:00
dmy
4230d2221d Add MIP module for collector layout optimization 2026-01-08 09:54:40 +08:00
dmy
46e929bfce Implement genetic algorithm for collector layout optimization 2026-01-08 09:46:00 +08:00
dmy
f2a960e789 feat: 优化回路数计算逻辑,提升报表准确性 2026-01-07 16:55:11 +08:00
dmy
87cea6ed86 feat: 优化文件保存对话框并增强系统稳定性
- 重新启用run.io_bound()调用,使用后台线程执行PowerShell脚本
- 为所有文件保存操作添加按钮防重复点击功能
- 新增win32_helper模块,提供Win32 API和COM接口的文件对话框
- 简化导出最佳方案DXF的代码结构
- 改进异步操作和错误处理机制
2026-01-07 12:47:58 +08:00
dmy
e0b5b0c3dc feat: 在方案对比表格中增加损耗费用净现值列
- 新增'损耗费用净现值 (万元)'列,显示生命周期内损耗费用的净现值
- 使用npv_loss字段替代annual_loss_cost,考虑折现率对全生命周期成本的影响
- 支持按损耗费用净现值排序,便于方案经济性对比
2026-01-07 11:27:29 +08:00
dmy
7aef58de1e fix: 修正损耗计算单位从瓦特(W)转换为千瓦(kW)
- 将evaluate_design函数中的损耗计算结果从W转换为kW
- loss_w变量存储三相损耗(W),loss_kw转换为kW后累加
- 确保total_loss返回值单位为kW,与后续经济性分析计算一致
2026-01-07 10:01:32 +08:00
dmy
45c99b41b3 fix: 移除 native 模式下的 run.io_bound() 调用
- 在 native=True 模式下不能使用 run.io_bound() 执行 CPU 密集型任务
- 将 PowerShell 调用改为同步执行 subprocess.run()
- 解决 'Unable to run CPU-bound in script mode' 错误
2026-01-07 01:40:42 +08:00
dmy
837158270e fix: 优化文件保存对话框并启用原生窗口模式
- 添加 matplotlib.use('Agg') 设置非交互式后端
- 重构 save_file_with_dialog 函数,使用 PowerShell 原生对话框替代 Tkinter
- 解决 PyWebview/Tkinter 线程冲突导致的 PicklingError 问题
- 启用 native=True 原生窗口模式,提供更好的用户体验
2026-01-07 01:03:46 +08:00
dmy
61fa870778 feat: 完善经济性分析功能和优化界面显示
- 新增工程运行期限、折现率、年损耗小时数参数配置
- 实现总费用计算功能(包含电缆投资NPV和电费损耗NPV)
- 修复total_investment函数调用时机问题,确保GUI模式正确计算
- 优化电缆单价显示为万元/km单位
- 总长度显示单位改为公里
- 方案对比结果新增总费用列,支持全生命周期成本比选
- 代码格式化和导入顺序优化
- 添加IFLOW.md项目上下文文档
2026-01-06 15:09:52 +08:00
12 changed files with 2989 additions and 851 deletions

16
.vscode/launch.json vendored Normal file
View File

@@ -0,0 +1,16 @@
{
// Use IntelliSense to learn about possible attributes.
// Hover to view descriptions of existing attributes.
// For more information, visit: https://go.microsoft.com/fwlink/?linkid=830387
"version": "0.2.0",
"configurations": [
{
"name": "Python Debugger: Current File with Arguments",
"type": "debugpy",
"request": "launch",
"program": "main.py",
"console": "integratedTerminal",
"args": "--excel abc.xlsx"
},
]
}

240
IFLOW.md Normal file
View File

@@ -0,0 +1,240 @@
# 海上风电场集电线路设计优化系统 - 项目上下文
## 项目概述
这是一个用于设计和优化海上风电场集电系统拓扑的综合工具专为海上能源业务开发部电气专业设计。该系统通过多种先进的拓扑优化算法MST、旋转扫描法、Esau-Williams等根据风机坐标、功率以及海缆规格自动生成投资成本最低、损耗最优的设计方案。
### 核心功能
- 🖥️ **图形化界面**:基于 NiceGUI 的现代化桌面应用,支持原生窗口模式
- 🌊 **多种布局生成**:支持规则网格和随机分布布局的模拟数据生成
- 🔌 **多算法优化**
- MST (Minimum Spanning Tree):无容量约束基准方案
- Capacitated Sweep (Base):基础扇区扫描分组
- Rotational Sweep全局最优起始角度旋转扫描优化
- Esau-Williams经典启发式算法在距离与容量间寻找最优平衡
- ⚙️ **灵活参数配置**:通过 Excel 自定义系统电压、功率因数、电价及电缆规格
- 📊 **智能方案对比**:自动运行三大场景(标准方案、含可选电缆方案、限制最大截面方案)
- 📁 **多格式导出**CAD图纸(.dxf)、Excel报告、压缩包
### 技术栈
- **语言**Python 3.12+
- **GUI框架**NiceGUI 3.4.1 + PyWebview 6.1
- **核心库**
- numpy 2.4.0:数值计算
- pandas 2.3.3:数据处理
- matplotlib 3.10.8:可视化
- scikit-learn 1.8.0:聚类算法
- networkx 3.6.1:图算法
- ezdxf 1.4.3CAD导出
- scipy 1.16.3:科学计算
## 项目结构
```
D:\code\windfarm\
├── main.py # 核心算法和业务逻辑1388行
├── gui.py # NiceGUI图形界面1067行
├── esau_williams.py # Esau-Williams算法实现242行
├── generate_template.py # Excel模板生成器
├── make_version.py # 版本号自动生成脚本
├── pyproject.toml # 项目依赖配置
├── Makefile # 构建脚本
├── 使用说明/ # 中文操作手册和截图
├── build/ # 构建输出目录
└── dist/ # 打包输出目录
```
## 构建和运行
### 环境配置
项目使用 `uv` 作为包管理器,也支持 `pip`
```bash
# 使用 uv推荐
uv sync
# 或使用 pip
pip install -r requirements.txt # 如果有requirements.txt
# 或手动安装依赖
pip install numpy pandas matplotlib scikit-learn scipy networkx ezdxf nicegui openpyxl pywebview
```
### 运行方式
#### 1. 图形化界面(推荐)
```bash
python gui.py
```
启动后,程序将弹出独立窗口,提供完整的交互式界面。
#### 2. 命令行模式
```bash
python main.py --excel your_data.xlsx
```
### 构建可执行文件
使用 Makefile 进行构建:
```bash
# 构建exe文件自动生成版本号
make build
# 重新构建(先清理再构建)
make rebuild
# 清理构建文件
make clean
# 查看帮助
make help
```
构建过程:
1. 运行 `make_version.py` 生成版本号
2. 使用 `nicegui-pack` 打包为单文件exe
3. 重命名输出文件包含版本号
构建输出位于 `dist/` 目录,文件名格式:`海上风电场集电线路设计优化系统_{VERSION}.exe`
## 输入数据规范
### Excel文件格式
输入Excel文件应包含以下三个Sheet
#### 1. Coordinates坐标数据- 必需
| Type | ID | X | Y | Power | PlatformHeight |
|------|----|---|---|-------|----------------|
| Substation | Sub1 | 4000 | -800 | 0 | 0 |
| Turbine | 1 | 0 | 0 | 8.0 | 25 |
- **Type**: `Substation``Turbine`
- **X/Y**: 投影坐标(米),建议使用高斯投影坐标
- **Power**: 功率MW升压站填0
- **PlatformHeight**: 塔筒/平台高度(米)
#### 2. Cables电缆规格- 必需
| CrossSection | Capacity | Resistance | Cost | Optional |
|--------------|----------|------------|------|----------|
| 35 | 150 | 0.524 | 80 | |
| 400 | 580 | 0.0470 | 600 | Y |
- **CrossSection**: 导体截面mm²
- **Capacity**: 额定载流量A需考虑降容系数
- **Resistance**: 交流电阻(Ω/km
- **Cost**: 综合单价(元/m
- **Optional**: 可选标记Y表示可选大截面电缆
**重要规则**
- 电缆必须按截面从小到大排列
- `Optional` 为 'Y' 的电缆最多只能有一条
- 若存在可选电缆,它必须是列表中截面最大的一条
#### 3. Parameters系统参数- 必需
| Parameter | Value |
|-----------|-------|
| Voltage (kV) / 电压 (kV) | 66 |
| Power Factor / 功率因数 | 0.95 |
| Electricity Price (元/kWh) / 电价 (元/kWh) | 0.4 |
## 核心算法说明
### 1. MST最小生成树
- **原理**:基于 Kruskal 或 Prim 算法,寻找连接所有风机且总路径长度最短的树状结构
- **特点**:不考虑电缆载流量限制,仅作为理论距离基准参考
- **适用场景**:小规模风电场的理论分析
### 2. Capacitated Sweep基础扇区扫描
- **原理**:以升压站为中心,将平面划分为扇区,按顺时针扫描风机
- **特点**:计算速度快,拓扑结构简单清晰
- **局限**:对起始扫描角度敏感,可能产生"长尾巴"连线
### 3. Rotational Sweep旋转扫描优化
- **原理**:尝试 0° 到 360° 之间的所有起始扫描角度
- **优势**:比基础扫描法节省 3%~8% 的线缆成本
- **适用场景**:最接近人工精细化排布的自动化算法
### 4. Esau-Williams 启发式算法
- **原理**约束最小生成树CMST算法迭代计算互联操作的成本节省
- **优势**:能发现树状、多分叉等复杂但更经济的拓扑结构
- **适用场景**:风机分布不规则、离岸距离较远或电缆造价极高的情况
## 方案场景说明
系统自动运行三种场景:
1. **Scenario 1 (Standard)**:仅使用非可选(标准)电缆进行优化
2. **Scenario 2 (With Optional)**:包含标记为 'Y' 的大型电缆,适用于尝试增加单回路容量
3. **Scenario 3 (No Max)**:排除最大截面电缆,测试电缆供应受限时的最优拓扑
## 输出文件说明
- **Excel报告**`[文件名]_result.xlsx` - 包含所有方案总览及详细连接清单
- **CAD图纸**`design_[方案名].dxf` - 分层分色的拓扑图
- **全部方案**`[文件名]_result.zip` - 包含所有图纸及Excel报告
## 关键常量和配置
### 电气参数
- **系统电压**66,000 V (66kV)
- **功率因数**0.95
- **电价**0.4 元/kWh
### 电缆规格示例
- 最小截面35mm² (载流量150A)
- 最大截面400mm² (载流量580A)
- 降容系数0.8(实际载流量 = 额定载流量 × 0.8
### 算法参数
- 默认风机数量30台
- 默认布局:随机分布或网格
- 默认间距800米网格布局
## 开发约定
### 代码风格
- 使用中文注释和文档字符串
- 函数命名使用 snake_case
- 类名使用 PascalCase
- 常量使用 UPPER_CASE
### 版本管理
- 版本号通过 `make_version.py` 自动生成
- 版本号格式v{major}.{minor}.{patch}
- 版本号存储在 `version.py` 文件中
### 构建约定
- 使用 `nicegui-pack` 进行打包
- 单文件模式(--onefile
- 无窗口模式(--windowed
- 输出文件名包含版本号
### 测试约定
- GUI测试使用 frontend-tester agent
- Python代码测试使用 python-pro agent
- 测试覆盖率要求:核心算法部分 > 80%
## 常见问题
### Q1: MST算法显示极高的成本和损耗
**A**: 这是预期行为。MST算法不考虑载流量约束会产生单一树状结构导致根部电缆严重过载。这仅作为理论基准参考。
### Q2: 如何在CAD图纸中找到图形
**A**: 双击鼠标滚轮Zoom Extents全屏显示。风机坐标通常是大地坐标数值很大如果CAD当前视口在(0,0)附近,可能会找不到图形。
### Q3: 可选电缆的使用规则是什么?
**A**:
- 可选电缆Optional='Y')最多只能有一条
- 必须是列表中截面最大的电缆
- 用于特定场景(如增加单回路容量)
## 技术支持
- **适用对象**:海上能源业务开发部 - 电气专业
- **技术支持**:杜孟远
- **文档版本**v1.0
- **编制日期**2026年1月5日
## 许可证
本项目仅供工程学习、研究和初步设计评估使用。详细计算应以专业设计院规范为准。

193
ga.py Normal file
View File

@@ -0,0 +1,193 @@
import numpy as np
import pandas as pd
from scipy.spatial import distance_matrix
from scipy.sparse.csgraph import minimum_spanning_tree
from collections import defaultdict
import random
def design_with_ga(
turbines,
substation,
cable_specs=None,
voltage=66000,
power_factor=0.95,
system_params=None,
pop_size=50,
generations=50,
evaluate_func=None,
total_invest_func=None,
get_max_capacity_func=None,
):
"""
使用遗传算法优化集电线路布局
:param turbines: 风机DataFrame
:param substation: 升压站坐标
:param cable_specs: 电缆规格
:param system_params: 系统参数用于NPV计算
:param pop_size: 种群大小
:param generations: 迭代代数
:param evaluate_func: 评估函数
:param total_invest_func: 总投资计算函数
:param get_max_capacity_func: 获取最大容量函数
:return: 连接列表和带有簇信息的turbines
"""
if get_max_capacity_func:
max_mw = get_max_capacity_func(cable_specs, voltage, power_factor)
else:
max_mw = 100.0 # 默认值
total_power = turbines["power"].sum()
max_clusters = int(np.ceil(total_power / max_mw))
n_turbines = len(turbines)
# 预计算距离矩阵
all_coords = np.vstack([substation, turbines[["x", "y"]].values])
dist_matrix_full = distance_matrix(all_coords, all_coords)
def fitness(chromosome):
cluster_assign = chromosome
clusters = defaultdict(list)
for i, c in enumerate(cluster_assign):
clusters[c].append(i)
connections = []
for c, members in clusters.items():
if len(members) == 0:
continue
coords = turbines.iloc[members][["x", "y"]].values
if len(members) > 1:
dm = distance_matrix(coords, coords)
mst = minimum_spanning_tree(dm).toarray()
for i in range(len(members)):
for j in range(len(members)):
if mst[i, j] > 0:
connections.append(
(
f"turbine_{members[i]}",
f"turbine_{members[j]}",
mst[i, j],
)
)
# 连接到升压站
dists = [dist_matrix_full[0, m + 1] for m in members]
closest = members[np.argmin(dists)]
connections.append((f"turbine_{closest}", "substation", min(dists)))
eval_res = evaluate_func(
turbines,
connections,
substation,
cable_specs,
is_offshore=False,
method_name="GA",
voltage=voltage,
power_factor=power_factor,
)
if system_params and total_invest_func:
res_list = total_invest_func(
[
{
"cost": eval_res["total_cost"],
"loss": eval_res["total_loss"],
"eval": eval_res,
}
],
system_params,
)
return res_list[0]["total_cost_npv"]
return eval_res["total_cost"]
def init_individual():
assign = np.zeros(n_turbines, dtype=int)
cluster_powers = np.zeros(max_clusters)
for i in range(n_turbines):
p = turbines.iloc[i]["power"]
possible = [
c for c in range(max_clusters) if cluster_powers[c] + p <= max_mw
]
if possible:
c = random.choice(possible)
else:
c = random.randint(0, max_clusters - 1)
assign[i] = c
cluster_powers[c] += p
return assign.tolist()
population = [init_individual() for _ in range(pop_size)]
best = None
best_fitness = float("inf")
for gen in range(generations):
fitnesses = [fitness(ind) for ind in population]
min_fit = min(fitnesses)
if min_fit < best_fitness:
best_fitness = min_fit
best = population[fitnesses.index(min_fit)].copy()
def tournament(size=3):
candidates = random.sample(list(zip(population, fitnesses)), size)
return min(candidates, key=lambda x: x[1])[0]
selected = [tournament() for _ in range(pop_size)]
new_pop = []
for i in range(0, pop_size, 2):
p1 = selected[i]
p2 = selected[i + 1] if i + 1 < pop_size else selected[0]
if random.random() < 0.8:
point = random.randint(1, n_turbines - 1)
child1 = p1[:point] + p2[point:]
child2 = p2[:point] + p1[point:]
else:
child1, child2 = p1.copy(), p2.copy()
new_pop.extend([child1, child2])
for ind in new_pop:
if random.random() < 0.1:
idx = random.randint(0, n_turbines - 1)
old_c = ind[idx]
new_c = random.randint(0, max_clusters - 1)
ind[idx] = new_c
cluster_powers = defaultdict(float)
for j, c in enumerate(ind):
cluster_powers[c] += turbines.iloc[j]["power"]
if max(cluster_powers.values()) > max_mw:
ind[idx] = max_clusters
max_clusters += 1
elites = sorted(zip(population, fitnesses), key=lambda x: x[1])[
: int(0.1 * pop_size)
]
new_pop[: len(elites)] = [e[0] for e in elites]
population = new_pop[:pop_size]
# 解码最佳个体
cluster_assign = best
clusters = defaultdict(list)
for i, c in enumerate(cluster_assign):
clusters[c].append(i)
connections = []
for c, members in clusters.items():
if len(members) == 0:
continue
coords = turbines.iloc[members][["x", "y"]].values
if len(members) > 1:
dm = distance_matrix(coords, coords)
mst = minimum_spanning_tree(dm).toarray()
for i in range(len(members)):
for j in range(len(members)):
if mst[i, j] > 0:
connections.append(
(
f"turbine_{members[i]}",
f"turbine_{members[j]}",
mst[i, j],
)
)
dists = [dist_matrix_full[0, m + 1] for m in members]
closest = members[np.argmin(dists)]
connections.append((f"turbine_{closest}", "substation", min(dists)))
turbines["cluster"] = cluster_assign
return connections, turbines

View File

@@ -38,15 +38,15 @@ def create_template(output_file='windfarm_template.xlsx'):
# Create Cable data
cable_data = [
{'CrossSection': 35, 'Capacity': 150, 'Resistance': 0.524, 'Cost': 80, 'Optional': ''},
{'CrossSection': 70, 'Capacity': 215, 'Resistance': 0.268, 'Cost': 120, 'Optional': ''},
{'CrossSection': 95, 'Capacity': 260, 'Resistance': 0.193, 'Cost': 150, 'Optional': ''},
{'CrossSection': 120, 'Capacity': 295, 'Resistance': 0.153, 'Cost': 180, 'Optional': ''},
{'CrossSection': 150, 'Capacity': 330, 'Resistance': 0.124, 'Cost': 220, 'Optional': ''},
{'CrossSection': 185, 'Capacity': 370, 'Resistance': 0.0991, 'Cost': 270, 'Optional': ''},
{'CrossSection': 240, 'Capacity': 425, 'Resistance': 0.0754, 'Cost': 350, 'Optional': ''},
{'CrossSection': 300, 'Capacity': 500, 'Resistance': 0.0601, 'Cost': 450, 'Optional': ''},
{'CrossSection': 400, 'Capacity': 580, 'Resistance': 0.0470, 'Cost': 600, 'Optional': ''}
{'CrossSection': 35, 'Capacity': 150, 'Resistance': 0.524, 'Cost': 8, 'Optional': ''},
{'CrossSection': 70, 'Capacity': 215, 'Resistance': 0.268, 'Cost': 12, 'Optional': ''},
{'CrossSection': 95, 'Capacity': 260, 'Resistance': 0.193, 'Cost': 15, 'Optional': ''},
{'CrossSection': 120, 'Capacity': 295, 'Resistance': 0.153, 'Cost': 18, 'Optional': ''},
{'CrossSection': 150, 'Capacity': 330, 'Resistance': 0.124, 'Cost': 22, 'Optional': ''},
{'CrossSection': 185, 'Capacity': 370, 'Resistance': 0.0991, 'Cost': 27, 'Optional': ''},
{'CrossSection': 240, 'Capacity': 425, 'Resistance': 0.0754, 'Cost': 35, 'Optional': ''},
{'CrossSection': 300, 'Capacity': 500, 'Resistance': 0.0601, 'Cost': 45, 'Optional': ''},
{'CrossSection': 400, 'Capacity': 580, 'Resistance': 0.0470, 'Cost': 60, 'Optional': ''}
]
df_cables = pd.DataFrame(cable_data)
@@ -54,7 +54,10 @@ def create_template(output_file='windfarm_template.xlsx'):
param_data = [
{'Parameter': 'Voltage (kV) / 电压 (kV)', 'Value': 66},
{'Parameter': 'Power Factor / 功率因数', 'Value': 0.95},
{'Parameter': 'Electricity Price (元/kWh) / 电价 (元/kWh)', 'Value': 0.4}
{'Parameter': 'Electricity Price (元/kWh) / 电价 (元/kWh)', 'Value': 0.4},
{'Parameter': 'Project Lifetime (years) / 工程运行期限/年', 'Value': 25},
{'Parameter': 'Discount Rate (%) / 折现率%', 'Value': 8},
{'Parameter': 'Annual Loss Hours (hours) / 年损耗小时数/小时', 'Value': 1400}
]
df_params = pd.DataFrame(param_data)

512
gui.py
View File

@@ -1,20 +1,25 @@
import contextlib
import io
import os
import sys
import io
import contextlib
import tempfile
import matplotlib.pyplot as plt
import matplotlib
matplotlib.use("Agg")
import matplotlib.backends.backend_svg
from nicegui import ui, events, app
import matplotlib.pyplot as plt
import pandas as pd
from nicegui import app, events, ui, run
from main import (
compare_design_methods,
export_to_dxf,
load_data_from_excel,
generate_wind_farm_data,
visualize_design,
export_all_scenarios_to_excel,
export_to_dxf,
generate_wind_farm_data,
load_data_from_excel,
visualize_design,
)
import pandas as pd
# 尝试导入自动生成的版本号
try:
@@ -85,6 +90,9 @@ def index():
"run_btn": None,
"current_file_container": None, # 替换 label 为 container
"info_container": None, # 新增信息展示容器
"ga_switch": None, # 遗传算法开关
"mip_switch": None, # MIP开关
"log_content": "", # 存储计算日志内容
}
def update_info_panel():
@@ -143,6 +151,51 @@ def index():
ep_str += " (默认)"
params_text.append(ep_str)
# 获取工程运行期限
lifetime = 25 # Default
is_default_lifetime = True
if (
state.get("system_params")
and "project_lifetime" in state["system_params"]
):
lifetime = state["system_params"]["project_lifetime"]
is_default_lifetime = False
lifetime_str = f"工程运行期限: {lifetime}"
if is_default_lifetime:
lifetime_str += " (默认)"
params_text.append(lifetime_str)
# 获取折现率
discount_rate = 8 # Default
is_default_discount = True
if (
state.get("system_params")
and "discount_rate" in state["system_params"]
):
discount_rate = state["system_params"]["discount_rate"]
is_default_discount = False
discount_str = f"折现率: {discount_rate}%"
if is_default_discount:
discount_str += " (默认)"
params_text.append(discount_str)
# 获取年损耗小时数
annual_hours = 1400 # Default
is_default_hours = True
if (
state.get("system_params")
and "annual_loss_hours" in state["system_params"]
):
annual_hours = state["system_params"]["annual_loss_hours"]
is_default_hours = False
hours_str = f"年损耗小时数: {annual_hours} 小时"
if is_default_hours:
hours_str += " (默认)"
params_text.append(hours_str)
for p in params_text:
ui.chip(p, icon="bolt").props("outline color=primary")
@@ -175,7 +228,7 @@ def index():
},
{
"name": "cost",
"label": "参考单价 (元/m)",
"label": "参考单价(元/km)",
"field": "cost",
"align": "center",
},
@@ -194,7 +247,9 @@ def index():
"section": spec[0],
"capacity": spec[1],
"resistance": spec[2],
"cost": spec[3],
"cost": f"{spec[3] / 10:.2f}"
if spec[3] is not None
else "0.00",
"is_optional": "Y" if len(spec) > 4 and spec[4] else "",
}
)
@@ -288,6 +343,16 @@ def index():
update_info_panel()
# 清空方案对比结果和拓扑可视化
state["results"] = []
if refs["results_table"]:
refs["results_table"].rows = []
refs["results_table"].selected = []
if refs["plot_container"]:
refs["plot_container"].clear()
if refs["export_row"]:
refs["export_row"].clear()
# 清空上传组件列表,以便下次选择(配合 .no-list CSS 使用)
if refs["upload_widget"]:
refs["upload_widget"].reset()
@@ -295,7 +360,9 @@ def index():
except Exception as ex:
ui.notify(f"上传处理失败: {ex}", type="negative")
async def save_file_with_dialog(filename, callback, file_filter="All files (*.*)"):
async def save_file_with_dialog(
filename, callback, file_filter="All files (*.*)", sender=None
):
"""
跨平台文件保存助手。
如果是原生模式,弹出系统保存对话框。
@@ -304,136 +371,84 @@ def index():
:param filename: 默认文件名
:param callback: 接收文件路径并执行保存操作的函数 (filepath) -> None
:param file_filter: 格式如 "Excel Files (*.xlsx)"
:param sender: 触发该操作的 UI 组件,用于在操作期间禁用以防重复点击
"""
# 检测是否为原生模式 (PyWebview)
is_native = False
native_window = None
if sender:
sender.disable()
try:
# 使用 getattr 安全获取 app.native避免属性不存在错误
# 并在 reload=True 时 native 可能未能正确初始化
n_obj = getattr(app, "native", None)
if n_obj and getattr(n_obj, "main_window", None):
is_native = True
native_window = n_obj.main_window
except Exception as e:
print(f"DEBUG: Native check error: {e}")
# 方案:使用 PowerShell 弹出原生保存对话框 (仅限 Windows)
import platform
import subprocess
print(
f"DEBUG: save_file_with_dialog called. is_native={is_native}, filename={filename}"
if platform.system() == "Windows":
try:
# 构建 PowerShell 脚本
# 注意:过滤器格式为 "描述|*.ext|所有文件|*.*"
ps_filter = file_filter.replace("(", "|").replace(")", "")
if "|" not in ps_filter:
ps_filter += f"|{os.path.splitext(filename)[1] or '*.*'}"
# 简单清洗 filter 字符串以适应 PowerShell (e.g., "Excel Files *.xlsx" -> "Excel Files|*.xlsx")
# 这里做一个简化的映射,确保格式正确
if "Excel" in file_filter:
ps_filter = "Excel Files (*.xlsx)|*.xlsx|All Files (*.*)|*.*"
elif "DXF" in file_filter:
ps_filter = "DXF Files (*.dxf)|*.dxf|All Files (*.*)|*.*"
elif "ZIP" in file_filter:
ps_filter = "ZIP Archives (*.zip)|*.zip|All Files (*.*)|*.*"
else:
ps_filter = "All Files (*.*)|*.*"
ps_script = f"""
Add-Type -AssemblyName System.Windows.Forms
$d = New-Object System.Windows.Forms.SaveFileDialog
$d.Filter = "{ps_filter}"
$d.FileName = "{filename}"
$d.Title = "保存文件"
if ($d.ShowDialog() -eq [System.Windows.Forms.DialogResult]::OK) {{
Write-Output $d.FileName
}}
"""
# 运行 PowerShell
# 使用 startupinfo 隐藏控制台窗口 (防止黑框闪烁)
startupinfo = subprocess.STARTUPINFO()
startupinfo.dwFlags |= subprocess.STARTF_USESHOWWINDOW
print("DEBUG: invoking PowerShell SaveFileDialog...")
# 使用 run.io_bound 在后台线程执行,避免阻塞主事件循环
# 这样按钮的禁用状态可以立即同步到前端
result = await run.io_bound(
subprocess.run,
["powershell", "-Command", ps_script],
capture_output=True,
text=True,
startupinfo=startupinfo,
)
if is_native and native_window:
try:
# PyWebview 的 create_file_dialog 的 file_types 参数期望一个字符串元组
# 格式如: ('Description (*.ext)', 'All files (*.*)')
file_types = (file_filter,)
print(f"DEBUG: calling create_file_dialog with types={file_types}")
# 在 Native 模式下create_file_dialog 是同步阻塞的
# 注意:必须使用 app.native.SAVE_DIALOG
save_path = native_window.create_file_dialog(
app.native.SAVE_DIALOG,
directory="",
save_filename=filename,
file_types=file_types,
)
print(f"DEBUG: save_path result: {save_path}")
# 用户取消
if not save_path:
return
# 处理返回类型 (PyWebview 可能返回字符串或列表)
if isinstance(save_path, (list, tuple)):
if not save_path:
return
save_path = save_path[0]
# 确保文件名后缀正确
if not save_path.lower().endswith(
os.path.splitext(filename)[1].lower()
):
save_path += os.path.splitext(filename)[1]
await callback(save_path)
ui.notify(f"文件已保存至: {save_path}", type="positive")
return # 成功处理,退出
except Exception as e:
import traceback
traceback.print_exc()
print(f"ERROR in save_file_with_dialog (native): {e}")
# ui.notify(f"原生保存失败,尝试其他方式: {e}", type="warning")
print(f"原生保存失败,尝试其他方式: {e}")
# 继续向下执行,尝试 fallback
# 非 Native 模式 (或 Native 失败),尝试使用 Tkinter (仅限本地环境)
try:
import tkinter as tk
from tkinter import filedialog
from nicegui import run
print("DEBUG: Attempting Tkinter dialog...")
def get_save_path_tk(default_name, f_filter):
try:
# 创建隐藏的根窗口
root = tk.Tk()
root.withdraw()
root.attributes("-topmost", True) # 尝试置顶
# 转换 filter 格式: "Excel Files (*.xlsx)" -> [("Excel Files", "*.xlsx")]
filetypes = []
if "(" in f_filter and ")" in f_filter:
desc = f_filter.split("(")[0].strip()
ext = f_filter.split("(")[1].split(")")[0]
filetypes.append((desc, ext))
filetypes.append(("All files", "*.*"))
path = filedialog.asksaveasfilename(
initialfile=default_name, filetypes=filetypes, title="保存文件"
)
root.destroy()
return path
except Exception as ex:
print(f"Tkinter inner error: {ex}")
return None
# 在线程中运行 tkinter避免阻塞 asyncio 事件循环
save_path = await run.io_bound(get_save_path_tk, filename, file_filter)
save_path = result.stdout.strip()
if save_path:
print(f"DEBUG: Tkinter save_path: {save_path}")
# 确保文件名后缀正确
if not save_path.lower().endswith(
os.path.splitext(filename)[1].lower()
):
save_path += os.path.splitext(filename)[1]
print(f"DEBUG: PowerShell returned path: {save_path}")
await callback(save_path)
ui.notify(f"文件已保存至: {save_path}", type="positive")
return # 成功处理
elif save_path is None:
print("DEBUG: Tkinter dialog cancelled or failed silently.")
# 如果是用户取消(返回空字符串),通常不需要回退到下载。
# 但这里如果 Tkinter 彻底失败返回 None可能需要回退。
# askopenfilename 返回空字符串表示取消。我们假设 None 是异常。
# 这里简化处理:只要没拿到路径且没报错,就认为是取消。
if save_path == "":
return
else:
print("DEBUG: PowerShell dialog cancelled or empty result.")
# 用户取消,直接返回,不回退
return
except Exception as e:
print(f"Tkinter dialog failed: {e}")
# Fallback to ui.download if tkinter fails
print(f"PowerShell dialog failed: {e}")
# 出错则回退到 ui.download
# 最后的回退方案:浏览器下载
print("DEBUG: Falling back to ui.download")
# 统一回退方案:浏览器下载
print("DEBUG: Using ui.download fallback")
temp_path = os.path.join(state["temp_dir"], filename)
await callback(temp_path)
ui.download(temp_path)
finally:
if sender:
sender.enable()
def update_export_buttons():
if refs["export_row"]:
@@ -464,7 +479,6 @@ def index():
best_res = min(state["results"], key=lambda x: x["cost"])
with refs["export_row"]:
# --- 下载 Excel ---
async def save_excel(path):
import shutil
@@ -476,9 +490,12 @@ def index():
# 如果不存在,重新生成
export_all_scenarios_to_excel(state["results"], path)
async def on_click_excel():
async def on_click_excel(e):
await save_file_with_dialog(
default_excel_name, save_excel, "Excel Files (*.xlsx)"
default_excel_name,
save_excel,
"Excel Files (*.xlsx)",
sender=e.sender,
)
ui.button(
@@ -487,41 +504,7 @@ def index():
).props("icon=download")
# --- 导出推荐方案 DXF ---
def export_best_dxf():
if state["substation"] is not None:
safe_name = "".join(
[
c
for c in best_res["name"]
if c.isalnum() or c in (" ", "-", "_")
]
).strip()
default_name = f"{file_prefix}_best_{safe_name}.dxf"
async def save_dxf(path):
export_to_dxf(
best_res["turbines"],
state["substation"],
best_res["eval"]["details"],
path,
)
# 包装为 async 任务,并在 NiceGUI 事件循环中执行
async def run_save():
await save_file_with_dialog(
default_name, save_dxf, "DXF Files (*.dxf)"
)
# 这里的 export_best_dxf 本身是普通函数,绑定到 on_click
# 但我们需要它执行异步操作。最简单的是让 export_best_dxf 变为 async
# 或者在这里直接调用 run_save (但这在普通函数里不行)
# 更好的方法是将 export_best_dxf 定义为 async如下所示
return run_save()
else:
ui.notify("缺少升压站数据,无法导出 DXF", type="negative")
# 将 export_best_dxf 改为 async 并重命名,以便直接用作回调
async def on_click_best_dxf():
async def on_click_best_dxf(e):
if state["substation"] is not None:
safe_name = "".join(
[
@@ -541,17 +524,17 @@ def index():
)
await save_file_with_dialog(
default_name, save_dxf, "DXF Files (*.dxf)"
default_name, save_dxf, "DXF Files (*.dxf)", sender=e.sender
)
else:
ui.notify("缺少升压站数据,无法导出 DXF", type="negative")
ui.button(
f'导出推荐方案 DXF ({best_res["name"]})', on_click=on_click_best_dxf
f"导出推荐方案 DXF ({best_res['name']})", on_click=on_click_best_dxf
).props("icon=architecture color=accent")
# --- 导出选中方案 DXF ---
async def on_click_selected_dxf():
async def on_click_selected_dxf(e):
if not refs["results_table"] or not refs["results_table"].selected:
ui.notify("请先在上方表格中选择一个方案", type="warning")
return
@@ -581,7 +564,7 @@ def index():
)
await save_file_with_dialog(
default_name, save_dxf, "DXF Files (*.dxf)"
default_name, save_dxf, "DXF Files (*.dxf)", sender=e.sender
)
else:
ui.notify(
@@ -596,7 +579,7 @@ def index():
refs["export_selected_btn"].set_text(f"导出选中方案 ({clean_name})")
# --- 导出全部 ZIP ---
async def on_click_all_dxf():
async def on_click_all_dxf(e):
if not state["results"] or state["substation"] is None:
ui.notify("无方案数据可导出", type="warning")
return
@@ -642,12 +625,71 @@ def index():
except:
pass
await save_file_with_dialog(default_name, save_zip, "ZIP Files (*.zip)")
await save_file_with_dialog(
default_name, save_zip, "ZIP Files (*.zip)", sender=e.sender
)
ui.button("导出全部方案 DXF (ZIP)", on_click=on_click_all_dxf).props(
"icon=folder_zip color=secondary"
)
# --- 导出计算日志 ---
async def on_click_export_log(e):
# 尝试多种方式获取日志内容
log_content = ""
method_used = "unknown"
# 方法1: 首先尝试从保存的日志内容获取
if refs.get("log_content") and refs["log_content"].strip():
log_content = refs["log_content"]
method_used = "saved_memory"
# 方法2: 如果保存的日志为空,尝试使用 JavaScript 获取 log 组件的内容
if not log_content.strip() and refs["log_box"]:
try:
log_id = refs["log_box"].id
js_code = f"""
(function() {{
const logElement = document.querySelector("#c{log_id}");
if (logElement) {{
console.log("Found log element:", logElement);
return logElement.innerText || logElement.textContent || "";
}}
console.log("Log element not found for ID: c{log_id}");
return "";
}})()
"""
result = await ui.run_javascript(js_code)
if result and result.strip():
log_content = result
method_used = "javascript"
except Exception as js_error:
print(f"JavaScript method failed: {js_error}")
if not log_content.strip():
ui.notify(
"没有可导出的日志内容。请先运行计算任务。", type="warning"
)
print(f"Log export failed. Method tried: {method_used}")
return
print(
f"Successfully exported log using method: {method_used}, length: {len(log_content)}"
)
default_name = f"{file_prefix}_calculation_log.txt"
async def save_log(path):
with open(path, "w", encoding="utf-8") as f:
f.write(log_content)
await save_file_with_dialog(
default_name, save_log, "Text Files (*.txt)", sender=e.sender
)
ui.button("导出计算日志", on_click=on_click_export_log).props(
"icon=description color=info"
)
def update_plot(result):
if refs["plot_container"]:
refs["plot_container"].clear()
@@ -694,7 +736,6 @@ def index():
clean_name = row_name.replace("(推荐) ", "")
refs["export_selected_btn"].set_text(f"导出选中方案 ({clean_name})")
from nicegui import run
import queue
async def run_analysis():
@@ -703,8 +744,15 @@ def index():
return
if refs["log_box"]:
refs["log_box"].clear()
# 重置日志内容
refs["log_content"] = ""
log_queue = queue.Queue()
# 获取开关状态
use_ga = refs["ga_switch"].value if refs["ga_switch"] else False
use_mip = refs["mip_switch"].value if refs["mip_switch"] else False
print(f"Switch values: GA={use_ga}, MIP={use_mip}")
class QueueLogger(io.StringIO):
def write(self, message):
if message and message.strip():
@@ -718,13 +766,15 @@ def index():
try:
msg = log_queue.get_nowait()
refs["log_box"].push(msg)
# 同时保存到日志内容中
refs["log_content"] += msg + "\n"
new_msg = True
if msg.startswith("--- Scenario"):
scenario_name = msg.replace("---", "").strip()
if refs["status_label"]:
refs["status_label"].text = (
f"正在计算: {scenario_name}..."
)
refs[
"status_label"
].text = f"正在计算: {scenario_name}..."
elif "开始比较电缆方案" in msg:
if refs["status_label"]:
refs["status_label"].text = "准备开始计算..."
@@ -752,6 +802,8 @@ def index():
n_clusters_override=None,
interactive=False,
plot_results=False,
use_ga=use_ga,
use_mip=use_mip,
)
# 在后台线程运行计算任务
@@ -777,7 +829,7 @@ def index():
update_plot(best_res)
ui.notify(
f'计算完成!已自动加载推荐方案: {best_res["name"]}', type="positive"
f"计算完成!已自动加载推荐方案: {best_res['name']}", type="positive"
)
# 更新结果表格
@@ -813,14 +865,25 @@ def index():
elif "No Max" in original_name:
note += "不包含可选电缆型号,且可使用的最大截面电缆降一档截面。"
# 计算总长度
total_length = sum(d["length"] for d in res["eval"]["details"])
# 计算总长度(转换为公里)
total_length_m = sum(d["length"] for d in res["eval"]["details"])
total_length_km = total_length_m / 1000
# 获取回路数 (通过统计从升压站发出的连接)
n_circuits = sum(
1
for d in res["eval"]["details"]
if d["source"] == "substation" or d["target"] == "substation"
)
row_dict = {
"name": name_display,
"n_circuits": n_circuits,
"cost_wan": f"{res['cost'] / 10000:.2f}",
"loss_kw": f"{res['loss']:.2f}",
"total_length": f"{total_length:.2f}",
"total_length": f"{total_length_km:.2f}",
"npv_loss_wan": f"{res.get('npv_loss', 0) / 10000:.2f}",
"total_cost_npv_wan": f"{res.get('total_cost_npv', res['cost']) / 10000:.2f}",
"note": note,
"original_name": res["name"],
}
@@ -881,14 +944,18 @@ def index():
ui.label("配置面板").classes("text-xl font-semibold mb-4 border-b pb-2")
# 使用 items-stretch 确保所有子元素高度一致
with ui.row().classes('w-full items-stretch gap-4'):
with ui.row().classes("w-full items-stretch gap-4"):
# 1. 导出模板按钮
async def export_template():
from generate_template import create_template
async def export_template(e):
import shutil
from generate_template import create_template
async def save_template(path):
# 生成模板到系统临时目录
temp_template = os.path.join(state["temp_dir"], "coordinates_template.xlsx")
temp_template = os.path.join(
state["temp_dir"], "coordinates_template.xlsx"
)
create_template(temp_template)
if os.path.exists(temp_template):
shutil.copy2(temp_template, path)
@@ -902,7 +969,8 @@ def index():
await save_file_with_dialog(
"coordinates.xlsx",
save_template,
"Excel Files (*.xlsx)"
"Excel Files (*.xlsx)",
sender=e.sender,
)
ui.button("导出 Excel 模板", on_click=export_template).classes(
@@ -910,11 +978,17 @@ def index():
).props("icon=file_download outline color=primary")
# 2. 上传文件区域 (垂直堆叠 Label 和 Upload 组件)
with ui.column().classes('flex-1 gap-0 justify-between'):
with ui.column().classes("flex-1 gap-0 justify-between"):
# 使用 .no-list CSS 隐藏 Quasar 默认列表,完全自定义文件显示
refs["upload_widget"] = ui.upload(
label="选择Excel文件", on_upload=handle_upload, auto_upload=True
).classes("w-full no-list h-full").props('flat bordered color=primary')
refs["upload_widget"] = (
ui.upload(
label="选择Excel文件",
on_upload=handle_upload,
auto_upload=True,
)
.classes("w-full no-list h-full")
.props("flat bordered color=primary")
)
# 自定义文件显示容器
refs["current_file_container"] = ui.column().classes("w-full")
@@ -931,6 +1005,17 @@ def index():
.classes("flex-1 py-4")
.props("icon=play_arrow color=secondary")
)
# 4. 遗传算法开关
with ui.column().classes("flex-1 gap-0 justify-center items-center"):
refs["ga_switch"] = ui.switch("启用遗传算法", value=False).props(
"color=orange"
)
# 5. MIP开关
with ui.column().classes("flex-1 gap-0 justify-center items-center"):
refs["mip_switch"] = ui.switch("启用MIP", value=False).props(
"color=blue"
)
with ui.column().classes("w-full gap-4"):
# 新增:信息展示卡片
@@ -946,9 +1031,15 @@ def index():
)
with ui.card().classes("w-full p-0 shadow-md overflow-hidden"):
with ui.expansion(
"方案对比结果 (点击行查看拓扑详情)", icon="analytics", value=True
).classes("w-full").props("header-class=\"text-xl font-semibold\""):
with (
ui.expansion(
"方案对比结果 (点击行查看拓扑详情)",
icon="analytics",
value=True,
)
.classes("w-full")
.props('header-class="text-xl font-semibold"')
):
columns = [
{
"name": "name",
@@ -957,6 +1048,12 @@ def index():
"required": True,
"align": "left",
},
{
"name": "n_circuits",
"label": "回路数",
"field": "n_circuits",
"sortable": True,
},
{
"name": "cost_wan",
"label": "总投资 (万元)",
@@ -971,16 +1068,29 @@ def index():
},
{
"name": "total_length",
"label": "总长度 (m)",
"label": "总长度/km",
"field": "total_length",
"sortable": True,
},
{
"name": "npv_loss_wan",
"label": "损耗费用净现值 (万元)",
"field": "npv_loss_wan",
"sortable": True,
},
{
"name": "total_cost_npv_wan",
"label": "总费用 (万元)",
"field": "total_cost_npv_wan",
"sortable": True,
},
{
"name": "note",
"label": "备注",
"field": "note",
"align": "left",
}, ]
},
]
# 使用内置的 selection='single' 结合行点击事件实现背景高亮
# 这样可以完全由 Python 事件逻辑控制,不依赖 CSS 伪类
refs["results_table"] = ui.table(
@@ -1055,12 +1165,18 @@ if getattr(sys, "frozen", False):
)
else:
# 普通使用环境保留日志功能
ui.run(title="海上风电场集电线路优化", host='127.0.0.1', reload=True, port=target_port, native=False)
# ui.run(
# title="海上风电场集电线路优化",
# host="127.0.0.1",
# port=target_port,
# reload=True,
# window_size=(1280, 800),
# native=True,
# port=target_port,
# native=False,
# )
ui.run(
title="海上风电场集电线路优化",
host="127.0.0.1",
port=target_port,
reload=True,
window_size=(1280, 800),
native=True,
)

1404
main.py

File diff suppressed because it is too large Load Diff

472
mip.py Normal file
View File

@@ -0,0 +1,472 @@
import numpy as np
import pandas as pd
from scipy.spatial import distance_matrix
from scipy.sparse.csgraph import minimum_spanning_tree
from collections import defaultdict
import random
try:
import pulp
pulp_available = True
except ImportError:
pulp = None
pulp_available = False
try:
import pyomo.environ as pyo_env
pyomo_available = True
except (ImportError, AttributeError):
pyomo_available = False
print("Pyomo not available, falling back to PuLP")
def design_with_pyomo(
turbines,
substation,
cable_specs=None,
voltage=66000,
power_factor=0.95,
system_params=None,
max_clusters=None,
time_limit=300,
evaluate_func=None,
total_invest_func=None,
get_max_capacity_func=None,
):
"""
使用Pyomo求解器优化集电线路布局
:param turbines: 风机DataFrame
:param substation: 升压站坐标
:param cable_specs: 电缆规格
:param system_params: 系统参数用于NPV计算
:param max_clusters: 最大簇数,默认基于功率计算
:param time_limit: 求解时间限制(秒)
:param evaluate_func: 评估函数
:param total_invest_func: 总投资计算函数
:param get_max_capacity_func: 获取最大容量函数
:return: 连接列表和带有簇信息的turbines
"""
if get_max_capacity_func:
max_mw = get_max_capacity_func(cable_specs, voltage, power_factor)
else:
max_mw = 100.0
total_power = turbines["power"].sum()
if max_clusters is None:
max_clusters = int(np.ceil(total_power / max_mw))
n_turbines = len(turbines)
all_coords = np.vstack([substation, turbines[["x", "y"]].values])
dist_matrix_full = distance_matrix(all_coords, all_coords)
# Simple fallback for now - use PuLP instead
print("Pyomo not fully implemented, falling back to PuLP")
return design_with_mip(
turbines,
substation,
cable_specs,
voltage,
power_factor,
system_params,
max_clusters,
time_limit,
evaluate_func,
total_invest_func,
get_max_capacity_func,
)
def design_with_mip(
turbines,
substation,
cable_specs=None,
voltage=66000,
power_factor=0.95,
system_params=None,
max_clusters=None,
time_limit=300,
evaluate_func=None,
total_invest_func=None,
get_max_capacity_func=None,
):
"""
使用混合整数规划(MIP)优化集电线路布局
:param turbines: 风机DataFrame
:param substation: 升压站坐标
:param cable_specs: 电缆规格
:param system_params: 系统参数用于NPV计算
:param max_clusters: 最大簇数,默认基于功率计算
:param time_limit: 求解时间限制(秒)
:param evaluate_func: 评估函数
:param total_invest_func: 总投资计算函数
:param get_max_capacity_func: 获取最大容量函数
:return: 连接列表和带有簇信息的turbines
"""
if not pulp_available:
print(
"WARNING: PuLP library not available. MIP optimization skipped, falling back to MST."
)
from main import design_with_mst
connections = design_with_mst(turbines, substation)
return connections, turbines
if get_max_capacity_func:
max_mw = get_max_capacity_func(cable_specs, voltage, power_factor)
else:
max_mw = 100.0
if max_clusters is None:
max_clusters = int(np.ceil(turbines["power"].sum() / max_mw))
n_turbines = len(turbines)
print(
f"MIP Model Setup: n_turbines={n_turbines}, max_clusters={max_clusters}, max_mw={max_mw:.2f} MW"
)
all_coords = np.vstack([substation, turbines[["x", "y"]].values])
dist_matrix_full = distance_matrix(all_coords, all_coords)
prob = pulp.LpProblem("WindFarmCollectorMIP", pulp.LpMinimize)
# Create all decision variables upfront to avoid duplicates
assign_vars = {}
for i in range(n_turbines):
for k in range(max_clusters):
assign_vars[(i, k)] = pulp.LpVariable(f"assign_{i}_{k}", cat="Binary")
cluster_vars = {}
for k in range(max_clusters):
cluster_vars[k] = pulp.LpVariable(f"cluster_{k}", cat="Binary")
# Helper functions to access variables
def assign_var(i, k):
return assign_vars[(i, k)]
def cluster_var(k):
return cluster_vars[k]
# Simplified objective function: minimize total distance
prob += pulp.lpSum(
[
dist_matrix_full[0, i + 1] * assign_var(i, k)
for i in range(n_turbines)
for k in range(max_clusters)
]
)
for i in range(n_turbines):
prob += pulp.lpSum([assign_var(i, k) for k in range(max_clusters)]) == 1
for k in range(max_clusters):
cluster_power = pulp.lpSum(
[turbines.iloc[i]["power"] * assign_var(i, k) for i in range(n_turbines)]
)
prob += cluster_power <= max_mw * 1.2 * cluster_var(k)
for k in range(max_clusters):
for i in range(n_turbines):
prob += assign_var(i, k) <= cluster_var(k)
print(
f"MIP Model: {len(prob.variables())} variables, {len(prob.constraints)} constraints"
)
# Debug: Print model structure
print("MIP model structure check:")
print(f" Variables: {len(prob.variables())}")
print(f" Constraints: {len(prob.constraints)}")
print(f" Time limit: {time_limit}s")
print(f" Turbines: {n_turbines}, Clusters: {max_clusters}")
# Test solver availability
try:
import subprocess
test_solver = subprocess.run(
[
r"D:\code\windfarm\.venv\Lib\site-packages\pulp\apis\..\solverdir\cbc\win\i64\cbc.exe",
"-version",
],
capture_output=True,
text=True,
timeout=5,
)
print(
f"CBC solver test: {test_solver.stdout[:100] if test_solver.stdout else 'No output'}"
)
except Exception as solver_test_error:
print(f"CBC solver test failed: {solver_test_error}")
print("MIP: Starting to solve...")
try:
# Try to use CBC solver with different configurations
solver = pulp.PULP_CBC_CMD(
timeLimit=time_limit,
msg=False,
warmStart=False,
)
print(f"Using CBC solver with time limit: {time_limit}s")
status = prob.solve(solver)
print(
f"MIP: Solver status={pulp.LpStatus[prob.status]}, Objective value={pulp.value(prob.objective):.4f}"
)
except Exception as e:
print(f"MIP: CBC solver execution failed: {e}")
# Try alternative solver configurations
try:
print("MIP: Trying alternative solver configuration...")
solver = pulp.PULP_CBC_CMD(
msg=True, # Enable messages for debugging
threads=1, # Single thread
timeLimit=time_limit,
)
status = prob.solve(solver)
print(
f"MIP: Alternative solver status={pulp.LpStatus[prob.status]}, Objective value={pulp.value(prob.objective):.4f}"
)
except Exception as e2:
print(f"MIP: All solver attempts failed: {e2}, falling back to MST")
from main import design_with_mst
connections = design_with_mst(turbines, substation)
return connections, turbines
if pulp.LpStatus[prob.status] != "Optimal":
print(
f"MIP solver status: {pulp.LpStatus[prob.status]}, solution not found, falling back to MST"
)
print("Model feasibility check:")
print(f"Total power: {turbines['power'].sum():.2f} MW")
print(f"Max cluster capacity: {max_mw:.2f} MW")
print(f"Number of clusters: {max_clusters}, Number of turbines: {n_turbines}")
for k in range(max_clusters):
cluster_power = pulp.value(
pulp.lpSum(
[
turbines.iloc[i]["power"] * assign_var(i, k)
for i in range(n_turbines)
]
)
)
cluster_used = pulp.value(cluster_var(k))
print(
f"Cluster {k}: Power={cluster_power:.2f} MW (max {max_mw * 1.2:.2f}), Used={cluster_used}"
)
from main import design_with_mst
connections = design_with_mst(turbines, substation)
return connections, turbines
cluster_assign = [-1] * n_turbines
active_clusters = []
for k in range(max_clusters):
if pulp.value(cluster_var(k)) > 0.5:
active_clusters.append(k)
for i in range(n_turbines):
assigned = False
for k in active_clusters:
if pulp.value(assign_var(i, k)) > 0.5:
cluster_assign[i] = k
assigned = True
break
if not assigned:
dists = [dist_matrix_full[0, i + 1] for k in active_clusters]
cluster_assign[i] = active_clusters[np.argmin(dists)]
clusters = defaultdict(list)
for i, c in enumerate(cluster_assign):
clusters[c].append(i)
connections = []
for c, members in clusters.items():
if len(members) == 0:
continue
coords = turbines.iloc[members][["x", "y"]].values
if len(members) > 1:
dm = distance_matrix(coords, coords)
mst = minimum_spanning_tree(dm).toarray()
for i in range(len(members)):
for j in range(len(members)):
if mst[i, j] > 0:
connections.append(
(
f"turbine_{members[i]}",
f"turbine_{members[j]}",
mst[i, j],
)
)
dists = [dist_matrix_full[0, m + 1] for m in members]
closest = members[np.argmin(dists)]
connections.append((f"turbine_{closest}", "substation", min(dists)))
turbines["cluster"] = cluster_assign
# Check cluster distances
min_cluster_distance = check_cluster_distances(clusters, turbines)
if min_cluster_distance is not None:
print(
f"Cluster validation: Minimum distance between clusters = {min_cluster_distance:.2f} m"
)
if min_cluster_distance < 1000:
print(
f"WARNING: Clusters are very close to each other ({min_cluster_distance:.2f} m < 1000 m)"
)
elif min_cluster_distance < 2000:
print(
f"NOTICE: Clusters are relatively close ({min_cluster_distance:.2f} m)"
)
# Check for cable crossings
cable_crossings = check_cable_crossings(connections, turbines, substation)
if cable_crossings:
print(
f"WARNING: Found {len(cable_crossings)} cable crossing(s) in the solution"
)
for i, (idx1, idx2, p1, p2, p3, p4) in enumerate(cable_crossings):
conn1 = connections[idx1]
conn2 = connections[idx2]
print(
f" Crossing {i + 1}: Connection {conn1[0]}-{conn1[1]} crosses {conn2[0]}-{conn2[1]}"
)
else:
print("No cable crossings detected in the solution")
print(
f"MIP optimization completed successfully, {len(connections)} connections generated"
)
return connections, turbines
def calculate_cluster_centroids(clusters, turbines):
"""Calculate the centroid coordinates for each cluster."""
centroids = {}
for c, members in clusters.items():
if len(members) == 0:
centroids[c] = (0, 0)
else:
coords = turbines.iloc[members][["x", "y"]].values
centroid_x = np.mean(coords[:, 0])
centroid_y = np.mean(coords[:, 1])
centroids[c] = (centroid_x, centroid_y)
return centroids
def check_cluster_distances(clusters, turbines, min_distance_threshold=1000):
"""Check if any clusters are too close to each other."""
if len(clusters) < 2:
return None
centroids = calculate_cluster_centroids(clusters, turbines)
active_clusters = [c for c, members in clusters.items() if len(members) > 0]
min_distance = float("inf")
min_pair = None
for i in range(len(active_clusters)):
for j in range(i + 1, len(active_clusters)):
c1, c2 = active_clusters[i], active_clusters[j]
centroid1 = np.array(centroids[c1])
centroid2 = np.array(centroids[c2])
distance = np.linalg.norm(centroid1 - centroid2)
if distance < min_distance:
min_distance = distance
min_pair = (c1, c2)
return min_distance
def check_cable_crossings(connections, turbines, substation):
"""Check if there are cable crossings in the solution."""
crossings = []
def line_intersection(p1, p2, p3, p4):
"""Check if line segments (p1,p2) and (p3,p4) intersect."""
x1, y1 = p1
x2, y2 = p2
x3, y3 = p3
x4, y4 = p4
denom = (y4 - y3) * (x2 - x1) - (x4 - x3) * (y2 - y1)
if abs(denom) < 1e-10:
return False
ua = ((x4 - x3) * (y1 - y3) - (y4 - y3) * (x1 - x3)) / denom
ub = ((x2 - x1) * (y1 - y3) - (y2 - y1) * (x1 - x3)) / denom
return 0 <= ua <= 1 and 0 <= ub <= 1
def get_turbine_coord(connection_part):
"""Get coordinates from connection part (turbine_# or substation)."""
if connection_part == "substation":
# Handle different substation formats robustly
if isinstance(substation, np.ndarray):
if substation.ndim == 1:
# 1D array [x, y]
return (substation[0], substation[1])
elif substation.ndim == 2:
# 2D array [[x, y]] or shape (n, 2)
if substation.shape[0] == 1:
return (substation[0, 0], substation[0, 1])
else:
# Multiple points, use first one
return (substation[0, 0], substation[0, 1])
else:
# Unexpected dimension, try fallback
return (substation.flat[0], substation.flat[1])
elif isinstance(substation, (list, tuple)):
# List or tuple format
# Handle nested lists like [[x, y]]
if (
isinstance(substation[0], (list, tuple, np.ndarray))
and len(substation[0]) >= 2
):
return (substation[0][0], substation[0][1])
elif len(substation) >= 2:
return (substation[0], substation[1])
else:
return (float("inf"), float("inf"))
else:
# Unexpected format, try to convert
try:
sub_array = np.array(substation)
if sub_array.ndim == 1:
return (sub_array[0], sub_array[1])
else:
return (sub_array.flat[0], sub_array.flat[1])
except:
return (float("inf"), float("inf"))
else:
turbine_idx = int(connection_part.split("_")[1])
return (
turbines.iloc[turbine_idx]["x"],
turbines.iloc[turbine_idx]["y"],
)
for i in range(len(connections)):
for j in range(i + 1, len(connections)):
conn1 = connections[i]
conn2 = connections[j]
p1 = get_turbine_coord(conn1[0])
p2 = get_turbine_coord(conn1[1])
p3 = get_turbine_coord(conn2[0])
p4 = get_turbine_coord(conn2[1])
if (
np.array_equal(p1, p3)
or np.array_equal(p1, p4)
or np.array_equal(p2, p3)
or np.array_equal(p2, p4)
):
continue
if line_intersection(p1, p2, p3, p4):
crossings.append((i, j, p1, p2, p3, p4))
return crossings

View File

@@ -12,6 +12,8 @@ dependencies = [
"numpy>=2.4.0",
"openpyxl>=3.1.5",
"pandas>=2.3.3",
"pulp>=3.3.0",
"pyomo>=6.9.5",
"pywebview>=6.1",
"scikit-learn>=1.8.0",
"scipy>=1.16.3",

146
test_cbc_solver.py Normal file
View File

@@ -0,0 +1,146 @@
"""
Simple test to verify CBC solver functionality
"""
import pulp
import sys
import subprocess
import os
print("=== PuLP and CBC Solver Test ===")
print(f"Python version: {sys.version}")
print(f"PuLP version: {pulp.__version__}")
# Test 1: Check PuLP installation
print("\n1. Checking PuLP installation...")
try:
from pulp import LpProblem, LpVariable, LpMinimize, LpMaximize, lpSum, value
print("[OK] PuLP imported successfully")
except ImportError as e:
print(f"[FAIL] PuLP import failed: {e}")
sys.exit(1)
# Test 2: Check CBC solver file existence
print("\n2. Checking CBC solver file...")
solver_dir = os.path.join(
os.path.dirname(pulp.__file__), "apis", "..", "solverdir", "cbc", "win", "i64"
)
solver_path = os.path.join(solver_dir, "cbc.exe")
print(f"Looking for CBC at: {solver_path}")
if os.path.exists(solver_path):
print(f"[OK] CBC solver file found")
file_size = os.path.getsize(solver_path)
print(f" File size: {file_size:,} bytes ({file_size / 1024 / 1024:.2f} MB)")
else:
print(f"[FAIL] CBC solver file not found")
print(f" Checking directory contents:")
try:
parent_dir = os.path.dirname(solver_path)
if os.path.exists(parent_dir):
for item in os.listdir(parent_dir):
print(f" - {item}")
else:
print(f" Directory does not exist: {parent_dir}")
except Exception as e:
print(f" Error listing directory: {e}")
# Test 3: Try to run CBC solver directly
print("\n3. Testing CBC solver execution...")
if os.path.exists(solver_path):
try:
result = subprocess.run(
[solver_path, "-version"],
capture_output=True,
text=True,
timeout=10,
check=True,
)
print("[OK] CBC solver executed successfully")
print(f" Output: {result.stdout[:200]}")
except subprocess.CalledProcessError as e:
print(f"[FAIL] CBC solver execution failed (exit code {e.returncode})")
print(f" stdout: {e.stdout[:200]}")
print(f" stderr: {e.stderr[:200]}")
except subprocess.TimeoutExpired:
print("[FAIL] CBC solver execution timed out")
except Exception as e:
print(f"[FAIL] CBC solver execution error: {e}")
else:
print("[FAIL] Cannot test CBC execution - file not found")
# Test 4: Solve a simple linear programming problem
print("\n4. Testing simple LP problem...")
try:
# Simple problem: minimize x + y subject to x + y >= 5, x >= 0, y >= 0
prob = LpProblem("Simple_LP_Test", LpMinimize)
x = LpVariable("x", lowBound=0, cat="Continuous")
y = LpVariable("y", lowBound=0, cat="Continuous")
prob += x + y # Objective: minimize x + y
prob += x + y >= 5 # Constraint
print(" Created simple LP problem: minimize x + y subject to x + y >= 5")
# Try to solve with CBC
solver = pulp.PULP_CBC_CMD(msg=False, timeLimit=10)
print(" Attempting to solve with CBC...")
status = prob.solve(solver)
print(f"[OK] LP problem solved")
print(f" Status: {pulp.LpStatus[prob.status]}")
print(f" Objective value: {value(prob.objective)}")
print(f" x = {value(x)}, y = {value(y)}")
if abs(value(prob.objective) - 5.0) < 0.01:
print(" [OK] Correct solution found!")
else:
print(f" [FAIL] Unexpected solution (expected 5.0)")
except Exception as e:
print(f"[FAIL] LP problem solving failed: {e}")
import traceback
traceback.print_exc()
# Test 5: Solve a simple mixed integer programming problem
print("\n5. Testing simple MIP problem...")
try:
# Simple MIP: minimize x + y subject to x + y >= 5, x, y integers >= 0
prob = LpProblem("Simple_MIP_Test", LpMinimize)
x = LpVariable("x", lowBound=0, cat="Integer")
y = LpVariable("y", lowBound=0, cat="Integer")
prob += x + y # Objective
prob += x + y >= 5 # Constraint
print(
" Created simple MIP problem: minimize x + y subject to x + y >= 5, x,y integers"
)
solver = pulp.PULP_CBC_CMD(msg=False, timeLimit=10)
print(" Attempting to solve with CBC...")
status = prob.solve(solver)
print(f"[OK] MIP problem solved")
print(f" Status: {pulp.LpStatus[prob.status]}")
print(f" Objective value: {value(prob.objective)}")
print(f" x = {value(x)}, y = {value(y)}")
if abs(value(prob.objective) - 5.0) < 0.01:
print(" [OK] Correct solution found!")
else:
print(f" [FAIL] Unexpected solution (expected 5.0)")
except Exception as e:
print(f"[FAIL] MIP problem solving failed: {e}")
import traceback
traceback.print_exc()
print("\n=== Test Complete ===")

50
test_mip.py Normal file
View File

@@ -0,0 +1,50 @@
"""
Test script to verify MIP functionality
"""
import numpy as np
import pandas as pd
from mip import design_with_mip
# Create test data
np.random.seed(42)
n_turbines = 10
turbines = pd.DataFrame(
{
"x": np.random.uniform(0, 2000, n_turbines),
"y": np.random.uniform(0, 2000, n_turbines),
"power": np.random.uniform(5, 10, n_turbines),
}
)
substation = np.array([1000, 1000])
print("Test data created:")
print(f"Number of turbines: {n_turbines}")
print(f"Substation location: {substation}")
print(f"Total power: {turbines['power'].sum():.2f} MW")
# Test MIP function
print("\nTesting MIP design...")
try:
connections, turbines_with_clusters = design_with_mip(
turbines,
substation,
cable_specs=None,
voltage=66000,
power_factor=0.95,
system_params=None,
max_clusters=None,
time_limit=30,
evaluate_func=None,
total_invest_func=None,
get_max_capacity_func=None,
)
print(f"MIP test successful!")
print(f"Number of connections: {len(connections)}")
print(f"Clusters assigned: {turbines_with_clusters['cluster'].tolist()}")
except Exception as e:
print(f"MIP test failed with error: {e}")
import traceback
traceback.print_exc()

50
uv.lock generated
View File

@@ -1243,6 +1243,15 @@ wheels = [
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/fc/f5/68334c015eed9b5cff77814258717dec591ded209ab5b6fb70e2ae873d1d/pillow-12.1.0-cp314-cp314t-win_arm64.whl", hash = "sha256:f61333d817698bdcdd0f9d7793e365ac3d2a21c1f1eb02b32ad6aefb8d8ea831" },
]
[[package]]
name = "ply"
version = "3.11"
source = { registry = "https://mirrors.pku.edu.cn/pypi/web/simple" }
sdist = { url = "https://mirrors.pku.edu.cn/pypi/web/packages/e5/69/882ee5c9d017149285cab114ebeab373308ef0f874fcdac9beb90e0ac4da/ply-3.11.tar.gz", hash = "sha256:00c7c1aaa88358b9c765b6d3000c6eec0ba42abca5351b095321aef446081da3" }
wheels = [
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/a3/58/35da89ee790598a0700ea49b2a66594140f44dec458c07e8e3d4979137fc/ply-3.11-py2.py3-none-any.whl", hash = "sha256:096f9b8350b65ebd2fd1346b12452efe5b9607f7482813ffca50c22722a807ce" },
]
[[package]]
name = "propcache"
version = "0.4.1"
@@ -1333,6 +1342,15 @@ version = "0.1.0"
source = { registry = "https://mirrors.pku.edu.cn/pypi/web/simple" }
sdist = { url = "https://mirrors.pku.edu.cn/pypi/web/packages/f2/cf/77d3e19b7fabd03895caca7857ef51e4c409e0ca6b37ee6e9f7daa50b642/proxy_tools-0.1.0.tar.gz", hash = "sha256:ccb3751f529c047e2d8a58440d86b205303cf0fe8146f784d1cbcd94f0a28010" }
[[package]]
name = "pulp"
version = "3.3.0"
source = { registry = "https://mirrors.pku.edu.cn/pypi/web/simple" }
sdist = { url = "https://mirrors.pku.edu.cn/pypi/web/packages/16/1c/d880b739b841a8aa81143091c9bdda5e72e226a660aa13178cb312d4b27f/pulp-3.3.0.tar.gz", hash = "sha256:7eb99b9ce7beeb8bbb7ea9d1c919f02f003ab7867e0d1e322f2f2c26dd31c8ba" }
wheels = [
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/99/6c/64cafaceea3f99927e84b38a362ec6a8f24f33061c90bda77dfe1cd4c3c6/pulp-3.3.0-py3-none-any.whl", hash = "sha256:dd6ad2d63f196d1254eddf9dcff5cd224912c1f046120cb7c143c5b0eda63fae" },
]
[[package]]
name = "pycparser"
version = "2.23"
@@ -1571,6 +1589,34 @@ wheels = [
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/2d/86/637cda4983dc0936b73a385f3906256953ac434537b812814cb0b6d231a2/pyobjc_framework_webkit-12.1-cp314-cp314t-macosx_10_15_universal2.whl", hash = "sha256:1aaa3bf12c7b68e1a36c0b294d2728e06f2cc220775e6dc4541d5046290e4dc8" },
]
[[package]]
name = "pyomo"
version = "6.9.5"
source = { registry = "https://mirrors.pku.edu.cn/pypi/web/simple" }
dependencies = [
{ name = "ply" },
]
sdist = { url = "https://mirrors.pku.edu.cn/pypi/web/packages/87/d8/f32e0dcacc8219694709200d4402c86a6e28d3af50380a5ccf7f7e15ffae/pyomo-6.9.5.tar.gz", hash = "sha256:0734020fcd5cc03ee200fd3f79d143fbfc14e6be116e0d16bab79f3f89609879" }
wheels = [
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/f8/63/5f163b231a924ba7a5f6c58466c751f70be88568fa446524b6e806c98e4b/pyomo-6.9.5-cp312-cp312-macosx_10_13_x86_64.whl", hash = "sha256:549ee4226cab6e2ff6efe5b3b9891ce1dfd866d38a024715315ea850fa1bf0ec" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/2d/bf/0cebcfce70be04d6d7aa19fbcbdeecdd5843caac617424f34ab3feb8e96e/pyomo-6.9.5-cp312-cp312-macosx_11_0_arm64.whl", hash = "sha256:b382cc8c3728199c8332024d64eed8622dabb3f8aebe5874c86a036489064f7a" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/14/27/967545514a2d0f4ca5ac6b595661cb0927cdcd10c3bb2832c5aa0ee15990/pyomo-6.9.5-cp312-cp312-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:43c6e425ca5231b530cd23460e371b7ca9119224dd57237c34580e15f31e4d72" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/85/fe/691e5eb26f58ee4a072add6cc484756d9e3c367901ec6701d2c6789b394d/pyomo-6.9.5-cp312-cp312-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:a1923c358e1e8009a05ada911fc72e615c9e2ce6988f0979ec1ecc75880ee1f7" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/c7/b3/ae47340790f2f1f92f76b176acf475890717f0cb7def073e504b9857a057/pyomo-6.9.5-cp313-cp313-macosx_10_13_x86_64.whl", hash = "sha256:694262dc2eb53ca1ab245261f432a5ed1ec30cf3e651b5a6a1c276bc2dd81076" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/29/e9/7f782864afd28a9eb53057c9d046541be6535b2da35e11c2bcb80839c6bd/pyomo-6.9.5-cp313-cp313-macosx_11_0_arm64.whl", hash = "sha256:1f99ce91f2710d60b380a3a519288282d2183c44e1d66c131909313a3b63e7a2" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/cd/4d/9ca17a602e31a1c3f3148c455a5739fcbe23c102b80a12ec3e6d3bf5e847/pyomo-6.9.5-cp313-cp313-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:d22f99e0ba8e2fb7d0e806bf630b8ce9b0a41d777c51f22711adbcb905f7486e" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/44/2e/78c3ac876791b59c836338b73dc49317b01cef574b01af061999a04a064a/pyomo-6.9.5-cp313-cp313-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:d5953e490b9e9ea42d28804dd0358a9d3ef82560022c2b538e70a638790bc392" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/3c/27/3eb3db8e9ed6a01dee63219389aec761d5cc29b6dc5015b32f826f2a9225/pyomo-6.9.5-cp314-cp314-macosx_10_15_x86_64.whl", hash = "sha256:058eddde05b4354307975f1ecd25cfda9f8a282ad2e3b4f168ff8fee3c3623a1" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/9a/31/7f4750fc9bb0ec18a9534549e4c80ea63f1267aa828d495a48bbf0018f49/pyomo-6.9.5-cp314-cp314-macosx_11_0_arm64.whl", hash = "sha256:105a073c47a2d2d6e74e48ed6fc82c6f6d19027488d5003aabb7ed5d10271483" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/f8/67/639d0006eddab30cf415b0154763ccc51f3c15b934e866eb4fb07bc2b6ed/pyomo-6.9.5-cp314-cp314-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:f2c636c2c640b33dde3b119f6f0941a1bbde39397c392dba55351b0438d8600f" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/67/ef/023b74b8f161f15a51febdd160354f1e3fd7e1475abbe5ccfb3d7588cf1f/pyomo-6.9.5-cp314-cp314-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:e02813b4021eeed7214a1ca5d7daecbdc78d3db7059962553a57fd138d747c22" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/a0/ca/edab1b532fd5e2d146d0cb96836eb5ae387b8a5bd255213e306793f6168e/pyomo-6.9.5-cp314-cp314t-macosx_10_15_x86_64.whl", hash = "sha256:83789ce89271da31e0ff5bbef692af1621ab1747798183a5603b6577b7074277" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/a9/3c/2745386f57030bc60b626adba002b68db3f9538d5b52900f48026a4a17d7/pyomo-6.9.5-cp314-cp314t-macosx_11_0_arm64.whl", hash = "sha256:1f449aaceac5078daaecc21d19b96a15529f9ac8aa90f6472e8811cc07112ecc" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/f1/93/2058af0890b13f7e1a26e4925ff8d681c23d9cbdc2ecc9db17c744941617/pyomo-6.9.5-cp314-cp314t-manylinux_2_24_aarch64.manylinux_2_28_aarch64.whl", hash = "sha256:f94d03f122fcf04a769c28ad48c423cd7b6d3d2c40da20bc8ea1a41bb20d0c36" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/de/30/c808931fc034851a16d3f8360d045b087ac743ea97bfe96cdb4b1df47c21/pyomo-6.9.5-cp314-cp314t-manylinux_2_24_x86_64.manylinux_2_28_x86_64.whl", hash = "sha256:96ff300e96cdab75e2e6983c99e3a61eaff2a6d0f5ed83acd939e74e361de537" },
{ url = "https://mirrors.pku.edu.cn/pypi/web/packages/68/29/394967f7df51788cbdf1b4aedfb7c5a3a62e11b85b4c9d806b86cc576be4/pyomo-6.9.5-py3-none-any.whl", hash = "sha256:60326f7d3143ee7d0f5c5c4a3cbf871b53e08cc6c2b0c9e6d25568880233472f" },
]
[[package]]
name = "pyparsing"
version = "3.3.1"
@@ -2106,6 +2152,8 @@ dependencies = [
{ name = "numpy" },
{ name = "openpyxl" },
{ name = "pandas" },
{ name = "pulp" },
{ name = "pyomo" },
{ name = "pywebview" },
{ name = "scikit-learn" },
{ name = "scipy" },
@@ -2125,6 +2173,8 @@ requires-dist = [
{ name = "numpy", specifier = ">=2.4.0" },
{ name = "openpyxl", specifier = ">=3.1.5" },
{ name = "pandas", specifier = ">=2.3.3" },
{ name = "pulp", specifier = ">=3.3.0" },
{ name = "pyomo", specifier = ">=6.9.5" },
{ name = "pywebview", specifier = ">=6.1" },
{ name = "scikit-learn", specifier = ">=1.8.0" },
{ name = "scipy", specifier = ">=1.16.3" },

232
win32_helper.py Normal file
View File

@@ -0,0 +1,232 @@
import ctypes
import ctypes.wintypes
import os
def show_save_dialog_win32():
"""
使用 ctypes 直接调用 Windows API (GetSaveFileNameW)
不需要子进程,可以在线程中运行 (run.io_bound)
"""
try:
# 定义 OPENFILENAME 结构体
class OPENFILENAME(ctypes.Structure):
_fields_ = [
("lStructSize", ctypes.wintypes.DWORD),
("hwndOwner", ctypes.wintypes.HWND),
("hInstance", ctypes.wintypes.HINSTANCE),
("lpstrFilter", ctypes.wintypes.LPCWSTR),
("lpstrCustomFilter", ctypes.wintypes.LPWSTR),
("nMaxCustFilter", ctypes.wintypes.DWORD),
("nFilterIndex", ctypes.wintypes.DWORD),
("lpstrFile", ctypes.wintypes.LPWSTR),
("nMaxFile", ctypes.wintypes.DWORD),
("lpstrFileTitle", ctypes.wintypes.LPWSTR),
("nMaxFileTitle", ctypes.wintypes.DWORD),
("lpstrInitialDir", ctypes.wintypes.LPCWSTR),
("lpstrTitle", ctypes.wintypes.LPCWSTR),
("Flags", ctypes.wintypes.DWORD),
("nFileOffset", ctypes.wintypes.WORD),
("nFileExtension", ctypes.wintypes.WORD),
("lpstrDefExt", ctypes.wintypes.LPCWSTR),
("lCustData", ctypes.wintypes.LPARAM),
("lpfnHook", ctypes.wintypes.LPVOID),
("lpTemplateName", ctypes.wintypes.LPCWSTR),
# 还有更多字段,但这通常足够了
# ("pvReserved", ctypes.wintypes.LPVOID),
# ("dwReserved", ctypes.wintypes.DWORD),
# ("FlagsEx", ctypes.wintypes.DWORD),
]
# 准备缓冲区
filename_buffer = ctypes.create_unicode_buffer(260) # MAX_PATH
# 设置初始文件名
filename_buffer.value = "win32_save.xlsx"
# 准备过滤器 (用 \0 分隔)
# 格式: "描述\0模式\0描述\0模式\0\0"
filter_str = "Excel Files (*.xlsx)\0*.xlsx\0All Files (*.*)\0*.*\0\0"
ofn = OPENFILENAME()
ofn.lStructSize = ctypes.sizeof(OPENFILENAME)
ofn.hwndOwner = 0 # NULL
ofn.lpstrFilter = filter_str
ofn.lpstrFile = ctypes.cast(filename_buffer, ctypes.wintypes.LPWSTR)
ofn.nMaxFile = 260
ofn.lpstrDefExt = "xlsx"
ofn.lpstrTitle = "保存文件 (Win32 API)"
# OFN_OVERWRITEPROMPT | OFN_PATHMUSTEXIST | OFN_NOCHANGEDIR
ofn.Flags = 0x00000002 | 0x00000800 | 0x00000008
comdlg32 = ctypes.windll.comdlg32
# 调用 API
# GetSaveFileNameW 返回非零值表示成功
if comdlg32.GetSaveFileNameW(ctypes.byref(ofn)):
return filename_buffer.value
else:
return None
except Exception as e:
print(f"Win32 API Error: {e}")
return None
def show_save_dialog_com():
"""
使用 COM 接口 IFileSaveDialog (Windows Vista+)
提供更现代化的文件保存对话框,支持更多功能
"""
try:
import ctypes
import ctypes.wintypes
import uuid
# 定义必要的常量
CLSCTX_INPROC_SERVER = 1
S_OK = 0
FOS_OVERWRITEPROMPT = 0x00000002
FOS_PATHMUSTEXIST = 0x00000800
FOS_NOCHANGEDIR = 0x00000008
SIGDN_FILESYSPATH = 0x80058000
# IFileSaveDialog 的 CLSID 和 IID
CLSID_FileSaveDialog = uuid.UUID("{C0B4E2F3-BA21-4773-8DBA-335EC946EB8B}")
IID_IFileSaveDialog = uuid.UUID("{84bccd23-5fde-4cdb-aea4-af64b83d78ab}")
IID_IShellItem = uuid.UUID("{43826d1e-e718-42ee-bc55-a1e261c37bfe}")
# 加载 ole32.dll
ole32 = ctypes.windll.ole32
# CoInitialize
ole32.CoInitialize(None)
# CoCreateInstance
p_dialog = ctypes.c_void_p()
hr = ole32.CoCreateInstance(
ctypes.byref(CLSID_FileSaveDialog),
None,
CLSCTX_INPROC_SERVER,
ctypes.byref(IID_IFileSaveDialog),
ctypes.byref(p_dialog)
)
if hr != S_OK:
print(f"CoCreateInstance failed: {hr}")
return None
# 定义 IFileSaveDialog 的 vtable 方法
# 我们只需要调用 Show, GetResult, SetOptions, SetFileName, SetDefaultExtension, SetFileTypeIndex
# 这些方法在 IFileOpenDialog 基类中定义
# SetOptions
class IFileSaveDialogVtbl(ctypes.Structure):
_fields_ = [
("QueryInterface", ctypes.c_void_p),
("AddRef", ctypes.c_void_p),
("Release", ctypes.c_void_p),
# IModalWindow
("Show", ctypes.c_void_p),
# IFileDialog
("SetFileTypes", ctypes.c_void_p),
("SetFileTypeIndex", ctypes.c_void_p),
("GetFileTypeIndex", ctypes.c_void_p),
("Advise", ctypes.c_void_p),
("Unadvise", ctypes.c_void_p),
("SetOptions", ctypes.CFUNCTYPE(ctypes.c_long, ctypes.c_void_p, ctypes.c_ulong)),
("GetOptions", ctypes.c_void_p),
("SetDefaultFolder", ctypes.c_void_p),
("SetFolder", ctypes.c_void_p),
("GetFolder", ctypes.c_void_p),
("GetCurrentSelection", ctypes.c_void_p),
("SetFileName", ctypes.CFUNCTYPE(ctypes.c_long, ctypes.c_void_p, ctypes.c_wchar_p)),
("GetFileName", ctypes.c_void_p),
("SetTitle", ctypes.c_void_p),
("SetOkButtonLabel", ctypes.c_void_p),
("SetFileNameLabel", ctypes.c_void_p),
("GetResult", ctypes.CFUNCTYPE(ctypes.c_long, ctypes.c_void_p, ctypes.POINTER(ctypes.c_void_p))),
("AddPlace", ctypes.c_void_p),
("SetDefaultExtension", ctypes.CFUNCTYPE(ctypes.c_long, ctypes.c_void_p, ctypes.c_wchar_p)),
("Close", ctypes.c_void_p),
("SetClientGuid", ctypes.c_void_p),
("ClearClientData", ctypes.c_void_p),
("SetFilter", ctypes.c_void_p),
]
# 获取 vtable
vtable = ctypes.cast(p_dialog, ctypes.POINTER(ctypes.POINTER(IFileSaveDialogVtbl))).contents.contents
# 调用 SetOptions
hr = vtable.SetOptions(p_dialog, FOS_OVERWRITEPROMPT | FOS_PATHMUSTEXIST | FOS_NOCHANGEDIR)
if hr != S_OK:
print(f"SetOptions failed: {hr}")
return None
# 调用 SetFileName
hr = vtable.SetFileName(p_dialog, "com_save.xlsx")
if hr != S_OK:
print(f"SetFileName failed: {hr}")
return None
# 调用 SetDefaultExtension
hr = vtable.SetDefaultExtension(p_dialog, "xlsx")
if hr != S_OK:
print(f"SetDefaultExtension failed: {hr}")
return None
# 调用 SetFileTypeIndex
hr = vtable.SetFileTypeIndex(p_dialog, 1)
if hr != S_OK:
print(f"SetFileTypeIndex failed: {hr}")
return None
# 调用 Show
hr = vtable.Show(p_dialog, 0) # 0 表示没有父窗口
if hr != S_OK:
# 用户取消
return None
# 调用 GetResult
p_result = ctypes.c_void_p()
hr = vtable.GetResult(p_dialog, ctypes.byref(p_result))
if hr != S_OK:
print(f"GetResult failed: {hr}")
return None
# 定义 IShellItem 接口
class IShellItemVtbl(ctypes.Structure):
_fields_ = [
("QueryInterface", ctypes.c_void_p),
("AddRef", ctypes.c_void_p),
("Release", ctypes.c_void_p),
("BindToHandler", ctypes.c_void_p),
("GetParent", ctypes.c_void_p),
("GetDisplayName", ctypes.CFUNCTYPE(ctypes.c_long, ctypes.c_void_p, ctypes.c_ulong, ctypes.POINTER(ctypes.c_wchar_p))),
("GetAttributes", ctypes.c_void_p),
("Compare", ctypes.c_void_p),
]
# 获取 IShellItem 的 vtable
result_vtable = ctypes.cast(p_result, ctypes.POINTER(ctypes.POINTER(IShellItemVtbl))).contents.contents
# 调用 GetDisplayName
p_display_name = ctypes.c_wchar_p()
hr = result_vtable.GetDisplayName(p_result, SIGDN_FILESYSPATH, ctypes.byref(p_display_name))
if hr != S_OK:
print(f"GetDisplayName failed: {hr}")
return None
filepath = p_display_name.value
# 清理
ole32.CoUninitialize()
return filepath
except ImportError as e:
print(f"COM Error: {e}")
return None
except Exception as e:
print(f"COM Error: {e}")
import traceback
traceback.print_exc()
return None