Sending Frames + MetaData (detections + classes + tracking IDs) to Kafka for each stream running

Please provide complete information as applicable to your setup.

• GPU: GTX 1660ti
• Deepstream Version: 7.1
• Nvidia-Driver Version: 566.14 | CUDA Version: 12.7
• Issue Type: Modifying the output message structure of deepstream-test5

Hello,
Currently I have deepstream-test5 (path: /opt/nvidia/deepstream/deepstream-7.1/sources/apps/sample_apps/deepstream-test5) set up and running with my DeepStream config file to perform inference and send the results to Kafka. My application successfully connects and publishes messages to Kafka.

The current issue I am facing is with the default message structure that I am receiving in Kafka.
Right now, I am getting the following sample output:

{
	"messageid": "b201cefc-7f6b-4c93-9739-64579900399e",
	"mdsversion": "1.0",
	"@timestamp": "1970-01-01T00:00:00.000Z",
	"place": {
		"id": "0",
		"name": "HWY_20_AND_LOCUST__EBA",
		"type": "intersection/road",
		"location": {
			"lat": 30.32,
			"lon": -40.549999999999997,
			"alt": 100.0
		},
		"entrance": {
			"name": "C_127_158",
			"lane": "Lane 1",
			"level": "P1",
			"coordinate": {
				"x": 1.0,
				"y": 2.0,
				"z": 3.0
			}
		}
	},
	"sensor": {
		"id": "HWY_20_AND_LOCUST__EBA__4_11_2018_4_59_59_508_AM_UTC-07_00",
		"type": "Camera",
		"description": "Aisle Camera",
		"location": {
			"lat": 45.293701446999997,
			"lon": -75.830391449900006,
			"alt": 48.155747933800001
		},
		"coordinate": {
			"x": 5.2000000000000002,
			"y": 10.1,
			"z": 11.199999999999999
		}
	},
	"analyticsModule": {
		"id": "XYZ_1",
		"description": "Vehicle Detection and License Plate Recognition",
		"source": "OpenALR",
		"version": "1.0"
	},
	"object": {
		"id": "0",
		"speed": 0.0,
		"direction": 0.0,
		"orientation": 0.0,
		"backpack": {},
		"bbox": {
			"topleftx": 1,
			"toplefty": 480,
			"bottomrightx": 99,
			"bottomrighty": 668
		},
		"location": {
			"lat": 0.0,
			"lon": 0.0,
			"alt": 0.0
		},
		"coordinate": {
			"x": 0.0,
			"y": 0.0,
			"z": 0.0
		},
		"pose": {}
	},
	"event": {
		"id": "6c092e9e-5a98-474f-82b3-da28f624505c",
		"type": "entry"
	},
	"videoPath": ""
}

I would like to modify the application so that each message contains the base64-encoded frame, the frame ID, and the detections (tracking ID, class label, confidence, and bbox [top_left_x, top_left_y, bottom_right_x, bottom_right_y]).
A sample of the structure that I want to receive would look like:

{
    "frame_ID": "frame_001",
    "detections": [
        {
            "tracking_ID": "track_001",
            "class_label": "Vehicle",
            "confidence_threshold": 0.95,
            "bbox": {
                "topleftx": 1,
                "toplefty": 480,
                "bottomrightx": 99,
                "bottomrighty": 668
            }
        },
        {
            "tracking_ID": "track_002",
            "class_label": "Person",
            "confidence_threshold": 0.89,
            "bbox": {
                "topleftx": 120,
                "toplefty": 300,
                "bottomrightx": 200,
                "bottomrighty": 400
            }
        }
    ],
    "frame": "base64_encoded_frame_data"
}

I have attempted to make changes in deepstream_test5_app_main.c and recompile it to achieve the desired effect, but I have run into a lot of errors since I am not an adept C developer.

Are there built-in parameters in the DeepStream config file that I can change/add to achieve this?
Is there any example of how to modify the code to achieve this output?

I am attaching my main files for reference:
cfg_conv.txt (2.0 KB)
cfg_kafka.txt (42 Bytes)
deepstream_app_config.txt (2.3 KB)

Please let me know if any additional information is needed.
Any help is highly appreciated.

This requires additional modification, but there is some similar code for reference.

1. For the base64-encoded frame, use the nvds_obj_enc_process function to encode the frame to JPEG, then refer to the following code to encode the JPEG into base64 and append it to frame_user_meta_list.

This code is in /opt/nvidia/deepstream/deepstream/sources/apps/sample_apps/deepstream-test4/deepstream_test4_app.c:

NvDsObjEncOutParams *enc_jpeg_image =
    (NvDsObjEncOutParams *) usrMetaData->user_meta_data;
START_PROFILE;
encoded_data = g_base64_encode (enc_jpeg_image->outBuffer, enc_jpeg_image->outLen);
generate_ts_rfc3339 (ts, MAX_TIME_STAMP_LEN);
width = g_strdup_printf ("%f", obj_meta->detector_bbox_info.org_bbox_coords.width);
height = g_strdup_printf ("%f", obj_meta->detector_bbox_info.org_bbox_coords.height);
/* Image message fields are separated by ";".
 * Specific format:  "image;image_format;image_widthximage_height;time;encoded data;"
 * For example: "image;jpg;640x480;2023-07-31T10:20:13;xxxxxxxxxxx"
 */
message_data = g_strconcat ("image;jpg;", width, "x", height, ";", ts, ";", encoded_data, ";", NULL);
STOP_PROFILE ("Base64 Encode Time ");
msg_custom_meta->size = strlen (message_data);
msg_custom_meta->message = g_strdup (message_data);
if (user_event_meta_custom) {
  user_event_meta_custom->user_meta_data = (void *) msg_custom_meta;
  user_event_meta_custom->base_meta.meta_type = NVDS_CUSTOM_MSG_BLOB;
  /* ... excerpt truncated; see deepstream_test4_app.c for the rest ... */
}
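
For context: the NvDsObjEncOutParams read above is only present in the user meta list if the encode call was asked to attach it. A minimal sketch of that configuration, assuming obj_ctx_handle, ip_surf, and frame_meta already exist in the probe (field names are from nvds_obj_encode.h):

/* Sketch: enqueue a full-frame JPEG encode and have the library attach the
 * result (NvDsObjEncOutParams) as user meta instead of writing a file. */
NvDsObjEncUsrArgs frameArgs = { 0 };
frameArgs.isFrame = TRUE;        /* encode the whole frame, not an object crop */
frameArgs.saveImg = FALSE;       /* do not write the JPEG to disk */
frameArgs.attachUsrMeta = TRUE;  /* attach encoded bytes to frame_user_meta_list */
frameArgs.quality = 80;          /* JPEG quality, 0-100 */
nvds_obj_enc_process (obj_ctx_handle, &frameArgs, ip_surf, NULL, frame_meta);
/* ... after iterating every frame in the batch: */
nvds_obj_enc_finish (obj_ctx_handle);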

2. Then modify generate_dsmeta_message in /opt/nvidia/deepstream/deepstream/sources/libs/nvmsgconv/deepstream_schema/dsmeta_payload.cpp and change the JSON to the structure you want.
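
As an illustration only (this is a hypothetical helper, not NVIDIA's actual generate_dsmeta_message), a cut-down payload builder producing the requested {frame_ID, detections[], frame} structure with json-glib, which deepstream_schema already uses, could look like the sketch below. The base64 string is assumed to have been pulled out of the attached NvDsObjEncOutParams beforehand:

/* Illustrative sketch for dsmeta_payload.cpp. */
#include <json-glib/json-glib.h>
#include "nvdsmeta.h"

static gchar *
build_custom_payload (NvDsFrameMeta * frame_meta, const gchar * base64_frame)
{
  JsonObject *root = json_object_new ();
  JsonArray *detections = json_array_new ();

  gchar *frame_id = g_strdup_printf ("frame_%03d", frame_meta->frame_num);
  json_object_set_string_member (root, "frame_ID", frame_id);
  g_free (frame_id);

  for (NvDsMetaList * l = frame_meta->obj_meta_list; l != NULL; l = l->next) {
    NvDsObjectMeta *obj = (NvDsObjectMeta *) l->data;
    JsonObject *det = json_object_new ();
    JsonObject *bbox = json_object_new ();

    gchar *track_id =
        g_strdup_printf ("track_%03" G_GUINT64_FORMAT, obj->object_id);
    json_object_set_string_member (det, "tracking_ID", track_id);
    g_free (track_id);
    json_object_set_string_member (det, "class_label", obj->obj_label);
    json_object_set_double_member (det, "confidence_threshold", obj->confidence);

    /* rect_params holds left/top/width/height; convert to corner coords. */
    json_object_set_int_member (bbox, "topleftx", (gint) obj->rect_params.left);
    json_object_set_int_member (bbox, "toplefty", (gint) obj->rect_params.top);
    json_object_set_int_member (bbox, "bottomrightx",
        (gint) (obj->rect_params.left + obj->rect_params.width));
    json_object_set_int_member (bbox, "bottomrighty",
        (gint) (obj->rect_params.top + obj->rect_params.height));
    json_object_set_object_member (det, "bbox", bbox);

    json_array_add_object_element (detections, det);
  }
  json_object_set_array_member (root, "detections", detections);
  json_object_set_string_member (root, "frame",
      base64_frame ? base64_frame : "");

  JsonNode *node = json_node_new (JSON_NODE_OBJECT);
  json_node_set_object (node, root);
  gchar *payload = json_to_string (node, TRUE);
  json_node_free (node);
  json_object_unref (root);
  return payload;
}

The returned string would then be handed back through the NvDsPayload that nvds_msg2p_generate_new fills in, replacing the default schema output.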

Thank you for responding to my query.

After I modify this file, do I just need to rebuild deepstream-test5 for the changes to be reflected?
Don’t I need to adjust the eventmsg_payload.cpp file as well?

I’ve read in this forum that I also need to rebuild the entire Docker image and then include my custom libnvds_msgconv.so for the changes to take effect.

Could you please specify in a bit more detail the files responsible for the message structure, and which parts of the solution I will have to rebuild and run again?

Every native module you modify needs to be rebuilt. Typically, you will only need to modify nvmsgconv and deepstream-test5, but this depends on the implementation.

This depends on the value of the msg2p-newapi property. Please compare the differences between nvds_msg2p_generate (used when msg2p-newapi=0; the payload is built in eventmsg_payload.cpp) and nvds_msg2p_generate_new (used when msg2p-newapi=1; the payload is built in dsmeta_payload.cpp).
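
In the deepstream-app style config that test5 uses, this is set per message-broker sink (key names as in the sample configs; the broker values here are illustrative):

[sink1]
enable=1
# type 6 = message broker sink
type=6
msg-conv-payload-type=0
# 0: nvds_msg2p_generate (eventmsg_payload.cpp)
# 1: nvds_msg2p_generate_new (dsmeta_payload.cpp)
msg-conv-msg2p-new-api=1
msg-broker-proto-lib=/opt/nvidia/deepstream/deepstream/lib/libnvds_kafka_proto.so
msg-broker-conn-str=localhost;9092
topic=test-topic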

It’s not necessary to rebuild the entire Docker image; just run make CUDA_VER=12.6; make install.
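
Concretely, that amounts to rebuilding the two modified modules in place (paths from this thread, CUDA_VER as above):

# rebuild and install the modified nvmsgconv (libnvds_msgconv.so)
cd /opt/nvidia/deepstream/deepstream/sources/libs/nvmsgconv
make CUDA_VER=12.6
make install

# rebuild the modified test5 app
cd /opt/nvidia/deepstream/deepstream/sources/apps/sample_apps/deepstream-test5
make CUDA_VER=12.6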

I have made changes in nvmsgconv and the deepstream-test5 main file and rebuilt the application, placing the files back at /opt/nvidia/deepstream/deepstream/sources/apps/sample_apps/deepstream-test5, but so far I don’t see my changes reflected. Am I missing a step?

I don’t know what you did. Please add printf/std::cout to the native code to output logs to ensure that the changes are effective.

I’ve managed to recompile, and the changes are taking effect. The problem I am dealing with now is extracting frames within deepstream-test5. I’ve gone through several forums, and in most places it is recommended to extract the frames using the dsexample plugin.
The only problem is the use of OpenCV in the C application. OpenCV’s current API is C++; the last version that can be used from C is quite old.
I am using the function get_converted_mat (GstDsExample * dsexample, NvBufSurface * input_buf, gint idx, NvOSD_RectParams * crop_rect_params, gdouble & ratio, gint input_width, gint input_height).

Can you please let me know if there is a better way to get each frame, regardless of detections?

Also, regarding the function nvds_obj_enc_process: this function saves frames to disk. Are there any parameters I can use to get back the encoded bytes of the frame instead?

Using nvds_obj_enc_process slows down the performance of the application massively and makes it crash. The FPS starts high but keeps decreasing until the application crashes.
Here is my code:
custom_test5.txt (67.7 KB) (uploaded with a .txt extension because .c files are not permitted)

nvds_obj_enc_process is hardware-accelerated JPEG encoding and is the best choice for performance.

Please refer to the usage of nvds_obj_enc_process in test4; frame_number in the probe function is used to count the frames.

/opt/nvidia/deepstream/deepstream/sources/apps/sample_apps/deepstream-test4/deepstream_test4_app.c

Please note that I have in fact followed that implementation from deepstream-test4 in my code.
The only difference in my case is that deepstream-test5 has no pgie_src_pad_buffer_probe function; the frame-level processing happens in bbox_generated_probe_after_analytics instead. That is where I have placed nvds_obj_enc_process, and I am calling it the same way as it is called in deepstream_test4.

Here is the function I have modified for reference:

static void
bbox_generated_probe_after_analytics (AppCtx * appCtx, GstBuffer * buf,
    NvDsBatchMeta * batch_meta, guint index)
{
  NvDsObjectMeta *obj_meta = NULL;
  GstClockTime buffer_pts = 0;
  guint32 stream_id = 0;

  /* Map the GstBuffer to obtain the batched NvBufSurface. */
  GstMapInfo inmap = GST_MAP_INFO_INIT;
  if (!gst_buffer_map (buf, &inmap, GST_MAP_READ)) {
    GST_ERROR ("input buffer mapinfo failed");
    return;
  }
  NvBufSurface *ip_surf = (NvBufSurface *) inmap.data;
  gst_buffer_unmap (buf, &inmap);

  for (NvDsMetaList * l_frame = batch_meta->frame_meta_list; l_frame != NULL;
      l_frame = l_frame->next) {
    NvDsFrameMeta *frame_meta = (NvDsFrameMeta *) l_frame->data;
    stream_id = frame_meta->source_id;
    GstClockTime buf_ntp_time = 0;

    if (playback_utc == FALSE) {
      /** Calculate the buffer-NTP-time
       * derived from this stream's RTCP Sender Report here:
       */
      StreamSourceInfo *src_stream = &testAppCtx->streams[stream_id];
      buf_ntp_time = frame_meta->ntp_timestamp;

      if (buf_ntp_time < src_stream->last_ntp_time) {
        GST_WARNING ("Source %d: NTP timestamps are backward in time."
          " Current: %lu previous: %lu \n",stream_id, buf_ntp_time, src_stream->last_ntp_time);
      }
      src_stream->last_ntp_time = buf_ntp_time;
    }

    GList *l;
    for (l = frame_meta->obj_meta_list; l != NULL; l = l->next) {
      /* Now using above information we need to form a text that should
       * be displayed on top of the bounding box, so lets form it here. */

      obj_meta = (NvDsObjectMeta *) (l->data);
      
      if (frame_count % 10 == 0) {
        NvDsObjEncUsrArgs frameData = { 0 };
        /* Preset */
        frameData.isFrame = 1;
        /* To be set by user */
        frameData.saveImg = 1;
        frameData.attachUsrMeta = 0;
        /* Set if image scaling is required */
        frameData.scaleImg = 1;
        frameData.scaledWidth = 640;
        frameData.scaledHeight = 480;
        /* Quality */
        frameData.quality = 20;
        /* Set to calculate time taken to encode the JPEG image. */
        frameData.calcEncodeTime = 0;
        /* NOTE: a new encode context is created, used once, and finished
         * (but never destroyed) on every encode call. */
        NvDsObjEncCtxHandle obj_ctx_handle = nvds_obj_enc_create_context (0);
        nvds_obj_enc_process (obj_ctx_handle, &frameData, ip_surf, NULL, frame_meta);
        nvds_obj_enc_finish (obj_ctx_handle);
      }
      frame_count++;
      frame_count++;

      {
        /**
         * Enable only if this callback is after tiler
         * NOTE: Scaling back code-commented
         * now that bbox_generated_probe_after_analytics() is post analytics
         * (say pgie, tracker or sgie)
         * and before tiler, no plugin shall scale metadata and will be
         * corresponding to the nvstreammux resolution
         */
        float scaleW = 0;
        float scaleH = 0;
        /* Frequency of messages to be sent will be based on use case.
         * Here a message is generated for every object in every frame.
         */
        buffer_pts = frame_meta->buf_pts;
        if (!appCtx->config.streammux_config.pipeline_width
            || !appCtx->config.streammux_config.pipeline_height) {
          g_print ("invalid pipeline params\n");
          return;
        }
        LOGD ("stream %d==%d [%d X %d]\n", frame_meta->source_id,
            frame_meta->pad_index, frame_meta->source_frame_width,
            frame_meta->source_frame_height);
        scaleW =
            (float) frame_meta->source_frame_width /
            appCtx->config.streammux_config.pipeline_width;
        scaleH =
            (float) frame_meta->source_frame_height /
            appCtx->config.streammux_config.pipeline_height;

        if (playback_utc == FALSE) {
          /** Use the buffer-NTP-time derived from this stream's RTCP Sender
           * Report here:
           */
          buffer_pts = buf_ntp_time;
        }
        /** Generate NvDsEventMsgMeta for every object */
        NvDsEventMsgMeta *msg_meta =
            (NvDsEventMsgMeta *) g_malloc0 (sizeof (NvDsEventMsgMeta));
        generate_event_msg_meta (appCtx, msg_meta, obj_meta->class_id, TRUE,
                  /**< useTs NOTE: Pass FALSE for files without base-timestamp in URI */
            buffer_pts,
            appCtx->config.multi_source_config[stream_id].uri, stream_id,
            appCtx->config.multi_source_config[stream_id].camera_id,
            obj_meta, scaleW, scaleH, frame_meta);
        testAppCtx->streams[stream_id].meta_number++;
        NvDsUserMeta *user_event_meta =
            nvds_acquire_user_meta_from_pool (batch_meta);
        if (user_event_meta) {
          /*
           * Since generated event metadata has custom objects for
           * Vehicle / Person which are allocated dynamically, we are
           * setting copy and free function to handle those fields when
           * metadata copy happens between two components.
           */
          user_event_meta->user_meta_data = (void *) msg_meta;
          user_event_meta->base_meta.batch_meta = batch_meta;
          user_event_meta->base_meta.meta_type = NVDS_EVENT_MSG_META;
          user_event_meta->base_meta.copy_func =
              (NvDsMetaCopyFunc) meta_copy_func;
          user_event_meta->base_meta.release_func =
              (NvDsMetaReleaseFunc) meta_free_func;
          nvds_add_user_meta_to_frame (frame_meta, user_event_meta);
        } else {
          g_print ("Error in attaching event meta to buffer\n");
        }
      }
    }
    testAppCtx->streams[stream_id].frameCount++;
  }
}

Right now the problem I have is that nvds_obj_enc_process is very slow and can only write the frames to disk.
I only want to extract each frame and push it to Kafka as a base64-encoded frame without a massive drop in speed.

Waiting on any useful response for this issue. Kindly let me know how this can be resolved.

  1. Please do not use nvds_obj_enc_create_context to create a context every time; this causes memory leaks and unnecessary performance loss. nvds_obj_enc_destroy_context is used to release the context.
  2. nvds_obj_enc_process just enqueues an object crop for JPEG encoding, so please call nvds_obj_enc_finish at the end of batch processing.

This is not an issue with the SDK. Please debug it yourself.

I have moved the calls around so that nvds_obj_enc_create_context is only called at the start of the function and nvds_obj_enc_destroy_context is called at the end.
Can you please specify the correct location to call nvds_obj_enc_create_context?

Here is my entire code for reference:
deepstream_custom_test5.txt (55.0 KB)

Also, please note, my objective is to extract the entire frame, not just the object crops.

Your help is much appreciated

I also have the same query @usmanmalik291. @junshengy please respond; it would be very helpful.

Please read the sample code and API description. The following code works properly:

diff --git a/sources/apps/sample_apps/deepstream-test5/deepstream_test5_app_main.c b/sources/apps/sample_apps/deepstream-test5/deepstream_test5_app_main.c
index 07f308b..5f17c0e 100644
--- a/sources/apps/sample_apps/deepstream-test5/deepstream_test5_app_main.c
+++ b/sources/apps/sample_apps/deepstream-test5/deepstream_test5_app_main.c
@@ -41,6 +41,8 @@
 #include "nvdsmeta_schema.h"
 
 #include "deepstream_test5_app.h"
+#include "nvbufsurface.h"
+#include "nvds_obj_encode.h"
 
 #define MAX_DISPLAY_LEN (64)
 #define MAX_TIME_STAMP_LEN (64)
@@ -52,6 +54,8 @@
 
 #define IS_YAML(file) (g_str_has_suffix(file, ".yml") || g_str_has_suffix(file, ".yaml"))
 
+static NvDsObjEncCtxHandle obj_ctx_handle;
+
 /** @{
  * Macro's below and corresponding code-blocks are used to demonstrate
  * nvmsgconv + Broker Metadata manipulation possibility
@@ -593,6 +597,14 @@ bbox_generated_probe_after_analytics (AppCtx * appCtx, GstBuffer * buf,
   GstClockTime buffer_pts = 0;
   guint32 stream_id = 0;
 
+  GstMapInfo inmap = GST_MAP_INFO_INIT;
+  if (!gst_buffer_map (buf, &inmap, GST_MAP_READ)) {
+    GST_ERROR ("input buffer mapinfo failed");
+    return;
+  }
+  NvBufSurface *ip_surf = (NvBufSurface *) inmap.data;
+  gst_buffer_unmap (buf, &inmap);
+
   for (NvDsMetaList * l_frame = batch_meta->frame_meta_list; l_frame != NULL;
       l_frame = l_frame->next) {
     NvDsFrameMeta *frame_meta = (NvDsFrameMeta *) l_frame->data;
@@ -612,6 +624,20 @@ bbox_generated_probe_after_analytics (AppCtx * appCtx, GstBuffer * buf,
       src_stream->last_ntp_time = buf_ntp_time;
     }
 
+    NvDsObjEncUsrArgs objData = { 0 };
+    /* To be set by user */
+    objData.saveImg = TRUE;
+    objData.attachUsrMeta = FALSE;
+    /* Set if Image scaling Required */
+    objData.scaleImg = FALSE;
+    objData.scaledWidth = 0;
+    objData.scaledHeight = 0;
+    /* Quality */
+    objData.quality = 80;
+    objData.isFrame = TRUE;
+    /*Main Function Call */
+    nvds_obj_enc_process (obj_ctx_handle, &objData, ip_surf, NULL, frame_meta);
+
     GList *l;
     for (l = frame_meta->obj_meta_list; l != NULL; l = l->next) {
       /* Now using above information we need to form a text that should
@@ -689,6 +715,7 @@ bbox_generated_probe_after_analytics (AppCtx * appCtx, GstBuffer * buf,
     }
     testAppCtx->streams[stream_id].frameCount++;
   }
+  nvds_obj_enc_finish (obj_ctx_handle);
 }
 
 /** @{ imported from deepstream-app as is */
@@ -1535,6 +1562,12 @@ main (int argc, char *argv[])
 
   main_loop = g_main_loop_new (NULL, FALSE);
 
+  obj_ctx_handle = nvds_obj_enc_create_context (0);
+  if (!obj_ctx_handle) {
+    g_print ("Unable to create context\n");
+    return -1;
+  }
+
   _intr_setup ();
   g_timeout_add (400, check_for_interrupt, NULL);
 
@@ -1674,6 +1707,7 @@ main (int argc, char *argv[])
 done:
 
   g_print ("Quitting\n");
+  nvds_obj_enc_destroy_context (obj_ctx_handle);
   for (i = 0; i < num_instances; i++) {
     if (appCtx[i] == NULL)
       continue;
diff --git a/sources/apps/sample_apps/deepstream-test5/Makefile b/sources/apps/sample_apps/deepstream-test5/Makefile
index 8aa904c..3d75b51 100644
--- a/sources/apps/sample_apps/deepstream-test5/Makefile
+++ b/sources/apps/sample_apps/deepstream-test5/Makefile
@@ -48,7 +48,7 @@ CFLAGS+= -I../../apps-common/includes \
 
 LIBS:= -L/usr/local/cuda-$(CUDA_VER)/lib64/ -lcudart
 
-LIBS+= -L$(LIB_INSTALL_DIR) -lnvdsgst_meta -lnvds_meta -lnvdsgst_helper -lnvdsgst_customhelper -lnvdsgst_smartrecord -lnvds_utils -lnvds_msgbroker -lm \
+LIBS+= -L$(LIB_INSTALL_DIR) -lnvdsgst_meta -lnvds_meta -lnvdsgst_helper -lnvdsgst_customhelper -lnvdsgst_smartrecord -lnvds_utils -lnvds_msgbroker -lm -lnvds_batch_jpegenc \
        -lyaml-cpp -lcuda -lgstrtspserver-1.0 -ldl -Wl,-rpath,$(LIB_INSTALL_DIR)
 
 CFLAGS+= $(shell pkg-config --cflags $(PKGS))

Please open a new topic to discuss your issue.

Thank you for your response, Junshengy.

I implemented your code modifications exactly as described, in the right places, on the default deepstream-test5 application:
Made changes in deepstream_test5_app_main.c
Made changes in the Makefile

I recompiled the application. It compiled without any errors, but unfortunately it did not work.
When I run the modified deepstream-test5 application, I get Segmentation Fault (core dumped).

I don’t see any frames or any other output. It just attempts to run but crashes.

I have tested this on two systems, one with WSL and one with native Linux. In both cases I got the same error.

Here are my modified files (attached them with .txt extension so they would upload):
Makefile.txt (2.4 KB)
deepstream_test5_app_main.c.txt (57.8 KB)

Please check your dependency versions; this may be the reason why it is not working.

https://docs.nvidia.com/metropolis/deepstream/dev-guide/text/DS_Installation.html#id11

I recommend using the 7.1-triton-multiarch image; the above code works well there.

I verified that; I already made sure to follow the installation guide, and I am using 7.1-triton-multiarch.
Here is my Dockerfile:

FROM nvcr.io/nvidia/deepstream:7.1-triton-multiarch

# Set working directory
WORKDIR /custom_workspace

# Install basic dependencies
RUN apt-get update

# Install Kafka dependencies
RUN git clone https://github.com/confluentinc/librdkafka.git && \
    cd librdkafka && \
    git checkout tags/v2.2.0 && \
    ./configure --enable-ssl && \
    make && \
    make install && \
    cp /usr/local/lib/librdkafka* /opt/nvidia/deepstream/deepstream/lib/ && \
    ldconfig

RUN apt-get install -y libglib2.0 libglib2.0-dev
RUN apt-get install -y libjansson4 libjansson-dev

# Set CUDA version
ENV CUDA_VER=12.6

# For audio decoding warnings
RUN /opt/nvidia/deepstream/deepstream/user_additional_install.sh

CMD ["bash"]

Did you start Docker with the following parameters?

docker run --gpus all -it --rm --net=host --privileged -v /tmp/.X11-unix:/tmp/.X11-unix -e DISPLAY=$DISPLAY -w /opt/nvidia/deepstream/deepstream-7.1 xxxxxx