Omnivers Isaac Sim Replicator: palletizing_demo.py on the webiner by Andrei Haidu

Hello,

We’d like to run the demo on the below webiner by Andrei Haidu .

Could you tell us where can we get those below python code?
pal

Best regards,

Hi there,

here is the script from the webinar. The 2023.1.0 Isaac Sim release will include an updated version as well.

import asyncio
import json
import os

import carb
import numpy as np
import omni.physx as _physx
import omni.replicator.core as rep
import omni.timeline
import omni.usd
from omni.isaac.core.utils.bounds import create_bbox_cache
from omni.isaac.kit.utils import set_carb_setting
from omni.physx import get_physx_scene_query_interface
from PIL import Image


class PalletizingDemo:
    def __init__(self):
        # Iterate through all 36 bins
        self._stage = None
        self._bin_counter = 0
        self._active_bin = None

        # Flag the state of the simulation
        self._bin_holder_scenario_done = False
        self._replicator_running = False

        # Track the state of the active bin (e.g., incontact with pallet)
        self._physx_sub = None
        self._overlap_update_timer = 0.0
        self._overlap_update_rate = 1.0
        self._overlap_extent = None

        # Pause / resume palletizing simulation
        self._timeline = None

        # Output directory for the data
        self._output_dir = os.path.join(os.getcwd(), "_out_demo", "")

    def start(self):
        self._init()
        self._physx_sub = _physx.get_physx_interface().subscribe_physics_step_events(self._on_physics_step)
        print("Starting palletizing demo")

    def _init(self):
        self._stage = omni.usd.get_context().get_stage()
        self._active_bin = self._stage.GetPrimAtPath(f"/environments/env/bins/bin_{self._bin_counter}")

        bb_cache = create_bbox_cache()
        half_ext = bb_cache.ComputeLocalBound(self._active_bin).GetRange().GetSize() * 0.5
        self._overlap_extent = carb.Float3(half_ext[0] * 1.1, half_ext[1] * 1.1, half_ext[2] * 1.1)

        self._timeline = omni.timeline.get_timeline_interface()

    def _on_physics_step(self, dt):
        self._overlap_update_timer += dt
        if self._overlap_update_timer > self._overlap_update_rate:
            self._overlap_update_timer = 0.0
            self._check_bin_overlaps()

    def _check_bin_overlaps(self):
        bin_pose = omni.usd.utils.get_world_transform_matrix(self._active_bin)
        origin = bin_pose.ExtractTranslation()
        quat_gf = bin_pose.ExtractRotation().GetQuaternion()

        hit_info = get_physx_scene_query_interface().overlap_box(
            carb.Float3(self._overlap_extent),
            carb.Float3(origin[0], origin[1], origin[2]),
            carb.Float4(
                quat_gf.GetImaginary()[0], quat_gf.GetImaginary()[1], quat_gf.GetImaginary()[2], quat_gf.GetReal()
            ),
            self._on_overlap_hit,
            False,  # 'False' to indicate an 'overlap multiple' query
        )

    def _on_overlap_hit(self, hit):
        if self._replicator_running:
            return False  # return False to stop the query

        self_hit = hit.rigid_body == self._active_bin.GetPrimPath()
        if self_hit:
            return True  # return True to continue the query

        bin_holder_hit = hit.rigid_body.startswith("/environments/env/pallet_holder")
        if bin_holder_hit and not self._bin_holder_scenario_done:
            self._replicator_running = True
            asyncio.ensure_future(self._run_bin_holder_scenario())
            return False

        pallet_hit = hit.rigid_body.startswith("/environments/env/pallet/Xform/Mesh_015")
        other_bin_hit = hit.rigid_body.startswith("/environments/env/bins/bin_")
        if pallet_hit or other_bin_hit:
            self._replicator_running = True
            asyncio.ensure_future(self._run_pallet_scenario())
            return False
        return True

    async def _run_bin_holder_scenario(self):
        print(f"Running bin holder scenario for bin {self._bin_counter}..")

        # Util function to save rgb images to file
        def save_img(rgb_data, filename):
            rgb_image_data = np.frombuffer(rgb_data, dtype=np.uint8).reshape(*rgb_data.shape, -1)
            rgb_img = Image.fromarray(rgb_image_data, "RGBA")
            rgb_img.save(filename + ".png")

        self._timeline.pause()
        self.create_bin_holder_graph()
        self._switch_to_pathtracing()

        rgb_annot = rep.AnnotatorRegistry.get_annotator("rgb")
        is_annot = rep.AnnotatorRegistry.get_annotator("instance_segmentation", init_params={"colorize": True})
        rp = rep.create.render_product("/Replicator/Camera_Xform/Camera", (512, 512))
        rgb_annot.attach([rp])
        is_annot.attach([rp])

        out_dir = os.path.join(self._output_dir, f"annot_bin_{self._bin_counter}", "")
        os.makedirs(out_dir, exist_ok=True)

        for i in range(4):
            await rep.orchestrator.step_async(pause_timeline=False)

            rgb_data = rgb_annot.get_data()
            rgb_filename = f"{out_dir}rgb_{i}"
            save_img(rgb_data, rgb_filename)

            is_data = is_annot.get_data()
            is_filename = f"{out_dir}is_{i}"
            is_img_data = is_data["data"]
            height, width = is_img_data.shape[:2]
            is_img_data = is_img_data.view(np.uint8).reshape(height, width, -1)
            save_img(is_img_data, is_filename)

            is_info = is_data["info"]
            with open(f"{out_dir}is_info_{i}.json", "w") as f:
                json.dump(is_info, f, indent=4)

        rgb_annot.detach([rp])
        is_annot.detach([rp])
        await rep.orchestrator.stop_async()
        self._switch_to_raytracing()
        self._stage.RemovePrim("/Replicator")

        self._bin_holder_scenario_done = True
        self._replicator_running = False
        self._timeline.play()

    def create_bin_holder_graph(self):
        color_palette = [(1, 0, 0), (0, 1, 0), (0, 0, 1)]

        def randomize_bin_holder_lights():
            lights = rep.create.light(
                light_type="Sphere",
                temperature=rep.distribution.normal(6500, 2000),
                intensity=rep.distribution.normal(95000, 15000),
                position=rep.distribution.uniform((0.25, 0.25, 0.5), (1, 1, 0.75)),
                scale=rep.distribution.uniform(0.5, 0.8),
                color=rep.distribution.choice(color_palette),
                count=3,
            )
            return lights.node

        rep.randomizer.register(randomize_bin_holder_lights)

        camera_positions = [(1.96, 0.72, -0.34), (1.48, 0.70, 0.90), (0.79, -0.86, 0.12), (-0.49, 1.47, 0.58)]
        camera = rep.create.camera()
        with rep.trigger.on_frame():
            rep.randomizer.randomize_bin_holder_lights()
            with camera:
                rep.modify.pose(position=rep.distribution.sequence(camera_positions), look_at=(0.78, 0.72, -0.1))

    def _switch_to_pathtracing(self):
        carb_settings = carb.settings.get_settings()
        set_carb_setting(carb_settings, "/rtx/rendermode", "PathTracing")
        set_carb_setting(carb_settings, "/rtx/pathtracing/spp", 32)
        set_carb_setting(carb_settings, "/rtx/pathtracing/totalSpp", 32)
        set_carb_setting(carb_settings, "/rtx/pathtracing/clampSpp", 32)

    def _switch_to_raytracing(self):
        carb_settings = carb.settings.get_settings()
        set_carb_setting(carb_settings, "/rtx/rendermode", "RayTracedLighting")
        # 0: Disabled, 1: TAA, 2: FXAA, 3: DLSS, 4:RTXAA
        set_carb_setting(carb_settings, "/rtx/post/aa/op", 3)

    async def _run_pallet_scenario(self):
        print(f"Running pallet scenario for bin {self._bin_counter}")
        self._timeline.pause()
        self.create_bin_graph()
        self.create_pallet_graph()

        out_dir = os.path.join(self._output_dir, f"writer_bin_{self._bin_counter}", "")
        os.makedirs(out_dir, exist_ok=True)

        writer = rep.WriterRegistry.get("BasicWriter")
        writer.initialize(output_dir=out_dir, rgb=True, instance_segmentation=True, colorize_instance_segmentation=True)

        rp = rep.create.render_product("/Replicator/Camera_Xform/Camera", (512, 512))
        await writer.attach_async([rp])

        await rep.orchestrator.run_async(num_frames=16)
        await rep.orchestrator.wait_until_complete_async()
        writer.detach()

        self._stage.RemovePrim("/Replicator")

        self._replicator_running = False
        self._next_bin()
        self._timeline.play()

    def create_bin_graph(self):
        bin_paths = [f"/environments/env/bins/bin_{i}" for i in range(36)]

        bins_node = rep.get.prim_at_path(bin_paths)
        mats = rep.create.material_omnipbr(
            diffuse=rep.distribution.uniform((0.2, 0.1, 0.3), (0.6, 0.6, 0.7)),
            roughness=rep.distribution.choice([0.1, 0.9]),
            count=10,
        )

        def randomize_bin_materials():
            with bins_node:
                rep.randomizer.materials(mats)
            return bins_node.node

        rep.randomizer.register(randomize_bin_materials)

        camera = rep.create.camera()
        with rep.trigger.on_frame():
            rep.randomizer.randomize_bin_materials()
            with camera:
                rep.modify.pose(
                    position=rep.distribution.uniform((0, -2, 2), (2, 1, 3)), look_at="/environments/env/pallet"
                )

    def create_pallet_graph(self):
        pallet_node = rep.get.prim_at_path("/environments/env/pallet")

        def randomize_pallet_textures():
            with pallet_node:
                rep.randomizer.texture(
                    textures=[
                        "omniverse://localhost/NVIDIA/Materials/Base/Wood/Oak/Oak_BaseColor.png",
                        "omniverse://localhost/NVIDIA/Materials/Base/Wood/Ash/Ash_BaseColor.png",
                        "omniverse://localhost/NVIDIA/Materials/Base/Wood/Plywood/Plywood_BaseColor.png",
                        "omniverse://localhost/NVIDIA/Materials/Base/Wood/Timber/Timber_BaseColor.png",
                    ],
                    texture_rotate=rep.distribution.uniform(80, 95),
                )
            return pallet_node.node

        rep.randomizer.register(randomize_pallet_textures)

        with rep.trigger.on_frame(interval=4, rt_subframes=4):
            rep.randomizer.randomize_pallet_textures()

    def _next_bin(self):
        self._bin_counter += 1
        if self._bin_counter >= 36:
            self._timeline.pause()
            print("Demo finished!")
            return
        self._active_bin = self._stage.GetPrimAtPath(f"/environments/env/bins/bin_{self._bin_counter}")
        self._bin_holder_scenario_done = False

Hi,

Thank you very much for your kindness.
I really appreciate your help.

I’d like to continue to check the behavior with your code.

Best regards,

Hi,

Thank you very much for your kindness.
I really appreciate your help.

I can run the demo with your code.
Thank you!

Best regards,

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.