import sys
sys.path.append("../")
# from bus_call import bus_call
import pyds
import math
import time
from ctypes import *
import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst, GLib
import configparser
import datetime
import os
import argparse
import multiprocessing
import time
from threading import Lock
from collections import defaultdict, deque
from typing import Tuple, Dict, Deque
import numpy as np
import traceback, time, copy
import faulthandler
faulthandler.enable()
import uuid
from os import environ
import ctypes
# Wall-clock timestamp (seconds since the epoch) captured when this module is loaded.
start_time=time.time()
# Module-level lock.
# NOTE(review): the name suggests it guards FPS bookkeeping, but no code visible
# in this file acquires it -- confirm whether it is still needed.
fps_mutex = Lock()
class SRUserContext(ctypes.Structure):
    """C-layout record describing a smart-record user context.

    ``start_sr_function`` uses ``ctypes.sizeof`` on this structure to size the
    native buffer that is handed to the ``start-sr`` signal.
    """

    _fields_ = [
        # Integer identifier of the recording session.
        ("sessionid", ctypes.c_int),
        # Fixed-size, 32-byte character buffer holding a label.
        ("name", ctypes.c_char * 32),
    ]
# Value intended for nvstreammux's "batched-push-timeout" property (the name
# suggests microseconds). It is only referenced by the commented-out streammux
# setup inside pipeline_process, so it currently has no effect.
MUXER_BATCH_TIMEOUT_USEC = 33000
def bus_call(bus, message, loop):
    """Handle one message from the pipeline bus.

    End-of-stream and error messages stop ``loop``; warnings are only logged.
    Every other message type is ignored.

    Args:
        bus: The bus that delivered the message (unused).
        message: The message to inspect.
        loop: Main loop to quit on end-of-stream or error.

    Returns:
        Always ``True``.
    """
    t = message.type
    if t == Gst.MessageType.EOS:
        sys.stdout.write("End-of-stream\n")
        loop.quit()
    elif t == Gst.MessageType.WARNING:
        err, debug = message.parse_warning()
        sys.stderr.write("Warning: %s: %s\n" % (err, debug))
    elif t == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        sys.stderr.write("Error: %s: %s\n" % (err, debug))
        loop.quit()
    return True
def cb_newpad(decodebin, decoder_src_pad, data):
    """Retarget the source bin's ghost pad to a newly added decoder pad.

    Only pads whose caps name contains "video" and whose caps features
    include "memory:NVMM" are accepted; anything else is ignored or reported
    on stderr.

    Args:
        decodebin: Element that emitted ``pad-added`` (unused).
        decoder_src_pad: The newly added source pad.
        data: The source bin owning a ghost pad named "src".
    """
    print("In cb_newpad\n")
    caps = decoder_src_pad.get_current_caps()
    if not caps:
        # Caps are not negotiated yet; fall back to what the pad can produce.
        caps = decoder_src_pad.query_caps()
    gstname = caps.get_structure(0).get_name()
    source_bin = data
    features = caps.get_features(0)
    print("gstname=", gstname)

    # Non-video pads (e.g. audio) are left unlinked.
    if "video" not in gstname:
        return

    print("features=", features)
    if not features.contains("memory:NVMM"):
        sys.stderr.write("Error: Decodebin did not pick nvidia decoder plugin.\n")
        return

    bin_ghost_pad = source_bin.get_static_pad("src")
    if not bin_ghost_pad.set_target(decoder_src_pad):
        sys.stderr.write("Failed to link decoder src pad to source bin ghost pad\n")
def decodebin_child_added(child_proxy, Object, name, user_data):
    """Log a newly added child and keep watching nested decodebins.

    When the child's name contains "decodebin", this same handler is connected
    to the child's own ``child-added`` signal so deeper children are reported
    too.

    Args:
        child_proxy: Element that emitted ``child-added`` (unused).
        Object: The child that was added.
        name: Name of the added child.
        user_data: Opaque value forwarded to nested connections.
    """
    print("Decodebin child added:", name, "\n")
    if "decodebin" in name:
        Object.connect("child-added", decodebin_child_added, user_data)
def create_source_bin(index, uri, file_loop, gpu_id, list_for_nvurisrcbin):
    """Build a bin wrapping one ``nvurisrcbin`` with smart record enabled.

    The bin exposes a target-less ghost pad named "src"; ``cb_newpad`` sets its
    target once the decoder pad appears.

    Args:
        index: Source index; used in the bin name, as the ``source-id``
            property and in the recording directory name.
        uri: Stream URI handed to ``nvurisrcbin``.
        file_loop: When truthy, ``file-loop`` is enabled and
            ``cudadec-memtype`` is set to 0.
        gpu_id: Value for the element's ``gpu-id`` property.
        list_for_nvurisrcbin: List that receives the created ``nvurisrcbin``
            element so the caller can later emit ``start-sr`` on it. It is
            only appended to when the bin is built successfully.

    Returns:
        The populated bin, or ``None`` when the bin, the element or the ghost
        pad could not be created.
    """
    print("Creating source bin")
    bin_name = "source-bin-%02d" % index
    print(bin_name)
    nbin = Gst.Bin.new(bin_name)
    if not nbin:
        sys.stderr.write("Unable to create source bin \n")
        return None

    # The element name differs between the two modes, as in the original code.
    element_name = "uri-decode-bin" if file_loop else "nvurisrcbin"
    uri_decode_bin = Gst.ElementFactory.make("nvurisrcbin", element_name)
    if not uri_decode_bin:
        # Bail out before touching properties on a missing element.
        sys.stderr.write("Unable to create uri decode bin \n")
        return None

    if file_loop:
        uri_decode_bin.set_property("file-loop", 1)
        uri_decode_bin.set_property("cudadec-memtype", 0)
    uri_decode_bin.set_property("uri", uri)
    uri_decode_bin.set_property("source-id", index)

    # Smart record: each source records into its own uniquely named directory.
    base_path = f"/opt/nvidia/deepstream/deepstream-7.0/nvodin24/video/{index}_{uuid.uuid4().hex[:8]}"
    os.makedirs(base_path, exist_ok=True)
    uri_decode_bin.set_property("smart-record", 2)
    uri_decode_bin.set_property("smart-rec-dir-path", base_path)
    uri_decode_bin.set_property("smart-rec-cache", 20)

    uri_decode_bin.set_property("latency", 1)
    uri_decode_bin.set_property("num-extra-surfaces", 5)
    uri_decode_bin.set_property("gpu-id", gpu_id)
    uri_decode_bin.set_property("rtsp-reconnect-interval", 180)

    uri_decode_bin.connect("sr-done", record_done, nbin)
    uri_decode_bin.connect("pad-added", cb_newpad, nbin)
    uri_decode_bin.connect("child-added", decodebin_child_added, nbin)
    nbin.add(uri_decode_bin)

    ghost_pad_added = nbin.add_pad(
        Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC)
    )
    if not ghost_pad_added:
        sys.stderr.write("Failed to add ghost pad in source bin \n")
        return None

    # Register the element only once the bin is known to be usable.
    list_for_nvurisrcbin.append(uri_decode_bin)
    return nbin
def start_sr_function(element):
    """Ask ``element`` to start a smart-record session.

    Allocates a 4-byte native buffer for the session id and a native buffer
    sized like ``SRUserContext``, fills the context with a fixed session id
    (42) and a label derived from the element name, then emits ``start-sr``.

    Args:
        element: The ``nvurisrcbin`` element to record from.

    Returns:
        Always ``True``. This function is scheduled with
        ``GLib.timeout_add_seconds``, where a truthy return keeps the timeout
        installed, so recording is requested again on every interval.
    """
    sessionid = pyds.get_native_ptr(pyds.alloc_buffer(4))
    print(f"sessionid {sessionid}")

    sr_user_context_size = ctypes.sizeof(SRUserContext)
    sr_user_context_buf = pyds.get_native_ptr(pyds.alloc_buffer(sr_user_context_size))
    sr_user_context = pyds.SRUserContext.cast(sr_user_context_buf)
    sr_user_context.sessionid = 42
    # NOTE(review): the local SRUserContext reserves 32 bytes for the name;
    # confirm long element names cannot overflow the native field.
    sr_user_context.name = "sr-demo " + element.get_name()
    print(f"sr_user_context_buf {sr_user_context_buf} {element.get_name()}")

    # The two 5s are passed positionally after the session id; presumably the
    # start offset and duration in seconds -- TODO confirm against the plugin docs.
    element.emit('start-sr', sessionid, 5, 5, sr_user_context_buf)

    # The session-id buffer is released here; the user-context buffer is freed
    # later by record_done when the recording completes.
    pyds.free_gbuffer(sessionid)
    print('******start sr*****')
    return True
def record_done(nvurisrcbin, recordingInfo, user_ctx, user_data):
    """Handle the ``sr-done`` signal: report the session and free its context.

    Args:
        nvurisrcbin: Element that emitted the signal (unused).
        recordingInfo: Recording details supplied by the signal (unused).
        user_ctx: Handle to the user-context buffer that was passed to
            ``start-sr``; ``hash(user_ctx)`` is used as its native address.
        user_data: Value supplied when the handler was connected (unused).
    """
    print('******sr done*****')
    ctx_ptr = hash(user_ctx)
    sr = pyds.SRUserContext.cast(ctx_ptr)
    print(f"session id {sr.sessionid} -- name {sr.name}")
    # Release the buffer allocated in start_sr_function for this session.
    pyds.free_buffer(ctx_ptr)
def pipeline_process(args, name, gpu_id, list_for_nvurisrcbin):
    """Build and run a pipeline of ``nvurisrcbin -> fakesink`` branches.

    One source bin and one fakesink are created per URI. The function blocks
    in a GLib main loop until end-of-stream, a bus error or an interrupt, then
    sets the pipeline to NULL.

    Args:
        args: List of camera/file URIs for this pipeline instance.
        name: Identifier supplied by the caller (not used by this function).
        gpu_id: GPU index forwarded to every source; also used in the name of
            the dumped pipeline graph.
        list_for_nvurisrcbin: List that collects the created ``nvurisrcbin``
            elements; smart record is triggered on each of them.
    """
    if not args:
        sys.stderr.write("No source URIs supplied\n")
        return

    Gst.init(None)
    print("Creating Pipeline\n")
    pipeline = Gst.Pipeline()
    if not pipeline:
        sys.stderr.write("Unable to create Pipeline\n")
        return

    for i, uri_name in enumerate(args):
        print("Creating source_bin", i, "\n")
        # Decide per URI so the flag is never unbound or carried over from a
        # previous source; only file:// inputs are looped.
        file_loop = uri_name.startswith("file://")
        if file_loop:
            print("FILE LOOP iS TRUE -----------> ", uri_name)
        source_bin = create_source_bin(i, uri_name, file_loop, gpu_id, list_for_nvurisrcbin)
        if not source_bin:
            sys.stderr.write("Unable to create source bin\n")
            return
        pipeline.add(source_bin)

        # Without a streammux each source needs its own sink, and element
        # names must be unique within the pipeline.
        pipeline_sink = Gst.ElementFactory.make("fakesink", "fakesink-%02d" % i)
        if not pipeline_sink:
            sys.stderr.write("Unable to create fakesink\n")
            return
        pipeline_sink.set_property('enable-last-sample', 0)
        pipeline_sink.set_property('sync', 0)
        pipeline.add(pipeline_sink)
        if not source_bin.link(pipeline_sink):
            sys.stderr.write("Failed to link source bin %d to fakesink\n" % i)

    print(f"Saving pipeline graph in folder {os.environ.get('GST_DEBUG_DUMP_DOT_DIR')}")
    graph_filename = "pipe_img" + str(gpu_id)
    Gst.debug_bin_to_dot_file(pipeline, Gst.DebugGraphDetails.ALL, graph_filename)

    # Create an event loop and watch for bus messages.
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)

    print("Starting pipeline\n")
    pipeline.set_state(Gst.State.PLAYING)

    # NOTE(review): smart record is only triggered when gpu_id is 1 -- confirm
    # this restriction is intentional.
    if gpu_id == 1:
        for uribin in list_for_nvurisrcbin:
            GLib.timeout_add_seconds(10, start_sr_function, uribin)

    try:
        loop.run()
    except KeyboardInterrupt:
        # Ctrl-C is the normal way to stop the worker.
        pass
    except Exception as exc:
        # Keep shutdown best-effort, but do not hide what went wrong.
        sys.stderr.write("Main loop terminated by exception: %r\n" % (exc,))
    finally:
        bus.remove_signal_watch()
        pipeline.set_state(Gst.State.NULL)
class Server:
    """Launch pipeline worker processes and block until they all exit."""

    def __init__(self, camids=None, gpu_ids=(1,), processes_per_gpu=1):
        """Start the workers and wait for them.

        Called without arguments this starts a single worker on GPU 1 with one
        placeholder RTSP URI.

        Args:
            camids: URIs handed to every worker. Defaults to one placeholder
                RTSP URI.
            gpu_ids: GPU indices to start workers for. Defaults to ``(1,)``.
            processes_per_gpu: Number of worker processes per GPU.
        """
        # Intended load: 20 cameras per process; each camera should show at
        # least 20-30 people objects.
        if camids is None:
            camids = [
                "rtsp://xxxxx"
            ]
        else:
            camids = list(camids)

        # The host has two L4 GPUs on the same CPU, but the same behaviour is
        # also seen with a single L4.
        processes = []
        for gpu_id in gpu_ids:
            print("_________________________________________________________________________________________________________________________________________________________________________", gpu_id)
            for process_id in range(processes_per_gpu):
                list_for_nvurisrcbin = []
                p = multiprocessing.Process(
                    target=pipeline_process,
                    args=(camids, process_id, gpu_id, list_for_nvurisrcbin),
                )
                p.start()
                processes.append(p)

        for p in processes:
            p.join()
if __name__ == "__main__":
    # Guard the entry point: with the "spawn" start method, multiprocessing
    # re-imports this module in every child, and an unguarded call would start
    # new workers recursively.
    Server()
# This is my Python code with just nvurisrcbin and fakesink; an additional process is spawned on GPU 0.