These are the elements I am using
import configparser
import math
import sys
from core import is_aarch64
from gi.repository import Gst
from utils.set_logger import logger
def create_pipeline() -> Gst.Pipeline:
"""Create a Gstreamer Pipeline
Returns:
Gst.Pipeline: Gstreamer Pipeline
"""
logger.info("Creating Gst Pipeline \n")
pipeline = Gst.Pipeline()
if not pipeline:
logger.error(" Unable to create Pipeline \n")
sys.exit(1)
else:
return pipeline
def create_source_bin(uri: str, index: int) -> Gst.Bin:
"""Create a Gstreamer Source Bin
Args:
uri (str): input uri
index (int): index of the source bin
Raises:
Exception: if unable to create source bin
Exception: if unable to create uri decode bin
Returns:
Gst.Bin: Gstreamer Source Bin
"""
logger.info("Creating source bin")
bin_name = f"source-bin-{index:02d}"
logger.info(bin_name)
nbin = Gst.Bin.new(bin_name)
if not nbin:
logger.error(" Unable to create source bin \n")
raise Exception("Unable to create source bin")
uri_decode_bin = Gst.ElementFactory.make("nvurisrcbin", "uri-decode-bin")
uri_decode_bin.set_property("file-loop", 0)
uri_decode_bin.set_property("cudadec-memtype", 0)
uri_decode_bin.set_property('latency', 0)
uri_decode_bin.set_property("rtsp-reconnect-interval", 30)
#uri_decode_bin.set_property("rtsp-reconnect-attempts", 50)
if not uri_decode_bin:
logger.error(" Unable to create uri decode bin \n")
raise Exception("Unable to create uri decode bin")
uri_decode_bin.set_property("uri", uri)
uri_decode_bin.connect("pad-added", cb_newpad, nbin)
uri_decode_bin.connect("child-added", decodebin_child_added, nbin)
Gst.Bin.add(nbin, uri_decode_bin)
bin_pad = nbin.add_pad(
Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC)
)
if not bin_pad:
logger.error(" Failed to add ghost pad in source bin \n")
return None
return nbin
def on_pad_added(element, element_src_pad, data):
print("In cb_newpad\n")
caps = element_src_pad.get_current_caps()
str = caps.get_structure(0)
name = str.get_name()
depay_elem = data
media = str.get_string("media")
is_video = media == 'video'
if 'x-rtp' in name and is_video is True:
print('Start binding RTSP')
sinkpad = depay_elem.get_static_pad("sink")
state = element_src_pad.link(sinkpad)
if state != Gst.PadLinkReturn.OK:
print('Unable to link depay loader to rtsp src')
else:
print('Binding RTSP successfully')
else:
print('Cannot be bound,get_name=', name, ' , media=', media)
def decodebin_child_added(child_proxy, Object, name, user_data) -> None:
logger.info(f"Decodebin child added: {name} \n")
if name.find("decodebin") != -1:
Object.connect("child-added", decodebin_child_added, user_data)
if not is_aarch64() and name.find("nvv4l2decoder") != -1:
Object.set_property("cudadec-memtype", 2)
if "source" in name:
source_element = child_proxy.get_by_name("source")
if source_element.find_property("drop-on-latency") is not None:
Object.set_property("drop-on-latency", True)
def cb_newpad(decodebin, decoder_src_pad, data) -> None:
logger.info("In cb_newpad\n")
caps = decoder_src_pad.get_current_caps()
if not caps:
caps = decoder_src_pad.query_caps()
gststruct = caps.get_structure(0)
gstname = gststruct.get_name()
source_bin = data
features = caps.get_features(0)
if gstname.find("video") != -1:
if features.contains("memory:NVMM"):
bin_ghost_pad = source_bin.get_static_pad("src")
if not bin_ghost_pad.set_target(decoder_src_pad):
logger.error(
"Failed to link decoder src pad to source bin ghost pad\n"
)
else:
logger.error(
" Error: Decodebin did not pick nvidia decoder plugin.\n"
)
def create_streammux(
batch_size: int, input_height: int, input_width: int
) -> Gst.Element:
"""Create a Gstreamer Streammux Element
Args:
batch_size (int): batch size for streammux
input_height (int): input_height for streammux
input_width (int): input_width for streammux
Returns:
Gst.Element: Gstreamer Streammux Element
"""
logger.info("Creating streamux \n ")
streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
if not streammux:
logger.error(" Unable to create NvStreamMux \n")
sys.exit(1)
streammux.set_property("width", input_width)
streammux.set_property("height", input_height)
streammux.set_property("batch-size", batch_size)
streammux.set_property("batched-push-timeout", 4000000)
return streammux
def create_pgie(pgie_file_path: str, number_sources: int) -> Gst.Element:
"""Create a Gstreamer Primary Inference Element
Args:
pgie_file_path (str): config file path for pgie
number_sources (int): number of sources
Raises:
Exception: if unable to create pgie
Returns:
Gst.Element: Gstreamer Primary Inference Element
"""
logger.info("Creating Pgie \n ")
pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
if not pgie:
logger.error(" Unable to create pgie \n")
raise Exception("Unable to create pgie")
pgie.set_property("config-file-path", pgie_file_path)
pgie_batch_size = pgie.get_property("batch-size")
if pgie_batch_size != number_sources:
logger.warn(
f"PGIE WARNING: Overriding infer-config batch-size: "
f"{pgie_batch_size} with number of sources "
f"{number_sources} \n",
)
pgie.set_property("batch-size", number_sources)
return pgie
def create_tracker(
tracker_config_txt_path: str, tracker_config_yml_path: str
) -> Gst.Element:
"""Create a Gstreamer Tracker Element
Args:
tracker_config_txt_path (str): tracker txt config
tracker_config_yml_path (str): tracker yml config
Raises:
Exception: if unable to create tracker
Returns:
Gst.Element: Gstreamer Tracker Element
"""
logger.info("Creating nvtracker \n ")
tracker = Gst.ElementFactory.make("nvtracker", "tracker")
if not tracker:
logger.error(" Unable to create tracker \n")
raise Exception("Unable to create tracker")
config = configparser.ConfigParser()
config.read(tracker_config_txt_path)
config.sections()
for key in config["tracker"]:
if key == "tracker-width":
tracker_width = config.getint("tracker", key)
tracker.set_property("tracker-width", tracker_width)
elif key == "tracker-height":
tracker_height = config.getint("tracker", key)
tracker.set_property("tracker-height", tracker_height)
elif key == "gpu-id":
tracker_gpu_id = config.getint("tracker", key)
tracker.set_property("gpu_id", tracker_gpu_id)
elif key == "ll-lib-file":
tracker_ll_lib_file = config.get("tracker", key)
tracker.set_property("ll-lib-file", tracker_ll_lib_file)
# elif key == "ll-config-file":
# tracker_ll_config_file = config.get("tracker", key)
# tracker.set_property("ll-config-file", tracker_ll_config_file)
tracker.set_property("ll-config-file", tracker_config_yml_path)
return tracker
def create_sgie(sgie_file_path: str, number_sources: int) -> Gst.Element:
"""Create a Gstreamer Secondary Inference Element
Args:
sgie_file_path (str): config file path for pgie
number_sources (int): number of sources
Raises:
Exception: if unable to create sgie
Returns:
Gst.Element: Gstreamer Secondary Inference Element
"""
logger.info("Creating Sgie \n ")
sgie = Gst.ElementFactory.make("nvinfer", "secondary-inference")
if not sgie:
logger.error(" Unable to create sgie \n")
raise Exception("Unable to create sgie")
sgie.set_property("config-file-path", sgie_file_path)
sgie_batch_size = sgie.get_property("batch-size")
if sgie_batch_size != number_sources:
logger.warn(
f"SGIE WARNING: Overriding infer-config batch-size: "
f"{sgie_batch_size} with number of sources "
f"{number_sources} \n",
)
sgie.set_property("batch-size", number_sources)
return sgie
def create_videoconverter(index=0) -> Gst.Element:
"""Create a Gstreamer Video Converter Element
Args:
index (int, optional): Defaults to 0.
Raises:
Exception: If unable to create nvvidconv1
Returns:
Gst.Element: Gstreamer Video Converter Element
"""
logger.info("Creating nvvidconv1 \n ")
if index:
element_name = "convertor1"
else:
element_name = "convertor"
nvvidconv = Gst.ElementFactory.make("nvvideoconvert", element_name)
if not nvvidconv:
logger.error(" Unable to create nvvidconv1 \n")
raise Exception("Unable to create nvvidconv1")
return nvvidconv
def create_caps_and_filter() -> Gst.Element:
"""Create a Gstreamer Caps and Filter Element
Raises:
Exception: If unable to get the caps filter1
Returns:
Gst.Element: Gstreamer Caps and Filter Element
"""
logger.info("Creating filter1 \n ")
caps1 = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA")
filter1 = Gst.ElementFactory.make("capsfilter", "filter1")
if not filter1 or not caps1:
logger.error(" Unable to get the caps filter1 \n")
raise Exception("Unable to get the caps filter1")
filter1.set_property("caps", caps1)
return filter1
def create_tiler(
number_sources: int, input_height: int, input_width: int
) -> Gst.Element:
"""Creates a tiler element
Args:
number_sources (int): number of input sources
input_height (int): height for the tiler
input_width (int): width for the tiler
Raises:
Exception: if unable to create tiler
Returns:
Gst.Element: Gstreamer Tiler Element
"""
logger.info("Creating tiler \n ")
tiler = Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
if not tiler:
logger.error(" Unable to create tiler \n")
raise Exception("Unable to create tiler")
tiler_rows = int(math.sqrt(number_sources))
tiler_columns = int(math.ceil((1.0 * number_sources) / tiler_rows))
tiler.set_property("rows", tiler_rows)
tiler.set_property("columns", tiler_columns)
tiler.set_property("width", input_width)
tiler.set_property("height", input_height)
return tiler
def create_onscreendisplay() -> Gst.Element:
"""Creates a on screen display element
Raises:
Exception: if unable to create nvosd
Returns:
Gst.Element: Gstreamer On Screen Display Element
"""
logger.info("Creating nvosd \n ")
nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
if not nvosd:
logger.error(" Unable to create nvosd \n")
raise Exception("Unable to create nvosd")
return nvosd
def create_sink(display: bool) -> Gst.Element:
"""Creates a sink element
Args:
display (bool): whether to display the output
Raises:
Exception: if unable to create nv3dsink
Exception: if unable to create egl sink
Exception: if unable to create Fakesink sink
Returns:
Gst.Element: Gstreamer Sink Element
"""
if display:
if is_aarch64():
logger.info("Creating nv3dsink \n")
sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
if not sink:
logger.error(" Unable to create nv3dsink \n")
raise Exception("Unable to create nv3dsink")
else:
logger.info("Creating EGLSink \n")
sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
if not sink:
logger.error(" Unable to create egl sink \n")
raise Exception("Unable to create egl sink")
else:
logger.info("Creating Fakesink \n")
sink = Gst.ElementFactory.make("fakesink", "fakesink")
if not sink:
logger.error(" Unable to create Fakesink sink \n")
raise Exception("Unable to create Fakesink sink")
sink.set_property("sync", 0)
sink.set_property("qos", 0)
return sink
Here is the pipeline
def run_pipeline(self) -> None:
"""Create and run the pipeline."""
GObject.threads_init()
Gst.init(None)
is_live = False
self.pipeline = create_pipeline()
streammux = create_streammux(
batch_size=len(self.cameras_list),
input_height=self.configs.muxer_output_height,
input_width=self.configs.muxer_output_width,
)
self.pipeline.add(streammux)
#source_bins = []
for idx, camera in enumerate(self.cameras_list):
logger.info(f"Creating source_bin {idx} \n")
uri_name = camera.camera_uri
#uri_name = "rtsp://127.0.0.1:8554/stream1"
if "rtsp://" in uri_name:
is_live = True
source = Gst.ElementFactory.make("rtspsrc", f"rtspsrc{idx}")
source.set_property("latency", 0) #
source.set_property("protocols", "tcp")
source.set_property("location", uri_name)
source.set_property('buffer-mode', 'none')
depay = Gst.ElementFactory.make("rtph264depay", f"depay{idx}")
source.connect('pad-added', on_pad_added, depay)
h264parser = Gst.ElementFactory.make("h264parse", f"h264parse{idx}")
decoder = Gst.ElementFactory.make("nvv4l2decoder", f"decoder{idx}")
self.pipeline.add(source)
#self.pipeline.add(queue)
self.pipeline.add(depay)
self.pipeline.add(h264parser)
self.pipeline.add(decoder)
source.link(depay)
depay.link(h264parser)
h264parser.link(decoder)
padname = "sink_%u" % idx
sinkpad = streammux.get_request_pad(padname)
if not sinkpad:
logger.error("Unable to create sink pad bin \n")
raise Exception("Unable to create sink pad bin")
srcpad = decoder.get_static_pad("src")
if not srcpad:
logger.error("Unable to create src pad bin \n")
raise Exception("Unable to create src pad bin")
srcpad.link(sinkpad)
# add camera to self.stream_id_to_camera for easy access
self.stream_id_to_camera[camera.stream_id] = camera
# add camera to self.stream_id_to_gate for easy access
pgie = create_pgie(
self.configs.wheel_detector_config_file, len(self.cameras_list)
)
tracker = create_tracker(
self.configs.tracker_txt_path, self.configs.tracker_yaml_path
)
sgie = create_sgie(
self.configs.nuts_detector_config_file, len(self.cameras_list)
)
nvvidconv1 = create_videoconverter(1)
filter1 = create_caps_and_filter()
tiler = create_tiler(
number_sources=len(self.cameras_list),
input_height=self.configs.tiler_height,
input_width=self.configs.tiler_width,
)
nvvidconv = create_videoconverter()
nvosd = create_onscreendisplay()
sink = create_sink(display=False)
if is_live:
logger.info("Atleast one of the sources is live")
streammux.set_property("live-source", 1)
if not is_aarch64():
mem_type = int(pyds.NVBUF_MEM_CUDA_UNIFIED)
streammux.set_property("nvbuf-memory-type", mem_type)
nvvidconv.set_property("nvbuf-memory-type", mem_type)
nvvidconv1.set_property("nvbuf-memory-type", mem_type)
tiler.set_property("nvbuf-memory-type", mem_type)
logger.info("Adding elements to Pipeline \n")
self.pipeline.add(pgie)
self.pipeline.add(tracker)
self.pipeline.add(sgie)
self.pipeline.add(tiler)
self.pipeline.add(nvvidconv)
self.pipeline.add(filter1)
self.pipeline.add(nvvidconv1)
self.pipeline.add(nvosd)
self.pipeline.add(sink)
logger.info("Linking elements in the Pipeline \n")
streammux.link(pgie)
pgie.link(tracker)
tracker.link(sgie)
sgie.link(nvvidconv1)
nvvidconv1.link(filter1)
filter1.link(tiler)
tiler.link(nvvidconv)
nvvidconv.link(nvosd)
nvosd.link(sink)
loop = GLib.MainLoop()
bus = self.pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)
sink_pad = tiler.get_static_pad("sink")
if not sink_pad:
logger.error(" Unable to get src pad \n")
sys.exit(1)
else:
sink_pad.add_probe(
Gst.PadProbeType.BUFFER, self.tiler_sink_pad_buffer_probe, 0
)
GLib.timeout_add(20000, self.perf_data.perf_print_callback)
# List the sources
logger.info("Now playing...")
for idx, source in enumerate(self.cameras_list):
logger.info(f"Playing {idx} : {source.camera_uri}")
Both Cameras are 1920*1080 with 15 fps, pipeline normally runs at 15 fps but sometimes due to post-processing fps drops to 10 or 11 for may be 30 to 1 mins, after post-processing it again continue with 15 fps.
one camera is 20 to 30 seconds behind and second one is 90 to 120 seconds behind compared to real-time rtsp after running the pipeline for 10 to 12 hours