When following: ROS 2 Python Custom OmniGraph Node — Omniverse IsaacSim latest documentation
I receive the following error:
2024-06-12 08:10:06 [580,038ms] [Error] [omni.graph.core.plugin] /World/ActionGraph/ros2_custom_python_node: [/World/ActionGraph] Assertion raised in compute - ‘OgnROS2CustomPythonNodeDatabase’ object has no attribute ‘per_instance_state’
“/home/Documents/$USER/Kit/shared/exts/omni.new.extension/omni/new/extension/ogn/nodes/OgnROS2CustomPythonNo
de.py”, line 105, in compute
state = db.per_instance_state
From the documentation and reference this seems to be an attribute belonging to the super class, so it seems odd that this issue is occurring. I don’t really have too much more to go on regarding this issue.
I followed the tutorial in a fresh scene and on IsaacSim 2023.1.1
My code for reference (in case I did something wrong)
Reference Code
# Copyright (c) 2020-2024, NVIDIA CORPORATION. All rights reserved.
#
# NVIDIA CORPORATION and its licensors retain all intellectual property
# and proprietary rights in and to this software, related documentation
# and any modifications thereto. Any use, reproduction, disclosure or
# distribution of this software and related documentation without an express
# license agreement from NVIDIA CORPORATION is strictly prohibited.
#
"""
This is the implementation of the OGN node defined in OgnROS2CustomPythonNode.ogn
"""
# Array or tuple values are accessed as numpy arrays so you probably need this import
import numpy
import omni
# OgnROS2CustomPythonNodeDatabase module is an autogenerated python module located in the extension and is used later on.
from omni.new.extension.ogn.OgnROS2CustomPythonNodeDatabase import OgnROS2CustomPythonNodeDatabase
import rclpy
from std_msgs.msg import Int32
# BaseResetNode class is used for resetting the node when stopping and playing
from omni.isaac.core_nodes import BaseResetNode
class OgnROS2CustomPythonNodeInternalState(BaseResetNode):
def __init__(self):
self.node = None
self.numA = None
self.numB = None
self.subscriptionA = None
self.subscriptionB = None
super().__init__(initialize=False)
def listener_callbackA(self, msg):
self.numA = msg.data
def listener_callbackB(self, msg):
self.numB = msg.data
def initialize_ros2_node(self, node_name):
try:
rclpy.init()
except:
pass
self.node = rclpy.create_node(node_name)
self.initialized = True
def create_subscriberA(self, topicName):
self.topicNameA = topicName
self.subscriptionA = self.node.create_subscription(
Int32,
self.topicNameA,
self.listener_callbackA,
10)
def create_subscriberB(self, topicName):
self.topicNameB = topicName
self.subscriptionB = self.node.create_subscription(
Int32,
self.topicNameB,
self.listener_callbackB,
10)
# Overriding a function from BaseResetNode.
# This is automatically called when simulation is stopped.
# This is will also be called when the OmniGraph node is released.
def custom_reset(self):
if self.node:
self.node.destroy_subscription(self.subscriptionA)
self.node.destroy_subscription(self.subscriptionB)
self.node.destroy_node()
self.numA = None
self.numB = None
self.subscriptionA = None
self.subscriptionB = None
self.node = None
self.initialized = False
rclpy.try_shutdown()
class OgnROS2CustomPythonNode:
"""
A custom ros2 python node that adds number from 2 separate topics and outputs the
sum as a OmniGraph Int
"""
@staticmethod
def internal_state():
return OgnROS2CustomPythonNodeInternalState()
@staticmethod
def compute(db) -> bool:
"""Compute the outputs from the current input"""
state = db.per_instance_state
try:
if not state.initialized:
state.initialize_ros2_node('custom_python_adder')
if (state.subscriptionA == None):
state.create_subscriberA(db.inputs.topicInputA)
if (state.subscriptionB == None):
state.create_subscriberB(db.inputs.topicInputB)
rclpy.spin_once(state.node, timeout_sec=0.01)
if state.numA != None and state.numB != None:
db.outputs.sum = state.numA + state.numB
db.outputs.execOut = omni.graph.core.ExecutionAttributeState.ENABLED
# Set the numA and numB to None to ensure we only calculate and trigger execOut when new messages are received.
state.numA = None
state.numB = None
except Exception as error:
# If anything causes your compute to fail report the error and return False
db.log_error(str(error))
return False
# Even if inputs were edge cases like empty arrays, correct outputs mean success
return True
@staticmethod
def release(node):
try:
state = OgnROS2CustomPythonNodeDatabase.per_instance_internal_state(node)
except Exception:
state = None
if state is not None:
state.custom_reset()
Generated Database Code
"""Support for simplified access to data on nodes of type OmniNewExtensionExtension.ROS2CustomPythonNode
A custom ros2 node
"""
import sys
import traceback
import omni.graph.core as og
import omni.graph.core._omni_graph_core as _og
import omni.graph.tools.ogn as ogn
class OgnROS2CustomPythonNodeDatabase(og.Database):
"""Helper class providing simplified access to data on nodes of type OmniNewExtensionExtension.ROS2CustomPythonNode
Class Members:
node: Node being evaluated
Attribute Value Properties:
Inputs:
inputs.execIn
inputs.topicInputA
inputs.topicInputB
Outputs:
outputs.execOut
outputs.sum
"""
# Imprint the generator and target ABI versions in the file for JIT generation
GENERATOR_VERSION = (1, 41, 3)
TARGET_VERSION = (2, 139, 12)
# This is an internal object that provides per-class storage of a per-node data dictionary
PER_NODE_DATA = {}
# This is an internal object that describes unchanging attributes in a generic way
# The values in this list are in no particular order, as a per-attribute tuple
# Name, Type, ExtendedTypeIndex, UiName, Description, Metadata,
# Is_Required, DefaultValue, Is_Deprecated, DeprecationMsg
# You should not need to access any of this data directly, use the defined database interfaces
INTERFACE = og.Database._get_interface([
('inputs:execIn', 'execution', 0, 'Exec In', 'Execution Input', {ogn.MetadataKeys.DEFAULT: '0'}, True, 0, False, ''),
('inputs:topicInputA', 'string', 0, 'Topic Input A', 'Topic Input A', {ogn.MetadataKeys.DEFAULT: '"/numberA"'}, True, "/numberA", False, ''),
('inputs:topicInputB', 'string', 0, 'Topic Input B', 'Topic Input B', {ogn.MetadataKeys.DEFAULT: '"/attributeB"'}, True, "/attributeB", False, ''),
('outputs:execOut', 'execution', 0, 'Execution Ouput', 'Execution Output', {}, True, None, False, ''),
('outputs:sum', 'int', 0, 'Sum', 'sum', {}, True, None, False, ''),
])
@classmethod
def _populate_role_data(cls):
"""Populate a role structure with the non-default roles on this node type"""
role_data = super()._populate_role_data()
role_data.inputs.execIn = og.AttributeRole.EXECUTION
role_data.inputs.topicInputA = og.AttributeRole.TEXT
role_data.inputs.topicInputB = og.AttributeRole.TEXT
role_data.outputs.execOut = og.AttributeRole.EXECUTION
return role_data
class ValuesForInputs(og.DynamicAttributeAccess):
LOCAL_PROPERTY_NAMES = {"execIn", "topicInputA", "topicInputB", "_setting_locked", "_batchedReadAttributes", "_batchedReadValues"}
"""Helper class that creates natural hierarchical access to input attributes"""
def __init__(self, node: og.Node, attributes, dynamic_attributes: og.DynamicAttributeInterface):
"""Initialize simplified access for the attribute data"""
context = node.get_graph().get_default_graph_context()
super().__init__(context, node, attributes, dynamic_attributes)
self._batchedReadAttributes = [self._attributes.execIn, self._attributes.topicInputA, self._attributes.topicInputB]
self._batchedReadValues = [0, "/numberA", "/attributeB"]
@property
def execIn(self):
return self._batchedReadValues[0]
@execIn.setter
def execIn(self, value):
self._batchedReadValues[0] = value
@property
def topicInputA(self):
return self._batchedReadValues[1]
@topicInputA.setter
def topicInputA(self, value):
self._batchedReadValues[1] = value
@property
def topicInputB(self):
return self._batchedReadValues[2]
@topicInputB.setter
def topicInputB(self, value):
self._batchedReadValues[2] = value
def __getattr__(self, item: str):
if item in self.LOCAL_PROPERTY_NAMES:
return object.__getattribute__(self, item)
else:
return super().__getattr__(item)
def __setattr__(self, item: str, new_value):
if item in self.LOCAL_PROPERTY_NAMES:
object.__setattr__(self, item, new_value)
else:
super().__setattr__(item, new_value)
def _prefetch(self):
readAttributes = self._batchedReadAttributes
newValues = _og._prefetch_input_attributes_data(readAttributes)
if len(readAttributes) == len(newValues):
self._batchedReadValues = newValues
class ValuesForOutputs(og.DynamicAttributeAccess):
LOCAL_PROPERTY_NAMES = {"execOut", "sum", "_batchedWriteValues"}
"""Helper class that creates natural hierarchical access to output attributes"""
def __init__(self, node: og.Node, attributes, dynamic_attributes: og.DynamicAttributeInterface):
"""Initialize simplified access for the attribute data"""
context = node.get_graph().get_default_graph_context()
super().__init__(context, node, attributes, dynamic_attributes)
self._batchedWriteValues = { }
@property
def execOut(self):
value = self._batchedWriteValues.get(self._attributes.execOut)
if value:
return value
else:
data_view = og.AttributeValueHelper(self._attributes.execOut)
return data_view.get()
@execOut.setter
def execOut(self, value):
self._batchedWriteValues[self._attributes.execOut] = value
@property
def sum(self):
value = self._batchedWriteValues.get(self._attributes.sum)
if value:
return value
else:
data_view = og.AttributeValueHelper(self._attributes.sum)
return data_view.get()
@sum.setter
def sum(self, value):
self._batchedWriteValues[self._attributes.sum] = value
def __getattr__(self, item: str):
if item in self.LOCAL_PROPERTY_NAMES:
return object.__getattribute__(self, item)
else:
return super().__getattr__(item)
def __setattr__(self, item: str, new_value):
if item in self.LOCAL_PROPERTY_NAMES:
object.__setattr__(self, item, new_value)
else:
super().__setattr__(item, new_value)
def _commit(self):
_og._commit_output_attributes_data(self._batchedWriteValues)
self._batchedWriteValues = { }
class ValuesForState(og.DynamicAttributeAccess):
"""Helper class that creates natural hierarchical access to state attributes"""
def __init__(self, node: og.Node, attributes, dynamic_attributes: og.DynamicAttributeInterface):
"""Initialize simplified access for the attribute data"""
context = node.get_graph().get_default_graph_context()
super().__init__(context, node, attributes, dynamic_attributes)
def __init__(self, node):
super().__init__(node)
dynamic_attributes = self.dynamic_attribute_data(node, og.AttributePortType.ATTRIBUTE_PORT_TYPE_INPUT)
self.inputs = OgnROS2CustomPythonNodeDatabase.ValuesForInputs(node, self.attributes.inputs, dynamic_attributes)
dynamic_attributes = self.dynamic_attribute_data(node, og.AttributePortType.ATTRIBUTE_PORT_TYPE_OUTPUT)
self.outputs = OgnROS2CustomPythonNodeDatabase.ValuesForOutputs(node, self.attributes.outputs, dynamic_attributes)
dynamic_attributes = self.dynamic_attribute_data(node, og.AttributePortType.ATTRIBUTE_PORT_TYPE_STATE)
self.state = OgnROS2CustomPythonNodeDatabase.ValuesForState(node, self.attributes.state, dynamic_attributes)
class abi:
"""Class defining the ABI interface for the node type"""
@staticmethod
def get_node_type():
get_node_type_function = getattr(OgnROS2CustomPythonNodeDatabase.NODE_TYPE_CLASS, 'get_node_type', None)
if callable(get_node_type_function):
return get_node_type_function()
return 'OmniNewExtensionExtension.ROS2CustomPythonNode'
@staticmethod
def compute(context, node):
def database_valid():
return True
try:
per_node_data = OgnROS2CustomPythonNodeDatabase.PER_NODE_DATA[node.node_id()]
db = per_node_data.get('_db')
if db is None:
db = OgnROS2CustomPythonNodeDatabase(node)
per_node_data['_db'] = db
if not database_valid():
per_node_data['_db'] = None
return False
except:
db = OgnROS2CustomPythonNodeDatabase(node)
try:
compute_function = getattr(OgnROS2CustomPythonNodeDatabase.NODE_TYPE_CLASS, 'compute', None)
if callable(compute_function) and compute_function.__code__.co_argcount > 1:
return compute_function(context, node)
db.inputs._prefetch()
db.inputs._setting_locked = True
with og.in_compute():
return OgnROS2CustomPythonNodeDatabase.NODE_TYPE_CLASS.compute(db)
except Exception as error:
stack_trace = "".join(traceback.format_tb(sys.exc_info()[2].tb_next))
db.log_error(f'Assertion raised in compute - {error}\n{stack_trace}', add_context=False)
finally:
db.inputs._setting_locked = False
db.outputs._commit()
return False
@staticmethod
def initialize(context, node):
OgnROS2CustomPythonNodeDatabase._initialize_per_node_data(node)
initialize_function = getattr(OgnROS2CustomPythonNodeDatabase.NODE_TYPE_CLASS, 'initialize', None)
if callable(initialize_function):
initialize_function(context, node)
per_node_data = OgnROS2CustomPythonNodeDatabase.PER_NODE_DATA[node.node_id()]
def on_connection_or_disconnection(*args):
per_node_data['_db'] = None
node.register_on_connected_callback(on_connection_or_disconnection)
node.register_on_disconnected_callback(on_connection_or_disconnection)
@staticmethod
def release(node):
release_function = getattr(OgnROS2CustomPythonNodeDatabase.NODE_TYPE_CLASS, 'release', None)
if callable(release_function):
release_function(node)
OgnROS2CustomPythonNodeDatabase._release_per_node_data(node)
@staticmethod
def release_instance(node, target):
OgnROS2CustomPythonNodeDatabase._release_per_node_instance_data(node, target)
@staticmethod
def update_node_version(context, node, old_version, new_version):
update_node_version_function = getattr(OgnROS2CustomPythonNodeDatabase.NODE_TYPE_CLASS, 'update_node_version', None)
if callable(update_node_version_function):
return update_node_version_function(context, node, old_version, new_version)
return False
@staticmethod
def initialize_type(node_type):
initialize_type_function = getattr(OgnROS2CustomPythonNodeDatabase.NODE_TYPE_CLASS, 'initialize_type', None)
needs_initializing = True
if callable(initialize_type_function):
needs_initializing = initialize_type_function(node_type)
if needs_initializing:
node_type.set_metadata(ogn.MetadataKeys.EXTENSION, "omni.new.extension")
node_type.set_metadata(ogn.MetadataKeys.UI_NAME, "ROS2 Custom Python Node")
node_type.set_metadata(ogn.MetadataKeys.DESCRIPTION, "A custom ros2 node")
node_type.set_metadata(ogn.MetadataKeys.LANGUAGE, "Python")
OgnROS2CustomPythonNodeDatabase.INTERFACE.add_to_node_type(node_type)
@staticmethod
def on_connection_type_resolve(node):
on_connection_type_resolve_function = getattr(OgnROS2CustomPythonNodeDatabase.NODE_TYPE_CLASS, 'on_connection_type_resolve', None)
if callable(on_connection_type_resolve_function):
on_connection_type_resolve_function(node)
NODE_TYPE_CLASS = None
@staticmethod
def register(node_type_class):
OgnROS2CustomPythonNodeDatabase.NODE_TYPE_CLASS = node_type_class
og.register_node_type(OgnROS2CustomPythonNodeDatabase.abi, 1)
@staticmethod
def deregister():
og.deregister_node_type("OmniNewExtensionExtension.ROS2CustomPythonNode")