How to decide the order of the plugin in deepstream_app.c

Dear friends:
I have written a plugin named “myplugin” based on modifying the dsexample. It can be inserted into deepstream_app.c:: create_pipeline with some places.
I use this plugin to preprocess the video before inference. So please help me where should I insert the my code. The follow code is shown that I insert the plugin behind the "demuxer ", that is the most forward position I can put in this function.

gboolean
create_pipeline (AppCtx * appCtx,
bbox_generated_callback bbox_generated_post_analytics_cb,
bbox_generated_callback all_bbox_generated_cb, perf_callback perf_cb,
overlay_graphics_callback overlay_graphics_cb)
{
gboolean ret = FALSE;
NvDsPipeline *pipeline = &appCtx->pipeline;
NvDsConfig *config = &appCtx->config;
GstBus *bus;
GstElement *last_elem;
GstElement *tmp_elem1;
GstElement *tmp_elem2;
guint i;
GstPad *fps_pad;
gulong latency_probe_id;

_dsmeta_quark = g_quark_from_static_string (NVDS_META_STRING);

appCtx->all_bbox_generated_cb = all_bbox_generated_cb;
appCtx->bbox_generated_post_analytics_cb = bbox_generated_post_analytics_cb;
appCtx->overlay_graphics_cb = overlay_graphics_cb;

if (config->osd_config.num_out_buffers < 8)
{
config->osd_config.num_out_buffers = 8;
}

pipeline->pipeline = gst_pipeline_new (“pipeline”); // new pipline
if (!pipeline->pipeline)
{
NVGSTDS_ERR_MSG_V (“Failed to create pipeline”);
goto done;
}

bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline->pipeline));
pipeline->bus_id = gst_bus_add_watch (bus, bus_callback, appCtx);
gst_object_unref (bus);

if (config->file_loop) // test loop setted
{
guint i;
for (i = 0; i < config->num_source_sub_bins; i++)
config->multi_source_config[i].loop = TRUE;
}

for (guint i = 0; i < config->num_sink_sub_bins; i++)
{
NvDsSinkSubBinConfig *sink_config = &config->sink_bin_sub_bin_config[i];
switch (sink_config->type)
{
case NV_DS_SINK_FAKE:
case NV_DS_SINK_RENDER_EGL:
case NV_DS_SINK_RENDER_OVERLAY:
if (!sink_config->render_config.qos_value_specified)
{
if (config->streammux_config.live_source || sink_config->render_config.sync)
{
sink_config->render_config.qos = TRUE;
}
else
{
sink_config->render_config.qos = FALSE;
}
}
default:
break;
}
}

/*

  • Add muxer and < N > source components to the pipeline based
  • on the settings in configuration file.
    */

if (!create_multi_source_bin (config->num_source_sub_bins, config->multi_source_config, &pipeline->multi_src_bin))
goto done;
gst_bin_add (GST_BIN (pipeline->pipeline), pipeline->multi_src_bin.bin);

if (config->streammux_config.is_parsed)
set_streammux_properties (&config->streammux_config, pipeline->multi_src_bin.streammux);

if(appCtx->latency_info == NULL)
{
appCtx->latency_info = (NvDsFrameLatencyInfo *)
calloc(1, config->streammux_config.batch_size *
sizeof(NvDsFrameLatencyInfo));
}

/** a tee after the tiler which shall be connected to sink(s) */
pipeline->tiler_tee = gst_element_factory_make (NVDS_ELEM_TEE, “tiler_tee”);
if (!pipeline->tiler_tee)
{
NVGSTDS_ERR_MSG_V (“Failed to create element ‘tiler_tee’”);
goto done;
}
gst_bin_add (GST_BIN (pipeline->pipeline), pipeline->tiler_tee);

/** Tiler + Demux in Parallel Use-Case */
if (config->tiled_display_config.enable == NV_DS_TILED_DISPLAY_ENABLE_WITH_PARALLEL_DEMUX)
{
pipeline->demuxer = gst_element_factory_make (NVDS_ELEM_STREAM_DEMUX, “demuxer”);
if (!pipeline->demuxer)
{
NVGSTDS_ERR_MSG_V (“Failed to create element ‘demuxer’”);
goto done;
}
gst_bin_add (GST_BIN (pipeline->pipeline), pipeline->demuxer);

/** NOTE:
 * demux output is supported for only one source
 * If multiple [sink] groups are configured with
 * link_to_demux=1, only the first [sink]
 * shall be constructed for all occurences of
 * [sink] groups with link_to_demux=1
 */
{
  gchar pad_name[16];
  GstPad *demux_src_pad;
  i = 0;
  if (!create_demux_pipeline (appCtx, i))
  {
    goto done;
  }

  for (i=0; i < config->num_sink_sub_bins; i++)
  {
    if (config->sink_bin_sub_bin_config[i].link_to_demux == TRUE)
    {
      g_snprintf (pad_name, 16, "src_%02d", config->sink_bin_sub_bin_config[i].source_id);
      break;
    }
  }

  if (i >= config->num_sink_sub_bins)
  {
    g_print ("\n\nError : sink for demux (use link-to-demux-only property) is not provided in the config file\n\n");
    goto done;
  }

  i = 0;
  gst_bin_add (GST_BIN (pipeline->pipeline), pipeline->demux_instance_bins[i].bin);
  demux_src_pad = gst_element_get_request_pad (pipeline->demuxer, pad_name);
  NVGSTDS_LINK_ELEMENT_FULL (pipeline->demuxer, pad_name, pipeline->demux_instance_bins[i].bin, "sink");
  gst_object_unref (demux_src_pad);

  NVGSTDS_ELEM_ADD_PROBE(latency_probe_id,
      appCtx->pipeline.demux_instance_bins[i].demux_sink_bin.bin,
      "sink",
      demux_latency_measurement_buf_prob, GST_PAD_PROBE_TYPE_BUFFER,
      appCtx);
  latency_probe_id = latency_probe_id;
}

last_elem = pipeline->demuxer;
link_element_to_tee_src_pad (pipeline->tiler_tee, last_elem);
last_elem = pipeline->tiler_tee;

}

if (config->tiled_display_config.enable)
{

/* Tiler will generate a single composited buffer for all sources. So need
 * to create only one processing instance. */
if (!create_processing_instance (appCtx, 0))
{
  goto done;
}
// create and add tiling component to pipeline.
if (config->tiled_display_config.columns *
    config->tiled_display_config.rows < config->num_source_sub_bins)
{
  if (config->tiled_display_config.columns == 0)
  {
    config->tiled_display_config.columns = (guint) (sqrt (config->num_source_sub_bins) + 0.5);
  }
  config->tiled_display_config.rows =
      (guint) ceil (1.0 * config->num_source_sub_bins /
      config->tiled_display_config.columns);
  NVGSTDS_WARN_MSG_V
      ("Num of Tiles less than number of sources, readjusting to "
      "%u rows, %u columns", config->tiled_display_config.rows,
      config->tiled_display_config.columns);
}

gst_bin_add (GST_BIN (pipeline->pipeline), pipeline->instance_bins[0].bin);
last_elem = pipeline->instance_bins[0].bin;

if (!create_tiled_display_bin (&config->tiled_display_config,
        &pipeline->tiled_display_bin))
{
  goto done;
}
gst_bin_add (GST_BIN (pipeline->pipeline), pipeline->tiled_display_bin.bin);
NVGSTDS_LINK_ELEMENT (pipeline->tiled_display_bin.bin, last_elem);
last_elem = pipeline->tiled_display_bin.bin;

link_element_to_tee_src_pad (pipeline->tiler_tee, pipeline->tiled_display_bin.bin);
last_elem = pipeline->tiler_tee;

NVGSTDS_ELEM_ADD_PROBE (latency_probe_id,
  pipeline->instance_bins->sink_bin.sub_bins[0].sink, "sink",
  latency_measurement_buf_prob, GST_PAD_PROBE_TYPE_BUFFER,
  appCtx);
latency_probe_id = latency_probe_id;

}
else
{
/*
* Create demuxer only if tiled display is disabled.
*/
pipeline->demuxer = gst_element_factory_make (NVDS_ELEM_STREAM_DEMUX, “demuxer”);
if (!pipeline->demuxer)
{
NVGSTDS_ERR_MSG_V (“Failed to create element ‘demuxer’”);
goto done;
}
gst_bin_add (GST_BIN (pipeline->pipeline), pipeline->demuxer);
for (i = 0; i < config->num_source_sub_bins; i++)
{
gchar pad_name[16];
GstPad *demux_src_pad;

  if (!is_sink_available_for_source_id(config, i))
    continue;

  if (!create_processing_instance(appCtx, i))
  {
    goto done;
  }
  gst_bin_add(GST_BIN(pipeline->pipeline), pipeline->instance_bins[i].bin);
  g_snprintf(pad_name, 16, "src_%02d", i);
  demux_src_pad = gst_element_get_request_pad(pipeline->demuxer, pad_name);
  NVGSTDS_LINK_ELEMENT_FULL(pipeline->demuxer, pad_name,
                            pipeline->instance_bins[i].bin, "sink");
  gst_object_unref(demux_src_pad);

  for (int k = 0; k < MAX_SINK_BINS;k++)
  {
    if(pipeline->instance_bins[i].sink_bin.sub_bins[k].sink)
    {
      NVGSTDS_ELEM_ADD_PROBE(latency_probe_id,
          pipeline->instance_bins[i].sink_bin.sub_bins[k].sink, "sink",
          latency_measurement_buf_prob, GST_PAD_PROBE_TYPE_BUFFER,
          appCtx);
      break;
    }
  }

  latency_probe_id = latency_probe_id;
}
last_elem = pipeline->demuxer;

}

/-------------add myplugin -------------------------------------------------/
if (config-> myplugin_config.enable)
{
if (!create_myplugin_bin (&config->myplugin_config, &pipeline->myplugin_bin))
{
goto done;
}
gst_bin_add (GST_BIN (pipeline->pipeline), pipeline->myplugin_bin.bin);
NVGSTDS_LINK_ELEMENT (pipeline->myplugin_bin.bin, last_elem);
last_elem = pipeline->myplugin_bin.bin;
}
/--------------------------------------------------------------------------/

if (config->tiled_display_config.enable == NV_DS_TILED_DISPLAY_DISABLE)
{
fps_pad = gst_element_get_static_pad (pipeline->demuxer, “sink”);
}
else
{
fps_pad = gst_element_get_static_pad (pipeline->tiled_display_bin.bin, “sink”);
}

pipeline->common_elements.appCtx = appCtx;

// Decide where in the pipeline the element should be added and add only if
// enabled
if (config->dsexample_config.enable)
{

// Create dsexample element bin and set properties
if (!create_dsexample_bin (&config->dsexample_config, &pipeline->dsexample_bin))
{
  goto done;
}
// Add dsexample bin to instance bin
gst_bin_add (GST_BIN (pipeline->pipeline), pipeline->dsexample_bin.bin);
// Link this bin to the last element in the bin
NVGSTDS_LINK_ELEMENT (pipeline->dsexample_bin.bin, last_elem);
// Set this bin as the last element
last_elem = pipeline->dsexample_bin.bin;

}

// create and add common components to pipeline.
if (!create_common_elements (config, pipeline, &tmp_elem1, &tmp_elem2,
bbox_generated_post_analytics_cb))
{
goto done;
}

if(!add_and_link_broker_sink(appCtx))
{
goto done;
}

if (tmp_elem2)
{
NVGSTDS_LINK_ELEMENT (tmp_elem2, last_elem);
last_elem = tmp_elem1;
}

NVGSTDS_LINK_ELEMENT (pipeline->multi_src_bin.bin, last_elem);
// enable performance measurement and add call back function to receive
// performance data.
if (config->enable_perf_measurement)
{
appCtx->perf_struct.context = appCtx;
enable_perf_measurement (&appCtx->perf_struct, fps_pad,
pipeline->multi_src_bin.num_bins,
config->perf_measurement_interval_sec,
config->multi_source_config[0].dewarper_config.num_surfaces_per_frame,
perf_cb);
}

latency_probe_id = latency_probe_id;

if (config->num_message_consumers)
{
for (i = 0; i < config->num_message_consumers; i++)
{
appCtx->c2d_ctx[i] = start_cloud_to_device_messaging (
&config->message_consumer_config[i], NULL,
&appCtx->pipeline.multi_src_bin);
if (appCtx->c2d_ctx[i] == NULL)
{
NVGSTDS_ERR_MSG_V (“Failed to create message consumer”);
goto done;
}
}
}

GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (appCtx->pipeline.pipeline),
GST_DEBUG_GRAPH_SHOW_ALL, “ds-app-null”);

g_mutex_init (&appCtx->app_lock);
g_cond_init (&appCtx->app_cond);
g_mutex_init (&appCtx->latency_lock);

ret = TRUE;
done:
if (!ret)
{
NVGSTDS_ERR_MSG_V (“%s failed”, func);
}
return ret;
}

/**

  • Function to destroy pipeline and release the resources, probes etc.
    */
    void
    destroy_pipeline (AppCtx * appCtx)
    {
    gint64 end_time;
    NvDsConfig *config = &appCtx->config;
    guint i;
    GstBus *bus = NULL;

end_time = g_get_monotonic_time () + G_TIME_SPAN_SECOND;

if (!appCtx)
return;

if (appCtx->pipeline.demuxer) {
gst_pad_send_event (gst_element_get_static_pad (appCtx->pipeline.demuxer,
“sink”), gst_event_new_eos ());
} else if (appCtx->pipeline.instance_bins[0].sink_bin.bin) {
gst_pad_send_event (gst_element_get_static_pad (appCtx->
pipeline.instance_bins[0].sink_bin.bin, “sink”),
gst_event_new_eos ());
}

g_usleep (100000);

g_mutex_lock (&appCtx->app_lock);
if (appCtx->pipeline.pipeline) {
destroy_smart_record_bin (&appCtx->pipeline.multi_src_bin);
bus = gst_pipeline_get_bus (GST_PIPELINE (appCtx->pipeline.pipeline));

while (TRUE) {
  GstMessage *message = gst_bus_pop (bus);
  if (message == NULL)
    break;
  else if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR)
    bus_callback (bus, message, appCtx);
  else
    gst_message_unref (message);
}
gst_element_set_state (appCtx->pipeline.pipeline, GST_STATE_NULL);

}
g_cond_wait_until (&appCtx->app_cond, &appCtx->app_lock, end_time);
g_mutex_unlock (&appCtx->app_lock);

for (i = 0; i < appCtx->config.num_source_sub_bins; i++) {
NvDsInstanceBin *bin = &appCtx->pipeline.instance_bins[i];
if (config->osd_config.enable) {
NVGSTDS_ELEM_REMOVE_PROBE (bin->all_bbox_buffer_probe_id,
bin->osd_bin.nvosd, “sink”);
} else {
NVGSTDS_ELEM_REMOVE_PROBE (bin->all_bbox_buffer_probe_id,
bin->sink_bin.bin, “sink”);
}

if (config->primary_gie_config.enable) {
  NVGSTDS_ELEM_REMOVE_PROBE (bin->primary_bbox_buffer_probe_id,
      bin->primary_gie_bin.bin, "src");
}

}
if(appCtx->latency_info == NULL)
{
free(appCtx->latency_info);
appCtx->latency_info = NULL;
}

destroy_sink_bin ();
g_mutex_clear(&appCtx->latency_lock);

if (appCtx->pipeline.pipeline) {
bus = gst_pipeline_get_bus (GST_PIPELINE (appCtx->pipeline.pipeline));
gst_bus_remove_watch (bus);
gst_object_unref (bus);
gst_object_unref (appCtx->pipeline.pipeline);
}

if (config->num_message_consumers) {
for (i = 0; i < config->num_message_consumers; i++) {
if (appCtx->c2d_ctx[i])
stop_cloud_to_device_messaging (appCtx->c2d_ctx[i]);
}
}
}

Thank you very much.

I hope my pipline as follow

Could you share your setup firstly?

For your question, is it possible to setup a simple pipeline maybe based on deepstream-test1 and verify your custom plugin can work well, then you can integrate it in deepstream-app.

1 Like

Thank you for your reply.
I have solved this problem by add algorithm in the “gstnvinfer.cpp” . At present, it could be useful for my aim.
Thank you again.

1 Like