deepstream_parallel_app issue with Kafka messaging

Please provide complete information as applicable to your setup.

• Hardware Platform (Jetson / GPU): GPU
• DeepStream Version: 7.0
• JetPack Version (valid for Jetson only)
• TensorRT Version
• NVIDIA GPU Driver Version (valid for GPU only): 12.0
• Issue Type( questions, new requirements, bugs): bugs
• How to reproduce the issue ? (This is for bugs. Including which sample app is using, the configuration files content, the command line used and other details for reproducing)
• Requirement details( This is for new requirement. Including the module name-for which plugin or for which sample application, the function description)

I am using deepstream_parallel_app from the repo:

NVIDIA-AI-IOT/deepstream_parallel_inference_app: A project demonstrating how to use nvmetamux to run multiple models in parallel. (github.com)

I have created a custom pipeline with two sources and two models, primary-gie0 and primary-gie1: primary-gie0 runs inference on source0 and primary-gie1 runs inference on source1.

After that I created one sink for saving the tiled video, and then defined two separate sinks to send messages via Kafka, each using a different topic.

What is happening on the Kafka consumer side is that I am getting messages for source id 0 only, not for source id 1. I am running two separate Python-based consumers, one per topic.
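Each consumer is essentially a loop like the minimal sketch below (assuming the kafka-python package and the broker/topic values from my config; `parse_payload` is a small helper of my own):

```python
# Minimal per-topic Kafka consumer sketch. Assumes the kafka-python
# package (pip install kafka-python) and the broker/topic from the
# config below; parse_payload is a helper of my own, not DeepStream API.
import json

def parse_payload(raw: bytes) -> dict:
    """Decode one DeepStream schema payload from a Kafka record value."""
    return json.loads(raw.decode("utf-8"))

if __name__ == "__main__":
    from kafka import KafkaConsumer
    consumer = KafkaConsumer(
        "quickstart-events",                  # the second consumer uses the other topic
        bootstrap_servers="localhost:9092",
        auto_offset_reset="latest",
    )
    for record in consumer:
        msg = parse_payload(record.value)
        print(msg.get("sensorId"), msg.get("@timestamp"), len(msg.get("objects", [])))
```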

Below are the relevant config files:

main_config_file:

# SPDX-FileCopyrightText: Copyright (c) <2022> NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: MIT
#
# Permission is hereby granted, free of charge, to any person obtaining a
# copy of this software and associated documentation files (the "Software"),
# to deal in the Software without restriction, including without limitation
# the rights to use, copy, modify, merge, publish, distribute, sublicense,
# and/or sell copies of the Software, and to permit persons to whom the
# Software is furnished to do so, subject to the following conditions:
#
# The above copyright notice and this permission notice shall be included in
# all copies or substantial portions of the Software.
#
# THE SOFTWARE IS PROVIDED "AS IS", WITHOUT WARRANTY OF ANY KIND, EXPRESS OR
# IMPLIED, INCLUDING BUT NOT LIMITED TO THE WARRANTIES OF MERCHANTABILITY,
# FITNESS FOR A PARTICULAR PURPOSE AND NONINFRINGEMENT. IN NO EVENT SHALL
# THE AUTHORS OR COPYRIGHT HOLDERS BE LIABLE FOR ANY CLAIM, DAMAGES OR OTHER
# LIABILITY, WHETHER IN AN ACTION OF CONTRACT, TORT OR OTHERWISE, ARISING
# FROM, OUT OF OR IN CONNECTION WITH THE SOFTWARE OR THE USE OR OTHER
# DEALINGS IN THE SOFTWARE.
# The values in the config file are overridden by values set through GObject
# properties.

#branch1 yolo
#branch2 bodypose
#
#
#
application:
  enable-perf-measurement: 1
  perf-measurement-interval-sec: 5
  ##gie-kitti-output-dir=streamscl

tiled-display:
  enable: 1
  rows: 1
  columns: 2
  width: 3840
  height: 1080
  gpu-id: 0
  #(0): nvbuf-mem-default - Default memory allocated, specific to particular platform
  #(1): nvbuf-mem-cuda-pinned - Allocate Pinned/Host cuda memory, applicable for Tesla
  #(2): nvbuf-mem-cuda-device - Allocate Device cuda memory, applicable for Tesla
  #(3): nvbuf-mem-cuda-unified - Allocate Unified cuda memory, applicable for Tesla
  #(4): nvbuf-mem-surface-array - Allocate Surface Array memory, applicable for Jetson
  nvbuf-memory-type: 0
  #which source should be showed, -1 means showing all.
  # show-source: 2

source:
  csv-file-path: custom_source.csv
  #csv-file-path: sources_4_different_source.csv
  #csv-file-path: sources_4_rtsp.csv

sink0:
  enable: 1
  #Type - 1=FakeSink 2=EglSink 3=File 7=nv3dsink (Jetson only)
  type: 1
  sync: 1
  source-id: 0
  gpu-id: 0
  nvbuf-memory-type: 0

sink1:
  enable: 1
  type: 3
  #1=mp4 2=mkv
  container: 1
  #1=h264 2=h265 3=mpeg4
  ## only SW mpeg4 is supported right now.
  codec: 2
  sync: 1
  bitrate: 2000000
  # output-file:  /out_videos/nvds_analytics_roi_final_output12.mp4
  output-file:  /out_videos/custom_parallel_app11.mp4
  source-id: 0

sink2:
  enable: 0
  #Type - 1=FakeSink 2=EglSink 3=File 4=RTSPStreaming
  type: 4
  #1=h264 2=h265
  codec: 1
  #encoder type 0=Hardware 1=Software
  enc-type: 1
  sync: 0
  bitrate: 4000000
  #H264 Profile - 0=Baseline 2=Main 4=High
  #H265 Profile - 0=Main 1=Main10
  profile: 0
  # set below properties in case of RTSPStreaming
  rtsp-port: 8554
  udp-port: 5400

sink3:
  enable: 1
  type: 6
  msg-conv-config: dstest5_msgconv_sample_config.yml
  msg-conv-msg2p-new-api: 1
  msg-conv-payload-type: 1
  msg-conv-frame-interval: 1
  msg-broker-proto-lib: /opt/nvidia/deepstream/deepstream-7.0/lib/libnvds_kafka_proto.so
  msg-broker-conn-str: localhost;9092
  topic: quickstart-events
  source-id: 0
  # disable-msgconv :  1
 
sink4:
  enable: 1
  type: 6
  msg-conv-config: dstest5_msgconv_sample_config.yml
  msg-conv-msg2p-new-api: 1
  msg-conv-payload-type: 1
  msg-conv-frame-interval: 1
  msg-broker-proto-lib: /opt/nvidia/deepstream/deepstream-7.0/lib/libnvds_kafka_proto.so
  msg-broker-conn-str: localhost;9092
  topic: quickstart-events_test1
  source-id: 1
  # disable-msgconv :  1


osd:
  enable: 1
  gpu-id: 0
  border-width: 1
  text-size: 15
  #value changed
  text-color: 1;1;1;1
  text-bg-color: 0.3;0.3;0.3;1
  font: Serif
  show-clock: 0
  clock-x-offset: 800
  clock-y-offset: 820
  clock-text-size: 12
  clock-color: 1;0;0;0
  nvbuf-memory-type: 0

streammux:
  gpu-id: 0
  ##Boolean property to inform muxer that sources are live
  live-source: 0
  buffer-pool-size: 2
  batch-size: 2
  ##time out in usec, to wait after the first buffer is available
  ##to push the batch even if the complete batch is not formed
  batched-push-timeout: 400000
  ## Set muxer output width and height
  width: 1920
  height: 1080
  ##Enable to maintain aspect ratio wrt source, and allow black borders, works
  ##along with width, height properties
  enable-padding: 0
  nvbuf-memory-type: 0

primary-gie0:
  enable: 1
  gpu-id: 0
  batch-size: 1
  # input-tensor-meta: 1      # keep 0 for full frame inference
  ## 0=FP32, 1=INT8, 2=FP16 mode
  bbox-border-color0: 1;0;0;1
  bbox-border-color1: 0;1;1;1
  bbox-border-color2: 0;1;1;1
  bbox-border-color3: 0;1;0;1
  gie-unique-id: 1
  nvbuf-memory-type: 0
  interval: 0
  config-file: pgie_peoplenet_tao_config.txt
  #infer-raw-output-dir: ../../../../../samples/primary_detector_raw_output/

branch0:
  ## pgie's id
  pgie-id: 1
  ## select sources by sourceid
  src-ids: 0

tracker0:
  enable: 0
  cfg-file-path: tracker0.yml
  

primary-gie1:
  enable: 1
  #(0): nvinfer; (1): nvinferserver
  plugin-type: 0
  gpu-id: 0
  #input-tensor-meta: 1
  batch-size: 1
  #Required by the app for OSD, not a plugin property
  bbox-border-color0: 1;0;0;1
  bbox-border-color1: 0;1;1;1
  bbox-border-color2: 0;0;1;1
  bbox-border-color3: 0;1;0;1
  interval: 0
  gie-unique-id: 2
  nvbuf-memory-type: 0
  #config-file: ../../bodypose2d/config_body2_inferserver.txt
  config-file: ../../bodypose2d/config_body2_infer.txt

branch1:
  ## pgie's id
  pgie-id: 2
  ## select sources by sourceid
  src-ids: 1

tracker1:
  enable: 0      
  cfg-file-path: tracker1.yml

meta-mux:
  enable: 1
  config-file: ../../metamux/config_metamux0.txt


tests:
  file-loop: 0

metamux config file (config_metamux0.txt):


[property]
enable=1
# sink pad name which data will be pass to src pad.
active-pad=sink_0
# default pts-tolerance is 60 ms.
pts-tolerance=60000

[user-configs]

[group-0]
# src-ids-model-<model unique ID>=<source ids>
# mux all source if don't set it.
src-ids-model-1=0;1
src-ids-model-2=1;2
src-ids-model-3=1;2

Please suggest.

Please refer to this topic for sending to multiple topics.

Okay, thanks for the reply. I will try to follow the solution you suggested.

Meanwhile, I have another query.

The messages I am getting look something like this:

{'version': '4.0', 'id': '0', '@timestamp': '2024-07-25T13:49:23.385Z', 'sensorId': 'HWY_20_AND_LOCUST__EBA__4_11_2018_4_59_59_508_AM_UTC-07_00', 'objects': ['21|0|0|0|0|person', '2|0|0|0|0|person', '22|0|0|0|0|person', '14|0|0|0|0|person', '5|0|0|0|0|person', '1|0|0|0|0|person']}
{'version': '4.0', 'id': '0', '@timestamp': '2024-07-25T13:49:23.424Z', 'sensorId': 'HWY_20_AND_LOCUST__EBA__4_11_2018_4_59_59_508_AM_UTC-07_00', 'objects': ['21|0|0|0|0|person', '2|0|0|0|0|person', '22|0|0|0|0|person', '14|0|0|0|0|person', '5|0|0|0|0|person', '1|0|0|0|0|person']}
{'version': '4.0', 'id': '0', '@timestamp': '2024-07-25T13:49:23.464Z', 'sensorId': 'HWY_20_AND_LOCUST__EBA__4_11_2018_4_59_59_508_AM_UTC-07_00', 'objects': ['21|0|0|0|0|person', '2|0|0|0|0|person', '22|0|0|0|0|person', '14|0|0|0|0|person', '5|0|0|0|0|person', '1|0|0|0|0|person']}

So basically I am not getting the frame id (it is always zero) for each frame, and I am getting the track id but not the box coordinates; all box coordinates are zero.

I am not able to figure out why.
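For reference, each entry in `objects` appears to be pipe-delimited; my reading of the payloads above (an assumption, not verified against the schema docs) is track-id|left|top|right|bottom|label, so a quick consumer-side sanity check looks like:

```python
# Quick sanity-check parser for the pipe-delimited "objects" entries.
# Field order assumed from the payloads above:
# track-id|left|top|right|bottom|label (an assumption, not verified).

def parse_object(entry: str) -> dict:
    track_id, left, top, right, bottom, label = entry.split("|")
    return {
        "track_id": int(track_id),
        "bbox": tuple(float(v) for v in (left, top, right, bottom)),
        "label": label,
    }

def bbox_is_zero(obj: dict) -> bool:
    # True when all four coordinates are zero, i.e. the symptom above.
    return all(v == 0.0 for v in obj["bbox"])
```

Running this over the payloads above reports a zero bbox for every object, which matches the symptom.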

About "all box coordinates are zero", please refer to the comment on Feb 17 '23 in this topic.

That solution applies if we have to segregate messages per class, but I want to segregate messages per branch (in the parallel pipeline). Can you please suggest something?

Can't we just create two different sinks (type 6) for the two different branches and specify a source id for each? I tried this, but it is not working: I am receiving messages for the first source only (refer to sink3 in the config), not for the second source (refer to sink4 in the config).

I am wondering why sink3 works properly but sink4 does not in the config above.

Please suggest.

There are two issues: one is sending to multiple topics, the other is "all box coordinates are zero". First let's focus on the former.
Please set a different msg-broker-comp-id for [sink3] and [sink4]. If that still doesn't work, note that the nvmsgconv and nvmsgbroker plugins are open source, so you can add logs to debug. For example, you can log payload->payload in legacy_gst_nvmsgbroker_render in /opt/nvidia/deepstream/deepstream-7.0/sources/gst-plugins/gst-nvmsgbroker/gstnvmsgbroker.cpp to check whether the messages are generated.

I tried setting a different msg-broker-comp-id for each sink and it is still not working. However, when I removed the source-id from the sinks, I started getting the same message from both of them, i.e. both sinks are working but emitting the same message.

After carefully analyzing the message, I think I am getting the accumulated message from both sources.

Below is the Sample message,

{'version': '4.0', 'id': '0', '@timestamp': '2024-07-26T18:41:40.781Z', 'sensorId': 'HWY_20_AND_LOCUST__EBA__4_11_2018_4_59_59_508_AM_UTC-07_00', 'objects': ['21|0|0|0|0|person', '2|0|0|0|0|person', '22|0|0|0|0|person', '14|0|0|0|0|person', '5|0|0|0|0|person', '1|0|0|0|0|person', '21|0|0|0|0|person', '2|0|0|0|0|person', '22|0|0|0|0|person', '14|0|0|0|0|person', '5|0|0|0|0|person', '1|0|0|0|0|person']}

I have two branches in the pipeline, both with a tracker enabled on the same source. In the final message pasted above, all the track ids are repeated, which indicates that the messages from both branches end up in this same payload.
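A quick consumer-side check for this (a sketch; `duplicated_track_ids` is my own helper, and it assumes the track id is the first pipe-delimited field of each entry):

```python
# Detect merged branch output: if every track id appears more than once
# within a single payload's "objects" list, the detections from both
# branches were likely accumulated into one message.
from collections import Counter

def duplicated_track_ids(objects: list) -> list:
    """Return the track ids that appear more than once in one payload
    (track id assumed to be the first pipe-delimited field)."""
    ids = [int(entry.split("|", 1)[0]) for entry in objects]
    return sorted(tid for tid, count in Counter(ids).items() if count > 1)
```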

Below is a snapshot of the last frame:

Below is the updated main config file:

main_config.txt

application:
  enable-perf-measurement: 1
  perf-measurement-interval-sec: 5
  ##gie-kitti-output-dir=streamscl
 
tiled-display:
  enable: 1
  rows: 1
  columns: 2
  width: 3840
  height: 1080
  gpu-id: 0
  #(0): nvbuf-mem-default - Default memory allocated, specific to particular platform
  #(1): nvbuf-mem-cuda-pinned - Allocate Pinned/Host cuda memory, applicable for Tesla
  #(2): nvbuf-mem-cuda-device - Allocate Device cuda memory, applicable for Tesla
  #(3): nvbuf-mem-cuda-unified - Allocate Unified cuda memory, applicable for Tesla
  #(4): nvbuf-mem-surface-array - Allocate Surface Array memory, applicable for Jetson
  nvbuf-memory-type: 0
  #which source should be showed, -1 means showing all.
  # show-source: 2
 
source:
  csv-file-path: custom_source.csv
  #csv-file-path: sources_4_different_source.csv
  #csv-file-path: sources_4_rtsp.csv
 
sink0:
  enable: 1
  #Type - 1=FakeSink 2=EglSink 3=File 7=nv3dsink (Jetson only)
  type: 1
  sync: 1
  source-id: 0
  gpu-id: 0
  nvbuf-memory-type: 0
 
sink1:
  enable: 1
  type: 3
  #1=mp4 2=mkv
  container: 1
  #1=h264 2=h265 3=mpeg4
  ## only SW mpeg4 is supported right now.
  codec: 2
  sync: 1
  bitrate: 2000000
  # output-file:  /out_videos/nvds_analytics_roi_final_output12.mp4
  output-file:  /out_videos/custom_parallel_app51.mp4
  source-id: 0
 
sink2:
  enable: 0
  #Type - 1=FakeSink 2=EglSink 3=File 4=RTSPStreaming
  type: 4
  #1=h264 2=h265
  codec: 1
  #encoder type 0=Hardware 1=Software
  enc-type: 1
  sync: 0
  bitrate: 4000000
  #H264 Profile - 0=Baseline 2=Main 4=High
  #H265 Profile - 0=Main 1=Main10
  profile: 0
  # set below properties in case of RTSPStreaming
  rtsp-port: 8554
  udp-port: 5400
 
sink3:
  enable: 1
  type: 6
  msg-conv-config: dstest5_msgconv_sample_config.yml
  msg-conv-msg2p-new-api: 1
  msg-conv-payload-type: 1
  msg-conv-frame-interval: 1
  msg-broker-proto-lib: /opt/nvidia/deepstream/deepstream-7.0/lib/libnvds_kafka_proto.so
  msg-broker-conn-str: localhost;9092
  topic: quickstart-events
  msg-broker-comp-id: 0
  # source-id: 0
  # disable-msgconv :  1
 
sink4:
  enable: 1
  type: 6
  msg-conv-config: dstest5_msgconv_sample_config.yml
  msg-conv-msg2p-new-api: 1
  msg-conv-payload-type: 1
  msg-conv-frame-interval: 1
  msg-broker-proto-lib: /opt/nvidia/deepstream/deepstream-7.0/lib/libnvds_kafka_proto.so
  msg-broker-conn-str: localhost;9092
  topic: quickstart-events1
  msg-broker-comp-id: 1
  # source-id: 1
  # disable-msgconv :  1
  
# message-converter:
#   enable: 1
#   msg-conv-config: dstest5_msgconv_sample_config.yml
#   #(0): PAYLOAD_DEEPSTREAM - Deepstream schema payload
#   #(1): PAYLOAD_DEEPSTREAM_MINIMAL - Deepstream schema payload minimal
#   #(256): PAYLOAD_RESERVED - Reserved type
#   #(257): PAYLOAD_CUSTOM   - Custom schema payload
#   msg-conv-payload-type: 0
#   # Name of library having custom implementation.
#   msg-conv-msg2p-lib: /opt/nvidia/deepstream/deepstream-7.0/lib/libnvds_msgconv.so
#   # Id of component in case only selected message to parse.
#   #msg-conv-comp-id: <val>
 
osd:
  enable: 1
  gpu-id: 0
  border-width: 1
  text-size: 15
  #value changed
  text-color: 1;1;1;1
  text-bg-color: 0.3;0.3;0.3;1
  font: Serif
  show-clock: 0
  clock-x-offset: 800
  clock-y-offset: 820
  clock-text-size: 12
  clock-color: 1;0;0;0
  nvbuf-memory-type: 0
 
streammux:
  gpu-id: 0
  ##Boolean property to inform muxer that sources are live
  live-source: 0
  buffer-pool-size: 2
  batch-size: 2
  ##time out in usec, to wait after the first buffer is available
  ##to push the batch even if the complete batch is not formed
  batched-push-timeout: 400000
  ## Set muxer output width and height
  width: 1920
  height: 1080
  ##Enable to maintain aspect ratio wrt source, and allow black borders, works
  ##along with width, height properties
  enable-padding: 0
  nvbuf-memory-type: 0
 
primary-gie0:
  enable: 1
  gpu-id: 0
  batch-size: 1
  # input-tensor-meta: 1      # keep 0 for full frame inference
  ## 0=FP32, 1=INT8, 2=FP16 mode
  bbox-border-color0: 1;0;0;1
  bbox-border-color1: 0;1;1;1
  bbox-border-color2: 0;1;1;1
  bbox-border-color3: 0;1;0;1
  gie-unique-id: 1
  nvbuf-memory-type: 0
  interval: 0
  config-file: pgie_peoplenet_tao_config.txt
  #infer-raw-output-dir: ../../../../../samples/primary_detector_raw_output/
 
branch0:
  ## pgie's id
  pgie-id: 1
  ## select sources by sourceid
  src-ids: 0
 
tracker0:
  enable: 1
  cfg-file-path: tracker0.yml
  
 
primary-gie1:
  enable: 1
  #(0): nvinfer; (1): nvinferserver
  plugin-type: 0
  gpu-id: 0
  #input-tensor-meta: 1
  batch-size: 1
  #Required by the app for OSD, not a plugin property
  bbox-border-color0: 1;0;0;1
  bbox-border-color1: 0;1;1;1
  bbox-border-color2: 0;0;1;1
  bbox-border-color3: 0;1;0;1
  interval: 0
  gie-unique-id: 2
  nvbuf-memory-type: 0
  #config-file: ../../bodypose2d/config_body2_inferserver.txt
  config-file: pgie_peoplenet_tao_config.txt
 
branch1:
  ## pgie's id
  pgie-id: 2
  ## select sources by sourceid
  src-ids: 1
 
tracker1:
  enable: 1      
  cfg-file-path: tracker1.yml
 
meta-mux:
  enable: 0
  config-file: ../../metamux/config_metamux0.txt
 
 
tests:
  file-loop: 0

I want the messages of the first branch in sink3 and those of the other branch in sink4; basically, on the consumer end, I want to segregate the messages branch-wise.

I don't want the messages from both branches in a single payload. I want different payloads for different branches, sent through different Kafka sinks, with different topics if possible.

Alternatively, if the messages have to share the same payload, it would help to know which portion of the message belongs to which branch or source, so that it can be segregated on the consumer end.

There has been no update from you for a while, so we are assuming this is no longer an issue and are closing this topic. If you need further support, please open a new one. Thanks.

  1. From the screenshot, why do the two pictures have the same track id? Can you reproduce this issue with the untouched code?
  2. About "I want the message of the first branch in sink3 and the other branch in sink4", here are some solutions.
    Solution 1: the nvmsgconv plugin is open source. If msg-conv-msg2p-new-api is 1, you can modify the nvmsgconv plugin code to customize it, e.g. make the instance with msg-broker-comp-id: 0 process only the frames that belong to the first branch. Note that you need to rebuild and replace the plugin after modifying the code.
    Solution 2: you can use msg-conv-msg2p-new-api=0; then you need to implement generate_event_msg_meta yourself, and only attach the event msg meta of the first branch for sink3. Please refer to the topic in my first comment.
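The gist of solution 2, sketched in Python for illustration only (the actual change is in the deepstream-app C sources around generate_event_msg_meta; the event dict shape and helper name here are hypothetical): tag each event msg meta with the component id of the branch that produced it, and let each message sink keep only the matching events.

```python
# Conceptual sketch of solution 2. In the real app this filtering is done
# in C when attaching NvDsEventMsgMeta; the dict shape here is illustrative.

def events_for_sink(events: list, sink_comp_id: int) -> list:
    """Keep only the events tagged with this sink's component id."""
    return [e for e in events if e["component_id"] == sink_comp_id]
```

With this scheme, sink3 (comp id 0) would serialize only branch-0 events and sink4 (comp id 1) only branch-1 events, so each Kafka topic receives one branch's payloads.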

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.