DeepStream doesn't release resources after setting the pipeline to NULL

Hi,

The application we are developing requires us to create, play, stop, and destroy the pipeline multiple times. However, we have noticed that once the GStreamer pipeline state is set to NULL, it doesn't release the resources (as observed in htop). I have attached a script, test.py, with comments to test this, along with the required models and configs.

Note that I have used a fakesink for simplicity, but the issue persists with other kinds of sinks too. Please let me know if I am doing something wrong in stopping the pipeline or relinquishing the resources.

test.zip (8.5 MB)

• Hardware Platform: GPU
• DeepStream Version: 6.3
• NVIDIA GPU Driver Version (valid for GPU only): 545.23.06
• Issue Type (questions, new requirements, bugs): Bug/Question
• How to reproduce the issue?: Attached a script with comments along with the topic


Gst.init(None) will initialize some related resources, and these resources will only be released after the process exits.
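As a rough sketch (illustrative only, assuming everything lives in one module), the call can be guarded so it only ever runs once per process; whatever Gst.init() allocates still stays resident until the process exits:

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst

# Initialize GStreamer at most once per process.
if not Gst.is_initialized():
	Gst.init(None)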
Also, if the model has not changed, please do not comment out the following configuration:

#model-engine-file=models/peoplenet_detector/resnet34_peoplenet_pruned_int8.etlt_b1_gpu0_int8.engine

Even with Gst.init(None) moved out and called only once, setting the state to NULL does not release the resources. Furthermore, every time the object is created and run, additional memory is held, so eventually the server crashes with OOM. Are you able to reproduce this behavior?

Regarding the model-engine-file argument in the configuration file, I have commented it out because it is specified inside the script.

I modified the script test.py to run and stop three times while Gst.init(None) is called only once. I still see the memory ramping up after every run (start and stop, then destroying the created object). Here is the modified script:

from queue import Queue
from threading import Thread
from multiprocessing import Process

import gi
gi.require_version('Gst', '1.0')
gi.require_version("GstRtspServer", "1.0")
from gi.repository import GLib, Gst, GstRtspServer, GObject

from ctypes import *
import time
import sys
import math
import pyds
from collections import deque

import numpy as np
import cv2

Gst.init(None)


class SingleCameraPeopleDetector:
	def __init__(self, uri_name) -> None:
		
		self.loop = GLib.MainLoop()
		
		self.bitrate = 4000000
		self.pipeline = Gst.Pipeline()

		streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
		if not streammux:
			sys.stderr.write(" Unable to create NvStreamMux \n")
		self.pipeline.add(streammux)

		source_bin = self.create_source_bin(0, uri_name)

		if not source_bin:
			sys.stderr.write("Unable to create source bin \n")

		self.pipeline.add(source_bin)

		padname = "sink_0"
		sinkpad = streammux.get_request_pad(padname)
		if not sinkpad:
			sys.stderr.write("Unable to create sink pad bin \n")

		srcpad = source_bin.get_static_pad("src")
		if not srcpad:
			sys.stderr.write("Unable to create src pad bin \n")
		srcpad.link(sinkpad)

		pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
		if not pgie:
			sys.stderr.write(" Unable to create pgie \n")

		nvvidconv1 = Gst.ElementFactory.make("nvvideoconvert", "convertor1")
		if not nvvidconv1:
			sys.stderr.write(" Unable to create nvvidconv \n")


		fakesink = Gst.ElementFactory.make("fakesink", "fakesink")
		if not fakesink:
			sys.stderr.write(" Unable to create fakesink \n")
		

		## Streammux settings
		streammux.set_property('live-source', 1)
		streammux.set_property("width", 1920)
		streammux.set_property("height", 1080)
		streammux.set_property("batch-size", 1)
		streammux.set_property("batched-push-timeout", 4000000)

		## PGIE settings
		pgie.set_property("config-file-path", "configs/peoplenet_detector_config.txt")
		pgie.set_property("batch-size", 1)
		pgie.set_property("model-engine-file", "configs/models/peoplenet_detector/resnet34_peoplenet_pruned_int8.etlt_b1_gpu0_int8.engine")

		mem_type = int(pyds.NVBUF_MEM_CUDA_UNIFIED)
		streammux.set_property("nvbuf-memory-type", mem_type)
		nvvidconv1.set_property("nvbuf-memory-type", mem_type)

		self.pipeline.add(pgie)
		self.pipeline.add(nvvidconv1)
		self.pipeline.add(fakesink)

		streammux.link(pgie)
		pgie.link(nvvidconv1)
		nvvidconv1.link(fakesink)

		self.bus = self.pipeline.get_bus()
		self.bus.add_signal_watch()
		self.bus.connect("message", self.bus_call)


	def bus_call(self, bus, message):
		t = message.type
		if t == Gst.MessageType.EOS:
			sys.stdout.write("End-of-stream\n")
			#loop.quit()
		elif t==Gst.MessageType.WARNING:
			err, debug = message.parse_warning()
			sys.stderr.write("Warning: %s: %s\n" % (err, debug))
		elif t == Gst.MessageType.ERROR:
			err, debug = message.parse_error()
			sys.stderr.write("Error: %s: %s\n" % (err, debug))
			#loop.quit()
		return True

	def cb_newpad(self, decodebin, decoder_src_pad, data):
		print("In cb_newpad\n")
		caps = decoder_src_pad.get_current_caps()
		gststruct = caps.get_structure(0)
		gstname = gststruct.get_name()
		source_bin = data
		features = caps.get_features(0)

		# Need to check if the pad created by the decodebin is for video and not
		# audio.
		if gstname.find("video") != -1:
			# Link the decodebin pad only if decodebin has picked nvidia
			# decoder plugin nvdec_*. We do this by checking if the pad caps contain
			# NVMM memory features.
			print("features=", features)
			if features.contains("memory:NVMM"):
				# Get the source bin ghost pad
				bin_ghost_pad = source_bin.get_static_pad("src")
				if not bin_ghost_pad.set_target(decoder_src_pad):
					sys.stderr.write(
						"Failed to link decoder src pad to source bin ghost pad\n"
					)
			else:
				sys.stderr.write(
					" Error: Decodebin did not pick nvidia decoder plugin.\n")


	def decodebin_child_added(self, child_proxy, Object, name, user_data):
		print("Decodebin child added:", name, "\n")
		if name.find("decodebin") != -1:
			Object.connect("child-added", self.decodebin_child_added, user_data)


	def create_source_bin(self, index, uri):
		print("Creating source bin")

		# Create a source GstBin to abstract this bin's content from the rest of the
		# pipeline
		bin_name = "source-bin-%02d" % index
		print(bin_name)
		nbin = Gst.Bin.new(bin_name)
		if not nbin:
			sys.stderr.write(" Unable to create source bin \n")

		# Source element for reading from the uri.
		# We will use decodebin and let it figure out the container format of the
		# stream and the codec and plug the appropriate demux and decode plugins.
		uri_decode_bin = Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
		if not uri_decode_bin:
			sys.stderr.write(" Unable to create uri decode bin \n")
		# We set the input uri to the source element
		uri_decode_bin.set_property("uri", uri)
		# Connect to the "pad-added" signal of the decodebin which generates a
		# callback once a new pad for raw data has been created by the decodebin
		uri_decode_bin.connect("pad-added", self.cb_newpad, nbin)
		uri_decode_bin.connect("child-added", self.decodebin_child_added, nbin)

		# We need to create a ghost pad for the source bin which will act as a proxy
		# for the video decoder src pad. The ghost pad will not have a target right
		# now. Once the decode bin creates the video decoder and generates the
		# cb_newpad callback, we will set the ghost pad target to the video decoder
		# src pad.
		Gst.Bin.add(nbin, uri_decode_bin)
		bin_pad = nbin.add_pad(
			Gst.GhostPad.new_no_target(
				"src", Gst.PadDirection.SRC))
		if not bin_pad:
			sys.stderr.write(" Failed to add ghost pad in source bin \n")
			return None
		return nbin

	def play(self):
		self.loop_thread = Thread(target=self.loop.run)
		self.loop_thread.start()
		self.pipeline.set_state(Gst.State.PLAYING)

	def stop(self):
		self.pipeline.set_state(Gst.State.NULL)
		self.loop.quit()
		self.loop_thread.join()


for i in range(3):

	detector = SingleCameraPeopleDetector("rtsp://18.185.101.169:8554/stream1")

	detector_thread = Thread(target=detector.play)
	detector_thread.start()

	# Check htop or a similar system resource tool to see the resources used
	# while the script runs for 20 seconds.

	t_end = time.time() + 20
	while time.time() < t_end:
		print ("Running")
		time.sleep(1)

	# Set the pipeline state to NULL and exit the bus call loop thread

	detector.stop()
	detector_thread.join()
	del detector

	print ("Detector stopped")

	# Check htop again to see if the resources are relinquished

	t_end = time.time() + 20
	while time.time() < t_end:
		print ("Stopped")
		time.sleep(1)

	print ("Loop ended")

print ("Script ended")

Would it be convenient for you to post the specific memory data? For example, before the start, during the run, and after the stop.

Hi, sorry for the delayed reply. What tool should I use to capture the memory data that would help you debug this?

Even if you set the state to NULL, some resources initialized by gst_init will still exist. So you can record the memory data before the program starts, during runtime, after setting the state to NULL, and after exiting the program, then compare them.

You can use top or htop to record the memory.
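For reference, a rough sketch of logging the process RSS from inside the script at each stage (reading /proc/self/status is Linux-only; the stage labels and usage are just illustrative):

def log_rss(stage):
	# Print the resident set size of this process, as reported by the kernel
	# in /proc/self/status (Linux only).
	with open("/proc/self/status") as f:
		for line in f:
			if line.startswith("VmRSS:"):
				print(stage, line.split(":", 1)[1].strip())
				break

# Hypothetical usage around the pipeline lifecycle:
# log_rss("before play"); detector.play(); ...; detector.stop(); log_rss("after NULL")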

Here is the memory data for a single run:


Before the program starts: 907 MB
During runtime: 3.08 GB
Set to NULL: 3.05 GB
Exiting the program: 947 MB

Additionally, here is the memory data for three runs:

Before the program starts: 908 MB

During runtime-1: 3.07 GB
Set to NULL-1: 3.05 GB

During runtime-2: 3.37 GB
Set to NULL-2: 3.35 GB

During runtime-3: 3.66 GB
Set to NULL-3: 3.64 GB

Exiting the program: 948 MB

From what I see, the memory is not being released even after the pipeline is deleted.

After further investigation, I stripped the script down to remove the main loop thread and directly set the pipeline to PLAYING and NULL. Here is the memory dump of that:

Before the program starts: 949 MB

During runtime-1: 3.08 GB
Set to NULL-1: 3.05 GB

During runtime-2: 3.15 GB
Set to NULL-2: 3.12 GB

During runtime-3: 3.17 GB
Set to NULL-3: 3.15 GB

Exiting the program: 950 MB

Removing the main loop thread (the GLib.MainLoop()) decreases the memory leak, but memory still ramps up as more cameras are added. Let me know if you can reproduce this or if you need any more information.

OK. Could you do the following tests?
1. You can refer to our FAQ to check whether there is a memory leak.
2. Could you try not starting a new thread and executing the play and stop on the main thread?

  1. valgrind --tool=memcheck --leak-check=full --num-callers=100 --show-leak-kinds=definite,indirect --track-origins=yes ./app
This seems to be for a compiled C application. How do I run this with a Python script?

  2. I have already tried starting and stopping on the main thread without spawning a new thread. Here is the memory dump of that:

Before the program starts: 949 MB

During runtime-1: 3.08 GB
Set to NULL-1: 3.05 GB

During runtime-2: 3.15 GB
Set to NULL-2: 3.12 GB

During runtime-3: 3.17 GB
Set to NULL-3: 3.15 GB

Exiting the program: 950 MB

This is with one camera only; if I add more cameras for batch processing, the memory leak increases.

You can just change the ./app to python3 ....
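For example, a sketch of such a run (assuming the script is named test.py; the extra environment variables just make GLib and CPython allocations easier for valgrind to track):

G_SLICE=always-malloc G_DEBUG=gc-friendly PYTHONMALLOC=malloc \
valgrind --tool=memcheck --leak-check=full --num-callers=100 \
  --show-leak-kinds=definite,indirect --track-origins=yes \
  python3 test.py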

Could you attach your latest code?
  1. Here is the leak summary from valgrind:

==293== LEAK SUMMARY:
==293== definitely lost: 162,176 bytes in 16 blocks
==293== indirectly lost: 5,434 bytes in 25 blocks
==293== possibly lost: 27,643,549 bytes in 575 blocks
==293== still reachable: 167,114,944 bytes in 127,899 blocks
==293== of which reachable via heuristic:
==293== stdstring : 74,043 bytes in 614 blocks
==293== length64 : 3,368 bytes in 65 blocks
==293== newarray : 1,984 bytes in 44 blocks
==293== suppressed: 0 bytes in 0 blocks
==293== Reachable blocks (those to which a pointer was found) are not shown.
==293== To see them, rerun with: --leak-check=full --show-leak-kinds=all
==293==
==293== For lists of detected and suppressed errors, rerun with: -s
==293== ERROR SUMMARY: 4862 errors from 606 contexts (suppressed: 0 from 0)

  2. I have attached the script where no separate thread is created and the pipeline is simply set to PLAYING and then to NULL. This is the same script that I ran Valgrind with:
from queue import Queue
from threading import Thread
from multiprocessing import Process

import gi
gi.require_version('Gst', '1.0')
gi.require_version("GstRtspServer", "1.0")
from gi.repository import GLib, Gst, GstRtspServer, GObject
import gc

from ctypes import *
import time
import sys
import math
import pyds
from collections import deque

import numpy as np
import cv2

Gst.init(None)


class SingleCameraPeopleDetector:
	def __init__(self, uri_name) -> None:
		
		#self.loop = GLib.MainLoop()
		
		self.bitrate = 4000000
		self.pipeline = Gst.Pipeline()

		streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
		if not streammux:
			sys.stderr.write(" Unable to create NvStreamMux \n")
		self.pipeline.add(streammux)

		source_bin = self.create_source_bin(0, uri_name)

		if not source_bin:
			sys.stderr.write("Unable to create source bin \n")

		self.pipeline.add(source_bin)

		padname = "sink_0"
		sinkpad = streammux.get_request_pad(padname)
		if not sinkpad:
			sys.stderr.write("Unable to create sink pad bin \n")

		srcpad = source_bin.get_static_pad("src")
		if not srcpad:
			sys.stderr.write("Unable to create src pad bin \n")
		srcpad.link(sinkpad)

		pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
		if not pgie:
			sys.stderr.write(" Unable to create pgie \n")

		nvvidconv1 = Gst.ElementFactory.make("nvvideoconvert", "convertor1")
		if not nvvidconv1:
			sys.stderr.write(" Unable to create nvvidconv \n")


		fakesink = Gst.ElementFactory.make("fakesink", "fakesink")
		if not fakesink:
			sys.stderr.write(" Unable to create fakesink \n")
		

		## Streammux settings
		streammux.set_property('live-source', 1)
		streammux.set_property("width", 1920)
		streammux.set_property("height", 1080)
		streammux.set_property("batch-size", 1)
		streammux.set_property("batched-push-timeout", 4000000)

		## PGIE settings
		pgie.set_property("config-file-path", "configs/peoplenet_detector_config.txt")
		pgie.set_property("batch-size", 1)
		pgie.set_property("model-engine-file", "configs/models/peoplenet_detector/resnet34_peoplenet_pruned_int8.etlt_b1_gpu0_int8.engine")

		mem_type = int(pyds.NVBUF_MEM_CUDA_UNIFIED)
		streammux.set_property("nvbuf-memory-type", mem_type)
		nvvidconv1.set_property("nvbuf-memory-type", mem_type)

		self.pipeline.add(pgie)
		self.pipeline.add(nvvidconv1)
		self.pipeline.add(fakesink)

		streammux.link(pgie)
		pgie.link(nvvidconv1)
		nvvidconv1.link(fakesink)

		self.bus = self.pipeline.get_bus()
		self.bus.add_signal_watch()
		self.bus.connect("message", self.bus_call)


	def bus_call(self, bus, message):
		t = message.type
		if t == Gst.MessageType.EOS:
			sys.stdout.write("End-of-stream\n")
			#loop.quit()
		elif t==Gst.MessageType.WARNING:
			err, debug = message.parse_warning()
			sys.stderr.write("Warning: %s: %s\n" % (err, debug))
		elif t == Gst.MessageType.ERROR:
			err, debug = message.parse_error()
			sys.stderr.write("Error: %s: %s\n" % (err, debug))
			#loop.quit()
		# elif t == Gst.MessageType.STATE_CHANGED:
		# 	old_state, new_state, pending_state = message.parse_state_changed()
		# 	print('%s : State changed from %s to %s' % (message.src.get_name(), old_state, new_state))
		else:
			pass
		return True

	def cb_newpad(self, decodebin, decoder_src_pad, data):
		print("In cb_newpad\n")
		caps = decoder_src_pad.get_current_caps()
		gststruct = caps.get_structure(0)
		gstname = gststruct.get_name()
		source_bin = data
		features = caps.get_features(0)

		# Need to check if the pad created by the decodebin is for video and not
		# audio.
		if gstname.find("video") != -1:
			# Link the decodebin pad only if decodebin has picked nvidia
			# decoder plugin nvdec_*. We do this by checking if the pad caps contain
			# NVMM memory features.
			print("features=", features)
			if features.contains("memory:NVMM"):
				# Get the source bin ghost pad
				bin_ghost_pad = source_bin.get_static_pad("src")
				if not bin_ghost_pad.set_target(decoder_src_pad):
					sys.stderr.write(
						"Failed to link decoder src pad to source bin ghost pad\n"
					)
			else:
				sys.stderr.write(
					" Error: Decodebin did not pick nvidia decoder plugin.\n")


	def decodebin_child_added(self, child_proxy, Object, name, user_data):
		print("Decodebin child added:", name, "\n")
		if name.find("decodebin") != -1:
			Object.connect("child-added", self.decodebin_child_added, user_data)


	def create_source_bin(self, index, uri):
		print("Creating source bin")

		# Create a source GstBin to abstract this bin's content from the rest of the
		# pipeline
		bin_name = "source-bin-%02d" % index
		print(bin_name)
		nbin = Gst.Bin.new(bin_name)
		if not nbin:
			sys.stderr.write(" Unable to create source bin \n")

		# Source element for reading from the uri.
		# We will use decodebin and let it figure out the container format of the
		# stream and the codec and plug the appropriate demux and decode plugins.
		uri_decode_bin = Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
		if not uri_decode_bin:
			sys.stderr.write(" Unable to create uri decode bin \n")
		# We set the input uri to the source element
		uri_decode_bin.set_property("uri", uri)
		# Connect to the "pad-added" signal of the decodebin which generates a
		# callback once a new pad for raw data has been created by the decodebin
		uri_decode_bin.connect("pad-added", self.cb_newpad, nbin)
		uri_decode_bin.connect("child-added", self.decodebin_child_added, nbin)

		# We need to create a ghost pad for the source bin which will act as a proxy
		# for the video decoder src pad. The ghost pad will not have a target right
		# now. Once the decode bin creates the video decoder and generates the
		# cb_newpad callback, we will set the ghost pad target to the video decoder
		# src pad.
		Gst.Bin.add(nbin, uri_decode_bin)
		bin_pad = nbin.add_pad(
			Gst.GhostPad.new_no_target(
				"src", Gst.PadDirection.SRC))
		if not bin_pad:
			sys.stderr.write(" Failed to add ghost pad in source bin \n")
			return None
		return nbin

	def play(self):
		self.pipeline.set_state(Gst.State.PLAYING)

	def stop(self):
		self.pipeline.set_state(Gst.State.NULL)
		gc.collect()


detector = SingleCameraPeopleDetector("rtsp://18.185.101.169:8554/stream1")

for i in range(3):

    print ("Detector starting for the {} time".format(i))
	
    detector.play()
	
    # Check htop or a similar system resource tool to see the resources used
    # while the script runs for 40 seconds.

    t_end = time.time() + 40
    while time.time() < t_end:
        print ("Running")
        time.sleep(1)

    # Set the pipeline state to NULL

    detector.stop()

    print ("Detector stopped")
	
    # Check htop again to see if the resources are relinquished

    t_end = time.time() + 20
    while time.time() < t_end:
        print ("Stopped")
        time.sleep(1)

print ("Script ended")

For the sake of completeness, this is the htop memory increase that is observed:

Before the program starts: 949 MB

During runtime-1: 3.08 GB
Set to NULL-1: 3.05 GB

During runtime-2: 3.15 GB
Set to NULL-2: 3.12 GB

During runtime-3: 3.17 GB
Set to NULL-3: 3.15 GB

Exiting the program: 950 MB

Thanks, we can reproduce this issue. Currently, the most likely source of the leak is the nvinfer plugin. We will analyze this issue.


Thank you very much. Looking forward to it. Let me know if I can be of help.

Did this ever get figured out?

Yes. We will optimize this issue in the next release. We also found that as the number of runs increases, the memory tends to stabilize.

Which release will the update be targeting?
Thanks.