Adding our sim script below
# Copyright (c) 2021-2023, NVIDIA CORPORATION. All rights reserved.
#
# NVIDIA CORPORATION and its licensors retain all intellectual property
# and proprietary rights in and to this software, related documentation
# and any modifications thereto. Any use, reproduction, disclosure or
# distribution of this software and related documentation without an express
# license agreement from NVIDIA CORPORATION is strictly prohibited.
#
# Generel imports
import os
import yaml
import argparse
import numpy as np
from importlib.resources import files
# Need to move arg parsing here to handle whatever SimulationApp does under the hood wrt args and subsequent imports
def parse_arguments():
"""Parse arguments early, before SimulationApp creation"""
parser = argparse.ArgumentParser(description="Phoenix Isaac Sim - Robot Simulation Platform")
# Add backward compatibility arguments to main parser
parser.add_argument(
"--config",
type=str,
default="config/simulation_config.yaml",
help="Path to the config file",
)
parser.add_argument(
"--robots", nargs="+", default=["spot"], help="Specify robot (go2 or spot)"
)
parser.add_argument("--scene", type=str, default="GQ", help="Specify scene")
parser.add_argument(
"--show_ui", action="store_true", help="Show the UI in the app window"
)
parser.add_argument("--headless", action="store_true", help="Run the sim headless")
parser.add_argument(
"--autonomous",
action="store_true",
help="Start the robot in autonomy mode, otherwise teleop",
)
parser.add_argument("--task", type=str, default="None", help="Specify scenario (e.g. Scenario1)")
# Create subparsers for different commands
subparsers = parser.add_subparsers(dest='command', help='Available commands')
# Run command (inherits arguments from parent parser)
run_parser = subparsers.add_parser('run', help='Run the simulation', parents=[parser], add_help=False)
# List robots command
list_parser = subparsers.add_parser('list-robots', help='List available robot platforms')
# Robot info command
info_parser = subparsers.add_parser('robot-info', help='Show detailed information about a robot')
info_parser.add_argument('robot_name', nargs='?', help='Robot name (optional - shows all if not specified)')
args_cli, _ = parser.parse_known_args()
# If no command specified, default to 'run'
if args_cli.command is None:
args_cli.command = 'run'
return args_cli
args_cli = parse_arguments()
# Handle non-simulation commands first (before creating SimulationApp)
if args_cli.command == 'list-robots':
from .cli_utils import handle_list_robots_command
handle_list_robots_command()
elif args_cli.command == 'robot-info':
from .cli_utils import handle_robot_info_command
robot_name = getattr(args_cli, 'robot_name', None)
handle_robot_info_command(robot_name)
# IsaacSim imports - SimulationApp must be created before other Isaac Sim imports
from isaacsim import SimulationApp
simulation_app = SimulationApp(
{
"headless": args_cli.headless,
"display_options": 1 << 0 | 1 << 13 | 1 << 14,
}
)
import carb
import omni
import omni.ui as ui
import omni.appwindow # Contains handle to keyboard
from isaacsim.core.api import World
from isaacsim.core.utils.nucleus import get_assets_root_path
from isaacsim.core.utils.stage import add_reference_to_stage
from isaacsim.core.prims import XFormPrim
# ROS imports
import threading
# enable ROS2 bridge extension
ext_manager = omni.kit.app.get_app().get_extension_manager()
ext_manager.set_extension_enabled_immediate("isaacsim.ros2.bridge", True)
ext_manager.set_extension_enabled_immediate("isaacsim.sensors.physx", True)
import rclpy
from rclpy.executors import SingleThreadedExecutor
from geometry_msgs.msg import Twist
# Local imports
from phoenix_isaacsim.scenes import SceneBuilder
from phoenix_isaacsim.robots import RobotInitializer
from .utils.keybindings import KeyBindings, AutonomyMode
from .utils.viewport_utils import ViewportUtils
from .utils.tasks import TaskBuilder
from .utils.buttons import Buttons
class simulation_runner(object):
def __init__(
self,
app: SimulationApp,
physics_dt: float,
render_dt: float,
robots: list,
autonomous: bool,
scenario: str = None
) -> None:
"""
creates the simulation world with preset physics_dt and render_dt and creates robot inside the world
Argument:
physics_dt {float} -- Physics downtime of the scene.
render_dt {float} -- Render downtime of the scene.
"""
self._robots = robots
self.app = app
self._world = World(stage_units_in_meters=1.0, physics_dt=physics_dt, rendering_dt=render_dt)
assets_root_path = get_assets_root_path()
if assets_root_path is None:
carb.log_error("Could not find Isaac Sim assets folder")
self._base_command = np.zeros(3)
self._cmd_vel = [np.zeros(3), rclpy.time.Time(seconds=0, nanoseconds=0)]
self.viewport_utils = ViewportUtils(self._robots[0])
if scenario != "None":
self.task_builder = TaskBuilder(scenario_name=scenario,
robot=robots[0],
)
try:
self.buttons = Buttons()
# self.buttons.initialize_button()
self._world.scene.add(self.buttons.button_robot)
self.using_buttons = True
except:
self.using_buttons = False
else:
self.task_builder = None
# Use KeyBindings for keyboard logic
self.keybindings = KeyBindings(
robots=self._robots,
update_camera_pose=self.viewport_utils.update_camera_pose,
simulation_app=simulation_app,
setup_fn=self.setup,
world=self._world,
run_fn=self.run,
robot_mode=AutonomyMode.AUTONOMOUS if autonomous else AutonomyMode.TELEOP,
)
self.needs_reset = False
self.stop_simulation = False
self.first_step = True
self.robots_initialized = False
def setup(self) -> None:
"""
Set up keyboard listener and add physics callback
"""
self._appwindow = omni.appwindow.get_default_app_window()
self._input = carb.input.acquire_input_interface()
self._keyboard = self._appwindow.get_keyboard()
self._sub_keyboard = self._input.subscribe_to_keyboard_events(self._keyboard, self.keybindings.sub_keyboard_event)
self._world.add_physics_callback("physics_forward", callback_fn=self.on_physics_step)
def run(self) -> None:
"""
Step simulation based on rendering downtime
"""
robot_names = [robot.name for robot in self._robots]
self.add_cmd_sub(robot_names)
if not args_cli.headless:
# Cache method lookups
keybindings_get_mode = self.keybindings.get_mode
keybindings_get_camera_mode = self.keybindings.is_camera_tracking
keybindings_get_speed = self.keybindings.get_current_speed
keybindings_get_controls_desc = self.keybindings.get_controls_description
keybindings_get_robot_name = self.keybindings.get_target_robot_name
keybindings_get_robot_num = self.keybindings.get_target_robot_num
viewport_utils_update_camera_pose = self.viewport_utils.update_camera_pose
# Create UI once outside the loop
window = ui.Window("Viewport")
with window.frame:
with ui.VStack():
mode_label = ui.Label(f"Mode: {keybindings_get_mode()}", alignment=ui.Alignment.CENTER_TOP)
mode_label.visible = True
mode_label.set_style({"background_color": 0x00000000, "font_size": 25})
# Controls label
controls_label = ui.Label(
keybindings_get_controls_desc(),
alignment=ui.Alignment.LEFT_BOTTOM)
controls_label.visible = True
controls_label.set_style({"background_color": 0xFFFFFF, "font_size": 18})
world_step = self._world.step
world_is_stopped = self._world.is_stopped
while simulation_app.is_running():
if not args_cli.headless:
mode_label.text = f"Mode: {keybindings_get_mode()}"
controls_label.text = f"{keybindings_get_robot_name().capitalize()}\nTeleop Speed: {keybindings_get_speed():.1f}\n\n{keybindings_get_controls_desc()}"
if keybindings_get_camera_mode():
self.viewport_utils.robot = self._robots[keybindings_get_robot_num()]
viewport_utils_update_camera_pose()
if world_is_stopped():
self.needs_reset = True
if self.task_builder is not None:
if self.task_builder.reset_requested:
self.task_builder.reset_requested = False
self.keybindings._reset_sim()
if self.task_builder != None:
self.task_builder.task_idx = 1
self.task_builder.current_task_idx = 1
self.task_builder.publish_scenario()
self.task_builder.publish_current_task()
self.task_builder.get_start_time()
if self.using_buttons:
self.buttons._reset_game()
print("Buttons Reset")
try:
# self._world.scene.add(self.buttons.button_robot)
self.using_buttons = True
print("Buttons Init Done after hard reset")
except:
self.using_buttons = False
self.buttons.update()
if self.task_builder.stop_simulation:
self.stop_simulation = True
if self.task_builder.soft_reset_requested:
print("Robot Spawn Reset Requested")
self.task_builder.soft_reset_requested=False
global start_location
robot_names = [robot.name for robot in self._robots]
robot_names=f"/World/{robot_names[0]}/spot_body"
# print(robot_names,start_location)
# self._world.scene.get_object(robot_names[0]).set_world_pose(spawn_locations[0])
robot_root = XFormPrim(prim_paths_expr=robot_names)
robot_root.initialize()
robot_root.set_world_poses(positions=np.array(start_location[0]).reshape(-1, 3),
orientations=np.array([1,0,0,0]).reshape(-1, 4))
if self.using_buttons:
self.buttons.update()
world_step(render=True)
return
def on_physics_step(self, step_size) -> None:
"""
Physics call back, initialize robot (first frame) and call robot advance function to compute and apply joint torque
"""
# Spin task node if running
if self.task_builder != None:
rclpy.spin_once(self.task_builder, timeout_sec=0.0)
if self.first_step:
# self.task_builder.get_logger().info(f"Clock type: {self.task_builder.get_sim_time()}")
if self.task_builder!=None and self.task_builder.get_sim_time()>=15 :
self.first_step = False
# Publish first task upon initialization of scene
if self.task_builder != None:
self.task_builder.publish_scenario()
self.task_builder.publish_current_task()
self.task_builder.get_start_time()
elif self.task_builder==None:
self.first_step = False
# # start button robot
# self.buttons.initialize_button()
elif self.stop_simulation:
self._world.stop()
self.app.close()
elif not self.robots_initialized:
for robot in self._robots:
robot.initialize()
self.robots_initialized = True
else:
mode = self.keybindings.get_mode()
if mode == "Teleop":
base_command = self.keybindings.get_base_command()
if self.keybindings.is_individual_control():
command = np.zeros((len(self._robots), 5))
command[self.keybindings.get_target_robot_num()] = [*base_command, 0, 0]
else:
command = np.tile(base_command, (len(self._robots), 1))
else: # Autonomy
command = self._cmd_vel # ignore time stamps
for i, robot in enumerate(self._robots):
robot.forward(step_size, command[i])
def cmd_vel_cb(self, msg, robot_index):
"""Callback for specific robot cmd_vel"""
x = msg.linear.x
y = msg.linear.y
z = msg.angular.z
stamp = rclpy.clock.Clock().now()
(secs, nsecs) = stamp.seconds_nanoseconds()
self._cmd_vel[robot_index] = [x, y, z, secs, nsecs]
def add_cmd_sub(self, robot_names):
"""Add cmd_vel subscribers for multiple robots using a single node"""
# Initialize cmd_vel array for all robots
self._cmd_vel = np.zeros((len(robot_names), 5))
# Create a single node for all subscriptions
self.cmd_vel_node = rclpy.create_node('multi_robot_cmd_vel_subscriber')
# Create subscription for each robot on the same node
for i, robot_name in enumerate(robot_names):
self.cmd_vel_node.create_subscription(
Twist,
f'{robot_name}/cmd_vel',
lambda msg, idx=i: self.cmd_vel_cb(msg, idx),
10
)
# Use a SingleThreadedExecutor to manage the node's callbacks
self.executor = SingleThreadedExecutor()
self.executor.add_node(self.cmd_vel_node)
# The thread's target is the executor's own spin method
self.cmd_vel_thread = threading.Thread(target=self.executor.spin)
self.cmd_vel_thread.start()
def main(args=None):
"""
Parse arguments and handle different commands (run, list-robots, robot-info)
"""
if not args_cli.show_ui:
settings = carb.settings.get_settings()
settings.set("/app/window/hideUi", True)
robot_names = [robot.lower() for robot in args_cli.robots]
rclpy.init()
# Load config file if it was provided, otherwise use an empty dict
try:
with open(args_cli.config, "r") as f:
config = yaml.safe_load(f)
except:
config = {}
# Load scene configuration using the new class method
scene_cfg = SceneBuilder.load_scene(args_cli.scene, robot_names)
spawn_locations = scene_cfg.spawn_locations
global start_location
start_location=spawn_locations
scene_path = scene_cfg.file_path
# Initialize robots using the new method
robots, temp_files = RobotInitializer.initialize_robots(robot_names, spawn_locations)
stage = add_reference_to_stage(usd_path=scene_path, prim_path="/World")
physics_dt = 0.005
render_dt = 0.02
runner = simulation_runner(
app=simulation_app,
physics_dt=physics_dt,
render_dt=render_dt,
robots=robots,
autonomous=args_cli.autonomous,
scenario=args_cli.task
)
print("Wait for env to load ...")
simulation_app.update()
runner.setup()
simulation_app.update()
runner._world.reset()
simulation_app.update()
runner.run()
print(f"Cleaning up {len(temp_files)} temporary files...")
for temp_file in temp_files:
try:
os.remove(temp_file)
except Exception as e:
print(f"Warning: Could not remove temporary file {temp_file}: {e}")
print("Closing simulation app...")
simulation_app.close()
print("--- CLEANUP COMPLETE ---")
if __name__ == "__main__":
main()