Using TensorRT with multithreading always generates errors when exiting

I have Python code that starts three threads: one for face detection and two for face recognition. Each thread has its own model/engine. The code works fine while it is running, but whenever I trigger an exit (by pressing q), it always ends with Segmentation fault (core dumped). Is there any way to fix this?

vinhtq115@Dell-G7-7588:~/PycharmProjects/FaceTensorRT$ python multithreading.py 
Loading embeddings: 100%|██████████████████| 18/18 [00:00<00:00, 391.32people/s]
[Detection thread] Reading engine from file weights/yolov5n-0.5.trt
[Recognition thread 0] Reading engine from file weights/arcface_mbf_fp16.trt
[TensorRT] WARNING: The logger passed into createInferRuntime differs from one already provided for an existing builder, runtime, or refitter. TensorRT maintains only a single logger pointer at any given time, so the existing value, which can be retrieved with getLogger(), will be used instead. In order to use a new logger, first destroy all existing builder, runner or refitter objects.

[Recognition thread 1] Reading engine from file weights/arcface_mbf_fp16.trt
[TensorRT] WARNING: The logger passed into createInferRuntime differs from one already provided for an existing builder, runtime, or refitter. TensorRT maintains only a single logger pointer at any given time, so the existing value, which can be retrieved with getLogger(), will be used instead. In order to use a new logger, first destroy all existing builder, runner or refitter objects.

[Recognition thread 0] Ready!
[Recognition thread 1] Ready!
All recognition threads ready. Starting detection process...
...
Exiting main thread.
[TensorRT] INTERNAL ERROR: [defaultAllocator.cpp::free::85] Error Code 1: Cuda Runtime (invalid argument)
[TensorRT] INTERNAL ERROR: [resources.h::operator()::445] Error Code 1: Cuda Driver (invalid device context)
[TensorRT] INTERNAL ERROR: [resources.h::operator()::445] Error Code 1: Cuda Driver (invalid device context)
[TensorRT] INTERNAL ERROR: [resources.h::operator()::445] Error Code 1: Cuda Driver (invalid device context)
Segmentation fault (core dumped)

Current code:

import cv2
import numpy as np
import os
import pycuda.driver as cuda
import tensorrt as trt
import threading
import time
from queue import Queue, Empty
from tqdm import tqdm
from Processing.ArcfaceProcessing import ArcfacePreProcessing, normalize
from Processing.YoloProcessing import YoloPreProcess, YoloPostProcess
from tensorrt_common import allocate_buffers, Do_Inference


cuda.init()
device = cuda.Device(0)
ctx = device.make_context()


class DetectionThread(threading.Thread):
    def __init__(self, yolo_trt_engine_path: str, queues: [Queue], ready_events: [threading.Event],
                 exit_event: threading.Event, conf_thres=0.7, iou_thres=0.5):
        super().__init__()
        self.cap = cv2.VideoCapture('test.mp4')
        self.queues = queues
        self.ready_events = ready_events
        self.exit_event = exit_event
        self.num_recognition_threads = len(queues)
        self.queue_count = 0  # Round-robin
        self.yolo_preprocess = YoloPreProcess()
        self.yolo_postprocess = YoloPostProcess(shape=(640, 640), orig_shape=(720, 1280),
                                                conf_thres=conf_thres, iou_thres=iou_thres, device='cpu')

        # Initialize YOLO
        TRT_LOGGER = trt.Logger()
        self.yolo_engine = None
        if os.path.exists(yolo_trt_engine_path):
            print(f'[Detection thread] Reading engine from file {yolo_trt_engine_path}')
            with open(yolo_trt_engine_path, 'rb') as f, trt.Runtime(TRT_LOGGER) as runtime:
                self.yolo_engine = runtime.deserialize_cuda_engine(f.read())

        if self.yolo_engine is None:
            print(f'[Detection thread] Failed to read engine from file!')
            exit(1)

    def run(self):
        ctx.push()
        # Wait for all recognition threads to be ready
        for i in range(self.num_recognition_threads):
            while not self.ready_events[i].is_set():
                time.sleep(0.01)
        print('All recognition threads ready. Starting detection process...')

        with self.yolo_engine.create_execution_context() as yolo_context:
            yolo_inputs, yolo_outputs, yolo_bindings, yolo_stream = allocate_buffers(self.yolo_engine)

            while True:
                start_time = time.time()
                ret, frame = self.cap.read()
                if not ret:
                    self.exit_event.set()
                    break

                out_frame = frame.copy()  # For drawing bounding boxes.

                # Preprocess frame
                p_frame = self.yolo_preprocess(frame)

                # Inference
                yolo_inputs[0].host = p_frame.reshape(-1)
                trt_outputs = Do_Inference(yolo_context, yolo_bindings, yolo_inputs, yolo_outputs, yolo_stream)

                # Postprocess
                pred = self.yolo_postprocess(trt_outputs)

                for i in range(len(pred)):
                    face_coor = pred[i]['face']
                    landmarks = pred[i]['landmark']

                    self.queues[self.queue_count].put((frame, landmarks))
                    self.queue_count = (self.queue_count + 1) % self.num_recognition_threads

                    # Draw rectangle around face
                    cv2.rectangle(out_frame, (face_coor[0], face_coor[1]), (face_coor[2], face_coor[3]), (0, 255, 0), 3)

                # Display the resulting frame
                end_time = time.time()
                cv2.putText(out_frame, '{:.3f}'.format(1.0 / (end_time - start_time)), (0, 25),
                            cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2, cv2.LINE_AA)
                cv2.imshow('Face Detection', out_frame)

                # Exit if 'q' is pressed
                if cv2.waitKey(1) & 0xFF == ord('q'):
                    self.exit_event.set()
                    break
            print(f'[Detection thread] Received exit signal.')
        ctx.pop()


class RecognitionThread(threading.Thread):
    def __init__(self, arcface_trt_engine_path, embeddings, embeddings_map, queue: Queue,
                 exit_event: threading.Event, recognition_ready_event: threading.Event, id: int):
        super().__init__()
        self.embeddings = embeddings
        self.embeddings_map = embeddings_map
        self.id = id
        self.queue = queue
        self.exit_event = exit_event
        self.recognition_ready_event = recognition_ready_event
        self.arcface_preprocess = ArcfacePreProcessing()
        self.sleep_time = 0.001

        # Initialize ArcFace
        TRT_LOGGER = trt.Logger()
        self.arcface_engine = None
        if os.path.exists(arcface_trt_engine_path):
            print(f'[Recognition thread {id}] Reading engine from file {arcface_trt_engine_path}')
            with open(arcface_trt_engine_path, "rb") as f, \
                    trt.Runtime(TRT_LOGGER) as runtime:
                self.arcface_engine = runtime.deserialize_cuda_engine(f.read())

        if self.arcface_engine is None:
            print(f'[Recognition thread {id}] Failed to read engine from file!')
            exit(1)

    def run(self):
        ctx.push()
        self.recognition_ready_event.set()
        print(f'[Recognition thread {self.id}] Ready!')

        with self.arcface_engine.create_execution_context() as arcface_context:
            arcface_inputs, arcface_outputs, arcface_bindings, arcface_stream = allocate_buffers(self.arcface_engine)

            while not self.exit_event.is_set():
                try:
                    frame, landmarks = self.queue.get(block=False)
                except Empty:
                    time.sleep(self.sleep_time)
                    continue

                face = self.arcface_preprocess(frame, landmarks)
                face = cv2.cvtColor(face, cv2.COLOR_BGR2RGB)

                arcface_inputs[0].host = self.arcface_preprocess.prepare(face, fp16=True)
                embedding = Do_Inference(arcface_context, arcface_bindings, arcface_inputs,
                                         arcface_outputs, arcface_stream)[0]
                embedding = normalize(embedding)

                cos_similarity = np.dot(self.embeddings, embedding.T).clip(min=0, max=1)
                idx = np.argmax(cos_similarity, axis=0)

                name = self.embeddings_map[idx]
                score = cos_similarity[idx]

                print(f'[Recognition thread {self.id}] Detected {name} with a score of {score}. '
                      f'Queue remaining items: {self.queue.qsize()}.')
            print(f'[Recognition thread {self.id}] Received exit signal.')
        ctx.pop()


if __name__ == '__main__':
    yolo_trt_engine_path = 'weights/yolov5n-0.5.trt'
    arcface_trt_engine_path = 'weights/arcface_mbf_fp16.trt'
    embeddings_root = 'embeddings'  # Path to embeddings

    # Load embeddings
    embeddings = []
    embeddings_map = []  # name
    for sub_folder in tqdm(os.listdir(embeddings_root), desc='Loading embeddings', unit='people'):
        # sub_folder = class
        for root, dirs, files in os.walk(os.path.join(embeddings_root, sub_folder)):
            for file in files:
                embed = np.load(os.path.join(root, file))
                embeddings.append(embed)
                embeddings_map.append(sub_folder)
    embeddings = np.array(embeddings)

    # Setup threading
    recognition_thread_nums = 2
    queue_size = 10
    queues = [Queue(queue_size) for _ in range(recognition_thread_nums)]
    exit_event = threading.Event()  # For exiting
    recognition_ready_events = [threading.Event() for _ in range(recognition_thread_nums)]

    # Init threads
    detection_thread = DetectionThread(yolo_trt_engine_path, queues, recognition_ready_events, exit_event)
    recognition_threads = [RecognitionThread(arcface_trt_engine_path, embeddings, embeddings_map, queues[i],
                                             exit_event, recognition_ready_events[i], i)
                           for i in range(recognition_thread_nums)]

    # Start threads
    for thread in recognition_threads:
        thread.start()
    detection_thread.start()

    detection_thread.join()
    for thread in recognition_threads:
        thread.join()
    print('Exiting main thread.')

tensorrt_common.py:

import tensorrt as trt
import pycuda.autoinit
import pycuda.driver as cuda


TRT_LOGGER = trt.Logger()


def Init_TensorRT(trt_path):
    with open(trt_path, 'rb') as f, \
            trt.Runtime(TRT_LOGGER) as runtime:
        engine = runtime.deserialize_cuda_engine(f.read())
        context = engine.create_execution_context()
        inputs, outputs, bindings, stream = allocate_buffers(engine)
        return [context, inputs, outputs, bindings, stream]


def allocate_buffers(engine):
    class HostDeviceMem(object):
        def __init__(self, host_mem, device_mem):
            """
            host_mem: cpu memory
            device_mem: gpu memory
            """
            self.host = host_mem
            self.device = device_mem

        def __str__(self):
            return "Host:\n" + str(self.host) + "\nDevice:\n" + str(self.device)

        def __repr__(self):
            return self.__str__()

    inputs, outputs, bindings = [], [], []
    stream = cuda.Stream()
    for binding in engine:
        size = trt.volume(engine.get_binding_shape(binding)) * engine.max_batch_size
        dtype = trt.nptype(engine.get_binding_dtype(binding))
        host_mem = cuda.pagelocked_empty(size, dtype)
        device_mem = cuda.mem_alloc(host_mem.nbytes)
        bindings.append(int(device_mem))
        if engine.binding_is_input(binding):
            inputs.append(HostDeviceMem(host_mem, device_mem))
        else:
            outputs.append(HostDeviceMem(host_mem, device_mem))
    return inputs, outputs, bindings, stream


def Do_Inference(context, bindings, inputs, outputs, stream):
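    # Copy inputs to the device, run inference, and copy outputs back, all on one stream.
    for inp in inputs:
        cuda.memcpy_htod_async(inp.device, inp.host, stream)
    context.execute_async_v2(bindings=bindings, stream_handle=stream.handle)
    for out in outputs:
        cuda.memcpy_dtoh_async(out.host, out.device, stream)
    # Wait for the stream to finish before reading the host buffers.
    stream.synchronize()
    return [out.host for out in outputs]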

Hi,
The links below might be useful to you:
https://docs.nvidia.com/deeplearning/tensorrt/best-practices/index.html#thread-safety
https://docs.nvidia.com/cuda/cuda-c-programming-guide/index.html#stream-priorities
https://docs.nvidia.com/cuda/cuda-runtime-api/group__CUDART__STREAM.html
For multithreading/streaming, we suggest using DeepStream or Triton.
For more details, we recommend raising the query on the DeepStream or Triton forum.

Thanks!

According to the thread-safety section:

The TensorRT runtime can be used by multiple threads simultaneously, so long as each object uses a different execution context.

In my code, I already call ctx.pop() at the end of each thread's run(). The problem only arises when an exit signal is triggered from the detection thread. It doesn't affect runtime behavior (all threads/models behave correctly inside the while loop), but I'd like to know whether there is a way to eliminate the errors on exit.
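
One possible reading of the "invalid device context" errors in the log is that GPU-backed objects (engines, buffers) are only freed at interpreter shutdown, after the CUDA context they belong to is no longer valid. Two details in the code point that way: tensorrt_common.py imports pycuda.autoinit, which creates its own context and registers an exit hook, while the main script creates a second context with device.make_context() that the main thread never pops. Below is a minimal sketch of a teardown order that avoids this, assuming that is indeed the cause: release every GPU-backed reference while the context is still current, then pop it. The names pushed_context and run_worker are hypothetical helpers, and the resources are assumed to be kept in a dict; the only calls assumed on the context object are push() and pop(), as already used above.

```python
import threading
from contextlib import contextmanager


@contextmanager
def pushed_context(cuda_ctx):
    """Make cuda_ctx current on the calling thread and always pop it on exit."""
    cuda_ctx.push()
    try:
        yield cuda_ctx
    finally:
        cuda_ctx.pop()


def run_worker(cuda_ctx, make_resources, work, exit_event):
    """Call work(resources) until exit_event is set.

    make_resources is a hypothetical factory returning a dict of GPU-backed
    objects (execution context, buffers, stream, ...). The dict is emptied
    before the CUDA context is popped, so the objects are released while
    their context is still current.
    """
    with pushed_context(cuda_ctx):
        resources = make_resources()
        try:
            while not exit_event.is_set():
                work(resources)
        finally:
            # Drop GPU-backed objects first; the pop happens afterwards.
            resources.clear()
```

Applied to the code above, this would roughly mean deleting the buffers, stream, and execution context before ctx.pop() in each run(), and, in the main thread, releasing the engines and popping ctx once after the join() calls. Whether removing pycuda.autoinit from tensorrt_common.py (so that only one context exists) is also needed would have to be tested separately.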

Hi,

I hope the following sample will be helpful to you.

Thank you.
