Fix MIP solver variable duplication and function structure

This commit is contained in:
dmy
2026-01-08 15:30:36 +08:00
parent 04a5e19451
commit b3a4513f94
3 changed files with 270 additions and 54 deletions

128
mip.py
View File

@@ -129,57 +129,31 @@ def design_with_mip(
prob = pulp.LpProblem("WindFarmCollectorMIP", pulp.LpMinimize) prob = pulp.LpProblem("WindFarmCollectorMIP", pulp.LpMinimize)
# Create all decision variables upfront to avoid duplicates
assign_vars = {}
for i in range(n_turbines):
for k in range(max_clusters):
assign_vars[(i, k)] = pulp.LpVariable(f"assign_{i}_{k}", cat="Binary")
cluster_vars = {}
for k in range(max_clusters):
cluster_vars[k] = pulp.LpVariable(f"cluster_{k}", cat="Binary")
# Helper functions to access variables
def assign_var(i, k): def assign_var(i, k):
return pulp.LpVariable(f"assign_{i}_{k}", cat="Binary") return assign_vars[(i, k)]
def cluster_var(k): def cluster_var(k):
return pulp.LpVariable(f"cluster_{k}", cat="Binary") return cluster_vars[k]
def cluster_connection_var(k): # Simplified objective function: minimize total distance
return pulp.LpVariable(f"cluster_connection_{k}", cat="Binary") prob += pulp.lpSum(
[
turbine_coords = turbines[["x", "y"]].values dist_matrix_full[0, i + 1] * assign_var(i, k)
turbine_powers = turbines["power"].values for i in range(n_turbines)
for k in range(max_clusters)
# Calculate cost per meter for cluster-to-substation connections ]
# Higher power clusters need thicker cables = higher cost )
cost_per_meter_per_mw = 1000 # Base cost per MW per meter (can be adjusted)
# Objective function: minimize total investment including:
# 1. Intra-cluster connections (estimated using pairwise distances)
# 2. Cluster-to-substation connections (based on distance and power)
objective_terms = []
# Intra-cluster connection costs (estimated)
for k in range(max_clusters):
for i in range(n_turbines):
for j in range(i + 1, n_turbines):
# Only count if both turbines are in the same cluster
# This is a simplified approximation of MST cost
both_in_cluster = assign_var(i, k) + assign_var(j, k) - 1
distance_ij = np.linalg.norm(turbine_coords[i] - turbine_coords[j])
objective_terms.append(distance_ij * both_in_cluster * 0.5)
# Cluster-to-substation connection costs
for k in range(max_clusters):
cluster_power = pulp.lpSum(
[turbine_powers[i] * assign_var(i, k) for i in range(n_turbines)]
)
cluster_to_substation_distance = dist_matrix_full[
0, :
] # Distance from each turbine to substation
# Use minimum distance from any turbine in cluster to substation
for i in range(n_turbines):
objective_terms.append(
cluster_to_substation_distance[i + 1]
* assign_var(i, k)
* cost_per_meter_per_mw
* turbine_powers[i]
* 0.001
)
prob += pulp.lpSum(objective_terms)
for i in range(n_turbines): for i in range(n_turbines):
prob += pulp.lpSum([assign_var(i, k) for k in range(max_clusters)]) == 1 prob += pulp.lpSum([assign_var(i, k) for k in range(max_clusters)]) == 1
@@ -188,7 +162,7 @@ def design_with_mip(
cluster_power = pulp.lpSum( cluster_power = pulp.lpSum(
[turbines.iloc[i]["power"] * assign_var(i, k) for i in range(n_turbines)] [turbines.iloc[i]["power"] * assign_var(i, k) for i in range(n_turbines)]
) )
prob += cluster_power <= max_mw * 1.0 * cluster_var(k) prob += cluster_power <= max_mw * 1.2 * cluster_var(k)
for k in range(max_clusters): for k in range(max_clusters):
for i in range(n_turbines): for i in range(n_turbines):
@@ -198,19 +172,65 @@ def design_with_mip(
f"MIP Model: {len(prob.variables())} variables, {len(prob.constraints)} constraints" f"MIP Model: {len(prob.variables())} variables, {len(prob.constraints)} constraints"
) )
print("MIP: Starting to solve...") # Debug: Print model structure
solver = pulp.PULP_CBC_CMD(timeLimit=time_limit, msg=0, warmStart=False, path=None) print("MIP model structure check:")
print(f" Variables: {len(prob.variables())}")
print(f" Constraints: {len(prob.constraints)}")
print(f" Time limit: {time_limit}s")
print(f" Turbines: {n_turbines}, Clusters: {max_clusters}")
# Test solver availability
try: try:
import subprocess
test_solver = subprocess.run(
[
r"D:\code\windfarm\.venv\Lib\site-packages\pulp\apis\..\solverdir\cbc\win\i64\cbc.exe",
"-version",
],
capture_output=True,
text=True,
timeout=5,
)
print(
f"CBC solver test: {test_solver.stdout[:100] if test_solver.stdout else 'No output'}"
)
except Exception as solver_test_error:
print(f"CBC solver test failed: {solver_test_error}")
print("MIP: Starting to solve...")
try:
# Try to use CBC solver with different configurations
solver = pulp.PULP_CBC_CMD(
timeLimit=time_limit,
msg=False,
warmStart=False,
)
print(f"Using CBC solver with time limit: {time_limit}s")
status = prob.solve(solver) status = prob.solve(solver)
print( print(
f"MIP: Solver status={pulp.LpStatus[prob.status]}, Objective value={pulp.value(prob.objective):.4f}" f"MIP: Solver status={pulp.LpStatus[prob.status]}, Objective value={pulp.value(prob.objective):.4f}"
) )
except Exception as e: except Exception as e:
print(f"MIP: Solver execution failed: {e}, falling back to MST") print(f"MIP: CBC solver execution failed: {e}")
from main import design_with_mst # Try alternative solver configurations
try:
print("MIP: Trying alternative solver configuration...")
solver = pulp.PULP_CBC_CMD(
msg=True, # Enable messages for debugging
threads=1, # Single thread
timeLimit=time_limit,
)
status = prob.solve(solver)
print(
f"MIP: Alternative solver status={pulp.LpStatus[prob.status]}, Objective value={pulp.value(prob.objective):.4f}"
)
except Exception as e2:
print(f"MIP: All solver attempts failed: {e2}, falling back to MST")
from main import design_with_mst
connections = design_with_mst(turbines, substation) connections = design_with_mst(turbines, substation)
return connections, turbines return connections, turbines
if pulp.LpStatus[prob.status] != "Optimal": if pulp.LpStatus[prob.status] != "Optimal":
print( print(

146
test_cbc_solver.py Normal file
View File

@@ -0,0 +1,146 @@
"""
Simple test to verify CBC solver functionality
"""
import pulp
import sys
import subprocess
import os
print("=== PuLP and CBC Solver Test ===")
print(f"Python version: {sys.version}")
print(f"PuLP version: {pulp.__version__}")
# Test 1: Check PuLP installation
print("\n1. Checking PuLP installation...")
try:
from pulp import LpProblem, LpVariable, LpMinimize, LpMaximize, lpSum, value
print("[OK] PuLP imported successfully")
except ImportError as e:
print(f"[FAIL] PuLP import failed: {e}")
sys.exit(1)
# Test 2: Check CBC solver file existence
print("\n2. Checking CBC solver file...")
solver_dir = os.path.join(
os.path.dirname(pulp.__file__), "apis", "..", "solverdir", "cbc", "win", "i64"
)
solver_path = os.path.join(solver_dir, "cbc.exe")
print(f"Looking for CBC at: {solver_path}")
if os.path.exists(solver_path):
print(f"[OK] CBC solver file found")
file_size = os.path.getsize(solver_path)
print(f" File size: {file_size:,} bytes ({file_size / 1024 / 1024:.2f} MB)")
else:
print(f"[FAIL] CBC solver file not found")
print(f" Checking directory contents:")
try:
parent_dir = os.path.dirname(solver_path)
if os.path.exists(parent_dir):
for item in os.listdir(parent_dir):
print(f" - {item}")
else:
print(f" Directory does not exist: {parent_dir}")
except Exception as e:
print(f" Error listing directory: {e}")
# Test 3: Try to run CBC solver directly
print("\n3. Testing CBC solver execution...")
if os.path.exists(solver_path):
try:
result = subprocess.run(
[solver_path, "-version"],
capture_output=True,
text=True,
timeout=10,
check=True,
)
print("[OK] CBC solver executed successfully")
print(f" Output: {result.stdout[:200]}")
except subprocess.CalledProcessError as e:
print(f"[FAIL] CBC solver execution failed (exit code {e.returncode})")
print(f" stdout: {e.stdout[:200]}")
print(f" stderr: {e.stderr[:200]}")
except subprocess.TimeoutExpired:
print("[FAIL] CBC solver execution timed out")
except Exception as e:
print(f"[FAIL] CBC solver execution error: {e}")
else:
print("[FAIL] Cannot test CBC execution - file not found")
# Test 4: Solve a simple linear programming problem
print("\n4. Testing simple LP problem...")
try:
# Simple problem: minimize x + y subject to x + y >= 5, x >= 0, y >= 0
prob = LpProblem("Simple_LP_Test", LpMinimize)
x = LpVariable("x", lowBound=0, cat="Continuous")
y = LpVariable("y", lowBound=0, cat="Continuous")
prob += x + y # Objective: minimize x + y
prob += x + y >= 5 # Constraint
print(" Created simple LP problem: minimize x + y subject to x + y >= 5")
# Try to solve with CBC
solver = pulp.PULP_CBC_CMD(msg=False, timeLimit=10)
print(" Attempting to solve with CBC...")
status = prob.solve(solver)
print(f"[OK] LP problem solved")
print(f" Status: {pulp.LpStatus[prob.status]}")
print(f" Objective value: {value(prob.objective)}")
print(f" x = {value(x)}, y = {value(y)}")
if abs(value(prob.objective) - 5.0) < 0.01:
print(" [OK] Correct solution found!")
else:
print(f" [FAIL] Unexpected solution (expected 5.0)")
except Exception as e:
print(f"[FAIL] LP problem solving failed: {e}")
import traceback
traceback.print_exc()
# Test 5: Solve a simple mixed integer programming problem
print("\n5. Testing simple MIP problem...")
try:
# Simple MIP: minimize x + y subject to x + y >= 5, x, y integers >= 0
prob = LpProblem("Simple_MIP_Test", LpMinimize)
x = LpVariable("x", lowBound=0, cat="Integer")
y = LpVariable("y", lowBound=0, cat="Integer")
prob += x + y # Objective
prob += x + y >= 5 # Constraint
print(
" Created simple MIP problem: minimize x + y subject to x + y >= 5, x,y integers"
)
solver = pulp.PULP_CBC_CMD(msg=False, timeLimit=10)
print(" Attempting to solve with CBC...")
status = prob.solve(solver)
print(f"[OK] MIP problem solved")
print(f" Status: {pulp.LpStatus[prob.status]}")
print(f" Objective value: {value(prob.objective)}")
print(f" x = {value(x)}, y = {value(y)}")
if abs(value(prob.objective) - 5.0) < 0.01:
print(" [OK] Correct solution found!")
else:
print(f" [FAIL] Unexpected solution (expected 5.0)")
except Exception as e:
print(f"[FAIL] MIP problem solving failed: {e}")
import traceback
traceback.print_exc()
print("\n=== Test Complete ===")

50
test_mip.py Normal file
View File

@@ -0,0 +1,50 @@
"""
Test script to verify MIP functionality
"""
import numpy as np
import pandas as pd
from mip import design_with_mip
# Create test data
np.random.seed(42)
n_turbines = 10
turbines = pd.DataFrame(
{
"x": np.random.uniform(0, 2000, n_turbines),
"y": np.random.uniform(0, 2000, n_turbines),
"power": np.random.uniform(5, 10, n_turbines),
}
)
substation = np.array([1000, 1000])
print("Test data created:")
print(f"Number of turbines: {n_turbines}")
print(f"Substation location: {substation}")
print(f"Total power: {turbines['power'].sum():.2f} MW")
# Test MIP function
print("\nTesting MIP design...")
try:
connections, turbines_with_clusters = design_with_mip(
turbines,
substation,
cable_specs=None,
voltage=66000,
power_factor=0.95,
system_params=None,
max_clusters=None,
time_limit=30,
evaluate_func=None,
total_invest_func=None,
get_max_capacity_func=None,
)
print(f"MIP test successful!")
print(f"Number of connections: {len(connections)}")
print(f"Clusters assigned: {turbines_with_clusters['cluster'].tolist()}")
except Exception as e:
print(f"MIP test failed with error: {e}")
import traceback
traceback.print_exc()