I have succeeded in running runtime_source_add_add_delete from github. However, the thing came when my rtsp sources became unstable. So, I added a piece of code to capture it (added below). What it does is auto reconnecting after timeout ( unable to receive frame from rtsp ).
void add_src(gint source_id, std::string uri) {
GstElement *source_bin;
GstStateChangeReturn state_return;
g_source_enabled[source_id] = TRUE;
source_bin = create_uridecode_bin (source_id, uri.c_str());
if (!source_bin) {
g_printerr ("Failed to create source bin. Exiting.\n");
}
g_source_bin_list[source_id] = source_bin;
gst_bin_add (GST_BIN (pipeline), source_bin);
state_return =
gst_element_set_state (g_source_bin_list[source_id], GST_STATE_PLAYING);
switch (state_return) {
case GST_STATE_CHANGE_SUCCESS:
g_print ("STATE CHANGE SUCCESS\n\n");
break;
case GST_STATE_CHANGE_FAILURE:
g_print ("STATE CHANGE FAILURE\n\n");
break;
case GST_STATE_CHANGE_ASYNC:
g_print ("STATE CHANGE ASYNC\n\n");
state_return =
gst_element_get_state (g_source_bin_list[source_id], NULL, NULL,
GST_CLOCK_TIME_NONE);
// source_id++;
break;
case GST_STATE_CHANGE_NO_PREROLL:
g_print ("STATE CHANGE NO PREROLL\n\n");
break;
default:
break;
}
}
void del_src(gint source_id) {
GstStateChangeReturn state_return;
gchar pad_name[16];
GstPad *sinkpad = NULL;
state_return =
gst_element_set_state (g_source_bin_list[source_id], GST_STATE_NULL);
g_print("state_return: %d\n", state_return);
g_object_unref (g_source_bin_list[source_id]);
g_print("Unref Success...\n");
switch (state_return) {
case GST_STATE_CHANGE_SUCCESS:
g_print ("STATE CHANGE SUCCESS\n\n");
g_snprintf (pad_name, 15, "sink_%u", source_id);
sinkpad = gst_element_get_static_pad (streammux, pad_name);
gst_pad_send_event (sinkpad, gst_event_new_flush_stop (FALSE));
gst_element_release_request_pad (streammux, sinkpad);
g_print ("STATE CHANGE SUCCESS %p\n\n", sinkpad);
gst_object_unref (sinkpad);
gst_bin_remove (GST_BIN (pipeline), g_source_bin_list[source_id]);
g_print("RM success...\n");
break;
case GST_STATE_CHANGE_FAILURE:
g_print ("STATE CHANGE FAILURE\n\n");
break;
case GST_STATE_CHANGE_ASYNC:
g_print ("STATE CHANGE ASYNC\n\n");
state_return =
gst_element_get_state (g_source_bin_list[source_id], NULL, NULL,
GST_CLOCK_TIME_NONE);
g_snprintf (pad_name, 15, "sink_%u", source_id);
sinkpad = gst_element_get_static_pad (streammux, pad_name);
gst_pad_send_event (sinkpad, gst_event_new_flush_stop (FALSE));
gst_element_release_request_pad (streammux, sinkpad);
g_print ("STATE CHANGE ASYNC %p\n\n", sinkpad);
gst_object_unref (sinkpad);
gst_bin_remove (GST_BIN (pipeline), g_source_bin_list[source_id]);
break;
case GST_STATE_CHANGE_NO_PREROLL:
g_print ("STATE CHANGE NO PREROLL\n\n");
break;
default:
break;
}
}
static gboolean
sourcectl (gpointer data)
{
data::Remote remote_msg;
const auto ret = get_msg(remote_msg, zmq::recv_flags::dontwait);
if (!ret) {
// g_mutex_lock (&eos_lock);
for (gint source_id=0; source_id<MAX_NUM_SOURCES; source_id++) {
if (!g_source_enabled[source_id]) continue;
if (n_frame_out_list[source_id] > FRAME_TIMEOUT) {
g_mutex_lock (&eos_lock);
g_print ("Restart source_id %d \n", source_id);
n_frame_out_list[source_id] = 0;
g_mutex_unlock (&eos_lock);
g_print ("Del source_id %d \n", source_id);
del_src(source_id);
std::this_thread::sleep_for(std::chrono::milliseconds(1000));
g_print ("Add source_id %d \n", source_id);
add_src(source_id, source_uri[source_id]);
break;
}
}
// g_mutex_unlock (&eos_lock);
return TRUE;
}
// g_mutex_lock (&eos_lock);
gint source_id = remote_msg.index();
if (!g_strcmp0 (remote_msg.cmd().c_str(), "add")) {
std::string uri = remote_msg.rtsp();
g_print ("Calling Start %d \n", source_id);
add_src(source_id, uri);
source_uri[source_id] = uri;
g_num_sources++;
} else if (!g_strcmp0 (remote_msg.cmd().c_str(), "del")) {
g_source_enabled[source_id] = FALSE;
g_print ("Calling Del %d \n", source_id);
g_num_sources--;
if (g_num_sources == 0) gst_element_set_state (pipeline, GST_STATE_NULL);
del_src(source_id);
if (g_num_sources == 0){
g_main_loop_quit (loop);
// g_mutex_unlock (&eos_lock);
return FALSE;
}
}
// g_mutex_unlock (&eos_lock);
return TRUE;
}
Callback function
g_timeout_add_seconds_full(G_PRIORITY_HIGH, 4, sourcectl, (gpointer) g_source_bin_list, NULL);
The problem is when I try run this code it hangs on in del_src function
state_return =
gst_element_set_state (g_source_bin_list[source_id], GST_STATE_NULL);
The question is - why it hangs and How to solve it?