Hi,
I’m building an app using the DeepStream SDK Python bindings. In this app, I want to connect to a Kafka message broker.
I was following this example: https://github.com/NVIDIA-AI-IOT/deepstream_python_apps/blob/master/apps/deepstream-test4/deepstream_test_4.py and used the same configuration files for the message broker as shown in the example.
When running the app I have this error.
app-be_1 | Error: gst-library-error-quark: GStreamer encountered a general supporting library error. (1): /dvs/git/dirty/git-master_linux/deepstream/sdk/src/gst-plugins/gst-nvmsgbroker/gstnvmsgbroker.c(509): legacy_gst_nvmsgbroker_render (): /GstPipeline:pipeline0/GstNvMsgBroker:nvmsg-broker:
app-be_1 | failed to send the message. err(1)
• Hardware Platform Jetson Xavier NX
• DeepStream Version 5.1
• JetPack Version (valid for Jetson only) 4.5.1
• TensorRT Version 7.1.3
• Issue Type( questions, new requirements, bugs) Questions
I tried with a deliberately wrong Kafka address and a wrong conn_str to rule out a connection problem; in that case I got a different error instead: “unable to connect to broker library”. So the original error does not appear to be a connection issue.
This app is built based on this image: nvcr.io/nvidia/deepstream-l4t:5.1-21.02-iot
Docker-compose:
version: "3.0"
services:
zookeeper:
image: zookeeper
kafka:
image: schrbr/kafka:2.13-2.6.0
depends_on:
- zookeeper
expose:
- "9092"
environment:
KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100
KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
sauroneye-be:
build: app-be
image: app/app-be:latest
depends_on:
- kafka
environment:
KAFKA_LOCATION: kafka
Pipeline Implementation:
def _implement_pipeline(self):
    """Build the GStreamer pipeline.

    Topology: source_bin -> nvstreammux -> nvinfer -> nvvideoconvert ->
    nvdsosd -> [tee -> queue1 -> nvmsgconv -> nvmsgbroker | tee -> queue2]
    -> nvvideoconvert -> capsfilter -> encoder -> rtppay -> UDP sink
    (served over RTSP). The tee / message-broker branch is only built
    when ``self.message_broker`` is configured.
    """
    print("Creating Pipeline \n ")
    self.gst_pipeline = Gst.Pipeline()
    check_object(self.gst_pipeline, "Unable to create Pipeline \n")

    # nvstreammux forms batches from one or more sources.
    print("Creating streamux \n ")
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    check_object(streammux, "Unable to create NvStreamMux \n")
    self.gst_pipeline.add(streammux)

    # This should be adapted to multi inputs.
    is_live = False
    print("Creating source_bin 0\n ")
    # NOTE(review): "io://" looks like a typo — live RTSP URIs start with
    # "rtsp://"; as written, live-source is never enabled on the muxer.
    # Confirm the intended scheme before changing it.
    if self.input_stream.input_uri.find("io://") == 0:
        is_live = True
    source_bin = self._create_source_bin(0, self.input_stream.input_uri)
    check_object(source_bin, "Unable to create source bin \n")
    self.gst_pipeline.add(source_bin)

    # This should be adapted to multi inputs.
    padname = "sink_0"
    sinkpad = streammux.get_request_pad(padname)
    check_object(sinkpad, "Unable to create sink pad bin \n")
    srcpad = source_bin.get_static_pad("src")
    check_object(srcpad, "Unable to create src pad bin \n")
    srcpad.link(sinkpad)

    # nvinfer runs inference on the decoder's output; its behaviour is
    # configured through the detector config file (set further below).
    pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    check_object(pgie, "Unable to create pgie \n")

    # Convert NV12 -> RGBA as required by nvdsosd.
    print("Creating nvvidconv \n ")
    nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
    check_object(nvvidconv, "Unable to create nvvidconv \n")

    # OSD draws detections on the converted RGBA buffer.
    print("Creating nvosd \n ")
    nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
    check_object(nvosd, "Unable to create nvosd \n")

    nvvidconv_postosd = Gst.ElementFactory.make("nvvideoconvert", "convertor_postosd")
    check_object(nvvidconv_postosd, "Unable to create nvvidconv_postosd \n")

    # Caps filter forces I420 in NVMM memory for the hardware encoder.
    caps = Gst.ElementFactory.make("capsfilter", "filter")
    check_object(caps, "Unable to create caps \n")
    caps.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=I420"))

    # Hardware encoder, selected by the configured codec.
    encoder = None
    if self.rtsp_server.codec == H264_CODEC_KEY:
        encoder = Gst.ElementFactory.make("nvv4l2h264enc", "encoder")
        print("Creating H264 Encoder")
    elif self.rtsp_server.codec == H265_CODEC_KEY:
        encoder = Gst.ElementFactory.make("nvv4l2h265enc", "encoder")
        print("Creating H265 Encoder")
    check_object(encoder, f"Unable to create encoder for codec {self.rtsp_server.codec}")
    encoder.set_property('bitrate', self.rtsp_server.bitrate)
    if is_aarch64():
        encoder.set_property('preset-level', 1)
        encoder.set_property('insert-sps-pps', 1)
        # encoder.set_property('bufapi-version', 1)

    # RTP payloader matching the encoder codec.
    rtppay = None
    if self.rtsp_server.codec == H264_CODEC_KEY:
        rtppay = Gst.ElementFactory.make("rtph264pay", "rtppay")
        print("Creating H264 rtppay")
    elif self.rtsp_server.codec == H265_CODEC_KEY:
        rtppay = Gst.ElementFactory.make("rtph265pay", "rtppay")
        print("Creating H265 rtppay")
    check_object(rtppay, f"Unable to create rtppay for codec {self.rtsp_server.codec}")

    # UDP sink feeding the RTSP server.
    sink = self.rtsp_server.implement_udp_sink()

    print("Playing file %s " % self.input_stream.input_uri)
    streammux.set_property('width', self.input_stream.stream_width)
    streammux.set_property('height', self.input_stream.stream_height)
    streammux.set_property('batch-size', 1)
    streammux.set_property('batched-push-timeout', 4000000)
    if is_live:
        print("At least one of the sources is live")
        streammux.set_property('live-source', 1)
    pgie.set_property('config-file-path', self.detector.config_filename)

    print("Adding elements to Pipeline \n")
    self.gst_pipeline.add(pgie)
    streammux.link(pgie)
    self.gst_pipeline.add(nvosd)
    self.gst_pipeline.add(nvvidconv_postosd)

    if self.message_broker:
        # Message branch: nvosd -> tee; tee -> queue1 -> msgconv ->
        # msgbroker (events) and tee -> queue2 -> nvvidconv_postosd (render).
        print("Creating queues \n ")
        tee = Gst.ElementFactory.make("tee", "nvsink-tee")
        # Use check_object for consistency with the rest of this method
        # (the original wrote to stderr and then kept going with None).
        check_object(tee, " Unable to create tee \n")
        queue1 = Gst.ElementFactory.make("queue", "nvtee-que1")
        check_object(queue1, " Unable to create queue1 \n")
        queue2 = Gst.ElementFactory.make("queue", "nvtee-que2")
        check_object(queue2, " Unable to create queue2 \n")
        print("Creating nvmsgconv \n ")
        msgconv = Gst.ElementFactory.make("nvmsgconv", "nvmsg-converter")
        check_object(msgconv, " Unable to create msgconv \n")
        print("Creating nvmsgbroker \n ")
        msgbroker = Gst.ElementFactory.make("nvmsgbroker", "nvmsg-broker")
        check_object(msgbroker, " Unable to create msgbroker \n")
        msgconv.set_property('config', self.message_broker.config_file)
        msgconv.set_property('payload-type', self.message_broker.payload_type)
        msgbroker.set_property('proto-lib', self.message_broker.proto_lib)
        msgbroker.set_property('conn-str', os.environ["KAFKA_LOCATION"] + self.message_broker.conn_str)
        if self.message_broker.cfg_file is not None:
            msgbroker.set_property('config', self.message_broker.cfg_file)
        if self.message_broker.topic is not None:
            msgbroker.set_property('topic', self.message_broker.topic)
        msgbroker.set_property('sync', False)
        self.gst_pipeline.add(tee)
        self.gst_pipeline.add(queue1)
        self.gst_pipeline.add(queue2)
        self.gst_pipeline.add(msgconv)
        self.gst_pipeline.add(msgbroker)
        nvosd.link(tee)
        queue1.link(msgconv)
        msgconv.link(msgbroker)
        queue2.link(nvvidconv_postosd)
        sink_pad = queue1.get_static_pad("sink")
        tee_msg_pad = tee.get_request_pad("src_%u")
        tee_render_pad = tee.get_request_pad("src_%u")
        if not tee_msg_pad or not tee_render_pad:
            sys.stderr.write("Unable to get request pads\n")
        tee_msg_pad.link(sink_pad)
        sink_pad = queue2.get_static_pad("sink")
        tee_render_pad.link(sink_pad)

    self.gst_pipeline.add(nvvidconv)
    pgie.link(nvvidconv)
    if not self.message_broker:
        # FIX: only link nvosd straight to the post-OSD converter when there
        # is no tee branch. With the broker active, nvosd's src pad is
        # already linked to the tee above, so this second link would fail
        # silently and leave the render path broken.
        nvosd.link(nvvidconv_postosd)
    self.gst_pipeline.add(caps)
    self.gst_pipeline.add(encoder)
    self.gst_pipeline.add(rtppay)
    self.gst_pipeline.add(sink)
    nvvidconv.link(nvosd)
    nvvidconv_postosd.link(caps)
    caps.link(encoder)
    encoder.link(rtppay)
    rtppay.link(sink)

    # Create an event loop and feed GStreamer bus messages to it.
    self.loop = GObject.MainLoop()
    bus = self.gst_pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, self.loop)

    # Start streaming and keep the OSD sink pad for the metadata probe.
    self.rtsp_server.launch_rtsp_server(codec=self.rtsp_server.codec)
    self.osdsinkpad = nvosd.get_static_pad("sink")
    check_object(self.osdsinkpad, "Unable to get sink pad of nvosd \n")
I try to generate the sample messages on osd probe:
def _probe_on_frame(self, frame_meta, batch_meta):
    """Process one frame's object metadata on the OSD sink-pad probe.

    Colours each detection (green = mask, red = otherwise), counts objects
    per class, and hands every object to ``_msg_broker`` for possible
    event-message generation.
    """
    self.frame_number = self.frame_number + 1
    l_obj = frame_meta.obj_meta_list
    # Per-class counters for the two known classes.
    obj_counter = {
        self._classes_ids[CLASS_MASK_KEY]: 0,
        self._classes_ids[CLASS_NOMASK_KEY]: 0,
    }
    while l_obj is not None:
        try:
            # Casting l_obj.data to pyds.NvDsObjectMeta
            obj_meta = pyds.glist_get_nvds_object_meta(l_obj.data)
            if obj_meta.class_id == self._classes_ids[CLASS_MASK_KEY]:
                obj_meta.rect_params.border_color.set(0.0, 1.0, 0.0, 0.0)
            else:
                obj_meta.rect_params.border_color.set(1.0, 0.0, 0.0, 0.0)
            # NOTE(review): this runs for EVERY object in the frame, while
            # _msg_broker's comment says "first object every 30 frames" —
            # confirm whether per-object messages are intended.
            self._msg_broker(frame_meta, self.frame_number, obj_meta, batch_meta)
        except StopIteration:
            break
        # FIX: use .get() so an unexpected class_id (anything other than
        # mask/no-mask) cannot raise KeyError and kill the probe.
        obj_counter[obj_meta.class_id] = obj_counter.get(obj_meta.class_id, 0) + 1
        try:
            l_obj = l_obj.next
        except StopIteration:
            break
def _msg_broker(self, frame_meta, frame_number, obj_meta, batch_meta):
    """Attach an NvDsEventMsgMeta to the frame every 30th frame.

    The meta is picked up downstream by nvmsgconv/nvmsgbroker and sent to
    Kafka. Copy/free callbacks are installed so the C side can duplicate
    and release the Python-allocated memory.
    """
    # Frequency of messages to be sent will be based on use case.
    # Here a message is sent every 30 frames.
    # FIX: the original condition was `if frame_number % 30:`, which is
    # true for 29 out of every 30 frames — i.e. it flooded the broker with
    # messages instead of throttling them (a likely cause of nvmsgbroker
    # "failed to send the message. err(1)"). The comment's stated intent
    # requires `% 30 == 0`.
    if frame_number % 30 == 0:
        # Allocating an NvDsEventMsgMeta instance and getting reference
        # to it. The underlying memory is not managed by Python so that
        # downstream plugins can access it. Otherwise the garbage collector
        # will free it when this probe exits.
        msg_meta = pyds.alloc_nvds_event_msg_meta()
        #msg_meta.bbox.top = obj_meta.rect_params.top
        #msg_meta.bbox.left = obj_meta.rect_params.left
        #msg_meta.bbox.width = obj_meta.rect_params.width
        #msg_meta.bbox.height = obj_meta.rect_params.height
        msg_meta.frameId = frame_number
        msg_meta = self._generate_event_msg_meta(msg_meta, obj_meta.class_id)
        user_event_meta = pyds.nvds_acquire_user_meta_from_pool(batch_meta)
        if user_event_meta:
            user_event_meta.user_meta_data = msg_meta
            user_event_meta.base_meta.meta_type = pyds.NvDsMetaType.NVDS_EVENT_MSG_META
            # Setting callbacks in the event msg meta. The bindings layer
            # will wrap these callables in C functions. Currently only one
            # set of callbacks is supported.
            pyds.set_user_copyfunc(user_event_meta, self._meta_copy_func)
            pyds.set_user_releasefunc(user_event_meta, self._meta_free_func)
            pyds.nvds_add_user_meta_to_frame(frame_meta, user_event_meta)
        else:
            print("Error in attaching event meta to buffer\n")
# Callback function for deep-copying an NvDsEventMsgMeta struct
def _meta_copy_func(self, data, user_data):
    """Callback for deep-copying an NvDsEventMsgMeta struct.

    The bindings call this when downstream plugins need their own copy of
    the event meta. Every heap buffer owned by the struct (timestamp,
    sensor string, signature, extension message) must be duplicated so the
    copy has independent ownership.
    """
    # Cast data to pyds.NvDsUserMeta
    user_meta = pyds.NvDsUserMeta.cast(data)
    src_meta_data = user_meta.user_meta_data
    # Cast src_meta_data to pyds.NvDsEventMsgMeta
    srcmeta = pyds.NvDsEventMsgMeta.cast(src_meta_data)
    # Duplicate the memory contents of srcmeta to dstmeta.
    # First use pyds.get_ptr() to get the C address of srcmeta, then
    # use pyds.memdup() to allocate dstmeta and copy srcmeta into it.
    # pyds.memdup returns the C address of the allocated duplicate.
    dstmeta_ptr = pyds.memdup(pyds.get_ptr(srcmeta), sys.getsizeof(pyds.NvDsEventMsgMeta))
    # Cast the duplicated memory to pyds.NvDsEventMsgMeta
    dstmeta = pyds.NvDsEventMsgMeta.cast(dstmeta_ptr)
    # Duplicate contents of the ts field. Note that reading srcmeta.ts
    # returns its C address, which allows memory operations to be
    # performed on it.
    dstmeta.ts = pyds.memdup(srcmeta.ts, MAX_TIME_STAMP_LEN + 1)
    # Copy the sensorStr. This field is a string property: the getter
    # (read) returns its C address; the setter (write) takes a string,
    # allocates a buffer and copies the input string into it.
    dstmeta.sensorStr = pyds.get_string(srcmeta.sensorStr)
    if srcmeta.objSignature.size > 0:
        dstmeta.objSignature.signature = pyds.memdup(srcmeta.objSignature.signature, srcmeta.objSignature.size)
        dstmeta.objSignature.size = srcmeta.objSignature.size
    if srcmeta.extMsgSize > 0:
        if srcmeta.objType == pyds.NvDsObjectType.NVDS_OBJECT_TYPE_PERSON:
            srcobj = pyds.NvDsPersonObject.cast(srcmeta.extMsg)
            obj = pyds.alloc_nvds_person_object()
            obj.age = srcobj.age
            obj.gender = pyds.get_string(srcobj.gender)
            obj.cap = pyds.get_string(srcobj.cap)
            obj.hair = pyds.get_string(srcobj.hair)
            obj.apparel = pyds.get_string(srcobj.apparel)
            dstmeta.extMsg = obj
            # FIX: the original sized the copy as NvDsVehicleObject (a
            # leftover from the deepstream-test4 sample), while a Person
            # object is what was actually copied above.
            dstmeta.extMsgSize = sys.getsizeof(pyds.NvDsPersonObject)
    return dstmeta
# Callback function for freeing an NvDsEventMsgMeta instance
def _meta_free_func(self, data, user_data):
    """Callback for releasing an NvDsEventMsgMeta instance.

    Frees every heap buffer the struct owns (timestamp, sensor string,
    signature, extension message) before the C side destroys the meta.
    pyds.free_buffer is a no-op on a NULL address, so unconditional calls
    are safe.
    """
    owner = pyds.NvDsUserMeta.cast(data)
    event_meta = pyds.NvDsEventMsgMeta.cast(owner.user_meta_data)
    pyds.free_buffer(event_meta.ts)
    pyds.free_buffer(event_meta.sensorStr)
    if event_meta.objSignature.size > 0:
        pyds.free_buffer(event_meta.objSignature.signature)
        event_meta.objSignature.size = 0
    if event_meta.extMsgSize > 0:
        if event_meta.objType == pyds.NvDsObjectType.NVDS_OBJECT_TYPE_PERSON:
            # Person extension: release each string field individually.
            person = pyds.NvDsPersonObject.cast(event_meta.extMsg)
            pyds.free_buffer(person.gender)
            pyds.free_buffer(person.cap)
            pyds.free_buffer(person.hair)
            pyds.free_buffer(person.apparel)
            pyds.free_gbuffer(event_meta.extMsg)
            event_meta.extMsgSize = 0
def _generate_person_meta(self, data):
    """Populate a freshly allocated NvDsPersonObject with fixed demo values."""
    person = pyds.NvDsPersonObject.cast(data)
    # Static attributes, mirroring the deepstream-test4 sample payload.
    person.gender = "male"
    person.age = 45
    person.hair = "black"
    person.cap = "none"
    person.apparel = "formal"
    return person
def _generate_event_msg_meta(self, data, class_id):
    """Fill an NvDsEventMsgMeta with sensor info and an RFC3339 timestamp.

    For the "no mask" class, additionally attaches a custom
    NvDsPersonObject extension message; that object must then be handled
    by the payload generator library (nvmsgconv.cpp) accordingly.
    """
    meta = pyds.NvDsEventMsgMeta.cast(data)
    meta.sensorId = 0
    meta.placeId = 0
    meta.moduleId = 0
    meta.sensorStr = "sensor-0"
    # Timestamp buffer is heap-allocated here and released later by the
    # free callback.
    meta.ts = pyds.alloc_buffer(MAX_TIME_STAMP_LEN + 1)
    pyds.generate_ts_rfc3339(meta.ts, MAX_TIME_STAMP_LEN)
    # Guard clause: only the "no mask" class carries the extra person
    # object; any other class gets the base meta unchanged.
    if class_id != self._classes_ids[CLASS_NOMASK_KEY]:
        return meta
    meta.type = pyds.NvDsEventType.NVDS_EVENT_ENTRY
    meta.objType = pyds.NvDsObjectType.NVDS_OBJECT_TYPE_PERSON
    meta.objClassId = self._classes_ids[CLASS_NOMASK_KEY]
    person = self._generate_person_meta(pyds.alloc_nvds_person_object())
    meta.extMsg = person
    meta.extMsgSize = sys.getsizeof(pyds.NvDsPersonObject)
    return meta
What could be causing this issue?
Thank you.