I have made different tests today starting from a base pipeline and every run I added a piece in the pipeline. The tests were run using docker container and nomad as orchestrator to simulate production behavior. All application logic was removed and just the pipeline was running. Just a small webserver was added to send EOS to simulate production behavior.
I have started with this pipeline:
videotestsrc is-live=true ! video/x-raw, width=1920, height=1080, framerate=25/1 !
nvvideoconvert ! video/x-raw(memory:NVMM), format=NV12 ! queue name=stream0 !
streammux.sink_0 nvstreammux name=streammux width=1920 height=1080 batch-size=1 batched-push-timeout=5000000 !
perf name=streammux_perf ! nvvideoconvert ! video/x-raw(memory:NVMM), format=RGBA ! perf name=inference_perf !
nvvideoconvert ! queue name=nvvideoconvert ! queue name=osd ! perf name=osd_perf ! fakesink
This pipeline did not generate any ram memory increasing even after various EOS manually triggered.
Then I added nvinferserver, nvdsosd, nvdsanalytics and nvdstracker.
Here we notice a small increasing and then the memory sets after few EOS.
The pipeline was the following:
videotestsrc is-live=true !
video/x-raw, width=1920, height=1080, framerate=25/1 !
nvvideoconvert !
video/x-raw(memory:NVMM), format=NV12 !
queue name=stream0 !
streammux.sink_0 nvstreammux name=streammux width=1920 height=1080 batch-size=1 batched-push-timeout=5000000 !
nvinferserver name=inferserver config-file-path=/path/to/primary_config.txt !
nvtracker tracker-width=750 tracker-height=750 gpu-id=0 display-tracking-id=1 ll-lib-file=/path/to/tracker_library.so ll-config-file=/path/to/tracker_config.yaml !
queue name=nvtracker !
nvvideoconvert !
video/x-raw(memory:NVMM), format=RGBA !
nvdsanalytics config-file=/path/to/analytics_config.txt !
queue name=nvdsanalytics !
nvvideoconvert !
queue name=nvvideoconvert !
nvdsosd name=on_screen_display process-mode=0 display-text=True display-bbox=True !
queue name=osd !
nvvideoconvert !
video/x-raw(memory:NVMM) !
fakesink
Then I added the rtsp sink. Here we notice a more significant increase in memory consumption after each EOS triggered (the memory starts at 300MB). As we can see also in this case after the ram increasing at the beginning the memory kinda sets at 570.
Pipeline used:
videotestsrc is-live=true !
video/x-raw, width=1920, height=1080, framerate=25/1 !
nvvideoconvert !
video/x-raw(memory:NVMM), format=NV12 !
queue name=stream0 !
streammux.sink_0 nvstreammux name=streammux width=1920 height=1080 batch-size=1 batched-push-timeout=5000000 !
nvinferserver name=inferserver config-file-path=/path/to/primary_config.txt !
nvtracker tracker-width=750 tracker-height=750 gpu-id=0 display-tracking-id=1 ll-lib-file=/path/to/tracker_library.so ll-config-file=/path/to/tracker_config.yaml !
queue name=nvtracker !
nvvideoconvert !
video/x-raw(memory:NVMM), format=RGBA !
nvdsanalytics config-file=/path/to/analytics_config.txt !
queue name=nvdsanalytics !
nvvideoconvert !
queue name=nvvideoconvert !
nvdsosd name=on_screen_display process-mode=0 display-text=True display-bbox=True !
queue name=osd !
nvvideoconvert !
video/x-raw(memory:NVMM) !
nvv4l2h264enc insert-vui=true insert-sps-pps=true bitrate=4000000 profile=High preset-level=SlowPreset !
video/x-h264,stream-format=byte-stream !
rtspsink service=8554
Last piece was the rtsp source. When added the ram increases indefinitely (I stopped at 700, but it went up as I triggered EOS)
As I said I am using Nomad as orchestrator and docker. The rtspsink is a custom plugin I can’t share but the the problem appears also without it (by using fakesink).
The image I am using is the following:
nvcr.io/nvidia/deepstream:7.1-triton-multiarch
Here is the code used (I simplified the code removing pretty much every non used code):
import logging
import sys
from time import sleep
import gi
from getkey import getkey
from webserver import WebServerThread
from webserver import WebApplication
from threading import Thread
from app import App
gi.require_version("Gst", "1.0")
from gi.overrides import GLib
logger = logging.getLogger(__name__)
class MainLoopThread(Thread):
def __init__(self, name: str = "Mainloop") -> None:
self._main_loop = GLib.MainLoop()
super().__init__(name=name, target=self._main_loop.run)
@property
def loop(self) -> GLib.MainLoop:
return self._main_loop
def stop(self):
self._main_loop.quit()
def main() -> None:
main_loop_thread, web_server_thread = setup_app()
def wait_for_keys():
stop = False
logger.info("Waiting for key")
while not stop:
sleep(0.1)
key = getkey()
if key == "q":
stop = True
logger.info("Press Ctrl+C to close the program")
wait_for_keys()
tear_down_app(main_loop_thread, web_server_thread)
logger.info("Cleaned everything, bye :)")
def tear_down_app(main_loop_thread, web_server_thread):
logger.info("Stopping main thread...")
main_loop_thread.stop()
logger.info("Stopping webserver thread")
web_server_thread.stop()
def setup_app(
) -> tuple[MainLoopThread, WebServerThread]:
logger.info("Creating MainLoop...")
main_loop_thread = MainLoopThread()
main_loop_thread.start()
logger.info("Loading configuration...")
try:
pipeline = f""
logger.info(f"Pipeline is {pipeline}")
logger.info("Creating app...")
init_dict = {
"pipeline": pipeline,
"loop": main_loop_thread.loop,
}
app = App(**init_dict)
except:
logger.exception("Something went wrong creating the app :(")
sys.exit(-1)
app.start()
logging.info("Starting pipelines")
web_application = WebApplication(
app
)
web_server_thread = WebServerThread(web_application, port=8080)
web_server_thread.start()
return main_loop_thread, web_server_thread
if __name__ == "__main__":
logger.info("Parsing arguments...")
main()
sys.exit(0)
The web server for sending info
import json
from time import time
from tornado.web import Application, StaticFileHandler, RequestHandler
import gi
from app import App
import asyncio
import logging
from threading import Thread
from tornado.httpserver import HTTPServer
from tornado.ioloop import IOLoop
gi.require_version("Gst", "1.0")
from gi.repository import Gst
logger = logging.getLogger(__name__)
class WebApplication(Application):
start_at = time()
def __init__(self, app: App):
self.app = app
handlers = [
(r"/EOS", PipelineHandler),
]
super().__init__(handlers=handlers)
class WebServerThread(Thread):
def __init__(self, web_application: WebApplication, port: int):
self._web_application = web_application
self._port = port
self._server: HTTPServer | None = None
super().__init__(name="WebServerThread", daemon=True)
def run(self) -> None:
try:
asyncio.set_event_loop(asyncio.new_event_loop())
self._server = HTTPServer(self._web_application)
if self._server: # Check if the server is successfully initialized
self._server.listen(self._port, "0.0.0.0")
IOLoop.instance().start()
else:
logger.error("Failed to initialize HTTPServer")
except OSError as e:
logger.error(e)
def stop(self) -> None:
if self._server:
self._server.stop()
loop = IOLoop.current()
loop.stop()
class BaseAPIHandler(RequestHandler):
def initialize(self, **kwargs) -> None:
"""
- Set Content-type for JSON
"""
self.set_header("Content-Type", "application/json")
def write_json(self, obj):
self.write(json.dumps(obj).encode())
class PipelineHandler(BaseAPIHandler):
def get(self, *args, **kwargs) -> None:
app: App = self.application.app
if not app.pipeline:
self.set_status(400)
self.write({"status": "error", "message": "Pipeline not running"})
return
try:
logging.info("Sending EOS to pipeline via API")
app.pipeline.send_event(Gst.Event.new_eos())
self.write({"status": "ok", "message": "EOS sent"})
except Exception as e:
logging.error(f"Failed to send EOS: {e}")
self.set_status(500)
self.write({"status": "error", "message": str(e)})
The application for tearing down the pipeline:
import logging
import sys
import gi
gi.require_version("Gst", "1.0")
from gi.overrides import GLib
from gi.repository import Gst
sys.path.append("/opt/nvidia/deepstream/deepstream/lib")
logger = logging.getLogger(__name__)
class App:
def __init__(
self,
pipeline: str,
loop: GLib.MainLoop,
):
self.loop = loop
self.launch_string = pipeline
self.pipeline = None
self.bus = Gst.Bus.new()
def launch_pipeline(self):
assert self.pipeline is not None
self.bus = self.pipeline.get_bus()
self.bus.add_signal_watch()
self.bus.connect("message", self.on_message, self.loop)
#
def start(self) -> bool:
logger.info("Starting pipeline")
if self.pipeline is None:
self.pipeline = Gst.parse_launch(self.launch_string)
self.launch_pipeline()
self.pipeline.set_state(Gst.State.PLAYING)
logger.info("Pipeline started")
return False
def on_message(self, bus, message, loop) -> bool:
t = message.type
if self.pipeline is None:
logger.warning("Pipeline is none. Ignoring message")
if t == Gst.MessageType.EOS:
logger.info("End-of-stream")
self.pipeline.set_state(Gst.State.NULL)
self.pipeline.get_state(Gst.CLOCK_TIME_NONE)
self.pipeline = None
self.start()
elif t == Gst.MessageType.WARNING:
warning, debug = message.parse_warning()
logger.warning(f"Warning: {warning}: {debug}")
elif t == Gst.MessageType.ERROR:
logger.info("Gst Error")
err, debug = message.parse_error()
logger.error(f"Error: {err}: {debug}")
return True
In the pipeline shared there may be some dirty (perf plugin or converter useless, because in the application the pipeline are build by configuration, but they do not alter the tests outcome)
I have tried to modify the deepstream test 1 but was a bit complicated and I had already everything set up in the other application, so I simplified that and used it.