Deepstream plugin causes buffer overwriting for parallel pipeline medias for python plugin

This is my setup

*nvcr.io/nvidia/tensorflow:21.08-tf2-py3 with deepstream docker container
• DeepStream Version: deepstream-6.0_6.0.0-1

I have developed a custom gst plugin using python. It is working fine with single pipeline. Whenever i’m using multiple pipelines (*even in different process), the final videos are same by mixing frames from the all pipeline. Seems the plugin instance is same for all pipelines. This is the code developed for the plugin.

import numpy as np
import cv2
import glob

import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstBase', '1.0')
from gi.repository import Gst, GObject, GstBase

from gst_hacks import map_gst_buffer, get_buffer_size
import queue
import threading

GST_OVERLAY_OPENCV = 'gstoverlayopencv'

# https://lazka.github.io/pgi-docs/GstBase-1.0/classes/BaseTransform.html
class GstOverlayOpenCv(GstBase.BaseTransform):

    CHANNELS = 3  # RGB 

    __gstmetadata__ = ("An example plugin of GstOverlayOpenCv",
                       "gst-filter/gst_overlay_opencv.py",
                       "gst.Element draw on image",
                       "Taras at LifeStyleTransfer.com")

    __gsttemplates__ = (Gst.PadTemplate.new("src",
                                            Gst.PadDirection.SRC,
                                            Gst.PadPresence.ALWAYS,
                                            Gst.Caps.from_string("video/x-raw,format=RGB")),
                        Gst.PadTemplate.new("sink",
                                            Gst.PadDirection.SINK,
                                            Gst.PadPresence.ALWAYS,
                                            Gst.Caps.from_string("video/x-raw,format=RGB")))
    
    def __init__(self):
        super(GstOverlayOpenCv, self).__init__()  

        # Overlay could be any of your objects as far as it implements __call__
        # and returns numpy.ndarray
        self.overlay = None
        self.queue = queue.Queue()
        self.GST_OVERLAY_DICT = {"key":"name","value":"PIXEL"}
        self.worker = threading.Thread(target=self.worker)
        self.worker.start()

    def worker(self):
        while True:
            item = self.queue.get()
            # Process the item here
            # print(item)
            self.GST_OVERLAY_DICT = item
            self.queue.task_done()

    def do_transform_ip(self, inbuffer):
        """
            Implementation of simple filter.
            All changes affected on Inbuffer

            Read more:
            https://gstreamer.freedesktop.org/data/doc/gstreamer/head/gstreamer-libs/html/GstBaseTransform.html
        """

        success, (width, height) = get_buffer_size(self.srcpad.get_current_caps())
        if not success:
            # https://lazka.github.io/pgi-docs/Gst-1.0/enums.html#Gst.FlowReturn
            return Gst.FlowReturn.ERROR
       
        with map_gst_buffer(inbuffer, Gst.MapFlags.READ) as mapped:
            frame = np.ndarray((height, width, self.CHANNELS), buffer=mapped, dtype=np.uint8)

        # overlay = self.overlay()
        # x = width - overlay.shape[1] 
        # y = height - overlay.shape[0] 
        text = "Object"

        # get the text size
        font = cv2.FONT_HERSHEY_SIMPLEX
        font_scale = 0.5
        thickness = 2
        text_size, _ = cv2.getTextSize(text, font, font_scale, thickness)
        # calculate the position of the text to center it in the frame
        x = int((frame.shape[1] - text_size[0]) / 2)
        y = int((frame.shape[0] + text_size[1]) / 2)

        # write the text on the frame
        color = (0, 0, 255)  # red font color in BGR format
        cv2.putText(frame, str(self.GST_OVERLAY_DICT['value']), (x, y), font, font_scale, color, thickness)
        # cv2.rectangle(frame, (100, 100), (400, 400), (0, 0, 255), 2)
        # print(frame.shape)
        # draw_image(frame, overlay, x, y)

        return Gst.FlowReturn.OK


def register(plugin):
    # https://lazka.github.io/pgi-docs/#GObject-2.0/functions.html#GObject.type_register
    type_to_register = GObject.type_register(GstOverlayOpenCv)

    # https://lazka.github.io/pgi-docs/#Gst-1.0/classes/Element.html#Gst.Element.register
    return Gst.Element.register(plugin, GST_OVERLAY_OPENCV, 0, type_to_register)       


def register_by_name(plugin_name):
    
    # Parameters explanation
    # https://lazka.github.io/pgi-docs/Gst-1.0/classes/Plugin.html#Gst.Plugin.register_static
    name = plugin_name
    description = "gst.Element draws on image buffer"
    version = '1.12.4'
    gst_license = 'LGPL'
    source_module = 'gstreamer'
    package = 'gstoverlay'
    origin = 'lifestyletransfer.com'
    if not Gst.Plugin.register_static(Gst.VERSION_MAJOR, Gst.VERSION_MINOR,
                                      name, description,
                                      register, version, gst_license,
                                      source_module, package, origin):
        raise ImportError("Plugin {} not registered".format(plugin_name)) 
    return True

register_by_name(GST_OVERLAY_OPENCV)

This is the plugin memory utility part:

from ctypes import *
from contextlib import contextmanager

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GObject

# CONST from gstconfig.h
# https://github.com/Kurento/gstreamer/blob/0d6031b200e189b391d9c0882760109c1d8cf837/win32/common/gstconfig.h#L67
_GST_PADDING = 4  # From gstconfig.h


class _GstMapInfo(Structure):
    _fields_ = [("memory", c_void_p),  # GstMemory *memory
                ("flags", c_int),  # GstMapFlags flags
                ("data", POINTER(c_byte)),  # guint8 *data
                ("size", c_size_t),  # gsize size
                ("maxsize", c_size_t),  # gsize maxsize
                ("user_data", c_void_p * 4),     # gpointer user_data[4]
                ("_gst_reserved", c_void_p * _GST_PADDING)]

_GST_MAP_INFO_POINTER = POINTER(_GstMapInfo)

_libgst = CDLL("libgstreamer-1.0.so.0")

# Specifying valid ctypes for C function's arguments
_libgst.gst_buffer_map.argtypes = [c_void_p, _GST_MAP_INFO_POINTER, c_int]
_libgst.gst_buffer_map.restype = c_int

_libgst.gst_buffer_unmap.argtypes = [c_void_p, _GST_MAP_INFO_POINTER]
_libgst.gst_buffer_unmap.restype = None

_libgst.gst_mini_object_is_writable.argtypes = [c_void_p]
_libgst.gst_mini_object_is_writable.restype = c_int


@contextmanager
def map_gst_buffer(pbuffer, flags):
    """
        Map Gst.Buffer for Read/Write

        :param pbuffer: https://lazka.github.io/pgi-docs/Gst-1.0/classes/Buffer.html
        :type pbuffer: Gst.Buffer

        :param flags: https://lazka.github.io/pgi-docs/Gst-1.0/flags.html#Gst.MapFlags
        :type flags: Gst.MapFlags
    """
    if pbuffer is None:
        raise TypeError("Cannot pass NULL to _map_gst_buffer")

    ptr = hash(pbuffer)  # Obraining pointer to buffer
    if flags & Gst.MapFlags.WRITE and _libgst.gst_mini_object_is_writable(ptr) == 0:
        raise ValueError("Writable array requested but buffer is not writeable")

    mapping = _GstMapInfo()
    success = _libgst.gst_buffer_map(ptr, mapping, flags)
    if not success:
        raise RuntimeError("Couldn't map buffer")
    try:
        yield cast(
            mapping.data, POINTER(c_byte * mapping.size)).contents
    finally:
        _libgst.gst_buffer_unmap(ptr, mapping)
        

def get_buffer_size(caps):
    """
        Returns width, height of buffer from caps

        :param caps: https://lazka.github.io/pgi-docs/Gst-1.0/classes/Caps.html
        :type caps: Gst.Caps

        :rtype: bool, (int, int)
    """

    caps_struct = caps.get_structure(0)
    (success, width) = caps_struct.get_int('width')
    if not success:
        return False, (0, 0)
    (success, height) = caps_struct.get_int('height')
    if not success:
        return False, (0, 0)    
    return True, (width, height)

and this is pipeline construction part


pipeline = Gst.parse_launch('rtspsrc location={} ! rtph264depay ! h264parse ! avdec_h264 ! tee name=t ! queue ! videoscale method=0 ! video/x-raw ! \
                            videoconvert name=m_videoconvert ! videorate name=m_videorate ! appsink name=appsink t. ! videoconvert name=vidconv1 ! gstoverlayopencv name=gstoverlay ! videoconvert name=vidconv2 ! \
                             nvvideoconvert ! capsfilter caps="video/x-raw(memory:NVMM), format=I420" ! nvv4l2h264enc ! rtph264pay ! udpsink name=udpsink'.format(self.location))

appsink = pipeline.get_by_name('appsink')
appsink.set_property('emit-signals', True)
caps = Gst.caps_from_string(
'video/x-raw, format=(string){BGR}')
appsink.set_property('caps', caps)
appsink.connect("new-sample", self.new_buffer, appsink)

self.gstoverlay = pipeline.get_by_name("gstoverlay")

# Make the UDP sink
updsink_port_num = 5400
# sink = Gst.ElementFactory.make("udpsink", "udpsink")
sink = pipeline.get_by_name('udpsink')
if not sink:
    sys.stderr.write(" Unable to create udpsink")

sink.set_property("host", "224.224.255.255")
sink.set_property("port", updsink_port_num)
sink.set_property("async", False)
sink.set_property("sync", 1)

# print("Adding elements to Pipeline \n")
sink.set_property("qos", 0)

# create an event loop and feed gstreamer bus mesages to it
loop = GLib.MainLoop()
bus = pipeline.get_bus()

rtsp_port_num = self.portnum

I am seeing blurry video mixed up from different sources whenever I am running pipelines parallel. I just want to add some polygons with python plugins asynchronously using this plugin. Please help to fix the shared buffer issue in the GStreamer custom plugin written using python.

  1. do you mean that starting two separate process with different RTSP source, , the two outputs are the same?
  2. need to check which element will change frame content, you might add probe function, then dump frames to file to check. here are some references:
    DeepStream SDK FAQ - #17 by mchi
    How to encode multiple images using the same objectMeta inside obj_meta_list - #7 by fanzh

Answer to your questions

  1. I am getting two outputs but are the same even with 2 rtsp sources, the video frames are interleaved or mixed up.
  2. I believe that gstoverlayopencv is the element that causes this issue, because in that gst plugins’s definition some kind of memory map is happening, in which it uses the same location for all the parallal pipelines. Which causes the mixing up. the memory map utility part is also written in python, it is given as plugin memory utility part.

But I don’t know what causes this behavior. Kindly help us, we are stuck with this feature for some weeks.

Got the answer, as I was using same udp sink for both pipelines, it was mixing it up, no problems in the plugins.

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.