Please provide complete information as applicable to your setup.
• Hardware Platform (Jetson / GPU) GPU
• DeepStream Version ds7.1-docker
• JetPack Version (valid for Jetson only)
• TensorRT Version 10.3
• NVIDIA GPU Driver Version (valid for GPU only) 550.142
• Issue Type (questions, new requirements, bugs)
When I use the nvmultiurisrcbin plugin, the sink window doesn't update, but the app appears to keep processing frames internally, and when the video stream reaches its end the app crashes. (A minimal isolation sketch is included after the full code below.)
Here’s my code:
import sys
import threading
import copy
import json
sys.path.append('../')
from pathlib import Path
import gi
import numpy as np
import configparser
import argparse
import os
import base64
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst
from urllib.parse import urlparse
from ctypes import *
import time
import math
import platform
from collections import defaultdict
from common.platform_info import PlatformInfo
from common.bus_call import bus_call
from common.FPS import PERF_DATA
from my_log import MyLogger
from config_reader import MyConfigReader
from datetime import datetime
from KafkaProducer import KafkaProducer
from confluent_kafka import Producer, KafkaException
from confluent_kafka import KafkaError
from os import environ
import pyds
import cv2
#os.environ['GST_PLUGIN_PATH'] = '/opt/nvidia/deepstream/deepstream/lib/gst-plugins:' + os.environ.get('GST_PLUGIN_PATH', '')
no_display = False
silent = int(MyConfigReader.cfg_dict["RTSP"]["silent"])
file_loop = int(MyConfigReader.cfg_dict["RTSP"]["file_loop"])  # choose CPU or GPU decoding
perf_data = None
measure_latency = False
MUXER_OUTPUT_WIDTH = int(MyConfigReader.cfg_dict["RTSP"]["output_width"])
MUXER_OUTPUT_HEIGHT = int(MyConfigReader.cfg_dict["RTSP"]["output_height"])
MUXER_BATCH_TIMEOUT_USEC = int(MyConfigReader.cfg_dict["RTSP"]["MUXER_BATCH_TIMEOUT_USEC"])
TILED_OUTPUT_WIDTH = int(MyConfigReader.cfg_dict["RTSP"]["TILED_OUTPUT_WIDTH"])
TILED_OUTPUT_HEIGHT = int(MyConfigReader.cfg_dict["RTSP"]["TILED_OUTPUT_HEIGHT"])
GST_CAPS_FEATURES_NVMM = "memory:NVMM"
OSD_PROCESS_MODE = int(MyConfigReader.cfg_dict["OSD"]["process-mode"])
OSD_DISPLAY_TEXT = int(MyConfigReader.cfg_dict["OSD"]["display-text"])
GPU_ID = int(MyConfigReader.cfg_dict["GPU"]["gpu_id"])
def get_current_timestamp_millis():
timestamp = time.time()
timestamp_millis = int(timestamp * 1000)
dt_object = datetime.fromtimestamp(timestamp)
standard_time_format = dt_object.strftime('%Y-%m-%d_%H-%M-%S.') + str(timestamp_millis % 1000).zfill(3)
return standard_time_format
class StreamCapture:
def __init__(self, pgie, config, disable_probe, overtime_dict):
super().__init__()
self.pipeline = None
self.framerate = int(MyConfigReader.cfg_dict['nvr']['fps'])
self.running = False
self.frame_interval = 1.0 / self.framerate
self.last_frame_time_dict = {}
        self.reconnect_attempts = 5  # maximum number of reconnect attempts
        self.reconnect_interval = 5  # reconnect interval (seconds)
self.thread = None
self.status = None
self.bus = None
self.update_time = time.time()
self.last_alive_time_dict = {}
self.over_time_dict = overtime_dict
self.dict_lock = threading.Lock()
self.stream_paths = None
self.stream_path_code = None
self.requested_pgie = pgie
self.disable_probe = disable_probe
self.config = config
self.number_sources = None
self.frame_dict = {}
self.source_bins = {}
self.lock = threading.Lock()
        # self.kafka_producer = KafkaProducer({
        #     'bootstrap.servers': f"{MyConfigReader.cfg_dict['Kafka']['ip']}:{MyConfigReader.cfg_dict['Kafka']['port']}",
        # }, retry_limit=5, retry_interval=3)
        # self.kafka_producer = Producer({
        #     'bootstrap.servers': f"{MyConfigReader.cfg_dict['Kafka']['ip']}:{MyConfigReader.cfg_dict['Kafka']['port']}",
        #     # Kafka broker address
        #     'client.id': 'dooropen-producer'  # client ID
        # })
self.x_minutes = 10
self.max_dropouts = 10
self.dropout_history = defaultdict(list)
self.last_alarm_dict = {}
self.platform_info = PlatformInfo()
def open(self, stream_paths_dict):
global perf_data
self.stream_path_code = list(stream_paths_dict.keys())
self.stream_paths = list(stream_paths_dict.values())
self.number_sources = len(self.stream_paths)
perf_data = PERF_DATA(self.number_sources)
Gst.init(None)
if not self.create_pipeline():
            MyLogger.error('Failed to create Pipeline.')
self.status = False
return False
        # create an event loop and feed GStreamer bus messages to it
self.loop = GLib.MainLoop()
self.bus = self.pipeline.get_bus()
self.bus.add_signal_watch()
self.bus.connect("message", bus_call, self.loop)
        # pgie_src_pad = self.pgie.get_static_pad("src")
        tiler_sink_pad = self.tiler.get_static_pad("sink")
        if not tiler_sink_pad:
            MyLogger.error(" Unable to get tiler sink pad")
        else:
            if not self.disable_probe:
                tiler_sink_pad.add_probe(Gst.PadProbeType.BUFFER, self.pgie_src_pad_buffer_probe, 0)
# perf callback function to print fps every 5 sec
GLib.timeout_add(5000, perf_data.perf_print_callback)
# Enable latency measurement via probe if environment variable NVDS_ENABLE_LATENCY_MEASUREMENT=1 is set.
# To enable component level latency measurement, please set environment variable
# NVDS_ENABLE_COMPONENT_LATENCY_MEASUREMENT=1 in addition to the above.
if environ.get('NVDS_ENABLE_LATENCY_MEASUREMENT') == '1':
print(
"Pipeline Latency Measurement enabled!\nPlease set env var NVDS_ENABLE_COMPONENT_LATENCY_MEASUREMENT=1 for Component Latency Measurement")
global measure_latency
measure_latency = True
# List the sources
MyLogger.info("Now playing...")
for i, source in enumerate(self.stream_paths):
MyLogger.info(f"{i}: {source}")
MyLogger.info("Starting pipeline")
        # start playback and listen to events
self.pipeline.set_state(Gst.State.PLAYING)
try:
self.loop.run()
except:
pass
# cleanup
print("Exiting app\n")
self.pipeline.set_state(Gst.State.NULL)
self.running = True
self.status = True
return True
def create_pipeline(self):
MyLogger.info("Creating Pipeline ")
self.pipeline = Gst.Pipeline()
self.is_live = False
if not self.pipeline:
MyLogger.error(" Unable to create Pipeline ")
MyLogger.info("Creating nvmultiurisrcbin ")
        # Create nvmultiurisrcbin to form batches from one or more URI sources.
self.nvmultiurisrcbin = Gst.ElementFactory.make("nvmultiurisrcbin", "multiurisrcbin")
        if not self.nvmultiurisrcbin:
            MyLogger.error("Unable to create nvmultiurisrcbin")
            return False
self.pipeline.add(self.nvmultiurisrcbin)
print("success")
# Set nvmultiurisrcbin properties to mimic the CLI usage
self.nvmultiurisrcbin.set_property("uri-list", "file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4")
self.nvmultiurisrcbin.set_property("width", 1920)
self.nvmultiurisrcbin.set_property("height", 1080)
self.nvmultiurisrcbin.set_property("live-source", 1)
self.nvmultiurisrcbin.set_property("batched-push-timeout", 33333)
self.nvmultiurisrcbin.set_property("drop-pipeline-eos", 1)
# multiurisrcbin.set_property("max-batch-size", 10)
# Possibly set batch-push-timeout or other properties if you want to match your CLI exactly:
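        # nvmultiurisrcbin bundles nvurisrcbin sources with an internal nvstreammux,
        # so no separate nvstreammux element is linked downstream; it can also expose
        # a REST API for adding/removing streams at runtime.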
self.queue1 = Gst.ElementFactory.make("queue", "queue1")
self.queue2 = Gst.ElementFactory.make("queue", "queue2")
self.queue3 = Gst.ElementFactory.make("queue", "queue3")
self.queue4 = Gst.ElementFactory.make("queue", "queue4")
self.queue5 = Gst.ElementFactory.make("queue", "queue5")
self.queue6 = Gst.ElementFactory.make("queue", "queue6")
self.queue7 = Gst.ElementFactory.make("queue", "queue7")
self.queue8 = Gst.ElementFactory.make("queue", "queue8")
self.pipeline.add(self.queue1)
self.pipeline.add(self.queue2)
self.pipeline.add(self.queue3)
self.pipeline.add(self.queue4)
self.pipeline.add(self.queue5)
self.pipeline.add(self.queue6)
self.pipeline.add(self.queue7)
self.pipeline.add(self.queue8)
self.nvdslogger = None
MyLogger.info("Creating preprocess ")
self.preprocess = Gst.ElementFactory.make("nvdspreprocess", "preprocess-plugin")
if not self.preprocess:
MyLogger.error(f" Unable to create preprocess : {self.preprocess}")
return False
self.preprocess.set_property("gpu_id", GPU_ID)
self.preprocess.set_property("config-file", "config_preprocess.txt")
MyLogger.info("Creating Pgie")
        if self.requested_pgie in ('nvinferserver', 'nvinferserver-grpc'):
            self.pgie = Gst.ElementFactory.make("nvinferserver", "primary-inference")
        else:
            self.pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
if not self.pgie:
MyLogger.error(f" Unable to create pgie : {self.requested_pgie}")
return False
if self.disable_probe:
# Use nvdslogger for perf measurement instead of probe function
MyLogger.info("Creating nvdslogger")
self.nvdslogger = Gst.ElementFactory.make("nvdslogger", "nvdslogger")
self.nvdslogger.set_property("gpu_id", GPU_ID)
MyLogger.info("Creating tiler")
self.tiler = Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
self.tiler.set_property("gpu_id", GPU_ID)
if not self.tiler:
MyLogger.error(" Unable to create tiler")
return False
MyLogger.info("Creating nvvidconv")
self.nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
self.nvvidconv.set_property("gpu_id", GPU_ID)
if not self.nvvidconv:
MyLogger.error(" Unable to create nvvidconv")
return False
MyLogger.info("Creating nvosd")
self.nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
if not self.nvosd:
MyLogger.error(" Unable to create nvosd \n")
return False
self.nvosd.set_property("gpu_id", GPU_ID)
self.nvosd.set_property('process-mode', OSD_PROCESS_MODE)
self.nvosd.set_property('display-text', OSD_DISPLAY_TEXT)
        self.nvosd.set_property('display-clock', 1)  # enable the clock overlay
        self.nvosd.set_property('clock-font', 'Arial')  # clock font
        self.nvosd.set_property('clock-font-size', 20)  # clock font size
        self.nvosd.set_property('clock-color', 0xff0000ff)  # clock color (red, alpha = 1)
        self.nvosd.set_property('x-clock-offset', 100)  # clock horizontal offset
        self.nvosd.set_property('y-clock-offset', 50)  # clock vertical offset
# if file_loop:
# if self.platform_info.is_integrated_gpu():
# # Set nvbuf-memory-type=4 for integrated gpu for file-loop (nvurisrcbin case)
# self.streammux.set_property('nvbuf-memory-type', 4)
# else:
# # Set nvbuf-memory-type=2 for x86 for file-loop (nvurisrcbin case)
# self.streammux.set_property('nvbuf-memory-type', 2)
if not int(MyConfigReader.cfg_dict["nvr"]["show"]):
MyLogger.info("Creating Fakesink")
self.sink = Gst.ElementFactory.make("fakesink", "fakesink")
self.sink.set_property('enable-last-sample', 0)
self.sink.set_property('sync', 0)
else:
if self.platform_info.is_integrated_gpu():
MyLogger.info("Creating nv3dsink")
self.sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
if not self.sink:
MyLogger.error(" Unable to create nv3dsink")
self.sink.set_property("gpu_id", GPU_ID)
else:
if self.platform_info.is_platform_aarch64():
MyLogger.info("Creating nv3dsink")
self.sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
else:
MyLogger.info("Creating EGLSink")
self.sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
self.sink.set_property("gpu_id", GPU_ID)
if not self.sink:
MyLogger.error(" Unable to create egl sink")
        self.converter = Gst.ElementFactory.make("nvvideoconvert", "converter-2RGBA")
        self.converter.set_property("gpu_id", GPU_ID)
        self.capsfilter = Gst.ElementFactory.make("capsfilter", "capsfilter-2RGBA")
        caps = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA")
# self.converter.set_property("nvbuf-memory-type", 0)
mem_type = int(pyds.NVBUF_MEM_CUDA_UNIFIED)
self.converter.set_property("nvbuf-memory-type", mem_type)
self.tiler.set_property("nvbuf-memory-type", mem_type)
self.capsfilter.set_property("caps", caps)
if not self.sink:
MyLogger.error(" Unable to create sink element")
if self.requested_pgie == "nvinferserver" and self.config != None:
self.pgie.set_property('config-file-path', self.config)
elif self.requested_pgie == "nvinferserver-grpc" and self.config != None:
self.pgie.set_property('config-file-path', self.config)
elif self.requested_pgie == "nvinfer" and self.config != None:
self.pgie.set_property('config-file-path', self.config)
else:
self.pgie.set_property('config-file-path', self.config)
pgie_batch_size = self.pgie.get_property("batch-size")
        if pgie_batch_size != self.number_sources:
            MyLogger.warning(
                f"WARNING: Overriding infer-config batch-size {pgie_batch_size} with number of sources {self.number_sources}")
self.pgie.set_property("batch-size", self.number_sources)
tiler_rows = int(math.sqrt(self.number_sources))
tiler_columns = int(math.ceil((1.0 * self.number_sources) / tiler_rows))
self.tiler.set_property("rows", tiler_rows)
self.tiler.set_property("columns", tiler_columns)
self.tiler.set_property("width", TILED_OUTPUT_WIDTH)
self.tiler.set_property("height", TILED_OUTPUT_HEIGHT)
if self.platform_info.is_integrated_gpu():
self.tiler.set_property("compute-hw", 2)
else:
self.tiler.set_property("compute-hw", 1)
self.sink.set_property("qos", 0)
MyLogger.info("Adding elements to Pipeline")
self.pipeline.add(self.preprocess)
self.pipeline.add(self.pgie)
self.pipeline.add(self.converter)
self.pipeline.add(self.capsfilter)
if self.nvdslogger:
self.pipeline.add(self.nvdslogger)
self.pipeline.add(self.tiler)
self.pipeline.add(self.nvvidconv)
self.pipeline.add(self.nvosd)
self.pipeline.add(self.sink)
MyLogger.info("Linking elements in the Pipeline")
self.nvmultiurisrcbin.link(self.queue1)
self.queue1.link(self.preprocess)
self.preprocess.link(self.queue2)
self.queue2.link(self.pgie)
self.pgie.link(self.queue3)
self.queue3.link(self.converter)
self.converter.link(self.queue4)
self.queue4.link(self.capsfilter)
self.capsfilter.link(self.queue5)
if self.nvdslogger:
self.queue5.link(self.nvdslogger)
self.nvdslogger.link(self.tiler)
else:
self.queue5.link(self.tiler)
self.tiler.link(self.queue6)
self.queue6.link(self.nvvidconv)
self.nvvidconv.link(self.queue7)
self.queue7.link(self.nvosd)
self.nvosd.link(self.queue8)
self.queue8.link(self.sink)
return True
def create_source_bin(self, index, uri):
MyLogger.info("Creating source bin")
if not self.check_uri_valid(uri):
MyLogger.error(f"URI is invalid or unreachable: {uri}", )
return None
# Create a source GstBin to abstract this bin's content from the rest of the
# pipeline
bin_name = "source-bin-%02d" % index
MyLogger.info(bin_name)
nbin = Gst.Bin.new(bin_name)
if not nbin:
MyLogger.error(" Unable to create source bin")
# Source element for reading from the uri.
# We will use decodebin and let it figure out the container format of the
# stream and the codec and plug the appropriate demux and decode plugins.
if file_loop:
# use nvurisrcbin to enable file-loop
uri_decode_bin = Gst.ElementFactory.make("nvurisrcbin", "uri-decode-bin")
uri_decode_bin.set_property("file-loop", 1)
uri_decode_bin.set_property("cudadec-memtype", 0)
MyLogger.info("nvurisrcbin")
else:
uri_decode_bin = Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
MyLogger.info("uridecodebin")
if not uri_decode_bin:
MyLogger.error(" Unable to create uri decode bin")
# We set the input uri to the source element
uri_decode_bin.set_property("uri", uri)
# Connect to the "pad-added" signal of the decodebin which generates a
        # callback once a new pad for raw data has been created by the decodebin
uri_decode_bin.connect("pad-added", self.cb_newpad, nbin)
self.source_bins[index] = nbin
uri_decode_bin.connect("child-added", self.decodebin_child_added, nbin)
# We need to create a ghost pad for the source bin which will act as a proxy
# for the video decoder src pad. The ghost pad will not have a target right
# now. Once the decode bin creates the video decoder and generates the
# cb_newpad callback, we will set the ghost pad target to the video decoder
# src pad.
Gst.Bin.add(nbin, uri_decode_bin)
bin_pad = nbin.add_pad(Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC))
if not bin_pad:
MyLogger.error(" Failed to add ghost pad in source bin ")
return None
return nbin
def cb_newpad(self, decodebin, decoder_src_pad, data):
global MUXER_OUTPUT_WIDTH
global MUXER_OUTPUT_HEIGHT
MyLogger.info("In cb_newpad")
caps = decoder_src_pad.get_current_caps()
if not caps:
caps = decoder_src_pad.query_caps()
gststruct = caps.get_structure(0)
gstname = gststruct.get_name()
source_bin = data
features = caps.get_features(0)
# Need to check if the pad created by the decodebin is for video and not
# audio.
MyLogger.info(f"gstname={gstname}")
if (gstname.find("video") != -1):
# Link the decodebin pad only if decodebin has picked nvidia
# decoder plugin nvdec_*. We do this by checking if the pad caps contain
# NVMM memory features.
MyLogger.info(f"features={features}")
MUXER_OUTPUT_WIDTH = gststruct.get_value("width")
MUXER_OUTPUT_HEIGHT = gststruct.get_value("height")
if features.contains("memory:NVMM"):
# Get the source bin ghost pad
bin_ghost_pad = source_bin.get_static_pad("src")
if not bin_ghost_pad.set_target(decoder_src_pad):
MyLogger.error("Failed to link decoder src pad to source bin ghost pad")
else:
MyLogger.error(" Error: Decodebin did not pick nvidia decoder plugin.")
def decodebin_child_added(self, child_proxy, Object, name, user_data):
MyLogger.info(f"Decodebin child added:{name}")
if (name.find("decodebin") != -1):
Object.connect("child-added", self.decodebin_child_added, user_data)
if "nvv" in name.lower(): # 匹配 nvv4l2decoder, nvv4l2h264dec 等
if Object.find_property("gpu_id") is not None:
Object.set_property("gpu_id", GPU_ID) # 与下游组件(如 nvstreammux)一致
# MyLogger.info(f"设置 {name} 的 GPU_ID 为 1")
if "source" in name:
source_element = child_proxy.get_by_name("source")
if source_element.find_property('drop-on-latency') != None:
Object.set_property("drop-on-latency", True)
def check_uri_valid(self, uri, timeout=5):
parsed = urlparse(uri)
if parsed.scheme == "file":
file_path = parsed.path
if os.name == "nt" and len(file_path) >= 3 and file_path[0] == "/":
file_path = file_path[1:]
return os.path.exists(file_path)
elif parsed.scheme == "rtsp":
if self.check_uri_playable(uri):
return True
else:
return False
else:
MyLogger.warning(f"Unsupported URI scheme: {parsed.scheme}, skipping validation")
return True
def check_uri_playable(self, uri, timeout=5):
pipeline = Gst.parse_launch(f"uridecodebin uri={uri} ! fakesink sync=false")
pipeline.set_state(Gst.State.PLAYING)
time.sleep(0.1)
start_time = time.time()
while time.time() - start_time < timeout:
state = pipeline.get_state(timeout=1000000)[1]
if state == Gst.State.PLAYING:
pipeline.set_state(Gst.State.NULL)
return True
pipeline.set_state(Gst.State.NULL)
return False
def pgie_src_pad_buffer_probe(self, pad, info, u_data):
frame_number = 0
num_rects = 0
got_fps = False
current_time = time.time()
gst_buffer = info.get_buffer()
        if not gst_buffer:
            MyLogger.error("Unable to get GstBuffer ")
            return Gst.PadProbeReturn.OK
# Retrieve batch metadata from the gst_buffer
# Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
# C address of gst_buffer as input, which is obtained with hash(gst_buffer)
# Enable latency measurement via probe if environment variable NVDS_ENABLE_LATENCY_MEASUREMENT=1 is set.
# To enable component level latency measurement, please set environment variable
# NVDS_ENABLE_COMPONENT_LATENCY_MEASUREMENT=1 in addition to the above.
global measure_latency
if measure_latency:
num_sources_in_batch = pyds.nvds_measure_buffer_latency(hash(gst_buffer))
if num_sources_in_batch == 0:
MyLogger.error("Unable to get number of sources in GstBuffer for latency measurement")
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
l_frame = batch_meta.frame_meta_list
while l_frame is not None:
n_frame1_box = None
try:
# Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
# The casting is done by pyds.NvDsFrameMeta.cast()
# The casting also keeps ownership of the underlying memory
# in the C code, so the Python garbage collector will leave
# it alone.
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
if current_time - self.last_alive_time_dict.setdefault(frame_meta.pad_index, 0) > 60 * 5:
MyLogger.info(
f"source [{frame_meta.pad_index}]:{self.stream_path_code[frame_meta.pad_index]} is alive...")
self.last_alive_time_dict[frame_meta.pad_index] = current_time
self.last_frame_time_dict[frame_meta.pad_index] = current_time
except StopIteration:
break
frame_number = frame_meta.frame_num
l_obj = frame_meta.obj_meta_list
num_rects = frame_meta.num_obj_meta
try:
n_frame = pyds.get_nvds_buf_surface(hash(gst_buffer), frame_meta.batch_id)
if n_frame is None:
MyLogger.error("Failed to get surface from buffer")
break
n_frame1 = np.array(n_frame, copy=True, order='C')
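                # get_nvds_buf_surface() maps the NVMM surface in place, so the copy
                # above keeps the pixel data valid after the buffer moves downstream.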
except Exception as e:
MyLogger.error(f"Error getting frame surface: {e}")
break
detected_objects = []
while l_obj is not None:
try:
# Casting l_obj.data to pyds.NvDsObjectMeta
obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
obj_label = pyds.get_string(obj_meta.text_params.display_text)
obj_confidence = obj_meta.confidence
obj_bbox = (obj_meta.rect_params.left, obj_meta.rect_params.top,
obj_meta.rect_params.width, obj_meta.rect_params.height)
obj_meta.text_params.display_text = f"{obj_label} {obj_confidence:.2f}"
# print(obj_meta.class_id)
detected_objects.append({
"label": obj_label,
"confidence": obj_confidence,
"bbox": obj_bbox,
"pad_index": frame_meta.pad_index
})
except StopIteration:
break
try:
if n_frame1_box is None:
n_frame1_box = StreamCapture.draw_bounding_boxes(copy.deepcopy(n_frame1), obj_meta,
obj_meta.confidence)
else:
n_frame1_box = StreamCapture.draw_bounding_boxes(n_frame1_box, obj_meta,
obj_meta.confidence)
except Exception as e:
MyLogger.error(f"Error drawing bounding boxes: {e}")
try:
l_obj = l_obj.next
except StopIteration:
break
try:
# convert the array into cv2 default color format
n_frame1 = cv2.cvtColor(n_frame1, cv2.COLOR_RGBA2RGB)
except Exception as e:
MyLogger.error(f"Error converting color format: {e}")
break
            roi = None  # make sure roi is defined even if ROI parsing fails below
            try:
                roi = [tuple(map(int, point.split(','))) for point in
                       MyConfigReader.cfg_dict["ROI"][f"source{frame_meta.pad_index}"].split(';')]
                display_meta = self.draw_polygon_roi(batch_meta, frame_meta, roi)
                if display_meta:
                    # attach the display meta to the current frame
                    pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)
except Exception as e:
MyLogger.error(f"Error drawing ROI: {e}")
try:
with self.lock:
self.frame_dict.setdefault(str(frame_meta.pad_index), {"frame": None, "time": None})
self.frame_dict[str(frame_meta.pad_index)]["frame"] = n_frame1
self.frame_dict[str(frame_meta.pad_index)]["time"] = get_current_timestamp_millis()
except Exception as e:
MyLogger.error(f"Error updating frame dictionary: {e}")
try:
if num_rects != 0:
self.detection_algorithm(detected_objects, n_frame1, n_frame1_box, roi)
if not silent:
MyLogger.info(
f"batchid:{frame_meta.pad_index},Frame Number={frame_number},Number of Objects={num_rects}")
except Exception as e:
MyLogger.error(f"Error in detection algorithm: {e}")
# Update frame rate through this probe
stream_index = "stream{0}".format(frame_meta.pad_index)
global perf_data
perf_data.update_fps(stream_index)
try:
l_frame = l_frame.next
except StopIteration:
break
return Gst.PadProbeReturn.OK
def draw_polygon_roi(self, batch_meta, frame_meta, roi_points, border_width=3, border_color=(0.0, 0.0, 1.0, 1.0)):
"""
绘制由4个点连线组成的多边形区域。
参数:
batch_meta: 批处理元数据对象。
frame_meta: 帧元数据对象。
roi_points: 包含4个点坐标的列表或元组,格式为 [(x1, y1), (x2, y2), (x3, y3), (x4, y4)]。
border_width: 边框宽度,默认为3。
border_color: 边框颜色,格式为 (R, G, B, A),默认为蓝色 (0.0, 0.0, 1.0, 1.0)。
返回:
display_meta: 配置好的显示元数据对象。
"""
# 检查参数有效性
if not batch_meta:
MyLogger.error("batch_meta is None in draw_polygon_roi")
return None
if not frame_meta:
MyLogger.error("frame_meta is None in draw_polygon_roi")
return None
if not roi_points or len(roi_points) < 4:
MyLogger.error(f"Invalid roi_points in draw_polygon_roi: {roi_points}")
return None
try:
            # acquire display metadata from the pool
display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
if not display_meta:
MyLogger.error("Failed to acquire display meta from pool")
return None
            display_meta.num_lines = 4  # draw 4 lines connecting the 4 points into a polygon
            display_meta.num_rects = 0  # ensure no rectangles are drawn
            # configure the polygon border by drawing 4 lines
for i in range(4):
try:
line = display_meta.line_params[i]
                    # set the line's start and end points
                    line.x1 = int(roi_points[i][0])  # x of point i
                    line.y1 = int(roi_points[i][1])  # y of point i
                    line.x2 = int(roi_points[(i + 1) % 4][0])  # x of the next point
                    line.y2 = int(roi_points[(i + 1) % 4][1])  # y of the next point
                    # set the line width and color
line.line_width = border_width
line.line_color.red = border_color[0]
line.line_color.green = border_color[1]
line.line_color.blue = border_color[2]
line.line_color.alpha = border_color[3]
except Exception as e:
MyLogger.error(f"Error configuring line {i} in draw_polygon_roi: {e}")
                    # keep processing the remaining lines instead of aborting
return display_meta
except Exception as e:
MyLogger.error(f"Error in draw_polygon_roi: {e}")
return None
def check_bottom_edge_in_polygon(self, bottom_edge, polygon):
"""
判断底边是否在封闭线框内。
:param bottom_edge: 底边的两个点 [(x1, y1), (x2, y2)]
:param polygon: 多边形顶点列表 [(x1, y1), (x2, y2), ...]
:return: 如果有点在多边形内返回True,否则返回False
"""
# 检查参数有效性
if not bottom_edge or len(bottom_edge) < 2:
MyLogger.error(f"Invalid bottom_edge in check_bottom_edge_in_polygon: {bottom_edge}")
return False
if not polygon or len(polygon) < 3:
MyLogger.error(f"Invalid polygon in check_bottom_edge_in_polygon: {polygon}")
return False
try:
for point in bottom_edge:
if not point or len(point) < 2:
continue
if self.is_point_in_polygon(point, polygon):
return True
return False
except Exception as e:
MyLogger.error(f"Error in check_bottom_edge_in_polygon: {e}")
return False
def is_point_in_polygon(self, point, polygon):
"""
判断点是否在多边形内。
:param point: 要判断的点 (x, y)
:param polygon: 多边形顶点的列表 [(x1, y1), (x2, y2), ...]
:return: 是否在多边形内
"""
# 检查参数有效性
if not point or len(point) < 2:
MyLogger.error(f"Invalid point in is_point_in_polygon: {point}")
return False
if not polygon or len(polygon) < 3:
MyLogger.error(f"Invalid polygon in is_point_in_polygon: {polygon}")
return False
try:
            # convert to a numpy array and run the point-in-polygon test
            polygon_array = np.array(polygon, dtype=np.int32)  # pointPolygonTest needs int32/float32 points
            result = cv2.pointPolygonTest(polygon_array, point, False)
return result >= 0
except Exception as e:
MyLogger.error(f"Error in is_point_in_polygon: {e}")
return False
def np_array_to_base64(self, image):
"""
将numpy数组图像转换为base64编码字符串。
:param image: numpy数组格式的图像
:return: base64编码的字符串
"""
if image is None:
MyLogger.error("Image is None in np_array_to_base64")
return ""
try:
            img_array = cv2.cvtColor(image, cv2.COLOR_RGB2BGR)  # RGB -> BGR for cv2 encoding
encode_result = cv2.imencode(".jpg", img_array)
if not encode_result[0]:
MyLogger.error("Failed to encode image in np_array_to_base64")
return ""
encode_image = encode_result[1]
            byte_data = encode_image.tobytes()  # to raw bytes
            base64_str = base64.b64encode(byte_data).decode("ascii")  # to base64
return base64_str
except Exception as e:
MyLogger.error(f"Error in np_array_to_base64: {e}")
return ""
def det_point_in_roi(self, point, roi):
"""
判断点是否在矩形ROI内。
:param point: 点坐标 (x, y)
:param roi: 矩形区域 [x, y, width, height]
:return: 点是否在ROI内
"""
if not point or len(point) < 2:
MyLogger.error(f"Invalid point in det_point_in_roi: {point}")
return False
if not roi or len(roi) < 4:
MyLogger.error(f"Invalid roi in det_point_in_roi: {roi}")
return False
try:
            if point[0] < roi[0] or point[0] > roi[0] + roi[2]:
                return False
            if point[1] < roi[1] or point[1] > roi[1] + roi[3]:
                return False
            return True
except Exception as e:
MyLogger.error(f"Error in det_point_in_roi: {e}")
return False
def determine_safety_status(self, gap_width, container_width):
if gap_width < container_width * float(MyConfigReader.cfg_dict["Detect"]["Threshold"]):
return True # Safe: green with transparency
else:
return False # Unsafe: red with transparency
def detection_algorithm(self, detected_obj, frame_src, frame_bbox, roi):
"""
检测算法,用于分析检测到的对象并触发报警。
参数:
detected_obj: 检测到的对象列表
frame_src: 原始帧图像
frame_bbox: 带有边界框的帧图像
roi: 感兴趣区域
"""
# 检查参数有效性
if not detected_obj:
return
if frame_bbox is None:
MyLogger.warning("frame_bbox is None in detection_algorithm")
return
if roi is None:
MyLogger.warning("roi is None in detection_algorithm")
return
try:
camera_index_code = ""
tag = False
for obj in detected_obj:
if not isinstance(obj, dict):
continue
if obj.get("label", "") == "people":
                    # check that the bbox is valid
bbox = obj.get("bbox")
if not bbox or len(bbox) < 4:
continue
try:
center_pointer = (obj["bbox"][0] + (obj["bbox"][2] / 2), obj["bbox"][1] + (obj["bbox"][3] / 2))
                        # check whether the bottom edge falls inside the ROI
bottom_edge = [(obj["bbox"][0], obj["bbox"][1] + obj["bbox"][3]),
(obj["bbox"][0] + obj["bbox"][2], obj["bbox"][1] + obj["bbox"][3])]
if self.check_bottom_edge_in_polygon(bottom_edge, roi):
                            # alarm logic can be added here
pass
# MyLogger.info("detect people ,alarm")
except Exception as e:
MyLogger.error(f"Error processing object in detection_algorithm: {e}")
continue
            # trigger an alarm if needed
if tag:
try:
resized_img = cv2.resize(frame_bbox, (1920, 1080), interpolation=cv2.INTER_LINEAR)
msg = {
"System": "StreamingMedia",
"CMD": "Alarm",
"Type": "CJ12-001",
"Pic": self.np_array_to_base64(resized_img),
"Time": time.strftime("%Y-%m-%d %H:%M:%S", time.localtime()),
"Data": {
"Camera": camera_index_code,
"Message": "msg"
},
"Code": 200
}
                    # Kafka send code is commented out
                    # self.kafka_producer.send(
                    #     topic=f"{MyConfigReader.cfg_dict['Kafka']['topic']}",
                    #     value=f"{json.dumps(msg)}")
                    # send the message asynchronously
                    # self.kafka_producer.produce(f"{MyConfigReader.cfg_dict['Kafka']['topic']}", json.dumps(msg).encode('utf-8'), callback=self.delivery_report)
                    # wait for all outstanding messages to be delivered
                    # self.kafka_producer.flush()
                    # MyLogger.info("Sent alarm message to the Kafka queue")
except Exception as e:
MyLogger.error(f"Error sending alarm in detection_algorithm: {e}")
except Exception as e:
MyLogger.error(f"Error in detection_algorithm: {e}")
return
def delivery_report(self, err, msg):
if err is not None:
MyLogger.error(f'kafka send error: {err}')
else:
MyLogger.info(f'{msg.topic()} [{msg.partition()}] @ {msg.offset()}')
@staticmethod
def draw_bounding_boxes(image, obj_meta, confidence):
image = cv2.cvtColor(image, cv2.COLOR_RGBA2RGB)
confidence = '{0:.2f}'.format(confidence)
rect_params = obj_meta.rect_params
top = int(rect_params.top)
left = int(rect_params.left)
width = int(rect_params.width)
height = int(rect_params.height)
obj_name = pyds.get_string(obj_meta.text_params.display_text)
# image = cv2.rectangle(image, (left, top), (left + width, top + height), (0, 0, 255, 0), 2, cv2.LINE_4)
color = (0, 0, 255, 0)
w_percents = int(width * 0.05) if width > 100 else int(width * 0.1)
h_percents = int(height * 0.05) if height > 100 else int(height * 0.1)
linetop_c1 = (left + w_percents, top)
linetop_c2 = (left + width - w_percents, top)
image = cv2.line(image, linetop_c1, linetop_c2, color, 6)
linebot_c1 = (left + w_percents, top + height)
linebot_c2 = (left + width - w_percents, top + height)
image = cv2.line(image, linebot_c1, linebot_c2, color, 6)
lineleft_c1 = (left, top + h_percents)
lineleft_c2 = (left, top + height - h_percents)
image = cv2.line(image, lineleft_c1, lineleft_c2, color, 6)
lineright_c1 = (left + width, top + h_percents)
lineright_c2 = (left + width, top + height - h_percents)
image = cv2.line(image, lineright_c1, lineright_c2, color, 6)
# Note that on some systems cv2.putText erroneously draws horizontal lines across the image
image = cv2.putText(image, obj_name, (left - 10, top - 10),
cv2.FONT_HERSHEY_SIMPLEX,
1,
(0, 0, 255, 0), 2)
return image
if __name__ == '__main__':
a = StreamCapture(None, MyConfigReader.cfg_dict['Infer']['path'], False, {})
a.open({"asdasdasdas": "file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_1080p_h264.mp4"})