Hello,
I am working on synthetic dataset generation in Isaac Sim and I want to export the generated data in the NuScenes dataset format.
Specifically, I am looking for guidance on:
-
How to structure the output (images, annotations, calibration, ego poses) so it matches NuScenes format.
-
Whether there is existing code, a writer, or an extension that supports exporting to NuScenes format.
-
If not directly available, what would be the best workflow to adapt the Replicator/SDG output to match NuScenes conventions?
I would really appreciate example code snippets or pointers to resources that show how to customize the data writer for this purpose. If anybody is working on this, I would like to collaborate with you:)I have attached my code below. The sample_annotation and instance is not populated. I am not sure where I am going wrong:
import os
import json
import time
import uuid
import shutil
import traceback
from isaacsim import SimulationApp
# -------------------- Launch Isaac Sim --------------------
simulation_app = SimulationApp({
"headless": True, # FIX: Run headless to save GPU memory
"renderer": "PathTracing", # FIX: Use less memory-intensive renderer
"width": 640, # FIX: Smaller window
"height": 480,
"multi_gpu": False,
"vsync": False,
"fast_shutdown": True,
"anti_aliasing": 0, # FIX: Disable anti-aliasing to save memory
"enable_scene_query_support": False, # FIX: Disable unnecessary features
"enable_livestream": False,
"subdiv_refinement_level": 0, # FIX: Reduce subdivision level
"physics_gpu": False # FIX: Use CPU physics to save GPU memory
})
import omni.usd
import omni.replicator.core as rep
import omni.kit.app
from omni.replicator.core import AnnotatorRegistry
from pxr import Usd, UsdGeom, Gf
from PIL import Image
import numpy as np
import gc
# -------------------- CONFIG --------------------
OUT_DIR = os.path.expanduser("~/Downloads/trial9_nuscenes_ready")
NUM_FRAMES = 20 # FIX: Drastically reduce frame count to prevent GPU exhaustion
IMG_RES = (640, 480) # FIX: Reduce image resolution to save GPU memory
BATCH_SIZE = 5 # FIX: Save more frequently
MAX_RETRIES = 2 # FIX: Reduce retries to prevent hanging
os.makedirs(OUT_DIR, exist_ok=True)
os.makedirs(os.path.join(OUT_DIR, "samples"), exist_ok=True)
# Asset paths
WAREHOUSE_ROOT = "omniverse://localhost/Users/retrolifts/Assets/dataset_WH.usd"
FORKLIFT = {
"forklift_7500": "omniverse://localhost/Users/retrolifts/Assets/RM_FL_7500_2.usd"
}
FORKLIFT_SEM = "forklift_7500"
# -------------------- nuScenes table containers --------------------
scene_table = []
sample_table = []
sample_data_table = []
sensor_table = []
calibrated_sensor_table = []
ego_pose_table = []
instance_table = []
sample_annotation_table = []
category_table = []
log_table = []
attribute_table = []
visibility_table = []
# Helper function for tokens
def new_token():
return uuid.uuid4().hex
# Global tracking variables
prev_sample_token = ""
first_sample_token = ""
scene_token = new_token()
log_token = new_token()
sensor_tokens = {}
calibrated_sensor_tokens = {}
forklift_instances = {} # Track forklift instances across frames
# FIX: Add cleanup tracking
active_instances = []
active_annotators = []
current_prims = [] # Track current prims to ensure proper population
def wait_for_prim_population(max_wait=3.0):
"""Wait for prims to be properly populated before proceeding"""
import omni.usd
context = omni.usd.get_context()
if not context:
return False
stage = context.get_stage()
if not stage:
return False
# Wait for stage to be ready - simplified approach
wait_time = 0.0
step_size = 0.1
while wait_time < max_wait:
try:
# Force stage to process pending operations
omni.kit.app.get_app().update()
time.sleep(step_size)
wait_time += step_size
# FIX: Simple wait without HasPendingActiveAuthoring (not available in this version)
if wait_time > 0.5: # Minimum wait time
print(f"[INFO] Stage wait completed after {wait_time:.1f}s")
return True
except Exception as e:
print(f"[WARNING] Error waiting for population: {e}")
break
print(f"[WARNING] Stage wait completed after {max_wait}s")
return True # Continue anyway
# def cleanup_resources():
# """Properly cleanup GPU resources"""
# try:
# print("[INFO] Starting resource cleanup...")
# # Clear replicator instances first
# try:
# rep.randomizer.instantiate("", size=0, mode="scene_instance")
# print("[INFO] Cleared replicator instances")
# except Exception as e:
# print(f"[WARNING] Failed to clear replicator instances: {e}")
# # Clear active instances
# if active_instances:
# for instance in active_instances:
# try:
# # Force cleanup of instance
# if hasattr(instance, 'destroy'):
# instance.destroy()
# except Exception as e:
# print(f"[WARNING] Failed to destroy instance: {e}")
# active_instances.clear()
# print(f"[INFO] Cleared {len(active_instances)} active instances")
# # Clear current prims tracking
# current_prims.clear()
# # Force garbage collection
# gc.collect()
# # Wait for cleanup to complete
# wait_for_prim_population(1.0)
# print("[INFO] Resource cleanup completed")
# except Exception as e:
# print(f"[WARNING] Cleanup failed: {e}")
def cleanup_resources():
"""Properly cleanup GPU + USD resources before spawning next forklift"""
try:
print("[INFO] Starting resource cleanup...")
# Clear replicator instances
try:
rep.randomizer.instantiate("", size=0, mode="scene_instance")
print("[INFO] Cleared replicator replicator instances")
except Exception as e:
print(f"[WARNING] Failed to clear replicator instances: {e}")
# Delete forklift prims from stage
stage = omni.usd.get_context().get_stage()
if stage:
prims_to_remove = []
for prim in stage.Traverse():
if prim.GetPath().pathString.startswith("/World") and "forklift" in prim.GetName().lower():
prims_to_remove.append(prim)
for prim in prims_to_remove:
try:
stage.RemovePrim(prim.GetPath())
print(f"[INFO] Removed prim: {prim.GetPath()}")
except Exception as e:
print(f"[WARNING] Could not remove prim {prim.GetPath()}: {e}")
# Clear current tracking
active_instances.clear()
current_prims.clear()
# Force garbage collection
gc.collect()
wait_for_prim_population(1.0)
print("[INFO] Resource cleanup completed")
except Exception as e:
print(f"[WARNING] Cleanup failed: {e}")
# Initialize static tables
def initialize_static_tables():
# Category table
category_table.append({
"token": "cat_forklift",
"name": "vehicle.forklift",
"description": "Synthetic forklift category",
"index": 0
})
# Attribute table
attribute_table.extend([
{
"token": "attr_moving",
"name": "vehicle.moving",
"description": "Vehicle is moving"
},
{
"token": "attr_stopped",
"name": "vehicle.stopped",
"description": "Vehicle is stationary"
}
])
# Visibility table
visibility_table.extend([
{
"token": "vis_full",
"level": "v80-100",
"description": "visibility of whole object is between 80 and 100%"
},
{
"token": "vis_most",
"level": "v60-80",
"description": "visibility of whole object is between 60 and 80%"
}
])
# Log table
log_table.append({
"token": log_token,
"logfile": "synthetic_warehouse_001",
"vehicle": "isaac_sim_ego",
"date_captured": time.strftime("%Y-%m-%d"),
"location": "isaac-sim-warehouse"
})
# Scene table (will be updated with sample info later)
scene_table.append({
"token": scene_token,
"log_token": log_token,
"nbr_samples": NUM_FRAMES,
"first_sample_token": "",
"last_sample_token": "",
"name": "scene-warehouse-001",
"description": "Synthetic warehouse scene with forklift generated in Isaac Sim"
})
def compute_intrinsics():
"""Compute camera intrinsics matrix (flattened 3x3)"""
focal = 11.0 # mm
w_px, h_px = IMG_RES
ha, va = 36.0, 24.0 # sensor size in mm
fx = focal * (w_px / ha)
fy = focal * (h_px / va)
cx = w_px / 2.0
cy = h_px / 2.0
return [fx, 0.0, cx, 0.0, fy, cy, 0.0, 0.0, 1.0]
def create_sensors_and_calibration():
"""Create sensor and calibrated_sensor entries for cameras"""
camera_positions = [(-29.5, 29.5, 8.4)] # Only one camera
for i, pos in enumerate(camera_positions):
# Sensor entry
sensor_tok = new_token()
sensor_table.append({
"token": sensor_tok,
"channel": f"CAM_{i}",
"modality": "camera"
})
sensor_tokens[i] = sensor_tok
# Calibrated sensor entry
calib_tok = new_token()
calibrated_sensor_table.append({
"token": calib_tok,
"sensor_token": sensor_tok,
"translation": list(map(float, pos)),
"rotation": [1.0, 0.0, 0.0, 0.0], # w, x, y, z quaternion
"camera_intrinsic": compute_intrinsics()
})
calibrated_sensor_tokens[i] = calib_tok
def save_json_files():
"""Save all JSON tables to disk"""
tables = {
"scene": scene_table,
"sample": sample_table,
"sample_data": sample_data_table,
"sensor": sensor_table,
"calibrated_sensor": calibrated_sensor_table,
"ego_pose": ego_pose_table,
"instance": instance_table,
"sample_annotation": sample_annotation_table,
"category": category_table,
"log": log_table,
"attribute": attribute_table,
"visibility": visibility_table
}
for name, data in tables.items():
try:
file_path = os.path.join(OUT_DIR, f"{name}.json")
with open(file_path, "w") as f:
json.dump(data, f, indent=2)
except Exception as e:
print(f"[WARNING] Failed to save {name}.json: {e}")
def find_or_create_instance(forklift_id, category_token):
"""Find existing instance or create new one for a forklift"""
if forklift_id not in forklift_instances:
inst_tok = new_token()
forklift_instances[forklift_id] = {
"token": inst_tok,
"annotations": [],
"first_annotation_token": "",
"last_annotation_token": ""
}
# Create instance table entry
instance_table.append({
"token": inst_tok,
"category_token": category_token,
"nbr_annotations": 0,
"first_annotation_token": "",
"last_annotation_token": ""
})
return forklift_instances[forklift_id]
def link_sample_annotations(current_ann_token, instance_info):
"""Link sample annotations temporally using prev/next"""
if instance_info["annotations"]:
# Link with previous annotation
prev_ann_token = instance_info["annotations"][-1]
# Update previous annotation's next field
for ann in sample_annotation_table:
if ann["token"] == prev_ann_token:
ann["next"] = current_ann_token
break
# Set current annotation's prev field
return prev_ann_token
return ""
# FIX: Much more conservative forklift randomizer
def randomize_forklift(file_name, class_name, max_number=1):
"""Create forklift instances with minimal GPU usage"""
try:
print(f"[INFO] Creating single forklift instance")
# FIX: Clear any existing instances first
rep.randomizer.instantiate("", size=0, mode="scene_instance")
# FIX: Wait for cleanup
time.sleep(1.0)
# Create single instance with fixed parameters (no distributions)
instances = rep.randomizer.instantiate(
file_name, size=max_number, mode="scene_instance"
)
if instances is not None:
print("[INFO] Forklift instance created, applying modifications...")
with instances:
rep.modify.semantics([("class", class_name)])
# FIX: Use fixed pose to avoid distribution calculations
rep.modify.pose(
# position=(-20.0, 20.0, 0.0), # Fixed position
# rotation=(90, 0, 0), # Fixed rotation
# scale=1.0
position=rep.distribution.uniform((-29.5, 10.5, 0), (-10.5, 29.5, 0)),
rotation=rep.distribution.uniform((90, 0, -180), (90, 0, 180)),
scale=rep.distribution.uniform((0.9), (1.1)),
)
rep.modify.visibility(True)
print("[INFO] Forklift modifications applied successfully")
return instances
else:
print("[WARNING] Failed to create forklift instance")
return None
except Exception as e:
print(f"[ERROR] Forklift creation failed: {e}")
return None
# -------------------- Main execution --------------------
try:
print("[INFO] Initializing nuScenes dataset structure...")
initialize_static_tables()
print("[INFO] Setting up stage and environment...")
omni.usd.get_context().new_stage()
# Create environment
env = rep.create.from_usd(WAREHOUSE_ROOT)
# Lights - FIX: Minimal lighting setup to save GPU memory
with rep.new_layer():
rep.create.light(
light_type="Dome", # FIX: Use dome light instead of multiple rect lights
intensity=2000, # FIX: Much lower intensity
temperature=6000,
count=1 # FIX: Only one light
)
# Camera setup - FIX: Use only ONE camera to reduce GPU load
camera_positions = [(-29.5, 29.5, 8.4)] # Only one camera
cameras = []
render_products = []
for i, pos in enumerate(camera_positions):
cam = rep.create.camera(focal_length=11)
rp = rep.create.render_product(cam, IMG_RES)
cameras.append(cam)
render_products.append(rp)
# Create annotators - FIX: Track them for cleanup
rgb_annotators = []
bbox3d_annotators = []
for rp in render_products:
rgb_ann = AnnotatorRegistry.get_annotator("rgb")
bbox3d_ann = AnnotatorRegistry.get_annotator("bounding_box_3d")
rgb_ann.attach([rp])
bbox3d_ann.attach([rp])
rgb_annotators.append(rgb_ann)
bbox3d_annotators.append(bbox3d_ann)
# Track for cleanup
active_annotators.extend([rgb_ann, bbox3d_ann])
# Create sensors and calibration
create_sensors_and_calibration()
print("[INFO] Starting frame-by-frame data capture...")
# Track previous sample data tokens for linking
prev_sample_data_tokens = {}
for frame_id in range(NUM_FRAMES):
print(f"[INFO] Processing frame {frame_id+1}/{NUM_FRAMES}")
try:
# FIX: Much longer wait between frames to let GPU recover
if frame_id > 0:
cleanup_resources()
# FIX: Very long delay between frames
time.sleep(2.0)
# Randomize forklift placement - simplified
forklift_instances_obj = None
for k, fpath in FORKLIFT.items():
print(f"[INFO] Creating forklift: {k}")
forklift_instances_obj = randomize_forklift(fpath, k)
break # Only try once
# Skip frame if forklift creation failed
if forklift_instances_obj is None:
print(f"[WARNING] Skipping frame {frame_id} due to forklift failure")
continue
# Set camera poses
for i, cam in enumerate(cameras):
with cam:
rep.modify.pose(position=camera_positions[i], look_at=(-20, 20, 0))
# FIX: Simple orchestrator step with minimal retries
try:
print(f"[INFO] Stepping orchestrator for frame {frame_id}")
time.sleep(1.0) # Wait before stepping
rep.orchestrator.step(rt_subframes=1)
time.sleep(1.0) # Wait after stepping
print("[INFO] Orchestrator step completed")
except Exception as e:
print(f"[ERROR] Orchestrator step failed for frame {frame_id}: {e}")
continue
# Create sample
sample_tok = new_token()
timestamp_us = int((time.time() + frame_id * 0.5) * 1e6) # 0.5s intervals
sample_entry = {
"token": sample_tok,
"timestamp": timestamp_us,
"scene_token": scene_token,
"prev": prev_sample_token if prev_sample_token else "",
"next": ""
}
# Link previous sample
if prev_sample_token and sample_table:
sample_table[-1]["next"] = sample_tok
sample_table.append(sample_entry)
if not first_sample_token:
first_sample_token = sample_tok
# Create ego pose
ego_tok = new_token()
ego_pose_table.append({
"token": ego_tok,
"timestamp": timestamp_us,
"rotation": [1.0, 0.0, 0.0, 0.0], # w, x, y, z quaternion
"translation": [0.0, 0.0, 0.0] # x, y, z (z always 0 in nuScenes)
})
# Process each camera
current_sample_data_tokens = {}
for cam_idx in range(len(cameras)):
try:
# FIX: Add timeout and null checks for RGB data
rgb_data = None
for attempt in range(MAX_RETRIES):
try:
rgb_data = rgb_annotators[cam_idx].get_data()
if rgb_data is not None:
break
time.sleep(0.1)
except Exception as e:
print(f"[WARNING] RGB data attempt {attempt + 1} failed: {e}")
if attempt < MAX_RETRIES - 1:
time.sleep(0.2)
if rgb_data is not None:
# Save image
channel_name = f"CAM_{cam_idx}"
channel_dir = os.path.join(OUT_DIR, "samples", channel_name)
os.makedirs(channel_dir, exist_ok=True)
# Generate filename
image_filename = f"n000-{time.strftime('%Y-%m-%d-%H-%M-%S')}+0000__{channel_name}__{timestamp_us}.jpg"
file_path = os.path.join(channel_dir, image_filename)
# FIX: More robust image conversion with validation
try:
if isinstance(rgb_data, np.ndarray):
# Validate array shape
if len(rgb_data.shape) != 3:
print(f"[WARNING] Invalid RGB data shape: {rgb_data.shape}")
continue
if rgb_data.dtype != np.uint8:
rgb_data = np.clip(rgb_data * 255, 0, 255).astype(np.uint8)
# Ensure proper image format
if rgb_data.shape[2] == 3:
Image.fromarray(rgb_data, 'RGB').save(file_path)
elif rgb_data.shape[2] == 4:
Image.fromarray(rgb_data[:,:,:3], 'RGB').save(file_path)
else:
print(f"[WARNING] Unexpected image channels: {rgb_data.shape[2]}")
continue
else:
print(f"[WARNING] RGB data is not numpy array: {type(rgb_data)}")
continue
except Exception as e:
print(f"[WARNING] Failed to save image for camera {cam_idx}: {e}")
continue
# Create sample_data entry
sd_tok = new_token()
sample_data_entry = {
"token": sd_tok,
"sample_token": sample_tok,
"ego_pose_token": ego_tok,
"calibrated_sensor_token": calibrated_sensor_tokens[cam_idx],
"filename": f"samples/{channel_name}/{image_filename}",
"fileformat": "jpg",
"width": IMG_RES[0],
"height": IMG_RES[1],
"timestamp": timestamp_us,
"is_key_frame": True,
"prev": prev_sample_data_tokens.get(cam_idx, ""),
"next": ""
}
# Link previous sample_data
if cam_idx in prev_sample_data_tokens and sample_data_table:
# Find and update previous sample_data entry
for sd in sample_data_table:
if sd["token"] == prev_sample_data_tokens[cam_idx]:
sd["next"] = sd_tok
break
sample_data_table.append(sample_data_entry)
current_sample_data_tokens[cam_idx] = sd_tok
except Exception as e:
print(f"[WARNING] Failed to process camera {cam_idx} for frame {frame_id}: {e}")
continue
# FIX: More robust 3D bounding box processing with validation
try:
bbox_data = None
for attempt in range(MAX_RETRIES):
try:
bbox_data = bbox3d_annotators[0].get_data()
if bbox_data is not None:
break
time.sleep(0.1)
except Exception as e:
print(f"[WARNING] Bbox data attempt {attempt + 1} failed: {e}")
if attempt < MAX_RETRIES - 1:
time.sleep(0.2)
if bbox_data and len(bbox_data) > 0:
for bbox_idx, box in enumerate(bbox_data):
try:
# FIX: Validate box data structure
if not isinstance(box, dict):
print(f"[WARNING] Invalid box data type: {type(box)}")
continue
# Use semantic label or position as forklift ID
forklift_id = f"forklift_{bbox_idx}"
# Find or create instance
instance_info = find_or_create_instance(forklift_id, category_table[0]["token"])
# Create annotation
ann_tok = new_token()
# FIX: More robust extraction with validation
translation = [0.0, 0.0, 1.0] # Default values
size = [2.0, 1.5, 2.0] # Default size
rotation = [1.0, 0.0, 0.0, 0.0] # Default rotation
try:
if "translation" in box and box["translation"] is not None:
trans = box["translation"]
if isinstance(trans, (list, tuple, np.ndarray)) and len(trans) >= 3:
translation = [float(x) for x in trans[:3]]
except (ValueError, TypeError) as e:
print(f"[WARNING] Invalid translation data: {e}")
try:
if "size" in box and box["size"] is not None:
sz = box["size"]
if isinstance(sz, (list, tuple, np.ndarray)) and len(sz) >= 3:
size = [float(x) for x in sz[:3]]
except (ValueError, TypeError) as e:
print(f"[WARNING] Invalid size data: {e}")
try:
if "rotation" in box and box["rotation"] is not None:
rot = box["rotation"]
if isinstance(rot, (list, tuple, np.ndarray)) and len(rot) >= 4:
rotation = [float(x) for x in rot[:4]]
except (ValueError, TypeError) as e:
print(f"[WARNING] Invalid rotation data: {e}")
# Link with previous annotation
prev_ann = link_sample_annotations(ann_tok, instance_info)
sample_annotation_entry = {
"token": ann_tok,
"sample_token": sample_tok,
"instance_token": instance_info["token"],
"attribute_tokens": ["attr_moving"],
"visibility_token": "vis_full",
"translation": translation,
"size": size,
"rotation": rotation,
"num_lidar_pts": 0,
"num_radar_pts": 0,
"next": "",
"prev": prev_ann
}
sample_annotation_table.append(sample_annotation_entry)
# Update instance tracking
instance_info["annotations"].append(ann_tok)
if not instance_info["first_annotation_token"]:
instance_info["first_annotation_token"] = ann_tok
instance_info["last_annotation_token"] = ann_tok
except Exception as e:
print(f"[WARNING] Failed to process bbox {bbox_idx}: {e}")
continue
except Exception as e:
print(f"[WARNING] Failed to process bounding boxes for frame {frame_id}: {e}")
# Update tracking variables
prev_sample_token = sample_tok
prev_sample_data_tokens = current_sample_data_tokens
# FIX: Save more frequently and with better cleanup
if frame_id % BATCH_SIZE == 0 or frame_id == NUM_FRAMES - 1:
try:
save_json_files()
print(f"[INFO] Saved progress at frame {frame_id}")
except Exception as e:
print(f"[WARNING] Failed to save progress: {e}")
# Force comprehensive cleanup
cleanup_resources()
except Exception as e:
print(f"[ERROR] Failed to process frame {frame_id}: {e}")
traceback.print_exc()
# Cleanup on error
cleanup_resources()
continue # Continue with next frame
# Finalize instance table
for forklift_id, instance_info in forklift_instances.items():
for inst in instance_table:
if inst["token"] == instance_info["token"]:
inst["nbr_annotations"] = len(instance_info["annotations"])
inst["first_annotation_token"] = instance_info["first_annotation_token"]
inst["last_annotation_token"] = instance_info["last_annotation_token"]
break
# Finalize scene table
if sample_table:
scene_table[0]["first_sample_token"] = first_sample_token
scene_table[0]["last_sample_token"] = sample_table[-1]["token"]
# Final save
save_json_files()
print(f"[SUCCESS] nuScenes dataset generated at: {OUT_DIR}")
print(f"[INFO] Generated {len(sample_table)} samples")
print(f"[INFO] Generated {len(sample_data_table)} camera images")
print(f"[INFO] Generated {len(sample_annotation_table)} annotations")
print(f"[INFO] Tracked {len(instance_table)} object instances")
except Exception as e:
print(f"[ERROR] Script failed: {e}")
traceback.print_exc()
finally:
print("[INFO] Shutting down...")
try:
# FIX: More thorough cleanup
cleanup_resources()
if rep.orchestrator.get_is_started():
rep.orchestrator.stop()
# Give time for proper shutdown
time.sleep(1.0)
except Exception as e:
print(f"[WARNING] Cleanup during shutdown failed: {e}")
simulation_app.close()