Currently I’m writing a small python script allow streaming from usb camera that connected to my Jetson Nano with some modifications on frame (such as cv2.putText …). My problem is if rtmp server stop or an issue happens to my network, Gstreamer pipeline couldn’t self recover/restart. My sample code:
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GObject, GLib
GObject.threads_init()
Gst.init(None)
import time
import cv2
class RtmpPipeline(object):
def __init__(self, w, h, fps):
self.number_frames = 0
self.w = w
self.h = h
self.fps = fps
self.duration = 1 / self.fps * Gst.SECOND # duration of a frame in nanoseconds
self.online = False
def construct_pipeline(self):
launch_string = 'appsrc name=source is-live=true format=GST_FORMAT_TIME ' \
' caps=video/x-raw,format=BGR,width=%s,height=%s,framerate=%s/1 ' \
'! videoconvert ! nvvidconv ! nvv4l2h264enc maxperf-enable=1 profile=0 preset-level=1 ' \
'! video/x-h264 ! h264parse ! video/x-h264 ! queue ! flvmux name=mux ! rtmpsink location=<my rtmp server address>' % (self.w, self.h, self.fps)
self.pipeline = Gst.parse_launch(launch_string)
self.appsrc = self.pipeline.get_child_by_name('source')
ret = self.pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
self.online = False
else:
self.online = True
def start(self):
self.construct_pipeline()
self.listener = Thread(target=self.message_handle, args=()).start()
def push_to_pipeline(self, frame):
try:
data = frame.tostring()
buf = Gst.Buffer.new_allocate(None, len(data), None)
buf.fill(0, data)
buf.duration = self.duration
timestamp = self.number_frames * self.duration
buf.pts = buf.dts = int(timestamp)
buf.offset = timestamp
self.number_frames += 1
retval = self.appsrc.emit('push-buffer', buf)
if retval != Gst.FlowReturn.OK:
print(retval)
except Exception:
self.pipeline.set_state(Gst.State.NULL)
self.online = False
def message_handle(self):
# Wait until error or EOS
bus = self.pipeline.get_bus()
while True:
message = bus.timed_pop_filtered(Gst.SECOND * 5, Gst.MessageType.ANY)
if message:
if message.type == Gst.MessageType.ERROR:
err, debug = message.parse_error()
print("Error received from element %s: %s" % (
message.src.get_name(), err))
print("Debugging information: %s" % debug)
break
elif message.type == Gst.MessageType.EOS:
print("End-Of-Stream reached.")
break
elif message.type == Gst.MessageType.STATE_CHANGED:
if isinstance(message.src, Gst.Pipeline):
old_state, new_state, pending_state = message.parse_state_changed()
print("Pipeline state changed from %s to %s." %
(old_state.value_nick, new_state.value_nick))
# else:
# print("Unexpected message received:", message)
# self.unexpected_cnt = self.unexpected_cnt + 1
# if self.unexpected_cnt == self.num_unexpected_tot:
# break
print('Terminating gstreamer pipeline...')
self.pipeline.set_state(Gst.State.NULL)
self.online = False
if __name__ == '__main__':
cap = cv2.VideoCapture(0)
w = int(cap.get(3))
h = int(cap.get(4))
fps = int(cap.get(cv2.CAP_PROP_FPS))
out = RtmpPipeline()
out.start()
if cap.isOpened():
while True:
ret, frame = cap.read()
if not ret:
break
# Add text to frame
cv2.putText(...)
if out.online:
out.push_to_pipeline(frame)
cap.release()
Is there a way I can restart this pipeline if problem come with rtmpsink element? And in that case, how can I release/unref Gst.Buffer that I allocated and filled with frame data? Thanks in advance