Message broker and Multistreams in Python

Please provide complete information as applicable to your setup.

• Hardware Platform (Jetson / GPU) RTX 2060 Super.
• DeepStream Version 5.0.1
• JetPack Version (valid for Jetson only)
• TensorRT Version 7.0.0.11
• NVIDIA GPU Driver Version (valid for GPU only) 460
• Issue Type( questions, new requirements, bugs) Question

I am trying to run deepstream_imagedata-multistream.py added to it messagebroker, I modified the pipeline into this:

pipeline.add(pgie)
pipeline.add(tracker)
pipeline.add(tiler)
pipeline.add(nvvidconv)
pipeline.add(filter1)
pipeline.add(nvvidconv1)
pipeline.add(nvosd)
pipeline.add(tee)
pipeline.add(queue1)
pipeline.add(queue2)
pipeline.add(msgconv)
pipeline.add(msgbroker)
pipeline.add(sink)
if is_aarch64():
    pipeline.add(transform)

And the linking like this:

streammux.link(pgie)
pgie.link(tracker)
tracker.link(nvvidconv1)
nvvidconv1.link(filter1)
filter1.link(tiler)
tiler.link(nvvidconv)
nvvidconv.link(nvosd)
nvosd.link(tee)
queue1.link(msgconv)
msgconv.link(msgbroker)
if is_aarch64():
    queue2.link(transform)
    transform.link(sink)
else:
    queue2.link(sink)

And added the needed configurations to run using Kafka, I set the consumer to my host, and added the needed functions from deepstream-test4 to be able to send messages.

This is the logic for sending message added in the tiler_sink_pad_buffer_probe function inside the l_obj loop:

if frame_number % 30 == 0:
    # Frequency of messages to be send will be based on use case.
    # Here message is being sent for first object every 30 frames.

    # Allocating an NvDsEventMsgMeta instance and getting reference
    # to it. The underlying memory is not manged by Python so that
    # downstream plugins can access it. Otherwise the garbage collector
    # will free it when this probe exits.
    msg_meta = pyds.alloc_nvds_event_msg_meta()
    msg_meta.bbox.top = obj_meta.rect_params.top
    msg_meta.bbox.left = obj_meta.rect_params.left
    msg_meta.bbox.width = obj_meta.rect_params.width
    msg_meta.bbox.height = obj_meta.rect_params.height
    msg_meta.frameId = frame_number
    msg_meta.trackingId = long_to_int(obj_meta.object_id)
    # msg_meta.confidence = obj_meta.confidence
    msg_meta = generate_event_msg_meta(msg_meta, obj_meta.class_id, stream_id)
    user_event_meta = pyds.nvds_acquire_user_meta_from_pool(batch_meta)
    if user_event_meta:
        user_event_meta.user_meta_data = msg_meta
        user_event_meta.base_meta.meta_type = pyds.NvDsMetaType.NVDS_EVENT_MSG_META
        # Setting callbacks in the event msg meta. The bindings layer
        # will wrap these callables in C functions. Currently only one
        # set of callbacks is supported.
        pyds.set_user_copyfunc(user_event_meta, meta_copy_func)
        pyds.set_user_releasefunc(user_event_meta, meta_free_func)
        pyds.nvds_add_user_meta_to_frame(frame_meta, user_event_meta)
    else:
        print("Error in attaching event meta to buffer\n")

This works on one stream, the question is; How do i make this run on different sources/streams ? , I checked the C++ version for handling multi-streams and message broker but i didn’t manage to find equivalent functions in the python version.

Any help is appreciated,
Thanks.

Hi @MinaAbdElMassih ,

Please refer to https://github.com/NVIDIA-AI-IOT/deepstream_python_apps/tree/master/apps/deepstream-test3 ,

as mentioned in Python Sample Apps and Bindings Source Details — DeepStream 6.3 Release documentation

Thank you for your reply @mchi ,

What i meant was that i want to run the message broker on different streams, as in have multiple streams running and each stream has its own message broker, which is already done in the pipeline i showed above, my problem is that i only receive messages from the first source only and not from the rest of the sources.

I already can run on multiple streams, the problem is with the message broker only sending messages for the first source to my local host and the rest of the sources and their message brokers are not sending any messages.

Hi,
We support sending message to multi brokers, here some notice about it,
sources/apps/sample_apps/deepstream-test5/README 8. Multiple broker sinks
if this still can not solve your issue, please share more info.