• Hardware Platform (GPU)
• DeepStream Version : 7.0
• TensorRT Version: 12.4
• NVIDIA GPU Driver Version: 550
• Issue Type : questions
How to dynamically add and delete RTSP input sources and corresponding RTSP server outputs.
My requirement is to implement dynamic multi-channel rtsp inputs on a server and output them through the rtsp server, with each rtsp input corresponding to each rtsp server output.
I am currently using the method of starting multiple pipelines, where one pipeline handles one RTSP input and the corresponding RTSP server output.
But the problem is that the first video can run normally, and when the second pipeline starts, both videos cannot output images through the RTSP server.
#!/usr/bin/env python3
################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2021-2023 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################
import os
os.environ["GST_DEBUG_DUMP_DOT_DIR"] = "/home/edyang/code/tunnel-fire-det-ds/dot"
os.putenv('GST_DEBUG_DUMP_DIR_DIR', '/home/edyang/code/tunnel-fire-det-ds/dot')
import argparse
import sys
sys.path.append('../')
import gi
import configparser
gi.require_version('Gst', '1.0')
gi.require_version('GstRtspServer', '1.0')
from gi.repository import GLib, Gst, GstRtspServer
from ctypes import *
import sys
import math
from common.is_aarch_64 import is_aarch64
from common.bus_call import bus_call
from common.utils import long_to_uint64,get_md5_str,get_now_time_str
from common.FPS import PERF_DATA
import numpy as np
import pyds
import cv2
import os
import os.path
from os import path
from MediaServerAPI import MediaServer
perf_data = None
frame_count = {}
saved_count = {}
MUXER_BATCH_TIMEOUT_USEC = 33000
FRAME_W = 960
FRAME_H = 540
PGIE_CONFIG_FILE = "yolov5m-official/config_infer_primary_yoloV5-test.txt"
MSG_BROKER_CONN = '192.168.31.9;11883'
MSG_BROKER_TOPIC = 'tunnel/monitor/firework'
MSCONV_CONFIG_FILE = "yolov5m-official/mqtt/cfg_msgconv.txt"
MSG_BROKER_CONFIG_FILE = "yolov5m-official/mqtt/cfg_mqtt.txt"
DEVICE_CODE = ''
RTSP_SERVER_PORT = 8551
IS_SAVE_RESULT = True
SAVE_IMAGE_DIR = "./data/frame"
SAVE_VIDEO_DIR = "./data/video"
RECORD_VIDEO_FPS = 13
RECORD_VIDEO_WRITER = None
RECORD_VIDEO_FILE_NAME = None
RECORD_VIDEO_KEEP_FRAME = RECORD_VIDEO_FPS * 2
RECORD_VIDEO_FOURCC = cv2.VideoWriter_fourcc(*'XVID')
HAS_OBJECT_FRAME_COUNT = 0
MAX_TIME_STAMP_LEN = 32
def caculate(x1,y1,x2,y2):
u = (x1 + x1 + x1 + x2) / 4
v = y1
fx = 9095.541
fy = 8969.147
u0 = 2053.659
v0 = 1084.515
H = 6.577
h = 5.670
erfa = math.pi * 30 / 180
delta = math.pi * 10 / 180
d1 = float(H-h) / math.tan(math.atan((abs(v-v0)) / fy) - erfa)
l1 = float(abs(u - u0)) * (math.sqrt(d1 * d1+(H-h)*(H-h))/ math.sqrt(fy*fy + abs((v-v0)*(v-v0))))
d = (d1 / math.cos(math.atan(l1 / d1))) * math.cos(delta - math.atan(l1 / d1))
l = (d1 / math.cos(math.atan(l1 / d1))) * math.sin(delta - math.atan(l1 / d1))
horizontal_distance = d
vertical_distance = l
#result = [['horizontal_distance', 'vertical_distance'],[horizontal_distance, vertical_distance]]
return horizontal_distance,vertical_distance
def create_record():
file_name = "{}/record_video_{}.avi".format(SAVE_VIDEO_DIR, get_now_time_str())
out = cv2.VideoWriter(file_name, fourcc=RECORD_VIDEO_FOURCC, fps=RECORD_VIDEO_FPS,frameSize=(FRAME_W,FRAME_H))
return out,file_name
def stop_record(out):
out.release()
def draw_bounding_boxes(image, obj_meta, confidence):
confidence = '{0:.2f}'.format(confidence)
rect_params = obj_meta.rect_params
top = int(rect_params.top)
left = int(rect_params.left)
width = int(rect_params.width)
height = int(rect_params.height)
obj_name = str(obj_meta.class_id)
image = cv2.rectangle(image, (left, top), (left + width, top + height), (0, 0, 255, 0), 2, cv2.LINE_4)
image = cv2.putText(image, obj_name + ',C=' + str(confidence), (left - 10, top - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5,
(0, 0, 255, 0), 2)
return image
def draw_rtsp_url(image,rtsp_url):
image = cv2.putText(image, rtsp_url, (10, 50), cv2.FONT_HERSHEY_SIMPLEX, 1.3,
(255, 0, 0, 100), 3)
return image
# tiler_sink_pad_buffer_probe will extract metadata received on tiler sink pad
# and update params for drawing rectangle, object information etc.
def tiler_sink_pad_buffer_probe(pad, info, u_data):
frame_number = 0
num_rects = 0
gst_buffer = info.get_buffer()
if not gst_buffer:
print("Unable to get GstBuffer ")
return
# Retrieve batch metadata from the gst_buffer
# Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
# C address of gst_buffer as input, which is obtained with hash(gst_buffer)
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
l_frame = batch_meta.frame_meta_list
global RECORD_VIDEO_WRITER
global RECORD_VIDEO_FILE_NAME
global HAS_OBJECT_FRAME_COUNT
while l_frame is not None:
try:
# Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
# The casting is done by pyds.NvDsFrameMeta.cast()
# The casting also keeps ownership of the underlying memory
# in the C code, so the Python garbage collector will leave
# it alone.
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
except StopIteration:
break
frame_number = frame_meta.frame_num
l_obj = frame_meta.obj_meta_list
num_rects = frame_meta.num_obj_meta
is_first_obj = True
save_image = False
frame_copy = None
alarm_type = "DETECT_OTHER"
if IS_SAVE_RESULT:
if l_obj is not None:
HAS_OBJECT_FRAME_COUNT += 1
if HAS_OBJECT_FRAME_COUNT < 0:
HAS_OBJECT_FRAME_COUNT = 0
if HAS_OBJECT_FRAME_COUNT > RECORD_VIDEO_KEEP_FRAME and RECORD_VIDEO_WRITER is None:
RECORD_VIDEO_WRITER,RECORD_VIDEO_FILE_NAME = create_record()
'''
@param type -> alarmType : RECORD_VIDEO_START, RECORD_VIDEO_STOP , DETECT_SMOKE, DETECT_FIRE
@param color -> imgUrl / url(videoSetName)
@param model -> horizontalDistance
@param region -> verticalDistance
'''
add_user_meta(batch_meta,frame_meta,None,frame_number, 'RECORD_VIDEO_START',RECORD_VIDEO_FILE_NAME)
else:
if HAS_OBJECT_FRAME_COUNT > 0:
HAS_OBJECT_FRAME_COUNT = 0
else:
HAS_OBJECT_FRAME_COUNT -= 1
if HAS_OBJECT_FRAME_COUNT < -RECORD_VIDEO_KEEP_FRAME and RECORD_VIDEO_WRITER is not None:
stop_record(RECORD_VIDEO_WRITER)
'''
@param type -> alarmType : RECORD_VIDEO_START, RECORD_VIDEO_STOP , DETECT_SMOKE, DETECT_FIRE
@param color -> imgUrl / url(videoSetName)
@param model -> horizontalDistance
@param region -> verticalDistance
'''
add_user_meta(batch_meta,frame_meta,None,frame_number, 'RECORD_VIDEO_STOP',RECORD_VIDEO_FILE_NAME)
RECORD_VIDEO_WRITER = None
RECORD_VIDEO_FILE_NAME = None
while l_obj is not None:
try:
# Casting l_obj.data to pyds.NvDsObjectMeta
obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
except StopIteration:
break
alarm_type = "DETECT_OTHER"
if(obj_meta.class_id == 0):
alarm_type = "DETECT_SMOKE"
if(obj_meta.class_id == 1):
alarm_type = "DETECT_FIRE"
if IS_SAVE_RESULT:
n_frame = pyds.get_nvds_buf_surface(hash(gst_buffer), frame_meta.batch_id)
#draw_rtsp_url(n_frame,DEVICE_CODE)
#n_frame = crop_object(n_frame, obj_meta)
frame_copy = draw_bounding_boxes(n_frame.copy(), obj_meta, obj_meta.confidence)
# convert python array into numpy array format in the copy mode.
frame_copy = np.array(frame_copy, copy=True, order='C')
# convert the array into cv2 default color format
frame_copy = cv2.cvtColor(frame_copy, cv2.COLOR_RGBA2BGR)
if is_aarch64(): # If Jetson, since the buffer is mapped to CPU for retrieval, it must also be unmapped
pyds.unmap_nvds_buf_surface(hash(gst_buffer), frame_meta.batch_id) # The unmap call should be made after operations with the original array are complete.
# The original array cannot be accessed after this call.
if saved_count["stream_{}".format(frame_meta.pad_index)] % 30 == 0:
save_image = True
try:
l_obj = l_obj.next
except StopIteration:
break
# Update frame rate through this probe
stream_index = "stream{0}".format(frame_meta.pad_index)
global perf_data
perf_data.update_fps(stream_index)
if save_image and IS_SAVE_RESULT and saved_count["stream_{}".format(frame_meta.pad_index)] % (25*5) == 0:
global SAVE_IMAGE_DIR
img_path = "{}/frame_{}.jpg".format(SAVE_IMAGE_DIR, get_now_time_str())
cv2.imwrite(img_path, frame_copy.copy())
'''
@param type -> alarmType : RECORD_VIDEO_START, RECORD_VIDEO_STOP , DETECT_SMOKE, DETECT_FIRE
@param color -> imgUrl / url(videoSetName)
@param model -> horizontalDistance
@param region -> verticalDistance
'''
add_user_meta(batch_meta,frame_meta,obj_meta,frame_number, alarm_type,img_path)
saved_count["stream_{}".format(frame_meta.pad_index)] += 1
if RECORD_VIDEO_WRITER is not None and frame_copy is not None and IS_SAVE_RESULT:
ret = RECORD_VIDEO_WRITER.write(frame_copy)
try:
l_frame = l_frame.next
except StopIteration:
break
return Gst.PadProbeReturn.OK
def generate_vehicle_meta(data,class_id,type,license,color,make,model,region):
obj = pyds.NvDsVehicleObject.cast(data)
obj.type = type
obj.license = license
obj.color = color
obj.make = make
obj.model = model
obj.region = region
return obj
def generate_event_msg_meta(data, class_id, type,license,color,make,model,region):
'''
@param type -> alarmType : RECORD_VIDEO_START , DETECT_SMOKE, DETECT_FIRE
@param license -> deviceCode
@param color -> imgUrl / url(videoSetName)
@param make -> monitorTime
@param model -> horizontalDistance
@param region -> verticalDistance
'''
meta = pyds.NvDsEventMsgMeta.cast(data)
meta.sensorId = 0
meta.placeId = 0
meta.moduleId = 0
meta.sensorStr = "sensor-0"
meta.ts = pyds.alloc_buffer(MAX_TIME_STAMP_LEN + 1)
pyds.generate_ts_rfc3339(meta.ts, MAX_TIME_STAMP_LEN)
# This demonstrates how to attach custom objects.
# Any custom object as per requirement can be generated and attached
# like NvDsVehicleObject / NvDsPersonObject. Then that object should
# be handled in payload generator library (nvmsgconv.cpp) accordingly.
meta.type = pyds.NvDsEventType.NVDS_EVENT_MOVING
meta.objType = pyds.NvDsObjectType.NVDS_OBJECT_TYPE_VEHICLE
meta.objClassId = class_id
obj = pyds.alloc_nvds_vehicle_object()
obj = generate_vehicle_meta(obj,class_id,type,license,color,make,model,region)
meta.extMsg = obj
meta.extMsgSize = sys.getsizeof(pyds.NvDsVehicleObject)
return meta
def add_user_meta(batch_meta,frame_meta,obj_meta,frame_number, type,color):
'''
@param type -> alarmType : RECORD_VIDEO_START , DETECT_SMOKE, DETECT_FIRE
@param color -> imgUrl / url(videoSetName)
@param model -> horizontalDistance
@param region -> verticalDistance
'''
global DEVICE_CODE
user_event_meta = pyds.nvds_acquire_user_meta_from_pool(batch_meta)
if user_event_meta:
# Allocating an NvDsEventMsgMeta instance and getting
# reference to it. The underlying memory is not manged by
# Python so that downstream plugins can access it. Otherwise
# the garbage collector will free it when this probe exits.
class_id = 0
msg_meta = pyds.alloc_nvds_event_msg_meta(user_event_meta)
horizontalDistance = '0'
verticalDistance = '0'
if obj_meta is not None :
class_id = obj_meta.class_id
msg_meta.bbox.top = obj_meta.rect_params.top
msg_meta.bbox.left = obj_meta.rect_params.left
msg_meta.bbox.width = obj_meta.rect_params.width
msg_meta.bbox.height = obj_meta.rect_params.height
msg_meta.frameId = frame_number
msg_meta.trackingId = long_to_uint64(obj_meta.object_id)
msg_meta.confidence = obj_meta.confidence
horizontal_distance,vertical_distance = caculate(msg_meta.bbox.left,msg_meta.bbox.top,(msg_meta.bbox.left+msg_meta.bbox.width),(msg_meta.bbox.top+msg_meta.bbox.height))
horizontalDistance = str(horizontal_distance)
verticalDistance = str(vertical_distance)
msg_meta = generate_event_msg_meta(msg_meta, class_id, type,DEVICE_CODE,color,get_now_time_str(),horizontalDistance,verticalDistance)
user_event_meta.user_meta_data = msg_meta
user_event_meta.base_meta.meta_type = pyds.NvDsMetaType.NVDS_EVENT_MSG_META
pyds.nvds_add_user_meta_to_frame(frame_meta,
user_event_meta)
else:
print("Error in attaching event meta to buffer\n")
def cb_newpad(decodebin, decoder_src_pad,data):
print("In cb_newpad\n")
caps=decoder_src_pad.get_current_caps()
if not caps:
caps = decoder_src_pad.query_caps()
gststruct=caps.get_structure(0)
gstname=gststruct.get_name()
source_bin=data
features=caps.get_features(0)
# Need to check if the pad created by the decodebin is for video and not
# audio.
print("gstname=",gstname)
if(gstname.find("video")!=-1):
# Link the decodebin pad only if decodebin has picked nvidia
# decoder plugin nvdec_*. We do this by checking if the pad caps contain
# NVMM memory features.
print("features=",features)
if features.contains("memory:NVMM"):
# Get the source bin ghost pad
bin_ghost_pad=source_bin.get_static_pad("src")
if not bin_ghost_pad.set_target(decoder_src_pad):
sys.stderr.write("Failed to link decoder src pad to source bin ghost pad\n")
else:
sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")
def decodebin_child_added(child_proxy,Object,name,user_data):
print("Decodebin child added:", name, "\n")
if(name.find("decodebin") != -1):
Object.connect("child-added",decodebin_child_added,user_data)
if "source" in name:
source_element = child_proxy.get_by_name("source")
if source_element.find_property('drop-on-latency') != None:
Object.set_property("drop-on-latency", True)
def decodebin_child_added(child_proxy, Object, name, user_data):
print("Decodebin child added:", name, "\n")
if name.find("decodebin") != -1:
Object.connect("child-added", decodebin_child_added, user_data)
if name.find("source") != -1:
pyds.configure_source_for_ntp_sync(hash(Object))
def create_source_bin(index,uri):
print("Creating source bin")
# Create a source GstBin to abstract this bin's content from the rest of the
# pipeline
bin_name="source-bin-%02d" %index
print(bin_name)
nbin=Gst.Bin.new(bin_name)
if not nbin:
sys.stderr.write(" Unable to create source bin \n")
# Source element for reading from the uri.
# We will use decodebin and let it figure out the container format of the
# stream and the codec and plug the appropriate demux and decode plugins.
uri_decode_bin=Gst.ElementFactory.make("nvurisrcbin", "uri-decode-bin")
uri_decode_bin.set_property("cudadec-memtype", 0)
#uri_decode_bin.set_property("drop-frame-interval", 2)
#uri_decode_bin=Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
if not uri_decode_bin:
sys.stderr.write(" Unable to create uri decode bin \n")
# We set the input uri to the source element
uri_decode_bin.set_property("uri",uri)
uri_decode_bin.set_property("rtsp-reconnect-interval", 5)
# Connect to the "pad-added" signal of the decodebin which generates a
# callback once a new pad for raw data has beed created by the decodebin
uri_decode_bin.connect("pad-added",cb_newpad,nbin)
uri_decode_bin.connect("child-added",decodebin_child_added,nbin)
# We need to create a ghost pad for the source bin which will act as a proxy
# for the video decoder src pad. The ghost pad will not have a target right
# now. Once the decode bin creates the video decoder and generates the
# cb_newpad callback, we will set the ghost pad target to the video decoder
# src pad.
Gst.Bin.add(nbin,uri_decode_bin)
bin_pad=nbin.add_pad(Gst.GhostPad.new_no_target("src",Gst.PadDirection.SRC))
if not bin_pad:
sys.stderr.write(" Failed to add ghost pad in source bin \n")
return None
return nbin
def main(uri_inputs,codec,bitrate,mqtt_server_ip,mqtt_server_port,rtsp_server_ip, rtsp_server_port, mip, mport, musername, mpassword):
MSG_BROKER_CONN = f'{mqtt_server_ip};{mqtt_server_port}'
# Check input arguments
number_sources = len(uri_inputs)
if number_sources <= 0:
sys.stderr.write("Rtsp resource can not empty!")
sys.exit(1)
global perf_data
perf_data = PERF_DATA(number_sources)
global DEVICE_CODE
global SAVE_IMAGE_DIR
global SAVE_VIDEO_DIR
stream_name = get_md5_str(uri_inputs[0])
DEVICE_CODE = uri_inputs[0]
SAVE_IMAGE_DIR = "{}/{}".format(SAVE_IMAGE_DIR,stream_name)
SAVE_VIDEO_DIR = "{}/{}".format(SAVE_VIDEO_DIR,stream_name)
if path.exists(SAVE_IMAGE_DIR) is False:
os.makedirs(SAVE_IMAGE_DIR)
print("Frames will be saved in ", SAVE_IMAGE_DIR)
if path.exists(SAVE_VIDEO_DIR) is False:
os.makedirs(SAVE_VIDEO_DIR)
print("Video will be saved in ", SAVE_VIDEO_DIR)
# Standard GStreamer initialization
Gst.init(None)
# Create gstreamer elements */
# Create Pipeline element that will form a connection of other elements
print("Creating Pipeline \n ")
pipeline = Gst.Pipeline()
is_live = False
if not pipeline:
sys.stderr.write(" Unable to create Pipeline \n")
print("Creating streamux \n ")
# Create nvstreammux instance to form batches from one or more sources.
streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
if not streammux:
sys.stderr.write(" Unable to create NvStreamMux \n")
pipeline.add(streammux)
for i in range(number_sources):
frame_count["stream_" + str(i)] = 0
saved_count["stream_" + str(i)] = 0
sys.stdout.write("Creating source_bin")
uri_name = uri_inputs[i]
if uri_name.find("rtsp://") == 0:
is_live = True
source_bin = create_source_bin(i, uri_name)
if not source_bin:
sys.stderr.write("Unable to create source bin \n")
pipeline.add(source_bin)
padname = "sink_%u" % i
sinkpad = streammux.get_request_pad(padname)
if not sinkpad:
sys.stderr.write("Unable to create sink pad bin \n")
srcpad = source_bin.get_static_pad("src")
if not srcpad:
sys.stderr.write("Unable to create src pad bin \n")
srcpad.link(sinkpad)
print("Creating Pgie \n ")
pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
#pgie = Gst.ElementFactory.make("nvinferserver", "primary-inference")
if not pgie:
sys.stderr.write(" Unable to create pgie \n")
# Add nvvidconv1 and filter1 to convert the frames to RGBA
# which is easier to work with in Python.
print("Creating nvvidconv1 \n ")
nvvidconv1 = Gst.ElementFactory.make("nvvideoconvert", "convertor1")
if not nvvidconv1:
sys.stderr.write(" Unable to create nvvidconv1 \n")
print("Creating filter1 \n ")
caps1 = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA")
filter1 = Gst.ElementFactory.make("capsfilter", "filter1")
if not filter1:
sys.stderr.write(" Unable to get the caps filter1 \n")
filter1.set_property("caps", caps1)
print("Creating tiler \n ")
tiler = Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
if not tiler:
sys.stderr.write(" Unable to create tiler \n")
print("Creating nvvidconv \n ")
nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
if not nvvidconv:
sys.stderr.write(" Unable to create nvvidconv \n")
print("Creating nvosd \n ")
nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
if not nvosd:
sys.stderr.write(" Unable to create nvosd \n")
nvvidconv_postosd = Gst.ElementFactory.make("nvvideoconvert", "convertor_postosd")
if not nvvidconv_postosd:
sys.stderr.write(" Unable to create nvvidconv_postosd \n")
# Create a caps filter
caps = Gst.ElementFactory.make("capsfilter", "filter")
caps.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=I420"))
# Make the encoder
if codec == "H264":
encoder = Gst.ElementFactory.make("nvv4l2h264enc", "encoder")
print("Creating H264 Encoder")
elif codec == "H265":
encoder = Gst.ElementFactory.make("nvv4l2h265enc", "encoder")
print("Creating H265 Encoder")
if not encoder:
sys.stderr.write(" Unable to create encoder")
encoder.set_property('bitrate', bitrate)
if is_aarch64():
encoder.set_property('preset-level', 1)
encoder.set_property('insert-sps-pps', 1)
#encoder.set_property('bufapi-version', 1)
# Make the payload-encode video into RTP packets
if codec == "H264":
rtppay = Gst.ElementFactory.make("rtph264pay", "rtppay")
print("Creating H264 rtppay")
elif codec == "H265":
rtppay = Gst.ElementFactory.make("rtph265pay", "rtppay")
print("Creating H265 rtppay")
if not rtppay:
sys.stderr.write(" Unable to create rtppay")
# mqtt start
msgconv = Gst.ElementFactory.make("nvmsgconv", "nvmsg-converter")
if not msgconv:
sys.stderr.write(" Unable to create msgconv \n")
msgbroker = Gst.ElementFactory.make("nvmsgbroker", "nvmsg-broker")
if not msgbroker:
sys.stderr.write(" Unable to create msgbroker \n")
tee = Gst.ElementFactory.make("tee", "nvsink-tee")
if not tee:
sys.stderr.write(" Unable to create tee \n")
queue1 = Gst.ElementFactory.make("queue", "nvtee-que1")
if not queue1:
sys.stderr.write(" Unable to create queue1 \n")
queue2 = Gst.ElementFactory.make("queue", "nvtee-que2")
if not queue2:
sys.stderr.write(" Unable to create queue2 \n")
# mqtt end
# Make the UDP sink
updsink_port_num = 5400
sink = Gst.ElementFactory.make("udpsink", "udpsink")
if not sink:
sys.stderr.write(" Unable to create udpsink")
sink.set_property('host', '224.224.255.255')
sink.set_property('port', updsink_port_num)
sink.set_property('async', False)
sink.set_property('sync', 1)
print("Playing file {} ".format(uri_inputs))
streammux.set_property('width', FRAME_W)
streammux.set_property('height', FRAME_H)
streammux.set_property('batch-size', number_sources)
streammux.set_property('batched-push-timeout', MUXER_BATCH_TIMEOUT_USEC)
streammux.set_property("attach-sys-ts", 0)
pgie.set_property('config-file-path', PGIE_CONFIG_FILE)
#pgie.set_property('interval', 5)
pgie_batch_size = pgie.get_property("batch-size")
if (pgie_batch_size != number_sources):
print("WARNING: Overriding infer-config batch-size", pgie_batch_size, " with number of sources ",
number_sources, " \n")
pgie.set_property("batch-size", number_sources)
tiler_rows = int(math.sqrt(number_sources))
tiler_columns = int(math.ceil((1.0 * number_sources) / tiler_rows))
tiler.set_property("rows", tiler_rows)
tiler.set_property("columns", tiler_columns)
tiler.set_property("width", FRAME_W)
tiler.set_property("height", FRAME_H)
if not is_aarch64():
# Use CUDA unified memory in the pipeline so frames
# can be easily accessed on CPU in Python.
mem_type = int(pyds.NVBUF_MEM_CUDA_UNIFIED)
streammux.set_property("nvbuf-memory-type", mem_type)
nvvidconv.set_property("nvbuf-memory-type", mem_type)
nvvidconv1.set_property("nvbuf-memory-type", mem_type)
tiler.set_property("nvbuf-memory-type", mem_type)
nvvidconv_postosd.set_property("nvbuf-memory-type", mem_type)
msgconv.set_property('config', MSCONV_CONFIG_FILE)
'''
schema = 0; Full message schema with separate payload per object (Default)
schema = 1; Minimal message with multiple objects in single payload.
'''
schema_type = 0
msgconv.set_property('payload-type', schema_type)
'''
Adaptor library can be found at /opt/nvidia/deepstream/deepstream-<version>/lib
kafka lib - libnvds_kafka_proto.so
azure device client - libnvds_azure_proto.so
AMQP lib - libnvds_amqp_proto.so
redis lib - libnvds_redis_proto.so
mosquitto lib - libnvds_mqtt_proto.so
'''
msgbroker.set_property('proto-lib', '/opt/nvidia/deepstream/deepstream/lib/libnvds_mqtt_proto.so')
msgbroker.set_property('conn-str', MSG_BROKER_CONN)
msgbroker.set_property('config', MSG_BROKER_CONFIG_FILE)
msgbroker.set_property('topic', MSG_BROKER_TOPIC)
msgbroker.set_property('sync', False)
print("Adding elements to Pipeline \n")
pipeline.add(pgie)
pipeline.add(tiler)
pipeline.add(nvvidconv)
pipeline.add(filter1)
pipeline.add(nvvidconv1)
pipeline.add(nvosd)
pipeline.add(nvvidconv_postosd)
pipeline.add(caps)
pipeline.add(encoder)
pipeline.add(rtppay)
pipeline.add(sink)
# mqtt start
pipeline.add(tee)
pipeline.add(queue1)
pipeline.add(queue2)
pipeline.add(msgconv)
pipeline.add(msgbroker)
# mqtt end
print("Linking elements in the Pipeline \n")
streammux.link(pgie)
pgie.link(nvvidconv1)
nvvidconv1.link(filter1)
filter1.link(tiler)
tiler.link(nvvidconv)
nvvidconv.link(nvosd)
# mqtt start
nvosd.link(tee)
queue1.link(msgconv)
msgconv.link(msgbroker)
queue2.link(nvvidconv_postosd)
# mqtt end
#nvosd.link(nvvidconv_postosd)
nvvidconv_postosd.link(caps)
caps.link(encoder)
encoder.link(rtppay)
rtppay.link(sink)
# mqtt start
tee_msg_pad = tee.get_request_pad('src_%u')
tee_render_pad = tee.get_request_pad("src_%u")
if not tee_msg_pad or not tee_render_pad:
sys.stderr.write("Unable to get request pads\n")
sink_pad = queue1.get_static_pad("sink")
tee_msg_pad.link(sink_pad)
sink_pad = queue2.get_static_pad("sink")
tee_render_pad.link(sink_pad)
# mqtt end
# create an event loop and feed gstreamer bus mesages to it
loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", bus_call, loop)
# Start streaming
server = GstRtspServer.RTSPServer.new()
server.props.service = "%d" % rtsp_server_port
server.attach(None)
factory = GstRtspServer.RTSPMediaFactory.new()
factory.set_launch( "(udpsrc name=pay0 port=%d buffer-size=524288 caps=\"application/x-rtp, media=video, clock-rate=90000, encoding-name=(string)%s, payload=96 \" )" % (updsink_port_num, codec))
factory.set_shared(True)
server.get_mount_points().add_factory("/bz-live", factory)
rtsp_url = 'rtsp://{}:{}/bz-live'.format(rtsp_server_ip,rtsp_server_port)
#print("\n *** DeepStream: Launched RTSP Streaming at %s ***\n\n" % rtsp_url)
sys.stdout.write("\n *** DeepStream: Launched RTSP Streaming at %s ***\n\n" % rtsp_url)
tiler_sink_pad = tiler.get_static_pad("sink")
if not tiler_sink_pad:
sys.stderr.write(" Unable to get sink pad \n")
else:
tiler_sink_pad.add_probe(Gst.PadProbeType.BUFFER, tiler_sink_pad_buffer_probe, 0)
# perf callback function to print fps every 5 sec
GLib.timeout_add(5000, perf_data.perf_print_callback)
# Gst.debug_bin_to_dot_file(pipeline, Gst.DebugGraphDetails.ALL, "baize-ds-pipeline.dot")
print("Starting pipeline \n")
# start play back and listed to events
pipeline.set_state(Gst.State.PLAYING)
# start media live
#media_server = MediaServer(mip, mport, musername, mpassword)
#add_media_result = media_server.addPushProxy(rtsp_url)
#print(f"Media server add stream:{add_media_result}")
try:
loop.run()
except:
pass
# cleanup
print("Exiting app\n")
pipeline.set_state(Gst.State.NULL)
def parse_args():
parser = argparse.ArgumentParser(description='RTSP Output Sample Application Help ')
parser.add_argument("-i","--uri_inputs", metavar='N', type=str, nargs='+',
help='Path to inputs URI e.g. rtsp:// ... or file:// seperated by space')
parser.add_argument("-c", "--codec", default="H265",
help="RTSP Streaming Codec H264/H265 , default=H264", choices=['H264','H265'])
parser.add_argument("-b", "--bitrate", default=2000000,
help="Set the encoding bitrate ", type=int)
parser.add_argument("--mqtt_server_ip", default='127.0.0.1',required=False,
help="Set the mqtt server ip ")
parser.add_argument("--mqtt_server_port", default=11883,required=False,
help="Set the mqtt server ip ", type=int)
parser.add_argument("--rtsp_server_ip", default='127.0.0.1',required=False,
help="Set the rtsp server ip ")
parser.add_argument("--rtsp_server_port", default=8551,required=False,
help="Set the rtsp server port ", type=int)
parser.add_argument("--media_ip", default='127.0.0.1',required=False,
help="Set the live server ip ")
parser.add_argument( "--media_port", default=18080,required=False,
help="Set the live server port ", type=int)
parser.add_argument("--media_username", default='admin', required=False,
help="Set the live server username ")
parser.add_argument("--media_password", default='Stevejobs_',required=False,
help="Set the live server password ")
# Check input arguments
if len(sys.argv)==1:
parser.print_help(sys.stderr)
sys.exit(1)
args = parser.parse_args()
print("URI Inputs: " + str(args.uri_inputs ))
return args.uri_inputs , args.codec, args.bitrate , args.mqtt_server_ip, args.mqtt_server_port, args.rtsp_server_ip, args.rtsp_server_port, args.media_ip, args.media_port, args.media_username, args.media_password
if __name__ == '__main__':
'''
use:
python Main.py -i 'rtsp://admin:hk123456@192.168.31.100/cam/realmonitor?channel=1&subtype=0' \
--rtst_server_ip 192.168.31.9 --rtst_server_port 8551 \
--media_ip 192.168.31.9 --media_port 18080 --media_username admin --media_password Stevejobs_
'''
uri_inputs , out_codec, out_bitrate, mqtt_server_ip, mqtt_server_port, rtsp_server_ip, rtsp_server_port, mip, mport, musername, mpassword = parse_args()
sys.exit(main(uri_inputs, out_codec, out_bitrate,mqtt_server_ip,mqtt_server_port,rtsp_server_ip,rtsp_server_port,mip, mport, musername, mpassword ))