Please refer to the sample below; I tested it in the `deepstream:7.1-triton-multiarch` container:
import numpy as np
import sys
import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst, GLib
import pyds
import ctypes
import cupy as cp
import imageio
# Global frame counter, incremented once per successfully pulled sample.
frame_num = 0


def on_new_sample(sink, user_data):
    """appsink "new-sample" callback.

    Maps each frame's NVMM surface into a zero-copy CuPy array on the GPU
    and saves every 500th frame to a PNG file.

    Always returns Gst.FlowReturn.OK so the pipeline keeps running.
    """
    global frame_num

    sample = sink.emit("pull-sample")
    if not sample:
        # Nothing to process; do not count this as a frame.
        return Gst.FlowReturn.OK
    gst_buffer = sample.get_buffer()
    if not gst_buffer:
        return Gst.FlowReturn.OK

    frame_num += 1
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            # l_frame.data needs a cast to pyds.NvDsFrameMeta. The cast keeps
            # ownership of the underlying memory in the C code, so the Python
            # garbage collector leaves it alone.
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        # Dummy owner object to keep memory for the image array alive;
        # UnownedMemory does not manage this allocation itself.
        owner = None
        # Retrieve dtype, shape, strides, a PyCapsule wrapping the GPU
        # buffer pointer, and the size of the allocation for this batch id.
        data_type, shape, strides, dataptr, size = pyds.get_nvds_buf_surface_gpu(
            hash(gst_buffer), frame_meta.batch_id
        )
        # dataptr is a PyCapsule -> use ctypes to extract the raw pointer
        # as an integer so it can be handed to CuPy.
        ctypes.pythonapi.PyCapsule_GetPointer.restype = ctypes.c_void_p
        ctypes.pythonapi.PyCapsule_GetPointer.argtypes = [
            ctypes.py_object,
            ctypes.c_char_p,
        ]
        c_data_ptr = ctypes.pythonapi.PyCapsule_GetPointer(dataptr, None)
        # Wrap the GPU buffer and build a MemoryPointer at offset 0.
        unownedmem = cp.cuda.UnownedMemory(c_data_ptr, size, owner)
        memptr = cp.cuda.MemoryPointer(unownedmem, 0)
        # CuPy array viewing the image data; the data stays in the GPU buffer.
        n_frame_gpu = cp.ndarray(
            shape=shape,
            dtype=data_type,
            memptr=memptr,
            strides=strides,
            order="C",
        )
        # Synchronize on the null stream so the surface write is complete
        # before the frame is read back.
        stream = cp.cuda.stream.Stream(null=True)
        stream.synchronize()
        if frame_num % 500 == 0:
            # Copy to host and save the frame as an image file.
            imageio.imsave(f"output-{frame_num}.png", cp.asnumpy(n_frame_gpu))
        try:
            l_frame = l_frame.next
        except StopIteration:
            break
    return Gst.FlowReturn.OK
def bus_call(bus, message, loop):
    """GStreamer bus watch: quit the main loop on EOS or ERROR messages."""
    msg_type = message.type
    if msg_type == Gst.MessageType.EOS:
        print("End-of-stream")
        loop.quit()
    elif msg_type == Gst.MessageType.ERROR:
        error, debug_info = message.parse_error()
        print(f"Error: {error}, Debug info: {debug_info}")
        loop.quit()
    return True
def cb_newpad(decodebin, decoder_src_pad, sink_pad):
    """uridecodebin "pad-added" handler: link the NVMM video pad to sink_pad."""
    print("In cb_newpad\n")
    # Fall back to query_caps() when the pad has no negotiated caps yet.
    caps = decoder_src_pad.get_current_caps() or decoder_src_pad.query_caps()
    structure = caps.get_structure(0)
    media_type = structure.get_name()
    features = caps.get_features(0)
    print("gstname=", media_type)
    if media_type.find("video") != -1:
        print("features=", features)
        # Only pads backed by NVIDIA device memory can feed the muxer.
        if features.contains("memory:NVMM"):
            if decoder_src_pad.link(sink_pad) != Gst.PadLinkReturn.OK:
                sys.stderr.write("Failed to link decoder\n")
        else:
            sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")
def main(args):
    """Build and run the pipeline:

    uridecodebin -> nvstreammux -> nvvideoconvert -> capsfilter(RGBA) -> tee
        tee.src_0 -> queue1 -> fakesink
        tee.src_1 -> queue2 -> appsink (new-sample callback)

    args: sys.argv; args[1] must be the stream URI.
    Returns -1 when an element cannot be created, otherwise None.
    """
    # Check input arguments.
    if len(args) != 2:
        print(f"usage: {args[0]} uri")
        sys.exit(1)

    Gst.init(None)

    pipeline = Gst.Pipeline.new("test-pipeline")
    uridecodebin = Gst.ElementFactory.make("uridecodebin", "uridecodebin")
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor1")
    # Named caps_filter to avoid shadowing the builtin `filter`.
    caps_filter = Gst.ElementFactory.make("capsfilter", "filter1")
    tee = Gst.ElementFactory.make("tee", "tee")
    queue1 = Gst.ElementFactory.make("queue", "queue1")
    queue2 = Gst.ElementFactory.make("queue", "queue2")
    fakesink = Gst.ElementFactory.make("fakesink", "fakesink")
    appsink = Gst.ElementFactory.make("appsink", "appsink")

    elements = (
        pipeline, uridecodebin, streammux, nvvidconv, caps_filter,
        tee, queue1, queue2, fakesink, appsink,
    )
    if not all(elements):
        print("One element could not be created. Exiting.")
        return -1

    uridecodebin.set_property("uri", args[1])
    # Force RGBA output in NVMM memory so the appsink callback can map it.
    caps = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA")
    caps_filter.set_property("caps", caps)

    for element in elements[1:]:  # everything except the pipeline itself
        pipeline.add(element)

    # decodebin pads appear dynamically; link to the muxer's sink_0 on arrival.
    uridecodebin.connect("pad-added", cb_newpad, streammux.request_pad_simple("sink_0"))
    streammux.link(nvvidconv)
    nvvidconv.link(caps_filter)
    caps_filter.link(tee)

    # tee branch 1: queue1 -> fakesink (keeps the pipeline flowing).
    tee.request_pad_simple("src_0").link(queue1.get_static_pad("sink"))
    queue1.link(fakesink)
    # tee branch 2: queue2 -> appsink (frame extraction).
    tee.request_pad_simple("src_1").link(queue2.get_static_pad("sink"))
    queue2.link(appsink)

    streammux.set_property("width", 1920)
    streammux.set_property("height", 1080)
    streammux.set_property("batch-size", 1)

    appsink.set_property("emit-signals", True)
    appsink.set_property("sync", False)
    appsink.set_property("drop", True)
    appsink.connect("new-sample", on_new_sample, pipeline)

    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)

    print("Starting pipeline")
    pipeline.set_state(Gst.State.PLAYING)
    try:
        loop.run()
    except KeyboardInterrupt:
        # Ctrl-C falls through to pipeline teardown below; the original bare
        # `except:` also swallowed SystemExit and real errors.
        pass
    pipeline.set_state(Gst.State.NULL)


if __name__ == "__main__":
    sys.exit(main(sys.argv))
By the way, your TensorRT version does not match the one required by DeepStream 7.1, which may cause problems. It is recommended that you use the Docker image instead.