How to Use GStreamer for Direct GPU Inference with TensorRT on NVIDIA Orin Nano?

Hello,

I’m working on a project with the NVIDIA Orin Nano, where I capture frames from a CSI camera and run inference with a YOLOv8 TensorRT engine. My goal is to optimize the pipeline so that frames travel from the CSI camera to the GPU without unnecessary copies through CPU memory.

Here’s my current setup:

  1. Code:
from ultralytics import YOLO
import time
import csv
import numpy as np
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib

# Define a callback function to process the frames
def on_new_sample(sink):
    sample = sink.emit("pull-sample")
    if sample:
        # Extract the buffer (raw frame data)
        buffer = sample.get_buffer()
        caps = sample.get_caps()
        
        # Get the frame size and copy the buffer contents into Python bytes
        # (note: extract_dup performs a CPU-side copy)
        frame_size = buffer.get_size()
        frame_data = buffer.extract_dup(0, frame_size)

        load_ts = time.time_ns()

        # Convert to a numpy array (assuming BGRx format)
        frame = np.ndarray(
            (640, 640, 4),  # Resolution and channels (BGRx)
            dtype=np.uint8,
            buffer=frame_data
        )
        # buffer.pts is the capture time in ns relative to the pipeline start
        capture_ts_offset = buffer.pts
        capture_ts = buffer.pts + pipeline_start_timestamp
        preprocessed_ts = time.time_ns()
        result = run_inference(frame[:, :, :3], capture_ts_offset, capture_ts, load_ts, preprocessed_ts)
        results.append(result)
        
        # Return Gst.FlowReturn.OK to continue receiving frames
        return Gst.FlowReturn.OK
    return Gst.FlowReturn.ERROR

def init_gstream():
    # Initialize GStreamer
    Gst.init(None)

    # Create pipeline
    pipeline_description = (
        'nvarguscamerasrc sensor-id=0 ! '
        'video/x-raw(memory:NVMM),width=1920,height=1080,framerate=60/1 ! '
        'nvvidconv ! '
        'video/x-raw,format=BGRx,width=640,height=640 ! '
        'appsink name=sink max-buffers=1 drop=true'
    )
    pipeline = Gst.parse_launch(pipeline_description)

    # Get the appsink element
    appsink = pipeline.get_by_name("sink")


    # Set the appsink to emit signals for new samples
    appsink.set_property("emit-signals", True)
    appsink.set_property("sync", False)
    # appsink.set_property("max-latency", 0)

    # Connect the callback to the appsink signal
    appsink.connect("new-sample", on_new_sample)

    # Start the pipeline
    pipeline.set_state(Gst.State.PLAYING)
    pipeline_start_timestamp = time.time_ns()

    return appsink, pipeline, pipeline_start_timestamp

def run_inference(frame, capture_ts_offset, capture_ts, load_ts, preprocessed_ts):
    # Run inference on the in-memory frame (detection outputs are discarded here)
    trt_model.predict(frame, batch=1, verbose=False)
    
    # Record the timestamp after inference
    after_inference_ts = time.time_ns()
    
    # Compute end-to-end latency (capture to after inference) in nanoseconds
    latency_time = after_inference_ts - capture_ts
    
    # Append results for this image to the results list
    result = [capture_ts_offset, capture_ts, load_ts, preprocessed_ts, after_inference_ts, latency_time]
    
    return result

# Load the exported TensorRT model
trt_model = YOLO("yolov8n_int8_batch1.engine")

# Initialize the camera GStreamer pipeline
appsink, pipeline, pipeline_start_timestamp = init_gstream()

results = []
 
# Run the GMainLoop to keep the pipeline running
loop = GLib.MainLoop()
start_ts = time.time_ns()
try:
    # while(time.time_ns() <= (start_ts + 20000000000)):#20seconds
    loop.run()
except KeyboardInterrupt:
    print("Pipeline interrupted")

results_np = np.array(results)
print(f"Latency: {results_np[:,-1].mean()}")

# Save all results to CSV file at once
output_csv = "inference_times_seg_1080p.csv"
with open(output_csv, mode='w', newline='') as file:
    writer = csv.writer(file)
    writer.writerow(["Image Capture Timestamp Offset","Image Capture Timestamp", "Image Load Timestamp","Preprocessed Image Timestamp", "After Inference Timestamp", "Latency Time (seconds)"])
    writer.writerows(results)

print(f"Inference times saved to {output_csv}")

# Clean up when done
pipeline.set_state(Gst.State.NULL)
  2. I use the appsink element to capture frames and convert each buffer to a NumPy array (a lower-copy variant of this step is sketched after this list).
  3. The NumPy frame is passed directly to the YOLOv8 TensorRT engine for inference.
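
For reference, here is a lower-copy variant of the callback (a sketch only, assuming the same 640×640 BGRx caps as above): Gst.Buffer.map() reads the mapped memory directly instead of duplicating it with extract_dup(), although some PyGObject versions still expose map_info.data as a copied bytes object.

def on_new_sample_mapped(sink):
    sample = sink.emit("pull-sample")
    if sample is None:
        return Gst.FlowReturn.ERROR

    buffer = sample.get_buffer()
    success, map_info = buffer.map(Gst.MapFlags.READ)
    if not success:
        return Gst.FlowReturn.ERROR
    try:
        load_ts = time.time_ns()
        # View the mapped bytes as a 640x640 BGRx frame; the final copy()
        # is needed because the memory is unmapped right below
        frame = np.frombuffer(map_info.data, dtype=np.uint8)
        frame = frame.reshape((640, 640, 4)).copy()
    finally:
        buffer.unmap(map_info)

    capture_ts_offset = buffer.pts
    capture_ts = buffer.pts + pipeline_start_timestamp
    preprocessed_ts = time.time_ns()
    results.append(run_inference(frame[:, :, :3], capture_ts_offset,
                                 capture_ts, load_ts, preprocessed_ts))
    return Gst.FlowReturn.OK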

My questions are:

  1. How can I ensure that the memory remains in GPU buffers (NVMM) when processing frames for inference?
  2. Is there a way to pass NVMM buffers directly into TensorRT without remapping or converting to a NumPy array? (A partial step, feeding a GPU-resident tensor to predict(), is sketched after this list.)
  3. Are there specific TensorRT model input formats (e.g., RGBA) that allow such direct processing?
  4. Any recommendations or best practices for further optimizing this pipeline?
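
Regarding question 2: short of true NVMM zero-copy, an intermediate step I'm considering is handing predict() a GPU-resident torch tensor, so that at most one host-to-device copy happens per frame instead of letting predict() re-upload a NumPy array. This sketch assumes Ultralytics' documented torch.Tensor source support (BCHW, float32, values in [0, 1], RGB channel order):

import torch

def frame_to_cuda_tensor(frame_bgr):
    # frame_bgr: HxWx3 uint8 NumPy array (BGRx with the X channel dropped)
    rgb = np.ascontiguousarray(frame_bgr[:, :, ::-1])  # BGR -> RGB
    tensor = torch.from_numpy(rgb).permute(2, 0, 1)    # HWC -> CHW
    tensor = tensor.unsqueeze(0).to('cuda')            # add batch dim, upload
    return tensor.float() / 255.0                      # normalize to [0, 1]

# Usage inside the callback, replacing the NumPy call:
# trt_model.predict(frame_to_cuda_tensor(frame[:, :, :3]), verbose=False)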

I’d appreciate any guidance or suggestions from the community on ensuring an efficient zero-copy data path.

Thank you in advance!


Hi,
The optimal solution is to use the DeepStream SDK, which is designed to avoid additional memory copies. Please follow the documentation below to install it and give it a try (a minimal pipeline sketch follows the links):

Installation — DeepStream documentation
NVIDIA Metropolis - NVIDIA Docs

For YOLO models, please check:

GitHub - marcoslucianops/DeepStream-Yolo: NVIDIA DeepStream SDK 7.1 / 7.0 / 6.4 / 6.3 / 6.2 / 6.1.1 / 6.1 / 6.0.1 / 6.0 / 5.1 implementation for YOLO models
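
For reference, a minimal sketch of such a pipeline, assuming DeepStream and its Python bindings are installed and that an nvinfer configuration file points at your engine (the name config_infer_primary_yolov8.txt below is a placeholder, e.g. one generated with the DeepStream-Yolo repository). nvinfer consumes NVMM buffers directly, so frames stay in GPU memory from capture through inference:

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib

Gst.init(None)

# Frames remain in NVMM (GPU) memory end to end; nvinfer runs the
# TensorRT engine described by the config file
pipeline = Gst.parse_launch(
    'nvarguscamerasrc sensor-id=0 ! '
    'video/x-raw(memory:NVMM),width=1920,height=1080,framerate=60/1 ! '
    'mux.sink_0 nvstreammux name=mux batch-size=1 width=1920 height=1080 ! '
    'nvinfer config-file-path=config_infer_primary_yolov8.txt ! '
    'fakesink sync=false'
)

pipeline.set_state(Gst.State.PLAYING)
loop = GLib.MainLoop()
try:
    loop.run()
except KeyboardInterrupt:
    pass
pipeline.set_state(Gst.State.NULL)

Detections can then be read from NvDsBatchMeta via a pad probe with the pyds bindings, or rendered on-screen with nvdsosd, without copying frames back to the CPU.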
