Push OpenCV Mat images into a DeepStream pipeline using appsrc

• Hardware Platform (GPU): GTX 1660
• DeepStream Version: 5.0
• TensorRT Version: 7.0.0.11
• NVIDIA GPU Driver Version: 440.33.01
• Issue Type: question

Hello,

I’m trying to push frames into a DeepStream pipeline from an OpenCV cv::Mat, but the pipeline keeps freezing. If I instead push the image bytes using a FILE object opened with fopen, everything works without issues.

GstFlowReturn PipelineWorker::new_sample (GstElement * sink, PipelineWorker* worker)
{
  GstSample *sample;
  GstBuffer *buf = NULL;
  NvDsMetaList *l_frame = NULL;
  NvDsMetaList *l_user = NULL;

  g_print ("Sample received\n");
  sample = gst_app_sink_pull_sample (GST_APP_SINK (sink));
  if (gst_app_sink_is_eos (GST_APP_SINK (sink))) {
    g_print ("EOS received in appsink\n");
  }

  if (sample) {
    /* Obtain the GstBuffer from the sample, then extract the batch metadata. */
    buf = gst_sample_get_buffer (sample);
    NvDsBatchMeta *batch_meta = gst_buffer_get_nvds_batch_meta (buf);

    for (l_frame = batch_meta->frame_meta_list; l_frame != NULL;
         l_frame = l_frame->next)
    {
      NvDsFrameMeta *frame_meta = (NvDsFrameMeta *) l_frame->data;

      /* Walk the frame-level user meta for the raw tensor output attached
       * by nvinfer (enabled via output-tensor-meta=TRUE). */
      for (l_user = frame_meta->frame_user_meta_list; l_user != NULL;
           l_user = l_user->next)
      {
        NvDsUserMeta *user_meta = (NvDsUserMeta *) l_user->data;
        if (user_meta->base_meta.meta_type == NVDSINFER_TENSOR_OUTPUT_META)
        {
          NvDsInferTensorMeta *tensor_meta =
              (NvDsInferTensorMeta *) user_meta->user_meta_data;
          Vec2D<int> objects;
          Vec3D<float> normalized_peaks;
          tie (objects, normalized_peaks) =
              parse_objects_from_tensor_meta (tensor_meta);
          worker->netService.serializeSkeleton (
              create_display_meta (objects, normalized_peaks, frame_meta,
                  frame_meta->source_frame_width,
                  frame_meta->source_frame_height),
              worker->token);
        }
      }
    }

    /* Unref the sample pulled from the appsink so buffers are not leaked. */
    gst_sample_unref (sample);
    return GST_FLOW_OK;
  }
  return GST_FLOW_ERROR;
}

void PipelineWorker::read_data (GstElement *source, guint size, PipelineWorker *worker)
{
  //////////////////////// OpenCV attempt //////////////////////////////
  GstBuffer *buffer;
  GstFlowReturn gstret = GST_FLOW_OK;
  GstMapInfo map;

  cv::Mat img = cv::imread ("oie_sOUOv7TVzybk.jpg");
  if (img.empty ()) {
    g_print ("failed to read image\n");
    return;
  }

  /* Copy the decoded pixels (continuous BGR Mat, as returned by imread)
   * into a newly allocated GstBuffer. */
  gsize sizeInBytes = img.cols * img.rows * img.channels ();
  buffer = gst_buffer_new_allocate (NULL, sizeInBytes, NULL);
  gst_buffer_map (buffer, &map, GST_MAP_WRITE);
  memcpy (map.data, img.data, sizeInBytes);
  gst_buffer_unmap (buffer, &map);

  /* gst_app_src_push_buffer takes ownership of the buffer. */
  gstret = gst_app_src_push_buffer ((GstAppSrc *) worker->data.app_source, buffer);
  worker->data.appsrc_frame_num++;

  if (gstret != GST_FLOW_OK)
    g_print ("push buffer failed\n");
  //////////////////////////////////////////////////////////////////////
  ///////////////////// FILE object opened with fopen //////////////////
  /* GstBuffer *buffer;
  GstFlowReturn gstret = GST_FLOW_OK;

  std::vector<std::string> images = {"oie_sOUOv7TVzybk.jpg"}; //{"image2.jpeg", "image2.jpeg", "image2.jpeg", "image3.jpg", "image3.jpg", "image3.jpg", "image.jpeg", "image.jpeg", "image.jpeg"};

  size_t ret = 0;
  GstMapInfo map;
  buffer = gst_buffer_new_allocate (NULL, worker->data.frame_size, NULL);

  gst_buffer_map (buffer, &map, GST_MAP_WRITE);
  worker->data.file = fopen (images[0].c_str (), "rb");
  ret = fread (map.data, 1, worker->data.frame_size, worker->data.file);
  fclose (worker->data.file);
  map.size = ret;

  gst_buffer_unmap (buffer, &map);
  g_print ("buffer size: %lu\n", gst_buffer_get_size (buffer));
  if (ret > 0) {
#if CUSTOM_PTS
    GST_BUFFER_PTS (buffer) =
        gst_util_uint64_scale (worker->data.appsrc_frame_num, GST_SECOND,
            worker->data.fps);
#endif
    gstret = gst_app_src_push_buffer ((GstAppSrc *) worker->data.app_source, buffer);
    if (gstret != GST_FLOW_OK)
      g_print ("gst_app_src_push_buffer returned %d\n", gstret);
  } else if (ret == 0) {
    gst_buffer_unref (buffer);
    gstret = gst_app_src_end_of_stream ((GstAppSrc *) worker->data.app_source);
    if (gstret != GST_FLOW_OK)
      g_print ("gst_app_src_end_of_stream returned %d. EoS not queued successfully.\n",
          gstret);
  } else {
    g_print ("failed to read from file\n");
  } */
}

void PipelineWorker::initPipeline()
{
    GstElement *streammux = NULL, *sink = NULL, *pgie = NULL,
            *nvvidconv = NULL, *nvosd = NULL, *decoder = NULL,
                *nvsink = NULL, *nvvidconv1 = NULL, *caps_filter = NULL, *appsink = NULL, *tee = NULL;

    GstBus *bus = NULL;
    guint bus_watch_id;
    GstCaps *caps = NULL;
    GstCapsFeatures *feature = NULL;
    const gchar *vidconv_format;
    GstPad *osd_sink_pad = NULL;
    GstPad *tee_source_pad1, *tee_source_pad2;
    GstPad *appsink_sink_pad;

    /* Standard GStreamer initialization */
    gst_init (NULL, NULL);
    loop = g_main_loop_new(NULL, FALSE);

    memset (&data, 0, sizeof (data));
    data.app_source = NULL;
    data.frame_size = MUXER_OUTPUT_WIDTH * MUXER_OUTPUT_HEIGHT * 4;
    vidconv_format = "RGBA";
    data.fps = 30;
    data.app_source = gst_element_factory_make ("appsrc", "app-source");


    decoder = gst_element_factory_make ("nvv4l2decoder", "nvv4l2-decoder");

    /* Create gstreamer elements */
    /* Create Pipeline element that will form a connection of other elements */
    pipeline = gst_pipeline_new("deepstream-tensorrt-openpose-pipeline");

    /* Use converter to convert from software buffer to GPU buffer */
    nvvidconv1 =
        gst_element_factory_make ("nvvideoconvert", "nvvideo-converter1");
    if (!nvvidconv1) {
      g_printerr ("nvvideoconvert1 could not be created. Exiting.\n");
      return ;
    }
    caps_filter = gst_element_factory_make ("capsfilter", "capsfilter");
    if (!caps_filter) {
      g_printerr ("Caps_filter could not be created. Exiting.\n");
      return ;
    }

    /* Create nvstreammux instance to form batches from one or more sources. */
    streammux = gst_element_factory_make("nvstreammux", "stream-muxer");

    if (!pipeline || !streammux)
    {
      g_printerr("pipeline or streammux could not be created. Exiting.\n");
      return ;
    }

    /* Use nvinfer to run inferencing on decoder's output,
     * behaviour of inferencing is set through config file */
    pgie = gst_element_factory_make("nvinfer", "primary-nvinference-engine");

    /* Use converter to convert from NV12 to RGBA as required by nvosd */
    nvvidconv = gst_element_factory_make("nvvideoconvert", "nvvideo-converter");

    /* Create OSD to draw on the converted RGBA buffer */
    nvosd = gst_element_factory_make("nvdsosd", "nv-onscreendisplay");
    tee = gst_element_factory_make ("tee", "tee");

    nvsink = gst_element_factory_make("nveglglessink", "nvvideo-renderer");
    sink = gst_element_factory_make("fpsdisplaysink", "fps-display");

    appsink = gst_element_factory_make ("appsink", "app-sink");
    if (!appsink) {
      g_printerr ("Appsink element could not be created. Exiting.\n");
      return ;
    }

    /* Configure appsrc */
    g_object_set (data.app_source, "caps",
        gst_caps_new_simple ("image/jpeg",
            "format", G_TYPE_STRING, "RGBA",
            "width", G_TYPE_INT, MUXER_OUTPUT_WIDTH,
            "height", G_TYPE_INT, MUXER_OUTPUT_HEIGHT,
            "framerate", GST_TYPE_FRACTION, data.fps, 1, NULL), NULL);

    g_signal_connect (data.app_source, "need-data", G_CALLBACK (read_data),
        this);
    g_object_set (G_OBJECT (nvvidconv1), "nvbuf-memory-type", 3, NULL);

    caps =
        gst_caps_new_simple ("image/jpeg", "format", G_TYPE_STRING,
        vidconv_format, NULL);
    feature = gst_caps_features_new ("memory:NVMM", NULL);
    gst_caps_set_features (caps, 0, feature);
    g_object_set (G_OBJECT (caps_filter), "caps", caps, NULL);

    g_object_set(G_OBJECT(sink), "text-overlay", FALSE, "video-sink", nvsink, "sync", FALSE, NULL);

    if (!pgie)
    {
      g_printerr("nvinfer could not be created. Exiting.\n");
      return ;
    }

    g_object_set(G_OBJECT(streammux), "width", MUXER_OUTPUT_WIDTH, "height",
                 MUXER_OUTPUT_HEIGHT, "batch-size", 1,
                 "batched-push-timeout", 4000000,
                 NULL);


    /* Set all the necessary properties of the nvinfer element,
     * the necessary ones are : */
    g_object_set(G_OBJECT(pgie), "output-tensor-meta", TRUE,
                 "config-file-path", "deepstream_pose_estimation_config.txt",
                 NULL);

    /* we add a message handler */
    bus = gst_pipeline_get_bus(GST_PIPELINE(pipeline));
    bus_watch_id = gst_bus_add_watch(bus, bus_call, loop);
    gst_object_unref(bus);

    /* Set up the pipeline */
    /* we add all elements into the pipeline */

    gst_bin_add_many (GST_BIN (pipeline),
        data.app_source, decoder, streammux, pgie,
        appsink, NULL);

    GstPad *sinkpad, *srcpad;
    gchar pad_name_sink[16] = "sink_0";
    gchar pad_name_src[16] = "src";

    sinkpad = gst_element_get_request_pad (streammux, pad_name_sink);
    if (!sinkpad) {
      g_printerr ("Streammux request sink pad failed. Exiting.\n");
      return ;
    }

    srcpad = gst_element_get_static_pad (decoder, pad_name_src);
    if (!srcpad) {
      g_printerr ("Failed to get decoder src pad. Exiting.\n");
      return ;
    }

    if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) {
      g_printerr ("Failed to link decoder to stream muxer. Exiting.\n");
      return ;
    }

    gst_object_unref (sinkpad);
    gst_object_unref (srcpad);

    if (!gst_element_link_many (data.app_source, decoder, NULL) ||
        !gst_element_link_many (streammux, pgie, appsink, NULL)) {
      g_printerr ("Elements could not be linked: Exiting.\n");
      return ;
    }

    tee_source_pad1 = gst_element_get_request_pad (tee, "src_0");
    tee_source_pad2 = gst_element_get_request_pad (tee, "src_1");
    appsink_sink_pad = gst_element_get_static_pad (appsink, "sink");

    /* Configure appsink to extract data from DeepStream pipeline */
    g_object_set (appsink, "emit-signals", TRUE, "async", FALSE, NULL);

    /* Callback to access buffer and object info. */
    g_signal_connect (appsink, "new-sample", G_CALLBACK (new_sample), this);

    /* Lets add probe to get informed of the meta data generated, we add probe to
     * the sink pad of the osd element, since by that time, the buffer would have
     * had got all the metadata. */
    osd_sink_pad = gst_element_get_static_pad(nvosd, "sink");
    if (!osd_sink_pad)
      g_print("Unable to get sink pad\n");
    else
      gst_pad_add_probe(osd_sink_pad, GST_PAD_PROBE_TYPE_BUFFER,
                        osd_sink_pad_buffer_probe, (gpointer)sink, NULL);

    /* Set the pipeline to "playing" state */
    g_print("Now playing\n");
    gst_element_set_state(pipeline, GST_STATE_PLAYING);

    secondStart = std::chrono::high_resolution_clock::now();

    /* Wait till pipeline encounters an error or EOS */
    g_print("Running...\n");
    g_main_loop_run(loop);

    /* Out of the main loop, clean up nicely */
    g_print("Returned, stopping playback\n");
    gst_element_set_state(pipeline, GST_STATE_NULL);
    g_print("Deleting pipeline\n");
    gst_object_unref(GST_OBJECT(pipeline));
    g_source_remove(bus_watch_id);
    g_main_loop_unref(loop);
}

If I use the first version of read_data (the OpenCV image), new_sample is never called, and read_data itself is never invoked a second time. For the OpenCV version I tried both gst_app_src_push_buffer and g_signal_emit_by_name, but neither worked.
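
One notable difference between the two paths: cv::imread returns decoded BGR pixels, while the working FILE/fopen version hands the nvv4l2decoder the still-encoded JPEG bytes from disk. Below is a minimal, untested sketch of pushing encoded bytes from a cv::Mat, re-encoding with cv::imencode so the payload actually matches the image/jpeg caps (filename and worker fields taken from the code above):

/* Untested sketch: re-encode the Mat so the bytes pushed into appsrc are
 * real JPEG data, matching the image/jpeg caps and the working fopen path. */
void PipelineWorker::read_data (GstElement *source, guint size, PipelineWorker *worker)
{
  cv::Mat img = cv::imread ("oie_sOUOv7TVzybk.jpg");
  if (img.empty ())
    return;

  /* imencode produces the compressed JPEG byte stream in memory. */
  std::vector<uchar> jpg;
  cv::imencode (".jpg", img, jpg);

  GstBuffer *buffer = gst_buffer_new_allocate (NULL, jpg.size (), NULL);
  GstMapInfo map;
  gst_buffer_map (buffer, &map, GST_MAP_WRITE);
  memcpy (map.data, jpg.data (), jpg.size ());
  gst_buffer_unmap (buffer, &map);

  /* gst_app_src_push_buffer takes ownership of the buffer. */
  if (gst_app_src_push_buffer ((GstAppSrc *) worker->data.app_source,
          buffer) != GST_FLOW_OK)
    g_print ("push buffer failed\n");
  worker->data.appsrc_frame_num++;
}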

Any help is greatly appreciated.

Please refer to the DeepStream sample code in /opt/nvidia/deepstream/deepstream/sources/apps/sample_apps/deepstream-appsrc-test.
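
For reference, that sample drives appsrc with raw video caps (video/x-raw) and moves the frames into NVMM memory with nvvideoconvert plus a capsfilter before nvstreammux, instead of declaring image/jpeg and going through a decoder. A rough sketch of that wiring follows; it is from memory rather than the sample verbatim, and the RGBA format, 30 fps rate, and element names are assumptions:

GstElement *app_source = gst_element_factory_make ("appsrc", "app-source");
GstElement *conv = gst_element_factory_make ("nvvideoconvert", "conv");
GstElement *filter = gst_element_factory_make ("capsfilter", "filter");

/* appsrc announces raw RGBA frames, so no JPEG decoder is involved. */
GstCaps *src_caps = gst_caps_new_simple ("video/x-raw",
    "format", G_TYPE_STRING, "RGBA",
    "width", G_TYPE_INT, MUXER_OUTPUT_WIDTH,
    "height", G_TYPE_INT, MUXER_OUTPUT_HEIGHT,
    "framerate", GST_TYPE_FRACTION, 30, 1, NULL);
g_object_set (app_source, "caps", src_caps, NULL);
gst_caps_unref (src_caps);

/* The capsfilter requests NVMM (GPU) memory, which nvstreammux expects;
 * nvvideoconvert performs the upload. */
GstCaps *nvmm_caps = gst_caps_new_simple ("video/x-raw",
    "format", G_TYPE_STRING, "RGBA", NULL);
gst_caps_set_features (nvmm_caps, 0,
    gst_caps_features_new ("memory:NVMM", NULL));
g_object_set (filter, "caps", nvmm_caps, NULL);
gst_caps_unref (nvmm_caps);

/* Link appsrc -> nvvideoconvert -> capsfilter, then connect the
 * capsfilter src pad to the streammux sink_0 request pad. */

With raw caps, every buffer pushed from read_data must contain exactly width × height × 4 bytes, so a cv::Mat from imread would first need cv::cvtColor (COLOR_BGR2RGBA) and, if necessary, cv::resize to the advertised dimensions.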