Creating a Kafka broker library to write metadata to a log file

Please provide complete information as applicable to your setup.

• Hardware Platform (Jetson / GPU)
• DeepStream Version
• JetPack Version (valid for Jetson only)
• TensorRT Version
• NVIDIA GPU Driver Version (valid for GPU only)
• Issue Type (questions, new requirements, bugs)
• How to reproduce the issue? (This is for bugs. Include which sample app is used, the configuration file contents, the command line used, and any other details needed to reproduce.)
• Requirement details (This is for new requirements. Include the module name, i.e. which plugin or which sample application, and the function description.)


Our task is to write metadata of detectors and object classifiers to the Kafka message broker.

First, we would like to write metadata to a log file using the 'nvmsgconv' and 'nvmsgbroker' plugins, as shown in point 10 of your course.

  1. To do this, we build an adapter shared library (*.so) from the broker.cpp file.

broker.cpp file:

/*
 * Copyright (c) 2017-2018, NVIDIA CORPORATION. All rights reserved.
 *
 * NVIDIA Corporation and its licensors retain all intellectual property
 * and proprietary rights in and to this software, related documentation
 * and any modifications thereto. Any use, reproduction, disclosure or
 * distribution of this software and related documentation without an express
 * license agreement from NVIDIA Corporation is strictly prohibited.
 */

#include <glib.h>
#include <fstream>   // std::ofstream for the log file
#include <string>
#include <cstdint>   // uint8_t
#include <cstdlib>   // malloc, free
#include "nvds_msgapi.h"

#define MAX_FIELD_LEN 255 // maximum topic length supported by Kafka is 255

typedef struct {
    void *kh;
    char topic[MAX_FIELD_LEN];
} NvDsKafkaProtoConn;

static void logit(const std::string &log) {
    std::ofstream outfile("logs.txt", std::ios_base::app);
    outfile << std::endl << log << std::endl;
}

static void logit(const uint8_t *payload) {
    std::ofstream outfile("logs.txt", std::ios_base::app);
    outfile << std::endl << payload << std::endl;
}

/* The plugin looks these entry points up by name, so they must keep
 * C linkage when this file is compiled as C++. */
extern "C" {

NvDsMsgApiHandle nvds_msgapi_connect(char *connection_str, nvds_msgapi_connect_cb_t connect_cb, char *config_path) {
    NvDsKafkaProtoConn *conn_ptr = (NvDsKafkaProtoConn *) malloc(sizeof(NvDsKafkaProtoConn));
    return (NvDsMsgApiHandle) conn_ptr;
}

NvDsMsgApiErrorType nvds_msgapi_send(NvDsMsgApiHandle h_ptr, char *topic, const uint8_t *payload, size_t nbuf) {
    logit(payload);
    return NVDS_MSGAPI_OK;
}

NvDsMsgApiErrorType nvds_msgapi_send_async(NvDsMsgApiHandle h_ptr, char *topic, const uint8_t *payload, size_t nbuf, nvds_msgapi_send_cb_t send_callback, void *user_ptr) {
    logit(payload);
    return NVDS_MSGAPI_OK;
}

void nvds_msgapi_do_work(NvDsMsgApiHandle h_ptr) {
    //logit("nvds_msgapi_do_work call");
}

NvDsMsgApiErrorType nvds_msgapi_disconnect(NvDsMsgApiHandle h_ptr) {
    free(h_ptr);
    return NVDS_MSGAPI_OK;
}

char *nvds_msgapi_getversion() {
    logit("nvds_msgapi_getversion call");
    return (char *) NVDS_MSGAPI_VERSION;
}

} /* extern "C" */
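For reference, a build line for an adapter like this typically looks as follows. This is a sketch, not the exact command we used: the include path and the output library name are assumptions and should be adjusted to the local DeepStream 5.1 install.

```shell
# Hypothetical build line: compile broker.cpp as position-independent code
# and link it into a shared object the nvmsgbroker plugin can load at runtime.
g++ -shared -fPIC broker.cpp -o libnvds_log_proto.so \
    -I /opt/nvidia/deepstream/deepstream-5.1/sources/includes \
    $(pkg-config --cflags glib-2.0)
```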

  2. Next, in the DeepStream project we specified the path to the built *.so library (#define PROTOCOL_ADAPTOR_LIB "/opt/nvidia/deepstream/deepstream5.1/sources/apps/sample_apps/demo/test_app/broker/").

  3. We checked that the *.so library file is located at the specified path and can be opened for viewing.

And set the config for the 'msgbroker' plugin:

g_object_set (G_OBJECT (msgbroker), "proto-lib", PROTOCOL_ADAPTOR_LIB,
    "conn-str", CONNECTION_STRING, "sync", FALSE, NULL);

  4. Now, when we run our pipeline, the console displays an error message:

ERROR from element nvmsg-broker: Could not initialize supporting library.
Error details: gstnvmsgbroker.c(362): legacy_gst_nvmsgbroker_start (): /GstPipeline:dstest3-pipeline/GstNvMsgBroker:nvmsg-broker:
unable to open shared library
Returned, stopping playback

Please tell us whether our sequence of steps is correct and what our mistake might be.

Sorry for the late response, we will investigate this issue and update soon.


it should be


This topic was automatically closed 60 days after the last reply. New replies are no longer allowed.