Hello,
I’m working on a project with the NVIDIA Orin Nano in which I capture frames from a CSI camera and run inference with a YOLOv8 TensorRT engine. My goal is to optimize the pipeline so that frames are transferred directly from the CSI camera to the GPU, without unnecessary CPU memory copies, for the most efficient processing possible.
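Conceptually, what I would like is a pipeline that keeps the frames in NVMM memory all the way to the inference step, something like the sketch below. This is intent only, not working code: I don't know whether appsink can negotiate `memory:NVMM` caps, or how to get from such a buffer to a TensorRT input without a CPU copy.

```python
# Desired pipeline (sketch of intent, not verified): keep frames in NVMM right
# up to appsink instead of converting to system-memory BGRx as I do today.
desired_pipeline = (
    'nvarguscamerasrc sensor-id=0 ! '
    'video/x-raw(memory:NVMM),width=1920,height=1080,framerate=60/1 ! '
    'nvvidconv ! '
    'video/x-raw(memory:NVMM),format=RGBA,width=640,height=640 ! '
    'appsink name=sink max-buffers=1 drop=true'
)
```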
Here’s my current setup:
- Code:
from ultralytics import YOLO
import time
import cv2
import os
import csv
from tqdm import tqdm
import numpy as np
import torch as tf
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
# Callback that processes each new frame delivered by appsink
def on_new_sample(sink):
    sample = sink.emit("pull-sample")
    if sample:
        # Extract the buffer (raw frame data)
        buffer = sample.get_buffer()
        caps = sample.get_caps()
        # Get the frame size and copy the data out of the buffer
        frame_size = buffer.get_size()
        frame_data = buffer.extract_dup(0, frame_size)
        load_ts = time.time_ns()
        # Wrap the raw bytes in a numpy array (BGRx format from nvvidconv)
        frame = np.ndarray(
            (640, 640, 4),  # resolution and channels (BGRx)
            dtype=np.uint8,
            buffer=frame_data
        )
        capture_ts_offset = buffer.pts
        capture_ts = buffer.pts + pipeline_start_timestamp
        preprocessed_ts = time.time_ns()
        result = run_inference(frame[:, :, :3], capture_ts_offset, capture_ts, load_ts, preprocessed_ts)
        results.append(result)
        # Return Gst.FlowReturn.OK to continue receiving frames
        return Gst.FlowReturn.OK
    return Gst.FlowReturn.ERROR
def init_gstream():
    # Initialize GStreamer
    Gst.init(None)
    # Capture pipeline: CSI camera -> nvvidconv -> 640x640 BGRx in system memory -> appsink
    pipeline_description = (
        'nvarguscamerasrc sensor-id=0 ! '
        'video/x-raw(memory:NVMM),width=1920,height=1080,framerate=60/1 ! '
        'nvvidconv ! '
        'video/x-raw,format=BGRx,width=640,height=640 ! '
        'appsink name=sink max-buffers=1 drop=true'
    )
    pipeline = Gst.parse_launch(pipeline_description)
    # Get the appsink element
    appsink = pipeline.get_by_name("sink")
    # Have appsink emit signals for new samples and not sync to the clock
    appsink.set_property("emit-signals", True)
    appsink.set_property("sync", False)
    # appsink.set_property("max-latency", 0)
    # Connect the callback to the appsink signal
    appsink.connect("new-sample", on_new_sample)
    # Start the pipeline
    pipeline.set_state(Gst.State.PLAYING)
    pipeline_start_timestamp = time.time_ns()
    return pipeline, appsink, pipeline_start_timestamp
def run_inference(frame, capture_ts_offset, capture_ts, load_ts, preprocessed_ts):
    # Run inference on the in-memory frame
    trt_model.predict(frame, batch=1, verbose=False)
    # Record the timestamp after inference
    after_inference_ts = time.time_ns()
    # End-to-end latency from capture to end of inference (nanoseconds)
    latency_time = after_inference_ts - capture_ts
    # Collect the timestamps for this frame
    result = [capture_ts_offset, capture_ts, load_ts, preprocessed_ts, after_inference_ts, latency_time]
    return result
# Load the exported TensorRT model
trt_model = YOLO("yolov8n_int8_batch1.engine")

# Initialize the camera GStreamer pipeline
pipeline, appsink, pipeline_start_timestamp = init_gstream()
results = []

# Run the GMainLoop to keep the pipeline running
loop = GLib.MainLoop()
start_ts = time.time_ns()
try:
    # while time.time_ns() <= (start_ts + 20_000_000_000):  # 20 seconds
    loop.run()
except KeyboardInterrupt:
    print("Pipeline interrupted")

results_np = np.array(results)
print(f"Mean latency (ns): {results_np[:, -1].mean()}")

# Save all results to a CSV file at once
output_csv = "inference_times_seg_1080p.csv"
with open(output_csv, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(["Image Capture Timestamp Offset", "Image Capture Timestamp", "Image Load Timestamp",
                     "Preprocessed Image Timestamp", "After Inference Timestamp", "Latency Time (nanoseconds)"])
    writer.writerows(results)
print(f"Inference times saved to {output_csv}")

# Clean up when done
pipeline.set_state(Gst.State.NULL)
- I use the `appsink` element to capture frames and convert the data to a NumPy array (a mapped-buffer variant of this callback is sketched below).
- The NumPy frame is passed directly to the YOLOv8 TensorRT engine for inference.
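For completeness, here is the mapped-buffer variant of the callback mentioned above, reusing the same helpers and globals as the code listing (a minimal sketch, assuming the same 640x640 BGRx caps and no row padding). Mapping avoids the extra copy that `extract_dup` makes, but the frame still ends up in CPU memory, which is exactly what I'm trying to avoid:

```python
# Variant of on_new_sample() that maps the Gst.Buffer instead of copying it
# with extract_dup(). Still a CPU-side view of the data, not an NVMM zero-copy path.
def on_new_sample_mapped(sink):
    sample = sink.emit("pull-sample")
    if sample is None:
        return Gst.FlowReturn.ERROR

    buffer = sample.get_buffer()
    structure = sample.get_caps().get_structure(0)
    width = structure.get_value("width")    # 640 with the caps above
    height = structure.get_value("height")  # 640 with the caps above

    ok, map_info = buffer.map(Gst.MapFlags.READ)
    if not ok:
        return Gst.FlowReturn.ERROR
    try:
        # frombuffer creates a view over the mapped memory (assumes no row padding);
        # note that on some PyGObject versions accessing map_info.data may itself copy.
        frame = np.frombuffer(map_info.data, dtype=np.uint8).reshape(height, width, 4)
        load_ts = time.time_ns()
        result = run_inference(frame[:, :, :3], buffer.pts,
                               buffer.pts + pipeline_start_timestamp,
                               load_ts, time.time_ns())
        results.append(result)
    finally:
        buffer.unmap(map_info)
    return Gst.FlowReturn.OK
```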
My questions are:
- How can I ensure that the memory remains in GPU buffers (`NVMM`) when processing frames for inference?
- Is there a way to pass `NVMM` buffers directly into TensorRT without mapping them or converting to a NumPy array? (See the sketch after this list for what I mean.)
- Are there specific TensorRT model input formats (e.g., `RGBA`) that allow such direct processing?
- Any recommendations or best practices for further optimizing this pipeline?
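To make the second and third questions concrete, this is roughly what I mean by "passing data directly" (a hypothetical sketch: `gpu_frame` stands in for a tensor I don't currently know how to obtain from the NVMM buffer without going through the CPU, and I'm not sure whether `predict()` on a CUDA tensor avoids internal copies):

```python
import torch

# Hypothetical: an RGB frame that is already resident on the GPU as a
# normalized BCHW float tensor. Getting here from the NVMM buffer without a
# CPU round trip is exactly the part I am missing.
gpu_frame = torch.empty((1, 3, 640, 640), dtype=torch.float32, device="cuda")

# Ultralytics accepts torch.Tensor sources (BCHW, float, values in [0, 1]);
# whether this path stays on the GPU end to end is unclear to me.
preds = trt_model.predict(gpu_frame, verbose=False)
```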
I’d appreciate any guidance or suggestions from the community on ensuring an efficient zero-copy data path.
Thank you in advance!