How to dynamically add/remove a filesink

In deepstream-app I’d like to dynamically disable/enable the filesink, and I’m wondering what the best way to do this is. Reading the GStreamer documentation on dynamic pipeline manipulation, it seems I need to block the src pad upstream of the filesink element, set the filesink to PAUSED, and then remove it. Then I can add it again when needed.
The purpose is to be able to write files only when an upstream nvinfer element has made a detection.

You can try dropping the buffers using a buffer probe just before the filesink or encoder when no objects are present in the frame. Install a pad probe on the sink pad of the encoder, check whether there are objects in the metadata, and return GST_PAD_PROBE_OK to pass the buffer or GST_PAD_PROBE_DROP to discard it.

Have done that but it does not work as expected. It will still just record one file with gaps in it. So would be a good way to do time-lapse for example… However what I require is a new file to be written on each detection.

ie detection1.mp4 then detection2.mp4, etc.

To do this I believe we need to do dynamic pipeline manipulation.
We could stream to a tee with a fakesink after it, and then when a detection is made we add a new pad to the tee followed by a filesink with my preferred filename. Then after 10 seconds or so remove the filesink and tee pad. And so on… But we need to pass an EOS to the filesink for it to finalise writing the file (otherwise you get an unreadable file).

I see you have recently provided an example of how to dynamically add new sources - would be great if you could do the same with sinks. ;-)

I see you have recently provided an example of how to dynamically add new sources
Which example are you referring to?

In here: https://github.com/NVIDIA-AI-IOT/deepstream_reference_apps

Will check it internally.

There are 2 options to implement this feature:
1. You can set the filesink component to the NULL state instead of removing it from and re-adding it to the pipeline. Then you can set the new filename on the filesink and put it back in its previous state using the gst_element_sync_state_with_parent() API.
2. If you want to write to different files, you can install a probe on the sink pad of the fakesink component and implement the code to write different detections to different files; you will need to change the filename in the probe as required.

Thanks @bcao, will test this and come back… Previous testing though without deepstream - just with a simple gstreamer pipeline, showed that the filesink needed an EOS to finalise the file, otherwise the generated files were not viewable.
I’ll let you know. ;-)

Also, I have edited comment #7 and added one more option.

@bcao have you got a working program for your suggestions from comment 7? If I try this with a standard gstreamer pipeline (no deepstream elements) it doesn’t work.

What I do have working is the following:

videotestsrc ! tee name=t t. ! videoconvert ! autovideosink

At runtime I can dynamically add a new t pad with a queue ! x264enc ! mp4mux ! filesink
Set the filesink filename and then:

    gst_element_sync_state_with_parent(queue);
    gst_element_sync_state_with_parent(encoder);
    gst_element_sync_state_with_parent(muxer);
    gst_element_sync_state_with_parent(filesink);

This seems to work great in plain gstreamer and I can dynamically add recording to file at any time and can cancel the recording as well via an idle probe on the tee with the callback:

Unlink the tee pad from the sink pad
Send an EOS to the unlinked pipeline via: gst_element_send_event(encoder, gst_event_new_eos());
Then set the states of all the unlinked elements to NULL and unref() them…

Sending the EOS is mandatory otherwise the muxer does not finalise the file and you end up with unreadable mp4 files.

Now I want to alter the above to use deepstream components. So I start by simply replacing the autovideosink with nveglglessink, but as soon as I do this I get unreadable mp4 files - so somehow it totally stuffs up the file writing taking place in the other tee pipeline.

Any ideas?

Here’s the code for a simple gstreamer pipeline that allows you to dynamically add/remove file recording (attached). This code works well and creates playable files.

Also the same code merged into the deepstream-test3 app. This doesn’t create playable files and complains about dropped buffers. The EOS doesn’t seem to be finalising the file here??
dynamic-recording.c (6.14 KB)
deepstream_test3_app.c (20.2 KB)

I’ve managed to solve this. I was missing a queue after each of the tee branches and also an h264parse element on the file recording branch of the tee. When you don’t have queues after tee branches you get strange timing issues which lead to dropped frames all over the place. I also needed to set sync = false on all sinks and live-source to true on the stream muxer.

Strange how it would work with plain gstreamer elements (no deepstream components) without the h264parse element and still create playable mp4 files. I think the mp4mux element must do the parsing internally. In my deepstream pipelines I use the qtmux element and it obviously needs a preceding h264parse element (which is how it’s done in the deepstream-app reference app).

I’m happy to share the code for anyone that wants it - just ping me here.

So I now have a modified deepstream app based on deepstream-test3 which will accept any number of incoming rtsp streams and tile them onto a display. When a detection is made by the standard pgie it dynamically adds the necessary file recording elements, saves an mp4 for 10 seconds, then stops and unlinks the dynamic elements. And so on, over and over, for new detections.

Next step will be to integrate this logic into the deepstream-app reference app so that it will work for any input sources and any number of outputs/sinks. Ideally I’d like a config option that says: record_file_on_detections and it will record each individual input stream as a separate mp4 file (whereas right now I just record the tiled output).
I’d also like to queue the footage so I record 2 or 3 seconds before the events occur. Not sure if this can be done by just increasing the buffers on the standard queue element.
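For the pre-event footage idea, here is a rough sketch of the bookkeeping involved: a ring that only keeps the last few seconds of frames, so that when a detection fires you still have what came just before it. This is not what I implemented in the thread. The names (Preroll, preroll_push, etc.) are made up for illustration, it stores timestamps only where a real version would hold the buffers themselves, and it assumes timestamps never go backwards.

```c
#include <assert.h>
#include <stddef.h>
#include <stdint.h>

#define PREROLL_CAPACITY 256

/* Hypothetical ring of frame timestamps in nanoseconds. */
typedef struct {
  uint64_t pts[PREROLL_CAPACITY];
  size_t head;      /* index of the oldest retained frame */
  size_t count;     /* number of retained frames */
  uint64_t window;  /* how much history to keep, in nanoseconds */
} Preroll;

void
preroll_init (Preroll *p, uint64_t window_ns)
{
  p->head = 0;
  p->count = 0;
  p->window = window_ns;
}

/* Adds a frame, assuming pts_ns is not older than the previous frame. */
void
preroll_push (Preroll *p, uint64_t pts_ns)
{
  /* Evict frames that fell out of the window, or the oldest one when full. */
  while (p->count > 0 &&
      (p->count == PREROLL_CAPACITY || pts_ns - p->pts[p->head] > p->window)) {
    p->head = (p->head + 1) % PREROLL_CAPACITY;
    p->count--;
  }
  p->pts[(p->head + p->count) % PREROLL_CAPACITY] = pts_ns;
  p->count++;
}

/* Timestamp of the oldest frame still held; the ring must not be empty. */
uint64_t
preroll_oldest (const Preroll *p)
{
  assert (p->count > 0);
  return p->pts[p->head];
}
```

On a detection you would start the recording from preroll_oldest() rather than from the current frame. Whether a plain queue element can be made to behave like this is still an open question for me.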

I’m still a little surprised that this capability is not built in as a core feature, especially with Nvidia targeting deepstream for IoT scenarios. In most use cases I can think of you don’t want to be streaming full-time from an edge device and using up huge bandwidth - you only want to record or stream the interesting stuff (when detections are made).

ps. I’ll leave this thread open and update it as I progress in case someone else has this use-case…

Hi jasonpgf2a, have you achieved this function in deepstream-app?
I am doing same job, but have some trouble. It would be great if you can share your code about this. Thanks!

Standby… There is a bug in the system causing reasonably large memory leaks when you dynamically add/remove components to the pipeline. The nvidia team are investigating now. Once we have that sorted I’ll come back here and let you know how it’s done…

Hi, jasonpgf2a.
The memory leak is not a problem for me. I want to dynamically add/remove a sink rather than a source. However, I have no idea how to achieve it.
I would strongly appreciate it if you could share your code and give me some useful ideas. Thanks very much!

Hey jasonpgf2a, have you guys been able to resolve the memory leak issue? Do you have updated source you can share? We’re still on DS 4.0.1, and I’m aware DS 5.0 has that smart record feature, but it’s still in preview. Hope to hear back. Stay healthy, stay safe! TIA!

No, I never got to the bottom of the memory leak, but it actually wasn’t a big deal for me. I use a Python “controller” program to start and stop my deepstream app. The Python controller can monitor system memory use… Once free memory gets down to around 10% remaining, I just restart the deepstream app. You can control the deepstream app with the Python subprocess module.
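My controller is Python, but the check itself is simple enough to sketch in C like the rest of the code in this thread. This is only an illustration: it assumes a Linux box where /proc/meminfo exposes MemTotal and MemAvailable, and the function names (parse_meminfo, should_restart) are made up here. It works on the text of the file so you can feed it whatever you have read.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>

/* Parses "MemTotal" and "MemAvailable" (both in kB) from text laid out like
 * /proc/meminfo. Returns 0 on success, -1 if either field is missing. */
int
parse_meminfo (const char *text, long *total_kb, long *avail_kb)
{
  const char *t = strstr (text, "MemTotal:");
  const char *a = strstr (text, "MemAvailable:");

  if (!t || !a)
    return -1;
  if (sscanf (t, "MemTotal: %ld", total_kb) != 1)
    return -1;
  if (sscanf (a, "MemAvailable: %ld", avail_kb) != 1)
    return -1;
  return 0;
}

/* Returns 1 when available memory is at or below min_free_pct percent. */
int
should_restart (long total_kb, long avail_kb, int min_free_pct)
{
  assert (total_kb > 0);
  return avail_kb * 100 <= total_kb * (long) min_free_pct;
}
```

The controller would poll this every so often and, when should_restart() returns 1 with a threshold of 10, stop the deepstream app and launch it again.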

btw. It’s only a reasonably small leak - it took a lot of dynamic pipeline manipulations… adding the recording pipeline… removing the recording pipeline… over and over and over a hundred times for it to generate a problem.

awesome! are you able to share your updated deepstream_test3_app.c file after solving the issue with unplayable files as you posted on Dec 20?

That app I had is long gone… It’s not in a working state as I moved on from it to jump onto the smart record bandwagon. ;-)

But I can outline the exact steps you need to follow - note that if you can wait till July then DS5 is GA sometime around then…

In essence you need to:
(1) Use a tee so that you can dynamically add a src pad when you want to start recording. Link the new tee src pad with your dynamic elements for recording… Call gst_element_sync_state_with_parent() on each new dynamic element.
Work out a way to stop recording. This could be via a timer - so for example - stop recording after 30 seconds.

(2) In your timer callback, add an IDLE probe on the tee src pad. When the idle probe’s callback is called, unlink the dynamic pipeline elements from the tee’s src pad. Then send an EOS event down that unlinked pipeline. This bit is the most important as it allows the file muxer to finalise the file.

(3) Now you need a way to capture the resulting EOS message and remove all the dynamic elements. You can do that in your bus_call function.

I’ve added code snippets below.

(1) Assuming you want to record sources separately AND you want the bounding boxes in the video - then you need to put a tee element in your pipeline after the streamdemuxer.
When you link you’ll have just the one src pad on that tee which will go to whatever components you want running all the time. If there are none, then you need to finalise with a fakesink.

(2) When you want to start recording a file you can call a function like this. instances is an array of structs where I store details for each source:

char *
start_recording (guint source_id)
{
  GstPad *sinkpad;
  GstPadTemplate *templ;
 
  // Get tee pad on the dynamic tee for adding new sub-pipeline
  // The dynamic tee already has src pad 0 linked to the fakesink so this one will be called src_1_n
  // where n is the source id.
  templ = gst_element_class_get_pad_template (GST_ELEMENT_GET_CLASS (instances[source_id].dynamic_tee), "src_%u");
  instances[source_id].teepad = gst_element_request_pad (instances[source_id].dynamic_tee, templ, NULL, NULL);

  instances[source_id].queue_record = gst_element_factory_make ("queue", NULL);
  instances[source_id].file_convert = gst_element_factory_make ("nvvideoconvert", NULL);
  instances[source_id].caps_filter = gst_element_factory_make ("capsfilter", NULL);
#ifdef RECORDING_CLIP_CODEC_H265
  instances[source_id].encoder = gst_element_factory_make ("nvv4l2h265enc", NULL);
  instances[source_id].parser = gst_element_factory_make ("h265parse", NULL);
#else 
  instances[source_id].encoder = gst_element_factory_make ("nvv4l2h264enc", NULL);
  instances[source_id].parser = gst_element_factory_make ("h264parse", NULL);
#endif
  instances[source_id].muxer = gst_element_factory_make ("qtmux", NULL);

  gchar file_bin_name[20] = { 0 };
  g_snprintf (file_bin_name, sizeof (file_bin_name), "filesink-%02u", source_id);
  instances[source_id].filesink = gst_element_factory_make ("filesink", file_bin_name);

  if (!instances[source_id].teepad || !instances[source_id].queue_record || !instances[source_id].file_convert ||
    !instances[source_id].caps_filter || !instances[source_id].encoder || !instances[source_id].parser ||
    !instances[source_id].muxer || !instances[source_id].filesink) {

    g_printerr ("Error creating recording pipeline elements for source = %u\n", source_id);
    return NULL;
  }

  /*
   * We use a timestamp for the filename with the real camera_id as a suffix.
   * The camera_ids array holds the actual camera_id indexed by the source_id.
   */
  char ts_buff[20];
  time_t now = time (NULL);
  strftime(ts_buff, sizeof (ts_buff), "%Y-%m-%d %H:%M:%S", localtime(&now));
  char *file_name = (char*) malloc(255 * sizeof(char)); // caller must free!
  char *full_file_name = (char*) malloc(255 * sizeof(char));
  int camera_id = camera_ids[source_id];
  // snprintf truncates rather than overflowing the buffer if file_path is long
  snprintf(file_name, 255, "%s-%d.mp4", ts_buff, camera_id);
  snprintf(full_file_name, 255, "%s%s-%d.mp4", file_path, ts_buff, camera_id);

  //g_print ("Recording to file %s for %d seconds\n", full_file_name, RECORDING_CLIP_DURATION);

  /* Set element properties */
  g_object_set (G_OBJECT (instances[source_id].queue_record), "max-size-buffers", 0, "max-size-bytes", 0, "max-size-time", 0, NULL);

  SET_GPU_ID (instances[source_id].file_convert, 0);
  g_object_set (G_OBJECT (instances[source_id].encoder), "bufapi-version", TRUE, NULL);
  g_object_set (G_OBJECT (instances[source_id].encoder), "iframeinterval", dynamicParams.file_encoder_iframeinterval, NULL);
  g_object_set (G_OBJECT (instances[source_id].encoder), "maxperf-enable", TRUE, NULL);
  g_object_set (G_OBJECT (instances[source_id].encoder), "control-rate", dynamicParams.file_encoder_constant, NULL); //1-constant, 0-variable
  g_object_set (G_OBJECT (instances[source_id].encoder), "bitrate", dynamicParams.file_encoder_bitrate, NULL);

  if (dynamicParams.file_encoder_constant == 0) { //set peak bitrate when running VBR
    g_object_set (G_OBJECT (instances[source_id].encoder), "peak-bitrate", dynamicParams.file_encoder_peak_bitrate, NULL);
  }

  g_object_set (G_OBJECT (instances[source_id].encoder), "qos", TRUE, NULL); //handle qos events from downstream elements
  //g_object_set (G_OBJECT (instances[source_id].encoder), "MeasureEncoderLatency", TRUE, NULL);

  g_object_set (G_OBJECT (instances[source_id].filesink), "location", full_file_name, NULL);
  g_object_set (G_OBJECT (instances[source_id].filesink), "sync", FALSE, "max-lateness", -1, "async", FALSE, NULL);
  g_object_set (G_OBJECT (instances[source_id].filesink), "qos", TRUE, NULL);

  // variable bitrate : peak-rate is 1.2* higher by default
  // Currently testing with fixed rate: g_object_set (G_OBJECT (instances[source_id].encoder), "control-rate", 0, NULL); 

  GstCaps *caps = gst_caps_from_string ("video/x-raw(memory:NVMM), format=(string)I420");
  g_object_set (G_OBJECT (instances[source_id].caps_filter), "caps", caps, NULL);
  gst_caps_unref (caps);

  gst_bin_add_many (GST_BIN (pipeline),
    gst_object_ref (instances[source_id].queue_record), 
    gst_object_ref (instances[source_id].file_convert), 
    gst_object_ref (instances[source_id].caps_filter), 
    gst_object_ref (instances[source_id].encoder), 
    gst_object_ref (instances[source_id].parser), 
    gst_object_ref (instances[source_id].muxer), 
    gst_object_ref (instances[source_id].filesink), NULL);

  gst_element_link_many (instances[source_id].queue_record, instances[source_id].file_convert, 
    instances[source_id].caps_filter, instances[source_id].encoder, instances[source_id].parser, 
    instances[source_id].muxer, instances[source_id].filesink, NULL);

  gst_element_sync_state_with_parent (instances[source_id].queue_record);
  gst_element_sync_state_with_parent (instances[source_id].file_convert);
  gst_element_sync_state_with_parent (instances[source_id].caps_filter);
  gst_element_sync_state_with_parent (instances[source_id].encoder);
  gst_element_sync_state_with_parent (instances[source_id].parser);
  gst_element_sync_state_with_parent (instances[source_id].muxer);
  gst_element_sync_state_with_parent (instances[source_id].filesink);

  // Get sink pad on the first element of the recording sub-pipeline - link it to the teepad
  // Don't unref the teepad here as its used when disconnecting the recording sub-pipeline.
  sinkpad = gst_element_get_static_pad (instances[source_id].queue_record, "sink");
  gst_pad_link (instances[source_id].teepad, sinkpad);

  gst_object_unref (sinkpad);

  instances[source_id].recording = TRUE;
  instances[source_id].removing = FALSE;

  memset (instances[source_id].last_file_name, '\0', sizeof (instances[source_id].last_file_name));
  strcpy (instances[source_id].last_file_name, full_file_name);

  // You need to set a timer here (and pass it the source id) if you want to have a fixed
  // recording duration - like 30 seconds, etc.
  // Alternatively you could have a stop_recording() function.
  // Use something like:  g_timeout_add_seconds (RECORDING_CLIP_DURATION, timeout_cb_0, GINT_TO_POINTER (source_id));
     
  free (full_file_name); // allocated with malloc() above, so release it with free()

  return file_name;
}
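If you want to keep the filename logic out of start_recording(), it can be pulled into a small helper that refuses to overflow its output buffer. This is just a sketch: build_recording_path is a name I made up, and it takes the broken-down time as a parameter (rather than calling localtime() itself) so it can be exercised on its own.

```c
#include <assert.h>
#include <stdio.h>
#include <string.h>
#include <time.h>

/* Writes "<dir><YYYY-MM-DD HH:MM:SS>-<camera_id>.mp4" into out.
 * Returns 0 on success, -1 if the result does not fit in out_size bytes. */
int
build_recording_path (char *out, size_t out_size, const char *dir,
    const struct tm *when, int camera_id)
{
  char ts[20];                  /* 19 characters plus the terminating NUL */
  int n;

  assert (out != NULL && dir != NULL && when != NULL);

  if (strftime (ts, sizeof ts, "%Y-%m-%d %H:%M:%S", when) == 0)
    return -1;

  n = snprintf (out, out_size, "%s%s-%d.mp4", dir, ts, camera_id);
  return (n < 0 || (size_t) n >= out_size) ? -1 : 0;
}
```

In start_recording() you would call it with localtime(&now) and your file_path, and bail out before creating any elements if it returns -1.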

(3) In the timer callback… or stop_recording():

static gboolean
timeout_cb_0 (gpointer data)
{
  // The source id arrives as the timer's user data (packed with GINT_TO_POINTER).
  gint source_id = GPOINTER_TO_INT (data);

  //g_print ("\nStop Recording - source %d\n", source_id);
  gst_pad_add_probe (instances[source_id].teepad, GST_PAD_PROBE_TYPE_IDLE, unlink_cb, GINT_TO_POINTER (source_id), NULL);
  return FALSE; //cancels the timeout - makes it oneshot
}

(4) The idle probe callback, which unlinks the recording branch from the tee and sends the EOS:

static GstPadProbeReturn
unlink_cb (GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
{
  /*
   * DEBUG: Output pipeline graph. Set env var first: `export GST_DEBUG_DUMP_DOT_DIR=pipeline_graphs/`
   * Convert exported graph using `dot -T{format} input_file > output_file`.
   */
  GST_DEBUG_BIN_TO_DOT_FILE (GST_BIN (pipeline), GST_DEBUG_GRAPH_SHOW_ALL, "pipeline-before-unlink");

  gint source_id = GPOINTER_TO_INT (user_data);
  if (!g_atomic_int_compare_and_exchange (&instances[source_id].removing, FALSE, TRUE)) {
    return GST_PAD_PROBE_OK;
  }

  //g_print ("Unlinking and sending EOS to recording pipeline to finalise file. Source ID = %d\n", source_id);

  GstPad *sinkpad;
  sinkpad = gst_element_get_static_pad (instances[source_id].queue_record, "sink");
  if (!gst_pad_unlink (instances[source_id].teepad, sinkpad)) {
    g_printerr ("Error unlinking dynamic tee pad from queue_record element");
  }
  gst_object_unref (sinkpad);
 
  gst_element_release_request_pad (instances[source_id].dynamic_tee, instances[source_id].teepad);
  gst_object_unref (instances[source_id].teepad);

  gst_element_send_event (instances[source_id].encoder, gst_event_new_eos());

  //g_print ("Unlinked source = %d\n", source_id);
  return GST_PAD_PROBE_REMOVE;
}

(5) In bus_call, add the following as a new case option. Set the message-forward property to TRUE on the pipeline for this to work:

    /* Since message-forward has been set on the overall pipeline; when the EOS
     * is sent to the unlinked recording pipeline and makes its way to the sink, it
     * will be captured here. We can now safely remove the pipeline elements. */
    case GST_MESSAGE_ELEMENT:
      ; // This empty statement is necessary as C does not allow a declaration immediately after a label
      const GstStructure *s = gst_message_get_structure (msg);

      if (gst_structure_has_name (s, "GstBinForwarded")) {
        GstMessage *forward_msg = NULL;
        gst_structure_get (s, "message", GST_TYPE_MESSAGE, &forward_msg, NULL);
        if (forward_msg == NULL)
          break;
        if (GST_MESSAGE_TYPE (forward_msg) == GST_MESSAGE_EOS) {
          // Extract which recording pipeline (source id) this message is from based on the
          // sink name (e.g. object name 'filesink-00' means source_id=0)
          //g_print ("EOS from element %s\n", GST_OBJECT_NAME (GST_MESSAGE_SRC (forward_msg)));
          gchar file_bin_name[20] = { 0 };
          // GST_OBJECT_NAME does not allocate; gst_object_get_name() returns a copy that must be freed
          g_strlcpy (file_bin_name, GST_OBJECT_NAME (GST_MESSAGE_SRC (forward_msg)), sizeof (file_bin_name));
          int source_id_value = -1;
          if (sscanf (file_bin_name, "%*[^0-9]%d", &source_id_value) == 1)
            remove_recording_pipeline (source_id_value);
        }

        gst_message_unref (forward_msg);
      }
      break;
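The name-to-source-id step in the bus handler can be tried out on its own, without a pipeline. Below is a minimal sketch of the same sscanf pattern wrapped in a helper; source_id_from_sink_name is a made-up name, and returning -1 for names without a number is my own convention.

```c
#include <assert.h>
#include <stdio.h>

/* Extracts the numeric suffix from a sink name such as "filesink-03".
 * Returns the source id, or -1 if the name carries no parsable number. */
int
source_id_from_sink_name (const char *name)
{
  int id;

  assert (name != NULL);
  /* Skip the non-digit prefix, then read the decimal id that follows. */
  if (sscanf (name, "%*[^0-9]%d", &id) != 1)
    return -1;
  return id;
}
```

Keep in mind that the pattern picks up the first number in any element name, so it relies on the recording sinks being the only forwarded EOS sources whose names you act on.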

Hopefully this is clear enough for you to get it working… It is pretty much what smart record does anyway except they have wrapped the dynamic components into a bin. ;-)
