Dear friends:
I have written a plugin named “myplugin” based on modifying the dsexample. It can be inserted into deepstream_app.c:: create_pipeline with some places.
I use this plugin to preprocess the video before inference. So please help me where should I insert the my code. The follow code is shown that I insert the plugin behind the "demuxer ", that is the most forward position I can put in this function.
gboolean
create_pipeline (AppCtx * appCtx,
bbox_generated_callback bbox_generated_post_analytics_cb,
bbox_generated_callback all_bbox_generated_cb, perf_callback perf_cb,
overlay_graphics_callback overlay_graphics_cb)
{
gboolean ret = FALSE;
NvDsPipeline *pipeline = &appCtx->pipeline;
NvDsConfig *config = &appCtx->config;
GstBus *bus;
GstElement *last_elem;
GstElement *tmp_elem1;
GstElement *tmp_elem2;
guint i;
GstPad *fps_pad;
gulong latency_probe_id;
_dsmeta_quark = g_quark_from_static_string (NVDS_META_STRING);
appCtx->all_bbox_generated_cb = all_bbox_generated_cb;
appCtx->bbox_generated_post_analytics_cb = bbox_generated_post_analytics_cb;
appCtx->overlay_graphics_cb = overlay_graphics_cb;
if (config->osd_config.num_out_buffers < 8)
{
config->osd_config.num_out_buffers = 8;
}
pipeline->pipeline = gst_pipeline_new (“pipeline”); // new pipline
if (!pipeline->pipeline)
{
NVGSTDS_ERR_MSG_V (“Failed to create pipeline”);
goto done;
}
bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline->pipeline));
pipeline->bus_id = gst_bus_add_watch (bus, bus_callback, appCtx);
gst_object_unref (bus);
if (config->file_loop) // test loop setted
{
guint i;
for (i = 0; i < config->num_source_sub_bins; i++)
config->multi_source_config[i].loop = TRUE;
}
for (guint i = 0; i < config->num_sink_sub_bins; i++)
{
NvDsSinkSubBinConfig *sink_config = &config->sink_bin_sub_bin_config[i];
switch (sink_config->type)
{
case NV_DS_SINK_FAKE:
case NV_DS_SINK_RENDER_EGL:
case NV_DS_SINK_RENDER_OVERLAY:
if (!sink_config->render_config.qos_value_specified)
{
if (config->streammux_config.live_source || sink_config->render_config.sync)
{
sink_config->render_config.qos = TRUE;
}
else
{
sink_config->render_config.qos = FALSE;
}
}
default:
break;
}
}
/*
- Add muxer and < N > source components to the pipeline based
- on the settings in configuration file.
*/
if (!create_multi_source_bin (config->num_source_sub_bins, config->multi_source_config, &pipeline->multi_src_bin))
goto done;
gst_bin_add (GST_BIN (pipeline->pipeline), pipeline->multi_src_bin.bin);
if (config->streammux_config.is_parsed)
set_streammux_properties (&config->streammux_config, pipeline->multi_src_bin.streammux);
if(appCtx->latency_info == NULL)
{
appCtx->latency_info = (NvDsFrameLatencyInfo *)
calloc(1, config->streammux_config.batch_size *
sizeof(NvDsFrameLatencyInfo));
}
/** a tee after the tiler which shall be connected to sink(s) */
pipeline->tiler_tee = gst_element_factory_make (NVDS_ELEM_TEE, “tiler_tee”);
if (!pipeline->tiler_tee)
{
NVGSTDS_ERR_MSG_V (“Failed to create element ‘tiler_tee’”);
goto done;
}
gst_bin_add (GST_BIN (pipeline->pipeline), pipeline->tiler_tee);
/** Tiler + Demux in Parallel Use-Case */
if (config->tiled_display_config.enable == NV_DS_TILED_DISPLAY_ENABLE_WITH_PARALLEL_DEMUX)
{
pipeline->demuxer = gst_element_factory_make (NVDS_ELEM_STREAM_DEMUX, “demuxer”);
if (!pipeline->demuxer)
{
NVGSTDS_ERR_MSG_V (“Failed to create element ‘demuxer’”);
goto done;
}
gst_bin_add (GST_BIN (pipeline->pipeline), pipeline->demuxer);
/** NOTE:
* demux output is supported for only one source
* If multiple [sink] groups are configured with
* link_to_demux=1, only the first [sink]
* shall be constructed for all occurences of
* [sink] groups with link_to_demux=1
*/
{
gchar pad_name[16];
GstPad *demux_src_pad;
i = 0;
if (!create_demux_pipeline (appCtx, i))
{
goto done;
}
for (i=0; i < config->num_sink_sub_bins; i++)
{
if (config->sink_bin_sub_bin_config[i].link_to_demux == TRUE)
{
g_snprintf (pad_name, 16, "src_%02d", config->sink_bin_sub_bin_config[i].source_id);
break;
}
}
if (i >= config->num_sink_sub_bins)
{
g_print ("\n\nError : sink for demux (use link-to-demux-only property) is not provided in the config file\n\n");
goto done;
}
i = 0;
gst_bin_add (GST_BIN (pipeline->pipeline), pipeline->demux_instance_bins[i].bin);
demux_src_pad = gst_element_get_request_pad (pipeline->demuxer, pad_name);
NVGSTDS_LINK_ELEMENT_FULL (pipeline->demuxer, pad_name, pipeline->demux_instance_bins[i].bin, "sink");
gst_object_unref (demux_src_pad);
NVGSTDS_ELEM_ADD_PROBE(latency_probe_id,
appCtx->pipeline.demux_instance_bins[i].demux_sink_bin.bin,
"sink",
demux_latency_measurement_buf_prob, GST_PAD_PROBE_TYPE_BUFFER,
appCtx);
latency_probe_id = latency_probe_id;
}
last_elem = pipeline->demuxer;
link_element_to_tee_src_pad (pipeline->tiler_tee, last_elem);
last_elem = pipeline->tiler_tee;
}
if (config->tiled_display_config.enable)
{
/* Tiler will generate a single composited buffer for all sources. So need
* to create only one processing instance. */
if (!create_processing_instance (appCtx, 0))
{
goto done;
}
// create and add tiling component to pipeline.
if (config->tiled_display_config.columns *
config->tiled_display_config.rows < config->num_source_sub_bins)
{
if (config->tiled_display_config.columns == 0)
{
config->tiled_display_config.columns = (guint) (sqrt (config->num_source_sub_bins) + 0.5);
}
config->tiled_display_config.rows =
(guint) ceil (1.0 * config->num_source_sub_bins /
config->tiled_display_config.columns);
NVGSTDS_WARN_MSG_V
("Num of Tiles less than number of sources, readjusting to "
"%u rows, %u columns", config->tiled_display_config.rows,
config->tiled_display_config.columns);
}
gst_bin_add (GST_BIN (pipeline->pipeline), pipeline->instance_bins[0].bin);
last_elem = pipeline->instance_bins[0].bin;
if (!create_tiled_display_bin (&config->tiled_display_config,
&pipeline->tiled_display_bin))
{
goto done;
}
gst_bin_add (GST_BIN (pipeline->pipeline), pipeline->tiled_display_bin.bin);
NVGSTDS_LINK_ELEMENT (pipeline->tiled_display_bin.bin, last_elem);
last_elem = pipeline->tiled_display_bin.bin;
link_element_to_tee_src_pad (pipeline->tiler_tee, pipeline->tiled_display_bin.bin);
last_elem = pipeline->tiler_tee;
NVGSTDS_ELEM_ADD_PROBE (latency_probe_id,
pipeline->instance_bins->sink_bin.sub_bins[0].sink, "sink",
latency_measurement_buf_prob, GST_PAD_PROBE_TYPE_BUFFER,
appCtx);
latency_probe_id = latency_probe_id;
}
else
{
/*
* Create demuxer only if tiled display is disabled.
*/
pipeline->demuxer = gst_element_factory_make (NVDS_ELEM_STREAM_DEMUX, “demuxer”);
if (!pipeline->demuxer)
{
NVGSTDS_ERR_MSG_V (“Failed to create element ‘demuxer’”);
goto done;
}
gst_bin_add (GST_BIN (pipeline->pipeline), pipeline->demuxer);
for (i = 0; i < config->num_source_sub_bins; i++)
{
gchar pad_name[16];
GstPad *demux_src_pad;
if (!is_sink_available_for_source_id(config, i))
continue;
if (!create_processing_instance(appCtx, i))
{
goto done;
}
gst_bin_add(GST_BIN(pipeline->pipeline), pipeline->instance_bins[i].bin);
g_snprintf(pad_name, 16, "src_%02d", i);
demux_src_pad = gst_element_get_request_pad(pipeline->demuxer, pad_name);
NVGSTDS_LINK_ELEMENT_FULL(pipeline->demuxer, pad_name,
pipeline->instance_bins[i].bin, "sink");
gst_object_unref(demux_src_pad);
for (int k = 0; k < MAX_SINK_BINS;k++)
{
if(pipeline->instance_bins[i].sink_bin.sub_bins[k].sink)
{
NVGSTDS_ELEM_ADD_PROBE(latency_probe_id,
pipeline->instance_bins[i].sink_bin.sub_bins[k].sink, "sink",
latency_measurement_buf_prob, GST_PAD_PROBE_TYPE_BUFFER,
appCtx);
break;
}
}
latency_probe_id = latency_probe_id;
}
last_elem = pipeline->demuxer;
}
/-------------add myplugin -------------------------------------------------/
if (config-> myplugin_config.enable)
{
if (!create_myplugin_bin (&config->myplugin_config, &pipeline->myplugin_bin))
{
goto done;
}
gst_bin_add (GST_BIN (pipeline->pipeline), pipeline->myplugin_bin.bin);
NVGSTDS_LINK_ELEMENT (pipeline->myplugin_bin.bin, last_elem);
last_elem = pipeline->myplugin_bin.bin;
}
/--------------------------------------------------------------------------/
if (config->tiled_display_config.enable == NV_DS_TILED_DISPLAY_DISABLE)
{
fps_pad = gst_element_get_static_pad (pipeline->demuxer, “sink”);
}
else
{
fps_pad = gst_element_get_static_pad (pipeline->tiled_display_bin.bin, “sink”);
}
pipeline->common_elements.appCtx = appCtx;
// Decide where in the pipeline the element should be added and add only if
// enabled
if (config->dsexample_config.enable)
{
// Create dsexample element bin and set properties
if (!create_dsexample_bin (&config->dsexample_config, &pipeline->dsexample_bin))
{
goto done;
}
// Add dsexample bin to instance bin
gst_bin_add (GST_BIN (pipeline->pipeline), pipeline->dsexample_bin.bin);
// Link this bin to the last element in the bin
NVGSTDS_LINK_ELEMENT (pipeline->dsexample_bin.bin, last_elem);
// Set this bin as the last element
last_elem = pipeline->dsexample_bin.bin;
}
// create and add common components to pipeline.
if (!create_common_elements (config, pipeline, &tmp_elem1, &tmp_elem2,
bbox_generated_post_analytics_cb))
{
goto done;
}
if(!add_and_link_broker_sink(appCtx))
{
goto done;
}
if (tmp_elem2)
{
NVGSTDS_LINK_ELEMENT (tmp_elem2, last_elem);
last_elem = tmp_elem1;
}
NVGSTDS_LINK_ELEMENT (pipeline->multi_src_bin.bin, last_elem);
// enable performance measurement and add call back function to receive
// performance data.
if (config->enable_perf_measurement)
{
appCtx->perf_struct.context = appCtx;
enable_perf_measurement (&appCtx->perf_struct, fps_pad,
pipeline->multi_src_bin.num_bins,
config->perf_measurement_interval_sec,
config->multi_source_config[0].dewarper_config.num_surfaces_per_frame,
perf_cb);
}
latency_probe_id = latency_probe_id;
if (config->num_message_consumers)
{
for (i = 0; i < config->num_message_consumers; i++)
{
appCtx->c2d_ctx[i] = start_cloud_to_device_messaging (
&config->message_consumer_config[i], NULL,
&appCtx->pipeline.multi_src_bin);
if (appCtx->c2d_ctx[i] == NULL)
{
NVGSTDS_ERR_MSG_V (“Failed to create message consumer”);
goto done;
}
}
}
GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (appCtx->pipeline.pipeline),
GST_DEBUG_GRAPH_SHOW_ALL, “ds-app-null”);
g_mutex_init (&appCtx->app_lock);
g_cond_init (&appCtx->app_cond);
g_mutex_init (&appCtx->latency_lock);
ret = TRUE;
done:
if (!ret)
{
NVGSTDS_ERR_MSG_V (“%s failed”, func);
}
return ret;
}
/**
- Function to destroy pipeline and release the resources, probes etc.
*/
void
destroy_pipeline (AppCtx * appCtx)
{
gint64 end_time;
NvDsConfig *config = &appCtx->config;
guint i;
GstBus *bus = NULL;
end_time = g_get_monotonic_time () + G_TIME_SPAN_SECOND;
if (!appCtx)
return;
if (appCtx->pipeline.demuxer) {
gst_pad_send_event (gst_element_get_static_pad (appCtx->pipeline.demuxer,
“sink”), gst_event_new_eos ());
} else if (appCtx->pipeline.instance_bins[0].sink_bin.bin) {
gst_pad_send_event (gst_element_get_static_pad (appCtx->
pipeline.instance_bins[0].sink_bin.bin, “sink”),
gst_event_new_eos ());
}
g_usleep (100000);
g_mutex_lock (&appCtx->app_lock);
if (appCtx->pipeline.pipeline) {
destroy_smart_record_bin (&appCtx->pipeline.multi_src_bin);
bus = gst_pipeline_get_bus (GST_PIPELINE (appCtx->pipeline.pipeline));
while (TRUE) {
GstMessage *message = gst_bus_pop (bus);
if (message == NULL)
break;
else if (GST_MESSAGE_TYPE (message) == GST_MESSAGE_ERROR)
bus_callback (bus, message, appCtx);
else
gst_message_unref (message);
}
gst_element_set_state (appCtx->pipeline.pipeline, GST_STATE_NULL);
}
g_cond_wait_until (&appCtx->app_cond, &appCtx->app_lock, end_time);
g_mutex_unlock (&appCtx->app_lock);
for (i = 0; i < appCtx->config.num_source_sub_bins; i++) {
NvDsInstanceBin *bin = &appCtx->pipeline.instance_bins[i];
if (config->osd_config.enable) {
NVGSTDS_ELEM_REMOVE_PROBE (bin->all_bbox_buffer_probe_id,
bin->osd_bin.nvosd, “sink”);
} else {
NVGSTDS_ELEM_REMOVE_PROBE (bin->all_bbox_buffer_probe_id,
bin->sink_bin.bin, “sink”);
}
if (config->primary_gie_config.enable) {
NVGSTDS_ELEM_REMOVE_PROBE (bin->primary_bbox_buffer_probe_id,
bin->primary_gie_bin.bin, "src");
}
}
if(appCtx->latency_info == NULL)
{
free(appCtx->latency_info);
appCtx->latency_info = NULL;
}
destroy_sink_bin ();
g_mutex_clear(&appCtx->latency_lock);
if (appCtx->pipeline.pipeline) {
bus = gst_pipeline_get_bus (GST_PIPELINE (appCtx->pipeline.pipeline));
gst_bus_remove_watch (bus);
gst_object_unref (bus);
gst_object_unref (appCtx->pipeline.pipeline);
}
if (config->num_message_consumers) {
for (i = 0; i < config->num_message_consumers; i++) {
if (appCtx->c2d_ctx[i])
stop_cloud_to_device_messaging (appCtx->c2d_ctx[i]);
}
}
}
Thank you very much.