How can I implement a sequence of tasks on a single robot in IsaacSim using Python?

I am trying to execute a sequence of pick and place tasks using a cobotta_pro_900 in IsaacSim with DynamicCylinders as objects for the tasks. I took the pick and place example from omni.isaac.core and used the template to create a new task class that picks an object and places it in a desired position. However, when I try to create a multiple tasks and execute them one after the other, the simulation fails after the first object with the below warning

2024-06-11 17:58:18 [91,858ms] [Warning] [omni.usd] Unexpected reference count of 2 for UsdStage 'anon:0x13b006d0:World0.usd' while being closed in UsdContext (this may indicate it is still resident in memory).
[91.917s] Simulation App Shutting Down

The only reference to multiple tasks I have found from the documentation is at Multiple Tasks — Omniverse IsaacSim latest documentation but this doesn’t describe how to sequentially implement multiple tasks on the same robot. Is there any reference to this specific use case and how can I achieve this functionality?

Below is the code snippet I am using to create a list of tasks. For the controller, I am using the rmpflow controller from standalone_examples/api/omni.isaac.manipulators/cobotta_900

picking_robot = my_world.scene.get_object("cobotta_robot")
picking_controller = PickPlaceController(name="controller", robot_articulation=picking_robot, gripper=picking_robot.gripper)
articulation_controller = picking_robot.get_articulation_controller()

tasks = []
for i in range(num_tasks):
    target_position = dest_positions[i]
    task_name = "task_"+str(i)
    tasks.append(PickPlaceObject(name=task_name, target_position=target_position,robot=picking_robot,target_obj = cylinder_block))
    my_world.add_task(tasks[-1])
    
my_world.reset()

for i in range(num_tasks):

    target_position = dest_positions[i]
    task_name = tasks[i].name
    task_params = my_world.get_task(task_name).get_params()

    while simulation_app.is_running():
        my_world.step(render=True)
        if my_world.is_playing():
            if my_world.current_time_step_index == 0:
                my_world.reset()
                picking_controller.reset()
            observations = my_world.get_observations()
            actions = picking_controller.forward(
                    picking_position = source_positions[i],
                    placing_position= dest_positions[i],
                    current_joint_positions=observations[task_params["robot_name"]["value"]]["joint_positions"],
                    # This offset needs tuning as well
                    end_effector_offset=np.array([0.0, 0, 0.25]))
            articulation_controller.apply_action(actions)
            if picking_controller.is_done():
                print("Done picking and placing object")
                break

simulation_app.close()
1 Like

Very good question. I also studied this multitask tutorial myself and came up to the same problem.
Let me follow the topic and If have any updates I’ll share them here.

1 Like

I actually figured out the solution for this after posting the question. The below code snippet worked for me. The key difference is adding a controller reset after each task is finished.

Having the task counter increment inside the while simulation_app.is_running loop results in a smooth behavior whereas using the for loop for tasks controlling the simulation app inside is erratic. Hope this helps your use case too.

tasks = []
for i in range(num_tasks):
    target_position = dest_positions[i]
    task_name = "task_"+str(i)
    tasks.append(PickPlaceObject(name=task_name, target_position=target_position,robot=picking_robot,target_obj = cylinder_block))
    my_world.add_task(tasks[-1])
    
my_world.reset()


i = 0
target_position = dest_positions[i]
task_name = tasks[i].name
task_params = my_world.get_task(task_name).get_params()
while simulation_app.is_running():
    my_world.step(render=True)
    if my_world.is_playing():

        if my_world.current_time_step_index == 0:
            my_world.reset()
            picking_controller.reset()
        observations = my_world.get_observations()
        actions = picking_controller.forward(
                picking_position = source_positions[i],
                placing_position= dest_positions[i],
                current_joint_positions=observations[task_params["robot_name"]["value"]]["joint_positions"],
                # This offset needs tuning as well
                end_effector_offset=np.array([0.0, 0, 0.25]))
        articulation_controller.apply_action(actions)
        if picking_controller.is_done():
            print("Done picking and placing gear")

            if (i==num_tasks-1):
                break
            else:
                i = i+1
                target_position = dest_positions[i]
                task_name = tasks[i].name
                task_params = my_world.get_task(task_name).get_params()
                picking_controller.reset()
        
simulation_app.close()
1 Like

Nice. Well, I came up to something similar myself but for StackingController and Tasks defined in individual classes. The approach is really similar.

I had a minor problem with having manipulator duplicated while setting up scene since the parent omni.isaac.core.tasks.Stacking class has the robot initialization for each task. I had to add the getter for existing robot and setter for the case when you don’t need to add an existing robot to the same scene. I guess you did something close for your PickPlaceObject class.
I do think that the robot initialization should be optional on BaseTask level, but it is always possible to prepare custom Task based on it, so it is just a matter of personal preferences.

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.