Isaac sim How to use external data to control the movement of the robot arm

Isaac sim How to use external data to control the movement of the robot arm

code:
from omni.isaac.examples.base_sample import BaseSample
from omni.isaac.franka import Franka
from omni.isaac.core.objects import DynamicCuboid
from omni.isaac.franka.controllers import PickPlaceController
from omni.isaac.core.tasks import BaseTask
import numpy as np

class FrankaPlaying(BaseTask):

def __init__(self, name):
    super().__init__(name=name, offset=None)
    self.current_line = 0
    self._goal_position = np.array([-0.3, -0.3, 0.0515 / 2.0])
    self._task_achieved = False
    return

# 场景设置
def set_up_scene(self, scene):
    super().set_up_scene(scene)
    scene.add_default_ground_plane()
    self._cube = scene.add(DynamicCuboid(prim_path="/World/random_cube",
                                        name="fancy_cube",
                                        position=np.array([0.3, 0.3, 0.3]),
                                        scale=np.array([0.0515, 0.0515, 0.0515]),
                                        color=np.array([0, 0, 1.0])))
    self._franka = scene.add(Franka(prim_path="/World/Fancy_Franka",
                                    name="fancy_franka"))    
    return
file_object = None
# 定义全局变量,用于保存文件对象和文件路径
def next_line():
    global file_object
    if FrankaPlaying.file_object is None:
        # 第一次调用时打开文件
        FrankaPlaying.file_object = open('C:\data\output.txt', 'r')
    # 读取文件的下一行
    line = FrankaPlaying.file_object.readline().strip()
    line = np.array([line])
    if not line:
        #如果到达文件末尾,重新打开文件并返回第一行
        FrankaPlaying.file_object.close()
        FrankaPlaying.file_object = open('C:\data\output.txt', 'r')
        line = FrankaPlaying.file_object.readline().strip()
        line = np.array([line])
    return line

# 获取场景信息
def get_observations(self):
    cube_position, _ = self._cube.get_world_pose()
    current_joint_positions = FrankaPlaying.next_line()
    print("关节的位置为: ", current_joint_positions)
    observations = {
        self._franka.name: {
            "joint_positions": current_joint_positions,
        },
        self._cube.name: {
            "position": cube_position,
            "goal_position": self._goal_position
        }
    }
    return observations

# 每一步的操作
def pre_step(self, control_index, simulation_time):
    cube_position, _ = self._cube.get_world_pose()
    if not self._task_achieved and np.mean(np.abs(self._goal_position - cube_position)) < 0.02:
        #改变方块颜色
        self._cube.get_applied_visual_material().set_color(color=np.array([0, 1.0, 0]))
        self._task_achieved = True
    return

#重置
def post_reset(self):
    self._franka.gripper.set_joint_positions(self._franka.gripper.joint_opened_positions)
    self._cube.get_applied_visual_material().set_color(color=np.array([0, 0, 1.0]))
    self._task_achieved = False
    return

class HelloWorld(BaseSample):
def init(self) → None:
super().init()
return

def setup_scene(self):
    world = self.get_world()
    # 添加任务
    world.add_task(FrankaPlaying(name="my_first_task"))
    return
# 加载后
async def setup_post_load(self):
    self._world = self.get_world()
    self._franka = self._world.scene.get_object("fancy_franka")
    # 创建控制器
    self._controller = PickPlaceController(
        name="pick_place_controller",
        gripper=self._franka.gripper,
        robot_articulation=self._franka,
    )
    # 添加物理回调
    self._world.add_physics_callback("sim_step", callback_fn=self.physics_step)
    # 运行
    await self._world.play_async()
    return
# 重置
async def setup_post_reset(self):
    self._controller.reset()
    # 重置7

    await self._world.play_async()
    return

# 物理步
def physics_step(self, step_size):
    # 获取当前观察
    current_observations = self._world.get_observations()
    # 获取动作
    actions = self._controller.forward(
        picking_position=current_observations["fancy_cube"]["position"],
        placing_position=current_observations["fancy_cube"]["goal_position"],
        current_joint_positions=current_observations["fancy_franka"]["joint_positions"],
    )
    #  执行动作
    self._franka.apply_action(actions)
    if self._controller.is_done():
        self._world.pause()
    return

Hi @hiteam_yangyan - Following documents would be useful to you:

Let us know if you have any follow-up questions.