Deepstream RTSP in and RTSP out with nvanalytics and tracker

Please provide complete information as applicable to your setup.

• Hardware Platform (Jetson / GPU) :Jetson nano 4gb
• DeepStream Version:5.1
• JetPack Version (valid for Jetson only):4.5.1
• TensorRT Version :7.1
• NVIDIA GPU Driver Version (valid for GPU only) :10.2
• Issue Type( questions, new requirements, bugs) :bugs
• How to reproduce the issue ? (This is for bugs. Including which sample app is using, the configuration files content, the command line used and other details for reproducing):yes
• Requirement details( This is for new requirement. Including the module name-for which plugin or for which sample application, the function description):deepstream RTSP_IN_RTSP_OUT

At Server/Producer Side:
I have merged nvdsanalytics and the tracker into the RTSP-in/RTSP-out pipeline. The stream is written to port 5400 with a UDP sink and then served by an RTSP server on port 8554: rtsp://localhost:8554/ds-test

At Client/Consumer Side:
The video is fetched from rtsp://localhost:8554/ds-test and saved to local disk using a GStreamer pipeline.

Problems I am facing:

  1. Some video files (.mp4) are saved with a size of only a few bytes. (Out of 90 videos, 30 are only bytes in size and are not playable.)
  2. The video duration is not constant (it ranges from 2 sec to 13 sec).

Please see below the code for reference:
Server/Producer:

# Standard GStreamer initialization

# One-time GObject threading and GStreamer library initialisation.
GObject.threads_init()
Gst.init(None)

# The pipeline is the container that connects all the other elements.
print("Creating Pipeline \n ")
is_live = False
pipeline = Gst.Pipeline()
if not pipeline:
    sys.stderr.write(" Unable to create Pipeline \n")

# nvstreammux forms batches from one or more sources.
print("Creating streamux \n ")
streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
if not streammux:
    sys.stderr.write(" Unable to create NvStreamMux \n")
pipeline.add(streammux)

# Build one source bin per input URI and connect its "src" pad to a
# dedicated request pad ("sink_<index>") on the muxer.
for source_index in range(number_sources):
    print("Creating source_bin ", source_index, " \n ")
    uri = args[source_index]
    # A single RTSP input is enough to flag the pipeline as live.
    is_live = is_live or uri.startswith("rtsp://")

    bin_for_source = create_source_bin(source_index, uri)
    if not bin_for_source:
        sys.stderr.write("Unable to create source bin \n")
    pipeline.add(bin_for_source)

    mux_sink_pad = streammux.get_request_pad("sink_" + str(source_index))
    if not mux_sink_pad:
        sys.stderr.write("Unable to create sink pad bin \n")

    bin_src_pad = bin_for_source.get_static_pad("src")
    if not bin_src_pad:
        sys.stderr.write("Unable to create src pad bin \n")
    bin_src_pad.link(mux_sink_pad)
    
# queue1=Gst.ElementFactory.make("queue","queue1")
# queue2=Gst.ElementFactory.make("queue","queue2")
# queue3=Gst.ElementFactory.make("queue","queue3")
# queue4=Gst.ElementFactory.make("queue","queue4")
# queue5=Gst.ElementFactory.make("queue","queue5")
# queue6=Gst.ElementFactory.make("queue","queue6")
# queue7=Gst.ElementFactory.make("queue","queue7")
# pipeline.add(queue1)
# pipeline.add(queue2)
# pipeline.add(queue3)
# pipeline.add(queue4)
# pipeline.add(queue5)
# pipeline.add(queue6)
# pipeline.add(queue7)

def _make_element(factory_name, element_name, failure_text):
    """Create a GStreamer element, writing failure_text to stderr if it could not be made."""
    element = Gst.ElementFactory.make(factory_name, element_name)
    if not element:
        sys.stderr.write(failure_text)
    return element


# Primary inference element: nvinfer, or nvinferserver for any other `gie` value.
print("Creating Pgie \n ")
pgie = _make_element(
    "nvinfer" if gie == "nvinfer" else "nvinferserver",
    "primary-inference",
    " Unable to create pgie \n",
)

print("Creating nvtracker \n ")
tracker = _make_element("nvtracker", "tracker", " Unable to create tracker \n")

print("Creating nvdsanalytics \n ")
nvanalytics = _make_element(
    "nvdsanalytics", "analytics", " Unable to create nvanalytics \n"
)
nvanalytics.set_property("config-file", 'config_ATMS.txt')

print("Creating tiler \n ")
tiler = _make_element("nvmultistreamtiler", "nvtiler", " Unable to create tiler \n")

print("Creating nvvidconv \n ")
nvvidconv = _make_element(
    "nvvideoconvert", "convertor", " Unable to create nvvidconv \n"
)

print("Creating nvosd \n ")
nvosd = _make_element("nvdsosd", "onscreendisplay", " Unable to create nvosd \n")

# Second converter, placed after the on-screen display in the chain.
nvvidconv_postosd = _make_element(
    "nvvideoconvert", "convertor_postosd", " Unable to create nvvidconv_postosd \n"
)

# Caps filter restricting the stream to I420 in NVMM memory.
caps = Gst.ElementFactory.make("capsfilter", "filter")
caps.set_property(
    "caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=I420")
)

# Hardware encoder matching the requested codec.
if codec == "H264":
    encoder = Gst.ElementFactory.make("nvv4l2h264enc", "encoder")
    print("Creating H264 Encoder")
elif codec == "H265":
    encoder = Gst.ElementFactory.make("nvv4l2h265enc", "encoder")
    print("Creating H265 Encoder")
if not encoder:
    sys.stderr.write(" Unable to create encoder")
encoder.set_property("bitrate", bitrate)
if is_aarch64():
    # Extra encoder settings applied only when is_aarch64() is true.
    for encoder_property in ("preset-level", "insert-sps-pps", "bufapi-version"):
        encoder.set_property(encoder_property, 1)

# RTP payloader matching the requested codec.
if codec == "H264":
    rtppay = Gst.ElementFactory.make("rtph264pay", "rtppay")
    print("Creating H264 rtppay")
elif codec == "H265":
    rtppay = Gst.ElementFactory.make("rtph265pay", "rtppay")
    print("Creating H265 rtppay")
if not rtppay:
    sys.stderr.write(" Unable to create rtppay")

# UDP sink that sends the RTP packets to updsink_port_num.
updsink_port_num = 5400
sink = _make_element("udpsink", "udpsink", " Unable to create udpsink")
for sink_property, sink_value in (
    ("host", "224.224.255.255"),
    ("port", updsink_port_num),
    ("async", False),
    ("sync", 1),
):
    sink.set_property(sink_property, sink_value)

# Tell the muxer when at least one input is a live source.
if is_live:
    print("Atleast one of the sources is live")
    streammux.set_property('live-source', 1)

# Muxer output resolution, batch size and batched-push timeout.
for mux_property, mux_value in (
    ("width", 1280),
    ("height", 960),
    ("batch-size", 1),
    ("batched-push-timeout", 4000000),
):
    streammux.set_property(mux_property, mux_value)

# Each inference backend reads its own configuration file.
pgie_config_path = (
    "ATMS_pgie_config.txt"
    if gie == "nvinfer"
    else "dstest1_pgie_inferserver_config.txt"
)
pgie.set_property("config-file-path", pgie_config_path)

# Override the configured inference batch size when it differs from the
# number of sources.
pgie_batch_size = pgie.get_property("batch-size")
if pgie_batch_size != number_sources:
    print("WARNING: Overriding infer-config batch-size", pgie_batch_size,
          " with number of sources ", number_sources, " \n")
    pgie.set_property("batch-size", number_sources)

print("Adding elements to Pipeline \n")

# Tiler grid: floor(sqrt(n)) rows and enough columns to hold all n sources.
tiler_rows = int(math.sqrt(number_sources))
tiler_columns = int(math.ceil((1.0 * number_sources) / tiler_rows))
for tiler_property, tiler_value in (
    ("rows", tiler_rows),
    ("columns", tiler_columns),
    ("width", TILED_OUTPUT_WIDTH),
    ("height", TILED_OUTPUT_HEIGHT),
):
    tiler.set_property(tiler_property, tiler_value)

sink.set_property("qos", 0)

# Set properties of the tracker from the [tracker] section of its config file.
tracker_config_path = 'ATMS_tracker_config.txt'
config = configparser.ConfigParser()
# ConfigParser.read() silently skips files it cannot open and returns the list
# of files it actually parsed, so check the result here instead of failing
# later with a bare KeyError on the missing section.
parsed_files = config.read(tracker_config_path)

if not parsed_files or not config.has_section('tracker'):
    sys.stderr.write(
        " Unable to read [tracker] section from %s \n" % tracker_config_path
    )
else:
    # config key -> (nvtracker property name, accessor used to read the value)
    tracker_properties = {
        'tracker-width': ('tracker-width', config.getint),
        'tracker-height': ('tracker-height', config.getint),
        'gpu-id': ('gpu_id', config.getint),
        'll-lib-file': ('ll-lib-file', config.get),
        'll-config-file': ('ll-config-file', config.get),
        'enable-batch-process': ('enable_batch_process', config.getint),
        'enable-past-frame': ('enable_past_frame', config.getint),
    }
    # Keys that are not in the table are ignored, as before.
    for key in config['tracker']:
        if key in tracker_properties:
            property_name, read_value = tracker_properties[key]
            tracker.set_property(property_name, read_value('tracker', key))

# Processing chain in stream order, downstream of streammux:
# inference -> tracker -> analytics -> convert -> tiler -> OSD -> convert
# -> caps filter -> encoder -> RTP payloader -> UDP sink.
stream_chain = [
    pgie,
    tracker,
    nvanalytics,
    nvvidconv,
    tiler,
    nvosd,
    nvvidconv_postosd,
    caps,
    encoder,
    rtppay,
    sink,
]

# Elements must belong to the pipeline before they can be linked.
for element in stream_chain:
    pipeline.add(element)

# Link each element to its successor, starting from streammux. link() returns
# False on failure; report it so a broken chain is visible instead of showing
# up later as a pipeline that produces no data.
upstream = streammux
for downstream in stream_chain:
    if not upstream.link(downstream):
        sys.stderr.write(
            " Unable to link %s to %s \n"
            % (upstream.get_name(), downstream.get_name())
        )
    upstream = downstream

# Create an event loop and feed GStreamer bus messages to it.
loop = GObject.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)

# Disabled alternative: attach a buffer probe to the nvdsanalytics src pad.
# nvanalytics_src_pad=nvanalytics.get_static_pad("src")
# if not nvanalytics_src_pad:
#     sys.stderr.write(" Unable to get src pad \n")
# else:
#     nvanalytics_src_pad.add_probe(Gst.PadProbeType.BUFFER, nvanalytics_src_pad_buffer_probe, 0)

# Attach the buffer probe to the tiler sink pad.
tiler_sink_pad = tiler.get_static_pad("sink")
if not tiler_sink_pad:
    sys.stderr.write(" Unable to get sink pad \n")
else:
    tiler_sink_pad.add_probe(Gst.PadProbeType.BUFFER, tiler_sink_pad_buffer_probe, 0)

# Start streaming
rtsp_port_num = 8554

server = GstRtspServer.RTSPServer.new()
server.props.service = "%d" % rtsp_port_num
server.attach(None)

# The RTSP media factory does no encoding of its own: it re-serves the RTP
# packets it receives on updsink_port_num, the port the pipeline's udpsink
# sends to.
factory = GstRtspServer.RTSPMediaFactory.new()
factory.set_launch(
    '( udpsrc name=pay0 port=%d buffer-size=524288 caps="application/x-rtp, media=video, clock-rate=90000, encoding-name=(string)%s, payload=96 " )'
    % (updsink_port_num, codec)
)
factory.set_shared(True)
server.get_mount_points().add_factory("/ds-test", factory)

print(
    "\n *** DeepStream: Launched RTSP Streaming at rtsp://localhost:%d/ds-test ***\n\n"
    % rtsp_port_num
)

# List every input source. args[0] is a real source here (the source bins are
# built from args[0] onwards), so it must not be skipped.
for i, source in enumerate(args):
    print(i, ": ", source)

# Start playback and listen to events.
print("Starting pipeline \n")
pipeline.set_state(Gst.State.PLAYING)
try:
    loop.run()
except KeyboardInterrupt:
    # Ctrl-C is the expected way to stop the server.
    pass
finally:
    # Always release the pipeline, even if the loop ended with an error;
    # unexpected exceptions propagate after cleanup instead of being hidden.
    pipeline.set_state(Gst.State.NULL)

def parse_args():
    """Parse the command line and publish the chosen options as module globals.

    Sets the globals ``gie``, ``codec``, ``bitrate`` and ``stream_path`` from
    the ``--gie``, ``--codec``, ``--bitrate`` and ``--input`` options.
    When the program is started without any argument, the help text is
    printed to stderr and the process exits with status 1.

    Returns:
        The list of input values given to ``-i/--input``.
    """
    parser = argparse.ArgumentParser(description='RTSP Output Sample Application Help ')
    parser.add_argument("-i", "--input",
                        help="Path to input H264 elementry stream", nargs="+", default=["a"], required=True)
    parser.add_argument("-g", "--gie", default="nvinfer",
                        help="choose GPU inference engine type nvinfer or nvinferserver , default=nvinfer", choices=['nvinfer', 'nvinferserver'])
    parser.add_argument("-c", "--codec", default="H264",
                        help="RTSP Streaming Codec H264/H265 , default=H264", choices=['H264', 'H265'])
    parser.add_argument("-b", "--bitrate", default=4000000,
                        help="Set the encoding bitrate ", type=int)
    # Check input arguments
    if len(sys.argv) == 1:
        parser.print_help(sys.stderr)
        sys.exit(1)
    args = parser.parse_args()
    global codec
    global bitrate
    global stream_path
    global gie
    gie = args.gie
    codec = args.codec
    bitrate = args.bitrate
    stream_path = args.input
    return stream_path

# Script entry point: parse the command line, run the pipeline and use the
# value returned by main() as the process exit status.
if __name__ == '__main__':
    stream_path = parse_args()
    sys.exit(main(stream_path))

Client/Listener:
# Client side: record the RTSP stream into an MP4 file for a fixed time.
# Initializes GStreamer, its variables and paths.
# NOTE(review): the indented lines below presumably belong to a function whose
# header (and the definition of tempFileName) was not included in the post.
Gst.init(sys.argv)

    # Receive the RTSP stream, depayload and parse the H.264 video, mux it
    # into MP4 and write it to tempFileName.
    # NOTE(review): rtph264depay/h264parse only fit when the server streams
    # H264; confirm the server is not started with the H265 codec option.
    command="rtspsrc location=rtsp://localhost:8554/ds-test latency=0 ! rtph264depay ! h264parse ! mp4mux ! filesink location="+tempFileName
   
    pipeline = Gst.parse_launch(command)
    
    bus = pipeline.get_bus()
    
    # allow bus to emit messages to main thread
    bus.add_signal_watch()
    
    # Init GObject loop to handle Gstreamer Bus Events
    loop = GObject.MainLoop()
    
    # Add handler to specific signal
    bus.connect("message", on_message, loop)
    
    # Start pipeline
    pipeline.set_state(Gst.State.PLAYING)
    # Record for a fixed 15 seconds, then request end-of-stream.
    # NOTE(review): this sleep runs before loop.run(), so on_message presumably
    # cannot be called during the recording window — confirm whether errors
    # should be handled earlier.
    # NOTE(review): the 15 s are counted from set_state(PLAYING), so they
    # presumably include the RTSP connection time, which may explain the
    # varying durations — confirm.
    time.sleep(15)
    pipeline.send_event(Gst.Event.new_eos())
    # Run the main loop; presumably on_message quits it on EOS or error
    # (on_message is not visible here — verify).
    try:
        loop.run()
    except Exception:
        #traceback.print_exc()
        loop.quit()
    # Stop Pipeline
    pipeline.set_state(Gst.State.NULL)

How about the system load (output from jtop) when the 2 programs are running? Please ensure that nano runs in MAXN power mode.

Already running in MAXN power mode

Would lowering the encoder bitrate allow more of the videos to be saved correctly?