How to get the current state of a GStreamer pipeline while working with live camera sources

Hi all,

I’m working on a DeepStream project where I use the Python bindings on Jetson devices. I have working code written in Gst-Python and I’m experimenting with RTSP/HTTP camera sources. With both camera types, the stream sometimes gets dropped and the display turns into a black screen.

I’ve tried listening for different types of messages on the pipeline’s GStreamer bus when I disconnect the RTSP cameras, but once I unplug the camera’s power, both the stream and the messages on the terminal stop. Eventually, my aim is to reconnect to the cameras automatically when the stream drops. Here is my simple GStreamer code:

Element Creation:

def main():
    # Standard GStreamer initialization
    GObject.threads_init()
    Gst.init(None)

    # Create gstreamer elements
    # Create Pipeline element that will form a connection of other elements
    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()

    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")

    # Source element for reading from the file
    print("Creating Source \n ")
    source = Gst.ElementFactory.make("rtspsrc", "rtsp-cam-source")
    if not source:
        sys.stderr.write(" Unable to create Source \n")

    caps_source = Gst.ElementFactory.make("capsfilter", "caps-source")
    if not caps_source:
        sys.stderr.write(" Unable to create Caps-Source \n")
    caps_source.set_property("caps", Gst.caps_from_string("application/x-rtp"))

...
Creating other pipeline elements...
...

Property Setting:

    source.set_property('location', 'rtsp://<user>:<pass>@<cam-ip>:<port>/<streaming-url>')
    source.set_property('message-forward', 1)
    source.set_property('latency', 0)
    source.set_property('timeout', 0)
    source.set_property('tcp-timeout', 0)
    source.set_property('udp-reconnect', 0)
    source.set_property('retry', 0)
    appsink.set_property('sync', 0)
    appsink.set_property('emit-signals', 1)

Display:

    while True:
        # pipe_state = pipeline.get_state(Gst.CLOCK_TIME_NONE)

        message = bus.timed_pop_filtered(10000, Gst.MessageType.ANY)

        if image_arr is not None:   
            cv2.imshow("Doruk Receive Image from Pipeline Buffer", image_arr)
            if cv2.waitKey(1) == ord('q'):
                pipeline.set_state(Gst.State.NULL)
                pipeline.send_event(Gst.Event.new_eos())
                break
        if message:
            if message.type == Gst.MessageType.ERROR:
                err, debug = message.parse_error()
                print(("Error received from element %s: %s" % (
                    message.src.get_name(), err)))
                print(("Debugging information: %s" % debug))
                break
            elif message.type == Gst.MessageType.EOS:
                print("End-Of-Stream reached.")
                break
            elif message.type == Gst.MessageType.STATE_CHANGED:
                # print(message.type)
                if isinstance(message.src, Gst.Pipeline):
                    old_state, new_state, pending_state = message.parse_state_changed()
                    print(("Pipeline state changed from %s to %s." %
                        (old_state.value_nick, new_state.value_nick)))
            else:
                print(message.type)

Hi,

I did some testing with gstd on my PC and it seems that first you would need to extend your code to catch the GST_MESSAGE_WARNING messages from the bus. The rtspsrc should generate a message of type GST_MESSAGE_WARNING when the sender stops streaming for whatever reason.

For your reference, below are the commands I used to verify that the warning message is sent when the sender stream is stopped. For my testing I used the rtspsink from RidgeRun to simulate a live camera input, but it should apply the same way to your actual cameras.

#Terminal 1 (sender)

gst-launch-1.0 videotestsrc is-live=true ! "video/x-raw, width=640, height=480, framerate=30/1" ! \
perf ! x264enc tune=zerolatency ! h264parse ! capsfilter caps="video/x-h264, mapping=/stream1" ! rtspsink service=4500

#Terminal 2 (receiver)

gst-client-1.0 pipeline_create p1 rtspsrc location=rtsp://127.0.0.1:4500/stream1 ! \
rtph264depay ! h264parse ! avdec_h264 ! queue ! autovideosink sync=false

gst-client-1.0 bus_filter p1 error+eos+warning

gst-client-1.0 pipeline_play p1

gst-client-1.0 bus_read p1

gst-client-1.0 pipeline_stop p1
gst-client-1.0 pipeline_delete p1

Jafet Chaves,
Embedded SW Engineer at RidgeRun
Contact us: support@ridgerun.com
Developers wiki: https://developer.ridgerun.com/
Website: www.ridgerun.com

Hi,

Thank you for your inputs. I have extended my bus message handling beyond warnings to cover the other message types, in case any of them gives an update about the state. However, none of them reports anything after I disconnect the camera. This is how I extended my bus message coverage:

...
elif message.type == Gst.MessageType.WARNING:
    m = message.parse_warning()
    print("MessageType Warning: ", m)
elif message.type == Gst.MessageType.INFO:
    m = message.parse_info()
    # m = message.parse_info_details()
    print("MessageType Info: ", m)
elif message.type == Gst.MessageType.CLOCK_LOST:
    m = message.parse_clock_lost()
    print("MessageType Clock Lost: ", m)
elif message.type == Gst.MessageType.STREAM_STATUS:
    m = message.parse_stream_status()
    print("MessageType Stream Status: ", m)
elif message.type == Gst.MessageType.QOS:
    m = message.parse_qos()
    print("MessageType QOS: ", m)
elif message.type == Gst.MessageType.PROGRESS:
    m = message.parse_progress()
    print("MessageType Progress: ", m)
else:
    print(message.type)

And this is the output when I disconnect the camera. It prints the <flags GST_MESSAGE_TAG of type Gst.MessageType> message and the stream goes live. Once I disconnect the camera, it waits for some time and then prints the Error received from element rtsp-cam-source: gst-resource-error-quark: Could not read from resource. (9) message. Please see all the messages from the pipeline:

Hi, doruk.sonmez1

Could you please provide the full pipeline description (receiver pipeline) in your Python script? The warning message should be captured based on the snippet of code that you provided, but perhaps with the full pipeline description I could find out why the message is not travelling through the pipeline bus.

Sure, please find the full snippet below:

import sys

sys.path.append('./')
import gi

gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst, GLib
from common.bus_call import bus_call
import cv2
import numpy as np
import socket

'''
rtspsrc --> rtph264depay --> h264parse --> avdec_h264 --> videoconvert --> xvimagesink
'''

image_arr = None
stream_str = 'rtsp://<user>:<pw>@<ip>:<port>/h264'

def on_src_pad_added(src, new_pad, depayer):
    sink_pad = depayer.get_static_pad("sink")

    if (sink_pad.is_linked()):
        print("We are already linked. Ignoring.")
        return

    # check the new pad's type
    new_pad_caps = new_pad.get_current_caps()
    new_pad_struct = new_pad_caps.get_structure(0)
    new_pad_type = new_pad_struct.get_name()

    ret = new_pad.link(sink_pad)
    return


def gst_to_opencv(sample):
    buf = sample.get_buffer()
    caps = sample.get_caps()

    arr = np.ndarray(
        (caps.get_structure(0).get_value('height'),
         caps.get_structure(0).get_value('width'),
         3),
        buffer=buf.extract_dup(0, buf.get_size()),
        dtype=np.uint8)
    return arr


def new_buffer(sink, data):
    global image_arr
    sample = sink.emit("pull-sample")
    # buf = sample.get_buffer()
    # print "Timestamp: ", buf.pts
    arr = gst_to_opencv(sample)
    image_arr = arr
    return Gst.FlowReturn.OK

def main():
    # Standard GStreamer initialization
    GObject.threads_init()
    Gst.init(None)

    # Create gstreamer elements
    # Create Pipeline element that will form a connection of other elements
    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()

    # Source element for reading from the file
    print("Creating Source \n ")
    source = Gst.ElementFactory.make("rtspsrc", "rtsp-cam-source")

    caps_source = Gst.ElementFactory.make("capsfilter", "caps-source")
    caps_source.set_property("caps", Gst.caps_from_string("application/x-rtp"))

    jit_buffer = Gst.ElementFactory.make("rtpjitterbuffer", "jitter-buffer")

    depay = Gst.ElementFactory.make("rtph264depay", "rtp-depay")

    parser = Gst.ElementFactory.make("h264parse", "h264-parser")

    decoder = Gst.ElementFactory.make("avdec_h264", "h264-decoder")

    # videoconvert to make sure a superset of raw formats are supported
    vidconvsrc = Gst.ElementFactory.make("videoconvert", "convertor_src1")

    appsink = Gst.ElementFactory.make("appsink", "app-sink")

    caps = Gst.caps_from_string(
        "video/x-raw, format=(string){BGR, GRAY8}; video/x-bayer,format=(string){rggb,bggr,grbg,gbrg}")

    source.set_property('location', stream_str)
    source.set_property('message-forward', 1)
    source.set_property('latency', 0)
    source.set_property('timeout', 0)
    source.set_property('tcp-timeout', 0)
    source.set_property('buffer-mode', 4)
    source.set_property('udp-reconnect', 0)
    source.set_property('retry', 0)
    jit_buffer.set_property('latency', 0)
    jit_buffer.set_property('drop-on-latency', 0)
    jit_buffer.set_property('mode', 4)
    appsink.set_property('sync', 0)
    appsink.set_property('emit-signals', 1)
    appsink.set_property('wait-on-eos', 0)
    appsink.set_property('max-buffers', 1)
    appsink.set_property('drop', 1)
    appsink.set_property('qos', 1)
    appsink.set_property('async', 0)
    appsink.set_property('caps', caps)

    print("Adding elements to Pipeline \n")
    pipeline.add(source)
    pipeline.add(caps_source)
    pipeline.add(jit_buffer)
    pipeline.add(depay)
    pipeline.add(parser)
    pipeline.add(decoder)
    pipeline.add(vidconvsrc)
    pipeline.add(appsink)

    print("Linking elements in the Pipeline \n")
    source.connect("pad-added", on_src_pad_added, caps_source)
    caps_source.link(jit_buffer)
    jit_buffer.link(depay)
    depay.link(parser)
    parser.link(decoder)
    decoder.link(vidconvsrc)
    vidconvsrc.link(appsink)
    appsink.connect('new-sample', new_buffer, appsink)

    # start play back and listen to events
    print("Starting pipeline \n")
    ret = pipeline.set_state(Gst.State.PLAYING)
    if ret == Gst.StateChangeReturn.FAILURE:
        print("Unable to set the pipeline to the playing state.")
        exit(-1)

    # create an event loop and feed gstreamer bus mesages to it
    bus = pipeline.get_bus()
    bus.add_signal_watch()

    # Parse message
    while True:
        pipe_state = pipeline.get_state(Gst.CLOCK_TIME_NONE)
        # print(pipe_state.state)
        message = bus.timed_pop_filtered(10000, Gst.MessageType.ANY)

        if image_arr is not None:
            # print(image_arr)
            cv2.imshow("Doruk Receive Image from Pipeline Buffer", image_arr)
            if cv2.waitKey(1) == ord('q'):
                pipeline.set_state(Gst.State.NULL)
                pipeline.send_event(Gst.Event.new_eos())
                break
        if pipe_state.state != Gst.State.PLAYING:
            print("Connection to the source is lost. Reconnecting...")

        if message:
            if message.type == Gst.MessageType.ERROR:
                err, debug = message.parse_error()
                print("Error received from element %s: %s" % (
                    message.src.get_name(), err))
                print("Debugging information: %s" % debug)
                break
            elif message.type == Gst.MessageType.EOS:
                print("End-Of-Stream reached.")
                break
            elif message.type == Gst.MessageType.STATE_CHANGED:
                if isinstance(message.src, Gst.Pipeline):
                    old_state, new_state, pending_state = message.parse_state_changed()
                    print("Pipeline state changed from %s to %s." %
                          (old_state.value_nick, new_state.value_nick))
            elif message.type == Gst.MessageType.WARNING:
                m = message.parse_warning()
                print("MessageType Warning: ", m)
            # elif message.type == Gst.MessageType.INFO:
            #     m = message.parse_info()
            #     # m = message.parse_info_details()
            #     print("MessageType Info: ", m)
            # elif message.type == Gst.MessageType.CLOCK_LOST:
            #     m = message.parse_clock_lost()
            #     print("MessageType Clock Lost: ", m)
            # elif message.type == Gst.MessageType.STREAM_STATUS:
            #     m = message.parse_stream_status()
            #     print("MessageType Stream Status: ", m)
            # elif message.type == Gst.MessageType.QOS:
            #     m = message.parse_qos()
            #     print("MessageType QOS: ", m)
            # elif message.type == Gst.MessageType.PROGRESS:
            #     m = message.parse_progress()
            #     print("MessageType Progress: ", m)
            else:
                print(message.type)

if __name__ == '__main__':
    sys.exit(main())

doruk.sonmez,

Thanks for providing the full snippet. One important thing to note is the timeout you are using during the polling here:

message = bus.timed_pop_filtered(10000, Gst.MessageType.ANY)

The timeout is given in nanoseconds, and 10000 ns (10 microseconds) is very little time to capture the warning messages, I believe. Just for testing you could try GST_CLOCK_TIME_NONE to wait forever; later you could try a number on the order of seconds. Also, for this reason one typically implements the bus watch in a separate thread (otherwise you would block other parts of the code while waiting for messages on the bus).

Hi,

I just tried your recommendation and moved message gathering to a thread as follows, using GST_CLOCK_TIME_NONE:

def gather_message(bus):
    global message
    message = bus.timed_pop_filtered(Gst.CLOCK_TIME_NONE, Gst.MessageType.ANY)

And I used this function inside the while loop instead of message = bus.timed_pop_filtered(10000, Gst.MessageType.ANY):

# Parse message
while True:
    # message = bus.timed_pop_filtered(10000, Gst.MessageType.ANY)
    x = threading.Thread(target=gather_message, args=(bus,))
    x.start()

This way, I can gather the bus messages without slowing the stream, thanks to the threading. However, I still cannot get what is needed.

I have also experimented with GStd from RidgeRun, and if there is an easy way to do this in GStd, I can completely port my code to it.

Now that you are able to capture messages from the bus, are you able to get the warning messages when the RTSP camera is disconnected? If so, the next step is to act on the warning messages in your message-parsing logic. For simplicity, what you could first try upon receiving the warning message is to destroy the previous receiver pipeline, then create it again and put it in the PLAYING state; that way you could implement the reconnection logic that you require.

Now regarding your question about gstd: it is possible to do what you require with gstd, since it implements ways to interact with pipeline buses. The project offers a command-line application (gst-client) that is basically a client for interacting with pipelines in many ways: create, destroy, play, stop, etc. If you decide to use gst-client you will basically be writing a shell script, but depending on your use case this command-line application can be limiting (it is mostly intended for prototyping). For more complex use cases there are also C and Python APIs that can be used to write your own client application or library. Feel free to check the simple and advanced examples sections in the following wiki: GstD - GStreamer Daemon
