Please provide complete information as applicable to your setup.
• Hardware Platform: GPU
• DeepStream Version: 7.1
• NVIDIA GPU Driver Version: 535.183.01
• Issue Type: RTSP input frames are distorted
• How to reproduce the issue? I have attached the code below.
nvurisrcbin → input frames are distorted
uridecodebin → no problem reading input frames
rtspsrc → no problem reading input frames
- I use nvurisrcbin because of its RTSP reconnection properties. The reconnection itself works fine, but the frames read from the RTSP stream come out distorted (a jitter effect). How can I resolve this?
- When I replace it with uridecodebin, the same stream works fine, with no distortion or jitter.
- I also ran nvurisrcbin and uridecodebin in parallel on the same stream: the uridecodebin frames are clean, while the nvurisrcbin frames are distorted (see the sketch below for how I swap the two source elements).
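For reference, here is a stripped-down sketch of how I swap the source element when comparing the two on the same camera. The `use_nvurisrcbin` flag and the element names are only illustrative; `cb_newpad` and `decodebin_child_added` are the callbacks from the attached code further down.

```python
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst


def make_source_bin(index, uri, use_nvurisrcbin=True):
    # Wrap either source element in a Gst.Bin exposing a single "src" ghost pad.
    nbin = Gst.Bin.new("source-bin-%02d" % index)

    if use_nvurisrcbin:
        # DeepStream source bin with RTSP reconnection enabled -> distorted frames
        src = Gst.ElementFactory.make("nvurisrcbin", "src-%d" % index)
        src.set_property("rtsp-reconnect-attempts", -1)  # retry forever
        src.set_property("rtsp-reconnect-interval", 10)  # seconds between attempts
    else:
        # Plain GStreamer source bin -> clean frames on the same stream
        src = Gst.ElementFactory.make("uridecodebin", "src-%d" % index)

    src.set_property("uri", uri)
    # Same dynamic-pad handling as in the attached code
    src.connect("pad-added", cb_newpad, nbin)
    src.connect("child-added", decodebin_child_added, nbin)

    nbin.add(src)
    nbin.add_pad(Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC))
    return nbin


# Usage (after Gst.init(None)); the RTSP URL is a placeholder:
#   bin_nv  = make_source_bin(0, "rtsp://<camera>", use_nvurisrcbin=True)   # distorted
#   bin_dec = make_source_bin(1, "rtsp://<camera>", use_nvurisrcbin=False)  # clean
```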
import sys
sys.path.append('../')
import gi
import configparser
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst
from ctypes import *
import time
import math
import json
import pyds
import uuid
import platform
import numpy as np
from urllib.parse import urlparse
from common.platform_info import PlatformInfo
from common.bus_call import bus_call #setup_gstreamer_debugging
from common.FPS import PERF_DATA
from azure.iot.device import IoTHubDeviceClient, Message
import os
from datetime import datetime
import pytz
from functools import partial
save_img_time_interval = 5
perf_data = None
PGIE_CLASS_ID_PERSON = 0
PGIE_CLASS_ID_BAG = 1
PGIE_CLASS_ID_FACE = 2
MUXER_BATCH_TIMEOUT_USEC = 4000
OSD_PROCESS_MODE= 0
OSD_DISPLAY_TEXT= 1
pgie_classes_str= ["person","bag","face"]
def nvanalytics_src_pad_buffer_probe(pad, info, u_data):
    frame_number = 0
    num_rects = 0
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        return
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list
    while l_frame:
        try:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break
        frame_number = frame_meta.frame_num
        l_obj = frame_meta.obj_meta_list
        num_rects = frame_meta.num_obj_meta
        obj_counter = {
            PGIE_CLASS_ID_PERSON: 0,
            PGIE_CLASS_ID_BAG: 0,
            PGIE_CLASS_ID_FACE: 0
        }
        while l_obj:
            try:
                obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break
            l_user_meta = obj_meta.obj_user_meta_list
            while l_user_meta:
                try:
                    user_meta = pyds.NvDsUserMeta.cast(l_user_meta.data)
                    if user_meta.base_meta.meta_type == pyds.nvds_get_user_meta_type("NVIDIA.DSANALYTICSOBJ.USER_META"):
                        user_meta_data = pyds.NvDsAnalyticsObjInfo.cast(user_meta.user_meta_data)
                except StopIteration:
                    break
                try:
                    l_user_meta = l_user_meta.next
                except StopIteration:
                    break
            try:
                l_obj = l_obj.next
            except StopIteration:
                break
        l_user = frame_meta.frame_user_meta_list
        while l_user:
            try:
                user_meta = pyds.NvDsUserMeta.cast(l_user.data)
                if user_meta.base_meta.meta_type == pyds.nvds_get_user_meta_type("NVIDIA.DSANALYTICSFRAME.USER_META"):
                    user_meta_data = pyds.NvDsAnalyticsFrameMeta.cast(user_meta.user_meta_data)
            except StopIteration:
                break
            try:
                l_user = l_user.next
            except StopIteration:
                break
        stream_index = "stream{0}".format(frame_meta.pad_index)
        global perf_data
        perf_data.update_fps(stream_index)
        try:
            l_frame = l_frame.next
        except StopIteration:
            break
    return Gst.PadProbeReturn.OK
def cb_newpad(decodebin, decoder_src_pad, data):
    print("In cb_newpad\n")
    caps = decoder_src_pad.get_current_caps()
    if not caps:
        caps = decoder_src_pad.query_caps()
    gststruct = caps.get_structure(0)
    gstname = gststruct.get_name()
    source_bin = data
    features = caps.get_features(0)
    print("gstname=", gstname)
    if gstname.find("video") != -1:
        print("features=", features)
        if features.contains("memory:NVMM"):
            bin_ghost_pad = source_bin.get_static_pad("src")
            if not bin_ghost_pad.set_target(decoder_src_pad):
                sys.stderr.write("Failed to link decoder src pad to source bin ghost pad\n")
        else:
            sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")
def decodebin_child_added(child_proxy, Object, name, user_data):
    print("Decodebin child added:", name, "\n")
    if name.find("decodebin") != -1:
        Object.connect("child-added", decodebin_child_added, user_data)
    if "source" in name:
        source_element = child_proxy.get_by_name("source")
        if source_element.find_property('drop-on-latency') != None:
            Object.set_property("drop-on-latency", True)
def create_source_bin(index, uri):
    # Wrap nvurisrcbin (with RTSP reconnection enabled) in a Gst.Bin that exposes one "src" ghost pad.
    print("Creating source bin")
    bin_name = "source-bin-%02d" % index
    print('bin_name-', bin_name)
    nbin = Gst.Bin.new(bin_name)
    if not nbin:
        sys.stderr.write(" Unable to create source bin \n")
        return None
    nv_bin = Gst.ElementFactory.make("nvurisrcbin", "nvurisrcbin")
    if not nv_bin:
        sys.stderr.write(" Unable to create nvurisrcbin \n")
        return None
    reconnect_attempts = -1  # retry forever
    reconnect_interval = 10  # seconds between reconnection attempts
    nv_bin.set_property("rtsp-reconnect-attempts", reconnect_attempts)
    nv_bin.set_property("rtsp-reconnect-interval", reconnect_interval)
    nv_bin.set_property("message-forward", True)
    nv_bin.set_property("uri", uri)
    nv_bin.connect("pad-added", cb_newpad, nbin)
    nv_bin.connect("child-added", decodebin_child_added, nbin)
    Gst.Bin.add(nbin, nv_bin)
    bin_pad = nbin.add_pad(Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC))
    if not bin_pad:
        sys.stderr.write(" Failed to add ghost pad in source bin \n")
        return None
    return nbin
def extract_rtsp_details(rtsp_url):
    parsed_uri = urlparse(rtsp_url)
    if parsed_uri.path and parsed_uri.path != '/':
        return parsed_uri.path.split('/')[-1]
    else:
        return f"{parsed_uri.hostname}:{parsed_uri.port}"
IST = pytz.timezone('Asia/Kolkata')
def create_dynamic_path(base_path, ip_port):
    # Build <base>/<camera>/<year>/<month>/<day>/<hour>/<minute> folders (IST) and create them if missing.
    now = datetime.now(IST)
    dynamic_path = os.path.join(
        base_path,
        ip_port,
        str(now.year) + "_year",
        f"{now.month:02d}_month",
        f"{now.day:02d}_day",
        f"{now.hour:02d}_hour",
        f"{now.minute:02d}_minute",
    )
    os.makedirs(dynamic_path, exist_ok=True)
    return dynamic_path
last_saved_time = {}
def frame_filter_pad_probe(pad, info, user_data):
    # Every save_img_time_interval seconds, retarget the multifilesink to a new timestamped JPEG path.
    global last_saved_time
    current_time = time.time()
    stream_id = user_data["stream_id"]
    uri_name = user_data["uri_name"]
    if stream_id not in last_saved_time:
        last_saved_time[stream_id] = 0
    if current_time - last_saved_time[stream_id] >= save_img_time_interval:
        last_saved_time[stream_id] = current_time
        cam_id_ip_port = extract_rtsp_details(uri_name)
        base_path = "/Deepstream_output/nsnv"
        image_folder_path = create_dynamic_path(base_path, cam_id_ip_port)
        now = datetime.now(IST)
        image_name = f"{now.year}_{now.month:02d}_{now.day:02d}_{now.hour:02d}_{now.minute:02d}_{now.second:02d}.jpg"
        image_save_path = os.path.join(image_folder_path, image_name)
        multifilesink = pad.get_parent_element()
        multifilesink.set_property("location", image_save_path)
    return Gst.PadProbeReturn.OK
def main(args):
    if len(args) < 2:
        sys.stderr.write("usage: %s <uri1> [uri2] ... [uriN]\n" % args[0])
        sys.exit(1)
    global perf_data
    perf_data = PERF_DATA(len(args) - 1)
    number_sources = len(args) - 1
    is_live = False
    platform_info = PlatformInfo()
    Gst.init(None)
    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()
    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")
    # New streammux
    print("Creating streammux \n ")
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        sys.stderr.write(" Unable to create NvStreamMux \n")
    streammux.set_property('batch-size', number_sources)
    streammux.set_property('batched-push-timeout', 40000)
    streammux.set_property("sync-inputs", True)
    pipeline.add(streammux)
    # One nvurisrcbin-based source bin per RTSP URI, linked to a streammux sink pad
    for i in range(number_sources):
        print("Creating source_bin ", i, " \n ")
        uri_name = args[i + 1]
        if uri_name.find("rtsp://") == 0:
            is_live = True
        source_bin = create_source_bin(i, uri_name)
        if not source_bin:
            sys.stderr.write("Unable to create source bin \n")
        pipeline.add(source_bin)
        padname = "sink_%u" % i
        sinkpad = streammux.request_pad_simple(padname)
        if not sinkpad:
            sys.stderr.write("Unable to create sink pad bin \n")
        srcpad = source_bin.get_static_pad("src")
        if not srcpad:
            sys.stderr.write("Unable to create src pad bin \n")
        srcpad.link(sinkpad)
    queue1 = Gst.ElementFactory.make("queue", "queue1")
    queue2 = Gst.ElementFactory.make("queue", "queue2")
    queue3 = Gst.ElementFactory.make("queue", "queue3")
    queue4 = Gst.ElementFactory.make("queue", "queue4")
    queue1.set_property("max-size-buffers", 400)
    queue1.set_property("leaky", True)
    queue2.set_property("max-size-buffers", 400)
    queue2.set_property("leaky", True)
    queue3.set_property("max-size-buffers", 400)
    queue3.set_property("leaky", True)
    queue4.set_property("max-size-buffers", 400)
    queue4.set_property("leaky", True)
    pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    tracker = Gst.ElementFactory.make("nvtracker", "tracker")
    nvanalytics = Gst.ElementFactory.make("nvdsanalytics", "analytics")
    nvstreamdemux = Gst.ElementFactory.make("nvstreamdemux", "nvstreamdemux")
    nvanalytics.set_property("config-file", "config_nvdsanalytics.txt")
    pgie.set_property('config-file-path', "dsnvanalytics_pgie_config.txt")
    pgie_batch_size = pgie.get_property("batch-size")
    if pgie_batch_size != number_sources:
        print("WARNING: Overriding infer-config batch-size", pgie_batch_size, " with number of sources ", number_sources, " \n")
        pgie.set_property("batch-size", number_sources)
    config = configparser.ConfigParser()
    config.read('dsnvanalytics_tracker_config.txt')
    config.sections()
    for key in config['tracker']:
        if key == 'tracker-width':
            tracker_width = config.getint('tracker', key)
            tracker.set_property('tracker-width', tracker_width)
        if key == 'tracker-height':
            tracker_height = config.getint('tracker', key)
            tracker.set_property('tracker-height', tracker_height)
        if key == 'gpu-id':
            tracker_gpu_id = config.getint('tracker', key)
            tracker.set_property('gpu_id', tracker_gpu_id)
        if key == 'll-lib-file':
            tracker_ll_lib_file = config.get('tracker', key)
            tracker.set_property('ll-lib-file', tracker_ll_lib_file)
        if key == 'll-config-file':
            tracker_ll_config_file = config.get('tracker', key)
            tracker.set_property('ll-config-file', tracker_ll_config_file)
    # Batched branch: streammux -> pgie -> tracker -> nvdsanalytics -> nvstreamdemux
    pipeline.add(queue1, queue2, queue3, queue4, pgie, tracker, nvanalytics, nvstreamdemux)
    streammux.link(queue1)
    queue1.link(pgie)
    pgie.link(queue2)
    queue2.link(tracker)
    tracker.link(queue3)
    queue3.link(nvanalytics)
    nvanalytics.link(queue4)
    queue4.link(nvstreamdemux)
    # Per-stream output branch: demux -> OSD -> JPEG encoder -> multifilesink snapshots
    for i in range(number_sources):
        queue5 = Gst.ElementFactory.make("queue", f"queue5_{i}")
        nvvideoconvert = Gst.ElementFactory.make("nvvideoconvert", f"nvvideoconvert_{i}")
        nvdsosd = Gst.ElementFactory.make("nvdsosd", f"nvdsosd_{i}")
        queue6 = Gst.ElementFactory.make("queue", f"queue6_{i}")
        nvvidconv2 = Gst.ElementFactory.make("nvvideoconvert", f"nvvideoconvert2_{i}")
        jpegenc = Gst.ElementFactory.make("nvjpegenc", f"jpeg-encoder_{i}")
        multifilesink = Gst.ElementFactory.make("multifilesink", f"multi-file-sink_{i}")
        queue5.set_property("max-size-buffers", 400)
        queue5.set_property("leaky", True)
        queue6.set_property("max-size-buffers", 400)
        queue6.set_property("leaky", True)
        pipeline.add(queue5, nvvideoconvert, nvdsosd, queue6, nvvidconv2, jpegenc, multifilesink)
        nvdsosd.set_property("process-mode", OSD_PROCESS_MODE)
        nvdsosd.set_property("display-text", OSD_DISPLAY_TEXT)
        padname = "src_%u" % i
        demuxsrcpad = nvstreamdemux.request_pad_simple(padname)
        if not demuxsrcpad:
            sys.stderr.write("Unable to create demux src pad \n")
        queuesinkpad = queue5.get_static_pad("sink")
        if not queuesinkpad:
            sys.stderr.write("Unable to create queue sink pad \n")
        demuxsrcpad.link(queuesinkpad)
        jpegenc.set_property("quality", 90)
        multifilesink.set_property("post-messages", True)
        queue5.link(nvvideoconvert)
        nvvideoconvert.link(nvdsosd)
        nvdsosd.link(queue6)
        queue6.link(nvvidconv2)
        nvvidconv2.link(jpegenc)
        jpegenc.link(multifilesink)
        uri_name = args[i + 1]
        probe_data = {
            "stream_id": i,
            "uri_name": uri_name,
        }
        sinkpad = multifilesink.get_static_pad("sink")
        if not sinkpad:
            sys.stderr.write("Unable to get sink pad of multifilesink \n")
            sys.exit(1)
        sinkpad.add_probe(Gst.PadProbeType.BUFFER, frame_filter_pad_probe, probe_data)
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)
    nvanalytics_src_pad = nvanalytics.get_static_pad("src")
    if not nvanalytics_src_pad:
        sys.stderr.write(" Unable to get src pad \n")
    else:
        nvanalytics_src_pad.add_probe(Gst.PadProbeType.BUFFER, nvanalytics_src_pad_buffer_probe, 0)
        GLib.timeout_add(5000, perf_data.perf_print_callback)
    print("Now playing...")
    for i, source in enumerate(args):
        if i != 0:
            print(i, ": ", source)
    print("Starting pipeline \n")
    pipeline.set_state(Gst.State.PLAYING)
    try:
        loop.run()
    except:
        pass
    print("Exiting app\n")
    pipeline.set_state(Gst.State.NULL)
if __name__ == '__main__':
    sys.exit(main(sys.argv))