While researching the DeepStream-test4 C source code, I investigated how NvDsCustomMsgInfo is utilized. I observed that managing this metadata requires explicit copy and free functions to handle the lifecycle of the message buffer.
static gpointer
meta_copy_func_custom (gpointer data, gpointer user_data)
{
NvDsUserMeta *user_meta = (NvDsUserMeta *) data;
NvDsCustomMsgInfo *srcMeta = (NvDsCustomMsgInfo *) user_meta->user_meta_data;
NvDsCustomMsgInfo *dstMeta = NULL;
dstMeta = (NvDsCustomMsgInfo *) g_memdup2 (srcMeta, sizeof (NvDsCustomMsgInfo));
if (srcMeta->message)
dstMeta->message = (gpointer) g_strdup ((const char*)srcMeta->message);
dstMeta->size = srcMeta->size;
return dstMeta;
}
static void
meta_free_func_custom (gpointer data, gpointer user_data)
{
NvDsUserMeta *user_meta = (NvDsUserMeta *) data;
NvDsCustomMsgInfo *srcMeta = (NvDsCustomMsgInfo *) user_meta->user_meta_data;
if (srcMeta->message)
g_free (srcMeta->message);
srcMeta->size = 0;
g_free (user_meta->user_meta_data);
}
The application then attaches this custom metadata to the frame by acquiring a meta pointer from the pool and assigning these callbacks:
if (is_first_object && !(frame_number % frame_interval)) {
/* Frequency of images to be send will be based on use case.
* Here images is being sent for first object every frame_interval(default=30).
*/
NvDsUserMetaList *usrMetaList = obj_meta->obj_user_meta_list;
while (usrMetaList != NULL) {
NvDsUserMeta *user_event_meta_custom =
nvds_acquire_user_meta_from_pool (batch_meta);
NvDsCustomMsgInfo *msg_custom_meta =
(NvDsCustomMsgInfo *) g_malloc0 (sizeof (NvDsCustomMsgInfo));
NvDsUserMeta *usrMetaData = (NvDsUserMeta *) usrMetaList->data;
if (usrMetaData->base_meta.meta_type == NVDS_CROP_IMAGE_META) {
NvDsObjEncOutParams *enc_jpeg_image =
(NvDsObjEncOutParams *) usrMetaData->user_meta_data;
START_PROFILE;
encoded_data = g_base64_encode(enc_jpeg_image->outBuffer, enc_jpeg_image->outLen);
generate_ts_rfc3339 (ts, MAX_TIME_STAMP_LEN);
width = g_strdup_printf("%f", obj_meta->detector_bbox_info.org_bbox_coords.width);
height = g_strdup_printf("%f", obj_meta->detector_bbox_info.org_bbox_coords.height);
/* Image message fields are separated by ";".
* Specific Format: "image;image_format;image_widthximage_height;time;encoded data;"
* For Example: "image;jpg;640x480;2023-07-31T10:20:13;xxxxxxxxxxx"
*/
message_data = g_strconcat("image;jpg;", width, "x", height, ";", ts, ";", encoded_data, ";", NULL);
STOP_PROFILE("Base64 Encode Time ");
msg_custom_meta->size = strlen(message_data);
msg_custom_meta->message = g_strdup(message_data);
if (user_event_meta_custom) {
user_event_meta_custom->user_meta_data = (void *) msg_custom_meta;
user_event_meta_custom->base_meta.meta_type = NVDS_CUSTOM_MSG_BLOB;
user_event_meta_custom->base_meta.copy_func =
(NvDsMetaCopyFunc) meta_copy_func_custom;
user_event_meta_custom->base_meta.release_func =
(NvDsMetaReleaseFunc) meta_free_func_custom;
nvds_add_user_meta_to_frame (frame_meta, user_event_meta_custom);
} else {
g_print ("Error in attaching event meta custom to buffer\n");
}
In the C version, the meta_copy_func_custom and meta_free_func_custom functions ensure that the message string is correctly duplicated or released when the metadata is processed or pooled.
So I think copy and free function is a must.
To bring this functionality to Python, I implemented the corresponding logic in bindschema.cpp. I defined the C-level callbacks at the top of the file to manage memory within the GStreamer/DeepStream pipeline:
/* Callback function to copy custom msg info to another destination */
static gpointer copy_custom_msg_info(gpointer data, gpointer user_data) {
NvDsUserMeta *srcMeta = (NvDsUserMeta *)data;
NvDsCustomMsgInfo *srcData = (NvDsCustomMsgInfo *)srcMeta->user_meta_data;
// Allocate new NvDsCustomMsgInfo struct
NvDsCustomMsgInfo *dstData = (NvDsCustomMsgInfo *)g_memdup2(srcData, sizeof(NvDsCustomMsgInfo));
if (srcData->message) {
// Deep copy the string/blob content
dstData->message = (gpointer)g_strdup((const char *)srcData->message);
}
return dstData;
}
/* Callback function to release allocated memory for custom msg info */
static void release_custom_msg_info(gpointer data, gpointer user_data) {
NvDsUserMeta *user_meta = (NvDsUserMeta *)data;
NvDsCustomMsgInfo *srcData = (NvDsCustomMsgInfo *)user_meta->user_meta_data;
if (srcData) {
if (srcData->message) {
g_free(srcData->message);
}
g_free(srcData);
}
}
Then, inside void bindschema(py::module &m), I exposed the NvDsCustomMsgInfo class and created a helper function, alloc_nvds_custommsginfo. This helper simplifies the Python workflow by allocating the memory and automatically registering the C callbacks:
py::class_<NvDsCustomMsgInfo>(m, "NvDsCustomMsgInfo",
pydsdoc::metaschema::CustomMsgInfoDoc::descr)
.def(py::init<>())
.def_property("message", VOID_PTR_FREE_EXISTING(NvDsCustomMsgInfo, message))
.def_readwrite("size", &NvDsCustomMsgInfo::size)
.def("cast",
[](void *data) {
return (NvDsCustomMsgInfo *) data;
},
py::return_value_policy::reference,
pydsdoc::metaschema::CustomMsgInfoDoc::cast)
.def("cast",
[](size_t data) {
return (NvDsCustomMsgInfo *) data;
},
py::return_value_policy::reference,
pydsdoc::metaschema::CustomMsgInfoDoc::cast);
m.def("alloc_nvds_custommsginfo",
[](NvDsUserMeta *meta) {
auto *mem = (NvDsCustomMsgInfo *) g_malloc0(sizeof(NvDsCustomMsgInfo));
// Set the C-level callbacks required by GStreamer/DeepStream
meta->base_meta.copy_func = (NvDsMetaCopyFunc)copy_custom_msg_info;
meta->base_meta.release_func = (NvDsMetaReleaseFunc)release_custom_msg_info;
return mem;
},
py::return_value_policy::reference,
pydsdoc::methodsDoc::alloc_nvds_custommsginfo);
Note that in alloc_nvds_custommsginfo, I pass NvDsUserMeta *meta inside and register copy, free functions.
During compilation, I noticed the documentation strings were missing. I updated docstrings/schema_doc.h and docstrings/functionsdoc.h to include the necessary descriptions:
namespace CustomMsgInfoDoc
{
constexpr const char* descr = R"pyds(
Holds custom message information.
:ivar message: *str/blob*, The custom message content.
:ivar size: *int*, Size of the custom message.)pyds";
constexpr const char* cast = R"pyds(
Cast given object/data to :class:`NvDsCustomMsgInfo`,
call pyds.NvDsCustomMsgInfo.cast(data))pyds";
}
also docstrings/functionsdoc.h:
// Add this line
constexpr const char* alloc_nvds_custommsginfo = R"pyds(
Allocate an :class:`NvDsCustomMsgInfo` struct and attach it to NvDsUserMeta.
This automatically sets the C-level copy and release callbacks.
:arg meta: :class:`NvDsUserMeta` object to which this info will be attached.
:returns: Allocated :class:`NvDsCustomMsgInfo` object.)pyds";
I compile it successfully
Finally, I encountered an issue where pyds.NVDS_CUSTOM_MSG_BLOB was undefined in Python. I resolved this by adding the missing value to the NvDsMetaType enum binding in src/bindnvdsmeta.cpp:
// Inside src/bindnvdsmeta.cpp
py::enum_<NvDsMetaType>(m, "NvDsMetaType", pydsdoc::nvdsmetadoc::NvDsMetaTypeDoc::descr)
.value("NVDS_INVALID_META", NVDS_INVALID_META)
.value("NVDS_BATCH_META", NVDS_BATCH_META)
.value("NVDS_FRAME_META", NVDS_FRAME_META)
// ... existing values ...
.value("NVDS_EVENT_MSG_META", NVDS_EVENT_MSG_META)
// --- ADD THIS LINE ---
.value("NVDS_CUSTOM_MSG_BLOB", NVDS_CUSTOM_MSG_BLOB)
.value("NVDS_START_USER_META", NVDS_START_USER_META)
.export_values();
python app code, how I attach that message blob:
def pgie_src_pad_buffer_probe(pad, info, u_data):
gst_buffer = info.get_buffer()
if not gst_buffer:
return Gst.PadProbeReturn.OK
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
l_frame = batch_meta.frame_meta_list
while l_frame is not None:
try:
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
except StopIteration:
break
# Check if we've already attached our custom blob to THIS frame
# We can scan the user_meta_list to see if NVDS_CUSTOM_MSG_BLOB exists
already_attached = False
l_user = frame_meta.frame_user_meta_list
while l_user is not None:
user_meta = pyds.NvDsUserMeta.cast(l_user.data)
if user_meta.base_meta.meta_type == pyds.NvDsMetaType.NVDS_CUSTOM_MSG_BLOB:
already_attached = True
break
l_user = l_user.next
# Only attach if it's not already there
if not already_attached:
user_event_meta = pyds.nvds_acquire_user_meta_from_pool(batch_meta)
if user_event_meta:
msg_custom_meta = pyds.alloc_nvds_custommsginfo(user_event_meta)
# Use pre-populated map
camera_info = source_to_sensor_map.get(frame_meta.source_id, {})
sensor_id = camera_info.get("sensor_id", f"CAM{frame_meta.source_id}")
uri = camera_info.get("uri", "Unknown")
payload_string = f"tinsource|{sensor_id}|{uri}|{frame_meta.source_id}\n"
msg_custom_meta.message = payload_string
msg_custom_meta.size = len(payload_string)
user_event_meta.user_meta_data = msg_custom_meta
user_event_meta.base_meta.meta_type = pyds.NvDsMetaType.NVDS_CUSTOM_MSG_BLOB
pyds.nvds_add_user_meta_to_frame(frame_meta, user_event_meta)
try:
l_frame = l_frame.next
except StopIteration:
break
return Gst.PadProbeReturn.OK
in my modified c++ lib msgconv
/opt/nvidia/deepstream/deepstream-7.1/sources/libs/nvmsgconv/deepstream_schema/dsmeta_payload.cpp
I already configure to route the message to custom path, which have my generate_dsmeta_message_custom implementation.
# Configure Messaging
msgconv.set_property('config', "/configs/ds_msgconv_config.txt")
msgconv.set_property('payload-type', 257)
msgconv.set_property('msg2p-newapi', True)
gchar*
generate_dsmeta_message_custom(void* privData, void* frameMeta)
{
g_print("DEBUG: Inside generate_dsmeta_message_custom (BLOB mode)\n");
if (frameMeta == NULL) return NULL;
NvDsFrameMeta* frame_meta = (NvDsFrameMeta*)frameMeta;
JsonObject* rootObj = json_object_new();
JsonArray* objectsArray = json_array_new();
std::stringstream ss;
// 1. Calculate Scale Factors (Same as before)
float scaleW = (frame_meta->pipeline_width == 0) ? 1.0f : (float)frame_meta->source_frame_width / frame_meta->pipeline_width;
float scaleH = (frame_meta->pipeline_height == 0) ? 1.0f : (float)frame_meta->source_frame_height / frame_meta->pipeline_height;
// 2. Object Loop (Same as before)
for (NvDsObjectMetaList* obj_l = frame_meta->obj_meta_list; obj_l; obj_l = obj_l->next) {
NvDsObjectMeta* obj_meta = (NvDsObjectMeta*)obj_l->data;
if (!obj_meta) continue;
ss.str(""); ss.clear();
float left = obj_meta->rect_params.left * scaleW;
float top = obj_meta->rect_params.top * scaleH;
ss << obj_meta->object_id << "|" << left << "|" << top << "|"
<< (left + (obj_meta->rect_params.width * scaleW)) << "|"
<< (top + (obj_meta->rect_params.height * scaleH)) << "|"
<< (obj_meta->obj_label ? obj_meta->obj_label : "N/A") << "|"
<< obj_meta->confidence;
json_array_add_string_element(objectsArray, ss.str().c_str());
}
// 3. Root Assembly
json_object_set_string_member(rootObj, "version", "4.0");
json_object_set_int_member(rootObj, "frame_num", (gint64)frame_meta->frame_num);
char ts[MAX_TIME_STAMP_LEN + 1];
generate_ts_rfc3339(ts, MAX_TIME_STAMP_LEN);
json_object_set_string_member(rootObj, "@timestamp", ts);
json_object_set_array_member(rootObj, "objects", objectsArray);
// 4. BLOB Unboxing (Reading the string from Python)
for (NvDsUserMetaList* l = frame_meta->frame_user_meta_list; l; l = l->next) {
NvDsUserMeta* user_meta = (NvDsUserMeta*)l->data;
// We look for NVDS_CUSTOM_MSG_BLOB instead of EVENT_MSG_META
if (user_meta && user_meta->base_meta.meta_type == NVDS_CUSTOM_MSG_BLOB) {
NvDsCustomMsgInfo* custom_info = (NvDsCustomMsgInfo*)user_meta->user_meta_data;
if (custom_info && custom_info->message && custom_info->size > 0) {
std::string raw_blob((char*)custom_info->message, custom_info->size);
// Trim trailing newline if present
if (!raw_blob.empty() && raw_blob.back() == '\n') raw_blob.pop_back();
// Expecting: tinsource|sensorId|sourceUri|sourceId
std::vector<std::string> parts = split_string(raw_blob, '|');
if (parts.size() >= 4 && parts[0] == "tinsource") {
JsonObject* sourceDetails = json_object_new();
// parts[1] = sensorId, parts[2] = uri, parts[3] = sourceId
json_object_set_string_member(sourceDetails, "sensorId", parts[1].c_str());
json_object_set_string_member(sourceDetails, "uri", parts[2].c_str());
// Convert string sourceId to integer
try {
json_object_set_int_member(sourceDetails, "sourceId", std::stoll(parts[3]));
} catch (...) {
json_object_set_int_member(sourceDetails, "sourceId", -1);
}
json_object_set_object_member(rootObj, "sourceDetails", sourceDetails);
break; // Found our specific source info blob
}
}
}
}
// 5. Serialization & Cleanup
JsonNode* rootNode = json_node_new(JSON_NODE_OBJECT);
json_node_set_object(rootNode, rootObj);
gchar* message = json_to_string(rootNode, TRUE);
json_node_free(rootNode);
// Note: JsonObject/Array are managed by the parent node in GLib-Json
return message;
}
I hope the moderator can confirm whether my implementation is correct. I feel a bit worried because I had to implement it myself.
First, I’d like to ask whether my implementation is correct.
Second, is this the right place to put this code? Should I create a new file to manage it, or is it risky to modify the original library files?
Thank you so much.