Hi @Hammad_M
sure, here you are
# Copyright (c) 2022, NVIDIA CORPORATION. All rights reserved.
#
# NVIDIA CORPORATION and its licensors retain all intellectual property
# and proprietary rights in and to this software, related documentation
# and any modifications thereto. Any use, reproduction, disclosure or
# distribution of this software and related documentation without an express
# license agreement from NVIDIA CORPORATION is strictly prohibited.
#
from dis import dis
from omni.isaac.core.utils.nucleus import get_assets_root_path
from omni.isaac.core.utils.stage import add_reference_to_stage
from omni.isaac.core.tasks.base_task import BaseTask
from omni.isaac.core.articulations import ArticulationView
from omni.isaac.core.utils.prims import create_prim
from omni.isaac.core.prims import RigidPrimView, RigidPrim, GeometryPrimView , GeometryPrim
import omni.kit
from gym import spaces
import numpy as np
import torch
import math
import random
class CartpoleTask(BaseTask):
def __init__(self, name, offset=None) -> None:
# task-specific parameters
self._cartpole_position = [0.0, 0.0, 2.0]
self._reset_dist = 0.2
self._max_push_effort = 50000.0
# set stage
usd_context = omni.usd.get_context()
self._stage = usd_context.get_stage()
# values used for defining RL buffers
self._num_observations = 11
self._num_actions = 5
self._device = "cpu"
self.num_envs = 1
# a few class buffers to store RL-related states
self.obs = torch.zeros((self.num_envs, self._num_observations))
self.resets = torch.zeros((self.num_envs, 1))
# set the action and observation space for RL
self.action_space = spaces.Box(np.ones(self._num_actions) * -1.0, np.ones(self._num_actions) * 1.0)
self.observation_space = spaces.Box(
np.ones(self._num_observations) * -np.Inf, np.ones(self._num_observations) * np.Inf
)
# trigger __init__ of parent class
BaseTask.__init__(self, name=name, offset=offset)
def set_up_scene(self, scene) -> None:
# retrieve file path for the Cartpole USD file
# assets_root_path = get_assets_root_path()
usd_path = "G:\Omniverse\Isaac Sim\Robohouse\Modules\Robotic Arm\kr210-r3100-ultra.project.usd"
# add the Cartpole USD to our stage
# create_prim(prim_path="/World/Cartpole", prim_type="Xform", position=self._cartpole_position)
add_reference_to_stage(usd_path, "/World")
# create an ArticulationView wrapper for our cartpole - this can be extended towards accessing multiple cartpoles
self._robot_arm = ArticulationView(prim_paths_expr="/World/kr210_r3100_ultra", name="robot_view")
# add Cartpole ArticulationView and ground plane to the Scene
scene.add(self._robot_arm)
self._head = GeometryPrimView(prim_paths_expr="/World/kr210_r3100_ultra/axis_6_kr210_r3100_ultra/kr210_r3100_ultra_sldasm_part_93/kr210_r3100_ultra_sldasm_part_93", name="head_view" , reset_xform_properties=False)
scene.add(self._head)
self._target = GeometryPrimView(prim_paths_expr="/World/Target", name="target_view", reset_xform_properties=False)
scene.add(self._target)
# set default camera viewport position and target
self.set_initial_camera_params()
def set_initial_camera_params(self, camera_position=[10, 10, 3], camera_target=[0, 0, 0]):
viewport = omni.kit.viewport_legacy.get_default_viewport_window()
viewport.set_camera_position(
"/OmniverseKit_Persp", camera_position[0], camera_position[1], camera_position[2], True
)
viewport.set_camera_target("/OmniverseKit_Persp", camera_target[0], camera_target[1], camera_target[2], True)
def post_reset(self):
self._axis_1 = self._robot_arm.get_dof_index("RevoluteJoint_Axis_1_Axis_2")
self._axis_2 = self._robot_arm.get_dof_index("RevoluteJoint_Axis_2_Axis_3")
self._axis_3 = self._robot_arm.get_dof_index("RevoluteJoint_Axis_3_Axis_4")
self._axis_4 = self._robot_arm.get_dof_index("RevoluteJoint_Axis_4_Axis_5")
self._axis_5 = self._robot_arm.get_dof_index("RevoluteJoint_Axis_5_Axis_6")
# self._cart_dof_idx = self._robot_arm.get_dof_index("cartJoint")
# self._pole_dof_idx = self._robot_arm.get_dof_index("poleJoint")
# randomize all envs
indices = torch.arange(self._robot_arm.count, dtype=torch.int64, device=self._device)
self.reset(indices)
def reset(self, env_ids=None):
if env_ids is None:
env_ids = torch.arange(self.num_envs, device=self._device)
num_resets = len(env_ids)
# set target to random position
self._target.set_world_poses(positions = torch.FloatTensor( [[1.4, random.uniform(-1, 1), random.uniform(.7, 2.8)]]))
print("reset called")
self._last_distance = 2000
self._steps = 0
# randomize DOF positions
dof_pos = torch.zeros((num_resets, self._robot_arm.num_dof), device=self._device)
dof_pos[:, self._axis_1] = 0
dof_pos[:, self._axis_2] = 0
dof_pos[:, self._axis_3] = 0
dof_pos[:, self._axis_4] = 0
dof_pos[:, self._axis_5] = 0
# dof_pos[:, self._cart_dof_idx] = 1.0 * (1.0 - 2.0 * torch.rand(num_resets, device=self._device))
# dof_pos[:, self._pole_dof_idx] = 0.125 * math.pi * (1.0 - 2.0 * torch.rand(num_resets, device=self._device))
# randomize DOF velocities
dof_vel = torch.zeros((num_resets, self._robot_arm.num_dof), device=self._device)
dof_vel[:, self._axis_1] = 0
dof_vel[:, self._axis_2] = 0
dof_vel[:, self._axis_3] = 0
dof_vel[:, self._axis_4] = 0
dof_vel[:, self._axis_5] = 0
# dof_vel[:, self._cart_dof_idx] = 0.5 * (1.0 - 2.0 * torch.rand(num_resets, device=self._device))
# dof_vel[:, self._pole_dof_idx] = 0.25 * math.pi * (1.0 - 2.0 * torch.rand(num_resets, device=self._device))
# apply resets
indices = env_ids.to(dtype=torch.int32)
self._robot_arm.set_joint_positions(dof_pos, indices=indices)
self._robot_arm.set_joint_velocities(dof_vel, indices=indices)
# bookkeeping
self.resets[env_ids] = 0
def pre_physics_step(self, actions) -> None:
reset_env_ids = self.resets.nonzero(as_tuple=False).squeeze(-1)
if len(reset_env_ids) > 0:
self.reset(reset_env_ids)
# print("pre_physics_step" , actions[0])
# increase steps
self._steps += 1
actions = torch.tensor(actions)
forces = torch.zeros((self._robot_arm.count, self._robot_arm.num_dof), dtype=torch.float32, device=self._device)
# forces[:, self._cart_dof_idx] = self._max_push_effort * actions[0]
forces[:, self._axis_1] = self._max_push_effort * actions[0]
forces[:, self._axis_2] = self._max_push_effort * actions[1]
forces[:, self._axis_3] = self._max_push_effort * actions[2]
forces[:, self._axis_4] = self._max_push_effort * actions[3]
forces[:, self._axis_5] = self._max_push_effort * actions[4]
indices = torch.arange(self._robot_arm.count, dtype=torch.int32, device=self._device)
self._robot_arm.set_joint_efforts(forces, indices=indices)
def get_observations(self):
dof_pos = self._robot_arm.get_joint_positions()
# collect pole and cart joint positions and velocities for observation
# cart_pos = dof_pos[:, self._cart_dof_idx]
# cart_vel = dof_vel[:, self._cart_dof_idx]
# pole_pos = dof_pos[:, self._pole_dof_idx]
# pole_vel = dof_vel[:, self._pole_dof_idx]
# self.obs[:, 0] = cart_pos
# self.obs[:, 1] = cart_vel
# self.obs[:, 2] = pole_pos
# self.obs[:, 3] = pole_vel
axis_1_pos = dof_pos[:, self._axis_1]
axis_2_pos = dof_pos[:, self._axis_2]
axis_3_pos = dof_pos[:, self._axis_3]
axis_4_pos = dof_pos[:, self._axis_4]
axis_5_pos = dof_pos[:, self._axis_5]
# print(axis_2_pos)
targetPose = self._target.get_world_poses()
headPose = self._head.get_world_poses()
robotArmPoses = self._robot_arm.get_world_poses(
clone = False)
# print(robotArmPoses)
# print(headPose)
# we have to take a look at the position of the target and head
# head_pos = self.get_object_position("/World/kr210_r3100_ultra/axis_6_kr210_r3100_ultra/kr210_r3100_ultra_sldasm_part_93/kr210_r3100_ultra_sldasm_part_93")
# target_pos = self.get_object_position("/World/Target")
# print("axis 5 is " , type(head_pos) , head_pos[0])
self.obs[:, 0] = axis_1_pos
self.obs[:, 1] = axis_2_pos
self.obs[:, 2] = axis_3_pos
self.obs[:, 3] = axis_4_pos
self.obs[:, 4] = axis_5_pos
# we have to assign the values
# Head
self.obs[:, 5] = headPose[0][0][0]
self.obs[:, 6] = headPose[0][0][1]
self.obs[:, 7] = headPose[0][0][2]
# Target
self.obs[:, 8] = targetPose[0][0][0]
self.obs[:, 9] = targetPose[0][0][1]
self.obs[:, 10] = targetPose[0][0][2]
return self.obs
def get_object_position(self , prim_path):
import omni
from pxr import UsdGeom, Gf
timeline = omni.timeline.get_timeline_interface()
timecode = timeline.get_current_time() * timeline.get_time_codes_per_seconds()
curr_prim = self._stage.GetPrimAtPath(prim_path)
pose = omni.usd.utils.get_world_transform_matrix(curr_prim, timecode)
# print(timecode)
# print(prim_path , pose.ExtractTranslation())
return pose.ExtractTranslation()
def calculate_metrics(self) -> None:
head_pos = np.array([self.obs[:, 5] , self.obs[:, 6] , self.obs[:, 7]])
target_pos = np.array([self.obs[:, 8] , self.obs[:, 9] , self.obs[:, 10]])
# add negative reward
reward = 0.1
dist = np.linalg.norm(head_pos - target_pos)
# print("distance" , dist)
# if dist < self._last_distance:
# self._last_distance = dist\
reward += 1.0 / dist
if dist < 0.2:
reward += 5000
# print("distance is ", dist)
# compute reward based on angle of pole and cart velocity
reward = torch.FloatTensor([reward])
# max_distance = 2
# reward = torch.where(torch.abs(torch.FloatTensor([dist])) > max_distance, torch.ones_like(reward) * -2.0, reward)
return reward.item()
def is_done(self) -> None:
head_pos = np.array([self.obs[:, 5] , self.obs[:, 6] , self.obs[:, 7]])
target_pos = np.array([self.obs[:, 8] , self.obs[:, 9] , self.obs[:, 10]])
dist = np.linalg.norm(head_pos - target_pos)
dist = torch.FloatTensor([dist])
if dist < self._reset_dist:
print("Reached Target")
# reset the robot if cart has reached reset_dist or pole is too far from upright
resets = torch.where(dist < self._reset_dist, 1, 0)
resets = torch.where(torch.FloatTensor([self._steps]) > 400, 1, resets)
# resets = torch.where(torch.abs(pole_pos) > math.pi / 2, 1, resets)
self.resets = resets
return resets.item()