Retrieving replicator value

Hi, I have a simple issue I need help with:

I have rocks in my scene that are moved randomly with Replicator. I also have an autonomous robot, and I want the rocks not to be positioned within a given radius of the robot.

def randomize_rocks(stage, robot_position, radius = 1):
    rocks = rep.get.prims(semantics=[('class', 'rock')])
    with rocks:
        while True:
            _position = rep.distribution.uniform((-3, -9.5, 0.135), (4, -3.5, 0.135))
            # Calculate the distance between the robot and the rock
            distance = np.sqrt((_position[0] - robot_position[0])**2 + (_position[1] - robot_position[1])**2)
            # If the distance is greater than the radius, break the loop
            if distance > radius:
                break
        rep.modify.pose(
            position=_position,
            rotation=rep.distribution.uniform((0,0, -180), (0, 0, 180)),
            scale = rep.distribution.uniform((3, 3, 3),(5, 5, 5)),
        )
    return rocks.node

This does not work since rep.distribution.uniform returns a ReplicatorItem. How can I retrieve the value chosen from the ReplicatorItem?

Thank you

Hi there,

This will not work because the randomization distributions only generate values at runtime, e.g. when calling orchestrator.step(). The Python randomization functions are called only once, to build the randomization graph. At runtime that graph executes and randomizes the environment; it will not dynamically re-generate its values depending on the robot’s position.

For such scenarios, the best approach is to set up the randomizations with the USD/Isaac Sim API. Replicator’s randomizers and data acquisition parts can be separated:

Here is a short snippet on how you can call randomizers, USD api stage modifiers, and data writing separately:

Here are USD/Isaac Sim API randomization examples:

This includes a “chained” randomization, which is currently not possible in Replicator (this could be a good approach for the robot scenario):
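As a rough illustration of the idea (this is not the linked example, just a minimal sketch in plain Python), the rejection loop from the question can be run outside of Replicator, where the sampled values are ordinary floats. The function name `sample_position_outside_radius` and the `max_tries` parameter are hypothetical; the bounds are the ones from the question.

```python
import math
import random


def sample_position_outside_radius(robot_xy, radius, lower, upper, max_tries=1000):
    """Rejection-sample an (x, y, z) point inside the box [lower, upper] whose
    horizontal (XY) distance to robot_xy is greater than radius."""
    for _ in range(max_tries):
        # Draw each coordinate uniformly between its lower and upper bound
        pos = tuple(random.uniform(lo, hi) for lo, hi in zip(lower, upper))
        distance = math.hypot(pos[0] - robot_xy[0], pos[1] - robot_xy[1])
        if distance > radius:
            return pos
    # Hypothetical failure mode: the exclusion radius covers (almost) the whole area
    raise RuntimeError("No valid position found within max_tries")


# Example: robot at (0, -6), keep rocks more than 1 unit away
rock_position = sample_position_outside_radius(
    (0.0, -6.0), 1.0, (-3, -9.5, 0.135), (4, -3.5, 0.135)
)
```

The resulting tuple could then be written to the rock's xformOp:translate attribute through the USD API each time the robot moves.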

@ahaidu Hi, Thank you.

The question now becomes: how can I scatter the rocks without the replicator?

The function I use now is this:

def randomize_rocks(directory, area, rotate = True, _size = 3):
    rocks = rep.utils.get_usd_files(directory)
    traversable_plane = rep.get.prim_at_path(area)
    with rep.randomizer.instantiate(rocks, size=_size, use_cache = False):
        rep.randomizer.scatter_2d(traversable_plane, check_for_collisions=True)
        if rotate:
            rep.modify.pose(rotation =rep.distribution.uniform((0,0,0), (0,0,180)))

How can I implement scatter_2d without Replicator (in order to use the mower’s position in real time)?

Thank you

Hi there,

you can start with this example, by randomizing the assets relative to the mower’s position:

  • 10.14. Randomization Snippets — Omniverse IsaacSim latest documentation

  • For the scatter function you can reuse the plane approach: calculate the plane’s bounding box and spawn the assets at random positions inside it.

  • If your terrain is not 2d you can also shoot random physx rays and check for collisions on where to spawn the assets

  • If you want to avoid self collisions you can use get_physx_scene_query_interface().overlap_box functionality to check for overlaps

  • You can also solve overlaps by simulating the rocks for 1 physics step, this will automatically solve the collisions by pushing the rocks out of each other.

  • You can also drop the rocks on the surface and let them fall in random positions, and run the simulations afterwards.
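The bounding-box approach from the list above could be sketched roughly as follows. This is a plain-Python illustration under stated assumptions, not an Isaac Sim API: the function name `scatter_in_bounds` and all of its parameters are hypothetical, the area is treated as an axis-aligned rectangle, and each asset is approximated by a circle of diameter `min_separation` instead of using real collision checks.

```python
import math
import random


def scatter_in_bounds(count, min_xy, max_xy, min_separation,
                      avoid_xy=None, avoid_radius=0.0, max_tries=10000):
    """Pick up to `count` random XY points inside a rectangle so that points are
    at least `min_separation` apart and outside a circle around `avoid_xy`."""
    placed = []
    tries = 0
    while len(placed) < count and tries < max_tries:
        tries += 1
        x = random.uniform(min_xy[0], max_xy[0])
        y = random.uniform(min_xy[1], max_xy[1])
        # Reject candidates that fall inside the exclusion circle (e.g. the mower)
        if avoid_xy is not None and math.hypot(x - avoid_xy[0], y - avoid_xy[1]) <= avoid_radius:
            continue
        # Reject candidates that are too close to an already placed point
        if any(math.hypot(x - px, y - py) < min_separation for px, py in placed):
            continue
        placed.append((x, y))
    # May contain fewer than `count` points if the area is too crowded
    return placed


positions = scatter_in_bounds(10, (-3, -9.5), (4, -3.5), 0.5,
                              avoid_xy=(0.0, -6.0), avoid_radius=1.0)
```

Each returned point could then be used as the translate value of one spawned rock; the PhysX overlap query mentioned above would replace the circle approximation if exact shapes matter.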

Let me know which part is feasible for you and I can point you to specific tutorials/snippets on getting started.

Best,
Andrei

@ahaidu OK, I think the best approach would be to check for collisions with get_physx_scene_query_interface().overlap_box, and then I can just set that object’s visibility to false (all colliders are trigger colliders).

Btw, I’ve been using

rocks = rep.utils.get_usd_files(directory)

up until now to get the assets. What is the non-Replicator equivalent?

Thank you

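Here are some helper functions that should include most of the things you need:

import omni.usd
from omni.isaac.core.utils.nucleus import get_assets_root_path
from pxr import PhysxSchema, Usd, UsdGeom, UsdPhysics


# Add transformation properties to the prim (if not already present)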
def set_transform_attributes(prim, location=None, orientation=None, rotation=None, scale=None):
    if location is not None:
        if not prim.HasAttribute("xformOp:translate"):
            UsdGeom.Xformable(prim).AddTranslateOp()
        prim.GetAttribute("xformOp:translate").Set(location)
    if orientation is not None:
        if not prim.HasAttribute("xformOp:orient"):
            UsdGeom.Xformable(prim).AddOrientOp()
        prim.GetAttribute("xformOp:orient").Set(orientation)
    if rotation is not None:
        if not prim.HasAttribute("xformOp:rotateXYZ"):
            UsdGeom.Xformable(prim).AddRotateXYZOp()
        prim.GetAttribute("xformOp:rotateXYZ").Set(rotation)
    if scale is not None:
        if not prim.HasAttribute("xformOp:scale"):
            UsdGeom.Xformable(prim).AddScaleOp()
        prim.GetAttribute("xformOp:scale").Set(scale)


# Enables collisions with the asset (without rigid body dynamics the asset will be static)
def add_colliders(prim):
    # Iterate descendant prims (including root) and add colliders to mesh or primitive types
    for desc_prim in Usd.PrimRange(prim):
        if desc_prim.IsA(UsdGeom.Mesh) or desc_prim.IsA(UsdGeom.Gprim):
            # Physics
            if not desc_prim.HasAPI(UsdPhysics.CollisionAPI):
                collision_api = UsdPhysics.CollisionAPI.Apply(desc_prim)
            else:
                collision_api = UsdPhysics.CollisionAPI(desc_prim)
            collision_api.CreateCollisionEnabledAttr(True)
            # PhysX
            if not desc_prim.HasAPI(PhysxSchema.PhysxCollisionAPI):
                physx_collision_api = PhysxSchema.PhysxCollisionAPI.Apply(desc_prim)
            else:
                physx_collision_api = PhysxSchema.PhysxCollisionAPI(desc_prim)
            physx_collision_api.CreateRestOffsetAttr(0.0)

        # Add mesh specific collision properties only to mesh types
        if desc_prim.IsA(UsdGeom.Mesh):
            if not desc_prim.HasAPI(UsdPhysics.MeshCollisionAPI):
                mesh_collision_api = UsdPhysics.MeshCollisionAPI.Apply(desc_prim)
            else:
                mesh_collision_api = UsdPhysics.MeshCollisionAPI(desc_prim)
            mesh_collision_api.CreateApproximationAttr().Set("convexHull")


# Enables rigid body dynamics (physics simulation) on the prim (having valid colliders is recommended)
def add_rigid_body_dynamics(prim, disable_gravity=False, angular_damping=None):
    # Physics
    if not prim.HasAPI(UsdPhysics.RigidBodyAPI):
        rigid_body_api = UsdPhysics.RigidBodyAPI.Apply(prim)
    else:
        rigid_body_api = UsdPhysics.RigidBodyAPI(prim)
    rigid_body_api.CreateRigidBodyEnabledAttr(True)
    # PhysX
    if not prim.HasAPI(PhysxSchema.PhysxRigidBodyAPI):
        physx_rigid_body_api = PhysxSchema.PhysxRigidBodyAPI.Apply(prim)
    else:
        physx_rigid_body_api = PhysxSchema.PhysxRigidBodyAPI(prim)
    physx_rigid_body_api.GetDisableGravityAttr().Set(disable_gravity)
    if angular_damping is not None:
        physx_rigid_body_api.CreateAngularDampingAttr().Set(angular_damping)


# Create a new prim with the provided asset URL and transform properties
def create_asset(stage, asset_url, path, location=None, rotation=None, orientation=None, scale=None):
    prim_path = omni.usd.get_stage_next_free_path(stage, path, False)
    reference_url = asset_url if asset_url.startswith("omniverse://") else get_assets_root_path() + asset_url
    prim = stage.DefinePrim(prim_path, "Xform")
    prim.GetReferences().AddReference(reference_url)
    set_transform_attributes(prim, location=location, rotation=rotation, orientation=orientation, scale=scale)
    return prim


# Create a new prim with the provided asset URL and transform properties including colliders
def create_asset_with_colliders(stage, asset_url, path, location=None, rotation=None, orientation=None, scale=None):
    prim = create_asset(stage, asset_url, path, location, rotation, orientation, scale)
    add_colliders(prim)
    return prim

You can still use the util function for getting the asset paths.

@danielle.sisserman here is the script with spawning the assets and choosing a few random ones for randomization:

import os
import random
import omni.client
import omni.usd
# from omni.isaac.nucleus import get_assets_root_path
from omni.isaac.core.utils.nucleus import get_assets_root_path
from pxr import UsdGeom

PROPS_FOLDER = "/Isaac/Props/YCB/Axis_Aligned"
NUM_PROPS = 10
props_folder_path = get_assets_root_path() + PROPS_FOLDER

props_urls = []
result, entries = omni.client.list(props_folder_path)
if result != omni.client.Result.OK:
    print(f"Could not list assets in path: {props_folder_path}")
for entry in entries:
    _, ext = os.path.splitext(entry.relative_path)
    if ext == ".usd":
        props_urls.append(f"{props_folder_path}/{entry.relative_path}")
# OR:
# props_urls = rep.utils.get_usd_files(props_folder_path)

# Spawn the props
stage = omni.usd.get_context().get_stage()
prims = []
for i in range(NUM_PROPS):
    rand_asset_url = random.choice(props_urls)
    prim_path = omni.usd.get_stage_next_free_path(stage, f"/World/random_asset_{i}", False)
    prim = stage.DefinePrim(prim_path, "Xform")
    prim.GetReferences().AddReference(rand_asset_url)
    prims.append(prim)

# Randomly choose X prims and randomize their locations
randomly_selected_prims = random.sample(prims, 3)
for prim in randomly_selected_prims:
    rand_loc = (random.uniform(-2, 2), random.uniform(-2, 2), 0)
    if not prim.HasAttribute("xformOp:translate"):
        UsdGeom.Xformable(prim).AddTranslateOp()
    prim.GetAttribute("xformOp:translate").Set(rand_loc)


@ahaidu thank you, this is great! I am implementing this code right now and have a small issue that maybe you could help me with. This is my code:

class AssetsHelper:

    def get_usd_files(directory_path):
        usd_files = []
        for root, dirs, files in os.walk(directory_path):
            for file in files:
                if file.endswith(".usd"):
                    usd_files.append(os.path.join(root, file))
        return usd_files


    def create_asset_with_triggered_colliders(stage, asset_url, path):
        prim = AssetsHelper.create_asset(stage, asset_url, path)
        AssetsHelper.add_rigidbody_kinametic(prim)
        AssetsHelper.add_triggered_colliders(prim)
        return prim


    # Create a new prim with the provided asset URL and transform properties
    def create_asset(stage, asset_url, path):
        prim_path = omni.usd.get_stage_next_free_path(stage, path, False)
        reference_url = asset_url
        prim = stage.DefinePrim(prim_path, "Xform")
        prim.GetReferences().AddReference(reference_url)
        return prim


    def add_colliders(prim):

        # Iterate descendant prims (including root) and add colliders to mesh or primitive types
        for desc_prim in Usd.PrimRange(prim):
            if desc_prim.IsA(UsdGeom.Mesh) or desc_prim.IsA(UsdGeom.Gprim):
                # Physics
                if not desc_prim.HasAPI(UsdPhysics.CollisionAPI):
                    collision_api = UsdPhysics.CollisionAPI.Apply(desc_prim)
                else:
                    collision_api = UsdPhysics.CollisionAPI(desc_prim)
                collision_api.CreateCollisionEnabledAttr(True)
                # PhysX
                if not desc_prim.HasAPI(PhysxSchema.PhysxCollisionAPI):
                    physx_collision_api = PhysxSchema.PhysxCollisionAPI.Apply(desc_prim)
                else:
                    physx_collision_api = PhysxSchema.PhysxCollisionAPI(desc_prim)
                physx_collision_api.CreateRestOffsetAttr(0.0)

            # Add mesh specific collision properties only to mesh types
            if desc_prim.IsA(UsdGeom.Mesh):
                if not desc_prim.HasAPI(UsdPhysics.MeshCollisionAPI):
                    mesh_collision_api = UsdPhysics.MeshCollisionAPI.Apply(desc_prim)
                else:
                    mesh_collision_api = UsdPhysics.MeshCollisionAPI(desc_prim)
                mesh_collision_api.CreateApproximationAttr().Set("convexHull")
                

    def add_triggered_colliders(prim):

        AssetsHelper.add_colliders(prim)

        for desc_prim in Usd.PrimRange(prim):
            if desc_prim.IsA(UsdGeom.Mesh):
                # add trigger
                if not desc_prim.HasAPI(PhysxSchema.PhysxTriggerAPI):
                    physx_trigger_api = PhysxSchema.PhysxTriggerAPI.Apply(desc_prim)
                else:
                    physx_trigger_api = PhysxSchema.PhysxTriggerAPI(desc_prim)  

               

    def add_rigidbody_kinametic(prim):

        for desc_prim in Usd.PrimRange(prim):
            if desc_prim.IsA(UsdGeom.Mesh):
                # Physics
                if not prim.HasAPI(UsdPhysics.RigidBodyAPI):
                    rigid_body_api = UsdPhysics.RigidBodyAPI.Apply(prim)
                else:
                    rigid_body_api = UsdPhysics.RigidBodyAPI(prim)
                rigid_body_api.CreateRigidBodyEnabledAttr(True)
                # PhysX
                if not prim.HasAPI(PhysxSchema.PhysxRigidBodyAPI):
                    physx_rigid_body_api = PhysxSchema.PhysxRigidBodyAPI.Apply(prim)
                else:
                    physx_rigid_body_api = PhysxSchema.PhysxRigidBodyAPI(prim)
                physx_rigid_body_api.GetDisableGravityAttr().Set(True)
                
                prim.GetAttribute("physics:kinematicEnabled").Set(True)


The issue I’m seeing is that the rigid body specifically is being added to the root prim (and not the mesh) of the instantiated assets:

but the collider and the trigger are added correctly to the mesh itself:

I need the rigid body to be added to the mesh itself for my collision detection to work.
Any ideas why my code is not adding the rigid body to the mesh itself?

thank you so much for all of your help

Here you are adding the rigid body properties to prim (the root prim). If you want to add them to the descendants (e.g. the mesh), you should use desc_prim instead of prim in the Physics/PhysX API calls inside the loop (HasAPI, Apply, and the kinematicEnabled attribute).

@ahaidu makes sense, I didn’t notice that.

The logic is set up and everything should work as expected, but now I am dealing with another issue where the triggered collisions are not always detected.

Because of this issue, I think it’s best to use get_physx_scene_query_interface().overlap_box.

This is my code:

    def is_overlapping(self, prim):
        # get extent of the object from the bounding box
        cache = bounds_utils.create_bbox_cache(time=timeline.get_current_time())
        mesh_paths = AssetsHelper.get_prim_meshes_paths(prim)

        overlapping = False

        for _mesh_path in mesh_paths:
            bounds = bounds_utils.compute_combined_aabb(cache, prim_paths=[_mesh_path])
            usd_bounds = Gf.Range3d((bounds[0], bounds[1], bounds[2]),  # min
                            (bounds[3], bounds[4], bounds[5])) # max
            
            _extent = usd_bounds.GetSize()   
                                
            extent = carb.Float3(_extent[0], _extent[1], _extent[2])

            mesh_prim = self.stage.GetPrimAtPath(_mesh_path)

            global_position, global_rotation, _ = Utils.get_global_transform_xform(mesh_prim)
            
            #get origin from the object's global position
            origin = carb.Float3(global_position[0], global_position[1], global_position[2])
            # get rotation from the object's global rotation
            quaternion = Utils.euler_to_quaternion(global_rotation[0], global_rotation[1], global_rotation[2])
            rotation = carb.Float4(quaternion[0], quaternion[1], quaternion[2], quaternion[3])
            # physX query to detect number of hits for a cubic region

            #carb.log_warn(f"Checking for overlap for {prim.GetName()}. Origin: {origin}, Rotation: {rotation}, Extent: {extent}")
            numHits = get_physx_scene_query_interface().overlap_box(extent, origin, rotation, self.report_hit, False)

            if numHits > 0:
               # carb.log_warn(f"Object {prim.GetName()} is overlapping with {numHits} objects")
                overlapping = True
                break

        return overlapping

    
    def report_hit(self, hit):
        # When a collision is detected, the object color changes to red.
        hitColor = Vt.Vec3fArray([Gf.Vec3f(180.0 / 255.0, 16.0 / 255.0, 0.0)])
        usdGeom = UsdGeom.Mesh.Get(self.stage, hit.rigid_body)
        usdGeom.GetDisplayColorAttr().Set(hitColor)
        carb.log_warn(f"Object {hit} is overlapping with objects")
        return True

numHits is always one, and report_hit is entered for every prim.
Any idea what could be wrong here? Utils is my own class, and I know it works as expected since we’ve been using these functions to record the odometry and IMU of the robot for a few months.

@ahaidu Thank you again for everything. You have been super helpful and I feel we are almost at a complete solution. Could you please take a look at my implementation of is_overlapping(self, prim)?

Once I figure that out I will have a non-Replicator scattering function and can proceed with synthetic data generation.

Thank you again for all of your amazing help.

Could it be that the objects are always in contact with something when you query? From the given snippet it is hard for me to test because I don’t have access to the rest of the functions or the assets.

If you could set up a simple scenario (e.g. using primitives, cubes/sphere etc.) and a simplified repro snippet I could take a look.

Otherwise I can come back to you as soon as I have the time with such a simple repro example for you to use as a starting point.

@ahaidu while preparing a simple test case I realized what the issue is.

First, the setup of the test case:

I’ve set up an empty stage with two spheres with colliders and added a PhysicsScene:

This is the code I paste in the Script Editor:

import carb
import omni.isaac.core.utils.bounds as bounds_utils
import omni.timeline
import omni.usd
from omni.physx import get_physx_scene_query_interface
from pxr import Gf

timeline = omni.timeline.get_timeline_interface()


def report_hit(hit):
    carb.log_warn(f"Object {hit} is overlapping with objects")

def is_overlapping(prim_path):

    stage = omni.usd.get_context().get_stage()
    # get extent of the object from the bounding box
    cache = bounds_utils.create_bbox_cache(time=timeline.get_current_time())

    bounds = bounds_utils.compute_combined_aabb(cache, prim_paths=[prim_path])
    usd_bounds = Gf.Range3d((bounds[0], bounds[1], bounds[2]),  # min
                    (bounds[3], bounds[4], bounds[5])) # max
    
    carb.log_warn(f"Object bounds : {usd_bounds}")
    
    _extent = usd_bounds.GetSize()   
                        
    extent = carb.Float3(_extent[0], _extent[1], _extent[2])

    carb.log_warn(f"Object extent : {extent}")

    mesh_prim = stage.GetPrimAtPath(prim_path)

    global_position = mesh_prim.GetAttribute("xformOp:translate").Get()

    carb.log_warn(f"Object position : {global_position}")
    
    origin = carb.Float3(global_position[0], global_position[1], global_position[2])

    rotation = carb.Float4(0,0, 0, 1)

    numHits = get_physx_scene_query_interface().overlap_box(extent, origin, rotation, report_hit, False)

    if numHits > 0:
        carb.log_warn(f"Object is overlapping with {numHits} objects")
        return True

    carb.log_warn(f"Object is NOT overlapping")
    return False
        


prim_path = "/World/Sphere1"

is_overlapping(prim_path)
        

The only thing that needs to be modified here is the path to the sphere prim (second to last line in the code snippet).

The issue is that numHits is always at least 1. Why? Because the function detects colliders in a defined area enclosing the sphere (defined by extent, position, and rotation), and the sphere itself has a collider, so it is detecting a collision with itself.

I guess the easiest way to handle this is to define a collision when numHits > 1.

Is there a way to visualize the area that is defined by extent, position, and rotation? In my test case, numHits is still 2 even when the two spheres are pretty far apart.


So I’m thinking I need to debug extent and probably rotation as well.
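One thing that may be worth checking when debugging the extent: the first parameter of overlap_box is named halfExtent in the omni.physx documentation, whereas Gf.Range3d.GetSize() returns the full size of the box, so the queried region may be twice as large as intended in every direction. In addition, xformOp:translate is a local value and is not necessarily the center of the bounding box. A minimal sketch of deriving both values from the [min_x, min_y, min_z, max_x, max_y, max_z] array follows; the helper name `aabb_to_box_query` is hypothetical.

```python
def aabb_to_box_query(aabb):
    """Convert [min_x, min_y, min_z, max_x, max_y, max_z] into the
    (center, half_extent) pair of the same axis-aligned box."""
    mins, maxs = aabb[:3], aabb[3:6]
    # Center is the midpoint of the box on each axis
    center = tuple((lo + hi) / 2.0 for lo, hi in zip(mins, maxs))
    # Half extent is half of the box size on each axis
    half_extent = tuple((hi - lo) / 2.0 for lo, hi in zip(mins, maxs))
    return center, half_extent


center, half_extent = aabb_to_box_query([-1.0, -1.0, -1.0, 1.0, 1.0, 1.0])
```

Under that assumption, center and half_extent would be wrapped in carb.Float3 and passed as the origin and extent arguments. Because the bounds are axis-aligned in world space, the identity rotation used in the test case would remain appropriate.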

But just from putting a simple test case together I already have a good understanding of what’s going on! Thank you again for all of your high-quality help.

Also, looking at the report_hit function from the documentation:

def report_hit(self, hit):
    # When a collision is detected, the object color changes to red.
    hitColor = Vt.Vec3fArray([Gf.Vec3f(180.0 / 255.0, 16.0 / 255.0, 0.0)])
    usdGeom = UsdGeom.Mesh.Get(self.stage, hit.rigid_body)
    usdGeom.GetDisplayColorAttr().Set(hitColor)
    return True

In the case where numHits is greater than one, is there a way to traverse all the hits and ignore a collider if it belongs to the prim at prim_path?
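One possible way to do this, sketched here under assumptions, is to let the callback collect the hits and skip any whose path is the queried prim or one of its descendants. The class name `OverlapCollector` is hypothetical. The sketch assumes that hit.rigid_body is a prim path string, as the documentation snippet above suggests, and that a hit.collision path attribute may also be present; both should be verified against the omni.physx version in use.

```python
class OverlapCollector:
    """Collects overlap hits, ignoring the prim at `ignore_root` and its descendants."""

    def __init__(self, ignore_root):
        self.ignore_root = str(ignore_root).rstrip("/")
        self.hit_paths = []

    def _belongs_to_ignored(self, path):
        # Match the prim itself or any descendant, but not siblings such as
        # /World/Sphere10 when ignoring /World/Sphere1
        return path == self.ignore_root or path.startswith(self.ignore_root + "/")

    def report_hit(self, hit):
        # Assumption: both attributes, when present, hold prim path strings
        paths = [str(p) for p in (getattr(hit, "rigid_body", None),
                                  getattr(hit, "collision", None)) if p]
        if paths and not any(self._belongs_to_ignored(p) for p in paths):
            self.hit_paths.append(paths[0])
        # Return True as the documentation snippet does
        return True
```

With this, collector = OverlapCollector(prim_path) would be created before the query, collector.report_hit passed as the callback to overlap_box, and len(collector.hit_paths) > 0 used in place of numHits > 0.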