How can I use the nvdspreprocess plugin to rescale multiple streams in a streammux?

Please provide complete information as applicable to your setup.

• Hardware Platform (Jetson / GPU) GPU
• DeepStream Version 6.3-docker
• JetPack Version (valid for Jetson only)
• TensorRT Version 8.5
• NVIDIA GPU Driver Version (valid for GPU only) 550.142

The model I trained takes 1280x1280 input, but the video streams are 3840x2160, so the frames have to be scaled down. I tried to do the scaling with the nvdspreprocess plugin, but it does not seem to take effect: the detection results are the same as without the plugin. Is there an error in my configuration file? This is my configuration file:

################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

# The values in the config file are overridden by values set through GObject
# properties.

[property]
enable=1
target-unique-ids=1
    # 0=NCHW, 1=NHWC, 2=CUSTOM
network-input-order=0
process-on-frame=1
    # if enabled maintain the aspect ratio while scaling
maintain-aspect-ratio=1
    # if enabled pad symmetrically with maintain-aspect-ratio enabled
symmetric-padding=1
    # processing width/height at which image scaled
processing-width=1280
processing-height=1280
scaling-buf-pool-size=6
tensor-buf-pool-size=6
    # tensor shape based on network-input-order
network-input-shape=1;3;1280;1280
    # 0=RGB, 1=BGR, 2=GRAY

network-color-format=0
    # 0=FP32, 1=UINT8, 2=INT8, 3=UINT32, 4=INT32, 5=FP16
tensor-data-type=0
tensor-name=input_1
    # 0=NVBUF_MEM_DEFAULT 1=NVBUF_MEM_CUDA_PINNED 2=NVBUF_MEM_CUDA_DEVICE 3=NVBUF_MEM_CUDA_UNIFIED
scaling-pool-memory-type=0
    # 0=NvBufSurfTransformCompute_Default 1=NvBufSurfTransformCompute_GPU 2=NvBufSurfTransformCompute_VIC
scaling-pool-compute-hw=0
    # Scaling Interpolation method
    # 0=NvBufSurfTransformInter_Nearest 1=NvBufSurfTransformInter_Bilinear 2=NvBufSurfTransformInter_Algo1
    # 3=NvBufSurfTransformInter_Algo2 4=NvBufSurfTransformInter_Algo3 5=NvBufSurfTransformInter_Algo4
    # 6=NvBufSurfTransformInter_Default
scaling-filter=0
custom-lib-path=/opt/nvidia/deepstream/deepstream/lib/gst-plugins/libcustom2d_preprocess.so
custom-tensor-preparation-function=CustomTensorPreparation

[user-configs]
pixel-normalization-factor=0.003921568
#mean-file=
#offsets=


[group-0]
src-ids=0;1;2;3
custom-input-transformation-function=CustomAsyncTransformation
process-on-roi=0
roi-params-src-0=0;540;900;500;960;0;900;500;0;0;540;900;
roi-params-src-1=0;540;900;500;960;0;900;500;0;0;540;900;
roi-params-src-2=0;540;900;500;960;0;900;500;0;0;540;900;
roi-params-src-3=0;540;900;500;960;0;900;500;0;0;540;900;
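
As a sanity check on what maintain-aspect-ratio=1 and symmetric-padding=1 ask for, the letterbox geometry for a 3840x2160 frame scaled into a 1280x1280 tensor can be worked out by hand. The sketch below is plain arithmetic, not the plugin's code; the `letterbox` helper and its rounding are assumptions made for illustration.

```python
def letterbox(src_w, src_h, dst_w, dst_h, symmetric=True):
    """Hypothetical helper: aspect-preserving fit of src into dst."""
    # Use the smaller ratio so the whole frame fits inside the target.
    scale = min(dst_w / src_w, dst_h / src_h)
    new_w, new_h = round(src_w * scale), round(src_h * scale)
    pad_x, pad_y = dst_w - new_w, dst_h - new_h
    # Symmetric padding splits the border evenly; otherwise the image
    # is assumed to sit at the top-left corner.
    left, top = (pad_x // 2, pad_y // 2) if symmetric else (0, 0)
    return {"scale": scale, "size": (new_w, new_h),
            "offset": (left, top), "pad": (pad_x, pad_y)}


box = letterbox(3840, 2160, 1280, 1280)
print(box["size"], box["offset"])
```

If the tensor that reaches the model does not look like a 1280x720 image centred in a 1280x1280 canvas, the preprocess output is probably not the one being consumed.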

Have you set the input-tensor-from-meta=1 for your nvinfer plugin?

I didn’t set this before. I have now added it, and the pipeline runs with a warning:

My modified nvinfer plugin configuration file is as follows:

[property]
gpu-id=0
net-scale-factor=0.0039215697906911373
model-color-format=0
onnx-file=/home/user/Downloads/DoorOpen/dooropen_det.pt.onnx
model-engine-file=/home/user/Downloads/DoorOpen/dooropen_det.engine
#int8-calib-file=calib.table
labelfile-path=/home/user/Downloads/DoorOpen/labels.txt
batch-size=2
network-mode=0
num-detected-classes=2
interval=0
gie-unique-id=1
process-mode=1
network-type=0
cluster-mode=2
maintain-aspect-ratio=1
symmetric-padding=1
#workspace-size=2000
parse-bbox-func-name=NvDsInferParseYolo
#parse-bbox-func-name=NvDsInferParseYoloCuda
custom-lib-path=/home/user/Work/DeepStream-Yolo/nvdsinfer_custom_impl_Yolo/libnvdsinfer_custom_impl_Yolo.so
engine-create-func-name=NvDsInferYoloCudaEngineGet
input-tensor-from-meta=1

[class-attrs-all]
nms-iou-threshold=0.45
pre-cluster-threshold=0.25
topk=300
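
With input-tensor-from-meta=1, nvinfer takes its input tensor from nvdspreprocess, so the two files have to agree with each other. The sketch below cross-checks a few of the values posted above using Python's configparser. The `check` helper is hypothetical, and the batch-dimension note is an assumption to verify against the plugin documentation, not a confirmed rule.

```python
import configparser

# Trimmed copies of the two configs from this thread.
PREPROCESS = """
[property]
target-unique-ids=1
processing-width=1280
processing-height=1280
network-input-shape=1;3;1280;1280

[group-0]
src-ids=0;1;2;3
"""

NVINFER = """
[property]
gie-unique-id=1
batch-size=2
input-tensor-from-meta=1
"""


def load(text):
    parser = configparser.ConfigParser(interpolation=None)
    parser.read_string(text)
    return parser


def ints(value):
    # DeepStream lists are semicolon-separated, sometimes with a trailing ';'.
    return [int(v) for v in value.split(";") if v.strip()]


def check(pre, infer):
    notes = []
    p, n = pre["property"], infer["property"]
    if n.get("input-tensor-from-meta", "0") != "1":
        notes.append("nvinfer is not reading tensors from meta")
    if int(n["gie-unique-id"]) not in ints(p["target-unique-ids"]):
        notes.append("target-unique-ids does not include gie-unique-id")
    batch, _channels, height, width = ints(p["network-input-shape"])  # NCHW
    if (width, height) != (int(p["processing-width"]), int(p["processing-height"])):
        notes.append("processing size differs from tensor H x W")
    n_sources = len(ints(pre["group-0"]["src-ids"]))
    if batch < n_sources:  # assumption: worth verifying, not a confirmed rule
        notes.append(f"tensor batch {batch} < {n_sources} sources (worth verifying)")
    return notes


notes = check(load(PREPROCESS), load(NVINFER))
print(notes)
```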

My engine is generated automatically from the ONNX model when the program runs. How do I find out the name of my input layer?

You need to be familiar with your model when configuring the parameters of nvinfer or nvdspreprocess. You can use tools such as Netron to inspect your model and find its input and output layers.

I have now successfully configured the nvdspreprocess config file, but the results detected by YOLO-onnx and deepstream-tensorrt for the same image are very different, and the DeepStream detections look random. Why is that?

According to my research, YOLO-onnx and deepstream-tensorrt give very different results for the same image because the scaling algorithms of DeepStream and OpenCV produce different outputs (Image comparison Deepstream vs opencv python). I have therefore customized the preprocess plugin to support OpenCV scaling methods. You can try it out at the repo (GitHub - hieptran2k2/DeepStream_Custom_Preprocess_Plugin: Create a custom preprocessing plugin in DeepStream using OpenCV with a scaling filter.). If you find the repo useful, please give me a star!
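
To illustrate how much the choice of interpolation can matter when downscaling, here is a toy one-dimensional comparison in pure Python. It is only a sketch: the two functions are simplified textbook versions of nearest-neighbour and linear resampling, and neither is claimed to match DeepStream's or OpenCV's exact implementation.

```python
import math


def resize_nearest(row, out_len):
    # Pick the source sample at the scaled index.
    scale = len(row) / out_len
    return [row[min(int(i * scale), len(row) - 1)] for i in range(out_len)]


def resize_linear(row, out_len):
    # Sample at pixel centres and blend the two neighbouring source pixels.
    scale = len(row) / out_len
    out = []
    for i in range(out_len):
        x = (i + 0.5) * scale - 0.5
        x = min(max(x, 0.0), len(row) - 1)
        x0 = int(math.floor(x))
        x1 = min(x0 + 1, len(row) - 1)
        t = x - x0
        out.append((1 - t) * row[x0] + t * row[x1])
    return out


row = [0, 255, 0, 255, 0, 255]   # a high-frequency stripe pattern
nearest = resize_nearest(row, 2)
linear = resize_linear(row, 2)
print(nearest, linear)
```

On this stripe pattern the two methods return opposite values for every output pixel, which is the kind of difference that can change detections on fine detail after a 3x downscale.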

It fails with this error:

root@74a17d586781:/home/smuvision/Work/DeepStream_Custom_Preprocess_Plugin# bash setting_envirioment.sh /home/smuvision/Work/DeepStream_Custom_Preprocess_Plugin
g++ -c -o nvdsinfer_context_impl.o -fPIC -Wno-deprecated-declarations -std=c++14 -I /usr/local/cuda-12.1/include -I ../../includes -DNDEBUG -DDUMP_INPUT_TO_FILE -DWITH_OPENCV -I/usr/include/opencv4 nvdsinfer_context_impl.cpp
In file included from nvdsinfer_context_impl.cpp:39:
nvdsinfer_context_impl.h:36:10: fatal error: nvdsinfer_context.h: No such file or directory
   36 | #include <nvdsinfer_context.h>
      |          ^~~~~~~~~~~~~~~~~~~~~
compilation terminated.
make: *** [Makefile:60: nvdsinfer_context_impl.o] Error 1
cp: cannot stat './libnvds_infer.so': No such file or directory
-fPIC -DHAVE_CONFIG_H -std=c++17 -Wall -Werror -DDS_VERSION="6.3.0" -I /usr/local/cuda-12.1/include -I include -I ../../includes -DDUMP_ROIS -DWITH_OPENCV -pthread -I/usr/include/gstreamer-1.0 -I/usr/include/orc-0.4 -I/usr/include/gstreamer-1.0 -I/usr/include/glib-2.0 -I/usr/lib/x86_64-linux-gnu/glib-2.0/include -I/usr/include/opencv4/opencv -I/usr/include/opencv4
g++ -c -o gstnvdspreprocess.o -fPIC -DHAVE_CONFIG_H -std=c++17 -Wall -Werror -DDS_VERSION=\"6.3.0\" -I /usr/local/cuda-12.1/include -I include -I ../../includes -DDUMP_ROIS -DWITH_OPENCV -pthread -I/usr/include/gstreamer-1.0 -I/usr/include/orc-0.4 -I/usr/include/gstreamer-1.0 -I/usr/include/glib-2.0 -I/usr/lib/x86_64-linux-gnu/glib-2.0/include -I/usr/include/opencv4/opencv -I/usr/include/opencv4 gstnvdspreprocess.cpp
In file included from gstnvdspreprocess.cpp:31:
gstnvdspreprocess.h:31:10: fatal error: nvbufsurface.h: No such file or directory
   31 | #include "nvbufsurface.h"
      |          ^~~~~~~~~~~~~~~~
compilation terminated.
make: *** [Makefile:73: gstnvdspreprocess.o] Error 1
cp: cannot stat './libnvdsgst_preprocess.so': No such file or directory
-fPIC -DHAVE_CONFIG_H -std=c++17 -Wall -Werror -DDS_VERSION="6.3.0" -I /usr/local/cuda-12.1/include -I ../include -I ../../../includes -DWITH_OPENCV -pthread -I/usr/include/gstreamer-1.0 -I/usr/include/orc-0.4 -I/usr/include/gstreamer-1.0 -I/usr/include/glib-2.0 -I/usr/lib/x86_64-linux-gnu/glib-2.0/include -I/usr/include/opencv4/opencv -I/usr/include/opencv4
g++ -c -o nvdspreprocess_lib.o -fPIC -DHAVE_CONFIG_H -std=c++17 -Wall -Werror -DDS_VERSION=\"6.3.0\" -I /usr/local/cuda-12.1/include -I ../include -I ../../../includes -DWITH_OPENCV -pthread -I/usr/include/gstreamer-1.0 -I/usr/include/orc-0.4 -I/usr/include/gstreamer-1.0 -I/usr/include/glib-2.0 -I/usr/lib/x86_64-linux-gnu/glib-2.0/include -I/usr/include/opencv4/opencv -I/usr/include/opencv4 nvdspreprocess_lib.cpp
nvdspreprocess_lib.cpp:12:10: fatal error: nvbufsurface.h: No such file or directory
   12 | #include "nvbufsurface.h"
      |          ^~~~~~~~~~~~~~~~
compilation terminated.
make: *** [Makefile:71: nvdspreprocess_lib.o] Error 1
cp: cannot stat './libcustom2d_preprocess.so': No such file or directory

I am using the DeepStream 6.3 Docker image.

If you want to use a YOLO model, could you refer to our deepstream_tools sample to see if it meets your needs?

Will it work with DeepStream 6.3?

Oh sorry, I have added the missing libraries to the repo. Could you please update and try again?

bash setting_envirioment.sh path/to/folder/DeepStream_Custom_Preprocess_Plugin

Once I’ve done that, I can use these scaling-filter values in the nvdspreprocess config file:

# 7 = OPEN_CV_INTER_NEAREST
# 8 = OPEN_CV_INTER_LINEAR
# 9 = OPEN_CV_INTER_CUBIC
# 10 = OPEN_CV_INTER_AREA
# 11 = OPEN_CV_INTER_LANCZOS4
# 12 = OPEN_CV_INTER_MAX
# 13 = OPEN_CV_WARP_FILL_OUTLIER
# 14 = OPEN_CV_WARP_INVERSE_MAP

Is that all?

When I used scaling-filter=8, the program hung and then crashed:

NvBufSurfTransformSyncObjWait: Multiple wait being done on Object 0x7f5c48010740
NvBufSurfTransformSyncObjWait: Multiple wait being done on Object 0x7f5c480108f0
NvBufSurfTransformSyncObjWait: Multiple wait being done on Object 0x7f5c48010aa0
NvBufSurfTransformSyncObjWait: Multiple wait being done on Object 0x7f5c48010c50
Segmentation fault (core dumped)

You can try
custom-input-transformation-function=CustomTransformation
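
In the config posted at the top of this thread, custom-input-transformation-function sits under [group-0], not under [property]. Assuming the plugin reads the key per group, as that layout suggests, the change has to be made in each group section. A minimal sketch using Python's configparser (the `set_group_transform` helper is hypothetical):

```python
import configparser

# Trimmed copy of the preprocess config from this thread.
PREPROCESS_CFG = """
[property]
enable=1
scaling-filter=8

[group-0]
src-ids=0;1;2;3
custom-input-transformation-function=CustomAsyncTransformation
process-on-roi=0
"""

KEY = "custom-input-transformation-function"


def set_group_transform(cfg_text, func_name):
    parser = configparser.ConfigParser(interpolation=None)
    parser.optionxform = str  # keep keys exactly as written
    parser.read_string(cfg_text)
    # Only the [group-N] sections carry the transformation function here.
    for section in parser.sections():
        if section.startswith("group-"):
            parser[section][KEY] = func_name
    return parser


parser = set_group_transform(PREPROCESS_CFG, "CustomTransformation")
print(parser["group-0"][KEY])
```

Adding the key to [property] while [group-0] still names CustomAsyncTransformation would, under that assumption, leave the group's behaviour unchanged.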

As before, the out_0__src__0__roi__0.png files are generated, but the program hangs and then crashes:

NvBufSurfTransformSyncObjWait: Multiple wait being done on Object 0x7f5c48010740
NvBufSurfTransformSyncObjWait: Multiple wait being done on Object 0x7f5c480108f0
NvBufSurfTransformSyncObjWait: Multiple wait being done on Object 0x7f5c48010aa0
NvBufSurfTransformSyncObjWait: Multiple wait being done on Object 0x7f5c48010c50
Segmentation fault (core dumped)

Could you provide me with the config files for preprocess and nvinfer, as well as the main file to review?

pgie

[property]
gpu-id=0
net-scale-factor=0.0039215697906911373
model-color-format=0
onnx-file=/home/smuvision/Downloads/DoorOpen/dooropen_det.pt.onnx
model-engine-file=/home/smuvision/Downloads/DoorOpen/dooropen_det.engine
#int8-calib-file=calib.table
labelfile-path=/home/smuvision/Downloads/DoorOpen/labels.txt
batch-size=2
network-mode=0
num-detected-classes=2
interval=0
gie-unique-id=1
process-mode=1
network-type=0
cluster-mode=2
maintain-aspect-ratio=1
symmetric-padding=1
#workspace-size=2000
parse-bbox-func-name=NvDsInferParseYolo
#parse-bbox-func-name=NvDsInferParseYoloCuda
custom-lib-path=/home/smuvision/Work/DeepStream-Yolo/nvdsinfer_custom_impl_Yolo/libnvdsinfer_custom_impl_Yolo.so
engine-create-func-name=NvDsInferYoloCudaEngineGet
input-tensor-from-meta=1

[class-attrs-all]
nms-iou-threshold=0.2
pre-cluster-threshold=0.25
topk=300

preprocess


# The values in the config file are overridden by values set through GObject
# properties.

[property]
enable=1
target-unique-ids=1
    # 0=NCHW, 1=NHWC, 2=CUSTOM
network-input-order=0
process-on-frame=1
    # if enabled maintain the aspect ratio while scaling
maintain-aspect-ratio=1
    # if enabled pad symmetrically with maintain-aspect-ratio enabled
symmetric-padding=0
    # processing width/height at which image scaled
processing-width=1280
processing-height=1280
scaling-buf-pool-size=6
tensor-buf-pool-size=6
    # tensor shape based on network-input-order
network-input-shape=1;3;1280;1280
    # 0=RGB, 1=BGR, 2=GRAY

network-color-format=0
    # 0=FP32, 1=UINT8, 2=INT8, 3=UINT32, 4=INT32, 5=FP16
tensor-data-type=0
tensor-name=input
    # 0=NVBUF_MEM_DEFAULT 1=NVBUF_MEM_CUDA_PINNED 2=NVBUF_MEM_CUDA_DEVICE 3=NVBUF_MEM_CUDA_UNIFIED
#scaling-pool-memory-type=0
scaling-pool-memory-type=2
    # 0=NvBufSurfTransformCompute_Default 1=NvBufSurfTransformCompute_GPU 2=NvBufSurfTransformCompute_VIC
#scaling-pool-compute-hw=0
scaling-pool-compute-hw=1
    # Scaling Interpolation method
    # 0=NvBufSurfTransformInter_Nearest 1=NvBufSurfTransformInter_Bilinear 2=NvBufSurfTransformInter_Algo1
    # 3=NvBufSurfTransformInter_Algo2 4=NvBufSurfTransformInter_Algo3 5=NvBufSurfTransformInter_Algo4
    # 6=NvBufSurfTransformInter_Default
#scaling-filter=0
scaling-filter=8
custom-lib-path=/opt/nvidia/deepstream/deepstream/lib/gst-plugins/libcustom2d_preprocess.so
custom-tensor-preparation-function=CustomTensorPreparation
custom-input-transformation-function=CustomTransformation
[user-configs]
pixel-normalization-factor=0.003921568
#mean-file=
#offsets=


[group-0]
src-ids=0;1;2;3
custom-input-transformation-function=CustomAsyncTransformation
process-on-roi=0
roi-params-src-0=0;540;900;500;960;0;900;500;0;0;540;900;
roi-params-src-1=0;540;900;500;960;0;900;500;0;0;540;900;
roi-params-src-2=0;540;900;500;960;0;900;500;0;0;540;900;
roi-params-src-3=0;540;900;500;960;0;900;500;0;0;540;900;

main.py


import sys
import threading
import copy
import json
sys.path.append('../')
from pathlib import Path
import gi
import numpy as np
import configparser
import argparse
import os
import base64
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst
from urllib.parse import urlparse
from ctypes import *
import time
import sys
import math
import platform
from common.is_aarch_64 import is_aarch64
from common.bus_call import bus_call
from common.FPS import PERF_DATA
from my_log import MyLogger
from config_reader import MyConfigReader
from datetime import datetime
from KafkaProducer import KafkaProducer
import pyds
import cv2

os.environ['GST_PLUGIN_PATH'] = '/opt/nvidia/deepstream/deepstream/lib/gst-plugins:' + os.environ.get('GST_PLUGIN_PATH',
                                                                                                      '')

no_display = False
silent = False
file_loop = True   # selects the source element: nvurisrcbin with file-loop (True) or uridecodebin (False)
perf_data = None

MAX_DISPLAY_LEN = 64
PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3
MUXER_OUTPUT_WIDTH = 1920
MUXER_OUTPUT_HEIGHT = 1080
MUXER_BATCH_TIMEOUT_USEC = 4000000
TILED_OUTPUT_WIDTH = 1280
TILED_OUTPUT_HEIGHT = 720
# TILED_OUTPUT_WIDTH=1920
# TILED_OUTPUT_HEIGHT=1080
GST_CAPS_FEATURES_NVMM = "memory:NVMM"
OSD_PROCESS_MODE = 0
OSD_DISPLAY_TEXT = 1
GPU_ID = int(MyConfigReader.cfg_dict["GPU"]["gpu_id"])

def get_current_timestamp_millis():
    timestamp = time.time()
    timestamp_millis = int(timestamp * 1000)
    dt_object = datetime.fromtimestamp(timestamp)
    standard_time_format = dt_object.strftime('%Y-%m-%d_%H-%M-%S.') + str(timestamp_millis % 1000).zfill(3)
    return standard_time_format


class StreamCapture:

    def __init__(self, pgie, config, disable_probe):
        super().__init__()
        self.pipeline = None
        self.framerate = int(MyConfigReader.cfg_dict['nvr']['fps'])
        self.running = False
        self.frame_interval = 1.0 / self.framerate
        self.last_frame_time_dict = {}
        self.reconnect_attempts = 5  # maximum number of reconnect attempts
        self.reconnect_interval = 5  # reconnect interval (seconds)
        self.thread = None
        self.status = None
        self.bus = None
        self.update_time = time.time()
        self.last_alive_time_dict = {}
        self.over_time_dict = {}
        self.dict_lock = threading.Lock()
        self.stream_paths = None
        self.stream_path_code = None

        self.requested_pgie = pgie
        self.disable_probe = disable_probe
        self.config = config
        self.number_sources = None
        self.frame_dict = {}
        self.source_bins = {}
        self.lock = threading.Lock()
        self.kafka_producer = KafkaProducer({
            'bootstrap.servers': f"{MyConfigReader.cfg_dict['Kafka']['ip']}:{MyConfigReader.cfg_dict['Kafka']['port']}",
        }, retry_limit=5, retry_interval=3)

    def cb_newpad(self, decodebin, decoder_src_pad, data):
        MyLogger.info("In cb_newpad")

        caps = decoder_src_pad.get_current_caps()
        if not caps:
            caps = decoder_src_pad.query_caps()
        gststruct = caps.get_structure(0)
        gstname = gststruct.get_name()
        source_bin = data
        features = caps.get_features(0)

        # Need to check if the pad created by the decodebin is for video and not
        # audio.
        MyLogger.info(f"gstname={gstname}")
        if gstname.find("video") != -1:
            MyLogger.info(f"features={features}")
            if features.contains("memory:NVMM"):
                # Get the source bin ghost pad
                bin_ghost_pad = source_bin.get_static_pad("src")
                if not bin_ghost_pad.set_target(decoder_src_pad):
                    MyLogger.error("Failed to link decoder src pad to source bin ghost pad")
            else:
                MyLogger.error(" Error: Decodebin did not pick nvidia decoder plugin.")

    def read(self):
        with self.lock:
            # return [value["frame"] for key, value in self.frame_dict.items()]
            return copy.deepcopy(self.frame_dict)

    def close(self):
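        # self.thread starts as None, so guard before calling is_alive()
        if self.thread is not None and self.thread.is_alive():
            self.thread.join()
        if self.pipeline is not None and not self.is_pipeline_null():
            self.pipeline.set_state(Gst.State.NULL)
            MyLogger.info("Stream capture thread closed; pipeline state switched to NULL")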

        if self.bus:
            self.bus = None
        self.pipeline = None
        self.status = False

    def is_pipeline_null(self):
        (state, pending) = self.pipeline.get_state(timeout=1000000000)
        return state == Gst.State.NULL

    def decodebin_child_added(self, child_proxy, Object, name, user_data):
        MyLogger.info(f"Decodebin child added:{name}")
        if (name.find("decodebin") != -1):
            Object.connect("child-added", self.decodebin_child_added, user_data)

        if "nvv" in name.lower():  # matches nvv4l2decoder, nvv4l2h264dec, etc.
            if Object.find_property("gpu_id") is not None:
                Object.set_property("gpu_id", GPU_ID)  # keep consistent with downstream elements (e.g. nvstreammux)
                MyLogger.info(f"Set GPU_ID of {name} to {GPU_ID}")

        #todo
        # if "source" in name:
        #     source_element = child_proxy.get_by_name("source")
        #     if source_element.find_property('drop-on-latency') != None:
        #         Object.set_property("drop-on-latency", True)

    def create_pipeline(self):
        MyLogger.info("Creating Pipeline ")
        self.pipeline = Gst.Pipeline()
        self.is_live = False

        if not self.pipeline:
            MyLogger.error(" Unable to create Pipeline")
            return False
        MyLogger.info("Creating streammux ")

        # Create nvstreammux instance to form batches from one or more sources.
        self.streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
        if not self.streammux:
            MyLogger.error(" Unable to create NvStreamMux ")
            return False
        self.streammux.set_property('batched-push-timeout', 40000)
        self.streammux.set_property("gpu_id", GPU_ID)

        self.pipeline.add(self.streammux)
        valid_sources = []
        for i in range(self.number_sources):
            self.last_frame_time_dict[i] = time.time()
            MyLogger.info(f"Creating source_bin: {i}")
            uri_name = self.stream_paths[i]
            if uri_name.find("rtsp://") == 0:
                self.is_live = True
            # todo
            self.source_bin = self.create_source_bin(i, uri_name)
            # if not self.source_bin:
            #     MyLogger.error("Unable to create source bin")
            #     return False
            # self.pipeline.add(self.source_bin)
            if self.source_bin is not None:
                # valid_sources.append(source_bin)
                # add source_bin to the main pipeline
                valid_sources.append(self.source_bin)
                self.pipeline.add(self.source_bin)
            else:
                MyLogger.error(f"Skipping invalid source: {uri_name}")
                continue

            padname = "sink_%u" % i
            self.sinkpad = self.streammux.get_request_pad(padname)
            if not self.sinkpad:
                MyLogger.error("Unable to create sink pad bin")
                return False
            self.srcpad = self.source_bin.get_static_pad("src")
            if not self.srcpad:
                MyLogger.error("Unable to create src pad bin")
                return False
            self.srcpad.link(self.sinkpad)

        self.number_sources = len(valid_sources)
        # self.streammux.set_property("batch-size", self.number_sources)
        # self.pgie.set_property("batch-size", self.number_sources)
        self.queue1 = Gst.ElementFactory.make("queue", "queue1")
        self.queue2 = Gst.ElementFactory.make("queue", "queue2")
        self.queue3 = Gst.ElementFactory.make("queue", "queue3")
        self.queue4 = Gst.ElementFactory.make("queue", "queue4")
        self.queue5 = Gst.ElementFactory.make("queue", "queue5")
        self.queue6 = Gst.ElementFactory.make("queue", "queue6")
        self.queue7 = Gst.ElementFactory.make("queue", "queue7")
        # self.queue8 = Gst.ElementFactory.make("queue", "queue8")
        self.pipeline.add(self.queue1)
        self.pipeline.add(self.queue2)
        self.pipeline.add(self.queue3)
        self.pipeline.add(self.queue4)
        self.pipeline.add(self.queue5)
        self.pipeline.add(self.queue6)
        self.pipeline.add(self.queue7)
        # self.pipeline.add(self.queue8)

        self.nvdslogger = None
        self.transform = None

        MyLogger.info("Creating preprocess ")
        self.preprocess = Gst.ElementFactory.make("nvdspreprocess", "preprocess-plugin")

        if not self.preprocess:
            MyLogger.error(f" Unable to create preprocess :  {self.preprocess}")
            return False
        self.preprocess.set_property("gpu_id", GPU_ID)
        self.preprocess.set_property("config-file", "config_preprocess.txt")


        MyLogger.info("Creating Pgie ")
        if self.requested_pgie is not None and (
                self.requested_pgie == 'nvinferserver' or self.requested_pgie == 'nvinferserver-grpc'):
            self.pgie = Gst.ElementFactory.make("nvinferserver", "primary-inference")
        elif self.requested_pgie is not None and self.requested_pgie == 'nvinfer':
            self.pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
        else:
            self.pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")

        if not self.pgie:
            MyLogger.error(f" Unable to create pgie :  {self.requested_pgie}")
            return False
        # self.pgie.set_property("interval", 5)
        if self.disable_probe:
            # Use nvdslogger for perf measurement instead of probe function
            MyLogger.info("Creating nvdslogger ")
            self.nvdslogger = Gst.ElementFactory.make("nvdslogger", "nvdslogger")
            self.nvdslogger.set_property("gpu_id", GPU_ID)

        MyLogger.info("Creating tiler ")
        self.tiler = Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
        # Check for creation failure before touching properties
        if not self.tiler:
            MyLogger.error(" Unable to create tiler")
            return False
        self.tiler.set_property("gpu_id", GPU_ID)
        MyLogger.info("Creating nvvidconv")
        self.nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
        if not self.nvvidconv:
            MyLogger.error(" Unable to create nvvidconv")
            return False
        self.nvvidconv.set_property("gpu_id", GPU_ID)
        MyLogger.info("Creating nvosd")
        self.nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
        if not self.nvosd:
            MyLogger.error(" Unable to create nvosd")
            return False
        self.nvosd.set_property("gpu_id", GPU_ID)
        self.nvosd.set_property('process-mode', OSD_PROCESS_MODE)
        self.nvosd.set_property('display-text', OSD_DISPLAY_TEXT)

        if not int(MyConfigReader.cfg_dict["nvr"]["show"]):
            MyLogger.info("Creating Fakesink ")
            self.sink = Gst.ElementFactory.make("fakesink", "fakesink")
            self.sink.set_property('enable-last-sample', 0)
            self.sink.set_property('sync', 0)
        else:
            if is_aarch64():
                MyLogger.info("Creating transform")
                # self.transform = Gst.ElementFactory.make("nvegltransform", "nvegl-transform")
                # if not self.transform:
                #     MyLogger.error(" Unable to create transform")
                self.sink = Gst.ElementFactory.make("appsink", f"appsink")
            MyLogger.info("Creating EGLSink \n")
            self.sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
            self.sink.set_property('sync', 0)
            self.sink.set_property("gpu_id", GPU_ID)
            # self.sink = Gst.ElementFactory.make("appsink", f"appsink1")

        self.converter = Gst.ElementFactory.make("nvvideoconvert", f"converter2")
        self.converter.set_property("gpu_id", GPU_ID)
        self.capsfilter = Gst.ElementFactory.make("capsfilter", f"capsfilter")
        caps = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA")
        # self.converter.set_property("nvbuf-memory-type", 0)
        mem_type = int(pyds.NVBUF_MEM_CUDA_UNIFIED)
        self.converter.set_property("nvbuf-memory-type", mem_type)
        self.tiler.set_property("nvbuf-memory-type", mem_type)
        self.capsfilter.set_property("caps", caps)

        if not self.sink:
            MyLogger.error(" Unable to create sink element")
            return False

        if self.is_live:
            MyLogger.info("At least one of the sources is live")
            self.streammux.set_property('live-source', 1)

        self.streammux.set_property('width', 3840)
        self.streammux.set_property('height', 2160)
        # self.streammux.set_property('width', 1920)
        # self.streammux.set_property('height', 1080)

        self.streammux.set_property('batch-size', self.number_sources)

        if self.requested_pgie == "nvinferserver" and self.config is not None:
            self.pgie.set_property('config-file-path', self.config)
        elif self.requested_pgie == "nvinferserver-grpc" and self.config is not None:
            self.pgie.set_property('config-file-path', self.config)
        elif self.requested_pgie == "nvinfer" and self.config is not None:
            self.pgie.set_property('config-file-path', self.config)
        else:
            # todo
            self.pgie.set_property('config-file-path', self.config)
        pgie_batch_size = self.pgie.get_property("batch-size")
        if pgie_batch_size != self.number_sources:
            # print( pgie_batch_size, ":", self.number_sources)
            MyLogger.warning(
                f"WARNING: Overriding infer-config batch-size {pgie_batch_size} with number of sources {self.number_sources}")
            self.pgie.set_property("batch-size", self.number_sources)
        tiler_rows = int(math.sqrt(self.number_sources))
        tiler_columns = int(math.ceil((1.0 * self.number_sources) / tiler_rows))
        self.tiler.set_property("rows", tiler_rows)
        self.tiler.set_property("columns", tiler_columns)
        self.tiler.set_property("width", TILED_OUTPUT_WIDTH)
        self.tiler.set_property("height", TILED_OUTPUT_HEIGHT)
        # todo
        self.sink.set_property("qos", 1)
        if int(MyConfigReader.cfg_dict["nvr"]["show"]):
            # todo
            # self.sink.set_property("emit-signals", True)
            # self.sink.set_property("sync", False)
            # self.sink.set_property("max-buffers", 50)
            # self.sink.set_property("drop", True)
            # self.sink.connect("new-sample", self.on_new_sample)
            pass

        MyLogger.info("Adding elements to Pipeline")
        self.pipeline.add(self.preprocess)
        self.pipeline.add(self.pgie)
        # todo
        self.pipeline.add(self.converter)
        self.pipeline.add(self.capsfilter)
        if self.nvdslogger:
            self.pipeline.add(self.nvdslogger)
        self.pipeline.add(self.tiler)
        self.pipeline.add(self.nvvidconv)
        self.pipeline.add(self.nvosd)

        if self.transform:
            self.pipeline.add(self.transform)

        if int(MyConfigReader.cfg_dict["nvr"]["show"]):
            pass

        # self.pipeline.add(self.encoder)
        # self.pipeline.add(self.parser)
        self.pipeline.add(self.sink)

        MyLogger.info("Linking elements in the Pipeline \n")
        self.streammux.link(self.queue1)
        #todo
        self.queue1.link(self.preprocess)
        self.preprocess.link(self.pgie)
        self.pgie.link(self.queue2)
        self.queue2.link(self.converter)
        self.converter.link(self.queue3)
        self.queue3.link(self.capsfilter)
        self.capsfilter.link(self.queue4)
        if self.nvdslogger:
            self.queue4.link(self.nvdslogger)
            self.nvdslogger.link(self.tiler)
        else:
            self.queue4.link(self.tiler)
        self.tiler.link(self.queue5)
        self.queue5.link(self.nvvidconv)
        self.nvvidconv.link(self.queue6)
        self.queue6.link(self.nvosd)
        if self.transform:
            self.nvosd.link(self.queue7)
            self.queue7.link(self.transform)
            self.transform.link(self.sink)
        else:
            self.nvosd.link(self.queue7)
            if int(MyConfigReader.cfg_dict["nvr"]["show"]):
                # self.queue5.link(self.converter)
                # self.converter.link(self.capsfilter)
                # self.capsfilter.link(self.sink)
                self.queue7.link(self.sink)

            else:
                self.queue7.link(self.sink)
        return True

    def create_source_bin(self, index, uri):
        MyLogger.info("Creating source bin")
        if not self.check_uri_valid(uri):
            MyLogger.error(f"URI is invalid or unreachable: {uri}", )
            return None  # return None to signal that creation failed

        # Create a source GstBin to abstract this bin's content from the rest of the
        # pipeline
        bin_name = "source-bin-%02d" % index
        MyLogger.info(bin_name)
        nbin = Gst.Bin.new(bin_name)
        if not nbin:
            MyLogger.error(" Unable to create source bin")

        if file_loop:
            # use nvurisrcbin to enable file-loop
            uri_decode_bin = Gst.ElementFactory.make("nvurisrcbin", "uri-decode-bin")
            uri_decode_bin.set_property("file-loop", 1)
            MyLogger.info("nvurisrcbin")
        else:
            uri_decode_bin = Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
            MyLogger.info("uridecodebin")
        if not uri_decode_bin:
            sys.stderr.write(" Unable to create uri decode bin \n")
        # We set the input uri to the source element
        uri_decode_bin.set_property("uri", uri)
        # Connect to the "pad-added" signal of the decodebin which generates a
        # callback once a new pad for raw data has been created by the decodebin
        # todo
        uri_decode_bin.connect("pad-added", self.cb_newpad, nbin)
        self.source_bins[index] = nbin
        uri_decode_bin.connect("child-added", self.decodebin_child_added, nbin)

        Gst.Bin.add(nbin, uri_decode_bin)
        bin_pad = nbin.add_pad(Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC))
        if not bin_pad:
            sys.stderr.write(" Failed to add ghost pad in source bin \n")
            return None
        return nbin

    # pgie_src_pad_buffer_probe  will extract metadata received on tiler sink pad
    # and update params for drawing rectangle, object information etc.

    def check_uri_valid(self, uri, timeout=5):
        """Check whether the URI is valid."""
        parsed = urlparse(uri)

        # File path check
        if parsed.scheme == "file":
            file_path = parsed.path

            # Handle the Windows path special case (e.g. file:///C:/path)
            if os.name == "nt" and len(file_path) >= 3 and file_path[0] == "/":
                file_path = file_path[1:]  # Strip the leading slash

            return os.path.exists(file_path)

        # RTSP: probe the stream with a GStreamer test pipeline
        elif parsed.scheme == "rtsp":
            return self.check_uri_playable(uri)

        # Other schemes skip validation
        else:
            MyLogger.warning(f"Unsupported URI scheme: {parsed.scheme}, skipping validation")
            return True  # Treated as valid by default

    # def check_uri_playable(self, uri, timeout=5):
    #     """Check whether the URI is playable using a GStreamer test pipeline."""
    #     pipeline = Gst.parse_launch(f"uridecodebin uri={uri} ! fakesink sync=false")
    #     pipeline.set_state(Gst.State.PLAYING)  # Start the pipeline to attempt initialization
    #
    #     # Wait for the state change to complete, or time out
    #     result = pipeline.get_state(timeout=Gst.SECOND * timeout)
    #     if result[0] == Gst.StateChangeReturn.FAILURE:
    #         return False
    #
    #     # Check whether any error was posted
    #     bus = pipeline.get_bus()
    #     msg = bus.timed_pop_filtered(
    #         Gst.SECOND * timeout,
    #         Gst.MessageType.ERROR | Gst.MessageType.STATE_CHANGED
    #     )
    #     if msg and msg.type == Gst.MessageType.ERROR:
    #         return False
    #
    #     pipeline.set_state(Gst.State.NULL)
    #     MyLogger.info(f"url:{uri} valid")
    #     return True
    def check_uri_playable(self, uri, timeout=5):
        """Check whether an RTSP/RTMP or other network stream is reachable."""
        pipeline = Gst.parse_launch(f"uridecodebin uri={uri} ! fakesink sync=false")
        pipeline.set_state(Gst.State.PLAYING)
        time.sleep(0.1)
        # Wait for the pipeline to reach PLAYING, or time out
        start_time = time.time()
        while time.time() - start_time < timeout:
            state = pipeline.get_state(timeout=1000000)[1]
            if state == Gst.State.PLAYING:
                pipeline.set_state(Gst.State.NULL)
                return True
        pipeline.set_state(Gst.State.NULL)
        return False

    def pgie_src_pad_buffer_probe(self, pad, info, u_data):
        frame_number = 0
        num_rects = 0
        got_fps = False

        current_time = time.time()

        # self.update_time = time.time()

        gst_buffer = info.get_buffer()
        if not gst_buffer:
            MyLogger.error("Unable to get GstBuffer ")
            return Gst.PadProbeReturn.OK

        # opencv_image = StreamCapture.gst_buffer_to_opencv(gst_buffer)
        # Retrieve batch metadata from the gst_buffer
        # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
        # C address of gst_buffer as input, which is obtained with hash(gst_buffer)
        batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
        # todo
        if not batch_meta:
            MyLogger.warning("Batch meta is None, skipping frame processing")
            return Gst.PadProbeReturn.OK
        l_frame = batch_meta.frame_meta_list
        while l_frame is not None:
            n_frame1_box = None
            try:
                # Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
                # The casting is done by pyds.NvDsFrameMeta.cast()
                # The casting also keeps ownership of the underlying memory
                # in the C code, so the Python garbage collector will leave
                # it alone.
                frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)



                # MyLogger.info(f"--------------------------------{frame_meta.batch_id}")

                if current_time - self.last_alive_time_dict.setdefault(frame_meta.pad_index, 0) > 60 * 5:
                    MyLogger.info(
                        f"source [{frame_meta.pad_index}]:{self.stream_path_code[frame_meta.pad_index]} is alive...")
                    self.last_alive_time_dict[frame_meta.pad_index] = current_time

                # if current_time - self.last_frame_time_dict.setdefault(frame_meta.pad_index, 0) < self.frame_interval:
                #     MyLogger.info(f"source{frame_meta.pad_index}:skip frame")
                #     return Gst.PadProbeReturn.OK

                self.last_frame_time_dict[frame_meta.pad_index] = current_time

            except StopIteration:
                break

            frame_number = frame_meta.frame_num
            l_obj = frame_meta.obj_meta_list
            num_rects = frame_meta.num_obj_meta
            is_first_obj = True
            save_image = False

            n_frame = pyds.get_nvds_buf_surface(hash(gst_buffer), frame_meta.batch_id)
            n_frame1 = np.array(n_frame, copy=True, order='C')
            n_frame1 = cv2.rectangle(n_frame1, (100, 100), (400, 300), (255, 0, 0), 2)
            detected_objects = []
            while l_obj is not None:
                try:
                    # Casting l_obj.data to pyds.NvDsObjectMeta
                    obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
                    obj_label = pyds.get_string(obj_meta.text_params.display_text)
                    obj_confidence = obj_meta.confidence
                    obj_bbox = (obj_meta.rect_params.left, obj_meta.rect_params.top,
                                obj_meta.rect_params.width, obj_meta.rect_params.height)
                    # print(obj_meta.class_id)
                    # print(obj_confidence)
                    detected_objects.append({
                        "label": obj_label,
                        "confidence": obj_confidence,
                        "bbox": obj_bbox,
                        "pad_index": frame_meta.pad_index
                    })
                except StopIteration:
                    break



                # if is_first_obj:
                #     is_first_obj = False
                if n_frame1_box is None:
                    n_frame1_box = StreamCapture.draw_bounding_boxes(copy.deepcopy(n_frame1), obj_meta,
                                                                     obj_meta.confidence)
                else:
                    n_frame1_box = StreamCapture.draw_bounding_boxes(n_frame1_box, obj_meta,
                                                                     obj_meta.confidence)

                # convert python array into numpy array format in the copy mode.

                try:
                    l_obj = l_obj.next
                except StopIteration:
                    break

            # convert the array into cv2 default color format
            n_frame1 = cv2.cvtColor(n_frame1, cv2.COLOR_RGBA2RGB)
            # n_frame1 = cv2.cvtColor(n_frame1, cv2.COLOR_RGBA2BGRA)
            if n_frame1_box is not None:
                cv2.imwrite(f"1-{frame_meta.batch_id}.jpg", n_frame1_box)

            # Configure the rectangle parameters
            # display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
            # display_meta.num_rects = 1  # draw one rectangle
            # rect = display_meta.rect_params[0]
            # x, y , w, h = MyConfigReader.cfg_dict["ROI"][f"source{frame_meta.pad_index}"].split(",")
            # rect.left = int(x)  # x coordinate of the ROI's left edge
            # rect.top = int(y)  # y coordinate of the ROI's top edge
            # rect.width = int(w)  # ROI width
            # rect.height = int(h)  # ROI height
            # rect.border_width = 3  # border width
            # rect.border_color.red = 0.0  # border color (RGBA, range 0.0-1.0)
            # rect.border_color.green = 0.0
            # rect.border_color.blue = 1.0
            # rect.border_color.alpha = 1.0
            #
            # # Add the display metadata to the current frame
            # pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
            with self.lock:
                self.frame_dict.setdefault(str(frame_meta.pad_index), {"frame": None, "time": None})
                self.frame_dict[str(frame_meta.pad_index)]["frame"] = n_frame1
                self.frame_dict[str(frame_meta.pad_index)]["time"] = get_current_timestamp_millis()

            if not silent:
                if num_rects != 0:
                    self.detection_algorithm(detected_objects, n_frame1, n_frame1_box)
                    MyLogger.info(
                        f"batchid:{frame_meta.pad_index},Frame Number={frame_number},Number of Objects={num_rects}")
            # Update frame rate through this probe
            stream_index = "stream{0}".format(frame_meta.pad_index)
            # global perf_data
            # perf_data.update_fps(stream_index)
            try:
                l_frame = l_frame.next
            except StopIteration:
                break
        return Gst.PadProbeReturn.OK


    def np_array_to_base64(self, image):
        img_array = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)  # RGB to BGR for cv2 encoding
        encode_image = cv2.imencode(".jpg", img_array)[1]  # JPEG-encode with cv2; returns a 1-D array
        byte_data = encode_image.tobytes()  # convert to raw bytes
        base64_str = base64.b64encode(byte_data).decode("ascii")  # convert to base64
        return base64_str

    def detection_algorithm(self, detected_obj, frame_src, frame_bbox):
        camera_index_code = ""
        tag = False
        for obj in detected_obj:
            if obj.get("label", "") == "Person":
                camera_index_code = self.stream_path_code[obj.get('pad_index')]
                tag = True
                break

        if tag:
            msg = {
                "System": "StreamingMedia",
                "CMD": "Alarm",
                "Type": "CJ12-001",
                "Pic": self.np_array_to_base64(frame_bbox),
                "Time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
                "Data": {
                    "Camera": camera_index_code,
                    "Message": "msg"
                },
                "Code": 200
            }
            self.kafka_producer.send(
                topic=f"{MyConfigReader.cfg_dict['Kafka']['topic']}",
                value=f"{json.dumps(msg).encode('utf-8')}")
            MyLogger.info("Alarm message sent to the Kafka queue")
        else:
            pass
            # MyLogger.warning("Non-target object detected")

    @staticmethod
    def draw_bounding_boxes(image, obj_meta, confidence):
        image = cv2.cvtColor(image, cv2.COLOR_RGBA2RGB)
        confidence = '{0:.2f}'.format(confidence)
        rect_params = obj_meta.rect_params
        top = int(rect_params.top)
        left = int(rect_params.left)
        width = int(rect_params.width)
        height = int(rect_params.height)
        obj_name = pyds.get_string(obj_meta.text_params.display_text)
        # image = cv2.rectangle(image, (left, top), (left + width, top + height), (0, 0, 255, 0), 2, cv2.LINE_4)
        color = (0, 0, 255, 0)
        w_percents = int(width * 0.05) if width > 100 else int(width * 0.1)
        h_percents = int(height * 0.05) if height > 100 else int(height * 0.1)
        linetop_c1 = (left + w_percents, top)
        linetop_c2 = (left + width - w_percents, top)
        image = cv2.line(image, linetop_c1, linetop_c2, color, 6)
        linebot_c1 = (left + w_percents, top + height)
        linebot_c2 = (left + width - w_percents, top + height)
        image = cv2.line(image, linebot_c1, linebot_c2, color, 6)
        lineleft_c1 = (left, top + h_percents)
        lineleft_c2 = (left, top + height - h_percents)
        image = cv2.line(image, lineleft_c1, lineleft_c2, color, 6)
        lineright_c1 = (left + width, top + h_percents)
        lineright_c2 = (left + width, top + height - h_percents)
        image = cv2.line(image, lineright_c1, lineright_c2, color, 6)
        # Note that on some systems cv2.putText erroneously draws horizontal lines across the image
        image = cv2.putText(image, obj_name + f":{obj_meta.class_id}" + ',C=' + str(confidence), (left - 10, top - 10),
                            cv2.FONT_HERSHEY_SIMPLEX,
                            0.5,
                            (0, 0, 255, 0), 2)
        return image

    def open(self, stream_paths_dict):
        self.stream_path_code = list(stream_paths_dict.keys())
        self.stream_paths = list(stream_paths_dict.values())
        self.number_sources = len(self.stream_paths)
        Gst.init(None)
        if not self.create_pipeline():
            MyLogger.error('Failed to create Pipeline.')
            self.status = False
            return False
        self.running = True
        self.thread = threading.Thread(target=self._read_loop)
        self.thread.daemon = True
        self.thread.start()
        self.status = True
        return True

    # Define the probe callback function
    def osd_sink_pad_buffer_probe(self, pad, info, u_data):
        buf = info.get_buffer()
        batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(buf))

        # Iterate over all frame metadata
        frame_meta_list = batch_meta.frame_meta_list
        while frame_meta_list:
            frame_meta = pyds.NvDsFrameMeta.cast(frame_meta_list.data)

            # Acquire a display metadata object from the metadata pool
            display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
            display_meta.num_rects = 1  # draw one rectangle

            # Configure the rectangle parameters
            rect = display_meta.rect_params[0]
            rect.left = 100  # x coordinate of the ROI's left edge
            rect.top = 100  # y coordinate of the ROI's top edge
            rect.width = 300  # ROI width
            rect.height = 200  # ROI height
            rect.border_width = 3  # border width
            rect.border_color.red = 1.0  # border color (RGBA, range 0.0-1.0)
            rect.border_color.green = 0.0
            rect.border_color.blue = 0.0
            rect.border_color.alpha = 1.0

            # Add the display metadata to the current frame
            pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)

            frame_meta_list = frame_meta_list.next

        return Gst.PadProbeReturn.OK

    def _read_loop(self):

        # global perf_data
        # perf_data = PERF_DATA(len(self.stream_paths))

        number_sources = len(self.stream_paths)

        # Standard GStreamer initialization

        # create an event loop and feed gstreamer bus messages to it
        self.bus = self.pipeline.get_bus()
        # self.pgie_src_pad = self.tiler.get_static_pad("src")
        self.pgie_src_pad = self.tiler.get_static_pad("sink")
        if not self.pgie_src_pad:
            MyLogger.error(" Unable to get tiler sink pad")
        else:
            if not self.disable_probe:
                self.pgie_src_pad.add_probe(Gst.PadProbeType.BUFFER, self.pgie_src_pad_buffer_probe, 0)
                # perf callback function to print fps every 5 sec
                # GLib.timeout_add(5000, perf_data.perf_print_callback)
        # List the sources
        MyLogger.info("Now playing...")
        for i, source in enumerate(self.stream_paths):
            MyLogger.info(f"{i}: {source}")

        MyLogger.info("Starting pipeline \n")
        # start playback and listen to events
        self.pipeline.set_state(Gst.State.PLAYING)
        for index_code, item_code in enumerate(self.stream_path_code):
            self.last_frame_time_dict[index_code] = time.time()
        while self.status:
            for index_code, item_code in enumerate(self.stream_path_code):
                if time.time() - self.last_frame_time_dict[index_code] > 20:
                    MyLogger.warning(f"source[{index_code}]:{item_code} stream has not been read for a long time, preparing to restart...")
                    with self.dict_lock:
                        self.over_time_dict[item_code] = index_code
                        # self.status = False

            message = self.bus.timed_pop_filtered(100 * Gst.MSECOND, Gst.MessageType.ANY)
            if message:
                if message.type == Gst.MessageType.ERROR:
                    err, debug = message.parse_error()
                    MyLogger.error(f"Error from {message.src.get_name()}: {err}")

                    MyLogger.error(f"Debugging info: {debug}")
                    # self.pipeline.set_state(Gst.State.NULL)
                    # if self.bus:
                    #     self.bus = None
                    MyLogger.info("Error while pulling the stream; pipeline state switched to NULL, preparing to reconnect")
                    # Try to reconnect
                    # if not self.reconnect_stream():
                    #     with self.dict_lock:
                    #         self.over_time_dict[self.strIndexCode] = self.strName
                    #         # self.close()
                    #         self.status = False

                    # with self.dict_lock:
                    self.restart = True
                    #     self.over_time_dict[self.strIndexCode] = self.strName
                    #     self.status = False
                elif message.type == Gst.MessageType.EOS:
                    MyLogger.warning("End-Of-Stream reached.")
                    self.restart = True
                    # with self.dict_lock:
                    #     self.over_time_dict[self.strIndexCode] = self.strName
                    #     self.status = False
                    # self.close()
                elif message.type == Gst.MessageType.WARNING:
                    err, debug = message.parse_warning()
                    MyLogger.warning(f"Pipeline Warning: {err}: {debug}\n")
                elif message.type == Gst.MessageType.STATE_CHANGED:
                    if message.src == self.pipeline:
                        old, new, pending = message.parse_state_changed()
                        MyLogger.info(f"Pipeline state changed from {old} to {new}")

        # cleanup
        MyLogger.info("Exiting app\n")
        self.pipeline.set_state(Gst.State.NULL)
        if self.bus:
            self.bus = None
        self.pipeline = None
        MyLogger.info("Pipeline End")

    # Add the following methods to the StreamCapture class
    def add_source(self, stream_id, uri):
        """Dynamically add a video source."""
        # todo
        if self.number_sources >= 2:
            MyLogger.warning("Maximum number of sources reached")
            return False

        # Create a new source bin
        source_bin = self.create_source_bin(stream_id, uri)
        if not source_bin:
            return False

        # Add the source bin to the pipeline
        self.pipeline.add(source_bin)
        source_bin.sync_state_with_parent()

        # Request the streammux sink pad and link it
        pad_name = f"sink_{stream_id}"
        sinkpad = self.streammux.get_request_pad(pad_name)
        srcpad = source_bin.get_static_pad("src")
        srcpad.link(sinkpad)

        # Update the configuration
        # self.stream_paths[stream_id] = uri
        self.number_sources += 1
        self.streammux.set_property("batch-size", self.number_sources)
        self.pgie.set_property("batch-size", self.number_sources)
        state_return = source_bin.set_state(Gst.State.PLAYING)

        return True

    def remove_source(self, stream_id):
        """Dynamically remove a video source."""
        # Stop and remove the source bin
        source_bin = self.source_bins.get(stream_id)

        if not source_bin:
            return False
        state_return = source_bin.set_state(Gst.State.NULL)
        if state_return == Gst.StateChangeReturn.SUCCESS:
            MyLogger.info("STATE CHANGE SUCCESS")

        # Send flush-stop and release the pad
        pad_name = f"sink_{stream_id}"
        sinkpad = self.streammux.get_static_pad(pad_name)
        sinkpad.send_event(Gst.Event.new_flush_stop(False))
        self.streammux.release_request_pad(sinkpad)

        # Remove the element from the pipeline
        source_bin.set_state(Gst.State.NULL)
        self.pipeline.remove(source_bin)

        # Update the configuration
        del self.stream_paths[stream_id]
        self.number_sources -= 1
        self.streammux.set_property("batch-size", self.number_sources)
        self.pgie.set_property("batch-size", self.number_sources)

        return True




Change in the preprocess config:

- scaling-pool-memory-type=2
+ scaling-pool-memory-type=0
...
[group-0]
src-ids=0;1;2;3
- custom-input-transformation-function=CustomAsyncTransformation
+ custom-input-transformation-function=CustomTransformation
process-on-roi=0
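
As a quick sanity check before relaunching the pipeline, something like the sketch below can confirm that the edited preprocess config really carries these values. It uses only Python's configparser. The inline PREPROCESS_CFG text and the find_mismatches helper are hypothetical names made up for this illustration, and the excerpt lists only the keys mentioned above, not a full config.

```python
import configparser

# Hypothetical excerpt of the edited preprocess config (only the keys discussed here).
PREPROCESS_CFG = """
[property]
enable=1
scaling-pool-memory-type=0

[group-0]
src-ids=0;1;2;3
custom-input-transformation-function=CustomTransformation
process-on-roi=0
"""

# (section, key) -> value suggested in the reply above.
EXPECTED = {
    ("property", "scaling-pool-memory-type"): "0",
    ("group-0", "custom-input-transformation-function"): "CustomTransformation",
    ("group-0", "process-on-roi"): "0",
}


def find_mismatches(cfg_text, expected=EXPECTED):
    """Return {(section, key): actual_value} for every key that differs from expected."""
    parser = configparser.ConfigParser()
    parser.read_string(cfg_text)
    mismatches = {}
    for (section, key), want in expected.items():
        got = parser.get(section, key, fallback=None)  # None if section or key is missing
        if got != want:
            mismatches[(section, key)] = got
    return mismatches


mismatches = find_mismatches(PREPROCESS_CFG)
print(mismatches or "preprocess config matches the suggested values")
```

To check a real file, its text can be read with open(path).read() and passed to find_mismatches in place of the inline excerpt.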

In the pgie config:

+ input-tensor-from-meta=1
+ force-implicit-batch-dim=1
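
With input-tensor-from-meta=1, nvinfer consumes the tensor that nvdspreprocess attaches as metadata instead of scaling the frames itself, so the processing-width/processing-height, maintain-aspect-ratio and symmetric-padding settings from the preprocess config are what decide the network input. As a rough illustration of that arithmetic for a 3840x2160 stream and a 1280x1280 model (letterbox_geometry is a hypothetical helper written for this post, and the plugin's exact rounding may differ):

```python
def letterbox_geometry(src_w, src_h, dst_w, dst_h, symmetric=True):
    """Return (scaled_w, scaled_h, pad_left, pad_top) for an aspect-preserving resize.

    Mirrors the idea behind maintain-aspect-ratio=1 (single scale factor for both axes)
    and symmetric-padding=1 (leftover space split evenly on both sides).
    """
    scale = min(dst_w / src_w, dst_h / src_h)  # largest scale that still fits
    scaled_w = round(src_w * scale)
    scaled_h = round(src_h * scale)
    if symmetric:
        pad_left = (dst_w - scaled_w) // 2
        pad_top = (dst_h - scaled_h) // 2
    else:
        pad_left = pad_top = 0  # image stays in the top-left corner
    return scaled_w, scaled_h, pad_left, pad_top


geometry = letterbox_geometry(3840, 2160, 1280, 1280)
print(geometry)
```

For the resolution in this thread, that gives a 1280x720 image centered vertically, with 280 rows of padding above and below it.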

You can refer to my model folder.

There are no errors after the modification, thank you.
Also, could you please tell me: if I want to change where the pictures are saved, do I need to rerun bash setting_envirioment.sh path/to/folder/DeepStream_Custom_Preprocess_Plugin?

Simply run it once to install the libraries. If you make changes to the Makefile, you’ll need to run it again to reinstall. If you find the repo useful, please consider giving it a star.


Sure. Thank you.