Execute a function every time a new scene is generated by Replicator

I have a very simple question but couldn’t figure it out myself, since I am honestly not very good at coding.
Is there a way to execute a defined function every time Replicator generates a new scene / after taking the pictures? I want to read out the bounding box of my bin and do other things, but when I just write it like the snippet below it won’t work: the bounding box is only read out once at the beginning of my code execution, not every time a new scene is generated. The same happens if I put the code snippet in my “bin_place” rep function.

from omni.isaac.core.utils.bounds import create_bbox_cache, compute_aabb

def bbox_bin():
    cache = create_bbox_cache()
    bin_prim_path = "/Replicator/Ref_Xform"
    bbox = compute_aabb(cache, bin_prim_path, include_children=True)
    return bbox

with rep.trigger.on_time(interval=8, num=2):

Replicator has a built-in bbox writer.

You can set it up like this, based on the code you provided:

import omni.replicator.core as rep

with rep.new_layer():

    camera = rep.create.camera(position=(0, 0, 1000))
    render_product = rep.create.render_product(camera, (1024, 1024))

    # Initialize and attach writer
    writer = rep.WriterRegistry.get("BasicWriter")
    writer.initialize(output_dir="_output", rgb=True, bounding_box_2d_tight=True)

    with rep.trigger.on_time(interval=8, num=2):

Hello Jen, yes, I also use this for my writer, but I need the coordinates of my bounding box so that I drop parts only in between its corners: I want to simulate a bin-picking scenario with parts falling into a bin, and I delete the parts that land outside before taking the images. Since I change the rotation and position of my bin slightly with every iteration, I can’t use global coordinates; that’s why I want to use the bbox of my object every iteration.
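One way to turn the bin’s bounding box into drop positions is to sample uniformly inside it. The sketch below is plain NumPy and independent of Isaac Sim; it assumes the bbox uses the `[min_x, min_y, min_z, max_x, max_y, max_z]` layout that `compute_aabb()` returns, and the margin and drop-height values are made-up examples:

```python
import numpy as np

def sample_positions_in_aabb(bbox, num, margin=0.0, rng=None):
    """Sample random drop positions inside an axis-aligned bounding box.

    bbox is assumed to be [min_x, min_y, min_z, max_x, max_y, max_z],
    the layout returned by Isaac Sim's compute_aabb(). A margin shrinks
    the sampling area so parts do not spawn on the bin walls.
    """
    if rng is None:
        rng = np.random.default_rng()
    lo = np.asarray(bbox[:3]) + margin
    hi = np.asarray(bbox[3:]) - margin
    # Sample XY uniformly inside the shrunken box; keep Z fixed above the
    # bin top so the parts fall into it.
    xy = rng.uniform(lo[:2], hi[:2], size=(num, 2))
    z = np.full((num, 1), hi[2] + 0.5)  # 0.5 m above the bin top (arbitrary)
    return np.hstack([xy, z])

positions = sample_positions_in_aabb([-0.5, -0.3, 0.0, 0.5, 0.3, 0.2], num=10, margin=0.05)
```

In the actual scene, each sampled position would then be applied to a part prim before the drop is simulated.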

Currently, Replicator does not support adding relative locations using OmniGraph for moving targets, e.g. randomizing the location of objects relative to another object. As I understood it, this is what you want to achieve.

You can, however, use the Isaac Sim / USD API for this. You can use rep.orchestrator.step() to manually generate the randomizations, and after each step you can read out the locations and bounding boxes of your assets and move them around.

Here are some examples of using orchestrator.step() and of spawning/reading bounding box values in Isaac Sim.

Actually, it did work for me: I can spawn my assets inside the bounding box each step and delete every asset that lies outside the bounding box after the movement stops. My biggest problem is that it takes 200 images each iteration and I only need the last one. I don’t know how to avoid this, since there is no working trigger functionality for it.
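The “delete every asset outside the bounding box” step boils down to a membership test against the AABB. This is only an illustrative pure-NumPy sketch (again assuming the `[min_x, min_y, min_z, max_x, max_y, max_z]` layout from `compute_aabb()`); in Isaac Sim you would then delete the prims behind the returned indices:

```python
import numpy as np

def parts_outside_aabb(positions, bbox):
    """Return indices of part positions lying outside an axis-aligned
    bounding box given as [min_x, min_y, min_z, max_x, max_y, max_z]."""
    positions = np.asarray(positions)
    lo, hi = np.asarray(bbox[:3]), np.asarray(bbox[3:])
    # A part is inside only if all three coordinates lie within the box
    inside = np.all((positions >= lo) & (positions <= hi), axis=1)
    return np.flatnonzero(~inside)

# Two parts inside the bin, one that bounced out of it
outside = parts_outside_aabb(
    [[0.0, 0.0, 0.1], [0.2, 0.1, 0.05], [1.5, 0.0, 0.0]],
    [-0.5, -0.3, 0.0, 0.5, 0.3, 0.2],
)
# outside == array([2])
```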

Do you still need to run the randomization if you are not capturing the data? Right now the two are coupled together in Replicator, i.e. each randomization triggers a data capture as well.

What I need is: run a randomization, let the simulation run until the parts have settled and no longer move, and then trigger the writer. But right now the writer triggers the whole time, from the beginning, through the simulation, until the parts settle. That is why I get 200 frames per simulation. Is there a way to use the randomizer and the trigger separately? Or could I just use the annotator itself in a function instead of an existing writer?

Would it be an option for you to separate the randomization and simulation, handling them through the Isaac Sim / USD API, and use Replicator only for the data collection? Similar to the simulate_falling_objects example, where you can then use rep.orchestrator.step() to manually trigger the data collection.

I already tried to do that in my extension example, but the scene generation did not wait for my data collection. Can the await and ensure_future functions get rid of this problem, like in these examples? Annotators Information — Omniverse Extensions documentation (nvidia.com)

Yes, async functions should do that.
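As a minimal illustration of why awaiting helps, here is a plain asyncio sketch in which the coroutine names are stand-ins for rep.orchestrator.step_async() and for the annotator readout; awaiting each coroutine guarantees that a capture only runs after its scene generation has completed:

```python
import asyncio

results = []

async def generate_scene(i):
    # Stand-in for an awaitable step such as rep.orchestrator.step_async()
    await asyncio.sleep(0)
    results.append(f"scene_{i}")

async def capture(i):
    # Stand-in for reading the annotator data after the step has finished
    results.append(f"capture_{i}")

async def run_iterations(n):
    # Awaiting serializes the work: a capture can never start
    # before its scene generation has completed
    for i in range(n):
        await generate_scene(i)
        await capture(i)

asyncio.run(run_iterations(2))
print(results)  # ['scene_0', 'capture_0', 'scene_1', 'capture_1']
```

Inside an extension you would schedule the outer coroutine with asyncio.ensure_future() instead of asyncio.run(), since Kit already runs an event loop.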

How do you call the scene generation?

    def random_scene(self, isautopilot=False, israndomization=False):
        # Load world and check settings
        _world = self.get_world()
        self.isautopilot = isautopilot
        self.israndomization = israndomization

        # Random color and reflectivity of the material (domain randomization)
        if israndomization:
            self.material_num = 5

        # Delete the old parts

        # self.rect_lights(light_num)  # number of lights to generate

        # Change the camera position slightly

        # Initiate the random drop

        _world = self.get_world()
        self.poses = self.get_poses()


    def auto_save(self, step_size):
        # Make sure that no part is moving (less than 10^-10 m in 0.5 s),
        # then pause the world so no part will move, write the data and
        # restart the dropping process (if autopilot is on)
        _world = self.get_world()
        while not self.is_moving() or _world.current_time > 5:
            # When no part is moving, or 5 seconds have passed, the parts
            # outside the bin get deleted and the parts are forced to stop moving
            self.random_scene(self.isautopilot, self.israndomization)
            self.next_check_time = self.next_check_time + 0.5


    def get_poses(self):
        # Get the current poses of all objects
        stage = omni.usd.get_context().get_stage()
        mappings = self.generic_helper_lib.get_instance_mappings()
        poses = []
        for m in mappings:
            prim_path = m[1] + "/part"
            this_partX = XFormPrim(prim_path)
            prim_tf = this_partX.get_world_pose()
            # position, orientation = this_partX.get_world_pose()
            poses.append((str(prim_path), m[2], str(m[3]), np.array(prim_tf, dtype=object)))
        return poses

    def is_moving(self):
        # Returns True while the parts are still moving (average squared
        # movement above 10^-10); helps delete parts that fell outside the
        # bin once the movement has stopped
        tmp = self.poses
        self.poses = self.get_poses()
        if len(tmp) == 0:
            return True  # no previous poses yet, treat as still moving
        abs_move = 0
        for i in range(len(self.poses)):
            this_pose = self.poses[i][3][0]
            this_pose_old = tmp[i][3][0]
            diff_arr = this_pose - this_pose_old
            abs_diff = np.sum(np.square(diff_arr))
            abs_move = abs_move + abs_diff
        abs_move = abs_move / self.number_of_parts
        return abs_move > 10**-10

I get the positions of all my parts every 0.5 seconds and compare them with the previous positions; if the parts did not move more than 10^-10 m in between, I pause the scene, delete the unnecessary parts and trigger my writer. After that I stop the world and call the random_scene function to generate the new scene.
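The settling check at the heart of is_moving() can be isolated as a small NumPy function; this sketch just restates the averaged-squared-displacement logic with the 10^-10 threshold from the code above, on made-up positions:

```python
import numpy as np

def mean_squared_movement(old_positions, new_positions):
    # Average squared displacement over all parts, matching the
    # abs_move computation in is_moving()
    diff = np.asarray(new_positions) - np.asarray(old_positions)
    return np.sum(np.square(diff)) / len(new_positions)

old = np.array([[0.0, 0.0, 1.0], [0.1, 0.0, 1.0]])
settled = old.copy()                                     # nothing moved
falling = np.array([[0.0, 0.0, 0.5], [0.1, 0.0, 0.4]])   # still dropping

still_moving_settled = mean_squared_movement(old, settled) > 10**-10  # False
still_moving_falling = mean_squared_movement(old, falling) > 10**-10  # True
```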

Can you see if using the async versions of world e.g. await world.play_async() and await world.stop_async() fixes things?

Also, are you using await rep.orchestrator.step_async() for triggering the data acquisition?

I did that for world.play and world.stop in the random_scene and auto_save functions, but now the window just freezes and it won’t generate any props. Yes, I am using step_async now, but that does not help.

I should mention that I call the auto_save function as a physics callback:

async def setup_post_load(self):
    self._world = self.get_world()
    # Callback names have to be unique
    self._world.add_physics_callback("sim_step", callback_fn=self.auto_save)

Sorry, I missed that

So if I use step_async, the script runs for one iteration and then stops without writing data; it never exits the await rep.orchestrator.step_async() call. If I use the normal step, it takes 2 random pictures (before the script reaches rep.orchestrator.step) and one picture that is correct. After that, in the following iterations, it takes hundreds of pictures and won’t stop; it seems to take a picture on every physics step. What is going on and how can I fix this? I tried out a lot of combinations today, with and without async functions, but nothing works. I am really frustrated right now.

Hi @valentinhendrik, I can confirm this: the simulation (timeline) is triggering the data collection every frame. I will look into a workaround and get back to you.


Okay thank you!

Hi there,

as a workaround you can use the annotators’ get_data() function to access the data. This will not write the data to files; one can, however, reuse the BasicWriter’s (omni/replicator/core/scripts/writers_default/basicwriter.py) write() implementation for each annotator type to get the same results, for example the write_image and write_blob functions from omni/replicator/core/scripts/backends/disk.py.

For example, you can register an rgb annotator like this:

camera = rep.create.camera(position=(0, 0, 5), look_at=[0, 0, 0])
render_product = rep.create.render_product(camera, (128, 128))
self._rgb_annot = rep.AnnotatorRegistry.get_annotator("rgb")
self._rgb_annot.attach([render_product])  # attach the annotator to the render product

Helper function to write the rgb data:

from PIL import Image
import numpy as np

def save_rgb(rgb_data, file_name):
    rgb_image_data = np.frombuffer(rgb_data, dtype=np.uint8).reshape(*rgb_data.shape, -1)
    rgb_img = Image.fromarray(rgb_image_data, "RGBA")
    rgb_img.save(file_name + ".png")
    print(f"Saved {file_name}.png, shape: {rgb_data.shape}")

Button to trigger when to write the data:

ui.Button("Capture", clicked_fn=lambda: asyncio.ensure_future(self._on_capture_async()))

Capture triggered at button press:

async def _on_capture_async(self):
    await rep.orchestrator.step_async()        
    rgb_data = self._rgb_annot.get_data()
    save_rgb(rgb_data, f"{self._output_directory}/rgb_step_{self._capture_count}")
    self._capture_count += 1

Similar example using a standalone app:
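The standalone example itself was not included above; a rough sketch of what it could look like follows. This is only an assumption-laden outline (it presumes an Isaac Sim standalone Python environment, reuses the save_rgb helper from the snippet above, and the loop body is schematic):

```python
from omni.isaac.kit import SimulationApp
simulation_app = SimulationApp({"headless": True})

# Replicator must be imported after SimulationApp has started
import omni.replicator.core as rep

camera = rep.create.camera(position=(0, 0, 5), look_at=(0, 0, 0))
render_product = rep.create.render_product(camera, (128, 128))
rgb_annot = rep.AnnotatorRegistry.get_annotator("rgb")
rgb_annot.attach([render_product])

for i in range(3):
    # ... randomize the scene and simulate until the parts settle ...
    rep.orchestrator.step()          # advance exactly one Replicator step
    rgb_data = rgb_annot.get_data()  # read the annotator directly
    save_rgb(rgb_data, f"_output/rgb_step_{i}")  # helper from the post above

simulation_app.close()
```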


Thank you very much for your answer! I am using the get_data function of the annotators as a workaround right now and it works fine. I was not using the helper function to write the data, but a very similar approach with np.save and PIL. I can trigger that with each iteration now and it saves accordingly. Thanks again!


This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.