Hello,
I am trying to publish rgb camera frames through a custom omnigraph node I am creating like so:
class ROS2PublishColorCameraFrameInternalState():
def __init__(self):
self.initialized = False
self.node = rclpy.create_node('camera_frame_publisher')
self.camera_info = CameraInfo()
self.camera_info.header.frame_id = "front_camera_frame"
def get_camera_frame_from_isaac_sim(self):
if not self.camera:
return
try:
rgba = self.camera.get_rgba()
except Exception as e:
return
color_camera_frame = rgba[:,:,:3]
return color_camera_frame
def publish_camera_frame(self, db):
try:
image_np = self.get_camera_frame_from_isaac_sim()
except Exception as e:
print(e)
return
if image_np is not None:
image_msg = self.bridge.cv2_to_imgmsg(image_np, encoding="rgb8")
self.camera_frame_publisher.publish(image_msg)
def create_publisher(self, db):
self.bridge = CvBridge()
self.camera = Camera(
prim_path=db.inputs.camera_prim_path
)
self.camera.initialize()
self.camera.add_motion_vectors_to_frame()
qos_profile = QoSProfile(
reliability=QoSReliabilityPolicy.RELIABLE,
durability=QoSDurabilityPolicy.TRANSIENT_LOCAL,
history=QoSHistoryPolicy.KEEP_LAST,
depth=10
)
self.camera_frame_publisher = self.node.create_publisher(
Image,
db.inputs.camera_frame_topic_name,
qos_profile=qos_profile)
print('Camera frame publisher created')
class ROS2PublishColorCameraFrame:
@staticmethod
def internal_state():
return ROS2PublishColorCameraFrameInternalState()
@staticmethod
def compute(db) -> bool:
state = db.internal_state
if not state.initialized:
state.create_publisher(db)
state.publish_camera_frame(db)
rclpy.spin_once(state.node, timeout_sec=0.01)
return True
@staticmethod
def release(node):
try:
state = ROS2PublishColorCameraFrameInternalState.per_node_internal_state(node)
except Exception:
state = None
pass
if state is not None:
state.node.destroy_node()
try:
rclpy.shutdown()
except:
pass
When just doing that, the frame rate drops to around 15-20. I was investigating ways to optimize this, primarily imitating the ROS2CameraHelper built-in node (can’t use it directly since I need to add some extra information and change the qosprofile). I managed to get to:
# Copyright (c) 2020-2023, NVIDIA CORPORATION. All rights reserved.
#
# NVIDIA CORPORATION and its licensors retain all intellectual property
# and proprietary rights in and to this software, related documentation
# and any modifications thereto. Any use, reproduction, disclosure or
# distribution of this software and related documentation without an express
# license agreement from NVIDIA CORPORATION is strictly prohibited.
#
import traceback
import carb
import omni
import omni.replicator.core as rep
import omni.syntheticdata
import omni.syntheticdata._syntheticdata as sd
from omni.isaac.core_nodes import BaseWriterNode, WriterRequest
from omni.kit.viewport.utility import get_viewport_from_window_name
from pxr import Usd
class ROS2PublishColorCameraFrameInternalState(BaseWriterNode):
def __init__(self):
self.viewport = None
self.viewport_name = ""
self.resetSimulationTimeOnStop = False
super().__init__(initialize=False)
def post_attach(self, writer, render_product):
try:
omni.syntheticdata.SyntheticData.Get().set_node_attributes(
"IsaacReadSimulationTime", {"inputs:resetOnStop": self.resetSimulationTimeOnStop}, render_product
)
except:
pass
class ROS2PublishColorCameraFrame:
@staticmethod
def internal_state():
return ROS2PublishColorCameraFrameInternalState()
@staticmethod
def compute(db) -> bool:
if db.inputs.enabled is False:
if db.internal_state.initialized is False:
return True
else:
db.internal_state.custom_reset()
return True
sensor_type = db.inputs.type
if db.internal_state.initialized is False:
db.internal_state.initialized = True
stage = omni.usd.get_context().get_stage()
with Usd.EditContext(stage, stage.GetSessionLayer()):
if db.inputs.viewport:
db.log_warn(
"viewport input is deprecated, please use renderProductPath and the IsaacGetViewportRenderProduct to get a viewports render product path"
)
viewport_api = get_viewport_from_window_name(db.inputs.viewport)
if viewport_api:
db.internal_state.viewport = viewport_api
db.internal_state.viewport_name = db.inputs.viewport
if db.internal_state.viewport == None:
carb.log_warn("viewport name {db.inputs.viewport} not found")
db.internal_state.initialized = False
return False
viewport = db.internal_state.viewport
render_product_path = viewport.get_render_product_path()
else:
render_product_path = db.inputs.renderProductPath
if not render_product_path:
carb.log_warn("Render product not valid")
db.internal_state.initialized = False
return False
if stage.GetPrimAtPath(render_product_path) is None:
carb.log_warn("Render product no created yet, retrying on next call")
db.internal_state.initialized = False
return False
db.internal_state.resetSimulationTimeOnStop = db.inputs.resetSimulationTimeOnStop
writer = None
try:
if sensor_type == "rgb":
rv = omni.syntheticdata.SyntheticData.convert_sensor_type_to_rendervar(sd.SensorType.Rgb.name)
writer = rep.writers.get(rv + "ROS2PublishImage")
writer.initialize(
frameId=db.inputs.frameId,
nodeNamespace=db.inputs.nodeNamespace,
queueSize=db.inputs.queueSize,
topicName=db.inputs.topicName,
context=db.inputs.context,
)
else:
carb.log_error("type is not supported")
db.internal_state.initialized = False
return False
if writer is not None:
db.internal_state.append_writer(writer)
db.internal_state.attach_writers(render_product_path)
except Exception as e:
print(traceback.format_exc())
pass
else:
return True
@staticmethod
def release(node):
try:
state = ROS2PublishColorCameraFrameInternalState.per_node_internal_state(node)
except Exception:
state = None
pass
if state is not None:
state.reset()
This publishes the images correctly with fps around 50, however I’m having a hard time understanding the writer utility here. What is it? How can i update the header of the message sent?
Is there a better solution to what I’m doing?