I think the problem occurs in my _create_envs function.
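Before the full file, this is the quick standalone check I would run first (a sketch that reuses the same asset_root/asset_file as in _create_envs below; the device ids 0/0 and PhysX backend are just assumptions for the check) to confirm the URDF itself loads and to print its DOF and rigid-body counts:

from isaacgym import gymapi

gym = gymapi.acquire_gym()

# minimal sim used only for asset loading
sim_params = gymapi.SimParams()
sim_params.up_axis = gymapi.UP_AXIS_Z
sim_params.gravity = gymapi.Vec3(0.0, 0.0, -9.81)
sim = gym.create_sim(0, 0, gymapi.SIM_PHYSX, sim_params)

asset_root = "../../assets/"
asset_file = "urdf/STEP_robot/urdf/STEP_robot.urdf"
asset_options = gymapi.AssetOptions()
asset_options.fix_base_link = False
robot_asset = gym.load_asset(sim, asset_root, asset_file, asset_options)
print("DOF count:", gym.get_asset_dof_count(robot_asset))
print("rigid body count:", gym.get_asset_rigid_body_count(robot_asset))

The full task file is below.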
import numpy as np
import os
import pyquaternion as pyq
import math
from isaacgym import gymtorch
from isaacgym import gymapi
import torch
from rlgpu.utils.torch_jit_utils import *
from rlgpu.tasks.base.base_task import BaseTask
class STEProbot(BaseTask):
    def __init__(self, cfg, sim_params, physics_engine, device_type, device_id, headless):
        self.cfg = cfg
        self.sim_params = sim_params
        self.physics_engine = physics_engine

        # set PhysX-specific parameters
        self.sim_params.physx.use_gpu = True
        self.sim_params.physx.solver_type = 1
        self.sim_params.physx.num_position_iterations = 4
        self.sim_params.physx.num_velocity_iterations = 0
        self.sim_params.physx.contact_offset = 0.1
        self.sim_params.physx.rest_offset = 0.0
        self.sim_params.physx.bounce_threshold_velocity = 0.2
        self.sim_params.physx.max_depenetration_velocity = 1.0

        self.num_envs = self.cfg["env"]["numEnvs"]
        self.reset_dist = self.cfg["env"]["resetDist"]
        self.max_push_effort = self.cfg["env"]["maxEffort"]
        self.max_episode_length = 500

        self.cfg["env"]["numObservations"] = 10
        self.cfg["env"]["numActions"] = 2
        self.cfg["device_type"] = device_type
        self.cfg["device_id"] = device_id
        self.cfg["headless"] = headless

        super().__init__(cfg=self.cfg)

        dof_state_tensor = self.gym.acquire_dof_state_tensor(self.sim)
        self.dof_state = gymtorch.wrap_tensor(dof_state_tensor)
        self.dof_pos = self.dof_state.view(self.num_envs, self.num_dof, 2)[..., 0]
        self.dof_vel = self.dof_state.view(self.num_envs, self.num_dof, 2)[..., 1]

        # State for each rigid body contains position([0:3]), rotation([3:7]), linear velocity([7:10]), and angular velocity([10:13]).
        # self.rigid_body_state.view(self.num_envs, self.num_bodies, 13)[a, b, c]
        rigid_body_state_tensor = self.gym.acquire_rigid_body_state_tensor(self.sim)
        self.rigid_body_state = gymtorch.wrap_tensor(rigid_body_state_tensor)
        self.body_pos = self.rigid_body_state.view(self.num_envs, self.num_bodies, 13)[:, 0, 0:3]
        self.body_vel = self.rigid_body_state.view(self.num_envs, self.num_bodies, 13)[:, 0, 7:10]
        self.body_quar = self.rigid_body_state.view(self.num_envs, self.num_bodies, 13)[:, 0, 3:7]
        self.body_angv = self.rigid_body_state.view(self.num_envs, self.num_bodies, 13)[:, 0, 10:13]

        actor_root_state = self.gym.acquire_actor_root_state_tensor(self.sim)
        self.root_states = gymtorch.wrap_tensor(actor_root_state)
        self.initial_root_states = self.root_states.clone()
        self.initial_root_states[:, 7:13] = 0   # zero linear and angular velocities
        self.initial_root_states[:, 2] = 0.2    # spawn height (z)
        self.initial_root_states[:, 6] = 1      # identity quaternion (w = 1)

        self.targets = to_torch([1000, 0, 0], device=self.device).repeat((self.num_envs, 1))
        self.dt = 1. / 60.
        self.potentials = to_torch([-1000. / self.dt], device=self.device).repeat(self.num_envs)
        self.prev_potentials = self.potentials.clone()
    def create_sim(self):
        # set the up axis to be z-up given that assets are y-up by default
        self.sim_params.up_axis = gymapi.UP_AXIS_Z
        self.sim_params.gravity.x = 0
        self.sim_params.gravity.y = 0
        self.sim_params.gravity.z = -9.81
        self.sim = super().create_sim(self.device_id, self.graphics_device_id, self.physics_engine, self.sim_params)
        self._create_ground_plane()
        self._create_envs(self.num_envs, self.cfg["env"]['envSpacing'], int(np.sqrt(self.num_envs)))
    def _create_ground_plane(self):
        plane_params = gymapi.PlaneParams()
        # the ground-plane normal points along +z
        plane_params.normal = gymapi.Vec3(0.0, 0.0, 1.0)
        plane_params.static_friction = 1
        plane_params.dynamic_friction = 0.7
        self.gym.add_ground(self.sim, plane_params)
    def _create_envs(self, num_envs, spacing, num_per_row):
        # define plane on which environments are initialized
        lower = gymapi.Vec3(-spacing, -spacing, 0.0)
        upper = gymapi.Vec3(spacing, spacing, spacing)

        asset_root = "../../assets/"
        asset_file = "urdf/STEP_robot/urdf/STEP_robot.urdf"
        if "asset" in self.cfg["env"]:
            asset_root = self.cfg["env"]["asset"].get("assetRoot", asset_root)
            asset_file = self.cfg["env"]["asset"].get("assetFileName", asset_file)
        asset_path = os.path.join(asset_root, asset_file)
        asset_root = os.path.dirname(asset_path)
        asset_file = os.path.basename(asset_path)

        asset_options = gymapi.AssetOptions()
        asset_options.fix_base_link = False
        asset_options.default_dof_drive_mode = gymapi.DOF_MODE_NONE
        asset_options.angular_damping = 0.0
        testtask_asset = self.gym.load_asset(self.sim, asset_root, asset_file, asset_options)
        self.num_dof = self.gym.get_asset_dof_count(testtask_asset)
        self.num_bodies = self.gym.get_asset_rigid_body_count(testtask_asset)

        pose = gymapi.Transform()
        pose.p.z = 0.2
        # asset is rotated z-up by default, no additional rotations needed
        # pose.r = gymapi.Quat(0.0, 0.0, 0.0, 1.0)

        self.testtask_handles = []
        self.envs = []
        for i in range(self.num_envs):
            # create env instance
            env_ptr = self.gym.create_env(self.sim, lower, upper, num_per_row)
            testtask_handle = self.gym.create_actor(env_ptr, testtask_asset, pose, "testtask", i, 1, 0)
            self.gym.enable_actor_dof_force_sensors(env_ptr, testtask_handle)
            dof_props = self.gym.get_actor_dof_properties(env_ptr, testtask_handle)
            dof_props['driveMode'][0] = gymapi.DOF_MODE_EFFORT
            dof_props['driveMode'][1] = gymapi.DOF_MODE_EFFORT
            self.gym.set_actor_dof_properties(env_ptr, testtask_handle, dof_props)
            self.envs.append(env_ptr)
            self.testtask_handles.append(testtask_handle)
    def compute_reward(self):
        # retrieve environment observations from buffer
        body_pos_x = self.obs_buf[:, 0]
        body_pos_y = self.obs_buf[:, 1]
        body_pos_z = self.obs_buf[:, 2]
        body_angle = self.obs_buf[:, 3]
        body_anglev = self.obs_buf[:, 4]
        wheel_velL = self.obs_buf[:, 5]
        wheel_velR = self.obs_buf[:, 6]
        body_vel_x = self.obs_buf[:, 7]
        body_vel_y = self.obs_buf[:, 8]
        body_vel_z = self.obs_buf[:, 9]

        self.rew_buf[:], self.reset_buf[:] = compute_testtask_reward(
            body_angle, body_anglev, body_pos_x, body_pos_y, body_pos_z, wheel_velL, wheel_velR, body_vel_x, body_vel_y, body_vel_z,
            self.reset_dist, self.reset_buf, self.progress_buf, self.max_episode_length
        )
        if torch.count_nonzero(self.reset_buf[:]) == 0:
            print('Exception')
            # print(body_angle, sep='\n')
            # print('\n')
    def compute_observations(self, env_ids=None):
        if env_ids is None:
            env_ids = np.arange(self.num_envs)

        self.gym.refresh_dof_state_tensor(self.sim)
        self.gym.refresh_rigid_body_state_tensor(self.sim)

        # per-env body angle derived from the base quaternion (computed on CPU in a Python loop)
        # alternative: body_angle = math.acos(pyq.Quaternion(self.body_quar[i].tolist()).rotation_matrix[2, 2])  # *57.29577951308232
        body_angles = []
        for i in range(self.num_envs):
            body_angle = math.acos(1 - 2 * self.body_quar[i].tolist()[1] ** 2 - 2 * self.body_quar[i].tolist()[2] ** 2)
            body_angles.append(body_angle)

        self.obs_buf[env_ids, 0] = self.body_pos[env_ids, 0].squeeze()
        self.obs_buf[env_ids, 1] = self.body_pos[env_ids, 1].squeeze()
        self.obs_buf[env_ids, 2] = self.body_pos[env_ids, 2].squeeze()
        self.obs_buf[env_ids, 3] = torch.tensor(body_angles, device=self.device).view(1, self.num_envs).squeeze()
        self.obs_buf[env_ids, 4] = torch.sqrt(torch.sum(self.body_angv ** 2, 1))
        self.obs_buf[env_ids, 5] = self.dof_vel[env_ids, 0].squeeze()
        self.obs_buf[env_ids, 6] = self.dof_vel[env_ids, 1].squeeze()
        self.obs_buf[env_ids, 7] = self.body_vel[env_ids, 0].squeeze()
        self.obs_buf[env_ids, 8] = self.body_vel[env_ids, 1].squeeze()
        self.obs_buf[env_ids, 9] = self.body_vel[env_ids, 2].squeeze()
        return self.obs_buf
    def reset(self, env_ids):
        positions = torch_rand_float(-np.pi, np.pi, (len(env_ids), self.num_dof), device=self.device)
        velocities = torch_rand_float(-0.1, 0.1, (len(env_ids), self.num_dof), device=self.device)

        self.dof_pos[env_ids] = positions
        self.dof_vel[env_ids] = velocities

        env_ids_int32 = env_ids.to(dtype=torch.int32)
        self.gym.set_actor_root_state_tensor_indexed(self.sim,
                                                     gymtorch.unwrap_tensor(self.initial_root_states),
                                                     gymtorch.unwrap_tensor(env_ids_int32), len(env_ids_int32))
        self.gym.set_dof_state_tensor_indexed(self.sim,
                                              gymtorch.unwrap_tensor(self.dof_state),
                                              gymtorch.unwrap_tensor(env_ids_int32), len(env_ids_int32))

        # to_target = self.targets[env_ids] - self.initial_root_states[env_ids, 0:3]
        # to_target[:, 2] = 0.0
        # self.prev_potentials[env_ids] = -torch.norm(to_target, p=2, dim=-1) / self.dt
        # self.potentials[env_ids] = self.prev_potentials[env_ids].clone()

        self.progress_buf[env_ids] = 0
        self.reset_buf[env_ids] = 0
    def pre_physics_step(self, actions):
        actions_tensor = torch.zeros(self.num_envs * self.num_dof, device=self.device, dtype=torch.float)
        actions_tensor = actions.to(self.device).squeeze() * self.max_push_effort
        forces = gymtorch.unwrap_tensor(actions_tensor)
        self.gym.set_dof_actuation_force_tensor(self.sim, forces)
    def post_physics_step(self):
        self.progress_buf += 1

        env_ids = self.reset_buf.nonzero(as_tuple=False).squeeze(-1)
        if len(env_ids) > 0:
            self.reset(env_ids)

        self.compute_observations()
        self.compute_reward()
#####################################################################
###=========================jit functions=========================###
#####################################################################
#, body_vel_x, body_vel_y, body_vel_z
@torch.jit.script
def compute_testtask_reward(body_angle, body_anglev, body_pos_x, body_pos_y, body_pos_z, wheel_velL, wheel_velR, body_vel_x, body_vel_y, body_vel_z,
                            reset_dist, reset_buf, progress_buf, max_episode_length):
    # type: (Tensor, Tensor, Tensor, Tensor, Tensor, Tensor, Tensor, Tensor, Tensor, Tensor, float, Tensor, Tensor, float) -> Tuple[Tensor, Tensor]

    # reward penalizes deviation from upright, body angular rate, deviation from the target forward velocity, and wheel speeds
    reward = 1 - 2 * body_angle ** 2 - 0.005 * body_anglev ** 2 - 0.1 * torch.abs(body_vel_x - 10) - 0.00001 * (wheel_velL ** 2 + wheel_velR ** 2)

    # adjust reward for reset agents
    # reward = torch.where(torch.abs(cart_pos) > reset_dist, torch.ones_like(reward) * -2.0, reward)
    # reward = torch.where(body_angle < np.pi / 2, torch.ones_like(reward) * -2.0, reward)

    reset = torch.where(torch.abs(body_pos_z) > 5, torch.ones_like(reset_buf), reset_buf)
    reset = torch.where(torch.abs(body_angle) > math.pi / 2, torch.ones_like(reset_buf), reset)
    reset = torch.where(torch.abs(body_pos_x) >= reset_dist, torch.ones_like(reset_buf), reset)
    reset = torch.where(progress_buf >= max_episode_length - 1, torch.ones_like(reset_buf), reset)

    return reward, reset