I used Gst python to construct this pipeline. Here is a piece of my code:
def on_rtspsrc_pad_added(self, rtspsrc, pad, rtph264depay):
# Create the rest of your pipeline here and link it
rtspsrc.link(rtph264depay)
def run(self):
self.pipeline = Gst.Pipeline.new("receiver")
# Create rtspsrc element
self.source = Gst.ElementFactory.make("rtspsrc", None)
self.source.set_property("latency", 0)
self.source.set_property("drop-on-latency", "true")
self.source.set_property("location", self.rtspLink)
self.pipeline.add(self.source)
# Create rtph264depay element
self.rtph264depay = Gst.ElementFactory.make("rtph264depay", None)
if not self.rtph264depay:
print("Could not create rtph264depay element")
self.stop.set()
self.pipeline.add(self.rtph264depay)
# Create h264parse element
self.h264parse = Gst.ElementFactory.make("h264parse", None)
if not self.h264parse:
print("Could not create h264parse element")
self.stop.set()
self.pipeline.add(self.h264parse)
# Create omxh264dec element
self.omxh264dec = Gst.ElementFactory.make("omxh264dec", None)
if not self.omxh264dec:
print("Could not create omxh264dec element")
self.stop.set()
self.pipeline.add(self.omxh264dec)
# Create nvvidconv element
self.nvvidconv = Gst.ElementFactory.make("nvvidconv", None)
if not self.nvvidconv:
print("Could not create nvvidconv element")
self.stop.set()
self.pipeline.add(self.nvvidconv)
# Create tee element
self.tee = Gst.ElementFactory.make("tee", None)
if not self.tee:
print("Could not create tee element")
self.stop.set()
self.pipeline.add(self.tee)
# Create queue element
self.queue = Gst.ElementFactory.make("queue", None)
if not self.queue:
print("Could not create queue element")
self.stop.set()
self.pipeline.add(self.queue)
# Create queue 2 element
self.queue_2 = Gst.ElementFactory.make("queue", None)
if not self.queue_2:
print("Could not create queue_2 element")
self.stop.set()
self.pipeline.add(self.queue_2)
# Create cap_filter element
self.cap_filter = Gst.ElementFactory.make("capsfilter", None)
self.pipeline.add(self.cap_filter)
caps = Gst.Caps.from_string("video/x-raw, format=(string)BGRx, framerate=(fraction)20/1")
self.cap_filter.set_property("caps", caps)
if not self.cap_filter:
print("Could not create capsfilter element")
self.stop.set()
# Create cap_filter 2 element
self.cap_filter_2 = Gst.ElementFactory.make("capsfilter", None)
self.pipeline.add(self.cap_filter_2)
caps_2 = Gst.Caps.from_string("video/x-raw, format=(string)BGRx, framerate=(fraction)20/1")
self.cap_filter_2.set_property("caps", caps_2)
if not self.cap_filter_2:
print("Could not create capsfilter 2 element")
self.stop.set()
# Create videoconvert element
self.videoconvert = Gst.ElementFactory.make("videoconvert", None)
if not self.videoconvert:
print("Could not create videoconvert element")
self.stop.set()
self.pipeline.add(self.videoconvert)
# Create videoconvert 2 element
self.videoconvert_2 = Gst.ElementFactory.make("videoconvert", None)
if not self.videoconvert_2:
print("Could not create videoconvert_2 element")
self.stop.set()
self.pipeline.add(self.videoconvert_2)
# Create cap_filter 3 element
self.cap_filter_3 = Gst.ElementFactory.make("capsfilter", None)
self.pipeline.add(self.cap_filter_3)
caps_3 = Gst.Caps.from_string("video/x-raw, format=(string)BGR")
self.cap_filter_3.set_property("caps", caps_3)
if not self.cap_filter_3:
print("Could not create capsfilter 3 element")
self.stop.set()
# Create cap_filter 4 element
self.cap_filter_4 = Gst.ElementFactory.make("capsfilter", None)
self.pipeline.add(self.cap_filter_4)
caps_4 = Gst.Caps.from_string("video/x-raw, width=(int)640, height=(int)480")
self.cap_filter_4.set_property("caps", caps_4)
if not self.cap_filter_4:
print("Could not create capsfilter 4 element")
self.stop.set()
# Create appsink element
# self.appsink = Gst.ElementFactory.make("xvimagesink", None)
self.appsink = Gst.ElementFactory.make("appsink", None)
self.appsink.set_property("sync", 0)
self.appsink.set_property("drop", "true")
self.appsink.set_property("emit-signals", True)
if not self.appsink:
print("Could not create appsink element")
self.stop.set()
self.pipeline.add(self.appsink)
# Create appsink_2 element
# self.appsink_2 = Gst.ElementFactory.make("xvimagesink", None)
self.appsink_2 = Gst.ElementFactory.make("appsink", None)
self.appsink_2.set_property("sync", 0)
self.appsink_2.set_property("drop", "true")
self.appsink_2.set_property("emit-signals", True)
if not self.appsink_2:
print("Could not create appsink_2 element")
self.pipeline.add(self.appsink_2)
# Link all Gstreamer elements together
self.rtph264depay.link(self.h264parse)
self.h264parse.link(self.omxh264dec)
self.omxh264dec.link(self.nvvidconv)
self.nvvidconv.link(self.tee)
# Tee 1
self.tee.link(self.queue)
self.queue.link(self.cap_filter)
# self.tee.link(self.cap_filter)
self.cap_filter.link(self.videoconvert)
self.videoconvert.link(self.cap_filter_3)
self.cap_filter_3.link(self.appsink)
# Tee 2
self.tee.link(self.queue_2)
self.queue_2.link(self.cap_filter_2)
# self.tee.link(self.cap_filter_2)
self.cap_filter_2.link(self.videoconvert_2)
self.videoconvert_2.link(self.cap_filter_4)
self.cap_filter_4.link(self.appsink_2)
self.source.connect("pad-added", self.on_rtspsrc_pad_added, self.rtph264depay)
# Start playing
Gst.debug_bin_to_dot_file(self.pipeline, Gst.DebugGraphDetails.ALL, "pipeline")
ret = self.pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
print("Unable to set the pipeline to the playing state.")
self.stop.set()
# Wait until error or EOS
bus = self.pipeline.get_bus()
while True:
if self.stop.is_set():
print('Stopping CAM Stream by main process')
break
message = bus.timed_pop_filtered(10000, Gst.MessageType.ANY)
if message:
if message.type == Gst.MessageType.ERROR:
err, debug = message.parse_error()
print("Error received from element %s: %s" % (
message.src.get_name(), err))
print("Debugging information: %s" % debug)
break
elif message.type == Gst.MessageType.EOS:
print("End-Of-Stream reached.")
break
elif message.type == Gst.MessageType.STATE_CHANGED:
if isinstance(message.src, Gst.Pipeline):
old_state, new_state, pending_state = message.parse_state_changed()
print("Pipeline state changed from %s to %s." %
(old_state.value_nick, new_state.value_nick))
# else:
# print("Unexpected message received:", message)
# self.unexpected_cnt = self.unexpected_cnt + 1
# if self.unexpected_cnt == self.num_unexpected_tot:
# break
print('Terminating gstreamer pipeline...')
self.stop.set()
self.pipeline.set_state(Gst.State.NULL)