Publish one frame at a time to a ROS 2 topic using OmniGraph

Isaac Sim Version

5.0.0

Operating System

Ubuntu 24.04

Topic Description

This is a continuation of the previous topic Capture camera data on trigger.
I want exactly one image frame to be published when a trigger service request is received, so I built the following action graph.

Although it publishes images when the service request is received, it is not restricted to one frame: once publishing is triggered, it continues to publish frames indefinitely.
To work around this, I added an “On Impulse Event” node and a “Script Node”. When a service message is received, a script snippet sets the impulse to true, which triggers the render product.
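What I am after is essentially a one-shot latch: the service request arms a flag, and the publish step consumes it at most once. A minimal plain-Python sketch of that pattern (the names are illustrative, not Isaac Sim API):

```python
class OneShotLatch:
    """Arm once per trigger; fire at most once per arm."""

    def __init__(self):
        self._armed = False

    def arm(self):
        # Called from the service callback.
        self._armed = True

    def consume(self):
        # Called once per simulation frame; returns True exactly once per arm.
        fired = self._armed
        self._armed = False
        return fired


latch = OneShotLatch()
latch.arm()
results = [latch.consume() for _ in range(3)]
# results == [True, False, False]: only the first frame after the trigger fires
```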

I also tried adding an “Isaac Simulation Gate” node and a “Trigger Gate” node, but neither of them stops the continuous publishing.

Is there any way to publish only one frame at a time?

I found this in the Writer class, but it still does not stop the camera from streaming.

    def schedule_write(self):
        """Manually schedule a write call to the writer

        Sends a ``writerTrigger`` event to schedule the writer for the current simulation frame. Used in conjunction with
        writer trigger set to ``None`` (i.e. ``writer.attach(<render_product>, trigger=None)``).

        .. note::
            The writer will not write data until the scheduled frame is rendered through subsequent ``update`` or ``step`` calls.

        Example:
            >>> import omni.replicator.core as rep
            >>> rp = rep.create.render_product(rep.create.camera(), (512, 512))
            >>> writer = rep.writers.get(
            ...     name="BasicWriter",
            ...     init_params={"output_dir": "_out", "rgb": True},
            ...     render_products=rp,
            ...     trigger=None,
            ... )
            >>> # ... initialize/step orchestrator
            >>> writer.schedule_write()
            >>> # ... step/update to render and write scheduled frame
        """
        send_og_event(f"writerTrigger-{self._writer_id}")
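The contract here, as I read the docstring, is that a writer attached with `trigger=None` never fires on its own; each `schedule_write()` queues exactly one write, which happens on the next rendered frame. A plain-Python sketch of that contract (stand-in class, not the Replicator implementation):

```python
class FakeWriter:
    """Stand-in mimicking a Replicator writer attached with trigger=None."""

    def __init__(self):
        self._scheduled = False
        self.writes = 0

    def schedule_write(self):
        # Queue exactly one write for the next rendered frame.
        self._scheduled = True

    def on_frame_rendered(self):
        # Called by the update()/step() loop for every rendered frame.
        if self._scheduled:
            self.writes += 1
            self._scheduled = False


writer = FakeWriter()
for _ in range(3):
    writer.on_frame_rendered()   # nothing scheduled: no writes happen
writer.schedule_write()
for _ in range(3):
    writer.on_frame_rendered()   # exactly one write fires, then it disarms
```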

I also tried the method used in standalone_examples/api/isaacsim.ros2.bridge/camera_manual.py, but it still does not work.

Hi @suihe160 If you want to publish one frame at a time (when triggered once), I am not sure a service/request would be a good approach. First you would need to connect “Request Data” from the ROS2 Service Server Request node to “Enable” on the Isaac Create Render Product node. When you send request data True, it enables rendering of the camera images; when you send request data False, it disables it. But it is hard to guarantee that you publish only one frame.

Let’s take a step back: do you want to control when the image is published, or do you just need to reduce the publish rate of the camera images? If reducing the publish rate satisfies your needs, take a look at this tutorial: ROS2 Setting Publish Rates — Isaac Sim Documentation
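As I understand the Isaac Simulation Gate used in that tutorial, its `inputs:step` acts as a frame divider: a step of N lets every Nth frame through, and 0 blocks execution entirely. A plain-Python sketch of that semantics (illustrative only, not the node's actual implementation):

```python
def gated_frames(num_frames, step):
    """Indices of frames that pass a step-N simulation gate; step <= 0 blocks all."""
    if step <= 0:
        return []
    return [i for i in range(num_frames) if i % step == 0]


every_frame = gated_frames(6, 1)   # step 1: all frames pass
every_other = gated_frames(6, 2)   # step 2: every other frame passes
blocked = gated_frames(6, 0)       # step 0: the gate is closed
```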

Thank you for your reply. What I want is to control when the image is published, not to reduce the publish rate. Indeed, a service/request does not seem to be a good approach here, so I wrote a standalone script instead. The main structure is as follows.



# Imports this snippet relies on (module paths assumed for Isaac Sim 5.0;
# TriggerSrv is assumed to be std_srvs/srv/Trigger). SimulationApp must be
# created before any other omni/isaacsim imports:
from isaacsim import SimulationApp
simulation_app = SimulationApp({"headless": False})

import sys

import carb
import omni.graph.core as og
import omni.replicator.core as rep
import omni.syntheticdata
import omni.syntheticdata._syntheticdata as sd
import omni.timeline
import omni.usd
import rclpy
from isaacsim.core.api import World
from isaacsim.sensors.camera import Camera
from isaacsim.storage.native import get_assets_root_path
from pxr import Usd
from rclpy.node import Node
from std_srvs.srv import Trigger as TriggerSrv


class RobotOperation(Node):
    def __init__(self):
        super().__init__("robot_operation")

        self.timeline = omni.timeline.get_timeline_interface()
        usd_path = "/home/shui/alfresa_pick_and_place/models/alfresa_cross_belt_default_4.usd"
        omni.usd.get_context().open_stage(usd_path)

        # Get current stage:
        stage = omni.usd.get_context().get_stage()
        self.world = World(stage_units_in_meters=1.0)

        assets_root_path = get_assets_root_path()
        if assets_root_path is None:
            carb.log_error("Could not find Isaac Sim assets folder")
            simulation_app.close()
            sys.exit()

        self.rp = rep.create.render_product("/world/static_camera_1", resolution=(2048, 1024))
        rgbd_camera = self.world.scene.add(
            Camera(
                name="static_camera_1",
                prim_path="/world/static_camera_1",
                render_product_path=self.rp.path,
            )
        )
        self.render_product = rgbd_camera._render_product_path
        rgbd_camera.initialize()
        # self.evt = rep.trigger.on_custom_event(event_name="CAMERA_SHOT")
        # self.rp.hydra_texture.set_updates_enabled(False)

        self.publish_camera_info(rgbd_camera)
        self.publish_rgb(rgbd_camera)
        self.publish_depth(rgbd_camera)
        # self.publish_pointcloud_from_depth(rgbd_camera)
        self.rp.hydra_texture.set_updates_enabled(False)

        # Required for editing the SDGPipeline graph which exists in the Session Layer
        with Usd.EditContext(stage, stage.GetSessionLayer()):

            # Get name of rendervar for RGB sensor type
            rv_rgb = omni.syntheticdata.SyntheticData.convert_sensor_type_to_rendervar(
                sd.SensorType.Rgb.name
            )

            # Get path to IsaacSimulationGate node in RGB pipeline
            self.rgb_camera_gate_path = omni.syntheticdata.SyntheticData._get_node_path(
                rv_rgb + "IsaacSimulationGate", self.render_product
            )
            rv_depth = omni.syntheticdata.SyntheticData.convert_sensor_type_to_rendervar(
                sd.SensorType.DistanceToImagePlane.name
            )
            # Get path to IsaacSimulationGate node in Depth pipeline
            self.depth_camera_gate_path = omni.syntheticdata.SyntheticData._get_node_path(
                rv_depth + "IsaacSimulationGate", self.render_product
            )

            # Get path to IsaacSimulationGate node in CameraInfo pipeline
            self.camera_info_gate_path = omni.syntheticdata.SyntheticData._get_node_path(
                "PostProcessDispatch" + "IsaacSimulationGate",
                self.render_product,
            )

        # self.capture_triggered = False
        self.srv = self.create_service(TriggerSrv, "/static_camera_1/capture", self.capture_cb)
        self.world.reset()

    def publish_camera_info(self, camera: Camera):
        from isaacsim.ros2.bridge import read_camera_info

        # The following code will link the camera's render product and publish the data to the specified topic name.
        render_product = camera._render_product_path
        topic_name = "/color/camera_info"
        queue_size = 1
        node_namespace = camera.name
        frame_id = camera.prim_path.split("/")[-1]  # This matches what the TF tree is publishing.

        self.caminfo_writer = rep.WriterRegistry.get("ROS2PublishCameraInfo")
        camera_info, _ = read_camera_info(render_product_path=render_product)
        self.caminfo_writer.initialize(
            frameId=frame_id,
            nodeNamespace=node_namespace,
            queueSize=queue_size,
            topicName=topic_name,
            width=camera_info.width,
            height=camera_info.height,
            projectionType=camera_info.distortion_model,
            k=camera_info.k.reshape([1, 9]),
            r=camera_info.r.reshape([1, 9]),
            p=camera_info.p.reshape([1, 12]),
            physicalDistortionModel=camera_info.distortion_model,
            physicalDistortionCoefficients=camera_info.d,
        )
        rep.WriterRegistry.attach(self.caminfo_writer, [render_product], trigger=None)

        gate_path = omni.syntheticdata.SyntheticData._get_node_path(
            "PostProcessDispatch" + "IsaacSimulationGate", render_product
        )

        # Set step input of the Isaac Simulation Gate nodes upstream of ROS publishers to control their execution rate
        og.Controller.attribute(gate_path + ".inputs:step").set(1)
        return


    def publish_rgb(self, camera: Camera):
        # The following code will link the camera's render product and publish the data to the specified topic name.
        render_product = camera._render_product_path
        topic_name = "/color/image_rect"
        queue_size = 1
        node_namespace = camera.name
        frame_id = camera.prim_path.split("/")[-1]  # This matches what the TF tree is publishing.

        rv = omni.syntheticdata.SyntheticData.convert_sensor_type_to_rendervar(
            sd.SensorType.Rgb.name
        )
        self.rgb_writer = rep.WriterRegistry.get(rv + "ROS2PublishImage")
        self.rgb_writer.initialize(
            frameId=frame_id,
            nodeNamespace=node_namespace,
            queueSize=queue_size,
            topicName=topic_name,
        )
        rep.WriterRegistry.attach(self.rgb_writer, [render_product], trigger=None)

        # Set step input of the Isaac Simulation Gate nodes upstream of ROS publishers to control their execution rate
        gate_path = omni.syntheticdata.SyntheticData._get_node_path(
            rv + "IsaacSimulationGate", render_product
        )
        og.Controller.attribute(gate_path + ".inputs:step").set(1)

        return

    def publish_depth(self, camera: Camera):
        # The following code will link the camera's render product and publish the data to the specified topic name.
        render_product = camera._render_product_path
        topic_name = "/depth/image_rect"
        queue_size = 1
        node_namespace = camera.name
        frame_id = camera.prim_path.split("/")[-1]  # This matches what the TF tree is publishing.

        rv = omni.syntheticdata.SyntheticData.convert_sensor_type_to_rendervar(
            sd.SensorType.DistanceToImagePlane.name
        )
        self.depth_writer = rep.WriterRegistry.get(rv + "ROS2PublishImage")
        self.depth_writer.initialize(
            frameId=frame_id,
            nodeNamespace=node_namespace,
            queueSize=queue_size,
            topicName=topic_name,
        )
        rep.WriterRegistry.attach(self.depth_writer, [render_product], trigger=None)

        # Set step input of the Isaac Simulation Gate nodes upstream of ROS publishers to control their execution rate
        gate_path = omni.syntheticdata.SyntheticData._get_node_path(
            rv + "IsaacSimulationGate", render_product
        )
        og.Controller.attribute(gate_path + ".inputs:step").set(1)

        return

    def capture_cb(self, request, response):
        try:
            # writers = [self.rgb_writer, self.depth_writer, self.caminfo_writer, self.points_writer]

            self.capture_triggered = True
            response.success = True
            response.message = "Captured one frame"
        except Exception as e:
            response.success = False
            response.message = f"{e}"
        return response

    def run_simulation(self):
        i = 0
        reset_needed = False
        writers = [self.rgb_writer, self.depth_writer, self.caminfo_writer]
        while simulation_app.is_running():
            self.world.step(render=True)
            rclpy.spin_once(self, timeout_sec=0.0)

            if self.world.is_stopped() and not reset_needed:
                reset_needed = True
            if self.world.is_playing():
                if reset_needed:
                    self.world.reset()
                    # self.my_controller.reset()
                    reset_needed = False

                if self.capture_triggered:
                    self.caminfo_writer.schedule_write()
                    self.rgb_writer.schedule_write()
                    self.depth_writer.schedule_write()
                    self.capture_triggered = False

        # Cleanup
        self.timeline.stop()
        self.destroy_node()
        simulation_app.close()


if __name__ == "__main__":
    rclpy.init()
    pick_place = RobotOperation()
    pick_place.run_simulation()

Based on this structure, I tried the following methods:
(1) The method above: attach the render product to the writer with trigger=None (rep.WriterRegistry.attach(self.rgb_writer, [render_product], trigger=None)), then call schedule_write() when the capture service is received.

(2) Attach the render products to the writers when a capture service message is received, and detach them otherwise.

(3) Call self.rp.hydra_texture.set_updates_enabled(True) when a capture service is received, and self.rp.hydra_texture.set_updates_enabled(False) otherwise.

(4) When a capture service request is received, set the gate's step input to 1; otherwise set it to 0:

        gate_path = omni.syntheticdata.SyntheticData._get_node_path(
            rv + "IsaacSimulationGate", render_product
        )
        if capture_triggered:
            og.Controller.attribute(gate_path + ".inputs:step").set(1)
        else:
            og.Controller.attribute(gate_path + ".inputs:step").set(0)

None of these stops the continuous streaming. Please help!

I solved the problem. Method (3) works: toggle self.rp.hydra_texture.set_updates_enabled(True) on capture and back to False afterwards.
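For anyone landing here, the working flow can be summarized as: keep the render product's updates disabled, and on each capture request enable updates, step once so the frame renders and publishes, then disable updates again. A plain-Python sketch of that control flow (the render-product object is a stand-in, not the Isaac Sim hydra texture API):

```python
class FakeRenderProduct:
    """Stand-in for a render product; not the Isaac Sim hydra texture API."""

    def __init__(self):
        self.updates_enabled = False
        self.frames_rendered = 0

    def set_updates_enabled(self, enabled):
        self.updates_enabled = enabled

    def step(self):
        # One world.step(render=True): a frame renders (and publishes)
        # only while updates are enabled.
        if self.updates_enabled:
            self.frames_rendered += 1


def capture_one_frame(rp):
    rp.set_updates_enabled(True)    # re-enable rendering on request
    rp.step()                       # render exactly one frame
    rp.set_updates_enabled(False)   # freeze again immediately


rp = FakeRenderProduct()
for _ in range(5):
    rp.step()           # disabled: nothing is published
capture_one_frame(rp)   # exactly one frame is published
```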


This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.