Cannot get sample when multiple images are pushed to the pipeline, but it works when done one at a time

Please provide complete information as applicable to your setup.

• Jetson nano
• DeepStream 5
• JetPack 4.4
• TensorRT Version

I have a problem where pushing multiple buffers to the pipeline only returns a result once. But if I wait before I push the next buffer, it works as expected. Does anyone have any idea?

I have the following pipeline.

appsrc->jpegparse->nvv4l2decoder->nvstreammux->nvinfer->appsink

I listen for samples using the new-sample signal and push data to the pipeline using the gst_app_src_push_buffer function.

The problem I have is that if I attempt to push more than one buffer at once, I only get a sample once. If I wait for the sample before I push a new buffer, everything works fine.

Below is the code.

#include <gst/gst.h>
#include <gst/app/gstappsrc.h>
#include <glib.h>

#include <stdbool.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include "gstnvdsmeta.h"

#define MUXER_OUTPUT_WIDTH 704
#define MUXER_OUTPUT_HEIGHT 576
#define MUXER_BATCH_TIMEOUT_USEC 33000

#define NVDS_GST_META_FRAME (nvds_get_user_meta_type("NVIDIA.DECODER.GST_META_FRAME_META"))

/* Per-frame payload that pushData() attaches to each buffer as NvDs meta. */
typedef struct FrameMeta
{
    int frameIdx; /* frame index assigned by the producer in pushData() */
} FrameMeta;

/*
 * Find the FrameMeta payload carried by a batched buffer.
 *
 * Walks every frame in the buffer's NvDsBatchMeta and, for each frame, its
 * frame-level user-meta list. The payload of the first user meta whose type
 * is NVDS_GST_META_FRAME is returned.
 *
 * Returns NULL when buf is NULL, when the buffer carries no batch meta, or
 * when no user meta of the expected type is present. The returned pointer is
 * the one stored in the user meta; it is not copied here.
 */
FrameMeta *probeFrameMeta(GstBuffer *buf) {
    if (buf == NULL) {
        return NULL;
    }

    /* The lookup yields NULL for buffers without batch meta attached. */
    NvDsBatchMeta *batch_meta = gst_buffer_get_nvds_batch_meta(buf);
    if (batch_meta == NULL) {
        return NULL;
    }

    for (NvDsMetaList *l_frame = batch_meta->frame_meta_list; l_frame != NULL;
         l_frame = l_frame->next) {
        NvDsFrameMeta *frame_meta = (NvDsFrameMeta *) l_frame->data;
        if (frame_meta == NULL) {
            continue;
        }

        for (NvDsMetaList *l_user_meta = frame_meta->frame_user_meta_list;
             l_user_meta != NULL; l_user_meta = l_user_meta->next) {
            NvDsUserMeta *user_meta = (NvDsUserMeta *) l_user_meta->data;
            if (user_meta == NULL) {
                continue;
            }

            if (user_meta->base_meta.meta_type == NVDS_GST_META_FRAME) {
                return (FrameMeta *) user_meta->user_meta_data;
            }
        }
    }

    return NULL;
}

/*
 * Copy callback handed to gst_buffer_add_nvds_meta() in pushData().
 *
 * data points at the FrameMeta to duplicate; user_data is unused.
 * Returns a freshly allocated FrameMeta holding the same field values.
 */
static gpointer frame_meta_copy_func(gpointer data, gpointer user_data) {
    const FrameMeta *original = (const FrameMeta *) data;
    FrameMeta *duplicate = (FrameMeta *) g_malloc0(sizeof(FrameMeta));

    *duplicate = *original;
    return (gpointer) duplicate;
}

/*
 * Release callback handed to gst_buffer_add_nvds_meta() in pushData().
 *
 * Frees the FrameMeta that data points at; user_data is unused.
 */
static void frame_meta_release_func(gpointer data, gpointer user_data) {
    /* g_free() is a no-op for NULL, so no explicit guard is required. */
    g_free(data);
}

/*
 * Release callback installed as gst_to_nvds_meta_release_func in pushData().
 *
 * data holds a pointer to NvDsUserMeta; the FrameMeta stored in its
 * user_meta_data is freed through frame_meta_release_func(). user_data is
 * unused.
 */
static void frame_gst_nvds_meta_release_func(gpointer data, gpointer user_data) {
    NvDsUserMeta *nvds_user_meta = (NvDsUserMeta *) data;

    frame_meta_release_func(nvds_user_meta->user_meta_data, NULL);
}

/*
 * Gst-to-NvDs transform callback installed in pushData().
 *
 * data holds a pointer to NvDsUserMeta. Returns a new copy of the FrameMeta
 * stored in its user_meta_data, produced by frame_meta_copy_func().
 * user_data is unused.
 */
static gpointer src_gst_to_nvds_meta_transform_func(gpointer data, gpointer user_data) {
    NvDsUserMeta *nvds_user_meta = (NvDsUserMeta *) data;
    FrameMeta *payload = (FrameMeta *) nvds_user_meta->user_meta_data;

    return frame_meta_copy_func(payload, NULL);
}

/*
 * Wrap one image in a GstBuffer, tag it with a FrameMeta and push it into
 * the appsrc.
 *
 * Returns TRUE when appsrc accepted the buffer (GST_FLOW_OK), FALSE when
 * there is nothing to push, when the buffer or its meta could not be
 * created, or when the push itself failed.
 *
 * NOTE(review): the posted snippet does not show where the image bytes,
 * their length and the frame index come from; they were read uninitialized.
 * They are given defined defaults below and must be assigned from the real
 * data source before the guard for this function to push anything.
 * NOTE(review): app_source is not declared in this function; it is presumably
 * a file-scope handle to the appsrc element in the full program - confirm.
 */
bool pushData() {
    /* TODO: assign these from the actual image source. */
    int frameIdx = 0;
    char *buffer = NULL;
    size_t bufferLength = 0;

    printf("Trying to push Buffer! \n");

    if (buffer == NULL || bufferLength == 0) {
        printf("Failed to push Buffer! \n");
        return FALSE;
    }

    GstBuffer *gst_buffer = gst_buffer_new_allocate(NULL, bufferLength, NULL);
    if (gst_buffer == NULL) {
        printf("Failed to push Buffer! \n");
        return FALSE;
    }
    gst_buffer_fill(gst_buffer, 0, buffer, bufferLength);

    FrameMeta *frameMeta = (FrameMeta *) g_malloc0(sizeof(FrameMeta));
    frameMeta->frameIdx = frameIdx;

    NvDsMeta *meta = gst_buffer_add_nvds_meta(gst_buffer, frameMeta, NULL,
                                              frame_meta_copy_func,
                                              frame_meta_release_func);
    if (meta == NULL) {
        /* The payload was never attached, so it is still ours to free. */
        g_free(frameMeta);
        gst_buffer_unref(gst_buffer);
        printf("Failed to push Buffer! \n");
        return FALSE;
    }

    meta->meta_type = (GstNvDsMetaType)NVDS_GST_META_FRAME;
    meta->gst_to_nvds_meta_transform_func = src_gst_to_nvds_meta_transform_func;
    meta->gst_to_nvds_meta_release_func = frame_gst_nvds_meta_release_func;

    /* appsrc takes ownership of the buffer, so it is not unreffed here. */
    GstFlowReturn ret = gst_app_src_push_buffer ((GstAppSrc *) app_source, gst_buffer);
    if (ret != GST_FLOW_OK) {
        printf("Failed to push Buffer! \n");
        return FALSE;
    }
    return TRUE;
}


/*
 * "new-sample" handler connected to the appsink in main().
 *
 * Pulls the pending sample from sink through the "pull-sample" action
 * signal, logs the outcome and releases the sample.
 *
 * Returns GST_FLOW_OK when a sample was pulled, GST_FLOW_ERROR when no
 * sample was delivered.
 */
GstFlowReturn new_sample(GstElement *sink) {
    /* Start from NULL so a failed emission cannot leave a garbage pointer. */
    GstSample *sample = NULL;

    printf("New_sample signal fired! \n");

    g_signal_emit_by_name (sink, "pull-sample", &sample);
    if (sample == NULL) {
        return GST_FLOW_ERROR;
    }

    printf("Successfully pulled new Sample! \n");
    gst_sample_unref(sample);
    return GST_FLOW_OK;
}

/*
 * Bus "message::error" handler.
 *
 * Prints the error text and any debug information carried by msg, frees
 * both, then quits main_loop.
 */
void error_cb(GstBus *bus, GstMessage *msg, GMainLoop *main_loop) {
    GError *error = NULL;
    gchar *details = NULL;
    const gchar *details_text = "none";

    gst_message_parse_error(msg, &error, &details);
    if (details) {
        details_text = details;
    }

    g_printerr("Error received from element %s: %s\n", GST_OBJECT_NAME(msg->src), error->message);
    g_printerr("Debugging information: %s\n", details_text);

    g_clear_error(&error);
    g_free(details);

    g_main_loop_quit(main_loop);
}

/*
 * Schedule read_data as a GLib idle source unless data->sourceid already
 * records one. The id returned by g_idle_add() is stored in data->sourceid.
 *
 * NOTE(review): the signature looks like an appsrc "need-data" handler, but
 * no connection to that signal is visible in this file - confirm.
 */
static void
start_feed (GstElement * source, guint size, AppSrcData * data)
{
    if (data->sourceid != 0) {
        return;
    }

    data->sourceid = g_idle_add ((GSourceFunc) read_data, NULL);
}

/*
 * Build and run the inference pipeline:
 *
 *   appsrc -> jpegparse -> queue -> nvv4l2decoder -> nvstreammux(sink_0)
 *          -> nvinfer -> appsink
 *
 * The appsink emits "new-sample" (handled by new_sample) and bus errors are
 * routed to error_cb, which quits the main loop.
 *
 * Returns 0 after the main loop exits, -1 when an element cannot be created
 * or linked, or when the pipeline refuses to go to PLAYING.
 */
int main(int argc, char *argv[]) {
    GstElement *pipeline = NULL, *app_source = NULL, *app_queue = NULL, *jpegparse = NULL, *nvv4l2decoder = NULL, *nvstreammux = NULL, *nvinfer = NULL, *app_sink = NULL;
    GMainLoop *main_loop = NULL;  /* GLib's Main Loop */
    GstBus *bus = NULL;
    GstPad *sinkpad = NULL, *srcpad = NULL;
    gchar pad_name_sink[16] = "sink_0";
    gchar pad_name_src[16] = "src";

    gst_init (&argc, &argv);

    app_source = gst_element_factory_make ("appsrc", "appsrc");
    app_queue = gst_element_factory_make ("queue", "queue");
    jpegparse = gst_element_factory_make ("jpegparse", "jpegparse");
    nvv4l2decoder = gst_element_factory_make ("nvv4l2decoder", "nvv4l2decoder");
    nvstreammux = gst_element_factory_make ("nvstreammux", "nvstreammux");
    nvinfer = gst_element_factory_make ("nvinfer", "nvinfer");
    app_sink = gst_element_factory_make ("appsink", "app_sink");
    pipeline = gst_pipeline_new ("inference-pipeline");

    if (!pipeline || !app_source || !jpegparse || !app_queue || !nvv4l2decoder || !nvstreammux || !nvinfer || !app_sink) {
        g_printerr ("Not all elements could be created.\n");
        return -1;
    }

    g_object_set (app_sink, "emit-signals", TRUE, NULL);
    g_signal_connect (app_sink, "new-sample", G_CALLBACK (new_sample), NULL);

    g_object_set (G_OBJECT (app_source), "is-live", TRUE, NULL);

    /* The decoder is fed JPEG images, so it has to be switched to
     * MJPEG/JPEG decoding (the point raised in the thread below). */
    g_object_set (G_OBJECT (nvv4l2decoder), "mjpeg", TRUE, NULL);

    g_object_set (G_OBJECT (nvstreammux), "width", MUXER_OUTPUT_WIDTH, "height",
                  MUXER_OUTPUT_HEIGHT, "batch-size", 1,
                  "batched-push-timeout", MUXER_BATCH_TIMEOUT_USEC, NULL);

    g_object_set(G_OBJECT(nvinfer),
                 "config-file-path",
                 "/home/pi/configs/config_infer_primary_yoloV3.txt",
                 NULL);

    gst_bin_add_many (GST_BIN (pipeline), app_source, jpegparse, app_queue, nvv4l2decoder, nvstreammux, nvinfer, app_sink, NULL);

    if (!gst_element_link_many (app_source, jpegparse, app_queue, nvv4l2decoder, NULL)) {
        g_printerr ("Elements could not be linked: caontext->customData.app_source, context->customData.app_queue, context->customData.nvv4l2decoder. Exiting.\n");
        gst_object_unref (pipeline);
        return -1;
    }

    if (!gst_element_link_many (nvstreammux, nvinfer, app_sink, NULL)) {
        g_printerr ("Elements could not be linked: 1. Exiting. context->customData.nvstreammux, context->customData.nvinfer, context->customData.app_sink \n");
        gst_object_unref (pipeline);
        return -1;
    }

    /* nvstreammux only exposes request sink pads, so the decoder has to be
     * linked to it by hand. */
    sinkpad = gst_element_get_request_pad (nvstreammux, pad_name_sink);
    if (!sinkpad) {
        g_printerr ("Streammux request sink pad failed. Exiting.\n");
        gst_object_unref (pipeline);
        return -1;
    }

    srcpad = gst_element_get_static_pad (nvv4l2decoder, pad_name_src);
    if (!srcpad) {
        g_printerr ("Decoder request src pad failed. Exiting.\n");
        gst_object_unref (sinkpad);
        gst_object_unref (pipeline);
        return -1;
    }

    if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) {
        g_printerr ("Failed to link decoder to stream muxer. Exiting.\n");
        gst_object_unref (sinkpad);
        gst_object_unref (srcpad);
        gst_object_unref (pipeline);
        return -1;
    }
    gst_object_unref (sinkpad);
    gst_object_unref (srcpad);

    /* The loop must exist before it is handed to error_cb, and error_cb
     * expects the GMainLoop pointer itself, not the address of the variable. */
    main_loop = g_main_loop_new (NULL, FALSE);

    bus = gst_element_get_bus (pipeline);
    gst_bus_add_signal_watch (bus);
    g_signal_connect (G_OBJECT (bus), "message::error", (GCallback)error_cb, main_loop);
    gst_object_unref (bus);

    if (gst_element_set_state (pipeline, GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
        g_printerr ("Unable to set the pipeline to the playing state.\n");
        gst_element_set_state (pipeline, GST_STATE_NULL);
        gst_object_unref (pipeline);
        g_main_loop_unref (main_loop);
        return -1;
    }

    g_main_loop_run (main_loop);

    gst_element_set_state (pipeline, GST_STATE_NULL);
    gst_object_unref (pipeline);
    g_main_loop_unref (main_loop);

    return 0;
}

This is not a DeepStream related question. It is a basic gstreamer GstAppSrc usage question.
Please refer to GstAppSrc
appsrc
GstSample

It was a deepstream question because when I replace nvv4l2decoder with nvvideoconvert it worked.

appsrc->nvjpegdec->nvvideoconvert->nvstreammux->nvinfer->appsink.

Note that nvv4l2decoder is a nvidia module.

Have you set “mjpeg” property to 1 for nvv4l2decoder plugin?

No, that might have been the problem. But it works with appsrc->nvjpegdec->nvvideoconvert->nvstreammux->nvinfer->appsink. The problem I am facing now is that when I start pushing 4 frames a second, the performance drops greatly, whereas in the DeepStream app I can run 8 streams in real time.

Any suggestions, or should I post a new question?

Just to give you an idea: I am trying to implement the DeepStream app, but as separate pipelines.

One pipeline for transforming the streams into jpegs. One for inference, etc. The reason is that I integrate it with nodejs (addon), so I need it to be flexible.

Please create new topic for new problem.