Point cloud data to render product SDG

Hey,

I’ve been able to export SDG data, but it seems I was exporting the camera’s perspective point cloud and not the SDG of the RTX head.

I have the Isaac Create RTX Lidar Scan Buffer node already running with the debug point cloud.

When exporting with the /World/sensor path I just get a flat plane. I’m running Windows and not using ROS, by the way.

thanks again

Hi there,

could you elaborate a bit on the issue you are encountering?

Thanks!

Hi Ahaidu,

thanks for getting back to me.

Basically I want to send the data from the rtx lidar head to the writer.

I’m looking to get the “xyz data” (pointcloud) data recording ideally in a custom writer.

I’m using the code found here

https://docs.omniverse.nvidia.com/isaacsim/latest/features/sensors_simulation/isaac_sim_sensors_rtx_based_lidar.html

The only modification is changing the config to
lidar_config = "ZVISION_ML30S"

In the code here

4. Create Annotator to read the data from with annotator.get_data()

annotator = rep.AnnotatorRegistry.get_annotator("RtxSensorCpuIsaacCreateRTXLidarScanBuffer")
annotator.attach([render_product_path])

The node is producing the data, as it is being fed into the pointcloud debugger.

Currently I can’t actually feed the data into the writer, or there is some other issue.

I would like the transform data, along with some custom data (a couple of constants and 4 dynamic xform positions), to be written by the SDG writer.

I would have expected that selecting the camera path seen here, /World/sensor, would output the SDG XYZ data to the recorder. The example in the previous post only uses the basic writer as a test. I’m not sure what I’m doing wrong, but I would like to make it more feature complete by adding the additional data to the writer stream for some RL at a later stage…

reading over

https://docs.omniverse.nvidia.com/isaacsim/latest/features/sensors_simulation/isaac_sim_sensors_rtx_based_lidar/annotator_descriptions.html

I’m pretty sure this isn’t a very hard fix; I just don’t think I’m understanding it correctly.

Like in the examples above, I’m using a vispy script to sense check the data in the outputted .npy file. I’ve not been able to generate the transform from RtxSensorCpuIsaacCreateRTXLidarScanBuffer for each frame.

Hope that helps you see what I’m trying to do.

Ideally I want to use the custom writer, as I need to add additional data such as the xforms synced with the pointclouds for my DNN training.

Also see my forum post

I’m looking to start my writer with some super basic functions. I want to track the translate attribute of 4 prims in the standard world frame, plus one constant (it will be randomised later, so I need to track it as a training attribute).

I need an output like [[X,Y,Z],[X,Y,Z],[X,Y,Z],[X,Y,Z],[Constant]], written per frame together with my RTX lidar pointcloud data, which is running in my action graph.

prims are

/World/main_sim/Main_function/Control_points/CP0
/World/main_sim/Main_function/Control_points/CP1
/World/main_sim/Main_function/Control_points/CP2
/World/main_sim/Main_function/Control_points/CP3
/World/main_sim/xform_graph/constant_float
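A minimal sketch of that per-frame layout, assuming the four control-point positions have already been read as (x, y, z) triples. The helper name pack_ground_truth and the flat 13-value layout are illustrative choices, not part of Replicator:

```python
import numpy as np


def pack_ground_truth(control_points, constant):
    """Pack four (x, y, z) positions and one constant into a flat float32 vector.

    Hypothetical layout: [x0, y0, z0, ..., x3, y3, z3, constant].
    """
    pts = np.asarray(control_points, dtype=np.float32)
    if pts.shape != (4, 3):
        raise ValueError(f"expected 4 positions of 3 values, got shape {pts.shape}")
    return np.concatenate([pts.ravel(), np.array([constant], dtype=np.float32)])


# Example values standing in for CP0..CP3 and constant_float
cps = [(0.0, 0.0, 1.0), (1.0, 0.0, 1.0), (1.0, 1.0, 1.0), (0.0, 1.0, 1.0)]
record = pack_ground_truth(cps, 0.5)
print(record.shape)
```

A fixed-length vector like this is easy to stack across frames into a single (num_frames, 13) array for training.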

and also trying to pull in the point cloud and transform data from

/World/sensor

I’ve been having issues with that as I’m not getting the expected data. Bizarrely, I can get pointcloud data from an RGB camera…

As for the pointcloud data from my RTX head, I think I’m just doing it wrong, even in the basic writer. It’s either not recording or I’m missing a step to do with the annotator, but surely I can just extract all the data from the head at every frame?

I understand that I need to create a JSON to do this, so it’s really about building that JSON.

I’m reading over

https://docs.omniverse.nvidia.com/extensions/latest/ext_replicator/custom_writer.html

but it’s pretty hardcore for my programming skill level!

Any help to get me started would be great!

Do you require pointcloud data from a lidar sensor, or just the basic annotator pointcloud data?

Probably both

Here is an example script using pointcloud annotator data and modifying it using transform data from a prim in stage. It basically transforms the points from world frame to camera frame:

import asyncio

import omni.usd
import numpy as np
import omni.replicator.core as rep
from pxr import Usd, UsdGeom, Gf

RESOLUTION = (3, 3)


# Create a plane and a sphere
plane = rep.create.plane(position=(0, 0, -2), scale=(10, 10, 1))
sphere = rep.create.sphere(position=(0, 0, 1), semantics=[("class", "sphere")])

# Create a top down camera and a render product
stage = omni.usd.get_context().get_stage()
cam_prim = stage.DefinePrim("/World/Camera", "Camera")
xform = UsdGeom.Xformable(cam_prim)
transform_op = xform.AddTransformOp()
mat = Gf.Matrix4d()
mat.SetTranslateOnly(Gf.Vec3d(0.0, 0.0, 10.0))
transform_op.Set(mat)
rp = rep.create.render_product(cam_prim.GetPath(), resolution=RESOLUTION)

# https://docs.omniverse.nvidia.com/extensions/latest/ext_replicator/annotators_details.html#point-cloud
# with 'includeUnlabelled' set to True, the point cloud data will include unlabelled objects as well (e.g. the plane)
pc_annot = rep.AnnotatorRegistry.get_annotator("pointcloud", init_params={"includeUnlabelled": True})
pc_annot.attach(rp)


async def run_example_async():
    # Sync annotator data with the stage. Rendering/annotator data is 1-2 frames behind the stage's state
    # because of the round trip between the render request and data availability;
    # step() or step_async() makes sure the annotator data is up to date with the stage
    await rep.orchestrator.step_async()

    pc_data = pc_annot.get_data()

    # Get camera transform (USD transform is row-major: https://openusd.org/release/api/usd_geom_page_front.html)
    xformable = UsdGeom.Xformable(cam_prim)
    cam_transform_row_major: Gf.Matrix4d = xformable.ComputeLocalToWorldTransform(Usd.TimeCode.Default())
    cam_transform_row_major = np.array(cam_transform_row_major).reshape((4, 4))
    print(f"cam_transform_row_major={cam_transform_row_major}")

    # Point cloud data is in world frame
    pc_world_frame = pc_data["data"]
    print(f"pc_world_frame={pc_world_frame}")

    # Homogenize the point cloud data (x, y, z) -> (x, y, z, 1) for multiplication with the camera transform
    pc_homogenized = np.hstack((pc_world_frame, np.ones((pc_world_frame.shape[0], 1))))
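    # Transform to camera frame: the local-to-world matrix maps camera -> world (row vectors, p_world = p_cam @ M),
    # so world -> camera uses its inverse; no transpose is needed because the points are row vectors
    pc_camera_frame = pc_homogenized @ np.linalg.inv(cam_transform_row_major)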
    # De-homogenize the point cloud data (x, y, z, 1) -> (x, y, z)
    pc_camera_frame = pc_camera_frame[:, :3]
    print(f"pc_camera_frame={pc_camera_frame}")


asyncio.ensure_future(run_example_async())
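A NumPy-only sanity check of the row-vector convention, independent of Isaac Sim. It assumes the matrix returned by ComputeLocalToWorldTransform maps camera-local points to world points as p_world = p_local @ M, so going from world to camera frame uses the inverse. The matrix below is hand-built to mimic a camera at (0, 0, 10):

```python
import numpy as np

# Row-major USD-style local-to-world matrix: the translation lives in the last row
cam_local_to_world = np.eye(4)
cam_local_to_world[3, :3] = [0.0, 0.0, 10.0]

# Two world-frame points: one on the plane (z = -2), one at the sphere centre (z = 1)
pc_world = np.array([[0.0, 0.0, -2.0], [0.0, 0.0, 1.0]])
pc_h = np.hstack((pc_world, np.ones((pc_world.shape[0], 1))))

# World -> camera: multiply by the inverse of local-to-world
pc_cam = (pc_h @ np.linalg.inv(cam_local_to_world))[:, :3]
print(pc_cam)

# Round trip: camera -> world recovers the original points
pc_back = (np.hstack((pc_cam, np.ones((2, 1)))) @ cam_local_to_world)[:, :3]
```

The plane point lands 12 units along the camera's negative z axis, which is what a camera 10 units above the origin should see for a plane 2 units below it.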

Hi Andrei,

Let’s reset here. I think I need to go back a step and focus on getting the correct data, and then I’ll circle back at a later date for the other info.

The specific data I’m looking to record is

RtxSensorCpuIsaacCreateRTXLidarScanBuffer / data

This is the actual data from the lidar head as displayed in the point cloud debugger created by the graph

I can use the following command to do the local / world frame if required later

annotator.initialize(transformPoints=False)

I’m using the following code to generate my lidar head, which I have saved in the Script Editor.

import omni.kit.commands
from pxr import Gf
from omni.isaac.core.utils.render_product import create_hydra_texture
import omni.replicator.core as rep
lidar_config = "ZVISION_ML30S"

# 1. Create The Camera
_, sensor = omni.kit.commands.execute(
    "IsaacSensorCreateRtxLidar",
    path="/sensor",
    parent=None,
    config=lidar_config,
    translation=(0, 0, 1.0),
    orientation=Gf.Quatd(1,0,0,0),
)
# 2. Create and Attach a render product to the camera
_, render_product_path = create_hydra_texture([1, 1], sensor.GetPath().pathString)

# 3. Create a Replicator Writer that "writes" points into the scene for debug viewing
writer = rep.writers.get("RtxLidarDebugDrawPointCloudBuffer")
writer.attach([render_product_path])

# 4. Create Annotator to read the data from with annotator.get_data()
annotator = rep.AnnotatorRegistry.get_annotator("RtxSensorCpuIsaacCreateRTXLidarScanBuffer")
annotator.attach([render_product_path])

I want to get the RtxSensorCpuIsaacCreateRTXLidarScanBuffer data at every timestamped tick; this is the same data that RtxLidarDebugDrawPointCloudBuffer displays as points in the viewport.

I also want to record the translation of /world/xform every frame, as this is my ground truth for training data.

I was reading

https://docs.omniverse.nvidia.com/isaacsim/latest/features/sensors_simulation/isaac_sim_sensors_rtx_based_lidar/annotator_descriptions.html

# Create the annotator.
annotator = rep.AnnotatorRegistry.get_annotator("RtxSensorCpuIsaacCreateRTXLidarScanBuffer")

# Initialize the annotator so it will also output the time stamps.
annotator.initialize(outputTimestamp=True)

# Attach the render product after the annotator is initialized.
annotator.attach([render_product_path])

But I’m still unsure how I go about making the custom writer for these two data types

I take it I will require a custom writer?

I looked at the writer class api

https://docs.omniverse.nvidia.com/py/replicator/1.10.10/source/extensions/omni.replicator.core/docs/API.html#omni.replicator.core.scripts.writers_default.Writer

and

https://docs.omniverse.nvidia.com/extensions/latest/ext_replicator/custom_writer.html

I’m lost at where I have to create “MyCustomWriter” and the parameters path

I looked at

ov\pkg\isaac_sim-2023.1.0-hotfix.1\standalone_examples\replicator\offline_generation\config

Think I’m going off track a little

What I’m really looking to do is figure out how to create the custom writer and get the buffer and xform data recorded. I should be able to progress after that point.

I’ve been watching a few of your videos and really learned a lot, thanks!

Scott

Hi Scott,

could you confirm the data type you need, since you previously mentioned that it is probably both? Is it generic pointcloud data coming directly from an annotator attached to a render product, or does it have to come from an RTX lidar sensor?

Update:

I forwarded your question to our RTX Lidar sensor developer.

Until then, do you only need access to the data? Or do you want a writer to write it to disk in a given format?

Thanks!

I got this code playing about with GPT, not tested yet. I also saw that I was in the right place regarding the offline SDG location for the custom files.

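import io

import numpy as np
import omni.usd
import omni.replicator.core as rep
from omni.replicator.core import Writer, BackendDispatch
from pxr import Usd, UsdGeom

LIDAR_ANNOTATOR = "RtxSensorCpuIsaacCreateRTXLidarScanBuffer"


class RTXLidarWriter(Writer):
    def __init__(self, output_dir, xform_path="/world/xform"):
        self._output_dir = output_dir
        self._backend = BackendDispatch({"paths": {"out_dir": output_dir}})
        self._xform_path = xform_path
        self._frame_id = 0
        # The writer only receives data from the annotators listed here
        self.annotators = [rep.AnnotatorRegistry.get_annotator(LIDAR_ANNOTATOR)]

    def _write_npy(self, path, array):
        # Serialize the array to .npy bytes in memory, then hand them to the backend
        buf = io.BytesIO()
        np.save(buf, np.asarray(array))
        self._backend.write_blob(path, buf.getvalue())

    def write(self, data):
        # RTX Lidar data (assumption: keyed by the annotator name, points under "data")
        if LIDAR_ANNOTATOR in data:
            self._write_npy(f"lidar_{self._frame_id}.npy", data[LIDAR_ANNOTATOR]["data"])

        # Prim transforms are not annotator outputs, so they are not in `data`;
        # read the world translation from the stage instead.
        # Note: annotator data can lag the stage by 1-2 frames unless stepping with rep.orchestrator.step()
        stage = omni.usd.get_context().get_stage()
        prim = stage.GetPrimAtPath(self._xform_path)
        if prim.IsValid():
            world_tf = UsdGeom.Xformable(prim).ComputeLocalToWorldTransform(Usd.TimeCode.Default())
            self._write_npy(f"world_trans_{self._frame_id}.npy", list(world_tf.ExtractTranslation()))

        self._frame_id += 1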

hey,

It has to come from the sensor. Forget the camera path, that was my error.

I’d like it to be a .npy file. I have a working script to view the .npy files in vispy as a sense check :)
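Alongside the visual check, a quick numeric check can flag a flat-plane export. This is a sketch that assumes the saved array is N x 3; the helper names axis_extents and looks_flat and the 1e-3 threshold are arbitrary illustrative choices, and the random arrays stand in for a loaded scan:

```python
import numpy as np


def axis_extents(points):
    """Return the (max - min) extent of an (N, 3) point cloud along x, y, z."""
    pts = np.asarray(points, dtype=np.float64).reshape(-1, 3)
    return pts.max(axis=0) - pts.min(axis=0)


def looks_flat(points, tol=1e-3):
    """True if the cloud has (almost) no extent along at least one axis."""
    return bool(np.any(axis_extents(points) < tol))


rng = np.random.default_rng(0)
flat = np.column_stack((rng.uniform(-5, 5, 100), rng.uniform(-5, 5, 100), np.zeros(100)))
volume = rng.uniform(-5, 5, (100, 3))
print(looks_flat(flat), looks_flat(volume))
```

For a real file the input would come from np.load on the exported .npy. The check only catches planes aligned with an axis; a tilted plane would need a fit (for example via SVD) instead.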

The lidar scan data is a continuous flow, feeding data each render frame. Do you want to save it as individual frames such as frame_1.npy, frame_2.npy, […], or a full scan in the form of full_scan.npy?

Probably time stamped, as I need to reference the xform data I mentioned for my loss function.

So I can have separate folders for the data.

To make sense of this, it would be one folder with scan_1.npy files; the time stamp could either be burnt into the file name or stored as a header in the .npy file that I can strip out later.

Really I probably need to make a dictionary sort of like

[[Timestamp], [xform_pos], [frame_scan]]

Ultimately that’s what I’ll need I can write a script to separate it later.

Just so it’s all synced up for training
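One way to keep the three pieces together per frame, as a sketch: it swaps the single .npy for NumPy's .npz container (a zip of named .npy arrays), so the timestamp, xform position and scan stay in one file. The function name save_frame, the key names and the file naming are hypothetical, and the zero array is a placeholder for real lidar points:

```python
import os
import tempfile

import numpy as np


def save_frame(out_dir, frame_id, timestamp, xform_pos, scan):
    """Save one synced sample as scan_<frame_id>.npz and return its path."""
    path = os.path.join(out_dir, f"scan_{frame_id:06d}.npz")
    np.savez(
        path,
        timestamp=np.float64(timestamp),
        xform_pos=np.asarray(xform_pos, dtype=np.float32),
        frame_scan=np.asarray(scan, dtype=np.float32),
    )
    return path


out_dir = tempfile.mkdtemp()
fake_scan = np.zeros((5, 3), dtype=np.float32)  # placeholder for lidar points
path = save_frame(out_dir, 1, 0.0167, (0.0, 0.0, 1.0), fake_scan)

# Load the sample back and pull out each named array
with np.load(path) as sample:
    loaded_timestamp = float(sample["timestamp"])
    loaded_xform = sample["xform_pos"]
    loaded_scan = sample["frame_scan"]
print(loaded_timestamp, loaded_xform.shape, loaded_scan.shape)
```

Separating the arrays later is then just a matter of reading each key, with no header to strip.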

Here is a standalone example of how you can visualize data using the debug writer and, in parallel, access the data using get_data(). The script is written for standalone mode; if you use step_async() instead, it will also work in the editor.

import carb
from omni.isaac.kit import SimulationApp

simulation_app = SimulationApp({"headless": False})
import omni.kit.commands
import omni.timeline
import omni.replicator.core as rep
from omni.isaac.core.utils.nucleus import get_assets_root_path
from omni.isaac.core.utils.stage import open_stage
from omni.isaac.core.utils.extensions import enable_extension
from pxr import Gf


enable_extension("omni.isaac.debug_draw")

# If True, trigger data every 'played' frame, if False, data will be generated only when rep.orchestrator.step() is called
carb_settings = carb.settings.get_settings()
carb_settings.set("/omni/replicator/captureOnPlay", True)

assets_root_path = get_assets_root_path()
open_stage(assets_root_path + "/Isaac/Environments/Simple_Warehouse/full_warehouse.usd")

# Run a few updates to fully load the environment
for i in range(3):
    simulation_app.update()

# Create the lidar sensor and a render product from it to generate data
_, sensor = omni.kit.commands.execute(
    "IsaacSensorCreateRtxLidar",
    path="/sensor",
    parent=None,
    config="Example_Rotary",
    translation=(0, 0, 1.0),
    orientation=Gf.Quatd(1.0, 0.0, 0.0, 0.0),  # Gf.Quatd is w,i,j,k
)
rp = rep.create.render_product(sensor.GetPath(), [1, 1], name="Isaac")

# Uses the lidar data to draw debug points in the viewport
writer = rep.writers.get("RtxLidarDebugDrawPointCloudBuffer")
writer.attach(rp)

# Access the raw data from the annotator
annotator = rep.AnnotatorRegistry.get_annotator("RtxSensorCpuIsaacCreateRTXLidarScanBuffer")
annotator.attach(rp)

# rep.orchestrator.step() will trigger data generation and will wait until the rendering frame is in sync with the stage
for i in range(10):
    print(f"step: {i}")
    rep.orchestrator.step()
    data = annotator.get_data()
    print(f"data: {data}")

timeline = omni.timeline.get_timeline_interface()
timeline.play()

while simulation_app.is_running():
    simulation_app.update()

simulation_app.close()
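Before saving what get_data() returns, some defensive handling may help. A sketch, assuming the output is a dict whose "data" entry holds the xyz points (as the "RtxSensorCpuIsaacCreateRTXLidarScanBuffer / data" naming earlier in the thread suggests) and that some frames may come back empty. The helper name points_from_output is hypothetical and the dicts below are stand-ins, not real sensor output:

```python
import numpy as np


def points_from_output(output):
    """Return an (N, 3) float32 array from an annotator-style dict, or (0, 3) if empty."""
    raw = output.get("data") if isinstance(output, dict) else None
    if raw is None:
        return np.empty((0, 3), dtype=np.float32)
    pts = np.asarray(raw, dtype=np.float32)
    if pts.size == 0:
        return np.empty((0, 3), dtype=np.float32)
    # Accept both flat [x0, y0, z0, x1, ...] and already-shaped (N, 3) input
    return pts.reshape(-1, 3)


fake_output = {"data": np.arange(6, dtype=np.float32)}
print(points_from_output(fake_output).shape)
print(points_from_output({}).shape)
```

Returning a consistent (N, 3) shape, even for empty frames, keeps the downstream save and training code free of special cases.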

Here are some related scripts that you can build upon:

Awesome, looks like I have something to play with for a while…

Much appreciated.

I’ll see how I get on with this.

I’ll close this one out

Cheers!
