Issues with kafka msgbroker and rtsp source

Please provide complete information as applicable to your setup.

• Hardware Platform (GPU)
• DeepStream Version 7.0
• TensorRT Version 8.6.1
• NVIDIA GPU Driver Version (535)

When using nvmsgbroker with RTSP sources (via nvurisrcbin), I am getting more messages than expected. With one stream it works fine, but with more sources the message count is sometimes about three times what it should be. This is the piece of code that I’m working with:

        # Offsets applied to the on-screen text parameters.
        py_nvosd_text_params.x_offset = 10
        py_nvosd_text_params.y_offset = 12

        # When Kafka output is enabled, serialize the per-frame dict to a JSON
        # string and attach it to this frame as event-message user meta.
        if ENABLE_KAFKA:
            data = json.dumps(data)
            _add_custom_meta_to_buffer(data, gst_buffer,frame_meta)

        # Font name, size and colour (four float components) of the text.
        py_nvosd_text_params.font_params.font_name = "Serif"
        py_nvosd_text_params.font_params.font_size = 10
        py_nvosd_text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)

This is the function:

def _add_custom_meta_to_buffer(meta_str, buf, frame_meta):
    """Attach ``meta_str`` to ``frame_meta`` as NVDS_EVENT_MSG_META user meta.

    The string is carried in the ``cap`` field of an ``NvDsPersonObject``
    that is set as the event message's ``extMsg``.

    Args:
        meta_str: String payload to publish (the caller passes a JSON string).
        buf: Buffer currently being processed; its batch meta provides the
            pool the user meta is acquired from.
        frame_meta: Frame meta that the new user meta is added to.

    Returns:
        None. If no user meta can be acquired, the failure is reported via
        ``Gst.error`` and nothing is attached.
    """
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(buf))
    user_event_meta = pyds.nvds_acquire_user_meta_from_pool(batch_meta)

    # Check the acquired meta *before* using it: the allocation below takes
    # it as an argument, so a failed acquire must bail out first.
    if not user_event_meta:
        Gst.error("Error in attaching event meta to buffer\n")
        return

    msg_meta = pyds.alloc_nvds_event_msg_meta(user_event_meta)
    msg_meta = pyds.NvDsEventMsgMeta.cast(msg_meta)
    msg_meta.type = pyds.NvDsEventType.NVDS_EVENT_ENTRY
    msg_meta.objType = pyds.NvDsObjectType.NVDS_OBJECT_TYPE_PERSON

    # The person object is only used as a carrier for the custom string.
    person = pyds.alloc_nvds_person_object()
    person = pyds.NvDsPersonObject.cast(person)
    person.cap = meta_str
    msg_meta.extMsg = person
    # NOTE(review): this is sys.getsizeof() of the Python class object, kept
    # exactly as before -- confirm it is the size the consumer expects.
    msg_meta.extMsgSize = sys.getsizeof(pyds.NvDsPersonObject)

    user_event_meta.user_meta_data = msg_meta
    user_event_meta.base_meta.meta_type = pyds.NvDsMetaType.NVDS_EVENT_MSG_META
    pyds.nvds_add_user_meta_to_frame(frame_meta, user_event_meta)

This is the pipeline:

    # Message converter: config file and payload type 0.
    nvmsgconv.set_property("config", "dstest4_msgconv_config.cfg")
    nvmsgconv.set_property("payload-type", 0)
    # Message broker: Kafka protocol library, connection string and topic.
    nvmsgbroker.set_property("proto-lib", "/opt/nvidia/deepstream/deepstream/lib/libnvds_kafka_proto.so")
    # Every ":" in KAFKA_URL is replaced with ";" to build the conn-str value.
    nvmsgbroker.set_property("conn-str", KAFKA_URL.replace(":",";"))
    nvmsgbroker.set_property("topic",KAFKA_TOPIC)
    nvmsgbroker.set_property("sync", 1)

    print("Adding elements to Pipeline \n")
    pipeline.add(pgie)
    pipeline.add(tracker)
    pipeline.add(sgie1)
    pipeline.add(sgie2)
    pipeline.add(nvanalytics)
    pipeline.add(tiler)
    pipeline.add(nvvidconv)
    pipeline.add(nvosd)
    pipeline.add(tee)
    pipeline.add(queuemsg1)
    pipeline.add(queuemsg2)
    pipeline.add(nvmsgconv)
    pipeline.add(nvmsgbroker)
    
    # Output-branch elements depend on the configured sink type.
    if SINK_TYPE=="filesink": 
        pipeline.add(nvvidconv2)
        pipeline.add(capsfilter)
        pipeline.add(encoder)
        pipeline.add(codeparser)
        pipeline.add(container)
        pipeline.add(sink)
    elif (SINK_TYPE=="fakesink") or (SINK_TYPE=="eglsink"):
        pipeline.add(sink)
    elif SINK_TYPE=="rtspsink":
        pipeline.add(nvvidconv_postosd)
        pipeline.add(encoder)
        pipeline.add(sink)

    print("Linking elements in the Pipeline \n")

    # Main chain:
    # streammux -> pgie -> tracker -> sgie1 -> sgie2 -> nvanalytics
    #           -> tiler -> nvvidconv -> nvosd -> tee
    # NOTE(review): the results of the link() calls are not checked.
    streammux.link(pgie)
    pgie.link(tracker)
    tracker.link(sgie1)
    sgie1.link(sgie2)
    sgie2.link(nvanalytics)
    nvanalytics.link(tiler)
    tiler.link(nvvidconv)
    nvvidconv.link(nvosd)
    nvosd.link(tee)

    # Message branch: queuemsg1 -> nvmsgconv -> nvmsgbroker.
    queuemsg1.link(nvmsgconv)    
    nvmsgconv.link(nvmsgbroker)

    # Output branch: queuemsg2 -> (sink-type specific elements) -> sink.
    if SINK_TYPE=="filesink":
        queuemsg2.link(nvvidconv2)
        nvvidconv2.link(capsfilter)
        capsfilter.link(encoder)
        encoder.link(codeparser)
        codeparser.link(container)
        container.link(sink)
    elif (SINK_TYPE=="fakesink") or (SINK_TYPE=="eglsink"): 
        queuemsg2.link(sink)
    elif SINK_TYPE=="rtspsink": 
        queuemsg2.link(nvvidconv_postosd)
        nvvidconv_postosd.link(encoder)
        encoder.link(sink)

    # Request two src pads from the tee: one feeds queuemsg1 (message branch),
    # the other feeds queuemsg2 (output branch).
    sink_pad=queuemsg1.get_static_pad("sink")
    tee_msg_pad=tee.get_request_pad('src_%u')
    tee_stream_pad=tee.get_request_pad("src_%u")
    # NOTE(review): a missing request pad is only reported here; execution
    # still continues to the link() calls below.
    if not tee_msg_pad or not tee_stream_pad:
        sys.stderr.write("Unable to get request pads\n")
    tee_msg_pad.link(sink_pad)
    sink_pad=queuemsg2.get_static_pad("sink")
    tee_stream_pad.link(sink_pad)

    # create an event loop and feed gstreamer bus messages to it
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect ("message", bus_call, loop)
    # Attach the buffer probe to the nvanalytics src pad and, only when that
    # pad exists, schedule the periodic perf callback.
    nvanalytics_src_pad=nvanalytics.get_static_pad("src")
    if not nvanalytics_src_pad:
        sys.stderr.write(" Unable to get src pad \n")
    else:
        nvanalytics_src_pad.add_probe(Gst.PadProbeType.BUFFER, nvanalytics_src_pad_buffer_probe, 0)
        GLib.timeout_add(PERF_INTERVAL, perf_data.perf_print_callback)

This is the msgconv_config:

[sensor0]
enable=1
type=Camera

[place0]
enable=1
id=1

[analytics0]
enable=1
id=XYZ

What could be the reason for this?
When running with one 10 fps RTSP source, I get a total of 600 messages in a minute, as expected. But when running four 10 fps RTSP sources, I get close to 7000 messages in a minute instead of the expected 2400.
This seems to work fine when using file sources.

Could you share the dstest4_msgconv_config.cfg? Do the "7000 messages" contain duplicated or abnormal content? Do you call “json.dumps(data)” for each frame?

This is the dstest4_msgconv_config.cfg:

[sensor0]
enable=1
type=Camera

[place0]
enable=1
id=1

[analytics0]
enable=1
id=XYZ

Those 7000 messages are mostly duplicates.
Yes, I need to call json.dumps(data) for each frame.

Also, through some testing, I found that when running without nvmultistreamtiler and with the batch-size of streammux set to 1, it works fine without any duplicate messages.
This is a sample of the Kafka output in which the duplicates occur:
output.txt (815.0 KB)

If you use this simple piece of code to debug, you’ll find that this has many duplicate messages:

import json
from collections import Counter

# Text that precedes the JSON-encoded "cap" value on a payload line.
CAP_MARKER = "cap\" :"


def parse_cap_payloads(lines):
    """Return the decoded ``cap`` payload of every line that carries one.

    A matching line looks like ``"cap" : "<JSON-encoded string>",``. The text
    after the marker is stripped, its last character (the trailing comma) is
    dropped, and the rest is JSON-decoded twice: once to recover the string
    field and once to recover the object serialized inside it.

    Lines without the full marker are skipped, so a line that merely contains
    the letters "cap" no longer raises ``IndexError``.
    """
    payloads = []
    for line in lines:
        if CAP_MARKER not in line:
            continue
        raw = line.split(CAP_MARKER)[1].strip()[:-1]
        payloads.append(json.loads(json.loads(raw)))
    return payloads


def find_duplicates(payloads):
    """Return ``(payload, count)`` for each payload that occurs more than once.

    Every distinct duplicated payload is reported exactly once, in order of
    first appearance, together with its number of occurrences. A pairwise
    comparison would instead report a payload seen k times k * (k - 1) times.
    """
    counts = Counter()
    first_seen = {}
    for payload in payloads:
        # Dicts are unhashable; a key-sorted JSON dump is a stable stand-in.
        key = json.dumps(payload, sort_keys=True)
        counts[key] += 1
        first_seen.setdefault(key, payload)
    return [(first_seen[key], n) for key, n in counts.items() if n > 1]


if __name__ == "__main__":
    with open("output.txt", "r") as f:
        data = f.readlines()

    new = parse_cap_payloads(data)
    dups = find_duplicates(new)

    for payload, count in dups:
        print(f"{count}x {payload}")

This is how the processing is happening in the nvanalytics_src_pad_buffer_probe:

def nvanalytics_src_pad_buffer_probe(pad,info,u_data):
    """Buffer probe: build per-frame data, attach OSD text and Kafka meta.

    For every frame in the batch carried by the probed buffer this builds a
    dict describing the frame, adds one display-meta text label and, when
    ``ENABLE_KAFKA`` is set, attaches the JSON-serialized dict to the frame
    through ``_add_custom_meta_to_buffer``. It also updates the per-stream
    FPS counter.

    Args:
        pad: The probed pad (unused).
        info: Probe info; the buffer is obtained via ``info.get_buffer()``.
        u_data: User data given at ``add_probe`` time (unused).

    Returns:
        ``Gst.PadProbeReturn.OK`` on every path, including when no buffer is
        available.
    """
    global perf_data

    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        # Hand GStreamer an explicit probe result instead of None.
        return Gst.PadProbeReturn.OK

    # pyds.gst_buffer_get_nvds_batch_meta() expects the C address of the
    # buffer, which is obtained with hash(gst_buffer).
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))

    l_frame = batch_meta.frame_meta_list
    while l_frame:
        try:
            # l_frame.data must be cast to NvDsFrameMeta before use.
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        pad_index = frame_meta.pad_index
        frame_number = frame_meta.frame_num
        data = {
            "store_id": stream_data[pad_index][0],
            "camera_id": stream_data[pad_index][1],
            "frame_number": frame_number,
            # Second resolution only: frames that fall within the same second
            # share this value and differ only by frame_number / Stream ID.
            "timestamp": datetime.now(utc_time).strftime('%Y-%m-%d %H:%M:%S'),
            "Stream ID": pad_index,
            "line_detections": [],
            "non_line_detections": [],
        }

        l_obj = frame_meta.obj_meta_list
        while l_obj:
            try:
                # l_obj.data must be cast to NvDsObjectMeta before use.
                obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break

            # NOTE(review): these per-object values are read but not used by
            # the code visible here (the detection lists above stay empty).
            top = obj_meta.rect_params.top
            left = obj_meta.rect_params.left
            width = obj_meta.rect_params.width
            height = obj_meta.rect_params.height
            trackingID = long_to_int(obj_meta.object_id)
            confidence = obj_meta.confidence

            try:
                l_obj = l_obj.next
            except StopIteration:
                break

        display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
        display_meta.num_labels = 1
        py_nvosd_text_params = display_meta.text_params[0]

        # NOTE(review): display_text is only assigned when line data exists
        # for this stream, although num_labels is always 1 -- confirm intended.
        if pad_index in temp_line:
            py_nvosd_text_params.display_text = f"Line Data: {temp_line[pad_index]}"

        py_nvosd_text_params.x_offset = 10
        py_nvosd_text_params.y_offset = 12

        if ENABLE_KAFKA:
            # Keep the dict and its serialized form under separate names.
            payload = json.dumps(data)
            _add_custom_meta_to_buffer(payload, gst_buffer, frame_meta)

        py_nvosd_text_params.font_params.font_name = "Serif"
        py_nvosd_text_params.font_params.font_size = 10
        py_nvosd_text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)

        py_nvosd_text_params.set_bg_clr = 1
        py_nvosd_text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)
        pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)

        stream_index = "stream{0}".format(pad_index)
        perf_data.update_fps(stream_index)

        try:
            l_frame = l_frame.next
        except StopIteration:
            break

    return Gst.PadProbeReturn.OK

Could you give an example from output.txt? Which two messages are the same? It seems the cap value varies in each message.

This message was repeated about 20 times:

{'store_id': 16, 'camera_id': 28, 'frame_number': 20, 'timestamp': '2024-07-10 16:39:11', 'Stream ID': 3, 'line_detections': [], 'non_line_detections': []}


As the screenshot shows, no message matches.

\"camera_id\": 28, \"frame_number\": 20, \"timestamp\": \"2024-07-10 16:39:11\", \"Stream ID\": 3

Check these message ids:

9d31bf82-cf21-4b6a-be90-bb8145d7d292
b86b074e-5a91-4aeb-8218-7ac2007e15eb
69aae401-5c29-466a-bfa8-d034d60be9fd
10502279-ed30-4f98-bfec-9f0187e15652
968b36e2-2703-4557-8930-d42fae93c386

Yes, I found those four messages. As you know, there are many frames in one second; you can add microseconds (usec) to the timestamp to distinguish the different frames.

I understand that there are multiple frames in one second, but then why am I getting frame_number=20 for all of them?

Note that you are testing four RTSP inputs, and each stream has its own frame number 20. You can print the data to confirm.

I’m printing the stream_id through frame_meta.pad_index