How to use TensorRT with Python's multi-threading package

Hi all,

Purpose: I need to run TensorRT inference in a second thread.

I have read this document, but I still don't know exactly how to do the TensorRT part in Python.

I already have a sample that runs successfully with TRT.
Now I just want to run a really simple multi-threaded program with TensorRT.
(I have already generated the TensorRT engine, so I will load the engine and run TensorRT inference in a separate thread.)

Here is my code (without the TensorRT code):

import threading
import time
from my_tensorrt_code import TRTInference, trt

exitFlag = 0

class myThread(threading.Thread):
    def __init__(self, func, args):
        super().__init__()
        self.func = func
        self.args = args

    def run(self):
        print("Starting " + self.args[0])
        self.func(*self.args)
        print("Exiting " + self.args[0])

if __name__ == '__main__':
    # Create new threads
    '''
    format thread:
        - func: function names, function that we wished to use
        - arguments: arguments that will be used for the func's arguments
    '''

    trt_engine_path = './tensorrt_engine.trt'

    max_batch_size = 1
    trt_inference_wrapper = TRTInference(trt_engine_path, 
        trt_engine_datatype=trt.DataType.FLOAT,
        batch_size=max_batch_size)

    # Get TensorRT SSD model output
    input_img_path = './testimage.png'

    thread1 = myThread(trt_inference_wrapper.infer, [input_img_path])

    # Start new Threads
    thread1.start()
    thread1.join()
    print ("Exiting Main Thread")

The TRTInference code is very similar to the Python part of the uff_ssd sample provided with TensorRT.

However, when I run this code, I always get the following error messages on several platforms (desktop, TX2, and AGX):

[TensorRT] ERROR: ../rtSafe/cuda/caskConvolutionRunner.cpp (290) - Cask Error in checkCaskExecError<false>: 7 (Cask Convolution execution)
[TensorRT] ERROR: FAILED_EXECUTION: std::exception

I traced the code and found that the error is raised inside the do_inference function:

import pycuda.driver as cuda

def do_inference(context, bindings, inputs, outputs, stream, batch_size=1):
    for inp in inputs:  # copy inputs host -> device
        cuda.memcpy_htod_async(inp.device, inp.host, stream)
    context.execute_async(batch_size=batch_size, bindings=bindings, stream_handle=stream.handle)
    for out in outputs:  # copy outputs device -> host
        cuda.memcpy_dtoh_async(out.host, out.device, stream)
    stream.synchronize()
    return [out.host for out in outputs]

Could you give me some suggestions on how to fix this error?
It happens not only on the desktop but also on Jetson devices.

Thank you so much!

Best regards,
Chieh

Hi,

We have a multi-threading TensorRT example for C++ here:

I will check your Python sample and share more information with you later.
In general, you need to create a separate context for TensorRT to isolate its working environment.
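As a minimal sketch of keeping that context isolated per call, assuming the context object exposes push() and pop() the way a pycuda.driver.Context does: the hypothetical helper pushed_context below makes the context current on entry and always pops it on exit, even if inference raises. FakeContext is a hypothetical stand-in so the pattern runs without a GPU.

```python
import contextlib


@contextlib.contextmanager
def pushed_context(ctx):
    """Hypothetical helper: make `ctx` current for this thread, always pop it afterwards.

    Assumes `ctx` behaves like pycuda.driver.Context (push() makes it current
    on the calling thread, pop() removes it again).
    """
    ctx.push()
    try:
        yield ctx
    finally:
        # Runs even if the body raises, so the context stack stays balanced.
        ctx.pop()


class FakeContext:
    """Hypothetical stand-in that only records push/pop calls (no GPU needed)."""

    def __init__(self):
        self.calls = []

    def push(self):
        self.calls.append("push")

    def pop(self):
        self.calls.append("pop")


if __name__ == "__main__":
    ctx = FakeContext()
    try:
        with pushed_context(ctx):
            raise RuntimeError("simulated inference failure")
    except RuntimeError:
        pass
    print(ctx.calls)  # ['push', 'pop']
```

With a real context, the body of an inference method could sit under `with pushed_context(self.cfx):` instead of pairing push() and pop() by hand; the try/finally is what keeps a failed inference from leaving the context on the thread's stack.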

Thanks.

Hi AastaLLL,

Thanks for your information.
Essentially, what I want to build looks like the graph below.

My other scripts are written in Python, which is why I am looking for a Python solution.

you will need to create different context for the TensorRT to isolate the working environment.

Thanks for your hint. However, I don’t know exactly how to do this in code. In my script above, I call the TensorRT class directly through Python's threading package. How can I create a separate context?

I will be grateful for any help you can provide.

Thank you.

Chieh

Hi,

This issue can be reproduced in our environment, and it is related to storing and popping the CUDA context.
We are looking into it and will try to write a sample for you.
We will share more information later.

Thanks.

Hi @AastaLLL,

Got it!
Thank you so much!

Hi,

The sample is ready. The workflow looks like this:

  1. Master: create the CUDA context, the TensorRT engine, and the buffers, and store the CUDA context.
  2. Thread1: restore (push) the master’s CUDA context, run inference, then pop the context.
  3. Master: pop the stored CUDA context before exiting.
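Why both the worker and the master push and pop: in the CUDA driver API each host thread has its own stack of current contexts, and make_context() makes the new context current only on the thread that called it. The toy model below mirrors the three steps above, using threading.local in place of the per-thread stacks so it runs without a GPU. ToyContext and master_and_worker are hypothetical names for illustration, not pycuda APIs.

```python
import threading

# Hypothetical model of the CUDA driver's per-thread context stacks;
# this is not pycuda, just enough to show who pushes and pops what.
_stacks = threading.local()


def _stack():
    # Each thread lazily gets its own, initially empty, stack.
    if not hasattr(_stacks, "items"):
        _stacks.items = []
    return _stacks.items


class ToyContext:
    def push(self):
        _stack().append(self)

    def pop(self):
        _stack().pop()

    @staticmethod
    def current():
        stack = _stack()
        return stack[-1] if stack else None


def master_and_worker():
    seen = {}

    # 1. Master: like make_context(), create the context and make it current here.
    ctx = ToyContext()
    ctx.push()

    def worker():
        seen["before_push"] = ToyContext.current()   # a new thread starts with an empty stack
        ctx.push()                                    # 2. restore the master's context
        seen["during"] = ToyContext.current() is ctx  # ... inference would run here ...
        ctx.pop()
        seen["after_pop"] = ToyContext.current()

    t = threading.Thread(target=worker)
    t.start()
    t.join()

    # The worker's push/pop never touched the master's stack.
    seen["master_still_current"] = ToyContext.current() is ctx
    ctx.pop()                                         # 3. Master: pop the stored context
    seen["master_after_pop"] = ToyContext.current()
    return seen


if __name__ == "__main__":
    print(master_and_worker())
```

In this model the worker's push and the master's initial push land on different stacks, which is why the master still needs its own pop at the end.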

The sample uses the MNIST model that ships with TensorRT.

1. Prepare data

$ /usr/src/tensorrt/bin/trtexec --onnx=/usr/src/tensorrt/data/mnist/mnist.onnx --saveEngine=mnist.trt
$ cd /usr/src/tensorrt/data/mnist/
$ sudo pip3 install pillow
$ python3 download_pgms.py

2. Test

test.py

import threading
import time
from my_tensorrt_code import TRTInference, trt

exitFlag = 0

class myThread(threading.Thread):
    def __init__(self, func, args):
        super().__init__()
        self.func = func
        self.args = args

    def run(self):
        print("Starting " + self.args[0])
        self.func(*self.args)
        print("Exiting " + self.args[0])

if __name__ == '__main__':
    # Create new threads
    '''
    format thread:
        - func: function names, function that we wished to use
        - arguments: arguments that will be used for the func's arguments
    '''

    trt_engine_path = 'mnist.trt'

    max_batch_size = 1
    trt_inference_wrapper = TRTInference(trt_engine_path,
        trt_engine_datatype=trt.DataType.FLOAT,
        batch_size=max_batch_size)

    # Input image (an MNIST digit)
    input_img_path = '/usr/src/tensorrt/data/mnist/3.pgm'

    thread1 = myThread(trt_inference_wrapper.infer, [input_img_path])

    # Start new Threads
    thread1.start()
    thread1.join()
    trt_inference_wrapper.destory()
    print ("Exiting Main Thread")

my_tensorrt_code.py

from PIL import Image
import numpy as np
import tensorrt as trt
import pycuda.autoinit
import pycuda.driver as cuda
import threading
import time
import math


class TRTInference:
    def __init__(self, trt_engine_path, trt_engine_datatype, batch_size):
        self.cfx = cuda.Device(0).make_context()
        stream = cuda.Stream()

        TRT_LOGGER = trt.Logger(trt.Logger.INFO)
        trt.init_libnvinfer_plugins(TRT_LOGGER, '')
        runtime = trt.Runtime(TRT_LOGGER)

        # deserialize engine
        with open(trt_engine_path, 'rb') as f:
            buf = f.read()
            engine = runtime.deserialize_cuda_engine(buf)
        context = engine.create_execution_context()

        # prepare buffer
        host_inputs  = []
        cuda_inputs  = []
        host_outputs = []
        cuda_outputs = []
        bindings = []

        for binding in engine:
            size = trt.volume(engine.get_binding_shape(binding)) * engine.max_batch_size
            host_mem = cuda.pagelocked_empty(size, np.float32)
            cuda_mem = cuda.mem_alloc(host_mem.nbytes)

            bindings.append(int(cuda_mem))
            if engine.binding_is_input(binding):
                host_inputs.append(host_mem)
                cuda_inputs.append(cuda_mem)
            else:
                host_outputs.append(host_mem)
                cuda_outputs.append(cuda_mem)

        # store
        self.stream  = stream
        self.context = context
        self.engine  = engine

        self.host_inputs = host_inputs
        self.cuda_inputs = cuda_inputs
        self.host_outputs = host_outputs
        self.cuda_outputs = cuda_outputs
        self.bindings = bindings


    def infer(self, input_img_path):
        self.cfx.push()

        # restore
        stream  = self.stream
        context = self.context
        engine  = self.engine

        host_inputs = self.host_inputs
        cuda_inputs = self.cuda_inputs
        host_outputs = self.host_outputs
        cuda_outputs = self.cuda_outputs
        bindings = self.bindings

        # read image
        image = 1 - (np.asarray(Image.open(input_img_path), dtype=np.float32) / 255)
        np.copyto(host_inputs[0], image.ravel())

        # inference
        start_time = time.time()
        cuda.memcpy_htod_async(cuda_inputs[0], host_inputs[0], stream)
        context.execute_async(bindings=bindings, stream_handle=stream.handle)
        cuda.memcpy_dtoh_async(host_outputs[0], cuda_outputs[0], stream)
        stream.synchronize()
        print("execute times "+str(time.time()-start_time))

        # parse output
        output = np.array([math.exp(o) for o in host_outputs[0]])
        output /= sum(output)
        for i in range(len(output)): print("%d: %.2f"%(i,output[i]))

        self.cfx.pop()


    def destory(self):
        self.cfx.pop()
$ python3 test.py
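The output parsing in infer() applies a softmax to the MNIST output scores using a math.exp loop. If useful, a vectorized NumPy version is sketched below; subtracting the maximum score before exponentiating is a standard trick that avoids overflow without changing the result. The function name softmax is ours, not part of TensorRT or the sample.

```python
import numpy as np


def softmax(scores):
    """Convert raw output scores to probabilities (vectorized, numerically stable)."""
    scores = np.asarray(scores, dtype=np.float64)
    # Shifting by the maximum leaves the result unchanged but keeps exp() from overflowing.
    exps = np.exp(scores - scores.max())
    return exps / exps.sum()


if __name__ == "__main__":
    probs = softmax([1.0, 2.0, 3.0])
    for digit, p in enumerate(probs):
        print("%d: %.2f" % (digit, p))
```

Inside infer(), `output = softmax(host_outputs[0])` would stand in for the two parsing lines, under the assumption that host_outputs[0] holds the 10 raw scores.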

Thanks.

Dear @AastaLLL,

Thanks for your help.
It is really helpful, and it also works for my case.

I noticed that the key change is adding self.cfx = cuda.Device(0).make_context(), self.cfx.push(), and self.cfx.pop() to your script to handle the multi-threaded case. It is really amazing.

Thank you so much.

Sincerely,
Chieh

Thank you both so much!

Hi there,

Following up on the code provided by @AastaLLL: if I want to use two threads, should I create one TRTInference instance per thread, or can a single instance be shared among threads? If it cannot be shared, how can I share the engine among different streams?

Also, shouldn’t you call self.cfx.pop() at the end of the __init__ function? When you are in the infer function, self.cfx has been pushed twice. Why is this needed?

Hi mfoglio,

Please open a new topic if this is still an issue. Thanks.

Hello @AastaLLL, after killing the thread, I received the error below:

PyCUDA ERROR: The context stack was not empty upon module cleanup.
-------------------------------------------------------------------
A context was still active when the context stack was being
cleaned up. At this point in our execution, CUDA may already
have been deinitialized, so there is no way we can finish
cleanly. The program will be aborted now.
Use Context.pop() to avoid this problem.

How can I fix this problem?

Hi tienduchoangtb,

Please open a new topic if this is still an issue. Thanks.