Metadata Sync Issue

Please provide complete information as applicable to your setup.

• Hardware Platform - GPU
• DeepStream Version - 7.0
• TensorRT Version - 10.6.0
• NVIDIA GPU Driver Version - 560.35.03
• Issue Type - questions
• Question -
Hi,

I’m currently working on a video analytics project, and my application architecture is structured as follows:
decodebin(n) -> streammux -> nvinfer -> nvtracker -> nvanalytics -> streamdemux -> videoconvert(n) -> nvosd(n) -> videoconvert(n) -> jpegencoder(n) -> multifilesink

The requirement is to track all objects, compute the line-crossing object count, and save an image every 10 seconds. My approach processes all frames from the RTSP streams through nvanalytics and then, after nvanalytics, passes only one frame every 10 seconds downstream. This is meant to reduce resource usage by limiting downstream processing.

While the application otherwise works fine, I’m facing an issue: after removing frame metadata from the batch meta, the frame and user metadata no longer stay in sync. This results in mismatched data, such as frames from one stream being paired with user metadata from another.

I would like to know:

  1. Why is this mismatch happening?
  2. Is my approach correct for this scenario?
  3. What would be the best approach to handle this requirement effectively?

The application functions properly for video files, but this issue occurs only with RTSP streams.

Thank you!

Code:

import sys
sys.path.append('../')
import time
from temp_utils import read_yaml,extract_rtsp_details,convert_pts_to_sydtimestamp,create_dynamic_path,get_current_epoch_in_syd,convert_syd_epoch_to_datetime_str
# from utils import read_yaml,extract_rtsp_details,convert_pts_to_sydtimestamp,create_dynamic_path,send_message_to_iothub,get_current_epoch_in_syd,convert_syd_epoch_to_datetime_str
import gi
import configparser
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst
from ctypes import *
from urllib.parse import urlparse
from common.platform_info import PlatformInfo
from common.bus_call import bus_call
from common.FPS import PERF_DATA
import os
import json
import pyds
import uuid

# logging.basicConfig(level=logging.INFO,filename="fps.log",format=" [ %(asctime)s  %(message)s ]\n ")


perf_data = None
MAX_DISPLAY_LEN=64
PGIE_CLASS_ID_PERSON = 0
PGIE_CLASS_ID_BAG = 1
PGIE_CLASS_ID_FACE = 2
MUXER_OUTPUT_WIDTH=1920
MUXER_OUTPUT_HEIGHT=1080
MAX_RECT_CAPACITY = 16
MUXER_BATCH_TIMEOUT_USEC = int(0.5 * 1e6)  # 500 ms; batched-push-timeout expects integer microseconds
TILED_OUTPUT_WIDTH=1280
TILED_OUTPUT_HEIGHT=720
GST_CAPS_FEATURES_NVMM="memory:NVMM"
# OSD_PROCESS_MODE= 0 ## CPU Mode
OSD_PROCESS_MODE = 1 ## GPU Mode
OSD_DISPLAY_TEXT= 1
pgie_classes_str= ["person","bag","face"]
no_of_streams = len(sys.argv)-1
config_file_path = "main_config.yaml"
first_frame_processed = False
curr_epoch = get_current_epoch_in_syd()
stream_start_time_dict = {i:None for i in range(no_of_streams)}
start_time_syd_dict = {i:curr_epoch for i in range(no_of_streams)}
start_time_syd_dict_json = {i:curr_epoch for i in range(no_of_streams)} 
json_dir_name = "jsons"
os.makedirs(json_dir_name,exist_ok=True)



def generate_unique_number():
    unique_number = uuid.uuid4().int
    truncated_number = int(str(unique_number)[:12])
    return truncated_number

unique_number = generate_unique_number()


def cb_newpad(decodebin, decoder_src_pad,data):
    print("In cb_newpad\n")
    caps=decoder_src_pad.get_current_caps()
    if not caps:
        caps = decoder_src_pad.query_caps()
    gststruct=caps.get_structure(0)
    gstname=gststruct.get_name()
    source_bin=data
    features=caps.get_features(0)
    print("gstname=",gstname)
    if(gstname.find("video")!=-1):
        print("features=",features)
        if features.contains("memory:NVMM"):
            bin_ghost_pad=source_bin.get_static_pad("src")
            if not bin_ghost_pad.set_target(decoder_src_pad):
                sys.stderr.write("Failed to link decoder src pad to source bin ghost pad\n")
        else:
            sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")

def decodebin_child_added(child_proxy,Object,name,user_data):
    print("Decodebin child added:", name, "\n")
    if(name.find("decodebin") != -1):
        Object.connect("child-added",decodebin_child_added,user_data)
    if "source" in name:
        source_element = child_proxy.get_by_name("source")
        if source_element.find_property('drop-on-latency') is not None:
            Object.set_property("drop-on-latency", True)

def create_source_bin(index,uri):
    print("Creating source bin")
    bin_name="source-bin-%02d" %index
    print('bin_name-',bin_name)
    nbin=Gst.Bin.new(bin_name)
    if not nbin:
        sys.stderr.write(" Unable to create source bin \n")
    uri_decode_bin=Gst.ElementFactory.make("nvurisrcbin", "uri-decode-bin")
    if not uri_decode_bin:
        sys.stderr.write(" Unable to create uri decode bin \n")
    reconnect_attempts = -1
    reconnect_interval = 30 
    uri_decode_bin.set_property("rtsp-reconnect-attempts",reconnect_attempts)
    uri_decode_bin.set_property("rtsp-reconnect-interval",reconnect_interval)
    uri_decode_bin.set_property("message-forward", True)
    uri_decode_bin.set_property("uri",uri)
    uri_decode_bin.connect("pad-added",cb_newpad,nbin)
    uri_decode_bin.connect("child-added",decodebin_child_added,nbin)
    Gst.Bin.add(nbin,uri_decode_bin)
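    # Create a ghost pad without a target; cb_newpad() sets the target once
    # the decoder's source pad appears.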
    bin_pad=nbin.add_pad(Gst.GhostPad.new_no_target("src",Gst.PadDirection.SRC))
    if not bin_pad:
        sys.stderr.write(" Failed to add ghost pad in source bin \n")
        return None
    return nbin
 


def make_element(element_name, i):
    element = Gst.ElementFactory.make(element_name, element_name)
    if not element:
        sys.stderr.write(" Unable to create {0}".format(element_name))
    element.set_property("name", "{0}-{1}".format(element_name, str(i)))
    return element


def nvanalytics_src_pad_buffer_probe(pad, info, u_data):

    global start_time_syd_dict_json
    global first_frame_processed
    global stream_start_time_dict
    global perf_data

    no_of_sec_send_msg = u_data["no_of_sec_send_msg"]
    cam_ids = u_data["cam_id"]
    

    if not first_frame_processed:
        stream_start_time_ = get_current_epoch_in_syd()
        stream_start_time_dict = {i: stream_start_time_ for i in range(no_of_streams)}
        first_frame_processed = True


    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer")
        return Gst.PadProbeReturn.DROP
    
    batch_meta  = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list
    prev_frame = None
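    # Walk the batch's frame meta list; frames that fall inside the current
    # sampling window are removed from the batch so downstream elements only
    # receive one frame per stream every `no_of_sec_send_msg` seconds.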

    while l_frame is not None:
        try:
            
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
            stream_id = frame_meta.pad_index
            pts = frame_meta.buf_pts
            curr_time = stream_start_time_dict[stream_id] + (pts / 1e9)

            if curr_time - start_time_syd_dict_json[stream_id] < no_of_sec_send_msg:

                # Save the next node before unlinking/removing this one;
                # the removed node must not be dereferenced afterwards.
                next_l_frame = l_frame.next

                if prev_frame is None:
                    batch_meta.frame_meta_list = l_frame.next
                else:
                    prev_frame.next = l_frame.next

                pyds.nvds_remove_frame_meta_from_batch(batch_meta, frame_meta)

                l_frame = next_l_frame

                stream_index = "stream{0}".format(stream_id)
                perf_data.update_fps(stream_index)
                continue

            start_time_syd_dict_json[stream_id] = curr_time
            
            syd_time_pts_ = convert_pts_to_sydtimestamp(pts, stream_start_time_dict[stream_id])
            syd_time_pts_file = convert_pts_to_sydtimestamp(pts, stream_start_time_dict[stream_id],"datetime")

            display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
            display_meta.num_labels = 1
            text_params = display_meta.text_params[0]

            # Set custom text and its properties
            text_params.display_text = f"{syd_time_pts_}"
            text_params.x_offset = 100  # X-coordinate
            text_params.y_offset = 50   # Y-coordinate
            text_params.font_params.font_name = "Serif"
            text_params.font_params.font_size = 12
            text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)  # RGBA (white)
            text_params.set_bg_clr = 1
            text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)  # RGBA (opaque black)
            pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)

            frame_number = frame_meta.frame_num
            num_rects = frame_meta.num_obj_meta

            l_user = frame_meta.frame_user_meta_list
            while l_user:
                try:
                    user_meta = pyds.NvDsUserMeta.cast(l_user.data)
                    if user_meta.base_meta.meta_type == pyds.nvds_get_user_meta_type("NVIDIA.DSANALYTICSFRAME.USER_META"):
                        user_meta_data = pyds.NvDsAnalyticsFrameMeta.cast(user_meta.user_meta_data)
                        data = {
                            "stream_id": stream_id,
                            "camera_name": cam_ids[stream_id],
                            "unique_number_uuid": unique_number,
                            "batch_id": frame_meta.batch_id,
                            "frame_num": frame_number,
                            "buf_pts": pts,
                            "ntp_timestamp": frame_meta.ntp_timestamp,
                            "source_id": frame_meta.source_id,
                            "object_count": user_meta_data.objCnt,
                            "Number_of_objects": num_rects,
                            "object_in_ROI_count": user_meta_data.objInROIcnt,
                            "cumulative_line_cross_count": user_meta_data.objLCCumCnt,
                            "current_line_cross_count": user_meta_data.objLCCurrCnt,
                            "overcrowding_status": user_meta_data.ocStatus,
                            "unique_id": user_meta_data.unique_id,
                            "PTS_TIME": syd_time_pts_,
                        }
                        p = os.path.join(json_dir_name, f"{syd_time_pts_file}_{stream_id}.json")
                        with open(p, "w") as f:
                            json.dump(data, f)
                        print(f"JSON saved: {p}")
                        # send_message_to_iothub(json.dumps(data))
                except StopIteration:
                    break
                l_user = l_user.next

            stream_index = "stream{0}".format(stream_id)
            perf_data.update_fps(stream_index)
            prev_frame = l_frame
            l_frame = l_frame.next

        except StopIteration:
            break
    
    if not batch_meta.num_frames_in_batch:
        #print("No elements in batch meta")
        return Gst.PadProbeReturn.DROP
    return Gst.PadProbeReturn.OK



        
def frame_filter_pad_probe(pad, info, user_data):
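    # Runs on each multifilesink sink pad (one pad per stream). When a frame's
    # timestamp crosses the save interval, the sink's "location" property is
    # updated just before the buffer reaches it, so the JPEG is written to a
    # timestamped path.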
    global start_time_syd_dict
    
    base_path = user_data["base_path"]
    cam_id_ip_port = user_data["cam_id"]
    no_of_sec_send_msg = user_data["no_of_sec_send_msg"]

    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        return Gst.PadProbeReturn.OK

    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list

    while l_frame:

        try:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        stream_id = frame_meta.pad_index
        try:
            pts = frame_meta.buf_pts
            curr_time = stream_start_time_dict[stream_id] + (pts / 1e9)
            if (curr_time - start_time_syd_dict[stream_id]) >= no_of_sec_send_msg:
                syd_time_pts_file = convert_pts_to_sydtimestamp(pts, stream_start_time_dict[stream_id],"datetime")
                frame_number = frame_meta.frame_num
                start_time_syd_dict[stream_id] = curr_time
                image_folder_path = create_dynamic_path(base_path, cam_id_ip_port)
                image_name = f"{syd_time_pts_file}_{frame_number}.jpg"
                image_save_path = os.path.join(image_folder_path,image_name)
                print(f"image save sucessfully image path : {image_save_path} stream id : {stream_id}")
                multifilesink = pad.get_parent_element()
                multifilesink.set_property("location", image_save_path)
        except Exception as e:
            print(f"Error occurred while save the image : {e}")

        try:
            l_frame = l_frame.next
        except StopIteration:
            break

    return Gst.PadProbeReturn.OK

def main(args,base_path,no_of_sec_send_msg,analytics_config_path,infer_config_path,tracker_config_path):

    if len(args) < 2:
        sys.stderr.write("usage: %s <uri1> [uri2] ... [uriN]\n" % args[0])
        sys.exit(1)
 
    global perf_data
    perf_data = PERF_DATA(len(args) - 1)
    number_sources=len(args)-1

    cam_ids = {stream_id:extract_rtsp_details(uri_name) for stream_id,uri_name in enumerate(args[1:])}
 
    platform_info = PlatformInfo()
   
    Gst.init(None)
 
    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()
    is_live = False
 
    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")

    print("Creating streamux \n ")
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        sys.stderr.write(" Unable to create NvStreamMux \n")
 
    pipeline.add(streammux)
    for i in range(number_sources):
        print("Creating source_bin ",i," \n ")
        uri_name=args[i+1]
        if uri_name.find("rtsp://") == 0 :
            is_live = True
        source_bin=create_source_bin(i, uri_name)
        if not source_bin:
            sys.stderr.write("Unable to create source bin \n")
        pipeline.add(source_bin)
        padname="sink_%u" %i
        sinkpad= streammux.request_pad_simple(padname)
        if not sinkpad:
            sys.stderr.write("Unable to create sink pad bin \n")
        srcpad=source_bin.get_static_pad("src")
        if not srcpad:
            sys.stderr.write("Unable to create src pad bin \n")
        srcpad.link(sinkpad)
    queue1=Gst.ElementFactory.make("queue","queue1")
    queue2=Gst.ElementFactory.make("queue","queue2")
    queue3=Gst.ElementFactory.make("queue","queue3")
    queue4=Gst.ElementFactory.make("queue","queue4")
    pipeline.add(queue1)
    pipeline.add(queue2)
    pipeline.add(queue3)
    pipeline.add(queue4)

    for q in [queue1,queue2,queue3,queue4]:
        q.set_property("max-size-buffers", 100)
        q.set_property("leaky", True)
 
    print("Creating Pgie \n ")
    pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    if not pgie:
        sys.stderr.write(" Unable to create pgie \n")
 
    print("Creating nvtracker \n ")
    tracker = Gst.ElementFactory.make("nvtracker", "tracker")
    if not tracker:
        sys.stderr.write(" Unable to create tracker \n")
 
    print("Creating nvdsanalytics \n ")
    nvanalytics = Gst.ElementFactory.make("nvdsanalytics", "analytics")
    if not nvanalytics:
        sys.stderr.write(" Unable to create nvanalytics \n")
    nvanalytics.set_property("config-file", analytics_config_path)

    print("Creating nvstreamdemux")
    nvstreamdemux = Gst.ElementFactory.make("nvstreamdemux", "nvstreamdemux")
    if not nvstreamdemux:
        sys.stderr.write(" Unable to create nvstreamdemux \n")

    if is_live:
        print("Atleast one of the sources is live")
        streammux.set_property('live-source', 1)
 
    streammux.set_property('width', 1920)
    streammux.set_property('height', 1080)
    streammux.set_property('batch-size', number_sources)
    streammux.set_property('batched-push-timeout', MUXER_BATCH_TIMEOUT_USEC)
    pgie.set_property('config-file-path', infer_config_path)
    pgie_batch_size=pgie.get_property("batch-size")

    if(pgie_batch_size != number_sources):
        print("WARNING: Overriding infer-config batch-size",pgie_batch_size," with number of sources ", number_sources," \n")
        pgie.set_property("batch-size",number_sources)

    config = configparser.ConfigParser()
    config.read(tracker_config_path)
    config.sections()
 
    for key in config['tracker']:
        if key == 'tracker-width' :
            tracker_width = config.getint('tracker', key)
            tracker.set_property('tracker-width', tracker_width)
        if key == 'tracker-height' :
            tracker_height = config.getint('tracker', key)
            tracker.set_property('tracker-height', tracker_height)
        if key == 'gpu-id' :
            tracker_gpu_id = config.getint('tracker', key)
            tracker.set_property('gpu_id', tracker_gpu_id)
        if key == 'll-lib-file' :
            tracker_ll_lib_file = config.get('tracker', key)
            tracker.set_property('ll-lib-file', tracker_ll_lib_file)
        if key == 'll-config-file' :
            tracker_ll_config_file = config.get('tracker', key)
            tracker.set_property('ll-config-file', tracker_ll_config_file)

    pipeline.add(pgie)
    pipeline.add(tracker)
    pipeline.add(nvanalytics)
    pipeline.add(nvstreamdemux)

    streammux.link(queue1)
    queue1.link(pgie)
    pgie.link(queue2)
    queue2.link(tracker)
    tracker.link(queue3)
    queue3.link(nvanalytics)
    nvanalytics.link(queue4)
    queue4.link(nvstreamdemux)


    for i in range(number_sources):

        queue5 = make_element("queue", i)
        pipeline.add(queue5)

        queue5.set_property("max-size-buffers", 100)
        queue5.set_property("leaky", True)

        nvvideoconvert = make_element("nvvideoconvert", i)
        # nvvideoconvert = make_element("capsfilter", i)
        pipeline.add(nvvideoconvert)

        nvdsosd = make_element("nvdsosd", i)
        pipeline.add(nvdsosd)
        nvdsosd.set_property("process-mode", OSD_PROCESS_MODE)
        nvdsosd.set_property("display-text", OSD_DISPLAY_TEXT)
        
        # connect nvstreamdemux -> queue5
        padname = "src_%u" % i
        demuxsrcpad = nvstreamdemux.request_pad_simple(padname)
        if not demuxsrcpad:
            sys.stderr.write("Unable to create demux src pad \n")
 
        queuesinkpad = queue5.get_static_pad("sink")
        if not queuesinkpad:
            sys.stderr.write("Unable to create queue sink pad \n")
        demuxsrcpad.link(queuesinkpad)

        queue6 = make_element("queue", f"queue6_{i}")
        pipeline.add(queue6)


        queue6.set_property("max-size-buffers", 100)
        queue6.set_property("leaky", True)    # upstream

        nvvidconv2 = make_element("nvvideoconvert", f"nvvideoconvert_{i}")
        # nvvidconv2 = make_element("capsfilter", f"capsfilter_{i}")
        pipeline.add(nvvidconv2)

        jpegenc = Gst.ElementFactory.make("nvjpegenc", f"jpeg-encoder_{i}")
        if not jpegenc:
            sys.stderr.write("Unable to create nvjpegenc \n")
        jpegenc.set_property("quality", 50)
        pipeline.add(jpegenc)

        multifilesink = Gst.ElementFactory.make("multifilesink", f"multi-file-sink_{i}")
        if not multifilesink:
            sys.stderr.write(" Unable to create multifilesink \n")

        multifilesink.set_property("post-messages", True)
        pipeline.add(multifilesink)

        queue5.link(nvvideoconvert)
        nvvideoconvert.link(nvdsosd)
        nvdsosd.link(queue6)

        queue6.link(nvvidconv2)
        nvvidconv2.link(jpegenc)
        jpegenc.link(multifilesink)

        uri_name = args[i + 1]

        probe_data = {
            "cam_id": cam_ids[i],
            "base_path": base_path,
            "no_of_sec_send_msg": no_of_sec_send_msg,
        }

        sinkpad = multifilesink.get_static_pad("sink")
        if not sinkpad:
            sys.stderr.write("Unable to get sink pad of multifilesink \n")
            sys.exit(1)
        else:
            sinkpad.add_probe(Gst.PadProbeType.BUFFER, frame_filter_pad_probe, probe_data)
            
 
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect ("message", bus_call, loop)
    nvanalytics_src_pad=nvanalytics.get_static_pad("src")

    if not nvanalytics_src_pad:
        sys.stderr.write(" Unable to get src pad \n")
    else:
        nvanalytics_src_pad.add_probe(Gst.PadProbeType.BUFFER, nvanalytics_src_pad_buffer_probe, {"no_of_sec_send_msg":no_of_sec_send_msg,"cam_id":cam_ids})
        GLib.timeout_add(5000*2, perf_data.perf_print_callback)
 
    print("Now playing...")
    for i, source in enumerate(args):
        if (i != 0):
            print(i, ": ", source)
 
    print("Starting pipeline \n") 
    pipeline.set_state(Gst.State.PLAYING)
    try:
        loop.run()
    except KeyboardInterrupt:
        pass
    print("Exiting app\n")
    pipeline.set_state(Gst.State.NULL)
 
if __name__ == '__main__':
    s = time.time()
    print(f"start time ===================================> {convert_syd_epoch_to_datetime_str(get_current_epoch_in_syd(),'datetime')}")
    yaml_con = read_yaml(config_file_path)
    image_dir_name = yaml_con["image_dir_name"]
    os.makedirs(image_dir_name,exist_ok=True)
    no_of_sec_send_msg = yaml_con["no_of_sec_send_msg"]
    analytics_config_path = yaml_con["analytics_config_path"]
    infer_config_path = yaml_con["infer_config_path"]
    tracker_config_path = yaml_con["tracker_config_path"]
    main(sys.argv,image_dir_name,no_of_sec_send_msg,analytics_config_path,infer_config_path,tracker_config_path)
    print(f"end time ===================================> {convert_syd_epoch_to_datetime_str(get_current_epoch_in_syd(),'datetime')}\n No of Seconds =====================> {(time.time() - s):.2f} seconds")

Why do you need to remove frame meta from the batch meta? Can you simplify a DeepStream sample, such as deepstream_test_3.py, to reproduce this issue?

Hi, I’m trying to process frames selectively using a condition after the nvanalytics step. For example, I only process frames where frame_num % 10 == 0. In my batch I have 3 streams: {stream0: 10, stream1: 23, stream2: 30} (frame numbers). In this case, I want to process only stream 0 and stream 2, skipping stream 1.

My approach is meant to reduce the application’s overhead and save resources by only processing the selected frames, e.g., encoding or adding overlays (nvosd). However, I have a question: after removing the frame metadata from the batch, why do the user metadata and frame metadata mismatch? For instance, in the above example, when I remove stream 1 because it doesn’t meet my condition, the user metadata for stream 2 seems to get assigned to stream 1’s frame. How is this possible?

I’m confused: since I removed the frame metadata, how does the downstream element still access stream 1’s frame? Could you please explain how to properly remove frame metadata from the batch meta, and provide more details on the pyds.frame_meta_remove_frame_batch function?
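For reference, here is the shape of the removal loop I am aiming for (a minimal sketch, assuming pyds.nvds_remove_frame_meta_from_batch as in my code above and a hypothetical frame_num % 10 == 0 selection rule; note the next pointer is saved before the node is removed):

    import gi
    gi.require_version('Gst', '1.0')
    from gi.repository import Gst
    import pyds

    def remove_unselected_frames_probe(pad, info, u_data):
        gst_buffer = info.get_buffer()
        if not gst_buffer:
            return Gst.PadProbeReturn.OK
        batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
        l_frame = batch_meta.frame_meta_list
        while l_frame is not None:
            try:
                frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
            except StopIteration:
                break
            next_l_frame = l_frame.next  # save before any removal
            if frame_meta.frame_num % 10 != 0:  # hypothetical selection rule
                # Detach this frame's meta from the batch.
                pyds.nvds_remove_frame_meta_from_batch(batch_meta, frame_meta)
            l_frame = next_l_frame
        return Gst.PadProbeReturn.OK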

Thank you!

Do you want to access the frame image or the frame meta? frame_meta_remove_frame_batch just removes the frame meta from the batch; the frame images still exist in the batch.

I want to completely delete the frame metadata, frame, and user metadata, as my downstream elements do not need access to this data. For example, if I remove stream 0 frame metadata, my downstream components should not be able to access it, ensuring it is fully deleted from memory.

frame_meta_remove_frame_batch can remove frame meta from the batch, but currently there is no interface to remove the frame image from the batch. Here is a workaround: add a probe function after nvstreamdemux, and if you don’t want to pass a frame downstream, return GST_PAD_PROBE_DROP, as in the sketch below.
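A minimal sketch of that probe (assuming a hypothetical keep-every-10th-frame rule; attach it to the per-stream pad right after nvstreamdemux, for example the demux src pad requested in the loop above):

    def drop_unselected_frames_probe(pad, info, u_data):
        # After nvstreamdemux, each buffer belongs to a single stream.
        gst_buffer = info.get_buffer()
        if not gst_buffer:
            return Gst.PadProbeReturn.OK
        batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
        l_frame = batch_meta.frame_meta_list
        if l_frame is not None:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
            if frame_meta.frame_num % 10 != 0:  # hypothetical selection rule
                # DROP releases the whole buffer -- frame image and meta --
                # so nothing reaches the elements downstream of this pad.
                return Gst.PadProbeReturn.DROP
        return Gst.PadProbeReturn.OK

    # For example:
    # demuxsrcpad.add_probe(Gst.PadProbeType.BUFFER, drop_unselected_frames_probe, None)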

Thank you for your response. I’ve already tried the approach you suggested, and it’s working fine now. One question: what is the difference between removing frame metadata from a batch and dropping the batch entirely? For example, I know that using GST_PAD_PROBE_DROP drops the entire batch. Does this mean the entire batch is removed from memory, or does it just instruct the downstream elements not to process it? Which method is more memory-efficient?

“Removing frame metadata” only removes the frame meta from the batch; the frame images are not removed, and the batch is still pushed downstream. “Dropping the batch” triggers removal of the whole batch, including all frame metas and images.
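In code terms (illustrative fragments only, using the calls already shown in this thread):

    def probe_remove_meta(pad, info, u_data):
        # Removal: the frame meta is detached, but the decoded image stays
        # in the buffer, and the batch keeps flowing downstream.
        gst_buffer = info.get_buffer()
        if gst_buffer:
            batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
            l_frame = batch_meta.frame_meta_list
            if l_frame is not None:
                frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
                pyds.nvds_remove_frame_meta_from_batch(batch_meta, frame_meta)
        return Gst.PadProbeReturn.OK  # buffer (with images) is still pushed

    def probe_drop_batch(pad, info, u_data):
        # Drop: the whole GstBuffer -- every frame image plus the attached
        # batch meta -- is released; downstream elements never see it.
        return Gst.PadProbeReturn.DROP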
