Adding appsink to python deepstream application causes existing pipeline to stop working

• Hardware Platform (Jetson / GPU): GPU
• DeepStream Version 7.1
• TensorRT Version 12.4
• NVIDIA GPU Driver Version (valid for GPU only) 550.67
• Issue Type( questions, new requirements, bugs) Question/Bug
• How to reproduce the issue ? (This is for bugs. Including which sample app is using, the configuration files content, the command line used and other details for reproducing) In detail below.

I’m trying to introduce some new probe-based functionality using an appsink in my Python DeepStream application, but when I add the appsink, the rest of my pipeline stops working.

I have an existing tee that splits the pipeline into two branches. At this point the pipeline is still working. I then link the tee to a new “history_queue”, and the primary pipeline still successfully reaches my UI. I then create an appsink with this line of code:

history_sink = Gst.ElementFactory.make("appsink", f"video_history_sink_{stream_index}")

At this point everything is still working fine; however, if I add the appsink to my pipeline using self.pipeline.add(history_sink) (even without linking it), I no longer see any video feed in my original sink’s UI.

Essentially, the element can be created without errors, but merely adding it to the pipeline, without linking it, causes the original pipeline to return no feed.

I have set GST_DEBUG to 4 but do not see any errors I can act on.

I receive several INFO statements about state changes such as: notifying about state-changed PAUSED to READY (VOID_PENDING pending). These do not appear when the appsink is not added. I’ll add an excerpt of INFO messages below.

  • GST_STATES gstelement.c:2806:gst_element_continue_state:<rtpstorage9> completed state change to NULL
  • GST_STATES notifying about state-changed READY to NULL (VOID_PENDING pending)
  • GST_STATES gstbin.c:2928:gst_bin_change_state_func:<rtpbin8> child 'rtpstorage9' changed state to 1(NULL) successfully
  • gst_bin_element_set_state:<rtpssrcdemux9> current READY pending VOID_PENDING, desired next NULL

Can you advise how I can debug this further? Or are there any steps I’m missing when adding an appsink? I have also made sure that “drop” is True, “sync” is False, and max-buffers is set.

Thank you!

Is your pipeline similar to the following? You can refer to the sample below, which shows how to use appsink and nv3dsink with a tee at the same time. It is a native-code sample, but the Python approach is similar.

/opt/nvidia/deepstream/deepstream/sources/apps/sample_apps/deepstream-appsrc-test/deepstream_appsrc_test_app.c

                             |--> nv3dsink
.... nvvideoconvert --> tee -|
                             |--> appsink

Thanks, this helped a lot. It is very similar to the example.

However, after adding logs, the appsink appears to be receiving only 64 bytes of data per “pull-sample”. Does this seem correct? I was hoping to access the full image/frame. Is there a setting I could be missing to achieve this?

This is my appsink configuration. (I’ve tried max-buffers at various numbers with little change)

appsink.set_property("emit-signals", True)
appsink.set_property("max-buffers", 2)
appsink.set_property("drop", True)
appsink.set_property("sync", False)

And here’s a log output. (Each frame logs exactly this)

Received buffer of size: 64 bytes
Buffer content: b'\x00\x00\x00\x00\x01\x00\x00\x00\x01\x00\x00\x00\x00\x00\x00\x00\x02\x00\x00\x00\x00\x00\x00\x00\x90\xa7\x04T\x8e\x7f\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00'

I assume it is an issue with the caps or conversions, but any attempts I make to change this kill the pipeline. I’ve tried a few variations of caps settings and configurations involving an nvvideoconvert.
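For what it’s worth, unpacking those 64 bytes suggests the buffer carries a surface descriptor rather than pixels. The decode below assumes the field layout of the public NvBufSurface struct from nvbufsurface.h on x86-64 (the field names are that assumption, not something the log states):

```python
import struct

# The exact 64-byte payload from the appsink log above.
raw = bytes.fromhex(
    "00000000" "01000000" "01000000" "00000000"
    "02000000" "00000000" "90a704548e7f0000"
) + b"\x00" * 32

# Assumed layout (public nvbufsurface.h, x86-64 alignment):
# uint32 gpuId, uint32 batchSize, uint32 numFilled, bool isContiguous (+pad),
# uint32 memType (+pad), NvBufSurfaceParams *surfaceList, 32 reserved bytes.
gpu_id, batch, filled, contig, mem_type, surface_list = struct.unpack(
    "<3IB3xI4xQ32x", raw
)
print(gpu_id, batch, filled, mem_type, hex(surface_list))
# -> 0 1 1 2 0x7f8e5404a790
# i.e. one filled surface on GPU 0, a GPU memory type, and a host-side
# pointer to the surface list -- a descriptor, not image data.
```

That would explain why caps changes alone don’t help: with memory:NVMM caps, the GstBuffer holds a handle to GPU memory, which has to be dereferenced via the pyds surface accessors rather than read directly.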

Refer to this sample; I tested it in the deepstream:7.1-triton-multiarch container.

import numpy as np
import sys

import gi

gi.require_version("Gst", "1.0")
from gi.repository import Gst, GLib

import pyds
import ctypes
import cupy as cp
import imageio

frame_num = 0

def on_new_sample(sink, user_data):
    sample = sink.emit("pull-sample")
    global frame_num
    frame_num += 1
    if sample:
        gst_buffer = sample.get_buffer()
        if gst_buffer:
            batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
            l_frame = batch_meta.frame_meta_list
            while l_frame is not None:
                try:
                    # Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
                    # The casting is done by pyds.NvDsFrameMeta.cast()
                    # The casting also keeps ownership of the underlying memory
                    # in the C code, so the Python garbage collector will leave
                    # it alone.
                    frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
                except StopIteration:
                    break
                # Create dummy owner object to keep memory for the image array alive
                owner = None
                # Getting Image data using nvbufsurface
                # the input should be address of buffer and batch_id
                # Retrieve dtype, shape of the array, strides, pointer to the GPU buffer, and size of the allocated memory
                data_type, shape, strides, dataptr, size = (
                    pyds.get_nvds_buf_surface_gpu(hash(gst_buffer), frame_meta.batch_id)
                )

                # dataptr is of type PyCapsule -> Use ctypes to retrieve the pointer as an int to pass into cupy
                ctypes.pythonapi.PyCapsule_GetPointer.restype = ctypes.c_void_p
                ctypes.pythonapi.PyCapsule_GetPointer.argtypes = [
                    ctypes.py_object,
                    ctypes.c_char_p,
                ]
                # Get pointer to buffer and create UnownedMemory object from the gpu buffer
                c_data_ptr = ctypes.pythonapi.PyCapsule_GetPointer(dataptr, None)
                unownedmem = cp.cuda.UnownedMemory(c_data_ptr, size, owner)
                # Create MemoryPointer object from unownedmem, at index 0
                memptr = cp.cuda.MemoryPointer(unownedmem, 0)
                # Create cupy array to access the image data. This array is in GPU buffer
                n_frame_gpu = cp.ndarray(
                    shape=shape,
                    dtype=data_type,
                    memptr=memptr,
                    strides=strides,
                    order="C",
                )
                # Initialize cuda.stream object for stream synchronization
                stream = cp.cuda.stream.Stream(null=True)
                stream.synchronize()

                if frame_num % 500 == 0:
                    # Save the CuPy array to an image file
                    imageio.imwrite(f"output-{frame_num}.png", cp.asnumpy(n_frame_gpu))
                try:
                    l_frame = l_frame.next
                except StopIteration:
                    break
    return Gst.FlowReturn.OK


def bus_call(bus, message, loop):
    t = message.type
    if t == Gst.MessageType.EOS:
        print("End-of-stream")
        loop.quit()
    elif t == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        print(f"Error: {err}, Debug info: {debug}")
        loop.quit()
    return True


def cb_newpad(decodebin, decoder_src_pad, sink_pad):
    print("In cb_newpad\n")
    caps = decoder_src_pad.get_current_caps()
    if not caps:
        caps = decoder_src_pad.query_caps()
    gststruct = caps.get_structure(0)
    gstname = gststruct.get_name()
    features = caps.get_features(0)

    print("gstname=", gstname)
    if gstname.find("video") != -1:
        print("features=", features)
        if features.contains("memory:NVMM"):
            ret = decoder_src_pad.link(sink_pad)
            if ret != Gst.PadLinkReturn.OK:
                sys.stderr.write("Failed to link decoder\n")
        else:
            sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")


def main(args):
    # Check input arguments
    if len(args) != 2:
        print(f"usage: {args[0]} uri")
        sys.exit(1)

    Gst.init(None)

    pipeline = Gst.Pipeline.new("test-pipeline")

    uridecodebin = Gst.ElementFactory.make("uridecodebin", "uridecodebin")
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor1")
    filter = Gst.ElementFactory.make("capsfilter", "filter1")
    tee = Gst.ElementFactory.make("tee", "tee")
    queue1 = Gst.ElementFactory.make("queue", "queue1")
    queue2 = Gst.ElementFactory.make("queue", "queue2")
    fakesink = Gst.ElementFactory.make("fakesink", "fakesink")
    appsink = Gst.ElementFactory.make("appsink", "appsink")

    if (
        not pipeline
        or not uridecodebin
        or not streammux
        or not nvvidconv
        or not filter
        or not tee
        or not queue1
        or not queue2
        or not fakesink
        or not appsink
    ):
        print("One element could not be created. Exiting.")
        return -1

    uridecodebin.set_property("uri", args[1])

    caps = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA")
    filter.set_property("caps", caps)

    pipeline.add(uridecodebin)
    pipeline.add(streammux)
    pipeline.add(nvvidconv)
    pipeline.add(filter)
    pipeline.add(tee)
    pipeline.add(queue1)
    pipeline.add(queue2)
    pipeline.add(fakesink)
    pipeline.add(appsink)

    uridecodebin.connect("pad-added", cb_newpad, streammux.request_pad_simple("sink_0"))

    streammux.link(nvvidconv)
    nvvidconv.link(filter)
    filter.link(tee)

    tee_to_appsink_pad = tee.request_pad_simple("src_0")
    q1_sink_pad = queue1.get_static_pad("sink")
    tee_to_appsink_pad.link(q1_sink_pad)
    queue1.link(fakesink)

    tee_to_3d_pad = tee.request_pad_simple("src_1")
    q2_sink_pad = queue2.get_static_pad("sink")
    tee_to_3d_pad.link(q2_sink_pad)
    queue2.link(appsink)

    streammux.set_property("width", 1920)
    streammux.set_property("height", 1080)
    streammux.set_property("batch-size", 1)

    appsink.set_property("emit-signals", True)
    appsink.set_property("sync", False)
    appsink.set_property("drop", True)
    appsink.connect("new-sample", on_new_sample, pipeline)

    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)

    print("Starting pipeline")
    pipeline.set_state(Gst.State.PLAYING)
    try:
        loop.run()
    except KeyboardInterrupt:
        pass

    pipeline.set_state(Gst.State.NULL)


if __name__ == "__main__":
    sys.exit(main(sys.argv))

By the way, your TensorRT version does not match DeepStream 7.1, which may cause problems. It is recommended that you use the Docker container.
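For example, running the matching container guarantees the CUDA and TensorRT versions DeepStream 7.1 was built against (a hypothetical invocation; adjust the display and volume flags for your setup):

```shell
docker run --gpus all -it --rm \
    --net=host \
    -e DISPLAY=$DISPLAY \
    -v /tmp/.X11-unix:/tmp/.X11-unix \
    nvcr.io/nvidia/deepstream:7.1-triton-multiarch
```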