Restarting/Reconnecting RTSP source on EOS

Please provide complete information as applicable to your setup.

• Hardware Platform (Jetson / GPU) GPU
• DeepStream Version 5.0.1
• NVIDIA GPU Driver Version (valid for GPU only) 460
• Issue Type( questions, new requirements, bugs) questions

Hello, My question is regarding restarting/reconnecting RTSP sources that generate the EOS (End of Stream) message on the bus messages.

I am using deepstream python application, It has Multi-RTSP sources as input and produces an RTSP out stream as well.

So, this is what i did:

First I connected the bus messaging to my callback function which handles EOS messages differently, instead of quitting the main loop and ending the Deepstream python application, I set the pipeline state to PAUSED.

Then I quit the GObject mainloop function, I sleep for 10 seconds, And on state change SUCCESS/NO_PREROLL for the pipeline to PAUSED State I send a new_flush_start event followed by new_flush_stop event to flush any data in the pads and allow it to be ready to receive new data.

After that I remove the source bin which generated the EOS message, I do that by releasing the Streammux sink pad, unreference it and then remove the sourcebin from the pipeline.

And then check whether the remove status from the pipeline was SUCCESS, And on successful removal I create a new source bin, add it to the pipeline, request sinkpad from the streammux, then link the new sourcebin’s source pad to the streammux’s sink pad, I then try to resume the pipeline to PLAYING state, by setting the pipeline to PLAYING state and then i make sure that the elements inside the pipeline are in PLAYING state as well by manually setting them the elements i set to playing state inside pipeline are: new source bin added, uridecodebin and finally streammux.

And then i do loop.run() which is for the GObject Mainloop.

Now, the problem is that after all of that is done after the generation of the EOS message and i do what i mentioned above the RTSP source doesn’t resume playing and i fail to know the reason.

All it does is process some remaining unprocessed frames and then just stop.

This is the python code for handling EOS message:

  msg_type = msg.type
if msg_type == Gst.MessageType.EOS:
    ret = self.pipeline.set_state(Gst.State.PAUSED)
    self.loop.quit()
    Gst.debug_bin_to_dot_file(self.pipeline, Gst.DebugGraphDetails.ALL, "EOS")
    print("Setting Pipeline to Paused State")
    time.sleep(10)
    print("Trying to set back to playing state")
    if ret == Gst.StateChangeReturn.SUCCESS or ret == Gst.StateChangeReturn.NO_PREROLL:
        flush_start = self.pipeline.send_event(Gst.Event.new_flush_start())
        print("Managed to Flush Start: ", flush_start)
        flush_stop = self.pipeline.send_event(Gst.Event.new_flush_stop(True))
        print("Managed to Flush Stop: ", flush_stop)
        i = 0
        uri = configFile['source%u' % int(i)]['uri']
        padname = "sink_%u" % int(i)
        removed_state = self.remove_source_bin()
        if all(element == 1 for element in removed_state):
            self.nbin = self.create_source_bin(i, uri)
            added_state = self.pipeline.add(self.nbin)
            print("Added state: ", added_state)
            self.streammux_sinkpad = self.streammux.get_request_pad(padname)
            if not self.streammux_sinkpad:
                sys.stderr.write("Unable to create sink pad bin \n")
                print("Pad name: ", padname)
            self.srcpad = self.nbin.get_static_pad("src")
            self.srcpad.link(self.streammux_sinkpad)
            Gst.debug_bin_to_dot_file(self.pipeline, Gst.DebugGraphDetails.ALL, "Resetting_Source")

            self.bus = self.pipeline.get_bus()
            self.bus.add_signal_watch()
            self.bus.connect("message", self.bus_call, self.loop)

            self.pipeline.set_state(Gst.State.PLAYING)

            self.nbin.set_state(Gst.State.PLAYING)
            nbin_check = self.nbin.get_state(Gst.CLOCK_TIME_NONE)[0]
            if nbin_check == Gst.StateChangeReturn.SUCCESS or nbin_check == Gst.StateChangeReturn.NO_PREROLL:  
                self.uri_decode_bin.set_state(Gst.State.PLAYING)
                uridecodebin_check = self.uri_decode_bin.get_state(Gst.CLOCK_TIME_NONE)[0]
                if uridecodebin_check == Gst.StateChangeReturn.SUCCESS or uridecodebin_check == Gst.StateChangeReturn.NO_PREROLL: 
                    self.streammux.set_state(Gst.State.PLAYING)
                    streammux_check = self.streammux.get_state(Gst.CLOCK_TIME_NONE)[0]
                    if streammux_check == Gst.StateChangeReturn.SUCCESS or streammux_check == Gst.StateChangeReturn.NO_PREROLL:  
                        self.pipeline.set_state(Gst.State.PLAYING)
                        pipeline_check = self.pipeline.get_state(Gst.CLOCK_TIME_NONE)[0]
                        if pipeline_check == Gst.StateChangeReturn.SUCCESS or pipeline_check == Gst.StateChangeReturn.NO_PREROLL:  
                            print("We did it boys!")
                            Gst.debug_bin_to_dot_file(self.pipeline, Gst.DebugGraphDetails.ALL, "Trying_Playing")
                        else:
                            print("pipeline failed us")
                    else:
                        print("streammux failed us")
                else:
                    print("uridecodebin failed us")
            else:
                print("nbin failed us")

            self.loop.run()

And this is the function i use for removing the sourcebin:

def remove_source_bin(self):
    # self.nbin.set_state(Gst.State.NULL)
    # self.uri_decode_bin.unparent()
    # self.uri_decode_bin.unref()
    # Gst.Bin.remove(self.pipeline, self.uri_decode_bin)
    removed_states = []
    # for i in range(batch_size):
    padname = "sink_0" #% int(i)
    pad = self.streammux.get_static_pad(padname)
    pad.send_event(Gst.Event.new_flush_stop(False))
    self.streammux.release_request_pad(pad)
    pad.unref()
    removed_states.append(self.pipeline.remove(self.nbin))
    return removed_states

And for creating the new sourcebin i use this function:

def create_source_bin(self, index, uri):
    print("Creating source bin")
    bin_name = "source-bin-%02d" % index
    print(bin_name)
    self.nbin = Gst.Bin.new(bin_name)
    if not self.nbin:
        sys.stderr.write(" Unable to create source bin \n")

    # Source element for reading from the uri.
    # We will use decodebin and let it figure out the container format of the
    # stream and the codec and plug the appropriate demux and decode plugins.
    self.uri_decode_bin = Gst.ElementFactory.make("uridecodebin", 'uridecodebin')
    if not self.uri_decode_bin:
        sys.stderr.write(" Unable to create uri decode bin \n")
    # We set the input uri to the source element
    self.uri_decode_bin.set_property("uri", uri)
    # Connect to the "pad-added" signal of the decodebin which generates a
    # callback once a new pad for raw data has beed created by the decodebin
    self.uri_decode_bin.connect("pad-added", self.cb_newpad, self.nbin)
    self.uri_decode_bin.connect("child-added", self.decodebin_child_added, self.nbin)
    # We need to create a ghost pad for the source bin which will act as a proxy
    # for the video decoder src pad. The ghost pad will not have a target right
    # now. Once the decode bin creates the video decoder and generates the
    # cb_newpad callback, we will set the ghost pad target to the video decoder
    # src pad.
    Gst.Bin.add(self.nbin, self.uri_decode_bin)
    bin_pad = self.nbin.add_pad(Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC))
    if not bin_pad:
        sys.stderr.write(" Failed to add ghost pad in source bin \n")
        return None
    # self.nbin.Gst.Bin.recalculate_latency()
    return self.nbin

Thanks in advance for any help,
Mina

1 Like

I have no way to solve it. I will always pay attention to it. I’ve been trying to solve this problem recently.
These are my findings that may help you

  1. I found that the component in the set of pile to pause is not necessarily pause
  2. RTSP cannot be suspended
  3. The uridecode bin can’t be used after being suspended. It can only be re created. It seems to be related to the ready state

I implemented it with threads, but the picture seems abnormal when reconnecting from rtsp, sometimes there will be a green area

Can you elaborate on the steps you followed or what did you do with threads ?

Thanks

Thread 1
Check whether the camera network is connected (TCP)
Thread 2
Set a state variable
If the camera is not available, remove uridecodebin(rtsp), add uridecodebin (local video , number is 0) set pipeline status paused, and set the variable to suspended
If the camera is working and the state variable is suspended, remove uridecodebin(local video), add uridecodebin (RTSP) (number is 0), and set the pipeline state playing

Hi gharbv, I am facing similar issue to handle the EOS when rtsp is not available for some duration in deepstream python pipeline. Can u share the code for approach you discussed. It would be helpful to me.

Hi MinaAbdElMassih,

I also looking solution for same problem of reconnecting the rtsp once its available and resume in python deepstream app. Could you do it now.

@gharbv If the RTSP server sends EOS to RTSP client, the session finishes. So it is of no use to pause the pipeline and play it again. The session should be re-initialized from the beginning, there is no way to resume.

This is part of my code, expected reference…

code (7.2 KB)

I didn’t use python to implement it, but it should be possible if python can set the pipeline state

ok, so I tried setting the pipeline state to READY instead of PAUSED and then remove the source bin and add it again and set the pipeline to PLAYING state, but that some runs work and some other runs crashes producing SEG Fault error and core dumped.

Any idea why that occurs ?

Thanks

Can you share how do you check whether that RTSP is still connected or not, the “is_connect” function ?

Thanks

1 Like

“is_connect” function Before, I was using ping. Now I’m using socket change name"is_connect_s"

return value:
-2 local network faild
-1 rtsp faild
0 connect ok!

code :

code (1.1 KB)

"other runs crashes"emmm,I think you might need Debug
Can you send the pipleline ?

1 Like


This is the graph of the pipeline.

yep.rtsp will reconnect 3 time, I new a new thread detect rtsp network is noraml ,and detect frequency faster than “uridecodebin reconnect 3 time” and change the state of the piple before sending the EOS.then uridecodebin cant send eos

Our pipleline is almost the same
I guess you need g_object_set(G_OBJECT(sink), “sync”, 0).
and create_source_bin use this file fuction
deepstream_reference_apps/deepstream_test_rt_src_add_del.c at master · NVIDIA-AI-IOT/deepstream_reference_apps · GitHub

I have sync set to 1 I will try setting it to 0.
I create the source bin using the code i shared i think it is almost the same, do you have any other suggestions ?

Setting sync to 0 doesn’t solve it.

create the source bin deepstream-test3 has ghost pad than that link

The code I posted is still in testing,
Our stable version uses fork to identify and restart pipeline
Stable version uses fork :code (2.0 KB)

1 Like