DeepStream 7.1 memory leak

Please provide complete information as applicable to your setup.

• Hardware Platform (Jetson / GPU): Orin NX
• DeepStream Version: 7.1
• JetPack Version (valid for Jetson only): 6.2.1

Hello NVIDIA team,

I am developing a Python application using DeepStream, where I create a pipeline and, upon receiving an EOS event, I tear it down and recreate it.

I have noticed that this procedure seems to leave residual memory that I cannot free. Each EOS event increases the process memory by approximately 50 MB. In scenarios with frequent EOS events, this eventually causes the Docker container to hit OOM.

I have tried:

  • Setting the pipeline state to NULL

  • Deleting pipeline references

  • Running Python gc.collect()

  • Removing bus and signal watches

However, I have observed a peculiar behavior with the garbage collector: when I sleep briefly to allow GC to run, the memory seems to drop by several MB temporarily, but then increases again significantly beyond the previous peak.

For example:

  • Starting memory: 300 MB

  • After first EOS and GC: drops to ~250 MB, then rises to ~350 MB

  • After second EOS and GC: drops to ~300 MB, then rises to ~400 MB

This suggests that some internal resources or buffers are not being fully released.
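To put numbers on this pattern from inside the process, one option is to log the resident set size right before and after each teardown. The following is a minimal sketch, assuming a Linux container where `/proc/self/status` is readable; the helper names (`parse_vm_rss_kb`, `current_rss_mb`, `log_rss`) are hypothetical and not part of DeepStream or GStreamer.

```python
import re
from pathlib import Path


def parse_vm_rss_kb(status_text: str) -> int:
    """Extract the VmRSS value (in kB) from the text of /proc/<pid>/status."""
    match = re.search(r"^VmRSS:\s+(\d+)\s+kB", status_text, flags=re.MULTILINE)
    if match is None:
        raise ValueError("VmRSS not found in status text")
    return int(match.group(1))


def current_rss_mb() -> float:
    """Resident set size of the current process in MB (Linux only)."""
    status_text = Path("/proc/self/status").read_text()
    return parse_vm_rss_kb(status_text) / 1024.0


def log_rss(label: str) -> None:
    """Print the current RSS with a label, e.g. before and after a teardown."""
    print(f"{label}: RSS = {current_rss_mb():.1f} MB")
```

Calling `log_rss("before teardown")` and `log_rss("after rebuild")` around each EOS cycle gives a per-cycle series that can be compared with the orchestrator's memory graph.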

Could you please advise:

  1. Is there a recommended way to safely destroy and recreate pipelines in Python without accumulating memory?

  2. Are there any known memory leaks in the DeepStream Python bindings when recreating pipelines frequently?

  3. Any best practices for handling frequent EOS events in a long-running Python application?

Thank you very much for your help!

Here is how I tear down the pipeline:

```
def teardown_pipeline(self) -> None:
    if not self.pipeline:
        return

    # Remove the OSD pad probe so its callback is no longer referenced.
    try:
        nvosd = self.pipeline.get_by_name("on_screen_display")
        if nvosd and hasattr(self, "_osd_probe_id"):
            pad = nvosd.get_static_pad("sink")
            if pad and self._osd_probe_id is not None:
                pad.remove_probe(self._osd_probe_id)
                self._osd_probe_id = None
    except Exception as e:
        logger.warning(f"Could not remove probe: {e}")

    # Detach the bus: stop message delivery, then drop the handler and the watch.
    if hasattr(self, "bus") and self.bus:
        try:
            self.bus.set_flushing(True)
            if hasattr(self, "_bus_watch_id"):
                self.bus.disconnect(self._bus_watch_id)
            self.bus.remove_signal_watch()
        except Exception as e:
            logger.warning(f"Bus cleanup failed: {e}")
        self.bus = None

    # Flush any buffers still queued in the pipeline.
    try:
        self.pipeline.send_event(Gst.Event.new_flush_start())
        self.pipeline.send_event(Gst.Event.new_flush_stop(False))
    except Exception as e:
        logger.warning(e)

    # Stop the pipeline and wait for the state change to complete.
    self.pipeline.set_state(Gst.State.NULL)
    self.pipeline.get_state(Gst.CLOCK_TIME_NONE)

    self.pipeline = None

    # Run the collector repeatedly and report the object count each time.
    for _ in range(10):
        print("Before GC:", len(gc.get_objects()))
        collected = gc.collect()
        print("Collected:", collected)
        print("After GC:", len(gc.get_objects()))

        time.sleep(1)
```

This is the best cleanup I came up with by following the documentation and other forum threads.
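`gc.collect()` only frees Python objects. Memory released by native code can stay in the process heap and still count toward RSS. As a diagnostic aid (not a fix), a sketch like the following asks glibc to hand free heap pages back to the OS after teardown. It assumes a glibc-based image, and the helper name `trim_native_heap` is hypothetical. Whether the growth reported here is retained heap or a true leak is not established in this thread.

```python
import ctypes
import ctypes.util
import gc


def trim_native_heap() -> bool:
    """Run the Python GC, then ask glibc to return free heap pages to the OS.

    Returns True if malloc_trim reported that memory was released, and
    False if nothing was released or malloc_trim is unavailable
    (for example on a non-glibc system).
    """
    gc.collect()

    libc_name = ctypes.util.find_library("c")
    if libc_name is None:
        return False

    try:
        libc = ctypes.CDLL(libc_name)
        malloc_trim = libc.malloc_trim  # glibc-specific symbol
    except (OSError, AttributeError):
        return False

    malloc_trim.argtypes = [ctypes.c_size_t]
    malloc_trim.restype = ctypes.c_int
    # malloc_trim returns 1 if memory was released to the system, 0 otherwise.
    return malloc_trim(0) == 1
```

If RSS drops noticeably after calling this at the end of a teardown, part of the growth is freed-but-retained heap; if it keeps climbing regardless, something is still holding the allocations.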

I will also attach a sample of the pipeline:

rtspsrc location=rtsp://example.com/live name=source user-id=user user-pw=pass !
queue name=source_queue !
rtph265depay name=depay !
queue name=depay_queue !
capssetter caps=video/x-h265,framerate=25/1 name=capsetter !
h265parse config-interval=1 name=parser !
queue name=decoder_queue !
nvv4l2decoder name=decoder !
queue name=stream0 !
streammux.sink_0 nvstreammux name=streammux width=1920 height=1080 batch-size=1 batched-push-timeout=5000000 !
perf name=streammux_perf !
nvinferserver name=inferserver config-file-path=/path/to/config.txt !
nvvideoconvert !
video/x-raw(memory:NVMM), format=RGBA !
perf name=inference_perf !
nvdsanalytics config-file=/path/to/analytics.txt !
queue name=nvdsanalytics !
perf name=analytics_perf !
nvvideoconvert !
queue name=nvvideoconvert !
nvdsosd name=on_screen_display process-mode=0 display-text=True display-bbox=True !
queue name=osd !
perf name=osd_perf !
nvvideoconvert !
video/x-raw(memory:NVMM) !
nvv4l2h264enc insert-vui=true insert-sps-pps=true bitrate=4000000 profile=High preset-level=SlowPreset !
video/x-h264,mapping=stream !
rtspsink service=8554

I also use a tracker plugin, but it is not included in the pipeline above. The problem appears both with and without the tracker anyway.

[UPDATE]
I have done some more tests. Even if I remove all application logic and leave just the pipeline running (also removing the inference server and analytics), when an EOS occurs and the pipeline is torn down and brought up again, the memory increases by about 20-30 MB (versus 50-100 MB with the inference server and analytics).

  1. Do you mean a GPU memory leak? How did you observe the leak? Could you share a leak log? Thanks! If you use source->decoder->nvstreammux->fakesink, can you reproduce the memory leak?
  2. Can you modify the basic sample deepstream_test_1.py to reproduce the issue? If so, could you share the simplified test1 code? Thanks!

I am observing a RAM (system memory) leak. I run the application with Nomad, so inside a Docker container. The memory graph in the Nomad UI shows the memory increasing at every EOS. I will provide a screenshot of the RAM usage once I rerun the tests, and I am working on gathering further information. (In the past we noticed the same behavior with Docker Swarm.)

I ran several tests today, starting from a base pipeline and adding one piece to the pipeline in each run. The tests ran in a Docker container with Nomad as the orchestrator to simulate production behavior. All application logic was removed so that only the pipeline was running; I added just a small web server to send EOS on demand.

I have started with this pipeline:

videotestsrc is-live=true ! video/x-raw, width=1920, height=1080, framerate=25/1 !
nvvideoconvert ! video/x-raw(memory:NVMM), format=NV12 ! queue name=stream0 !
streammux.sink_0 nvstreammux name=streammux width=1920 height=1080 batch-size=1 batched-push-timeout=5000000 !
perf name=streammux_perf ! nvvideoconvert ! video/x-raw(memory:NVMM), format=RGBA ! perf name=inference_perf !
nvvideoconvert ! queue name=nvvideoconvert ! queue name=osd ! perf name=osd_perf ! fakesink

This pipeline did not show any increase in RAM, even after several manually triggered EOS events.

Then I added nvinferserver, nvtracker, nvdsanalytics and nvdsosd.
Here we notice a small increase, and then the memory settles after a few EOS events.

The pipeline was the following:
videotestsrc is-live=true !
video/x-raw, width=1920, height=1080, framerate=25/1 !
nvvideoconvert !
video/x-raw(memory:NVMM), format=NV12 !
queue name=stream0 !
streammux.sink_0 nvstreammux name=streammux width=1920 height=1080 batch-size=1 batched-push-timeout=5000000 !
nvinferserver name=inferserver config-file-path=/path/to/primary_config.txt !
nvtracker tracker-width=750 tracker-height=750 gpu-id=0 display-tracking-id=1 ll-lib-file=/path/to/tracker_library.so ll-config-file=/path/to/tracker_config.yaml !
queue name=nvtracker !
nvvideoconvert !
video/x-raw(memory:NVMM), format=RGBA !
nvdsanalytics config-file=/path/to/analytics_config.txt !
queue name=nvdsanalytics !
nvvideoconvert !
queue name=nvvideoconvert !
nvdsosd name=on_screen_display process-mode=0 display-text=True display-bbox=True !
queue name=osd !
nvvideoconvert !
video/x-raw(memory:NVMM) !
fakesink

Then I added the RTSP sink. Here we notice a more significant increase in memory consumption after each triggered EOS (the memory starts at 300 MB). In this case too, after the initial increase the memory more or less settles, at about 570 MB.

Pipeline used:
videotestsrc is-live=true !
video/x-raw, width=1920, height=1080, framerate=25/1 !
nvvideoconvert !
video/x-raw(memory:NVMM), format=NV12 !
queue name=stream0 !
streammux.sink_0 nvstreammux name=streammux width=1920 height=1080 batch-size=1 batched-push-timeout=5000000 !
nvinferserver name=inferserver config-file-path=/path/to/primary_config.txt !
nvtracker tracker-width=750 tracker-height=750 gpu-id=0 display-tracking-id=1 ll-lib-file=/path/to/tracker_library.so ll-config-file=/path/to/tracker_config.yaml !
queue name=nvtracker !
nvvideoconvert !
video/x-raw(memory:NVMM), format=RGBA !
nvdsanalytics config-file=/path/to/analytics_config.txt !
queue name=nvdsanalytics !
nvvideoconvert !
queue name=nvvideoconvert !
nvdsosd name=on_screen_display process-mode=0 display-text=True display-bbox=True !
queue name=osd !
nvvideoconvert !
video/x-raw(memory:NVMM) !
nvv4l2h264enc insert-vui=true insert-sps-pps=true bitrate=4000000 profile=High preset-level=SlowPreset !
video/x-h264,stream-format=byte-stream !
rtspsink service=8554

The last piece was the RTSP source. With it added, the RAM increases indefinitely (I stopped at 700 MB, but it kept going up each time I triggered EOS).
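The difference between "settles after a few EOS events" and "increases indefinitely" can be made explicit by looking at the average growth over the most recent cycles. This is a minimal sketch, assuming one RSS sample (in MB) is recorded after each EOS cycle; the function names, the tail length and the tolerance are illustrative choices, not values taken from the tests above.

```python
from statistics import mean


def growth_per_cycle(rss_mb: list[float], tail: int = 5) -> float:
    """Average RSS change (MB) per EOS cycle over the last `tail` cycles."""
    if len(rss_mb) < 2:
        raise ValueError("need at least two samples")
    # tail cycles correspond to tail + 1 consecutive samples.
    recent = rss_mb[-(tail + 1):]
    deltas = [after - before for before, after in zip(recent, recent[1:])]
    return mean(deltas)


def classify(rss_mb: list[float], tail: int = 5, tolerance_mb: float = 2.0) -> str:
    """Return 'growing' if recent cycles still add memory, else 'plateau'."""
    if growth_per_cycle(rss_mb, tail) > tolerance_mb:
        return "growing"
    return "plateau"
```

A series that climbs early and then flattens is reported as a plateau, which points at one-time allocations or caches, while a series that keeps adding a similar amount per cycle is reported as growing, which is the pattern of a per-rebuild leak.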

As I said, I am using Nomad as the orchestrator, with Docker. The rtspsink is a custom plugin that I can't share, but the problem also appears without it (using fakesink).

The image I am using is the following:

nvcr.io/nvidia/deepstream:7.1-triton-multiarch

Here is the code used (simplified by removing almost all unused code):

import logging
import sys
from time import sleep

import gi
from getkey import getkey
from webserver import WebServerThread
from webserver import WebApplication
from threading import Thread

from app import App

gi.require_version("Gst", "1.0")
from gi.repository import GLib

logger = logging.getLogger(__name__)


class MainLoopThread(Thread):
    def __init__(self, name: str = "Mainloop") -> None:
        self._main_loop = GLib.MainLoop()

        super().__init__(name=name, target=self._main_loop.run)

    @property
    def loop(self) -> GLib.MainLoop:
        return self._main_loop

    def stop(self):
        self._main_loop.quit()


def main() -> None:

    main_loop_thread, web_server_thread = setup_app()

    def wait_for_keys():
        stop = False

        logger.info("Waiting for key")
        while not stop:
            sleep(0.1)

            key = getkey()

            if key == "q":
                stop = True

    logger.info('Press "q" to close the program')

    wait_for_keys()

    tear_down_app(main_loop_thread, web_server_thread)

    logger.info("Cleaned everything, bye :)")


def tear_down_app(main_loop_thread, web_server_thread):
    logger.info("Stopping main thread...")
    main_loop_thread.stop()
    logger.info("Stopping webserver thread")
    web_server_thread.stop()


def setup_app(
) -> tuple[MainLoopThread, WebServerThread]:

    logger.info("Creating MainLoop...")
    main_loop_thread = MainLoopThread()
    main_loop_thread.start()
    logger.info("Loading configuration...")


    try:
        pipeline = f""

        logger.info(f"Pipeline is {pipeline}")

        logger.info("Creating app...")

        init_dict = {
            "pipeline": pipeline,
            "loop": main_loop_thread.loop,
        }

        app = App(**init_dict)

    except Exception:
        logger.exception("Something went wrong creating the app :(")
        sys.exit(-1)

    app.start()
    logger.info("Starting pipelines")

    web_application = WebApplication(
        app
    )

    web_server_thread = WebServerThread(web_application, port=8080)
    web_server_thread.start()

    return main_loop_thread, web_server_thread


if __name__ == "__main__":
    logger.info("Parsing arguments...")
    main()
    sys.exit(0)

The web server used to send EOS:

import json
from time import time

from tornado.web import Application, StaticFileHandler, RequestHandler
import gi
from app import App
import asyncio
import logging
from threading import Thread

from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop

gi.require_version("Gst", "1.0")
from gi.repository import Gst

logger = logging.getLogger(__name__)


class WebApplication(Application):
    start_at = time()

    def __init__(self, app: App):
        self.app = app
        handlers = [
            (r"/EOS", PipelineHandler),
        ]

        super().__init__(handlers=handlers)


class WebServerThread(Thread):
    def __init__(self, web_application: WebApplication, port: int):
        self._web_application = web_application
        self._port = port
        self._server: HTTPServer | None = None
        self._io_loop: IOLoop | None = None
        super().__init__(name="WebServerThread", daemon=True)

    def run(self) -> None:
        try:
            # Each thread needs its own asyncio loop; Tornado's IOLoop wraps it.
            asyncio.set_event_loop(asyncio.new_event_loop())
            self._io_loop = IOLoop.current()

            self._server = HTTPServer(self._web_application)
            self._server.listen(self._port, "0.0.0.0")
            self._io_loop.start()
        except OSError as e:
            logger.error(e)

    def stop(self) -> None:
        # stop() is called from the main thread, so the shutdown is scheduled
        # on the server thread's loop instead of using IOLoop.current() here.
        if self._io_loop is None:
            return

        def _shutdown() -> None:
            if self._server:
                self._server.stop()
            self._io_loop.stop()

        self._io_loop.add_callback(_shutdown)


class BaseAPIHandler(RequestHandler):
    def initialize(self, **kwargs) -> None:
        """
        - Set Content-type for JSON
        """
        self.set_header("Content-Type", "application/json")

    def write_json(self, obj):
        self.write(json.dumps(obj).encode())


class PipelineHandler(BaseAPIHandler):
    def get(self, *args, **kwargs) -> None:

        app: App = self.application.app

        if not app.pipeline:
            self.set_status(400)
            self.write({"status": "error", "message": "Pipeline not running"})
            return

        try:
            logging.info("Sending EOS to pipeline via API")
            app.pipeline.send_event(Gst.Event.new_eos())
            self.write({"status": "ok", "message": "EOS sent"})
        except Exception as e:
            logging.error(f"Failed to send EOS: {e}")
            self.set_status(500)
            self.write({"status": "error", "message": str(e)})
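
The `/EOS` handler above can be called in a loop to reproduce the growth without manual intervention. This is a minimal sketch using only the standard library, assuming the server listens on port 8080 as in the code above. The function names are hypothetical, and the `fetch` and `sleep` parameters exist only so the loop can be exercised without a running server.

```python
import time
from typing import Callable
from urllib.error import HTTPError
from urllib.request import urlopen


def fetch_status(url: str) -> int:
    """Send a GET request and return the HTTP status code."""
    try:
        with urlopen(url, timeout=5) as response:
            return response.status
    except HTTPError as error:
        # The handler answers 400 or 500 on failure; report the code instead of raising.
        return error.code


def trigger_eos(
    base_url: str,
    count: int,
    delay_s: float,
    fetch: Callable[[str], int] = fetch_status,
    sleep: Callable[[float], None] = time.sleep,
) -> list[int]:
    """Call the /EOS endpoint `count` times, waiting `delay_s` between calls."""
    statuses = []
    for _ in range(count):
        statuses.append(fetch(f"{base_url}/EOS"))
        # Give the pipeline time to be torn down and recreated.
        sleep(delay_s)
    return statuses
```

For example, `trigger_eos("http://localhost:8080", count=20, delay_s=10)` would send twenty EOS events ten seconds apart; the delay is a guess and should be longer than the time the pipeline needs to restart.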

The application for tearing down the pipeline:


import logging
import sys
import gi

gi.require_version("Gst", "1.0")
from gi.repository import GLib, Gst

# GStreamer must be initialized before any element or pipeline is created.
Gst.init(None)

sys.path.append("/opt/nvidia/deepstream/deepstream/lib")

logger = logging.getLogger(__name__)


class App:
    def __init__(
        self,
        pipeline: str,
        loop: GLib.MainLoop,
    ):

        self.loop = loop
        self.launch_string = pipeline

        self.pipeline = None
        self.bus = Gst.Bus.new()


    def launch_pipeline(self):
        assert self.pipeline is not None

        self.bus = self.pipeline.get_bus()
        self.bus.add_signal_watch()
        self.bus.connect("message", self.on_message, self.loop)

    def start(self) -> bool:
        logger.info("Starting pipeline")
        if self.pipeline is None:
            self.pipeline = Gst.parse_launch(self.launch_string)
            self.launch_pipeline()

        self.pipeline.set_state(Gst.State.PLAYING)
        logger.info("Pipeline started")
        return False


    def on_message(self, bus, message, loop) -> bool:
        t = message.type
        if self.pipeline is None:
            logger.warning("Pipeline is none. Ignoring message")
            return True
        if t == Gst.MessageType.EOS:
            logger.info("End-of-stream")
            self.pipeline.set_state(Gst.State.NULL)
            self.pipeline.get_state(Gst.CLOCK_TIME_NONE)
            self.pipeline = None
            self.start()
        elif t == Gst.MessageType.WARNING:
            warning, debug = message.parse_warning()
            logger.warning(f"Warning: {warning}: {debug}")
        elif t == Gst.MessageType.ERROR:
            logger.info("Gst Error")
            err, debug = message.parse_error()
            logger.error(f"Error: {err}: {debug}")
        return True
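
One thing the EOS branch above does not do is undo `add_signal_watch()` and `connect("message", ...)` on the old bus before the pipeline reference is dropped. GStreamer documents that each `add_signal_watch()` call should be balanced by a `remove_signal_watch()`. The sketch below shows that order of operations. It is written against duck-typed objects so it can run without GStreamer installed, and it assumes the handler id returned by `bus.connect(...)` has been stored, which the `App` class above does not do. Whether this accounts for any of the growth reported here is untested.

```python
from typing import Any


def release_pipeline(
    pipeline: Any,
    bus: Any,
    handler_id: int,
    null_state: Any,
    timeout: int,
) -> None:
    """Stop a pipeline and detach its bus watch before references are dropped.

    `pipeline` and `bus` are expected to behave like Gst.Pipeline and Gst.Bus;
    `null_state` and `timeout` would be Gst.State.NULL and Gst.CLOCK_TIME_NONE.
    """
    # Stop the pipeline and block until the state change has completed.
    pipeline.set_state(null_state)
    pipeline.get_state(timeout)
    # Disconnect the "message" callback registered with bus.connect(...).
    bus.disconnect(handler_id)
    # Balance the earlier bus.add_signal_watch() call.
    bus.remove_signal_watch()
```

In the `App` class this would be called in the EOS branch as `release_pipeline(self.pipeline, self.bus, self._handler_id, Gst.State.NULL, Gst.CLOCK_TIME_NONE)` before setting `self.pipeline = None`, where `self._handler_id` is a hypothetical attribute holding the return value of `self.bus.connect(...)`.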


The shared pipelines may contain some leftover elements (such as the perf plugin or redundant converters) because in the application the pipelines are built from configuration; they do not alter the test outcome.

I tried to modify deepstream_test_1.py, but it was a bit complicated and I already had everything set up in the other application, so I simplified that one and used it instead.

From your tests, the memory leak only occurs when using an RTSP source.

  1. To isolate the issue: if you only replace rtspsrc with filesrc, does the issue persist?
  2. Are you using the open-source rtspsrc plugin? How did you set the properties for rtspsrc? AFAIK, rtspsrc buffers the data it receives; if drop-on-latency is false, memory usage will increase when some data arrives late.
  3. To isolate the issue, you can use the pipeline "rtspsrc location=rtsp://xx ! fakesink" to check whether there is a memory leak.
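
The isolation test suggested above can be set up by generating the launch string with the relevant rtspsrc properties made explicit. This is a minimal sketch; the function name is hypothetical, the location is a placeholder, and the latency value simply repeats rtspsrc's default of 2000 ms. Running it once with `drop-on-latency` enabled and once disabled would show whether jitter-buffer growth explains the increase.

```python
def rtsp_isolation_pipeline(
    location: str,
    drop_on_latency: bool = True,
    latency_ms: int = 2000,
) -> str:
    """Build a launch string for an rtspsrc-only pipeline ending in fakesink."""
    drop_value = "true" if drop_on_latency else "false"
    return (
        f"rtspsrc location={location} latency={latency_ms} "
        f"drop-on-latency={drop_value} ! fakesink"
    )
```

The resulting string can be passed as the `pipeline` argument of the `App` class shared earlier, so the same EOS and rebuild cycle is exercised with nothing but the RTSP source in the pipeline.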

There has been no update from you for a while, so we assume this is no longer an issue and are closing this topic. If you need further support, please open a new one. Thanks.