This is my setup:
• nvcr.io/nvidia/tensorflow:21.08-tf2-py3 with the DeepStream docker container
• DeepStream Version: deepstream-6.0_6.0.0-1
I have developed a custom GStreamer plugin using Python. It works fine with a single pipeline. Whenever I use multiple pipelines (even in different processes), the final videos come out the same, mixing frames from all of the pipelines. It seems as if the plugin instance were shared across all pipelines. This is the code developed for the plugin.
import numpy as np
import cv2
import glob
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstBase', '1.0')
from gi.repository import Gst, GObject, GstBase
from gst_hacks import map_gst_buffer, get_buffer_size
import queue
import threading
# Element factory name used both for registration and in gst_parse_launch
# pipeline descriptions (e.g. "... ! gstoverlayopencv name=gstoverlay ! ...").
GST_OVERLAY_OPENCV = 'gstoverlayopencv'
# https://lazka.github.io/pgi-docs/GstBase-1.0/classes/BaseTransform.html
class GstOverlayOpenCv(GstBase.BaseTransform):
    """In-place video filter that draws the current overlay text on each RGB frame.

    A background thread drains ``self.queue`` and keeps the most recent item
    in ``self.GST_OVERLAY_DICT``, so the application can push overlay updates
    asynchronously with ``element.queue.put({"key": ..., "value": ...})``.
    """

    CHANNELS = 3  # RGB

    __gstmetadata__ = ("An example plugin of GstOverlayOpenCv",
                       "gst-filter/gst_overlay_opencv.py",
                       "gst.Element draw on image",
                       "Taras at LifeStyleTransfer.com")

    __gsttemplates__ = (Gst.PadTemplate.new("src",
                                            Gst.PadDirection.SRC,
                                            Gst.PadPresence.ALWAYS,
                                            Gst.Caps.from_string("video/x-raw,format=RGB")),
                        Gst.PadTemplate.new("sink",
                                            Gst.PadDirection.SINK,
                                            Gst.PadPresence.ALWAYS,
                                            Gst.Caps.from_string("video/x-raw,format=RGB")))

    def __init__(self):
        super(GstOverlayOpenCv, self).__init__()
        # Overlay could be any of your objects as far as it implements __call__
        # and returns numpy.ndarray
        self.overlay = None
        self.queue = queue.Queue()
        self.GST_OVERLAY_DICT = {"key": "name", "value": "PIXEL"}
        # BUGFIX: the original stored the Thread object in ``self.worker``,
        # shadowing the worker() method. Keep the thread in its own attribute
        # and mark it daemon so a torn-down pipeline cannot keep the process
        # alive forever on this infinite loop.
        self._worker_thread = threading.Thread(target=self.worker, daemon=True)
        self._worker_thread.start()

    def worker(self):
        """Consume queued overlay updates forever; the most recent item wins."""
        while True:
            item = self.queue.get()
            self.GST_OVERLAY_DICT = item
            self.queue.task_done()

    def do_transform_ip(self, inbuffer):
        """In-place transform: draw the current overlay text centered on the frame.

        :param inbuffer: Gst.Buffer holding one RGB frame
        :returns: Gst.FlowReturn.OK on success, Gst.FlowReturn.ERROR when the
                  negotiated caps cannot provide a width/height
        Read more:
        https://gstreamer.freedesktop.org/data/doc/gstreamer/head/gstreamer-libs/html/GstBaseTransform.html
        """
        success, (width, height) = get_buffer_size(self.srcpad.get_current_caps())
        if not success:
            # https://lazka.github.io/pgi-docs/Gst-1.0/enums.html#Gst.FlowReturn
            return Gst.FlowReturn.ERROR

        # BUGFIX: this buffer is modified in place, so it must be mapped
        # READ|WRITE. Mapping it READ-only and then writing through the numpy
        # view scribbles on memory GStreamer may still be sharing (buffer
        # pools / non-exclusive GstMemory), which is exactly how frames from
        # pipelines running in parallel end up mixed into each other's output.
        with map_gst_buffer(inbuffer, Gst.MapFlags.READ | Gst.MapFlags.WRITE) as mapped:
            frame = np.ndarray((height, width, self.CHANNELS),
                               buffer=mapped, dtype=np.uint8)

            # BUGFIX: measure the string that is actually drawn (the original
            # measured the literal "Object" but drew GST_OVERLAY_DICT['value'],
            # so the centering was computed for the wrong text).
            text = str(self.GST_OVERLAY_DICT['value'])
            font = cv2.FONT_HERSHEY_SIMPLEX
            font_scale = 0.5
            thickness = 2
            text_size, _ = cv2.getTextSize(text, font, font_scale, thickness)

            # Center the text in the frame.
            x = int((frame.shape[1] - text_size[0]) / 2)
            y = int((frame.shape[0] + text_size[1]) / 2)

            color = (0, 0, 255)  # red font color in BGR format
            cv2.putText(frame, text, (x, y), font, font_scale, color, thickness)
        return Gst.FlowReturn.OK
def register(plugin):
    """Register the GstOverlayOpenCv type with the given Gst.Plugin.

    # https://lazka.github.io/pgi-docs/#GObject-2.0/functions.html#GObject.type_register
    # https://lazka.github.io/pgi-docs/#Gst-1.0/classes/Element.html#Gst.Element.register
    """
    gtype = GObject.type_register(GstOverlayOpenCv)
    return Gst.Element.register(plugin, GST_OVERLAY_OPENCV, 0, gtype)
def register_by_name(plugin_name):
    """Statically register this module as a GStreamer plugin named *plugin_name*.

    Parameters explanation:
    https://lazka.github.io/pgi-docs/Gst-1.0/classes/Plugin.html#Gst.Plugin.register_static

    :raises ImportError: if Gst refuses the registration
    """
    registered = Gst.Plugin.register_static(
        Gst.VERSION_MAJOR, Gst.VERSION_MINOR,
        plugin_name,
        "gst.Element draws on image buffer",  # description
        register,                             # plugin init callback
        '1.12.4',                             # version
        'LGPL',                               # license
        'gstreamer',                          # source module
        'gstoverlay',                         # package
        'lifestyletransfer.com')              # origin
    if not registered:
        raise ImportError("Plugin {} not registered".format(plugin_name))
    return True
# Import side effect: register the element so parse_launch / ElementFactory
# can instantiate 'gstoverlayopencv' once this module has been imported.
register_by_name(GST_OVERLAY_OPENCV)
This is the plugin's buffer-mapping utility part:
from ctypes import *
from contextlib import contextmanager
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GObject
# CONST from gstconfig.h — reserved-pointer padding appended to public structs.
# https://github.com/Kurento/gstreamer/blob/0d6031b200e189b391d9c0882760109c1d8cf837/win32/common/gstconfig.h#L67
_GST_PADDING = 4  # From gstconfig.h
class _GstMapInfo(Structure):
    """ctypes mirror of the C ``GstMapInfo`` struct filled by gst_buffer_map.

    The field order and types must match gstmemory.h exactly; do not reorder.
    """
    _fields_ = [("memory", c_void_p),                   # GstMemory *memory
                ("flags", c_int),                       # GstMapFlags flags
                ("data", POINTER(c_byte)),              # guint8 *data
                ("size", c_size_t),                     # gsize size
                ("maxsize", c_size_t),                  # gsize maxsize
                ("user_data", c_void_p * 4),            # gpointer user_data[4]
                ("_gst_reserved", c_void_p * _GST_PADDING)]
# Pointer-to-GstMapInfo alias used in the C prototypes below.
_GST_MAP_INFO_POINTER = POINTER(_GstMapInfo)

# Call the GStreamer core directly: the pygobject bindings do not expose a
# zero-copy writable mapping, so we go through ctypes.
_libgst = CDLL("libgstreamer-1.0.so.0")

# Specifying valid ctypes for C function's arguments
_libgst.gst_buffer_map.argtypes = [c_void_p, _GST_MAP_INFO_POINTER, c_int]
_libgst.gst_buffer_map.restype = c_int
_libgst.gst_buffer_unmap.argtypes = [c_void_p, _GST_MAP_INFO_POINTER]
_libgst.gst_buffer_unmap.restype = None
_libgst.gst_mini_object_is_writable.argtypes = [c_void_p]
_libgst.gst_mini_object_is_writable.restype = c_int
@contextmanager
def map_gst_buffer(pbuffer, flags):
    """
    Map Gst.Buffer for Read/Write.

    Yields a ctypes ``(c_byte * size)`` array backed directly by the buffer's
    memory; the buffer is unmapped again when the with-block exits.

    :param pbuffer: https://lazka.github.io/pgi-docs/Gst-1.0/classes/Buffer.html
    :type pbuffer: Gst.Buffer
    :param flags: https://lazka.github.io/pgi-docs/Gst-1.0/flags.html#Gst.MapFlags
    :type flags: Gst.MapFlags
    :raises TypeError: if pbuffer is None
    :raises ValueError: if WRITE access is requested on a non-writable buffer
    :raises RuntimeError: if gst_buffer_map fails
    """
    if pbuffer is None:
        raise TypeError("Cannot pass NULL to _map_gst_buffer")
    # NOTE(review): hash() of the pygobject wrapper is relied upon to return
    # the raw GstBuffer* address — an implementation detail of the bindings;
    # verify on the target pygobject version.
    ptr = hash(pbuffer)  # Obtaining pointer to buffer
    if flags & Gst.MapFlags.WRITE and _libgst.gst_mini_object_is_writable(ptr) == 0:
        raise ValueError("Writable array requested but buffer is not writeable")
    mapping = _GstMapInfo()
    success = _libgst.gst_buffer_map(ptr, mapping, flags)
    if not success:
        raise RuntimeError("Couldn't map buffer")
    try:
        # Expose the mapped memory as a fixed-size ctypes byte array.
        yield cast(
            mapping.data, POINTER(c_byte * mapping.size)).contents
    finally:
        # Always unmap, even if the caller's with-body raised.
        _libgst.gst_buffer_unmap(ptr, mapping)
def get_buffer_size(caps):
    """
    Returns width, height of buffer from caps.

    :param caps: https://lazka.github.io/pgi-docs/Gst-1.0/classes/Caps.html
    :type caps: Gst.Caps
    :rtype: bool, (int, int) -- (success, (width, height))
    """
    structure = caps.get_structure(0)
    dims = []
    # Both fields must be present; bail out with (False, (0, 0)) otherwise.
    for field in ("width", "height"):
        ok, value = structure.get_int(field)
        if not ok:
            return False, (0, 0)
        dims.append(value)
    return True, (dims[0], dims[1])
And this is the pipeline construction part:
# Build one RTSP -> depay -> decode -> tee pipeline per source.
# Branch 1 (after the tee's queue): scale / convert / rate-limit into an appsink.
# Branch 2: the custom 'gstoverlayopencv' element, NVMM conversion, H.264
# re-encode, and RTP/UDP out.
pipeline = Gst.parse_launch('rtspsrc location={} ! rtph264depay ! h264parse ! avdec_h264 ! tee name=t ! queue ! videoscale method=0 ! video/x-raw ! \
videoconvert name=m_videoconvert ! videorate name=m_videorate ! appsink name=appsink t. ! videoconvert name=vidconv1 ! gstoverlayopencv name=gstoverlay ! videoconvert name=vidconv2 ! \
nvvideoconvert ! capsfilter caps="video/x-raw(memory:NVMM), format=I420" ! nvv4l2h264enc ! rtph264pay ! udpsink name=udpsink'.format(self.location))
# Pull BGR frames from the appsink via the "new-sample" signal.
appsink = pipeline.get_by_name('appsink')
appsink.set_property('emit-signals', True)
caps = Gst.caps_from_string(
    'video/x-raw, format=(string){BGR}')
appsink.set_property('caps', caps)
appsink.connect("new-sample", self.new_buffer, appsink)
# Keep a handle to THIS pipeline's overlay element so overlay updates can be
# queued asynchronously from application code.
self.gstoverlay = pipeline.get_by_name("gstoverlay")
# Make the UDP sink
updsink_port_num = 5400
sink = pipeline.get_by_name('udpsink')
if not sink:
    # NOTE(review): this only logs and falls through — the next line would
    # raise AttributeError on None; confirm whether a hard failure is wanted.
    sys.stderr.write(" Unable to create udpsink")
sink.set_property("host", "224.224.255.255")  # multicast group
sink.set_property("port", updsink_port_num)
sink.set_property("async", False)
sink.set_property("sync", 1)
sink.set_property("qos", 0)
# create an event loop and feed gstreamer bus mesages to it
loop = GLib.MainLoop()
bus = pipeline.get_bus()
rtsp_port_num = self.portnum
I am seeing blurry video mixed up from different sources whenever I run the pipelines in parallel. I just want to add some polygons asynchronously using this Python plugin. Please help me fix the shared-buffer issue in this custom GStreamer plugin written in Python.