That’s great thanks!
Libraries
Here are the versions of the relevant libraries:
cuda-python==12.3.0
Pillow==7.0.0
tensorrt==8.5.2.2
numpy==1.17.4
OpenCV doesn’t show up in pip list,
so I assume it just came with JetPack. Here’s the version, though I don’t think it matters much:
>>> import cv2
>>> print(cv2.__version__)
4.5.4
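Since both the capture and the writer pipelines below go through GStreamer, here is a quick check to confirm that the JetPack OpenCV build has GStreamer enabled (just grepping the build info, nothing engine-specific):

import cv2

build_info = cv2.getBuildInformation()
# The "GStreamer:" line should say YES on the JetPack build
print([line.strip() for line in build_info.splitlines() if "GStreamer" in line])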
I installed tensorrt as follows:
sudo apt-get install libopenblas-base libopenmpi-dev libomp-dev -y
sudo apt-get install tensorrt nvidia-tensorrt nvidia-tensorrt-dev python3-libnvinfer-dev -y
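For completeness, a small sanity check along these lines confirms the Python bindings and the CUDA runtime are visible (a minimal sketch, nothing model-specific):

import tensorrt as trt
from cuda import cuda, cudart

print("TensorRT:", trt.__version__)  # expect 8.5.2.2
(err,) = cuda.cuInit(0)  # initialize the CUDA driver API
assert err == cuda.CUresult.CUDA_SUCCESS
err, count = cudart.cudaGetDeviceCount()
print("CUDA devices:", count)  # expect 1 on the Orin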
Code:
Here is my tensorrt_inference.py, which contains the class we will later import into our actual inference script:
import ctypes
import numpy as np
import tensorrt as trt
from cuda import cuda, cudart
from typing import Union, Optional
TRT_LOGGER = trt.Logger(trt.Logger.WARNING)
def check_cuda_err(err):
    if isinstance(err, cuda.CUresult):
        if err != cuda.CUresult.CUDA_SUCCESS:
            raise RuntimeError("Cuda Error: {}".format(err))
    elif isinstance(err, cudart.cudaError_t):
        if err != cudart.cudaError_t.cudaSuccess:
            raise RuntimeError("Cuda Runtime Error: {}".format(err))
    else:
        raise RuntimeError("Unknown error type: {}".format(err))
def cuda_call(call):
err, res = call[0], call[1:]
check_cuda_err(err)
if len(res) == 1:
res = res[0]
return res
class TensorRTInference:
def __init__(self, engine_path: str):
self.engine = self.load_engine(engine_path)
self.context = self.engine.create_execution_context()
print("TensorRT engine loaded successfully")
@staticmethod
def load_engine(engine_path) -> trt.ICudaEngine:
"""Load a TensorRT engine from file."""
with open(engine_path, 'rb') as f, trt.Runtime(TRT_LOGGER) as runtime:
return runtime.deserialize_cuda_engine(f.read())
def allocate_buffers(self):
inputs = []
outputs = []
bindings = []
stream = cuda_call(cudart.cudaStreamCreate())
tensor_names = [self.engine.get_tensor_name(i) for i in range(self.engine.num_io_tensors)]
for binding in tensor_names:
            # The boxes/labels/scores outputs have dynamic shapes in this engine,
            # so hardcode their maximum shapes to allocate enough memory for those
            # bindings; all other bindings use the engine's reported shape.
if binding == "boxes":
shape = (5, 4)
elif binding == "labels":
shape = (5,)
elif binding == "scores":
shape = (5,)
else:
shape = self.engine.get_tensor_shape(binding)
shape_valid = np.all([s >= 0 for s in shape])
if not shape_valid:
raise ValueError(f"Binding {binding} has dynamic shape, " + \
"but no profile was specified.")
size = trt.volume(shape)
trt_type = self.engine.get_tensor_dtype(binding)
# Allocate host and device buffers
if trt.nptype(trt_type):
dtype = np.dtype(trt.nptype(trt_type))
bindingMemory = HostDeviceMem(size, dtype)
else: # no numpy support: create a byte array instead (BF16, FP8, INT4)
size = int(size * trt_type.itemsize)
bindingMemory = HostDeviceMem(size)
# Append the device buffer to device bindings.
bindings.append(int(bindingMemory.device))
# Append to the appropriate list.
if self.engine.get_tensor_mode(binding) == trt.TensorIOMode.INPUT:
inputs.append(bindingMemory)
else:
outputs.append(bindingMemory)
return inputs, outputs, bindings, stream
def do_inference(self, inputs, outputs, bindings, stream):
def execute_async_func():
self.context.execute_async_v3(stream_handle=stream)
# Setup context tensor address.
num_io = self.engine.num_io_tensors
for i in range(num_io):
self.context.set_tensor_address(self.engine.get_tensor_name(i), bindings[i])
return self._do_inference_base(inputs, outputs, stream, execute_async_func)
def _do_inference_base(self, inputs, outputs, stream, execute_async_func):
# Transfer input data to the GPU.
kind = cudart.cudaMemcpyKind.cudaMemcpyHostToDevice
[cuda_call(cudart.cudaMemcpyAsync(inp.device, inp.host, inp.nbytes, kind, stream)) for inp in inputs]
# Run inference.
execute_async_func()
# Transfer predictions back from the GPU.
kind = cudart.cudaMemcpyKind.cudaMemcpyDeviceToHost
[cuda_call(cudart.cudaMemcpyAsync(out.host, out.device, out.nbytes, kind, stream)) for out in outputs]
# Synchronize the stream
cuda_call(cudart.cudaStreamSynchronize(stream))
# Return only the host outputs.
return [out.host for out in outputs]
class HostDeviceMem:
"""Pair of host and device memory, where the host memory is wrapped in a numpy array"""
def __init__(self, size: int, dtype: Optional[np.dtype] = None):
dtype = dtype or np.dtype(np.uint8)
nbytes = size * dtype.itemsize
host_mem = cuda_call(cudart.cudaMallocHost(nbytes))
pointer_type = ctypes.POINTER(np.ctypeslib.as_ctypes_type(dtype))
self._host = np.ctypeslib.as_array(ctypes.cast(host_mem, pointer_type), (size,))
self._device = cuda_call(cudart.cudaMalloc(nbytes))
self._nbytes = nbytes
@property
def host(self) -> np.ndarray:
return self._host
@host.setter
def host(self, data: Union[np.ndarray, bytes]):
if isinstance(data, np.ndarray):
if data.size > self.host.size:
raise ValueError(
f"Tried to fit an array of size {data.size} into host memory of size {self.host.size}"
)
np.copyto(self.host[:data.size], data.flat, casting='safe')
else:
assert self.host.dtype == np.uint8
self.host[:self.nbytes] = np.frombuffer(data, dtype=np.uint8)
@property
def device(self) -> int:
return self._device
@property
def nbytes(self) -> int:
return self._nbytes
def __str__(self):
return f"Host:\n{self.host}\nDevice:\n{self.device}\nSize:\n{self.nbytes}\n"
def __repr__(self):
return self.__str__()
def free(self):
cuda_call(cudart.cudaFree(self.device))
cuda_call(cudart.cudaFreeHost(self.host.ctypes.data))
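For reference, a standalone smoke test of this class (without the camera) looks roughly like this (a minimal sketch with an all-zeros input, using the same engine path as the script below):

import numpy as np
from tensorrt_inference import TensorRTInference

model = TensorRTInference("models/jetson_orin_dynamic_output.trt")
inputs, outputs, bindings, stream = model.allocate_buffers()

# Fill the pinned host input buffer with zeros and run a single inference pass
np.copyto(inputs[0].host, np.zeros(inputs[0].host.size, dtype=inputs[0].host.dtype))
results = model.do_inference(inputs, outputs, bindings, stream)
print([r.shape for r in results])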
Here is the data conversion file, augmentations.py, that I use to prepare image data for our TensorRT engine / neural network:
from PIL import Image
import numpy as np
import cv2
NORMALIZATION_MEAN = np.array([0.485, 0.456, 0.406])
NORMALIZATION_STD = np.array([0.229, 0.224, 0.225])
def image2tensor(image):
# Assume image is a PIL Image
# Convert PIL Image to a numpy array
np_image = np.array(image).astype(np.float32) / 255.0 # Scale to [0, 1]
    np_image = np_image[:, :, ::-1]  # PIL gives RGB; reverse the channel order to BGR
# Normalize
np_image = (np_image - NORMALIZATION_MEAN) / NORMALIZATION_STD
# Move axis to match the (C, H, W) format
np_image = np_image.transpose(2, 0, 1)
return np_image
def numpy2tensor(image):
# Assume image is a numpy array in BGR format
np_image = image.astype(np.float32) / 255.0 # Scale to [0, 1]
# Convert to RGB
np_image = np_image[:, :, ::-1]
# Normalize
np_image = (np_image - NORMALIZATION_MEAN) / NORMALIZATION_STD
# Move axis to match the (C, H, W) format
np_image = np_image.transpose(2, 0, 1)
return np_image
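One detail worth noting: NORMALIZATION_MEAN and NORMALIZATION_STD are float64 arrays, so the normalization step silently upcasts the frame and both functions return float64 rather than float32. A quick check with a synthetic frame (a sketch, assuming augmentations.py is importable on its own):

import numpy as np
import augmentations

frame = np.zeros((1080, 1920, 3), dtype=np.uint8)  # synthetic BGR frame
tensor = augmentations.numpy2tensor(frame)
print(tensor.shape)  # (3, 1080, 1920)
print(tensor.dtype)  # float64: the float64 mean/std upcast the float32 image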
And here is a simplified version of the inference script that results in the crash:
import cv2
import os
import numpy as np
import data.augmentations as augmentations
from tensorrt_inference import TensorRTInference
ENGINE_PATH: str = "models/jetson_orin_dynamic_output.trt"
print("Creating capture and encoder pipelines...")
cap = cv2.VideoCapture(
'nvarguscamerasrc ! '
'video/x-raw(memory:NVMM) , width=1920, height=1080, format=NV12, framerate=30/1 ! '
'nvvidconv flip-method=2 ! '
'video/x-raw, width=1920, height=1080, format=BGRx ! '
'videoconvert ! '
'video/x-raw, format=BGR ! '
'appsink'
)
# Note: this writer works, but only runs at about 5 FPS, which is too slow
# out = cv2.VideoWriter("test.avi", cv2.VideoWriter_fourcc(*'MJPG'), 30, (1920, 1080))
# Note: this writer can run at 20 FPS, but causes a sudden reboot when run together with the TensorRT model
out = cv2.VideoWriter(
'appsrc is-live=true do-timestamp=true format=3 ! '
'video/x-raw, format=(string)BGR ! '
'videoconvert ! '
'video/x-raw, format=(string)NV12 ! '
'x264enc bitrate=4000 speed-preset=ultrafast tune=zerolatency ! '
'video/x-h264, profile=baseline ! '
'h264parse ! '
'qtmux ! '
'filesink location=output.mp4',
0, 30.0, (1920, 1080), True
)
print("Loading TensorRT model...")
tensorrt_model = TensorRTInference(ENGINE_PATH)
inputs, outputs, bindings, stream = tensorrt_model.allocate_buffers()
while cap.isOpened():
    # fps.start()  # FPS timer removed in this simplified script (fps is not defined here)
ret_val, img = cap.read()
if not ret_val:
print("Failed to get the frame from the camera.")
break
if os.name == 'nt':
img = cv2.resize(img, (1920, 1080))
img_tensor = augmentations.numpy2tensor(img)
out.write(img)
del img
# Run inference
np.copyto(inputs[0].host, img_tensor.ravel())
tensorrt_model.do_inference(inputs, outputs, bindings, stream)
# Get the output
res = [output.host for output in outputs]
print(res)
cap.release()
out.release()
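One more thing I am not sure about: the script never frees the pinned/device buffers or destroys the stream. If that turns out to matter, the cleanup I would add looks like this (a sketch based on the HostDeviceMem.free method above):

from cuda import cudart
from tensorrt_inference import cuda_call

for mem in inputs + outputs:
    mem.free()  # release pinned host memory and device memory
cuda_call(cudart.cudaStreamDestroy(stream))  # destroy the CUDA stream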
I believe that should be everything needed to reproduce the issue. Thanks!