I have written a stand alone project to practice my pipeline manipulation skills.
I have a pipeline that goes from udpsrc–>fakesink. Using a time callback, I add another pipeline to play the video on the desktop using nveglglessink. (queue->nvvidconv->nveglglessink )
Then after a certain amount of time, I remove this pipeline. I do this several times just to test the stability of my pipeline implementation.
The problem is that the line gst_element_set_state(recSink->sink, GST_STATE_NULL); will sometimes just hang.
I’m posting the code here. I compile it with the command line: gcc stream.c -o test pkg-config --cflags --libs gstreamer-1.0
include <gst/gst.h>
include <string.h>
include <stdio.h>
include <glib.h>typedef struct _CustomData
{
gboolean is_live;
GstElement *pipeline;
GMainLoop *loop;
guint64 frame_count;
guint64 drop_count;
gint32 cam_index;
GstElement *qmx;
} CustomData;typedef struct
{
GstPad *teepad; // source pad on tee
//GstPad *teesink; //sink pad on tee
GstPad *qsink;
GstPad *qsource;
GstElement *queue;
GstElement *encode;
GstElement *mux;
GstElement *sink;
gboolean removing;
} Sink;CustomData data;
Sink *recSink;//debug function to make sure pipeline has the expected # of elements
void pipelineElementCount()
{
GstIterator *it = gst_bin_iterate_elements(GST_BIN(data.pipeline));
GValue item = G_VALUE_INIT;
int elementCount = 0;
gboolean done = FALSE;while(!done) { switch(gst_iterator_next(it, &item)) { case GST_ITERATOR_OK: elementCount++; g_value_reset(&item); break; case GST_ITERATOR_ERROR: case GST_ITERATOR_DONE: g_print("pipeline element count is %d \n", elementCount); done = TRUE; break; } } g_value_unset(&item); gst_iterator_free(it);
}
static GstPadProbeReturn
unlink_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
{
if (GST_EVENT_TYPE (GST_PAD_PROBE_INFO_DATA (info)) != GST_EVENT_EOS)
return GST_PAD_PROBE_PASS;
printf(“unlink_cb gstreamer major %d, minor %d, minor %d\n”, GST_VERSION_MAJOR, GST_VERSION_MINOR, GST_VERSION_MICRO);
gst_pad_remove_probe (pad, GST_PAD_PROBE_INFO_ID (info));printf(“unlink_cb 1\n”); //~~~~~~~~~~~~~~~~~~~~ IT HANGS on the very next LINE!!!
gst_element_set_state (recSink->sink, GST_STATE_NULL);printf(“unlink_cb 2\n”);
//gst_element_set_state (recSink->mux, GST_STATE_NULL);
gst_element_set_state (recSink->encode, GST_STATE_NULL);
printf(“unlink_cb 3\n”);gst_bin_remove (GST_BIN (data.pipeline), recSink->encode);
printf(“unlink_cb 4\n”);
//gst_bin_remove (GST_BIN (data.pipeline), recSink->mux);
gst_bin_remove (GST_BIN (data.pipeline), recSink->sink);
printf(“unlink_cb 5\n”);g_print (“removed\n”);
return GST_PAD_PROBE_REMOVE;
}static GstPadProbeReturn
unlink_cb1 (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
{
GstPad *sinkpad, *eossink, *qsink;GST_DEBUG_OBJECT (pad, “pad is blocked now”);
/* remove the probe first */
gst_pad_remove_probe (pad, GST_PAD_PROBE_INFO_ID (info));/* install new probe for EOS */
sinkpad = gst_element_get_static_pad (recSink->sink, “sink”);
gst_pad_add_probe (sinkpad, GST_PAD_PROBE_TYPE_BLOCK, unlink_cb, user_data, NULL);/* push EOS into the element, the probe will be fired when the
EOS leaves the effect and it has thus drained all of its data */eossink = gst_element_get_static_pad (recSink->encode, “sink”);
gst_pad_send_event (eossink, gst_event_new_eos ());
gst_object_unref (eossink);// remove queue element
//unlink queue element sink pad from tee src
qsink = gst_element_get_static_pad (recSink->queue, “sink”);
gst_pad_unlink (recSink->teepad, qsink);
gst_object_unref (qsink);
// remove queue element from pipeline
//g_print(“remove queue element from pipeline\n”);
gst_element_set_state (recSink->queue, GST_STATE_NULL);
gst_bin_remove (GST_BIN (data.pipeline), recSink->queue);return GST_PAD_PROBE_OK;
}gboolean timeout_callback(gpointer d)
{
static int i = 0;//return FALSE;
if(recSink == NULL)
recSink = g_new0 (Sink, 1);i++; //g_print("timeout_callback called %d times\n", i); if ( (10 == i)||(200==i)||(400==i)) { time_t now = time(NULL); char filename[40]; GstPad *sinkpad, *teepad, *teesink; GstElement *tee, *queue, *encode, *mux, *sink; GstPadTemplate *templ, *temp2;
//" ! queue ! nvvidconv ! nveglglessink window-width=720 window-height=480 sync=false qos=false"
tee = gst_bin_get_by_name(GST_BIN(data.pipeline), “dec”);
templ = gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (tee), “src_%u”);
teepad = gst_element_request_pad (tee, templ, NULL, NULL);
queue = gst_element_factory_make (“queue”, NULL);
encode = gst_element_factory_make (“nvvidconv”, NULL);
sink = gst_element_factory_make (“nveglglessink”, NULL);
g_object_set (sink, “window-width”, 720, “window-height”, 480, NULL);gst_bin_add_many (GST_BIN (data.pipeline), queue, encode, sink, NULL); gst_element_sync_state_with_parent (sink); gst_element_sync_state_with_parent (encode); gst_element_sync_state_with_parent (queue); gst_element_link_many (queue, encode, sink, NULL); sinkpad = gst_element_get_static_pad (queue, "sink"); gst_pad_link (teepad, sinkpad); gst_object_unref (sinkpad); recSink->teepad = teepad; recSink->sink = sink; recSink->encode = encode; recSink->queue = queue; pipelineElementCount(); // debug function return TRUE; } if ((100 == i)||(300==i) ||(500==i)) { printf("timeout_callback called %d times\n", i); gst_pad_add_probe (recSink->teepad, GST_PAD_PROBE_TYPE_IDLE, unlink_cb1, d, NULL); pipelineElementCount(); //debug function to make sure pipeline has the expected # of elements } if( 650 == i) { printf("timeout_callback called %d times\n", i); pipelineElementCount(); return FALSE; } return TRUE;
}
int main (int argc, char *argv)
{
GstElement *pipeline;
GstBus *bus;
GstStateChangeReturn ret;
GMainLoop *main_loop;GstElement *elm_identity, *elm_udpsrc; gint32 index; gchar pipe_string[1024]; /* Initialize GStreamer */ gst_init (&argc, &argv); /* Initialize our data structure */ memset (&data, 0, sizeof (data)); if (argc < 2) { g_print ("*** Missing required camera index [0,1,2,3]\n"); return -1; } index = atoi (argv[1]); if (index < 0 || index > 3) { g_print ("*** Invalid argument"); return -1; } data.cam_index = index; sprintf (pipe_string, "udpsrc port=%d" " ! application/x-rtp, media=video, clock-rate=90000, encoding-name=H265,playload=96" " ! rtpjitterbuffer latency=5 do-lost=true" " ! rtph265depay" " ! h265parse" " ! video/x-h265,alignment=au" " ! nvv4l2decoder disable-dpb=true enable-max-performance=1" " ! queue max-size-bytes=0" " ! videorate max-rate=30" " ! identity name=elm_id" " ! tee name=dec dec." " ! fakesink sync=false", 5004+index); // " ! queue ! nvvidconv ! nveglglessink window-width=720 window-height=480 sync=false qos=false", 5004+index); printf ("%s\n", pipe_string); pipeline = gst_parse_launch (pipe_string, NULL); bus = gst_element_get_bus (pipeline); /* Start playing */ ret = gst_element_set_state (pipeline, GST_STATE_PLAYING); if (ret == GST_STATE_CHANGE_FAILURE) { g_printerr ("Unable to set the pipeline to the playing state.\n"); gst_object_unref (pipeline); return -1; } else if (ret == GST_STATE_CHANGE_NO_PREROLL) { data.is_live = TRUE; } main_loop = g_main_loop_new (NULL, FALSE); data.loop = main_loop; data.pipeline = pipeline; data.frame_count = 0; data.drop_count = 0; // add source to default context g_timeout_add (100 , timeout_callback , NULL); g_main_loop_run (main_loop); /* Free resources */ g_main_loop_unref (main_loop); //gst_object_unref (bus); gst_element_set_state (pipeline, GST_STATE_NULL); gst_object_unref (pipeline); return 0;
}