Hi,
I’m writing a small demo-pipeline which takes 3 input RTSP streams from IP cameras, and then spits out 4 streams (3 single and 1 combined).
Keep in mind, this is just for a start, functionality will be added later.
I am experiencing a lot of latency in all streams. Is there something I’m doing wrong, or something I can improve?
Thanks!
Below, the current code.
Cheers
import sys
import os
sys.path.append('../')
import gi
gi.require_version('Gst', '1.0')
gi.require_version("GstRtspServer", "1.0")
from gi.repository import Gst, GstRtspServer, GLib
from utils import cb_newpad, decodebin_child_added, create_source_bin, create_and_add_element
from common.bus_call import bus_call
from common.is_aarch_64 import is_aarch64
import math
# Directory GStreamer should use for pipeline .dot graph dumps; set before
# Gst.init() is called below.
os.environ["GST_DEBUG_DUMP_DOT_DIR"] = "debugging/"

# Size of the combined (tiled) output frame.
TILED_OUTPUT_WIDTH = 1280
TILED_OUTPUT_HEIGHT = 720

# One RTSP URI per input camera.
camera_uris = [
    'rtsp://USER:PW@IP',
    'rtsp://USER:PW@IP',
    'rtsp://USER:PW@IP',
]
# Derived from the list so the source count can never disagree with the
# URIs that are actually configured.
number_sources = len(camera_uris)

Gst.init(None)

print("Creating Pipeline \n ")
pipeline = Gst.Pipeline()
if not pipeline:
    sys.exit(" Unable to create Pipeline \n")

# Set to True by the source loop when any input URI is an RTSP stream.
is_live = False

# The muxer is created first because every source branch requests a sink pad
# on it.
streammux = create_and_add_element("nvstreammux", "streammux", pipeline)
# Build the per-camera part of the pipeline. Every source is split by a tee:
#   tee -> queue  -> nvstreammux sink_<i>   (input of the combined stream)
#   tee -> queue2 -> nvvideoconvert -> nvdsosd -> nvvideoconvert -> capsfilter
#          -> nvv4l2h264enc -> rtph264pay -> udpsink   (single-camera stream)
# Any construction or link failure aborts the script immediately; continuing
# would only fail later on a missing object or produce a dead branch.
for i in range(number_sources):
    print("Creating source_bin ", i, " \n ")
    uri_name = camera_uris[i]
    if uri_name.startswith("rtsp://"):
        # Record that at least one input is a live RTSP feed.
        is_live = True

    source_bin = create_source_bin(i, uri_name)
    if not source_bin:
        sys.exit("Unable to create source bin \n")
    pipeline.add(source_bin)

    tee = create_and_add_element("tee", f"tee_{i}", pipeline)
    queue = create_and_add_element("queue", f"queue_{i}", pipeline)
    queue2 = create_and_add_element("queue", f"queue2_{i}", pipeline)

    for upstream, downstream in ((source_bin, tee), (tee, queue), (tee, queue2)):
        if not upstream.link(downstream):
            sys.exit(
                f"Unable to link {upstream.get_name()} to {downstream.get_name()} \n"
            )

    # BRANCH 1: feed this camera into the muxer for the combined stream.
    padname = "sink_%u" % i
    sinkpad = streammux.get_request_pad(padname)
    if not sinkpad:
        sys.exit("Unable to create sink pad bin \n")
    srcpad = queue.get_static_pad("src")
    if not srcpad:
        sys.exit("Unable to create src pad bin \n")
    if srcpad.link(sinkpad) != Gst.PadLinkReturn.OK:
        sys.exit(f"Unable to link queue_{i} to streammux pad {padname} \n")

    # BRANCH 2 SINGLE STREAM
    nvconv = create_and_add_element("nvvideoconvert", f"nvconv_{i}", pipeline)
    nvdsosd = create_and_add_element("nvdsosd", f"nvdsosd_{i}", pipeline)
    nvconv_post = create_and_add_element("nvvideoconvert", f"nvconv_post_{i}", pipeline)
    caps = create_and_add_element("capsfilter", f"capsfilter_{i}", pipeline)
    caps.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=I420"))
    encoder = create_and_add_element("nvv4l2h264enc", f"encoder_{i}", pipeline)
    if is_aarch64():
        encoder.set_property("preset-level", 1)
        encoder.set_property("insert-sps-pps", 1)
    rtppay = create_and_add_element("rtph264pay", f"rtppay_{i}", pipeline)

    # Make the UDP sink; camera i is sent to port 5401 + i.
    udpsink_port_num = 5400 + (i + 1)
    print("UDPSINK_PN: ", udpsink_port_num)
    sink = Gst.ElementFactory.make("udpsink", f"udpsink_{i}")
    if not sink:
        sys.exit(" Unable to create udpsink")
    sink.set_property("host", "224.224.255.255")
    sink.set_property("port", udpsink_port_num)
    sink.set_property("async", False)
    # NOTE(review): sync=1 makes the sink wait on buffer timestamps; worth
    # measuring against sync=0 when chasing latency - confirm before changing.
    sink.set_property("sync", 1)
    sink.set_property("qos", 0)
    pipeline.add(sink)

    single_chain = [queue2, nvconv, nvdsosd, nvconv_post, caps, encoder, rtppay, sink]
    for upstream, downstream in zip(single_chain, single_chain[1:]):
        if not upstream.link(downstream):
            sys.exit(
                f"Unable to link {upstream.get_name()} to {downstream.get_name()} \n"
            )
# Combined (tiled) output branch:
#   nvstreammux -> nvvideoconvert -> nvmultistreamtiler -> nvdsosd
#   -> nvvideoconvert -> capsfilter -> nvv4l2h264enc -> rtph264pay -> udpsink
tiler = create_and_add_element("nvmultistreamtiler", "nvmultistreamtiler", pipeline)
nvconv = create_and_add_element("nvvideoconvert", "nvconv", pipeline)
nvdsosd = create_and_add_element("nvdsosd", "nvdsosd", pipeline)
nvconv_post = create_and_add_element("nvvideoconvert", "nvconv_post", pipeline)
caps = create_and_add_element("capsfilter", "capsfilter", pipeline)
caps.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=I420"))
encoder = create_and_add_element("nvv4l2h264enc", "encoder", pipeline)
if is_aarch64():
    encoder.set_property("preset-level", 1)
    encoder.set_property("insert-sps-pps", 1)
rtppay = create_and_add_element("rtph264pay", "rtppay", pipeline)

# Make the UDP sink. The name `updsink_port_num` is kept as-is because the
# RTSP server section further down reads it as the base port.
updsink_port_num = 5400
print("UDPSINK_PN: ", updsink_port_num)
sink = Gst.ElementFactory.make("udpsink", "udpsink")
if not sink:
    # Every call below needs the sink; stop here instead of crashing on None.
    sys.exit(" Unable to create udpsink")
sink.set_property("host", "224.224.255.255")
sink.set_property("port", updsink_port_num)
sink.set_property("async", False)
sink.set_property("sync", 1)
sink.set_property("qos", 0)

streammux.set_property("width", 640)
streammux.set_property("height", 720)
streammux.set_property("batch-size", number_sources)
streammux.set_property("batched-push-timeout", 33000)
if is_live:
    # `is_live` was computed by the source loop but never acted on. The
    # muxer's `live-source` property is the documented way to tell it that
    # its inputs are live feeds such as RTSP cameras.
    print("At least one of the sources is live")
    streammux.set_property("live-source", 1)

print("Adding elements to Pipeline \n")
# Lay the tiles out in floor(sqrt(n)) rows and as many columns as are needed
# to hold all n sources.
tiler_rows = int(math.sqrt(number_sources))
tiler_columns = int(math.ceil((1.0 * number_sources) / tiler_rows))
tiler.set_property("rows", tiler_rows)
tiler.set_property("columns", tiler_columns)
tiler.set_property("width", TILED_OUTPUT_WIDTH)
tiler.set_property("height", TILED_OUTPUT_HEIGHT)

pipeline.add(sink)

# Link the branch in order and abort on the first link that is refused, so a
# broken branch is reported at start-up rather than showing up as a dead stream.
combined_chain = [streammux, nvconv, tiler, nvdsosd, nvconv_post, caps, encoder, rtppay, sink]
for upstream, downstream in zip(combined_chain, combined_chain[1:]):
    if not upstream.link(downstream):
        sys.exit(
            f"Unable to link {upstream.get_name()} to {downstream.get_name()} \n"
        )
# GLib main loop plus a bus watch; bus_call is invoked for every bus message
# and is handed the loop as user data.
loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)

# RTSP server: each mount point re-serves the RTP packets that one of the
# pipeline's udpsinks sends to a local UDP port.
# NOTE(review): 855 is below 1024, which on Linux normally needs elevated
# privileges to bind - confirm this port is intended.
rtsp_port_num = 855
codec = "H264"
server = GstRtspServer.RTSPServer.new()
server.props.service = str(rtsp_port_num)
server.attach(None)

launch_template = '( udpsrc name=pay0 port=%d buffer-size=524288 caps="application/x-rtp, media=video, clock-rate=90000, encoding-name=(string)%s, payload=96 " )'

# "/main" carries the combined stream on the base port; "/cam<n>" carries
# camera n on base port + n.
mount_ports = [("/main", updsink_port_num)]
mount_ports.extend(
    (f"/cam{cam_no}", updsink_port_num + cam_no)
    for cam_no in range(1, number_sources + 1)
)
for mount_path, udp_port in mount_ports:
    factory = GstRtspServer.RTSPMediaFactory.new()
    factory.set_launch(launch_template % (udp_port, codec))
    print("UDPSINK_PN: ", udp_port)
    factory.set_shared(True)
    server.get_mount_points().add_factory(mount_path, factory)

print(
    f"\n *** DeepStream: Launched RTSP Streaming at rtsp://localhost:{rtsp_port_num} ***\n"
    f"Access combined stream at rtsp://localhost:{rtsp_port_num}/main\n"
    f"Access individual streams at rtsp://localhost:{rtsp_port_num}/cam1, /cam2, /cam3\n\n"
)
# Start streaming and block in the GLib main loop until it is quit or the
# user interrupts the script.
print("Starting pipeline \n")
pipeline.set_state(Gst.State.PLAYING)
try:
    loop.run()
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop the demo. Any other exception is
    # allowed to propagate instead of being silently discarded.
    pass
finally:
    # cleanup: always bring the pipeline back to NULL, even on an error.
    pipeline.set_state(Gst.State.NULL)
Please provide complete information as applicable to your setup.
• Hardware Platform (Jetson / GPU) Jetson Xavier AGX
• DeepStream Version 6.3
• JetPack Version (valid for Jetson only) 5.1.2
• Issue Type( questions, new requirements, bugs) Question