You need to pass a specific cost_matrix for each type of vehicle in your dataset.
Using the Python SDK, I get the following output:
🎯 Objective: balanced | Fleet: 74/74 vehicles
🔄 Solving routing optimization…
Error in solve: {"CUOPT_ERROR_TYPE": "ValidationError", "msg": "All vehicle cost matrices should be set"}
Here is the high-level Python script I created, which uses cuOpt's Python SDK instead of the Service APIs:
import pandas as pd
import sys
import numpy as np
import cudf
from cuopt import routing
# ---------- HELPER FUNCTIONS ----------
def fix_location_indexing(tasks_df, vehicles_df, cost_matrix):
    """Shift location ids down by one when they overflow the cost matrix.

    The largest id in ``tasks_df['location_id']``,
    ``vehicles_df['start_location_id']`` and
    ``vehicles_df['end_location_id']`` is compared with the number of rows
    of ``cost_matrix``. If any of them is greater than or equal to that row
    count, the ids are taken to be 1-based and all three columns are
    decremented by one. Otherwise nothing is changed.

    Args:
        tasks_df: DataFrame with a ``location_id`` column.
        vehicles_df: DataFrame with ``start_location_id`` and
            ``end_location_id`` columns.
        cost_matrix: 2-D array; only ``shape[0]`` is read.

    Returns:
        None. Both DataFrames are modified in place.
    """
    n_locations = cost_matrix.shape[0]
    # end_location_id is shifted below, so it has to take part in the
    # bounds check as well; otherwise an out-of-range end id goes unnoticed.
    column_maxima = (
        tasks_df['location_id'].max(),
        vehicles_df['start_location_id'].max(),
        vehicles_df['end_location_id'].max(),
    )
    # Compare each maximum on its own so that a NaN maximum (empty column)
    # cannot hide an overflow in one of the other columns.
    if any(col_max >= n_locations for col_max in column_maxima):
        tasks_df['location_id'] = tasks_df['location_id'] - 1
        vehicles_df['start_location_id'] = (
            vehicles_df['start_location_id'] - 1)
        vehicles_df['end_location_id'] = vehicles_df['end_location_id'] - 1
class OptimizationConfig:
    """Tuning knobs for the fleet-sizing heuristics in this script.

    Attributes set by the constructor:
        objective: strategy name; ``"cost_minimization"``,
            ``"speed_optimization"`` and ``"balanced"`` are handled
            explicitly, any other value falls through to the default
            branch of ``calculate_optimal_fleet_size``.
        max_tasks_per_vehicle: divisor used for ``"cost_minimization"``.
        min_tasks_per_vehicle: divisor used for ``"speed_optimization"``.
        target_utilization: demand/capacity ratio threshold read by
            ``auto_balance_constraints``.
    """

    # The pasted script spelled this ``init`` (markdown swallowed the double
    # underscores), so ``OptimizationConfig(objective=...)`` raised TypeError.
    def __init__(self, objective="balanced"):
        self.objective = objective
        self.max_tasks_per_vehicle = 30
        self.min_tasks_per_vehicle = 2
        self.target_utilization = 0.75

    def calculate_optimal_fleet_size(self, total_tasks,
                                     avg_tasks_per_optimal_route):
        """Return the number of vehicles suggested for ``self.objective``.

        Args:
            total_tasks: number of tasks to serve.
            avg_tasks_per_optimal_route: divisor used only for the
                ``"balanced"`` objective.

        Returns:
            ``total_tasks`` divided by the objective's divisor, truncated
            towards zero and never less than 1. For an unrecognised
            objective ``total_tasks`` is returned unchanged.
        """
        if self.objective == "cost_minimization":
            return max(1, int(total_tasks / self.max_tasks_per_vehicle))
        elif self.objective == "speed_optimization":
            return max(1, int(total_tasks / self.min_tasks_per_vehicle))
        elif self.objective == "balanced":
            return max(1, int(total_tasks / avg_tasks_per_optimal_route))
        else:
            # Any other objective (e.g. "maximum_distribution"): one vehicle
            # per task.
            return total_tasks
def auto_balance_constraints(tasks_df, vehicles_df, config):
    """Add ``effective_demand`` / ``effective_capacity`` columns.

    The ratio of summed ``demand`` to summed ``capacity_kg`` is compared
    with ``config.target_utilization``:

    * ratio above the target: demand is scaled by ``target / ratio``,
      truncated to int and floored at 1; capacity is ``capacity_kg / 1000``
      truncated to int.
    * otherwise: demand is floored at 1; capacity is ``capacity_kg / 500``
      truncated to int and floored at 10.

    Args:
        tasks_df: DataFrame with a ``demand`` column.
        vehicles_df: DataFrame with a ``capacity_kg`` column.
        config: object exposing ``target_utilization``.

    Returns:
        The same two DataFrames (modified in place) as
        ``(tasks_df, vehicles_df)``.
    """
    total_demand = tasks_df['demand'].sum()
    total_capacity = vehicles_df['capacity_kg'].sum()
    current_ratio = total_demand / total_capacity

    if current_ratio > config.target_utilization:
        # Scale demand down so total demand / total capacity_kg equals the
        # target utilization (before truncation and the floor of 1).
        demand_adjustment = config.target_utilization / current_ratio
        tasks_df['effective_demand'] = (
            (tasks_df['demand'] * demand_adjustment)
            .astype(int).clip(lower=1))
        # NOTE(review): unlike the branch below there is no lower bound
        # here, so a capacity_kg under 1000 becomes 0 -- confirm intent.
        vehicles_df['effective_capacity'] = (
            (vehicles_df['capacity_kg'] / 1000).astype(int))
    else:
        tasks_df['effective_demand'] = tasks_df['demand'].clip(lower=1)
        vehicles_df['effective_capacity'] = (
            (vehicles_df['capacity_kg'] / 500).astype(int).clip(lower=10))

    return tasks_df, vehicles_df
def calculate_dynamic_fleet_size(tasks_df, vehicles_df, config,
                                 avg_optimal_tasks=15):
    """Return the suggested fleet size, capped at the vehicles available.

    Args:
        tasks_df: sized collection of tasks; only ``len()`` is used.
        vehicles_df: sized collection of vehicles; only ``len()`` is used.
        config: object exposing
            ``calculate_optimal_fleet_size(total_tasks, avg_tasks)``.
        avg_optimal_tasks: average number of tasks per route passed on to
            ``config``. Defaults to 15, the value that used to be
            hard-coded in the function body.

    Returns:
        The smaller of the size suggested by ``config`` and
        ``len(vehicles_df)``.
    """
    total_tasks = len(tasks_df)
    total_available_vehicles = len(vehicles_df)
    optimal_fleet_size = config.calculate_optimal_fleet_size(
        total_tasks, avg_optimal_tasks)
    return min(optimal_fleet_size, total_available_vehicles)
# ---------- LOAD DATA ----------
# Read the cost matrix, tasks, vehicles and locations from disk, normalise
# column names and location ids, and de-duplicate vehicles. Any failure is
# reported and terminates the script.
try:
    cost_matrix_df = pd.read_csv('cost_matrix.csv', index_col=0)
    cost_matrix = cost_matrix_df.values
    tasks_df = pd.read_csv('cuopt_tasks.csv')
    vehicles_df = pd.read_excel('cuopt_vehicles_time.xlsx')
    locations_df = pd.read_csv('cuopt_locations.csv')

    # Clean data: strip stray whitespace from the column headers.
    vehicles_df.columns = vehicles_df.columns.str.strip()
    tasks_df.columns = tasks_df.columns.str.strip()
    locations_df.columns = locations_df.columns.str.strip()

    # Shift location ids in place if they overflow the cost matrix.
    fix_location_indexing(tasks_df, vehicles_df, cost_matrix)

    # Keep one row per vehicle_id and renumber the index from 0.
    vehicles_df = vehicles_df.drop_duplicates(
        subset=['vehicle_id']).reset_index(drop=True)
    # "<vehicle_id>_<row index>" label for each remaining vehicle.
    vehicles_df['unique_vehicle_id'] = (
        vehicles_df['vehicle_id'].astype(str) + '_' +
        vehicles_df.index.astype(str))
except Exception as e:
    print(f"❌ Data loading error: {e}")
    # Exit with a non-zero status so callers can detect the failure; a bare
    # sys.exit() would report success.
    sys.exit(1)
# ---------- DYNAMIC OPTIMIZATION ----------
# Change this line to modify optimization strategy:
# Options: "cost_minimization", "speed_optimization", "balanced",
# "maximum_distribution"
optimization_objective = "balanced"
config = OptimizationConfig(objective=optimization_objective)

# Derive the effective_demand / effective_capacity columns used later for
# the "weight" capacity dimension.
tasks_df, vehicles_df = auto_balance_constraints(
    tasks_df, vehicles_df, config)
optimal_fleet_size = calculate_dynamic_fleet_size(
    tasks_df, vehicles_df, config)

# Apply dynamic fleet sizing
# NOTE(review): vehicles_df is not reduced to optimal_fleet_size here; only
# the unique_vehicle_id label column is rebuilt, so every loaded vehicle
# stays in the fleet -- confirm whether a truncation step is missing.
vehicles_df['unique_vehicle_id'] = (
    vehicles_df['vehicle_id'].astype(str) + '_' +
    vehicles_df.index.astype(str))

# NOTE(review): the total is counted from 'cuopt_vehicles.xlsx', while the
# fleet was loaded from 'cuopt_vehicles_time.xlsx' -- confirm that these are
# meant to be two different workbooks.
total_vehicles_in_file = len(pd.read_excel('cuopt_vehicles.xlsx'))
print(f"🎯 Objective: {optimization_objective} | Fleet: "
      f"{len(vehicles_df)}/{total_vehicles_in_file} "
      f"vehicles")
# ---------- BUILD CUOPT SDK DATA MODEL ----------
# Translate the pandas inputs into a cuOpt routing.DataModel plus solver
# settings. Any failure is reported and terminates the script.
try:
    # Map each distinct vehicle_type label to an integer id starting at 1.
    unique_vehicle_types = vehicles_df['vehicle_type'].unique()
    vehicle_type_mapping = {vtype: i + 1 for i, vtype in
                            enumerate(unique_vehicle_types)}
    # The first vehicle's start location is used as the depot for all.
    depot_location_id = int(vehicles_df['start_location_id'].iloc[0])

    # Convert cost matrix to cudf DataFrame
    cost_matrix_cudf = cudf.DataFrame(cost_matrix.astype(np.float32))

    # Create DataModel
    n_locations = cost_matrix.shape[0]
    n_vehicles = len(vehicles_df)
    n_orders = len(tasks_df)
    data_model = routing.DataModel(n_locations, n_vehicles, n_orders)

    # Add the cost matrix once per vehicle type id. The same matrix is
    # shared by all types, but cuOpt needs it registered for every type set
    # via set_vehicle_types. Registering it only for the default type
    # (while the vehicles use ids 1..k) is what triggered
    # "All vehicle cost matrices should be set".
    for vehicle_type_id in vehicle_type_mapping.values():
        data_model.add_cost_matrix(cost_matrix_cudf, vehicle_type_id)

    # Set vehicle locations (start and end at depot)
    vehicle_starts = cudf.Series([depot_location_id] * n_vehicles)
    vehicle_ends = cudf.Series([depot_location_id] * n_vehicles)
    data_model.set_vehicle_locations(vehicle_starts, vehicle_ends)

    # Set vehicle types
    vehicle_types_series = cudf.Series(
        [vehicle_type_mapping[vtype] for vtype in
         vehicles_df['vehicle_type']])
    data_model.set_vehicle_types(vehicle_types_series)

    # Set vehicle capacities (two dimensions: weight and task count)
    capacity_weight = cudf.Series(
        vehicles_df['effective_capacity'].tolist())
    capacity_count = cudf.Series([23] * n_vehicles)
    data_model.add_capacity_dimension(
        "weight",
        cudf.Series(tasks_df['effective_demand'].tolist()),
        capacity_weight)
    data_model.add_capacity_dimension(
        "count",
        cudf.Series([1] * n_orders),  # Each task counts as 1
        capacity_count)

    # Set vehicle time windows
    vehicle_earliest = cudf.Series([50000] * n_vehicles)
    vehicle_latest = cudf.Series([57600] * n_vehicles)
    data_model.set_vehicle_time_windows(vehicle_earliest, vehicle_latest)

    # Set order locations
    order_locations = cudf.Series(tasks_df['location_id'].tolist())
    data_model.set_order_locations(order_locations)

    # Set order time windows
    order_earliest = cudf.Series([10000] * n_orders)
    order_latest = cudf.Series([86400] * n_orders)
    data_model.set_order_time_windows(order_earliest, order_latest)

    # Set service times
    service_times = cudf.Series([300] * n_orders)
    data_model.set_order_service_times(service_times)

    # Set minimum vehicles based on optimization objective: half of the
    # suggested fleet size (at least 1), or 1 when minimising cost.
    if optimization_objective != "cost_minimization":
        min_vehicles = max(1, int(optimal_fleet_size * 0.5))
    else:
        min_vehicles = 1
    data_model.set_min_vehicles(min_vehicles)

    # Create solver settings
    solver_settings = routing.SolverSettings()
    solver_settings.set_time_limit(30)
except Exception as e:
    print(f"❌ Data model setup error: {e}")
    # Exit with a non-zero status so callers can detect the failure; a bare
    # sys.exit() would report success.
    sys.exit(1)
# ---------- SOLVE OPTIMIZATION ----------
# Run the solver and print a summary: overall status, cost, a per-vehicle
# task count and duration, and the full route table.
try:
    print("\n🔄 Solving routing optimization…")

    # Solve the problem
    routing_solution = routing.Solve(data_model, solver_settings)

    # Get solution status; this script treats status 0 as feasible.
    status = routing_solution.get_status()
    is_feasible = (status == 0)

    # ========== RAW CUOPT SDK RESPONSE ==========
    print("\n" + "=" * 60)
    print("CUOPT SDK SOLUTION SUMMARY:")
    print("=" * 60)
    print(f"✅ Status: {'FEASIBLE' if is_feasible else 'INFEASIBLE'}")

    if is_feasible:
        # Get solution details
        total_cost = routing_solution.get_total_objective()
        vehicle_count = routing_solution.get_vehicle_count()
        route_df = routing_solution.get_route()
        print(f"🚚 Vehicles Used: {vehicle_count}/{n_vehicles}")
        print(f"💰 Solution Cost: {total_cost:.2f}")

        # Convert to pandas for easier processing
        route_pandas = route_df.to_pandas()

        # Group by vehicle to get route summary
        print("\n📋 Route Summary:")
        total_tasks_assigned = 0
        for vehicle_id in route_pandas['truck_id'].unique():
            vehicle_route = route_pandas[
                route_pandas['truck_id'] == vehicle_id]
            # Filter out depot visits to count only actual tasks
            # NOTE(review): this compares the 'route' column with a
            # location id; confirm that 'route' holds location ids rather
            # than order indices in the installed cuOpt version.
            task_visits = vehicle_route[
                vehicle_route['route'] != depot_location_id]
            num_tasks = len(task_visits)
            total_tasks_assigned += num_tasks
            if len(vehicle_route) > 1:
                # Span between first and last arrival, divided by 3600
                # (presumably seconds to hours -- confirm the time unit).
                duration = ((vehicle_route['arrival_stamp'].max() -
                             vehicle_route['arrival_stamp'].min()) / 3600)
                print(f" Vehicle {vehicle_id}: {num_tasks} tasks "
                      f"({duration:.1f}h)")

        completion_rate = ((total_tasks_assigned / n_orders) * 100
                           if n_orders > 0 else 0)
        print(f"\n📊 Completion: {total_tasks_assigned}/{n_orders} tasks "
              f"({completion_rate:.1f}%)")

        # Show detailed route information
        print("\n🗺️ Detailed Routes:")
        print(route_pandas.to_string(index=False))

        if vehicle_count > 1:
            print("\n🎉 SUCCESS: Multi-vehicle optimization achieved!")
        else:
            print("\n⚠️ Single vehicle solution - consider adjusting "
                  "objective")
    else:
        print("❌ Infeasible solution - try adjusting constraints or "
              "fleet size")
    print("=" * 60)
except Exception as e:
    print(f"❌ Solver Error: {e}")
    import traceback
    traceback.print_exc()

print(f"\n🏁 Optimization completed using ‘{optimization_objective}’ "
      f"strategy with cuOpt Python SDK.")