Hi @gudwnzm
Here you can find a simple and quick integration of the A* algorithm (code from the next blog) with the Isaac Sim Navigation Sample
The modification is done in the simple_robot_controller.py
file (...exts/omni.isaac.demos/omni/isaac/demos/utils/simple_robot_controller.py
)
simple_robot_controller.py (15.2 KB)
# Copyright (c) 2018-2020, NVIDIA CORPORATION. All rights reserved.
#
# NVIDIA CORPORATION and its licensors retain all intellectual property
# and proprietary rights in and to this software, related documentation
# and any modifications thereto. Any use, reproduction, disclosure or
# distribution of this software and related documentation without an express
# license agreement from NVIDIA CORPORATION is strictly prohibited.
#
import math
import carb
from pxr import UsdGeom, Gf
from omni.isaac.core.utils.rotations import quat_to_euler_angles
from omni.isaac.dynamic_control import _dynamic_control
import numpy as np
from omni.debugdraw import _debugDraw
from .quintic_path_planner import QuinticPolynomial, quintic_polynomials_planner
from .stanley_control import State, pid_control, stanley_control, normalize_angle, Kp, calc_target_index
# 1. +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
import heapq
import matplotlib.pyplot as plt
from scipy.interpolate import interp1d
grid = np.array([
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0, 0, 0, 0, 1, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0, 1, 1, 1, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0],
[0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0, 0]])
distance_per_square = 0.25
def heuristic(a, b):
return np.sqrt((b[0] - a[0]) ** 2 + (b[1] - a[1]) ** 2)
def astar(array, start, goal):
neighbors = [(0,1),(0,-1),(1,0),(-1,0),(1,1),(1,-1),(-1,1),(-1,-1)]
close_set = set()
came_from = {}
gscore = {start:0}
fscore = {start:heuristic(start, goal)}
oheap = []
heapq.heappush(oheap, (fscore[start], start))
while oheap:
current = heapq.heappop(oheap)[1]
if current == goal:
data = []
while current in came_from:
data.append(current)
current = came_from[current]
return data
close_set.add(current)
for i, j in neighbors:
neighbor = current[0] + i, current[1] + j
tentative_g_score = gscore[current] + heuristic(current, neighbor)
if 0 <= neighbor[0] < array.shape[0]:
if 0 <= neighbor[1] < array.shape[1]:
if array[neighbor[0]][neighbor[1]] == 1:
continue
else:
# array bound y walls
continue
else:
# array bound x walls
continue
if neighbor in close_set and tentative_g_score >= gscore.get(neighbor, 0):
continue
if tentative_g_score < gscore.get(neighbor, 0) or neighbor not in [i[1]for i in oheap]:
came_from[neighbor] = current
gscore[neighbor] = tentative_g_score
fscore[neighbor] = tentative_g_score + heuristic(neighbor, goal)
heapq.heappush(oheap, (fscore[neighbor], neighbor))
return False
# 1. -------------------------------------------------------------------------
def calc_speed_profile(cyaw, max_speed, target_speed, min_speed=1):
speed_profile = np.array(cyaw) / max([abs(c) for c in cyaw]) * max_speed
# direction = 1.0
# # Set stop point
# for i in range(len(cyaw) - 1):
# dyaw = abs(cyaw[i + 1] - cyaw[i])
# switch = math.pi / 4.0 <= dyaw < math.pi / 2.0
# if switch:
# direction *= -1
# if direction != 1.0:
# speed_profile[i] = -target_speed
# else:
# speed_profile[i] = target_speed
# if switch:
# speed_profile[i] = 0.0
# speed down
res = min(int(len(cyaw) / 3), int(max_speed * 60))
# # print("slow", slow, len(cyaw))
for i in range(1, res):
speed_profile[-i] = min(speed_profile[-i], speed_profile[-i] / (float(res - i)) ** 0.5) # / (res))
if speed_profile[-i] <= min_speed:
speed_profile[-i] = min_speed
return speed_profile
class RobotController:
def __init__(
self,
stage,
dc,
articulation_path,
odom_prim_path,
wheel_joint_names,
wheel_speed,
goal_offset_threshold,
wheel_base,
wheel_radius,
):
self._stage = stage
self._stage_unit = UsdGeom.GetStageMetersPerUnit(self._stage)
self._dc = dc
self._articulation_path = articulation_path
self._odom_prim_path = odom_prim_path
self._wheel_joint_names = wheel_joint_names
self._wheel_speed = wheel_speed
self._goal_offset_threshold = goal_offset_threshold
self._reached_goal = [True, True]
self._enable_navigation = False
self._goal = [4.00, 4.00, 0]
self._go_forward = False
self._wheel_base = wheel_base
self._wheel_radius = wheel_radius
self.state = State(wheel_base, x=0, y=0, yaw=0, v=0)
self.target_idx = 0
self._debugDraw = _debugDraw.acquire_debug_draw_interface()
self.cx = []
def _get_odom_data(self):
self.imu = self._dc.get_rigid_body(self._odom_prim_path)
imu_pose = self._dc.get_rigid_body_pose(self.imu)
roll, pitch, yaw = quat_to_euler_angles(np.array([imu_pose.r.w, imu_pose.r.x, imu_pose.r.y, imu_pose.r.z]))
# print(roll, pitch, yaw)
self.current_robot_translation = [imu_pose.p.x, imu_pose.p.y, imu_pose.p.z]
self.current_robot_translation = [i * self._stage_unit for i in self.current_robot_translation]
self.current_robot_orientation = [normalize_angle(roll), normalize_angle(pitch), normalize_angle(yaw)]
self.current_speed = self._dc.get_rigid_body_local_linear_velocity(self.imu).x * self._stage_unit
def reached_goal(self):
return self._reached_goal[0] and self._reached_goal[1]
def get_goal(self):
return self._goal
def draw_path(self, step=None):
if self._enable_navigation and not self._reached_goal[0]:
for i in range(len(self.cx) - 1):
# self._debugDraw.draw_line(
# carb.Float3(self.cx[i], self.cy[i], 14),
# self.y_color,
# carb.Float3(self.cx[i] + 20 * math.cos(self.cyaw[i]), self.cy[i] + 20 * math.sin(self.cyaw[i]), 14),
# self.y_color,
# )
self._debugDraw.draw_line(
carb.Float3(self.cx[i] / self._stage_unit, self.cy[i] / self._stage_unit, 0.14 / self._stage_unit),
self.argb[i],
carb.Float3(
self.cx[i + 1] / self._stage_unit, self.cy[i + 1] / self._stage_unit, 0.14 / self._stage_unit
),
self.argb[i - 1],
)
def update(self, step):
v = 0
w = 0
if self._enable_navigation:
self._get_odom_data()
# self.draw_path()
theta = self.current_robot_orientation[2]
theta_goal = self._goal[2]
theta_diff = math.atan2(math.sin(theta_goal - theta), math.cos(theta_goal - theta))
x_diff = float(self._goal[0]) - self.current_robot_translation[0]
y_diff = float(self._goal[1]) - self.current_robot_translation[1]
rho = np.hypot(x_diff, y_diff)
self.state = State(
self._wheel_base * Kp,
x=self.current_robot_translation[0],
y=self.current_robot_translation[1],
yaw=self.current_robot_orientation[2] % (2 * np.pi),
v=self.current_speed,
)
self._reached_goal = [
rho < self._goal_offset_threshold[0] or self.rotate_only and rho < self._goal_offset_threshold[0] * 5,
abs(theta_diff) <= self._goal_offset_threshold[1],
]
if self._reached_goal[0] and self._reached_goal[1]:
self.control_command(0, 0)
self._enable_navigation = False
return
if not self.rotate_only:
ai = pid_control(self.sp[self.target_idx], self.state.v) / step
di, self.target_idx = stanley_control(self.state, self.cx, self.cy, self.cyaw, self.target_idx)
self.state.update(ai, di, step)
v = self.state.v
w = self.state.w
if self._reached_goal[0] or self.rotate_only:
if self._reached_goal[0]:
self.rotate_only = True
v = 0
if theta_diff > 0:
w = min(((theta_diff) * Kp / step), 1)
else:
w = max(((theta_diff) * Kp / step), -1)
# print(rho, abs(theta_diff), v, w, self.sp[self.target_idx], self.current_speed, self.target_idx)
kw = 0.5
# Allow additional steering to use differential drive (backwards spin on one wheel to tighten the cornering radius)
if not self._reached_goal[0] and v > 0:
kw = 0.5 + abs((self._wheel_base * w) / v) * (0.5 * Kp / step)
# print(kw)
command_left = (v - kw * self._wheel_base * w) / self._wheel_radius
command_right = (v + kw * self._wheel_base * w) / self._wheel_radius
# print(command_left, command_right)
self.control_command(command_left, command_right)
def control_setup(self):
self.ar = self._dc.get_articulation(self._articulation_path)
self.wheel_left = self._dc.find_articulation_dof(self.ar, self._wheel_joint_names[0])
self.wheel_right = self._dc.find_articulation_dof(self.ar, self._wheel_joint_names[1])
self.vel_props = _dynamic_control.DofProperties()
# self.vel_props.drive_mode = _dynamic_control.DRIVE_VEL
self.vel_props.damping = 1e7
self.vel_props.stiffness = 0
self._dc.set_dof_properties(self.wheel_left, self.vel_props)
self._dc.set_dof_properties(self.wheel_right, self.vel_props)
def control_command(self, left_wheel_speed, right_wheel_speed):
# Wake up articulation every move command to ensure commands are applied
self._dc.wake_up_articulation(self.ar)
# Normalizes both wheels speed if any speed will be clipped
if abs(left_wheel_speed) > self._wheel_speed[0]:
factor = abs(self._wheel_speed[0] / left_wheel_speed)
right_wheel_speed = right_wheel_speed * factor
left_wheel_speed = left_wheel_speed * factor
if abs(right_wheel_speed) > self._wheel_speed[1]:
factor = abs(self._wheel_speed[1] / right_wheel_speed)
left_wheel_speed = left_wheel_speed * factor
right_wheel_speed = right_wheel_speed * factor
self._dc.set_dof_velocity_target(self.wheel_left, left_wheel_speed)
self._dc.set_dof_velocity_target(self.wheel_right, right_wheel_speed)
def set_goal(self, x, y, theta, sv=0.5, sa=0.05, gv=0.5, ga=0.05, max_speed=2.0):
theta = normalize_angle(theta)
self._goal = [x, y, theta]
max_accel = 1.5 # max accel [m/ss]
max_jerk = 0.3 # max jerk [m/sss]
self._get_odom_data()
self.target_idx = 0
x_diff = self.current_robot_translation[0] - x
y_diff = self.current_robot_translation[1] - y
rho = np.hypot(x_diff, y_diff)
if rho / 5.0 > self._goal_offset_threshold[0]:
# 2. +++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++++
start = (int(self.current_robot_translation[0] / distance_per_square),
int(self.current_robot_translation[1] / distance_per_square))
goal = (int(x / distance_per_square), int(y / distance_per_square))
route = astar(grid, start, goal)
route = route + [start]
route = route[::-1]
x_coords = []
y_coords = []
for i in (range(0,len(route))):
x_coords.append(route[i][0])
y_coords.append(route[i][1])
points = np.stack((x_coords, y_coords), axis=1)
distance = np.cumsum( np.sqrt(np.sum( np.diff(points, axis=0)**2, axis=1 )) )
distance = np.insert(distance, 0, 0)/distance[-1]
interpolations_methods = ['slinear', 'quadratic', 'cubic']
alpha = np.linspace(0, 1, len(route) * 50)
print(alpha.shape)
interpolator = interp1d(distance, points, kind="quadratic", axis=0)
x, y = interpolator(alpha).T
# plot map and path
fig, ax = plt.subplots(figsize=(5,5))
ax.imshow(grid, cmap=plt.cm.Dark2)
ax.scatter(start[1],start[0], marker = "*", color = "yellow", s = 200)
ax.scatter(goal[1],goal[0], marker = "*", color = "red", s = 200)
ax.plot(y_coords,x_coords, color = "black")
ax.scatter(y, x, color = "red")
plt.show()
self.cx = x
self.cy = y
self.ck = np.ones_like(self.cx)
self.cyaw = np.zeros_like(self.cx)
# self.tt, self.cx, self.cy, self.cyaw, self.ck, s, j = quintic_polynomials_planner(
# self.current_robot_translation[0],
# self.current_robot_translation[1],
# self.current_robot_orientation[2],
# sv,
# sa,
# x,
# y,
# theta,
# gv,
# ga,
# max_accel,
# max_jerk,
# 1 / 60.0,
# )
# 2. -------------------------------------------------------------------------
self.cx = np.array(self.cx)
self.cy = np.array(self.cy)
self.sp = calc_speed_profile(np.array(self.ck), max_speed, 0.5, 0.05)
color = [(0, t / np.max(self.sp), 0) for t in self.sp]
self.y_color = int.from_bytes(b"\xff\xff\x00\x00", byteorder="big")
rgb_bytes = [(np.clip(c, 0, 1.0) * 255).astype("uint8").tobytes() for c in color]
argb_bytes = [b"\xff" + b for b in rgb_bytes]
self.argb = [int.from_bytes(b, byteorder="big") for b in argb_bytes]
self.rotate_only = False
else:
self.rotate_only = True
def enable_navigation(self, flag):
self._enable_navigation = flag
There are basically 2 changes indicated between +++++
and ------
-
the A* start algorithm and the grid. The grid is 20x20 and for the integration, I use a mapping of 25cm per grid square side. The obstacles in the grid are fixed and they are there only to showcase
-
Calling the planer and configuring some variables for navigation. Note that A* is only a path search algorithm. It does not take care of setting the robot velocity setpoints to reach the goal. Current integration set some values required for the last task to 1 or 0 (because the idea is to show the use to the A*)… You have here room for improvement on which to work :)
OFF-TOPIC:
If you want to display the matplotib chart inside Omniverse, as shown in the video you can use the external extension GitHub - Toni-SM/semu.data.visualizer: Display Matplotlib graphics and OpenCV images inside NVIDIA Omniverse apps
And talking about extensions… the following event will take place today
More details here in the Omniverse discord server: