Nveglglessink gst_element_set_state(GST_STATE_NULL) hangs

I have written a stand alone project to practice my pipeline manipulation skills.
I have a pipeline that goes from udpsrc -> fakesink. Using a timer callback, I add another pipeline to play the video on the desktop using nveglglessink (queue -> nvvidconv -> nveglglessink).
Then after a certain amount of time, I remove this pipeline. I do this several times just to test the stability of my pipeline implementation.

The problem is that the line gst_element_set_state(recSink->sink, GST_STATE_NULL); will sometimes just hang.

I’m posting the code here. I compile it with the command line: gcc stream.c -o test $(pkg-config --cflags --libs gstreamer-1.0)

#include <errno.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

#include <glib.h>
#include <gst/gst.h>

/* Application-wide state shared between main() and the timer callback. */
typedef struct _CustomData
{
gboolean is_live; /* set to TRUE in main() when going to PLAYING returns GST_STATE_CHANGE_NO_PREROLL */
GstElement *pipeline; /* top-level pipeline created by gst_parse_launch() in main() */
GMainLoop *loop; /* main loop created and run by main() */
guint64 frame_count; /* zeroed in main(); not otherwise used in the code shown */
guint64 drop_count; /* zeroed in main(); not otherwise used in the code shown */
gint32 cam_index; /* camera index taken from argv[1]; main() accepts 0..3 */
GstElement *qmx; /* not referenced in the code shown */
} CustomData;

/* Handles for the branch that is attached to and detached from the tee at run time. */
typedef struct
{
GstPad *teepad; // source pad on tee (requested in timeout_callback)
//GstPad *teesink; //sink pad on tee
GstPad *qsink; // not referenced in the code shown
GstPad *qsource; // not referenced in the code shown
GstElement *queue; // "queue" element at the head of the branch
GstElement *encode; // despite the name, timeout_callback stores the "nvvidconv" element here
GstElement *mux; // only appears in commented-out code
GstElement *sink; // "nveglglessink" element at the end of the branch
gboolean removing; // not referenced in the code shown
} Sink;

/* Global application state; zeroed and filled in by main(). */
CustomData data;
/* Current dynamic branch; allocated on the first call to timeout_callback(). */
Sink *recSink;

//debug function to make sure pipeline has the expected # of elements
void pipelineElementCount()
{
GstIterator *it = gst_bin_iterate_elements(GST_BIN(data.pipeline));
GValue item = G_VALUE_INIT;
int elementCount = 0;
gboolean done = FALSE;

while(!done)
{
    switch(gst_iterator_next(it, &item)) {
        case GST_ITERATOR_OK:
    elementCount++;
    g_value_reset(&item);
    break;
    case GST_ITERATOR_ERROR:
    case GST_ITERATOR_DONE:
            g_print("pipeline element count is %d \n", elementCount);
    done = TRUE;
    break;
    }
}

g_value_unset(&item);
gst_iterator_free(it);

}

static GstPadProbeReturn
unlink_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
{
if (GST_EVENT_TYPE (GST_PAD_PROBE_INFO_DATA (info)) != GST_EVENT_EOS)
return GST_PAD_PROBE_PASS;
printf(“unlink_cb gstreamer major %d, minor %d, minor %d\n”, GST_VERSION_MAJOR, GST_VERSION_MINOR, GST_VERSION_MICRO);
gst_pad_remove_probe (pad, GST_PAD_PROBE_INFO_ID (info));

printf(“unlink_cb 1\n”); //~~~~~~~~~~~~~~~~~~~~ IT HANGS on the very next LINE!!!
gst_element_set_state (recSink->sink, GST_STATE_NULL);

printf(“unlink_cb 2\n”);
//gst_element_set_state (recSink->mux, GST_STATE_NULL);
gst_element_set_state (recSink->encode, GST_STATE_NULL);
printf(“unlink_cb 3\n”);

gst_bin_remove (GST_BIN (data.pipeline), recSink->encode);
printf(“unlink_cb 4\n”);
//gst_bin_remove (GST_BIN (data.pipeline), recSink->mux);
gst_bin_remove (GST_BIN (data.pipeline), recSink->sink);
printf(“unlink_cb 5\n”);

g_print (“removed\n”);

return GST_PAD_PROBE_REMOVE;
}

/*
 * IDLE probe on the tee's request pad: detaches the display branch from the
 * tee and starts its shutdown.
 *
 * Steps, in this order:
 *   1. unlink tee -> queue so no new data enters the branch;
 *   2. stop and remove the queue, which ends the branch's own streaming
 *      thread, so nothing else is pushing into the converter/sink when the
 *      EOS is injected;
 *   3. give the request pad back to the tee (otherwise every add/remove
 *      cycle leaves one more unlinked src pad on the tee);
 *   4. install the EOS probe (unlink_cb) on the sink and send EOS into the
 *      converter; unlink_cb finishes the teardown.
 *
 * Buffers still waiting in the queue are discarded.
 *
 * user_data is forwarded unchanged to unlink_cb.
 */
static GstPadProbeReturn
unlink_cb1 (GstPad * pad, GstPadProbeInfo * info, gpointer user_data)
{
    GstPad *sinkpad, *eossink, *qsink;
    GstElement *tee;

    GST_DEBUG_OBJECT (pad, "pad is blocked now");

    /* remove the probe first */
    gst_pad_remove_probe (pad, GST_PAD_PROBE_INFO_ID (info));

    /* 1. unlink queue element sink pad from tee src */
    qsink = gst_element_get_static_pad (recSink->queue, "sink");
    gst_pad_unlink (recSink->teepad, qsink);
    gst_object_unref (qsink);

    /* 2. stop the queue and remove it from the pipeline */
    gst_element_set_state (recSink->queue, GST_STATE_NULL);
    gst_bin_remove (GST_BIN (data.pipeline), recSink->queue);
    recSink->queue = NULL;

    /* 3. release the request pad and drop our reference to it */
    tee = gst_pad_get_parent_element (recSink->teepad);
    if (tee != NULL) {
        gst_element_release_request_pad (tee, recSink->teepad);
        gst_object_unref (tee);
    }
    gst_object_unref (recSink->teepad);
    recSink->teepad = NULL;

    /* 4. install the EOS probe; only downstream events are of interest */
    sinkpad = gst_element_get_static_pad (recSink->sink, "sink");
    gst_pad_add_probe (sinkpad,
        GST_PAD_PROBE_TYPE_BLOCK | GST_PAD_PROBE_TYPE_EVENT_DOWNSTREAM,
        unlink_cb, user_data, NULL);
    gst_object_unref (sinkpad);

    /* push EOS into the converter; the probe fires when it reaches the sink */
    eossink = gst_element_get_static_pad (recSink->encode, "sink");
    gst_pad_send_event (eossink, gst_event_new_eos ());
    gst_object_unref (eossink);

    return GST_PAD_PROBE_OK;
}

gboolean timeout_callback(gpointer d)
{
static int i = 0;

//return FALSE;
if(recSink == NULL)
recSink = g_new0 (Sink, 1);

i++;
//g_print("timeout_callback called %d times\n", i);
if ( (10 == i)||(200==i)||(400==i))
{
    time_t now = time(NULL);
    char filename[40];
    GstPad *sinkpad, *teepad, *teesink;
    GstElement *tee, *queue, *encode, *mux, *sink;
    GstPadTemplate *templ, *temp2;

//" ! queue ! nvvidconv ! nveglglessink window-width=720 window-height=480 sync=false qos=false"
tee = gst_bin_get_by_name(GST_BIN(data.pipeline), “dec”);
templ = gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (tee), “src_%u”);
teepad = gst_element_request_pad (tee, templ, NULL, NULL);
queue = gst_element_factory_make (“queue”, NULL);
encode = gst_element_factory_make (“nvvidconv”, NULL);
sink = gst_element_factory_make (“nveglglessink”, NULL);
g_object_set (sink, “window-width”, 720, “window-height”, 480, NULL);

    gst_bin_add_many (GST_BIN (data.pipeline), queue, encode, sink, NULL);
    gst_element_sync_state_with_parent (sink);
    gst_element_sync_state_with_parent (encode);
    gst_element_sync_state_with_parent (queue);
    gst_element_link_many (queue, encode, sink, NULL);

    sinkpad = gst_element_get_static_pad (queue, "sink");
    gst_pad_link (teepad, sinkpad);
    gst_object_unref (sinkpad);
    recSink->teepad = teepad;
    recSink->sink = sink;
    recSink->encode = encode;
    recSink->queue = queue;
    pipelineElementCount();  // debug function
    return TRUE;
}
if ((100 == i)||(300==i) ||(500==i))
{
    printf("timeout_callback called %d times\n", i);
    gst_pad_add_probe (recSink->teepad, GST_PAD_PROBE_TYPE_IDLE, unlink_cb1, d, NULL);
    pipelineElementCount();   //debug function to make sure pipeline has the expected # of elements
}

if( 650 == i)
{
    printf("timeout_callback called %d times\n", i);
    pipelineElementCount();
    return FALSE;
}

return TRUE;

}

int main (int argc, char *argv)
{
GstElement *pipeline;
GstBus *bus;
GstStateChangeReturn ret;
GMainLoop *main_loop;

GstElement *elm_identity, *elm_udpsrc;
gint32 index;
gchar pipe_string[1024];


/* Initialize GStreamer */
gst_init (&argc, &argv);

/* Initialize our data structure */
memset (&data, 0, sizeof (data));

if (argc < 2) {
    g_print ("*** Missing required camera index [0,1,2,3]\n");
    return -1;
}
index = atoi (argv[1]);
if (index < 0 || index > 3) {
    g_print ("*** Invalid argument");
    return -1;
}
data.cam_index = index;

sprintf (pipe_string, "udpsrc port=%d"
    " ! application/x-rtp, media=video, clock-rate=90000, encoding-name=H265,playload=96"
    " ! rtpjitterbuffer latency=5 do-lost=true"
    " ! rtph265depay"
    " ! h265parse"
    " ! video/x-h265,alignment=au"
    " ! nvv4l2decoder disable-dpb=true enable-max-performance=1"
    " ! queue max-size-bytes=0"
    " ! videorate max-rate=30"
    " ! identity name=elm_id"
    " ! tee name=dec dec."
    " ! fakesink sync=false",
    5004+index);
   // " ! queue ! nvvidconv ! nveglglessink window-width=720 window-height=480 sync=false qos=false", 5004+index);

printf ("%s\n", pipe_string);

pipeline = gst_parse_launch (pipe_string, NULL);
bus = gst_element_get_bus (pipeline);

/* Start playing */
ret = gst_element_set_state (pipeline, GST_STATE_PLAYING);
if (ret == GST_STATE_CHANGE_FAILURE) {
    g_printerr ("Unable to set the pipeline to the playing state.\n");
    gst_object_unref (pipeline);
    return -1;
} else if (ret == GST_STATE_CHANGE_NO_PREROLL) {
    data.is_live = TRUE;
}

main_loop = g_main_loop_new (NULL, FALSE);
data.loop = main_loop;
data.pipeline = pipeline;
data.frame_count = 0;
data.drop_count = 0;

// add source to default context
g_timeout_add (100 , timeout_callback , NULL);
g_main_loop_run (main_loop);

/* Free resources */
g_main_loop_unref (main_loop);
//gst_object_unref (bus);
gst_element_set_state (pipeline, GST_STATE_NULL);
gst_object_unref (pipeline);

return 0;

}

Hi,
I don’t see the code for sending end of stream and waiting for EoS. Please add the code below and try again:

    // Send EoS to the whole pipeline (gst_pipeline is the application's top-level pipeline)
    gst_element_send_event ((GstElement*)gst_pipeline, gst_event_new_eos ());
    // Wait for EOS message: block, with no timeout, until EOS is posted on the bus
    // NOTE(review): the bus reference obtained here is not released in this snippet - confirm and unref it
    bus = gst_pipeline_get_bus(GST_PIPELINE(gst_pipeline));
    gst_bus_poll(bus, GST_MESSAGE_EOS, GST_CLOCK_TIME_NONE);
    // Only now shut the pipeline down
    gst_element_set_state((GstElement*)gst_pipeline, GST_STATE_NULL);

Hi Dane,

In the future I plan to replace fakesink with something like a udpsink, so for this situation I do not want to end the stream for the main pipeline. I only want to stop the stream to the ‘dynamic’ pipeline which is forked from the main pipeline.

main pipeline

udpsrc → blah → blah → tee name = dec dec.–>fakesink

dynamic pipeline that forks from the tee dec.

dec. -->queue–>nvvidconv–>nveglglessink

I do send an EOS event to the encode element of the ‘dynamic’ pipeline in the callback unlink_cb1():

eossink = gst_element_get_static_pad (recSink->encode, "sink");
gst_pad_send_event (eossink, gst_event_new_eos ());
gst_object_unref (eossink);

The EOS event is then handled in the callback unlink_cb() which is linked to a pad_probe on the sink side of nveglglessink.

This code sometimes runs like a charm, and other times it will hang in unlink_cb() right where I try to set_state the nveglglessink (recSink->sink) to GST_STATE_NULL

Thanks for taking a look at my code!

Hi,
We suggest using appsink and appsrc. You can run

udpsrc → blah → blah → tee name = dec dec.–>appsink

And get decoded frames in appsink. Once you need to render the frames, you can launch the pipeline:

appsrc-->queue–>nvvidconv–>nvegltransform->nveglglessink

Hi DaneLLL,

Would the pipeline you suggest be an independent pipeline, or is it a branch from the main pipeline?

appsrc–>queue–>nvvidconv–>nvegltransform->nveglglessink

I will try what you suggest. But I am wondering why my current solution would have issues. I have tried something similar, but with the branch going to a filesink instead of an nveglglessink, and that worked without a problem.