Runtime nvanalytics update leads to cumulative_line_cross_count to 0

Please provide complete information as applicable to your setup.

• Hardware Platform (GPU)
• DeepStream Version 7.1

Hi,
I’m working with DeepStream, and I’m implementing runtime ROI and line updates by calling the following code whenever necessary:

nvanalytics.set_property("config-file", config_file)

However, when I update the line during runtime, it reloads all the lines, resetting the “cumulative_line_cross_count” and user_meta_data.objLCCumCnt to 0.

How can I prevent the counters from resetting to 0 during line updates? I want to retain the existing counts.

and

Gst-nvdsanalytics

can you give the path for gst-nvdsanalytics and nvds_analytics low level api

Did you execute “nvdsanalytics->stream_analytics_ctx->clear”? Could you share the whole modification in nvdsanalytics if possible?
As a workaround, please use a global variable to save the count before reloading the file. After reloading, you can add the saved value back to objLCCumCnt.

Current code I use for runtime source addition, deletion, and nvdsanalytics (ROI and line) updates.

I used FastAPI to listen for inputs from user

if add_sources → create source_bin, link with streammux and add it to pipeline, and add demux after stream specific elements as well

if delete_source → unlink from streammux and demux and remove elements from pipline

if update_analytics → update the config_file.txt that was already set as the nvanalytics property during initialisation, then set the config-file property on nvanalytics again → this resets the cumulative line crossing count to 0 every time I update config_file.txt and set it as the property

what is the best way to add stream, add stream specific element after demux and how to update roi and line for new and existing streams?

import sys
sys.path.append('../')
import gi
import configparser
gi.require_version('Gst', '1.0')
gi.require_version("GstRtspServer", "1.0")
from gi.repository import GstRtspServer
from gi.repository import Gst, GLib
from gi.repository import GLib
from ctypes import *
import time
import sys
import pyds
import platform
from common.platform_info import PlatformInfo
import os
import pytz
from threading import Thread, Lock
from datetime import datetime
from zoneinfo import ZoneInfo
from urllib.parse import urlparse
import uvicorn
from fastapi import FastAPI, HTTPException
import logging
from datetime import datetime, timedelta


# --- Module-wide state and tuning constants ---
app = FastAPI()                      # control-plane REST API (endpoints registered below)
perf_data = None                     # perf counter helper; instantiated in main()
MUXER_BATCH_TIMEOUT_USEC = 33000     # nvstreammux batched-push-timeout, microseconds
GPU_ID = 0                           # GPU index applied to decoder/streammux/tracker
g_num_sources = 0                    # next free source index (monotonic; not reused after deletes)
OSD_PROCESS_MODE= 0                  # nvdsosd process-mode property value
OSD_DISPLAY_TEXT= 1                  # nvdsosd display-text property value
PGIE_CLASS_ID_PERSON = 0             # detector class ids (person/bag/face model)
PGIE_CLASS_ID_BAG = 1
PGIE_CLASS_ID_FACE = 2
dict_rtsp_source_bin = {}            # rtsp_url -> [source_id, source_bin element]
analytics_config = "c.txt"           # nvdsanalytics config file edited at runtime


def decodebin_child_added(child_proxy, Object, name, user_data):
    """child-added handler: recurse into nested decodebins and tune children.

    RTSP source children get NTP-sync configuration; nvv4l2decoder children
    get platform-specific properties (integrated GPU vs dGPU).
    """
    print("Decodebin child added:", name)

    # Nested decodebins fire their own child-added signals; hook them too.
    if "decodebin" in name:
        Object.connect("child-added", decodebin_child_added, user_data)

    if "source" in name:
        print("RTSP source detected, configuring NTP sync.",name)
        pyds.configure_source_for_ntp_sync(hash(Object))

    if "nvv4l2decoder" in name:
        if platform_info.is_integrated_gpu():
            Object.set_property("enable-max-performance", True)
            Object.set_property("drop-frame-interval", 0)
            Object.set_property("num-extra-surfaces", 0)
        else:
            Object.set_property("gpu_id", GPU_ID)

def cb_newpad(decodebin, pad, data):
    """pad-added handler: link a uridecodebin's video pad to nvstreammux.

    `data` is the source index; the matching streammux sink pad is requested
    as "sink_<index>".  Non-video pads (e.g. audio) are ignored.
    """
    global streammux
    print("In cb_newpad")
    caps = pad.get_current_caps()
    gststruct = caps.get_structure(0)
    gstname = gststruct.get_name()
    print("gstname=", gstname)
    if gstname.find("video") == -1:
        return  # ignore non-video pads
    source_id = data
    pad_name = "sink_%u" % source_id
    print('pad_name', pad_name)
    sinkpad = streammux.request_pad_simple(pad_name)
    if not sinkpad:
        sys.stderr.write("Unable to create sink pad bin ")
        # BUG FIX: the original fell through and called pad.link(None),
        # which raises instead of reporting the failure.
        return
    if pad.link(sinkpad) == Gst.PadLinkReturn.OK:
        print("Decodebin linked to pipeline")
    else:
        sys.stderr.write("Failed to link decodebin to pipeline")

def create_uridecode_bin(index, rtsp_url):
    """Create a uridecodebin named "source-bin-%02d" for one stream URI.

    Returns the configured element, or None when the factory fails.
    Callers (add_sources, main) already treat a falsy result as failure.
    """
    print("Creating uridecodebin for [%s]" % rtsp_url)
    bin_name = "source-bin-%02d" % index
    print(bin_name)

    bin = Gst.ElementFactory.make("uridecodebin", bin_name)
    if not bin:
        sys.stderr.write(" Unable to create uri decode bin ")
        # BUG FIX: the original kept going and crashed on bin.set_property(None).
        return None

    bin.set_property("uri", rtsp_url)
    # Defer pad linking until the decoder exposes its video pad.
    bin.connect("pad-added", cb_newpad, index)
    bin.connect("child-added", decodebin_child_added, index)
    return bin


def update_analytics_config(rtsp_url, zone_name=None, roi_value=None, line_value=None):
    """Edit the nvdsanalytics config file for one stream, then reload it.

    rtsp_url   -- key into dict_rtsp_source_bin; resolves to the stream id.
    zone_name  -- ROI zone label (prefixed with "roi-" before writing).
    roi_value  -- ROI polygon string for [roi-filtering-stream-<id>].
    line_value -- line definition written as "line-crossing-Exit" under
                  [line-crossing-stream-<id>].

    NOTE(review): re-setting "config-file" makes the nvdsanalytics plugin
    rebuild every per-stream context, which resets objLCCumCnt /
    cumulative_line_cross_count to 0 (the issue discussed in this thread).
    Counts must be saved externally before the reload if they are needed.
    """
    try:
        global nvanalytics
        if rtsp_url not in dict_rtsp_source_bin:
            print(f"Source {rtsp_url} not found in active sources.")
            return

        stream_id = dict_rtsp_source_bin[rtsp_url][0]
        config_file = analytics_config
        print(f"Updating analytics config for stream {stream_id} {rtsp_url}")
        with open(config_file, "r") as file:
            lines = file.readlines()
        # Copy of the lines for modification
        updated_lines = lines[:]

        # ---- ROI configuration ----
        if roi_value is not None:
            zone_name = "roi-"+zone_name
            roi_section = f"[roi-filtering-stream-{stream_id}]"
            enable_line = "enable=1\n"
            roi_line = f"{zone_name}={roi_value}\n"
            zone_found = False
            section_found = False
            # Search for the section corresponding to the stream
            for i, line in enumerate(updated_lines):
                if line.strip() == roi_section:
                    section_found = True
                    j = i + 1
                    # Scan until the first blank line, which terminates the section.
                    while j < len(updated_lines) and updated_lines[j].strip():
                        if updated_lines[j].startswith("roi-"):
                            # Check if the zone already exists and update its value
                            zone_key = updated_lines[j].split("=")[0].strip()
                            if zone_key == zone_name:
                                updated_lines[j] = roi_line
                                zone_found = True
                                break
                        j += 1
                    # If zone not found, append a new zone line
                    if not zone_found:
                        updated_lines.insert(j, roi_line)
                    break
            # Section missing entirely: this is a new stream, append a fresh section.
            if not section_found:
                updated_lines.append(f"\n{roi_section}\n")
                updated_lines.append(enable_line)
                updated_lines.append(roi_line)

        # ---- Line-crossing configuration ----
        if line_value is not None:
            line_section = f"[line-crossing-stream-{stream_id}]"
            enable_line = "enable=1\n"
            line_line = f"line-crossing-Exit={line_value}\n"
            class_id_line = "class-id=0\n"
            extended_line = "extended=0\n"
            mode_line = "mode=loose\n"
            section_found = False
            for i, line in enumerate(updated_lines):
                if line.strip() == line_section:
                    section_found = True
                    # Add or update the configuration lines
                    if i + 1 < len(updated_lines) and not updated_lines[i + 1].strip().startswith("enable="):
                        updated_lines.insert(i + 1, enable_line)
                    elif i + 1 >= len(updated_lines) or not updated_lines[i + 1].strip() == enable_line.strip():
                        updated_lines[i + 1] = enable_line
                    if i + 2 < len(updated_lines) and updated_lines[i + 2].startswith("line-crossing-Exit="):
                        updated_lines[i + 2] = line_line
                    else:
                        updated_lines.insert(i + 2, line_line)
                    # Ensure other properties exist
                    # NOTE(review): this membership test scans the REST of the
                    # file (updated_lines[i:]), so an identical property line in
                    # a LATER section suppresses the insert here -- verify
                    # behaviour on multi-stream configs.
                    other_props = [class_id_line, extended_line, mode_line]
                    for prop in other_props:
                        if prop not in updated_lines[i:]:
                            updated_lines.insert(i + 3, prop)
                    break
            if not section_found:
                updated_lines.append(f"\n{line_section}\n")
                updated_lines.append(enable_line)
                updated_lines.append(line_line)
                updated_lines.append(class_id_line)
                updated_lines.append(extended_line)
                updated_lines.append(mode_line)

        # Write the updated configuration back to the file
        with open(config_file, "w") as file:
            file.writelines(updated_lines)
        # Reload nvanalytics configuration (this is what resets the counters).
        if nvanalytics:
            nvanalytics.set_property("config-file", config_file)
            print(f"Reloaded nvanalytics config file: {config_file}")
        print(f"Successfully updated analytics for stream {stream_id} {rtsp_url}")
    except Exception as e:
        # Best-effort: log and keep the pipeline running.
        print("Error updating analytics config:", e)



def add_stream_sepecific_element(source_id, rtsp_url, pipeline, nvstreamdemux):
    """Attach the per-stream sink branch for `source_id` to the live pipeline.

    Builds: demux.src_<id> -> queue -> nvvideoconvert -> nvdsosd -> tee,
    with two tee branches:
      * H264 encode -> RTP -> udpsink (exposed via create_rtsp_out), and
      * jpegenc -> multifilesink for periodic snapshots (see
        frame_filter_pad_probe).
    Always returns True; element-creation failures are only logged.
    """

    print(f"add_stream_sepecific_element for source_id {source_id} {rtsp_url}")

    # common
    queue5 = Gst.ElementFactory.make("queue", "queue_nvosd"+str(source_id))
    queue5.set_property("max-size-buffers", 200)
    queue5.set_property("leaky", True)
    pipeline.add(queue5)

    # NOTE(review): dropping the demuxer to NULL while the rest of the
    # pipeline stays PLAYING is unusual -- confirm this is really required
    # to request a new src pad at runtime; it disrupts the other streams.
    nvstreamdemux.set_state(Gst.State.NULL)
    padname = f"src_{source_id}"
    demuxsrcpad = nvstreamdemux.request_pad_simple(padname)
    if not demuxsrcpad:
        sys.stderr.write("Unable to create demux src pad \n")
    queuesinkpad = queue5.get_static_pad("sink")
    if not queuesinkpad:
        sys.stderr.write("Unable to create queue sink pad \n")
    demuxsrcpad.link(queuesinkpad)
    nvstreamdemux.set_state(Gst.State.PLAYING)

    nvvideoconvert1 =Gst.ElementFactory.make("nvvideoconvert", "nvvideoconvert_nvosd"+str(source_id))
    nvdsosd = Gst.ElementFactory.make("nvdsosd", "nvdsosd"+str(source_id))
    tee = Gst.ElementFactory.make("tee", "nvsink-tee"+str(source_id))
    nvdsosd.set_property("process-mode", OSD_PROCESS_MODE)
    nvdsosd.set_property("display-text", OSD_DISPLAY_TEXT)
    pipeline.add(nvvideoconvert1, nvdsosd, tee)
    queue5.link(nvvideoconvert1)
    nvvideoconvert1.link(nvdsosd)
    nvdsosd.link(tee)

    # rtsp out branch: convert -> caps -> H264 encode -> RTP payload -> UDP
    queue_udp = Gst.ElementFactory.make("queue", "queue_udp"+str(source_id))
    nvvideoconvert_udp = Gst.ElementFactory.make("nvvideoconvert", "nvvideoconvert_udp"+str(source_id))
    caps_udp = Gst.ElementFactory.make("capsfilter", "caps_udp"+str(source_id))
    encoder_udp = Gst.ElementFactory.make("nvv4l2h264enc", "encoder"+str(source_id))
    rtppay_udp = Gst.ElementFactory.make("rtph264pay", "rtppay"+str(source_id))
    udpsink_udp = Gst.ElementFactory.make("udpsink", "udpsink"+str(source_id))
    queue_udp.set_property("max-size-buffers", 200)
    queue_udp.set_property("leaky", True)
    caps_udp.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM),format=I420"))
    # One UDP port per stream; the RTSP server re-serves this port.
    udp_port = 5400 + source_id
    udpsink_udp.set_property("host", "224.224.255.255")
    udpsink_udp.set_property("port", udp_port)
    udpsink_udp.set_property("async", False)
    udpsink_udp.set_property("sync", 0)
    pipeline.add(queue_udp,nvvideoconvert_udp,caps_udp,encoder_udp,rtppay_udp,udpsink_udp)
    tee.get_request_pad("src_%u").link(queue_udp.get_static_pad("sink"))
    queue_udp.link(nvvideoconvert_udp)
    nvvideoconvert_udp.link(caps_udp)
    caps_udp.link(encoder_udp)
    encoder_udp.link(rtppay_udp)
    rtppay_udp.link(udpsink_udp)

    # img save branch: JPEG snapshots via multifilesink
    queue_img = Gst.ElementFactory.make("queue", "queue_multifile"+str(source_id))
    nvvideoconvert_img = Gst.ElementFactory.make("nvvideoconvert", "nvvideoconvert_multifile"+str(source_id))
    jpegenc_img = Gst.ElementFactory.make("jpegenc", "jpegenc"+str(source_id))
    multifilesink_img = Gst.ElementFactory.make("multifilesink", "multifilesink"+str(source_id))
    queue_img.set_property("max-size-buffers", 200)
    queue_img.set_property("leaky", True)
    multifilesink_img.set_property("post-messages", True)
    pipeline.add(queue_img, nvvideoconvert_img, jpegenc_img, multifilesink_img)
    tee.get_request_pad("src_%u").link(queue_img.get_static_pad("sink"))
    queue_img.link(nvvideoconvert_img)
    nvvideoconvert_img.link(jpegenc_img)
    jpegenc_img.link(multifilesink_img)
    # Context handed to the snapshot-rotation probe.
    probe_data = {
        "stream_id": source_id,
        "uri_name": rtsp_url,
    }
    sinkpad = multifilesink_img.get_static_pad("sink")
    if not sinkpad:
        sys.stderr.write("Unable to get sink pad of multifilesink \n")
        sys.exit(1)
    sinkpad.add_probe(Gst.PadProbeType.BUFFER, frame_filter_pad_probe, probe_data)
    create_rtsp_out(udp_port)
    print("Added stream specific element to pipeline")

    return True


def update_pgie_streamux_batch_size(batch_size):
    """Keep nvinfer and nvstreammux batch sizes in sync with the source count."""
    global pgie, streammux
    for element in (pgie, streammux):
        element.set_property("batch-size", batch_size)
    print(f"Updated batch size of pgie and streammux to {batch_size}")
    
        
def add_sources(rtsp_url):
    """Create and wire a new uridecodebin for `rtsp_url` at runtime.

    Uses g_num_sources as the next pad/stream index; indices are monotonic
    and never reused after deletes.  Returns True on success, False when the
    source bin could not be created, and None when the URL is already active.
    """

    global g_num_sources, pipeline, pgie, streammux, nvstreamdemux

    if rtsp_url in dict_rtsp_source_bin:
        print(f"Source {rtsp_url} already in active sources", dict_rtsp_source_bin)
        return

    source_id = g_num_sources
    print(f"Adding new source to the pipeline id {source_id} url {rtsp_url}")

    source_bin = create_uridecode_bin(source_id, rtsp_url)
    if not source_bin:
        sys.stderr.write("Failed to create source bin. Exiting.")
        return False
    pipeline.add(source_bin)

    # Bring only the new bin up; the rest of the pipeline is already PLAYING.
    state_return = source_bin.set_state(Gst.State.PLAYING)

    if state_return == Gst.StateChangeReturn.SUCCESS:
        print("STATE CHANGE SUCCESS")

    elif state_return == Gst.StateChangeReturn.FAILURE:
        print("STATE CHANGE FAILURE")

    elif state_return == Gst.StateChangeReturn.ASYNC:
        # NOTE(review): blocks until the async state change completes
        # (Gst.CLOCK_TIME_NONE == infinite timeout) -- confirm acceptable here.
        state_return = source_bin.get_state(Gst.CLOCK_TIME_NONE)

    elif state_return == Gst.StateChangeReturn.NO_PREROLL:
        print("STATE CHANGE NO PREROLL")

    if add_stream_sepecific_element(source_id, rtsp_url, pipeline, nvstreamdemux):

        dict_rtsp_source_bin[rtsp_url] = [source_id, source_bin]

        current_active_sources = len(dict_rtsp_source_bin)
        print("current_active_sources", current_active_sources)
        print("dict_rtsp_source_bin", dict_rtsp_source_bin)

        update_pgie_streamux_batch_size(current_active_sources)

        global perf_data
        stream_index = f"stream{source_id}"
        perf_data.add_stream(stream_index)
        print(f"New source added: {rtsp_url}, registered as {stream_index}")

        g_num_sources += 1

    return True


def delete_roi_and_line_sections(stream_id, config_file):
    """Strip this stream's ROI and line-crossing sections from the config.

    A section runs from its [header] line up to (not including) the next
    [header].  Returns True when at least one matching section was removed
    and the file rewritten; False leaves the file untouched.
    """
    with open(config_file, "r") as fh:
        original = fh.readlines()

    targets = {
        f"[roi-filtering-stream-{stream_id}]",
        f"[line-crossing-stream-{stream_id}]",
    }
    kept = []
    skipping = False
    found = False
    for raw in original:
        stripped = raw.strip()
        if stripped.startswith("["):
            # A header either starts a section to drop or ends one.
            skipping = stripped in targets
            if skipping:
                found = True
                continue
        if not skipping:
            kept.append(raw)

    if not found:
        return False
    with open(config_file, "w") as fh:
        fh.writelines(kept)
    return True



def delete_source(rtsp_url):
    """Detach and tear down one source and its per-stream sink branch.

    Stops the source bin, releases its streammux pad, unlinks the demux
    branch, removes all per-stream elements, shrinks the batch size, and
    deletes the stream's analytics sections.  Quits the main loop when the
    last source is removed.
    """
    global pipeline, streammux, nvstreamdemux, loop, nvanalytics

    if rtsp_url not in dict_rtsp_source_bin:
        print(f"Source {rtsp_url} not found in active sources {dict_rtsp_source_bin}")
        return

    source_id = dict_rtsp_source_bin[rtsp_url][0]
    source_bin = dict_rtsp_source_bin[rtsp_url][1]
    print(f"Trying to delete source {source_id}: {rtsp_url}")

    source_bin.set_state(Gst.State.NULL)

    # Wait for source bin to reach NULL state (with timeout)
    # NOTE(review): Gst.Element.get_state takes a timeout in NANOSECONDS;
    # 0.1 is almost certainly not the intended wait -- confirm against the
    # GStreamer docs (Gst.SECOND-based values are typical).
    for _ in range(20):
        if source_bin.get_state(0.1)[1] == Gst.State.NULL:
            break
        time.sleep(0.1)
    else:
        print("Timeout: Source bin did not reach NULL state.")
        return
    print("Source bin set to NULL state successfully.")

    # streammux sink pad
    pad_name = f"sink_{source_id}"
    sinkpad = streammux.get_static_pad(pad_name)
    if sinkpad:
        print(f"Flushing and releasing pad {pad_name} from streammux.")
        sinkpad.send_event(Gst.Event.new_flush_start())
        sinkpad.send_event(Gst.Event.new_flush_stop(False))
        # Unlink the pad before releasing
        # NOTE(review): the direction looks reversed -- data flows
        # source_bin -> streammux, so source_bin.unlink(streammux) would be
        # the conventional call; verify this unlink actually succeeds.
        streammux.unlink(source_bin)
        streammux.release_request_pad(sinkpad)
    else:
        print(f"Warning: Pad {pad_name} not found in streammux.")

    pipeline.remove(source_bin)

    # demux src pad
    demux_pad_name = f"src_{source_id}"
    demuxsrcpad = nvstreamdemux.get_static_pad(demux_pad_name)
    if demuxsrcpad:
        print(f"Flushing and releasing pad {demux_pad_name} from nvstreamdemux.")
    else:
        print(f"Warning: Pad {demux_pad_name} not found in nvstreamdemux.")

    # queue sink pad
    queue = pipeline.get_by_name("queue_nvosd"+str(source_id))
    if not queue:
        sys.stderr.write("Stop does not get the queue")
    queuesinkpad = queue.get_static_pad("sink")

    # unlink demux src pad and queue sink pad
    # NOTE(review): reached even when demuxsrcpad is None above -- this
    # would raise AttributeError; consider guarding.
    ret = demuxsrcpad.unlink(queuesinkpad)

    # Tear down every element of this stream's tail branch by name.
    elements_to_remove = [
        "queue_nvosd"+str(source_id), "nvvideoconvert_nvosd"+str(source_id), "nvdsosd"+str(source_id), "nvsink-tee"+str(source_id),
        "queue_udp"+str(source_id), "nvvideoconvert_udp"+str(source_id), "caps_udp"+str(source_id),
        "encoder"+str(source_id), "rtppay"+str(source_id),"udpsink"+str(source_id),
        "queue_multifile"+str(source_id), "nvvideoconvert_multifile"+str(source_id),
        "jpegenc"+str(source_id), "multifilesink"+str(source_id)
    ]
    for elem_name in elements_to_remove:
        elem = pipeline.get_by_name(elem_name)
        if elem:
            elem.set_state(Gst.State.NULL)
            pipeline.remove(elem)
            print(f"Removed element: {elem_name}")

    dict_rtsp_source_bin.pop(rtsp_url)
    print(f"Source {source_id} {rtsp_url} removed successfully.")

    current_active_sources = len(dict_rtsp_source_bin)
    print("current_active_sources", current_active_sources)
    print("dict_rtsp_source_bin", dict_rtsp_source_bin)

    if (current_active_sources == 0):
        loop.quit()
        print("All sources stopped, quitting")
        return

    update_pgie_streamux_batch_size(current_active_sources)

    stream_index = f"stream{source_id}"
    perf_data.delete_stream(stream_index)
    print(f"deleted source {source_id} {rtsp_url} in perf data")

    # Drop the stream's analytics sections and reload (resets other
    # streams' cumulative counters as well -- see update_analytics_config).
    if delete_roi_and_line_sections(source_id, analytics_config):
        nvanalytics.set_property("config-file", analytics_config)
        print(f"Reloaded nvanalytics config after deleting stream {source_id}")

    return True


@app.post("/add_source")
def api_add_source(rtsp_url: str):
    """REST endpoint: attach a new RTSP source to the running pipeline."""
    try:
        add_sources(rtsp_url)
    except Exception as e:
        print("error in api_add_source()", e)
        raise HTTPException(status_code=500, detail=str(e))
    return {"message": "RTSP source added successfully", "url": rtsp_url}
    

@app.post("/delete_source")
def api_delete_source(rtsp_url: str):
    """REST endpoint: detach an RTSP source from the running pipeline."""
    try:
        delete_source(rtsp_url)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))
    return {"message": "RTSP source deleted successfully", "url": rtsp_url}


@app.post("/update_analytics")
def update_analytics(rtsp_url: str, zone_name: str = None, roi_value: str = None, line_value: str = None):
    """REST endpoint: update ROI and/or line-crossing config for one stream.

    Responds 400 when neither roi_value nor line_value is supplied and 500
    on internal failure.
    """
    # BUG FIX: the 400 was raised inside the try block and immediately
    # swallowed by `except Exception`, so clients always received a 500.
    # Validate before entering the try.
    if not roi_value and not line_value:
        raise HTTPException(status_code=400, detail="At least one of roi_value or line_value must be provided")

    try:
        update_analytics_config(rtsp_url, zone_name=zone_name, roi_value=roi_value, line_value=line_value)
    except Exception as e:
        raise HTTPException(status_code=500, detail=str(e))

    return {
        "message": "Analytics configuration updated successfully",
        "stream_id": rtsp_url,
        "roi_value": roi_value,
        "line_value": line_value,
    }
    

def run_api_server():
    """Serve the FastAPI control endpoints (blocking; run on a daemon thread)."""
    host, port = "0.0.0.0", 8000
    uvicorn.run(app, host=host, port=port)

def bus_call(bus, message, loop):
    """GStreamer bus watch: quit the main loop on EOS/error, log warnings."""
    mtype = message.type
    if mtype == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        sys.stderr.write("Error: %s: %s" % (err, debug))
        loop.quit()
    elif mtype == Gst.MessageType.WARNING:
        err, debug = message.parse_warning()
        sys.stderr.write("Warning: %s: %s" % (err, debug))
    elif mtype == Gst.MessageType.EOS:
        sys.stdout.write("End-of-stream")
        loop.quit()
    # Keep the watch installed.
    return True


# Image save
def extract_rtsp_details(rtsp_url):
    """Derive a per-camera folder token from an RTSP URL.

    Uses the last path segment when a non-root path is present, otherwise
    falls back to the port number rendered as a string.
    """
    parts = urlparse(rtsp_url)
    path = parts.path
    if path and path != '/':
        return path.rsplit('/', 1)[-1]
    return f"{parts.port}"

IST = pytz.timezone('Asia/Kolkata')  # snapshot folders/filenames use IST wall-clock

def create_dynamic_path(base_path, ip_port):
    """Return (and create) base/ip_port/Y_year/M_month/D_day/H_hour/M_minute.

    The timestamp is taken in IST.  Directories are created on demand with
    exist_ok=True, so repeated calls within the same minute are cheap.
    """
    # BUG FIX: the original computed a naive datetime.now() and immediately
    # overwrote it with the IST-aware value; only the latter is needed.
    now = datetime.now(IST)
    dynamic_path = os.path.join(
        base_path,
        ip_port,
        str(now.year)+"_year",
        f"{now.month:02d}_month",
        f"{now.day:02d}_day",
        f"{now.hour:02d}_hour",
        f"{now.minute:02d}_minute",
    )
    os.makedirs(dynamic_path, exist_ok=True)
    return dynamic_path

last_saved_time = {}        # stream_id -> epoch seconds of last saved frame
save_img_time_interval = 10  # seconds between saved snapshots per stream

def frame_filter_pad_probe(pad, info, user_data):
    """Buffer probe on each multifilesink: rotate the JPEG output path.

    At most once per save_img_time_interval seconds per stream, points the
    parent multifilesink at a fresh timestamped file inside a date-based
    folder tree derived from the stream's RTSP URL.
    """
    global last_saved_time
    stream_id = user_data["stream_id"]
    uri_name = user_data["uri_name"]
    now_ts = time.time()
    previous = last_saved_time.setdefault(stream_id, 0)
    if now_ts - previous >= save_img_time_interval:
        last_saved_time[stream_id] = now_ts
        cam_id_ip_port = extract_rtsp_details(uri_name)
        base_path = "/Deepstream_output/rtsp_v3"
        image_folder_path = create_dynamic_path(base_path, cam_id_ip_port)
        now = datetime.now(IST)
        image_name = f"{now.year}_{now.month:02d}_{now.day:02d}_{now.hour:02d}_{now.minute:02d}_{now.second:02d}.jpg"
        image_save_path = os.path.join(image_folder_path, image_name)
        multifilesink = pad.get_parent_element()
        multifilesink.set_property("location", image_save_path)
    return Gst.PadProbeReturn.OK


rtsp_server = None  # lazily-created singleton GstRtspServer.RTSPServer


def create_rtsp_out(new_udp_port):
    """Expose the given UDP port as an RTSP mount point on port 8554.

    The server is created on first use; each call adds one shared media
    factory at /stream_<udp_port> that re-serves the udpsink's RTP stream.
    """
    global rtsp_server
    rtsp_port_num = 8554
    vcodec = "H264"

    print("new_udp_port",new_udp_port)

    if rtsp_server is None:
        print(f"Initializing RTSP server on port {rtsp_port_num}")
        rtsp_server = GstRtspServer.RTSPServer.new()
        rtsp_server.props.service = str(rtsp_port_num)
        rtsp_server.attach(None)

    mount_point = f"/stream_{new_udp_port}"
    launch_desc = (
        '( udpsrc name=pay0 port=%d buffer-size=524288 caps="application/x-rtp, media=video, clock-rate=90000, encoding-name=%s, payload=96 " )'
        % (new_udp_port, vcodec)
    )

    factory = GstRtspServer.RTSPMediaFactory.new()
    factory.set_launch(launch_desc)
    factory.set_shared(True)
    rtsp_server.get_mount_points().add_factory(mount_point, factory)

    print(f"DeepStream: Launched RTSP Streaming at rtsp://localhost:{rtsp_port_num}{mount_point}")



def main(args):
    """Build and run the pipeline; args[1:] are the initial stream URIs.

    Topology: N x uridecodebin -> nvstreammux -> nvinfer -> nvtracker ->
    nvdsanalytics -> nvstreamdemux -> per-stream (osd -> tee -> {RTSP out,
    JPEG snapshots}).  A daemon FastAPI thread accepts runtime
    add/delete/update commands against the same globals.
    """

    global pipeline, tiler, pgie, streammux, nvanalytics, nvstreamdemux, loop, g_num_sources, udp_ports

    print('args', args, len(args))

    num_sources = len(args)-1
    print("num_sources",num_sources)

    g_num_sources = num_sources

    # NOTE(review): PERF_DATA is not imported in this snippet -- presumably
    # provided by a common helper module; confirm the import exists.
    global perf_data
    perf_data = PERF_DATA(num_sources)

    global platform_info
    platform_info = PlatformInfo()
    Gst.init(None)

    print("Creating Pipeline")
    pipeline = Gst.Pipeline()
    is_live = False

    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline ")

    print("Creating streammux")
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        sys.stderr.write(" Unable to create NvStreamMux ")
    # NOTE(review): batched-push-timeout and batch-size are each set twice
    # below; the later values (MUXER_BATCH_TIMEOUT_USEC, num_sources) win.
    streammux.set_property("batched-push-timeout", 40000)
    streammux.set_property("batch-size", 1)
    streammux.set_property("gpu_id", GPU_ID)
    streammux.set_property('live-source', 1)
    streammux.set_property('width', 1920)
    streammux.set_property('height', 1080)
    streammux.set_property('batch-size', num_sources)
    streammux.set_property('batched-push-timeout', MUXER_BATCH_TIMEOUT_USEC)
    streammux.set_property("sync-inputs",1)
    streammux.set_property("attach-sys-ts",False)
    """
    attach-sys-ts - True
    Host system time as NTP timestamp - when the frame is received by the NvStreamMux.
    This method requires you to synchronize host system to NTP clock. To attach host system time as NTP timestamp, you'll need to set attach-sys-ts to TRUE on nvstreammux.
    """
    streammux.set_property("max-latency",1000000000)
    streammux.set_property("frame-duration",125)
    pipeline.add(streammux)

    uri = args[1]

    # One uridecodebin per initial URI; pads are linked lazily in cb_newpad.
    for i in range(num_sources):

        print(f"Creating source_bin for source id {i}")
        uri_name=args[i+1]
        if uri_name.find("rtsp://") == 0 :
            is_live = True

        source_bin=create_uridecode_bin(i, uri_name)

        if not source_bin:
            sys.stderr.write("Failed to create source bin. Exiting. ")
            sys.exit(1)
        pipeline.add(source_bin)

        dict_rtsp_source_bin[uri_name] = [i, source_bin]

    print("dict_rtsp_source_bin", dict_rtsp_source_bin)

    # Leaky bounded queues decouple the main inference chain stages.
    queue1=Gst.ElementFactory.make("queue","queue1")
    queue2=Gst.ElementFactory.make("queue","queue2")
    queue3=Gst.ElementFactory.make("queue","queue3")
    queue4=Gst.ElementFactory.make("queue","queue4")

    queue1.set_property("max-size-buffers", 200)
    queue1.set_property("leaky", True)
    queue2.set_property("max-size-buffers", 200)
    queue2.set_property("leaky", True)
    queue3.set_property("max-size-buffers", 200)
    queue3.set_property("leaky", True)
    queue4.set_property("max-size-buffers", 200)
    queue4.set_property("leaky", True)

    pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    tracker = Gst.ElementFactory.make("nvtracker", "tracker")
    nvanalytics = Gst.ElementFactory.make("nvdsanalytics", "analytics")
    nvstreamdemux = Gst.ElementFactory.make("nvstreamdemux", "nvstreamdemux")

    nvanalytics.set_property("config-file", analytics_config)

    pgie.set_property('config-file-path', "dsnvanalytics_pgie_config.txt")
    pgie_batch_size=pgie.get_property("batch-size")
    if(pgie_batch_size != num_sources):
        print("WARNING: Overriding infer-config batch-size",pgie_batch_size," with number of sources ", num_sources," \n")
        pgie.set_property("batch-size",num_sources)

    # Map the tracker config file onto nvtracker element properties.
    config = configparser.ConfigParser()
    config.read('dsnvanalytics_tracker_config.txt')
    config.sections()

    for key in config['tracker']:
        if key == 'tracker-width' :
            tracker_width = config.getint('tracker', key)
            tracker.set_property('tracker-width', tracker_width)
        if key == 'tracker-height' :
            tracker_height = config.getint('tracker', key)
            tracker.set_property('tracker-height', tracker_height)
        if key == 'gpu-id' :
            tracker_gpu_id = config.getint('tracker', key)
            tracker.set_property('gpu_id', tracker_gpu_id)
        if key == 'll-lib-file' :
            tracker_ll_lib_file = config.get('tracker', key)
            tracker.set_property('ll-lib-file', tracker_ll_lib_file)
        if key == 'll-config-file' :
            tracker_ll_config_file = config.get('tracker', key)
            tracker.set_property('ll-config-file', tracker_ll_config_file)

    pipeline.add(queue1,queue2,queue3,queue4,pgie,tracker,nvanalytics,nvstreamdemux)

    # Main inference chain: mux -> infer -> track -> analytics -> demux.
    streammux.link(queue1)
    queue1.link(pgie)
    pgie.link(queue2)
    queue2.link(tracker)
    tracker.link(queue3)
    queue3.link(nvanalytics)
    nvanalytics.link(queue4)
    queue4.link(nvstreamdemux)

    # Per-stream tail branches (same shape as add_stream_sepecific_element
    # builds at runtime -- keep the two in sync).
    for i in range(num_sources):

        # common
        queue5 = Gst.ElementFactory.make("queue", "queue_nvosd"+str(i))
        nvvideoconvert1 =Gst.ElementFactory.make("nvvideoconvert", "nvvideoconvert_nvosd"+str(i))
        nvdsosd = Gst.ElementFactory.make("nvdsosd", "nvdsosd"+str(i))
        tee = Gst.ElementFactory.make("tee", "nvsink-tee"+str(i))

        queue5.set_property("max-size-buffers", 200)
        queue5.set_property("leaky", True)
        nvdsosd.set_property("process-mode", OSD_PROCESS_MODE)
        nvdsosd.set_property("display-text", OSD_DISPLAY_TEXT)

        pipeline.add(queue5,nvvideoconvert1,nvdsosd,tee)

        padname = "src_%u" % i
        # NOTE(review): get_request_pad is the deprecated alias of
        # request_pad_simple (used elsewhere in this file) -- unify.
        demuxsrcpad = nvstreamdemux.get_request_pad(padname)
        if not demuxsrcpad:
            sys.stderr.write("Unable to create demux src pad")
        queuesinkpad = queue5.get_static_pad("sink")
        if not queuesinkpad:
            sys.stderr.write("Unable to create queue sink pad")

        nvdsosd.set_property("process-mode", OSD_PROCESS_MODE)
        nvdsosd.set_property("display-text", OSD_DISPLAY_TEXT)

        demuxsrcpad.link(queuesinkpad)
        queue5.link(nvvideoconvert1)
        nvvideoconvert1.link(nvdsosd)
        nvdsosd.link(tee)

        # rtsp out
        queue_udp = Gst.ElementFactory.make("queue", "queue_udp"+str(i))
        nvvideoconvert_udp = Gst.ElementFactory.make("nvvideoconvert", "nvvideoconvert_udp"+str(i))
        caps_udp = Gst.ElementFactory.make("capsfilter", "caps_udp"+str(i))
        encoder_udp = Gst.ElementFactory.make("nvv4l2h264enc", "encoder"+str(i))
        rtppay_udp = Gst.ElementFactory.make("rtph264pay", "rtppay"+str(i))
        udpsink_udp = Gst.ElementFactory.make("udpsink", "udpsink"+str(i))

        queue_udp.set_property("max-size-buffers", 200)
        queue_udp.set_property("leaky", True)
        caps_udp.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM),format=I420"))

        # One UDP port per stream; re-served over RTSP by create_rtsp_out.
        udp_port = 5400 + i

        udpsink_udp.set_property("host", "224.224.255.255")
        udpsink_udp.set_property("port", udp_port)
        udpsink_udp.set_property("async", False)
        udpsink_udp.set_property("sync", 0)

        pipeline.add(queue_udp,nvvideoconvert_udp,caps_udp,encoder_udp,rtppay_udp,udpsink_udp)

        tee.get_request_pad("src_%u").link(queue_udp.get_static_pad("sink"))
        queue_udp.link(nvvideoconvert_udp)
        nvvideoconvert_udp.link(caps_udp)
        caps_udp.link(encoder_udp)
        encoder_udp.link(rtppay_udp)
        rtppay_udp.link(udpsink_udp)

        # img save
        queue_img = Gst.ElementFactory.make("queue", "queue_multifile"+str(i))
        nvvideoconvert_img = Gst.ElementFactory.make("nvvideoconvert", "nvvideoconvert_multifile"+str(i))
        jpegenc_img = Gst.ElementFactory.make("jpegenc", "jpegenc"+str(i))
        multifilesink_img = Gst.ElementFactory.make("multifilesink", "multifilesink"+str(i))

        queue_img.set_property("max-size-buffers", 200)
        queue_img.set_property("leaky", True)
        multifilesink_img.set_property("post-messages", True)

        pipeline.add(queue_img, nvvideoconvert_img, jpegenc_img, multifilesink_img)

        tee.get_request_pad("src_%u").link(queue_img.get_static_pad("sink"))
        queue_img.link(nvvideoconvert_img)
        nvvideoconvert_img.link(jpegenc_img)
        jpegenc_img.link(multifilesink_img)

        probe_data = {
            "stream_id": i,
            "uri_name": args[i + 1],
        }

        sinkpad = multifilesink_img.get_static_pad("sink")
        if not sinkpad:
            sys.stderr.write("Unable to get sink pad of multifilesink \n")
            sys.exit(1)

        sinkpad.add_probe(Gst.PadProbeType.BUFFER, frame_filter_pad_probe, probe_data)

        create_rtsp_out(udp_port)

    # NOTE(review): nvanalytics_src_pad_buffer_probe is not defined in this
    # snippet -- presumably imported/defined elsewhere; confirm.
    nvanalytics_src_pad=nvanalytics.get_static_pad("src")
    if not nvanalytics_src_pad:
        sys.stderr.write(" Unable to get src pad \n")
    else:
        nvanalytics_src_pad.add_probe(Gst.PadProbeType.BUFFER, nvanalytics_src_pad_buffer_probe, 0)
        GLib.timeout_add(5000, perf_data.perf_print_callback)

    loop = GLib.MainLoop()

    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect ("message", bus_call, loop)

    pipeline.set_state(Gst.State.PAUSED)

    print("Now playing...")
    for i, source in enumerate(args):
        if (i != 0):
            print(i, ": ", source)


    print("Starting pipeline ")

    pipeline.set_state(Gst.State.PLAYING)

    # Control plane runs alongside the GLib main loop.
    api_thread = Thread(target=run_api_server, daemon=True)
    api_thread.start()

    try:
        loop.run()
    except:
        print("Stopping pipeline")
        pass

    print("Exiting app")
    pipeline.set_state(Gst.State.NULL)

# Script entry point: argv[1:] are the initial RTSP/file URIs.
if __name__ == '__main__':
    sys.exit(main(sys.argv))

  1. please refer to sample deepstream-server, nvmultiurisrcbin can support adding and removing source dynamically.
  2. about update roi for new stream, nvanalytics plugin is opensource. after resetting config-file, the plugin will call “nvdsanalytics->stream_analytics_ctx->clear();” to clear all analytics instance of all streams. you can comment out this line. then in gst_nvdsanalytics_transform_ip, the plugin will create new analytics instance for new stream.
  3. About updating the ROI for existing streams: since the old analytics instance will be deleted, all saved state will be reset in the new analytics instance. Please refer to my first comment — you can use a global variable to save the state from the old analytics instance.