TX2/Xavier HW decoder thread leak?

I developed a program on TX2 and Xavier that uses GStreamer to decode RTSP streams. When stress testing the program, I found what looks like a hardware decoder thread leak: every time a stream finishes, 4 more threads remain alive after the EOS signal is received. The output looks like:

nvidia@nvidia-desktop:~$ ps -Tp 8809
  PID  SPID TTY          TIME CMD
 8809  8809 pts/1    00:00:00 rtsp_mem
 8809  8821 pts/1    00:00:00 rtsp_mem
 8809  8822 pts/1    00:00:00 rtsp_mem
 8809  8826 pts/1    00:00:00 dconf worker
 8809  8827 pts/1    00:00:00 gmain
 8809  8828 pts/1    00:00:00 gdbus
 8809  8842 pts/1    00:00:00 NVMDecBufProcT
 8809  8843 pts/1    00:00:00 NVMDecDisplayT
 8809  8844 pts/1    00:00:00 NVMDecFrmStatsT
 8809  8845 pts/1    00:00:00 NVMDecVPRFlrSzT
 8809  8894 pts/1    00:00:00 NVMDecBufProcT
 8809  8895 pts/1    00:00:00 NVMDecDisplayT
 8809  8896 pts/1    00:00:00 NVMDecFrmStatsT
 8809  8897 pts/1    00:00:00 NVMDecVPRFlrSzT

And after one or two days of testing, my program occupies a lot of memory. Please help me out. Is this a bug in the hw decoder, or is there a bug in my code? My code is attached here:
rtsp_main.cpp:

#include <iostream>
#include <thread>
#include <chrono>

extern "C" {
	int extract_rtsp(char* req_id, char* rtsp_url, int need_audio);
}

using namespace std;

int main(int argc, char** argv)
{
	std::this_thread::sleep_for (std::chrono::seconds(30));
	for(int i=0; i<2; i++){
		cout << "loop " << i << endl;
		extract_rtsp("test1", "rtsp://192.168.3.11/1", 1);
		cout << "sleep_for 30s" << endl;
		std::this_thread::sleep_for (std::chrono::seconds(30));
	}

	while(1)
		std::this_thread::sleep_for (std::chrono::seconds(30));
}
// gcc -c  rtsp_mem.c `pkg-config --cflags --libs gstreamer-1.0`
// g++  rtsp_main.cpp rtsp_mem.o  -o rtsp_mem `pkg-config --cflags --libs gstreamer-1.0` -lgstapp-1.0

rtsp_mem.c

#include <gst/gst.h>
#include <gst/app/gstappsink.h>
#include <stdio.h>
#include <string.h>
#include <pthread.h>
#include <unistd.h>

/* Structure to contain all our information, so we can pass it to callbacks */
typedef struct _CustomData {
    GstElement *pipeline;

    GstElement *source;

    GstElement *video_queue;
    GstElement *video_depay;
    GstElement *video_parse;
    GstElement *video_decode;

    GstElement *video_convert;
    GstElement *video_filter;
    GstCaps    *video_filtercaps;
    GstElement *video_rate;
    GstElement *jpg_encode;
    GstElement *video_sink;

    GstElement *audio_queue;
    GstElement *audio_depay;
    GstElement *audio_parse;
    GstElement *audio_decode;
    GstElement *audio_convert;
    GstElement *audio_resample;
    GstElement *audio_filter;
    GstCaps    *audio_filtercaps;
    GstElement *audio_sink;

    GMainLoop *main_loop;  /* GLib's Main Loop */

	char* req_id;
	int   result;
	int   is_video_end;
    int   is_audio_end;
} CustomData;

GstFlowReturn video_new_preroll(GstAppSink *appsink, gpointer data)
{
    g_print ("Got video preroll!\n");
    return GST_FLOW_OK;
}

GstFlowReturn new_frame_sample(GstAppSink *appsink, gpointer data)
{    
	g_print("enter new_frame_sample.\n");
    static int count = 0;
    count++;

    GstSample *sample = gst_app_sink_pull_sample(appsink);
    if (!sample)            /* NULL at EOS or while the appsink is flushing */
        return GST_FLOW_EOS;
    GstCaps *caps = gst_sample_get_caps(sample);
    GstBuffer *buffer = gst_sample_get_buffer(sample);

    GstMapInfo map;
    gst_buffer_map (buffer, &map, GST_MAP_READ);

    CustomData *pData = (CustomData*)data;
    if(pData->is_video_end == 0)
    {
        /* per-frame processing elided in this test program */
    }
    if(count%40 == 0)
        gst_element_send_event(pData->pipeline, gst_event_new_eos());

    if(pData->is_video_end && pData->is_audio_end)
        gst_element_send_event(pData->pipeline, gst_event_new_eos());
    
    gst_buffer_unmap(buffer, &map);

    // g_print ("%s\n", gst_caps_to_string(caps));

    gst_sample_unref (sample);
    
    return GST_FLOW_OK;
}

/* This function will be called by the pad-added signal */
static void pad_added_handler (GstElement *src, GstPad *new_pad, CustomData *data)
{
    GstPad *video_sink_pad = gst_element_get_static_pad (data->video_queue, "sink");
    GstPad *audio_sink_pad = gst_element_get_static_pad (data->audio_queue, "sink");
    GstPadLinkReturn ret;
    GstCaps *new_pad_caps = NULL;
    GstStructure *new_pad_struct = NULL;    
    const gchar *new_pad_type = NULL;

    g_print ("Received new pad '%s' from '%s':\n", GST_PAD_NAME (new_pad), GST_ELEMENT_NAME (src));

    /* Check the new pad's type */
    new_pad_caps = gst_pad_get_current_caps (new_pad);
    new_pad_struct = gst_caps_get_structure (new_pad_caps, 0);
    new_pad_type = gst_structure_get_name (new_pad_struct);
    if (!g_str_has_prefix (new_pad_type, "application/x-rtp"))
    {
        g_print ("It has type '%s' which is not raw audio. Ignoring.\n", new_pad_type);
        goto exit;
    }

    /* If the audio queue is already linked, fall through to the video branch below */
    if (gst_pad_is_linked (audio_sink_pad))
    {
        g_print ("audio_sink is already linked. Ignoring.\n");
    }
    else
    {
        /* Attempt the link */
        ret = gst_pad_link (new_pad, audio_sink_pad);
        if (GST_PAD_LINK_FAILED (ret))
        {
            g_print ("Type is '%s' but link audio_sink failed.\n", new_pad_type);
        }
        else
        {
            g_print ("audio_sink Link succeeded (type '%s').\n", new_pad_type);
            goto exit;
        }
    }

    /* If the video queue is already linked, we have nothing more to do */
    if (gst_pad_is_linked (video_sink_pad))
    {
        g_print ("video_sink is already linked. Ignoring.\n");
    }
    else
    {
        /* Attempt the link */
        ret = gst_pad_link (new_pad, video_sink_pad);
        if (GST_PAD_LINK_FAILED (ret))
        {
            g_print ("Type is '%s' but link video_sink failed.\n", new_pad_type);
        }
        else
        {
            g_print ("video_sink Link succeeded (type '%s').\n", new_pad_type);
            goto exit;
        }
        
    }

exit:
    /* Unreference the new pad's caps, if we got them */
    if (new_pad_caps != NULL)
        gst_caps_unref (new_pad_caps);

    /* Unreference the sink pad */
    gst_object_unref (video_sink_pad);
    gst_object_unref (audio_sink_pad);

}

static void eos_cb (GstBus *bus, GstMessage *msg, CustomData *data)
{
	g_print("received eos!!!\n");
    g_main_loop_quit (data->main_loop);
}

static void error_cb (GstBus *bus, GstMessage *msg, CustomData *data)
{
    GError *err;
    gchar *debug_info;

    /* Print error details on the screen */
    gst_message_parse_error (msg, &err, &debug_info);
    g_printerr ("Error received from element %s: %s\n", GST_OBJECT_NAME (msg->src), err->message);
    g_printerr ("Debugging information: %s\n", debug_info ? debug_info : "none");
    g_clear_error (&err);
    g_free (debug_info);

    //set the flag for false path
    data->result = -1;

    g_main_loop_quit (data->main_loop);
}

static gboolean element_cb(GstBus *bus, GstMessage *message, gpointer data)
{
    printf("Got %s message\n", GST_MESSAGE_TYPE_NAME (message));
    const GstStructure *s = gst_message_get_structure(message);
    if (!s)                 /* element messages may carry no structure */
        return TRUE;

    const char *filename = gst_structure_get_string(s, "filename");
    // printf("filename:%s\n", filename);
    CustomData *pData = (CustomData*)data;
    if (!filename)
        return TRUE;
    if(pData->is_audio_end == 0)
    {
        /* segment handling elided in this test program */
    }
    else
    {
        unlink(filename);
    }

    if(pData->is_video_end && pData->is_audio_end)
        gst_element_send_event(pData->pipeline, gst_event_new_eos());
    return TRUE;
}

int extract_rtsp(char* req_id, char* rtsp_url, int need_audio)
{
	CustomData data;
	GstBus *bus;
    GstStateChangeReturn ret;

    static int temp = 0;
    if(temp==0)
        gst_init (NULL, NULL);
    temp++;

    printf("%s %s %d\n", req_id, rtsp_url, need_audio);

    data.req_id           = req_id;
    data.result           = 0;
    data.is_video_end     = 0;
    data.is_audio_end     = 0;

    data.source = gst_element_factory_make ("rtspsrc", "source");

    data.video_queue = gst_element_factory_make ("queue", "video_queue");
    data.video_depay = gst_element_factory_make ("rtph264depay", "video_depay");
    data.video_parse = gst_element_factory_make ("h264parse", "video_parse");
    data.video_decode = gst_element_factory_make ("nvv4l2decoder", "video_decode");
    data.video_convert = gst_element_factory_make ("nvvidconv", "video_convert");
    data.video_filter = gst_element_factory_make ("capsfilter", "video_filter");
    
    data.video_filtercaps = gst_caps_new_simple ("video/x-raw",/*"format", G_TYPE_STRING, "RGBA",*/
        "width", G_TYPE_INT, 1920, "height", G_TYPE_INT, 1080, "framerate", GST_TYPE_FRACTION, 1, 1, NULL);
    data.video_rate = gst_element_factory_make("videorate", "video_rate");
    data.jpg_encode = gst_element_factory_make("nvjpegenc", "jpg_encode");

    data.video_sink = gst_element_factory_make ("appsink", "video_sink");

    data.audio_queue = gst_element_factory_make ("queue", "audio_queue");
    data.audio_depay = gst_element_factory_make ("rtpmp4gdepay", "audio_depay");
    data.audio_parse = gst_element_factory_make ("aacparse", "audio_parse");
    data.audio_decode = gst_element_factory_make ("avdec_aac", "audio_decode");
    data.audio_convert = gst_element_factory_make ("audioconvert", "audio_convert");
    data.audio_resample = gst_element_factory_make("audioresample", "audio_resample");
    data.audio_filter = gst_element_factory_make ("capsfilter", "audio_filter");
    data.audio_filtercaps = gst_caps_new_simple ("audio/x-raw", "format", G_TYPE_STRING, "S16LE", 
        "rate", G_TYPE_INT, 16000, "channels", G_TYPE_INT, 1, NULL);

    data.audio_sink = gst_element_factory_make ("multifilesink", "audio_sink");

    /* Create the empty pipeline */
    data.pipeline = gst_pipeline_new ("test-pipeline");

    if (!data.pipeline || !data.source || !data.video_queue || !data.video_depay ||
        !data.video_parse || !data.video_decode || !data.video_convert ||
        !data.video_filter || !data.video_rate || !data.video_sink || !data.audio_queue ||
        !data.audio_depay || !data.audio_parse || !data.audio_decode ||
        !data.audio_convert || !data.audio_filter || !data.audio_sink)
    {
        g_printerr ("Not all elements could be created.\n");
        return -1;
    }
    
    g_object_set (G_OBJECT (data.audio_filter), "caps", data.audio_filtercaps, NULL);
    // gst_caps_unref (data.audio_filtercaps);
    
    /* Build the pipeline. Note that we are NOT linking the source at this point. We will do it later. */
    gst_bin_add_many (GST_BIN (data.pipeline), data.source,data.video_queue ,data.video_depay,
        data.video_parse,data.video_decode ,data.video_convert,data.video_rate,data.jpg_encode,data.video_sink,
        data.audio_queue,data.audio_depay,data.audio_parse,data.audio_decode,data.audio_convert,data.audio_resample,data.audio_filter,data.audio_sink,NULL);

    /*Link video pipeline*/
    if (!gst_element_link_many (data.video_queue,data.video_depay,data.video_parse,data.video_decode,data.video_convert,data.video_rate,NULL))
    {
        g_printerr ("Video-elements could not be linked.\n");
        gst_object_unref (data.pipeline);
        return -1;
    }
    gst_element_link_filtered(data.video_rate, data.jpg_encode, data.video_filtercaps);
    gst_caps_unref (data.video_filtercaps);
    gst_element_link_many(data.jpg_encode, data.video_sink, NULL);

    /*Link audio pipeline*/
    if (!gst_element_link_many (data.audio_queue,data.audio_depay,data.audio_parse,data.audio_decode,data.audio_convert,data.audio_resample,data.audio_filter,NULL))
    {
        g_printerr ("Audio-elements could not be linked.\n");
        gst_object_unref (data.pipeline);
        return -1;
    }
    gst_element_link_filtered(data.audio_filter, data.audio_sink, data.audio_filtercaps);
    gst_caps_unref (data.audio_filtercaps);

    g_object_set(data.source, "location", rtsp_url, NULL);

    char segment_name[512] = {0};
    strcat(segment_name, req_id);
    strcat(segment_name, "_%06d.pcm");
    g_object_set(data.audio_sink, "next-file", 5 /* max-duration */, "max-file-duration",
        (guint64) 1000000000, /* property is a guint64; cast avoids varargs truncation */
        "location", segment_name, "post-messages", TRUE, "sync", TRUE, NULL);
    g_signal_connect (data.source, "pad-added", G_CALLBACK (pad_added_handler), &data);    /* Connect to the pad-added signal */

    GstElement *sink = gst_bin_get_by_name (GST_BIN (data.pipeline), "video_sink");  /* get sink */
    gst_app_sink_set_emit_signals((GstAppSink*)sink, TRUE);
    gst_app_sink_set_drop((GstAppSink*)sink, TRUE);
    gst_app_sink_set_max_buffers((GstAppSink*)sink, 1);
    GstAppSinkCallbacks callbacks = { NULL, video_new_preroll, new_frame_sample };
    gst_app_sink_set_callbacks (GST_APP_SINK(sink), &callbacks, &data, NULL);
    gst_object_unref (sink);    /* gst_bin_get_by_name() returns a new reference */
    
    /* Start playing */
    ret = gst_element_set_state (data.pipeline, GST_STATE_PLAYING);
    if (ret == GST_STATE_CHANGE_FAILURE)
    {
        g_printerr ("Unable to set the pipeline to the playing state.\n");
        gst_object_unref (data.pipeline);
        return -1;
    }

    /* Instruct the bus to emit signals for each received message, and connect to the interesting signals.
     * Keep the bus reference until teardown so the signal watch can be removed. */
    bus = gst_element_get_bus (data.pipeline);
    gst_bus_add_signal_watch (bus);
    g_signal_connect (G_OBJECT (bus), "message::error", (GCallback)error_cb, &data);
    g_signal_connect (G_OBJECT (bus), "message::eos", (GCallback)eos_cb, &data);
    g_signal_connect (G_OBJECT (bus), "message::element", (GCallback)element_cb, &data);

    /* Create a GLib Main Loop and set it to run */
    data.main_loop = g_main_loop_new (NULL, FALSE);
    g_print("ready to run main loop.\n");
    g_main_loop_run (data.main_loop);

    /* Tear down: without these unrefs, every call to extract_rtsp()
     * leaks the bus watch GSource and the GMainLoop. */
    gst_bus_remove_signal_watch (bus);
    gst_object_unref (bus);
    g_main_loop_unref (data.main_loop);

    gst_element_set_state (data.pipeline, GST_STATE_NULL);
    gst_object_unref (data.pipeline);
    g_print("leaving extract_rtsp.\n");
    return data.result;
}

Hi,
There is a known memory leak when decoding interlaced streams:
https://elinux.org/L4T_Jetson/r32.2.1_patch
[GSTREAMER]Issues in decoding interlaced streams through nvv4l2decoder

Please apply the patch and try again.

I checked the video file used for RTSP streaming with this command:

ffmpeg -i 351.mp4 -filter:v idet -frames:v 100 -an -f rawvideo -y /dev/null
video:303750kB audio:0kB subtitle:0kB other streams:0kB global headers:0kB muxing overhead: 0.000000%
[Parsed_idet_0 @ 0x1ff13e0] Repeated Fields: Neither:    95 Top:     0 Bottom:     0
[Parsed_idet_0 @ 0x1ff13e0] Single frame detection: TFF:     0 BFF:     0 Progressive:    14 Undetermined:    81
[Parsed_idet_0 @ 0x1ff13e0] Multi frame detection: TFF:     0 BFF:     0 Progressive:    95 Undetermined:     0

I don't think it's interlaced video. Correct me if anything is wrong.

Do you know how to clean up the threads below?

 8809  8842 pts/1    00:00:00 NVMDecBufProcT
 8809  8843 pts/1    00:00:00 NVMDecDisplayT
 8809  8844 pts/1    00:00:00 NVMDecFrmStatsT
 8809  8845 pts/1    00:00:00 NVMDecVPRFlrSzT

Hi,
The threads are created in the low-level driver and should not trigger a memory leak. Could you check if dropping caches helps?

# sysctl vm.drop_caches=3

Do you also see the issue in local video playback (playing an mp4 file)?
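For example, a decode-only test pipeline such as the one below (a sketch assuming an H.264 stream in an mp4 container; test.mp4 is a placeholder for your own file) lets you check whether the NVMDec* threads also persist after local playback:

gst-launch-1.0 filesrc location=test.mp4 ! qtdemux ! h264parse ! nvv4l2decoder ! fakesink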

Thanks. I will try that.
Even if they do not trigger a memory leak, 4 more NVMDec* threads are added to my process after each RTSP stream is processed, and they are never destroyed. Those threads still occupy memory that is not returned to the OS.

I have to put the handling of each RTSP stream in its own process so that those threads are freed when the process exits. I still want to know why these GStreamer threads behave this way.
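A rough sketch of the workaround I mean (extract_rtsp_in_child is a hypothetical wrapper around the extract_rtsp from my code above; GStreamer is only ever initialized inside the child, never in the parent):

#include <sys/wait.h>
#include <unistd.h>

extern int extract_rtsp(char* req_id, char* rtsp_url, int need_audio);

/* Run one stream in a short-lived child process; the NVMDec* threads
 * created by the low-level driver disappear when the child exits. */
int extract_rtsp_in_child(char* req_id, char* rtsp_url, int need_audio)
{
    pid_t pid = fork();
    if (pid < 0)
        return -1;                      /* fork failed */
    if (pid == 0)                       /* child: decode, then exit */
        _exit(extract_rtsp(req_id, rtsp_url, need_audio) == 0 ? 0 : 1);

    int status = 0;
    waitpid(pid, &status, 0);           /* parent: block until the stream is done */
    return (WIFEXITED(status) && WEXITSTATUS(status) == 0) ? 0 : -1;
}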

Has anyone found out how to cancel those threads?

We use GStreamer to decode video, and each time we recreate the pipeline in our program those threads are recreated without the old ones being destroyed. Furthermore, we believe that one of those threads (NVMDecBufProcT) is holding a syncpt and never freeing it, which causes our program to fail once the system runs out of syncpts.

We use a Jetson Nano with software release r32.5.

Hi doquiros,

Please help to open a new topic, thanks.

Was there ever a resolution to this? It is still a problem.