Smart recording on single GPU

Please provide complete information as applicable to your setup.

• Hardware Platform (Jetson / GPU)
• DeepStream Version
• JetPack Version (valid for Jetson only)
• TensorRT Version
• NVIDIA GPU Driver Version (valid for GPU only)
• Issue Type (questions, new requirements, bugs)
• How to reproduce the issue? (This is for bugs. Include which sample app is used, the configuration file contents, the command line used, and other details needed to reproduce.)
• Requirement details (This is for new requirements. Include the module name, i.e. which plugin or which sample application, and the function description.)

dGPU
DS version 7.0
TRT 8.6.1
Driver version 535.230.02

I am using DeepStream 7.0 for smart recording, but I am facing some difficulties when running on multiple GPUs.
Say I trigger smart recording on GPU1: the recording starts on GPU1 as expected, but when the recording stops, a process automatically spawns on GPU0. I have also tried DeepStream 7.1 and looked at other forum topics, and found no solution to my problem.

  1. Can this issue be reproduced on DeepStream 7.1? You can use the 7.1 docker.
  2. If yes to question 1, which sample are you testing or referring to? What is the complete pipeline? How did you trigger smart recording on GPU1? Could you share simplified code to reproduce this issue? Thanks!

I mentioned earlier that I have tried DS 7.1 as well, and the issue persists.

From the forum topic above, deepstream-test3.c was suggested by NVIDIA.
If the nvurisrcbin’s gpu-id is 1, that means it should run on GPU1, right? So if the entire pipeline is on GPU1, how can it spawn a process with the same ID on GPU0?

  1. To rule out the other plugins, does the issue persist if you use only a “nvurisrcbin->fakesink” pipeline in test3?
  2. If yes to question 1, does the issue persist without triggering smart recording? I am wondering whether it is related to smart recording.

I haven’t tried that yet!

I have faced this issue only with smart recording, i.e. only when smart recording is triggered.

Without streammux?

YES, without streammux. If the issue can’t be reproduced with a “nvurisrcbin->fakesink” pipeline, that means the issue is not related to nvurisrcbin. Then you can add the other plugins one by one; for example, does the issue persist with “nvurisrcbin->streammux->fakesink”? Thanks!

Yes, that is right. Does it matter whether this is done in C or Python?

That is right. Please refer to the actual pipeline below. No, you can use C or Python; the goal is to find the simplest pipeline that reproduces the problem.

gst-launch-1.0 -v nvurisrcbin uri=rtsp://xxx  ! mux.sink_0 nvstreammux name=mux batch-size=1 width=1280 height=720  ! fakesink

You are using streammux!!

Where do we check whether start-sr is triggered, and how will I know which GPU this gst-launch pipeline is running on?

You told us to run without streammux. Also, the start-sr signal properties are not set in this pipeline, and neither is the gpu-id.

The gst-launch pipeline is just a sample; you need to implement this pipeline and start smart recording in code. Please refer to my last two comments. I mean: check “nvurisrcbin->fakesink” in code first.
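To see which GPU a given process is actually running on, you can query the driver while the pipeline is playing. A minimal sketch, assuming `nvidia-smi` is on PATH (the query fields are standard `--query-compute-apps` options):

```python
import csv
import io
import subprocess

QUERY = ["nvidia-smi",
         "--query-compute-apps=gpu_uuid,pid,process_name,used_memory",
         "--format=csv"]

def parse_compute_apps(csv_text):
    """Parse the CSV output of `nvidia-smi --query-compute-apps`."""
    rows = list(csv.reader(io.StringIO(csv_text.strip())))
    header = [h.strip() for h in rows[0]]
    return [dict(zip(header, (c.strip() for c in r))) for r in rows[1:]]

def compute_apps():
    """Return one dict per CUDA process, including which GPU it runs on."""
    return parse_compute_apps(subprocess.check_output(QUERY, text=True))

# usage while the pipeline is running:
#   for app in compute_apps():
#       print(app["gpu_uuid"], app["pid"], app["process_name"])
```

Running this before and after the recording stops should show whether a new compute process really appears on GPU0.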

import sys
sys.path.append("../")
# from bus_call import bus_call
import pyds
import math
import time
from ctypes import *
import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst, GLib
import configparser
import datetime
import os
import argparse
import multiprocessing
import time
from threading import Lock
from collections import defaultdict, deque
from typing import Tuple, Dict, Deque
import numpy as np
import traceback, time, copy 
import faulthandler
faulthandler.enable()
import uuid
from os import environ
import ctypes
start_time=time.time()

fps_mutex = Lock()

class SRUserContext(ctypes.Structure):
    _fields_ = [
        ("sessionid", ctypes.c_int),
        ("name", ctypes.c_char * 32)
    ]





# (Other constants and definitions remain unchanged)
MUXER_BATCH_TIMEOUT_USEC = 33000


def bus_call(bus, message, loop):
    t = message.type
    if t == Gst.MessageType.EOS:
        sys.stdout.write("End-of-stream\n")
        loop.quit()
    elif t==Gst.MessageType.WARNING:
        err, debug = message.parse_warning()
        sys.stderr.write("Warning: %s: %s\n" % (err, debug))
    elif t == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        sys.stderr.write("Error: %s: %s\n" % (err, debug))
        loop.quit()
    return True

def cb_newpad(decodebin, decoder_src_pad, data):
    print("In cb_newpad\n")
    caps = decoder_src_pad.get_current_caps()
    if not caps:
        caps = decoder_src_pad.query_caps()
    gststruct = caps.get_structure(0)
    gstname = gststruct.get_name()
    source_bin = data
    features = caps.get_features(0)
    print("gstname=", gstname)
    if gstname.find("video") != -1:
        print("features=", features)
        if features.contains("memory:NVMM"):
            bin_ghost_pad = source_bin.get_static_pad("src")
            if not bin_ghost_pad.set_target(decoder_src_pad):
                sys.stderr.write("Failed to link decoder src pad to source bin ghost pad\n")
        else:
            sys.stderr.write("Error: Decodebin did not pick nvidia decoder plugin.\n")

def decodebin_child_added(child_proxy, Object, name, user_data):
    print("Decodebin child added:", name, "\n")
    if name.find("decodebin") != -1:
        Object.connect("child-added", decodebin_child_added, user_data)

def create_source_bin(index, uri,file_loop, gpu_id, list_for_nvurisrcbin):
    print("Creating source bin")
    bin_name = "source-bin-%02d" % index
    print(bin_name)
    nbin = Gst.Bin.new(bin_name)
    if not nbin:
        sys.stderr.write("Unable to create source bin \n")
    if file_loop:
        uri_decode_bin=Gst.ElementFactory.make("nvurisrcbin", "uri-decode-bin")
        uri_decode_bin.set_property("file-loop", 1)
        uri_decode_bin.set_property("cudadec-memtype", 0)
    else:
        uri_decode_bin = Gst.ElementFactory.make("nvurisrcbin", "nvurisrcbin")
    if not uri_decode_bin:
        sys.stderr.write("Unable to create uri decode bin \n")
    uri_decode_bin.set_property("uri", uri)
    uri_decode_bin.set_property("source-id", index)

    
    # smart record properties
    base_path = f"/opt/nvidia/deepstream/deepstream-7.0/nvodin24/video/{index}_{uuid.uuid4().hex[:8]}"
    uri_decode_bin.set_property("smart-record", 2)
    os.makedirs(base_path, exist_ok=True)
    uri_decode_bin.set_property("smart-rec-dir-path", base_path)
    uri_decode_bin.set_property("smart-rec-cache", 20)
    
    
    
    
    uri_decode_bin.set_property("latency", 1)
    uri_decode_bin.set_property("num-extra-surfaces", 5)
    uri_decode_bin.set_property("gpu-id", gpu_id)
    uri_decode_bin.set_property("rtsp-reconnect-interval", 180)
    uri_decode_bin.connect("sr-done", record_done, nbin)
    list_for_nvurisrcbin.append(uri_decode_bin)
    uri_decode_bin.connect("pad-added", cb_newpad, nbin)
    uri_decode_bin.connect("child-added", decodebin_child_added, nbin)
    Gst.Bin.add(nbin, uri_decode_bin)
    bin_pad = nbin.add_pad(Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC))
    if not bin_pad:
        sys.stderr.write("Failed to add ghost pad in source bin \n")
        return None
    return nbin
def start_sr_function(element):
    sessionid = pyds.get_native_ptr(pyds.alloc_buffer(4))
    print(f"sessionid {sessionid}")
    sr_user_context_size = ctypes.sizeof(SRUserContext)
    sr_user_context_buf = pyds.get_native_ptr(pyds.alloc_buffer(sr_user_context_size))
    sr_user_context = pyds.SRUserContext.cast(sr_user_context_buf)
    sr_user_context.sessionid = 42
    sr_user_context.name = "sr-demo " + element.get_name()
    print(f"sr_user_context_buf {sr_user_context_buf} {element.get_name()}")
    element.emit('start-sr', sessionid, 5, 5, sr_user_context_buf)
    pyds.free_gbuffer(sessionid)
    print('******start sr*****')
    return True

def record_done(nvurisrcbin, recordingInfo, user_ctx, user_data):
    print('******sr done*****')
    sr = pyds.SRUserContext.cast(hash(user_ctx))
    print(f"session id {sr.sessionid} -- name {sr.name}")
    pyds.free_buffer(hash(user_ctx))

def pipeline_process(args,name, gpu_id, list_for_nvurisrcbin):
    # args is a list of camera URIs for this pipeline instance (batch)
    number_sources = len(args)
    Gst.init(None)
    print("Creating Pipeline\n")
    pipeline = Gst.Pipeline()
    is_live = False
    if not pipeline:
        sys.stderr.write("Unable to create Pipeline\n")
    # print("Creating streammux\n")
    # streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    # if not streammux:
    #     sys.stderr.write("Unable to create NvStreamMux\n")
    # streammux.set_property("width", 1920)
    # streammux.set_property("height", 1080)
    # streammux.set_property("gpu-id", gpu_id)
    # streammux.set_property("batch-size", number_sources)
    # streammux.set_property("batched-push-timeout", MUXER_BATCH_TIMEOUT_USEC)
    # pipeline.add(streammux)

    for i in range(number_sources):
        print("Creating source_bin", i, "\n")
        uri_name = args[i]
        # default to False so file_loop is always bound, even for other URI schemes
        file_loop = uri_name.startswith("file://")
        if file_loop:
            print("FILE LOOP IS TRUE -----------> ", uri_name)
        source_bin = create_source_bin(i, uri_name,file_loop, gpu_id, list_for_nvurisrcbin)
        if not source_bin:
            sys.stderr.write("Unable to create source bin\n")
        pipeline.add(source_bin)    
    
    # print("Creating Fakesink \n")
    pipeline_sink = Gst.ElementFactory.make("fakesink", "fakesink")
    pipeline_sink.set_property('enable-last-sample', 0)
    pipeline_sink.set_property('sync', 0)
    pipeline.add(pipeline_sink)
    # NOTE: only the last source bin gets linked; fine here since there is one source
    source_bin.link(pipeline_sink)
    # streammux.link(pipeline_sink)
    print(f"Saving pipeline graph in folder {os.environ.get('GST_DEBUG_DUMP_DOT_DIR')}")
    # print('==================   ~~~~~', self.variation)
    graph_filename = "pipe_img"+str(gpu_id)
    Gst.debug_bin_to_dot_file(pipeline, Gst.DebugGraphDetails.ALL, graph_filename)
    
    # Create an event loop and watch for bus messages
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)
   
    
    print("Starting pipeline\n")
    pipeline.set_state(Gst.State.PLAYING)
    if gpu_id==1:
        for uribin in list_for_nvurisrcbin:
            timer_id = GLib.timeout_add_seconds(10, start_sr_function, uribin)
    try:
        loop.run()
    except BaseException:
        pass
    pipeline.set_state(Gst.State.NULL)




class Server:
    
    def __init__(self):
        # Adding 20 cameras per process.
        # Please note that each camera should have a minimum of 20-30 person objects.
        camids = [
                    "rtsp://xxxxx"

                ]


        # We have 2 L4 GPUs on the same CPU,
        # but the same thing happens on a single L4 as well.
        processes = []

        for gpu_id in range(1,2):
            print("=" * 80, "gpu_id:", gpu_id)
            for process_id in range(0,1):
                list_for_nvurisrcbin=[]
                p = multiprocessing.Process(target= pipeline_process, args=(camids,process_id, gpu_id,list_for_nvurisrcbin))
                p.start()
                processes.append(p)
        
        for p in processes:
            p.join()
            
            
Server()

This is my Python code with just nvurisrcbin and fakesink, and there is still an additional process spawned on GPU0.

The issue persists with just these 2 elements, so I think it has to do with nvurisrcbin. Can you replicate this on your end and provide a solution? I have been stuck on this for quite some time now and have tried many different things, but have not made any progress.
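As a side note, one workaround I am experimenting with (a sketch only, not a root-cause fix) is pinning each child process to a single physical GPU via `CUDA_VISIBLE_DEVICES` before any CUDA context is created, so nothing can silently land on GPU0. The `pin_process_to_gpu` and `worker` names here are hypothetical:

```python
import multiprocessing
import os

def pin_process_to_gpu(gpu_id):
    """Restrict the calling process to a single physical GPU.

    Must be called before GStreamer/CUDA creates any context; after this,
    the chosen GPU is visible as device 0, so pass gpu-id=0 to all plugins.
    """
    os.environ["CUDA_VISIBLE_DEVICES"] = str(gpu_id)
    return 0  # logical gpu-id to use inside this process

def worker(physical_gpu, camids):
    logical_gpu = pin_process_to_gpu(physical_gpu)
    # hypothetical call into the existing code, with gpu-id remapped to 0:
    # pipeline_process(camids, 0, logical_gpu, [])

# usage: one process per physical GPU
# multiprocessing.Process(target=worker, args=(1, ["rtsp://xxx"])).start()
```

With the device masked off, any helper process spawned by the plugins should be unable to touch the other GPU.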

Hello @fanzh

export GST_DEBUG_DUMP_DOT_DIR=`pwd`

I export the variable above and then run my code. It creates a .dot file for every recorded video. I checked one of the .dot files and it clearly shows gpu-id: 0. Below I have attached a zip containing the .dot file of one of the recorded videos. It may help you; please check it and let us know.

recorded_video_dot_file.zip (2.8 KB)
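For reference, this is roughly how I scanned the .dot dump for gpu-id values; the separator handling (`=` or `:`) is an assumption about how the property is serialized in the element labels:

```python
import re

def gpu_ids_in_dot(dot_text):
    """Collect every gpu-id value appearing in a GStreamer .dot dump.

    Assumes properties show up as `gpu-id=N` (or `gpu-id: N`) in the node
    labels, as produced when dumping with Gst.DebugGraphDetails.ALL.
    """
    return sorted(set(int(m) for m in re.findall(r"gpu-id[=:]\s*(\d+)", dot_text)))

# usage (hypothetical dump file name):
#   with open("pipe_img1.dot") as f:
#       print(gpu_ids_in_dot(f.read()))
```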

@rishika.rao @s.Jagannath After checking the dot file you shared, I found that gpu-id is not set. Could you double-check? Also, I notice you are using uridecodebin, and this plugin does not have a gpu-id property. You can use gst-launch to check.

We are using the nvurisrcbin element; we simply named it uri_decode_bin. Please check our code attached below and let us know if any changes are required.

import sys
sys.path.append("../")
# from bus_call import bus_call
import pyds
import math
import time
from ctypes import *
import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst, GLib
import configparser
import datetime
import os
import argparse
import multiprocessing
import time
from threading import Lock
from collections import defaultdict, deque
from typing import Tuple, Dict, Deque
import numpy as np
import traceback, time, copy 
import faulthandler
faulthandler.enable()
import uuid
from os import environ
import ctypes
start_time=time.time()

fps_mutex = Lock()
counter=0

class SRUserContext(ctypes.Structure):
    _fields_ = [
        ("sessionid", ctypes.c_int),
        ("name", ctypes.c_char * 32)
    ]





# (Other constants and definitions remain unchanged)
MUXER_BATCH_TIMEOUT_USEC = 33000


def bus_call(bus, message, loop):
    t = message.type
    if t == Gst.MessageType.EOS:
        sys.stdout.write("End-of-stream\n")
        loop.quit()
    elif t==Gst.MessageType.WARNING:
        err, debug = message.parse_warning()
        sys.stderr.write("Warning: %s: %s\n" % (err, debug))
    elif t == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        sys.stderr.write("Error: %s: %s\n" % (err, debug))
        loop.quit()
    return True

def cb_newpad(decodebin, decoder_src_pad, data):
    print("In cb_newpad\n")
    caps = decoder_src_pad.get_current_caps()
    if not caps:
        caps = decoder_src_pad.query_caps()
    gststruct = caps.get_structure(0)
    gstname = gststruct.get_name()
    source_bin = data
    features = caps.get_features(0)
    print("gstname=", gstname)
    if gstname.find("video") != -1:
        print("features=", features)
        if features.contains("memory:NVMM"):
            bin_ghost_pad = source_bin.get_static_pad("src")
            if not bin_ghost_pad.set_target(decoder_src_pad):
                sys.stderr.write("Failed to link decoder src pad to source bin ghost pad\n")
        else:
            sys.stderr.write("Error: Decodebin did not pick nvidia decoder plugin.\n")

def decodebin_child_added(child_proxy, Object, name, user_data):
    print("Decodebin child added:", name, "\n")
    if name.find("decodebin") != -1:
        Object.connect("child-added", decodebin_child_added, user_data)

def create_source_bin(index, uri,file_loop, gpu_id, list_for_nvurisrcbin):
    print("Creating source bin")
    bin_name = "source-bin-%02d" % index
    print(bin_name)
    nbin = Gst.Bin.new(bin_name)
    if not nbin:
        sys.stderr.write("Unable to create source bin \n")
    if file_loop:
        uri_decode_bin=Gst.ElementFactory.make("nvurisrcbin", "uri-decode-bin")
        uri_decode_bin.set_property("file-loop", 1)
        uri_decode_bin.set_property("cudadec-memtype", 0)
    else:
        uri_decode_bin = Gst.ElementFactory.make("nvurisrcbin", "nvurisrcbin")
    if not uri_decode_bin:
        sys.stderr.write("Unable to create uri decode bin \n")
    uri_decode_bin.set_property("uri", uri)
    uri_decode_bin.set_property("source-id", index)

    
    # smart record properties
    base_path = f"/opt/nvidia/deepstream/deepstream-7.0/nvodin24/video/{index}_{uuid.uuid4().hex[:8]}"
    uri_decode_bin.set_property("smart-record", 2)
    os.makedirs(base_path, exist_ok=True)
    uri_decode_bin.set_property("smart-rec-dir-path", base_path)
    uri_decode_bin.set_property("smart-rec-cache", 20)
    
    
    
    
    uri_decode_bin.set_property("latency", 1)
    uri_decode_bin.set_property("num-extra-surfaces", 5)
    uri_decode_bin.set_property("gpu-id", gpu_id)
    uri_decode_bin.set_property("rtsp-reconnect-interval", 180)
    uri_decode_bin.connect("sr-done", record_done, nbin)
    list_for_nvurisrcbin.append(uri_decode_bin)
    uri_decode_bin.connect("pad-added", cb_newpad, nbin)
    uri_decode_bin.connect("child-added", decodebin_child_added, nbin)
    Gst.Bin.add(nbin, uri_decode_bin)
    bin_pad = nbin.add_pad(Gst.GhostPad.new_no_target("src", Gst.PadDirection.SRC))
    if not bin_pad:
        sys.stderr.write("Failed to add ghost pad in source bin \n")
        return None
    return nbin
def start_sr_function(element):
    sessionid = pyds.get_native_ptr(pyds.alloc_buffer(4))
    print(f"sessionid {sessionid}")
    sr_user_context_size = ctypes.sizeof(SRUserContext)
    sr_user_context_buf = pyds.get_native_ptr(pyds.alloc_buffer(sr_user_context_size))
    sr_user_context = pyds.SRUserContext.cast(sr_user_context_buf)
    sr_user_context.sessionid = 42
    sr_user_context.name = "sr-demo " + element.get_name()
    print(f"sr_user_context_buf {sr_user_context_buf} {element.get_name()}")
    element.emit('start-sr', sessionid, 5, 5, sr_user_context_buf)
    pyds.free_gbuffer(sessionid)
    print('******start sr*****')
    global counter
    counter += 1
    if counter == 3:
        # This callback has no `pipeline` or `gpu_id` in scope, so walk up
        # from the nvurisrcbin (element -> source bin -> pipeline) instead.
        pipeline = element.get_parent().get_parent()
        print(f"Saving pipeline graph in folder {os.environ.get('GST_DEBUG_DUMP_DOT_DIR')}")
        graph_filename = "pipe_img_" + element.get_name()
        Gst.debug_bin_to_dot_file(pipeline, Gst.DebugGraphDetails.ALL, graph_filename)
    return True

def record_done(nvurisrcbin, recordingInfo, user_ctx, user_data):
    print('******sr done*****')
    sr = pyds.SRUserContext.cast(hash(user_ctx))
    print(f"session id {sr.sessionid} -- name {sr.name}")
    pyds.free_buffer(hash(user_ctx))

def pipeline_process(args,name, gpu_id, list_for_nvurisrcbin):
    # args is a list of camera URIs for this pipeline instance (batch)
    number_sources = len(args)
    Gst.init(None)
    print("Creating Pipeline\n")
    pipeline = Gst.Pipeline()
    is_live = False
    if not pipeline:
        sys.stderr.write("Unable to create Pipeline\n")
    print("Creating streammux\n")
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        sys.stderr.write("Unable to create NvStreamMux\n")
    streammux.set_property("width", 1920)
    streammux.set_property("height", 1080)
    streammux.set_property("gpu-id", gpu_id)
    streammux.set_property("batch-size", number_sources)
    streammux.set_property("batched-push-timeout", MUXER_BATCH_TIMEOUT_USEC)
    pipeline.add(streammux)
    for i in range(number_sources):
        print("Creating source_bin", i, "\n")
        uri_name = args[i]
        # default to False so file_loop is always bound, even for other URI schemes
        file_loop = uri_name.startswith("file://")
        if file_loop:
            print("FILE LOOP IS TRUE -----------> ", uri_name)
        source_bin = create_source_bin(i, uri_name,file_loop, gpu_id, list_for_nvurisrcbin)
        if not source_bin:
            sys.stderr.write("Unable to create source bin\n")
        pipeline.add(source_bin)
        padname = "sink_%u" % i
        sinkpad = streammux.request_pad_simple(padname)
        if not sinkpad:
            sys.stderr.write("Unable to create sink pad bin\n")
        srcpad = source_bin.get_static_pad("src")
        if not srcpad:
            sys.stderr.write("Unable to create src pad bin\n")
        srcpad.link(sinkpad)
    
    
    print("Creating Fakesink \n")
    pipeline_sink = Gst.ElementFactory.make("fakesink", "fakesink")
    pipeline_sink.set_property('enable-last-sample', 0)
    pipeline_sink.set_property('sync', 0)
    pipeline.add(pipeline_sink)
    streammux.link(pipeline_sink)
    
    
    # Create an event loop and watch for bus messages
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)
   
    
    print("Starting pipeline\n")
    pipeline.set_state(Gst.State.PLAYING)
    
    if gpu_id==1:
        for uribin in list_for_nvurisrcbin:
            timer_id = GLib.timeout_add_seconds(10, start_sr_function, uribin)
    try:
        loop.run()
    except BaseException:
        pass
    pipeline.set_state(Gst.State.NULL)




class Server:
    
    def __init__(self):
        # Adding 20 cameras per process.
        # Please note that each camera should have a minimum of 20-30 person objects.
        camids = [
                    "rtsp://xxxx/xxxx"
            
                ]


        # We have 2 L4 GPUs on the same CPU,
        # but the same thing happens on a single L4 as well.
        processes = []

        for gpu_id in range(1,2):
            print("=" * 80, "gpu_id:", gpu_id)
            for process_id in range(0,1):
                list_for_nvurisrcbin=[]
                p = multiprocessing.Process(target= pipeline_process, args=(camids,process_id, gpu_id,list_for_nvurisrcbin))
                p.start()
                processes.append(p)
        
        for p in processes:
            p.join()
            
            
Server()