How to get kit handler in extension?

I have my custom extension and it commands robot to move joint. However, when i want to wait until the robot reached target position, the ‘time.sleep’ command makes the simulator stuck. How can i get kit handler and update its frame when I wait for the move joint command to finish?

How to update frame in extension within a running isaac sim environment? Because i dont want to restart a simulator using

 kit = OmniKitHelper()

in python script everytime i test. @toni.sm @sdebnath @Sheikh_Dawood

The easiest way to do this is in an extension to use asyncio to await

import asyncio
import omni

async def next_frame():
    await omni.kit.app.get_app().next_update_async()
    print("frame one")
    await omni.kit.app.get_app().next_update_async()
    print("frame two")

asyncio.ensure_future(next_frame())

There are also ways to assign callbacks that get called every frame

self._callback_handle = omni.kit.app.get_app().get_update_event_stream().create_subscription_to_pop(self._update)

@Hammad_M I’ve tried the first method and the simulator is still stuck. I hope i can wait until the joints reach target then start the next execution like taking picture,and the simulator is still running rather than stuck in the waiting process. The process is like sim.step() in mujoco-py.

@Hammad_M Could you give a sample for frame update in asyncio way?

@newuhe
Can you share your script that gets stuck?

When the test_script function is called, the simulator will be stuck.

class FlexivSample(FlexivApi):
    def __init__(self):
        super().__init__()

    def test_script(self):
        # get joint state
        joint_state = self.get_joint_pos()
        print("robot joint states:", joint_state)
        
        # get tcp state
        print(self.get_tcp_pose())
        
        self.get_camera_pic("/isaac-sim/python_samples/0.png")

        # set joint state
        joints_target = np.array([0.0, -50.0/180*np.pi, 0.0, 90.0/180*np.pi, 0.0, 40.0/180*np.pi, 0.0])
        self.move_joint(joints_target)
        time.sleep(3)
        
        joints_target = np.array([0.0, -30.0/180*np.pi, 0.0, 80.0/180*np.pi, 0.0, 40.0/180*np.pi, 0.0])
        self.move_joint(joints_target)

        self.get_camera_pic("/isaac-sim/python_samples/1.png")

        self.create_lidar()

        return
    
    async def next_frame(self):
        await omni.kit.app.get_app().next_update_async()

    def step(self, step):
        """This function is called every timestep in the main process, where state machine is defined.
        
        Arguments:
            step (float): elapsed time between steps
        """
        if self.created and self._timeline.is_playing():
            if self.first_step:
                self.register_assets()
                self.first_step = False
            if self.get_control_mode()=="following":
                target_mat = self._target_prim.GetAttribute("xformOp:transform").Get()
                target_pos = target_mat.ExtractTranslation()
                target_rot = target_mat.ExtractRotationMatrix()
                self._tcp_target = {
                    "orig": np.array([target_pos[0], target_pos[1], target_pos[2]]) * self._meters_per_unit,
                    "axis_x": np.array(-target_rot[0]),
                    "axis_y": np.array(target_rot[1]),
                    "axis_z": np.array(-target_rot[2]),
                }
                self._robot.end_effector.go_local(target=self._tcp_target, use_default_config=True, wait_for_target=False)
            
            if self.get_control_mode()=="joint":
                print('Joint Mode')
                if self.is_reached():
                    print('done')
                    # self.switch_mode("idle")
                    if len(self._joint_targets)==0:
                        print('no joints target')
                    else:
                        joints = self._joint_targets.pop(0)
                        self._joint_target = joints
                        self._robot.send_config(self._joint_target)
                        self._robot.end_effector.go_local(use_default_config=True, wait_for_target=False)

            if self.get_control_mode()=="online":
                print('Online Mode')
                if self.is_reached():
                    print('done')
                    if len(self._tcp_targets)==0:
                        print('no tcp target')
                    else:
                        tcp = self._tcp_targets.pop(0)
                        self._tcp_target = tcp
                        self._robot.end_effector.go_local(target=self._tcp_target, use_default_config=False, wait_for_target=False) 
            
            if self.get_control_mode()=="idle":
                print('Idel Mode')

            # update RMP's world and robot states to sync with Kit
            self._world.update()
            self._robot.update()

            if self._save_data:
                self.collect_action_state()    ```

Sleeping is not allowed because it blocks the main thread, if you are running in an extension you have to use asyncio to await a sleep call instead as that is non blocking.