How to dynamically add/remove a filesink

Standby… There is a bug in the system causing reasonably large memory leaks when you dynamically add/remove components in the pipeline. The NVIDIA team are investigating now. Once we have that sorted I’ll come back here and let you know how it’s done…

Hi jasonpgf2a,
The memory leak problem doesn’t matter to me. I want to dynamically add/remove a sink rather than a source, but I have no idea how to achieve it.
I would greatly appreciate it if you could share your code and give me some useful ideas. Thanks very much!

Hey jasonpgf2a, have you guys been able to resolve the memory leak issue? Do you have updated source you can share? We’re still on DS 4.0.1, and I’m aware DS 5.0 has that smart record feature, but it’s still in preview. Hope to hear back. Stay healthy, stay safe! TIA!

No, never got to the bottom of the memory leak, but it actually wasn’t a big deal for me. I use a Python “controller” program to start and stop my DeepStream app. The Python controller can monitor system memory use… Once it gets down to around 10% remaining I just restart the DeepStream app. You can control the DeepStream app with the Python subprocess module.


BTW, it’s only a reasonably small leak - it took a lot of dynamic pipeline manipulations… adding the recording pipeline… removing the recording pipeline… over and over, a hundred times, for it to become a problem.

Awesome! Are you able to share your updated deepstream_test3_app.c file after solving the issue with unplayable files, as you posted on Dec 20?

That app I had is long gone… It’s not in a working state as I moved on from it to jump onto the smart record bandwagon. ;-)

But I can outline the exact steps you need to follow - note that if you can wait till July, DS 5 goes GA sometime around then…

In essence you need to:
(1) Use a tee so that you can dynamically add a src pad when you want to start recording. Link the new tee src pad to your dynamic recording elements… Call gst_element_sync_state_with_parent() on each new dynamic element.
Work out a way to stop recording. This could be via a timer - for example, stop recording after 30 seconds.

(2) In your timer callback you add an IDLE probe on the tee src pad. When the idle probe’s callback fires you need to unlink the dynamic pipeline elements from the tee’s src pad, then send an EOS event down that unlinked branch. This bit is the most important, as it allows the file muxer to finalise the file.

(3) Now you need a way to capture that EOS and remove all the dynamic elements. You can do that in your bus_call function.

I’ve added code snippets below.

(1) Assuming you want to record sources separately AND you want the bounding boxes in the video, you need to put a tee element in your pipeline after the stream demuxer.
When you link, you’ll have just the one src pad on that tee, which goes to whatever components you want running all the time. If there are none, then you need to terminate that branch with a fakesink, as sketched below.
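A minimal sketch of that always-on branch at pipeline build time (names like dynamic_tee and demux_srcpad are illustrative assumptions; error checks omitted):

  /* Permanent branch so the tee always has at least one linked src pad:
   * nvstreamdemux src_%u -> tee -> queue -> fakesink */
  GstElement *dynamic_tee = gst_element_factory_make ("tee", "dynamic-tee-0");
  GstElement *queue_fake  = gst_element_factory_make ("queue", NULL);
  GstElement *fakesink    = gst_element_factory_make ("fakesink", NULL);

  g_object_set (G_OBJECT (fakesink), "sync", FALSE, "async", FALSE, NULL);

  gst_bin_add_many (GST_BIN (pipeline), dynamic_tee, queue_fake, fakesink, NULL);

  /* demux_srcpad is the already-requested demuxer src pad for this source. */
  GstPad *tee_sinkpad = gst_element_get_static_pad (dynamic_tee, "sink");
  gst_pad_link (demux_srcpad, tee_sinkpad);
  gst_object_unref (tee_sinkpad);

  /* Linking from the tee requests its first src pad (src_0) automatically. */
  gst_element_link_many (dynamic_tee, queue_fake, fakesink, NULL);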

(2) When you want to start recording a file you can call a function like the one below. instances is an array of structs where I store details for each source (a plausible shape for that struct is sketched first):
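The struct itself isn’t shown in the thread; this layout is inferred from the fields used in the snippets, so treat it as an assumption:

  /* Inferred from usage below; MAX_SOURCES is an illustrative name. */
  typedef struct {
    GstElement *dynamic_tee;   // permanent tee placed after the demuxer (or tiler)
    GstPad     *teepad;        // request pad held for the lifetime of the recording
    GstElement *queue_record, *file_convert, *caps_filter;
    GstElement *encoder, *parser, *muxer, *filesink;
    gint        recording;     // gint rather than gboolean so g_atomic_* works on 'removing'
    gint        removing;
    char        last_file_name[255];
  } SourceInstance;

  static SourceInstance instances[MAX_SOURCES];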

char *
start_recording (guint source_id)
{
  GstPad *sinkpad;
  GstPadTemplate *templ;
 
  // Get a request pad on the dynamic tee for adding the new sub-pipeline.
  // The dynamic tee already has src pad 0 linked to the fakesink, so this one
  // will be named src_n, where n >= 1.
  templ = gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (instances[source_id].dynamic_tee), "src_%u");
  instances[source_id].teepad = gst_element_request_pad (instances[source_id].dynamic_tee, templ, NULL, NULL);

  instances[source_id].queue_record = gst_element_factory_make ("queue", NULL);
  instances[source_id].file_convert = gst_element_factory_make ("nvvideoconvert", NULL);
  instances[source_id].caps_filter = gst_element_factory_make ("capsfilter", NULL);
#ifdef RECORDING_CLIP_CODEC_H265
  instances[source_id].encoder = gst_element_factory_make ("nvv4l2h265enc", NULL);
  instances[source_id].parser = gst_element_factory_make ("h265parse", NULL);
#else 
  instances[source_id].encoder = gst_element_factory_make ("nvv4l2h264enc", NULL);
  instances[source_id].parser = gst_element_factory_make ("h264parse", NULL);
#endif
  instances[source_id].muxer = gst_element_factory_make ("qtmux", NULL);

  gchar file_bin_name[20] = { };
  g_snprintf (file_bin_name, 20, "filesink-%02d", source_id);
  instances[source_id].filesink = gst_element_factory_make ("filesink", file_bin_name);

  if (!instances[source_id].teepad || !instances[source_id].queue_record || !instances[source_id].file_convert ||
    !instances[source_id].caps_filter || !instances[source_id].encoder || !instances[source_id].parser ||
    !instances[source_id].muxer || !instances[source_id].filesink) {

    g_printerr ("Error creating recording pipeline elements for source = %d", source_id);
    return NULL;
  }

  /*
   * We use a timestamp for the filename with the real camera_id as a suffix.
   * The camera_ids array holds the actual camera_id indexed by the source_id.
   */
  char ts_buff[20];
  time_t now = time (NULL);
  strftime(ts_buff, 20, "%Y-%m-%d %H:%M:%S", localtime(&now));
  char *file_name = (char*) malloc(255 * sizeof(char)); // caller must free!
  char *full_file_name = (char*) malloc(255 * sizeof(char));
  int camera_id = camera_ids[source_id];
  sprintf(file_name, "%s-%d.mp4", ts_buff, camera_id);
  sprintf(full_file_name, "%s%s-%d.mp4", file_path, ts_buff, camera_id);

  //g_print ("Recording to file %s for %d seconds\n", full_file_name, RECORDING_CLIP_DURATION);

  /* Set element properties */
  g_object_set (G_OBJECT (instances[source_id].queue_record), "max-size-buffers", 0, "max-size-bytes", 0, "max-size-time", 0, NULL);

  SET_GPU_ID (instances[source_id].file_convert, 0);
  g_object_set (G_OBJECT (instances[source_id].encoder), "bufapi-version", TRUE, NULL);
  g_object_set (G_OBJECT (instances[source_id].encoder), "iframeinterval", dynamicParams.file_encoder_iframeinterval, NULL);
  g_object_set (G_OBJECT (instances[source_id].encoder), "maxperf-enable", TRUE, NULL);
  g_object_set (G_OBJECT (instances[source_id].encoder), "control-rate", dynamicParams.file_encoder_constant, NULL); //1-constant, 0-variable
  g_object_set (G_OBJECT (instances[source_id].encoder), "bitrate", dynamicParams.file_encoder_bitrate, NULL);

  if (dynamicParams.file_encoder_constant == 0) { //set peak bitrate when running VBR
    g_object_set (G_OBJECT (instances[source_id].encoder), "peak-bitrate", dynamicParams.file_encoder_peak_bitrate, NULL);
  }

  g_object_set (G_OBJECT (instances[source_id].encoder), "qos", TRUE, NULL); //handle qos events from downstream elements
  //g_object_set (G_OBJECT (instances[source_id].encoder), "MeasureEncoderLatency", TRUE, NULL);

  g_object_set (G_OBJECT (instances[source_id].filesink), "location", full_file_name, NULL);
  g_object_set (G_OBJECT (instances[source_id].filesink), "sync", FALSE, "max-lateness", -1, "async", FALSE, NULL);
  g_object_set (G_OBJECT (instances[source_id].filesink), "qos", TRUE, NULL);

  // Variable bitrate: peak-bitrate defaults to 1.2x the set bitrate.
  // For variable rate use: g_object_set (G_OBJECT (instances[source_id].encoder), "control-rate", 0, NULL); // 0 = variable

  GstCaps *caps = gst_caps_from_string ("video/x-raw(memory:NVMM), format=(string)I420");
  g_object_set (G_OBJECT (instances[source_id].caps_filter), "caps", caps, NULL);
  gst_caps_unref (caps);

  gst_bin_add_many (GST_BIN (pipeline),
    gst_object_ref (instances[source_id].queue_record), 
    gst_object_ref (instances[source_id].file_convert), 
    gst_object_ref (instances[source_id].caps_filter), 
    gst_object_ref (instances[source_id].encoder), 
    gst_object_ref (instances[source_id].parser), 
    gst_object_ref (instances[source_id].muxer), 
    gst_object_ref (instances[source_id].filesink), NULL);

  gst_element_link_many (instances[source_id].queue_record, instances[source_id].file_convert, 
    instances[source_id].caps_filter, instances[source_id].encoder, instances[source_id].parser, 
    instances[source_id].muxer, instances[source_id].filesink, NULL);

  gst_element_sync_state_with_parent (instances[source_id].queue_record);
  gst_element_sync_state_with_parent (instances[source_id].file_convert);
  gst_element_sync_state_with_parent (instances[source_id].caps_filter);
  gst_element_sync_state_with_parent (instances[source_id].encoder);
  gst_element_sync_state_with_parent (instances[source_id].parser);
  gst_element_sync_state_with_parent (instances[source_id].muxer);
  gst_element_sync_state_with_parent (instances[source_id].filesink);

  // Get the sink pad on the first element of the recording sub-pipeline and link it to the tee pad.
  // Don't unref the teepad here as it's used later when disconnecting the recording sub-pipeline.
  sinkpad = gst_element_get_static_pad (instances[source_id].queue_record, "sink");
  if (gst_pad_link (instances[source_id].teepad, sinkpad) != GST_PAD_LINK_OK) {
    g_printerr ("Error linking tee pad to recording queue for source = %d\n", source_id);
  }

  gst_object_unref (sinkpad);

  instances[source_id].recording = TRUE;
  instances[source_id].removing = FALSE;

  memset (instances[source_id].last_file_name, '\0', sizeof (instances[source_id].last_file_name));
  strcpy (instances[source_id].last_file_name, full_file_name);

  // You need to set a timer here (and pass it the source id) if you want to have a fixed
  // recording duration - like 30 seconds, etc.
  // Alternatively you could have a stop_recording() function.
  // Use something like:  g_timeout_add_seconds (RECORDING_CLIP_DURATION, timeout_cb_0, NULL);
     
  free (full_file_name); // allocated with malloc above, so free() rather than g_free()

  return file_name;
}
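For illustration (my assumption, not from the original post), starting a fixed-duration clip for source 0 would look like this. RECORDING_CLIP_DURATION is an assumed #define:

  /* The returned filename was malloc'd by start_recording(), so the caller frees it. */
  char *file_name = start_recording (0);
  if (file_name != NULL) {
    g_timeout_add_seconds (RECORDING_CLIP_DURATION, timeout_cb_0, NULL);
    g_print ("Recording source 0 to %s\n", file_name);
    free (file_name);
  }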

(3) In the timer callback… or stop_recording():

static gboolean
timeout_cb_0 (gpointer data)
{
  // This callback is hard-wired to source 0; pull the source id out of 'data'
  // if you want one callback to serve every source.
  gint source_id = 0;

  //g_print ("\nStop Recording - source 0\n");
  gst_pad_add_probe (instances[source_id].teepad, GST_PAD_PROBE_TYPE_IDLE, unlink_cb, GINT_TO_POINTER (source_id), NULL);
  return FALSE; // returning FALSE cancels the timeout - makes it one-shot
}
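A generic variant (my sketch, not from the original post) that serves any source by carrying the id in the user-data pointer:

  /* Schedule with: g_timeout_add_seconds (duration, timeout_cb, GINT_TO_POINTER (source_id)); */
  static gboolean
  timeout_cb (gpointer data)
  {
    gint source_id = GPOINTER_TO_INT (data);

    gst_pad_add_probe (instances[source_id].teepad, GST_PAD_PROBE_TYPE_IDLE,
        unlink_cb, GINT_TO_POINTER (source_id), NULL);
    return FALSE; // one-shot
  }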

(4)

static GstPadProbeReturn
unlink_cb (GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
{
  /*
   * DEBUG: Output pipeline graph. Set env var first: `export GST_DEBUG_DUMP_DOT_DIR=pipeline_graphs/`
   * Convert the exported graph using `dot -T{format} input_file > output_file`.
   */
  GST_DEBUG_BIN_TO_DOT_FILE (GST_BIN (pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-before-unlink");

  gint source_id = GPOINTER_TO_INT (user_data);
  if (!g_atomic_int_compare_and_exchange (&instances[source_id].removing, FALSE, TRUE)) {
    return GST_PAD_PROBE_OK;
  }

  //g_print ("Unlinking and sending EOS to recording pipeline to finalise file. Source ID = %d\n", source_id);

  GstPad *sinkpad;
  sinkpad = gst_element_get_static_pad (instances[source_id].queue_record, "sink");
  if (!gst_pad_unlink (instances[source_id].teepad, sinkpad)) {
    g_printerr ("Error unlinking dynamic tee pad from queue_record element");
  }
  gst_object_unref (sinkpad);
 
  gst_element_release_request_pad (instances[source_id].dynamic_tee, instances[source_id].teepad);
  gst_object_unref (instances[source_id].teepad);

  gst_element_send_event (instances[source_id].encoder, gst_event_new_eos());

  //g_print ("Unlinked source = %d\n", source_id);
  return GST_PAD_PROBE_REMOVE;
}

(5) In bus_call, add the following as a new case. Message forwarding must be set to true on the pipeline for this to work:
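The GstBin property is named message-forward; enabling it on the top-level pipeline looks like this:

  /* With message-forward enabled, messages a bin would normally swallow (like the
   * EOS from the unlinked branch's filesink) are re-posted on the bus wrapped in
   * a "GstBinForwarded" element message, which the case below unpacks. */
  g_object_set (G_OBJECT (pipeline), "message-forward", TRUE, NULL);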

    /* Since message-forwarding has been set on the overall pipeline, when the EOS
     * is sent to the unlinked recording pipeline and makes its way to the sink, it
     * will be captured here. We can now safely remove the pipeline elements. */
    case GST_MESSAGE_ELEMENT:
      ; // Empty statement needed: C does not allow a declaration directly after a case label
      const GstStructure *s = gst_message_get_structure (msg);

      if (gst_structure_has_name (s, "GstBinForwarded")) {
        GstMessage *forward_msg = NULL;
        gst_structure_get (s, "message", GST_TYPE_MESSAGE, &forward_msg, NULL);
        if (forward_msg && GST_MESSAGE_TYPE (forward_msg) == GST_MESSAGE_EOS) {
          // Work out which recording pipeline (source id) this message is from, based
          // on the sink name (e.g. object name 'filesink-00' means source_id = 0)
          //g_print ("EOS from element %s\n", GST_OBJECT_NAME (GST_MESSAGE_SRC (forward_msg)));
          gchar file_bin_name[20] = { };
          gchar *obj_name = gst_object_get_name (GST_MESSAGE_SRC (forward_msg));
          g_strlcpy (file_bin_name, obj_name, sizeof (file_bin_name));
          g_free (obj_name);
          int source_id_value;
          sscanf (file_bin_name, "%*[^0-9]%d", &source_id_value);

          remove_recording_pipeline (source_id_value);
        }

        if (forward_msg)
          gst_message_unref (forward_msg);
      }
      break;
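remove_recording_pipeline() isn’t shown in the thread; here is a minimal sketch, assuming the element handles stored in instances[] and the extra refs taken before gst_bin_add_many() above:

  static void
  remove_recording_pipeline (gint source_id)
  {
    GstElement *elems[] = {
      instances[source_id].queue_record, instances[source_id].file_convert,
      instances[source_id].caps_filter,  instances[source_id].encoder,
      instances[source_id].parser,       instances[source_id].muxer,
      instances[source_id].filesink
    };

    for (guint i = 0; i < G_N_ELEMENTS (elems); i++) {
      gst_element_set_state (elems[i], GST_STATE_NULL);
      gst_bin_remove (GST_BIN (pipeline), elems[i]); // drops the bin's ref
      gst_object_unref (elems[i]);                   // drops the extra ref we took
    }

    instances[source_id].recording = FALSE;
  }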

Hopefully this is clear enough for you to get it working… It is pretty much what smart record does anyway except they have wrapped the dynamic components into a bin. ;-)


Yeah, we’re waiting for DS 5.0 to go GA and get its kinks worked out before we migrate our deployed DS 4.0.2 app, but this is a great starting point in the meantime - we’ll give it a shot! Thanks!

Hi jasonpgf2a, we’ve been busy and haven’t had a chance to take a closer look at this until now. Given that we want to record the same output as the tiled display (all sources together), where would we put the tee element in this case?

thanks!

After the tiler.

Gotcha. And so, similarly, we’ll have one permanent fakesink on that tee - since we don’t have anything else that runs all the time - and then whenever we need to record, we just hook onto this tee?

That’s right…

Hi @jasonpgf2a, I’m using DeepStream 4.0.2 and hit the same problem when periodically adding or deleting sources on the pipeline at runtime. After 40+ adds and 40+ deletes there is a CUDA failure (memory leak). Have you solved the problem, and do you know whether DeepStream 5.0 solves the memory leak problem? Thank you!

Hi @weiweifu, I solved the problem by just having a Python control script.
In the Python script I launch my DeepStream app and then continually monitor free memory on the system. When it gets below 10% I just restart the DeepStream app.

If you have engine files configured then the restart is very quick.

I have also noticed that in DS 5.0 DP the memory leaks are far smaller. I still have a sneaking suspicion that there is a small leak, but it’s small enough not to worry about for now. ;-)
I have not done a detailed analysis with valgrind - I don’t have a desktop Ubuntu system to play with at the moment.

Thank you so much! It really helps a lot! Have a good day!

Thank you for sharing your knowledge!

I did it and it worked.
But I am also trying to save the audio and for some reason the branch is hanging.

Hi jasonpgf2a, we got the dynamic tee working based on your recommendations here. However, today the message forwarding of the EOS stopped working - have you seen this, and do you have any idea what might be preventing the bus callback from being called?

Also, without moving to DS 5.0 and using the smart record feature, do you have any suggestions on how to achieve some type of buffering, so a dynamic recording actually starts a few seconds before the time it gets triggered?

Thanks again for all your help!

Re: EOS not working - I haven’t seen that problem on DS 4 or 5.0.

Re: the SR buffer. Print out a pipeline graph and you will see how NVIDIA does it. It looks to be just a normal GStreamer queue with appropriate settings; my guess at those settings is sketched below.
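A hedged guess (not from the original thread) at such a pre-trigger buffer: a queue sized in time that leaks its oldest buffers once full, so roughly the last few seconds of history are sitting in it when the recording branch attaches. PRE_RECORD_SECONDS is an assumed #define:

  GstElement *buffer_queue = gst_element_factory_make ("queue", NULL);
  g_object_set (G_OBJECT (buffer_queue),
      "max-size-time", (guint64) PRE_RECORD_SECONDS * GST_SECOND,
      "max-size-buffers", 0,
      "max-size-bytes", 0,
      "leaky", 2, // 2 = downstream: discard the OLDEST buffers when full
      NULL);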

Thanks once again for your feedback!

We got past the EOS issue - some code had been tweaked that changed where message forwarding was set, which caused the problem. However, now we’re running into this issue when initiating a new recording after the initial one got triggered and persisted to an mp4 file:

Cuda failure: status=700
nvbufsurface: Error(-1) in releasing cuda memory

Have you run across these errors before? Basically, a detection triggers a recording and it completes successfully (the mp4 is playable), but then another detection triggers a recording and the errors above come up.

I haven’t seen that error either. Are you unlinking the pipeline, setting the elements’ state to NULL, and then removing all the dynamic elements?