Using the native app in DeepStream 7.1, sink type=6 will not publish messages/detections to Kafka via the message broker

My system has the following specs:
GPU: GTX 1660ti
RAM: 32 GB
OS: Windows 11
Environment: Ubuntu 24.04.1 LTS (Through WSL)
Deepstream Version: 7.1
Nvidia-Driver Version: 566.03 | CUDA Version: 12.7
Message-Broker: Kafka

I have set up the native application with the stock Ultralytics YOLOv8s model and tested it by visualizing the results with a sink of type=2 and rendering the results with the on-screen display (OSD) element.
The model works and performs well, and I see no issues.

Now, when I add an additional sink with type=6 (which is used for publishing messages via message brokers), the application still runs fine but publishes no results to my message broker.

I have verified that this is an issue on the DeepStream app side rather than with my Kafka setup, since the deepstream-test4 sample works just fine and publishes to Kafka: (deepstream_python_apps/apps/deepstream-test4/README at master · NVIDIA-AI-IOT/deepstream_python_apps · GitHub)

Here is my complete deepstream-app configuration:

[application]
enable-perf-measurement=1
perf-measurement-interval-sec=5

[tiled-display]
enable=1
rows=1
columns=1
width=1280
height=720
gpu-id=0
nvbuf-memory-type=0

[source0]
enable=1
type=3
uri=file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4
num-sources=1
gpu-id=0
cudadec-memtype=0

[sink0]
enable=1
type=2
sync=0
gpu-id=0
nvbuf-memory-type=0

[sink1]
enable=1
type=6
sync=0
gpu-id=0
new-api=0
nvbuf-memory-type=0
msg-conv-config=/custom_workspace/native_app/cfg_conv.txt
msg-conv-payload-type=0
msg-broker-proto-lib=/opt/nvidia/deepstream/deepstream/lib/libnvds_kafka_proto.so
msg-broker-conn-str=kafka;29092;deepstream_native_app
msg-broker-config=/custom_workspace/native_app/cfg_kafka.txt
msg-broker-topic=deepstream_native_app

[osd]
enable=1
gpu-id=0
border-width=5
text-size=15
text-color=1;1;1;1;
text-bg-color=0.3;0.3;0.3;1
font=Serif
show-clock=0
clock-x-offset=800
clock-y-offset=820
clock-text-size=12
clock-color=1;0;0;0
nvbuf-memory-type=0

[streammux]
gpu-id=0
live-source=0
batch-size=1
batched-push-timeout=40000
width=1920
height=1080
enable-padding=0
nvbuf-memory-type=0

[primary-gie]
enable=1
gpu-id=0
gie-unique-id=1
nvbuf-memory-type=0
config-file=/custom_workspace/native_app/config_infer_primary_yoloV8.txt

[tracker]
enable=1
tracker-width=640
tracker-height=384
ll-lib-file=/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so
ll-config-file=/custom_workspace/native_app/config_tracker_NvDCF_perf.yml
gpu-id=0
enable-batch-process=1
enable-past-frame=1
display-tracking-id=1

[tests]
file-loop=0

I have verified that the paths mentioned in the above config are all accurate.
When I run the above configuration, the application works without producing any errors, but it does not publish results.

How can I get the application to publish results to Kafka?
I have tried using Redis in a similar way but without any luck.
Any help would be greatly appreciated.

Which sample are you testing? Could you share the whole DeepStream running log? If new-api is 0, NvDsEventMsgMeta needs to be created and set in a probe function; you can add logs to check whether this step is done.
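For reference, here is roughly how the C sample apps (deepstream-test4/test5) create and attach NvDsEventMsgMeta in such a probe. This is only a sketch of the new-api=0 workflow, not code from deepstream-app; the probe name and the meta_copy_func / meta_free_func helpers follow the samples and are assumed to be implemented elsewhere:

#include <gst/gst.h>
#include "gstnvdsmeta.h"
#include "nvdsmeta_schema.h"

/* Deep-copy / free callbacks for NvDsEventMsgMeta; the sample apps implement
 * these, they are only declared here. */
extern gpointer meta_copy_func (gpointer data, gpointer user_data);
extern void meta_free_func (gpointer data, gpointer user_data);

static GstPadProbeReturn
osd_sink_pad_buffer_probe (GstPad *pad, GstPadProbeInfo *info, gpointer u_data)
{
  GstBuffer *buf = (GstBuffer *) info->data;
  NvDsBatchMeta *batch_meta = gst_buffer_get_nvds_batch_meta (buf);

  for (NvDsMetaList *l_frame = batch_meta->frame_meta_list; l_frame; l_frame = l_frame->next) {
    NvDsFrameMeta *frame_meta = (NvDsFrameMeta *) l_frame->data;

    for (NvDsMetaList *l_obj = frame_meta->obj_meta_list; l_obj; l_obj = l_obj->next) {
      NvDsObjectMeta *obj_meta = (NvDsObjectMeta *) l_obj->data;

      /* Build an event message for this detection. */
      NvDsEventMsgMeta *msg_meta = (NvDsEventMsgMeta *) g_malloc0 (sizeof (NvDsEventMsgMeta));
      msg_meta->bbox.top = obj_meta->rect_params.top;
      msg_meta->bbox.left = obj_meta->rect_params.left;
      msg_meta->bbox.width = obj_meta->rect_params.width;
      msg_meta->bbox.height = obj_meta->rect_params.height;
      msg_meta->objClassId = obj_meta->class_id;
      msg_meta->trackingId = obj_meta->object_id;
      msg_meta->frameId = frame_meta->frame_num;
      msg_meta->sensorId = frame_meta->source_id;

      /* Attach it to the frame as user meta of type NVDS_EVENT_MSG_META so that
       * nvmsgconv can turn it into a JSON payload and nvmsgbroker can publish it. */
      NvDsUserMeta *user_event_meta = nvds_acquire_user_meta_from_pool (batch_meta);
      if (user_event_meta) {
        user_event_meta->user_meta_data = (void *) msg_meta;
        user_event_meta->base_meta.meta_type = NVDS_EVENT_MSG_META;
        user_event_meta->base_meta.copy_func = (NvDsMetaCopyFunc) meta_copy_func;
        user_event_meta->base_meta.release_func = (NvDsMetaReleaseFunc) meta_free_func;
        nvds_add_user_meta_to_frame (frame_meta, user_event_meta);
      }
    }
  }
  return GST_PAD_PROBE_OK;
}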

I run it using the following command:
deepstream-app -c deepstream_app_config.txt

Here are the complete logs:

(deepstream-app:289): GLib-CRITICAL **: 16:34:00.366: g_strchug: assertion 'string != NULL' failed

(deepstream-app:289): GLib-CRITICAL **: 16:34:00.366: g_strchomp: assertion 'string != NULL' failed
gstnvtracker: Loading low-level lib at /opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so
[NvTrackerParams::getConfigRoot()] !!![WARNING] File doesn't exist. Will go ahead with default values
[NvMultiObjectTracker] Initialized
0:00:00.497654073   289 0x562cfeea7eb0 INFO                 nvinfer gstnvinfer.cpp:684:gst_nvinfer_logger:<primary_gie> NvDsInferContext[UID 1]: Info from NvDsInferContextImpl::deserializeEngineAndBackend() <nvdsinfer_context_impl.cpp:2092> [UID = 1]: deserialized trt engine from :/custom_workspace/native_app/model_b1_gpu0_fp16.engine
Implicit layer support has been deprecated
INFO: ../nvdsinfer/nvdsinfer_model_builder.cpp:327 [Implicit Engine Info]: layers num: 0

0:00:00.497798311   289 0x562cfeea7eb0 INFO                 nvinfer gstnvinfer.cpp:684:gst_nvinfer_logger:<primary_gie> NvDsInferContext[UID 1]: Info from NvDsInferContextImpl::generateBackendContext() <nvdsinfer_context_impl.cpp:2195> [UID = 1]: Use deserialized engine model: /custom_workspace/native_app/model_b1_gpu0_fp16.engine
0:00:00.503255866   289 0x562cfeea7eb0 INFO                 nvinfer gstnvinfer_impl.cpp:343:notifyLoadModelStatus:<primary_gie> [UID 1]: Load new model:/custom_workspace/native_app/config_infer_primary_yoloV8.txt sucessfully

Runtime commands:
        h: Print this help
        q: Quit

        p: Pause
        r: Resume

NOTE: To expand a source in the 2D tiled display and view object details, left-click on the source.
      To go back to the tiled display, right-click anywhere on the window.

** INFO: <bus_callback:291>: Pipeline ready

WARNING from src_elem: No decoder available for type 'audio/mpeg, mpegversion=(int)4, framed=(boolean)true, stream-format=(string)raw, level=(string)2, base-profile=(string)lc, profile=(string)lc, codec_data=(buffer)119056e500, rate=(int)48000, channels=(int)2'.
Debug info: ../gst/playback/gsturidecodebin.c(960): unknown_type_cb (): /GstPipeline:pipeline/GstBin:multi_src_bin/GstBin:src_sub_bin0/GstURIDecodeBin:src_elem
Failed to query video capabilities: Inappropriate ioctl for device
** INFO: <bus_callback:277>: Pipeline running

mimetype is video/x-raw

**PERF:  FPS 0 (Avg)
**PERF:  116.26 (116.16)
**PERF:  118.22 (117.28)
nvstreammux: Successfully handled EOS for source_id=0
** INFO: <bus_callback:334>: Received EOS. Exiting ...

Quitting
[NvMultiObjectTracker] De-initialized
App run successful

Please note, the sample application that DID work with the message broker was this:
Deepstream python sample app

I am trying to push results/detections directly from the native app (deepstream-app) to Kafka via msg-broker.

Please use /opt/nvidia/deepstream/deepstream/sources/apps/sample_apps/deepstream-test5, which supports sending to Kafka in new-api=0 mode. Please refer to the config /opt/nvidia/deepstream/deepstream/sources/apps/sample_apps/deepstream-test5/configs/test5_config_file_src_infer.yml, which includes a type=6 sink.

I have gone through the deepstream-test5 configuration and README file, which highlight the steps required to use a message broker from the native app. My setup follows the requirements mentioned in the deepstream-test5 app, as you can see below:

[application]
enable-perf-measurement=1
perf-measurement-interval-sec=5

[tiled-display]
enable=1
rows=1
columns=1
width=1280
height=720
gpu-id=0
nvbuf-memory-type=0

[source0]
enable=1
type=3
uri=file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4
num-sources=1
gpu-id=0
cudadec-memtype=0

[sink0]
enable=1
type=2
sync=0
gpu-id=0
nvbuf-memory-type=0

[sink1]
enable=1
type=6
sync=0
gpu-id=0
new-api=0
nvbuf-memory-type=0
msg-conv-config=/custom_workspace/native_app/cfg_conv.txt
msg-conv-payload-type=0
msg-broker-proto-lib=/opt/nvidia/deepstream/deepstream/lib/libnvds_kafka_proto.so
msg-broker-conn-str=kafka;29092;deepstream_native_app
msg-broker-config=/custom_workspace/native_app/cfg_kafka.txt
topic=deepstream_native_app
new-api=0

[osd]
enable=1
gpu-id=0
border-width=5
text-size=15
text-color=1;1;1;1;
text-bg-color=0.3;0.3;0.3;1
font=Serif
show-clock=0
clock-x-offset=800
clock-y-offset=820
clock-text-size=12
clock-color=1;0;0;0
nvbuf-memory-type=0

[streammux]
gpu-id=0
live-source=0
batch-size=1
batched-push-timeout=40000
width=1920
height=1080
enable-padding=0
nvbuf-memory-type=0

[primary-gie]
enable=1
gpu-id=0
gie-unique-id=1
nvbuf-memory-type=0
config-file=/custom_workspace/native_app/config_infer_primary_yoloV8.txt

[tracker]
enable=1
tracker-width=640
tracker-height=384
ll-lib-file=/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so
ll-config-file=/custom_workspace/native_app/config_tracker_NvDCF_perf.yml
gpu-id=0
display-tracking-id=1

[tests]
file-loop=0

The problem remains, however, that the message broker will not publish/push anything to Kafka, and it does not give me any errors when I run it. I have verified this by removing sink0 and only using sink1. I have also added the new-api=0 parameter to my config, and it is still not publishing results :(

  1. Could you share the whole test5 running log? If you use eglsink or filesink, can you see the bboxes on the video?
  2. If new-api=0, the workflow is: test5 sets values in NvDsEventMsgMeta, then nvmsgconv converts the data in NvDsEventMsgMeta to a JSON string, then nvmsgbroker sends the string to the broker.
    First, please add logs after generate_event_msg_meta in deepstream_test5_app_main.c to check whether NvDsEventMsgMeta is added; a sketch of such a log is shown below.
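For illustration only, a debug print of this kind could be placed right after the existing generate_event_msg_meta(...) call in deepstream_test5_app_main.c. The variable names (frame_meta, obj_meta) are assumptions about the surrounding scope in the sample's bbox-generated callback:

/* Confirm that an NvDsEventMsgMeta was filled and is about to be attached. */
g_print ("NvDsEventMsgMeta attached: source=%u frame=%d class=%d object=%lu\n",
    frame_meta->source_id, frame_meta->frame_num,
    obj_meta->class_id, (unsigned long) obj_meta->object_id);

If this line never prints, no event metadata is being created, so nothing will ever reach the broker.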

Thank you @fanzh for supporting us in this ticket. Let me ask you this:
is it possible to use the native DeepStream app (as described in this post by @usmanmalik291) to track and detect objects and send results/messages through Kafka without using any custom Python code? My guess is that it should work, but we are missing something.
@usmanmalik291, waiting for you to send the logs.

Please refer to this ready-made Python sample, deepstream-test4. You don't need to modify the code. Here is a command:

python3 deepstream_test_4.py -i /opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.h264 -p /opt/nvidia/deepstream/deepstream/lib/libnvds_kafka_proto.so   --conn-str="localhost;9092" -t deepstream -s 0 --no-display

I already have a Python application set up where I can send the data using Kafka, but it's very slow (about 1/4 the speed) when performing inference compared to the native application (deepstream-app) where I pass my config.
I understand that Python can be used to set up the elements using pyds (bindings) similarly to the native app, but I want to send the detections without diving into Python or C.

Just using the native deepstream-app with the message-broker configured.

Please note I have modified my app to match the parameters of test5; I am not running test5 itself. The config of my application is already attached above.

When using the type=2 sink (EGL), I was able to get bounding boxes plotted on the video. Here are the complete logs:

[NvTrackerParams::getConfigRoot()] !!![WARNING] File doesn't exist. Will go ahead with default values
[NvMultiObjectTracker] Initialized
0:00:00.411527819   501 0x5557e0f4f890 INFO                 nvinfer gstnvinfer.cpp:684:gst_nvinfer_logger:<primary_gie> NvDsInferContext[UID 1]: Info from NvDsInferContextImpl::deserializeEngineAndBackend() <nvdsinfer_context_impl.cpp:2092> [UID = 1]: deserialized trt engine from :/custom_workspace/native_app/model_b1_gpu0_fp16.engine
Implicit layer support has been deprecated
INFO: ../nvdsinfer/nvdsinfer_model_builder.cpp:327 [Implicit Engine Info]: layers num: 0

0:00:00.411685896   501 0x5557e0f4f890 INFO                 nvinfer gstnvinfer.cpp:684:gst_nvinfer_logger:<primary_gie> NvDsInferContext[UID 1]: Info from NvDsInferContextImpl::generateBackendContext() <nvdsinfer_context_impl.cpp:2195> [UID = 1]: Use deserialized engine model: /custom_workspace/native_app/model_b1_gpu0_fp16.engine
0:00:00.417994106   501 0x5557e0f4f890 INFO                 nvinfer gstnvinfer_impl.cpp:343:notifyLoadModelStatus:<primary_gie> [UID 1]: Load new model:/custom_workspace/native_app/config_infer_primary_yoloV8.txt sucessfully

Runtime commands:
        h: Print this help
        q: Quit

        p: Pause
        r: Resume

NOTE: To expand a source in the 2D tiled display and view object details, left-click on the source.
      To go back to the tiled display, right-click anywhere on the window.

** INFO: <bus_callback:291>: Pipeline ready

WARNING from src_elem: No decoder available for type 'audio/mpeg, mpegversion=(int)4, framed=(boolean)true, stream-format=(string)raw, level=(string)2, base-profile=(string)lc, profile=(string)lc, codec_data=(buffer)119056e500, rate=(int)48000, channels=(int)2'.
Debug info: ../gst/playback/gsturidecodebin.c(960): unknown_type_cb (): /GstPipeline:pipeline/GstBin:multi_src_bin/GstBin:src_sub_bin0/GstURIDecodeBin:src_elem
Failed to query video capabilities: Inappropriate ioctl for device
** INFO: <bus_callback:277>: Pipeline running

mimetype is video/x-raw

**PERF:  FPS 0 (Avg)
**PERF:  121.73 (121.16)
**PERF:  121.92 (121.83)
nvstreammux: Successfully handled EOS for source_id=0
** INFO: <bus_callback:334>: Received EOS. Exiting ...

Quitting
[NvMultiObjectTracker] De-initialized
App run successful

Please refer to my comment on Nov 29. Did you call generate_event_msg_meta to set values in NvDsEventMsgMeta, and add user_event_meta with type NVDS_EVENT_MSG_META? If yes, you can add logs to confirm that this code is called. The low-level code will use NvDsEventMsgMeta to generate the JSON string.

I think there seems to be a misunderstanding: I am not trying to run the code present at
/opt/nvidia/deepstream/deepstream/sources/apps/sample_apps/deepstream-test5, because it involves diving into C-level code.
I am not setting values in the deepstream_test5_app_main.c file because I am not using it to launch the application.

I am, however, referring to the following config file to implement the message broker correctly: /opt/nvidia/deepstream/deepstream/sources/apps/sample_apps/deepstream-test5/configs/test5_config_file_src_infer.txt

So let me give you a complete breakdown of my application:

I have the following files:
→ cfg_conv.txt
→ cfg_kafka.txt
→ config_infer_primary_yoloV8.txt
→ config_tracker_NvDCF_accuracy.txt
→ config_tracker_NvDCF_accuracy.yml
→ deepstream_app_config.txt
→ labels.txt
→ model_b1_gpu0_fp16.engine

I am running the application through the following command:
deepstream-app -c deepstream_app_config.txt

My objective is to set up this application to publish results to Kafka without involving Python or C at this stage. I just want to perform inference and publish the results via the message broker for later processing.

Please note: the model runs fine without any issues and shows detection and tracking results. Kafka is set up properly and receives messages from other sources just fine, too. I have verified all of these components; I am just struggling to publish the results to Kafka with the setup I just described.

I have attached the relevant files being used to this message as well:
cfg_conv.txt (2.0 KB)
cfg_kafka.txt (42 Bytes)
deepstream_app_config.txt (2.3 KB)

Please let me know whether what I am trying to achieve is possible to set up. If so, what exactly am I missing?
I really appreciate the help.

As I said on Nov 28: with new-api=0, deepstream-app does not support sending to the broker. The app layer needs to add user_event_meta with type NVDS_EVENT_MSG_META, and deepstream-app does not have this logic. Please try deepstream-test5-app -c deepstream_app_config.txt

I have set up everything accordingly and tested it out using deepstream-test5-app. I configured my application to use 'new-api', and the messages are now successfully being published to Kafka!
Thank you so much!

I have a couple of follow-up questions regarding the structure of the message that is being sent:

  1. The structure of the message is defined in the file referenced by msg-conv-config, which in my case is cfg_conv.txt. Is there any guide as to which elements are and are not supported in this file?
  2. What would you recommend as the best method to send the frame along with the detections? Should I create an additional sink and dump the frames somewhere to read out later, or can they be included as part of the message I publish to Kafka?
  3. Final question: for real-time modification of the running streams (adding additional sources) without restarting the application, what is the best example to refer to for this type of setup? /opt/nvidia/deepstream/deepstream/sources/apps/sample_apps/deepstream-test5/test5_config_file_nvmultiurisrcbin_src_list_attr_all.txt?
  1. This file is parsed by the open-source low-level function nvds_msg2p_parse_key_value in /opt/nvidia/deepstream/deepstream/sources/libs/nvmsgconv/deepstream_schema/deepstream_schema.cpp; you can modify that code to customize it. A sketch of the typical layout is shown after this list.
  2. If you want to send the frame, please refer to "3. Send the image by the broker based on Kafka" in the README of deepstream-test4.
  3. Yes, nvmultiurisrcbin supports adding or removing sources dynamically.
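For reference, a minimal sketch of what a msg-conv-config file (cfg_conv.txt) typically contains, modeled on the dstest5_msgconv_sample_config.txt shipped with deepstream-test5. The section names ([sensor0], [place0], [analytics0]) follow that sample; the values below are placeholders, and the authoritative list of supported keys is whatever nvds_msg2p_parse_key_value reads:

[sensor0]
enable=1
type=Camera
id=CAMERA_ID_0
location=45.29;-75.83;48.15
description=Entrance Camera
coordinate=5.2;10.1;11.2

[place0]
enable=1
id=0
type=intersection/road
name=XYZ_ROAD
location=30.32;-40.55;100.0
coordinate=1.0;2.0;3.0

[analytics0]
enable=1
id=XYZ_ANALYTICS_0
description=Vehicle detection and tracking
source=yolov8s
version=1.0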

Can you please specify the best application to send the frames + metadata (bounding box + class + tracking ID)?

Currently, with deepstream-test5, I have tried modifying the cfg_conv.txt file to adjust the metadata that is extracted, but it seems to have no effect on the messages that are posted. By default, it seems to send detections only when an event is triggered. The desired behaviour for me is to extract the frames + metadata for all frames and streams and publish them to Kafka.

Could you please outline the steps to achieve this?

If you want to add custom data, please refer to topic1 and topic2.