Please provide complete information as applicable to your setup.
• GPU Nvidia A10
• DeepStream Version 6.1
• NVIDIA GPU Driver Version 535.154.05
• Issue Type (questions)
When running more than 10 live cameras in DeepStream across multiple pipelines (around 2-3 cameras per pipeline), the output frames stop abruptly.
When using TCP for the source connection, we get EOS messages (all cameras are live RTSP cameras, so we did not expect EOS),
along with a message which says: nvstreammux: Successfully handled EOS for source
When using UDP there is no EOS message, but the frames still stop after some time.
What needs to be done to get a continuous flow of frames without any interruption? What is causing the issue described above?
A few more observations:
- We found these notable messages in the GST_DEBUG logs:
- INFO rtpjitterbuffer gstrtpjitterbuffer.c:4004:do_deadline_timeout: got deadline timeout
- WARN rtpjitterbuffer rtpjitterbuffer.c:587:calculate_skew: delta - skew: 0:02:16.055133709 too big, reset skew
- WARN rtpsource rtpsource.c:1206:update_receiver_stats: unacceptable seqnum received (seqnr45802, delta -4617, packet_rate: 969, max_dropout: 58140, max_misorder: 1938)
- FIXME videorate gstvideorate.c:705:gst_video_rate_push_buffer:<videorate_51> No buffer duration known
- INFO rtpjitterbuffer rtpjitterbuffer.c:795:rtp_jitter_buffer_calculate_pts: resync to time 0:08:55.270988842, rtptime 6:45:19.010111111
- WARN v4l2videodec gstv4l2videodec.c:1353:gst_v4l2_video_dec_loop: Decoder is producing too many buffers
- rtpsource rtpsource.c:1206:update_receiver_stats: unacceptable seqnum received (seqnr 41355, delta 13923, packet_rate: 107, max_dropout: 6420, max_misorder: 214)
We have added some code similar to ours below.
Main pipeline creation code:
# Standard GStreamer initialisation
# Builds one DeepStream pipeline for all entries in `sources`:
#   source bins -> nvstreammux -> nvinfer -> nvstreamdemux -> per-stream branch
#   ([nvtracker ->] nvvidconv -> nvosd -> caps -> encoder -> rtppay -> udpsink)
# and re-publishes every per-stream UDP output through one RTSP server.
#
# NOTE(review): `sources`, `config`, `codec`, `frame_width`, `frame_height`,
# `primary_model_config_file`, `loop`, `bus_call`, `osd_sink_pad_buffer_probe`
# and the create_*/add_* helpers are not defined in this snippet; they are
# presumably provided elsewhere in the application -- confirm.
GObject.threads_init()
Gst.init(None)

# Create gstreamer elements
pipeline = create_pipeline()

# Create nvstreammux instance to form batches from one or more sources.
streammux = create_streammux()
pipeline.add(streammux)
streamdemux = create_streamdemux()

# One integer id per camera. The original iterated `len(sources)` directly,
# which raises TypeError because an int is not iterable.
source_ids = list(range(len(sources)))

for source_num, source in enumerate(sources):
    # create_source_bin() as defined in this file takes (index, uri) only,
    # so no frame-drop argument is passed here.
    source_bin = create_source_bin(source_num, source)
    if not source_bin:
        # Continuing with a missing bin would only crash on the next line.
        sys.stderr.write("Unable to create source bin \n")
        sys.exit(1)
    pipeline.add(source_bin)

    padname = "sink_%u" % source_num
    sinkpad = streammux.get_request_pad(padname)
    if not sinkpad:
        sys.stderr.write(f"Unable to create sink pad bin for source {source_num} \n")
        sys.exit(1)

    srcpad = source_bin.get_static_pad("src")
    if not srcpad:
        sys.stderr.write("Unable to create src pad bin \n")
        sys.exit(1)
    srcpad.link(sinkpad)

pgie = create_pgie()

# create tracker
if config.use_nvtracker:
    tracker = create_tracker(source_ids)

# Use convertor to convert from NV12 to RGBA as required by nvosd
nvvidconv = create_nvvidconv(source_ids)

# Create OSD to draw on the converted RGBA buffer.
nvosd = create_nvosd(source_ids)

# ---- RTSP Out Code Start ----
# Create a caps filter
caps = create_capsfilter(source_ids)

# Make the encoder
encoder = create_encoder(source_ids)

# Make the payload-encode video into RTP packets
rtppay = create_rtppay(source_ids)

# Make the UDP sink: one consecutive port per source, starting at 5400.
udpsink_start_port = 5400
udpsink_port_list = list(
    range(udpsink_start_port, udpsink_start_port + len(sources))
)
sink = create_udpsink(source_ids, udpsink_port_list)
# ---- RTSP Out Code End ----

streammux = add_streammux_props(streammux, frame_width, frame_height)  # setting properties of streammux
pgie.set_property('config-file-path', primary_model_config_file)  # setting properties of pgie

pipeline.add(pgie)
pipeline.add(streamdemux)

# Every helper above returns a dict of per-stream elements; add them all.
per_stream_groups = [nvvidconv, nvosd, caps, encoder, rtppay, sink]
if config.use_nvtracker:
    per_stream_groups.insert(0, tracker)
for element_group in per_stream_groups:
    for element in element_group.values():
        pipeline.add(element)

# Link the elements together:
# uridecodebin -> streammux -> nvinfer -> streamdemux ->
# [nvtracker ->] nvvidconv -> nvosd ->
# caps -> encoder -> rtppay -> udpsink
streammux.link(pgie)
pgie.link(streamdemux)

for source_num in source_ids:
    srcpad1 = streamdemux.get_request_pad(f"src_{source_num}")
    if not srcpad1:
        sys.stderr.write(" Unable to get the src pad of streamdemux \n")
        continue

    stream_conv = nvvidconv[f"nvvidconv_{source_num}"]
    if config.use_nvtracker:
        stream_tracker = tracker[f"tracker_{source_num}"]
        sinkpad1 = stream_tracker.get_static_pad("sink")
        if not sinkpad1:
            sys.stderr.write(" Unable to get sink pad of tracker \n")
            continue
    else:
        sinkpad1 = stream_conv.get_static_pad("sink")
        if not sinkpad1:
            sys.stderr.write(" Unable to get sink pad of nvvidconv \n")
            continue
    srcpad1.link(sinkpad1)

    if config.use_nvtracker:
        stream_tracker.link(stream_conv)
    stream_conv.link(nvosd[f"nvosd_{source_num}"])
    nvosd[f"nvosd_{source_num}"].link(caps[f"caps_{source_num}"])
    caps[f"caps_{source_num}"].link(encoder[f"encoder_{source_num}"])
    encoder[f"encoder_{source_num}"].link(rtppay[f"rtppay_{source_num}"])
    rtppay[f"rtppay_{source_num}"].link(sink[f"sink_{source_num}"])

# create an event loop and feed gstreamer bus mesages to it
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)

# Start streaming
rtsp_port_num = 8554
server = GstRtspServer.RTSPServer.new()
server.props.service = "%d" % rtsp_port_num
server.attach(None)

mount_points = server.get_mount_points()
factory_dict = dict()
for source_num, udpsink_port in zip(source_ids, udpsink_port_list):
    factory = GstRtspServer.RTSPMediaFactory.new()
    factory.set_launch(
        "( udpsrc name=pay0 port=%d buffer-size=524288 caps=\"application/x-rtp, media=video, clock-rate=90000, encoding-name=(string)%s, payload=96 \" )" % (
            udpsink_port, codec))
    factory.set_shared(True)
    factory_dict[f"factory_{source_num}"] = factory
    # Each stream needs its own mount path: adding a factory to a path that
    # is already mounted replaces the previous one, so mounting everything at
    # "/ds-test" left only the last camera reachable.
    mount_points.add_factory(f"/ds-test-{source_num}", factory)

# Attach the OSD probe to every per-stream nvosd sink pad.
osdsinkpad = dict()
for source_num, source in enumerate(sources):
    osd_pad = nvosd[f"nvosd_{source_num}"].get_static_pad("sink")
    if not osd_pad:
        sys.stderr.write(" Unable to get sink pad of nvosd \n")
        continue
    osdsinkpad[f"osdsinkpad_{source_num}"] = osd_pad
    osd_pad.add_probe(Gst.PadProbeType.BUFFER, osd_sink_pad_buffer_probe, 0, source)

# start play back and listen to events
print("Starting pipeline \n", pipeline)
pipeline.set_state(Gst.State.PLAYING)
try:
    loop.run()
finally:
    # Always release pipeline resources, including on Ctrl-C.
    pipeline.set_state(Gst.State.NULL)
Source bin creation code:
def cb_newpad(decodebin, decoder_src_pad, data):
    """Point the source bin's ghost pad at a newly added decoder pad.

    Intended as a "pad-added" signal handler. Only pads whose caps name
    contains "video" and whose caps features contain ``memory:NVMM`` are
    attached; a video pad without NVMM memory is reported on stderr and
    other pads are ignored.

    Args:
        decodebin: Element that emitted the signal (unused).
        decoder_src_pad: The pad that was just added.
        data: The source bin whose "src" ghost pad should target the new pad.
    """
    print("In cb_newpad\n")
    caps = decoder_src_pad.get_current_caps()
    if caps is None:
        # Caps may not be set on the pad yet when it is announced; fall back
        # to the caps the pad can handle instead of dereferencing None.
        caps = decoder_src_pad.query_caps(None)
    gststruct = caps.get_structure(0)
    gstname = gststruct.get_name()
    source_bin = data
    features = caps.get_features(0)

    # Need to check if the pad created by the decodebin is for video and not
    # audio.
    print("gstname=", gstname)
    if "video" not in gstname:
        return

    # Link the decodebin pad only if decodebin has picked nvidia
    # decoder plugin nvdec_*. We do this by checking if the pad caps contain
    # NVMM memory features.
    print("features=", features)
    if features.contains("memory:NVMM"):
        # Get the source bin ghost pad
        bin_ghost_pad = source_bin.get_static_pad("src")
        if not bin_ghost_pad.set_target(decoder_src_pad):
            sys.stderr.write("Failed to link decoder src pad to source bin ghost pad\n")
    else:
        sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")
def decodebin_child_added(child_proxy, Object, name, user_data):
    """React to a child being added to a (uri)decodebin.

    Intended as a "child-added" signal handler. Nested decodebins get the
    same handler attached so their children are seen too. When the
    module-level ``use_tcp`` flag is set and the child's name contains
    "source", its ``protocols`` property (if it has one) is set to TCP.

    Args:
        child_proxy: The bin that emitted the signal.
        Object: The child element that was added.
        name: Name of the added child.
        user_data: Opaque data forwarded to nested handlers.
    """
    print("Decodebin child added:", name)
    if "decodebin" in name:
        # Follow nested decodebins so their children are reported as well.
        Object.connect("child-added", decodebin_child_added, user_data)

    if "nvv4l2decoder" in name:
        # Only logs; no decoder property is actually changed here.
        print("Seting bufapi_version\n")

    if use_tcp and "source" in name:
        # Inspect the added child itself. Looking up an element called
        # "source" on the parent returns None when the parent has no such
        # child (e.g. a nested decodebin), which crashed the original check,
        # and the property was being set on `Object` anyway.
        if Object.find_property("protocols") is not None:
            Object.set_property("protocols", GstRtsp.RTSPLowerTrans.TCP)
def create_source_bin(index, uri):
    """Create a bin wrapping a uridecodebin for one input stream.

    The bin exposes a target-less "src" ghost pad; ``cb_newpad`` sets its
    target once the decoder pad appears.

    Args:
        index: Stream index, used to name the bin ``source-bin-NN``.
        uri: URI handed to the uridecodebin.

    Returns:
        The new bin, or ``None`` if any element or the ghost pad could not
        be created.
    """
    bin_name = "source-bin-%02d" % index
    print(bin_name)

    nbin = Gst.Bin.new(bin_name)
    if not nbin:
        sys.stderr.write(" Unable to create source bin \n")
        return None

    uri_decode_bin = Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
    if not uri_decode_bin:
        sys.stderr.write(" Unable to create uri decode bin \n")
        return None

    # The original never passed `uri` to the element, leaving it with no
    # input to open.
    uri_decode_bin.set_property("uri", uri)
    uri_decode_bin.connect("pad-added", cb_newpad, nbin)
    uri_decode_bin.connect("child-added", decodebin_child_added, nbin)

    Gst.Bin.add(nbin, uri_decode_bin)

    # The decoder pad does not exist yet, so start with a ghost pad that has
    # no target.
    bin_pad = nbin.add_pad(Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC))
    if not bin_pad:
        sys.stderr.write(" Failed to add ghost pad in source bin \n")
        return None
    return nbin
We also added a videorate filter:
def create_videorate_filter(cam_id, intended_fps):
    """Create a videorate element limited to *intended_fps*.

    Args:
        cam_id: Camera identifier; the element is named ``videorate_<cam_id>``.
        intended_fps: Upper bound for the output frame rate, converted with
            ``int()`` before being applied as ``max-rate``.

    Returns:
        The configured element, or ``None`` if it could not be created.
    """
    rate_filter = Gst.ElementFactory.make('videorate', f'videorate_{cam_id}')
    # Check before configuring: the original set properties first, so a
    # failed creation raised AttributeError before the check could report it.
    if not rate_filter:
        sys.stderr.write(" Unable to create videorate \n")
        return None

    rate_filter.set_property("drop-only", True)
    rate_filter.set_property("max-rate", int(intended_fps))
    return rate_filter