Please provide complete information as applicable to your setup.
• Hardware Platform A100
• DeepStream Version 7.1
I have this code:
#!/usr/bin/env python3
################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2021-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import sys
import os
sys.path.append(“../”)
from common.bus_call import bus_call
from common.platform_info import PlatformInfo
import numpy as np
import cv2
import pyds
import platform
import math
import time
import json
import configparser
from ctypes import *
import gi
gi.require_version(“Gst”, “1.0”)
gi.require_version(“GstRtspServer”, “1.0”)
import base64
from threading import Thread, Lock
from queue import Queue, Empty
from gi.repository import Gst, GstRtspServer, GLib
import configparser
from datetime import datetime, timedelta
import argparse
from debug import save_graph
from kafka import KafkaProducer
MAX_DISPLAY_LEN = 64
PGIE_CLASS_ID_PERSON = 0
PGIE_CLASS_ID_BAG = 1
PGIE_CLASS_ID_FACE = 2
PGIE_BATCH_SIZE = 32
HTTP_API_REST_PORT = 9014
MUXER_OUTPUT_WIDTH = 1280
MUXER_OUTPUT_HEIGHT = 720
MUXER_BATCH_TIMEOUT_USEC = 33000
GST_CAPS_FEATURES_NVMM = “memory:NVMM”
OSD_PROCESS_MODE = 0
OSD_DISPLAY_TEXT = 0
pgie_classes_str = [“Person”, “Bag”, “Face”]
def kafka_sender_loop():
global last_send_time
grouped_data = {}
while True:
now = datetime.now()
# Enviar si hay datos y ha pasado el tiempo
if grouped_data and last_send_time and (now - last_send_time).total_seconds() >= limit_time:
for cam_id, detections in grouped_data.items():
msg = {
"tags": {"camID": cam_id},
"fields": {"detections": {}}
}
for elem in detections:
label = elem["label"]
obj_id = str(elem["obj_id"])
if label not in msg["fields"]["detections"]:
msg["fields"]["detections"][label] = {}
if obj_id not in msg["fields"]["detections"][label]:
msg["fields"]["detections"][label][obj_id] = {
"confidence": round(elem["confidence"], 2),
"bbox": elem["bbox"],
"image": elem["image"],
"time": elem["timestamp"]
}
try:
kafka_producer.send(
kafka_topic,
key=str(cam_id).encode("utf-8"),
value=json.dumps(msg).encode("utf-8")
)
print(f"📤 Enviado a Kafka cam {cam_id} con {len(detections)} objetos")
except Exception as e:
print(f"❌ Error al enviar a Kafka: {e}")
last_send_time = now
grouped_data.clear() # Solo limpiamos luego de enviar todo
try:
obj = data_queue.get(timeout=1)
except Empty:
continue
cam_id = obj["source_id"]
if cam_id not in grouped_data:
grouped_data[cam_id] = []
grouped_data[cam_id].append(obj)
if last_send_time is None:
last_send_time = datetime.now()
def multiurisrcbin_src_pad_probe(pad, info, user_data):
pipeline, loop = user_data
buf = info.get_buffer()
if not buf:
return Gst.PadProbeReturn.OK
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(buf))
if not batch_meta:
return Gst.PadProbeReturn.OK
stream_count = batch_meta.num_frames_in_batch
if stream_count == 0:
current_state = pipeline.get_state(0).state
if current_state != Gst.State.PAUSED: # Si aun está ejecutando
GLib.idle_add(deferred_pause, pipeline) # Ejecuta función cuando hilo principal esté libre
elif stream_count > 0:
current_state = pipeline.get_state(0).state
if current_state != Gst.State.PLAYING:
print("▶️ Sources detected. Resuming pipeline.")
pipeline.set_state(Gst.State.PLAYING)
return Gst.PadProbeReturn.OK
def deferred_pause(pipeline):
print(“⏸️ [DEFERRED] No sources left. Flushing and pausing pipeline.”)
pipeline.send_event(Gst.Event.new_flush_start())
pipeline.send_event(Gst.Event.new_flush_stop(False))
pipeline.set_state(Gst.State.PAUSED)
return False # para que idle_add se autoelimine
def safe_link(element1, element2, name1, name2):
try:
if not element1.link(element2):
raise RuntimeError(f"Failed to link {name1} to {name2}“)
except Exception as e:
sys.stderr.write(f"❌ ERROR: {e}\n”)
sys.exit(1)
def osd_sink_pad_buffer_probe(pad, info):
gst_buffer = info.get_buffer()
if not gst_buffer:
print("Unable to get GstBuffer")
return Gst.PadProbeReturn.OK
try:
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
if not batch_meta:
return Gst.PadProbeReturn.OK
except Exception as e:
print(f"⚠️ Error accediendo a batch_meta: {e}")
return Gst.PadProbeReturn.OK
l_frame = batch_meta.frame_meta_list
while l_frame:
try:
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
except StopIteration:
print("⚠️ Error accediendo a frame_meta")
break
source_id = frame_meta.source_id
frame_number = frame_meta.frame_num
print(f"Source ID = {source_id}, Frame Number = {frame_number}")
# Definir timestamp en formato string
if ts_from_rtsp:
ts = frame_meta.ntp_timestamp / 1e9
utc_time = datetime.utcfromtimestamp(ts)
local_time = utc_time - timedelta(hours=5) # ajusta a tu zona horaria si hace falta
timestamp_str = local_time.strftime('%Y-%m-%d %H:%M:%S')
else:
timestamp_str = datetime.now().strftime('%Y-%m-%d %H:%M:%S')
if get_crop == "yes":
# Obtener la superficie del frame
surface = pyds.get_nvds_buf_surface(hash(gst_buffer), frame_meta.batch_id)
frame_copy = np.array(surface, copy=True, order='C')
frame_copy = cv2.cvtColor(frame_copy, cv2.COLOR_RGBA2BGR)
l_obj = frame_meta.obj_meta_list
while l_obj:
try:
obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
except StopIteration:
break
tracker_id = obj_meta.object_id
confidence = obj_meta.confidence
class_id = obj_meta.class_id
bbox_coords = obj_meta.tracker_bbox_info.org_bbox_coords
x = max(int(bbox_coords.left), 0)
y = max(int(bbox_coords.top), 0)
w = int(bbox_coords.width)
h = int(bbox_coords.height)
img_base64 = ""
if get_crop == "yes":
#Extraer el crop
crop = frame_copy[y:y+h, x:x+w]
try:
_, buffer = cv2.imencode('.jpg', crop)
img_base64 = base64.b64encode(buffer).decode('utf-8')
except Exception as e:
print(f"⚠️ Error codificando crop a base64: {e}")
label = pgie_classes_str[class_id]
print(f"Object ID = {tracker_id}, Label = {label}, Confidence = {confidence:.2f}, BBox = {[x, y, w, h]}")
obj_info = {
"source_id": source_id,
"confidence": confidence,
"label": label,
"obj_id": tracker_id,
"bbox": [x, y, w, h],
"image": img_base64,
"timestamp": timestamp_str
}
# Enviar data a la cola
data_queue.put(obj_info)
try:
l_obj = l_obj.next
except StopIteration:
break
print("***************************************")
try:
l_frame = l_frame.next
except StopIteration:
break
return Gst.PadProbeReturn.OK
def create_output_branch(pipeline, pad, idx, server, codec, bitrate, platform_info):
# Create a videoconvert element
conv = Gst.ElementFactory.make("nvvideoconvert", f"conv_{idx}")
if not conv:
sys.stderr.write(" Unable to create nvvidconv_postosd \n")
# Create a caps filter
caps = Gst.ElementFactory.make("capsfilter", f"caps_{idx}")
if enc_type == 0:
caps.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=I420"))
else:
caps.set_property("caps", Gst.Caps.from_string("video/x-raw, format=I420"))
if codec == "H264":
if enc_type == 0:
encoder = Gst.ElementFactory.make("nvv4l2h264enc", f"enc_{idx}")
else:
encoder = Gst.ElementFactory.make("x264enc", f"enc_{idx}")
encoder.set_property("threads", 2)
print(f"Creating H264 Encoder for source {idx}")
elif codec == "H265":
if enc_type == 0:
encoder = Gst.ElementFactory.make("nvv4l2h265enc", f"enc_{idx}")
else:
encoder = Gst.ElementFactory.make("x265enc", f"enc_{idx}")
print(f"Creating H265 Encoder for source {idx}")
if not encoder:
sys.stderr.write(" Unable to create encoder")
encoder.set_property("bitrate", int(bitrate / 1000))
encoder.set_property("tune", "zerolatency")
encoder.set_property("speed-preset", "ultrafast")
encoder.set_property("key-int-max", 15) # GOP corto para RT
encoder.set_property("option-string", "bframes=0")
if platform_info.is_integrated_gpu() and enc_type == 0:
encoder.set_property('preset-level', 1)
encoder.set_property('insert-sps-pps', 1)
#encoder.set_property('bufapi-version', 1)
# Make the payload-encode video into RTP packets
if codec == "H264":
pay = Gst.ElementFactory.make("rtph264pay", f"pay_{idx}")
print(f"Creating H264 rtppay for source {idx}")
elif codec == "H265":
pay = Gst.ElementFactory.make("rtph265pay", f"pay_{idx}")
print(f"Creating H265 rtppay for source {idx}")
if not pay:
sys.stderr.write(" Unable to create rtppay")
pay.set_property("config-interval", 1)
pay.set_property("pt", 96) # Específicamente para asegurar VLC lo interprete bien
pay.set_property("mtu", 1400) # Tamaño de paquete RTP ideal
udpsink = Gst.ElementFactory.make("udpsink", f"udpsink_{idx}")
if not udpsink:
sys.stderr.write(" Unable to create udpsink")
udpsink.set_property("host", "127.0.0.1")
udpsink.set_property("port", 5000 + idx)
udpsink.set_property("sync", False)
udpsink.set_property("async", False)
if not all([conv, caps, encoder, pay, udpsink]):
print(f"Error creando elementos para source {idx}")
return
# Creando un bin para el flujo
branch_bin = Gst.Bin.new(f"branch_bin_{idx}")
for elem in [conv, caps, encoder, pay, udpsink]:
branch_bin.add(elem)
safe_link(conv, caps, "conv", "caps")
safe_link(caps, encoder, "caps", "encoder")
safe_link(encoder, pay, "encoder", "pay")
safe_link(pay, udpsink, "pay", "udpsink")
sinkpad = conv.get_static_pad("sink")
ghostpad = Gst.GhostPad.new("sink", sinkpad)
branch_bin.add_pad(ghostpad)
pipeline.add(branch_bin)
branch_bin.sync_state_with_parent()
pad.link(ghostpad)
# Creando media factory para servir el flujo
factory = GstRtspServer.RTSPMediaFactory.new()
if codec == "H264":
rtphdepay = "rtph264depay"
hparse = "h264parse"
rtphpay = "rtph264pay"
elif codec == "H265":
rtphdepay = "rtph265depay"
hparse = "h265parse"
rtphpay = "rtph265pay"
factory.set_launch(
f"( udpsrc port={5000+idx} caps=\"application/x-rtp, media=video, clock-rate=90000, encoding-name={codec}, payload=96\" ! {rtphdepay} ! {hparse} ! {rtphpay} config-interval=1 pt=96 name=pay0 )"
)
factory.set_shared(True)
factory.set_latency(500) # Aumenta el buffer de entrada
server.get_mount_points().add_factory(f"/dynamic_stream/{idx}", factory)
print(f"*** RTSP stream at rtsp://IP:8554/dynamic_stream/{idx} ***")
def main(args):
global kafka_producer
number_sources = len(args) # Obtener cantidad de streamings
uri_list = “,”.join(args)
sensor_ids =
platform_info = PlatformInfo() # Obtener info del hardware para saber si se usa GPU integrada
Gst.init(None) # Estandarizacion de Gstreamer
print("Creating Pipeline \n ")
pipeline = Gst.Pipeline() # Contenedor principal de todos los elementos
is_live = False # Para manejo de RTSP's en el pipeline
if not pipeline: # Validacion creacion correcta pipeline
sys.stderr.write(" Unable to create Pipeline \n")
for i in range(number_sources):
print(f"Adding source {i} to multiurisrcbin \n") # Agregación de source a multiurisrcbin
uri_name = args[i]
sensor_ids.append(f"cam-{i+1}")
if uri_name.find("rtsp://") == 0: # Si encuentra rtsp al inicio de
is_live = True # Seteamos el parámetro de control
sensor_id_list = ",".join(sensor_ids)
print("Creating multiurisrcbin \n ")
multiurisrcbin = Gst.ElementFactory.make("nvmultiurisrcbin", "nvmultiurisrcbin")
if not multiurisrcbin: # Validacion creacion correcta multiurisrcbin
sys.stderr.write(" Unable to create NvMultiUriSrcBin")
if is_live:
multiurisrcbin.set_property("live-source", 1) # Seteando parámetro para manejo de RTSP's
pipeline.add(multiurisrcbin) # Se agrega multiurisrcbin a pipeline
print("Creating Pgie \n ")
if gie=="nvinfer":
pgie = Gst.ElementFactory.make("nvinfer", "primary-inference") # Creacion de Primary GPU Inference Engine
else:
pgie = Gst.ElementFactory.make("nvinferserver", "primary-inference") # Creacion de Primary GPU Inference Engine
if not pgie: # Validacion de Primary GPU Inference Engine
sys.stderr.write(" Unable to create pgie \n")
print("Creating nvtracker \n ")
tracker = Gst.ElementFactory.make("nvtracker", "tracker") # Creacion de Tracker
if not tracker: # Validacion de Tracker
sys.stderr.write(" Unable to create pgie \n")
tracker_config = configparser.ConfigParser()
tracker_config.read("nvtracker_tracker_config_ds6.txt")
tracker_config.sections()
for key in tracker_config['tracker']:
if key == 'tracker-width' :
tracker_width = tracker_config.getint('tracker', key)
tracker.set_property('tracker-width', tracker_width)
if key == 'tracker-height' :
tracker_height = tracker_config.getint('tracker', key)
tracker.set_property('tracker-height', tracker_height)
if key == 'gpu-id' :
tracker_gpu_id = tracker_config.getint('tracker', key)
tracker.set_property('gpu_id', tracker_gpu_id)
if key == 'll-lib-file' :
tracker_ll_lib_file = tracker_config.get('tracker', key)
tracker.set_property('ll-lib-file', tracker_ll_lib_file)
if key == 'll-config-file' :
tracker_ll_config_file = tracker_config.get('tracker', key)
tracker.set_property('ll-config-file', tracker_ll_config_file)
print("Creating nvvidconv \n ")
nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor") # Convertidor de formatos de color NVMM/RAW -> NVMM/RAW
if not nvvidconv:
sys.stderr.write(" Unable to create nvvidconv \n")
# Create streamdemux
streamdemux = Gst.ElementFactory.make("nvstreamdemux", "stream-demuxer")
if not streamdemux:
sys.stderr.write(" Unable to create nvstreamdemux \n")
# Seteando propiedades del multiurisrcbin
multiurisrcbin.set_property("rtsp-reconnect-interval", 30) # Intervalo de reconexión
multiurisrcbin.set_property("rtsp-reconnect-attempts", -1) # Intentos de reconexión ilimitados
multiurisrcbin.set_property("mode", 0) # 0 -> video / 1 -> audio
multiurisrcbin.set_property("select-rtp-protocol", 4) # Fuerza TCP desde el inicio
multiurisrcbin.set_property("uri-list", uri_list) # Envía string que contiene todos los RTSP
multiurisrcbin.set_property("sensor-id-list", sensor_id_list) # Envía string que contiene todos los ID's
multiurisrcbin.set_property("file-loop", True) # Al llegar a EOF en un file local, simplemente continua
multiurisrcbin.set_property("max-batch-size", PGIE_BATCH_SIZE) # Max tam del batch en streammux
multiurisrcbin.set_property("port", HTTP_API_REST_PORT) # Puerto HTTP REST API
multiurisrcbin.set_property("width", MUXER_OUTPUT_WIDTH) # Asigna width a cada frame
multiurisrcbin.set_property("height", MUXER_OUTPUT_HEIGHT) # Asigna height a cada frame
multiurisrcbin.set_property("batched-push-timeout", MUXER_BATCH_TIMEOUT_USEC)
if ts_from_rtsp:
multiurisrcbin.set_property("attach-sys-ts", 0) # Recupera timestamps de los streams
if gie=="nvinfer":
pgie.set_property("config-file-path", "config_peoplenet.txt") # Archivo de configuracion del modelo para tensorrt
else:
pgie.set_property("config-file-path", "config_peoplenet_inferserver.txt")
pgie_batch_size = pgie.get_property("batch-size") # Recuperando batch size del Primary GPU Inference Engine
if pgie_batch_size != PGIE_BATCH_SIZE: # Si discrepa con el max batch size posible
print(
"WARNING: Overriding infer-config batch-size",
pgie_batch_size,
"with number of sources ",
PGIE_BATCH_SIZE,
" \n",
)
pgie.set_property("batch-size", PGIE_BATCH_SIZE) # Asignamos al Primary GPU Inference Engine el mismo max batch size
print("Playing file(s):", args)
print("Adding elements to Pipeline \n") # Agregamos los elementos al pipeline
if not platform_info.is_integrated_gpu(): # Si es GPU integrada (Jetson) devuelve False, si es dedicada (A100) devuelve True
mem_type = int(pyds.NVBUF_MEM_CUDA_UNIFIED)
multiurisrcbin.set_property("nvbuf-memory-type", mem_type)
nvvidconv.set_property("nvbuf-memory-type", mem_type)
loop = GLib.MainLoop() # Crea el mainloop principal
# Agregar probe y callback
srcpad = multiurisrcbin.get_static_pad("src")
if not srcpad:
print("❌ Unable to get src pad of nvmultiurisrcbin")
else:
srcpad.add_probe(
Gst.PadProbeType.BUFFER,
multiurisrcbin_src_pad_probe,
(pipeline, loop)
)
# Agregar elementos al pipeline
pipeline.add(tracker)
pipeline.add(pgie)
pipeline.add(nvvidconv)
pipeline.add(streamdemux)
# Vincular elementos al pipeline
safe_link(multiurisrcbin, pgie, "multiurisrcbin", "pgie")
safe_link(pgie, tracker, "pgie", "tracker")
safe_link(tracker, nvvidconv, "tracker", "nvvidconv")
safe_link(nvvidconv, streamdemux, "nvvidconv", "streamdemux")
kafka_producer = KafkaProducer(
bootstrap_servers=kafka_broker
)
bus = pipeline.get_bus() # Obtener bus de mensajes del pipeline (EOS, ERROR, STATE_CHANGED, etc)
bus.add_signal_watch() # Activar la emision de signals, permite conectar un callback como si fuera un evento
bus.connect("message", bus_call, loop) # Cada vez que se recibe un mensaje, se llama a bus_call()
# RTSP server setup
global rtsp_port_num
rtsp_port_num = 8554
server = GstRtspServer.RTSPServer.new()
server.props.service = str(rtsp_port_num)
server.attach(None)
for i in range(PGIE_BATCH_SIZE):
demux_pad = streamdemux.request_pad_simple(f"src_{i}")
if not demux_pad:
print(f"Failed to get src_{i} pad from streamdemux")
continue
# Crear queue
queue = Gst.ElementFactory.make("queue", f"queue_{i}")
pipeline.add(queue)
queue.sync_state_with_parent()
# Link demux_pad a queue
sinkpad = queue.get_static_pad("sink")
if demux_pad.link(sinkpad) != Gst.PadLinkReturn.OK:
print(f"Failed to link demux_pad to queue for source {i}")
continue
# Crear identity
identity = Gst.ElementFactory.make("identity", f"identity_{i}")
pipeline.add(identity)
identity.sync_state_with_parent()
identity.set_property("drop-allocation", True)
# Link queue a identity
safe_link(queue, identity, "queue", "identity")
# Create videoconvert
nvvidconv = Gst.ElementFactory.make("nvvideoconvert", f"convi_{i}") # Convertidor de formatos de color NVMM/RAW -> NVMM/RAW
if not nvvidconv:
sys.stderr.write(" Unable to create nvvidconv \n")
pipeline.add(nvvidconv)
safe_link(identity, nvvidconv, "identity", "nvvidconv")
if not platform_info.is_integrated_gpu(): # Si es GPU integrada (Jetson) devuelve False, si es dedicada (A100) devuelve True
mem_type = int(pyds.NVBUF_MEM_CUDA_UNIFIED)
nvvidconv.set_property("nvbuf-memory-type", mem_type)
# Create a caps filter
caps = Gst.ElementFactory.make("capsfilter", f"capsi_{i}")
caps.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA"))
pipeline.add(caps)
safe_link(nvvidconv, caps, "nvvidconv", "caps")
# Create OSD to draw on the converted RGBA buffer
nvosd = Gst.ElementFactory.make("nvdsosd", f"onscreendisplay_{i}")
if not nvosd:
sys.stderr.write(" Unable to create nvosd \n")
pipeline.add(nvosd)
nvosd.sync_state_with_parent()
# Link nvosd a queue
safe_link(caps, nvosd, "caps", "nvosd")
# Agregar buffer probe después del queue
srcpad = nvosd.get_static_pad("sink")
srcpad.add_probe(Gst.PadProbeType.BUFFER, osd_sink_pad_buffer_probe)
# Ahora seguir conectando a create_output_branch
create_output_branch(pipeline, nvosd.get_static_pad("src"), i, server, codec, bitrate, platform_info)
# Iniciamos el pipeline
print("Starting pipeline \n")
pipeline.set_state(Gst.State.PLAYING)
#save_graph(pipeline,"init_pipeline","dinamyc_pipeline_api_reid_demux_proxy")
try:
kafka_thread = Thread(target=kafka_sender_loop, daemon=True)
kafka_thread.start()
loop.run() # Poner a correr el mainloop
except BaseException:
pass
pipeline.set_state(Gst.State.NULL)
def parse_args():
parser = argparse.ArgumentParser(description='RTSP Output Sample Application Help ')
parser.add_argument(“-i”, “–input”,
help=“Path to input H264 elementry stream”, nargs=“+”, default=[“a”], required=True)
parser.add_argument(“-g”, “–gie”, default=“nvinfer”,
help=“choose GPU inference engine type nvinfer or nvinferserver , default=nvinfer”, choices=[‘nvinfer’,‘nvinferserver’])
parser.add_argument(“-c”, “–codec”, default=“H264”,
help=“RTSP Streaming Codec H264/H265 , default=H265”, choices=[‘H264’,‘H265’])
parser.add_argument(“-b”, “–bitrate”, default=4000000,
help=“Set the encoding bitrate “, type=int)
parser.add_argument(”-e”, “–enc_type”, default=1, choices=[0, 1], type=int,
help=“0:Hardware encoder, 1:Software encoder”)
parser.add_argument(“–rtsp-ts”, action=“store_true”, default=True, dest=‘rtsp_ts’, help=“Attach NTP timestamp from RTSP source”,
)
parser.add_argument(“–kafka-broker”, default=“kafka:9202”, dest=“kafka_broker”, help=“Kafka Broker”,
)
parser.add_argument(“–kafka-topic”, default=“deepstream_test_api_rest_plugin” , dest=“kafka_topic”, help=“Kafka Topic”,
),
parser.add_argument(“–crop”, choices=[“yes”, “no”], default=“yes”,
help=“Enable or disable base64 crop encoding. Default is yes”)
# Check input arguments
if len(sys.argv)==1:
parser.print_help(sys.stderr)
sys.exit(1)
args = parser.parse_args()
global codec
global bitrate
global stream_path
global enc_type
global gie
global ts_from_rtsp
global kafka_producer
global kafka_broker
global kafka_topic
global get_crop
global last_send_time
global limit_time
global data_queue
global data_lock
gie = args.gie
codec = args.codec
bitrate = args.bitrate
stream_path = args.input
enc_type = args.enc_type
get_crop = args.crop
ts_from_rtsp = args.rtsp_ts
kafka_producer = None
kafka_broker = args.kafka_broker
kafka_topic = args.kafka_topic
data_queue = Queue()
data_lock = Lock()
last_send_time = None
limit_time = 10
return stream_path
if name == ‘main’:
stream_path = parse_args()
sys.exit(main(stream_path))
And I’m subsequently adding cameras through the nvds_rest_server api inside multiurisrcbin with this code:
import requests
import json
from datetime import datetime
Configura la URL del API
url = “http://localhost:9014/api/v1/stream/add”
inicio = 2
fin = 4
for i in range(inicio,fin):
#camera_url = "file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.h264"
camera_url = "rtsp://..." # REQUIRED
prefix = camera_url.strip().split(":")[0]
if i>=11 and i <=21 and prefix == "rtsp":
camera_url = "rtsp://..."
if i>=22 and i<=32 and prefix == "rtsp":
camera_url = "rtsp://..."
# Datos del stream a agregar
payload = {
"key": "sensor",
"event": {
"camera_id": f"cam-{i}", # REQUIRED
"camera_name": "camera_name",
"camera_url": camera_url,
#"change": "camera_add",
"change": "camera_streaming", # REQUIRED
"metadata": {
"resolution": "2560x1440",
"codec": "h265",
"framerate": 10,
"select-rtp-protocol": 4
}
},
"headers": {
"source": "vst",
"created_at": datetime.utcnow().isoformat() + "Z"
}
}
# Encabezados HTTP
headers = {
"Content-Type": "application/json"
}
# Enviar solicitud POST
response = requests.post(url, headers=headers, data=json.dumps(payload))
# Mostrar respuesta
print("*************************************")
print(f"cam-{i}")
print("Status Code:", response.status_code)
print("Response Body:", response.text)
if response.status_code == 200:
print(f"RTSP to add to Proxy: rtsp://<IP>:<PORT>/dynamic_stream/{i-1}/")
I run the pipeline with one camera, then add more with the nvds_rest_server API in multiurisrcbin, and everything works fine from sending to Kafka to the individual sinks. The problem is that I need to set the 32 demux src pads and their connected branches from the start. I’ve tried several ways to make the demux have, for example, one src pad when launching the pipeline with an initial src, but nothing works. It only works like this by leaving everything set with the maximum number of possible sources so that when adding sources through the nvds_rest_server API, they automatically flow to the end of the pipeline. It seems that the streamdemux src pads are not automatically generated when sending sources through this API.
Can someone please help me get nvstreamdemux to create its src pads dynamically when sources are added through the REST API, instead of having to pre-allocate all 32 branches at startup?