How to send metadata messages to the database without causing a freeze in the stream?

Hi,

• Hardware Platform (Jetson / GPU) :Jetson Nano
• DeepStream Version : 5.0
• JetPack Version (valid for Jetson only) :4.5
• TensorRT Version : 7.1.3
Deepstream_python-apps / deepstream_imagedata-multistream

I want to send the class_id of the detected objects to the car as a CanBus message. When I send the message under the tiler_sink_pad_buffer_probe function, the image freezes. Small delays in sending messages cause freezes in the stream. So I wrote some code to queue messages and send them with threads, but I don’t know how to integrate it into deepstream_imagedata_multistream. What changes I should do in deepstream_imagedata_multistream.py?
Here is the code:

import can
from can.interface import Bus
import threading
from concurrent.futures import ThreadPoolExecutor
import queue 
import time
import random

can.rc['interface']='socketcan'
can.rc['channel']='vcan0'
can.rc['bitrate']=250000


class Pipeline(queue.Queue):

    def __init__(self):

        super().__init__(maxsize=10)

    def get_message(self,name):
        value=self.get()
        return value


    def set_message(self,value,name):
        self.put(value)

**#The producer function acts like a function that detects objects. Assign fake class_id to message.**
def producer(pipeline,event):
    while not event.is_set():
        message=random.randint(0,4)
        pipeline.set_message(message,"Producer")



def consumer(pipeline,event):
    bus=can.interface.Bus()
    bus = can.interface.Bus(bustype='socketcan', channel='vcan0', bitrate=250000)
    while not event.is_set() or not pipeline.empty():
        message=pipeline.get_message("Consumer")

        msg=can.Message(arbitration_id=0xc0ffee,data=[message])
        try:
            bus.send(msg) #sending class_id message to virtual CanBus interface
            print(f'data:{msg}')
        except can.CanError:
            print("Message NOT sent")

if __name__=="__main__":
    
    pipeline=Pipeline()

    event=threading.Event()
    with ThreadPoolExecutor() as executor:
        executor.submit(producer,pipeline,event)
        executor.submit(consumer,pipeline,event)

        time.sleep(0.1)
        #logging.info("Main: about to set event")
        event.set()

The producer function acts like a function that detects objects. Assign fake class_id to message.

Regards,

Hi,
Sending messages under probe function is blocking, we have message converter and broker plugin, for convert and sending messages to server. in test4 sample both c++ version and python version demonstrated use “nvmsgconv” and “nvmsgbroker” plugins in the pipeline while output stream to display by branching the data through tee element, each branch provided queue for thread processing.

2 Likes