TensorRT multi-stream inference

Description

I am trying to run inference from several threads at the same time. In sync mode, every thread has to wait until the others are done with CUDA (via a custom mutex), otherwise the application crashes with a memory error. That serialization drops the frame rate from 60 FPS to 10~15 FPS with 4 threads (at only 30~50% GPU usage). I found out that trtexec can set up multiple streams, so it should be possible to run inference on several frames at the same time. After hours of googling I put together the code below, but the application crashes with:

[TRT] [E] 1: [pointWiseV2Helpers.cpp::launchPwgenKernel::280] Error Code 1: Cuda Driver (invalid resource handle)
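
For context, the mutex-serialized version I am trying to replace looks roughly like this (a minimal sketch; infer_one_frame is a stand-in for the preprocessing + inference path in the full code below):

import threading

# One global lock serializes all CUDA work across threads: safe, but it
# caps throughput at single-stream speed (hence 10~15 FPS with 4 threads)
gpu_lock = threading.Lock()

def cam_serialized(cap, infer_one_frame):
    while cap.isOpened():
        ok, frame = cap.read()
        if not ok:
            break
        with gpu_lock:  # every other thread blocks here
            detections = infer_one_frame(frame)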

Full code of the multi-stream attempt follows. I tried to make a more or less working example where several threads live together without blocking each other. It works fine without cuda.Device(0).make_context() and in a single thread, but not with the context and several threads:

import threading
import time
from collections import deque

import cv2
import numpy as np
import tensorrt as trt  # NOQA
import pycuda.driver as cuda  # NOQA
# import pycuda.autoinit  # NOQA -- the CUDA context is created manually below
from PIL import Image

logger = trt.Logger(trt.Logger.INFO)
trt.init_libnvinfer_plugins(logger, namespace="")

trt_runtime = trt.Runtime(logger)
trt_engine = None
with open('/root/model_v9_8.engine', 'rb') as f:
    trt_engine = trt_runtime.deserialize_cuda_engine(f.read())

class HostDeviceMem(object):
    def __init__(self, host_mem, device_mem):
        self.host = host_mem
        self.device = device_mem

    def __str__(self):
        return "Host:\n" + str(self.host) + "\nDevice:\n" + str(self.device)

    def __repr__(self):
        return self.__str__()


def allocate_buffers(engine):
    inputs = []
    outputs = []
    bindings = []
    stream = cuda.Stream()

    # Current NMS implementation in TRT only supports DataType.FLOAT but
    # it may change in the future, which could break this sample here
    # when using lower precision [e.g. NMS output would not be np.float32
    # anymore, even though this is assumed in binding_to_type]
    binding_to_type = {
        "images": np.float32,
        "Input": np.float32,
        "NMS": np.float32,
        "det_scores": np.float32,
        "det_boxes": np.float32,
        "NMS_1": np.int32,
        "num_dets": np.int32,
        "det_classes": np.int32,
    }

    for binding in engine:
        size = trt.volume(engine.get_binding_shape(binding)) * engine.max_batch_size
        dtype = binding_to_type[str(binding)]
        # Allocate host and device buffers
        host_mem = cuda.pagelocked_empty(size, dtype)
        device_mem = cuda.mem_alloc(host_mem.nbytes)
        # Append the device buffer to device bindings.
        bindings.append(int(device_mem))
        # Append to the appropriate list.
        if engine.binding_is_input(binding):
            inputs.append(HostDeviceMem(host_mem, device_mem))
        else:
            outputs.append(HostDeviceMem(host_mem, device_mem))
    return inputs, outputs, bindings, stream

def do_inference(context, bindings, inputs, outputs, stream):
    # Copy inputs to the device, run inference, copy outputs back -- all
    # queued on the same stream, then synchronized once at the end
    for inp in inputs:
        cuda.memcpy_htod_async(inp.device, inp.host, stream)
    context.execute_async_v2(bindings=bindings, stream_handle=stream.handle)
    for out in outputs:
        cuda.memcpy_dtoh_async(out.host, out.device, stream)
    stream.synchronize()
    return [out.host for out in outputs]

def load_into_numpy(image):
    (im_width, im_height) = image.size
    return np.array(image).reshape(
        (im_height, im_width, 3)
    ).astype(np.uint8)

def load_frame(arr):
    image = Image.fromarray(np.uint8(arr))
    model_input_width = 640
    model_input_height = 640
    # Note: bilinear interpolation used by Pillow is slightly different
    # from the one used by other frameworks, so the network output may
    # differ slightly depending on how the 640x640 input was produced
    image_resized = image.resize(
        size=(model_input_width, model_input_height),
        resample=Image.BILINEAR
    )
    img_np = load_into_numpy(image_resized)
    # HWC -> CHW
    img_np = img_np.transpose((2, 0, 1))
    # Normalize to [-1.0, 1.0] interval (expected by model)
    img_np = (2.0 / 255.0) * img_np - 1.0
    img_np = img_np.ravel()
    return img_np

cuda.init()
cfx = cuda.Device(0).make_context()  # shared CUDA context for all threads
inputs, outputs, bindings, stream = allocate_buffers(trt_engine)
context = trt_engine.create_execution_context()


def cam(id: int):
    global trt_engine, cfx, inputs, outputs, bindings, stream, context


    cap = cv2.VideoCapture('test_videos/double.avi')
    fps = deque([0], maxlen=100)
    cfx.push()
    while cap.isOpened():
        p1 = time.perf_counter()
        print('===' * 10)
        ret, frame = cap.read()
        if not ret:
            break

        img = load_frame(frame)

        np.copyto(inputs[0].host, img.ravel())

        detection_out = do_inference(
            context,
            bindings=bindings,
            inputs=inputs,
            outputs=outputs,
            stream=stream
        )
        for det in detection_out:
            print(det[0])

        current_fps = int(1 / (time.perf_counter() - p1))
        fps.append(current_fps)
        avg_fps = sum(fps) / len(fps)
        print(f"Stream: {id}, NOW: {current_fps} AVG: {avg_fps}")
    cfx.pop()

cam(1)
# for id in range(0, 3):
#     threading.Thread(target=cam, args=[id], daemon=True).start()
#
# while True:
#     time.sleep(1)



Environment

TensorRT Version: 8.5.2 (v8502)
GPU Type: Jetson Xavier NX
CUDA Version: 11.4
Operating System + Version: Ubuntu 20.04 (JetPack 5.1)
Python Version (if applicable): 3.8.10

Hi,

The links below might be useful for you:
https://docs.nvidia.com/deeplearning/tensorrt/archives/tensorrt-803/best-practices/index.html#thread-safety

https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__STREAM.html
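
The short version of the thread-safety guidance: a deserialized ICudaEngine can be shared across threads, but each thread needs its own IExecutionContext and its own CUDA stream. A minimal sketch of that pattern, assuming the trt_engine object deserialized in the code above:

import threading
import pycuda.driver as cuda

def worker():
    # Per-thread objects; only the deserialized engine is shared
    context = trt_engine.create_execution_context()
    stream = cuda.Stream()
    # ... allocate per-thread buffers here, then per frame:
    # context.execute_async_v2(bindings=bindings, stream_handle=stream.handle)
    # stream.synchronize()

threads = [threading.Thread(target=worker, daemon=True) for _ in range(4)]
for t in threads:
    t.start()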

For multi-threading/streaming, we suggest using DeepStream or Triton, as sketched below.
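
For reference, a rough sketch of the Triton client path (hypothetical setup: it assumes a Triton server on localhost:8000 serving the model under the name model_v9_8 with the same tensor names as the engine above):

import numpy as np
import tritonclient.http as httpclient

client = httpclient.InferenceServerClient(url="localhost:8000")

img = np.zeros((1, 3, 640, 640), dtype=np.float32)  # a preprocessed frame
inp = httpclient.InferInput("images", list(img.shape), "FP32")
inp.set_data_from_numpy(img)

# Triton schedules concurrent requests across model instances server-side,
# so client threads do not have to manage CUDA contexts at all
result = client.infer(model_name="model_v9_8", inputs=[inp])
boxes = result.as_numpy("det_boxes")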

For more details, we recommend raising the query on the DeepStream forum,

or

in the Triton Inference Server GitHub issues section.

Thanks!

Hello again, one year later. In case anyone needs it, here is a working example of multithreaded inference with RTSP streams. The key differences from my first attempt: each thread creates its own execution context and CUDA stream, and every CUDA call is bracketed by cfx.push() / cfx.pop().

pycuda must be installed; the preprocessing and the cv2 pipeline will probably need adjusting for your case.

import time
import logging
import os
from threading import Thread

import cv2
import torchvision.transforms as transforms
import pycuda.driver as cuda
import tensorrt as trt
import numpy as np

cuda.init()
cfx = cuda.Device(0).make_context()

logger = trt.Logger(trt.Logger.VERBOSE)
trt.init_libnvinfer_plugins(logger, '')

logging.basicConfig(level=logging.DEBUG, format="%(asctime)s - %(name)s - %(levelname)s - %(message)s")

with open('../model_medium.engine', 'rb') as f, trt.Runtime(logger) as runtime:
    engine = runtime.deserialize_cuda_engine(f.read())


class Stream(Thread):
    def __init__(self, i: int = 0, rtsp: str = ""):
        super().__init__()
        self.index = i
        self.rtsp = rtsp
        self.log = logging.getLogger(f"Stream-{i}")
        self.running = True

    def run(self):
        global engine, cfx

        cfx.push()  # make the shared CUDA context current in this thread
        context = engine.create_execution_context()  # per-thread execution context
        stream = cuda.Stream()  # per-thread CUDA stream
        host_inputs = []
        cuda_inputs = []
        host_outputs = []
        cuda_outputs = []
        bindings = []

        for binding in engine:
            batch_size = 1
            shape = engine.get_tensor_shape(binding)

            size = trt.volume(shape) * batch_size
            dtype = trt.nptype(engine.get_tensor_dtype(binding))
            host_mem = cuda.pagelocked_empty(size, dtype)
            cuda_mem = cuda.mem_alloc(host_mem.nbytes)

            bindings.append(int(cuda_mem))
            if engine.get_tensor_mode(binding) == trt.TensorIOMode.INPUT:
                host_inputs.append(host_mem)
                cuda_inputs.append(cuda_mem)
            else:
                host_outputs.append(host_mem)
                cuda_outputs.append(cuda_mem)

        cfx.pop()

        pipeline = [
            f"rtspsrc location={self.rtsp} user-id={os.getenv('RTSP_USERNAME')} user-pw={os.getenv('RTSP_PASSWORD')}",
            "rtph264depay",
            "h264parse",
            "nvv4l2decoder",
            "nvvidconv",
            "video/x-raw, format=BGRx",
            "videoscale",
            "videorate",
            "video/x-raw, width=640, height=640, framerate=20/1",
            "videoconvert",
            "video/x-raw, format=BGR",
            "appsink drop=true max-lateness=1000000000 max-buffers=5 emit-signals=true"
        ]
        cap = cv2.VideoCapture(" ! ".join(pipeline), cv2.CAP_GSTREAMER)
        assert cap.isOpened(), "Error: Could not open video"
        # Preprocessing is identical for every frame, so build it once
        preprocess = transforms.Compose([
            transforms.ToTensor(),
            transforms.Normalize(mean=[0.485, 0.456, 0.406], std=[0.229, 0.224, 0.225]),
        ])
        while self.running:
            ret, frame = cap.read()
            if not ret:
                break
            img = preprocess(frame)
            img = img.unsqueeze(0)  # add batch dimension

            np.copyto(host_inputs[0], img.flatten())
            cfx.push()  # CUDA calls below need the shared context current

            cuda.memcpy_htod_async(cuda_inputs[0], host_inputs[0], stream)
            context.execute_async_v2(bindings=bindings, stream_handle=stream.handle)
            for i in range(len(cuda_outputs)):
                cuda.memcpy_dtoh_async(host_outputs[i], cuda_outputs[i], stream)
            stream.synchronize()

            cfx.pop()

            # Output tensors, in engine binding order
            num_dets = host_outputs[0].reshape((1, 1))
            det_boxes = host_outputs[1].reshape((1, 100, 4))
            det_scores = host_outputs[2].reshape((1, 100))
            det_classes = host_outputs[3].reshape((1, 100))

            num_detections = int(num_dets[0][0])
            self.log.info(f"{num_detections:-^60}")
            for i in range(num_detections):
                self.log.info(
                    f"Detection {i + 1}: {det_classes[0][i]}, {det_scores[0][i]:.2f}, {det_boxes[0][i]}")
        cap.release()


streams = []
try:
    streams = [
        Stream(1, 'rtsp://192.168.178.94:554/11'),
        Stream(2, 'rtsp://192.168.178.94:554/11'),
        Stream(3, 'rtsp://192.168.178.94:554/11'),
        Stream(4, 'rtsp://192.168.178.94:554/11'),
    ]
    for s in streams:
        s.start()
    while True:
        time.sleep(1)
except KeyboardInterrupt:
    logging.info("Shutting down...")
    for s in streams:
        s.running = False
        s.join()
        logging.info(f"Stream {s.index} stopped")
    exit(0)
