Link nvstreamdemux to two nvmsgconvs

• Hardware Platform (Jetson / GPU) A100
• DeepStream Version DeepStream 6.4

Hello,

I am trying to write e pipeline (GStreamer in C) with two rtsp streams are connected to streammux (through proper decoders and converters), the streammux is connected to nvinfer and nvtracker. Also, the nvtracker is connected to nvstreamdemux.

The nvstreamdemux is connected to two nvmsgconv. Now the problem is that only one MQTT message is created with this structure.

The same structure works perfectly with one source.

I was wondering what is the proper way of getting MQTT messages from two RTSP sources using streammux and streamdemux?

Here is my source code:

#include <gst/gst.h>
//#include <gst/uridecodebin/gsturidecodebin.h>
//#include <gst/nv/nvstreammux.h>
//#include <gst/nvinfer/gstnvinfer.h>
//#include <gst/video/videooverlay.h>
#include <glib.h>
#include <stdio.h>
//#include "gstnvdsmeta.h"
#include <cuda_runtime_api.h>
#include <nvds_yml_parser.h>
//#include <iostream>
#include "common_func.h"

#define FRAME_WIDTH 1920
#define FRAME_HEIGHT 1080
// GstElement* uridecodebin = gst_element_factory_make("uridecodebin", "uridecodebin");
// GstElement* pipeline = gst_pipeline_new("my-pipeline");
gint frame_counter = 0; // Counter for frames
gint chunk_counter = 0;
gboolean is_EOS = 0;

guint pad_index1 = 0;

GstElement *pipeline;
void on_pad_added_tee(GstElement *element, GstPad *pad, gpointer data)
{
    GstPad *sinkpad;
    GstElement *downstream = (GstElement *)data;

    /* We can now link this pad with the downstream element */
    sinkpad = gst_element_get_static_pad(downstream, "sink");

    if (gst_pad_link(pad, sinkpad) != GST_PAD_LINK_OK)
    {
        g_error("************** [Tee] Failed to link elements!\n");
    }
    else
    {
        g_print(" [Tee] linked successfully\n");
    }

    gst_object_unref(sinkpad);
}

static GstPadProbeReturn nvinfer_src_pad_buffer_probe(GstPad *pad, GstPadProbeInfo *info, gpointer u_data)
{
    GstBuffer *buf = (GstBuffer *)info->data;
    NvDsBatchMeta *batch_meta = gst_buffer_get_nvds_batch_meta(buf);

    for (NvDsMetaList *l_frame = batch_meta->frame_meta_list; l_frame != NULL; l_frame = l_frame->next)
    {
        NvDsFrameMeta *frame_meta = (NvDsFrameMeta *)(l_frame->data);
        /* Increment the frame counter and generate a filename */
        //frame_counter++;
        guint frame_number = frame_meta->frame_num;
        char filename[256];
        snprintf(filename, sizeof(filename), "detection_out/%d.txt", frame_number);

        /* Open the file for writing */
        FILE *bbox_params_dump_file = fopen(filename, "w");
        if (bbox_params_dump_file == NULL)
        {
            g_print("Could not open file for writing\n");
            continue; // Skip this frame if the file can't be opened
        }

        for (NvDsMetaList *l_obj = frame_meta->obj_meta_list; l_obj != NULL; l_obj = l_obj->next)
        {
            NvDsObjectMeta *obj_meta = (NvDsObjectMeta *)(l_obj->data);
            float left = obj_meta->rect_params.left;
            float top = obj_meta->rect_params.top;
            float right = left + obj_meta->rect_params.width;
            float bottom = top + obj_meta->rect_params.height;
            // Here confidence stores detection confidence, since dump gie output
            // is before tracker plugin
            float confidence = obj_meta->confidence;
            if (strcmp(obj_meta->obj_label, "airplane") == 0)
            {
                // To remove the object metadata entirely
                // frame_meta->obj_meta_list = g_list_remove (frame_meta->obj_meta_list, l_obj);

                // To simply hide it from nvosd
                obj_meta->rect_params.has_bg_color = 0;
                obj_meta->text_params.display_text = NULL;
            }
            fprintf(bbox_params_dump_file,
                    "%s %f %f %f %f %f\n",
                    obj_meta->obj_label, left, top, right, bottom, confidence);

           
        }
        /* Close the file */
        fclose(bbox_params_dump_file);
    }            

    return GST_PAD_PROBE_OK;
}

static GstPadProbeReturn nvtracker_src_pad_buffer_probe(GstPad *pad, GstPadProbeInfo *info, gpointer u_data)
{
    GstBuffer *buf = (GstBuffer *)info->data;
    NvDsBatchMeta *batch_meta = gst_buffer_get_nvds_batch_meta(buf);

    for (NvDsMetaList *l_frame = batch_meta->frame_meta_list; l_frame != NULL; l_frame = l_frame->next)
    {
        NvDsFrameMeta *frame_meta = (NvDsFrameMeta *)(l_frame->data);
        /* Increment the frame counter and generate a filename */
        //frame_counter++;
        guint frame_number = frame_meta->frame_num;
        unsigned long networkTime = frame_meta->ntp_timestamp;
        guint64 timestamp_ns = frame_meta->buf_pts;
        char filename[256];
        snprintf(filename, sizeof(filename), "track_out/%d.txt", frame_number);

        /* Open the file for writing */
        FILE *bbox_params_dump_file = fopen(filename, "w");
        if (bbox_params_dump_file == NULL)
        {
            g_print("Could not open file for writing\n");
            continue; // Skip this frame if the file can't be opened
        }
        fprintf(bbox_params_dump_file, "%lu\n", networkTime);
        fprintf(bbox_params_dump_file, "%lu\n", timestamp_ns);

        for (NvDsMetaList *l_obj = frame_meta->obj_meta_list; l_obj != NULL; l_obj = l_obj->next)
        {
            NvDsObjectMeta *obj_meta = (NvDsObjectMeta *)(l_obj->data);
            float left = obj_meta->tracker_bbox_info.org_bbox_coords.left;
            float top = obj_meta->tracker_bbox_info.org_bbox_coords.top;
            float right = left + obj_meta->tracker_bbox_info.org_bbox_coords.width;
            float bottom = top + obj_meta->tracker_bbox_info.org_bbox_coords.height;
            // Here confidence stores detection confidence, since dump gie output
            // is before tracker plugin
            float confidence = obj_meta->confidence;
            if (strcmp(obj_meta->obj_label, "airplane") == 0)
            {
                // To remove the object metadata entirely
                // frame_meta->obj_meta_list = g_list_remove (frame_meta->obj_meta_list, l_obj);

                // To simply hide it from nvosd
                obj_meta->rect_params.has_bg_color = 0;
                obj_meta->text_params.display_text = NULL;
            }
            fprintf(bbox_params_dump_file,
                    "%s %lu %f %f %f %f %f\n",
                    obj_meta->obj_label, obj_meta->object_id, left, top, right, bottom, confidence);

           
        }
        /* Close the file */
        fclose(bbox_params_dump_file);
        // [dump past frames] ---- start -----
        // [past frames] --- end -----
        }

    return GST_PAD_PROBE_OK;
}

static GstPadProbeReturn nvtracker_src_pad_buffer_probe1(GstPad *pad, GstPadProbeInfo *info, gpointer u_data)
{
    GstBuffer *buf = (GstBuffer *)info->data;
    NvDsBatchMeta *batch_meta = gst_buffer_get_nvds_batch_meta(buf);

    for (NvDsMetaList *l_frame = batch_meta->frame_meta_list; l_frame != NULL; l_frame = l_frame->next)
    {
        NvDsFrameMeta *frame_meta = (NvDsFrameMeta *)(l_frame->data);
        /* Increment the frame counter and generate a filename */
        //frame_counter++;
        guint frame_number = frame_meta->frame_num;
        unsigned long networkTime = frame_meta->ntp_timestamp;
        guint64 timestamp_ns = frame_meta->buf_pts;
        char filename[256];
        snprintf(filename, sizeof(filename), "track_out1/%d.txt", frame_number);

        /* Open the file for writing */
        FILE *bbox_params_dump_file = fopen(filename, "w");
        if (bbox_params_dump_file == NULL)
        {
            g_print("Could not open file for writing\n");
            continue; // Skip this frame if the file can't be opened
        }
        fprintf(bbox_params_dump_file, "%lu\n", networkTime);
        fprintf(bbox_params_dump_file, "%lu\n", timestamp_ns);
        for (NvDsMetaList *l_obj = frame_meta->obj_meta_list; l_obj != NULL; l_obj = l_obj->next)
        {
            NvDsObjectMeta *obj_meta = (NvDsObjectMeta *)(l_obj->data);
            float left = obj_meta->tracker_bbox_info.org_bbox_coords.left;
            float top = obj_meta->tracker_bbox_info.org_bbox_coords.top;
            float right = left + obj_meta->tracker_bbox_info.org_bbox_coords.width;
            float bottom = top + obj_meta->tracker_bbox_info.org_bbox_coords.height;
            // Here confidence stores detection confidence, since dump gie output
            // is before tracker plugin
            float confidence = obj_meta->confidence;
            if (strcmp(obj_meta->obj_label, "airplane") == 0)
            {
                // To remove the object metadata entirely
                // frame_meta->obj_meta_list = g_list_remove (frame_meta->obj_meta_list, l_obj);

                // To simply hide it from nvosd
                obj_meta->rect_params.has_bg_color = 0;
                obj_meta->text_params.display_text = NULL;
            }
            fprintf(bbox_params_dump_file,
                    "%s %lu %f %f %f %f %f\n",
                    obj_meta->obj_label, obj_meta->object_id, left, top, right, bottom, confidence);

           
        }
        /* Close the file */
        fclose(bbox_params_dump_file);
        // [dump past frames] ---- start -----
        // [past frames] --- end -----
        }

    return GST_PAD_PROBE_OK;
}

static GstPadProbeReturn nvtracker_past_src_pad_buffer_probe(GstPad *pad, GstPadProbeInfo *info, gpointer u_data)
{
    GstBuffer *buf = (GstBuffer *)info->data;
    NvDsBatchMeta *batch_meta = gst_buffer_get_nvds_batch_meta(buf);

    // dump past frame tracked objects appending current frame objects
    gchar bbox_file[1024] = {0};
    FILE *bbox_params_dump_file = NULL;

    NvDsTargetMiscDataBatch *pPastFrameObjBatch = NULL;
    NvDsUserMetaList *bmeta_list = NULL;
    NvDsUserMeta *user_meta = NULL;
    for (bmeta_list = batch_meta->batch_user_meta_list; bmeta_list != NULL;
         bmeta_list = bmeta_list->next)
    {
        user_meta = (NvDsUserMeta *)bmeta_list->data;
        if (user_meta && user_meta->base_meta.meta_type == NVDS_TRACKER_PAST_FRAME_META)
        {
            pPastFrameObjBatch =
                (NvDsTargetMiscDataBatch *)(user_meta->user_meta_data);
            for (uint si = 0; si < pPastFrameObjBatch->numFilled; si++)
            {
                NvDsTargetMiscDataStream *objStream = (pPastFrameObjBatch->list) + si;
                guint stream_id = (guint)(objStream->streamID);
                for (uint li = 0; li < objStream->numFilled; li++)
                {
                    NvDsTargetMiscDataObject *objList = (objStream->list) + li;
                    for (uint oi = 0; oi < objList->numObj; oi++)
                    {
                        NvDsTargetMiscDataFrame *obj = (objList->list) + oi;
                        g_snprintf(bbox_file, sizeof(bbox_file), "track_out/%d.txt", (guint)obj->frameNum);
                        float left = obj->tBbox.left;
                        float right = left + obj->tBbox.width;
                        float top = obj->tBbox.top;
                        float bottom = top + obj->tBbox.height;
                        // Past frame object confidence given by tracker
                        float confidence = obj->confidence;
                        bbox_params_dump_file = fopen(bbox_file, "a");
                        if (!bbox_params_dump_file)
                        {
                            continue;
                        }
                        if (strcmp(objList->objLabel, "airplane") == 0)
                        {
                            // To remove the object metadata entirely
                            // frame_meta->obj_meta_list = g_list_remove (frame_meta->obj_meta_list, l_obj);

                            // To simply hide it from nvosd
                            //objList->rect_params.has_bg_color = 0;
                            //objList->text_params.display_text = NULL;
                            g_print("saw airplane in the past frame\n");
                        }
                        fprintf(bbox_params_dump_file,
                                "%s %lu %f %f %f %f %f\n",
                                objList->objLabel, objList->uniqueId, left, top, right, bottom,
                                confidence);
                        fclose(bbox_params_dump_file);
                    }
                }
            }
        }
    }
    return GST_PAD_PROBE_OK;
}

static GstPadProbeReturn nvtracker_past_src_pad_buffer_probe1(GstPad *pad, GstPadProbeInfo *info, gpointer u_data)
{
    GstBuffer *buf = (GstBuffer *)info->data;
    NvDsBatchMeta *batch_meta = gst_buffer_get_nvds_batch_meta(buf);

    // dump past frame tracked objects appending current frame objects
    gchar bbox_file[1024] = {0};
    FILE *bbox_params_dump_file = NULL;

    NvDsTargetMiscDataBatch *pPastFrameObjBatch = NULL;
    NvDsUserMetaList *bmeta_list = NULL;
    NvDsUserMeta *user_meta = NULL;
    for (bmeta_list = batch_meta->batch_user_meta_list; bmeta_list != NULL;
         bmeta_list = bmeta_list->next)
    {
        user_meta = (NvDsUserMeta *)bmeta_list->data;
        if (user_meta && user_meta->base_meta.meta_type == NVDS_TRACKER_PAST_FRAME_META)
        {
            pPastFrameObjBatch =
                (NvDsTargetMiscDataBatch *)(user_meta->user_meta_data);
            for (uint si = 0; si < pPastFrameObjBatch->numFilled; si++)
            {
                NvDsTargetMiscDataStream *objStream = (pPastFrameObjBatch->list) + si;
                guint stream_id = (guint)(objStream->streamID);
                for (uint li = 0; li < objStream->numFilled; li++)
                {
                    NvDsTargetMiscDataObject *objList = (objStream->list) + li;
                    for (uint oi = 0; oi < objList->numObj; oi++)
                    {
                        NvDsTargetMiscDataFrame *obj = (objList->list) + oi;
                        g_snprintf(bbox_file, sizeof(bbox_file), "track_out1/%d.txt", (guint)obj->frameNum);
                        float left = obj->tBbox.left;
                        float right = left + obj->tBbox.width;
                        float top = obj->tBbox.top;
                        float bottom = top + obj->tBbox.height;
                        // Past frame object confidence given by tracker
                        float confidence = obj->confidence;
                        bbox_params_dump_file = fopen(bbox_file, "a");
                        if (!bbox_params_dump_file)
                        {
                            continue;
                        }
                        if (strcmp(objList->objLabel, "airplane") == 0)
                        {
                            // To remove the object metadata entirely
                            // frame_meta->obj_meta_list = g_list_remove (frame_meta->obj_meta_list, l_obj);

                            // To simply hide it from nvosd
                            //objList->rect_params.has_bg_color = 0;
                            //objList->text_params.display_text = NULL;
                            g_print("saw airplane in the past frames meta data\n");
                        }
                        fprintf(bbox_params_dump_file,
                                "%s %lu %f %f %f %f %f\n",
                                objList->objLabel, objList->uniqueId, left, top, right, bottom,
                                confidence);
                        fclose(bbox_params_dump_file);
                    }
                }
            }
        }
    }
    return GST_PAD_PROBE_OK;
}


/*
static GstPadProbeReturn 
nvinfer_src_pad_buffer_probe(GstPad *pad, GstPadProbeInfo *info, gpointer u_data)
{
    GstBuffer *buf = (GstBuffer *)info->data;
    NvDsBatchMeta *batch_meta = gst_buffer_get_nvds_batch_meta(buf);

    for (NvDsMetaList *l_frame = batch_meta->frame_meta_list; l_frame != NULL; l_frame = l_frame->next)
    {
        NvDsFrameMeta *frame_meta = (NvDsFrameMeta *)(l_frame->data);
        for (NvDsMetaList *l_obj = frame_meta->obj_meta_list; l_obj != NULL; l_obj = l_obj->next)
        {
            NvDsObjectMeta *obj_meta = (NvDsObjectMeta *)(l_obj->data);

            // Get the bounding box coordinates
            float left = obj_meta->rect_params.left;
            float top = obj_meta->rect_params.top;
            float right = left + obj_meta->rect_params.width;
            float bottom = top + obj_meta->rect_params.height;
            // Process the bounding box information, e.g., print it
            g_print("Bounding Box: left=%.2f, top=%.2f, width=%.2f, height=%.2f\n", left, top, right, bottom);
        }
    }

    return GST_PAD_PROBE_OK;
}
*/

static gboolean
bus_call(GstBus *bus, GstMessage *msg, gpointer data)
{
    GMainLoop *loop = (GMainLoop *)data;

    switch (GST_MESSAGE_TYPE(msg))
    {
    case GST_MESSAGE_EOS:
        g_print("End of stream\n");
        g_main_loop_quit(loop);
        break;

    case GST_MESSAGE_ERROR:
    {
        gchar *debug;
        GError *error;

        gst_message_parse_error(msg, &error, &debug);
        g_free(debug);

        g_printerr("Error: %s\n", error->message);
        g_error_free(error);

        g_main_loop_quit(loop);
        break;
    }

    default:
        break;
    }

    return TRUE;
}

static void source_pad_added(GstElement *element, GstPad *pad, gpointer data)
{
    g_print("###################### Second floor camera pad added ....\n");
    const gchar *str = print_pad_type(pad);
    //static guint pad_index = 0; // Choose the pad index based on your use case
    if (strstr(str, "audio") != NULL )
    {
         g_print("pad type is audio returing ... \n");
         goto exit;
    } 

    RTSPLinkElementsData *elements_data = (RTSPLinkElementsData *)data;
    GstElement *depay = elements_data->depay;
    GstElement *convert = elements_data->convert;
    GstElement *streammux = elements_data->streammux;

    GstPad *sinkpad = gst_element_get_static_pad(depay, "sink");
    //    guint pad_index = 0; // Choose the pad index based on your use case
    //    gchar pad_name[16];
    //    g_snprintf(pad_name, sizeof(pad_name), "sink_%u", pad_index);
    //    g_print("pad name is %s\n", pad_name);
    //    GstPad *sinkpad = gst_element_request_pad_simple(data, pad_name);

    //const gchar *pad_type
    
    GstPadLinkReturn link_ret = gst_pad_link(pad, sinkpad);
    if (link_ret != GST_PAD_LINK_OK)
    {
        g_printerr("GstPadLinkReturn: %s\n", gst_pad_link_return_to_string(link_ret));
    }
    else
    {
        g_print("Link succeed.\n");
    }
    gst_object_unref(sinkpad);

    // Now link nvvideoconvert to nvstreammux
    GstPad *srcpad = gst_element_get_static_pad(convert, "src");
   
    gchar pad_name[16];
    g_snprintf(pad_name, sizeof(pad_name), "sink_%u", pad_index1);
    g_print("********************* pad name is %s \n", pad_name);
    GstPad *streammux_sinkpad = gst_element_request_pad_simple(streammux, pad_name);

    if (gst_pad_link(srcpad, streammux_sinkpad) != GST_PAD_LINK_OK)
    {
        g_printerr("Failed to link nvvideoconvert and nvstreammux.\n");
    }
    else
    {
        g_print("nvvideoconvert and nvstreammux linked.\n");
    }

    gst_object_unref(srcpad);
    gst_object_unref(streammux_sinkpad);
    pad_index1++;

    exit:
        g_print("Exiting adding pad ... \n");
}

static void source_pad_added1(GstElement *element, GstPad *pad, gpointer data)
{
    g_print("###################### First floor camera pad added ....\n");
    const gchar *str = print_pad_type(pad);
    //static guint pad_index1 = 0; // Choose the pad index based on your use case
    if (strstr(str, "audio") != NULL )
    {
         g_print("pad type is audio returing ... \n");
         goto exit;
    } 

    RTSPLinkElementsData *elements_data = (RTSPLinkElementsData *)data;
    GstElement *depay = elements_data->depay;
    GstElement *convert = elements_data->convert;
    GstElement *streammux = elements_data->streammux;

    GstPad *sinkpad = gst_element_get_static_pad(depay, "sink");
    //    guint pad_index = 0; // Choose the pad index based on your use case
    //    gchar pad_name[16];
    //    g_snprintf(pad_name, sizeof(pad_name), "sink_%u", pad_index);
    //    g_print("pad name is %s\n", pad_name);
    //    GstPad *sinkpad = gst_element_request_pad_simple(data, pad_name);

    //const gchar *pad_type
    
    GstPadLinkReturn link_ret = gst_pad_link(pad, sinkpad);
    if (link_ret != GST_PAD_LINK_OK)
    {
        g_printerr("GstPadLinkReturn: %s\n", gst_pad_link_return_to_string(link_ret));
    }
    else
    {
        g_print("Link succeed.\n");
    }
    gst_object_unref(sinkpad);

    // Now link nvvideoconvert to nvstreammux
    GstPad *srcpad = gst_element_get_static_pad(convert, "src");
   
    gchar pad_name[16];
    g_snprintf(pad_name, sizeof(pad_name), "sink_%u", pad_index1);
    g_print("********************* pad name is %s \n", pad_name);
    GstPad *streammux_sinkpad = gst_element_request_pad_simple(streammux, pad_name);

    if (gst_pad_link(srcpad, streammux_sinkpad) != GST_PAD_LINK_OK)
    {
        g_printerr("Failed to link nvvideoconvert and nvstreammux.\n");
    }
    else
    {
        g_print("nvvideoconvert and nvstreammux linked.\n");
    }

    gst_object_unref(srcpad);
    gst_object_unref(streammux_sinkpad);
    pad_index1++;

    exit:
        g_print("Exiting adding pad ... \n");
}

void int_handler(int dummy) {
    // Check if pipeline exists
    if (pipeline) {
        // Send EOS (End of Stream) to the pipeline
        gst_element_send_event(pipeline, gst_event_new_eos());
    }
}

// Callback function for pad-added signal
static void demux_pad_added_handler(GstElement *src, GstPad *new_pad, gpointer data) {
    GstPad *sink_pad;
    GstElement *pipeline = (GstElement *)data;
    gchar *name = gst_pad_get_name(new_pad);

    // Assuming downstream elements are named 'branch1', 'branch2', etc.
    gchar *branch_name = g_strdup_printf("nvmsgconv_%s", name + 4);  // Assuming pad names like 'src_0', 'src_1', etc.
    g_print("\n ###################### this is the branch name: %s \n", branch_name);
    GstElement *branch = gst_bin_get_by_name(GST_BIN(pipeline), branch_name);

    if (!branch) {
        g_free(name);
        g_free(branch_name);
        return;
    }

    sink_pad = gst_element_get_static_pad(branch, "sink");
    if (gst_pad_link(new_pad, sink_pad) != GST_PAD_LINK_OK) {
        g_print("Failed to link demux pad with downstream branch.\n");
    }

    gst_object_unref(sink_pad);
    g_free(name);
    g_free(branch_name);
}

int main(int argc, char *argv[])
{
    // Initialize GStreamer
    gst_init(&argc, &argv);

    ///// Pipeline 1 elements
    // Create uridecodebin element
    GstElement *source = gst_element_factory_make ("rtspsrc", "source");
    GstElement *source1 = gst_element_factory_make ("rtspsrc", "source1");

    GstElement *depay = gst_element_factory_make ("rtph264depay", "depay");
    GstElement *depay1 = gst_element_factory_make ("rtph264depay", "depay1");

    GstElement *parse = gst_element_factory_make ("h264parse", "parse");
    GstElement *parse1 = gst_element_factory_make ("h264parse", "parse1");

    GstElement *decoder = gst_element_factory_make ("nvv4l2decoder", "decoder");
    GstElement *decoder1 = gst_element_factory_make ("nvv4l2decoder", "decoder1");

    // GstElement *queue = gst_element_factory_make ("queue", "queue");
    // GstElement *queue1 = gst_element_factory_make ("queue", "queue1");

     

    GstElement *convert = gst_element_factory_make ("nvvideoconvert", "convert");
    GstElement *convert1 = gst_element_factory_make ("nvvideoconvert", "convert1");

    g_object_set (G_OBJECT(convert), "compute-hw", "GPU", NULL);
    GstElement *capsfilter = gst_element_factory_make("capsfilter", "filter");
    GstCaps *caps = gst_caps_from_string("video/x-raw, framerate=10/1");
    g_object_set(G_OBJECT(capsfilter), "caps", caps, NULL);
    gst_caps_unref(caps);

     g_object_set (G_OBJECT(convert1), "compute-hw", "GPU", NULL);
    GstElement *capsfilter1 = gst_element_factory_make("capsfilter", "filter1");
    GstCaps *caps1 = gst_caps_from_string("video/x-raw, framerate=10/1");
    g_object_set(G_OBJECT(capsfilter1), "caps", caps1, NULL);
    gst_caps_unref(caps1);

    // Create nvstreammux element
    GstElement *streammux = gst_element_factory_make("nvstreammux", "stream-mux");

    // Create nvinfer element
    GstElement *nvinfer = gst_element_factory_make("nvinfer", "nvinfer");

    // Create tracker element
    GstElement *tracker = gst_element_factory_make("nvtracker", "tracker");

    GstElement *streamdemux = gst_element_factory_make("nvstreamdemux", "streamdemux");

      //MQTT metadata plugins
    GstElement *nvmsgconv, *nvmsgbroker;
    nvmsgconv = gst_element_factory_make("nvmsgconv", "nvmsgconv_0");
    nvmsgbroker = gst_element_factory_make("nvmsgbroker", "nvmsgbroker");
    g_object_set(G_OBJECT(nvmsgbroker), "proto-lib", "/opt/nvidia/deepstream/deepstream-6.4/lib/libnvds_mqtt_proto.so",  "sync", FALSE, NULL);
    g_object_set(G_OBJECT(nvmsgbroker), "conn-str", "localhost;1883", "topic", "<topic1>>", NULL);
    g_object_set(G_OBJECT(nvmsgconv), "config", "msgconv_config.txt", "msg2p-newapi", 1, "frame-interval", 1, "payload-type",  NVDS_PAYLOAD_DEEPSTREAM_MINIMAL, "debug-payload-dir", "msg_debug/", "comp-id", "cam1", NULL);

    GstElement *nvmsgconv1, *nvmsgbroker1;
    nvmsgconv1 = gst_element_factory_make("nvmsgconv", "nvmsgconv_1");
    nvmsgbroker1 = gst_element_factory_make("nvmsgbroker", "nvmsgbroker1");
    g_object_set(G_OBJECT(nvmsgbroker1), "proto-lib", "/opt/nvidia/deepstream/deepstream-6.4/lib/libnvds_mqtt_proto.so",  "sync", FALSE, NULL);
    g_object_set(G_OBJECT(nvmsgbroker1), "conn-str", "localhost;1883", "topic", "<topic2>", NULL);
    g_object_set(G_OBJECT(nvmsgconv1), "config", "msgconv_config1.txt", "msg2p-newapi", 1, "frame-interval", 1, "payload-type",  NVDS_PAYLOAD_DEEPSTREAM_MINIMAL, "debug-payload-dir", "msg_debug1/", "comp-id", "cam2", NULL);

    // Set the properties of the elements
    if (g_str_has_suffix(argv[1], ".yml") || g_str_has_suffix(argv[1], ".yaml"))
    {
        nvds_parse_streammux(streammux, argv[1], "streammux");
        nvds_parse_gie(nvinfer, argv[1], "nvinfer");
    }
    /* Modify the source's properties */
    ////// Camera 1
    g_object_set (source, "location", "<ip-addr1>", NULL); //"protocols", );//, 0x00000004, "latency", 1000, "drop-on-latency", TRUE, "timeout", 5000000, NULL);
    g_object_set (decoder, "cudadec-memtype", 0, "num-extra-surfaces", 1, NULL);
    //g_object_set (queue, "leaky", 2, "max-size-buffers", 1, NULL);

    ///// Camera 2
    g_object_set(source1, "location", "<ip-addr2>", NULL); // "protocols", 0x00000004, "latency", 1000, "drop-on-latency", TRUE, "timeout", 5000000, NULL);
    g_object_set(decoder1, "cudadec-memtype", 0, "num-extra-surfaces", 1, NULL);
    //g_object_set(queue, "leaky", 2, "max-size-buffers", 1, NULL);

    //g_object_set(G_OBJECT(encoder), "bitrate", 15000000, NULL);

    // Set up the pipeline
    pipeline = gst_pipeline_new("my-pipeline");
    gst_bin_add_many(GST_BIN(pipeline), source, depay, parse, decoder, capsfilter, convert, streammux, nvinfer, tracker, streamdemux, nvmsgconv, nvmsgbroker, source1, depay1, parse1, decoder1, capsfilter1, convert1, nvmsgconv1, nvmsgbroker1, NULL);

    // Link the elements. Note: you need to handle the 'pad-added' signal when linking dynamic pads.
    // Configure the tracker
    g_object_set(G_OBJECT(tracker), "tracker-width", 832, "tracker-height", 832, "ll-lib-file", "/opt/nvidia/deepstream/deepstream-6.4/lib/libnvds_nvmultiobjecttracker.so", "ll-config-file", "/opt/nvidia/deepstream/deepstream-6.4/samples/configs/deepstream-app/config_tracker_NvDCF_perf.yml",
                 "gpu-id", 0, "display-tracking-id", TRUE, "compute-hw", 1, "user-meta-pool-size", 4096, NULL);


    // Create link element
    RTSPLinkElementsData elements_data;
    elements_data.depay = depay;
    elements_data.convert = convert;
    elements_data.streammux = streammux;

    RTSPLinkElementsData elements_data1;
    elements_data1.depay = depay1;
    elements_data1.convert = convert1;
    elements_data1.streammux = streammux;




    // Link elements together
    g_signal_connect(source, "pad-added", G_CALLBACK(source_pad_added), &elements_data);
    g_signal_connect(source1, "pad-added", G_CALLBACK(source_pad_added1), &elements_data1);
    // Dynamic source file
   // g_signal_connect(uridecodebin, "drained", G_CALLBACK(cb_drained), pipeline);

   if (!gst_element_link_many (depay, parse, decoder, convert, NULL) ) {
        g_printerr ("depay elements could not be linked.\n");
        gst_object_unref (pipeline);
        return -1;
    }
    
    if (!gst_element_link_many(streammux, nvinfer, NULL))
    {
        g_printerr("Failed to link queue1 elements and elements between them. Exiting.\n");
        return -1;
    }

    
    
    if (!gst_element_link_many(depay1, parse1, decoder1, convert1, NULL))
    {
        g_printerr("depay1 elements could not be linked.\n");
        gst_object_unref(pipeline);
        return -1;
    }


    if (!gst_element_link_many(nvinfer, tracker, streamdemux, NULL))
    {
        g_printerr("Failed to link nvinfer to tracker and streamdemux. Exiting.\n");
        return -1;
    }

    if (!gst_element_link_many(nvmsgconv, nvmsgbroker, NULL))
    {
        g_printerr("Failed to link msgconv branch one. Exiting.\n");
        return -1;
    }

    if (!gst_element_link_many(nvmsgconv1, nvmsgbroker1, NULL))
    {
        g_printerr("Failed to link msgconv branch two. Exiting.\n");
        return -1;
    }


    GstPad *nvtracker_src_pad = gst_element_get_static_pad(tracker, "src");
    gst_pad_add_probe(nvtracker_src_pad, GST_PAD_PROBE_TYPE_BUFFER, nvtracker_src_pad_buffer_probe, NULL, NULL);
    gst_pad_add_probe(nvtracker_src_pad, GST_PAD_PROBE_TYPE_BUFFER, nvtracker_past_src_pad_buffer_probe, NULL, NULL);
    gst_object_unref(nvtracker_src_pad);

    //############## Method 2
    GMainLoop *loop = g_main_loop_new(NULL, FALSE);
    GstBus *bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline));
    gst_bus_add_watch(bus, bus_call, loop);
    gst_object_unref(bus);

     /* Register the signal handler for CTRL+C */
    signal(SIGINT, int_handler);

  
    //Manually link the demux to converters
    GstPad *pad1, *pad2;
    pad1 = gst_element_get_static_pad(nvmsgconv, "sink");
    pad2 = gst_element_get_static_pad(nvmsgconv1, "sink");
    GstPadLinkReturn link_ret = gst_pad_link(gst_element_request_pad_simple(streamdemux, "src_0"), pad1);
    if(link_ret != GST_PAD_LINK_OK)
    {
        g_printerr("################# Pad 1 link failed: %s\n", gst_pad_link_return_to_string(link_ret));
    }
    else {
        g_print("Pads linked successfully!\n");
    }
    link_ret = gst_pad_link(gst_element_request_pad_simple(streamdemux, "src_1"), pad2);

    if(link_ret != GST_PAD_LINK_OK)
    {
        g_printerr("############### Pad 2 link failed: %s\n", gst_pad_link_return_to_string(link_ret));
    }
    else {
        g_print("Pads linked successfully!\n");
    }
    gst_object_unref(pad1);
    gst_object_unref(pad2);



    //Start playing the pipeline
    g_printerr("Starting the pipeline ... \n");
    GstStateChangeReturn ret = gst_element_set_state(pipeline, GST_STATE_PLAYING);
    if (ret == GST_STATE_CHANGE_FAILURE)
    {
        g_printerr("Failed to start pipeline. Exiting.\n");
        return -1;
    }
    else
    {
        g_printerr("succeed to start pipeline. Exiting.\n");
    }

    // Run the main loop
    g_main_loop_run(loop);

    // Clean Up
    gst_element_set_state(pipeline, GST_STATE_NULL);
    gst_object_unref(GST_OBJECT(pipeline));
    g_main_loop_unref(loop);
    return 0;
}

please refer to this topic for sending multiple topics.
if still can’t work, could you share the whole designed media pipeline?

Thank you for the pointer! I will update shortly.

Sorry for the late reply, Is this still an DeepStream issue to support? Thanks!

Hi @fanzh
Sorry for the delay in updating. I am debugging right now. I will send an update today.

Thanks!

Hi @fanzh

I tried the solution regarding ‘comp-id’ mentioned in the link that you shared, but the problem still persists.

For your reference, I am sharing the whole application.

In the main.c the IP addresses of the cameras are left as placeholders that should be replaced with camera’s URLs. (lines 702-710).

To compile: $sudo make
To run: ./run-pipeline pipeline-config.yml

In the mean time, I am going to try with a dynamic batch-sized TensorRT engine file to see if it resolves the problem. Currently, I am using static size (batch-size = 2).

Any feedback would be appreciated.

Thanks!

application_mqtt.zip (48.0 MB)

@fanzh Just a quick update: Changing to dynamic batch size engine file did not resolve the problem.

I am still getting only one mqtt message and even though the pipeline is still running no more mqtt messages are generated.

Hi @fanzh

I was wondering if there have been any updates on this? Please let me know if there is any thing I can try to resolve the issue.

why do you need two nvmsgconv? do you need to send messages to two brokers? if yes, as mentioned in my first comment, the two brokers need to have different comp_id, when you call NvDsEventMsgMeta, you need to set meta->componentId to the corresponding broker comp_id.
if still can’t work, could you share the whole media pipeline? for example, here is the pipeline of deepstream-test1.
file-source → h264-parser → nvh264-decoder → pgie → nvvidconv → nvosd → video-renderer

Hi @fanzh .

I don’t need to send messages to two brokers. I just need to have two MQTT topics. Each topic should correspond to one RTSP source. How should I do this after streamdemux plugin?
Essentially how should I differentiate messages corresponding to different sources?

you can use two brokers with the same msg-broker-conn-str an different topic, comp_id. you can still refer to the topic mentioned in my first comment.

Hi @fanzh

I have used the same conn-str for the two brokers, with different topics and different comp-id as shown here:

    GstElement *nvmsgconv, *nvmsgbroker;
    nvmsgconv = gst_element_factory_make("nvmsgconv", "nvmsgconv_0");
    nvmsgbroker = gst_element_factory_make("nvmsgbroker", "nvmsgbroker");
    g_object_set(G_OBJECT(nvmsgbroker), "proto-lib", "/opt/nvidia/deepstream/deepstream-6.4/lib/libnvds_mqtt_proto.so",  "sync", FALSE, NULL);
    g_object_set(G_OBJECT(nvmsgbroker), "conn-str", "localhost;1883", "topic", "traffic-data-ny:cam1", NULL);
    g_object_set(G_OBJECT(nvmsgconv), "config", "msgconv_config.txt", "msg2p-newapi", 1, "frame-interval", 1, "payload-type",  NVDS_PAYLOAD_DEEPSTREAM_MINIMAL, "debug-payload-dir", "msg_debug/", "comp-id", 1, NULL);

    GstElement *nvmsgconv1, *nvmsgbroker1;
    nvmsgconv1 = gst_element_factory_make("nvmsgconv", "nvmsgconv_1");
    nvmsgbroker1 = gst_element_factory_make("nvmsgbroker", "nvmsgbroker1");
    g_object_set(G_OBJECT(nvmsgbroker1), "proto-lib", "/opt/nvidia/deepstream/deepstream-6.4/lib/libnvds_mqtt_proto.so",  "sync", FALSE, NULL);
    g_object_set(G_OBJECT(nvmsgbroker1), "conn-str", "localhost;1883", "topic", "traffic-data-ny:cam2", NULL);
    g_object_set(G_OBJECT(nvmsgconv1), "config", "msgconv_config1.txt", "msg2p-newapi", 1, "frame-interval", 1, "payload-type",  NVDS_PAYLOAD_DEEPSTREAM_MINIMAL, "debug-payload-dir", "msg_debug1/", "comp-id", 2, NULL);

However, the problem still persists. Here is the pipeline that I am using:
rtspsrc0 → rtph264depay0 → h264parse0 → nvv4l2decoder0 → nvvideoconvert0 → |
rtspsrc1 → rtph264depay1 → h264parse1 → nvv4l2decoder1 → nvvideoconvert1 → |
→ nvstreammux → queue → nvinfer → nvtracker → nvstreamdemux
| → nvmsgconv0 → nvmsgbroker0
| → nvmsgconv1 → nvmsgbroker1

Sorry for the late reply! from the code in your last comment. you did not set nvmsgbroker’s comp-id property. nvmsgconv’s comp-id should be the same with nvmsgbroker’s comp-id. nvmsgbroker plugin is opensource. you can find the checking logics in legacy_gst_nvmsgbroker_render of opt\nvidia\deepstream\deepstream-6.4\sources\gst-plugins\gst-nvmsgbroker\gstnvmsgbroker.cpp.

Hi @fanzh,

Thanks for your response. I see, I did not realize tat nvmsgconv also needs to have a comp-id. I will try this and update here shortly.

Mahshid

Hi @fanzh

I have tried with adding comp-id for nvmsgbroker as well, as follows:

GstElement *nvmsgconv, *nvmsgbroker;
    nvmsgconv = gst_element_factory_make("nvmsgconv", "nvmsgconv_0");
    nvmsgbroker = gst_element_factory_make("nvmsgbroker", "nvmsgbroker");
    g_object_set(G_OBJECT(nvmsgbroker), "proto-lib", "/opt/nvidia/deepstream/deepstream-6.4/lib/libnvds_mqtt_proto.so",  "sync", FALSE, NULL);
    g_object_set(G_OBJECT(nvmsgbroker), "conn-str", "localhost;1883", "topic", "traffic-data-ny:cam1", "comp-id", 1, NULL);
    g_object_set(G_OBJECT(nvmsgconv), "config", "msgconv_config.txt", "msg2p-newapi", 1, "frame-interval", 1, "payload-type",  NVDS_PAYLOAD_DEEPSTREAM_MINIMAL, "debug-payload-dir", "msg_debug/", "comp-id", 1, NULL);

    GstElement *nvmsgconv1, *nvmsgbroker1;
    nvmsgconv1 = gst_element_factory_make("nvmsgconv", "nvmsgconv_1");
    nvmsgbroker1 = gst_element_factory_make("nvmsgbroker", "nvmsgbroker1");
    g_object_set(G_OBJECT(nvmsgbroker1), "proto-lib", "/opt/nvidia/deepstream/deepstream-6.4/lib/libnvds_mqtt_proto.so",  "sync", FALSE, NULL);
    g_object_set(G_OBJECT(nvmsgbroker1), "conn-str", "localhost;1883", "topic", "traffic-data-ny:cam2", "comp-id", 2, NULL);
    g_object_set(G_OBJECT(nvmsgconv1), "config", "msgconv_config1.txt", "msg2p-newapi", 1, "frame-interval", 1, "payload-type",  NVDS_PAYLOAD_DEEPSTREAM_MINIMAL, "debug-payload-dir", "msg_debug1/", "comp-id", 2, NULL);

However, the same problem persists. I only get one message and then the pipeline stops sending mqtt messages even though nvinfer and nvtracker keeps generating payload.

I suspect that the problem is with nvstreamdemux. Here is the code where I link streamdemux to the two nvmsgconvs:

  GstPad *pad1, *pad2;
    pad1 = gst_element_get_static_pad(nvmsgconv, "sink");
    pad2 = gst_element_get_static_pad(nvmsgconv1, "sink");
    GstPadLinkReturn link_ret = gst_pad_link(gst_element_request_pad_simple(streamdemux, "src_0"), pad1);
    if(link_ret != GST_PAD_LINK_OK)
    {
        g_printerr("################# Pad 1 link failed: %s\n", gst_pad_link_return_to_string(link_ret));
    }
    else {
        g_print("Pads linked successfully!\n");
    }
    link_ret = gst_pad_link(gst_element_request_pad_simple(streamdemux, "src_1"), pad2);

    if(link_ret != GST_PAD_LINK_OK)
    {
        g_printerr("############### Pad 2 link failed: %s\n", gst_pad_link_return_to_string(link_ret));
    }
    else {
        g_print("Pads linked successfully!\n");
    }
    gst_object_unref(pad1);
    gst_object_unref(pad2);

Sorry if I am missing something. The example you shared is different from my application, because it is using two msgbrokers for sending different messages to different topics. But I specifically want to demux nvinfer or nvtracker payload for two rtsp sources and generate mqtt messages for each of them on a separate topic.

  1. could you share the whole log? wondering if there are some abnormal cases.
  2. if you add nvmsgbroker or nvmsgbroker1 with the different comp-id individually, please make sure both can send successfully.
  3. nvmsgbroker plugin is opensource. you can print payload->componentId and self->compId to check the data with the different componentId are generated. especially you need to rebuild the plugin after modifying the code and replace the plugin(backup the origin plugin). please refer to the readme in /opt/nvidia/deepstream/deepstream/sources/gst-plugins/gst-nvmsgbroker/README.

legacy_gst_nvmsgbroker_render{

//add your log.
if (self->compId && payload->componentId != self->compId)
continue;
}

Hi @fanzh,

Thanks a lot for your reply! I have added the following

if (self->compId && payload->componentId != self->compId)
g_print("self_compid: %d, pauload_compid:%d\n", self->compId, payload->componentId);
}

to legacy_gst_nvmsgbroker_render and new_gst_nvmsgbroker_render, in /opt/nvidia/deepstream/deepstream-6.4/sources/gst-plugins/gst-nvmsgbroker/gstnvmsgbroker.cpp, I then ran make followed by make install. I even copied the output .so file manually as well to /opt/nvidia/deepstream/deepstream-6.4/lib/gst-plugins/. However, I do not see anything being printed. I also removed the if condition, but still, nothing is printed.

I have attached the pipeline log, with GST_DEBUG set to 3.

pipeline.log (31.0 KB)

please use the following code to check the data with the different componentId are generated. especially please backup the origin plugin.
g_print(“self_compid: %d, pauload_compid:%d\n”, self->compId, payload->componentId);
if (self->compId && payload->componentId != self->compId)
continue;
}

Hi @fanzh ,

Understood, I wrote the line of code g_print(“self_compid: %d, pauload_compid:%d\n”, self->compId, payload->componentId); before if (self->compId && payload->componentId != self->compId) continue; } inside legacy_gst_nvmsgbroker_render function. I also backed up the original .so file in /opt/nvidia/deepstream/deepstream-6.4/lib/gst-plugins. However, the print function is not called at all. I even tried printing something in the beginning of the function but nothing is printed. So either this function is not called or the whole .so file is not used and something else is used instead.

I can’t reproduce this issue. here is my test.

  1. add “g_print(“in gst_nvmsgbroker_render\n”);” at the start of gst_nvmsgbroker_render.
  2. make, then execute
mv /opt/nvidia/deepstream/deepstream/lib/gst-plugins/libnvdsgst_msgbroker.so /opt/nvidia/deepstream/deepstream/lib/gst-plugins/libnvdsgst_msgbroker.so_ori && \
mv libnvdsgst_msgbroker.so /opt/nvidia/deepstream/deepstream/lib/gst-plugins/libnvdsgst_msgbroker.so
  1. modify dstest4_config.yml, then execute “./deepstream-test4-app dstest4_config.yml”. the log will be printed. here is the log log.txt (1.8 KB)
    .