Thank you for your reply. What I want is to control when the image is published, not to reduce the publish rate. Indeed, a service/request setup alone seems not to be a good approach here, so I wrote a standalone script to do it. The main structure is as follows.
class RobotOperation(Node):
    """ROS 2 node driving an Isaac Sim stage with on-demand camera publishing.

    On construction it opens a USD stage, creates a render product for a
    static camera, and attaches Replicator writers (camera info, RGB, depth)
    with ``trigger=None`` so nothing publishes automatically.  A Trigger
    service on ``/static_camera_1/capture`` arms a flag that makes the main
    loop call ``schedule_write()`` on each writer exactly once.
    """

    def __init__(self):
        super().__init__("robot_operation")
        self.timeline = omni.timeline.get_timeline_interface()
        usd_path = "/home/shui/alfresa_pick_and_place/models/alfresa_cross_belt_default_4.usd"
        omni.usd.get_context().open_stage(usd_path)
        # Get current stage (needed later to edit the SDGPipeline session layer).
        stage = omni.usd.get_context().get_stage()
        self.world = World(stage_units_in_meters=1.0)
        assets_root_path = get_assets_root_path()
        if assets_root_path is None:
            carb.log_error("Could not find Isaac Sim assets folder")
            simulation_app.close()
            sys.exit()
        self.rp = rep.create.render_product("/world/static_camera_1", resolution=(2048, 1024))
        rgbd_camera = self.world.scene.add(
            Camera(
                name="static_camera_1",
                prim_path="/world/static_camera_1",
                render_product_path=self.rp.path,
            )
        )
        self.render_product = rgbd_camera._render_product_path
        rgbd_camera.initialize()
        self.publish_camera_info(rgbd_camera)
        self.publish_rgb(rgbd_camera)
        self.publish_depth(rgbd_camera)
        # self.publish_pointcloud_from_depth(rgbd_camera)
        # Freeze hydra texture updates; captures are requested explicitly.
        self.rp.hydra_texture.set_updates_enabled(False)
        # Required for editing the SDGPipeline graph which exists in the Session Layer.
        with Usd.EditContext(stage, stage.GetSessionLayer()):
            # Get name of rendervar for RGB sensor type.
            rv_rgb = omni.syntheticdata.SyntheticData.convert_sensor_type_to_rendervar(
                sd.SensorType.Rgb.name
            )
            # Path to the IsaacSimulationGate node in the RGB pipeline.
            self.rgb_camera_gate_path = omni.syntheticdata.SyntheticData._get_node_path(
                rv_rgb + "IsaacSimulationGate", self.render_product
            )
            rv_depth = omni.syntheticdata.SyntheticData.convert_sensor_type_to_rendervar(
                sd.SensorType.DistanceToImagePlane.name
            )
            # Path to the IsaacSimulationGate node in the depth pipeline.
            self.depth_camera_gate_path = omni.syntheticdata.SyntheticData._get_node_path(
                rv_depth + "IsaacSimulationGate", self.render_product
            )
            # Path to the IsaacSimulationGate node in the CameraInfo pipeline.
            self.camera_info_gate_path = omni.syntheticdata.SyntheticData._get_node_path(
                "PostProcessDispatch" + "IsaacSimulationGate",
                self.render_product,
            )
        # BUG FIX: this flag must exist before run_simulation() reads it,
        # otherwise the first loop iteration raises AttributeError (the
        # original had this line commented out).
        self.capture_triggered = False
        self.srv = self.create_service(TriggerSrv, "/static_camera_1/capture", self.capture_cb)
        self.world.reset()

    def publish_camera_info(self, camera: Camera):
        """Attach a ROS2PublishCameraInfo writer for *camera* (no auto trigger)."""
        from isaacsim.ros2.bridge import read_camera_info

        # Link the camera's render product and publish to the given topic name.
        render_product = camera._render_product_path
        topic_name = "/color/camera_info"
        queue_size = 1
        node_namespace = camera.name
        frame_id = camera.prim_path.split("/")[-1]  # Matches what the TF tree is publishing.
        self.caminfo_writer = rep.WriterRegistry.get("ROS2PublishCameraInfo")
        camera_info, _ = read_camera_info(render_product_path=render_product)
        self.caminfo_writer.initialize(
            frameId=frame_id,
            nodeNamespace=node_namespace,
            queueSize=queue_size,
            topicName=topic_name,
            width=camera_info.width,
            height=camera_info.height,
            projectionType=camera_info.distortion_model,
            k=camera_info.k.reshape([1, 9]),
            r=camera_info.r.reshape([1, 9]),
            p=camera_info.p.reshape([1, 12]),
            physicalDistortionModel=camera_info.distortion_model,
            physicalDistortionCoefficients=camera_info.d,
        )
        # trigger=None: publishing happens only via schedule_write().
        rep.WriterRegistry.attach(self.caminfo_writer, [render_product], trigger=None)
        gate_path = omni.syntheticdata.SyntheticData._get_node_path(
            "PostProcessDispatch" + "IsaacSimulationGate", render_product
        )
        # Step input of the IsaacSimulationGate upstream of the ROS publisher
        # controls its execution rate (1 = every frame, 0 = disabled).
        og.Controller.attribute(gate_path + ".inputs:step").set(1)

    def _attach_image_writer(self, camera: Camera, sensor_type_name: str, topic_name: str):
        """Attach a ROS2PublishImage writer for one sensor type; return the writer.

        Shared plumbing for publish_rgb / publish_depth: resolves the render
        variable, initializes the writer, attaches it with ``trigger=None``,
        and sets the simulation-gate step to 1.
        """
        render_product = camera._render_product_path
        frame_id = camera.prim_path.split("/")[-1]  # Matches what the TF tree is publishing.
        rv = omni.syntheticdata.SyntheticData.convert_sensor_type_to_rendervar(sensor_type_name)
        writer = rep.WriterRegistry.get(rv + "ROS2PublishImage")
        writer.initialize(
            frameId=frame_id,
            nodeNamespace=camera.name,
            queueSize=1,
            topicName=topic_name,
        )
        rep.WriterRegistry.attach(writer, [render_product], trigger=None)
        # Step input of the IsaacSimulationGate upstream of the ROS publisher
        # controls its execution rate.
        gate_path = omni.syntheticdata.SyntheticData._get_node_path(
            rv + "IsaacSimulationGate", render_product
        )
        og.Controller.attribute(gate_path + ".inputs:step").set(1)
        return writer

    def publish_rgb(self, camera: Camera):
        """Attach the RGB image writer publishing on /color/image_rect."""
        self.rgb_writer = self._attach_image_writer(
            camera, sd.SensorType.Rgb.name, "/color/image_rect"
        )

    def publish_depth(self, camera: Camera):
        """Attach the depth image writer publishing on /depth/image_rect."""
        self.depth_writer = self._attach_image_writer(
            camera, sd.SensorType.DistanceToImagePlane.name, "/depth/image_rect"
        )

    def capture_cb(self, request, response):
        """Trigger service callback: arm a single-frame capture for the main loop."""
        try:
            self.capture_triggered = True
            response.success = True
            response.message = "Captured one frame"
        except Exception as e:  # service boundary: report, never crash the node
            response.success = False
            response.message = f"{e}"
        return response

    def run_simulation(self):
        """Main loop: step the world, spin ROS callbacks, publish on demand."""
        reset_needed = False
        while simulation_app.is_running():
            self.world.step(render=True)
            rclpy.spin_once(self, timeout_sec=0.0)
            if self.world.is_stopped() and not reset_needed:
                reset_needed = True
            if self.world.is_playing():
                if reset_needed:
                    self.world.reset()
                    reset_needed = False
                if self.capture_triggered:
                    # Publish exactly one frame from each attached writer.
                    self.caminfo_writer.schedule_write()
                    self.rgb_writer.schedule_write()
                    self.depth_writer.schedule_write()
                    self.capture_triggered = False
        # Cleanup
        self.timeline.stop()
        self.destroy_node()
        simulation_app.close()
if __name__ == "__main__":
    rclpy.init()
    robot_op = RobotOperation()
    try:
        robot_op.run_simulation()
    finally:
        # BUG FIX: the original never shut down the rclpy context;
        # always release it even if the simulation loop raises.
        rclpy.shutdown()
Based on this structure, I tried the following methods.
(1) the method shown above: attach the render product to the writer with `trigger=None` — `rep.WriterRegistry.attach(self.rgb_writer, [render_product], trigger=None)` — then call `schedule_write()` when a capture service request is received.
(2) attach the render products to the writers when a capture service request is received, and detach them otherwise.
(3) call self.rp.hydra_texture.set_updates_enabled(True) when a capture service request is received, and self.rp.hydra_texture.set_updates_enabled(False) otherwise.
(4) when a capture service request is received, enable the simulation gate:
gate_path = omni.syntheticdata.SyntheticData._get_node_path(
    rv + "IsaacSimulationGate", render_product
)
og.Controller.attribute(gate_path + ".inputs:step").set(1)
and when no capture is pending, disable it with og.Controller.attribute(gate_path + ".inputs:step").set(0)
None of these stops the streaming. Please help!