Nvinfer error:NVDSINFER_CUDA ERROR

Please provide complete information as applicable to your setup.

• Hardware Platform (Jetson / GPU)
• Hardware Platform - GPU
• DeepStream Version - 7.0
• TensorRT Version - 10.6.0
• NVIDIA GPU Driver Version - 560.35.03
• Issue Type - questions
• Question -

Hi currently I’m working video analytics project and my requirement is save the imge using cv2 and working fine sample app python app , but when i integarte with my code at the time i got this error

ERROR: nvdsinfer_context_impl.cpp:342 Failed to make stream wait on event, cuda err_no:999, err_str:cudaErrorUnknown
ERROR: nvdsinfer_context_impl.cpp:1750 Preprocessor transform input data failed., nvinfer error:NVDSINFER_CUDA_ERROR
0:00:47.066622407 1119 0x562bcdf83450 WARN nvinfer gstnvinfer.cpp:1420:gst_nvinfer_input_queue_loop: error: Failed to queue input batch for inferencing
[2025-03-31 07:54:05] ERROR: gst-stream-error-quark: Failed to queue input batch for inferencing (1)
Error: gst-stream-error-quark: Failed to queue input batch for inferencing (1): gstnvinfer.cpp(1420): gst_nvinfer_input_queue_loop (): /GstPipeline:pipeline0/GstNvInfer:primary-inference
CUDA Runtime error cudaMemcpyAsync(device_, host_, bytes(), cudaMemcpyHostToDevice, stream) # unknown error, code = cudaErrorUnknown [ 999 ] in file memory.hpp:59
CUDA Runtime error cudaMemcpyAsync(device_, host_, bytes(), cudaMemcpyHostToDevice, stream) # unknown error, code = cudaErrorUnknown [ 999 ] in file memory.hpp:59
[cuOSD Error] at cuda/cuosd_kernel.cu:1072 : Launch kernel (render_elements_kernel) failed, code = 999CUDA Runtime error cudaPeekAtLastError() # unknown error, code = cudaErrorUnknown [ 999 ] in file cuosd.cpp:968

#!/usr/bin/env python3

################################################################################
import configparser
import sys
import gi
import time
import uuid
import logging
import os
import pytz
from urllib.parse import urlparse
from datetime import datetime
from gi.repository import Gst, GLib
from utils_prod import ntp_to_sydney_time,extract_rtsp_details,convert_pts_to_sydtimestamp,create_dynamic_path,get_current_epoch_in_syd,convert_syd_epoch_to_datetime_str,get_current_datetime_str, send_msg
from threading import Lock
gi.require_version('Gst', '1.0')
import pyds
import ctypes
import numpy as np
import cv2
import json
import sys
import platform
from threading import Lock
from cuda import cudart
from cuda import cuda


start_time=time.time()
pipeline = None
fps_mutex = Lock()


syd_tz = pytz.timezone('Australia/Sydney')
ntp_epoch = datetime(1970, 1, 1, tzinfo=pytz.UTC)
 

try:

    yaml_con = json.loads(os.environ["main_config"])


    streams = yaml_con["streams"]
    log_path = yaml_con["log_path"]
    no_of_sec_to_send_data = int(yaml_con["no_of_sec_send_msg"])
    img_dir_name = yaml_con["img_dir_name"]

    osd_process_mode = int(yaml_con["osd_process_mode"])
    osd_display_text = int(yaml_con["osd_display_text"])

    sync_inputs = int(yaml_con["streammux_sync_inputs"])
    attach_sys_ts = int(yaml_con["streammux_attach_sys_ts"])
    max_latency = int(yaml_con["streammux_max_latency"])
    frame_duration = int(yaml_con["streammux_frame_duration"])
    is_live = int(yaml_con["is_live"])
    frame_width = int(yaml_con["frame_width"])
    frame_height = int(yaml_con["frame_height"])
    drop_eos = int(yaml_con["drop_eos"])
    streammux_muxer_batch_timeout_usec = int(yaml_con["streammux_muxer_batch_timeout_usec"])

    analytics_config_path = yaml_con["analytics_config_path"]
    infer_config_path = yaml_con["infer_config_path"]
    tracker_config_path = yaml_con["tracker_config_path"]

    redis_container_name = yaml_con["redis_container_name"]
    redis_port = int(yaml_con["redis_port"])
    redis_key = yaml_con["redis_key"]
    redis_retry = 5
    PGIE_CLASS_ID_FACE = 2

    max_size_buffers = int(yaml_con["queue_max_size_buffers"])
    im_quality = int(yaml_con["jpeg_encoder_quality"])

    reconnect_interval = int(yaml_con["reconnect_interval"])

    is_add_face_blur = int(yaml_con.get("is_add_face_blur",1))
    face_blur_opacity = float(yaml_con.get("face_blur_opacity",0.8))
    
    application_quit_ther = float(yaml_con.get("application_quit_ther",0.5))
    reconnect_att = float(yaml_con.get("reconnect_att",5))

    blob_base_img_path = yaml_con["blob_base_img_path"]
    local_uld_dir_name = os.path.join(img_dir_name,yaml_con["local_uld_dir_name"])
    blob_base_img_path_uld = yaml_con["blob_base_img_path_uld"]
    uld_roi_name = yaml_con["uld_roi_name"]
    uld_cls_id = int(yaml_con["uld_cls_id"])

    class_dict = {0:"ULD"}

except Exception as e:
    print(f"Error reading config file: {e}")
    sys.exit(1)


if not all([ os.path.exists(p) for p in [analytics_config_path,infer_config_path,tracker_config_path]]):
    raise FileNotFoundError(f"One or more of the required analytics, inference, or tracker config files not found: {analytics_config_path}, {infer_config_path}, {tracker_config_path}")

os.makedirs(img_dir_name,exist_ok=True)
os.makedirs(local_uld_dir_name,exist_ok=True)


class SydneyFormatter(logging.Formatter):
    """Custom logging formatter to use Sydney timezone."""
    
    def formatTime(self, record, datefmt=None):
        sydney_time = datetime.fromtimestamp(record.created, syd_tz)
        return sydney_time.strftime("%Y-%m-%d %H:%M:%S")

logging.basicConfig(
    level=logging.INFO,
    format="[%(asctime)s] %(levelname)s: %(message)s",
    handlers=[logging.FileHandler(log_path), logging.StreamHandler()]
)

for handler in logging.getLogger().handlers:
    handler.setFormatter(SydneyFormatter("[%(asctime)s] %(levelname)s: %(message)s"))

logger = logging.getLogger(__name__)


number_sources = len(streams)
src_ids = {}
snap_dict = {}
first_frame_processed = False
curr_epoch = get_current_epoch_in_syd()
stream_start_time_dict = {i:None for i in range(number_sources)}
start_time_syd_dict_json = {i:curr_epoch for i in range(number_sources)} 
start_time_ntp_dict = {i:0 for i in range(number_sources)} 
start_time_one_min_dict = {i:0 for i in range(number_sources)} 
lag_dict = {i:{ "data":{"cumulative_line_cross_count":0} ,"ds":0,"is_first":True,"data_num":0} for i in range(number_sources)} 
guard_platform_info = Lock()
streammux = None
is_first = True

def get_current_timestamp():
    
    current_time = datetime.now()
    timestamp_str = current_time.strftime('%Y-%m-%d %H:%M:%S')
    
    return timestamp_str

def on_pad_added(src, pad, sink_element):
    
    # print(f"New pad {pad.get_name()} added from {src.get_name()}")

    sink_pad = sink_element.get_static_pad("sink")
    if not sink_pad:
        msg = (f"Error: Could not get sink pad for {sink_element.get_name()}")
        logger.error(msg)
        return

    if sink_pad.is_linked():
        msg = (f"Warning: Sink pad of {sink_element.get_name()} is already linked")
        logger.warning(msg)
        return

    if pad.query_caps(None).is_any():
        msg = (f"Warning: Pad {pad.get_name()} does not have valid caps yet!")
        logger.warning(msg)
        return

    link_status = pad.link(sink_pad)    
    if link_status != Gst.PadLinkReturn.OK:
        msg = (f"Pad link failed: {link_status}")
        logger.error(msg)



class GETFPS:
    def __init__(self,stream_id):
        global start_time
       
        self.start_time=start_time
        self.is_first=True
        self.frame_count=0
        self.stream_id=stream_id
 
    def update_fps(self):
        end_time = time.time()
        if self.is_first:
            self.start_time = end_time
            self.is_first = False
        else:
            global fps_mutex
            with fps_mutex:
                self.frame_count = self.frame_count + 1
 
    def get_fps(self):
 
        end_time = time.time()
        with fps_mutex:
 
            stream_fps = float(self.frame_count/(end_time - self.start_time))
 
            self.frame_count = 0
 
        self.start_time = end_time
 
        return round(stream_fps, 2)
    def print_data(self):
        print('frame_count=',self.frame_count)
        print('start_time=',self.start_time)
 

 
def get_linked_element(pad):
    
    if pad and pad.is_linked():
       
        peer_pad = pad.get_peer()
        if peer_pad:
            peer_element = peer_pad.get_parent_element()
            return peer_element.get_name() if peer_element else None
    return None


class PERF_DATA:
    def __init__(self, active_streams,main_loop):
        self.perf_dict = {} 
        self.all_stream_fps = {}  
 
        self.fps_mutex = Lock()   # Lock for thread safety
        self.main_loop = main_loop
 
        for i in active_streams:
            self.all_stream_fps[f"stream{i}"] = GETFPS(i)
  
 
    def update_fps(self, stream_index):
        """ Update FPS for the given stream index """
        with self.fps_mutex:
            if stream_index in self.all_stream_fps:
                self.all_stream_fps[stream_index].update_fps()
            else:
                print(f"Warning: Stream {stream_index} not registered. Ignoring update.")
 
    def perf_print_callback(self):
        
        with self.fps_mutex:
            self.perf_dict = {stream_index: stream.get_fps() for stream_index, stream in self.all_stream_fps.items()}
        print("\n**PERF Data : ", self.perf_dict, "\n")


        deactive_streams = 0
        for k, v in self.perf_dict.items():
            src_id = src_ids[k]
            last_id = src_id['id']
            stream_id = src_id['streamid']
            rtsp_url = src_id['src']
            
            
            if not v:
                
                if src_ids[k]["no_of_att"] > reconnect_att:
                    logger.info(f"reconnect att exist in {stream_id=} {src_ids[k]['no_of_att']}")
                    deactive_streams+=1
                    continue
                valve_element_name = f"valve_{stream_id}"
                valve = pipeline.get_by_name(valve_element_name)    
                # sink_valve = valve.get_static_pad("sink")
                # new_id = last_id
                new_id = str(uuid.uuid1())
                del_add_rtspsrc(last_id,new_id,stream_id,rtsp_url)
                src_ids[k]['id'] = new_id
                valve.set_property("drop", False)
                time.sleep(2)
                src_ids[k]["no_of_att"]+=1
                
        if (deactive_streams/number_sources) > application_quit_ther:
            logger.info(f"XXXXXXXXXXXXXXXXXXXXXXXXXX no of deactivate streams {deactive_streams} and no of streams {number_sources} ther {(deactive_streams/number_sources)} > {application_quit_ther} so quit the application XXXXXXXXXXXXXXXXXXXXXXXXXX")
            self.main_loop.quit()

        logger.info(f"pipeline alive data :  {self.perf_dict}")
        return True
 

def wait_for_null_state(element, timeout=3):

    # print(f"Waiting for {element.get_name()} to reach NULL state...")
    
    element.set_state(Gst.State.NULL)

    success, state, _ = element.get_state(timeout * Gst.SECOND)

    if success == Gst.StateChangeReturn.SUCCESS and state == Gst.State.NULL:
        # print(f"{element.get_name()} successfully reached NULL state.")
        return True
    else:
        msg = (f"Warning: {element.get_name()} did not reach NULL state within {timeout} seconds.")
        logger.warning(msg)
        return False


def flush_ele(stream_id):

    streammux = pipeline.get_by_name("Stream-muxer")

    
    if not streammux :
        msg = (f"[ERROR] Streammux or Demux not found in pipeline")
        logger.error(msg)
        return
    
    streammux_sink_pad = streammux.get_static_pad(f"sink_{stream_id}")


    if streammux_sink_pad:
        msg = (f"Flushing streammux pad for stream {stream_id}...")
        logger.info(msg)
        streammux_sink_pad.send_event(Gst.Event.new_flush_start())
        time.sleep(0.1)
        streammux_sink_pad.send_event(Gst.Event.new_flush_stop(True))
    
    return


def del_add_rtspsrc(old_id, new_id, stream_id, rtsp_url):
    msg = f"Trying to create a new RTSP source for stream {stream_id=}..."
    logger.info(msg)

    bin_name = f"source-bin-{old_id}"
    old_uridecodebin = pipeline.get_by_name(bin_name)
    

    if not old_uridecodebin:
        msg = f"[ERROR] No existing uridecodebin found for stream {stream_id}."
        logger.error(msg)
        return
    
    valve = pipeline.get_by_name(f"valve_{stream_id}")
    valve.set_property("drop", True)
    valve_sink = valve.get_static_pad("sink")

    src_pad = old_uridecodebin.get_static_pad("src")
    if src_pad and src_pad.is_linked():
        
        src_pad.unlink(valve_sink)

    old_uridecodebin.send_event(Gst.Event.new_eos())

    wait_for_null_state(old_uridecodebin)

    flush_ele(stream_id)

    pipeline.remove(old_uridecodebin)

    old_uridecodebin = None
    
    new_uridecodebin = create_source_bin(stream_id,new_id, rtsp_url)
    if not new_uridecodebin:
        msg = f"[ERROR] Failed to create new uridecodebin for stream {stream_id}."
        logger.error(msg)
        return
    
    pipeline.add(new_uridecodebin)

    new_uridecodebin_src_pad=new_uridecodebin.get_static_pad("src")
    
    if new_uridecodebin_src_pad.link(valve_sink) != Gst.PadLinkReturn.OK:
        msg = f"Error linking {new_uridecodebin.get_name()} to {valve.get_name()}"
        logger.error(msg)
        return

    new_uridecodebin.set_state(Gst.State.READY)
    time.sleep(0.1)
    new_uridecodebin.sync_state_with_parent()
    new_uridecodebin.set_state(Gst.State.PLAYING)
    msg = f"Successfully replaced RTSP source for stream {stream_id}"
    logger.info(msg)


def cb_newpad(decodebin, decoder_src_pad, data):
    print("In cb_newpad\n")
    caps = decoder_src_pad.get_current_caps()
    gststruct = caps.get_structure(0)
    gstname = gststruct.get_name()
    source_bin = data
    features = caps.get_features(0)

    # Need to check if the pad created by the decodebin is for video and not
    # audio.
    if (gstname.find("video") != -1):
        # Link the decodebin pad only if decodebin has picked nvidia
        # decoder plugin nvdec_*. We do this by checking if the pad caps contain
        # NVMM memory features.
        if features.contains("memory:NVMM"):
            # Get the source bin ghost pad
            bin_ghost_pad = source_bin.get_static_pad("src")
            if not bin_ghost_pad.set_target(decoder_src_pad):
                sys.stderr.write("Failed to link decoder src pad to source bin ghost pad\n")
        else:
            sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")


def decodebin_child_added(child_proxy, Object, name, user_data):
    print("Decodebin child added:", name, "\n")
    if name.find("decodebin") != -1:
        Object.connect("child-added", decodebin_child_added, user_data)

    if not platform_info.is_integrated_gpu() and name.find("nvv4l2decoder") != -1:
        # Use CUDA unified memory in the pipeline so frames can be easily accessed on CPU in Python.
        # 0: NVBUF_MEM_CUDA_DEVICE, 1: NVBUF_MEM_CUDA_PINNED, 2: NVBUF_MEM_CUDA_UNIFIED
        # Dont use direct macro here like NVBUF_MEM_CUDA_UNIFIED since nvv4l2decoder uses a
        # different enum internally
        Object.set_property("cudadec-memtype", 2)

    if "source" in name:
        source_element = child_proxy.get_by_name("source")
        if source_element.find_property('drop-on-latency') != None:
            Object.set_property("drop-on-latency", True)


def on_pad_added(decodebin, pad, data):
    print(f"Pad added: {pad.get_name()} (RTSP connection successful)")
    data["is_ready"] = True

def test_rtsp_connection(uri, stream_id ,timeout=10):
    """Checks if RTSP stream is alive before adding to main pipeline"""
    
    # Create a temporary pipeline
    pipeline = Gst.Pipeline.new("test-pipeline")
    uridecodebin = Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")

    if not uridecodebin:
        msg = ("Failed to create uridecodebin")
        logger.error(msg)
        return False

    uridecodebin.set_property("uri", uri)
    d = {"is_ready": False}
    uridecodebin.connect("pad-added", on_pad_added,d )

    fakesink = Gst.ElementFactory.make("fakesink","fakesink")
    
    pipeline.add(uridecodebin)
    pipeline.add(fakesink)
    uridecodebin.link(fakesink)

    pipeline.set_state(Gst.State.PLAYING)
    
    start_time = time.time()
    while time.time() - start_time < timeout:
        # Wait to see if pad-added is triggered
        success, state, _ = pipeline.get_state(Gst.CLOCK_TIME_NONE)
        if d["is_ready"]:
            msg = f"RTSP stream is active {stream_id=}"
            logger.info(msg)
            pipeline.set_state(Gst.State.NULL)  
            return True,uridecodebin

        time.sleep(0.5)  
        # print(f"wating for frame .... {time.time() - start_time}")
    
    msg = (f"RTSP connection failed (No `pad-added` event) {stream_id=}")
    logger.warning(msg)
    pipeline.set_state(Gst.State.NULL)  
    return False,None


def create_source_bin(stream_id,index,uri):

    uri_decode_bin=Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
    if is_first:
        is_suc,uri_decode_bin = test_rtsp_connection(uri,stream_id)
        if not is_suc:
            msg = (f" Error creating source {stream_id=} ")
            logger.error(msg)
            return False
    
    bin_name=f"source-bin-{index}"
    nbin=Gst.Bin.new(bin_name)
    if not nbin:
        msg = f"Unable to create source bin {bin_name}"
        logger.error(msg)
        sys.stderr.write(msg)

    if not uri_decode_bin:
        msg = "Unable to create uri decode bin"
        logger.error(msg)
        sys.stderr.write(msg)
    

    uri_decode_bin.set_property("uri",uri)
    
    uri_decode_bin.connect("pad-added",cb_newpad,nbin)
    uri_decode_bin.connect("child-added",decodebin_child_added,nbin)

    Gst.Bin.add(nbin,uri_decode_bin)
    bin_pad=nbin.add_pad(Gst.GhostPad.new_no_target("src",Gst.PadDirection.SRC))
    if not bin_pad:
        msg = "Failed to add ghost pad in source bin"
        sys.stderr.write(msg)
        logger.error(msg)
        return None
    return nbin


def buffer_to_numpy(gst_buffer, frame_meta):

    n_frame = pyds.get_nvds_buf_surface(hash(gst_buffer), frame_meta.batch_id)
    frame_copy = np.array(n_frame, copy=True, order='C')
    frame_copy = cv2.cvtColor(frame_copy, cv2.COLOR_RGBA2BGRA)
    if platform_info.is_integrated_gpu():
        pyds.unmap_nvds_buf_surface(hash(gst_buffer), frame_meta.batch_id)
    return frame_copy

os.makedirs("saved_images",exist_ok=True)

def nvanlytics_src_pad_buffer_probe(pad, info, user_data):

    local_uld_dir_name = user_data["local_uld_dir_name"]
    cam_ids_ip_port = user_data["cam_ids"]
    blob_base_img_path = user_data["blob_base_img_path_uld"]

    global first_frame_processed
    global start_time_syd_dict_json
    global stream_start_time_dict
    global start_time_ntp_dict
    global start_time_one_min_dict
    global snap_dict

    try:

        if not first_frame_processed:
            gst_buffer = info.get_buffer()
            if gst_buffer:
                batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
                l_frame = batch_meta.frame_meta_list
                while l_frame:
                    try:
                        frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
                        stream_id = frame_meta.pad_index
                        # ntp_sec = frame_meta.ntp_timestamp / 1e9
                        pts = frame_meta.buf_pts
                        curr_time = (pts / 1e9)
                        start_time_ntp_dict[stream_id] = pts
                    except Exception as e:
                        msg = (f"Error processing frame demux prob: {e}")
                        logging.error(msg)
                        return Gst.PadProbeReturn.OK
                
                    try:
                        l_frame = l_frame.next
                    except StopIteration:
                        break
            
            stream_start_time_ = get_current_epoch_in_syd()
            start_time_syd_dict_json = {i: stream_start_time_ for i in range(number_sources)}
            stream_start_time_dict = {i: stream_start_time_ for i in range(number_sources)}
            start_time_one_min_dict = {i: stream_start_time_ for i in range(number_sources)}
            first_frame_processed = True

        # no_of_sec_send_msg = user_data["no_of_sec_send_msg"]

    except Exception as e:
        msg = (f"Error processing buffer demux prob : {e} ")
        logger.error(msg)
        return Gst.PadProbeReturn.OK
    
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        return  Gst.PadProbeReturn.OK

    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list

    while l_frame:
        try:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
            if frame_meta.frame_num%30==0:
                frame = buffer_to_numpy(gst_buffer, frame_meta)
                img_path = os.path.join("saved_images",f"{uuid.uuid1()}.jpg")
                cv2.imwrite(img_path,frame)
                print(f"frame saved sucessfully {img_path=}")
            stream_id = frame_meta.pad_index
            pts = frame_meta.buf_pts
            curr_time = stream_start_time_dict[stream_id] + (pts / 1e9)
            syd_time_pts_sql = convert_syd_epoch_to_datetime_str(curr_time, "datetime_sql")
            syd_time_pts_uld = convert_syd_epoch_to_datetime_str(curr_time, "uld_datetime")
        except StopIteration:
            break

        l_obj=frame_meta.obj_meta_list
        
        while l_obj:
            try: 
                
                obj_meta=pyds.NvDsObjectMeta.cast(l_obj.data)
                class_id = int(obj_meta.class_id)
                confidence = obj_meta.confidence
                tracker_id = obj_meta.object_id

            except StopIteration:
                break

            try:
                l_obj = l_obj.next
            except StopIteration:
                break

        l_user = frame_meta.frame_user_meta_list
        while l_user:
            try:
                user_meta = pyds.NvDsUserMeta.cast(l_user.data)
                if user_meta.base_meta.meta_type == pyds.nvds_get_user_meta_type("NVIDIA.DSANALYTICSFRAME.NVDSANALYTICS_OBJ_META"):
                    analytics_meta = pyds.NvDsAnalyticsObjInfo.cast(user_meta.user_meta_data)
                    if analytics_meta.roiStatus == uld_roi_name and class_id == uld_cls_id:
                        if tracker_id not in snap_dict:
                            cam_id_ip_port = cam_ids_ip_port[stream_id]
                            image_path = f"{cam_id_ip_port[:5]}_{syd_time_pts_uld}_{tracker_id}.jpg"
                            local_uld_path = os.path.join(local_uld_dir_name,image_path)
                            frame = buffer_to_numpy(gst_buffer, frame_meta)
                            if frame is not None:
                                rect_params = obj_meta.rect_params
                                left = int(rect_params.left)
                                top = int(rect_params.top)
                                width = int(rect_params.width)
                                height = int(rect_params.height)

                                bbox = (left, top, left + width, top + height)
                                x1,y1,x2,y2 = [int(i) for i in bbox]
                                frame = frame[y1:y2,x1:x2]
                                cv2.imwrite(local_uld_path, frame)
                                print("######################################### frame saved ######################################### ")
                            else:
                                msg = (f"Error getting frame buffer for tracker_id {tracker_id}")
                                logger.error(msg)

                            blob_img_path = os.path.join(blob_base_img_path, cam_id_ip_port, image_path)
                            snap_dict[tracker_id] = {"earliest_seen_time":syd_time_pts_sql,"last_seen_time":syd_time_pts_sql,"zone":uld_roi_name,"blob_img_path":blob_img_path,"class_id":class_id}
                        
                        else:
                            snap_dict[tracker_id]["last_seen_time"] = syd_time_pts_sql
                            
            except StopIteration:
                break
            try:
                l_user = l_user.next
            except StopIteration:
                break
        
        try:
            l_frame=l_frame.next
        except StopIteration:
            break

    return Gst.PadProbeReturn.OK




def demux_src_pad_buffer_probe(pad, info, user_data):    

    gst_buffer = info.get_buffer()
    if not gst_buffer:
        msg = ("Unable to get GstBuffer demux prob")
        logger.error(msg)
        return Gst.PadProbeReturn.DROP

    try:
        batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
        l_frame = batch_meta.frame_meta_list
        while l_frame:
            try:
                frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
                stream_id = frame_meta.pad_index
                pts = frame_meta.buf_pts
                curr_time = stream_start_time_dict[stream_id] + (pts / 1e9)
                # ntp_sec = frame_meta.ntp_timestamp / 1e9
                # curr_time = pts

                if curr_time - start_time_syd_dict_json[stream_id] < 10:
                    # print(f"Dropping stream: Source {frame_meta.source_id}, Frame {frame_meta.frame_num}")
                    stream_index = "stream{0}".format(stream_id)
                    perf_data.update_fps(stream_index)
                    return Gst.PadProbeReturn.DROP

                start_time_syd_dict_json[stream_id] = curr_time
                syd_time_pts_ = convert_pts_to_sydtimestamp(pts, stream_start_time_dict[stream_id])
                display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
                display_meta.num_labels = 1
                text_params = display_meta.text_params[0]
                # text_params.display_text = f"buf ts : {syd_time_pts_}   ntp ts : {ntp_to_sydney_time(ntp_sec)}"
                text_params.display_text = f" PTS TS : {syd_time_pts_}"
                text_params.x_offset = 50
                text_params.y_offset = 50
                text_params.font_params.font_name = "Serif"
                text_params.font_params.font_size = 12
                text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
                text_params.set_bg_clr = 1
                text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)
                pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta) 

                l_user = frame_meta.frame_user_meta_list
                while l_user:
                    try:
                        user_meta = pyds.NvDsUserMeta.cast(l_user.data)
                        if user_meta.base_meta.meta_type == pyds.nvds_get_user_meta_type(
                            "NVIDIA.DSANALYTICSFRAME.USER_META"
                        ):
                            user_meta_data = pyds.NvDsAnalyticsFrameMeta.cast(user_meta.user_meta_data)

                            line_cum_count = user_meta_data.objLCCumCnt
                            obj_in_roi = user_meta_data.objInROIcnt
                            display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
                            d = {**line_cum_count,**obj_in_roi}
                            no_of_labels = len(d)
                            display_meta.num_labels = no_of_labels
                            start_x,start_y = 50,80
                            for i,(label_id,count) in enumerate(d.items()):
                                
                                text_params = display_meta.text_params[i]
                                text_params.display_text = f"{label_id} : {count}"
                                text_params.x_offset = start_x
                                text_params.y_offset = start_y
                                text_params.font_params.font_name = "Serif"
                                text_params.font_params.font_size = 12
                                text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
                                text_params.set_bg_clr = 1
                                text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)
                                start_y+=30

                            pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)

                    except Exception as e:
                        msg = f"Error processing user metadata demux prob: {e}"
                        logger.error(msg)                        
                    
                    try:
                        l_user = l_user.next
                    except StopIteration:
                        break

            except Exception as e:
                msg = f"Error processing frame demux prob: {e}"
                logger.error(msg)

            try:
                l_frame = l_frame.next
            except StopIteration:
                break

    except Exception as e:
        msg = f"Error processing buffer demux prob: {e}"
        logger.error(msg)
       
    return Gst.PadProbeReturn.OK

 
def bus_call(bus, message, loop):
   
    t = message.type
    if t == Gst.MessageType.EOS:
        msg = "======================= End-of-stream ======================="
        logger.error(msg)
        # loop.quit()
    elif t==Gst.MessageType.WARNING:
        err, debug = message.parse_warning()
        logger.debug(f"{err}")
        print("Warning: %s: %s" % (err, debug))
    elif t == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        logger.error(f"{err}")
        print("Error: %s: %s" % (err, debug))
       
    return True
 
def valve_prob(pad, info, user_data):
    stream_id = user_data["stream_id"]
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        msg = "Unable to get GstBuffer valve prob"
        logger.error(msg)
        return Gst.PadProbeReturn.OK
 
    try:
        stream_index = "stream{0}".format(stream_id)
        perf_data.update_fps(stream_index)
    except Exception as e:
        msg = f"Error accessing batch metadata: {e}"
        logger.error(msg)
 
    return Gst.PadProbeReturn.OK
 
def make_element(element_name, i):
    element = Gst.ElementFactory.make(element_name, element_name)
    if not element:
        sys.stderr.write(" Unable to create {0}".format(element_name))
    element.set_property("name", "{0}-{1}".format(element_name, str(i)))
    return element
 
 
def format_location_callback(splitmuxsink, fragment_id, video_dir_path, stream_id):
    # Get the current date and time
    current_datetime = convert_syd_epoch_to_datetime_str(get_current_epoch_in_syd(),"date_time")
    # Construct the filename with the current datetime
    return os.path.join(video_dir_path, f"stream_{stream_id}_rtsp_out_{current_datetime}_uri.mp4")
 

def frame_filter_pad_probe(pad, info, user_data):
    
    global lag_dict
    base_img_path = user_data["base_img_path"]
    cam_id_ip_port = user_data["cam_id"]
    blob_base_img_path = user_data["blob_base_img_path"]
    
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        msg = "Unable to get GstBuffer framefilter prob"
        logger.error(msg)
        return Gst.PadProbeReturn.OK

    try:
        batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
        l_frame = batch_meta.frame_meta_list

        while l_frame:
            try:
                frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
                stream_id = frame_meta.pad_index

                pts = frame_meta.buf_pts
                curr_time = stream_start_time_dict.get(stream_id, 0) + (pts / 1e9)
                syd_time_pts_hhmmss = convert_syd_epoch_to_datetime_str(curr_time, "hhmmss")
                current_yymmdd = get_current_datetime_str("yymmdd")
                current_hhmmss = get_current_datetime_str("hhmmss")
                
                frame_number = frame_meta.frame_num
                ntp_sec = frame_meta.ntp_timestamp / 1e9
                syd_time_ntp_hhmmss = ntp_to_sydney_time(ntp_sec,"hhmmss")
                syd_time_ntp_sql = ntp_to_sydney_time(ntp_sec)
                image_folder_path,blob_path = create_dynamic_path(curr_time, base_img_path, cam_id_ip_port)
                multifilesink = pad.get_parent_element()

                image_name = f"CAM-{cam_id_ip_port[:7]}_DATE-{current_yymmdd}_PTS-{syd_time_pts_hhmmss}_NTP-{syd_time_ntp_hhmmss}_SYS-{current_hhmmss}.jpg"
                image_save_path = os.path.join(image_folder_path, image_name)
                blob_img_path = os.path.join(blob_base_img_path,blob_path,image_name)

                # print(f"Saving image: {image_save_path}, stream_id: {stream_id}")
                multifilesink.set_property("location", image_save_path)


                l_user = frame_meta.frame_user_meta_list
                while l_user:
                    try:
                        user_meta = pyds.NvDsUserMeta.cast(l_user.data)
                        if user_meta.base_meta.meta_type == pyds.nvds_get_user_meta_type(
                            "NVIDIA.DSANALYTICSFRAME.USER_META"
                        ):
                            user_meta_data = pyds.NvDsAnalyticsFrameMeta.cast(user_meta.user_meta_data)
                            objInROIcnt = user_meta_data.objInROIcnt
                            objLCCumCnt = user_meta_data.objLCCumCnt
                            objLCCurrCnt = user_meta_data.objLCCurrCnt
                            num_rects = frame_meta.num_obj_meta
                            if  lag_dict[stream_id]["is_first"]:
                                lag_dict[stream_id]["data"] = {"cumulative_line_cross_count":objLCCumCnt}
                                lag_dict[stream_id]["is_first"] = False
                            syd_time_pts_ = convert_syd_epoch_to_datetime_str(curr_time, "datetime_sql")
                            data = {
                                "stream_id": stream_id,
                                "camera_name": cam_id_ip_port,
                                "unique_number_uuid": 10, 
                                "batch_id": frame_meta.batch_id,
                                "frame_num": frame_number,
                                "buf_pts": pts,
                                "ntp_timestamp": frame_meta.ntp_timestamp,
                                "source_id": frame_meta.source_id,
                                "object_count": {class_dict.get(int(cls_id),cls_id):count for cls_id,count in user_meta_data.objCnt.items()},
                                "Number_of_objects": num_rects,
                                "object_in_ROI_count": objInROIcnt ,
                                "cumulative_line_cross_count": objLCCumCnt,
                                "current_line_cross_count": objLCCurrCnt,
                                "overcrowding_status": user_meta_data.ocStatus,
                                "unique_id": user_meta_data.unique_id,
                                "PTS_TIME": syd_time_pts_,
                                "NTP_TS": syd_time_ntp_sql,
                                "blob_image_path":blob_img_path,
                                "reconnect_id": src_ids[f'stream{stream_id}']["id"],
                                "data_num":lag_dict[stream_id]["data_num"],
                                "uld_data":snap_dict
                            }

                            diff_lag = {k: {k_:(v_-v[k_]) for k_,v_ in data[k].items()}  for k, v in lag_dict[stream_id]["data"].items()}["cumulative_line_cross_count"]

                            data["lag_diff_cumulative_line_cross_count"] = diff_lag

                            lag_dict[stream_id]["data"]["cumulative_line_cross_count"] = data["cumulative_line_cross_count"]
                            lag_dict[stream_id]["ds"] = curr_time
                            lag_dict[stream_id]["data_num"]+=1

                            # send_msg(data,redis_key)
                            print(f"Data : {data}")
                    except Exception as e:
                        msg = f"Error processing user metadata: {e}"
                        logger.error(msg)
                        # return Gst.PadProbeReturn.OK
                    
                    try:
                        l_user = l_user.next
                    except StopIteration:
                        break

            except Exception as e:
                msg = f"Error processing frame metadata: {e}"
                logger.error(msg)
                return Gst.PadProbeReturn.OK

            try:
                l_frame = l_frame.next
            except StopIteration:
                break

    except Exception as e:
        msg = f"Error accessing batch metadata: {e}"
        logger.error(msg)
        return Gst.PadProbeReturn.OK

    return Gst.PadProbeReturn.OK



def dummy_prob(pad, info, user_data):
    print(f"inside the {user_data} prob")
    return Gst.PadProbeReturn.OK


def cb_newpad(decodebin, decoder_src_pad, data):
    print("In cb_newpad\n")
    caps = decoder_src_pad.get_current_caps()
    gststruct = caps.get_structure(0)
    gstname = gststruct.get_name()
    source_bin = data
    features = caps.get_features(0)

    # Need to check if the pad created by the decodebin is for video and not
    # audio.
    if (gstname.find("video") != -1):
        # Link the decodebin pad only if decodebin has picked nvidia
        # decoder plugin nvdec_*. We do this by checking if the pad caps contain
        # NVMM memory features.
        if features.contains("memory:NVMM"):
            # Get the source bin ghost pad
            bin_ghost_pad = source_bin.get_static_pad("src")
            if not bin_ghost_pad.set_target(decoder_src_pad):
                sys.stderr.write("Failed to link decoder src pad to source bin ghost pad\n")
        else:
            sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")


class PlatformInfo:
    def __init__(self):
        self.is_wsl_system = False
        self.wsl_verified = False
        self.is_integrated_gpu_system = False
        self.is_integrated_gpu_verified = False
        self.is_aarch64_platform = False
        self.is_aarch64_verified = False

    def is_wsl(self):
        with guard_platform_info:
            # Check if its already verified as WSL system or not.
            if not self.wsl_verified:
                try:
                    # Open /proc/version file
                    with open("/proc/version", "r") as version_file:
                        # Read the content
                        version_info = version_file.readline()
                        version_info = version_info.lower()
                        self.wsl_verified = True

                        # Check if "microsoft" is present in the version information
                        if "microsoft" in version_info:
                            self.is_wsl_system = True
                except Exception as e:
                    print(f"ERROR: Opening /proc/version failed: {e}")

        return self.is_wsl_system
    
    def is_integrated_gpu(self):
        #Using cuda apis to identify whether integrated/discreet
        #This is required to distinguish Tegra and ARM_SBSA devices
        with guard_platform_info:
            #Cuda initialize
            if not self.is_integrated_gpu_verified:
                cuda_init_result, = cuda.cuInit(0)
                if  cuda_init_result == cuda.CUresult.CUDA_SUCCESS:
                    #Get cuda devices count
                    device_count_result, num_devices = cuda.cuDeviceGetCount()
                    if device_count_result == cuda.CUresult.CUDA_SUCCESS:
                        #If atleast one device is found, we can use the property from
                        #the first device
                        if num_devices >= 1:
                            #Get properties from first device
                            property_result, properties = cudart.cudaGetDeviceProperties(0)
                            if property_result == cuda.CUresult.CUDA_SUCCESS:
                                print("Is it Integrated GPU? :", properties.integrated)
                                self.is_integrated_gpu_system = properties.integrated
                                self.is_integrated_gpu_verified = True
                            else:
                                print("ERROR: Getting cuda device property failed: {}".format(property_result))
                        else:
                            print("ERROR: No cuda devices found to check whether iGPU/dGPU")
                    else:
                        print("ERROR: Getting cuda device count failed: {}".format(device_count_result))
                else:
                    print("ERROR: Cuda init failed: {}".format(cuda_init_result))

        return self.is_integrated_gpu_system

    def is_platform_aarch64(self):
        #Check if platform is aarch64 using uname
        if not self.is_aarch64_verified:
            if platform.uname()[4] == 'aarch64':
                self.is_aarch64_platform =  True
            self.is_aarch64_verified = True
        return self.is_aarch64_platform

def main(args):
    # Check input arguments
    global perf_data, pipeline, src_ids, streammux, number_sources,is_first
    
    Gst.init(None)
    global platform_info
    platform_info = PlatformInfo()
    pipeline = Gst.Pipeline.new(f"rtsp-pipeline-{uuid.uuid1()}")

    src_ids = {f'stream{i}': {'id': None, 'src': None, 'streamid': i,"no_of_att":0} for i in range(number_sources)}

    cam_ids = {stream_id:extract_rtsp_details(uri_name) for stream_id,uri_name in enumerate(streams)}
    # Create gstreamer elements */
    # Create Pipeline element that will form a connection of other elements
    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()
    is_live = False

    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")
    print("Creating streamux \n ")

    # Create nvstreammux instance to form batches from one or more sources.
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        sys.stderr.write(" Unable to create NvStreamMux \n")

    pipeline.add(streammux)
    for src_id, src in enumerate(streams):

        last_id = str(uuid.uuid1())
        src_ids[f'stream{src_id}']['id'] = last_id
        src_ids[f'stream{src_id}']['src'] = src

        bin = create_source_bin(src_id,last_id,src)
        if not bin:
            msg = f"Error: Could not create source bin for {src_id} so remove the src dict"
            logger.error(msg)
            print(msg)
            del src_ids[f'stream{src_id}']
            number_sources-=1
            continue
        msg = (f"uridecodebin_{last_id} created for {src_id}")
        logger.info(msg)

        valve = Gst.ElementFactory.make("valve", f"valve_{src_id}")
        queue = Gst.ElementFactory.make("queue", f"queue_{src_id}")

        pipeline.add(bin)
        pipeline.add(valve)
        pipeline.add(queue)

        bin_src = bin.get_static_pad("src")

        user_data={"stream_id":src_id}
        
        
        sink_pad = valve.get_static_pad("sink")
        sink_pad.add_probe(Gst.PadProbeType.BUFFER, valve_prob, user_data)

        padname=f"sink_{src_id}"
        valve_src_pad = valve.get_static_pad("src")
        valve_sink_pad = valve.get_static_pad("sink")

        if not valve_src_pad or not valve_sink_pad:
            msg = (f"Error: Could not get src pad for valve_{src_id} or sink_{src_id}")
            logger.error(msg)
            return
        streammux_sinkpad= streammux.request_pad_simple(padname)
        if not streammux_sinkpad:
            msg = (f"Error: Could not get sink pad for streammux_{src_id}")
            logger.error(msg)
            return
        
        user_meta = {"stream_id":src_id,'no_of_sec_to_predict':no_of_sec_to_send_data}
        
        queue_sink_pad = queue.get_static_pad("sink")
        queue_src_pad = queue.get_static_pad("src")

        bin_src.link(valve_sink_pad)
        valve_src_pad.link(queue_sink_pad)
        queue_src_pad.link(streammux_sinkpad)

    active_streams = [ src["streamid"] for src in src_ids.values()]
    is_first = False
    print("Creating Pgie \n ")
    pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    if not pgie:
        sys.stderr.write(" Unable to create pgie \n")

    print("Creating nvvidconv1 \n ")
    nvvidconv1 = Gst.ElementFactory.make("nvvideoconvert", "convertor1")
    if not nvvidconv1:
        sys.stderr.write(" Unable to create nvvidconv1 \n")
    print("Creating filter1 \n ")
    caps1 = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA")
    filter1 = Gst.ElementFactory.make("capsfilter", "filter1")
    if not filter1:
        sys.stderr.write(" Unable to get the caps filter1 \n")
    filter1.set_property("caps", caps1)


    if is_live:
        print("Atleast one of the sources is live")
        streammux.set_property('live-source', 1)

    streammux.set_property('width', 1920)
    streammux.set_property('height', 1080)
    streammux.set_property('batch-size', number_sources)
    streammux.set_property('batched-push-timeout', 33000)
    pgie.set_property('config-file-path', "dsnvanalytics_pgie_config_tripwire.txt")
    pgie_batch_size = pgie.get_property("batch-size")
    if (pgie_batch_size != number_sources):
        print("WARNING: Overriding infer-config batch-size", pgie_batch_size, " with number of sources ",
              number_sources, " \n")
        pgie.set_property("batch-size", number_sources)


    if not platform_info.is_integrated_gpu():
        # Use CUDA unified memory in the pipeline so frames
        # can be easily accessed on CPU in Python.
        vc_mem_type = int(pyds.NVBUF_MEM_CUDA_PINNED)
        mem_type = int(pyds.NVBUF_MEM_CUDA_UNIFIED)
        streammux.set_property("nvbuf-memory-type", vc_mem_type)
        if platform_info.is_wsl():
           
            print("using nvbuf_mem_cuda_pinned memory for nvvidconv1\n")
            
            nvvidconv1.set_property("nvbuf-memory-type", vc_mem_type)
        else:
            nvvidconv1.set_property("nvbuf-memory-type", mem_type)

    tracker = Gst.ElementFactory.make("nvtracker", "tracker")
    if not tracker:
        msg = " Unable to create tracker "
        sys.stderr.write(msg)
        sys.exit(1)

    config = configparser.ConfigParser()
    config.read("dsnvanalytics_tracker_config_tripwire.txt")
    config.sections()
 
    for key in config['tracker']:
        if key == 'tracker-width' :
            tracker_width = config.getint('tracker', key)
            tracker.set_property('tracker-width', tracker_width)
        if key == 'tracker-height' :
            tracker_height = config.getint('tracker', key)
            tracker.set_property('tracker-height', tracker_height)
        if key == 'gpu-id' :
            tracker_gpu_id = config.getint('tracker', key)
            tracker.set_property('gpu_id', tracker_gpu_id)
        if key == 'll-lib-file' :
            tracker_ll_lib_file = config.get('tracker', key)
            tracker.set_property('ll-lib-file', tracker_ll_lib_file)
        if key == 'll-config-file' :
            tracker_ll_config_file = config.get('tracker', key)
            tracker.set_property('ll-config-file', tracker_ll_config_file)

    nvanalytics = Gst.ElementFactory.make("nvdsanalytics", "analytics")
    if not nvanalytics:

        msg = " Unable to create nvanalytics "
        sys.stderr.write(msg)
        sys.exit(1)

    nvanalytics.set_property("config-file", "config_tripwire.txt")

    nvstreamdemux = Gst.ElementFactory.make("nvstreamdemux", "nvstreamdemux")
    if not nvstreamdemux:
        msg = " Unable to create nvstreamdemux "
        sys.stderr.write(msg)
        sys.exit(1)


    print("Adding elements to Pipeline \n")
    pipeline.add(pgie)
    pipeline.add(tracker)
    pipeline.add(nvanalytics)
    pipeline.add(filter1)
    pipeline.add(nvvidconv1)
    pipeline.add(nvstreamdemux)

    print("Linking elements in the Pipeline \n")
    streammux.link(pgie)
    pgie.link(tracker)
    tracker.link(nvvidconv1)
    nvvidconv1.link(filter1)
    filter1.link(nvanalytics)
    nvanalytics.link(nvstreamdemux)


    for i in range(number_sources):

        queue4_ = Gst.ElementFactory.make("queue", f"queue__{i}")
    
        pipeline.add(queue4_)

        queue4_.set_property("max-size-buffers", 100)
        queue4_.set_property("leaky", 2)

        nvvideoconvert1 =  Gst.ElementFactory.make("nvvideoconvert", f"nvvideoconvert_{i}")
        nvvidconv1.set_property("nvbuf-memory-type", vc_mem_type)
        pipeline.add(nvvideoconvert1)

        nvdsosd = Gst.ElementFactory.make("nvdsosd", f"nvdsosd_{i}")
        pipeline.add(nvdsosd)
        nvdsosd.set_property("process-mode", 1)
        nvdsosd.set_property("display-text", 1)
        
        padname = "src_%u" % i
        demuxsrcpad = nvstreamdemux.request_pad_simple(padname)
        if not demuxsrcpad:
            sys.stderr.write("Unable to create demux src pad \n")
            sys.exit(1)
        
        queuesinkpad = queue4_.get_static_pad("sink")
        if not queuesinkpad:
            sys.stderr.write("Unable to create queue sink pad \n")
            sys.exit(1)

        demuxsrcpad.link(queuesinkpad)

        queue5 = Gst.ElementFactory.make("queue", f"queue5_{i}")
        pipeline.add(queue5)

        queue5.set_property("max-size-buffers", 100)
        queue5.set_property("leaky", 2)


        nvvidconv2 = Gst.ElementFactory.make("nvvideoconvert", f"nvvideoconvert1_{i}")
        nvvidconv1.set_property("nvbuf-memory-type", vc_mem_type)

        pipeline.add(nvvidconv2)

        jpegenc = Gst.ElementFactory.make("nvjpegenc", f"jpeg-encoder_{i}")
        jpegenc.set_property("quality", 30)

        if not jpegenc:
            sys.stderr.write("Unable to create nvjpegenc \n")
            sys.exit(1)

        pipeline.add(jpegenc)

        multifilesink = Gst.ElementFactory.make("multifilesink", f"multi-file-sink_{i}")
        if not multifilesink:
            sys.stderr.write(" Unable to create multifilesink \n")
            sys.exit(1)

        multifilesink.set_property("post-messages", True)


        sinkpad = multifilesink.get_static_pad("sink")


        pipeline.add(multifilesink)
        queue4_.link(nvvideoconvert1)
        nvvideoconvert1.link(nvdsosd)
        nvdsosd.link(queue5)
        queue5.link(nvvidconv2)
        nvvidconv2.link(jpegenc)
        jpegenc.link(multifilesink)

    probe_data = {

            "cam_ids": cam_ids,
            "local_uld_dir_name":local_uld_dir_name,
            "blob_base_img_path_uld":blob_base_img_path_uld

        }

    # create an event loop and feed gstreamer bus mesages to it
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    perf_data = PERF_DATA(active_streams,loop)
    bus.connect("message", bus_call, loop)

    nvanalytics_src_pad = nvanalytics.get_static_pad("src")
    if not nvanalytics_src_pad:
        sys.stderr.write(" Unable to get src pad \n")
    else:
        nvanalytics_src_pad.add_probe(Gst.PadProbeType.BUFFER, nvanlytics_src_pad_buffer_probe, probe_data)
        # perf callback function to print fps every 5 sec
        GLib.timeout_add(5000, perf_data.perf_print_callback)

    # List the sources
    print("Now playing...")
    for i, source in enumerate(args[:-1]):
        if i != 0:
            print(i, ": ", source)

    print("Starting pipeline \n")
    # start play back and listed to events		
    pipeline.set_state(Gst.State.PLAYING)
    try:
        loop.run()
    except:
        pass
    # cleanup
    print("Exiting app\n")
    pipeline.set_state(Gst.State.NULL)


if __name__ == '__main__':
    sys.exit(main(sys.argv))

TRT/DS/Driver version mismatch. Please refer to this compatibility table.

Recommend using docker.

Hi,
I’m already using Docker to run the DeepStream application, and my code was working fine if I removed the image-saving functionality using OpenCV and set the mem-type property for elements like streammux and nvvideoconvert. I believe the issue is related to this change.
Why is this error occurring, especially since the code works fine for the first few minutes before the error shows up? How can I fix this issue?

It is not recommended to use OpenCV to save images, which requires copying data from GPU to CPU. It is recommended to use nvds_obj_enc_create_context/nvds_obj_enc_process/nvds_obj_enc_create_context

This is sample code.

This may also be the reason for the problem in your code. OpenCV cannot read GstBuffer correctly, GstBuffer not converted the correct format. Please refer to deepstream_imagedata-multistream.py to debug your own code.