Deepstream pipeline hangs on gst_element_set_state uridecodebin

I have successfully run the runtime_source_add_delete sample from GitHub. However, a problem appeared when my RTSP sources became unstable, so I added the code below to handle it. It automatically reconnects a source after a timeout (i.e., when no frames can be received from the RTSP stream).

/**
 * Create a source bin for `uri`, add it to the global pipeline under slot
 * `source_id`, and request the PLAYING state for it.
 *
 * On success the bin is stored in g_source_bin_list[source_id] and
 * g_source_enabled[source_id] is TRUE. If the bin cannot be created or the
 * pipeline refuses it, the slot is cleared (NULL / disabled) and the function
 * returns without touching any element state.
 *
 * @param source_id  Index of the slot in the global source arrays.
 * @param uri        URI handed to create_uridecode_bin().
 */
void add_src(gint source_id, std::string uri) {
  g_source_enabled[source_id] = TRUE;

  GstElement *source_bin = create_uridecode_bin (source_id, uri.c_str());
  if (!source_bin) {
    g_printerr ("Failed to create source bin for source_id %d.\n", source_id);
  } else if (!gst_bin_add (GST_BIN (pipeline), source_bin)) {
    // The pipeline did not take the bin, so do not keep a pointer to it.
    g_printerr ("Failed to add source bin %d to the pipeline.\n", source_id);
    source_bin = NULL;
  }

  // Keep the global bookkeeping consistent with what is really in the
  // pipeline, so later teardown never operates on a missing bin.
  g_source_bin_list[source_id] = source_bin;
  g_source_enabled[source_id] = (source_bin != NULL);
  if (!source_bin) {
    return;
  }

  const GstStateChangeReturn state_return =
      gst_element_set_state (source_bin, GST_STATE_PLAYING);
  switch (state_return) {
    case GST_STATE_CHANGE_SUCCESS:
      g_print ("STATE CHANGE SUCCESS\n\n");
      break;
    case GST_STATE_CHANGE_FAILURE:
      g_print ("STATE CHANGE FAILURE\n\n");
      break;
    case GST_STATE_CHANGE_ASYNC:
      g_print ("STATE CHANGE ASYNC\n\n");
      // NOTE(review): with GST_CLOCK_TIME_NONE this waits, on the calling
      // thread, until the asynchronous state change has finished.
      if (gst_element_get_state (source_bin, NULL, NULL,
              GST_CLOCK_TIME_NONE) == GST_STATE_CHANGE_FAILURE) {
        g_print ("STATE CHANGE FAILURE\n\n");
      }
      break;
    case GST_STATE_CHANGE_NO_PREROLL:
      g_print ("STATE CHANGE NO PREROLL\n\n");
      break;
    default:
      break;
  }
}

/**
 * Stop the source bin stored in slot `source_id` and detach it from the
 * pipeline.
 *
 * Steps: set the bin to GST_STATE_NULL (waiting for completion if the change
 * is asynchronous), flush and release the matching "sink_<id>" request pad on
 * streammux, remove the bin from the pipeline, and clear the slot.
 *
 * The bin is NOT unreffed here. This code holds no reference of its own
 * (assuming create_uridecode_bin() returns the usual floating reference that
 * gst_bin_add() sinks); gst_bin_remove() drops the pipeline's reference. An
 * extra g_object_unref() before the removal would release the bin while it is
 * still a child of the pipeline and still used below.
 *
 * If the state change reports FAILURE or NO_PREROLL the bin is left in the
 * pipeline, as before.
 *
 * @param source_id  Index of the slot in the global source arrays.
 */
void del_src(gint source_id) {
  GstElement *source_bin = g_source_bin_list[source_id];
  if (!source_bin) {
    g_print ("No source bin registered for source_id %d\n", source_id);
    return;
  }

  GstStateChangeReturn state_return =
      gst_element_set_state (source_bin, GST_STATE_NULL);
  g_print ("state_return: %d\n", state_return);

  const gchar *outcome = NULL;
  switch (state_return) {
    case GST_STATE_CHANGE_SUCCESS:
      outcome = "SUCCESS";
      break;
    case GST_STATE_CHANGE_ASYNC:
      outcome = "ASYNC";
      break;
    case GST_STATE_CHANGE_FAILURE:
      g_print ("STATE CHANGE FAILURE\n\n");
      return;
    case GST_STATE_CHANGE_NO_PREROLL:
      g_print ("STATE CHANGE NO PREROLL\n\n");
      return;
    default:
      return;
  }
  g_print ("STATE CHANGE %s\n\n", outcome);

  if (state_return == GST_STATE_CHANGE_ASYNC) {
    // Wait for the transition to NULL to finish before unlinking anything.
    state_return =
        gst_element_get_state (source_bin, NULL, NULL, GST_CLOCK_TIME_NONE);
  }

  // Release the muxer pad this source was feeding.
  gchar pad_name[16];
  g_snprintf (pad_name, sizeof (pad_name), "sink_%u", source_id);
  GstPad *sinkpad = gst_element_get_static_pad (streammux, pad_name);
  if (sinkpad) {
    gst_pad_send_event (sinkpad, gst_event_new_flush_stop (FALSE));
    gst_element_release_request_pad (streammux, sinkpad);
    g_print ("STATE CHANGE %s %p\n\n", outcome, sinkpad);
    gst_object_unref (sinkpad);
  } else {
    // The pad does not exist if the source was never linked to the muxer.
    g_print ("No streammux pad %s to release\n", pad_name);
  }

  if (gst_bin_remove (GST_BIN (pipeline), source_bin)) {
    g_print ("RM success...\n");
  } else {
    g_print ("Failed to remove source bin %d from the pipeline\n", source_id);
  }
  // The pointer must not be reused after removal.
  g_source_bin_list[source_id] = NULL;
}

/**
 * Periodic GLib callback that drives source management.
 *
 * - No remote message pending: look for the first enabled source whose
 *   n_frame_out_list counter exceeds FRAME_TIMEOUT, reset its counter, and
 *   restart it (del_src, one second pause, add_src with the stored URI).
 *   At most one source is restarted per invocation.
 * - Message "add": start the source at the given index with the given URI.
 * - Message "del": stop the source at the given index; when the last source
 *   is removed the pipeline is set to NULL and the main loop is quit.
 *
 * Commands with an index outside [0, MAX_NUM_SOURCES) are ignored, as are
 * "add" for an already active slot and "del" for an inactive slot, so the
 * global arrays are never indexed out of range and g_num_sources stays equal
 * to the number of active sources.
 *
 * @param data  User data from the timeout registration; not used here.
 * @return TRUE to keep the timeout installed, FALSE once the last source has
 *         been removed.
 */
static gboolean
sourcectl (gpointer data)
{
  data::Remote remote_msg;
  const auto ret = get_msg(remote_msg, zmq::recv_flags::dontwait);
  if (!ret) {
    for (gint source_id = 0; source_id < MAX_NUM_SOURCES; source_id++) {
      if (!g_source_enabled[source_id]) continue;

      // Read and reset the counter under the same lock so the check and the
      // reset cannot interleave with another update of the counter.
      g_mutex_lock (&eos_lock);
      const bool timed_out = n_frame_out_list[source_id] > FRAME_TIMEOUT;
      if (timed_out) {
        g_print ("Restart source_id %d \n", source_id);
        n_frame_out_list[source_id] = 0;
      }
      g_mutex_unlock (&eos_lock);
      if (!timed_out) continue;

      g_print ("Del source_id %d \n", source_id);
      del_src(source_id);
      // NOTE(review): this sleep runs inside the timeout callback, so the
      // thread running the callback is paused for the whole second.
      std::this_thread::sleep_for(std::chrono::milliseconds(1000));
      g_print ("Add source_id %d \n", source_id);
      add_src(source_id, source_uri[source_id]);
      break;
    }
    return TRUE;
  }

  const gint source_id = remote_msg.index();
  if (source_id < 0 || source_id >= MAX_NUM_SOURCES) {
    // The index comes from a remote peer; never use it unchecked.
    g_print ("Ignoring command for out-of-range source_id %d \n", source_id);
    return TRUE;
  }

  const auto &cmd = remote_msg.cmd();
  if (!g_strcmp0 (cmd.c_str(), "add")) {
    if (g_source_enabled[source_id]) {
      g_print ("source_id %d is already active, ignoring add \n", source_id);
      return TRUE;
    }
    std::string uri = remote_msg.rtsp();
    g_print ("Calling Start %d \n", source_id);
    add_src(source_id, uri);
    source_uri[source_id] = uri;
    // Count the source only if it is marked active after add_src().
    if (g_source_enabled[source_id]) g_num_sources++;
  } else if (!g_strcmp0 (cmd.c_str(), "del")) {
    if (!g_source_enabled[source_id]) {
      g_print ("source_id %d is not active, ignoring del \n", source_id);
      return TRUE;
    }
    g_source_enabled[source_id] = FALSE;
    g_print ("Calling Del %d \n", source_id);
    g_num_sources--;
    if (g_num_sources == 0) gst_element_set_state (pipeline, GST_STATE_NULL);
    del_src(source_id);
    if (g_num_sources == 0) {
      g_main_loop_quit (loop);
      return FALSE;
    }
  }

  return TRUE;
}

The callback function is registered as follows:

// Install sourcectl as a high-priority timeout that fires every 4 seconds.
g_timeout_add_seconds_full (
    G_PRIORITY_HIGH,               /* priority */
    4,                             /* interval, in seconds */
    sourcectl,                     /* callback */
    (gpointer) g_source_bin_list,  /* user data passed to the callback */
    NULL);                         /* no destroy notify */

The problem is that when I run this code, it hangs in the del_src function at this call:

state_return =
    gst_element_set_state (g_source_bin_list[source_id], GST_STATE_NULL);

The question is: why does it hang, and how can I solve it?

Do you know which component causes the problem?

I was able to restart the source several times before it got stuck at state_return = gst_element_set_state (g_source_bin_list[source_id], GST_STATE_NULL); in the del_src function.

I also tried calling g_main_loop_quit (loop); in the probe function to quit the main loop, but it does not seem to work.