Help: Generate Synthetic Data in NuScenes Format with Isaac Sim (Replicator)

Hello,

I am working on synthetic dataset generation in Isaac Sim and I want to export the generated data in the NuScenes dataset format.

Specifically, I am looking for guidance on:

  • How to structure the output (images, annotations, calibration, ego poses) so it matches NuScenes format.

  • Whether there is existing code, a writer, or an extension that supports exporting to NuScenes format.

  • If not directly available, what would be the best workflow to adapt the Replicator/SDG output to match NuScenes conventions?

I would really appreciate example code snippets or pointers to resources that show how to customize the data writer for this purpose. If anybody is working on this, I would like to collaborate with you:)I have attached my code below. The sample_annotation and instance is not populated. I am not sure where I am going wrong:

import os
import json
import time
import uuid
import shutil
import traceback
from isaacsim import SimulationApp

# -------------------- Launch Isaac Sim --------------------
simulation_app = SimulationApp({
    "headless": True,  # FIX: Run headless to save GPU memory
    "renderer": "PathTracing",  # FIX: Use less memory-intensive renderer
    "width": 640,  # FIX: Smaller window
    "height": 480,
    "multi_gpu": False,
    "vsync": False,
    "fast_shutdown": True,
    "anti_aliasing": 0,  # FIX: Disable anti-aliasing to save memory
    "enable_scene_query_support": False,  # FIX: Disable unnecessary features
    "enable_livestream": False,
    "subdiv_refinement_level": 0,  # FIX: Reduce subdivision level
    "physics_gpu": False  # FIX: Use CPU physics to save GPU memory
})

import omni.usd
import omni.replicator.core as rep
import omni.kit.app
from omni.replicator.core import AnnotatorRegistry
from pxr import Usd, UsdGeom, Gf
from PIL import Image
import numpy as np
import gc

# -------------------- CONFIG --------------------
OUT_DIR = os.path.expanduser("~/Downloads/trial9_nuscenes_ready")
NUM_FRAMES = 20  # FIX: Drastically reduce frame count to prevent GPU exhaustion
IMG_RES = (640, 480)  # FIX: Reduce image resolution to save GPU memory
BATCH_SIZE = 5  # FIX: Save more frequently
MAX_RETRIES = 2  # FIX: Reduce retries to prevent hanging

os.makedirs(OUT_DIR, exist_ok=True)
os.makedirs(os.path.join(OUT_DIR, "samples"), exist_ok=True)

# Asset paths
WAREHOUSE_ROOT = "omniverse://localhost/Users/retrolifts/Assets/dataset_WH.usd"
FORKLIFT = {
    "forklift_7500": "omniverse://localhost/Users/retrolifts/Assets/RM_FL_7500_2.usd"
}
FORKLIFT_SEM = "forklift_7500"

# -------------------- nuScenes table containers --------------------
scene_table = []
sample_table = []
sample_data_table = []
sensor_table = []
calibrated_sensor_table = []
ego_pose_table = []
instance_table = []
sample_annotation_table = []
category_table = []
log_table = []
attribute_table = []
visibility_table = []

# Helper function for tokens
def new_token():
    return uuid.uuid4().hex

# Global tracking variables
prev_sample_token = ""
first_sample_token = ""
scene_token = new_token()
log_token = new_token()
sensor_tokens = {}
calibrated_sensor_tokens = {}
forklift_instances = {}  # Track forklift instances across frames

# FIX: Add cleanup tracking
active_instances = []
active_annotators = []
current_prims = []  # Track current prims to ensure proper population

def wait_for_prim_population(max_wait=3.0):
    """Wait for prims to be properly populated before proceeding"""
    import omni.usd
    
    context = omni.usd.get_context()
    if not context:
        return False
    
    stage = context.get_stage()
    if not stage:
        return False
    
    # Wait for stage to be ready - simplified approach
    wait_time = 0.0
    step_size = 0.1
    
    while wait_time < max_wait:
        try:
            # Force stage to process pending operations
            omni.kit.app.get_app().update()
            time.sleep(step_size)
            wait_time += step_size
            
            # FIX: Simple wait without HasPendingActiveAuthoring (not available in this version)
            if wait_time > 0.5:  # Minimum wait time
                print(f"[INFO] Stage wait completed after {wait_time:.1f}s")
                return True
                
        except Exception as e:
            print(f"[WARNING] Error waiting for population: {e}")
            break
    
    print(f"[WARNING] Stage wait completed after {max_wait}s")
    return True  # Continue anyway

# def cleanup_resources():
#     """Properly cleanup GPU resources"""
#     try:
#         print("[INFO] Starting resource cleanup...")
        
#         # Clear replicator instances first
#         try:
#             rep.randomizer.instantiate("", size=0, mode="scene_instance")
#             print("[INFO] Cleared replicator instances")
#         except Exception as e:
#             print(f"[WARNING] Failed to clear replicator instances: {e}")
        
#         # Clear active instances
#         if active_instances:
#             for instance in active_instances:
#                 try:
#                     # Force cleanup of instance
#                     if hasattr(instance, 'destroy'):
#                         instance.destroy()
#                 except Exception as e:
#                     print(f"[WARNING] Failed to destroy instance: {e}")
#             active_instances.clear()
#             print(f"[INFO] Cleared {len(active_instances)} active instances")
        
#         # Clear current prims tracking
#         current_prims.clear()
        
#         # Force garbage collection
#         gc.collect()
        
#         # Wait for cleanup to complete
#         wait_for_prim_population(1.0)
        
#         print("[INFO] Resource cleanup completed")
        
#     except Exception as e:
#         print(f"[WARNING] Cleanup failed: {e}")


def cleanup_resources():
    """Properly cleanup GPU + USD resources before spawning next forklift"""
    try:
        print("[INFO] Starting resource cleanup...")

        # Clear replicator instances
        try:
            rep.randomizer.instantiate("", size=0, mode="scene_instance")
            print("[INFO] Cleared replicator replicator instances")
        except Exception as e:
            print(f"[WARNING] Failed to clear replicator instances: {e}")

        # Delete forklift prims from stage
        stage = omni.usd.get_context().get_stage()
        if stage:
            prims_to_remove = []
            for prim in stage.Traverse():
                if prim.GetPath().pathString.startswith("/World") and "forklift" in prim.GetName().lower():
                    prims_to_remove.append(prim)

            for prim in prims_to_remove:
                try:
                    stage.RemovePrim(prim.GetPath())
                    print(f"[INFO] Removed prim: {prim.GetPath()}")
                except Exception as e:
                    print(f"[WARNING] Could not remove prim {prim.GetPath()}: {e}")

        # Clear current tracking
        active_instances.clear()
        current_prims.clear()

        # Force garbage collection
        gc.collect()
        wait_for_prim_population(1.0)

        print("[INFO] Resource cleanup completed")

    except Exception as e:
        print(f"[WARNING] Cleanup failed: {e}")



# Initialize static tables
def initialize_static_tables():
    # Category table
    category_table.append({
        "token": "cat_forklift",
        "name": "vehicle.forklift",
        "description": "Synthetic forklift category",
        "index": 0
    })
    
    # Attribute table
    attribute_table.extend([
        {
            "token": "attr_moving",
            "name": "vehicle.moving",
            "description": "Vehicle is moving"
        },
        {
            "token": "attr_stopped",
            "name": "vehicle.stopped", 
            "description": "Vehicle is stationary"
        }
    ])
    
    # Visibility table
    visibility_table.extend([
        {
            "token": "vis_full",
            "level": "v80-100",
            "description": "visibility of whole object is between 80 and 100%"
        },
        {
            "token": "vis_most",
            "level": "v60-80", 
            "description": "visibility of whole object is between 60 and 80%"
        }
    ])
    
    # Log table
    log_table.append({
        "token": log_token,
        "logfile": "synthetic_warehouse_001",
        "vehicle": "isaac_sim_ego",
        "date_captured": time.strftime("%Y-%m-%d"),
        "location": "isaac-sim-warehouse"
    })
    
    # Scene table (will be updated with sample info later)
    scene_table.append({
        "token": scene_token,
        "log_token": log_token,
        "nbr_samples": NUM_FRAMES,
        "first_sample_token": "",
        "last_sample_token": "",
        "name": "scene-warehouse-001",
        "description": "Synthetic warehouse scene with forklift generated in Isaac Sim"
    })

def compute_intrinsics():
    """Compute camera intrinsics matrix (flattened 3x3)"""
    focal = 11.0  # mm
    w_px, h_px = IMG_RES
    ha, va = 36.0, 24.0  # sensor size in mm
    fx = focal * (w_px / ha)
    fy = focal * (h_px / va)
    cx = w_px / 2.0
    cy = h_px / 2.0
    return [fx, 0.0, cx, 0.0, fy, cy, 0.0, 0.0, 1.0]

def create_sensors_and_calibration():
    """Create sensor and calibrated_sensor entries for cameras"""
    camera_positions = [(-29.5, 29.5, 8.4)]  # Only one camera
    
    for i, pos in enumerate(camera_positions):
        # Sensor entry
        sensor_tok = new_token()
        sensor_table.append({
            "token": sensor_tok,
            "channel": f"CAM_{i}",
            "modality": "camera"
        })
        sensor_tokens[i] = sensor_tok
        
        # Calibrated sensor entry
        calib_tok = new_token()
        calibrated_sensor_table.append({
            "token": calib_tok,
            "sensor_token": sensor_tok,
            "translation": list(map(float, pos)),
            "rotation": [1.0, 0.0, 0.0, 0.0],  # w, x, y, z quaternion
            "camera_intrinsic": compute_intrinsics()
        })
        calibrated_sensor_tokens[i] = calib_tok

def save_json_files():
    """Save all JSON tables to disk"""
    tables = {
        "scene": scene_table,
        "sample": sample_table,
        "sample_data": sample_data_table,
        "sensor": sensor_table,
        "calibrated_sensor": calibrated_sensor_table,
        "ego_pose": ego_pose_table,
        "instance": instance_table,
        "sample_annotation": sample_annotation_table,
        "category": category_table,
        "log": log_table,
        "attribute": attribute_table,
        "visibility": visibility_table
    }
    
    for name, data in tables.items():
        try:
            file_path = os.path.join(OUT_DIR, f"{name}.json")
            with open(file_path, "w") as f:
                json.dump(data, f, indent=2)
        except Exception as e:
            print(f"[WARNING] Failed to save {name}.json: {e}")

def find_or_create_instance(forklift_id, category_token):
    """Find existing instance or create new one for a forklift"""
    if forklift_id not in forklift_instances:
        inst_tok = new_token()
        forklift_instances[forklift_id] = {
            "token": inst_tok,
            "annotations": [],
            "first_annotation_token": "",
            "last_annotation_token": ""
        }
        
        # Create instance table entry
        instance_table.append({
            "token": inst_tok,
            "category_token": category_token,
            "nbr_annotations": 0,
            "first_annotation_token": "",
            "last_annotation_token": ""
        })
    
    return forklift_instances[forklift_id]

def link_sample_annotations(current_ann_token, instance_info):
    """Link sample annotations temporally using prev/next"""
    if instance_info["annotations"]:
        # Link with previous annotation
        prev_ann_token = instance_info["annotations"][-1]
        
        # Update previous annotation's next field
        for ann in sample_annotation_table:
            if ann["token"] == prev_ann_token:
                ann["next"] = current_ann_token
                break
        
        # Set current annotation's prev field
        return prev_ann_token
    return ""

# FIX: Much more conservative forklift randomizer
def randomize_forklift(file_name, class_name, max_number=1):
    """Create forklift instances with minimal GPU usage"""
    try:
        print(f"[INFO] Creating single forklift instance")
        
        # FIX: Clear any existing instances first
        rep.randomizer.instantiate("", size=0, mode="scene_instance")
        
        # FIX: Wait for cleanup
        time.sleep(1.0)
        
        # Create single instance with fixed parameters (no distributions)
        instances = rep.randomizer.instantiate(
            file_name, size=max_number, mode="scene_instance"
        )
        
        if instances is not None:
            print("[INFO] Forklift instance created, applying modifications...")
            
            with instances:
                rep.modify.semantics([("class", class_name)])
                # FIX: Use fixed pose to avoid distribution calculations
                rep.modify.pose(
                    # position=(-20.0, 20.0, 0.0),  # Fixed position
                    # rotation=(90, 0, 0),  # Fixed rotation
                    # scale=1.0

                    position=rep.distribution.uniform((-29.5, 10.5, 0), (-10.5, 29.5, 0)),
                    rotation=rep.distribution.uniform((90, 0, -180), (90, 0, 180)),
                    scale=rep.distribution.uniform((0.9), (1.1)),
                )
                rep.modify.visibility(True)
            
            print("[INFO] Forklift modifications applied successfully")
            return instances
            
        else:
            print("[WARNING] Failed to create forklift instance")
            return None
            
    except Exception as e:
        print(f"[ERROR] Forklift creation failed: {e}")
        return None
    

    

# -------------------- Main execution --------------------
try:
    print("[INFO] Initializing nuScenes dataset structure...")
    initialize_static_tables()
    
    print("[INFO] Setting up stage and environment...")
    omni.usd.get_context().new_stage()

    # Create environment
    env = rep.create.from_usd(WAREHOUSE_ROOT)

    # Lights - FIX: Minimal lighting setup to save GPU memory
    with rep.new_layer():
        rep.create.light(
            light_type="Dome",  # FIX: Use dome light instead of multiple rect lights
            intensity=2000,  # FIX: Much lower intensity
            temperature=6000,
            count=1  # FIX: Only one light
        )

    # Camera setup - FIX: Use only ONE camera to reduce GPU load
    camera_positions = [(-29.5, 29.5, 8.4)]  # Only one camera
    cameras = []
    render_products = []
    
    for i, pos in enumerate(camera_positions):
        cam = rep.create.camera(focal_length=11)
        rp = rep.create.render_product(cam, IMG_RES)
        cameras.append(cam)
        render_products.append(rp)

    # Create annotators - FIX: Track them for cleanup
    rgb_annotators = []
    bbox3d_annotators = []
    
    for rp in render_products:
        rgb_ann = AnnotatorRegistry.get_annotator("rgb")
        bbox3d_ann = AnnotatorRegistry.get_annotator("bounding_box_3d")
        rgb_ann.attach([rp])
        bbox3d_ann.attach([rp])
        rgb_annotators.append(rgb_ann)
        bbox3d_annotators.append(bbox3d_ann)
        
        # Track for cleanup
        active_annotators.extend([rgb_ann, bbox3d_ann])

    # Create sensors and calibration
    create_sensors_and_calibration()

    print("[INFO] Starting frame-by-frame data capture...")
    
    # Track previous sample data tokens for linking
    prev_sample_data_tokens = {}
    
    for frame_id in range(NUM_FRAMES):
        print(f"[INFO] Processing frame {frame_id+1}/{NUM_FRAMES}")
        
        try:
            # FIX: Much longer wait between frames to let GPU recover
            if frame_id > 0:
                cleanup_resources()
            
            # FIX: Very long delay between frames
            time.sleep(2.0)
            
            # Randomize forklift placement - simplified
            forklift_instances_obj = None
            for k, fpath in FORKLIFT.items():
                print(f"[INFO] Creating forklift: {k}")
                forklift_instances_obj = randomize_forklift(fpath, k)
                break  # Only try once
            
            # Skip frame if forklift creation failed
            if forklift_instances_obj is None:
                print(f"[WARNING] Skipping frame {frame_id} due to forklift failure")
                continue
            # Set camera poses
            for i, cam in enumerate(cameras):
                with cam:
                    rep.modify.pose(position=camera_positions[i], look_at=(-20, 20, 0))
            
            # FIX: Simple orchestrator step with minimal retries
            try:
                print(f"[INFO] Stepping orchestrator for frame {frame_id}")
                time.sleep(1.0)  # Wait before stepping
                
                rep.orchestrator.step(rt_subframes=1)
                
                time.sleep(1.0)  # Wait after stepping
                print("[INFO] Orchestrator step completed")
                
            except Exception as e:
                print(f"[ERROR] Orchestrator step failed for frame {frame_id}: {e}")
                continue
        
            # Create sample
            sample_tok = new_token()
            timestamp_us = int((time.time() + frame_id * 0.5) * 1e6)  # 0.5s intervals
            
            sample_entry = {
                "token": sample_tok,
                "timestamp": timestamp_us,
                "scene_token": scene_token,
                "prev": prev_sample_token if prev_sample_token else "",
                "next": ""
            }
            
            # Link previous sample
            if prev_sample_token and sample_table:
                sample_table[-1]["next"] = sample_tok
            
            sample_table.append(sample_entry)
            
            if not first_sample_token:
                first_sample_token = sample_tok
            
            # Create ego pose
            ego_tok = new_token()
            ego_pose_table.append({
                "token": ego_tok,
                "timestamp": timestamp_us,
                "rotation": [1.0, 0.0, 0.0, 0.0],  # w, x, y, z quaternion
                "translation": [0.0, 0.0, 0.0]    # x, y, z (z always 0 in nuScenes)
            })
            
            # Process each camera
            current_sample_data_tokens = {}
            
            for cam_idx in range(len(cameras)):
                try:
                    # FIX: Add timeout and null checks for RGB data
                    rgb_data = None
                    for attempt in range(MAX_RETRIES):
                        try:
                            rgb_data = rgb_annotators[cam_idx].get_data()
                            if rgb_data is not None:
                                break
                            time.sleep(0.1)
                        except Exception as e:
                            print(f"[WARNING] RGB data attempt {attempt + 1} failed: {e}")
                            if attempt < MAX_RETRIES - 1:
                                time.sleep(0.2)
                    
                    if rgb_data is not None:
                        # Save image
                        channel_name = f"CAM_{cam_idx}"
                        channel_dir = os.path.join(OUT_DIR, "samples", channel_name)
                        os.makedirs(channel_dir, exist_ok=True)
                        
                        # Generate filename
                        image_filename = f"n000-{time.strftime('%Y-%m-%d-%H-%M-%S')}+0000__{channel_name}__{timestamp_us}.jpg"
                        file_path = os.path.join(channel_dir, image_filename)
                        
                        # FIX: More robust image conversion with validation
                        try:
                            if isinstance(rgb_data, np.ndarray):
                                # Validate array shape
                                if len(rgb_data.shape) != 3:
                                    print(f"[WARNING] Invalid RGB data shape: {rgb_data.shape}")
                                    continue
                                
                                if rgb_data.dtype != np.uint8:
                                    rgb_data = np.clip(rgb_data * 255, 0, 255).astype(np.uint8)
                                
                                # Ensure proper image format
                                if rgb_data.shape[2] == 3:
                                    Image.fromarray(rgb_data, 'RGB').save(file_path)
                                elif rgb_data.shape[2] == 4:
                                    Image.fromarray(rgb_data[:,:,:3], 'RGB').save(file_path)
                                else:
                                    print(f"[WARNING] Unexpected image channels: {rgb_data.shape[2]}")
                                    continue
                            else:
                                print(f"[WARNING] RGB data is not numpy array: {type(rgb_data)}")
                                continue
                        except Exception as e:
                            print(f"[WARNING] Failed to save image for camera {cam_idx}: {e}")
                            continue
                        
                        # Create sample_data entry
                        sd_tok = new_token()
                        sample_data_entry = {
                            "token": sd_tok,
                            "sample_token": sample_tok,
                            "ego_pose_token": ego_tok,
                            "calibrated_sensor_token": calibrated_sensor_tokens[cam_idx],
                            "filename": f"samples/{channel_name}/{image_filename}",
                            "fileformat": "jpg",
                            "width": IMG_RES[0],
                            "height": IMG_RES[1],
                            "timestamp": timestamp_us,
                            "is_key_frame": True,
                            "prev": prev_sample_data_tokens.get(cam_idx, ""),
                            "next": ""
                        }
                        
                        # Link previous sample_data
                        if cam_idx in prev_sample_data_tokens and sample_data_table:
                            # Find and update previous sample_data entry
                            for sd in sample_data_table:
                                if sd["token"] == prev_sample_data_tokens[cam_idx]:
                                    sd["next"] = sd_tok
                                    break
                        
                        sample_data_table.append(sample_data_entry)
                        current_sample_data_tokens[cam_idx] = sd_tok
                        
                except Exception as e:
                    print(f"[WARNING] Failed to process camera {cam_idx} for frame {frame_id}: {e}")
                    continue
            
            # FIX: More robust 3D bounding box processing with validation
            try:
                bbox_data = None
                for attempt in range(MAX_RETRIES):
                    try:
                        bbox_data = bbox3d_annotators[0].get_data()
                        if bbox_data is not None:
                            break
                        time.sleep(0.1)
                    except Exception as e:
                        print(f"[WARNING] Bbox data attempt {attempt + 1} failed: {e}")
                        if attempt < MAX_RETRIES - 1:
                            time.sleep(0.2)
                
                if bbox_data and len(bbox_data) > 0:
                    for bbox_idx, box in enumerate(bbox_data):
                        try:
                            # FIX: Validate box data structure
                            if not isinstance(box, dict):
                                print(f"[WARNING] Invalid box data type: {type(box)}")
                                continue
                            
                            # Use semantic label or position as forklift ID
                            forklift_id = f"forklift_{bbox_idx}"
                            
                            # Find or create instance
                            instance_info = find_or_create_instance(forklift_id, category_table[0]["token"])
                            
                            # Create annotation
                            ann_tok = new_token()
                            
                            # FIX: More robust extraction with validation
                            translation = [0.0, 0.0, 1.0]  # Default values
                            size = [2.0, 1.5, 2.0]  # Default size
                            rotation = [1.0, 0.0, 0.0, 0.0]  # Default rotation
                            
                            try:
                                if "translation" in box and box["translation"] is not None:
                                    trans = box["translation"]
                                    if isinstance(trans, (list, tuple, np.ndarray)) and len(trans) >= 3:
                                        translation = [float(x) for x in trans[:3]]
                            except (ValueError, TypeError) as e:
                                print(f"[WARNING] Invalid translation data: {e}")
                            
                            try:
                                if "size" in box and box["size"] is not None:
                                    sz = box["size"]
                                    if isinstance(sz, (list, tuple, np.ndarray)) and len(sz) >= 3:
                                        size = [float(x) for x in sz[:3]]
                            except (ValueError, TypeError) as e:
                                print(f"[WARNING] Invalid size data: {e}")
                            
                            try:
                                if "rotation" in box and box["rotation"] is not None:
                                    rot = box["rotation"]
                                    if isinstance(rot, (list, tuple, np.ndarray)) and len(rot) >= 4:
                                        rotation = [float(x) for x in rot[:4]]
                            except (ValueError, TypeError) as e:
                                print(f"[WARNING] Invalid rotation data: {e}")
                            
                            # Link with previous annotation
                            prev_ann = link_sample_annotations(ann_tok, instance_info)
                            
                            sample_annotation_entry = {
                                "token": ann_tok,
                                "sample_token": sample_tok,
                                "instance_token": instance_info["token"],
                                "attribute_tokens": ["attr_moving"],
                                "visibility_token": "vis_full",
                                "translation": translation,
                                "size": size,
                                "rotation": rotation,
                                "num_lidar_pts": 0,
                                "num_radar_pts": 0,
                                "next": "",
                                "prev": prev_ann
                            }
                            
                            sample_annotation_table.append(sample_annotation_entry)
                            
                            # Update instance tracking
                            instance_info["annotations"].append(ann_tok)
                            if not instance_info["first_annotation_token"]:
                                instance_info["first_annotation_token"] = ann_tok
                            instance_info["last_annotation_token"] = ann_tok
                            
                        except Exception as e:
                            print(f"[WARNING] Failed to process bbox {bbox_idx}: {e}")
                            continue
                            
            except Exception as e:
                print(f"[WARNING] Failed to process bounding boxes for frame {frame_id}: {e}")
            
            # Update tracking variables
            prev_sample_token = sample_tok
            prev_sample_data_tokens = current_sample_data_tokens
            
            # FIX: Save more frequently and with better cleanup
            if frame_id % BATCH_SIZE == 0 or frame_id == NUM_FRAMES - 1:
                try:
                    save_json_files()
                    print(f"[INFO] Saved progress at frame {frame_id}")
                except Exception as e:
                    print(f"[WARNING] Failed to save progress: {e}")
                
                # Force comprehensive cleanup
                cleanup_resources()
                
        except Exception as e:
            print(f"[ERROR] Failed to process frame {frame_id}: {e}")
            traceback.print_exc()
            
            # Cleanup on error
            cleanup_resources()
            continue  # Continue with next frame
    
    # Finalize instance table
    for forklift_id, instance_info in forklift_instances.items():
        for inst in instance_table:
            if inst["token"] == instance_info["token"]:
                inst["nbr_annotations"] = len(instance_info["annotations"])
                inst["first_annotation_token"] = instance_info["first_annotation_token"]
                inst["last_annotation_token"] = instance_info["last_annotation_token"]
                break
    
    # Finalize scene table
    if sample_table:
        scene_table[0]["first_sample_token"] = first_sample_token
        scene_table[0]["last_sample_token"] = sample_table[-1]["token"]
    
    # Final save
    save_json_files()
    
    print(f"[SUCCESS] nuScenes dataset generated at: {OUT_DIR}")
    print(f"[INFO] Generated {len(sample_table)} samples")
    print(f"[INFO] Generated {len(sample_data_table)} camera images")  
    print(f"[INFO] Generated {len(sample_annotation_table)} annotations")
    print(f"[INFO] Tracked {len(instance_table)} object instances")

except Exception as e:
    print(f"[ERROR] Script failed: {e}")
    traceback.print_exc()
finally:
    print("[INFO] Shutting down...")
    try:
        # FIX: More thorough cleanup
        cleanup_resources()
        
        if rep.orchestrator.get_is_started():
            rep.orchestrator.stop()
            
        # Give time for proper shutdown
        time.sleep(1.0)
        
    except Exception as e:
        print(f"[WARNING] Cleanup during shutdown failed: {e}")
    
    simulation_app.close()

Hi @prasadpr we don’t have a writer for the NuScenes format in Replicator, but writing one can be done through Custom Writers. Take a look at this page:

You can also refer to the API for writers here:

Lastly, I recommend taking a look at the existing KITTI writer kitti.py. Depending on your OS, its probably in a directory like IsaacSim\5.0\isaacsim_build\windows-x86_64\release\extscache\omni.replicator.core-1.12.10+107.3.0.wx64.r.cp311\omni\replicator\core\scripts\writers_default