AppSink element in Python DeepStream pipeline

• Hardware Platform (Jetson / GPU) : NVIDIA Jetson AGX Orin
• DeepStream Version : 7.0
• JetPack Version (valid for Jetson only) : 6.0
• TensorRT Version : 8.6.2.3
• Issue Type( questions, new requirements, bugs) : question

Hello,
I would like to plug an appsink element in right after nvinfer in my pipeline.
The relevant extract of my pipeline looks like this:

...
# Primary inference element, configured from an external nvinfer config file.
arc_pgie = create_pipeline_element("nvinfer", "primary-inference", "Primary Inference")
arc_pgie.set_property("config-file-path", config_path)

arc_appsink = create_pipeline_element("appsink", "appsink", " AppSink")
# appsink only emits "new-sample" when emit-signals is enabled.
arc_appsink.set_property("emit-signals", True)
arc_appsink.set_property("sync", False)
arc_appsink.connect("new-sample", on_new_sample)  # Connect callback to new-sample signal
...

# Element.link() reports failure through its return value, so check it
# instead of continuing with an unlinked pipeline.
if not arc_pgie.link(arc_appsink):
    sys.stderr.write("Unable to link primary infer to appsink\n")
    sys.exit(1)
...
pgie_src_pad = arc_pgie.get_static_pad("src")
if not pgie_src_pad:
    # Without the pad there is nothing to attach the probe to; stop here
    # rather than failing on add_probe() of None below.
    sys.stderr.write("Unable to get src pad of primary infer\n")
    sys.exit(1)

pgie_src_pad.add_probe(Gst.PadProbeType.BUFFER, pgie_src_pad_buffer_probe, 0)

In the nvinfer element I perform segmentation, so pgie_src_pad_buffer_probe looks like this:

def _publish_arcing_frame(gst_buffer, frame_meta, arcing_prob):
    """Save the frame of an arcing detection and record the detection in Redis.

    Args:
        gst_buffer: The Gst.Buffer the frame belongs to.
        frame_meta: The pyds.NvDsFrameMeta of the frame to export.
        arcing_prob: Probability reported by the classifier label.
    """
    # Floor division keeps the scaling exact for large integer timestamps;
    # int() keeps the result an int even if the source value is not one.
    ntp_timestamp = int(frame_meta.ntp_timestamp // 1_000_000)
    img = get_frame(gst_buffer, frame_meta.batch_id)

    # Append a fully opaque alpha plane.
    # (assumes img is an H x W x C uint8 array -- TODO confirm against get_frame)
    alpha = np.full(img.shape[:-1], 255, dtype=np.uint8)
    img = np.dstack((img, alpha))
    out_filepath224 = save_img(img, f"{ntp_timestamp}224")

    redis_data = {
        "timestamp": ntp_timestamp,
        # This helper is only called for arcing detections.
        "arcing": 1,
        "probability": float(arcing_prob),
        "files": {
            "res224": out_filepath224,
        },
    }

    # The JSON payload is the sorted-set member, the timestamp its score.
    r_client.zadd("arcing-detection", {json.dumps(redis_data): ntp_timestamp})


def pgie_src_pad_buffer_probe(pad, info, u_data):
    """Buffer probe on the nvinfer src pad that exports arcing detections.

    Walks frame -> object -> classifier -> label metadata of the batch and,
    for every label whose probability exceeds 0.5, saves the frame and
    records the detection in Redis.

    Args:
        pad: The pad the probe is attached to (unused).
        info: Gst.PadProbeInfo carrying the buffer.
        u_data: User data given to add_probe (unused).

    Returns:
        Gst.PadProbeReturn.OK in all cases, so the buffer continues downstream.
    """
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer")
        return Gst.PadProbeReturn.OK

    # Retrieve batch metadata from the gst_buffer
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    if batch_meta is None:
        # Nothing to inspect on buffers that carry no batch metadata.
        return Gst.PadProbeReturn.OK

    print(f"Batch size after inference: {batch_meta.num_frames_in_batch}")

    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        l_obj = frame_meta.obj_meta_list
        while l_obj is not None:
            try:
                obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break

            class_meta_list = obj_meta.classifier_meta_list
            while class_meta_list is not None:
                try:
                    classifier_meta = pyds.NvDsClassifierMeta.cast(class_meta_list.data)
                except StopIteration:
                    break

                label_info_list = classifier_meta.label_info_list
                while label_info_list is not None:
                    try:
                        label_info = pyds.NvDsLabelInfo.cast(label_info_list.data)
                    except StopIteration:
                        break

                    arcing_prob = label_info.result_prob
                    # Only arcing detections are exported. The list is advanced
                    # below in every case; skipping that step with `continue`
                    # would re-examine the same label forever.
                    if arcing_prob > 0.5:
                        _publish_arcing_frame(gst_buffer, frame_meta, arcing_prob)

                    try:
                        label_info_list = label_info_list.next
                    except StopIteration:
                        break
                try:
                    class_meta_list = class_meta_list.next
                except StopIteration:
                    break
            try:
                l_obj = l_obj.next
            except StopIteration:
                break
        try:
            l_frame = l_frame.next
        except StopIteration:
            break

    return Gst.PadProbeReturn.OK

I would like to move this whole operation of saving a file and sending data to Redis:

# Save the image; the value returned by save_img is stored in the payload below.
out_filepath224 = save_img(img, f"{ntp_timestamp}224")

# Detection record that is serialized to JSON for Redis.
redis_data = {
    "timestamp": ntp_timestamp,
    "arcing": int(is_arcing),
    "probability": float(arcing_prob),
    "files": {
        "res224": out_filepath224,
    },
}

# zadd receives a {member: score} mapping: the JSON payload is the member and
# the timestamp its score (presumably r_client is a redis-py client -- confirm).
# NOTE(review): the key here is "segmentation" while the probe above writes to
# "arcing-detection" -- confirm which key is intended.
r_client.zadd(
    "segmentation", {json.dumps(redis_data): ntp_timestamp}
)

to the appsink. How can I achieve this in Python? I tried using something like this:

def pgie_src_pad_buffer_probe(pad, info, u_data):
    """Buffer probe on the nvinfer src pad that pushes arcing frames downstream.

    Walks frame -> object -> classifier -> label metadata of the batch and,
    for every label whose probability exceeds 0.5, wraps the RGBA frame in a
    new Gst.Buffer and pushes it on the probed pad.

    Args:
        pad: The pad the probe is attached to; also used to push the new buffer.
        info: Gst.PadProbeInfo carrying the buffer.
        u_data: User data given to add_probe (unused).

    Returns:
        Gst.PadProbeReturn.OK, so the original buffer continues downstream.
    """
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer")
        return Gst.PadProbeReturn.OK

    # Retrieve batch metadata from the gst_buffer
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list
    print(f"Batch size after inference: {batch_meta.num_frames_in_batch}")

    while l_frame is not None:
        try:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        l_obj = frame_meta.obj_meta_list
        while l_obj is not None:
            try:
                obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break

            class_meta_list = obj_meta.classifier_meta_list
            while class_meta_list is not None:
                try:
                    classifier_meta = pyds.NvDsClassifierMeta.cast(class_meta_list.data)
                except StopIteration:
                    break

                label_info_list = classifier_meta.label_info_list
                while label_info_list is not None:
                    try:
                        label_info = pyds.NvDsLabelInfo.cast(label_info_list.data)
                    except StopIteration:
                        break

                    arcing_prob = label_info.result_prob
                    is_arcing = arcing_prob > 0.5
                    # The list is advanced below in every case; skipping that
                    # step with `continue` would re-examine the same label forever.
                    if is_arcing:
                        ntp_timestamp = int(frame_meta.ntp_timestamp / 1_000_000)
                        img = get_frame(gst_buffer, frame_meta.batch_id)

                        # Append a fully opaque alpha plane.
                        alpha = np.ones(img.shape[:-1]).astype(np.uint8) * 255
                        img = np.dstack((img, alpha))

                        # ----------Here is the change-----------
                        # Create a new Gst.Buffer to push to appsink
                        img_bytes = img.tobytes()
                        buffer = Gst.Buffer.new_wrapped(img_bytes)
                        buffer.pts = ntp_timestamp

                        # Push the buffer to the next element
                        # NOTE(review): this pushes on the very pad the probe is
                        # attached to, so the probe presumably runs again for
                        # the new buffer. That buffer carries no DeepStream
                        # batch metadata, so batch_meta would be None on
                        # re-entry -- consistent with the AttributeError
                        # reported for this version. Confirm before relying on
                        # this approach.
                        pad.push(buffer)

                        # ------------------------

                    try:
                        label_info_list = label_info_list.next
                    except StopIteration:
                        break
                try:
                    class_meta_list = class_meta_list.next
                except StopIteration:
                    break
            try:
                l_obj = l_obj.next
            except StopIteration:
                break
        try:
            l_frame = l_frame.next
        except StopIteration:
            break

    return Gst.PadProbeReturn.OK

However, I received an error. It looks like, once the push operation is performed, the batch metadata is no longer valid: batch_meta is None, so accessing frame_meta_list fails:

 l_frame = batch_meta.frame_meta_list
              ^^^^^^^^^^^^^^^^^^^^^^^^^^
AttributeError: 'NoneType' object has no attribute 'frame_meta_list'

What I want to achieve?

I would like the probe function attached to the nvinfer element to only extract the image, probability and timestamp, and then perform the post-processing of that data in the appsink. My question is: how can I pass this data from the nvinfer element to the next element, which is the appsink? Is there perhaps a more efficient way, such as passing the images (which are numpy arrays) to a filesink element and then adding a probe function that saves the data to Redis?

If you upgrade to DS-7.1, there are new bindings that can be used to save frames or objects.
nvds_obj_enc_create_context/nvds_obj_enc_process/nvds_obj_enc_finish/nvds_obj_enc_destroy_context

With these bindings there is no need to use appsink, which avoids copying data to the CPU.

The following example using appsink works fine. Save it to /opt/nvidia/deepstream/deepstream/sources/deepstream_python_apps/apps/deepstream-test1 as deepstream_test_appsink.py, then run:

python3 deepstream_test_appsink.py /opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.h264
#!/usr/bin/env python3

################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2019-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import sys
sys.path.append('../')
import os
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst
from common.platform_info import PlatformInfo
from common.bus_call import bus_call

import pyds

# Class IDs used as keys of the per-frame object counter in
# pgie_src_pad_buffer_probe (presumably the class indices of the primary
# detector's model -- confirm against the nvinfer config).
PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3
# Value given to nvstreammux's 'batched-push-timeout' property in main()
# (microseconds, going by the _USEC suffix).
MUXER_BATCH_TIMEOUT_USEC = 33000

def pgie_src_pad_buffer_probe(pad, info, u_data):
    """Buffer probe on the nvinfer src pad that counts objects per frame.

    For every frame in the batch it counts the detected objects per class,
    recolors their bounding-box borders, prints a summary line and attaches
    that summary to the frame as display metadata.

    Args:
        pad: The pad the probe is attached to (unused).
        info: Gst.PadProbeInfo carrying the buffer.
        u_data: User data given to add_probe (unused).

    Returns:
        Gst.PadProbeReturn.OK in all cases, so the buffer continues downstream.
    """
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        # A probe must return a Gst.PadProbeReturn; a bare `return` yields None.
        return Gst.PadProbeReturn.OK

    # Retrieve batch metadata from the gst_buffer
    # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
    # C address of gst_buffer as input, which is obtained with hash(gst_buffer)
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    if batch_meta is None:
        # Nothing to inspect on buffers that carry no batch metadata.
        return Gst.PadProbeReturn.OK

    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            # Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
            # The casting is done by pyds.NvDsFrameMeta.cast()
            # The casting also keeps ownership of the underlying memory
            # in the C code, so the Python garbage collector will leave
            # it alone.
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        # Initializing object counter with 0.
        obj_counter = {
            PGIE_CLASS_ID_VEHICLE: 0,
            PGIE_CLASS_ID_PERSON: 0,
            PGIE_CLASS_ID_BICYCLE: 0,
            PGIE_CLASS_ID_ROADSIGN: 0,
        }
        frame_number = frame_meta.frame_num
        num_rects = frame_meta.num_obj_meta
        l_obj = frame_meta.obj_meta_list
        while l_obj is not None:
            try:
                # Casting l_obj.data to pyds.NvDsObjectMeta
                obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break
            # .get() tolerates class IDs outside the four known ones instead
            # of raising KeyError inside the streaming thread.
            obj_counter[obj_meta.class_id] = obj_counter.get(obj_meta.class_id, 0) + 1
            obj_meta.rect_params.border_color.set(0.0, 0.0, 1.0, 0.8)  # 0.8 is alpha (opacity)
            try:
                l_obj = l_obj.next
            except StopIteration:
                break

        # Acquiring a display meta object. The memory ownership remains in
        # the C code so downstream plugins can still access it. Otherwise
        # the garbage collector will claim it when this probe function exits.
        display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
        display_meta.num_labels = 1
        py_nvosd_text_params = display_meta.text_params[0]
        # Setting display text to be shown on screen
        # Note that the pyds module allocates a buffer for the string, and the
        # memory will not be claimed by the garbage collector.
        # Reading the display_text field here will return the C address of the
        # allocated string. Use pyds.get_string() to get the string content.
        py_nvosd_text_params.display_text = (
            "Frame Number={} Number of Objects={} Vehicle_count={} Person_count={}".format(
                frame_number,
                num_rects,
                obj_counter[PGIE_CLASS_ID_VEHICLE],
                obj_counter[PGIE_CLASS_ID_PERSON],
            )
        )

        # Now set the offsets where the string should appear
        py_nvosd_text_params.x_offset = 10
        py_nvosd_text_params.y_offset = 12

        # Font , font-color and font-size
        py_nvosd_text_params.font_params.font_name = "Serif"
        py_nvosd_text_params.font_params.font_size = 10
        # set(red, green, blue, alpha); set to White
        py_nvosd_text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)

        # Text background color
        py_nvosd_text_params.set_bg_clr = 1
        # set(red, green, blue, alpha); set to Black
        py_nvosd_text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)
        # Using pyds.get_string() to get display_text as string
        print(pyds.get_string(py_nvosd_text_params.display_text))
        pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
        try:
            l_frame = l_frame.next
        except StopIteration:
            break

    return Gst.PadProbeReturn.OK

def on_new_sample(sink):
    """Handle appsink's "new-sample" signal by printing the sample's details.

    Pulls the pending sample from the sink, lists the caps features of its
    first caps entry and prints the frame size with the buffer's hash.

    Args:
        sink: The appsink element that emitted the signal.

    Returns:
        Gst.FlowReturn.OK, whether or not a sample was available.
    """
    sample = sink.emit("pull-sample")
    if not sample:
        # Nothing to report; keep the stream flowing.
        return Gst.FlowReturn.OK

    buffer = sample.get_buffer()
    caps = sample.get_caps()

    # Frame dimensions come from the first caps structure.
    structure = caps.get_structure(0)
    width = structure.get_value("width")
    height = structure.get_value("height")

    features = caps.get_features(0)
    if not features:
        print("No features found")
    else:
        print(f"Number of features: {features.get_size()}")
        for i in range(features.get_size()):
            feature = features.get_nth(i)
            print(f"Feature {i}: {feature}")

    print(f"Received frame: {width} x {height} == {hash(buffer)}")
    return Gst.FlowReturn.OK

def _make_element_or_exit(factory_name, element_name, error_message):
    """Create a GStreamer element, or write error_message to stderr and exit."""
    element = Gst.ElementFactory.make(factory_name, element_name)
    if not element:
        sys.stderr.write(error_message)
        sys.exit(1)
    return element


def _link_or_exit(upstream, downstream):
    """Link two elements, or report the failing pair on stderr and exit."""
    if not upstream.link(downstream):
        sys.stderr.write(
            " Unable to link %s to %s \n" % (upstream.get_name(), downstream.get_name())
        )
        sys.exit(1)


def main(args):
    """Build and run the pipeline filesrc -> h264parse -> nvv4l2decoder ->
    nvstreammux -> nvinfer -> appsink.

    Args:
        args: Command-line arguments; args[1] is the H.264 elementary stream
            file to play.

    Exits with status 1 on wrong usage or when the pipeline cannot be
    built. Returns None after the main loop ends.
    """
    # Check input arguments
    if len(args) != 2:
        sys.stderr.write("usage: %s <media file or uri>\n" % args[0])
        sys.exit(1)

    # Not used further in this sample; kept because constructing it is part
    # of the original start-up sequence.
    platform_info = PlatformInfo()
    # Standard GStreamer initialization
    Gst.init(None)

    # Create gstreamer elements
    # Create Pipeline element that will form a connection of other elements
    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()
    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")
        sys.exit(1)

    # Source element for reading from the file
    print("Creating Source \n ")
    source = _make_element_or_exit(
        "filesrc", "file-source", " Unable to create Source \n")

    # Since the data format in the input file is elementary h264 stream,
    # we need a h264parser
    print("Creating H264Parser \n")
    h264parser = _make_element_or_exit(
        "h264parse", "h264-parser", " Unable to create h264 parser \n")

    # Use nvdec_h264 for hardware accelerated decode on GPU
    print("Creating Decoder \n")
    decoder = _make_element_or_exit(
        "nvv4l2decoder", "nvv4l2-decoder", " Unable to create Nvv4l2 Decoder \n")

    # Create nvstreammux instance to form batches from one or more sources.
    streammux = _make_element_or_exit(
        "nvstreammux", "Stream-muxer", " Unable to create NvStreamMux \n")

    # Use nvinfer to run inferencing on decoder's output,
    # behaviour of inferencing is set through config file
    pgie = _make_element_or_exit(
        "nvinfer", "primary-inference", " Unable to create pgie \n")

    appsink = _make_element_or_exit(
        "appsink", "appsink", " Unable to create appsink \n")

    # appsink only emits "new-sample" when emit-signals is enabled.
    appsink.set_property("emit-signals", True)
    appsink.connect("new-sample", on_new_sample)

    print("Playing file %s " % args[1])
    source.set_property('location', args[1])
    # Only set these properties if not using new gst-nvstreammux
    if os.environ.get('USE_NEW_NVSTREAMMUX') != 'yes':
        streammux.set_property('width', 1920)
        streammux.set_property('height', 1080)
        streammux.set_property('batched-push-timeout', MUXER_BATCH_TIMEOUT_USEC)

    streammux.set_property('batch-size', 1)
    pgie.set_property('config-file-path', "dstest1_pgie_config.txt")

    print("Adding elements to Pipeline \n")
    for element in (source, h264parser, decoder, streammux, pgie, appsink):
        pipeline.add(element)

    # we link the elements together
    # file-source -> h264-parser -> nvv4l2-decoder ->
    # nvstreammux -> nvinfer -> appsink
    print("Linking elements in the Pipeline \n")
    _link_or_exit(source, h264parser)
    _link_or_exit(h264parser, decoder)

    # The decoder feeds the muxer through a requested sink pad.
    sinkpad = streammux.request_pad_simple("sink_0")
    if not sinkpad:
        sys.stderr.write(" Unable to get the sink pad of streammux \n")
        sys.exit(1)
    srcpad = decoder.get_static_pad("src")
    if not srcpad:
        sys.stderr.write(" Unable to get source pad of decoder \n")
        sys.exit(1)
    if srcpad.link(sinkpad) != Gst.PadLinkReturn.OK:
        sys.stderr.write(" Unable to link decoder to streammux \n")
        sys.exit(1)
    _link_or_exit(streammux, pgie)
    _link_or_exit(pgie, appsink)

    # create an event loop and feed gstreamer bus messages to it
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)

    # Add a probe on the src pad of nvinfer to get informed of the metadata
    # it generated; by that point the buffer carries the inference results.
    pgiesrcpad = pgie.get_static_pad("src")
    if not pgiesrcpad:
        sys.stderr.write(" Unable to get src pad of pgie \n")
        sys.exit(1)

    pgiesrcpad.add_probe(Gst.PadProbeType.BUFFER, pgie_src_pad_buffer_probe, 0)

    # start play back and listen to events
    print("Starting pipeline \n")
    pipeline.set_state(Gst.State.PLAYING)
    try:
        loop.run()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the sample early.
        pass
    finally:
        # cleanup; runs even if the loop ends with an unexpected error,
        # which is now propagated instead of being silently swallowed.
        pipeline.set_state(Gst.State.NULL)

# Run the sample only when executed as a script; whatever main() returns is
# handed to sys.exit().
if __name__ == '__main__':
    sys.exit(main(sys.argv))


@junshengy Thank You for this information and appsink example

I have 2 additional questions:

  1. Is it correct to assume that using appsink involves transferring data from the GPU to the CPU, which is inefficient? If so, is using the new bindings to save frames or objects a more efficient approach?
  2. When using the new bindings to save frames/objects, how should they be integrated into a GStreamer pipeline? Specifically, should this be attached as a probe function to the nvinfer element? If so, what element should be used as the sink element after the nvinfer element?

Yes, it’s more efficient.

Any sink is OK. Please refer to the sample code pointed to by the link above for the probe function.

1 Like

@junshengy Thank you for this clarification. So, to get the best performance and pipeline speed when using the new bindings to save frames/objects, is fakesink the best option as the sink element after nvinfer, or is there any other element worth considering?