Dynamically Enabling/Disabling Inference Branches in a DeepStream Pipeline

• Hardware Platform (Jetson / GPU) : NVIDIA Jetson AGX Orin
• DeepStream Version : 7.1
• JetPack Version (valid for Jetson only) : 6.1
• TensorRT Version : 8.6.2.3
• Issue Type( questions, new requirements, bugs) : question

Hello,

I have a Python DeepStream application with two inference branches, as illustrated in the diagram below. My input stream is split into two separate paths, each performing different inferences.

pipeline_diagram.png.zip (256.8 KB)

I need a way to dynamically enable or disable one of the branches based on some condition. Just as an example, let's take a time-based condition:
From 10:00 to 12:00, only the upper branch should be active.
From 12:00 to 14:00, only the lower branch should be active.

What I’ve Tried:

I tried adding a valve element at the beginning of each branch and toggling its drop parameter. However, it drops frames, so the "active" inference branch loses frames to perform inference on.
I also considered setting the interval parameter of the nvinfer element to a very high value (e.g., 2147483647) when a branch is inactive. However, this method still allows frames to pass through the pipeline and requires skipping them in a pad probe function, which is inefficient.
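
For reference, the pad-probe skipping I tried looked roughly like this (a minimal sketch; the element name pgie_upper and the probe wiring are placeholders for my actual branch):

# Minimal sketch of the pad-probe skipping approach (element names are placeholders).
import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst

branch_active = {"upper": True}  # toggled elsewhere, e.g. on a schedule

def skip_frames_probe(pad, info, user_data):
    # Dropping here still lets the frame travel through all upstream elements,
    # which is why this approach felt inefficient.
    if not branch_active["upper"]:
        return Gst.PadProbeReturn.DROP
    return Gst.PadProbeReturn.OK

# pgie_upper.get_static_pad("sink").add_probe(
#     Gst.PadProbeType.BUFFER, skip_frames_probe, None)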

Desired Solution:

I am looking for a DeepStream or GStreamer element that allows me to completely bypass an entire branch at the earliest possible stage, ideally before the nvstreammux. Essentially, I want to prevent frames from being processed by an inference branch without affecting the rest of the pipeline. I would also prefer not to drop any frames, but rather to route them to some sort of temporary sink.

Does DeepStream offer a built-in element for this, or is there a recommended approach for dynamically switching inference branches?

Thanks in advance!

So your purpose is to switch between the two models, right? Why not construct the pipeline as source → nvstreammux → PGIE0 → PGIE1 → fakesink? You can switch between the two models by setting the "interval" parameter of the PGIE you don't want to its highest value.
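
For illustration, a minimal Python sketch of that layout (the source, caps, resolution, and config file paths are placeholder assumptions, not part of the suggestion above):

# Sketch of the suggested single-branch layout; source, caps, and config paths are placeholders.
import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst

Gst.init(None)

pipeline = Gst.parse_launch(
    "videotestsrc ! nvvideoconvert ! video/x-raw(memory:NVMM),format=NV12 ! m.sink_0 "
    "nvstreammux name=m batch-size=1 width=1920 height=1080 ! "
    "nvinfer name=pgie0 config-file-path=pgie0_config.txt ! "
    "nvinfer name=pgie1 config-file-path=pgie1_config.txt ! "
    "fakesink"
)

# Run only the first model: PGIE0 infers every batch, PGIE1 skips (almost) all of them.
pipeline.get_by_name("pgie0").set_property("interval", 0)
pipeline.get_by_name("pgie1").set_property("interval", 2147483647)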

@Fiona.Chen , thank you for your response.

Yes this is exactly what I want to achieve. However, I have a few concerns.

If I later expand my pipeline to include five models, would your proposed solution look like this?

source → nvstreammux → PGIE0 → PGIE1 → PGIE2 → PGIE3 → PGIE4 → fakesink

My concern is whether having multiple models in a single inference pipeline would introduce significant latency across the entire pipeline. Additionally, how flexible is this approach? For example, if I need to activate only PGIE0 and PGIE1 at one moment and then later switch to PGIE2, PGIE3, and PGIE4, will adjusting the interval parameter provide the expected behavior that I described in the main question?

Another question relates to metadata persistence. Each inference model has its own probe function, and some of them add metadata to the Gst Buffer. If PGIE0 and PGIE3 are both classification models that use pyds.NvDsClassifierMeta, will the metadata added by PGIE0 affect metadata in the probe function of PGIE3? Or does processing the inference result in a probe function make the metadata no longer available to subsequent models?

Realization About valve Element

I discovered that the valve element does not completely drop frames from the entire pipeline but only drops them in the branch where it is placed. This means that for a setup with multiple models, all inference branches must converge into a common sink to ensure smooth switching between them.

For example, the following approach does not work because each branch has its own independent sink: a frame dropped by, e.g., the valve in the PGIE0 branch never reaches its fakesink0 element, which is still part of the pipeline, and this results in a crashed pipeline:

source → nvstreammux → tee → valve → PGIE0 → fakesink0  
                         ├── valve → PGIE1 → fakesink1  
                         ├── valve → PGIE2 → fakesink2  

However, this alternative setup works because all branches merge into a single sink using a funnel element. When the valve in the PGIE0 branch keeps dropping frames, the frames still flow through the PGIE1 and PGIE2 branches and eventually reach the final sink. This solution does not break the pipeline:

source → nvstreammux → tee → valve → PGIE0 →  
                         ├── valve → PGIE1 → funnel → common_fakesink  
                         ├── valve → PGIE2 →  

The funnel element acts as an N-to-1 muxer, ensuring that frames from different branches are collected before reaching the final sink.
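
For completeness, this is roughly how I wire it up (a minimal sketch; the source, caps, resolution, and config file paths are placeholders for my actual setup):

# Rough sketch of the tee/valve/funnel layout above; source, caps, and configs are placeholders.
import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst

Gst.init(None)

pipeline = Gst.parse_launch(
    "videotestsrc ! nvvideoconvert ! video/x-raw(memory:NVMM),format=NV12 ! m.sink_0 "
    "nvstreammux name=m batch-size=1 width=1280 height=720 ! tee name=t "
    "funnel name=f ! fakesink "
    "t. ! queue ! valve name=valve0 ! nvinfer name=pgie0 config-file-path=pgie0.txt ! f.sink_0 "
    "t. ! queue ! valve name=valve1 ! nvinfer name=pgie1 config-file-path=pgie1.txt ! f.sink_1 "
    "t. ! queue ! valve name=valve2 ! nvinfer name=pgie2 config-file-path=pgie2.txt ! f.sink_2"
)

# Keep the PGIE0 branch active, drop frames on the other two branches.
pipeline.get_by_name("valve0").set_property("drop", False)
pipeline.get_by_name("valve1").set_property("drop", True)
pipeline.get_by_name("valve2").set_property("drop", True)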

Issue with valve and Live Source (nvarguscamerasrc)

I implemented this approach and used the drop property of the valve element to control which models are active. According to gst-inspect, the drop property is:

Whether to drop buffers and events or let them through
flags: readable, writable, changeable in NULL, READY, PAUSED or PLAYING state
Boolean. Default: false

This means that I should be able to dynamically change the drop property while the pipeline is in PLAYING mode.
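
For reference, this is roughly how I toggle it at runtime (a minimal sketch; here driven by a GLib timeout on the main loop rather than a probe, and the element names and timer interval are placeholders from the layout above):

# Sketch: flip which branch receives frames by toggling the valves' "drop" property.
from gi.repository import GLib

def switch_branches(pipeline):
    valve0 = pipeline.get_by_name("valve0")
    valve1 = pipeline.get_by_name("valve1")
    valve0.set_property("drop", not valve0.get_property("drop"))
    valve1.set_property("drop", not valve1.get_property("drop"))
    return True  # keep the GLib timeout running

# GLib.timeout_add_seconds(60, switch_branches, pipeline)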

While this works with file-based sources (filesrc, videotestsrc), I encounter a pipeline crash when using a live camera source (nvarguscamerasrc) and modifying the drop property on the fly. The error message I get is:

[MY LOGGER] Current drop value: False
[MY LOGGER] After change drop value: True
Error generated. gstnvarguscamerasrc.cpp, execute:805 Failed to create CaptureSession
nvstreammux: Successfully handled EOS for source_id=0
(Argus) Error EndOfFile: Unexpected error in reading socket (in src/rpc/socket/client/ClientSocketManager.cpp, function recvThreadCore(), line 277)
(Argus) Error EndOfFile: Receiving thread terminated with error (in src/rpc/socket/client/ClientSocketManager.cpp, function recvThreadWrapper(), line 379)

This only happens when I modify drop inside a probe function while using a live source.

Question:

Why does this error occur when using nvarguscamerasrc? Is there a way to safely toggle the drop property while keeping the pipeline stable with a live source? I also tried setting the valve element to the PAUSED state and then changing the drop property, but I encountered the same error.

Any insights would be greatly appreciated!

If you enable only one model by setting "interval" to 0 and disable the other models by setting "interval" to the highest value, there is no extra latency introduced, since the gst-nvinfer plugin is an "in-place" transform plugin: when no inferencing happens, the buffers are passed through to downstream.

Do you want to use classifier models as PGIEs? A classifier model should be used as an SGIE, which should follow some PGIE. Can you explain the real relationship of the five models?

@Fiona.Chen , thanks for your response.

I used five models as an example, but could you clarify which types of models are considered primary (PGIE) and which are secondary (SGIE)?

Currently, my real pipeline setup is as follows (as shown in the diagram):
pipeline.png.zip (508.6 KB)
• The two models at the bottom are segmentation models.
• The top model is a classifier.
• I also plan to add a detector based on RetinaNet.

Reasoning Behind This Pipeline Design

My goal was to create a parallel inference pipeline, similar to this NVIDIA AI IoT example. However, in my case, I need the ability to dynamically switch between different models based on external conditions.

Question About interval Parameter

Based on your explanation, the best way to toggle models would be by modifying the interval parameter, setting it to 0 for active models and to the maximum value for inactive ones. However, your solution (source → nvstreammux → PGIE0 → PGIE1 → fakesink) does not perform inference in parallel, and each inference depends on the previous one, right? So if one inference takes longer, the next one will be delayed.

However, while inspecting the nvinfer element, I noticed that interval cannot be changed while the pipeline is in the PLAYING state:

Specifies number of consecutive batches to be skipped for inference
flags: readable, writable, changeable only in NULL or READY state
Unsigned Integer. Range: 0 - 2147483647 Default: 0

Does this mean I need to stop the entire pipeline just to change the interval value? If so, is there any alternative method to switch between models dynamically without stopping the pipeline?

A PGIE is a model that runs inference on the whole image, e.g. a detector that finds persons in the picture and gives the bounding boxes of all persons.
An SGIE is a model that runs inference on objects, e.g. a classifier that identifies one person's gender.

The PGIE is used to detect all objects in the picture; the SGIE identifies the properties of the objects one by one.

The SGIE relies on the PGIE to provide objects.

What kind of segmentation? For multiple objects in one image?

Classifier for what?

Only SGIEs depend on PGIEs. PGIEs do not depend on each other; they are independent.

@Fiona.Chen thank you for your reply.

I have two segmentation models, each performing segmentation on a single object independently. Their results are not dependent on each other. However, after their inference, I use an nvstreammux element because I need to perform post-processing that relies on the outputs of both models. This is why, in my pipeline diagram, the two segmentation models are placed in separate branches, and their results are merged later to ensure I have access to both outputs for post-processing. In this case, is it better to have one inference after the other or perform parallel inference as shown in the diagram?

I’m not sure what you mean by this. The classifier in my pipeline does not depend on the results of any other model. Since it does not require outputs from a detector or segmentation model, I assume it should be a PGIE rather than an SGIE.

So, in a pipeline like this:

nvstreammux → PGIE0 → PGIE1 → PGIE2 → fakesink

If each PGIE has a probe function that takes some time to process, does that mean PGIE2 is not affected by the processing time of any previous PGIEs?

If that’s the case, how does this setup differ from the following parallel inference approach?

tee → nvstreammux0  → PGIE0 →  
  ├── nvstreammux1  → PGIE1 → funnel → common_fakesink  
  ├── nvstreammux2  → PGIE2→  

I believe the second example runs inference in parallel, whereas the first one does not. In the first example, the inference time (not post-processing time) of PGIE1 would affect PGIE2, meaning that if PGIE1 takes 20 seconds for inference, then PGIE2's inference starts only after those 20 seconds (I am talking about the first example). However, in the second example, each inference runs independently, so PGIE2 is not delayed by PGIE1. Is my understanding correct?

Then these two segmentation models should work as SGIEs. They depend on the PGIE detectors that provide the needed objects.

No, that is the wrong way. From your description, your models have a logical relationship, so you can't put them in different branches; it would take extra effort to reconstruct the relationship between their results. You should not use nvstreammux in this way. After the PGIE and SGIEs, the detection results are already organized in frame meta and object meta, which is why I suggest putting the models in a single pipeline branch. tee does not help with your requirement, and nvstreammux is not meant to align branches.

Let me explain my question. Suppose I have a model that classifies the color of cars, and a picture with 5 cars in it, each a different color. If the model takes the whole picture as input and outputs all 5 cars' colors, we call this an "others" model in DeepStream, since it is a detector+classifier model. If the model takes one car image, or a batch of car images (each containing only one car), as input and outputs the color of the car, we call this model a "classifier".

If you are sure your "classifier" can be used as a PGIE, please make sure the classifier's output can be put into the correct metadata.

PGIE2's inferencing speed is not impacted by the other PGIEs, but its latency may be PGIE0's latency + PGIE1's latency.

The whole pipeline works in an asynchronous, pipelined mode. E.g., if there is only one stream source, the components handle the frames in sequence. When a frame reaches any component in the pipeline, it is handled as soon as possible. Suppose PGIE0, PGIE1, and PGIE2 have similar per-frame inferencing speeds: when frame 0 reaches PGIE2, PGIE1 may be handling frame 1 and PGIE0 frame 2 in parallel. So the total frame-handling speed is determined by the slowest PGIE in the pipeline.

The first one also runs in parallel.

It would be horrible for a PGIE to take 20 seconds to handle one batch.

The most important thing is to explain the relationship of your 5 models. We have got 3 of them.

@Fiona.Chen thank you for detailed explanation.

I realized I may have been unclear in my previous messages, as I was providing examples rather than describing my actual case. Here’s my real scenario:

I have three models in my pipeline:

  1. Classifier – Determines whether arcing is present in an image. Since arcing is always caused by a single source, there can never be multiple arcings classified within a single frame. If detected, there will be exactly one arcing per image.

  2. Segmentation Model 1 – Detects a single horizontal line in the image. There is always one such specific horizontal line in the frame.

  3. Segmentation Model 2 – Detects a single vertical line in the image. Similar to the horizontal line, this vertical line is also specific and always appears as a single instance in the frame.

In my pipeline, I use nvstreammux after the two segmentation models. This is necessary because I need to combine their results and perform post-processing to determine where the two lines intersect.

I do not have a PGIE detector that provides objects for these segmentation models. The models process the entire image, which is only preprocessed by nvvideoconvert for cropping and nvinfer for resizing. There is no PGIE performing object detection beforehand.

The only relationship between these models is that their results are merged using nvstreammux. In the probe function of nvstreammux, I determine which mask corresponds to which segmentation model based on source_id:

source_id == 0 → Horizontal Line Segmentation

source_id == 1 → Vertical Line Segmentation

Here’s the relevant code snippet that I have as a probe function connected to the nvstreammux element:

    pyds.nvds_acquire_meta_lock(batch_meta)

    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        # Process all user metadata to collect masks
        l_user = frame_meta.frame_user_meta_list
        while l_user is not None:
            try:
                user_meta = pyds.NvDsUserMeta.cast(l_user.data)
            except StopIteration:
                break

            if (
                user_meta
                and user_meta.base_meta.meta_type == pyds.NVDSINFER_SEGMENTATION_META
            ):
                try:
                    segmeta = pyds.NvDsInferSegmentationMeta.cast(
                        user_meta.user_meta_data
                    )
                except StopIteration:
                    break

                # Retrieve mask data in the numpy format from segmeta
                mask = np.array(
                    pyds.get_segmentation_masks(segmeta),
                    copy=True,
                    order="C",
                    dtype=np.float32,
                )

                # Determine which branch this mask came from using unique component ID
                source_id = frame_meta.source_id
                if source_id == 0:  # EDGE
                    mask_edge = np.squeeze(mask)
                    logger.debug(
                        f"Got EDGE mask for frame {frame_meta.frame_num} "
                        f"with shape: {mask_edge.shape}"
                    )
                elif source_id == 1:  # LINE
                    mask_line = np.squeeze(mask)
                    logger.debug(
                        f"Got LINE mask for frame {frame_meta.frame_num} "
                        f"with shape: {mask_line.shape}"
                    )
                else:
                    logger.warning(f"Unknown source_id: {source_id}")

            try:
                l_user = l_user.next
            except StopIteration:
                break
        try:
            l_frame = l_frame.next
        except StopIteration:
            break

    pyds.nvds_release_meta_lock(batch_meta)

Since I can easily determine which mask belongs to which model using source_id, I don’t see a need to restructure the pipeline.

If I adjust my pipeline as you suggest—placing one segmentation model after the other—this would mean:

• The first segmentation model should be a PGIE (since it does not depend on any previous model).

• The second segmentation model should also be a PGIE, because its inference does not depend on the first segmentation model. Am I right? Would this be the correct approach?

My classifier determines whether arcing is present in the image. Since arcing is always caused by a single source, there cannot be multiple instances per frame (e.g., no cases where five different arcings are detected from separate sources like lamps or cars).

That was just an example—not my actual case! My real setup is now described above.

So, in my case with three models, which of the following two approaches is better?

Option 1: Single Pipeline Branch (Sequential Processing)

nvstreammux → PGIE0 (Segmentation 1) → PGIE1 (Segmentation 2) → PGIE2 (Classifier) → fakesink

Option 2: Parallel Processing with tee (Multiple Branches)

tee → nvstreammux0 → PGIE0 (Segmentation 1) →  
  ├── nvstreammux1 → PGIE1 (Segmentation 2) → funnel → common_fakesink  
  ├── nvstreammux2 → PGIE2 (Classifier)     →  

Given that my models all perform inference in under 5ms, and my framerate is 60 FPS (meaning a new frame is generated every 16ms), I assume parallel execution should not be an issue.
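
As a rough sanity check on those numbers (taking the 5 ms and 60 FPS figures above at face value): even in the sequential layout, three models cost about 3 × 5 ms = 15 ms of inference per frame, which is still below the ~16.7 ms frame period, and since the elements process different frames concurrently, the sustained throughput is bounded by the slowest single stage (~5 ms, i.e. roughly 200 FPS).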

If you want to do this, the two segmentations should share the same nvstreammux.

nvstreammux only combines streams into batches. If you use two nvstreammux elements, there will be duplicated metadata which may not be aligned.

No. Multiple PGIEs can also work in one pipeline. We never said a PGIE must work with SGIEs; PGIEs are independent.

Yes. The benefit is that the segmentation result will be put in the same metadata. You don’t need to align the frames.

I’ve explained how the frames are handled in a pipeline with multiple PGIEs in my previous post.

That depends on the relationship between the models you use.

As for your case, the nvstreammux → PGIE0 → PGIE1 → PGIE2 pipeline is enough.

If you insist on using a parallel pipeline, you may need a pipeline like deepstream_reference_apps/deepstream_parallel_inference_app at master · NVIDIA-AI-IOT/deepstream_reference_apps. The pipeline image in that repo has some problems; please refer to the following pipeline image:

@Fiona.Chen thank you very much for your insights! Your explanations have clarified several aspects I hadn’t considered before.

Returning to my original question—if I adjust my pipeline as follows:

nvstreammux → PGIE0 (Segmentation 1) → PGIE1 (Segmentation 2) → PGIE2 (Classifier) → fakesink

I want to dynamically alternate between the models based on the time of day:

During the day: PGIE0 (Segmentation 1) and PGIE1 (Segmentation 2) should perform inference, while PGIE2 (Classifier) should be inactive.

At night: PGIE2 (Classifier) should perform inference, while PGIE0 and PGIE1 should be inactive.

To achieve this, I can manipulate the interval property:

• Setting interval = 0 enables inference.

• Setting interval = 2147483647 effectively disables inference (since it skips nearly all frames).

By dynamically updating the interval property at runtime, I can ensure that only the selected models perform inference at any given time, while the others remain inactive without modifying the pipeline structure.
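
For concreteness, this is a minimal sketch of what I have in mind (assuming the interval change is honored at runtime, despite the gst-inspect flags quoted earlier; the element names and day window are placeholders):

# Sketch of the day/night switching: interval=0 enables a model, a huge interval disables it.
import datetime

INTERVAL_ACTIVE = 0
INTERVAL_INACTIVE = 2147483647  # skips (almost) every batch

def apply_schedule(pipeline):
    hour = datetime.datetime.now().hour
    daytime = 8 <= hour < 20  # placeholder day window
    for name, active in (("pgie0", daytime), ("pgie1", daytime), ("pgie2", not daytime)):
        pipeline.get_by_name(name).set_property(
            "interval", INTERVAL_ACTIVE if active else INTERVAL_INACTIVE)
    return True  # so it can be driven periodically, e.g. by GLib.timeout_add_seconds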

Would this approach be efficient, or is there a better way to handle model switching in DeepStream?

Yes, I think it is the most efficient and reliable way currently. The GStreamer tee and valve plugins should be used very carefully: valve drops not only GstBuffers but also events, and tee only clones GstBuffer references rather than copying the buffers into the branches, which means the GstBuffers in different branches share the same memory. This can cause problems because most DeepStream plugins work in a shared-buffer mode.

@Fiona.Chen Thank you for all the explanations. I will mark your answer as the solution and close the topic.

Thank you! I hope it helps!

@Fiona.Chen
I have a question about extracting frames from the correct PGIE when both models perform segmentation.

Given the following pipeline:

nvstreammux → PGIE0 (Segmentation 1) → PGIE1 (Segmentation 2) → fakesink

With a probe function attached to fakesink, how can I distinguish whether an incoming frame originates from PGIE0 or PGIE1?

When debugging, I get the following output:

DEBUG - Num frames in batch: 1
DEBUG - Got source_id: 0 mask for frame 188 with shape: (224, 224)
DEBUG - Got source_id: 0 mask for frame 188 with shape: (224, 224)
DEBUG - Num frames in batch: 1
DEBUG - Got source_id: 0 mask for frame 188 with shape: (224, 224)
DEBUG - Got source_id: 0 mask for frame 188 with shape: (224, 224)

I do get one frame after the other; however, the first frame has data from the first segmentation and the second frame has data from the second segmentation. Is there a way to get the data for both segmentations in a single NvDsUserMeta?

It appears that source_id is not a reliable property for differentiating between the two segmentation models. How can I determine whether a frame comes from PGIE0 or PGIE1 during inference? Is there any example Python script similar to my case?

Below is the relevant snippet from my probe function:

    pyds.nvds_acquire_meta_lock(batch_meta)

    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        # Process all user metadata to collect masks
        l_user = frame_meta.frame_user_meta_list
        while l_user is not None:
            try:
                user_meta = pyds.NvDsUserMeta.cast(l_user.data)
            except StopIteration:
                break

            if (
                user_meta
                and user_meta.base_meta.meta_type == pyds.NVDSINFER_SEGMENTATION_META
            ):
                try:
                    segmeta = pyds.NvDsInferSegmentationMeta.cast(
                        user_meta.user_meta_data
                    )
                except StopIteration:
                    break

                # Retrieve mask data in the numpy format from segmeta
                mask = np.array(
                    pyds.get_segmentation_masks(segmeta),
                    copy=True,
                    order="C",
                    dtype=np.float32,
                )

                # Determine which branch this mask came from using unique component ID
                source_id = frame_meta.source_id
                if source_id == 0:  # EDGE
                    mask_edge = np.squeeze(mask)
                    logger.debug(
                        f"Got EDGE mask for frame {frame_meta.frame_num} "
                        f"with shape: {mask_edge.shape}"
                    )
                elif source_id == 1:  # LINE
                    mask_line = np.squeeze(mask)
                    logger.debug(
                        f"Got LINE mask for frame {frame_meta.frame_num} "
                        f"with shape: {mask_line.shape}"
                    )
                else:
                    logger.warning(f"Unknown source_id: {source_id}")

            try:
                l_user = l_user.next
            except StopIteration:
                break
        try:
            l_frame = l_frame.next
        except StopIteration:
            break

    pyds.nvds_release_meta_lock(batch_meta)

When you configure multiple PGIEs with gst-nvinfer configuration files, please set a different “gie-unique-id” value for each PGIE. Gst-nvinfer — DeepStream documentation

The inferencing result is stored in NvDsBatchMeta. I see you got the NvDsInferSegmentationMeta from the tensor output of the NvDsFrameMeta. Please check the “unique_id” value in NvDsInferSegmentationMeta; it tells you which PGIE generated that NvDsInferSegmentationMeta.
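
For illustration, a minimal sketch of that check (it assumes the two PGIE config files set gie-unique-id to 1 and 2, and that the unique_id field is exposed by your pyds build, as described above):

# Sketch: tell the two segmentation outputs apart by the producing PGIE's unique ID
# (assumes gie-unique-id=1 and gie-unique-id=2 in the two PGIE config files).
import numpy as np
import pyds

EDGE_GIE_ID = 1  # horizontal-line model (assumed)
LINE_GIE_ID = 2  # vertical-line model (assumed)

def extract_mask(user_meta):
    segmeta = pyds.NvDsInferSegmentationMeta.cast(user_meta.user_meta_data)
    mask = np.squeeze(np.array(pyds.get_segmentation_masks(segmeta),
                               copy=True, order="C", dtype=np.float32))
    if segmeta.unique_id == EDGE_GIE_ID:
        return "edge", mask
    if segmeta.unique_id == LINE_GIE_ID:
        return "line", mask
    return "unknown", mask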

Please raise a new topic, since this is a new question.
