Hello NVIDIA Developer Community,
I’m currently working on a project with the NVIDIA Orin Nano where I perform real-time object detection using a YOLOv8 TensorRT engine in INT8 precision. My setup uses an IMX477 CSI camera, and I’m using GStreamer to capture frames and pass them to the model for inference.
The measured latency is currently around 24 ms per frame (capture timestamp to end of inference, as computed in the code below), and I’d like to reduce it further.
My Setup:
- Hardware:
  - NVIDIA Orin Nano
  - IMX477 CSI camera
- Software:
  - TensorRT engine (INT8, batch size 1)
  - GStreamer for real-time frame capture and preprocessing
- Pipeline Description:
  - Capture frames with the following GStreamer pipeline (matching the code below):
    nvarguscamerasrc sensor-id=0 !
    video/x-raw(memory:NVMM),width=1920,height=1080,framerate=60/1 !
    nvvidconv !
    video/x-raw,format=BGRx,width=640,height=640 !
    appsink name=sink max-buffers=1 drop=true
  - Pull frames in the appsink callback and run them through the TensorRT engine.
Full Code:
```python
from ultralytics import YOLO
import time
import csv
import numpy as np
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
# Callback: convert each new appsink sample to a numpy frame and run inference on it
def on_new_sample(sink):
    sample = sink.emit("pull-sample")
    if sample:
        # Extract the buffer (raw frame data)
        buffer = sample.get_buffer()
        caps = sample.get_caps()
        # Get the frame size and copy the raw bytes out of the buffer
        frame_size = buffer.get_size()
        frame_data = buffer.extract_dup(0, frame_size)
        load_ts = time.time_ns()
        # Convert to a numpy array (assuming BGRx format)
        frame = np.ndarray(
            (640, 640, 4),  # height, width, channels (BGRx)
            dtype=np.uint8,
            buffer=frame_data
        )
        # buffer.pts is relative to pipeline start; add the start timestamp for wall-clock time
        capture_ts_offset = buffer.pts
        capture_ts = buffer.pts + pipeline_start_timestamp
        preprocessed_ts = time.time_ns()
        result = run_inference(frame[:, :, :3], capture_ts_offset, capture_ts, load_ts, preprocessed_ts)
        results.append(result)
        # Return Gst.FlowReturn.OK to continue receiving frames
        return Gst.FlowReturn.OK
    return Gst.FlowReturn.ERROR
def init_gstream():
    # Initialize GStreamer
    Gst.init(None)
    # Create the capture pipeline: CSI camera -> scale/convert to 640x640 BGRx -> appsink
    pipeline_description = (
        'nvarguscamerasrc sensor-id=0 ! '
        'video/x-raw(memory:NVMM),width=1920,height=1080,framerate=60/1 ! '
        'nvvidconv ! '
        'video/x-raw,format=BGRx,width=640,height=640 ! '
        'appsink name=sink max-buffers=1 drop=true'
    )
    pipeline = Gst.parse_launch(pipeline_description)
    # Get the appsink element
    appsink = pipeline.get_by_name("sink")
    # Emit the "new-sample" signal for each frame and do not sync to the clock
    appsink.set_property("emit-signals", True)
    appsink.set_property("sync", False)
    # appsink.set_property("max-latency", 0)
    # Connect the callback to the appsink signal
    appsink.connect("new-sample", on_new_sample)
    # Start the pipeline and record the wall-clock start time (used to convert buffer PTS)
    pipeline.set_state(Gst.State.PLAYING)
    pipeline_start_timestamp = time.time_ns()
    return pipeline, appsink, pipeline_start_timestamp
def run_inference(frame, capture_ts_offset, capture_ts, load_ts, preprocessed_ts):
    # Run inference on the in-memory frame
    trt_model.predict(frame, batch=1, verbose=False)
    # Record the timestamp after inference
    after_inference_ts = time.time_ns()
    # Latency from capture to end of inference (nanoseconds)
    latency_time = after_inference_ts - capture_ts
    # Collect the timestamps for this frame
    result = [capture_ts_offset, capture_ts, load_ts, preprocessed_ts, after_inference_ts, latency_time]
    return result
# Load the exported TensorRT model
trt_model = YOLO("yolov8n_int8_batch1.engine")

# Initialize the camera GStreamer pipeline
pipeline, appsink, pipeline_start_timestamp = init_gstream()
results = []

# Run the GMainLoop to keep the pipeline running
loop = GLib.MainLoop()
start_ts = time.time_ns()
try:
    # while time.time_ns() <= (start_ts + 20_000_000_000):  # run for 20 seconds
    loop.run()
except KeyboardInterrupt:
    print("Pipeline interrupted")

results_np = np.array(results)
print(f"Mean capture-to-inference latency: {results_np[:, -1].mean() / 1e6:.2f} ms")

# Save all results to a CSV file at once
output_csv = "inference_times_seg_1080p.csv"
with open(output_csv, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(["Image Capture Timestamp Offset", "Image Capture Timestamp", "Image Load Timestamp",
                     "Preprocessed Image Timestamp", "After Inference Timestamp", "Latency Time (ns)"])
    writer.writerows(results)
print(f"Inference times saved to {output_csv}")

# Clean up when done
pipeline.set_state(Gst.State.NULL)
```
My Observations and Challenges:
- The measured capture-to-inference latency is consistently ~24 ms per frame with INT8 precision.
- I am aiming to reduce this latency further for better real-time performance.
- I suspect there are optimizations possible in the GStreamer pipeline, memory handling, or the TensorRT inference configuration; a per-stage breakdown of the logged timestamps is sketched below.
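Here is a minimal sketch of that breakdown, based on the CSV columns written by the script above (all timestamps are in nanoseconds), to see whether the time goes into frame delivery, the buffer-to-numpy conversion, or the predict() call itself:

```python
import numpy as np

# Columns written by the script above:
# [capture offset, capture_ts, load_ts, preprocessed_ts, after_inference_ts, latency_ns]
r = np.genfromtxt("inference_times_seg_1080p.csv", delimiter=",", skip_header=1)
capture, load, preproc, after = r[:, 1], r[:, 2], r[:, 3], r[:, 4]

def mean_ms(ns):
    # Timestamps are recorded in nanoseconds; report mean per-frame time in ms
    return ns.mean() / 1e6

print(f"capture -> appsink callback : {mean_ms(load - capture):.2f} ms")
print(f"buffer  -> numpy frame      : {mean_ms(preproc - load):.2f} ms")
print(f"predict() call              : {mean_ms(after - preproc):.2f} ms")
print(f"end-to-end                  : {mean_ms(after - capture):.2f} ms")
```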
Questions:
- How can I optimize the pipeline to further reduce inference latency?
- Are there specific GStreamer or TensorRT configurations that can help?
- Is there a more efficient way to pass frames from the IMX477 camera to the TensorRT engine (e.g., direct GPU memory processing or avoiding unnecessary memory copies)? One alternative callback I’m considering is sketched after this list.
- Are there any other tricks to optimize TensorRT engine performance for INT8 inference on the Orin Nano?
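On the memory-handling question, the variant I’m considering maps the GstBuffer instead of copying it out with extract_dup(). This is only a sketch, and I’m not sure the Python bindings actually avoid the copy here (Gst.MapInfo.data may still come back as a bytes copy, depending on the PyGObject/gst-python version); it reuses run_inference(), results, and pipeline_start_timestamp from the script above:

```python
import time
import numpy as np
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst

def on_new_sample_mapped(sink):
    """Sketch: variant of on_new_sample() that maps the GstBuffer instead of extract_dup()."""
    sample = sink.emit("pull-sample")
    if not sample:
        return Gst.FlowReturn.ERROR
    buffer = sample.get_buffer()
    ok, mapinfo = buffer.map(Gst.MapFlags.READ)
    if not ok:
        return Gst.FlowReturn.ERROR
    try:
        load_ts = time.time_ns()
        # Wrap the mapped memory as a 640x640 BGRx frame.
        # NOTE: whether this avoids an extra copy depends on the binding version.
        frame = np.ndarray((640, 640, 4), dtype=np.uint8, buffer=mapinfo.data)
        preprocessed_ts = time.time_ns()
        results.append(run_inference(frame[:, :, :3], buffer.pts,
                                     buffer.pts + pipeline_start_timestamp,
                                     load_ts, preprocessed_ts))
    finally:
        buffer.unmap(mapinfo)
    return Gst.FlowReturn.OK
```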
Any advice or suggestions would be greatly appreciated. Thank you for your help!