Restarting/Reconnecting RTSP source on EOS

Please provide complete information as applicable to your setup.

• Hardware Platform (Jetson / GPU) GPU
• DeepStream Version 5.0.1
• NVIDIA GPU Driver Version (valid for GPU only) 460
• Issue Type( questions, new requirements, bugs) questions

Hello, My question is regarding restarting/reconnecting RTSP sources that generate the EOS (End of Stream) message on the bus messages.

I am using deepstream python application, It has Multi-RTSP sources as input and produces an RTSP out stream as well.

So, this is what i did:

First I connected the bus messaging to my callback function which handles EOS messages differently, instead of quitting the main loop and ending the Deepstream python application, I set the pipeline state to PAUSED.

Then I quit the GObject mainloop function, I sleep for 10 seconds, And on state change SUCCESS/NO_PREROLL for the pipeline to PAUSED State I send a new_flush_start event followed by new_flush_stop event to flush any data in the pads and allow it to be ready to receive new data.

After that I remove the source bin which generated the EOS message, I do that by releasing the Streammux sink pad, unreference it and then remove the sourcebin from the pipeline.

And then check whether the remove status from the pipeline was SUCCESS, And on successful removal I create a new source bin, add it to the pipeline, request sinkpad from the streammux, then link the new sourcebin’s source pad to the streammux’s sink pad, I then try to resume the pipeline to PLAYING state, by setting the pipeline to PLAYING state and then i make sure that the elements inside the pipeline are in PLAYING state as well by manually setting them the elements i set to playing state inside pipeline are: new source bin added, uridecodebin and finally streammux.

And then i do loop.run() which is for the GObject Mainloop.

Now, the problem is that after all of that is done after the generation of the EOS message and i do what i mentioned above the RTSP source doesn’t resume playing and i fail to know the reason.

All it does is process some remaining unprocessed frames and then just stop.

This is the python code for handling EOS message:

  msg_type = msg.type
if msg_type == Gst.MessageType.EOS:
    ret = self.pipeline.set_state(Gst.State.PAUSED)
    self.loop.quit()
    Gst.debug_bin_to_dot_file(self.pipeline, Gst.DebugGraphDetails.ALL, "EOS")
    print("Setting Pipeline to Paused State")
    time.sleep(10)
    print("Trying to set back to playing state")
    if ret == Gst.StateChangeReturn.SUCCESS or ret == Gst.StateChangeReturn.NO_PREROLL:
        flush_start = self.pipeline.send_event(Gst.Event.new_flush_start())
        print("Managed to Flush Start: ", flush_start)
        flush_stop = self.pipeline.send_event(Gst.Event.new_flush_stop(True))
        print("Managed to Flush Stop: ", flush_stop)
        i = 0
        uri = configFile['source%u' % int(i)]['uri']
        padname = "sink_%u" % int(i)
        removed_state = self.remove_source_bin()
        if all(element == 1 for element in removed_state):
            self.nbin = self.create_source_bin(i, uri)
            added_state = self.pipeline.add(self.nbin)
            print("Added state: ", added_state)
            self.streammux_sinkpad = self.streammux.get_request_pad(padname)
            if not self.streammux_sinkpad:
                sys.stderr.write("Unable to create sink pad bin \n")
                print("Pad name: ", padname)
            self.srcpad = self.nbin.get_static_pad("src")
            self.srcpad.link(self.streammux_sinkpad)
            Gst.debug_bin_to_dot_file(self.pipeline, Gst.DebugGraphDetails.ALL, "Resetting_Source")

            self.bus = self.pipeline.get_bus()
            self.bus.add_signal_watch()
            self.bus.connect("message", self.bus_call, self.loop)

            self.pipeline.set_state(Gst.State.PLAYING)

            self.nbin.set_state(Gst.State.PLAYING)
            nbin_check = self.nbin.get_state(Gst.CLOCK_TIME_NONE)[0]
            if nbin_check == Gst.StateChangeReturn.SUCCESS or nbin_check == Gst.StateChangeReturn.NO_PREROLL:  
                self.uri_decode_bin.set_state(Gst.State.PLAYING)
                uridecodebin_check = self.uri_decode_bin.get_state(Gst.CLOCK_TIME_NONE)[0]
                if uridecodebin_check == Gst.StateChangeReturn.SUCCESS or uridecodebin_check == Gst.StateChangeReturn.NO_PREROLL: 
                    self.streammux.set_state(Gst.State.PLAYING)
                    streammux_check = self.streammux.get_state(Gst.CLOCK_TIME_NONE)[0]
                    if streammux_check == Gst.StateChangeReturn.SUCCESS or streammux_check == Gst.StateChangeReturn.NO_PREROLL:  
                        self.pipeline.set_state(Gst.State.PLAYING)
                        pipeline_check = self.pipeline.get_state(Gst.CLOCK_TIME_NONE)[0]
                        if pipeline_check == Gst.StateChangeReturn.SUCCESS or pipeline_check == Gst.StateChangeReturn.NO_PREROLL:  
                            print("We did it boys!")
                            Gst.debug_bin_to_dot_file(self.pipeline, Gst.DebugGraphDetails.ALL, "Trying_Playing")
                        else:
                            print("pipeline failed us")
                    else:
                        print("streammux failed us")
                else:
                    print("uridecodebin failed us")
            else:
                print("nbin failed us")

            self.loop.run()

And this is the function i use for removing the sourcebin:

def remove_source_bin(self):
    # self.nbin.set_state(Gst.State.NULL)
    # self.uri_decode_bin.unparent()
    # self.uri_decode_bin.unref()
    # Gst.Bin.remove(self.pipeline, self.uri_decode_bin)
    removed_states = []
    # for i in range(batch_size):
    padname = "sink_0" #% int(i)
    pad = self.streammux.get_static_pad(padname)
    pad.send_event(Gst.Event.new_flush_stop(False))
    self.streammux.release_request_pad(pad)
    pad.unref()
    removed_states.append(self.pipeline.remove(self.nbin))
    return removed_states

And for creating the new sourcebin i use this function:

def create_source_bin(self, index, uri):
    print("Creating source bin")
    bin_name = "source-bin-%02d" % index
    print(bin_name)
    self.nbin = Gst.Bin.new(bin_name)
    if not self.nbin:
        sys.stderr.write(" Unable to create source bin \n")

    # Source element for reading from the uri.
    # We will use decodebin and let it figure out the container format of the
    # stream and the codec and plug the appropriate demux and decode plugins.
    self.uri_decode_bin = Gst.ElementFactory.make("uridecodebin", 'uridecodebin')
    if not self.uri_decode_bin:
        sys.stderr.write(" Unable to create uri decode bin \n")
    # We set the input uri to the source element
    self.uri_decode_bin.set_property("uri", uri)
    # Connect to the "pad-added" signal of the decodebin which generates a
    # callback once a new pad for raw data has beed created by the decodebin
    self.uri_decode_bin.connect("pad-added", self.cb_newpad, self.nbin)
    self.uri_decode_bin.connect("child-added", self.decodebin_child_added, self.nbin)
    # We need to create a ghost pad for the source bin which will act as a proxy
    # for the video decoder src pad. The ghost pad will not have a target right
    # now. Once the decode bin creates the video decoder and generates the
    # cb_newpad callback, we will set the ghost pad target to the video decoder
    # src pad.
    Gst.Bin.add(self.nbin, self.uri_decode_bin)
    bin_pad = self.nbin.add_pad(Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC))
    if not bin_pad:
        sys.stderr.write(" Failed to add ghost pad in source bin \n")
        return None
    # self.nbin.Gst.Bin.recalculate_latency()
    return self.nbin

Thanks in advance for any help,
Mina

I have no way to solve it. I will always pay attention to it. I’ve been trying to solve this problem recently.
These are my findings that may help you

  1. I found that the component in the set of pile to pause is not necessarily pause
  2. RTSP cannot be suspended
  3. The uridecode bin can’t be used after being suspended. It can only be re created. It seems to be related to the ready state

I implemented it with threads, but the picture seems abnormal when reconnecting from rtsp, sometimes there will be a green area

Can you elaborate on the steps you followed or what did you do with threads ?

Thanks

Thread 1
Check whether the camera network is connected (TCP)
Thread 2
Set a state variable
If the camera is not available, remove uridecodebin(rtsp), add uridecodebin (local video , number is 0) set pipeline status paused, and set the variable to suspended
If the camera is working and the state variable is suspended, remove uridecodebin(local video), add uridecodebin (RTSP) (number is 0), and set the pipeline state playing

Hi gharbv, I am facing similar issue to handle the EOS when rtsp is not available for some duration in deepstream python pipeline. Can u share the code for approach you discussed. It would be helpful to me.