Gstreamer pipeline and OpenCV

Hi,
I want to render via OpenCV and get detect data.

import sys
sys.path.append('../')
import platform
import configparser

import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
from gi.repository import GLib
from ctypes import *
import time
import sys
import math
import platform
from common.is_aarch_64 import is_aarch64
from common.bus_call import bus_call

from common.FPS import GETFPS
import numpy as np
import pyds
import cv2
import os
import os.path
from os import path
from time import sleep
from threading import Thread
import keyboard

fps_streams={}

MAX_DISPLAY_LEN=64
PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3
MUXER_OUTPUT_WIDTH=1920
MUXER_OUTPUT_HEIGHT=1080
MUXER_BATCH_TIMEOUT_USEC=4000000
TILED_OUTPUT_WIDTH=1280
TILED_OUTPUT_HEIGHT=720
GST_CAPS_FEATURES_NVMM="memory:NVMM"
OSD_PROCESS_MODE= 0
OSD_DISPLAY_TEXT= 1
pgie_classes_str= ["Vehicle", "TwoWheeler", "Person","RoadSign"]

def nvanalytics_src_pad_buffer_probe(pad,info,u_data):
    #"""
    frame_number=0
    num_rects=0
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        return

    # Retrieve batch metadata from the gst_buffer
    # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
    # C address of gst_buffer as input, which is obtained with hash(gst_buffer)
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list

    while l_frame:
        try:
            # Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
            # The casting is done by pyds.NvDsFrameMeta.cast()
            # The casting also keeps ownership of the underlying memory
            # in the C code, so the Python garbage collector will leave
            # it alone.
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        frame_number=frame_meta.frame_num
        l_obj=frame_meta.obj_meta_list
        num_rects = frame_meta.num_obj_meta
        obj_counter = {
        PGIE_CLASS_ID_VEHICLE:0,
        PGIE_CLASS_ID_PERSON:0,
        PGIE_CLASS_ID_BICYCLE:0,
        PGIE_CLASS_ID_ROADSIGN:0
        }
        print("#"*50)
        while l_obj:
            try: 
                # Note that l_obj.data needs a cast to pyds.NvDsObjectMeta
                # The casting is done by pyds.NvDsObjectMeta.cast()
                obj_meta=pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break
            obj_counter[obj_meta.class_id] += 1
            l_user_meta = obj_meta.obj_user_meta_list
            # Extract object level meta data from NvDsAnalyticsObjInfo
            while l_user_meta:
                try:
                    user_meta = pyds.NvDsUserMeta.cast(l_user_meta.data)
                    if user_meta.base_meta.meta_type == pyds.nvds_get_user_meta_type("NVIDIA.DSANALYTICSOBJ.USER_META"):             
                        user_meta_data = pyds.NvDsAnalyticsObjInfo.cast(user_meta.user_meta_data)
                        if user_meta_data.dirStatus: print("Object {0} moving in direction: {1}".format(obj_meta.object_id, user_meta_data.dirStatus))                    
                        if user_meta_data.lcStatus: print("Object {0} line crossing status: {1}".format(obj_meta.object_id, user_meta_data.lcStatus))
                        if user_meta_data.ocStatus: print("Object {0} overcrowding status: {1}".format(obj_meta.object_id, user_meta_data.ocStatus))
                        if user_meta_data.roiStatus: print("Object {0} roi status: {1}".format(obj_meta.object_id, user_meta_data.roiStatus))
                except StopIteration:
                    break

                try:
                    l_user_meta = l_user_meta.next
                except StopIteration:
                    break
            try: 
                l_obj=l_obj.next
            except StopIteration:
                break
    
        # Get meta data from NvDsAnalyticsFrameMeta
        l_user = frame_meta.frame_user_meta_list
        while l_user:
            try:
                user_meta = pyds.NvDsUserMeta.cast(l_user.data)
                if user_meta.base_meta.meta_type == pyds.nvds_get_user_meta_type("NVIDIA.DSANALYTICSFRAME.USER_META"):
                    user_meta_data = pyds.NvDsAnalyticsFrameMeta.cast(user_meta.user_meta_data)
                    if user_meta_data.objInROIcnt: print("Objs in ROI: {0}".format(user_meta_data.objInROIcnt))                    
                    if user_meta_data.objLCCumCnt: print("Linecrossing Cumulative: {0}".format(user_meta_data.objLCCumCnt))
                    if user_meta_data.objLCCurrCnt: print("Linecrossing Current Frame: {0}".format(user_meta_data.objLCCurrCnt))
                    if user_meta_data.ocStatus: print("Overcrowding status: {0}".format(user_meta_data.ocStatus))
            except StopIteration:
                break
            try:
                l_user = l_user.next
            except StopIteration:
                break
        
        print("Frame Number=", frame_number, "stream id=", frame_meta.pad_index, "Number of Objects=",num_rects,"Vehicle_count=",obj_counter[PGIE_CLASS_ID_VEHICLE],"Person_count=",obj_counter[PGIE_CLASS_ID_PERSON])
        # Get frame rate through this probe
        #fps_streams["stream{0}".format(frame_meta.pad_index)].get_fps()
        try:
            l_frame=l_frame.next
        except StopIteration:
            break
        print("#"*50)
        
        #sample = sink.emit('pull-sample')

    return Gst.PadProbeReturn.OK


Gst.init()
def gst_to_opencv(sample):
    buf = sample.get_buffer()
    caps = sample.get_caps()
    print(caps.get_structure(0).get_value('format'))
    print(caps.get_structure(0).get_value('height'))
    print(caps.get_structure(0).get_value('width'))

    print(buf.get_size())

    arr = np.ndarray(
        (caps.get_structure(0).get_value('height'),
         caps.get_structure(0).get_value('width'),
         3),
        buffer=buf.extract_dup(0, buf.get_size()),
        dtype=np.uint8)
    return arr
    
str_rtsp = "rtspsrc location=rtsp://admin:XXX@10.114.129.XXX name=src ! decodebin ! m.sink_0 nvstreammux name=m width=1280 height=720  batch-size=1 live-source=1 ! nvinfer config-file-path= dsnvanalytics_pgie_config.txt ! nvtracker ll-lib-file = /opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so ll-config-file=config_tracker_NvDCF_perf.yml tracker-width=640 tracker-height=384 ! nvdsanalytics name=nvanalytics config-file=config01.txt ! nvvideoconvert  ! nvdsosd name=nvdsosd ! nvvideoconvert  ! videoconvert ! appsink name=sink emit-signals=true caps=video/x-raw,format=BGR"

pipeline = Gst.parse_launch(str_rtsp)

sink = pipeline.get_by_name('sink')
nvanalytics = pipeline.get_by_name('nvanalytics')

nvanalytics_src_pad=nvanalytics.get_static_pad("src")
if not nvanalytics_src_pad:
    sys.stderr.write(" Unable to get src pad \n")
else:
    pass
    #nvanalytics_src_pad.add_probe(Gst.PadProbeType.BUFFER, nvanalytics_src_pad_buffer_probe, 0)


pipeline.set_state(Gst.State.PLAYING)
while(1):
    sample = sink.emit('pull-sample')
    arr = gst_to_opencv(sample)
    cv2.imshow('VIDEO', arr)
    cv2.waitKey(1)
    print(arr)
    if keyboard.is_pressed('q'):  
        print('You Pressed A Key!')
        break  
pipeline.set_state(Gst.State.NULL)
cv2.destroyAllWindows()

Why are nvanalytics_src_pad.add_probe(Gst.PadProbeType.BUFFER, nvanalytics_src_pad_buffer_probe, 0) and sink.emit(‘pull-sample’) not working together?

example
to use nvanalytics_src_pad.add_probe(Gst.PadProbeType.BUFFER, nvanalytics_src_pad_buffer_probe, 0)
edit comment

     nvanalytics = pipeline.get_by_name('nvanalytics')
     nvanalytics_src_pad=nvanalytics.get_static_pad("src")
     if not nvanalytics_src_pad:
         sys.stderr.write(" Unable to get src pad \n")
     else:
        pass
        nvanalytics_src_pad.add_probe(Gst.PadProbeType.BUFFER, nvanalytics_src_pad_buffer_probe, 0)
     .
     .
     .
     """
     sample = sink.emit('pull-sample')
     arr = gst_to_opencv(sample)
     cv2.imshow('VIDEO', arr)
     cv2.waitKey(1)
     print(arr)
     """

How do nvanalytics_src_pad.add_probe(Gst.PadProbeType.BUFFER, nvanalytics_src_pad_buffer_probe, 0) and sink.emit(‘pull-sample’) work together?

Hi,
Please share the information for reference:
• Hardware Platform (Jetson / GPU)
• DeepStream Version
• JetPack Version (valid for Jetson only)
• TensorRT Version
• NVIDIA GPU Driver Version (valid for GPU only)
• Issue Type( questions, new requirements, bugs)
• How to reproduce the issue ? (This is for bugs. Including which sample app is using, the configuration files content, the command line used and other details for reproducing)
• Requirement details( This is for new requirement. Including the module name-for which plugin or for which sample application, the function description)

And we have a reference sample:
deepstream_python_apps/apps/deepstream-nvdsanalytics at master · NVIDIA-AI-IOT/deepstream_python_apps · GitHub
Please share a patch to the sample so that we can check further.

thx,I have successfully solved the problem.

1 Like

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.