Confirmation needed: Memory management (copy/free) for NvDsCustomMsgInfo bindings

While researching the DeepStream-test4 C source code, I investigated how NvDsCustomMsgInfo is utilized. I observed that managing this metadata requires explicit copy and free functions to handle the lifecycle of the message buffer.

/**
 * Copy callback for an NvDsUserMeta whose payload is an NvDsCustomMsgInfo.
 *
 * @param data       The NvDsUserMeta being copied.
 * @param user_data  Unused.
 * @return A newly allocated NvDsCustomMsgInfo owning its own duplicate of the
 *         message string, or NULL when there is no payload to copy.
 */
static gpointer
meta_copy_func_custom (gpointer data, gpointer user_data)
{
  NvDsUserMeta *user_meta = (NvDsUserMeta *) data;
  NvDsCustomMsgInfo *srcMeta = NULL;
  NvDsCustomMsgInfo *dstMeta = NULL;

  /* Nothing to duplicate: avoid dereferencing a missing payload. */
  if (!user_meta || !user_meta->user_meta_data)
    return NULL;

  srcMeta = (NvDsCustomMsgInfo *) user_meta->user_meta_data;

  /* Shallow copy of the struct; this already carries `size` across. */
  dstMeta = (NvDsCustomMsgInfo *) g_memdup2 (srcMeta, sizeof (NvDsCustomMsgInfo));

  /* Replace the shared pointer with a private duplicate so source and copy
   * can be released independently. The message is treated as a
   * NUL-terminated string here. */
  if (srcMeta->message)
    dstMeta->message = (gpointer) g_strdup ((const char *) srcMeta->message);

  return dstMeta;
}

/**
 * Release callback for an NvDsUserMeta whose payload is an NvDsCustomMsgInfo.
 *
 * Frees the message buffer and the NvDsCustomMsgInfo struct, then clears the
 * payload pointer on the user meta.
 *
 * @param data       The NvDsUserMeta being released.
 * @param user_data  Unused.
 */
static void
meta_free_func_custom (gpointer data, gpointer user_data)
{
  NvDsUserMeta *user_meta = (NvDsUserMeta *) data;
  NvDsCustomMsgInfo *srcMeta = NULL;

  /* Tolerate a meta that never received a payload. */
  if (!user_meta || !user_meta->user_meta_data)
    return;

  srcMeta = (NvDsCustomMsgInfo *) user_meta->user_meta_data;

  /* g_free() accepts NULL, so no separate check on message is needed. */
  g_free (srcMeta->message);
  srcMeta->message = NULL;
  srcMeta->size = 0;

  g_free (srcMeta);
  /* Do not leave a dangling pointer behind on the user meta. */
  user_meta->user_meta_data = NULL;
}

The application then attaches this custom metadata to the frame by acquiring a meta pointer from the pool and assigning these callbacks:


      if (is_first_object && !(frame_number % frame_interval)) {
        /* Frequency of images to be send will be based on use case.
         * Here images is being sent for first object every frame_interval(default=30).
         */
        NvDsUserMetaList *usrMetaList = obj_meta->obj_user_meta_list;
        while (usrMetaList != NULL) {
          NvDsUserMeta *user_event_meta_custom =
                    nvds_acquire_user_meta_from_pool (batch_meta);
          NvDsCustomMsgInfo *msg_custom_meta =
                   (NvDsCustomMsgInfo *) g_malloc0 (sizeof (NvDsCustomMsgInfo));

          NvDsUserMeta *usrMetaData = (NvDsUserMeta *) usrMetaList->data;
          if (usrMetaData->base_meta.meta_type == NVDS_CROP_IMAGE_META) {
            NvDsObjEncOutParams *enc_jpeg_image =
                (NvDsObjEncOutParams *) usrMetaData->user_meta_data;
            START_PROFILE;
            encoded_data = g_base64_encode(enc_jpeg_image->outBuffer, enc_jpeg_image->outLen);
            generate_ts_rfc3339 (ts, MAX_TIME_STAMP_LEN);
            width = g_strdup_printf("%f", obj_meta->detector_bbox_info.org_bbox_coords.width);
            height = g_strdup_printf("%f", obj_meta->detector_bbox_info.org_bbox_coords.height);
            /* Image message fields are separated by ";".
             * Specific Format:  "image;image_format;image_widthximage_height;time;encoded data;"
             * For Example: "image;jpg;640x480;2023-07-31T10:20:13;xxxxxxxxxxx"
             */
            message_data = g_strconcat("image;jpg;", width, "x", height, ";", ts, ";", encoded_data, ";", NULL);
            STOP_PROFILE("Base64 Encode Time ");
            msg_custom_meta->size = strlen(message_data);
            msg_custom_meta->message = g_strdup(message_data);
            if (user_event_meta_custom) {
               user_event_meta_custom->user_meta_data = (void *) msg_custom_meta;
               user_event_meta_custom->base_meta.meta_type = NVDS_CUSTOM_MSG_BLOB;
               user_event_meta_custom->base_meta.copy_func =
                   (NvDsMetaCopyFunc) meta_copy_func_custom;
               user_event_meta_custom->base_meta.release_func =
                   (NvDsMetaReleaseFunc) meta_free_func_custom;
               nvds_add_user_meta_to_frame (frame_meta, user_event_meta_custom);
            } else {
              g_print ("Error in attaching event meta custom to buffer\n");
            }

In the C version, the meta_copy_func_custom and meta_free_func_custom functions ensure that the message string is correctly duplicated or released when the metadata is processed or pooled.

So I believe both the copy and the free functions are required.

To bring this functionality to Python, I implemented the corresponding logic in bindschema.cpp. I defined the C-level callbacks at the top of the file to manage memory within the GStreamer/DeepStream pipeline:


/**
 * Copy callback for NvDsUserMeta objects carrying an NvDsCustomMsgInfo.
 *
 * @param data       The NvDsUserMeta being copied.
 * @param user_data  Unused.
 * @return A newly allocated NvDsCustomMsgInfo with its own duplicate of the
 *         message string, or nullptr when there is no payload to copy.
 */
static gpointer copy_custom_msg_info(gpointer data, gpointer user_data) {
    (void)user_data;

    auto *srcMeta = static_cast<NvDsUserMeta *>(data);
    // Mirror the guard in release_custom_msg_info: a meta without a payload
    // must not be dereferenced.
    if (srcMeta == nullptr || srcMeta->user_meta_data == nullptr) {
        return nullptr;
    }
    auto *srcData = static_cast<NvDsCustomMsgInfo *>(srcMeta->user_meta_data);

    // Shallow copy of the struct (this carries `size` across).
    auto *dstData = static_cast<NvDsCustomMsgInfo *>(
        g_memdup2(srcData, sizeof(NvDsCustomMsgInfo)));

    if (srcData->message) {
        // Give the copy its own buffer so each side can be freed independently.
        // NOTE(review): g_strdup stops at the first NUL byte, so this assumes
        // the message is a NUL-terminated string rather than arbitrary binary.
        dstData->message = g_strdup(static_cast<const char *>(srcData->message));
    }
    return dstData;
}

/**
 * Release callback for NvDsUserMeta objects carrying an NvDsCustomMsgInfo.
 *
 * Frees the message buffer and the NvDsCustomMsgInfo struct, then clears the
 * payload pointer on the user meta.
 *
 * @param data       The NvDsUserMeta being released.
 * @param user_data  Unused.
 */
static void release_custom_msg_info(gpointer data, gpointer user_data) {
    (void)user_data;

    auto *user_meta = static_cast<NvDsUserMeta *>(data);
    if (user_meta == nullptr) {
        return;
    }

    auto *srcData = static_cast<NvDsCustomMsgInfo *>(user_meta->user_meta_data);
    if (srcData == nullptr) {
        return;
    }

    // g_free() accepts NULL, so the message needs no separate check.
    g_free(srcData->message);
    srcData->message = nullptr;
    srcData->size = 0;
    g_free(srcData);

    // Clear the pointer so the user meta does not keep referring to freed memory.
    user_meta->user_meta_data = nullptr;
}


Then, inside void bindschema(py::module &m), I exposed the NvDsCustomMsgInfo class and created a helper function, alloc_nvds_custommsginfo. This helper simplifies the Python workflow by allocating the memory and automatically registering the C callbacks:

 // Python class wrapping NvDsCustomMsgInfo: default constructor, the
 // `message` and `size` fields, and two `cast` overloads that reinterpret an
 // existing pointer without taking ownership (reference return policy).
 py::class_<NvDsCustomMsgInfo>(
         m, "NvDsCustomMsgInfo", pydsdoc::metaschema::CustomMsgInfoDoc::descr)
     .def(py::init<>())
     .def_property("message", VOID_PTR_FREE_EXISTING(NvDsCustomMsgInfo, message))
     .def_readwrite("size", &NvDsCustomMsgInfo::size)
     // Overload 1: the argument arrives as an opaque pointer.
     .def("cast",
          [](void *data) -> NvDsCustomMsgInfo * {
              return static_cast<NvDsCustomMsgInfo *>(data);
          },
          py::return_value_policy::reference,
          pydsdoc::metaschema::CustomMsgInfoDoc::cast)
     // Overload 2: the argument arrives as an integer address.
     .def("cast",
          [](size_t data) -> NvDsCustomMsgInfo * {
              return reinterpret_cast<NvDsCustomMsgInfo *>(data);
          },
          py::return_value_policy::reference,
          pydsdoc::metaschema::CustomMsgInfoDoc::cast);

        // Allocates a zero-initialised NvDsCustomMsgInfo and registers the
        // copy/release callbacks on the given user meta. The returned struct is
        // NOT assigned to meta->user_meta_data here; the caller does that.
        m.def("alloc_nvds_custommsginfo",
          [](NvDsUserMeta *meta) {
              // Python may pass None, which arrives as a null pointer. Reject it
              // before allocating so nothing leaks and nothing is dereferenced.
              if (meta == nullptr) {
                  throw py::value_error(
                      "alloc_nvds_custommsginfo: meta must not be None");
              }

              auto *mem = static_cast<NvDsCustomMsgInfo *>(
                  g_malloc0(sizeof(NvDsCustomMsgInfo)));

              // Set the C-level callbacks required by GStreamer/DeepStream
              meta->base_meta.copy_func = (NvDsMetaCopyFunc)copy_custom_msg_info;
              meta->base_meta.release_func = (NvDsMetaReleaseFunc)release_custom_msg_info;

              return mem;
          },
          // reference policy: Python does not own the memory; it is freed by
          // the release callback registered above.
          py::return_value_policy::reference,
          pydsdoc::methodsDoc::alloc_nvds_custommsginfo);

Note that alloc_nvds_custommsginfo takes the NvDsUserMeta *meta as an argument and registers the copy and free functions on it.

During compilation, I noticed the documentation strings were missing. I updated docstrings/schema_doc.h and docstrings/functionsdoc.h to include the necessary descriptions:


        // Docstrings for the NvDsCustomMsgInfo Python binding: `descr` is the
        // class-level description, `cast` documents the cast() method.
        namespace CustomMsgInfoDoc
        {
            // Class description listing the two exposed attributes.
            constexpr const char* descr = R"pyds(
                Holds custom message information.
                :ivar message: *str/blob*, The custom message content.
                :ivar size: *int*, Size of the custom message.)pyds";

            // Description of the cast() helper.
            constexpr const char* cast = R"pyds(
                Cast given object/data to :class:`NvDsCustomMsgInfo`,
                call pyds.NvDsCustomMsgInfo.cast(data))pyds";
        }

The corresponding addition to docstrings/functionsdoc.h:

// Add this line
        constexpr const char* alloc_nvds_custommsginfo = R"pyds(
            Allocate an :class:`NvDsCustomMsgInfo` struct and attach it to NvDsUserMeta.
            This automatically sets the C-level copy and release callbacks.
            :arg meta: :class:`NvDsUserMeta` object to which this info will be attached.
            :returns: Allocated :class:`NvDsCustomMsgInfo` object.)pyds";

It compiles successfully.

Finally, I encountered an issue where pyds.NVDS_CUSTOM_MSG_BLOB was undefined in Python. I resolved this by adding the missing value to the NvDsMetaType enum binding in src/bindnvdsmeta.cpp:

// Inside src/bindnvdsmeta.cpp
// Python binding of the NvDsMetaType enum: each .value() call exposes one C
// enumerator under the given Python name.
py::enum_<NvDsMetaType>(m, "NvDsMetaType", pydsdoc::nvdsmetadoc::NvDsMetaTypeDoc::descr)
    .value("NVDS_INVALID_META", NVDS_INVALID_META)
    .value("NVDS_BATCH_META", NVDS_BATCH_META)
    .value("NVDS_FRAME_META", NVDS_FRAME_META)
    // ... existing values ...
    .value("NVDS_EVENT_MSG_META", NVDS_EVENT_MSG_META)
    
    // --- ADD THIS LINE ---
    // Exposes the custom message blob meta type to Python; without this entry
    // the name NVDS_CUSTOM_MSG_BLOB is not defined in the module.
    .value("NVDS_CUSTOM_MSG_BLOB", NVDS_CUSTOM_MSG_BLOB)
    
    .value("NVDS_START_USER_META", NVDS_START_USER_META)
    // export_values() also publishes the enumerators at module scope.
    .export_values();

Python application code showing how I attach the message blob:

def pgie_src_pad_buffer_probe(pad, info, u_data):
    """Pad probe that attaches one NVDS_CUSTOM_MSG_BLOB user meta per frame.

    For every frame in the batch that does not already carry a custom message
    blob, a payload of the form ``tinsource|<sensor_id>|<uri>|<source_id>\\n``
    is built from ``source_to_sensor_map`` and attached as frame user meta.

    :param pad: Pad the probe is installed on (unused).
    :param info: Probe info object providing the buffer.
    :param u_data: User data passed at probe registration (unused).
    :returns: Always ``Gst.PadProbeReturn.OK``.
    """
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        return Gst.PadProbeReturn.OK

    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    if batch_meta is None:
        # No DeepStream batch metadata on this buffer: nothing to annotate.
        return Gst.PadProbeReturn.OK

    l_frame = batch_meta.frame_meta_list

    while l_frame is not None:
        try:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        # Check if we've already attached our custom blob to THIS frame
        # by scanning the frame's user meta list for NVDS_CUSTOM_MSG_BLOB.
        already_attached = False
        l_user = frame_meta.frame_user_meta_list
        while l_user is not None:
            # Same StopIteration handling as the frame loop, for consistency.
            try:
                user_meta = pyds.NvDsUserMeta.cast(l_user.data)
            except StopIteration:
                break
            if user_meta.base_meta.meta_type == pyds.NvDsMetaType.NVDS_CUSTOM_MSG_BLOB:
                already_attached = True
                break
            try:
                l_user = l_user.next
            except StopIteration:
                break

        # Only attach if it's not already there
        if not already_attached:
            user_event_meta = pyds.nvds_acquire_user_meta_from_pool(batch_meta)
            if user_event_meta:
                # Allocates the struct and registers the copy/release callbacks.
                msg_custom_meta = pyds.alloc_nvds_custommsginfo(user_event_meta)

                # Use pre-populated map
                camera_info = source_to_sensor_map.get(frame_meta.source_id, {})
                sensor_id = camera_info.get("sensor_id", f"CAM{frame_meta.source_id}")
                uri = camera_info.get("uri", "Unknown")

                payload_string = f"tinsource|{sensor_id}|{uri}|{frame_meta.source_id}\n"
                msg_custom_meta.message = payload_string
                # The consumer reads `size` bytes, so report the encoded byte
                # length; len(str) counts characters and is too small whenever
                # the sensor id or URI contains non-ASCII text.
                msg_custom_meta.size = len(payload_string.encode("utf-8"))

                user_event_meta.user_meta_data = msg_custom_meta
                user_event_meta.base_meta.meta_type = pyds.NvDsMetaType.NVDS_CUSTOM_MSG_BLOB

                pyds.nvds_add_user_meta_to_frame(frame_meta, user_event_meta)

        try:
            l_frame = l_frame.next
        except StopIteration:
            break

    return Gst.PadProbeReturn.OK

In my modified C++ nvmsgconv library:

/opt/nvidia/deepstream/deepstream-7.1/sources/libs/nvmsgconv/deepstream_schema/dsmeta_payload.cpp

I have already configured it to route messages to a custom path, which uses my generate_dsmeta_message_custom implementation.

    # Configure Messaging
    # Path of the configuration file handed to the message converter element.
    msgconv.set_property('config', "/configs/ds_msgconv_config.txt")
    # NOTE(review): 257 is presumably the "custom" payload type that routes to
    # the custom payload generator -- confirm against the nvmsgconv plugin docs.
    msgconv.set_property('payload-type', 257)
    # NOTE(review): presumably selects the newer metadata-to-payload API that
    # works on frame metadata directly -- confirm against the plugin docs.
    msgconv.set_property('msg2p-newapi', True)
/**
 * Build the custom JSON payload for one frame.
 *
 * The payload contains a fixed "version", the frame number, a timestamp, an
 * "objects" array with one pipe-separated string per object
 * (id|left|top|right|bottom|label|confidence, scaled to source resolution),
 * and, when the frame carries an NVDS_CUSTOM_MSG_BLOB user meta of the form
 * "tinsource|sensorId|sourceUri|sourceId", a "sourceDetails" object.
 *
 * @param privData   Unused in this function.
 * @param frameMeta  Pointer to the NvDsFrameMeta to serialise.
 * @return Newly allocated JSON string (from json_to_string), or NULL when
 *         frameMeta is NULL.
 */
gchar*
generate_dsmeta_message_custom(void* privData, void* frameMeta)
{
    // NOTE(review): this prints once per call; consider removing or gating it
    // once debugging is finished.
    g_print("DEBUG: Inside generate_dsmeta_message_custom (BLOB mode)\n");
    if (frameMeta == NULL) return NULL;

    NvDsFrameMeta* frame_meta = (NvDsFrameMeta*)frameMeta;
    JsonObject* rootObj = json_object_new();
    JsonArray* objectsArray = json_array_new();
    std::stringstream ss;

    // 1. Scale factors from pipeline resolution back to source resolution.
    //    A zero pipeline dimension falls back to 1.0 to avoid dividing by zero.
    float scaleW = (frame_meta->pipeline_width == 0) ? 1.0f : (float)frame_meta->source_frame_width / frame_meta->pipeline_width;
    float scaleH = (frame_meta->pipeline_height == 0) ? 1.0f : (float)frame_meta->source_frame_height / frame_meta->pipeline_height;

    // 2. One pipe-separated entry per detected object.
    for (NvDsObjectMetaList* obj_l = frame_meta->obj_meta_list; obj_l; obj_l = obj_l->next) {
        NvDsObjectMeta* obj_meta = (NvDsObjectMeta*)obj_l->data;
        if (!obj_meta) continue;

        // Reset both the buffer and the stream state before reuse.
        ss.str(""); ss.clear();
        float left = obj_meta->rect_params.left * scaleW;
        float top  = obj_meta->rect_params.top * scaleH;

        ss << obj_meta->object_id << "|" << left << "|" << top << "|"
           << (left + (obj_meta->rect_params.width * scaleW)) << "|"
           << (top + (obj_meta->rect_params.height * scaleH)) << "|"
           << (obj_meta->obj_label ? obj_meta->obj_label : "N/A") << "|"
           << obj_meta->confidence;

        json_array_add_string_element(objectsArray, ss.str().c_str());
    }

    // 3. Root assembly.
    json_object_set_string_member(rootObj, "version", "4.0");
    json_object_set_int_member(rootObj, "frame_num", (gint64)frame_meta->frame_num);

    char ts[MAX_TIME_STAMP_LEN + 1];
    generate_ts_rfc3339(ts, MAX_TIME_STAMP_LEN);
    json_object_set_string_member(rootObj, "@timestamp", ts);
    // rootObj takes ownership of objectsArray here.
    json_object_set_array_member(rootObj, "objects", objectsArray);

    // 4. Read the custom blob attached upstream (e.g. by the Python probe).
    for (NvDsUserMetaList* l = frame_meta->frame_user_meta_list; l; l = l->next) {
        NvDsUserMeta* user_meta = (NvDsUserMeta*)l->data;

        // We look for NVDS_CUSTOM_MSG_BLOB instead of EVENT_MSG_META
        if (user_meta && user_meta->base_meta.meta_type == NVDS_CUSTOM_MSG_BLOB) {
            NvDsCustomMsgInfo* custom_info = (NvDsCustomMsgInfo*)user_meta->user_meta_data;

            if (custom_info && custom_info->message && custom_info->size > 0) {
                // Construct from an explicit length: exactly `size` bytes are read.
                std::string raw_blob((char*)custom_info->message, custom_info->size);

                // Trim trailing newline if present
                if (!raw_blob.empty() && raw_blob.back() == '\n') raw_blob.pop_back();

                // Expecting: tinsource|sensorId|sourceUri|sourceId
                std::vector<std::string> parts = split_string(raw_blob, '|');

                if (parts.size() >= 4 && parts[0] == "tinsource") {
                    JsonObject* sourceDetails = json_object_new();

                    // parts[1] = sensorId, parts[2] = uri, parts[3] = sourceId
                    json_object_set_string_member(sourceDetails, "sensorId", parts[1].c_str());
                    json_object_set_string_member(sourceDetails, "uri", parts[2].c_str());

                    // Convert string sourceId to integer; -1 marks an unparsable id.
                    try {
                        json_object_set_int_member(sourceDetails, "sourceId", std::stoll(parts[3]));
                    } catch (...) {
                        json_object_set_int_member(sourceDetails, "sourceId", -1);
                    }

                    // rootObj takes ownership of sourceDetails here.
                    json_object_set_object_member(rootObj, "sourceDetails", sourceDetails);
                    break; // Found our specific source info blob
                }
            }
        }
    }

    // 5. Serialization & cleanup.
    JsonNode* rootNode = json_node_new(JSON_NODE_OBJECT);
    // Hand our reference on rootObj over to the node. json_node_set_object()
    // would add a second reference that json_node_free() never drops, leaking
    // the whole object tree on every call.
    json_node_take_object(rootNode, rootObj);
    gchar* message = json_to_string(rootNode, TRUE);

    // Frees the node together with rootObj and everything rootObj owns.
    json_node_free(rootNode);

    return message;
}

I hope the moderator can confirm whether my implementation is correct. I feel a bit worried because I had to implement it myself.

First, I’d like to ask whether my implementation is correct.
Second, is this the right place to put this code? Should I create a new file to manage it, or is it risky to modify the original library files?

Thank you so much.