I want to read the objectId channel of the LIDAR output to know how many LIDAR points hit each object in the scene. The objects already have semantic classes assigned. However, the objectId values I get contain many different, unknown ids — more than the number of objects in my scene. To keep only the ids belonging to my objects, I resolve each id to its prim path and accumulate the counts per object. The goal is to collect all the points that detect each of my objects.
The problem is that every time I read the data, the number of points per object varies wildly: in one frame the cube may have only a single point with its objectId, while in another frame it has 200 points.
Below is one of my scripts; I have also tried an async version, but I see the same problem.
import time
import asyncio
import json
import io
import numpy as np
import os
import struct
from collections import Counter, defaultdict
from pxr import Gf
import omni.kit
import omni.usd
import omni.replicator.core as rep
import omni.isaac.sensor as omni_sensor
from omni.isaac.core import SimulationContext
from omni.syntheticdata._syntheticdata import acquire_syntheticdata_interface
from omni.replicator.core import Writer, AnnotatorRegistry, BackendDispatch
class PointCloudWriter(Writer):
    """Replicator writer that consumes RTX lidar annotator output and reports
    how many lidar returns hit each prim in the scene.

    Depending on the ``timestamps`` flag, either the plain point-cloud
    annotator or the scan-buffer annotator (with per-point timestamps and
    objectIds) is attached.
    """

    def __init__(
        self,
        output_dir,  # output directory
        dataset_name: str,  # name of the dataset
        timestamps: bool = True,  # if True, use the scan-buffer annotator (timestamped points)
        world_coord: bool = False,  # if True, the point cloud data will be in world coordinates
        frames_to_save=35,  # number of frames to save
        subframes=10,  # subframes to skip between saved frames
    ):
        self._samples_dir = os.path.join(output_dir, "Samples", "LIDAR_OMNIVERSE")
        self._dataset_dir = os.path.join(output_dir, dataset_name)
        self._backend = BackendDispatch({"paths": {"out_dir": output_dir}})
        self._frame_id = 0
        self._frames_to_save = frames_to_save
        self._subframes = subframes
        # Global class variables
        self._current_file_name = None  # name of the file currently being written
        self._current_timestamp = None  # timestamp of the current sample
        self._scene_token = None  # token identifying the scene
        self._sample_token = None  # token identifying the current sample
        self.previous_sample_data_token = None  # token of the previous sample-data record
        self.next_sample_data_token = None  # token of the next sample-data record
        self.previous_sample_token = None  # token of the previous sample
        self.next_sample_token = None  # token of the next sample
        self.num_saved_samples = 0  # number of samples saved so far
        # Variables for the scene
        # NOTE(review): these two were initialized twice in the original; once is enough.
        self._first_sample_token = None
        self._last_sample_token = None
        # Variables for the log
        self._log_token = None
        self.log_created = False  # whether the log record has been created
        # Simulation context
        self.simulation_context = SimulationContext()
        self.annotators = []
        # Point Cloud Annotation: pick the annotator that matches the requested output.
        if not timestamps:
            self.annotators.append(AnnotatorRegistry.get_annotator("RtxSensorCpuIsaacComputeRTXLidarPointCloud"))
        else:
            buffer_annotator = AnnotatorRegistry.get_annotator("RtxSensorCpuIsaacCreateRTXLidarScanBuffer")
            buffer_annotator.initialize(outputTimestamp=True, outputObjectId=True, transformPoints=world_coord)
            self.annotators.append(buffer_annotator)
        os.makedirs(self._dataset_dir, exist_ok=True)
        os.makedirs(self._samples_dir, exist_ok=True)

    def write(self, data):
        """Handle one frame of annotator output.

        For the scan-buffer annotator, resolve each per-point objectId to its
        prim path and print the number of points that hit each prim.

        NOTE: the original version ran the objectId counting unconditionally,
        which raised a NameError whenever the plain point-cloud annotator was
        active (``object_id`` was never assigned); the counting now lives
        inside the scan-buffer branch. A blocking ``time.sleep`` that only
        "simulated processing time" was also removed from this per-frame path.
        """
        if "RtxSensorCpuIsaacComputeRTXLidarPointCloud" in data:
            pointcloud_data = data["RtxSensorCpuIsaacComputeRTXLidarPointCloud"]["data"]
            # intensity_data = data["RtxSensorCpuIsaacComputeRTXLidarPointCloud"]["info"]["intensity"]
        elif "RtxSensorCpuIsaacCreateRTXLidarScanBuffer" in data:
            frame = data["RtxSensorCpuIsaacCreateRTXLidarScanBuffer"]
            pointcloud_data = frame["data"]
            intensity_data = frame["intensity"]
            timestamp = frame["timestamp"]
            object_id = frame["objectId"]
            # Count occurrences of each objectId in this frame.
            count_object_id = Counter(object_id)
            # Acquire the synthetic-data interface to resolve ids to prim paths.
            syntheticdata_interface = acquire_syntheticdata_interface()
            # Map prim paths to their total point counts; several ids can map
            # to the same prim path, so counts are accumulated.
            primpath_counts = defaultdict(int)
            for obj_id, count in count_object_id.items():
                primpath = syntheticdata_interface.get_uri_from_instance_segmentation_id(obj_id)
                if primpath:  # skip ids that do not resolve to a prim path
                    primpath_counts[primpath] += count
            # Print results
            for primpath, total_count in primpath_counts.items():
                print(f"PrimPath: {primpath}, Total Number of Points: {total_count}")
        self._frame_id += 1
# Main Code
# Register the custom writer so Replicator can look it up by class name.
rep.WriterRegistry.register(PointCloudWriter)

# Render product for the solid-state lidar prim; a 1x1 resolution suffices
# since only the sensor data (not rendered images) is consumed.
render_product = rep.create.render_product("/Environment/Sensors/Solid_State", [1, 1], name="Isaac")

# Fetch the registered writer, configure it, and attach it to the render product.
writer = rep.WriterRegistry.get("PointCloudWriter")
out_dir = "custom_writer_output"
dataset_name = "dataset_test"
writer.initialize(
    output_dir=out_dir,
    dataset_name=dataset_name,
    timestamps=True,
    world_coord=False,
    frames_to_save=10,
    subframes=5,
)
writer.attach([render_product])
Any ideas will be of great help, thank you very much for your time!