Setup:
• Hardware Platform (Jetson / GPU) GPU Titan V
• DeepStream Version 5.0
• JetPack Version (valid for Jetson only) None
• TensorRT Version Same as deepstream 5.0-dp-20.04-devel docker
• NVIDIA GPU Driver Version (valid for GPU only) 10.2
Problem:
Basic Logic
I have modified the DeepStream 5.0 test5 app (deepstream-test5-app) to add and delete sources at runtime, based on the DeepStream runtime source add/delete reference app.
The basic idea is: DeepStream receives a Kafka message containing a URI string and adds the corresponding new source at runtime in the message consumer callback. When the source produces EOS, the bus callback receives the stream EOS message from streammux and then deletes the corresponding source.
Memory Leak
In our use case, each video file is 8 seconds long, and Kafka produces about 1~2 URI messages per second.
While the app is running, I can see the GPU memory usage of deepstream-test5-app increase gradually, and the program quits with a CUDA failure after 2~3 hours.
My guess is that this problem is related to the topic “Cuda failure after runtime add and delete sources for multiple times” (post #12 by horacehxw), which says NvDsStreammux leaks memory during “link” and “unlink” operations. I am not sure whether this problem has already been fixed in DeepStream 5.0.
The error messages are as follows:
** (deepstream-test5-app:55136): CRITICAL **: 05:00:30.151: gst_nvds_buffer_pool_alloc_buffer: assertion 'mem' failed
ERROR from src_bin_muxer: Failed to allocate the buffers inside the Nvstreammux output pool
Debug info: gstnvstreammux.c(576): gst_nvstreammux_alloc_output_buffers (): /GstPipeline:pipeline/GstBin:multi_src_bin/GstNvStreamMux:src_bin_muxer
Cuda failure: status=2
Error(-1) in buffer allocation
Quitting
Cuda failure: status=2
nvbuf_utils: dmabuf_fd -1 mapped entry NOT found
Failed to get params from fd = -1
ERROR from nvv4l2decoder786: Failed to allocate required memory.
Debug info: gstv4l2videodec.c(1558): gst_v4l2_video_dec_handle_frame (): /GstPipeline:pipeline/GstBin:multi_src_bin/GstBin:src_sub_bin37/GstURIDecodeBin:src_elem/GstDecodeBin:decodebin835/nvv4l2decoder:nvv4l2decoder786:
Buffer pool activation failed
ERROR from qtdemux835: Internal data stream error.
Debug info: qtdemux.c(6073): gst_qtdemux_loop (): /GstPipeline:pipeline/GstBin:multi_src_bin/GstBin:src_sub_bin37/GstURIDecodeBin:src_elem/GstDecodeBin:decodebin835/GstQTDemux:qtdemux835:
streaming stopped, reason error (-5)
Quitting
Sources:
Config File
The Tracker is already disabled.
# Copyright (c) 2018-2020 NVIDIA Corporation. All rights reserved.
#
# NVIDIA Corporation and its licensors retain all intellectual property
# and proprietary rights in and to this software, related documentation
# and any modifications thereto. Any use, reproduction, disclosure or
# distribution of this software and related documentation without an express
# license agreement from NVIDIA Corporation is strictly prohibited.
[application]
enable-perf-measurement=0
perf-measurement-interval-sec=5
#gie-kitti-output-dir=streamscl
[tiled-display]
enable=1
rows=10
columns=10
width=1280
height=720
gpu-id=0
#(0): nvbuf-mem-default - Default memory allocated, specific to particular platform
#(1): nvbuf-mem-cuda-pinned - Allocate Pinned/Host cuda memory, applicable for Tesla
#(2): nvbuf-mem-cuda-device - Allocate Device cuda memory, applicable for Tesla
#(3): nvbuf-mem-cuda-unified - Allocate Unified cuda memory, applicable for Tesla
#(4): nvbuf-mem-surface-array - Allocate Surface Array memory, applicable for Jetson
nvbuf-memory-type=0
[source0]
enable=0
#Type - 1=CameraV4L2 2=URI 3=MultiURI 4: RTSP 5: Camera (CSI) (Jetson only)
type=2
camera-id=0
uri=file:///root/projects/test_mp4/road1/road1__07_01_2020_11_19_53_171_AM_UTC-00_00.mp4
num-sources=1
gpu-id=0
nvbuf-memory-type=0
[source1]
enable=0
#Type - 1=CameraV4L2 2=URI 3=MultiURI 4: RTSP 5: Camera (CSI) (Jetson only)
type=2
camera-id=1
#uri=rtsp://stream_video:554/road2
uri=file:///root/projects/test_mp4/road2/road2__07_01_2020_11_19_52_583_AM_UTC-00_00.mp4
num-sources=1
gpu-id=0
nvbuf-memory-type=0
[sink0]
enable=0
#Type - 1=FakeSink 2=EglSink 3=File
qos=0
type=2
sync=1
source-id=0
gpu-id=0
nvbuf-memory-type=0
[sink1]
enable=1
gpu-id=0
#Type - 1=FakeSink 2=EglSink 3=File 4=UDPSink 5=nvoverlaysink 6=MsgConvBroker
type=6
msg-conv-config=dstest5_msgconv_config.txt
#(0): PAYLOAD_DEEPSTREAM - Deepstream schema payload
#(1): PAYLOAD_DEEPSTREAM_MINIMAL - Deepstream schema payload minimal
#(256): PAYLOAD_RESERVED - Reserved type
#(257): PAYLOAD_CUSTOM - Custom schema payload
msg-conv-payload-type=0
msg-broker-proto-lib=/opt/nvidia/deepstream/deepstream-5.0/lib/libnvds_kafka_proto.so
#Provide your msg-broker-conn-str here
msg-broker-conn-str=172.17.0.1;9092;metromind-start
topic=metromind-start
#msg-broker-conn-str=172.17.0.1;9092;metromind-start-test
#topic=metromind-start-test
#Optional:
#msg-broker-config=cfg_kafka.txt
[sink2]
enable=1
type=3
#1=mp4 2=mkv
container=1
#1=h264 2=h265 3=mpeg4
## only SW mpeg4 is supported right now.
codec=3
sync=1
bitrate=2000000
output-file=out.mp4
source-id=0
gpu-id=0
# sink type = 6 by default creates msg converter + broker.
# To use multiple brokers use this group for converter and use
# sink type = 6 with disable-msgconv = 1
[message-converter]
enable=1
msg-conv-config=dstest5_msgconv_sample_config.txt
#(0): PAYLOAD_DEEPSTREAM - Deepstream schema payload
#(1): PAYLOAD_DEEPSTREAM_MINIMAL - Deepstream schema payload minimal
#(256): PAYLOAD_RESERVED - Reserved type
#(257): PAYLOAD_CUSTOM - Custom schema payload
msg-conv-payload-type=0
# Name of library having custom implementation.
#msg-conv-msg2p-lib=<val>
# Id of component in case only selected message to parse.
#msg-conv-comp-id=<val>
# Configure this group to enable cloud message consumer.
[message-consumer0]
enable=1
proto-lib=/opt/nvidia/deepstream/deepstream-5.0/lib/libnvds_kafka_proto.so
conn-str=172.17.0.1;9092
config-file=cfg_kafka.txt
subscribe-topic-list=source-control;add-source;delete-source;mp4-segment
[osd]
enable=0
gpu-id=0
border-width=1
text-size=15
text-color=1;1;1;1;
text-bg-color=0.3;0.3;0.3;1
font=Arial
show-clock=0
clock-x-offset=800
clock-y-offset=820
clock-text-size=12
clock-color=1;0;0;0
nvbuf-memory-type=0
[streammux]
gpu-id=0
##Boolean property to inform muxer that sources are live
live-source=0
batch-size=4
##time out in usec, to wait after the first buffer is available
##to push the batch even if the complete batch is not formed
batched-push-timeout=40000
## Set muxer output width and height
width=1920
height=1080
##Enable to maintain aspect ratio wrt source, and allow black borders, works
##along with width, height properties
enable-padding=0
nvbuf-memory-type=0
## If set to TRUE, system timestamp will be attached as ntp timestamp
## If set to FALSE, ntp timestamp from rtspsrc, if available, will be attached
# attach-sys-ts-as-ntp=1
[primary-gie]
enable=1
gpu-id=0
batch-size=4
## 0=FP32, 1=INT8, 2=FP16 mode
bbox-border-color0=1;0;0;1
bbox-border-color1=0;1;1;1
bbox-border-color2=0;1;1;1
bbox-border-color3=0;1;0;1
nvbuf-memory-type=0
interval=0
gie-unique-id=1
model-engine-file=../../../../../samples/models/Primary_Detector/resnet10.caffemodel_b4_gpu0_int8.engine
labelfile-path=../../../../../samples/models/Primary_Detector/labels.txt
config-file=../../../../../samples/configs/deepstream-app/config_infer_primary.txt
#infer-raw-output-dir=../../../../../samples/primary_detector_raw_output/
[tracker]
enable=0
tracker-width=600
tracker-height=288
ll-lib-file=/opt/nvidia/deepstream/deepstream-5.0/lib/libnvds_mot_klt.so
#ll-config-file required for DCF/IOU only
#ll-config-file=tracker_config.yml
#ll-config-file=iou_config.txt
gpu-id=0
#enable-batch-process applicable to DCF only
enable-batch-process=0
[tests]
file-loop=0
Code
Both source addition and deletion share the same mutex, so the operations are thread-safe. All related source files are attached.
Source addition part
/**
* @brief Add source element in pipeline with uri string
* @param ctx [IN/OUT] application context pointer
* @param uri [IN] uri string
* @param uri_len [IN] length of uri string
*/
gboolean
add_source_by_uri (void *ctx, char *uri, int uri_len) {
gboolean ret = FALSE;
guint source_id;
guint camera_id;
AppCtx *appCtx = (AppCtx *) ctx;
NvDsConfig *config = &appCtx->config;
GstPad *mux_sink_pad = NULL;
GstPad *src_pad = NULL;
static guint max_sources = 30;
if (config->num_source_sub_bins >= max_sources) {
NVGSTDS_ERR_MSG_V ("App supports max %d sources. Drop this message...", max_sources);
goto done;
}
if (config->source_list_enabled) {
NVGSTDS_WARN_MSG_V ("Source list enable is not supported.");
}
// check if there's already source with provided uri.
for (int i = 0; i < MAX_SOURCE_BINS; i++) {
if (config->multi_source_config[i].enable &&
!g_strcmp0(config->multi_source_config[i].uri, uri)) {
NVGSTDS_ERR_MSG_V ("Source with uri (%.*s) already exists.", uri_len, uri);
goto done;
}
}
/**
* find unique camera_id
* There can be sources with same camera id in MultiURI case.
*/
gboolean used_camera_id[MAX_SOURCE_BINS] = {FALSE};
for (int i = 0; i < MAX_SOURCE_BINS; i++) {
if (config->multi_source_config[i].enable)
used_camera_id[config->multi_source_config[i].camera_id] = TRUE;
}
for (camera_id = 0; camera_id < MAX_SOURCE_BINS; camera_id++) {
if (!used_camera_id[camera_id])
break;
}
/** find available source_id */
for (source_id = 0; source_id < MAX_SOURCE_BINS; source_id++) {
if (!config->multi_source_config[source_id].enable)
break;
}
GST_CAT_INFO (NVDS_APP, "ADD source: set source id %04d", source_id);
NVGSTDS_INFO_MSG_V ("ADD source: set source id %04d", source_id);
/** set configuration based on uri provided. */
NvDsSourceConfig *source_config = &config->multi_source_config[source_id];
source_config->source_id = source_id; // QUESTION: <Xiaowu He>: is this correct?
if (config->file_loop) {
source_config->loop = TRUE;
}
if (!set_source_config_by_uri(source_config, uri, uri_len)) {
NVGSTDS_ERR_MSG_V ("Failed to set configuration based on uri: '%.*s'", uri_len, uri);
goto done;
}
source_config->enable=TRUE;
config->num_source_sub_bins++;
/**
* set gstreamer element & pipeline.
* Modified from file: sources/apps/apps-common/src/deepstream_source_bin.c
* Function: create_multi_source_bin
*/
NvDsSrcParentBin *bin = &appCtx->pipeline.multi_src_bin;
gchar elem_name[50];
g_snprintf (elem_name, sizeof (elem_name), "src_sub_bin%d", source_id);
bin->sub_bins[source_id].bin = gst_bin_new (elem_name);
if (!bin->sub_bins[source_id].bin) {
NVGSTDS_ERR_MSG_V ("Failed to create '%s'", elem_name);
goto done;
}
bin->sub_bins[source_id].bin_id = bin->sub_bins[source_id].source_id = source_id;
source_config->live_source = TRUE;
bin->live_source = TRUE;
bin->sub_bins[source_id].eos_done = TRUE;
bin->sub_bins[source_id].reset_done = TRUE;
switch(source_config->type) {
case NV_DS_SOURCE_URI:
if (!create_uridecode_src_bin (source_config, &bin->sub_bins[source_id])) {
goto done;
}
bin->live_source = source_config->live_source;
break;
case NV_DS_SOURCE_RTSP:
if (!create_rtsp_src_bin (source_config, &bin->sub_bins[source_id])) {
goto done;
}
break;
case NV_DS_SOURCE_CAMERA_CSI:
case NV_DS_SOURCE_CAMERA_V4L2:
default:
NVGSTDS_ERR_MSG_V ("Source type %03d not yet implemented!", source_config->type);
goto done;
}
/** connect source bin to streammux sink. (add bin to pipeline) */
gst_bin_add (GST_BIN (bin->bin), bin->sub_bins[source_id].bin);
if (!link_element_to_streammux_sink_pad (bin->streammux,
bin->sub_bins[source_id].bin, source_id)) {
goto done;
}
if(source_config->dewarper_config.enable) {
g_object_set(G_OBJECT(bin->sub_bins[source_id].dewarper_bin.nvdewarper), "source-id",
source_config->source_id, NULL);
}
bin->num_bins++;
GST_CAT_INFO (NVDS_APP, "Successfully create source id %04d, bin name: %s",
source_id, elem_name);
NVGSTDS_INFO_MSG_V ("Successfully create source id %04d, bin name: %s",
source_id, elem_name);
/**
* add probe to RTSP src pad
* * The streams[source_id] should be cleaned in deletion,
* so we can directly use it here.
* * The bbox_generated_probe_after_analytics() function needs stream information
* to generate the timestamp in message meta.
*/
testAppCtx->streams[source_id].id = source_id;
/**
* flush the pipeline. Otherwise pipeline will stall upon delete_source_by_id.
*/
src_pad = gst_element_get_static_pad (bin->sub_bins[source_id].bin, "src");
if (!src_pad) {
NVGSTDS_ERR_MSG_V ("Failed to get src pad from '%s'",
GST_ELEMENT_NAME (bin->sub_bins[source_id].bin));
goto done;
}
mux_sink_pad = gst_pad_get_peer (src_pad); // each source has a corresponding sink pad in mux.
if (!mux_sink_pad) {
NVGSTDS_ERR_MSG_V ("Failed to get sink pad from streammux");
goto done;
}
gst_pad_send_event (mux_sink_pad, gst_event_new_flush_stop (FALSE));
/**
* set the new source bin to be sync with parent bin.
*/
if (!gst_element_sync_state_with_parent (bin->sub_bins[source_id].bin)) {
NVGSTDS_ERR_MSG_V ("Failed to sync state of new source bin with parent bin.");
goto done;
}
ret = TRUE;
done:
if (mux_sink_pad) {
gst_object_unref (mux_sink_pad);
}
if (src_pad) {
gst_object_unref (src_pad);
}
if (!ret) {
NVGSTDS_ERR_MSG_V ("%s failed", __func__);
}
return ret;
}
Source deletion part
/**
* @brief Delete source element in pipeline by its source id.
*
* @param ctx [IN/OUT] application context pointer
* @param source_id [IN] id of the source to delete
*/
gboolean
delete_source_by_id (void *ctx, guint source_id) {
AppCtx *appCtx = (AppCtx *) ctx;
NvDsConfig *config = &appCtx->config;
GstPad *mux_sink_pad = NULL;
GstPad *src_pad = NULL;
gboolean ret = FALSE;
NvDsSrcParentBin *bin = &appCtx->pipeline.multi_src_bin;
GstState cur;
GstState pending;
GstStateChangeReturn state_return;
static GstClockTime timeout = GST_SECOND; // wait 1 second for state change.
// get src_pad of source and corresponding sink pad of streammux.
src_pad = gst_element_get_static_pad (bin->sub_bins[source_id].bin, "src");
if (!src_pad) {
NVGSTDS_ERR_MSG_V ("Failed to get src pad from '%s'",
GST_ELEMENT_NAME (bin->sub_bins[source_id].bin));
goto done;
}
mux_sink_pad = gst_pad_get_peer (src_pad); // each source has a corresponding sink pad in mux.
if (!mux_sink_pad) {
NVGSTDS_ERR_MSG_V ("Failed to get sink pad from streammux");
goto done;
}
// // The flush stop event will be sent synchronously for all downstream elements.
// // So we can make sure the buffered dataflow are handled correctly.
/**
* flush the pipeline to make sure buffered data on this source is handled.
* Modified from file: sources/apps/sample_apps/runtime_source_add_delete/deepstream_test_rt_src_add_del.c
* Function: stop_release_source()
*/
gst_pad_send_event (mux_sink_pad, gst_event_new_flush_stop (FALSE));
/**
* resume the pipeline if it's not PLAYING
*/
resume_pipeline (appCtx);
state_return = gst_element_get_state (appCtx->pipeline.pipeline, &cur, &pending,
timeout);
if (cur != GST_STATE_PLAYING) {
NVGSTDS_ERR_MSG_V ("Failed to resume pipeline when deleting source id %d, "
"current pipeline state: %s, state return: %s", source_id,
gst_element_state_get_name (cur),
gst_element_state_change_return_get_name (state_return));
gst_pad_send_event (mux_sink_pad, gst_event_new_eos ()); // inform bus to delete it again.
goto done;
}
/**
* set the state of selected source bin to NULL
*/
state_return =
gst_element_set_state(bin->sub_bins[source_id].bin, GST_STATE_NULL);
switch (state_return) {
case GST_STATE_CHANGE_SUCCESS:
break;
case GST_STATE_CHANGE_FAILURE:
NVGSTDS_ERR_MSG_V ("STATE CHANGE FAILURE, source id %d.", source_id);
goto done;
break;
case GST_STATE_CHANGE_ASYNC:
state_return =
gst_element_get_state (bin->sub_bins[source_id].bin, &cur, &pending,
timeout);
// TODO: add a reasonable time out.
if (cur != GST_STATE_NULL) {
NVGSTDS_ERR_MSG_V ("The state of source %d is not NULL, but %s. state return: %s.",
source_id, gst_element_state_get_name (cur),
gst_element_state_change_return_get_name (state_return));
gst_pad_send_event (mux_sink_pad, gst_event_new_eos ()); // inform bus to delete it again.
goto done;
}
break;
case GST_STATE_CHANGE_NO_PREROLL:
NVGSTDS_INFO_MSG_V ("STATE CHANGE NO PREROLL, source id %d.", source_id);
break;
}
/**
* disconnect source bin from streammux sink
*/
if (!unlink_element_from_streammux_sink_pad (bin->streammux, bin->sub_bins[source_id].bin)) {
NVGSTDS_ERR_MSG_V ("Failed to unlink source from streammux.");
goto done;
}
g_usleep (G_USEC_PER_SEC / 10);
/**
* remove source bin.
* subelements in the source bin should be removed automatically.
*/
if (!gst_bin_remove ( GST_BIN (bin->bin), bin->sub_bins[source_id].bin)) {
NVGSTDS_ERR_MSG_V ("Failed to remove source bin.");
goto done;
}
/** clear bin_lock for rtsp type */
if (config->multi_source_config[source_id].type == NV_DS_SOURCE_RTSP) {
g_mutex_clear (&bin->sub_bins[source_id].bin_lock);
}
/** reset NvDsSourceBin */
memset (&bin->sub_bins[source_id], 0, sizeof(NvDsSrcBin));
bin->num_bins--;
/** reset stream info */
memset (&testAppCtx->streams[source_id], 0, sizeof(StreamSourceInfo));
/**
* remove source configurations.
*/
memset (&config->multi_source_config[source_id], 0, sizeof(NvDsSourceConfig));
config->num_source_sub_bins--;
NVGSTDS_INFO_MSG_V ("Successfully deleted source by id: %d", source_id);
ret = TRUE;
done:
if (mux_sink_pad) {
gst_object_unref (mux_sink_pad);
}
if (src_pad) {
gst_object_unref (src_pad);
}
if (!ret) {
NVGSTDS_ERR_MSG_V ("%s failed", __func__);
}
return ret;
}
Attachment
deepstream_c2d_msg.c (5.1 KB) deepstream_app.c (51.8 KB) deepstream_test5_app_main.c (68.7 KB)