Auto restart gstreamer pipeline after network failure

Currently I’m writing a small Python script that streams from a USB camera connected to my Jetson Nano, with some modifications to each frame (such as cv2.putText …). My problem is that if the RTMP server stops or my network has an issue, the GStreamer pipeline cannot recover or restart by itself. My sample code:

import time
from threading import Thread

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GObject, GLib
GObject.threads_init()
Gst.init(None)
import cv2

class RtmpPipeline(object):
    """Feed raw BGR frames into a GStreamer pipeline that ends in ``rtmpsink``.

    Frames are pushed through an ``appsrc`` element, hardware-encoded to
    H.264, muxed to FLV and sent to ``location``. A background thread watches
    the pipeline bus; on ERROR or EOS it sets the pipeline to NULL and clears
    ``online``.

    Attributes:
        number_frames: Count of frames pushed so far; used to derive
            timestamps.
        w, h, fps: Frame width, height and frame rate placed in the appsrc
            caps.
        location: Value substituted into ``rtmpsink location=...``.
        duration: Duration of one frame in nanoseconds (float).
        online: True while the pipeline is believed to be usable.
        pipeline, appsrc: GStreamer objects created by construct_pipeline().
        listener: The bus-watching thread created by start().
    """

    def __init__(self, w, h, fps, location='<my rtmp server address>'):
        """Store stream parameters; no GStreamer objects are created here.

        Args:
            w: Frame width in pixels.
            h: Frame height in pixels.
            fps: Frames per second; must be non-zero.
            location: RTMP URL handed to ``rtmpsink``. The default is the
                placeholder used by the original script and must be replaced
                with a real address.
        """
        self.number_frames = 0
        self.w = w
        self.h = h
        self.fps = fps
        self.location = location
        self.duration = 1 / self.fps * Gst.SECOND  # duration of a frame in nanoseconds
        self.online = False
        self.pipeline = None
        self.appsrc = None
        self.listener = None

    def construct_pipeline(self):
        """Build the pipeline from a launch string and set it to PLAYING.

        Sets ``online`` to False when the state change reports FAILURE and to
        True otherwise.
        """
        launch_string = 'appsrc name=source is-live=true format=GST_FORMAT_TIME ' \
                        ' caps=video/x-raw,format=BGR,width=%s,height=%s,framerate=%s/1 ' \
                        '! videoconvert ! nvvidconv ! nvv4l2h264enc maxperf-enable=1 profile=0 preset-level=1 ' \
                        '! video/x-h264 ! h264parse ! video/x-h264 ! queue ! flvmux name=mux ! rtmpsink location=%s' % (
                            self.w, self.h, self.fps, self.location)

        self.pipeline = Gst.parse_launch(launch_string)
        self.appsrc = self.pipeline.get_child_by_name('source')
        ret = self.pipeline.set_state(Gst.State.PLAYING)
        self.online = ret != Gst.StateChangeReturn.FAILURE

    def start(self):
        """Construct the pipeline and launch the bus listener thread."""
        self.construct_pipeline()
        # Thread.start() returns None, so keep the Thread object itself and
        # start it in a separate statement.
        self.listener = Thread(target=self.message_handle, args=())
        # Daemon thread: a listener still polling the bus must not keep the
        # interpreter alive once the main thread has finished.
        self.listener.daemon = True
        self.listener.start()

    def push_to_pipeline(self, frame):
        """Wrap ``frame`` in a timestamped Gst.Buffer and push it to appsrc.

        Args:
            frame: Object exposing ``tobytes()`` (e.g. a NumPy image array)
                whose bytes match the caps given to appsrc.

        Any exception raised while building or pushing the buffer is
        reported, the pipeline (if one exists) is set to NULL and ``online``
        is cleared. A non-OK flow return is only printed.
        """
        try:
            # tobytes() is the supported spelling; tostring() is a deprecated
            # alias in NumPy.
            data = frame.tobytes()
            buf = Gst.Buffer.new_allocate(None, len(data), None)
            buf.fill(0, data)
            # Buffer time fields are integers (nanoseconds); self.duration is
            # a float, so convert explicitly instead of relying on coercion.
            timestamp = int(self.number_frames * self.duration)
            buf.duration = int(self.duration)
            buf.pts = buf.dts = timestamp
            buf.offset = timestamp
            self.number_frames += 1
            # NOTE(review): per the appsrc documentation the 'push-buffer'
            # action signal does not take ownership of the buffer; the Python
            # binding is expected to drop its reference when buf goes out of
            # scope, so no manual unref is done here.
            retval = self.appsrc.emit('push-buffer', buf)
            if retval != Gst.FlowReturn.OK:
                print(retval)
        except Exception as exc:
            print("Failed to push frame to pipeline: %s" % exc)
            if self.pipeline is not None:
                self.pipeline.set_state(Gst.State.NULL)
            self.online = False

    def message_handle(self):
        """Poll the pipeline bus until an ERROR or EOS message arrives.

        Pipeline state changes are printed as they occur. When the loop
        ends, the pipeline is set to NULL and ``online`` is cleared.
        """
        bus = self.pipeline.get_bus()

        while True:
            # Returns None when no message arrives within five seconds.
            message = bus.timed_pop_filtered(Gst.SECOND * 5, Gst.MessageType.ANY)
            if not message:
                continue

            if message.type == Gst.MessageType.ERROR:
                err, debug = message.parse_error()
                print("Error received from element %s: %s" % (
                    message.src.get_name(), err))
                print("Debugging information: %s" % debug)
                break
            elif message.type == Gst.MessageType.EOS:
                print("End-Of-Stream reached.")
                break
            elif message.type == Gst.MessageType.STATE_CHANGED:
                # Only report state changes of the pipeline itself, not of
                # every child element.
                if isinstance(message.src, Gst.Pipeline):
                    old_state, new_state, pending_state = message.parse_state_changed()
                    print("Pipeline state changed from %s to %s." %
                          (old_state.value_nick, new_state.value_nick))

        print('Terminating gstreamer pipeline...')
        self.pipeline.set_state(Gst.State.NULL)
        self.online = False


if __name__ == '__main__':
    # Capture from the first camera, annotate each frame and push it to the
    # RTMP pipeline while the pipeline reports itself online.
    cap = cv2.VideoCapture(0)
    # Query the camera only once it is known to be open: an unopened capture
    # reports 0 for its properties, and fps == 0 makes RtmpPipeline.__init__
    # divide by zero.
    if cap.isOpened():
        w = int(cap.get(cv2.CAP_PROP_FRAME_WIDTH))
        h = int(cap.get(cv2.CAP_PROP_FRAME_HEIGHT))
        # NOTE(review): cap.get() returns 0 for properties the backend does
        # not support; 30 is an assumed fallback rate — confirm for the
        # camera in use.
        fps = int(cap.get(cv2.CAP_PROP_FPS)) or 30
        # The constructor requires the frame geometry and rate.
        out = RtmpPipeline(w, h, fps)
        out.start()
        while True:
            ret, frame = cap.read()
            if not ret:
                break
            # Add text to frame
            # Placeholder from the original post: supply the real
            # cv2.putText arguments before running.
            cv2.putText(...)
            # Frames are dropped while the pipeline is offline.
            if out.online:
                out.push_to_pipeline(frame)


    cap.release()

Is there a way I can restart this pipeline if a problem occurs with the rtmpsink element? And in that case, how can I release/unref the Gst.Buffer that I allocated and filled with frame data? Thanks in advance.

Hi,
We don’t have experience handling this case. We suggest you go to the OpenCV forum to get help. You may make a post using x264enc. Once you have a working application, you can replace it with nvvidconv ! nvv4l2h264enc to enable hardware encoding.