Current code I uses for runtime source addition, deleteion and nvds (roi and line) update.
I used FastAPI to listen for inputs from user
if add_sources → create source_bin, link with streammux and add it to pipeline, and add demux after stream specific elements as well
if delete_source → unlink from streammux and demux and remove elements from pipline
if update_analytics - > update config_file.txt of nvanalytics property which already set while initialising and set property config_file.txt for nvanalytics again → this result in cumulative line crosiing count to 0 when every I update config_file.txt and set it as property
what is the best way to add stream, add stream specific element after demux and how to update roi and line for new and existing streams?
import sys
sys.path.append('../')
import gi
import configparser
gi.require_version('Gst', '1.0')
gi.require_version("GstRtspServer", "1.0")
from gi.repository import GstRtspServer
from gi.repository import Gst, GLib
from gi.repository import GLib
from ctypes import *
import time
import sys
import pyds
import platform
from common.platform_info import PlatformInfo
import os
import pytz
from threading import Thread, Lock
from datetime import datetime
from zoneinfo import ZoneInfo
from urllib.parse import urlparse
import uvicorn
from fastapi import FastAPI, HTTPException
import logging
from datetime import datetime, timedelta
app = FastAPI()
perf_data = None
MUXER_BATCH_TIMEOUT_USEC = 33000
GPU_ID = 0
g_num_sources = 0
OSD_PROCESS_MODE= 0
OSD_DISPLAY_TEXT= 1
PGIE_CLASS_ID_PERSON = 0
PGIE_CLASS_ID_BAG = 1
PGIE_CLASS_ID_FACE = 2
dict_rtsp_source_bin = {}
analytics_config = "c.txt"
def decodebin_child_added(child_proxy,Object,name,user_data):
print("Decodebin child added:", name)
if(name.find("decodebin") != -1):
Object.connect("child-added",decodebin_child_added,user_data)
if name.find("source") != -1:
print("RTSP source detected, configuring NTP sync.",name)
pyds.configure_source_for_ntp_sync(hash(Object))
if(name.find("nvv4l2decoder") != -1):
if (platform_info.is_integrated_gpu()):
Object.set_property("enable-max-performance", True)
Object.set_property("drop-frame-interval", 0)
Object.set_property("num-extra-surfaces", 0)
else:
Object.set_property("gpu_id", GPU_ID)
def cb_newpad(decodebin,pad,data):
global streammux
print("In cb_newpad")
caps=pad.get_current_caps()
gststruct=caps.get_structure(0)
gstname=gststruct.get_name()
print("gstname=",gstname)
if(gstname.find("video")!=-1):
source_id = data
pad_name = "sink_%u" % source_id
print('pad_name',pad_name)
sinkpad = streammux.request_pad_simple(pad_name)
if not sinkpad:
sys.stderr.write("Unable to create sink pad bin ")
if pad.link(sinkpad) == Gst.PadLinkReturn.OK:
print("Decodebin linked to pipeline")
else:
sys.stderr.write("Failed to link decodebin to pipeline")
def create_uridecode_bin(index,rtsp_url):
print("Creating uridecodebin for [%s]" % rtsp_url)
bin_name="source-bin-%02d" % index
print(bin_name)
bin=Gst.ElementFactory.make("uridecodebin", bin_name)
if not bin:
sys.stderr.write(" Unable to create uri decode bin ")
bin.set_property("uri",rtsp_url)
bin.connect("pad-added",cb_newpad,index)
bin.connect("child-added",decodebin_child_added,index)
return bin
def update_analytics_config(rtsp_url, zone_name=None, roi_value=None, line_value=None):
try:
global nvanalytics
if rtsp_url not in dict_rtsp_source_bin:
print(f"Source {rtsp_url} not found in active sources.")
return
stream_id = dict_rtsp_source_bin[rtsp_url][0]
config_file = analytics_config
print(f"Updating analytics config for stream {stream_id} {rtsp_url}")
with open(config_file, "r") as file:
lines = file.readlines()
# Copy of the lines for modification
updated_lines = lines[:]
# Handle ROI configuration
if roi_value is not None:
zone_name = "roi-"+zone_name
roi_section = f"[roi-filtering-stream-{stream_id}]"
enable_line = "enable=1\n"
roi_line = f"{zone_name}={roi_value}\n"
zone_found = False
section_found = False
# Search for the section corresponding to the stream
for i, line in enumerate(updated_lines):
if line.strip() == roi_section:
section_found = True
j = i + 1
while j < len(updated_lines) and updated_lines[j].strip():
if updated_lines[j].startswith("roi-"):
# Check if the zone already exists and update its value
zone_key = updated_lines[j].split("=")[0].strip()
if zone_key == zone_name:
updated_lines[j] = roi_line
zone_found = True
break
j += 1
# If zone not found, append a new zone line
if not zone_found:
updated_lines.insert(j, roi_line)
break
# for new stream
if not section_found:
updated_lines.append(f"\n{roi_section}\n")
updated_lines.append(enable_line)
updated_lines.append(roi_line)
# Handle Line Crossing Configuration
if line_value is not None:
line_section = f"[line-crossing-stream-{stream_id}]"
enable_line = "enable=1\n"
line_line = f"line-crossing-Exit={line_value}\n"
class_id_line = "class-id=0\n"
extended_line = "extended=0\n"
mode_line = "mode=loose\n"
section_found = False
for i, line in enumerate(updated_lines):
if line.strip() == line_section:
section_found = True
# Add or update the configuration lines
if i + 1 < len(updated_lines) and not updated_lines[i + 1].strip().startswith("enable="):
updated_lines.insert(i + 1, enable_line)
elif i + 1 >= len(updated_lines) or not updated_lines[i + 1].strip() == enable_line.strip():
updated_lines[i + 1] = enable_line
if i + 2 < len(updated_lines) and updated_lines[i + 2].startswith("line-crossing-Exit="):
updated_lines[i + 2] = line_line
else:
updated_lines.insert(i + 2, line_line)
# Ensure other properties exist
other_props = [class_id_line, extended_line, mode_line]
for prop in other_props:
if prop not in updated_lines[i:]:
updated_lines.insert(i + 3, prop)
break
if not section_found:
updated_lines.append(f"\n{line_section}\n")
updated_lines.append(enable_line)
updated_lines.append(line_line)
updated_lines.append(class_id_line)
updated_lines.append(extended_line)
updated_lines.append(mode_line)
# Write the updated configuration back to the file
with open(config_file, "w") as file:
file.writelines(updated_lines)
# Reload nvanalytics configuration
if nvanalytics:
nvanalytics.set_property("config-file", config_file)
print(f"Reloaded nvanalytics config file: {config_file}")
print(f"Successfully updated analytics for stream {stream_id} {rtsp_url}")
except Exception as e:
print("Error updating analytics config:", e)
def add_stream_sepecific_element(source_id, rtsp_url, pipeline, nvstreamdemux):
print(f"add_stream_sepecific_element for source_id {source_id} {rtsp_url}")
# common
queue5 = Gst.ElementFactory.make("queue", "queue_nvosd"+str(source_id))
queue5.set_property("max-size-buffers", 200)
queue5.set_property("leaky", True)
pipeline.add(queue5)
nvstreamdemux.set_state(Gst.State.NULL)
padname = f"src_{source_id}"
demuxsrcpad = nvstreamdemux.request_pad_simple(padname)
if not demuxsrcpad:
sys.stderr.write("Unable to create demux src pad \n")
queuesinkpad = queue5.get_static_pad("sink")
if not queuesinkpad:
sys.stderr.write("Unable to create queue sink pad \n")
demuxsrcpad.link(queuesinkpad)
nvstreamdemux.set_state(Gst.State.PLAYING)
nvvideoconvert1 =Gst.ElementFactory.make("nvvideoconvert", "nvvideoconvert_nvosd"+str(source_id))
nvdsosd = Gst.ElementFactory.make("nvdsosd", "nvdsosd"+str(source_id))
tee = Gst.ElementFactory.make("tee", "nvsink-tee"+str(source_id))
nvdsosd.set_property("process-mode", OSD_PROCESS_MODE)
nvdsosd.set_property("display-text", OSD_DISPLAY_TEXT)
pipeline.add(nvvideoconvert1, nvdsosd, tee)
queue5.link(nvvideoconvert1)
nvvideoconvert1.link(nvdsosd)
nvdsosd.link(tee)
# rtsp out
queue_udp = Gst.ElementFactory.make("queue", "queue_udp"+str(source_id))
nvvideoconvert_udp = Gst.ElementFactory.make("nvvideoconvert", "nvvideoconvert_udp"+str(source_id))
caps_udp = Gst.ElementFactory.make("capsfilter", "caps_udp"+str(source_id))
encoder_udp = Gst.ElementFactory.make("nvv4l2h264enc", "encoder"+str(source_id))
rtppay_udp = Gst.ElementFactory.make("rtph264pay", "rtppay"+str(source_id))
udpsink_udp = Gst.ElementFactory.make("udpsink", "udpsink"+str(source_id))
queue_udp.set_property("max-size-buffers", 200)
queue_udp.set_property("leaky", True)
caps_udp.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM),format=I420"))
udp_port = 5400 + source_id
udpsink_udp.set_property("host", "224.224.255.255")
udpsink_udp.set_property("port", udp_port)
udpsink_udp.set_property("async", False)
udpsink_udp.set_property("sync", 0)
pipeline.add(queue_udp,nvvideoconvert_udp,caps_udp,encoder_udp,rtppay_udp,udpsink_udp)
tee.get_request_pad("src_%u").link(queue_udp.get_static_pad("sink"))
queue_udp.link(nvvideoconvert_udp)
nvvideoconvert_udp.link(caps_udp)
caps_udp.link(encoder_udp)
encoder_udp.link(rtppay_udp)
rtppay_udp.link(udpsink_udp)
# img save
queue_img = Gst.ElementFactory.make("queue", "queue_multifile"+str(source_id))
nvvideoconvert_img = Gst.ElementFactory.make("nvvideoconvert", "nvvideoconvert_multifile"+str(source_id))
jpegenc_img = Gst.ElementFactory.make("jpegenc", "jpegenc"+str(source_id))
multifilesink_img = Gst.ElementFactory.make("multifilesink", "multifilesink"+str(source_id))
queue_img.set_property("max-size-buffers", 200)
queue_img.set_property("leaky", True)
multifilesink_img.set_property("post-messages", True)
pipeline.add(queue_img, nvvideoconvert_img, jpegenc_img, multifilesink_img)
tee.get_request_pad("src_%u").link(queue_img.get_static_pad("sink"))
queue_img.link(nvvideoconvert_img)
nvvideoconvert_img.link(jpegenc_img)
jpegenc_img.link(multifilesink_img)
probe_data = {
"stream_id": source_id,
"uri_name": rtsp_url,
}
sinkpad = multifilesink_img.get_static_pad("sink")
if not sinkpad:
sys.stderr.write("Unable to get sink pad of multifilesink \n")
sys.exit(1)
sinkpad.add_probe(Gst.PadProbeType.BUFFER, frame_filter_pad_probe, probe_data)
create_rtsp_out(udp_port)
print("Added stream specific element to pipeline")
return True
def update_pgie_streamux_batch_size(batch_size):
global pgie, streammux
pgie.set_property("batch-size", batch_size)
streammux.set_property("batch-size", batch_size)
print(f"Updated batch size of pgie and streammux to {batch_size}")
def add_sources(rtsp_url):
global g_num_sources, pipeline, pgie, streammux, nvstreamdemux
if rtsp_url in dict_rtsp_source_bin:
print(f"Source {rtsp_url} already in active sources", dict_rtsp_source_bin)
return
source_id = g_num_sources
print(f"Adding new source to the pipeline id {source_id} url {rtsp_url}")
source_bin = create_uridecode_bin(source_id, rtsp_url)
if not source_bin:
sys.stderr.write("Failed to create source bin. Exiting.")
return False
pipeline.add(source_bin)
state_return = source_bin.set_state(Gst.State.PLAYING)
if state_return == Gst.StateChangeReturn.SUCCESS:
print("STATE CHANGE SUCCESS")
elif state_return == Gst.StateChangeReturn.FAILURE:
print("STATE CHANGE FAILURE")
elif state_return == Gst.StateChangeReturn.ASYNC:
state_return = source_bin.get_state(Gst.CLOCK_TIME_NONE)
elif state_return == Gst.StateChangeReturn.NO_PREROLL:
print("STATE CHANGE NO PREROLL")
if add_stream_sepecific_element(source_id, rtsp_url, pipeline, nvstreamdemux):
dict_rtsp_source_bin[rtsp_url] = [source_id, source_bin]
current_active_sources = len(dict_rtsp_source_bin)
print("current_active_sources", current_active_sources)
print("dict_rtsp_source_bin", dict_rtsp_source_bin)
update_pgie_streamux_batch_size(current_active_sources)
global perf_data
stream_index = f"stream{source_id}"
perf_data.add_stream(stream_index)
print(f"New source added: {rtsp_url}, registered as {stream_index}")
g_num_sources += 1
return True
def delete_roi_and_line_sections(stream_id, config_file):
with open(config_file, "r") as file:
lines = file.readlines()
roi_section_header = f"[roi-filtering-stream-{stream_id}]"
line_section_header = f"[line-crossing-stream-{stream_id}]"
updated_lines = []
in_section = False
section_found = False
for line in lines:
if line.strip().startswith("["):
if line.strip() in (roi_section_header, line_section_header):
in_section = True
section_found = True
continue
else:
in_section = False
if not in_section:
updated_lines.append(line)
# If no sections were found, return False
if not section_found:
return False
# Write the updated lines back to the file
with open(config_file, "w") as file:
file.writelines(updated_lines)
return True
def delete_source(rtsp_url):
global pipeline, streammux, nvstreamdemux, loop, nvanalytics
if rtsp_url not in dict_rtsp_source_bin:
print(f"Source {rtsp_url} not found in active sources {dict_rtsp_source_bin}")
return
source_id = dict_rtsp_source_bin[rtsp_url][0]
source_bin = dict_rtsp_source_bin[rtsp_url][1]
print(f"Trying to delete source {source_id}: {rtsp_url}")
source_bin.set_state(Gst.State.NULL)
# Wait for source bin to reach NULL state (with timeout)
for _ in range(20):
if source_bin.get_state(0.1)[1] == Gst.State.NULL:
break
time.sleep(0.1)
else:
print("Timeout: Source bin did not reach NULL state.")
return
print("Source bin set to NULL state successfully.")
# streammux sink pad
pad_name = f"sink_{source_id}"
sinkpad = streammux.get_static_pad(pad_name)
if sinkpad:
print(f"Flushing and releasing pad {pad_name} from streammux.")
sinkpad.send_event(Gst.Event.new_flush_start())
sinkpad.send_event(Gst.Event.new_flush_stop(False))
# Unlink the pad before releasing
streammux.unlink(source_bin)
streammux.release_request_pad(sinkpad)
else:
print(f"Warning: Pad {pad_name} not found in streammux.")
pipeline.remove(source_bin)
# demux src pad
demux_pad_name = f"src_{source_id}"
demuxsrcpad = nvstreamdemux.get_static_pad(demux_pad_name)
if demuxsrcpad:
print(f"Flushing and releasing pad {demux_pad_name} from nvstreamdemux.")
else:
print(f"Warning: Pad {demux_pad_name} not found in nvstreamdemux.")
# queue sink pad
queue = pipeline.get_by_name("queue_nvosd"+str(source_id))
if not queue:
sys.stderr.write("Stop does not get the queue")
queuesinkpad = queue.get_static_pad("sink")
# unlink demux src pad and queue sink pad
ret = demuxsrcpad.unlink(queuesinkpad)
elements_to_remove = [
"queue_nvosd"+str(source_id), "nvvideoconvert_nvosd"+str(source_id), "nvdsosd"+str(source_id), "nvsink-tee"+str(source_id),
"queue_udp"+str(source_id), "nvvideoconvert_udp"+str(source_id), "caps_udp"+str(source_id),
"encoder"+str(source_id), "rtppay"+str(source_id),"udpsink"+str(source_id),
"queue_multifile"+str(source_id), "nvvideoconvert_multifile"+str(source_id),
"jpegenc"+str(source_id), "multifilesink"+str(source_id)
]
for elem_name in elements_to_remove:
elem = pipeline.get_by_name(elem_name)
if elem:
elem.set_state(Gst.State.NULL)
pipeline.remove(elem)
print(f"Removed element: {elem_name}")
dict_rtsp_source_bin.pop(rtsp_url)
print(f"Source {source_id} {rtsp_url} removed successfully.")
current_active_sources = len(dict_rtsp_source_bin)
print("current_active_sources", current_active_sources)
print("dict_rtsp_source_bin", dict_rtsp_source_bin)
if (current_active_sources == 0):
loop.quit()
print("All sources stopped, quitting")
return
update_pgie_streamux_batch_size(current_active_sources)
stream_index = f"stream{source_id}"
perf_data.delete_stream(stream_index)
print(f"deleted source {source_id} {rtsp_url} in perf data")
if delete_roi_and_line_sections(source_id, analytics_config):
nvanalytics.set_property("config-file", analytics_config)
print(f"Reloaded nvanalytics config after deleting stream {source_id}")
return True
@app.post("/add_source")
def api_add_source(rtsp_url: str):
try:
add_sources(rtsp_url)
return {"message": "RTSP source added successfully", "url": rtsp_url}
except Exception as e:
print("error in api_add_source()", e)
raise HTTPException(status_code=500, detail=str(e))
@app.post("/delete_source")
def api_delete_source(rtsp_url: str):
try:
delete_source(rtsp_url)
return {"message": "RTSP source deleted successfully", "url": rtsp_url}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
@app.post("/update_analytics")
def update_analytics(rtsp_url: str, zone_name: str = None, roi_value: str = None, line_value: str = None):
try:
if not roi_value and not line_value:
raise HTTPException(status_code=400, detail="At least one of roi_value or line_value must be provided")
update_analytics_config(rtsp_url, zone_name=zone_name, roi_value=roi_value, line_value=line_value)
return {
"message": "Analytics configuration updated successfully",
"stream_id": rtsp_url,
"roi_value": roi_value,
"line_value": line_value,
}
except Exception as e:
raise HTTPException(status_code=500, detail=str(e))
def run_api_server():
uvicorn.run(app, host="0.0.0.0", port=8000)
def bus_call(bus, message, loop):
t = message.type
if t == Gst.MessageType.EOS:
sys.stdout.write("End-of-stream")
loop.quit()
elif t==Gst.MessageType.WARNING:
err, debug = message.parse_warning()
sys.stderr.write("Warning: %s: %s" % (err, debug))
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
sys.stderr.write("Error: %s: %s" % (err, debug))
loop.quit()
return True
# Image save
def extract_rtsp_details(rtsp_url):
parsed_uri = urlparse(rtsp_url)
if parsed_uri.path and parsed_uri.path != '/':
return parsed_uri.path.split('/')[-1]
else:
return f"{parsed_uri.port}"
IST = pytz.timezone('Asia/Kolkata')
def create_dynamic_path(base_path, ip_port):
now = datetime.now()
now = datetime.now(IST)
dynamic_path = os.path.join(
base_path,
ip_port,
str(now.year)+"_year",
f"{now.month:02d}_month",
f"{now.day:02d}_day",
f"{now.hour:02d}_hour",
f"{now.minute:02d}_minute",
)
os.makedirs(dynamic_path, exist_ok=True)
return dynamic_path
last_saved_time = {}
save_img_time_interval = 10
def frame_filter_pad_probe(pad, info, user_data):
global last_saved_time
current_time = time.time()
stream_id = user_data["stream_id"]
uri_name = user_data["uri_name"]
if stream_id not in last_saved_time:
last_saved_time[stream_id] = 0
if current_time - last_saved_time[stream_id] >= save_img_time_interval:
last_saved_time[stream_id] = current_time
cam_id_ip_port = extract_rtsp_details(uri_name)
base_path = "/Deepstream_output/rtsp_v3"
image_folder_path = create_dynamic_path(base_path, cam_id_ip_port)
now = datetime.now(IST)
image_name = f"{now.year}_{now.month:02d}_{now.day:02d}_{now.hour:02d}_{now.minute:02d}_{now.second:02d}.jpg"
image_save_path = os.path.join(image_folder_path, image_name)
# print('image_save_path', image_save_path)
multifilesink = pad.get_parent_element()
multifilesink.set_property("location", image_save_path)
return Gst.PadProbeReturn.OK
rtsp_server = None
def create_rtsp_out(new_udp_port):
global rtsp_server
rtsp_port_num = 8554
vcodec = "H264"
print("new_udp_port",new_udp_port)
if rtsp_server is None:
print(f"Initializing RTSP server on port {rtsp_port_num}")
rtsp_server = GstRtspServer.RTSPServer.new()
rtsp_server.props.service = str(rtsp_port_num)
rtsp_server.attach(None)
mount_point = f"/stream_{new_udp_port}"
factory = GstRtspServer.RTSPMediaFactory.new()
factory.set_launch(
'( udpsrc name=pay0 port=%d buffer-size=524288 caps="application/x-rtp, media=video, clock-rate=90000, encoding-name=%s, payload=96 " )'
% (new_udp_port, vcodec)
)
factory.set_shared(True)
rtsp_server.get_mount_points().add_factory(mount_point, factory)
print(f"DeepStream: Launched RTSP Streaming at rtsp://localhost:{rtsp_port_num}{mount_point}")
def main(args):
global pipeline, tiler, pgie, streammux, nvanalytics, nvstreamdemux, loop, g_num_sources, udp_ports
print('args', args, len(args))
num_sources = len(args)-1
print("num_sources",num_sources)
g_num_sources = num_sources
global perf_data
perf_data = PERF_DATA(num_sources)
global platform_info
platform_info = PlatformInfo()
Gst.init(None)
print("Creating Pipeline")
pipeline = Gst.Pipeline()
is_live = False
if not pipeline:
sys.stderr.write(" Unable to create Pipeline ")
print("Creating streammux")
streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
if not streammux:
sys.stderr.write(" Unable to create NvStreamMux ")
streammux.set_property("batched-push-timeout", 40000)
streammux.set_property("batch-size", 1)
streammux.set_property("gpu_id", GPU_ID)
streammux.set_property('live-source', 1)
streammux.set_property('width', 1920)
streammux.set_property('height', 1080)
streammux.set_property('batch-size', num_sources)
streammux.set_property('batched-push-timeout', MUXER_BATCH_TIMEOUT_USEC)
streammux.set_property("sync-inputs",1)
streammux.set_property("attach-sys-ts",False)
"""
attach-sys-ts - True
Host system time as NTP timestamp - when the frame is received by the NvStreamMux.
This method requires you to synchronize host system to NTP clock. To attach host system time as NTP timestamp, you'll need to set attach-sys-ts to TRUE on nvstreammux.
"""
streammux.set_property("max-latency",1000000000)
streammux.set_property("frame-duration",125)
pipeline.add(streammux)
uri = args[1]
for i in range(num_sources):
print(f"Creating source_bin for source id {i}")
uri_name=args[i+1]
if uri_name.find("rtsp://") == 0 :
is_live = True
source_bin=create_uridecode_bin(i, uri_name)
if not source_bin:
sys.stderr.write("Failed to create source bin. Exiting. ")
sys.exit(1)
pipeline.add(source_bin)
dict_rtsp_source_bin[uri_name] = [i, source_bin]
print("dict_rtsp_source_bin", dict_rtsp_source_bin)
queue1=Gst.ElementFactory.make("queue","queue1")
queue2=Gst.ElementFactory.make("queue","queue2")
queue3=Gst.ElementFactory.make("queue","queue3")
queue4=Gst.ElementFactory.make("queue","queue4")
queue1.set_property("max-size-buffers", 200)
queue1.set_property("leaky", True)
queue2.set_property("max-size-buffers", 200)
queue2.set_property("leaky", True)
queue3.set_property("max-size-buffers", 200)
queue3.set_property("leaky", True)
queue4.set_property("max-size-buffers", 200)
queue4.set_property("leaky", True)
pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
tracker = Gst.ElementFactory.make("nvtracker", "tracker")
nvanalytics = Gst.ElementFactory.make("nvdsanalytics", "analytics")
nvstreamdemux = Gst.ElementFactory.make("nvstreamdemux", "nvstreamdemux")
nvanalytics.set_property("config-file", analytics_config)
pgie.set_property('config-file-path', "dsnvanalytics_pgie_config.txt")
pgie_batch_size=pgie.get_property("batch-size")
if(pgie_batch_size != num_sources):
print("WARNING: Overriding infer-config batch-size",pgie_batch_size," with number of sources ", num_sources," \n")
pgie.set_property("batch-size",num_sources)
config = configparser.ConfigParser()
config.read('dsnvanalytics_tracker_config.txt')
config.sections()
for key in config['tracker']:
if key == 'tracker-width' :
tracker_width = config.getint('tracker', key)
tracker.set_property('tracker-width', tracker_width)
if key == 'tracker-height' :
tracker_height = config.getint('tracker', key)
tracker.set_property('tracker-height', tracker_height)
if key == 'gpu-id' :
tracker_gpu_id = config.getint('tracker', key)
tracker.set_property('gpu_id', tracker_gpu_id)
if key == 'll-lib-file' :
tracker_ll_lib_file = config.get('tracker', key)
tracker.set_property('ll-lib-file', tracker_ll_lib_file)
if key == 'll-config-file' :
tracker_ll_config_file = config.get('tracker', key)
tracker.set_property('ll-config-file', tracker_ll_config_file)
pipeline.add(queue1,queue2,queue3,queue4,pgie,tracker,nvanalytics,nvstreamdemux)
streammux.link(queue1)
queue1.link(pgie)
pgie.link(queue2)
queue2.link(tracker)
tracker.link(queue3)
queue3.link(nvanalytics)
nvanalytics.link(queue4)
queue4.link(nvstreamdemux)
for i in range(num_sources):
# common
queue5 = Gst.ElementFactory.make("queue", "queue_nvosd"+str(i))
nvvideoconvert1 =Gst.ElementFactory.make("nvvideoconvert", "nvvideoconvert_nvosd"+str(i))
nvdsosd = Gst.ElementFactory.make("nvdsosd", "nvdsosd"+str(i))
tee = Gst.ElementFactory.make("tee", "nvsink-tee"+str(i))
queue5.set_property("max-size-buffers", 200)
queue5.set_property("leaky", True)
nvdsosd.set_property("process-mode", OSD_PROCESS_MODE)
nvdsosd.set_property("display-text", OSD_DISPLAY_TEXT)
pipeline.add(queue5,nvvideoconvert1,nvdsosd,tee)
padname = "src_%u" % i
demuxsrcpad = nvstreamdemux.get_request_pad(padname)
if not demuxsrcpad:
sys.stderr.write("Unable to create demux src pad")
queuesinkpad = queue5.get_static_pad("sink")
if not queuesinkpad:
sys.stderr.write("Unable to create queue sink pad")
nvdsosd.set_property("process-mode", OSD_PROCESS_MODE)
nvdsosd.set_property("display-text", OSD_DISPLAY_TEXT)
demuxsrcpad.link(queuesinkpad)
queue5.link(nvvideoconvert1)
nvvideoconvert1.link(nvdsosd)
nvdsosd.link(tee)
# rtsp out
queue_udp = Gst.ElementFactory.make("queue", "queue_udp"+str(i))
nvvideoconvert_udp = Gst.ElementFactory.make("nvvideoconvert", "nvvideoconvert_udp"+str(i))
caps_udp = Gst.ElementFactory.make("capsfilter", "caps_udp"+str(i))
encoder_udp = Gst.ElementFactory.make("nvv4l2h264enc", "encoder"+str(i))
rtppay_udp = Gst.ElementFactory.make("rtph264pay", "rtppay"+str(i))
udpsink_udp = Gst.ElementFactory.make("udpsink", "udpsink"+str(i))
queue_udp.set_property("max-size-buffers", 200)
queue_udp.set_property("leaky", True)
caps_udp.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM),format=I420"))
udp_port = 5400 + i
udpsink_udp.set_property("host", "224.224.255.255")
udpsink_udp.set_property("port", udp_port)
udpsink_udp.set_property("async", False)
udpsink_udp.set_property("sync", 0)
pipeline.add(queue_udp,nvvideoconvert_udp,caps_udp,encoder_udp,rtppay_udp,udpsink_udp)
tee.get_request_pad("src_%u").link(queue_udp.get_static_pad("sink"))
queue_udp.link(nvvideoconvert_udp)
nvvideoconvert_udp.link(caps_udp)
caps_udp.link(encoder_udp)
encoder_udp.link(rtppay_udp)
rtppay_udp.link(udpsink_udp)
# img save
queue_img = Gst.ElementFactory.make("queue", "queue_multifile"+str(i))
nvvideoconvert_img = Gst.ElementFactory.make("nvvideoconvert", "nvvideoconvert_multifile"+str(i))
jpegenc_img = Gst.ElementFactory.make("jpegenc", "jpegenc"+str(i))
multifilesink_img = Gst.ElementFactory.make("multifilesink", "multifilesink"+str(i))
queue_img.set_property("max-size-buffers", 200)
queue_img.set_property("leaky", True)
multifilesink_img.set_property("post-messages", True)
pipeline.add(queue_img, nvvideoconvert_img, jpegenc_img, multifilesink_img)
tee.get_request_pad("src_%u").link(queue_img.get_static_pad("sink"))
queue_img.link(nvvideoconvert_img)
nvvideoconvert_img.link(jpegenc_img)
jpegenc_img.link(multifilesink_img)
probe_data = {
"stream_id": i,
"uri_name": args[i + 1],
}
sinkpad = multifilesink_img.get_static_pad("sink")
if not sinkpad:
sys.stderr.write("Unable to get sink pad of multifilesink \n")
sys.exit(1)
sinkpad.add_probe(Gst.PadProbeType.BUFFER, frame_filter_pad_probe, probe_data)
create_rtsp_out(udp_port)
nvanalytics_src_pad=nvanalytics.get_static_pad("src")
if not nvanalytics_src_pad:
sys.stderr.write(" Unable to get src pad \n")
else:
nvanalytics_src_pad.add_probe(Gst.PadProbeType.BUFFER, nvanalytics_src_pad_buffer_probe, 0)
GLib.timeout_add(5000, perf_data.perf_print_callback)
loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect ("message", bus_call, loop)
pipeline.set_state(Gst.State.PAUSED)
print("Now playing...")
for i, source in enumerate(args):
if (i != 0):
print(i, ": ", source)
print("Starting pipeline ")
pipeline.set_state(Gst.State.PLAYING)
api_thread = Thread(target=run_api_server, daemon=True)
api_thread.start()
try:
loop.run()
except:
print("Stopping pipeline")
pass
print("Exiting app")
pipeline.set_state(Gst.State.NULL)
if __name__ == '__main__':
sys.exit(main(sys.argv))