Hi @toni.sm ,
I really appreciate the rapid reply.
I confirmed that the customization example works well, and I adapted it to my case.
However, I am not sure the learning process is working correctly, because I cannot see the prim, as shown in the video below.
I created my robot (/World/M1013_0) using <create_prim> and also added the inverse kinematics target (/World/M1013_0/Target).
The end-effector and target positions are collected with <get_prim_property> in [def get_observations] ("/World/M1013_0/TCP" and "/World/M1013_0/Target", respectively).
Then, whenever the reset flag is 1, the target is deleted and created again with <delete_prim> in [def reset].
This is the full code of my "task_train.py":
from omni.isaac.core.utils.nucleus import get_assets_root_path
from omni.isaac.core.utils.stage import add_reference_to_stage
from omni.isaac.core.tasks.base_task import BaseTask
from omni.isaac.core.articulations import ArticulationView
from omni.isaac.core.utils.prims import create_prim, delete_prim
from omni.isaac.core.utils.viewports import set_camera_view
from omni.isaac.cloner import GridCloner
from omni.isaac.core.utils.prims import get_prim_property
import omni.kit
import gym
from gym import spaces
import numpy as np
import torch
import math
import random
class RobotTask(BaseTask):
    def __init__(self, name, offset=None) -> None:
        # task-specific parameters
        self._robot_position = [0.0, 0.0, 0.2]
        self._reset_dist = 2
        self._success_dist = 0.02
        self._step = 0
        self._max_episode_length = 1000
        self._time_penalty = -0.01
        self._dist_reward_scale = 0.1
        self._target_position = []
        # values used for defining RL buffers
        self._num_observations = 13  # joint_current (6) / Position_EE (3) / Position_target (3) / distance (1)
        self._num_actions = 6  # delta joints (6)
        self._device = "cpu"
        self.num_envs = 4
        # a few class buffers to store RL-related states
        self.obs = torch.zeros((self.num_envs, self._num_observations))
        self.resets = torch.zeros((self.num_envs, 1))
        # set the action and observation space for RL (0.0174 rad is roughly 1 degree per joint per step)
        self.action_space = spaces.Box(np.ones(self._num_actions) * -0.0174, np.ones(self._num_actions) * 0.0174)
        self.observation_space = spaces.Box(
            np.ones(self._num_observations) * -np.Inf, np.ones(self._num_observations) * np.Inf
        )
        # trigger __init__ of parent class
        BaseTask.__init__(self, name=name, offset=offset)
    def set_up_scene(self, scene) -> None:
        # retrieve file path for the Robot USD file
        usd_path = "omniverse://localhost/Isaac/Robots/K-CLOUD/robots/AMR_M1013/mark5.usd"
        create_prim(prim_path="/World/M1013_0", prim_type="Xform", position=self._robot_position)
        self._target_position = [random.uniform(0.4, 0.6), random.uniform(-0.3, 0.3), random.uniform(0.55, 1.0)]
        create_prim(prim_path="/World/M1013_0/Target", prim_type="Xform", position=self._target_position)
        add_reference_to_stage(usd_path, "/World/M1013_0")
        # create a GridCloner instance and clone the source robot into num_envs environments
        cloner = GridCloner(spacing=5)
        target_paths = cloner.generate_paths("/World/M1013", self.num_envs)
        print(target_paths)
        position_offsets = np.zeros([self.num_envs, 3]) + np.array([0, 0, self._robot_position[2]])
        cloner.clone(source_prim_path="/World/M1013_0", prim_paths=target_paths, position_offsets=position_offsets)
        # create an ArticulationView wrapper for all cloned robots
        self._robots = ArticulationView(prim_paths_expr="/World/M1013*", name="robot_view")
        # add the Robot ArticulationView and a ground plane to the scene
        scene.add(self._robots)
        scene.add_default_ground_plane()
        # set the default camera viewport position and target
        self.set_initial_camera_params()
    def set_initial_camera_params(self, camera_position=[10, 10, 3], camera_target=[0, 0, 0]):
        set_camera_view(eye=camera_position, target=camera_target, camera_prim_path="/OmniverseKit_Persp")
    def post_reset(self):
        # cache the DOF indices of the six arm joints
        self._joint1_dof_idx = self._robots.get_dof_index("joint1")
        self._joint2_dof_idx = self._robots.get_dof_index("joint2")
        self._joint3_dof_idx = self._robots.get_dof_index("joint3")
        self._joint4_dof_idx = self._robots.get_dof_index("joint4")
        self._joint5_dof_idx = self._robots.get_dof_index("joint5")
        self._joint6_dof_idx = self._robots.get_dof_index("joint6")
        # reset all envs
        indices = torch.arange(self._robots.count, dtype=torch.int64, device=self._device)
        self.reset(indices)
    def reset(self, env_ids=None):
        if env_ids is None:
            env_ids = torch.arange(self.num_envs, device=self._device)
        num_resets = len(env_ids)
        # delete the old target prim before spawning a new one
        delete_prim(prim_path="/World/M1013_0/Target")
        self._step = 0
        # initialize DOF positions
        dof_pos = torch.zeros((num_resets, self._robots.num_dof), device=self._device)
        dof_pos[:, self._joint1_dof_idx] = 0 * torch.rand(num_resets, device=self._device)
        dof_pos[:, self._joint2_dof_idx] = 0 * torch.rand(num_resets, device=self._device)
        dof_pos[:, self._joint3_dof_idx] = 0 * torch.rand(num_resets, device=self._device)
        dof_pos[:, self._joint4_dof_idx] = 0 * torch.rand(num_resets, device=self._device)
        dof_pos[:, self._joint5_dof_idx] = 0 * torch.rand(num_resets, device=self._device)
        dof_pos[:, self._joint6_dof_idx] = 0 * torch.rand(num_resets, device=self._device)
        # print("dof_pos = ", dof_pos)
        # re-create the target prim at a new random position
        self._target_position = [random.uniform(0.4, 0.6), random.uniform(-0.3, 0.3), random.uniform(0.75, 1.2)]
        create_prim(prim_path="/World/M1013_0/Target", prim_type="Xform", position=self._target_position)
        # apply resets
        indices = env_ids.to(dtype=torch.int32)
        self._robots.set_joint_positions(dof_pos, indices=indices)
        # bookkeeping
        self.resets[env_ids] = 0
    def pre_physics_step(self, actions) -> None:  # Action function
        # reset any environments that were flagged for reset
        reset_env_ids = self.resets.nonzero(as_tuple=False).squeeze(-1)
        if len(reset_env_ids) > 0:
            self.reset(reset_env_ids)
        actions = torch.tensor(actions)
        j_current = self._robots.get_joint_positions()
        # target joint positions = current joint positions + delta actions
        delta_joint = torch.zeros((self._robots.count, self._robots.num_dof), dtype=torch.float32, device=self._device)
        # print("robot count = ", self._robots.count, "j_current = ", j_current, "action = ", actions)
        delta_joint[:, self._joint1_dof_idx] = j_current[:, 0] + actions[:, 0]
        delta_joint[:, self._joint2_dof_idx] = j_current[:, 1] + actions[:, 1]
        delta_joint[:, self._joint3_dof_idx] = j_current[:, 2] + actions[:, 2]
        delta_joint[:, self._joint4_dof_idx] = j_current[:, 3] + actions[:, 3]
        delta_joint[:, self._joint5_dof_idx] = j_current[:, 4] + actions[:, 4]
        delta_joint[:, self._joint6_dof_idx] = j_current[:, 5] + actions[:, 5]
        indices = torch.arange(self._robots.count, dtype=torch.int32, device=self._device)
        self._step += 1
        self._robots.set_joint_positions(delta_joint, indices=indices)
    def get_observations(self):  # Observation function
        dof_pos = self._robots.get_joint_positions()
        EE_pos = get_prim_property(prim_path="/World/M1013_0/TCP", property_name="xformOp:translate")
        Target_pos = get_prim_property(prim_path="/World/M1013_0/Target", property_name="xformOp:translate")
        if Target_pos is not None:
            self.distance = np.linalg.norm(Target_pos - EE_pos)
        else:
            self.distance = 0
        # collect the joint positions for the observation
        j1_pos = dof_pos[:, self._joint1_dof_idx]
        j2_pos = dof_pos[:, self._joint2_dof_idx]
        j3_pos = dof_pos[:, self._joint3_dof_idx]
        j4_pos = dof_pos[:, self._joint4_dof_idx]
        j5_pos = dof_pos[:, self._joint5_dof_idx]
        j6_pos = dof_pos[:, self._joint6_dof_idx]
        self.obs[:, 0] = j1_pos
        self.obs[:, 1] = j2_pos
        self.obs[:, 2] = j3_pos
        self.obs[:, 3] = j4_pos
        self.obs[:, 4] = j5_pos
        self.obs[:, 5] = j6_pos
        self.obs[:, 6] = EE_pos[0]
        self.obs[:, 7] = EE_pos[1]
        self.obs[:, 8] = EE_pos[2]
        self.obs[:, 9] = Target_pos[0]
        self.obs[:, 10] = Target_pos[1]
        self.obs[:, 11] = Target_pos[2]
        self.obs[:, 12] = self.distance
        # print("OBS=", self.obs, "obs[:,12] = ", self.obs[:, 12])
        return self.obs
    def calculate_metrics(self) -> None:  # Reward function
        distance = self.obs[:, 12]
        # compute reward based on the distance between the target and the end-effector
        if distance is not None:
            reward = 1.0 / (1.0 + distance ** 2)
            reward *= reward
            reward = torch.where(distance <= self._success_dist, reward * 5, reward)
            # apply a penalty if the end-effector is too far from the target
            reward = torch.where(torch.abs(distance) > self._reset_dist, torch.ones_like(reward) * -10.0, self._dist_reward_scale * reward - self._time_penalty)
            # print("Time step = ", self._step, "Reward = ", reward, "Distance = ", distance, "reward.item = ", reward.item())
        return reward
    def is_done(self) -> None:  # Reset function
        distance = self.obs[:, 12]
        # reset the robot if the end-effector is too far from the target or the episode length is reached
        resets = torch.where(torch.abs(distance) > self._reset_dist, 1, 0)
        resets = torch.where(torch.tensor(self._step) > self._max_episode_length - 1, 1, resets)
        self.resets = resets
        # print("Is Done = ", resets, "Type = ", resets.type())
        return resets
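
As a side note, the reward shaping can be checked standalone with plain PyTorch (a minimal sketch using the same constants as in calculate_metrics; it is not part of task_train.py):

import torch

reset_dist = 2
success_dist = 0.02
time_penalty = -0.01
dist_reward_scale = 0.1

# example end-effector-to-target distances in meters
distance = torch.tensor([0.01, 0.1, 0.5, 2.5])
reward = 1.0 / (1.0 + distance ** 2)
reward *= reward
reward = torch.where(distance <= success_dist, reward * 5, reward)
reward = torch.where(torch.abs(distance) > reset_dist, torch.ones_like(reward) * -10.0, dist_reward_scale * reward - time_penalty)
print(reward)  # approximately [0.51, 0.108, 0.074, -10.0]
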
My question is whether the method above has any logical errors.
Is it OK to create, delete, and get the prim property of only "/World/M1013_0/Target", or should I change the method for it?
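For example, I am wondering whether a per-environment handling like the rough sketch below would be the recommended approach instead (untested; the XFormPrimView usage and the cloned prim paths are my assumption):

from omni.isaac.core.prims import XFormPrimView

# in set_up_scene(), after cloning: one view over every environment's TCP and Target prims
self._tcps = XFormPrimView(prim_paths_expr="/World/M1013_*/TCP", name="tcp_view")
self._targets = XFormPrimView(prim_paths_expr="/World/M1013_*/Target", name="target_view")

# in get_observations(): per-environment world positions instead of a single prim property
ee_pos, _ = self._tcps.get_world_poses()         # shape (num_envs, 3)
target_pos, _ = self._targets.get_world_poses()  # shape (num_envs, 3)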