Nvmultiurisrcbin plugin, the sink screen doesn't update but it seems that the internal screen of the app is updating and finally the app crashes

Please provide complete information as applicable to your setup.

• Hardware Platform (Jetson / GPU) GPU
• DeepStream Version ds7.1-docker
• JetPack Version (valid for Jetson only)
• TensorRT Version 10.3
• NVIDIA GPU Driver Version (valid for GPU only) 550.142
• Issue Type( questions, new requirements, bugs)

When I use the nvmultiurisrcbin plugin, the sink screen doesn’t update but, it seems that the internal screen of the app is updating, and at the end of the video stream, the app crashes:

Here’s my code:

import sys
import threading
import copy
import json

from numpy.f2py.crackfortran import gotnextfile

sys.path.append('../')
from pathlib import Path
import gi
import numpy as np
import configparser
import argparse
import os
import base64
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst
from urllib.parse import urlparse
from ctypes import *
import time
import sys
import math
import platform
from collections import defaultdict
from common.platform_info import PlatformInfo
from common.bus_call import bus_call
from common.FPS import PERF_DATA
from my_log import MyLogger
from config_reader import MyConfigReader
from datetime import datetime
from KafkaProducer import KafkaProducer
from confluent_kafka import Producer, KafkaException
from confluent_kafka import KafkaError
from os import environ
import pyds
import cv2


#os.environ['GST_PLUGIN_PATH'] = '/opt/nvidia/deepstream/deepstream/lib/gst-plugins:' + os.environ.get('GST_PLUGIN_PATH', '')

no_display = False
silent = int(MyConfigReader.cfg_dict["RTSP"]["silent"])
file_loop = int(MyConfigReader.cfg_dict["RTSP"]["file_loop"])  # 设置cpu解码还是gpu解码
perf_data = None
measure_latency = False


MUXER_OUTPUT_WIDTH = int(MyConfigReader.cfg_dict["RTSP"]["output_width"])
MUXER_OUTPUT_HEIGHT = int(MyConfigReader.cfg_dict["RTSP"]["output_height"])
MUXER_BATCH_TIMEOUT_USEC = int(MyConfigReader.cfg_dict["RTSP"]["MUXER_BATCH_TIMEOUT_USEC"])
TILED_OUTPUT_WIDTH = int(MyConfigReader.cfg_dict["RTSP"]["TILED_OUTPUT_WIDTH"])
TILED_OUTPUT_HEIGHT = int(MyConfigReader.cfg_dict["RTSP"]["TILED_OUTPUT_HEIGHT"])
GST_CAPS_FEATURES_NVMM = "memory:NVMM"
OSD_PROCESS_MODE = int(MyConfigReader.cfg_dict["OSD"]["process-mode"])
OSD_DISPLAY_TEXT = int(MyConfigReader.cfg_dict["OSD"]["display-text"])
GPU_ID = int(MyConfigReader.cfg_dict["GPU"]["gpu_id"])


def get_current_timestamp_millis():
    timestamp = time.time()
    timestamp_millis = int(timestamp * 1000)
    dt_object = datetime.fromtimestamp(timestamp)
    standard_time_format = dt_object.strftime('%Y-%m-%d_%H-%M-%S.') + str(timestamp_millis % 1000).zfill(3)
    return standard_time_format



class StreamCapture:

    def __init__(self, pgie, config, disable_probe, overtime_dict):
        super().__init__()
        self.pipeline = None
        self.framerate = int(MyConfigReader.cfg_dict['nvr']['fps'])
        self.running = False
        self.frame_interval = 1.0 / self.framerate
        self.last_frame_time_dict = {}
        self.reconnect_attempts = 5  # 最大重连次数
        self.reconnect_interval = 5  # 重连间隔(秒)
        self.thread = None
        self.status = None
        self.bus = None
        self.update_time = time.time()
        self.last_alive_time_dict = {}
        self.over_time_dict = overtime_dict
        self.dict_lock = threading.Lock()
        self.stream_paths = None
        self.stream_path_code = None

        self.requested_pgie = pgie
        self.disable_probe = disable_probe
        self.config = config
        self.number_sources = None
        self.frame_dict = {}
        self.source_bins = {}
        self.lock = threading.Lock()
        # self.kafka_producer = KafkaProducer({
        #     'bootstrap.servers': f"{MyConfigReader.cfg_dict['Kafka']['ip']}:{MyConfigReader.cfg_dict['Kafka']['port']}",
        # }, retry_limit=5, retry_interval=3)

        # self.kafka_producer = Producer({
        #     'bootstrap.servers': f"{MyConfigReader.cfg_dict['Kafka']['ip']}:{MyConfigReader.cfg_dict['Kafka']['port']}",
        #     # Kafka 服务器地址
        #     'client.id': 'dooropen-producer'  # 客户端 ID
        # })

        self.x_minutes = 10
        self.max_dropouts = 10
        self.dropout_history = defaultdict(list)
        self.last_alarm_dict = {}
        self.platform_info = PlatformInfo()

    def open(self, stream_paths_dict):
        global  perf_data
        self.stream_path_code = list(stream_paths_dict.keys())
        self.stream_paths = list(stream_paths_dict.values())
        self.number_sources = len(self.stream_paths)
        perf_data = PERF_DATA(self.number_sources)
        Gst.init(None)
        if not self.create_pipeline():
            MyLogger.error('Failed to create Pipline.')
            self.status = False
            return False

        # create an event loop and feed gstreamer bus mesages to it
        self.loop = GLib.MainLoop()
        self.bus = self.pipeline.get_bus()
        self.bus.add_signal_watch()
        self.bus.connect("message", bus_call, self.loop)
        # pgie_src_pad = self.pgie.get_static_pad("src")
        pgie_src_pad = self.tiler.get_static_pad("sink")
        if not pgie_src_pad:
            MyLogger.error(" Unable to get src pad")
        else:
            if not self.disable_probe:
                pgie_src_pad.add_probe(Gst.PadProbeType.BUFFER, self.pgie_src_pad_buffer_probe, 0)
                # perf callback function to print fps every 5 sec
                GLib.timeout_add(5000, perf_data.perf_print_callback)

        # Enable latency measurement via probe if environment variable NVDS_ENABLE_LATENCY_MEASUREMENT=1 is set.
        # To enable component level latency measurement, please set environment variable
        # NVDS_ENABLE_COMPONENT_LATENCY_MEASUREMENT=1 in addition to the above.
        if environ.get('NVDS_ENABLE_LATENCY_MEASUREMENT') == '1':
            print(
                "Pipeline Latency Measurement enabled!\nPlease set env var NVDS_ENABLE_COMPONENT_LATENCY_MEASUREMENT=1 for Component Latency Measurement")
            global measure_latency
            measure_latency = True

        # List the sources
        MyLogger.info("Now playing...")
        for i, source in enumerate(self.stream_paths):
            MyLogger.info(f"{i}: {source}")

        MyLogger.info("Starting pipeline")
        # start play back and listed to events
        self.pipeline.set_state(Gst.State.PLAYING)
        try:
            self.loop.run()
        except:
            pass
        # cleanup
        print("Exiting app\n")
        self.pipeline.set_state(Gst.State.NULL)

        self.running = True
        self.status = True

        return True

    def create_pipeline(self):
        MyLogger.info("Creating Pipeline ")
        self.pipeline = Gst.Pipeline()
        self.is_live = False
        if not self.pipeline:
            MyLogger.error(" Unable to create Pipeline ")

        MyLogger.info("Creating nvmultiurisrcbin  ")
        # Create nvstreammux instance to form batches from one or more sources.
        self.nvmultiurisrcbin =  Gst.ElementFactory.make("nvmultiurisrcbin", "multiurisrcbin")
        if not self.nvmultiurisrcbin:
            MyLogger.error("Unable to create nvmultiurisrcbin")
        
        self.pipeline.add(self.nvmultiurisrcbin)
        print("success")
        # Set nvmultiurisrcbin properties to mimic the CLI usage
        self.nvmultiurisrcbin.set_property("uri-list", "file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4")
        self.nvmultiurisrcbin.set_property("width", 1920)
        self.nvmultiurisrcbin.set_property("height", 1080)
        self.nvmultiurisrcbin.set_property("live-source", 1)
        self.nvmultiurisrcbin.set_property("batched-push-timeout", 33333)
        self.nvmultiurisrcbin.set_property("drop-pipeline-eos", 1)
    # multiurisrcbin.set_property("max-batch-size", 10)
    # Possibly set batch-push-timeout or other properties if you want to match your CLI exactly:
        self.queue1 = Gst.ElementFactory.make("queue", "queue1")
        self.queue2 = Gst.ElementFactory.make("queue", "queue2")
        self.queue3 = Gst.ElementFactory.make("queue", "queue3")
        self.queue4 = Gst.ElementFactory.make("queue", "queue4")
        self.queue5 = Gst.ElementFactory.make("queue", "queue5")
        self.queue6 = Gst.ElementFactory.make("queue", "queue6")
        self.queue7 = Gst.ElementFactory.make("queue", "queue7")
        self.queue8 = Gst.ElementFactory.make("queue", "queue8")
        self.pipeline.add(self.queue1)
        self.pipeline.add(self.queue2)
        self.pipeline.add(self.queue3)
        self.pipeline.add(self.queue4)
        self.pipeline.add(self.queue5)
        self.pipeline.add(self.queue6)
        self.pipeline.add(self.queue7)
        self.pipeline.add(self.queue8)

        self.nvdslogger = None

        MyLogger.info("Creating preprocess ")
        self.preprocess = Gst.ElementFactory.make("nvdspreprocess", "preprocess-plugin")

        if not self.preprocess:
            MyLogger.error(f" Unable to create preprocess :  {self.preprocess}")
            return False
        self.preprocess.set_property("gpu_id", GPU_ID)
        self.preprocess.set_property("config-file", "config_preprocess.txt")

        MyLogger.info("Creating Pgie")
        if self.requested_pgie != None and (self.requested_pgie == 'nvinferserver' or self.requested_pgie == 'nvinferserver-grpc'):
            self.pgie = Gst.ElementFactory.make("nvinferserver", "primary-inference")
        elif self.requested_pgie != None and self.requested_pgie == 'nvinfer':
            self.pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
        else:
            self.pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")

        if not self.pgie:
            MyLogger.error(f" Unable to create pgie :  {self.requested_pgie}")
            return False

        if self.disable_probe:
            # Use nvdslogger for perf measurement instead of probe function
            MyLogger.info("Creating nvdslogger")
            self.nvdslogger = Gst.ElementFactory.make("nvdslogger", "nvdslogger")
            self.nvdslogger.set_property("gpu_id", GPU_ID)

        MyLogger.info("Creating tiler")
        self.tiler = Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
        self.tiler.set_property("gpu_id", GPU_ID)
        if not self.tiler:
            MyLogger.error(" Unable to create tiler")
            return False

        MyLogger.info("Creating nvvidconv")
        self.nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
        self.nvvidconv.set_property("gpu_id", GPU_ID)
        if not self.nvvidconv:
            MyLogger.error(" Unable to create nvvidconv")
            return False

        MyLogger.info("Creating nvosd")
        self.nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
        if not self.nvosd:
            MyLogger.error(" Unable to create nvosd \n")
            return False
        self.nvosd.set_property("gpu_id", GPU_ID)
        self.nvosd.set_property('process-mode', OSD_PROCESS_MODE)
        self.nvosd.set_property('display-text', OSD_DISPLAY_TEXT)
        self.nvosd.set_property('display-clock', 1)  # 启用时钟显示
        self.nvosd.set_property('clock-font', 'Arial')  # 设置时钟字体
        self.nvosd.set_property('clock-font-size', 20)  # 设置时钟字体大小
        self.nvosd.set_property('clock-color', 0xff0000ff)  # 设置时钟颜色(红色,alpha=1)
        self.nvosd.set_property('x-clock-offset', 100)  # 设置时钟水平偏移量
        self.nvosd.set_property('y-clock-offset', 50)  # 设置时钟垂直偏移量
        # if file_loop:
        #     if self.platform_info.is_integrated_gpu():
        #         # Set nvbuf-memory-type=4 for integrated gpu for file-loop (nvurisrcbin case)
        #         self.streammux.set_property('nvbuf-memory-type', 4)
        #     else:
        #         # Set nvbuf-memory-type=2 for x86 for file-loop (nvurisrcbin case)
        #         self.streammux.set_property('nvbuf-memory-type', 2)

        if not int(MyConfigReader.cfg_dict["nvr"]["show"]):
            MyLogger.info("Creating Fakesink")
            self.sink = Gst.ElementFactory.make("fakesink", "fakesink")
            self.sink.set_property('enable-last-sample', 0)
            self.sink.set_property('sync', 0)
        else:
            if self.platform_info.is_integrated_gpu():
                MyLogger.info("Creating nv3dsink")
                self.sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
                if not self.sink:
                    MyLogger.error(" Unable to create nv3dsink")

                self.sink.set_property("gpu_id", GPU_ID)
            else:
                if self.platform_info.is_platform_aarch64():
                    MyLogger.info("Creating nv3dsink")
                    self.sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
                else:
                    MyLogger.info("Creating EGLSink")
                    self.sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
                    self.sink.set_property("gpu_id", GPU_ID)
                if not self.sink:
                    MyLogger.error(" Unable to create egl sink")

        self.converter = Gst.ElementFactory.make("nvvideoconvert", f"converter-2RBGA")
        self.converter.set_property("gpu_id", GPU_ID)
        self.capsfilter = Gst.ElementFactory.make("capsfilter", f"capsfilter-2RGBA")
        caps = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA")
        # self.converter.set_property("nvbuf-memory-type", 0)
        mem_type = int(pyds.NVBUF_MEM_CUDA_UNIFIED)
        self.converter.set_property("nvbuf-memory-type", mem_type)
        self.tiler.set_property("nvbuf-memory-type", mem_type)
        self.capsfilter.set_property("caps", caps)



        if not self.sink:
            MyLogger.error(" Unable to create sink element")


        if self.requested_pgie == "nvinferserver" and self.config != None:
            self.pgie.set_property('config-file-path', self.config)
        elif self.requested_pgie == "nvinferserver-grpc" and self.config != None:
            self.pgie.set_property('config-file-path', self.config)
        elif self.requested_pgie == "nvinfer" and self.config != None:
            self.pgie.set_property('config-file-path', self.config)
        else:
            self.pgie.set_property('config-file-path', self.config)
        pgie_batch_size = self.pgie.get_property("batch-size")
        if (pgie_batch_size != self.number_sources):
            MyLogger.warning(
                f"WARNING: Overriding infer-config batch-size{pgie_batch_size}with number of sources{self.number_sources}")
            self.pgie.set_property("batch-size", self.number_sources)

        tiler_rows = int(math.sqrt(self.number_sources))
        tiler_columns = int(math.ceil((1.0 * self.number_sources) / tiler_rows))
        self.tiler.set_property("rows", tiler_rows)
        self.tiler.set_property("columns", tiler_columns)
        self.tiler.set_property("width", TILED_OUTPUT_WIDTH)
        self.tiler.set_property("height", TILED_OUTPUT_HEIGHT)
        if self.platform_info.is_integrated_gpu():
            self.tiler.set_property("compute-hw", 2)
        else:
            self.tiler.set_property("compute-hw", 1)
        self.sink.set_property("qos", 0)

        MyLogger.info("Adding elements to Pipeline")
        self.pipeline.add(self.preprocess)
        self.pipeline.add(self.pgie)
        self.pipeline.add(self.converter)
        self.pipeline.add(self.capsfilter)
        if self.nvdslogger:
            self.pipeline.add(self.nvdslogger)
        self.pipeline.add(self.tiler)
        self.pipeline.add(self.nvvidconv)
        self.pipeline.add(self.nvosd)
        self.pipeline.add(self.sink)

        MyLogger.info("Linking elements in the Pipeline")
        self.nvmultiurisrcbin.link(self.queue1)
        self.queue1.link(self.preprocess)
        self.preprocess.link(self.queue2)
        self.queue2.link(self.pgie)
        self.pgie.link(self.queue3)
        self.queue3.link(self.converter)
        self.converter.link(self.queue4)
        self.queue4.link(self.capsfilter)
        self.capsfilter.link(self.queue5)


        if self.nvdslogger:
            self.queue5.link(self.nvdslogger)
            self.nvdslogger.link(self.tiler)
        else:
            self.queue5.link(self.tiler)
        self.tiler.link(self.queue6)
        self.queue6.link(self.nvvidconv)
        self.nvvidconv.link(self.queue7)
        self.queue7.link(self.nvosd)
        self.nvosd.link(self.queue8)
        self.queue8.link(self.sink)
        return True







    def create_source_bin(self, index, uri):
        MyLogger.info("Creating source bin")
        if not self.check_uri_valid(uri):
            MyLogger.error(f"URI is invalid or unreachable: {uri}", )
            return None


        # Create a source GstBin to abstract this bin's content from the rest of the
        # pipeline
        bin_name = "source-bin-%02d" % index
        MyLogger.info(bin_name)
        nbin = Gst.Bin.new(bin_name)
        if not nbin:
            MyLogger.error(" Unable to create source bin")

        # Source element for reading from the uri.
        # We will use decodebin and let it figure out the container format of the
        # stream and the codec and plug the appropriate demux and decode plugins.
        if file_loop:
            # use nvurisrcbin to enable file-loop
            uri_decode_bin = Gst.ElementFactory.make("nvurisrcbin", "uri-decode-bin")
            uri_decode_bin.set_property("file-loop", 1)
            uri_decode_bin.set_property("cudadec-memtype", 0)
            MyLogger.info("nvurisrcbin")
        else:
            uri_decode_bin = Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
            MyLogger.info("uridecodebin")
        if not uri_decode_bin:
            MyLogger.error(" Unable to create uri decode bin")
        # We set the input uri to the source element
        uri_decode_bin.set_property("uri", uri)
        # Connect to the "pad-added" signal of the decodebin which generates a
        # callback once a new pad for raw data has beed created by the decodebin
        uri_decode_bin.connect("pad-added", self.cb_newpad, nbin)
        self.source_bins[index] = nbin
        uri_decode_bin.connect("child-added", self.decodebin_child_added, nbin)

        # We need to create a ghost pad for the source bin which will act as a proxy
        # for the video decoder src pad. The ghost pad will not have a target right
        # now. Once the decode bin creates the video decoder and generates the
        # cb_newpad callback, we will set the ghost pad target to the video decoder
        # src pad.
        Gst.Bin.add(nbin, uri_decode_bin)
        bin_pad = nbin.add_pad(Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC))
        if not bin_pad:
            MyLogger.error(" Failed to add ghost pad in source bin ")
            return None
        return nbin

    def cb_newpad(self, decodebin, decoder_src_pad, data):
        global MUXER_OUTPUT_WIDTH
        global MUXER_OUTPUT_HEIGHT
        MyLogger.info("In cb_newpad")
        caps = decoder_src_pad.get_current_caps()
        if not caps:
            caps = decoder_src_pad.query_caps()
        gststruct = caps.get_structure(0)
        gstname = gststruct.get_name()
        source_bin = data
        features = caps.get_features(0)

        # Need to check if the pad created by the decodebin is for video and not
        # audio.
        MyLogger.info(f"gstname={gstname}")
        if (gstname.find("video") != -1):
            # Link the decodebin pad only if decodebin has picked nvidia
            # decoder plugin nvdec_*. We do this by checking if the pad caps contain
            # NVMM memory features.
            MyLogger.info(f"features={features}")
            MUXER_OUTPUT_WIDTH = gststruct.get_value("width")
            MUXER_OUTPUT_HEIGHT = gststruct.get_value("height")
            if features.contains("memory:NVMM"):
                # Get the source bin ghost pad
                bin_ghost_pad = source_bin.get_static_pad("src")
                if not bin_ghost_pad.set_target(decoder_src_pad):
                    MyLogger.error("Failed to link decoder src pad to source bin ghost pad")
            else:
                MyLogger.error(" Error: Decodebin did not pick nvidia decoder plugin.")

    def decodebin_child_added(self, child_proxy, Object, name, user_data):
        MyLogger.info(f"Decodebin child added:{name}")
        if (name.find("decodebin") != -1):
            Object.connect("child-added", self.decodebin_child_added, user_data)

        if "nvv" in name.lower():  # 匹配 nvv4l2decoder, nvv4l2h264dec 等
            if Object.find_property("gpu_id") is not None:
                Object.set_property("gpu_id", GPU_ID)  # 与下游组件(如 nvstreammux)一致
                # MyLogger.info(f"设置 {name} 的 GPU_ID 为 1")

        if "source" in name:
            source_element = child_proxy.get_by_name("source")
            if source_element.find_property('drop-on-latency') != None:
                Object.set_property("drop-on-latency", True)

    def check_uri_valid(self, uri, timeout=5):
        parsed = urlparse(uri)

        if parsed.scheme == "file":
            file_path = parsed.path

            if os.name == "nt" and len(file_path) >= 3 and file_path[0] == "/":
                file_path = file_path[1:]

            return os.path.exists(file_path)


        elif parsed.scheme == "rtsp":
            if self.check_uri_playable(uri):
                return True
            else:
                return False

        else:
            MyLogger.warning(f"Unsupported URI scheme: {parsed.scheme}, skipping validation")
            return True


    def check_uri_playable(self, uri, timeout=5):
        pipeline = Gst.parse_launch(f"uridecodebin uri={uri} ! fakesink sync=false")
        pipeline.set_state(Gst.State.PLAYING)
        time.sleep(0.1)
        start_time = time.time()
        while time.time() - start_time < timeout:
            state = pipeline.get_state(timeout=1000000)[1]
            if state == Gst.State.PLAYING:
                pipeline.set_state(Gst.State.NULL)
                return True
        pipeline.set_state(Gst.State.NULL)
        return False

    def pgie_src_pad_buffer_probe(self, pad, info, u_data):
        frame_number = 0
        num_rects = 0
        got_fps = False
        current_time = time.time()
        gst_buffer = info.get_buffer()
        if not gst_buffer:
            MyLogger.error("Unable to get GstBuffer ")
            return
        # Retrieve batch metadata from the gst_buffer
        # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
        # C address of gst_buffer as input, which is obtained with hash(gst_buffer)

        # Enable latency measurement via probe if environment variable NVDS_ENABLE_LATENCY_MEASUREMENT=1 is set.
        # To enable component level latency measurement, please set environment variable
        # NVDS_ENABLE_COMPONENT_LATENCY_MEASUREMENT=1 in addition to the above.
        global measure_latency
        if measure_latency:
            num_sources_in_batch = pyds.nvds_measure_buffer_latency(hash(gst_buffer))
            if num_sources_in_batch == 0:
                MyLogger.error("Unable to get number of sources in GstBuffer for latency measurement")

        batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
        l_frame = batch_meta.frame_meta_list
        while l_frame is not None:
            n_frame1_box = None
            try:
                # Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
                # The casting is done by pyds.NvDsFrameMeta.cast()
                # The casting also keeps ownership of the underlying memory
                # in the C code, so the Python garbage collector will leave
                # it alone.
                frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)

                if current_time - self.last_alive_time_dict.setdefault(frame_meta.pad_index, 0) > 60 * 5:
                    MyLogger.info(
                        f"source [{frame_meta.pad_index}]:{self.stream_path_code[frame_meta.pad_index]} is alive...")
                    self.last_alive_time_dict[frame_meta.pad_index] = current_time
                self.last_frame_time_dict[frame_meta.pad_index] = current_time

            except StopIteration:
                break

            frame_number = frame_meta.frame_num
            l_obj = frame_meta.obj_meta_list
            num_rects = frame_meta.num_obj_meta

            try:
                n_frame = pyds.get_nvds_buf_surface(hash(gst_buffer), frame_meta.batch_id)
                if n_frame is None:
                    MyLogger.error("Failed to get surface from buffer")
                    break

                n_frame1 = np.array(n_frame, copy=True, order='C')
            except Exception as e:
                MyLogger.error(f"Error getting frame surface: {e}")
                break

            detected_objects = []

            while l_obj is not None:
                try:
                    # Casting l_obj.data to pyds.NvDsObjectMeta
                    obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
                    obj_label = pyds.get_string(obj_meta.text_params.display_text)
                    obj_confidence = obj_meta.confidence
                    obj_bbox = (obj_meta.rect_params.left, obj_meta.rect_params.top,
                                obj_meta.rect_params.width, obj_meta.rect_params.height)
                    obj_meta.text_params.display_text = f"{obj_label} {obj_confidence:.2f}"
                    # print(obj_meta.class_id)
                    detected_objects.append({
                        "label": obj_label,
                        "confidence": obj_confidence,
                        "bbox": obj_bbox,
                        "pad_index": frame_meta.pad_index
                    })
                except StopIteration:
                    break

                try:
                    if n_frame1_box is None:
                        n_frame1_box = StreamCapture.draw_bounding_boxes(copy.deepcopy(n_frame1), obj_meta,
                                                                         obj_meta.confidence)
                    else:
                        n_frame1_box = StreamCapture.draw_bounding_boxes(n_frame1_box, obj_meta,
                                                                         obj_meta.confidence)
                except Exception as e:
                    MyLogger.error(f"Error drawing bounding boxes: {e}")

                try:
                    l_obj = l_obj.next
                except StopIteration:
                    break

            try:
                # convert the array into cv2 default color format
                n_frame1 = cv2.cvtColor(n_frame1, cv2.COLOR_RGBA2RGB)
            except Exception as e:
                MyLogger.error(f"Error converting color format: {e}")
                break

            try:
                roi = [tuple(map(int, point.split(','))) for point in
                       MyConfigReader.cfg_dict["ROI"][f"source{frame_meta.pad_index}"].split(';')]
                display_meta = self.draw_polygon_roi(batch_meta, frame_meta, roi)

                # 将显示元数据添加到当前帧
                pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
            except Exception as e:
                MyLogger.error(f"Error drawing ROI: {e}")

            try:
                with self.lock:
                    self.frame_dict.setdefault(str(frame_meta.pad_index), {"frame": None, "time": None})
                    self.frame_dict[str(frame_meta.pad_index)]["frame"] = n_frame1
                    self.frame_dict[str(frame_meta.pad_index)]["time"] = get_current_timestamp_millis()
            except Exception as e:
                MyLogger.error(f"Error updating frame dictionary: {e}")

            try:
                if num_rects != 0:
                    self.detection_algorithm(detected_objects, n_frame1, n_frame1_box, roi)
                    if not silent:
                        MyLogger.info(
                            f"batchid:{frame_meta.pad_index},Frame Number={frame_number},Number of Objects={num_rects}")
            except Exception as e:
                MyLogger.error(f"Error in detection algorithm: {e}")

            # Update frame rate through this probe
            stream_index = "stream{0}".format(frame_meta.pad_index)
            global perf_data
            perf_data.update_fps(stream_index)

            try:
                l_frame = l_frame.next
            except StopIteration:
                break

        return Gst.PadProbeReturn.OK

    def draw_polygon_roi(self, batch_meta, frame_meta, roi_points, border_width=3, border_color=(0.0, 0.0, 1.0, 1.0)):
        """
        绘制由4个点连线组成的多边形区域。

        参数:
            batch_meta: 批处理元数据对象。
            frame_meta: 帧元数据对象。
            roi_points: 包含4个点坐标的列表或元组,格式为 [(x1, y1), (x2, y2), (x3, y3), (x4, y4)]。
            border_width: 边框宽度,默认为3。
            border_color: 边框颜色,格式为 (R, G, B, A),默认为蓝色 (0.0, 0.0, 1.0, 1.0)。

        返回:
            display_meta: 配置好的显示元数据对象。
        """
        # 检查参数有效性
        if not batch_meta:
            MyLogger.error("batch_meta is None in draw_polygon_roi")
            return None

        if not frame_meta:
            MyLogger.error("frame_meta is None in draw_polygon_roi")
            return None

        if not roi_points or len(roi_points) < 4:
            MyLogger.error(f"Invalid roi_points in draw_polygon_roi: {roi_points}")
            return None

        try:
            # 获取显示元数据
            display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
            if not display_meta:
                MyLogger.error("Failed to acquire display meta from pool")
                return None

            display_meta.num_lines = 4  # 设置绘制4条线(4个点连接成一个多边形)
            display_meta.num_rects = 0  # 确保没有绘制矩形

            # 配置多边形的边框(通过绘制4条线)
            for i in range(4):
                try:
                    line = display_meta.line_params[i]
                    # 设置线的起点和终点
                    line.x1 = int(roi_points[i][0])  # 第 i 个点的 x 坐标
                    line.y1 = int(roi_points[i][1])  # 第 i 个点的 y 坐标
                    line.x2 = int(roi_points[(i + 1) % 4][0])  # 下一个点的 x 坐标
                    line.y2 = int(roi_points[(i + 1) % 4][1])  # 下一个点的 y 坐标
                    # 设置线的宽度和颜色
                    line.line_width = border_width
                    line.line_color.red = border_color[0]
                    line.line_color.green = border_color[1]
                    line.line_color.blue = border_color[2]
                    line.line_color.alpha = border_color[3]
                except Exception as e:
                    MyLogger.error(f"Error configuring line {i} in draw_polygon_roi: {e}")
                    # 继续处理其他线条,不中断整个过程

            return display_meta
        except Exception as e:
            MyLogger.error(f"Error in draw_polygon_roi: {e}")
            return None

    def check_bottom_edge_in_polygon(self, bottom_edge, polygon):
        """
        判断底边是否在封闭线框内。

        :param bottom_edge: 底边的两个点 [(x1, y1), (x2, y2)]
        :param polygon: 多边形顶点列表 [(x1, y1), (x2, y2), ...]
        :return: 如果有点在多边形内返回True,否则返回False
        """
        # 检查参数有效性
        if not bottom_edge or len(bottom_edge) < 2:
            MyLogger.error(f"Invalid bottom_edge in check_bottom_edge_in_polygon: {bottom_edge}")
            return False

        if not polygon or len(polygon) < 3:
            MyLogger.error(f"Invalid polygon in check_bottom_edge_in_polygon: {polygon}")
            return False

        try:
            for point in bottom_edge:
                if not point or len(point) < 2:
                    continue

                if self.is_point_in_polygon(point, polygon):
                    return True
            return False
        except Exception as e:
            MyLogger.error(f"Error in check_bottom_edge_in_polygon: {e}")
            return False

    def is_point_in_polygon(self, point, polygon):
        """
        判断点是否在多边形内。

        :param point: 要判断的点 (x, y)
        :param polygon: 多边形顶点的列表 [(x1, y1), (x2, y2), ...]
        :return: 是否在多边形内
        """
        # 检查参数有效性
        if not point or len(point) < 2:
            MyLogger.error(f"Invalid point in is_point_in_polygon: {point}")
            return False

        if not polygon or len(polygon) < 3:
            MyLogger.error(f"Invalid polygon in is_point_in_polygon: {polygon}")
            return False

        try:
            # 转换为numpy数组并进行点多边形测试
            polygon_array = np.array(polygon)
            result = cv2.pointPolygonTest(polygon_array, point, False)
            return result >= 0
        except Exception as e:
            MyLogger.error(f"Error in is_point_in_polygon: {e}")
            return False

    def np_array_to_base64(self, image):
        """
        将numpy数组图像转换为base64编码字符串。

        :param image: numpy数组格式的图像
        :return: base64编码的字符串
        """
        if image is None:
            MyLogger.error("Image is None in np_array_to_base64")
            return ""

        try:
            img_array = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)  # RGB2BGR,用于cv2编码
            encode_result = cv2.imencode(".jpg", img_array)

            if not encode_result[0]:
                MyLogger.error("Failed to encode image in np_array_to_base64")
                return ""

            encode_image = encode_result[1]
            byte_data = encode_image.tobytes()  # 转换为二进制
            base64_str = base64.b64encode(byte_data).decode("ascii")  # 转换为base64
            return base64_str
        except Exception as e:
            MyLogger.error(f"Error in np_array_to_base64: {e}")
            return ""

    def det_point_in_roi(self, point, roi):
        """
        判断点是否在矩形ROI内。

        :param point: 点坐标 (x, y)
        :param roi: 矩形区域 [x, y, width, height]
        :return: 点是否在ROI内
        """
        if not point or len(point) < 2:
            MyLogger.error(f"Invalid point in det_point_in_roi: {point}")
            return False

        if not roi or len(roi) < 4:
            MyLogger.error(f"Invalid roi in det_point_in_roi: {roi}")
            return False

        try:
            if point[0] < roi[0] or point[0] > roi[0] + roi[2]:
                return False
            if point[1] < roi[1] or point[1] > roi[1] + roi[3]:
                return False
            else:
                return True
        except Exception as e:
            MyLogger.error(f"Error in det_point_in_roi: {e}")
            return False

    def determine_safety_status(self, gap_width, container_width):
        if gap_width < container_width * float(MyConfigReader.cfg_dict["Detect"]["Threshold"]):
            return True  # Safe: green with transparency
        else:
            return False  # Unsafe: red with transparency

    def detection_algorithm(self, detected_obj, frame_src, frame_bbox, roi):
        """
        检测算法,用于分析检测到的对象并触发报警。

        参数:
            detected_obj: 检测到的对象列表
            frame_src: 原始帧图像
            frame_bbox: 带有边界框的帧图像
            roi: 感兴趣区域
        """
        # 检查参数有效性
        if not detected_obj:
            return

        if frame_bbox is None:
            MyLogger.warning("frame_bbox is None in detection_algorithm")
            return

        if roi is None:
            MyLogger.warning("roi is None in detection_algorithm")
            return

        try:
            camera_index_code = ""
            tag = False

            for obj in detected_obj:
                if not isinstance(obj, dict):
                    continue

                if obj.get("label", "") == "people":
                    # 检查bbox是否有效
                    bbox = obj.get("bbox")
                    if not bbox or len(bbox) < 4:
                        continue

                    try:
                        center_pointer = (obj["bbox"][0] + (obj["bbox"][2] / 2), obj["bbox"][1] + (obj["bbox"][3] / 2))

                        # 检查是否在ROI内
                        bottom_edge = [(obj["bbox"][0], obj["bbox"][1] + obj["bbox"][3]),
                                       (obj["bbox"][0] + obj["bbox"][2], obj["bbox"][1] + obj["bbox"][3])]

                        if self.check_bottom_edge_in_polygon(bottom_edge, roi):
                            # 可以在这里添加报警逻辑
                            pass
                            # MyLogger.info("detect people ,alarm")
                    except Exception as e:
                        MyLogger.error(f"Error processing object in detection_algorithm: {e}")
                        continue

            # 如果需要触发报警
            if tag:
                try:
                    resized_img = cv2.resize(frame_bbox, (1920, 1080), interpolation=cv2.INTER_LINEAR)
                    msg = {
                        "System": "StreamingMedia",
                        "CMD": "Alarm",
                        "Type": "CJ12-001",
                        "Pic": self.np_array_to_base64(resized_img),
                        "Time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
                        "Data": {
                            "Camera": camera_index_code,
                            "Message": "msg"
                        },
                        "Code": 200
                    }
                    # Kafka发送代码已注释
                    # self.kafka_producer.send(
                    #     topic=f"{MyConfigReader.cfg_dict['Kafka']['topic']}",
                    #     value=f"{json.dumps(msg)}")

                    # 异步发送消息
                    # self.kafka_producer.produce(f"{MyConfigReader.cfg_dict['Kafka']['topic']}", json.dumps(msg).encode('utf-8'), callback=self.delivery_report)
                    # 等待所有消息发送完成
                    # self.kafka_producer.flush()
                    # MyLogger.info("发送报警信息至kafka队列")
                except Exception as e:
                    MyLogger.error(f"Error sending alarm in detection_algorithm: {e}")
        except Exception as e:
            MyLogger.error(f"Error in detection_algorithm: {e}")
            return

    def delivery_report(self, err, msg):
        if err is not None:
            MyLogger.error(f'kafka send error: {err}')
        else:
            MyLogger.info(f'{msg.topic()} [{msg.partition()}] @ {msg.offset()}')

    @staticmethod
    def draw_bounding_boxes(image, obj_meta, confidence):
        image = cv2.cvtColor(image, cv2.COLOR_RGBA2RGB)
        confidence = '{0:.2f}'.format(confidence)
        rect_params = obj_meta.rect_params
        top = int(rect_params.top)
        left = int(rect_params.left)
        width = int(rect_params.width)
        height = int(rect_params.height)
        obj_name = pyds.get_string(obj_meta.text_params.display_text)
        # image = cv2.rectangle(image, (left, top), (left + width, top + height), (0, 0, 255, 0), 2, cv2.LINE_4)
        color = (0, 0, 255, 0)
        w_percents = int(width * 0.05) if width > 100 else int(width * 0.1)
        h_percents = int(height * 0.05) if height > 100 else int(height * 0.1)
        linetop_c1 = (left + w_percents, top)
        linetop_c2 = (left + width - w_percents, top)
        image = cv2.line(image, linetop_c1, linetop_c2, color, 6)
        linebot_c1 = (left + w_percents, top + height)
        linebot_c2 = (left + width - w_percents, top + height)
        image = cv2.line(image, linebot_c1, linebot_c2, color, 6)
        lineleft_c1 = (left, top + h_percents)
        lineleft_c2 = (left, top + height - h_percents)
        image = cv2.line(image, lineleft_c1, lineleft_c2, color, 6)
        lineright_c1 = (left + width, top + h_percents)
        lineright_c2 = (left + width, top + height - h_percents)
        image = cv2.line(image, lineright_c1, lineright_c2, color, 6)
        # Note that on some systems cv2.putText erroneously draws horizontal lines across the image
        image = cv2.putText(image, obj_name, (left - 10, top - 10),
                            cv2.FONT_HERSHEY_SIMPLEX,
                            1,
                            (0, 0, 255, 0), 2)
        return image


if __name__ == '__main__':
    a = StreamCapture(None, MyConfigReader.cfg_dict['Infer']['path'], False, {})
    a.open({"asdasdasdas": "file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"})

Is there any clue shows the issue is related to nvmultiurisrcbin? Can you run the original DeepStream sample correctly?

When I used the nvstreammux plugin instead of nvmultiurisrcbin, everything worked and the sink was updated in real time:


Here is my code using streammux:

import sys
import threading
import copy
import json

from numpy.f2py.crackfortran import gotnextfile

sys.path.append('../')
from pathlib import Path
import gi
import numpy as np
import configparser
import argparse
import os
import base64
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst
from urllib.parse import urlparse
from ctypes import *
import time
import sys
import math
import platform
from collections import defaultdict
from common.platform_info import PlatformInfo
from common.bus_call import bus_call
from common.FPS import PERF_DATA
from my_log import MyLogger
from config_reader import MyConfigReader
from datetime import datetime
from KafkaProducer import KafkaProducer
from confluent_kafka import Producer, KafkaException
from confluent_kafka import KafkaError
from os import environ
import pyds
import cv2


#os.environ['GST_PLUGIN_PATH'] = '/opt/nvidia/deepstream/deepstream/lib/gst-plugins:' + os.environ.get('GST_PLUGIN_PATH', '')

no_display = False
silent = int(MyConfigReader.cfg_dict["RTSP"]["silent"])
file_loop = int(MyConfigReader.cfg_dict["RTSP"]["file_loop"])  # 设置cpu解码还是gpu解码
perf_data = None
measure_latency = False


MUXER_OUTPUT_WIDTH = int(MyConfigReader.cfg_dict["RTSP"]["output_width"])
MUXER_OUTPUT_HEIGHT = int(MyConfigReader.cfg_dict["RTSP"]["output_height"])
MUXER_BATCH_TIMEOUT_USEC = int(MyConfigReader.cfg_dict["RTSP"]["MUXER_BATCH_TIMEOUT_USEC"])
TILED_OUTPUT_WIDTH = int(MyConfigReader.cfg_dict["RTSP"]["TILED_OUTPUT_WIDTH"])
TILED_OUTPUT_HEIGHT = int(MyConfigReader.cfg_dict["RTSP"]["TILED_OUTPUT_HEIGHT"])
GST_CAPS_FEATURES_NVMM = "memory:NVMM"
OSD_PROCESS_MODE = int(MyConfigReader.cfg_dict["OSD"]["process-mode"])
OSD_DISPLAY_TEXT = int(MyConfigReader.cfg_dict["OSD"]["display-text"])
GPU_ID = int(MyConfigReader.cfg_dict["GPU"]["gpu_id"])


def get_current_timestamp_millis():
    timestamp = time.time()
    timestamp_millis = int(timestamp * 1000)
    dt_object = datetime.fromtimestamp(timestamp)
    standard_time_format = dt_object.strftime('%Y-%m-%d_%H-%M-%S.') + str(timestamp_millis % 1000).zfill(3)
    return standard_time_format



class StreamCapture:

    def __init__(self, pgie, config, disable_probe, overtime_dict):
        super().__init__()
        self.pipeline = None
        self.framerate = int(MyConfigReader.cfg_dict['nvr']['fps'])
        self.running = False
        self.frame_interval = 1.0 / self.framerate
        self.last_frame_time_dict = {}
        self.reconnect_attempts = 5  # 最大重连次数
        self.reconnect_interval = 5  # 重连间隔(秒)
        self.thread = None
        self.status = None
        self.bus = None
        self.update_time = time.time()
        self.last_alive_time_dict = {}
        self.over_time_dict = overtime_dict
        self.dict_lock = threading.Lock()
        self.stream_paths = None
        self.stream_path_code = None

        self.requested_pgie = pgie
        self.disable_probe = disable_probe
        self.config = config
        self.number_sources = None
        self.frame_dict = {}
        self.source_bins = {}
        self.lock = threading.Lock()
        # self.kafka_producer = KafkaProducer({
        #     'bootstrap.servers': f"{MyConfigReader.cfg_dict['Kafka']['ip']}:{MyConfigReader.cfg_dict['Kafka']['port']}",
        # }, retry_limit=5, retry_interval=3)

        # self.kafka_producer = Producer({
        #     'bootstrap.servers': f"{MyConfigReader.cfg_dict['Kafka']['ip']}:{MyConfigReader.cfg_dict['Kafka']['port']}",
        #     # Kafka 服务器地址
        #     'client.id': 'dooropen-producer'  # 客户端 ID
        # })

        self.x_minutes = 10
        self.max_dropouts = 10
        self.dropout_history = defaultdict(list)
        self.last_alarm_dict = {}
        self.platform_info = PlatformInfo()

    def open(self, stream_paths_dict):
        global  perf_data
        self.stream_path_code = list(stream_paths_dict.keys())
        self.stream_paths = list(stream_paths_dict.values())
        self.number_sources = len(self.stream_paths)
        perf_data = PERF_DATA(self.number_sources)
        Gst.init(None)
        if not self.create_pipeline():
            MyLogger.error('Failed to create Pipline.')
            self.status = False
            return False

        # create an event loop and feed gstreamer bus mesages to it
        self.loop = GLib.MainLoop()
        self.bus = self.pipeline.get_bus()
        self.bus.add_signal_watch()
        self.bus.connect("message", bus_call, self.loop)
        # pgie_src_pad = self.pgie.get_static_pad("src")
        pgie_src_pad = self.tiler.get_static_pad("sink")
        if not pgie_src_pad:
            MyLogger.error(" Unable to get src pad")
        else:
            if not self.disable_probe:
                pgie_src_pad.add_probe(Gst.PadProbeType.BUFFER, self.pgie_src_pad_buffer_probe, 0)
                # perf callback function to print fps every 5 sec
                GLib.timeout_add(5000, perf_data.perf_print_callback)

        # Enable latency measurement via probe if environment variable NVDS_ENABLE_LATENCY_MEASUREMENT=1 is set.
        # To enable component level latency measurement, please set environment variable
        # NVDS_ENABLE_COMPONENT_LATENCY_MEASUREMENT=1 in addition to the above.
        if environ.get('NVDS_ENABLE_LATENCY_MEASUREMENT') == '1':
            print(
                "Pipeline Latency Measurement enabled!\nPlease set env var NVDS_ENABLE_COMPONENT_LATENCY_MEASUREMENT=1 for Component Latency Measurement")
            global measure_latency
            measure_latency = True

        # List the sources
        MyLogger.info("Now playing...")
        for i, source in enumerate(self.stream_paths):
            MyLogger.info(f"{i}: {source}")

        MyLogger.info("Starting pipeline")
        # start play back and listed to events
        self.pipeline.set_state(Gst.State.PLAYING)
        try:
            self.loop.run()
        except:
            pass
        # cleanup
        print("Exiting app\n")
        self.pipeline.set_state(Gst.State.NULL)



        self.running = True
        # self.thread = threading.Thread(target=self._read_loop)
        # self.thread.daemon = True
        # self.thread.start()
        self.status = True

        return True

    def create_pipeline(self):
        MyLogger.info("Creating Pipeline ")
        self.pipeline = Gst.Pipeline()
        self.is_live = False
        if not self.pipeline:
            MyLogger.error(" Unable to create Pipeline ")

        MyLogger.info("Creating streamux  ")
        # Create nvstreammux instance to form batches from one or more sources.
        self.streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
        if not self.streammux:
            MyLogger.error(" Unable to create NvStreamMux ")
        self.streammux.set_property('batched-push-timeout', MUXER_BATCH_TIMEOUT_USEC)
        self.streammux.set_property("gpu_id", GPU_ID)
        self.pipeline.add(self.streammux)
        valid_sources = []
        for i in range(self.number_sources):
            self.last_frame_time_dict[i] = time.time()
            MyLogger.info(f"Creating source_bin: {i}")
            uri_name = self.stream_paths[i]
            if uri_name.find("rtsp://") == 0:
                self.is_live = True
            source_bin = self.create_source_bin(i, uri_name)
            if source_bin is not None:
                # valid_sources.append(source_bin)
                # ½« source_bin Ìí¼Óµ½Ö÷ Pipeline ÖÐ
                valid_sources.append(source_bin)
                # self.pipeline.add(source_bin)
            else:
                MyLogger.error(f"Skipping invalid source: {uri_name}")
                continue

            # if not source_bin:
            #     sys.stderr.write("Unable to create source bin")
            self.pipeline.add(source_bin)
            padname = "sink_%u" % i
            sinkpad = self.streammux.request_pad_simple(padname)
            if not sinkpad:
                MyLogger.error("Unable to create sink pad bin")
            srcpad = source_bin.get_static_pad("src")
            if not srcpad:
                MyLogger.error("Unable to create src pad bin")
            srcpad.link(sinkpad)

        self.number_sources = len(valid_sources)
        # self.streammux.set_property("batch-size", self.number_sources)
        # self.pgie.set_property("batch-size", self.number_sources)
        self.queue1 = Gst.ElementFactory.make("queue", "queue1")
        self.queue2 = Gst.ElementFactory.make("queue", "queue2")
        self.queue3 = Gst.ElementFactory.make("queue", "queue3")
        self.queue4 = Gst.ElementFactory.make("queue", "queue4")
        self.queue5 = Gst.ElementFactory.make("queue", "queue5")
        self.queue6 = Gst.ElementFactory.make("queue", "queue6")
        self.queue7 = Gst.ElementFactory.make("queue", "queue7")
        self.queue8 = Gst.ElementFactory.make("queue", "queue8")
        self.pipeline.add(self.queue1)
        self.pipeline.add(self.queue2)
        self.pipeline.add(self.queue3)
        self.pipeline.add(self.queue4)
        self.pipeline.add(self.queue5)
        self.pipeline.add(self.queue6)
        self.pipeline.add(self.queue7)
        self.pipeline.add(self.queue8)

        self.nvdslogger = None

        MyLogger.info("Creating preprocess ")
        self.preprocess = Gst.ElementFactory.make("nvdspreprocess", "preprocess-plugin")

        if not self.preprocess:
            MyLogger.error(f" Unable to create preprocess :  {self.preprocess}")
            return False
        self.preprocess.set_property("gpu_id", GPU_ID)
        self.preprocess.set_property("config-file", "config_preprocess.txt")

        MyLogger.info("Creating Pgie")
        if self.requested_pgie != None and (self.requested_pgie == 'nvinferserver' or self.requested_pgie == 'nvinferserver-grpc'):
            self.pgie = Gst.ElementFactory.make("nvinferserver", "primary-inference")
        elif self.requested_pgie != None and self.requested_pgie == 'nvinfer':
            self.pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
        else:
            self.pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")

        if not self.pgie:
            MyLogger.error(f" Unable to create pgie :  {self.requested_pgie}")
            return False

        if self.disable_probe:
            # Use nvdslogger for perf measurement instead of probe function
            MyLogger.info("Creating nvdslogger")
            self.nvdslogger = Gst.ElementFactory.make("nvdslogger", "nvdslogger")
            self.nvdslogger.set_property("gpu_id", GPU_ID)

        MyLogger.info("Creating tiler")
        self.tiler = Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
        self.tiler.set_property("gpu_id", GPU_ID)
        if not self.tiler:
            MyLogger.error(" Unable to create tiler")
            return False

        MyLogger.info("Creating nvvidconv")
        self.nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
        self.nvvidconv.set_property("gpu_id", GPU_ID)
        if not self.nvvidconv:
            MyLogger.error(" Unable to create nvvidconv")
            return False

        MyLogger.info("Creating nvosd")
        self.nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
        if not self.nvosd:
            MyLogger.error(" Unable to create nvosd \n")
            return False
        self.nvosd.set_property("gpu_id", GPU_ID)
        self.nvosd.set_property('process-mode', OSD_PROCESS_MODE)
        self.nvosd.set_property('display-text', OSD_DISPLAY_TEXT)
        self.nvosd.set_property('display-clock', 1)  # 启用时钟显示
        self.nvosd.set_property('clock-font', 'Arial')  # 设置时钟字体
        self.nvosd.set_property('clock-font-size', 20)  # 设置时钟字体大小
        self.nvosd.set_property('clock-color', 0xff0000ff)  # 设置时钟颜色(红色,alpha=1)
        self.nvosd.set_property('x-clock-offset', 100)  # 设置时钟水平偏移量
        self.nvosd.set_property('y-clock-offset', 50)  # 设置时钟垂直偏移量
        if file_loop:
            if self.platform_info.is_integrated_gpu():
                # Set nvbuf-memory-type=4 for integrated gpu for file-loop (nvurisrcbin case)
                self.streammux.set_property('nvbuf-memory-type', 4)
            else:
                # Set nvbuf-memory-type=2 for x86 for file-loop (nvurisrcbin case)
                self.streammux.set_property('nvbuf-memory-type', 2)

        if not int(MyConfigReader.cfg_dict["nvr"]["show"]):
            MyLogger.info("Creating Fakesink")
            self.sink = Gst.ElementFactory.make("fakesink", "fakesink")
            self.sink.set_property('enable-last-sample', 0)
            self.sink.set_property('sync', 0)
        else:
            if self.platform_info.is_integrated_gpu():
                MyLogger.info("Creating nv3dsink")
                self.sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
                if not self.sink:
                    MyLogger.error(" Unable to create nv3dsink")

                self.sink.set_property("gpu_id", GPU_ID)
            else:
                if self.platform_info.is_platform_aarch64():
                    MyLogger.info("Creating nv3dsink")
                    self.sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
                else:
                    MyLogger.info("Creating EGLSink")
                    self.sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
                    self.sink.set_property("gpu_id", GPU_ID)
                if not self.sink:
                    MyLogger.error(" Unable to create egl sink")

        self.converter = Gst.ElementFactory.make("nvvideoconvert", f"converter-2RBGA")
        self.converter.set_property("gpu_id", GPU_ID)
        self.capsfilter = Gst.ElementFactory.make("capsfilter", f"capsfilter-2RGBA")
        caps = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA")
        # self.converter.set_property("nvbuf-memory-type", 0)
        mem_type = int(pyds.NVBUF_MEM_CUDA_UNIFIED)
        self.converter.set_property("nvbuf-memory-type", mem_type)
        self.tiler.set_property("nvbuf-memory-type", mem_type)
        self.capsfilter.set_property("caps", caps)



        if not self.sink:
            MyLogger.error(" Unable to create sink element")

        if self.is_live:
            MyLogger.info("At least one of the sources is live")
            self.streammux.set_property('live-source', 1)

        self.streammux.set_property('width', MUXER_OUTPUT_WIDTH)
        self.streammux.set_property('height', MUXER_OUTPUT_HEIGHT)
        self.streammux.set_property('batch-size', self.number_sources)
        if self.requested_pgie == "nvinferserver" and self.config != None:
            self.pgie.set_property('config-file-path', self.config)
        elif self.requested_pgie == "nvinferserver-grpc" and self.config != None:
            self.pgie.set_property('config-file-path', self.config)
        elif self.requested_pgie == "nvinfer" and self.config != None:
            self.pgie.set_property('config-file-path', self.config)
        else:
            self.pgie.set_property('config-file-path', self.config)
        pgie_batch_size = self.pgie.get_property("batch-size")
        if (pgie_batch_size != self.number_sources):
            MyLogger.warning(
                f"WARNING: Overriding infer-config batch-size{pgie_batch_size}with number of sources{self.number_sources}")
            self.pgie.set_property("batch-size", self.number_sources)

        tiler_rows = int(math.sqrt(self.number_sources))
        tiler_columns = int(math.ceil((1.0 * self.number_sources) / tiler_rows))
        self.tiler.set_property("rows", tiler_rows)
        self.tiler.set_property("columns", tiler_columns)
        self.tiler.set_property("width", TILED_OUTPUT_WIDTH)
        self.tiler.set_property("height", TILED_OUTPUT_HEIGHT)
        if self.platform_info.is_integrated_gpu():
            self.tiler.set_property("compute-hw", 2)
        else:
            self.tiler.set_property("compute-hw", 1)
        self.sink.set_property("qos", 0)

        MyLogger.info("Adding elements to Pipeline")
        self.pipeline.add(self.preprocess)
        self.pipeline.add(self.pgie)
        self.pipeline.add(self.converter)
        self.pipeline.add(self.capsfilter)
        if self.nvdslogger:
            self.pipeline.add(self.nvdslogger)
        self.pipeline.add(self.tiler)
        self.pipeline.add(self.nvvidconv)
        self.pipeline.add(self.nvosd)
        self.pipeline.add(self.sink)

        MyLogger.info("Linking elements in the Pipeline")
        self.streammux.link(self.queue1)
        self.queue1.link(self.preprocess)
        self.preprocess.link(self.queue2)
        self.queue2.link(self.pgie)
        self.pgie.link(self.queue3)
        self.queue3.link(self.converter)
        self.converter.link(self.queue4)
        self.queue4.link(self.capsfilter)
        self.capsfilter.link(self.queue5)


        if self.nvdslogger:
            self.queue5.link(self.nvdslogger)
            self.nvdslogger.link(self.tiler)
        else:
            self.queue5.link(self.tiler)
        self.tiler.link(self.queue6)
        self.queue6.link(self.nvvidconv)
        self.nvvidconv.link(self.queue7)
        self.queue7.link(self.nvosd)
        self.nvosd.link(self.queue8)
        self.queue8.link(self.sink)
        return True







    def create_source_bin(self, index, uri):
        MyLogger.info("Creating source bin")
        if not self.check_uri_valid(uri):
            MyLogger.error(f"URI is invalid or unreachable: {uri}", )
            return None


        # Create a source GstBin to abstract this bin's content from the rest of the
        # pipeline
        bin_name = "source-bin-%02d" % index
        MyLogger.info(bin_name)
        nbin = Gst.Bin.new(bin_name)
        if not nbin:
            MyLogger.error(" Unable to create source bin")

        # Source element for reading from the uri.
        # We will use decodebin and let it figure out the container format of the
        # stream and the codec and plug the appropriate demux and decode plugins.
        if file_loop:
            # use nvurisrcbin to enable file-loop
            uri_decode_bin = Gst.ElementFactory.make("nvurisrcbin", "uri-decode-bin")
            uri_decode_bin.set_property("file-loop", 1)
            uri_decode_bin.set_property("cudadec-memtype", 0)
            MyLogger.info("nvurisrcbin")
        else:
            uri_decode_bin = Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
            MyLogger.info("uridecodebin")
        if not uri_decode_bin:
            MyLogger.error(" Unable to create uri decode bin")
        # We set the input uri to the source element
        uri_decode_bin.set_property("uri", uri)
        # Connect to the "pad-added" signal of the decodebin which generates a
        # callback once a new pad for raw data has beed created by the decodebin
        uri_decode_bin.connect("pad-added", self.cb_newpad, nbin)
        self.source_bins[index] = nbin
        uri_decode_bin.connect("child-added", self.decodebin_child_added, nbin)

        # We need to create a ghost pad for the source bin which will act as a proxy
        # for the video decoder src pad. The ghost pad will not have a target right
        # now. Once the decode bin creates the video decoder and generates the
        # cb_newpad callback, we will set the ghost pad target to the video decoder
        # src pad.
        Gst.Bin.add(nbin, uri_decode_bin)
        bin_pad = nbin.add_pad(Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC))
        if not bin_pad:
            MyLogger.error(" Failed to add ghost pad in source bin ")
            return None
        return nbin

    def cb_newpad(self, decodebin, decoder_src_pad, data):
        global MUXER_OUTPUT_WIDTH
        global MUXER_OUTPUT_HEIGHT
        MyLogger.info("In cb_newpad")
        caps = decoder_src_pad.get_current_caps()
        if not caps:
            caps = decoder_src_pad.query_caps()
        gststruct = caps.get_structure(0)
        gstname = gststruct.get_name()
        source_bin = data
        features = caps.get_features(0)

        # Need to check if the pad created by the decodebin is for video and not
        # audio.
        MyLogger.info(f"gstname={gstname}")
        if (gstname.find("video") != -1):
            # Link the decodebin pad only if decodebin has picked nvidia
            # decoder plugin nvdec_*. We do this by checking if the pad caps contain
            # NVMM memory features.
            MyLogger.info(f"features={features}")
            MUXER_OUTPUT_WIDTH = gststruct.get_value("width")
            MUXER_OUTPUT_HEIGHT = gststruct.get_value("height")
            if not file_loop:
                self.streammux.set_property('width', MUXER_OUTPUT_WIDTH)
                self.streammux.set_property('height', MUXER_OUTPUT_HEIGHT)
            if features.contains("memory:NVMM"):
                # Get the source bin ghost pad
                bin_ghost_pad = source_bin.get_static_pad("src")
                if not bin_ghost_pad.set_target(decoder_src_pad):
                    MyLogger.error("Failed to link decoder src pad to source bin ghost pad")
            else:
                MyLogger.error(" Error: Decodebin did not pick nvidia decoder plugin.")

    def decodebin_child_added(self, child_proxy, Object, name, user_data):
        MyLogger.info(f"Decodebin child added:{name}")
        if (name.find("decodebin") != -1):
            Object.connect("child-added", self.decodebin_child_added, user_data)

        if "nvv" in name.lower():  # 匹配 nvv4l2decoder, nvv4l2h264dec 等
            if Object.find_property("gpu_id") is not None:
                Object.set_property("gpu_id", GPU_ID)  # 与下游组件(如 nvstreammux)一致
                # MyLogger.info(f"设置 {name} 的 GPU_ID 为 1")

        if "source" in name:
            source_element = child_proxy.get_by_name("source")
            if source_element.find_property('drop-on-latency') != None:
                Object.set_property("drop-on-latency", True)

    def check_uri_valid(self, uri, timeout=5):
        parsed = urlparse(uri)

        if parsed.scheme == "file":
            file_path = parsed.path

            if os.name == "nt" and len(file_path) >= 3 and file_path[0] == "/":
                file_path = file_path[1:]

            return os.path.exists(file_path)


        elif parsed.scheme == "rtsp":
            if self.check_uri_playable(uri):
                return True
            else:
                return False

        else:
            MyLogger.warning(f"Unsupported URI scheme: {parsed.scheme}, skipping validation")
            return True


    def check_uri_playable(self, uri, timeout=5):
        pipeline = Gst.parse_launch(f"uridecodebin uri={uri} ! fakesink sync=false")
        pipeline.set_state(Gst.State.PLAYING)
        time.sleep(0.1)
        start_time = time.time()
        while time.time() - start_time < timeout:
            state = pipeline.get_state(timeout=1000000)[1]
            if state == Gst.State.PLAYING:
                pipeline.set_state(Gst.State.NULL)
                return True
        pipeline.set_state(Gst.State.NULL)
        return False

    def pgie_src_pad_buffer_probe(self, pad, info, u_data):
        frame_number = 0
        num_rects = 0
        got_fps = False
        current_time = time.time()
        gst_buffer = info.get_buffer()
        if not gst_buffer:
            MyLogger.error("Unable to get GstBuffer ")
            return
        # Retrieve batch metadata from the gst_buffer
        # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
        # C address of gst_buffer as input, which is obtained with hash(gst_buffer)

        # Enable latency measurement via probe if environment variable NVDS_ENABLE_LATENCY_MEASUREMENT=1 is set.
        # To enable component level latency measurement, please set environment variable
        # NVDS_ENABLE_COMPONENT_LATENCY_MEASUREMENT=1 in addition to the above.
        global measure_latency
        if measure_latency:
            num_sources_in_batch = pyds.nvds_measure_buffer_latency(hash(gst_buffer))
            if num_sources_in_batch == 0:
                MyLogger.error("Unable to get number of sources in GstBuffer for latency measurement")

        batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
        l_frame = batch_meta.frame_meta_list
        while l_frame is not None:
            n_frame1_box = None
            try:
                # Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
                # The casting is done by pyds.NvDsFrameMeta.cast()
                # The casting also keeps ownership of the underlying memory
                # in the C code, so the Python garbage collector will leave
                # it alone.
                frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)

                if current_time - self.last_alive_time_dict.setdefault(frame_meta.pad_index, 0) > 60 * 5:
                    MyLogger.info(
                        f"source [{frame_meta.pad_index}]:{self.stream_path_code[frame_meta.pad_index]} is alive...")
                    self.last_alive_time_dict[frame_meta.pad_index] = current_time
                self.last_frame_time_dict[frame_meta.pad_index] = current_time

            except StopIteration:
                break

            frame_number = frame_meta.frame_num
            l_obj = frame_meta.obj_meta_list
            num_rects = frame_meta.num_obj_meta

            try:
                n_frame = pyds.get_nvds_buf_surface(hash(gst_buffer), frame_meta.batch_id)
                if n_frame is None:
                    MyLogger.error("Failed to get surface from buffer")
                    break

                n_frame1 = np.array(n_frame, copy=True, order='C')
            except Exception as e:
                MyLogger.error(f"Error getting frame surface: {e}")
                break

            detected_objects = []

            while l_obj is not None:
                try:
                    # Casting l_obj.data to pyds.NvDsObjectMeta
                    obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
                    obj_label = pyds.get_string(obj_meta.text_params.display_text)
                    obj_confidence = obj_meta.confidence
                    obj_bbox = (obj_meta.rect_params.left, obj_meta.rect_params.top,
                                obj_meta.rect_params.width, obj_meta.rect_params.height)
                    obj_meta.text_params.display_text = f"{obj_label} {obj_confidence:.2f}"
                    # print(obj_meta.class_id)
                    detected_objects.append({
                        "label": obj_label,
                        "confidence": obj_confidence,
                        "bbox": obj_bbox,
                        "pad_index": frame_meta.pad_index
                    })
                except StopIteration:
                    break

                try:
                    if n_frame1_box is None:
                        n_frame1_box = StreamCapture.draw_bounding_boxes(copy.deepcopy(n_frame1), obj_meta,
                                                                         obj_meta.confidence)
                    else:
                        n_frame1_box = StreamCapture.draw_bounding_boxes(n_frame1_box, obj_meta,
                                                                         obj_meta.confidence)
                except Exception as e:
                    MyLogger.error(f"Error drawing bounding boxes: {e}")

                try:
                    l_obj = l_obj.next
                except StopIteration:
                    break

            try:
                # convert the array into cv2 default color format
                n_frame1 = cv2.cvtColor(n_frame1, cv2.COLOR_RGBA2RGB)
            except Exception as e:
                MyLogger.error(f"Error converting color format: {e}")
                break

            try:
                roi = [tuple(map(int, point.split(','))) for point in
                       MyConfigReader.cfg_dict["ROI"][f"source{frame_meta.pad_index}"].split(';')]
                display_meta = self.draw_polygon_roi(batch_meta, frame_meta, roi)

                # 将显示元数据添加到当前帧
                pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
            except Exception as e:
                MyLogger.error(f"Error drawing ROI: {e}")

            try:
                with self.lock:
                    self.frame_dict.setdefault(str(frame_meta.pad_index), {"frame": None, "time": None})
                    self.frame_dict[str(frame_meta.pad_index)]["frame"] = n_frame1
                    self.frame_dict[str(frame_meta.pad_index)]["time"] = get_current_timestamp_millis()
            except Exception as e:
                MyLogger.error(f"Error updating frame dictionary: {e}")

            try:
                if num_rects != 0:
                    self.detection_algorithm(detected_objects, n_frame1, n_frame1_box, roi)
                    if not silent:
                        MyLogger.info(
                            f"batchid:{frame_meta.pad_index},Frame Number={frame_number},Number of Objects={num_rects}")
            except Exception as e:
                MyLogger.error(f"Error in detection algorithm: {e}")

            # Update frame rate through this probe
            stream_index = "stream{0}".format(frame_meta.pad_index)
            global perf_data
            perf_data.update_fps(stream_index)

            try:
                l_frame = l_frame.next
            except StopIteration:
                break

        return Gst.PadProbeReturn.OK

    def draw_polygon_roi(self, batch_meta, frame_meta, roi_points, border_width=3, border_color=(0.0, 0.0, 1.0, 1.0)):
        """
        绘制由4个点连线组成的多边形区域。

        参数:
            batch_meta: 批处理元数据对象。
            frame_meta: 帧元数据对象。
            roi_points: 包含4个点坐标的列表或元组,格式为 [(x1, y1), (x2, y2), (x3, y3), (x4, y4)]。
            border_width: 边框宽度,默认为3。
            border_color: 边框颜色,格式为 (R, G, B, A),默认为蓝色 (0.0, 0.0, 1.0, 1.0)。

        返回:
            display_meta: 配置好的显示元数据对象。
        """
        # 检查参数有效性
        if not batch_meta:
            MyLogger.error("batch_meta is None in draw_polygon_roi")
            return None

        if not frame_meta:
            MyLogger.error("frame_meta is None in draw_polygon_roi")
            return None

        if not roi_points or len(roi_points) < 4:
            MyLogger.error(f"Invalid roi_points in draw_polygon_roi: {roi_points}")
            return None

        try:
            # 获取显示元数据
            display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
            if not display_meta:
                MyLogger.error("Failed to acquire display meta from pool")
                return None

            display_meta.num_lines = 4  # 设置绘制4条线(4个点连接成一个多边形)
            display_meta.num_rects = 0  # 确保没有绘制矩形

            # 配置多边形的边框(通过绘制4条线)
            for i in range(4):
                try:
                    line = display_meta.line_params[i]
                    # 设置线的起点和终点
                    line.x1 = int(roi_points[i][0])  # 第 i 个点的 x 坐标
                    line.y1 = int(roi_points[i][1])  # 第 i 个点的 y 坐标
                    line.x2 = int(roi_points[(i + 1) % 4][0])  # 下一个点的 x 坐标
                    line.y2 = int(roi_points[(i + 1) % 4][1])  # 下一个点的 y 坐标
                    # 设置线的宽度和颜色
                    line.line_width = border_width
                    line.line_color.red = border_color[0]
                    line.line_color.green = border_color[1]
                    line.line_color.blue = border_color[2]
                    line.line_color.alpha = border_color[3]
                except Exception as e:
                    MyLogger.error(f"Error configuring line {i} in draw_polygon_roi: {e}")
                    # 继续处理其他线条,不中断整个过程

            return display_meta
        except Exception as e:
            MyLogger.error(f"Error in draw_polygon_roi: {e}")
            return None

    def check_bottom_edge_in_polygon(self, bottom_edge, polygon):
        """
        判断底边是否在封闭线框内。

        :param bottom_edge: 底边的两个点 [(x1, y1), (x2, y2)]
        :param polygon: 多边形顶点列表 [(x1, y1), (x2, y2), ...]
        :return: 如果有点在多边形内返回True,否则返回False
        """
        # 检查参数有效性
        if not bottom_edge or len(bottom_edge) < 2:
            MyLogger.error(f"Invalid bottom_edge in check_bottom_edge_in_polygon: {bottom_edge}")
            return False

        if not polygon or len(polygon) < 3:
            MyLogger.error(f"Invalid polygon in check_bottom_edge_in_polygon: {polygon}")
            return False

        try:
            for point in bottom_edge:
                if not point or len(point) < 2:
                    continue

                if self.is_point_in_polygon(point, polygon):
                    return True
            return False
        except Exception as e:
            MyLogger.error(f"Error in check_bottom_edge_in_polygon: {e}")
            return False

    def is_point_in_polygon(self, point, polygon):
        """
        判断点是否在多边形内。

        :param point: 要判断的点 (x, y)
        :param polygon: 多边形顶点的列表 [(x1, y1), (x2, y2), ...]
        :return: 是否在多边形内
        """
        # 检查参数有效性
        if not point or len(point) < 2:
            MyLogger.error(f"Invalid point in is_point_in_polygon: {point}")
            return False

        if not polygon or len(polygon) < 3:
            MyLogger.error(f"Invalid polygon in is_point_in_polygon: {polygon}")
            return False

        try:
            # 转换为numpy数组并进行点多边形测试
            polygon_array = np.array(polygon)
            result = cv2.pointPolygonTest(polygon_array, point, False)
            return result >= 0
        except Exception as e:
            MyLogger.error(f"Error in is_point_in_polygon: {e}")
            return False

    def np_array_to_base64(self, image):
        """
        将numpy数组图像转换为base64编码字符串。

        :param image: numpy数组格式的图像
        :return: base64编码的字符串
        """
        if image is None:
            MyLogger.error("Image is None in np_array_to_base64")
            return ""

        try:
            img_array = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)  # RGB2BGR,用于cv2编码
            encode_result = cv2.imencode(".jpg", img_array)

            if not encode_result[0]:
                MyLogger.error("Failed to encode image in np_array_to_base64")
                return ""

            encode_image = encode_result[1]
            byte_data = encode_image.tobytes()  # 转换为二进制
            base64_str = base64.b64encode(byte_data).decode("ascii")  # 转换为base64
            return base64_str
        except Exception as e:
            MyLogger.error(f"Error in np_array_to_base64: {e}")
            return ""

    def det_point_in_roi(self, point, roi):
        """
        判断点是否在矩形ROI内。

        :param point: 点坐标 (x, y)
        :param roi: 矩形区域 [x, y, width, height]
        :return: 点是否在ROI内
        """
        if not point or len(point) < 2:
            MyLogger.error(f"Invalid point in det_point_in_roi: {point}")
            return False

        if not roi or len(roi) < 4:
            MyLogger.error(f"Invalid roi in det_point_in_roi: {roi}")
            return False

        try:
            if point[0] < roi[0] or point[0] > roi[0] + roi[2]:
                return False
            if point[1] < roi[1] or point[1] > roi[1] + roi[3]:
                return False
            else:
                return True
        except Exception as e:
            MyLogger.error(f"Error in det_point_in_roi: {e}")
            return False

    def determine_safety_status(self, gap_width, container_width):
        if gap_width < container_width * float(MyConfigReader.cfg_dict["Detect"]["Threshold"]):
            return True  # Safe: green with transparency
        else:
            return False  # Unsafe: red with transparency

    def detection_algorithm(self, detected_obj, frame_src, frame_bbox, roi):
        """
        检测算法,用于分析检测到的对象并触发报警。

        参数:
            detected_obj: 检测到的对象列表
            frame_src: 原始帧图像
            frame_bbox: 带有边界框的帧图像
            roi: 感兴趣区域
        """
        # 检查参数有效性
        if not detected_obj:
            return

        if frame_bbox is None:
            MyLogger.warning("frame_bbox is None in detection_algorithm")
            return

        if roi is None:
            MyLogger.warning("roi is None in detection_algorithm")
            return

        try:
            camera_index_code = ""
            tag = False

            for obj in detected_obj:
                if not isinstance(obj, dict):
                    continue

                if obj.get("label", "") == "people":
                    # 检查bbox是否有效
                    bbox = obj.get("bbox")
                    if not bbox or len(bbox) < 4:
                        continue

                    try:
                        center_pointer = (obj["bbox"][0] + (obj["bbox"][2] / 2), obj["bbox"][1] + (obj["bbox"][3] / 2))

                        # 检查是否在ROI内
                        bottom_edge = [(obj["bbox"][0], obj["bbox"][1] + obj["bbox"][3]),
                                       (obj["bbox"][0] + obj["bbox"][2], obj["bbox"][1] + obj["bbox"][3])]

                        if self.check_bottom_edge_in_polygon(bottom_edge, roi):
                            # 可以在这里添加报警逻辑
                            pass
                            # MyLogger.info("detect people ,alarm")
                    except Exception as e:
                        MyLogger.error(f"Error processing object in detection_algorithm: {e}")
                        continue

            # 如果需要触发报警
            if tag:
                try:
                    resized_img = cv2.resize(frame_bbox, (1920, 1080), interpolation=cv2.INTER_LINEAR)
                    msg = {
                        "System": "StreamingMedia",
                        "CMD": "Alarm",
                        "Type": "CJ12-001",
                        "Pic": self.np_array_to_base64(resized_img),
                        "Time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
                        "Data": {
                            "Camera": camera_index_code,
                            "Message": "msg"
                        },
                        "Code": 200
                    }
                    # Kafka发送代码已注释
                    # self.kafka_producer.send(
                    #     topic=f"{MyConfigReader.cfg_dict['Kafka']['topic']}",
                    #     value=f"{json.dumps(msg)}")

                    # 异步发送消息
                    # self.kafka_producer.produce(f"{MyConfigReader.cfg_dict['Kafka']['topic']}", json.dumps(msg).encode('utf-8'), callback=self.delivery_report)
                    # 等待所有消息发送完成
                    # self.kafka_producer.flush()
                    # MyLogger.info("发送报警信息至kafka队列")
                except Exception as e:
                    MyLogger.error(f"Error sending alarm in detection_algorithm: {e}")
        except Exception as e:
            MyLogger.error(f"Error in detection_algorithm: {e}")
            return

    def delivery_report(self, err, msg):
        if err is not None:
            MyLogger.error(f'kafka send error: {err}')
        else:
            MyLogger.info(f'{msg.topic()} [{msg.partition()}] @ {msg.offset()}')

    @staticmethod
    def draw_bounding_boxes(image, obj_meta, confidence):
        image = cv2.cvtColor(image, cv2.COLOR_RGBA2RGB)
        confidence = '{0:.2f}'.format(confidence)
        rect_params = obj_meta.rect_params
        top = int(rect_params.top)
        left = int(rect_params.left)
        width = int(rect_params.width)
        height = int(rect_params.height)
        obj_name = pyds.get_string(obj_meta.text_params.display_text)
        # image = cv2.rectangle(image, (left, top), (left + width, top + height), (0, 0, 255, 0), 2, cv2.LINE_4)
        color = (0, 0, 255, 0)
        w_percents = int(width * 0.05) if width > 100 else int(width * 0.1)
        h_percents = int(height * 0.05) if height > 100 else int(height * 0.1)
        linetop_c1 = (left + w_percents, top)
        linetop_c2 = (left + width - w_percents, top)
        image = cv2.line(image, linetop_c1, linetop_c2, color, 6)
        linebot_c1 = (left + w_percents, top + height)
        linebot_c2 = (left + width - w_percents, top + height)
        image = cv2.line(image, linebot_c1, linebot_c2, color, 6)
        lineleft_c1 = (left, top + h_percents)
        lineleft_c2 = (left, top + height - h_percents)
        image = cv2.line(image, lineleft_c1, lineleft_c2, color, 6)
        lineright_c1 = (left + width, top + h_percents)
        lineright_c2 = (left + width, top + height - h_percents)
        image = cv2.line(image, lineright_c1, lineright_c2, color, 6)
        # Note that on some systems cv2.putText erroneously draws horizontal lines across the image
        image = cv2.putText(image, obj_name, (left - 10, top - 10),
                            cv2.FONT_HERSHEY_SIMPLEX,
                            1,
                            (0, 0, 255, 0), 2)
        return image


if __name__ == '__main__':
    a = StreamCapture(None, MyConfigReader.cfg_dict['Infer']['path'], False, {})
    #a.open({"asdasdasdas": "rtsp://admin:Haizhu!123@192.168.54.45:554/h264/ch1/main/av_stream"})
    a.open({"asdasdasdas": "file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"})

I only changed the streammux part of the code

I was able to rundeepstream-test3successfully

Please compare with the original deepstream-test3 to find out which modification caused the issue.

The only change is the use of nvmultiurisrcbin, I see other topics have the same problem but not solved:Nvmultiurisrcbin does not exit cleanly after EOS

deepstream-test5 works with nvmultiurisrcbin in DeepStream 7.1. Please refer to DeepStream Reference Application - deepstream-test5 app — DeepStream documentation

Is it necessary to use streammux in the pipeline after using nvmultiurisrcbin? I see that the test5_config_file_nvmultiurisrcbin_src_list_attr_all.txt configuration file uses both

The deepstream-test5 is open source. Please read the source code in /opt/nvidia/deepstream/deepstream/sources/apps/sample_apps/deepstream-app/deepstream_app.c and /opt/nvidia/deepstream/deepstream/sources/apps/apps-common/src/deepstream_source_bin.c.

When the “use-nvmultiurisrcbin” is set to 1 in configuration, the nvstreammux parameters will be used to configure the nvstreammux inside nvmultiurisrcbin.

There is no update from you for a period, assuming this is not an issue anymore. Hence we are closing this topic. If need further support, please open a new one. Thanks