- Here is the leak summary from valgrind
==293== LEAK SUMMARY:
==293== definitely lost: 162,176 bytes in 16 blocks
==293== indirectly lost: 5,434 bytes in 25 blocks
==293== possibly lost: 27,643,549 bytes in 575 blocks
==293== still reachable: 167,114,944 bytes in 127,899 blocks
==293== of which reachable via heuristic:
==293== stdstring : 74,043 bytes in 614 blocks
==293== length64 : 3,368 bytes in 65 blocks
==293== newarray : 1,984 bytes in 44 blocks
==293== suppressed: 0 bytes in 0 blocks
==293== Reachable blocks (those to which a pointer was found) are not shown.
==293== To see them, rerun with: --leak-check=full --show-leak-kinds=all
==293==
==293== For lists of detected and suppressed errors, rerun with: -s
==293== ERROR SUMMARY: 4862 errors from 606 contexts (suppressed: 0 from 0)
- I have attached the script where no separate thread is called and the pipeline is set to play and set to null. This is the same script that I ran Valgrind with
from queue import Queue
from threading import Thread
from multiprocessing import Process
import gi
gi.require_version('Gst', '1.0')
gi.require_version("GstRtspServer", "1.0")
from gi.repository import GLib, Gst, GstRtspServer, GObject
import gc
from ctypes import *
import time
import sys
import math
import pyds
from collections import deque
import numpy as np
import cv2
Gst.init(None)
class SingleCameraPeopleDetector:
def __init__(self, uri_name) -> None:
#self.loop = GLib.MainLoop()
self.bitrate = 4000000
self.pipeline = Gst.Pipeline()
streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
if not streammux:
sys.stderr.write(" Unable to create NvStreamMux \n")
self.pipeline.add(streammux)
source_bin = self.create_source_bin(0, uri_name)
if not source_bin:
sys.stderr.write("Unable to create source bin \n")
self.pipeline.add(source_bin)
padname = "sink_0"
sinkpad = streammux.get_request_pad(padname)
if not sinkpad:
sys.stderr.write("Unable to create sink pad bin \n")
srcpad = source_bin.get_static_pad("src")
if not srcpad:
sys.stderr.write("Unable to create src pad bin \n")
srcpad.link(sinkpad)
pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
if not pgie:
sys.stderr.write(" Unable to create pgie \n")
nvvidconv1 = Gst.ElementFactory.make("nvvideoconvert", "convertor1")
if not nvvidconv1:
sys.stderr.write(" Unable to create nvvidconv \n")
fakesink = Gst.ElementFactory.make("fakesink", "fakesink")
if not fakesink:
sys.stderr.write(" Unable to create fakesink \n")
## Streammux settings
streammux.set_property('live-source', 1)
streammux.set_property("width", 1920)
streammux.set_property("height", 1080)
streammux.set_property("batch-size", 1)
streammux.set_property("batched-push-timeout", 4000000)
## PGIE settings
pgie.set_property("config-file-path", "configs/peoplenet_detector_config.txt")
pgie.set_property("batch-size", 1)
pgie.set_property("model-engine-file", "configs/models/peoplenet_detector/resnet34_peoplenet_pruned_int8.etlt_b1_gpu0_int8.engine")
mem_type = int(pyds.NVBUF_MEM_CUDA_UNIFIED)
streammux.set_property("nvbuf-memory-type", mem_type)
nvvidconv1.set_property("nvbuf-memory-type", mem_type)
self.pipeline.add(pgie)
self.pipeline.add(nvvidconv1)
self.pipeline.add(fakesink)
streammux.link(pgie)
pgie.link(nvvidconv1)
nvvidconv1.link(fakesink)
self.bus = self.pipeline.get_bus()
self.bus.add_signal_watch()
self.bus.connect("message", self.bus_call)
def bus_call(self, bus, message):
t = message.type
if t == Gst.MessageType.EOS:
sys.stdout.write("End-of-stream\n")
#loop.quit()
elif t==Gst.MessageType.WARNING:
err, debug = message.parse_warning()
sys.stderr.write("Warning: %s: %s\n" % (err, debug))
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
sys.stderr.write("Error: %s: %s\n" % (err, debug))
#loop.quit()
# elif t == Gst.MessageType.STATE_CHANGED:
# old_state, new_state, pending_state = message.parse_state_changed()
# print('%s : State changed from %s to %s' % (message.src.get_name(), old_state, new_state))
else:
pass
return True
def cb_newpad(self, decodebin, decoder_src_pad, data):
print("In cb_newpad\n")
caps = decoder_src_pad.get_current_caps()
gststruct = caps.get_structure(0)
gstname = gststruct.get_name()
source_bin = data
features = caps.get_features(0)
# Need to check if the pad created by the decodebin is for video and not
# audio.
if gstname.find("video") != -1:
# Link the decodebin pad only if decodebin has picked nvidia
# decoder plugin nvdec_*. We do this by checking if the pad caps contain
# NVMM memory features.
print("features=", features)
if features.contains("memory:NVMM"):
# Get the source bin ghost pad
bin_ghost_pad = source_bin.get_static_pad("src")
if not bin_ghost_pad.set_target(decoder_src_pad):
sys.stderr.write(
"Failed to link decoder src pad to source bin ghost pad\n"
)
else:
sys.stderr.write(
" Error: Decodebin did not pick nvidia decoder plugin.\n")
def decodebin_child_added(self, child_proxy, Object, name, user_data):
print("Decodebin child added:", name, "\n")
if name.find("decodebin") != -1:
Object.connect("child-added", self.decodebin_child_added, user_data)
def create_source_bin(self, index, uri):
print("Creating source bin")
# Create a source GstBin to abstract this bin's content from the rest of the
# pipeline
bin_name = "source-bin-%02d" % index
print(bin_name)
nbin = Gst.Bin.new(bin_name)
if not nbin:
sys.stderr.write(" Unable to create source bin \n")
# Source element for reading from the uri.
# We will use decodebin and let it figure out the container format of the
# stream and the codec and plug the appropriate demux and decode plugins.
uri_decode_bin = Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
if not uri_decode_bin:
sys.stderr.write(" Unable to create uri decode bin \n")
# We set the input uri to the source element
uri_decode_bin.set_property("uri", uri)
# Connect to the "pad-added" signal of the decodebin which generates a
# callback once a new pad for raw data has beed created by the decodebin
uri_decode_bin.connect("pad-added", self.cb_newpad, nbin)
uri_decode_bin.connect("child-added", self.decodebin_child_added, nbin)
# We need to create a ghost pad for the source bin which will act as a proxy
# for the video decoder src pad. The ghost pad will not have a target right
# now. Once the decode bin creates the video decoder and generates the
# cb_newpad callback, we will set the ghost pad target to the video decoder
# src pad.
Gst.Bin.add(nbin, uri_decode_bin)
bin_pad = nbin.add_pad(
Gst.GhostPad.new_no_target(
"src", Gst.PadDirection.SRC))
if not bin_pad:
sys.stderr.write(" Failed to add ghost pad in source bin \n")
return None
return nbin
def play(self):
self.pipeline.set_state(Gst.State.PLAYING)
def stop(self):
self.pipeline.set_state(Gst.State.NULL)
gc.collect()
detector = SingleCameraPeopleDetector("rtsp://18.185.101.169:8554/stream1")
for i in range(1):
print ("Detector starting for the {} time".format(i))
detector.play()
# Check HTOP or similar system resource tool to see the resource used
# while the script runs for 20 seconds.
t_end = time.time() + 40
while time.time() < t_end:
print ("Running")
time.sleep(1)
# Set the pipeline state to NULL and exit the bus call loop thread
detector.stop()
print ("Detector stopped")
# Check the htop again to see if the resouces are relenquished
t_end = time.time() + 20
while time.time() < t_end:
print ("Stopped")
time.sleep(1)
print ("Script ended")
For the sake of completeness, this is the htop memory increase that is observed,
Before the program starts: 949 MB
During runtime-1: 3.08 GB
Set to NULL-1: 3.05 GB
During runtime-2: 3.15 GB
Set to NULL-2: 3.12 GB
During runtime-3: 3.17 GB
Set to NULL-3: 3.15 GB
Exiting the program: 950 MB