This is our code of trying to grasp difference objects, which contains many dependencies and therefore cannot run directly. Most of the code is focused on adding and placing objects, followed by computing the grasp pose, etc. In the final part, we use set_joint_position_targets
to control the Franka robot.
import os
import random
import sys
import json
import numpy as np
import argparse
parser = argparse.ArgumentParser()
args = parser.parse_args()
current_dir = os.path.abspath(os.path.dirname(__file__))
sys.path.append(current_dir)
from isaacsim import SimulationApp
simulation_app = SimulationApp({"headless": False}) # False
from genmanip.core.loading.loading import (
relate_franka_from_data,
load_world_xform_prim,
get_object_list,
create_camera_list,
)
from genmanip.core.pointcloud.pointcloud import (
get_mesh_info,
get_mesh_info_by_load,
get_current_meshList,
objectList2meshList,
)
from genmanip.core.random_place.random_place import place_object_to_object_by_relation
from genmanip.core.robot.franka import get_franka_PD_controller
from genmanip.core.sensor.camera import set_camera_look_at, get_src
from genmanip.core.usd_utils.prim_utils import (
add_usd_to_world,
resize_object,
set_colliders,
resize_object_by_lwh,
)
from genmanip.demogen.planning.planning import prepare_grasp_motion_planning_payload
from genmanip.thirdparty.anygrasp import get_init_grasp
from genmanip.thirdparty.curobo_planner import CuroboFrankaPlanner
from genmanip.utils.file_utils import load_yaml, load_default_config
from genmanip.utils.pc_utils import compute_mesh_bbox, compute_aabb_lwh
from genmanip.utils.utils import setup_logger
from omni.isaac.core.utils.prims import delete_prim # type: ignore
from omni.isaac.core import World # type: ignore
import numpy as np
import random
from tqdm import tqdm
from filelock import SoftFileLock
from pathlib import Path
import json
simulation_app._carb_settings.set("/physics/cooking/ujitsoCollisionCooking", False)
logger = setup_logger()
default_config = load_default_config(
current_dir=current_dir, config_name="__None__.json", anygrasp_mode="local"
)
ASSETS_DIR = default_config["ASSETS_DIR"]
TEST_USD_NAME = default_config["TEST_USD_NAME"]
TABLE_UID = "aa49db8a801d402dac6cf1579536502c"
camera_data = load_yaml("configs/cameras/fixed_camera.yml")
world = World()
scene_xform, uuid = load_world_xform_prim(
os.path.join(ASSETS_DIR, "scene_usds/base_scenes/base.usda")
)
print(uuid)
camera_list = create_camera_list(camera_data, uuid)
franka_list = [relate_franka_from_data(uuid)]
for franka in franka_list:
world.scene.add(franka)
object_list = get_object_list(uuid, scene_xform, TABLE_UID)
meshDict = objectList2meshList(object_list)
for obj in object_list.values():
set_colliders(obj.prim_path, "convexDecomposition")
world.reset()
while (
get_src(camera_list["obs_camera"], "depth") is None
or get_src(camera_list["realsense"], "depth") is None
or get_src(camera_list["camera1"], "depth") is None
):
world.step()
franka_cfg = load_yaml("assets/robots/configs/franka.yml")
planner = CuroboFrankaPlanner(franka_cfg, world, franka_list[0].prim_path)
default_joint_positions = franka_list[0].get_joint_positions()
from object_utils.object_pool import ObjectPool
usd_list = os.listdir(os.path.join(ASSETS_DIR, "object_usds/objaverse_usd/"))
print(len(usd_list))
usd_list = [usd_path for usd_path in usd_list if usd_path.endswith(".usd")]
usd_list.sort()
print(len(usd_list))
object_pool = ObjectPool(os.path.join(current_dir, "assets/objects/object_v7.pickle"))
result_dir = Path(os.path.join(current_dir, "saved/filtered_objects"))
result_dir.mkdir(parents=True, exist_ok=True)
log_lock = SoftFileLock(str(result_dir / "log.lock"))
log_file = result_dir / "log.json"
with log_lock:
if not log_file.exists():
with open(log_file, "w") as f:
json.dump({}, f)
for usd_path in usd_list:
uid = usd_path.split(".")[0]
try:
process_lock = SoftFileLock(str(result_dir / f"{uid}.lock"), timeout=0)
with process_lock:
with log_lock:
with open(log_file, "r") as f:
log_data = json.load(f)
if uid in log_data:
continue
usd_path = os.path.join(ASSETS_DIR, "object_usds/objaverse_usd/", usd_path)
object_list[uid] = add_usd_to_world(
asset_path=usd_path,
prim_path=f"/World/{uuid}/obj_{uid}",
name=f"obj_{uid}",
translation=None,
orientation=[0.5, 0.5, 0.5, 0.5],
scale=[0.01, 0.01, 0.01], # [1.0, 1.0, 1.0],
add_rigid_body=True,
add_colliders=True,
collision_approximation="convexDecomposition",
)
object_info = object_pool.get_object_info(uid)
print(f"Finish adding object {uid}")
meshDict = objectList2meshList(
object_list,
os.path.join(
default_config["ASSETS_DIR"],
"mesh_data",
"grasp_detection",
),
)
meshlist = get_current_meshList(object_list, meshDict)
resize_object(
object_list[uid],
random.uniform(
object_info["scale"][0],
object_info["scale"][1],
),
meshlist[uid],
)
meshDict[uid] = get_mesh_info_by_load(
object_list[uid],
os.path.join(
default_config["ASSETS_DIR"],
"mesh_data",
"grasp_detection",
f"{uid}.obj",
),
)
franka_list[0].set_joint_positions(default_joint_positions)
meshlist = get_current_meshList(object_list, meshDict)
min_thickness = 0.1
aabb = compute_mesh_bbox(meshlist[uid])
if np.min(compute_aabb_lwh(aabb)) > min_thickness:
l, w, h = compute_aabb_lwh(aabb)
min_thickness_ratio = min_thickness / np.min([l, w])
min_thickness_ratio = max(
min_thickness_ratio, min_thickness / np.min([l, w])
)
l *= min_thickness_ratio
w *= min_thickness_ratio
h *= min_thickness_ratio
resize_object_by_lwh(
object_list[uid],
l=l,
w=w,
h=h,
mesh=meshlist[uid],
)
meshDict[uid] = get_mesh_info_by_load(
object_list[uid],
os.path.join(
default_config["ASSETS_DIR"],
"mesh_data",
"grasp_detection",
f"{uid}.obj",
),
)
IS_OK = place_object_to_object_by_relation(
uid,
"00000000000000000000000000000000",
object_list,
meshDict,
"on",
platform_uid="00000000000000000000000000000000",
)
meshlist = get_current_meshList(object_list, meshDict)
aabb = compute_mesh_bbox(meshlist[uid])
center = aabb.get_max_bound() + aabb.get_max_bound()
center = center / 2
target_center = [0.0, 0.0]
position = object_list[uid].get_world_pose()[0]
tar_position = position
tar_position[:2] = center[:2] - target_center - position[:2]
object_list[uid].set_world_pose(position=tar_position)
set_camera_look_at(
camera_list["camera1"],
object_list[uid],
azimuth=180.0,
)
for _ in tqdm(range(15)):
world.step(render=True)
meshlist = get_current_meshList(object_list, meshDict)
try:
init_grasp = get_init_grasp(
camera_list["camera1"],
meshlist[uid],
address=default_config["ANYGRASP_ADDR"],
port=default_config["ANYGRASP_PORT"],
)
except Exception as e:
print(e)
delete_prim(f"/World/{uuid}/obj_{uid}")
object_list.pop(uid)
meshDict.pop(uid)
continue
action_list = prepare_grasp_motion_planning_payload(init_grasp, steps=30)
world = World()
franka_view = get_franka_PD_controller(franka_list[0], [2.0] * 9)
paths = []
object_position = object_list[uid].get_world_pose()[0]
for idx, target in tqdm(enumerate(action_list)):
position = target["translation"]
franka_p, _ = franka_list[0].get_world_pose()
position = position - franka_p
orientation = target["orientation"]
results = planner.plan(
position,
orientation,
franka_list[0].get_joints_state(),
)
actions = []
if results is not None:
for res in results:
if target["grasp"]:
actions.append(np.concatenate([res, [0.00, 0.00]]).tolist())
else:
actions.append(np.concatenate([res, [0.04, 0.04]]).tolist())
if idx == 1:
for _ in range(10):
actions.append(
np.concatenate([actions[-1][:7], [0.0, 0.0]]).tolist()
)
while actions:
action = actions.pop(0)
franka_view.set_joint_position_targets(action)
world.step(render=True)
after_object_position = object_list[uid].get_world_pose()[0]
is_success = False
if (
after_object_position[2] - object_position[2] > 0.1
and abs(after_object_position[0] - object_position[0]) < 0.2
and abs(after_object_position[1] - object_position[1]) < 0.2
):
is_success = True
print("Success")
else:
print("Failed")
with log_lock:
with open(log_file, "r") as f:
log_data = json.load(f)
log_data[uid] = is_success
with open(log_file, "w") as f:
json.dump(log_data, f)
delete_prim(f"/World/{uuid}/obj_{uid}")
object_list.pop(uid)
meshDict.pop(uid)
except Exception as e:
if uid in object_list and uid in meshDict:
delete_prim(f"/World/{uuid}/obj_{uid}")
object_list.pop(uid)
meshDict.pop(uid)
print(e)
continue
simulation_app.close()