pgie
[property]
gpu-id=0
net-scale-factor=0.0039215697906911373
model-color-format=0
onnx-file=/home/smuvision/Downloads/DoorOpen/dooropen_det.pt.onnx
model-engine-file=/home/smuvision/Downloads/DoorOpen/dooropen_det.engine
#int8-calib-file=calib.table
labelfile-path=/home/smuvision/Downloads/DoorOpen/labels.txt
batch-size=2
network-mode=0
num-detected-classes=2
interval=0
gie-unique-id=1
process-mode=1
network-type=0
cluster-mode=2
maintain-aspect-ratio=1
symmetric-padding=1
#workspace-size=2000
parse-bbox-func-name=NvDsInferParseYolo
#parse-bbox-func-name=NvDsInferParseYoloCuda
custom-lib-path=/home/smuvision/Work/DeepStream-Yolo/nvdsinfer_custom_impl_Yolo/libnvdsinfer_custom_impl_Yolo.so
engine-create-func-name=NvDsInferYoloCudaEngineGet
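# consume the tensor prepared by nvdspreprocess instead of scaling inside nvinfer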
input-tensor-from-meta=1
[class-attrs-all]
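# detection filtering applied to every class: confidence gate, NMS IoU, max boxes kept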
nms-iou-threshold=0.2
pre-cluster-threshold=0.25
topk=300
config_preprocess.txt
################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
# The values in the config file are overridden by values set through GObject
# properties.
[property]
enable=1
target-unique-ids=1
# 0=NCHW, 1=NHWC, 2=CUSTOM
network-input-order=0
process-on-frame=1
# if enabled maintain the aspect ratio while scaling
maintain-aspect-ratio=1
# if enabled pad symmetrically with maintain-aspect-ratio enabled
symmetric-padding=0
# processing width/height at which image scaled
processing-width=1280
processing-height=1280
scaling-buf-pool-size=6
tensor-buf-pool-size=6
# tensor shape based on network-input-order
network-input-shape=1;3;1280;1280
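# note: the leading field is the tensor batch size; with the four sources listed in
# [group-0] and process-on-roi=0, a batch of 4 (4;3;1280;1280) may be required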
# 0=RGB, 1=BGR, 2=GRAY
network-color-format=0
# 0=FP32, 1=UINT8, 2=INT8, 3=UINT32, 4=INT32, 5=FP16
tensor-data-type=0
tensor-name=input
# 0=NVBUF_MEM_DEFAULT 1=NVBUF_MEM_CUDA_PINNED 2=NVBUF_MEM_CUDA_DEVICE 3=NVBUF_MEM_CUDA_UNIFIED
#scaling-pool-memory-type=0
scaling-pool-memory-type=2
# 0=NvBufSurfTransformCompute_Default 1=NvBufSurfTransformCompute_GPU 2=NvBufSurfTransformCompute_VIC
#scaling-pool-compute-hw=0
scaling-pool-compute-hw=1
# Scaling Interpolation method
# 0=NvBufSurfTransformInter_Nearest 1=NvBufSurfTransformInter_Bilinear 2=NvBufSurfTransformInter_Algo1
# 3=NvBufSurfTransformInter_Algo2 4=NvBufSurfTransformInter_Algo3 5=NvBufSurfTransformInter_Algo4
# 6=NvBufSurfTransformInter_Default
#scaling-filter=0
scaling-filter=8
custom-lib-path=/opt/nvidia/deepstream/deepstream/lib/gst-plugins/libcustom2d_preprocess.so
custom-tensor-preparation-function=CustomTensorPreparation
custom-input-transformation-function=CustomTransformation
[user-configs]
pixel-normalization-factor=0.003921568
#mean-file=
#offsets=
[group-0]
src-ids=0;1;2;3
custom-input-transformation-function=CustomAsyncTransformation
process-on-roi=0
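# the ROI lists below are ignored while process-on-roi=0; values are groups of left;top;width;height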
roi-params-src-0=0;540;900;500;960;0;900;500;0;0;540;900;
roi-params-src-1=0;540;900;500;960;0;900;500;0;0;540;900;
roi-params-src-2=0;540;900;500;960;0;900;500;0;0;540;900;
roi-params-src-3=0;540;900;500;960;0;900;500;0;0;540;900;
main.py
import sys
import threading
import copy
import json
sys.path.append('../')
from pathlib import Path
import gi
import numpy as np
import configparser
import argparse
import os
import base64
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst
from urllib.parse import urlparse
from ctypes import *
import time
import math
import platform
from common.is_aarch_64 import is_aarch64
from common.bus_call import bus_call
from common.FPS import PERF_DATA
from my_log import MyLogger
from config_reader import MyConfigReader
from datetime import datetime
from KafkaProducer import KafkaProducer
import pyds
import cv2
os.environ['GST_PLUGIN_PATH'] = '/opt/nvidia/deepstream/deepstream/lib/gst-plugins:' + os.environ.get('GST_PLUGIN_PATH', '')
no_display = False
silent = False
file_loop = True  # True: use nvurisrcbin (supports file-loop); False: use uridecodebin
perf_data = None
MAX_DISPLAY_LEN = 64
PGIE_CLASS_ID_VEHICLE = 0
PGIE_CLASS_ID_BICYCLE = 1
PGIE_CLASS_ID_PERSON = 2
PGIE_CLASS_ID_ROADSIGN = 3
MUXER_OUTPUT_WIDTH = 1920
MUXER_OUTPUT_HEIGHT = 1080
MUXER_BATCH_TIMEOUT_USEC = 4000000
TILED_OUTPUT_WIDTH = 1280
TILED_OUTPUT_HEIGHT = 720
# TILED_OUTPUT_WIDTH=1920
# TILED_OUTPUT_HEIGHT=1080
GST_CAPS_FEATURES_NVMM = "memory:NVMM"
OSD_PROCESS_MODE = 0
OSD_DISPLAY_TEXT = 1
GPU_ID = int(MyConfigReader.cfg_dict["GPU"]["gpu_id"])
def get_current_timestamp_millis():
timestamp = time.time()
timestamp_millis = int(timestamp * 1000)
dt_object = datetime.fromtimestamp(timestamp)
standard_time_format = dt_object.strftime('%Y-%m-%d_%H-%M-%S.') + str(timestamp_millis % 1000).zfill(3)
return standard_time_format
class StreamCapture:
def __init__(self, pgie, config, disable_probe):
super().__init__()
self.pipeline = None
self.framerate = int(MyConfigReader.cfg_dict['nvr']['fps'])
self.running = False
self.frame_interval = 1.0 / self.framerate
self.last_frame_time_dict = {}
        self.reconnect_attempts = 5  # maximum number of reconnect attempts
        self.reconnect_interval = 5  # reconnect interval in seconds
self.thread = None
self.status = None
self.bus = None
self.update_time = time.time()
self.last_alive_time_dict = {}
self.over_time_dict = {}
self.dict_lock = threading.Lock()
self.stream_paths = None
self.stream_path_code = None
self.requested_pgie = pgie
        self.disable_probe = disable_probe
        self.restart = False  # set by the bus loop on ERROR/EOS
self.config = config
self.number_sources = None
self.frame_dict = {}
self.source_bins = {}
self.lock = threading.Lock()
self.kafka_producer = KafkaProducer({
'bootstrap.servers': f"{MyConfigReader.cfg_dict['Kafka']['ip']}:{MyConfigReader.cfg_dict['Kafka']['port']}",
}, retry_limit=5, retry_interval=3)
def cb_newpad(self, decodebin, decoder_src_pad, data):
MyLogger.info("In cb_newpad")
caps = decoder_src_pad.get_current_caps()
if not caps:
caps = decoder_src_pad.query_caps()
gststruct = caps.get_structure(0)
gstname = gststruct.get_name()
source_bin = data
features = caps.get_features(0)
# Need to check if the pad created by the decodebin is for video and not
# audio.
MyLogger.info(f"fgstname={gstname}")
if gstname.find("video") != -1:
MyLogger.info(f"features={features}")
if features.contains("memory:NVMM"):
# Get the source bin ghost pad
bin_ghost_pad = source_bin.get_static_pad("src")
if not bin_ghost_pad.set_target(decoder_src_pad):
MyLogger.error("Failed to link decoder src pad to source bin ghost pad")
else:
MyLogger.error(" Error: Decodebin did not pick nvidia decoder plugin.")
def read(self):
with self.lock:
# return [value["frame"] for key, value in self.frame_dict.items()]
return copy.deepcopy(self.frame_dict)
def close(self):
if self.thread.is_alive():
self.thread.join()
if self.pipeline is not None and not self.is_pipeline_null():
self.pipeline.set_state(Gst.State.NULL)
MyLogger.info("关闭取流线程,管道状态切换至NULL")
if self.bus:
self.bus = None
self.pipeline = None
self.status = False
def is_pipeline_null(self):
        (ret, state, pending) = self.pipeline.get_state(timeout=1000000000)  # 1 s in nanoseconds; get_state returns a 3-tuple
return state == Gst.State.NULL
def decodebin_child_added(self, child_proxy, Object, name, user_data):
MyLogger.info(f"Decodebin child added:{name}")
if (name.find("decodebin") != -1):
Object.connect("child-added", self.decodebin_child_added, user_data)
if "nvv" in name.lower(): # 匹配 nvv4l2decoder, nvv4l2h264dec 等
if Object.find_property("gpu_id") is not None:
Object.set_property("gpu_id", GPU_ID) # 与下游组件(如 nvstreammux)一致
MyLogger.info(f"设置 {name} 的 GPU_ID 为 1")
#todo
# if "source" in name:
# source_element = child_proxy.get_by_name("source")
# if source_element.find_property('drop-on-latency') != None:
# Object.set_property("drop-on-latency", True)
def create_pipeline(self):
MyLogger.info("Creating Pipeline ")
self.pipeline = Gst.Pipeline()
self.is_live = False
if not self.pipeline:
MyLogger.error(" Unable to create Pipeline")
MyLogger.info("Creating streamux ")
# Create nvstreammux instance to form batches from one or more sources.
self.streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
if not self.streammux:
MyLogger.error(" Unable to create NvStreamMux ")
return False
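        # push a batch downstream after 40 ms even if it is not yet full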
self.streammux.set_property('batched-push-timeout', 40000)
self.streammux.set_property("gpu_id", GPU_ID)
self.pipeline.add(self.streammux)
valid_sources = []
for i in range(self.number_sources):
self.last_frame_time_dict[i] = time.time()
MyLogger.info(f"Creating source_bin: {i}")
uri_name = self.stream_paths[i]
if uri_name.find("rtsp://") == 0:
self.is_live = True
# todo
self.source_bin = self.create_source_bin(i, uri_name)
# if not self.source_bin:
# MyLogger.error("Unable to create source bin")
# return False
# self.pipeline.add(self.source_bin)
if self.source_bin is not None:
# valid_sources.append(source_bin)
                # add the source_bin to the main pipeline
valid_sources.append(self.source_bin)
self.pipeline.add(self.source_bin)
else:
MyLogger.error(f"Skipping invalid source: {uri_name}")
continue
padname = "sink_%u" % i
self.sinkpad = self.streammux.get_request_pad(padname)
if not self.sinkpad:
MyLogger.error("Unable to create sink pad bin")
return False
self.srcpad = self.source_bin.get_static_pad("src")
if not self.srcpad:
MyLogger.error("Unable to create src pad bin")
return False
self.srcpad.link(self.sinkpad)
self.number_sources = len(valid_sources)
# self.streammux.set_property("batch-size", self.number_sources)
# self.pgie.set_property("batch-size", self.number_sources)
self.queue1 = Gst.ElementFactory.make("queue", "queue1")
self.queue2 = Gst.ElementFactory.make("queue", "queue2")
self.queue3 = Gst.ElementFactory.make("queue", "queue3")
self.queue4 = Gst.ElementFactory.make("queue", "queue4")
self.queue5 = Gst.ElementFactory.make("queue", "queue5")
self.queue6 = Gst.ElementFactory.make("queue", "queue6")
self.queue7 = Gst.ElementFactory.make("queue", "queue7")
# self.queue8 = Gst.ElementFactory.make("queue", "queue8")
self.pipeline.add(self.queue1)
self.pipeline.add(self.queue2)
self.pipeline.add(self.queue3)
self.pipeline.add(self.queue4)
self.pipeline.add(self.queue5)
self.pipeline.add(self.queue6)
self.pipeline.add(self.queue7)
# self.pipeline.add(self.queue8)
self.nvdslogger = None
self.transform = None
MyLogger.info("Creating preprocess ")
self.preprocess = Gst.ElementFactory.make("nvdspreprocess", "preprocess-plugin")
if not self.preprocess:
MyLogger.error(f" Unable to create preprocess : {self.preprocess}")
return False
self.preprocess.set_property("gpu_id", GPU_ID)
self.preprocess.set_property("config-file", "config_preprocess.txt")
MyLogger.info("Creating Pgie ")
        if self.requested_pgie in ('nvinferserver', 'nvinferserver-grpc'):
            self.pgie = Gst.ElementFactory.make("nvinferserver", "primary-inference")
        else:
            self.pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
if not self.pgie:
MyLogger.error(f" Unable to create pgie : {self.requested_pgie}")
return False
# self.pgie.set_property("interval", 5)
if self.disable_probe:
# Use nvdslogger for perf measurement instead of probe function
MyLogger.info("Creating nvdslogger ")
self.nvdslogger = Gst.ElementFactory.make("nvdslogger", "nvdslogger")
self.nvdslogger.set_property("gpu_id", GPU_ID)
MyLogger.info("Creating tiler ")
self.tiler = Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
self.tiler.set_property("gpu_id", GPU_ID)
if not self.tiler:
MyLogger.error(" Unable to create tiler")
return False
MyLogger.info("Creating nvvidconv")
self.nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
self.nvvidconv.set_property("gpu_id", GPU_ID)
if not self.nvvidconv:
MyLogger.error(" Unable to create nvvidconv")
return False
MyLogger.info("Creating nvosd")
self.nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
self.nvosd.set_property("gpu_id", GPU_ID)
if not self.nvosd:
MyLogger.error(" Unable to create nvosd")
return False
self.nvosd.set_property('process-mode', OSD_PROCESS_MODE)
self.nvosd.set_property('display-text', OSD_DISPLAY_TEXT)
if not int(MyConfigReader.cfg_dict["nvr"]["show"]):
MyLogger.info("Creating Fakesink ")
self.sink = Gst.ElementFactory.make("fakesink", "fakesink")
self.sink.set_property('enable-last-sample', 0)
self.sink.set_property('sync', 0)
else:
if is_aarch64():
MyLogger.info("Creating transform")
# self.transform = Gst.ElementFactory.make("nvegltransform", "nvegl-transform")
# if not self.transform:
# MyLogger.error(" Unable to create transform")
self.sink = Gst.ElementFactory.make("appsink", f"appsink")
MyLogger.info("Creating EGLSink \n")
self.sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
self.sink.set_property('sync', 0)
self.sink.set_property("gpu_id", GPU_ID)
# self.sink = Gst.ElementFactory.make("appsink", f"appsink1")
self.converter = Gst.ElementFactory.make("nvvideoconvert", f"converter2")
self.converter.set_property("gpu_id", GPU_ID)
self.capsfilter = Gst.ElementFactory.make("capsfilter", f"capsfilter")
caps = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA")
# self.converter.set_property("nvbuf-memory-type", 0)
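        # NVBUF_MEM_CUDA_UNIFIED keeps frames CPU-mappable so the probe can read
        # them into NumPy via pyds.get_nvds_buf_surface()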
mem_type = int(pyds.NVBUF_MEM_CUDA_UNIFIED)
self.converter.set_property("nvbuf-memory-type", mem_type)
self.tiler.set_property("nvbuf-memory-type", mem_type)
self.capsfilter.set_property("caps", caps)
if not self.sink:
MyLogger.error(" Unable to create sink element")
return False
if self.is_live:
MyLogger.info("At least one of the sources is live")
self.streammux.set_property('live-source', 1)
self.streammux.set_property('width', 3840)
self.streammux.set_property('height', 2160)
# self.streammux.set_property('width', 1920)
# self.streammux.set_property('height', 1080)
self.streammux.set_property('batch-size', self.number_sources)
if self.requested_pgie == "nvinferserver" and self.config is not None:
self.pgie.set_property('config-file-path', self.config)
elif self.requested_pgie == "nvinferserver-grpc" and self.config is not None:
self.pgie.set_property('config-file-path', self.config)
elif self.requested_pgie == "nvinfer" and self.config is not None:
self.pgie.set_property('config-file-path', self.config)
else:
# todo
self.pgie.set_property('config-file-path', self.config)
pgie_batch_size = self.pgie.get_property("batch-size")
if pgie_batch_size != self.number_sources:
            # print(pgie_batch_size, ":", self.number_sources)
            MyLogger.warning(
                f"WARNING: Overriding infer-config batch-size {pgie_batch_size} with number of sources {self.number_sources}")
self.pgie.set_property("batch-size", self.number_sources)
tiler_rows = int(math.sqrt(self.number_sources))
tiler_columns = int(math.ceil((1.0 * self.number_sources) / tiler_rows))
self.tiler.set_property("rows", tiler_rows)
self.tiler.set_property("columns", tiler_columns)
self.tiler.set_property("width", TILED_OUTPUT_WIDTH)
self.tiler.set_property("height", TILED_OUTPUT_HEIGHT)
# todo
self.sink.set_property("qos", 1)
if int(MyConfigReader.cfg_dict["nvr"]["show"]):
# todo
# self.sink.set_property("emit-signals", True)
# self.sink.set_property("sync", False)
# self.sink.set_property("max-buffers", 50)
# self.sink.set_property("drop", True)
# self.sink.connect("new-sample", self.on_new_sample)
pass
MyLogger.info("Adding elements to Pipeline")
self.pipeline.add(self.preprocess)
self.pipeline.add(self.pgie)
# todo
self.pipeline.add(self.converter)
self.pipeline.add(self.capsfilter)
if self.nvdslogger:
self.pipeline.add(self.nvdslogger)
self.pipeline.add(self.tiler)
self.pipeline.add(self.nvvidconv)
self.pipeline.add(self.nvosd)
if self.transform:
self.pipeline.add(self.transform)
if int(MyConfigReader.cfg_dict["nvr"]["show"]):
pass
# self.pipeline.add(self.encoder)
# self.pipeline.add(self.parser)
self.pipeline.add(self.sink)
MyLogger.info("Linking elements in the Pipeline \n")
self.streammux.link(self.queue1)
#todo
self.queue1.link(self.preprocess)
self.preprocess.link(self.pgie)
self.pgie.link(self.queue2)
self.queue2.link(self.converter)
self.converter.link(self.queue3)
self.queue3.link(self.capsfilter)
self.capsfilter.link(self.queue4)
if self.nvdslogger:
self.queue4.link(self.nvdslogger)
self.nvdslogger.link(self.tiler)
else:
self.queue4.link(self.tiler)
self.tiler.link(self.queue5)
self.queue5.link(self.nvvidconv)
self.nvvidconv.link(self.queue6)
self.queue6.link(self.nvosd)
if self.transform:
self.nvosd.link(self.queue7)
self.queue7.link(self.transform)
self.transform.link(self.sink)
else:
self.nvosd.link(self.queue7)
if int(MyConfigReader.cfg_dict["nvr"]["show"]):
# self.queue5.link(self.converter)
# self.converter.link(self.capsfilter)
# self.capsfilter.link(self.sink)
self.queue7.link(self.sink)
else:
self.queue7.link(self.sink)
return True
def create_source_bin(self, index, uri):
MyLogger.info("Creating source bin")
if not self.check_uri_valid(uri):
MyLogger.error(f"URI is invalid or unreachable: {uri}", )
            return None  # return None to signal that creation failed
# Create a source GstBin to abstract this bin's content from the rest of the
# pipeline
bin_name = "source-bin-%02d" % index
MyLogger.info(bin_name)
nbin = Gst.Bin.new(bin_name)
        if not nbin:
            MyLogger.error(" Unable to create source bin")
            return None
if file_loop:
# use nvurisrcbin to enable file-loop
uri_decode_bin = Gst.ElementFactory.make("nvurisrcbin", "uri-decode-bin")
uri_decode_bin.set_property("file-loop", 1)
MyLogger.info("nvurisrcbin")
else:
uri_decode_bin = Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
MyLogger.info("uridecodebin")
        if not uri_decode_bin:
            MyLogger.error(" Unable to create uri decode bin")
            return None
# We set the input uri to the source element
uri_decode_bin.set_property("uri", uri)
# Connect to the "pad-added" signal of the decodebin which generates a
        # callback once a new pad for raw data has been created by the decodebin
# todo
uri_decode_bin.connect("pad-added", self.cb_newpad, nbin)
self.source_bins[index] = nbin
uri_decode_bin.connect("child-added", self.decodebin_child_added, nbin)
Gst.Bin.add(nbin, uri_decode_bin)
bin_pad = nbin.add_pad(Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC))
        if not bin_pad:
            MyLogger.error(" Failed to add ghost pad in source bin")
            return None
return nbin
# pgie_src_pad_buffer_probe will extract metadata received on tiler sink pad
# and update params for drawing rectangle, object information etc.
    def check_uri_valid(self, uri, timeout=5):
        """Check whether a URI is valid."""
        parsed = urlparse(uri)
        # File-path check
        if parsed.scheme == "file":
            file_path = parsed.path
            # Handle the Windows path special case (e.g. file:///C:/path)
            if os.name == "nt" and len(file_path) >= 3 and file_path[0] == "/":
                file_path = file_path[1:]  # strip the leading slash
            return os.path.exists(file_path)
        # RTSP needs a live probe of the stream
        elif parsed.scheme == "rtsp":
            # Use a short GStreamer test pipeline to verify the stream is playable
            return self.check_uri_playable(uri)
        # Other schemes skip validation
        else:
            MyLogger.warning(f"Unsupported URI scheme: {parsed.scheme}, skipping validation")
            return True  # treated as valid by default
    # def check_uri_playable(self, uri, timeout=5):
    #     """Check whether a URI is playable via a GStreamer test pipeline."""
    #     pipeline = Gst.parse_launch(f"uridecodebin uri={uri} ! fakesink sync=false")
    #     pipeline.set_state(Gst.State.PLAYING)  # start the pipeline to try initialization
    #
    #     # Wait for the state change to complete or time out
    #     result = pipeline.get_state(timeout=Gst.SECOND * timeout)
    #     if result[0] == Gst.StateChangeReturn.FAILURE:
    #         return False
    #
    #     # Check the bus for errors
    #     bus = pipeline.get_bus()
    #     msg = bus.timed_pop_filtered(
    #         Gst.SECOND * timeout,
    #         Gst.MessageType.ERROR | Gst.MessageType.STATE_CHANGED
    #     )
    #     if msg and msg.type == Gst.MessageType.ERROR:
    #         return False
    #
    #     pipeline.set_state(Gst.State.NULL)
    #     MyLogger.info(f"url:{uri} valid")
    #     return True
    def check_uri_playable(self, uri, timeout=5):
        """Check that a network stream (RTSP/RTMP, etc.) is reachable and playable."""
        pipeline = Gst.parse_launch(f"uridecodebin uri={uri} ! fakesink sync=false")
        pipeline.set_state(Gst.State.PLAYING)
        time.sleep(0.1)
        # Wait for the pipeline to reach PLAYING, or give up after the timeout
        start_time = time.time()
        while time.time() - start_time < timeout:
            state = pipeline.get_state(timeout=1000000)[1]
            if state == Gst.State.PLAYING:
                pipeline.set_state(Gst.State.NULL)
                return True
        pipeline.set_state(Gst.State.NULL)
        return False
def pgie_src_pad_buffer_probe(self, pad, info, u_data):
frame_number = 0
num_rects = 0
got_fps = False
current_time = time.time()
# self.update_time = time.time()
gst_buffer = info.get_buffer()
if not gst_buffer:
MyLogger.error("Unable to get GstBuffer ")
return Gst.PadProbeReturn.OK
# opencv_image = StreamCapture.gst_buffer_to_opencv(gst_buffer)
# Retrieve batch metadata from the gst_buffer
# Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
# C address of gst_buffer as input, which is obtained with hash(gst_buffer)
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
# todo
if not batch_meta:
MyLogger.warning("Batch meta is None, skipping frame processing")
return Gst.PadProbeReturn.OK
l_frame = batch_meta.frame_meta_list
while l_frame is not None:
n_frame1_box = None
try:
# Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
# The casting is done by pyds.NvDsFrameMeta.cast()
# The casting also keeps ownership of the underlying memory
# in the C code, so the Python garbage collector will leave
# it alone.
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
# MyLogger.info(f"--------------------------------{frame_meta.batch_id}")
if current_time - self.last_alive_time_dict.setdefault(frame_meta.pad_index, 0) > 60 * 5:
MyLogger.info(
f"source [{frame_meta.pad_index}]:{self.stream_path_code[frame_meta.pad_index]} is alive...")
self.last_alive_time_dict[frame_meta.pad_index] = current_time
# if current_time - self.last_frame_time_dict.setdefault(frame_meta.pad_index, 0) < self.frame_interval:
# MyLogger.info(f"source{frame_meta.pad_index}:skip frame")
# return Gst.PadProbeReturn.OK
self.last_frame_time_dict[frame_meta.pad_index] = current_time
except StopIteration:
break
frame_number = frame_meta.frame_num
l_obj = frame_meta.obj_meta_list
num_rects = frame_meta.num_obj_meta
is_first_obj = True
save_image = False
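            # get_nvds_buf_surface() requires RGBA frames in CUDA unified memory,
            # which the nvvideoconvert + capsfilter stage upstream provides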
n_frame = pyds.get_nvds_buf_surface(hash(gst_buffer), frame_meta.batch_id)
n_frame1 = np.array(n_frame, copy=True, order='C')
n_frame1 = cv2.rectangle(n_frame1, (100, 100), (400, 300), (255, 0, 0), 2)
detected_objects = []
while l_obj is not None:
try:
# Casting l_obj.data to pyds.NvDsObjectMeta
obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
obj_label = pyds.get_string(obj_meta.text_params.display_text)
obj_confidence = obj_meta.confidence
obj_bbox = (obj_meta.rect_params.left, obj_meta.rect_params.top,
obj_meta.rect_params.width, obj_meta.rect_params.height)
# print(obj_meta.class_id)
# print(obj_confidence)
detected_objects.append({
"label": obj_label,
"confidence": obj_confidence,
"bbox": obj_bbox,
"pad_index": frame_meta.pad_index
})
except StopIteration:
break
# if is_first_obj:
# is_first_obj = False
if n_frame1_box is None:
n_frame1_box = StreamCapture.draw_bounding_boxes(copy.deepcopy(n_frame1), obj_meta,
obj_meta.confidence)
else:
n_frame1_box = StreamCapture.draw_bounding_boxes(n_frame1_box, obj_meta,
obj_meta.confidence)
                    # advance to the next object in the meta list
try:
l_obj = l_obj.next
except StopIteration:
break
            # drop the alpha channel (RGBA -> RGB)
n_frame1 = cv2.cvtColor(n_frame1, cv2.COLOR_RGBA2RGB)
# n_frame1 = cv2.cvtColor(n_frame1, cv2.COLOR_RGBA2BGRA)
if n_frame1_box is not None:
cv2.imwrite(f"1-{frame_meta.batch_id}.jpg", n_frame1_box)
            # Configure rectangle parameters (per-source ROI overlay, kept for reference)
            # display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
            # display_meta.num_rects = 1  # draw one rectangle
            # rect = display_meta.rect_params[0]
            # x, y, w, h = MyConfigReader.cfg_dict["ROI"][f"source{frame_meta.pad_index}"].split(",")
            # rect.left = int(x)    # ROI left x
            # rect.top = int(y)     # ROI top y
            # rect.width = int(w)   # ROI width
            # rect.height = int(h)  # ROI height
            # rect.border_width = 3  # border width
            # rect.border_color.red = 0.0  # border color (RGBA, each channel 0.0-1.0)
            # rect.border_color.green = 0.0
            # rect.border_color.blue = 1.0
            # rect.border_color.alpha = 1.0
            #
            # # Attach the display metadata to the current frame
            # pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
with self.lock:
self.frame_dict.setdefault(str(frame_meta.pad_index), {"frame": None, "time": None})
self.frame_dict[str(frame_meta.pad_index)]["frame"] = n_frame1
self.frame_dict[str(frame_meta.pad_index)]["time"] = get_current_timestamp_millis()
if not silent:
if num_rects != 0:
self.detection_algorithm(detected_objects, n_frame1, n_frame1_box)
                MyLogger.info(
                    f"source:{frame_meta.pad_index}, Frame Number={frame_number}, Number of Objects={num_rects}")
# Update frame rate through this probe
stream_index = "stream{0}".format(frame_meta.pad_index)
# global perf_data
# perf_data.update_fps(stream_index)
try:
l_frame = l_frame.next
except StopIteration:
break
return Gst.PadProbeReturn.OK
def np_array_to_base64(self, image):
        img_array = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)  # RGB -> BGR for cv2 encoding
        encode_image = cv2.imencode(".jpg", img_array)[1]  # compress/encode with cv2 into a 1-D array
        byte_data = encode_image.tobytes()  # convert to raw bytes
        base64_str = base64.b64encode(byte_data).decode("ascii")  # convert to base64
return base64_str
def detection_algorithm(self, detected_obj, frame_src, frame_bbox):
camera_index_code = ""
tag = False
for obj in detected_obj:
if obj.get("label", "") == "Person":
camera_index_code = self.stream_path_code[obj.get('pad_index')]
tag = True
break
if tag:
msg = {
"System": "StreamingMedia",
"CMD": "Alarm",
"Type": "CJ12-001",
"Pic": self.np_array_to_base64(frame_bbox),
"Time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
"Data": {
"Camera": camera_index_code,
"Message": "msg"
},
"Code": 200
}
            self.kafka_producer.send(
                topic=f"{MyConfigReader.cfg_dict['Kafka']['topic']}",
                value=json.dumps(msg).encode('utf-8'))
            MyLogger.info("Alarm message sent to the Kafka queue")
else:
pass
# MyLogger.warning("检测到非目标物体")
@staticmethod
def draw_bounding_boxes(image, obj_meta, confidence):
        if image.shape[-1] == 4:  # guard: repeat calls may already receive a 3-channel image
            image = cv2.cvtColor(image, cv2.COLOR_RGBA2RGB)
confidence = '{0:.2f}'.format(confidence)
rect_params = obj_meta.rect_params
top = int(rect_params.top)
left = int(rect_params.left)
width = int(rect_params.width)
height = int(rect_params.height)
obj_name = pyds.get_string(obj_meta.text_params.display_text)
# image = cv2.rectangle(image, (left, top), (left + width, top + height), (0, 0, 255, 0), 2, cv2.LINE_4)
color = (0, 0, 255, 0)
w_percents = int(width * 0.05) if width > 100 else int(width * 0.1)
h_percents = int(height * 0.05) if height > 100 else int(height * 0.1)
linetop_c1 = (left + w_percents, top)
linetop_c2 = (left + width - w_percents, top)
image = cv2.line(image, linetop_c1, linetop_c2, color, 6)
linebot_c1 = (left + w_percents, top + height)
linebot_c2 = (left + width - w_percents, top + height)
image = cv2.line(image, linebot_c1, linebot_c2, color, 6)
lineleft_c1 = (left, top + h_percents)
lineleft_c2 = (left, top + height - h_percents)
image = cv2.line(image, lineleft_c1, lineleft_c2, color, 6)
lineright_c1 = (left + width, top + h_percents)
lineright_c2 = (left + width, top + height - h_percents)
image = cv2.line(image, lineright_c1, lineright_c2, color, 6)
# Note that on some systems cv2.putText erroneously draws horizontal lines across the image
image = cv2.putText(image, obj_name + f":{obj_meta.class_id}" + ',C=' + str(confidence), (left - 10, top - 10),
cv2.FONT_HERSHEY_SIMPLEX,
0.5,
(0, 0, 255, 0), 2)
return image
def open(self, stream_paths_dict):
self.stream_path_code = list(stream_paths_dict.keys())
self.stream_paths = list(stream_paths_dict.values())
self.number_sources = len(self.stream_paths)
Gst.init(None)
if not self.create_pipeline():
            MyLogger.error('Failed to create Pipeline.')
self.status = False
return False
self.running = True
self.thread = threading.Thread(target=self._read_loop)
self.thread.daemon = True
self.thread.start()
self.status = True
return True
    # Define the probe callback function
def osd_sink_pad_buffer_probe(self, pad, info, u_data):
buf = info.get_buffer()
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(buf))
        # Iterate over all frame metadata in the batch
        frame_meta_list = batch_meta.frame_meta_list
        while frame_meta_list:
            frame_meta = pyds.NvDsFrameMeta.cast(frame_meta_list.data)
            # Acquire a display-meta object from the metadata pool
            display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
            display_meta.num_rects = 1  # draw one rectangle
            # Configure the rectangle parameters
            rect = display_meta.rect_params[0]
            rect.left = 100  # ROI left x
            rect.top = 100  # ROI top y
            rect.width = 300  # ROI width
            rect.height = 200  # ROI height
            rect.border_width = 3  # border width
            rect.border_color.red = 1.0  # border color (RGBA, each channel 0.0-1.0)
            rect.border_color.green = 0.0
            rect.border_color.blue = 0.0
            rect.border_color.alpha = 1.0
            # Attach the display metadata to the current frame
            pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
            frame_meta_list = frame_meta_list.next
return Gst.PadProbeReturn.OK
def _read_loop(self):
# global perf_data
# perf_data = PERF_DATA(len(self.stream_paths))
number_sources = len(self.stream_paths)
# Standard GStreamer initialization
        # create an event loop and feed GStreamer bus messages to it
self.bus = self.pipeline.get_bus()
# self.pgie_src_pad = self.tiler.get_static_pad("src")
self.pgie_src_pad = self.tiler.get_static_pad("sink")
        if not self.pgie_src_pad:
            MyLogger.error(" Unable to get tiler sink pad")
else:
if not self.disable_probe:
self.pgie_src_pad.add_probe(Gst.PadProbeType.BUFFER, self.pgie_src_pad_buffer_probe, 0)
# perf callback function to print fps every 5 sec
# GLib.timeout_add(5000, perf_data.perf_print_callback)
# List the sources
MyLogger.info("Now playing...")
for i, source in enumerate(self.stream_paths):
MyLogger.info(f"{i}: {source}")
MyLogger.info("Starting pipeline \n")
        # start playback and listen to events
self.pipeline.set_state(Gst.State.PLAYING)
for index_code, item_code in enumerate(self.stream_path_code):
self.last_frame_time_dict[index_code] = time.time()
while self.status:
for index_code, item_code in enumerate(self.stream_path_code):
if time.time() - self.last_frame_time_dict[index_code] > 20:
MyLogger.warning(f"source[{index_code}]:{item_code} 码流长时间未读取,准备重启...")
with self.dict_lock:
self.over_time_dict[item_code] = index_code
# self.status = False
message = self.bus.timed_pop_filtered(100 * Gst.MSECOND, Gst.MessageType.ANY)
if message:
if message.type == Gst.MessageType.ERROR:
err, debug = message.parse_error()
MyLogger.error(f"Error from {message.src.get_name()}: {err}")
MyLogger.error(f"Debugging info: {debug}")
# self.pipeline.set_state(Gst.State.NULL)
# if self.bus:
# self.bus = None
MyLogger.info("取流发生错误,管道状态切换至NULL,准备重连")
# Try to reconnect
# if not self.reconnect_stream():
# with self.dict_lock:
# self.over_time_dict[self.strIndexCode] = self.strName
# # self.close()
# self.status = False
# with self.dict_lock:
self.restart = True
# self.over_time_dict[self.strIndexCode] = self.strName
# self.status = False
elif message.type == Gst.MessageType.EOS:
MyLogger.warning("End-Of-Stream reached.")
self.restart = True
# with self.dict_lock:
# self.over_time_dict[self.strIndexCode] = self.strName
# self.status = False
# self.close()
elif message.type == Gst.MessageType.WARNING:
err, debug = message.parse_warning()
MyLogger.warning(f"Pipline Warning: {err}: {debug}\n")
elif message.type == Gst.MessageType.STATE_CHANGED:
if message.src == self.pipeline:
old, new, pending = message.parse_state_changed()
MyLogger.info(f"Pipeline state changed from {old} to {new}")
# cleanup
MyLogger.info("Exiting app\n")
self.pipeline.set_state(Gst.State.NULL)
if self.bus:
self.bus = None
self.pipeline = None
MyLogger.info("pipline End")
    # The methods below add dynamic source management to StreamCapture
    def add_source(self, stream_id, uri):
        """Dynamically add a video source."""
        # todo
        if self.number_sources >= 2:
            MyLogger.warning("Maximum number of sources reached")
            return False
        # Create a new source bin
        source_bin = self.create_source_bin(stream_id, uri)
        if not source_bin:
            return False
        # Add the source bin to the pipeline
        self.pipeline.add(source_bin)
        source_bin.sync_state_with_parent()
        # Request a streammux sink pad and link to it
        pad_name = f"sink_{stream_id}"
        sinkpad = self.streammux.get_request_pad(pad_name)
        srcpad = source_bin.get_static_pad("src")
        srcpad.link(sinkpad)
        # Update bookkeeping
        # self.stream_paths[stream_id] = uri
        self.number_sources += 1
        self.streammux.set_property("batch-size", self.number_sources)
        self.pgie.set_property("batch-size", self.number_sources)
        state_return = source_bin.set_state(Gst.State.PLAYING)
        return True
    def remove_source(self, stream_id):
        """Dynamically remove a video source."""
        # Stop and remove the source bin
        source_bin = self.source_bins.get(stream_id)
        if not source_bin:
            return False
        state_return = source_bin.set_state(Gst.State.NULL)
        if state_return == Gst.StateChangeReturn.SUCCESS:
            MyLogger.info("STATE CHANGE SUCCESS")
        # Flush and release the streammux request pad
        pad_name = f"sink_{stream_id}"
        sinkpad = self.streammux.get_static_pad(pad_name)
        sinkpad.send_event(Gst.Event.new_flush_stop(False))
        self.streammux.release_request_pad(sinkpad)
        # Remove the element from the pipeline
        source_bin.set_state(Gst.State.NULL)
        self.pipeline.remove(source_bin)
        # Update bookkeeping
        del self.stream_paths[stream_id]
        self.number_sources -= 1
        self.streammux.set_property("batch-size", self.number_sources)
        self.pgie.set_property("batch-size", self.number_sources)
        return True
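
# A minimal usage sketch (not part of the original file): how StreamCapture is
# presumably driven. The camera codes and URIs are placeholders, and
# "config_pgie.txt" is an assumed filename for the pgie config shown above.
if __name__ == "__main__":
    streams = {
        "camera-001": "rtsp://192.168.1.10:554/stream1",  # hypothetical source
        "camera-002": "file:///opt/videos/door_sample.mp4",  # hypothetical source
    }
    capture = StreamCapture(pgie="nvinfer", config="config_pgie.txt", disable_probe=False)
    if not capture.open(streams):
        sys.exit("Failed to open streams")
    try:
        while capture.status:
            frames = capture.read()  # {pad_index: {"frame": ndarray, "time": str}}
            time.sleep(1)
    except KeyboardInterrupt:
        capture.status = False
    finally:
        capture.close()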