"""Class for running inference with a YOLO TensorRT engine."""
import time
import numpy as np
import pycuda.autoinit
import pycuda.driver as cuda
import tensorrt as trt
from PIL import Image
import cv2
class HostDeviceMem(object):
    """Pairs a pagelocked host buffer with its device allocation."""
    def __init__(self, host_mem, device_mem):
        self.host = host_mem
        self.device = device_mem

    def __str__(self):
        return "Host:\n" + str(self.host) + "\nDevice:\n" + str(self.device)

    def __repr__(self):
        return self.__str__()
class TRTLoader:
    def __init__(self, trt_engine_path, model_w, model_h, num_classes, threshold, nms_threshold):
        self.model_w = model_w
        self.model_h = model_h
        self.min_confidence = threshold
        self.NUM_CLASSES = num_classes
        self.bgr_offsets = (103.939, 116.779, 123.68)
        self.trt_engine = self.load_engine(trt_engine_path)
        self.context = self.trt_engine.create_execution_context()
        inputs, outputs, bindings, stream = self.allocate_buffers(self.trt_engine)
        self.inputs = inputs
        self.outputs = outputs
        self.bindings = bindings
        self.stream = stream
        self.nms_threshold = nms_threshold
    def load_engine(self, engine_path):
        '''Load a serialized TensorRT engine from disk.'''
        TRT_LOGGER = trt.Logger(trt.Logger.VERBOSE)
        trt.init_libnvinfer_plugins(TRT_LOGGER, '')
        trt_runtime = trt.Runtime(TRT_LOGGER)
        with open(engine_path, "rb") as f:
            engine_data = f.read()
        engine = trt_runtime.deserialize_cuda_engine(engine_data)
        return engine
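    # Aside (my addition, not from the original post): the .trt file
    # deserialized above is typically produced offline, e.g. with trtexec;
    # the model names here are placeholders:
    #   trtexec --onnx=yolo.onnx --saveEngine=yolo.trt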
    def allocate_buffers(self, engine, batch_size=1):
        """Allocates host and device buffers for TRT engine inference.
        This function is similar to the one in common.py, but
        converts network outputs (which are np.float32) appropriately
        before writing them to the Python buffer. This is needed since
        TensorRT plugins don't support output type description, and
        in our particular case we use the NMS plugin as the network output.
        Args:
            engine (trt.ICudaEngine): TensorRT engine
        Returns:
            inputs [HostDeviceMem]: engine input memory
            outputs [HostDeviceMem]: engine output memory
            bindings [int]: buffer to device bindings
            stream (cuda.Stream): cuda stream for engine inference synchronization
        """
        inputs = []
        outputs = []
        bindings = []
        stream = cuda.Stream()
        # The current NMS implementation in TRT only supports DataType.FLOAT, but
        # that may change in the future, which could break this sample
        # when using lower precision [e.g. the NMS output would not be np.float32
        # anymore, even though this is assumed in binding_to_type].
        binding_to_type = {
            'Input': np.float32,
            'BatchedNMS': np.int32,
            'BatchedNMS_1': np.float32,
            'BatchedNMS_2': np.float32,
            'BatchedNMS_3': np.float32
        }
        for binding in engine:
            size = trt.volume(engine.get_binding_shape(binding)) * batch_size
            dtype = binding_to_type[str(binding)]
            # Allocate host and device buffers.
            host_mem = cuda.pagelocked_empty(size, dtype)
            device_mem = cuda.mem_alloc(host_mem.nbytes)
            # Append the device buffer to device bindings.
            bindings.append(int(device_mem))
            # Append to the appropriate list.
            if engine.binding_is_input(binding):
                inputs.append(HostDeviceMem(host_mem, device_mem))
            else:
                outputs.append(HostDeviceMem(host_mem, device_mem))
        return inputs, outputs, bindings, stream
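    # Aside (my addition, not from the original post): for the stock
    # BatchedNMS plugin these bindings usually decode, per batch item, as
    #   BatchedNMS   -> keep_count, int32,   shape (1,)
    #   BatchedNMS_1 -> boxes,      float32, shape (keepTopK, 4)
    #   BatchedNMS_2 -> scores,     float32, shape (keepTopK,)
    #   BatchedNMS_3 -> classes,    float32, shape (keepTopK,)
    # which matches the order postprocess() below indexes outputs[0..3] in.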
    def process_image(self, arr):
        '''Preprocessing for the TensorRT YOLO model'''
        DTYPE_TRT = trt.float32
        offsets = self.bgr_offsets
        dtype = trt.nptype(DTYPE_TRT)
        # Resize the image and change its datatype.
        image_resized = arr.resize((self.model_w, self.model_h), Image.BICUBIC)
        image_resized = np.array(image_resized, dtype=dtype, order='C')
        # HWC -> CHW
        img_chw = image_resized.transpose((2, 0, 1))
        # Subtract the per-channel offsets.
        img_chw[0] = img_chw[0] - offsets[0]
        img_chw[1] = img_chw[1] - offsets[1]
        img_chw[2] = img_chw[2] - offsets[2]
        img_np = img_chw.ravel()
        return img_np
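    # A quick sanity check of the preprocessing above (my addition, not from
    # the original post; assumes a loader built with model_w = model_h = 416):
    #   dummy = Image.fromarray(np.zeros((480, 640, 3), dtype=np.uint8))
    #   flat = loader.process_image(dummy)
    #   assert flat.shape == (3 * 416 * 416,) and flat.dtype == np.float32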
    # This function is generalized for multiple inputs/outputs.
    # inputs and outputs are expected to be lists of HostDeviceMem objects.
    def do_inference(self, context, bindings, inputs, outputs, stream, batch_size=1):
        # Transfer input data to the GPU.
        [cuda.memcpy_htod_async(inp.device, inp.host, stream) for inp in inputs]
        print('Input transfer completed')
        # Run inference.
        context.execute_async(
            batch_size=batch_size, bindings=bindings, stream_handle=stream.handle
        )
        print('Inference completed')
        # Transfer predictions back from the GPU.
        [cuda.memcpy_dtoh_async(out.host, out.device, stream) for out in outputs]
        # Synchronize the stream.
        stream.synchronize()
        # Return only the host outputs.
        return [out.host for out in outputs]
    def _nms_boxes(self, boxes, box_confidences):
        """Apply the Non-Maximum Suppression (NMS) algorithm on the bounding boxes with their
        confidence scores and return an array with the indexes of the bounding boxes we want to
        keep (and display later).
        Keyword arguments:
        boxes -- a NumPy array containing N bounding-box coordinates that survived filtering,
        with shape (N, 4); 4 for the x, y, width, height coordinates of the boxes
        box_confidences -- a NumPy array containing the corresponding confidences with shape (N,)
        """
        x_coord = boxes[:, 0]
        y_coord = boxes[:, 1]
        width = boxes[:, 2]
        height = boxes[:, 3]
        areas = width * height
        ordered = box_confidences.argsort()[::-1]
        keep = list()
        while ordered.size > 0:
            # Index of the current element:
            i = ordered[0]
            keep.append(i)
            xx1 = np.maximum(x_coord[i], x_coord[ordered[1:]])
            yy1 = np.maximum(y_coord[i], y_coord[ordered[1:]])
            xx2 = np.minimum(x_coord[i] + width[i], x_coord[ordered[1:]] + width[ordered[1:]])
            yy2 = np.minimum(y_coord[i] + height[i], y_coord[ordered[1:]] + height[ordered[1:]])
            width1 = np.maximum(0.0, xx2 - xx1 + 1)
            height1 = np.maximum(0.0, yy2 - yy1 + 1)
            intersection = width1 * height1
            union = (areas[i] + areas[ordered[1:]] - intersection)
            # Compute the Intersection over Union (IoU) score:
            iou = intersection / union
            # The goal of the NMS algorithm is to reduce the number of adjacent bounding-box
            # candidates to a minimum. In this step, we keep only those elements whose overlap
            # with the current bounding box is lower than the threshold:
            indexes = np.where(iou <= self.nms_threshold)[0]
            ordered = ordered[indexes + 1]
        keep = np.array(keep)
        return keep
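    # A toy check of the routine above (my addition; the values are made up):
    #   boxes = np.array([[0, 0, 10, 10], [1, 1, 10, 10], [50, 50, 5, 5]], dtype=np.float32)
    #   confs = np.array([0.9, 0.8, 0.7], dtype=np.float32)
    #   loader._nms_boxes(boxes, confs)  # -> array([0, 2]) with nms_threshold=0.5
    # Caveat worth flagging: this routine expects (x, y, w, h) boxes, while
    # postprocess() below hands it (x1, y1, x2, y2) corners, so the IoU
    # computed there is not quite what the formula intends.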
    def postprocess(self, outputs, wh_format=True):
        """
        Postprocesses the inference output.
        Args:
            outputs (list of np.ndarray): raw host outputs from do_inference
        Returns: three lists (boxes, class ids, scores); each box is [x1, y1, x2, y2]
        in original-image coordinates
        """
        p_keep_count = outputs[0]
        p_bboxes = outputs[1]
        p_scores = outputs[2]
        p_classes = outputs[3]
        analysis_classes = list(range(self.NUM_CLASSES))
        threshold = self.min_confidence
        # The flat bbox buffer holds 4 floats per box.
        p_bboxes = np.array_split(p_bboxes, len(p_bboxes) // 4)
        bbs = []
        class_ids = []
        scores = []
        # Scale factors to map model-space boxes back to the original image.
        x_scale = self.img_shape[1] / self.model_w
        y_scale = self.img_shape[0] / self.model_h
        for i in range(p_keep_count[0]):
            assert p_classes[i] < len(analysis_classes)
            if p_scores[i] > threshold:
                x1 = int(np.round(p_bboxes[i][0] * x_scale))
                y1 = int(np.round(p_bboxes[i][1] * y_scale))
                x2 = int(np.round(p_bboxes[i][2] * x_scale))
                y2 = int(np.round(p_bboxes[i][3] * y_scale))
                bbs.append([x1, y1, x2, y2])
                class_ids.append(p_classes[i])
                scores.append(p_scores[i])
        bbs = np.asarray(bbs)
        class_ids = np.asarray(class_ids)
        scores = np.asarray(scores)
        # Run per-class NMS on the surviving detections.
        nms_boxes, nms_categories, nscores = list(), list(), list()
        for category in set(class_ids):
            idxs = np.where(class_ids == category)
            box = bbs[idxs]
            category = class_ids[idxs]
            confidence = scores[idxs]
            keep = self._nms_boxes(box, confidence)
            print('keep', keep)
            nms_boxes.append(box[keep])
            nms_categories.append(category[keep])
            nscores.append(confidence[keep])
        if len(nms_boxes) == 0:
            return [], [], []
        return nms_boxes, nms_categories, nscores
    def predict(self, image):
        """Infers the model on a single image resized to fit the model.
        Args:
            image (PIL.Image): image that will be preprocessed and fed into the model
        """
        img = self.process_image(image)
        print('Preprocessing completed')
        # Remember the original (height, width) so postprocess can rescale boxes.
        self.img_shape = (image.size[1], image.size[0])
        inputs = self.inputs
        outputs = self.outputs
        bindings = self.bindings
        stream = self.stream
        # Copy the flattened image into the pagelocked input buffer.
        np.copyto(inputs[0].host, img)
        print('Initialisation completed')
        # Fetch the raw BatchedNMS outputs from the model.
        detection_out = self.do_inference(
            self.context, bindings=bindings, inputs=inputs, outputs=outputs, stream=stream
        )
        return detection_out
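# A minimal usage sketch (my addition, not from the original post; the engine
# path, input size, and class count are placeholders for whatever the engine
# was actually built with):
if __name__ == '__main__':
    loader = TRTLoader('yolo.trt', model_w=416, model_h=416, num_classes=80,
                       threshold=0.3, nms_threshold=0.5)
    image = Image.open('test.jpg').convert('RGB')
    raw = loader.predict(image)
    boxes, classes, confidences = loader.postprocess(raw)
    print(boxes, classes, confidences)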
Here, inside do_inference,

    context.execute_async(
        batch_size=batch_size, bindings=bindings, stream_handle=stream.handle
    )

this inference line is what causes the problem.
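One variant I've seen in other TensorRT samples (just an assumption on my side, since it depends on how the engine was built): engines with an explicit batch dimension are not meant to go through execute_async at all and need execute_async_v2, which takes no batch_size:

    # Sketch only: applies if the engine was built in explicit-batch mode.
    context.execute_async_v2(bindings=bindings, stream_handle=stream.handle)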