How to pass metadata from one pipeline node to another

Please provide complete information as applicable to your setup.

• Hardware Platform Jetson
• DeepStream Version 6.3
• JetPack Version 5.1.2
• TensorRT Version
• NVIDIA GPU Driver Version (valid for GPU only)
• Issue Type( questions, new requirements, bugs)
• How to reproduce the issue ? (This is for bugs. Including which sample app is using, the configuration files content, the command line used and other details for reproducing)
• Requirement details( This is for new requirement. Including the module name-for which plugin or for which sample application, the function description)

I have read some data from the SEI in a pad probe (rtph264_src_pad_buffer_probe) attached around the h264parse node, and I want to save that data into metadata so that I can read it back later in pgie_src_pad_buffer_probe.

Below is my DeepStream pipeline:

#!/usr/bin/env python3

################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2019-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import sys

sys.path.append('../')
import os
import gi

gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst
from common.is_aarch_64 import is_aarch64
from common.bus_call import bus_call

import pyds
import datetime

PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3
MUXER_BATCH_TIMEOUT_USEC = 33000

def parse_ntp_timestamp_from_sei(data):
    # Scan for a 4-byte Annex B start code (00 00 00 xx, xx > 0) followed by
    # an SEI NAL unit (type 6) with an unregistered user-data payload (type 5).
    index = 0
    while index + 5 < len(data):
        if data[index] != 0:
            index += 1
            continue
        if data[index + 1] == 0 and data[index + 2] == 0 and data[index + 3] > 0:
            nal_type = data[index + 4]
            sei_payload_type = data[index + 5]
            # Payload layout: 1-byte size, 16-byte UUID, 13-byte timestamp
            if nal_type == 6 and sei_payload_type == 5 and index + 36 <= len(data):
                uuid = data[index + 7:index + 23].decode("utf-8")
                timestamp = data[index + 23:index + 36].decode("utf-8")
                return timestamp
        index += 1
    return ""

def rtph264_src_pad_buffer_probe(pad, info, u_data):
    print("rtph264_src_pad_buffer_probe")
    buf = info.get_buffer()
    if not buf:
        print("Unable to get GstBuffer for rtph264_src_pad_buffer_probe")
        return Gst.PadProbeReturn.OK
    (result, mapinfo) = buf.map(Gst.MapFlags.READ)
    content = mapinfo.data
    timestamp = parse_ntp_timestamp_from_sei(content)
    buf.unmap(mapinfo)
    print("SEI Timestamp:" + timestamp)
    # This is where I am stuck: I want to attach the timestamp to the buffer
    # as metadata here, but I don't know how to do that from Python.
    # My incomplete attempt:
    # customMeta = buf.add_custom_meta("exposure_time")

    # Debug helper: dump the first mapped buffer to a file
    # file_path = '/tmp/data_1.txt'
    # if os.path.exists(file_path):  # only write it once
    #     return Gst.PadProbeReturn.OK
    # with open(file_path, 'wb') as f:
    #     f.write(content)
    return Gst.PadProbeReturn.OK


def osd_sink_pad_buffer_probe(pad, info, u_data):
    frame_number = 0
    num_rects = 0

    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        return Gst.PadProbeReturn.OK
    # Retrieve batch metadata from the gst_buffer
    # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
    # C address of gst_buffer as input, which is obtained with hash(gst_buffer)
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            # Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
            # The casting is done by pyds.NvDsFrameMeta.cast()
            # The casting also keeps ownership of the underlying memory
            # in the C code, so the Python garbage collector will leave
            # it alone.
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        # Initializing object counter with 0.
        obj_counter = {
            PGIE_CLASS_ID_VEHICLE: 0,
            PGIE_CLASS_ID_PERSON: 0,
            PGIE_CLASS_ID_BICYCLE: 0,
            PGIE_CLASS_ID_ROADSIGN: 0
        }
        frame_number = frame_meta.frame_num
        num_rects = frame_meta.num_obj_meta
        l_obj = frame_meta.obj_meta_list
        while l_obj is not None:
            try:
                # Casting l_obj.data to pyds.NvDsObjectMeta
                obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break
            obj_counter[obj_meta.class_id] += 1
            obj_meta.rect_params.border_color.set(0.0, 0.0, 1.0, 0.8)  # 0.8 is alpha (opacity)
            try:
                l_obj = l_obj.next
            except StopIteration:
                break

        # Acquiring a display meta object. The memory ownership remains in
        # the C code so downstream plugins can still access it. Otherwise
        # the garbage collector will claim it when this probe function exits.
        display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
        display_meta.num_labels = 1
        py_nvosd_text_params = display_meta.text_params[0]
        # Setting display text to be shown on screen
        # Note that the pyds module allocates a buffer for the string, and the
        # memory will not be claimed by the garbage collector.
        # Reading the display_text field here will return the C address of the
        # allocated string. Use pyds.get_string() to get the string content.
        py_nvosd_text_params.display_text = "Frame Number={} Number of Objects={} Vehicle_count={} Person_count={}".format(
            frame_number, num_rects, obj_counter[PGIE_CLASS_ID_VEHICLE], obj_counter[PGIE_CLASS_ID_PERSON])

        # Now set the offsets where the string should appear
        py_nvosd_text_params.x_offset = 10
        py_nvosd_text_params.y_offset = 12

        # Font , font-color and font-size
        py_nvosd_text_params.font_params.font_name = "Serif"
        py_nvosd_text_params.font_params.font_size = 10
        # set(red, green, blue, alpha); set to White
        py_nvosd_text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)

        # Text background color
        py_nvosd_text_params.set_bg_clr = 1
        # set(red, green, blue, alpha); set to Black
        py_nvosd_text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)
        # Using pyds.get_string() to get display_text as string
        print(pyds.get_string(py_nvosd_text_params.display_text))
        pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
        try:
            l_frame = l_frame.next
        except StopIteration:
            break

    return Gst.PadProbeReturn.OK


def on_pad_added(src, pad, des):
    vpad = des.get_static_pad("sink")
    pad.link(vpad)


def main(args):
    # Check input arguments
    if len(args) != 2:
        sys.stderr.write("usage: %s <media file or uri>\n" % args[0])
        sys.exit(1)

    # Standard GStreamer initialization
    Gst.init(None)

    # Create gstreamer elements
    # Create Pipeline element that will form a connection of other elements
    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()

    queue1 = Gst.ElementFactory.make("queue", "queue1")

    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")

    # Source element for reading from the file
    print("Creating rtspsrc \n ")
    global source
    source = Gst.ElementFactory.make("rtspsrc", "rtspsrc")
    if not source:
        sys.stderr.write(" Unable to create Source \n")
    source.connect("pad-added", on_pad_added, queue1)
    pyds.configure_source_for_ntp_sync(hash(source))

    print("Creating rtph264depay \n ")
    rtph264depay = Gst.ElementFactory.make("rtph264depay", "rtph264depay")
    if not rtph264depay:
        sys.stderr.write(" Unable to create rtph264depay \n")

    # Since the data format in the input file is elementary h264 stream,
    # we need a h264parser
    print("Creating H264Parser \n")
    h264parser = Gst.ElementFactory.make("h264parse", "h264-parser")
    if not h264parser:
        sys.stderr.write(" Unable to create h264 parser \n")

    # Use nvdec_h264 for hardware accelerated decode on GPU
    print("Creating Decoder \n")
    decoder = Gst.ElementFactory.make("nvv4l2decoder", "nvv4l2-decoder")
    if not decoder:
        sys.stderr.write(" Unable to create Nvv4l2 Decoder \n")

    # Create nvstreammux instance to form batches from one or more sources.
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        sys.stderr.write(" Unable to create NvStreamMux \n")

    # Use nvinfer to run inferencing on decoder's output,
    # behaviour of inferencing is set through config file
    pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    if not pgie:
        sys.stderr.write(" Unable to create pgie \n")

    # Use convertor to convert from NV12 to RGBA as required by nvosd
    nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
    if not nvvidconv:
        sys.stderr.write(" Unable to create nvvidconv \n")

    # Create OSD to draw on the converted RGBA buffer
    nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")

    if not nvosd:
        sys.stderr.write(" Unable to create nvosd \n")

    # Finally render the osd output
    if is_aarch64():
        print("Creating nv3dsink \n")
        sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
        if not sink:
            sys.stderr.write(" Unable to create nv3dsink \n")
    else:
        print("Creating EGLSink \n")
        sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
        if not sink:
            sys.stderr.write(" Unable to create egl sink \n")

    print("Playing file %s " % args[1])
    source.set_property('location', args[1])
    if os.environ.get('USE_NEW_NVSTREAMMUX') != 'yes':  # Only set these properties if not using new gst-nvstreammux
        streammux.set_property('width', 1920)
        streammux.set_property('height', 1080)
        streammux.set_property('batched-push-timeout', MUXER_BATCH_TIMEOUT_USEC)

    streammux.set_property('batch-size', 1)
    pgie.set_property('config-file-path', "dstest1_pgie_config.txt")

    print("Adding elements to Pipeline \n")
    pipeline.add(queue1)
    pipeline.add(source)
    pipeline.add(rtph264depay)
    pipeline.add(h264parser)
    pipeline.add(decoder)
    pipeline.add(streammux)
    pipeline.add(pgie)
    pipeline.add(nvvidconv)
    pipeline.add(nvosd)
    pipeline.add(sink)

    # we link the elements together
    # file-source -> h264-parser -> nvh264-decoder ->
    # nvinfer -> nvvidconv -> nvosd -> video-renderer
    print("Linking elements in the Pipeline \n")
    # source.link(rtph264depay)
    queue1.link(rtph264depay)
    rtph264depay.link(h264parser)
    h264parser.link(decoder)

    rtph264pad = rtph264depay.get_static_pad("src")
    rtph264pad.add_probe(Gst.PadProbeType.BUFFER, rtph264_src_pad_buffer_probe, 0)

    sinkpad = streammux.get_request_pad("sink_0")
    if not sinkpad:
        sys.stderr.write(" Unable to get the sink pad of streammux \n")
    srcpad = decoder.get_static_pad("src")
    if not srcpad:
        sys.stderr.write(" Unable to get source pad of decoder \n")
    srcpad.link(sinkpad)
    streammux.link(pgie)
    pgie.link(nvvidconv)
    nvvidconv.link(nvosd)
    nvosd.link(sink)

    # create an event loop and feed gstreamer bus mesages to it
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)

    # Lets add probe to get informed of the meta data generated, we add probe to
    # the sink pad of the osd element, since by that time, the buffer would have
    # had got all the metadata.
    osdsinkpad = nvosd.get_static_pad("sink")
    if not osdsinkpad:
        sys.stderr.write(" Unable to get sink pad of nvosd \n")

    osdsinkpad.add_probe(Gst.PadProbeType.BUFFER, osd_sink_pad_buffer_probe, 0)

    # start play back and listen to events
    print("Starting pipeline \n")
    pipeline.set_state(Gst.State.PLAYING)
    try:
        loop.run()
    except:
        pass
    # cleanup
    pipeline.set_state(Gst.State.NULL)


if __name__ == '__main__':
    sys.exit(main(sys.argv))
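For reference, the SEI-parsing logic in parse_ntp_timestamp_from_sei above can be exercised standalone against a synthetic NAL unit. The payload layout (4-byte start code, NAL type 6, payload type 5, 1-byte size, 16-byte UUID, 13-byte ASCII timestamp) is an assumption read off the slice indices used in the function; the UUID and timestamp values here are made up for the test.

```python
def parse_ntp_timestamp_from_sei(data):
    # Minimal copy of the parsing logic from the pipeline script above:
    # find a 4-byte Annex B start code, then an SEI NAL (type 6) carrying
    # an unregistered user-data payload (type 5), and slice out the
    # 13-byte timestamp that follows the 16-byte UUID.
    index = 0
    while index + 5 < len(data):
        if data[index] != 0:
            index += 1
            continue
        if data[index + 1] == 0 and data[index + 2] == 0 and data[index + 3] > 0:
            if data[index + 4] == 6 and data[index + 5] == 5 and index + 36 <= len(data):
                return data[index + 23:index + 36].decode("utf-8")
        index += 1
    return ""

# Build a synthetic SEI NAL: start code + NAL header + payload type/size,
# a hypothetical 16-byte UUID, then a 13-digit millisecond timestamp.
sei = (b"\x00\x00\x00\x01"    # Annex B start code
       + bytes([6, 5, 30])    # NAL type 6 (SEI), payload type 5, size
       + b"0123456789abcdef"  # 16-byte UUID (hypothetical)
       + b"1700000000000")    # 13-byte timestamp string

print(parse_ntp_timestamp_from_sei(sei))  # -> 1700000000000
```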


You can refer to our C/C++ demo sources/apps/sample_apps/deepstream-gst-metadata-test to implement that in Python.

Hi yuweiw,

Do you have an example of how I can replace the lines below with Python code?

  /* Attach decoder metadata to gst buffer using gst_buffer_add_nvds_meta() */
  meta = gst_buffer_add_nvds_meta (buf, h264parse_meta, NULL,
      h264parse_meta_copy_func, h264parse_meta_release_func);

  /* Set metadata type */
  meta->meta_type = (GstNvDsMetaType)NVDS_GST_META_BEFORE_DECODER_EXAMPLE;

  /* Set transform function to transform decoder metadata from Gst meta to
   * nvds meta */
  meta->gst_to_nvds_meta_transform_func = h264parse_gst_to_nvds_meta_transform_func;

  /* Set release function to release the transformed nvds metadata */
  meta->gst_to_nvds_meta_release_func = h264parse_gst_nvds_meta_release_func;

From the Python API I can invoke gst_buffer_add_nvds_meta as below, but I don't know how to provide the copy_func and release_func:

pyds.gst_buffer_add_nvds_meta(hash(buf), timestamp, v2x_meta_data_copy_func, v2x_meta_data_release_func)

Could you refer to our FAQ to customize your own metas?

@yuweiw

Actually I am reading this customized NvDsUserMetaData, but my problem is that I don't know how to read/write this kind of NvDsUserMetaData in my probe. As you can see, I want to write the SEI timestamp below into metadata, but inside this probe there is no batch_meta to access. All of the Python sample code I can find begins with:

batch_meta -> l_frame -> pyds.NvDsFrameMeta.cast(l_frame.data) -> l_obj -> pyds.NvDsObjectMeta.cast(l_obj.data)

In my probe there is no batch_meta, so I am stuck there.

def rtph264_src_pad_buffer_probe(pad, info, u_data):
    print("rtph264_src_pad_buffer_probe")
    buf = info.get_buffer()
    if not buf:
        print("Unable to get GstBuffer for rtph264_src_pad_buffer_probe")
        return Gst.PadProbeReturn.OK
    (result, mapinfo) = buf.map(Gst.MapFlags.READ)
    content = mapinfo.data
    timestamp = parse_ntp_timestamp_from_sei(content)
    buf.unmap(mapinfo)
    print("SEI Timestamp:" + timestamp)

    return Gst.PadProbeReturn.OK

If you have an alternative way to pass parameters between probes for the same frame, that would also help me.
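One such alternative, sketched here as a workaround rather than an official DeepStream mechanism, is to keep a module-level dict keyed by the buffer PTS: the upstream probe records the SEI timestamp under buf.pts, and the downstream probe looks it up again (e.g. via frame_meta.buf_pts). This assumes the PTS survives unchanged between the two probe points, which you should verify for your pipeline.

```python
# Sideband store keyed by buffer PTS. Assumes the PTS is preserved between
# the upstream and downstream probe points (verify this for your pipeline).
SEI_TIMESTAMPS = {}
MAX_ENTRIES = 256  # bound the store so it cannot grow without limit


def remember_sei_timestamp(pts, timestamp):
    # Evict the oldest entry first (dicts preserve insertion order).
    if len(SEI_TIMESTAMPS) >= MAX_ENTRIES:
        SEI_TIMESTAMPS.pop(next(iter(SEI_TIMESTAMPS)))
    SEI_TIMESTAMPS[pts] = timestamp


def lookup_sei_timestamp(pts):
    # Pop so each timestamp is consumed exactly once; None if unknown.
    return SEI_TIMESTAMPS.pop(pts, None)


# Usage sketch (names from the pipeline script; not runnable here):
# in rtph264_src_pad_buffer_probe:
#     remember_sei_timestamp(buf.pts, timestamp)
# in pgie_src_pad_buffer_probe, per frame:
#     ts = lookup_sei_timestamp(frame_meta.buf_pts)
```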

You can refer to our demo deepstream_custom_binding_test.py to pass the NVDS_USER_META between probe.

I studied this sample code, but it still only passes USER_META between two probes that can both access batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer)).

And in my case, one of the probes can't access batch_meta. That's why I want to know how to read/write USER_META in that kind of probe.

Of course, I mean everything in Python, not C++.

This is weird. Maybe the metadata was lost during the transformation in nvvideoconvert or another plugin. Could you add the probe function to each pad from upstream to downstream to see which plugin drops the metadata?

I give up on Python; in C++ I can read/write USER_META without batch_meta.


This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.