Object detection and tracking on a sequence of pictures fed through appsrc

Hi, I am feeding a sequence of pictures into the pipeline through appsrc for object detection and tracking. When the pipeline does not contain primary-inference, it works fine, but if I add primary inference, I can't see the picture on my display. Please help me!
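
For reference, the pipeline built in the code below is: appsrc → nvvideoconvert → capsfilter → nvstreammux → nvinfer (primary-inference) → nvvideoconvert → nvdsosd → (nvegltransform on Jetson) → nveglglessink.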

My code follows:

#!/usr/bin/env python
# Import libraries
import cv2
import pyds
import time
import json
import logging
from threading import Thread
import numpy as np
import sys
sys.path.append('../')
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
from common.bus_call import bus_call
from common.is_aarch_64 import is_aarch64
from common.FPS import GETFPS
import ctypes
ctypes.cdll.LoadLibrary('yolov5/libYoloV5Decoder.so')
# END Import libraries


# DeepStream thread that receives data
class DeepstreamThread():
    # Initialize
    def __init__(self, videoPath):
        self.videoPath = videoPath
        self.primaryInferencePath = 'yolov5/config_yolov5.txt'
        self.videoWidth = 1280
        self.videoHeight = 720
        self.videoFPS = 5
    # END Initialize

    # Set buffer
    def NPArrayToGSTBuffer(self, array) -> Gst.Buffer:
        """Converts numpy array to Gst.Buffer"""
        return Gst.Buffer.new_wrapped(array.tobytes())
    # END Set buffer
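
    # A variant sketch (an assumption, not from the original post): downstream
    # DeepStream elements generally expect timestamped buffers, so stamping
    # PTS/duration from a frame counter may help. self.frameCount is a
    # hypothetical attribute that would need to be initialized to 0 in __init__.
    def NPArrayToGSTBufferTimed(self, array) -> Gst.Buffer:
        """Converts numpy array to a timestamped Gst.Buffer"""
        buf = Gst.Buffer.new_wrapped(array.tobytes())
        buf.duration = Gst.SECOND // self.videoFPS
        buf.pts = self.frameCount * buf.duration
        self.frameCount += 1
        return buf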

    # Run pipeline
    def _RunPipeline(self):
        try:
            # GStreamer
            # Standard GStreamer initialization
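            # GObject.threads_init() is a no-op on PyGObject >= 3.11; it is
            # kept here only for compatibility with older samples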
            GObject.threads_init()
            Gst.init(None)
            global fpsStream
            fpsStream = GETFPS(0)
            print(fpsStream)

            # Create Pipeline element that will form a connection of other elements
            print("Creating Pipeline \n ")
            self.pipeline = Gst.Pipeline()

            if not self.pipeline:
                sys.stderr.write(" Unable to create Pipeline \n")

            # Source element for reading from the file
            print("Creating Source \n ")
            self.appsource = Gst.ElementFactory.make("appsrc", "numpy-source")
            if not self.appsource:
                sys.stderr.write(" Unable to create Source \n")
            # nvvideoconvert to convert incoming raw buffers to NVMM memory (NvBufSurface API)
            nvVideoConvert = Gst.ElementFactory.make("nvvideoconvert", "nv-videoconv")
            if not nvVideoConvert:
                sys.stderr.write(" Unable to create nvvideoconvert \n")

            capsFilter = Gst.ElementFactory.make("capsfilter", "capsfilter1")
            if not capsFilter:
                sys.stderr.write(" Unable to create capsfilter \n")

            # Create nvstreammux instance to form batches from one or more sources.
            streamMux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
            if not streamMux:
                sys.stderr.write(" Unable to create NvStreamMux \n")

            # Use nvinfer to run inferencing on decoder's output,
            # behaviour of inferencing is set through config file
            primaryGPUInferenceEngine = Gst.ElementFactory.make("nvinfer", "primary-inference")
            if not primaryGPUInferenceEngine:
                sys.stderr.write(" Unable to create pgie \n")

            # Use convertor to convert from NV12 to RGBA as required by nvosd
            nvVideoConvertRGBA = Gst.ElementFactory.make("nvvideoconvert", "convertor")
            if not nvVideoConvertRGBA:
                sys.stderr.write(" Unable to create nvvidconv \n")

            # Create OSD to draw on the converted RGBA buffer
            nvOnScreenDisplay = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
            if not nvOnScreenDisplay:
                sys.stderr.write(" Unable to create nvosd \n")

            transform = Gst.ElementFactory.make("nvegltransform", "nvegl-transform")
            
            sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
            if not sink:
                sys.stderr.write(" Unable to create egl sink \n")

            capsIn = Gst.Caps.from_string(
                f"video/x-raw,format=RGBA,width={self.videoWidth},height={self.videoHeight},framerate=60/1")
            caps = Gst.Caps.from_string(
                f"video/x-raw(memory:NVMM),format=NV12,width={self.videoWidth},height={self.videoHeight},framerate=60/1")
            self.appsource.set_property('caps', capsIn)
            capsFilter.set_property('caps', caps)
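
            # Note: the caps above advertise framerate=60/1 while frames are
            # pushed at self.videoFPS (5); aligning these is safer. Assumption:
            # when feeding nvstreammux from appsrc it often helps to run appsrc
            # in time format with automatic timestamps, e.g.:
            # self.appsource.set_property('format', Gst.Format.TIME)
            # self.appsource.set_property('do-timestamp', True)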

            streamMux.set_property('width', self.videoWidth)
            streamMux.set_property('height', self.videoHeight)
            streamMux.set_property('batch-size', 1)
            streamMux.set_property('batched-push-timeout', 4000000)
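            # batched-push-timeout is in microseconds, so 4000000 above is 4 s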
            sink.set_property('sync', False)
            primaryGPUInferenceEngine.set_property('config-file-path', self.primaryInferencePath)

            print("Adding elements to Pipeline \n")
            self.pipeline.add(self.appsource)
            self.pipeline.add(nvVideoConvert)
            self.pipeline.add(capsFilter)
            self.pipeline.add(streamMux)
            self.pipeline.add(primaryGPUInferenceEngine)
            self.pipeline.add(nvVideoConvertRGBA)
            self.pipeline.add(nvOnScreenDisplay)
            self.pipeline.add(sink)
            

            # Link the pipeline elements
            print("Linking elements in the Pipeline \n")
            self.appsource.link(nvVideoConvert)
            nvVideoConvert.link(capsFilter)

            sinkpad = streamMux.get_request_pad("sink_0")
            if not sinkpad:
                sys.stderr.write(" Unable to get the sink pad of streammux \n")

            srcpad = capsFilter.get_static_pad("src")
            if not srcpad:
                sys.stderr.write(" Unable to get source pad of capsfilter \n")
            
            srcpad.link(sinkpad)
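            # Note: Gst.Pad.link() returns a Gst.PadLinkReturn; checking it
            # against Gst.PadLinkReturn.OK helps catch caps negotiation
            # failures early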
            streamMux.link(primaryGPUInferenceEngine)
            primaryGPUInferenceEngine.link(nvVideoConvertRGBA)
            nvVideoConvertRGBA.link(nvOnScreenDisplay)
            if is_aarch64():
                self.pipeline.add(transform)
                nvOnScreenDisplay.link(transform)
                transform.link(sink)
            else:
                nvOnScreenDisplay.link(sink)

            # create an event loop and feed gstreamer bus messages to it
            self.loop = GObject.MainLoop()
            bus = self.pipeline.get_bus()
            bus.add_signal_watch()
            bus.connect("message", bus_call, self.loop)

            # bus.enable_sync_message_emission()
            # bus.connect('sync-message::element', self.SyncMessage)

            # Let's add a probe to get informed of the metadata generated. We add
            # the probe to the sink pad of the osd element, since by that time
            # the buffer will have all of the metadata.
            osdsinkpad = nvOnScreenDisplay.get_static_pad("sink")
            if not osdsinkpad:
                sys.stderr.write(" Unable to get sink pad of nvosd \n")

            osdsinkpad.add_probe(Gst.PadProbeType.BUFFER, self.OsdSinkPadBufferProbe, 0)

            # start play back and listen to events
            print("Starting pipeline \n")
            self.pipeline.set_state(Gst.State.PLAYING)
        except Exception as ex:
            print("Exception runPipeline: ", ex)        
    # END Run pipeline

    # Emit data to pipeline
    def _EmitDataToPipeline(self):
        captureVideo = cv2.VideoCapture(self.videoPath)
        while captureVideo.isOpened():
            ret, frame = captureVideo.read()
            if ret:
                try:
                    # Time get data
                    timeStart = time.time()
                    # Convert format to RGBA
                    npArrayData = cv2.cvtColor(frame, cv2.COLOR_BGR2RGBA)
                    # Debug output, disabled to avoid per-frame overhead:
                    # cv2.imwrite("Test.jpg", npArrayData)
                    # print(npArrayData)
                    # Emit data to pipeline
                    self.appsource.emit("push-buffer", self.NPArrayToGSTBuffer(npArrayData))
                    # Time after emit data
                    timeEnd = time.time()
                    # Calculate time process
                    deltaTime = timeEnd - timeStart
                    # Calculate sleep time
                    timeSleep = max(0, 1/self.videoFPS - deltaTime)
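                    # e.g. at videoFPS = 5 the frame period is 1/5 = 0.2 s, so we
                    # sleep only for what remains of it after processing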
                    print(timeSleep)
                    time.sleep(timeSleep)
                except Exception:
                    time.sleep(1/self.videoFPS)
            else:
                # Video finished: leave the read loop
                break
        self.appsource.emit("end-of-stream")
        try:
            self.loop.run()
        except:
            pass
        # cleanup
        self.pipeline.set_state(Gst.State.NULL)
        # END Emit data to pipeline

    def OsdSinkPadBufferProbe(self, pad, info, u_data):
        global fpsStream
        hasDefect = False
        gstBuffer = info.get_buffer()
        if not gstBuffer:
            print("Unable to get GstBuffer ")
            return Gst.PadProbeReturn.OK

        # Retrieve batch metadata from the gstBuffer.
        # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
        # C address of gstBuffer as input, which is obtained with hash(gstBuffer)
        batchMeta = pyds.gst_buffer_get_nvds_batch_meta(hash(gstBuffer))
        castFrame = batchMeta.frame_meta_list

        while castFrame is not None:
            try:
                # Note that castFrame.data needs a cast to pyds.NvDsFrameMeta.
                # The casting is done by pyds.NvDsFrameMeta.cast(), which keeps
                # ownership of the underlying memory in the C code, so the
                # Python garbage collector will leave it alone.
                frameMeta = pyds.NvDsFrameMeta.cast(castFrame.data)
            except StopIteration:
                break

            # frameNumber = frameMeta.frame_num
            # numRects = frameMeta.num_obj_meta
            castObject = frameMeta.obj_meta_list
            while castObject is not None:
                try:
                    # Casting castObject.data to pyds.NvDsObjectMeta
                    objMeta = pyds.NvDsObjectMeta.cast(castObject.data)
                except StopIteration:
                    break
                # Set Default
                objMeta.text_params.font_params.font_name = "FreeMono"
                objMeta.text_params.font_params.font_size = 30
                # End set default
                try: 
                    castObject = castObject.next
                except StopIteration:
                    break
            # Display fps
            fpsStream.get_fps()
            try:
                castFrame = castFrame.next
            except StopIteration:
                break
                
        return Gst.PadProbeReturn.OK
# END DeepStream thread that receives data

if __name__ == '__main__':
    threadDeepstream = DeepstreamThread("video/VideoTest1.mp4")
    # Pass the methods themselves as thread targets; calling them (with "()")
    # would run them sequentially on the main thread instead
    thread1 = Thread(target=threadDeepstream._RunPipeline)
    thread2 = Thread(target=threadDeepstream._EmitDataToPipeline)

    thread1.start()
    thread2.start()
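
    # Wait for both threads so the process does not exit while the pipeline
    # is still running
    thread1.join()
    thread2.join()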

Please help me
Thanks

Please provide complete information as applicable to your setup.

• Hardware Platform (Jetson / GPU): dGPU
• DeepStream Version: 6.1
• JetPack Version (valid for Jetson only):
• TensorRT Version: 8
• NVIDIA GPU Driver Version (valid for GPU only): 11.6
• Issue Type (questions, new requirements, bugs): question

Could you clarify "when the pipeline does not contain primary-inference, it works fine"? What is your media pipeline? Please refer to the sample deepstream-appsrc-test in deepstream_python_apps/apps at master · NVIDIA-AI-IOT/deepstream_python_apps · GitHub

I mean that I use numpy arrays as the input source; you can see Gst.ElementFactory.make("appsrc", "numpy-source") in my code, so I call it an appsrc pipeline (sorry for my bad English). I referred to Appsrc with numpy input in Python - #8 by gautampt6ul. But when I add primary inference, the pipeline doesn't work; it shows only a black screen.

There has been no update from you for a while, so we assume this is no longer an issue and are closing this topic. If you need further support, please open a new one. Thanks

Sorry for the late reply. First, you can verify that inference works with a file as the input source (please refer to deepstream), and then modify the source to appsrc.
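
For example, a minimal sketch of such a file-based check (an illustration, not an official sample: it assumes an H.264 .mp4 and reuses the paths from the code above):

#!/usr/bin/env python
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib

Gst.init(None)
# nvstreammux -> nvinfer -> convert -> OSD -> sink, fed by filesrc/decode
# (on Jetson, nvegltransform would go before nveglglessink)
pipeline = Gst.parse_launch(
    "nvstreammux name=m batch-size=1 width=1280 height=720 ! "
    "nvinfer config-file-path=yolov5/config_yolov5.txt ! "
    "nvvideoconvert ! nvdsosd ! nveglglessink sync=false "
    "filesrc location=video/VideoTest1.mp4 ! qtdemux ! h264parse ! "
    "nvv4l2decoder ! m.sink_0")
pipeline.set_state(Gst.State.PLAYING)

loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()

def on_message(bus, message, loop):
    # Quit on end-of-stream or error so the script terminates cleanly
    if message.type in (Gst.MessageType.EOS, Gst.MessageType.ERROR):
        loop.quit()

bus.connect("message", on_message, loop)
try:
    loop.run()
finally:
    pipeline.set_state(Gst.State.NULL)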

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.