Message Broker Kafka: failed to send the message

Hi,
I’m building an app using the DeepStream SDK Python bindings. In this app, I want to connect to a Kafka message broker.
I was following this example: https://github.com/NVIDIA-AI-IOT/deepstream_python_apps/blob/master/apps/deepstream-test4/deepstream_test_4.py and used the same configuration files for the message broker as shown in the example.

When I run the app, I get this error:

app-be_1 | Error: gst-library-error-quark: GStreamer encountered a general supporting library error. (1): /dvs/git/dirty/git-master_linux/deepstream/sdk/src/gst-plugins/gst-nvmsgbroker/gstnvmsgbroker.c(509): legacy_gst_nvmsgbroker_render (): /GstPipeline:pipeline0/GstNvMsgBroker:nvmsg-broker:
app-be_1 | failed to send the message. err(1)

• Hardware Platform Jetson Xavier NX
• DeepStream Version 5.1
• JetPack Version (valid for Jetson only) 4.5.1
• TensorRT Version 7.1.3
• Issue Type( questions, new requirements, bugs) Questions

To rule out a connection problem, I tried a wrong Kafka address and a wrong conn_str, but that produced a different error: unable to connect to broker library.

This app is built based on this image: nvcr.io/nvidia/deepstream-l4t:5.1-21.02-iot

Docker-compose:

# Compose stack: ZooKeeper, a Kafka broker, and the DeepStream application container.
version: "3.0"

services:
  # ZooKeeper instance the Kafka service points at via KAFKA_ZOOKEEPER_CONNECT.
  zookeeper:
    image: zookeeper
  kafka:
    image: schrbr/kafka:2.13-2.6.0
    depends_on:
      - zookeeper
    # `expose` makes 9092 reachable by the other services only; it is not published on the host.
    expose:
      - "9092"
    environment:
      # NOTE(review): the application reaches the broker through the service name "kafka"
      # (KAFKA_LOCATION below) while a fixed IP is advertised here - confirm that this
      # address is reachable from inside the application container.
      KAFKA_ADVERTISED_HOST_NAME: 192.168.99.100
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
  # NOTE(review): the quoted log lines are prefixed "app-be_1", which does not match this
  # service name - confirm this is the compose file that was actually run.
  sauroneye-be:
    build: app-be
    image: app/app-be:latest
    depends_on:
      - kafka
    environment:
      # Read by the pipeline code through os.environ["KAFKA_LOCATION"] and prepended to conn_str.
      KAFKA_LOCATION: kafka

Pipeline Implementation:

   def _implement_pipeline(self):
       """Build the GStreamer pipeline and launch the RTSP server.

       Resulting topology::

           source_bin -> nvstreammux -> nvinfer -> nvvideoconvert -> nvdsosd
               -> [tee -> queue -> nvmsgconv -> nvmsgbroker]   (only with a broker)
               -> nvvideoconvert -> capsfilter -> encoder -> rtppay -> UDP sink

       Reads ``self.input_stream`` (``input_uri``, ``stream_width``,
       ``stream_height``), ``self.rtsp_server`` (``codec``, ``bitrate``,
       ``implement_udp_sink``, ``launch_rtsp_server``),
       ``self.detector.config_filename`` and, when truthy,
       ``self.message_broker``.

       Sets ``self.gst_pipeline``, ``self.loop`` and ``self.osdsinkpad``.
       """
       # Create Pipeline element that will form a connection of other elements
       print("Creating Pipeline \n ")
       self.gst_pipeline = Gst.Pipeline()
       check_object(self.gst_pipeline, "Unable to create Pipeline \n")

       # Create nvstreammux instance to form batches from one or more sources.
       print("Creating streamux \n ")
       streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
       check_object(streammux, "Unable to create NvStreamMux \n")
       self.gst_pipeline.add(streammux)

       # This should be adapted to multi inputs
       print("Creating source_bin 0\n ")
       is_live = self.input_stream.input_uri.startswith("io://")
       source_bin = self._create_source_bin(0, self.input_stream.input_uri)
       check_object(source_bin, "Unable to create source bin \n")
       self.gst_pipeline.add(source_bin)

       # This should be adapted to multi inputs
       padname = "sink_0"
       sinkpad = streammux.get_request_pad(padname)
       check_object(sinkpad, "Unable to create sink pad bin \n")

       # This should be adapted to multi inputs
       srcpad = source_bin.get_static_pad("src")
       check_object(srcpad, "Unable to create src pad bin \n")
       srcpad.link(sinkpad)

       # Use nvinfer to run inferencing on decoder's output,
       # behaviour of inferencing is set through config file
       pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
       check_object(pgie, "Unable to create pgie \n")

       # Use convertor to convert from NV12 to RGBA as required by nvosd
       print("Creating nvvidconv \n ")
       nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
       check_object(nvvidconv, "Unable to create nvvidconv \n")

       # Create OSD to draw on the converted RGBA buffer
       print("Creating nvosd \n ")
       nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
       check_object(nvosd, "Unable to create nvosd \n")

       nvvidconv_postosd = Gst.ElementFactory.make("nvvideoconvert", "convertor_postosd")
       check_object(nvvidconv_postosd, "Unable to create nvvidconv_postosd \n")

       # Create a caps filter
       caps = Gst.ElementFactory.make("capsfilter", "filter")
       check_object(caps, "Unable to create caps \n")
       caps.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=I420"))

       # Make the encoder
       encoder = None
       if self.rtsp_server.codec == H264_CODEC_KEY:
           encoder = Gst.ElementFactory.make("nvv4l2h264enc", "encoder")
           print("Creating H264 Encoder")
       elif self.rtsp_server.codec == H265_CODEC_KEY:
           encoder = Gst.ElementFactory.make("nvv4l2h265enc", "encoder")
           print("Creating H265 Encoder")
       check_object(encoder, f"Unable to create encoder for codec {self.rtsp_server.codec}")
       encoder.set_property('bitrate', self.rtsp_server.bitrate)
       if is_aarch64():
           encoder.set_property('preset-level', 1)
           encoder.set_property('insert-sps-pps', 1)

       # Make the payload-encode video into RTP packets
       rtppay = None
       if self.rtsp_server.codec == H264_CODEC_KEY:
           rtppay = Gst.ElementFactory.make("rtph264pay", "rtppay")
           print("Creating H264 rtppay")
       elif self.rtsp_server.codec == H265_CODEC_KEY:
           rtppay = Gst.ElementFactory.make("rtph265pay", "rtppay")
           print("Creating H265 rtppay")
       check_object(rtppay, f"Unable to create rtppay for codec {self.rtsp_server.codec}")

       # Make the UDP sink
       sink = self.rtsp_server.implement_udp_sink()

       print("Playing file %s " % self.input_stream.input_uri)
       streammux.set_property('width', self.input_stream.stream_width)
       streammux.set_property('height', self.input_stream.stream_height)
       streammux.set_property('batch-size', 1)
       streammux.set_property('batched-push-timeout', 4000000)
       if is_live:
           print("At least one of the sources is live")
           streammux.set_property('live-source', 1)

       pgie.set_property('config-file-path', self.detector.config_filename)

       print("Adding elements to Pipeline \n")
       # The inference/OSD chain is needed with or without a message broker, so
       # every element of it is added before any of them is linked.
       self.gst_pipeline.add(pgie)
       self.gst_pipeline.add(nvvidconv)
       self.gst_pipeline.add(nvosd)
       self.gst_pipeline.add(nvvidconv_postosd)

       streammux.link(pgie)
       pgie.link(nvvidconv)
       nvvidconv.link(nvosd)

       if self.message_broker:
           # nvosd feeds a tee; one tee branch carries on to nvvidconv_postosd.
           self._link_message_broker_branch(nvosd, nvvidconv_postosd)
       else:
           # nvosd has a single src pad, so the direct link is made only when
           # no tee sits in between.
           nvosd.link(nvvidconv_postosd)

       self.gst_pipeline.add(caps)
       self.gst_pipeline.add(encoder)
       self.gst_pipeline.add(rtppay)
       self.gst_pipeline.add(sink)

       nvvidconv_postosd.link(caps)
       caps.link(encoder)
       encoder.link(rtppay)
       rtppay.link(sink)

       # create an event loop and feed gstreamer bus messages to it
       self.loop = GObject.MainLoop()
       bus = self.gst_pipeline.get_bus()
       bus.add_signal_watch()
       bus.connect("message", bus_call, self.loop)

       # Start streaming
       self.rtsp_server.launch_rtsp_server(codec=self.rtsp_server.codec)

       self.osdsinkpad = nvosd.get_static_pad("sink")
       check_object(self.osdsinkpad, "Unable to get sink pad of nvosd \n")

   def _link_message_broker_branch(self, nvosd, nvvidconv_postosd):
       """Insert tee -> (nvmsgconv -> nvmsgbroker | nvvidconv_postosd) after nvosd.

       Both arguments must already have been added to ``self.gst_pipeline``.
       Broker settings come from ``self.message_broker``; the connection
       string is the ``KAFKA_LOCATION`` environment variable followed by
       ``self.message_broker.conn_str``.
       """
       print("Creating queues \n ")
       tee = Gst.ElementFactory.make("tee", "nvsink-tee")
       check_object(tee, " Unable to create tee \n")

       queue1 = Gst.ElementFactory.make("queue", "nvtee-que1")
       check_object(queue1, " Unable to create queue1 \n")

       queue2 = Gst.ElementFactory.make("queue", "nvtee-que2")
       check_object(queue2, " Unable to create queue2 \n")

       print("Creating nvmsgconv \n ")
       msgconv = Gst.ElementFactory.make("nvmsgconv", "nvmsg-converter")
       check_object(msgconv, " Unable to create msgconv \n")

       print("Creating nvmsgbroker \n ")
       msgbroker = Gst.ElementFactory.make("nvmsgbroker", "nvmsg-broker")
       check_object(msgbroker, " Unable to create msgbroker \n")

       msgconv.set_property('config', self.message_broker.config_file)
       msgconv.set_property('payload-type', self.message_broker.payload_type)

       msgbroker.set_property('proto-lib', self.message_broker.proto_lib)
       msgbroker.set_property('conn-str', os.environ["KAFKA_LOCATION"] + self.message_broker.conn_str)

       # 'config' and 'topic' are optional and are set only when provided.
       if self.message_broker.cfg_file is not None:
           msgbroker.set_property('config', self.message_broker.cfg_file)
       if self.message_broker.topic is not None:
           msgbroker.set_property('topic', self.message_broker.topic)
       msgbroker.set_property('sync', False)

       for element in (tee, queue1, queue2, msgconv, msgbroker):
           self.gst_pipeline.add(element)

       nvosd.link(tee)
       queue1.link(msgconv)
       msgconv.link(msgbroker)
       queue2.link(nvvidconv_postosd)

       # One requested tee src pad per branch: messaging first, then rendering.
       tee_msg_pad = tee.get_request_pad('src_%u')
       check_object(tee_msg_pad, "Unable to get request pads\n")
       tee_render_pad = tee.get_request_pad("src_%u")
       check_object(tee_render_pad, "Unable to get request pads\n")

       tee_msg_pad.link(queue1.get_static_pad("sink"))
       tee_render_pad.link(queue2.get_static_pad("sink"))

I generate the sample messages in the OSD probe:

    def _probe_on_frame(self, frame_meta, batch_meta):
        """Colour every detected object in the frame and queue a broker message for it.

        Increments ``self.frame_number``, then walks ``frame_meta.obj_meta_list``:
        objects of the mask class get a green border, all others a red one, and
        each object is handed to ``self._msg_broker``.

        Args:
            frame_meta: frame metadata whose ``obj_meta_list`` is iterated.
            batch_meta: batch metadata, forwarded unchanged to ``_msg_broker``.
        """
        self.frame_number = self.frame_number + 1
        l_obj = frame_meta.obj_meta_list
        mask_class_id = self._classes_ids[CLASS_MASK_KEY]
        # Per-class object count for this frame, seeded with the two known classes.
        obj_counter = {
            mask_class_id: 0,
            self._classes_ids[CLASS_NOMASK_KEY]: 0
        }
        while l_obj is not None:
            try:
                # Casting l_obj.data to pyds.NvDsObjectMeta
                obj_meta = pyds.glist_get_nvds_object_meta(l_obj.data)
            except StopIteration:
                break

            if obj_meta.class_id == mask_class_id:
                obj_meta.rect_params.border_color.set(0.0, 1.0, 0.0, 0.0)
            else:
                obj_meta.rect_params.border_color.set(1.0, 0.0, 0.0, 0.0)
            self._msg_broker(frame_meta, self.frame_number, obj_meta, batch_meta)

            # The colouring above accepts any class id, so the counter must too:
            # .get() avoids a KeyError for ids outside the two seeded classes.
            obj_counter[obj_meta.class_id] = obj_counter.get(obj_meta.class_id, 0) + 1

            try:
                l_obj = l_obj.next
            except StopIteration:
                break

def _msg_broker(self, frame_meta, frame_number, obj_meta, batch_meta):
    if frame_number % 30:
        # Frequency of messages to be send will be based on use case.
        # Here message is being sent for first object every 30 frames.

        # Allocating an NvDsEventMsgMeta instance and getting reference
        # to it. The underlying memory is not manged by Python so that
        # downstream plugins can access it. Otherwise the garbage collector
        # will free it when this probe exits.
        msg_meta = pyds.alloc_nvds_event_msg_meta()
        #msg_meta.bbox.top = obj_meta.rect_params.top
        #msg_meta.bbox.left = obj_meta.rect_params.left
        #msg_meta.bbox.width = obj_meta.rect_params.width
        #msg_meta.bbox.height = obj_meta.rect_params.height
        msg_meta.frameId = frame_number
        msg_meta = self._generate_event_msg_meta(msg_meta, obj_meta.class_id)
        user_event_meta = pyds.nvds_acquire_user_meta_from_pool(batch_meta)

        if user_event_meta:
            user_event_meta.user_meta_data = msg_meta
            user_event_meta.base_meta.meta_type = pyds.NvDsMetaType.NVDS_EVENT_MSG_META
            # Setting callbacks in the event msg meta. The bindings layer
            # will wrap these callables in C functions. Currently only one
            # set of callbacks is supported.
            pyds.set_user_copyfunc(user_event_meta, self._meta_copy_func)
            pyds.set_user_releasefunc(user_event_meta, self._meta_free_func)
            pyds.nvds_add_user_meta_to_frame(frame_meta, user_event_meta)
        else:
            print("Error in attaching event meta to buffer\n")

# Callback function for deep-copying an NvDsEventMsgMeta struct
def _meta_copy_func(self, data, user_data):
    """Return a deep copy of the NvDsEventMsgMeta carried by ``data``.

    Args:
        data: object castable to ``pyds.NvDsUserMeta`` whose
            ``user_meta_data`` holds the source event message meta.
        user_data: unused.

    Returns:
        The duplicated ``pyds.NvDsEventMsgMeta``.
    """
    # Cast data to pyds.NvDsUserMeta
    user_meta = pyds.NvDsUserMeta.cast(data)
    src_meta_data = user_meta.user_meta_data
    # Cast src_meta_data to pyds.NvDsEventMsgMeta
    srcmeta = pyds.NvDsEventMsgMeta.cast(src_meta_data)
    # Duplicate the memory contents of srcmeta to dstmeta
    # First use pyds.get_ptr() to get the C address of srcmeta, then
    # use pyds.memdup() to allocate dstmeta and copy srcmeta into it.
    # pyds.memdup returns C address of the allocated duplicate.
    dstmeta_ptr = pyds.memdup(pyds.get_ptr(srcmeta), sys.getsizeof(pyds.NvDsEventMsgMeta))
    # Cast the duplicated memory to pyds.NvDsEventMsgMeta
    dstmeta = pyds.NvDsEventMsgMeta.cast(dstmeta_ptr)

    # Duplicate contents of ts field. Note that reading srcmeta.ts
    # returns its C address. This allows memory operations to be
    # performed on it.
    dstmeta.ts = pyds.memdup(srcmeta.ts, MAX_TIME_STAMP_LEN + 1)

    # Copy the sensorStr. This field is a string property.
    # The getter (read) returns its C address. The setter (write)
    # takes string as input, allocates a string buffer and copies
    # the input string into it.
    # pyds.get_string() takes C address of a string and returns
    # the reference to a string object and the assignment inside the binder copies content.
    dstmeta.sensorStr = pyds.get_string(srcmeta.sensorStr)

    if srcmeta.objSignature.size > 0:
        dstmeta.objSignature.signature = pyds.memdup(srcmeta.objSignature.signature, srcmeta.objSignature.size)
        dstmeta.objSignature.size = srcmeta.objSignature.size

    if srcmeta.extMsgSize > 0:
        if srcmeta.objType == pyds.NvDsObjectType.NVDS_OBJECT_TYPE_PERSON:
            srcobj = pyds.NvDsPersonObject.cast(srcmeta.extMsg)
            obj = pyds.alloc_nvds_person_object()
            obj.age = srcobj.age
            obj.gender = pyds.get_string(srcobj.gender)
            obj.cap = pyds.get_string(srcobj.cap)
            obj.hair = pyds.get_string(srcobj.hair)
            obj.apparel = pyds.get_string(srcobj.apparel)
            dstmeta.extMsg = obj
            # The attached object is a person, so size it from NvDsPersonObject,
            # matching what _generate_event_msg_meta records for the original.
            dstmeta.extMsgSize = sys.getsizeof(pyds.NvDsPersonObject)

    return dstmeta

# Release callback paired with the copy callback: frees an NvDsEventMsgMeta
def _meta_free_func(self, data, user_data):
    """Free the buffers owned by the NvDsEventMsgMeta carried by ``data``.

    Args:
        data: object castable to ``pyds.NvDsUserMeta``.
        user_data: unused.
    """
    wrapper = pyds.NvDsUserMeta.cast(data)
    event_meta = pyds.NvDsEventMsgMeta.cast(wrapper.user_meta_data)

    # pyds.free_buffer receives the C address of a buffer and releases it;
    # a NULL address makes it a no-op.
    pyds.free_buffer(event_meta.ts)
    pyds.free_buffer(event_meta.sensorStr)

    signature = event_meta.objSignature
    if signature.size > 0:
        pyds.free_buffer(event_meta.objSignature.signature)
        event_meta.objSignature.size = 0

    if event_meta.extMsgSize > 0:
        if event_meta.objType == pyds.NvDsObjectType.NVDS_OBJECT_TYPE_PERSON:
            person = pyds.NvDsPersonObject.cast(event_meta.extMsg)
            # String members are released one by one, in this order.
            for member in ("gender", "cap", "hair", "apparel"):
                pyds.free_buffer(getattr(person, member))
        # The extension object itself is released for any object type.
        pyds.free_gbuffer(event_meta.extMsg)
        event_meta.extMsgSize = 0


def _generate_person_meta(self, data):
    """Fill ``data`` (cast to ``pyds.NvDsPersonObject``) with fixed sample values.

    Returns:
        The populated person object.
    """
    person = pyds.NvDsPersonObject.cast(data)
    # Hard-coded sample attributes, assigned in this order.
    sample_values = (
        ("age", 45),
        ("cap", "none"),
        ("hair", "black"),
        ("gender", "male"),
        ("apparel", "formal"),
    )
    for attribute, value in sample_values:
        setattr(person, attribute, value)
    return person

def _generate_event_msg_meta(self, data, class_id):
    """Populate ``data`` (cast to ``pyds.NvDsEventMsgMeta``) and return it.

    Sensor, place and module ids are set to 0, the sensor string to
    "sensor-0", and a freshly allocated RFC 3339 timestamp is written to
    ``ts``. For the no-mask class an entry event with an attached person
    object is recorded as well.

    Args:
        data: event message meta to fill in.
        class_id: class id of the detected object.

    Returns:
        The populated ``pyds.NvDsEventMsgMeta``.
    """
    event = pyds.NvDsEventMsgMeta.cast(data)
    event.sensorId = 0
    event.placeId = 0
    event.moduleId = 0
    event.sensorStr = "sensor-0"
    event.ts = pyds.alloc_buffer(MAX_TIME_STAMP_LEN + 1)
    pyds.generate_ts_rfc3339(event.ts, MAX_TIME_STAMP_LEN)

    # Only the no-mask class carries an extension object; everything else
    # is returned with just the common fields above.
    if class_id != self._classes_ids[CLASS_NOMASK_KEY]:
        return event

    # Custom objects (NvDsVehicleObject / NvDsPersonObject, ...) can be
    # attached like this; the payload generator library (nvmsgconv.cpp)
    # then has to handle them accordingly.
    event.type = pyds.NvDsEventType.NVDS_EVENT_ENTRY
    event.objType = pyds.NvDsObjectType.NVDS_OBJECT_TYPE_PERSON
    event.objClassId = self._classes_ids[CLASS_NOMASK_KEY]
    person = self._generate_person_meta(pyds.alloc_nvds_person_object())
    event.extMsg = person
    event.extMsgSize = sys.getsizeof(pyds.NvDsPersonObject)
    return event

What can be causing this issue?

Thank you.

Could you explain more about what you are trying to do? Why did you try a wrong Kafka address? Does it work when you use the correct Kafka address?

I wasn’t sure if the problem was happening because I was using a wrong address or because my app couldn’t connect with kafka server. But that doesn’t seem to be the problem as a different error appears when using a wrong address.

Solved: The problem was with the topic string. The app wasn’t reading it properly and sent the wrong value for the topic.