• Hardware Platform (Jetson / GPU) Jetson Orin Nano
• DeepStream Version 7.0
• JetPack Version (valid for Jetson only) 6.0
• TensorRT Version 8.6.2
I am using a program similiar to this one deepstream_python_apps/apps/deepstream-rtsp-in-rtsp-out/deepstream_test1_rtsp_in_rtsp_out.py at master · NVIDIA-AI-IOT/deepstream_python_apps · GitHub and I wanted to know how many inputs it can have at the same time on a Orin Nano. When I try to run too many processes at the same time, I get this
I got this when I was running the link-out program with 3 links and another one that also takes these 3 links and writes the inferences info on a sqlite database. With 3 links-out and 2 links into database, it runs.
I plan to otimize the code, maybe merge both programs into one, but I wanted to know if I had a perfect optimized code, what would be the expected amount of inputs I could read at the same time.
For more info:
Here is the main function of link out program
def main(args):
# Initialize GStreamer and RTSP server
Gst.init(None)
server = GstRtspServer.RTSPServer.new()
server.props.service = "8554"
server.attach(None)
pipelines = []
osdsinkpads = [] # Store osdsinkpads for each pipeline
starting_udp_port = 5400
for i, input_file in enumerate(args.input):
udp_port = starting_udp_port + i
rtsp_mount_point = f"/ds-test-{i}"
pipeline, codec, nvosd = create_pipeline(
input_file,
udp_port,
args.codec,
args.bitrate,
args.enc_type
)
if pipeline:
pipelines.append(pipeline)
# Store the osdsinkpad for this nvosd
osdsinkpad = nvosd.get_static_pad("sink")
if not osdsinkpad:
sys.stderr.write(f" Unable to get sink pad of nvosd for input {i} \n")
continue
osdsinkpads.append(osdsinkpad)
# Add probe to the sink pad of nvosd for this pipeline
osdsinkpad.add_probe(Gst.PadProbeType.BUFFER, osd_sink_pad_buffer_probe, 0)
factory = GstRtspServer.RTSPMediaFactory.new()
factory.set_launch(
f"( udpsrc name=pay0 port={udp_port} buffer-size=524288 "
f"caps=\"application/x-rtp, media=video, clock-rate=90000, "
f"encoding-name=(string){codec}, payload=96 \" )"
)
factory.set_shared(True)
server.get_mount_points().add_factory(rtsp_mount_point, factory)
print(f"\n*** DeepStream: Launched RTSP Streaming at rtsp://localhost:8554{rtsp_mount_point} ***\n")
# Main loop
loop = GLib.MainLoop()
for pipeline in pipelines:
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)
ret = pipeline.set_state(Gst.State.PLAYING)
if ret == Gst.StateChangeReturn.FAILURE:
print("Failed to set pipeline to PLAYING")
return 1
try:
loop.run()
except Exception as e:
print(f"Main loop terminated: {e}")
# Cleanup
for pipeline in pipelines:
pipeline.set_state(Gst.State.NULL)
Here is the main function of the database one
def main(input_files, db_file, labels_file, ignore_background, location):
labels = load_labels(labels_file)
background_class = labels[0] if labels else "background"
lock = threading.Lock()
frame_queue = Queue()
def write_worker():
aggregated_data = {}
while True:
frame_data = frame_queue.get()
if frame_data is None:
break
for key, value in frame_data.items():
if key not in aggregated_data:
aggregated_data[key] = value
else:
for label, count in value.items():
aggregated_data[key][label] = aggregated_data[key].get(label, 0) + count
if frame_queue.qsize() == 0:
write_to_sqlite(aggregated_data, db_file, labels, background_class, ignore_background, location, lock)
aggregated_data.clear()
with ThreadPoolExecutor(max_workers=len(input_files) + 1) as executor:
executor.submit(write_worker)
futures = [
executor.submit(process_stream, input_file, stream_id, db_file, labels, background_class, ignore_background, location, lock, frame_queue)
for stream_id, input_file in enumerate(input_files, start=1)
]
for future in futures:
future.result()
frame_queue.put(None)
I will probably replace ThreadPoolExecution with ProcessPoolExecutor. Not sure if it will increase amount of possible inputs though, worth a try at least.