Handling GST errors in pipeline and recovering?

Setup

• Hardware Platform GPU
• DeepStream Version 7.0
• TensorRT Version 10.5.0.18
• Issue Type( questions, new requirements, bugs) Questions

Description

I have a Python pipeline in which I would like to detect erroneous videos while I’m analyzing them. I don’t quite understand the underlying error handling in GST / DeepStream; maybe you could help me clear things up?

Reproducible example
I’ve set up an example which can reproduce my problem, based on the sample deepstream_python_apps/apps/runtime_source_add_delete. I will provide my dockerfile so you can try it the same way I am running it

The small changes I made to the sample app are:

  • The “bus_call”-method: catching errors and trying to stop and release the source in which the error was caught. I also removed the “loop.quit()” as I want to be able to recover from the error rather than closing down the pipeline
  • Main takes more than 1 uri as source just by changing len(args) !=2 to len(args) > 1

The full code for the reproducible example is provided as an archive so that you can just unzip and run it the same way as I do. Hope it works:
reproduce_error_pipeline_test.zip (18.8 KB)

Corrupt file

When testing this I had a corrupt video giving the error: “no known streams found”, but for the purpose of reproducing this easily I also tried with just making an empty file with

touch empty.mp4

and giving it as input, it gives the same behaviour as if I provide a corrupt file (raising errors several times), only a different error message. So I hope this approach works to reproduce the problem. From now on, I use the empty file to display the behaviour.

I also provide the outputs as text files if you want to compare them.

Test happyflow

Running with 2 working source URIs, i.e. running the program like:

python3 runtime_source_add_delete/deepstream_rt_src_add_del_modified.py file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.mp4 file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_office.mp4

This results in both sources being added and starting to play from the beginning, after which the addition/deletion of sources happens as expected.

Test 1 empty file
Running with one empty file like:

python3 runtime_source_add_delete/deepstream_rt_src_add_del_modified.py file:///opt/nvidia/deepstream/deepstream/sources/deepstream_python_apps/apps/project/empty.mp4

Gives the error “stream contains no data”, which is reasonable given it is an empty file. It also says “STATE CHANGE SUCCESS” which makes me think the source is stopped and released as it should. However, it then tries to stop & release the source again it seems? And the pipeline hangs and won’t add any more sources as it should.

Test 1 working file and 1 corrupt file
Running with 1 legit file and one empty file like:

python3 runtime_source_add_delete/deepstream_rt_src_add_del_modified.py  file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.mp4  file:///opt/nvidia/deepstream/deepstream/sources/deepstream_python_apps/apps/project/empty.mp4

Gives the same problem, but the video renderer window is at least opened. But the legitimate source file won’t start playing either, so the whole pipeline is frozen it seems?

Questions
So my questions are:

  • It seems like the same error is raised several times. Why? And how should I go about handling them to avoid raising the same error several times?
  • Is this a reasonable approach to handle erroneous videos? I have a use case where I don’t know in advance which videos will have to be analyzed, so I need a way to handle it during the pipeline running. Is there another better way?
  • Are the sources being stopped/released in a faulty way? Do I need to add something to the bus_call to resume playing of the other sources (in the case of providing several sources)

Our demo removed the source after the playback was finished, so the EOS message was sent correctly before it was removed.
Could you try to send EOS message first when you receive the error message?
Also we suggest setting the batch-size of the nvstreammux to the number of your video sources.

Do you mean sending an EOS message like:

...
elif t == Gst.MessageType.ERROR:
    err, debug = message.parse_error()
    sys.stderr.write("Error: %s: %s\n" % (err, debug))
    pipeline.send_event(Gst.Event.new_eos())
elif t == Gst.MessageType.ELEMENT:
...

In that case there are two errors raised. The first one is “Stream contains no data”, and the second one is “Internal data stream error”:

Now playing...
1 :  file:///opt/nvidia/deepstream/deepstream/sources/deepstream_python_apps/apps/project/empty.mp4
Starting pipeline 

Error: gst-stream-error-quark: Stream contains no data. (4): ../plugins/elements/gsttypefindelement.c(1154): gst_type_find_element_loop (): /GstPipeline:pipeline0/GstURIDecodeBin:source-bin-00/GstDecodeBin:decodebin0/GstTypeFindElement:typefind:
Can't typefind empty stream
Error: gst-stream-error-quark: Internal data stream error. (1): ../plugins/elements/gsttypefindelement.c(1257): gst_type_find_element_loop (): /GstPipeline:pipeline0/GstURIDecodeBin:source-bin-00/GstDecodeBin:decodebin0/GstTypeFindElement:typefind:
streaming stopped, reason error (-5)

And if I provide a legitimate file as well:

Now playing...
1 :  file:///opt/nvidia/deepstream/deepstream/sources/deepstream_python_apps/apps/project/empty.mp4
2 :  file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.mp4
Starting pipeline 

Error: gst-stream-error-quark: Stream contains no data. (4): ../plugins/elements/gsttypefindelement.c(1154): gst_type_find_element_loop (): /GstPipeline:pipeline0/GstURIDecodeBin:source-bin-00/GstDecodeBin:decodebin1/GstTypeFindElement:typefind:
Can't typefind empty stream
Error: gst-stream-error-quark: Internal data stream error. (1): ../plugins/elements/gsttypefindelement.c(1257): gst_type_find_element_loop (): /GstPipeline:pipeline0/GstURIDecodeBin:source-bin-00/GstDecodeBin:decodebin1/GstTypeFindElement:typefind:
streaming stopped, reason error (-5)
Decodebin child added: qtdemux0 

Error: gst-stream-error-quark: This file contains no playable streams. (9): ../gst/isomp4/qtdemux.c(499): gst_qtdemux_post_no_playable_stream_error (): /GstPipeline:pipeline0/GstURIDecodeBin:source-bin-01/GstDecodeBin:decodebin0/GstQTDemux:qtdemux0:
no known streams found
Error: gst-stream-error-quark: This file contains no playable streams. (9): ../gst/isomp4/qtdemux.c(499): gst_qtdemux_post_no_playable_stream_error (): /GstPipeline:pipeline0/GstURIDecodeBin:source-bin-01/GstDecodeBin:decodebin0/GstQTDemux:qtdemux0:
no known streams found

And the legitimate video can’t start playing either.
So it feels like there is some part missing in how I should release the sources…

In theory, when one input video reports an error, this source is not really linked to nvstreammux. Therefore, you do not need to manually delete that.
But you need to pay attention to the logic in the demo code. In your scenario, you need to keep track of whether the source was actually added, and use the real source_id when deleting it.
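
As a rough illustration of that bookkeeping, here is a minimal sketch in plain Python. The class and method names (SourceRegistry, register, mark_linked, needs_mux_release) are hypothetical and not part of the sample app; the idea is only that the pad-added callback records when a source really got linked to nvstreammux, so the error path can tell whether a muxer sink pad needs releasing.

```python
class SourceRegistry:
    """Tracks which sources were added and which actually reached the muxer.

    Hypothetical helper for illustration; not part of DeepStream.
    """

    def __init__(self):
        # source_id -> True once the source has been linked to the muxer
        self._linked = {}

    def register(self, source_id):
        # Call when the source bin is added to the pipeline.
        self._linked[source_id] = False

    def mark_linked(self, source_id):
        # Call from the pad-added callback after the pad link succeeded.
        self._linked[source_id] = True

    def needs_mux_release(self, source_id):
        # Only a linked source holds a muxer sink pad that must be released.
        return self._linked.get(source_id, False)

    def forget(self, source_id):
        # Call once the source bin has been removed from the pipeline.
        self._linked.pop(source_id, None)

    def active_ids(self):
        return sorted(self._linked)
```

With something like this, a source that failed before cb_newpad ran would report needs_mux_release(...) as False, and only the bin itself would have to be removed.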

Hi! You are right, “cb_newpad” is not called before the error, so no sink pad is requested from the mux and the uridecodebin is not connected to the mux. It’s only added to the pipeline, so now I am at least not running into problems with trying to release things that don’t need releasing…

But my “main problems” still remain:

  • When playing 2 sources and removing the one with an error, the other one is just frozen and never starts playing.
  • There is always another error raised right after the first. In the case of “Stream contains no data”, the second error is “Internal data stream error”. Is this expected? Or could it be connected to why the other stream is not playing?

I still don’t understand how to get around this… any ideas?

We’ll check this issue.

Yes. This is a normal internal error handling process of GStreamer. For details, you can refer directly to the GStreamer source code, gsttypefindelement.c.
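
Since the follow-up error is expected, one way to avoid reacting to it twice is to remember which sources have already reported an error. The sketch below is plain Python with hypothetical names (FirstErrorFilter, is_first_error); it assumes the source id has already been worked out from the error message.

```python
class FirstErrorFilter:
    """Lets only the first error per source through.

    Hypothetical helper for illustration; not part of GStreamer or DeepStream.
    """

    def __init__(self):
        self._failed = set()

    def is_first_error(self, source_id):
        # Later errors from the same source (for example the follow-up
        # "Internal data stream error") return False and can be ignored.
        if source_id in self._failed:
            return False
        self._failed.add(source_id)
        return True

    def reset(self, source_id):
        # Call when the same source id is reused for a new URI.
        self._failed.discard(source_id)
```

In bus_call, the recovery code would then run only when is_first_error(source_id) returns True.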

Thank you so much! Great to get confirmation that the second error is expected.

Looking forward to hearing back from you regarding the frozen video/pipeline!

When an error is reported, you need to follow these steps if you want to continue.

1. loop.quit
2. set the pipeline to NULL
3. remove the source_bin from the pipeline
4. set the pipeline to PLAYING
5. loop.run

Thank you!! Finally the other source is playing.

However, with this approach something still seems a bit off: when the legitimate video is done playing, the pipeline is frozen again. (It doesn’t shut down as it would if I played 2 legitimate videos.) Any ideas as to what could be causing that?

If you played 2 legitimate videos, there should be no problem because it will not trigger the error logic. You need to check the implementation logic of your code yourself.

No I mean in the case of playing one legitimate video and one corrupt video. So when the legitimate video is done playing, the pipeline should shut down but it freezes again.

I’ve noticed that even though I try sending EOS (when catching the error in bus_call), the streammux doesn’t report “successfully handled EOS from source” for the corrupt source. I guess that’s because of what you said earlier, that the erroneous source is never actually linked to the mux… but maybe that is somehow connected to the fact that the pipeline doesn’t shut down?

Is this the correct way of sending the EOS:

...
    elif t == Gst.MessageType.ERROR:
        err, debug = message.parse_error()
        sys.stderr.write("Error: %s: %s\n" % (err, debug))
        loop.quit()
        # Try to remove source causing error:
        if 'source-bin' in debug:
            # debug is a string like:
            #'../gst/isomp4/qtdemux.c(499): gst_qtdemux_post_no_playable_stream_error (): /GstPipeline:pipeline0/GstBin:source-bin-00/GstURIDecodeBin:uri-decode-bin/GstDecodeBin:decodebin0/GstQTDemux:qtdemux0:'
            # Get the different gst-elements present in debug message separated by /:
            gst_elements_trace = debug.split('/') 
            # Get the sourcebin-element:
            source_element = [el for el in gst_elements_trace if 'source-bin' in el].pop() 
            # Get the source id which is at the end of the string:
            source_id = source_element.split('-')[-1] 
            sys.stderr.write(f"Trying to remove source {source_id} due to error\n")
            # Send EOS, then set pipeline state to NULL in order to remove elements
            pipeline.send_event(Gst.Event.new_eos())
            pipeline.set_state(Gst.State.NULL)
            erroneous_source_bin = Gst.Bin.get_by_name(pipeline, f"source-bin-{source_id}")
            pipeline.remove(erroneous_source_bin)
            pipeline.set_state(Gst.State.PLAYING)
            loop.run()
    elif t == Gst.MessageType.ELEMENT and (source_state is not None and mux is not None):
...
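
For what it’s worth, the string handling for the source id can be pulled out into a small function that is easy to test on its own. This is only a sketch: the name source_id_from_debug is made up, and it assumes the bins keep the “source-bin-NN” naming seen in the logs above.

```python
import re

# Matches the numeric suffix of a bin named like "source-bin-00".
_SOURCE_BIN_RE = re.compile(r"source-bin-(\d+)")


def source_id_from_debug(debug):
    """Return the source id found in a GStreamer debug string, or None.

    Hypothetical helper; assumes source bins are named "source-bin-NN".
    """
    if not debug:
        return None
    match = _SOURCE_BIN_RE.search(debug)
    return int(match.group(1)) if match else None
```

Because the id comes back as an integer, the bin name would have to be rebuilt with zero padding, e.g. f"source-bin-{source_id:02d}", to match names like source-bin-00.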

The final stream-eos is never sent… so I still feel like there is something missing in the shutting down of the stream?

You’d better not call the loop in the bus_call thread. You can try to call that in the main thread after the ERROR.

The pipeline is running so the bus_call function is only called when a message is sent on the bus, right? So how do I know in the main thread when an error is caught by the bus_call thread?

In the sample apps, the bus_call uses loop.quit() so that’s why I thought I could do it this way. Would you be able to explain why it is problematic to use it this way?

Since the bus_call function and the main function are not in the same thread, you need to use multi-threaded programming to enable bus_call to notify the main thread when it receives an error.

Please refer to the GLib and GStreamer bus documentation.
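
Whichever thread the handler actually runs in, a thread-safe queue is one generic way to hand an error over to the code that performs the recovery. The sketch below uses only the Python standard library; ErrorMailbox and its methods are hypothetical names, and how the draining side gets scheduled (for example from a periodic timer) is left open.

```python
import queue


class ErrorMailbox:
    """Thread-safe hand-off of error reports between two threads.

    Hypothetical helper for illustration; uses only the standard library.
    """

    def __init__(self):
        self._queue = queue.Queue()

    def report(self, source_id, text):
        # Safe to call from any thread, e.g. from inside a bus handler.
        self._queue.put((source_id, text))

    def drain(self, handle):
        """Call handle(source_id, text) for every queued error.

        Returns the number of errors processed. Never blocks.
        """
        handled = 0
        while True:
            try:
                source_id, text = self._queue.get_nowait()
            except queue.Empty:
                return handled
            handle(source_id, text)
            handled += 1
```

The reporting side only calls report(...) and returns immediately, so no pipeline state changes happen inside the handler itself.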

Okay, is there any sample app or documentation in regards to communication from bus thread to main thread?

In the Gstreamer bus documentation that you posted above, it says: “It is important to know that the handler will be called in the thread context of the mainloop.”. It makes me a bit confused as to what you said about notifying the main thread when receiving an error?

No. We don’t have a demo like that right now. You can try exploring the asynchronous programming in python currently.

This is an unconventional approach itself. You can try to wrap DeepStream into a class and then stop or start it with external control.

I experimented a bit more and found my solution…! The approach you described above actually works as expected if I remove the “loop.quit()” and “loop.run()”. I also don’t seem to need the manual EOS.

So what I needed to fix my problem with pipeline being frozen at the end was, when catching error in “bus_call”, just:

1. set the pipeline to NULL
2. remove the source_bin from the pipeline (and flush & release streammux sinkpad)
3. set the pipeline to PLAYING
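
Roughly, the three steps can be sketched as below. The function name drop_failed_source is made up, and the two state values are passed in as arguments so the sketch does not import GStreamer itself; in the real app they would be Gst.State.NULL and Gst.State.PLAYING, and pipeline would be the Gst.Pipeline. Flushing and releasing the streammux sink pad is not shown, since it only applies to a source that was actually linked.

```python
def drop_failed_source(pipeline, bin_name, null_state, playing_state):
    """Remove one source bin using the NULL -> remove -> PLAYING sequence.

    Hypothetical helper. `pipeline` is expected to offer set_state(),
    get_by_name() and remove(), as a Gst.Pipeline does.
    Returns True if a bin with that name was found and removed.
    """
    # 1. Stop the whole pipeline so elements can be removed safely.
    pipeline.set_state(null_state)

    # 2. Look up the failing source bin by name and remove it.
    source_bin = pipeline.get_by_name(bin_name)
    removed = False
    if source_bin is not None:
        pipeline.remove(source_bin)
        removed = True

    # 3. Resume playback with the remaining sources.
    pipeline.set_state(playing_state)
    return removed
```

In bus_call this would be called as something like drop_failed_source(pipeline, "source-bin-00", Gst.State.NULL, Gst.State.PLAYING), without any loop.quit() or loop.run() around it.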

Now it works! Thanks for the help.

This actually creates other issues it seems… if the videos I use end at the same time (ie their length is the exact same), I get the issue of shutting down pipeline prematurely (right after the first batch of streams are all finished). So the EOS is emitted and the app is exited. I guess because the new sources are not added fast enough or something?

I have added logic to ensure that new sources are added to the streammux when others are finished.

Is there a way to ensure that pipeline keeps running even though all sources are removed at the same time? So that the pipeline can “wait” for the new sources to be added

Actually, right before the pipeline exits, the new source is added but its state change returns ASYNC… so it seems it doesn’t have time to start playing before EOS is emitted? :/ Do you have any suggestions as to what could be causing the ASYNC state for the new sources?

What happens is:

  1. all sources in batch 1 are shut down and released
  2. pipeline attempts to add new sources and start playing (stuck in ASYNC it seems?)
  3. I get Internal data stream error and other warnings like “pad not activated yet” => probably some issue with the state change?