How to write Raw USD Property each frame

I have a prim with a Raw USD Proprty like show in figure. This property force_value is updated by an ActionGraph.

omniverse_custom_property

I can get its value in python:

import omni.isaac.core.utils.prims as prims_utils

force = prims_utils.get_prim_property(prim_path="path/to/prim", property_name="force_value")

But how I save this property in the dataset? I’m using BasicWriter and I would like that my property force_value is saved for each frame inside the dataset.

Hi @federico.domeniconi. This sounds like a Replicator questions so I’ve moved it over to the SDG forum for you.

I did this:

class CustomWriter(BasicWriter):
    def _write_rgb(self, data: dict, render_product_path: str, annotator: str):
        file_path = f"{render_product_path}rgb_{self._sequence_id}{self._frame_id:0{self._frame_padding}}.{self._image_output_format}"
        self._backend.write_image(file_path, data[annotator])

        velocity = prims_utils.get_prim_property(
            prim_path="/Replicator/Ref_Xform/Ref/scatola", property_name="force_value"
        )
        file_path_velocity = f"{render_product_path}velocity_{self._sequence_id}{self._frame_id:0{self._frame_padding}}.json"
        serializable_data = {"velocity": velocity}
        buf = io.BytesIO()
        buf.write(json.dumps(serializable_data).encode())
        self._backend.write_blob(file_path_velocity, buf.getvalue())

Is this the correct way to do this? Or is there a better method?

Hello @federico.domeniconi. I want to first apologize in the extensive delay in getting back to you on this.

The issue with this approach is that the writer does not necessarily execute at the same time as the simulation. In fact, it’s at best going to be 1 frame behind and can be up to 3 frames out of step with the simulation. Because of this, it’s best to avoid querying the stage within the writer unless you know the property being queried is not changing.

If you are using samplers (eg. rep.distribution.uniform) to set force_value, then a convenient solution is to simply name the distribution. The sampled value(s) from that distribution will then be automatically avalailable in the data payload under the provided name under distribution_outputs.

Example:

import asyncio
import omni.replicator.core as rep

class MyWriter(rep.Writer):
    def __init__(self):
        self.annotators = ["LdrColor"]
    
    def write(self, data):
        print(data["distribution_outputs"].keys(), data["distribution_outputs"].get("SphereRadius"))

async def main():
    cam = rep.create.camera()
    rp = rep.create.render_product(cam, (1024, 1024))
    writer = MyWriter()
    writer.attach(rp)


    sphere = rep.create.sphere(as_mesh=False)
    with rep.trigger.on_frame():
        with sphere:
            rep.modify.attribute("radius", rep.distribution.uniform(0.5, 10.0, name="SphereRadius"))
    
    await rep.orchestrator.step_async()
    
asyncio.ensure_future(main())