Running image fusion on Jetson Orin Nano with deepstream python apps

Please provide complete information as applicable to your setup.

• Hardware Platform: Jetson Orin Nano 8GB
• DeepStream Version: 6.3.0
• JetPack Version: 5.1.2
• TensorRT Version: 8.5
• NVIDIA GPU Driver Version: CUDA: 11.4, cuDNN: 8.6

I am currently utilizing DeepStream Python applications to explore and adapt pipelines for my specific use case. My objective involves implementing an image fusion model that combines thermal and optical images of identical resolution. The model processes an input image where both modalities are concatenated channel-wise after conversion from RGB to grayscale, resulting in each modality having one channel. Consequently, the model’s input format is (1, 2, H, W), where H=W=640 in my setup. The model generates a fused image of the same resolution but with a single channel output format of (1, 1, H, W).
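For reference, here is a minimal NumPy sketch of building the (1, 2, H, W) input offline. It assumes BT.601 luma weights for the RGB-to-grayscale step and an infrared-first channel order (matching the `ir + vi -> fus` docstring of the model posted later in this thread). TarDAL's own preprocessing may differ. `rgb_to_gray` and `build_fusion_input` are hypothetical helper names.

```python
import numpy as np

# BT.601 luma weights; an assumption, TarDAL's exact RGB->gray formula may differ.
LUMA_WEIGHTS = np.array([0.299, 0.587, 0.114], dtype=np.float32)


def rgb_to_gray(rgb: np.ndarray) -> np.ndarray:
    """Convert an (H, W, 3) uint8 RGB frame to an (H, W) float32 image in [0, 1]."""
    return (rgb.astype(np.float32) / 255.0) @ LUMA_WEIGHTS


def build_fusion_input(ir_rgb: np.ndarray, vis_rgb: np.ndarray) -> np.ndarray:
    """Stack the two grayscale modalities into a (1, 2, H, W) NCHW tensor."""
    if ir_rgb.shape != vis_rgb.shape:
        raise ValueError("both frames must already share the same resolution")
    ir = rgb_to_gray(ir_rgb)
    vis = rgb_to_gray(vis_rgb)
    # Channel 0 = infrared, channel 1 = visible; the order must match training.
    return np.stack([ir, vis], axis=0)[np.newaxis, ...]
```

For 640x640 frames this yields exactly the (1, 2, 640, 640) shape the engine reports.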

To facilitate this, I have converted a PyTorch-trained model to ONNX and subsequently built a TensorRT engine with FP16 precision, following NVIDIA’s documentation. As I am new to DeepStream, I have run into a few issues.

Specifically, I am attempting to customize the deepstream-test3 pipeline without success. My requirement involves reading two streams: one from a thermal camera channel and another from an optical camera channel. These streams have differing resolutions, necessitating their alignment to a common resolution. Subsequently, I aim to perform image registration using a homography matrix. Each frame will then be converted to grayscale and concatenated along the channel dimension before inference.
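The registration step can be prototyped on the CPU before moving it into the pipeline. The sketch below is a plain NumPy nearest-neighbour inverse warp, assuming `H` maps thermal pixel coordinates to optical ones (dst ~ H @ src). In practice `cv2.warpPerspective` does the same with proper interpolation. A scaling homography such as diag(sx, sy, 1) folds the resolution alignment into the same warp. `warp_homography` is a hypothetical helper, not a DeepStream API.

```python
import numpy as np


def warp_homography(src: np.ndarray, H: np.ndarray, out_shape: tuple) -> np.ndarray:
    """Nearest-neighbour warp of a 2-D image by homography H (dst ~ H @ src).

    Each output pixel is inverse-mapped into the source image; pixels that
    land outside the source are filled with 0.
    """
    out_h, out_w = out_shape
    ys, xs = np.indices((out_h, out_w))
    # Homogeneous destination coordinates, shape (3, N).
    dst = np.stack([xs.ravel(), ys.ravel(), np.ones(out_h * out_w)]).astype(np.float64)
    src_pts = np.linalg.inv(H) @ dst
    # Dehomogenise and round to the nearest source pixel.
    sx = np.rint(src_pts[0] / src_pts[2]).astype(int)
    sy = np.rint(src_pts[1] / src_pts[2]).astype(int)
    valid = (sx >= 0) & (sx < src.shape[1]) & (sy >= 0) & (sy < src.shape[0])
    out = np.zeros(out_h * out_w, dtype=src.dtype)
    out[valid] = src[sy[valid], sx[valid]]
    return out.reshape(out_h, out_w)
```

A pure translation by one pixel, for example, shifts the image right and leaves the first column empty.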

Additionally, I seek guidance on replacing a Caffe model with the engine file I have created. What configuration parameters should I utilize for this transition? Furthermore, could you advise on the appropriate pipeline configuration to execute the image fusion process in this scenario? Any relevant example or documentation suggestion would be helpful.

Thanks in advance!

Could you draw a simple pipeline for your feature?
Also, what’s the specific format of your two sources (png, jpeg, video)?

Since you are using an ONNX model, you can refer to the many ONNX model config examples under configs/nvinfer.

@yuweiw Thank you so much for the assistance.

The sources are videos, i.e. URIs (.mp4, .h264, etc.) or RTSP streams. The first video is from an RGB camera and the second one is from a thermal camera. Since I am a beginner in DeepStream, I think my problem is in creating the correct pipeline. The pipeline I created based on my understanding is attached here as a PDF; I am not sure whether it is correct, because it is not working. I would appreciate any corrections you can suggest, or relevant examples.

I have also built the engine file by following the documentation here to convert the PyTorch-trained model to ONNX and then to an engine file.
I want to use that engine file for inference. Can I use it, and what would the correct configuration be?
Thanks in advance!
pipeline.pdf (20.9 KB)

Yes, you can use that. I have pointed you to many config files above that you can refer to. You can also configure your ONNX model directly in the config file and let DeepStream generate an engine file for you.
But I suggest you learn some basic information about nvinfer so that you can configure the file according to your needs.
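For reference, here is a minimal sketch of the kind of `[property]` section this implies, generated with Python's `configparser` so the keys are easy to check. The key names (`onnx-file`, `model-engine-file`, `network-mode`, `model-color-format`, `network-type`, `output-tensor-meta`) are standard nvinfer config keys; the values, especially `net-scale-factor`, and the file paths are assumptions to adapt, and `make_fusion_pgie_config` is a hypothetical helper.

```python
import configparser
import io


def make_fusion_pgie_config(engine_path: str, onnx_path: str, color_format: int) -> str:
    """Return an nvinfer config file (as text) for a raw-tensor model."""
    cfg = configparser.ConfigParser()
    cfg["property"] = {
        "gpu-id": "0",
        # Scales 8-bit pixels to [0, 1]; an assumption about the training normalisation.
        "net-scale-factor": "0.00392156862745098",
        "onnx-file": onnx_path,           # used to (re)build the engine if needed
        "model-engine-file": engine_path, # deserialised directly if it is valid
        "batch-size": "1",
        "network-mode": "2",              # 0=FP32, 1=INT8, 2=FP16
        "model-color-format": str(color_format),  # 0=RGB, 1=BGR, 2=GRAY
        "network-type": "100",            # "other": no built-in output parsing
        "output-tensor-meta": "1",        # attach raw output tensors as metadata
        "gie-unique-id": "1",
    }
    buf = io.StringIO()
    # nvinfer reads key=value lines, so skip the spaces configparser adds by default.
    cfg.write(buf, space_around_delimiters=False)
    return buf.getvalue()
```

As far as I understand, with `network-type=100` nvinfer does no detector or classifier post-processing, so the model output is only available as tensor metadata for a probe to read.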

About the pipeline you attached, you can try the pipeline below.

source1->nvvideoconvert->
                          nvcompositor->nvstreammux->nvinfer->tiler->osd->sink
source2->nvvideoconvert->

@yuweiw Thank you so much.
I have followed your pipeline; I am attaching the Python file that I modified from deepstream-test3. I think I need to add some more features to the pipeline.
I need to convert each source's frames to grayscale (to get one channel), and then concatenate the two grayscale sources to form a 2-channel input (one channel from source1 and the other from source2), which is the input the model expects. However, I am getting this error: nvinfer gstnvinfer.cpp:676:gst_nvinfer_logger: NvDsInferContext[UID 1]: Error in NvDsInferContextImpl::preparePreprocess() <nvdsinfer_context_impl.cpp:974> [UID = 1]: RGB/BGR input format specified but network input channels is not 3. The network expects a 2-channel input in (B, C, H, W) format.

Below is the complete error log:

0:00:00.466760630  3996     0x2b19b610 WARN                 nvinfer gstnvinfer.cpp:887:gst_nvinfer_start:<primary-inference> warning: NvInfer output-tensor-meta is enabled but init_params auto increase memory (auto-inc-mem) is disabled. The bufferpool will not be automatically resized.
0:00:00.466873276  3996     0x2b19b610 WARN                 nvinfer gstnvinfer.cpp:679:gst_nvinfer_logger:<primary-inference> NvDsInferContext[UID 1]: Warning from NvDsInferContextImpl::initialize() <nvdsinfer_context_impl.cpp:1174> [UID = 1]: Warning, OpenCV has been deprecated. Using NMS for clustering instead of cv::groupRectangles with topK = 20 and NMS Threshold = 0.5
WARNING: [TRT]: Using an engine plan file across different models of devices is not recommended and is likely to affect performance or even cause errors.
0:00:03.644780928  3996     0x2b19b610 INFO                 nvinfer gstnvinfer.cpp:682:gst_nvinfer_logger:<primary-inference> NvDsInferContext[UID 1]: Info from NvDsInferContextImpl::deserializeEngineAndBackend() <nvdsinfer_context_impl.cpp:1988> [UID = 1]: deserialized trt engine from :/home/jetson/Faizan/fusion/Saferail-AI/tensorrt_files/tardal-dt.trt
WARNING: [TRT]: The getMaxBatchSize() function should not be used with an engine built from a network created with NetworkDefinitionCreationFlag::kEXPLICIT_BATCH flag. This function will always return 1.
INFO: [Implicit Engine Info]: layers num: 2
0   INPUT  kHALF  image           2x640x640       
1   OUTPUT kHALF  fused           1x640x640       

0:00:03.845326426  3996     0x2b19b610 INFO                 nvinfer gstnvinfer.cpp:682:gst_nvinfer_logger:<primary-inference> NvDsInferContext[UID 1]: Info from NvDsInferContextImpl::generateBackendContext() <nvdsinfer_context_impl.cpp:2091> [UID = 1]: Use deserialized engine model: /home/jetson/Faizan/fusion/Saferail-AI/tensorrt_files/tardal-dt.trt
0:00:03.845405022  3996     0x2b19b610 ERROR                nvinfer gstnvinfer.cpp:676:gst_nvinfer_logger:<primary-inference> NvDsInferContext[UID 1]: Error in NvDsInferContextImpl::preparePreprocess() <nvdsinfer_context_impl.cpp:974> [UID = 1]: RGB/BGR input format specified but network input channels is not 3
ERROR: Infer Context prepare preprocessing resource failed., nvinfer error:NVDSINFER_CONFIG_FAILED
0:00:03.853297880  3996     0x2b19b610 WARN                 nvinfer gstnvinfer.cpp:898:gst_nvinfer_start:<primary-inference> error: Failed to create NvDsInferContext instance
0:00:03.853347994  3996     0x2b19b610 WARN                 nvinfer gstnvinfer.cpp:898:gst_nvinfer_start:<primary-inference> error: Config file path: image_fusion_config.txt, NvDsInfer Error: NVDSINFER_CONFIG_FAILED
Warning: gst-library-error-quark: NvInfer output-tensor-meta is enabled but init_params auto increase memory (auto-inc-mem) is disabled. The bufferpool will not be automatically resized. (5): /dvs/git/dirty/git-master_linux/deepstream/sdk/src/gst-plugins/gst-nvinfer/gstnvinfer.cpp(887): gst_nvinfer_start (): /GstPipeline:pipeline0/GstNvInfer:primary-inference
Error: gst-resource-error-quark: Failed to create NvDsInferContext instance (1): /dvs/git/dirty/git-master_linux/deepstream/sdk/src/gst-plugins/gst-nvinfer/gstnvinfer.cpp(898): gst_nvinfer_start (): /GstPipeline:pipeline0/GstNvInfer:primary-inference:
Config file path: image_fusion_config.txt, NvDsInfer Error: NVDSINFER_CONFIG_FAILED
Exiting app

image_fusion_config.txt (2.7 KB)

The code I am using:

#!/usr/bin/env python3

################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2019-2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import sys
sys.path.append('../')
from pathlib import Path
import gi
import configparser
import argparse
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst
from ctypes import *
import time
import math
import platform
from common.is_aarch_64 import is_aarch64
from common.bus_call import bus_call
from common.FPS import PERF_DATA

import pyds

no_display = False
silent = False
file_loop = False
perf_data = None

MAX_DISPLAY_LEN=64
MUXER_OUTPUT_WIDTH=1920
MUXER_OUTPUT_HEIGHT=1080
MUXER_BATCH_TIMEOUT_USEC=4000000
TILED_OUTPUT_WIDTH=1280
TILED_OUTPUT_HEIGHT=720
GST_CAPS_FEATURES_NVMM="memory:NVMM"
OSD_PROCESS_MODE= 0
OSD_DISPLAY_TEXT= 1


# pgie_src_pad_buffer_probe extracts the batch metadata received on the pgie src pad,
# prints frame numbers and updates the per-stream FPS counters.
def pgie_src_pad_buffer_probe(pad,info,u_data):
    frame_number=0
    # num_rects=0
    # got_fps = False
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        # A pad probe must always return a Gst.PadProbeReturn value.
        return Gst.PadProbeReturn.OK
    # Retrieve batch metadata from the gst_buffer
    # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
    # C address of gst_buffer as input, which is obtained with hash(gst_buffer)
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            # Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
            # The casting is done by pyds.NvDsFrameMeta.cast()
            # The casting also keeps ownership of the underlying memory
            # in the C code, so the Python garbage collector will leave
            # it alone.
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        frame_number=frame_meta.frame_num
        
        if not silent:
            print("Frame Number=", frame_number)
            # print("Frame Number=", frame_number, "Number of Objects=",num_rects,"Vehicle_count=",obj_counter[PGIE_CLASS_ID_VEHICLE],"Person_count=",obj_counter[PGIE_CLASS_ID_PERSON])

        # Update frame rate through this probe
        stream_index = "stream{0}".format(frame_meta.pad_index)
        global perf_data
        perf_data.update_fps(stream_index)

        try:
            l_frame=l_frame.next
        except StopIteration:
            break

    return Gst.PadProbeReturn.OK



def cb_newpad(decodebin, decoder_src_pad,data):
    print("In cb_newpad\n")
    caps=decoder_src_pad.get_current_caps()
    if not caps:
        caps = decoder_src_pad.query_caps()
    gststruct=caps.get_structure(0)
    gstname=gststruct.get_name()
    source_bin=data
    features=caps.get_features(0)

    # Need to check if the pad created by the decodebin is for video and not
    # audio.
    print("gstname=",gstname)
    if(gstname.find("video")!=-1):
        # Link the decodebin pad only if decodebin has picked nvidia
        # decoder plugin nvdec_*. We do this by checking if the pad caps contain
        # NVMM memory features.
        print("features=",features)
        if features.contains("memory:NVMM"):
            # Get the source bin ghost pad
            bin_ghost_pad=source_bin.get_static_pad("src")
            if not bin_ghost_pad.set_target(decoder_src_pad):
                sys.stderr.write("Failed to link decoder src pad to source bin ghost pad\n")
        else:
            sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")

def decodebin_child_added(child_proxy,Object,name,user_data):
    print("Decodebin child added:", name, "\n")
    if(name.find("decodebin") != -1):
        Object.connect("child-added",decodebin_child_added,user_data)

    if "source" in name:
        source_element = child_proxy.get_by_name("source")
        if source_element.find_property('drop-on-latency') != None:
            Object.set_property("drop-on-latency", True)



def create_source_bin(index, uri):
    print("Creating source bin")

    # Create a source GstBin to abstract this bin's content from the rest of the
    # pipeline
    bin_name="source-bin-%02d" %index
    print(bin_name)
    nbin=Gst.Bin.new(bin_name)
    if not nbin:
        sys.stderr.write(" Unable to create source bin \n")

    # Source element for reading from the uri.
    # We will use decodebin and let it figure out the container format of the
    # stream and the codec and plug the appropriate demux and decode plugins.
    if file_loop:
        # use nvurisrcbin to enable file-loop
        uri_decode_bin=Gst.ElementFactory.make("nvurisrcbin", "uri-decode-bin")
        uri_decode_bin.set_property("file-loop", 1)
        uri_decode_bin.set_property("cudadec-memtype", 0)
    else:
        uri_decode_bin=Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
    if not uri_decode_bin:
        sys.stderr.write(" Unable to create uri decode bin \n")
    # We set the input uri to the source element
    uri_decode_bin.set_property("uri",uri)
    # Connect to the "pad-added" signal of the decodebin which generates a
    # callback once a new pad for raw data has been created by the decodebin
    uri_decode_bin.connect("pad-added",cb_newpad,nbin)
    uri_decode_bin.connect("child-added",decodebin_child_added,nbin)

    # We need to create a ghost pad for the source bin which will act as a proxy
    # for the video decoder src pad. The ghost pad will not have a target right
    # now. Once the decode bin creates the video decoder and generates the
    # cb_newpad callback, we will set the ghost pad target to the video decoder
    # src pad.
    Gst.Bin.add(nbin,uri_decode_bin)
    bin_pad=nbin.add_pad(Gst.GhostPad.new_no_target("src",Gst.PadDirection.SRC))
    if not bin_pad:
        sys.stderr.write(" Failed to add ghost pad in source bin \n")
        return None
    return nbin

def main(args, requested_pgie=None, config=None, disable_probe=False):
    global perf_data
    perf_data = PERF_DATA(len(args))

    number_sources=len(args)

    # Standard GStreamer initialization
    Gst.init(None)

    # Create gstreamer elements */
    # Create Pipeline element that will form a connection of other elements
    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()
    is_live = False

    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")
    
    # creating videoconverter elements
    print("Creating nvvconverter elements \n")
    converter1 = Gst.ElementFactory.make("nvvideoconvert", "converter1")
    converter2 = Gst.ElementFactory.make("nvvideoconvert", "converter2")
    # Fail if either converter could not be created
    if not converter1 or not converter2:
        sys.stderr.write(" Unable to create video converters \n")
    pipeline.add(converter1)
    pipeline.add(converter2)
    vid_converters = [converter1, converter2]
    
    # connecting sources to nvvideoconvert
    print("Creating source elements and connecting with nvvconverter element in the piplline \n")
    for i in range(number_sources):
        print("Creating source_bin ",i," \n ")
        uri_name=args[i]
        if uri_name.find("rtsp://") == 0 :
            is_live = True
        source_bin=create_source_bin(i, uri_name)
        if not source_bin:
            sys.stderr.write("Unable to create source bin \n")
        pipeline.add(source_bin)
        padname="sink"
        sinkpad= vid_converters[i].get_static_pad(padname) 
        if not sinkpad:
            sys.stderr.write("Unable to create sink pad bin \n")
        srcpad=source_bin.get_static_pad("src")
        if not srcpad:
            sys.stderr.write("Unable to create src pad bin \n")
        srcpad.link(sinkpad)
        
    # Create the nvcompositor element
    print("Creating nvvcompositor \n ")
    compositor = Gst.ElementFactory.make("nvcompositor", "compositor")
    if not compositor:
        sys.stderr.write(" Unable to create nvcompositor \n")
        
    # Create nvstreammux instance to form batches from one or more sources.
    print("Creating streamux \n ")
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        sys.stderr.write(" Unable to create NvStreamMux \n")
   

    print("Creating Pgie \n ")
    if requested_pgie != None and (requested_pgie == 'nvinferserver' or requested_pgie == 'nvinferserver-grpc') :
        pgie = Gst.ElementFactory.make("nvinferserver", "primary-inference")
    elif requested_pgie != None and requested_pgie == 'nvinfer':
        pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    else:
        pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")

    if not pgie:
        sys.stderr.write(" Unable to create pgie :  %s\n" % requested_pgie)

    if disable_probe:
        # Use nvdslogger for perf measurement instead of probe function
        print ("Creating nvdslogger \n")
        nvdslogger = Gst.ElementFactory.make("nvdslogger", "nvdslogger")

    print("Creating tiler \n ")
    tiler=Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
    if not tiler:
        sys.stderr.write(" Unable to create tiler \n")
        
   
    print("Creating nvosd \n ")
    nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
    if not nvosd:
        sys.stderr.write(" Unable to create nvosd \n")
    nvosd.set_property('process-mode',OSD_PROCESS_MODE)
    nvosd.set_property('display-text',OSD_DISPLAY_TEXT)

    if file_loop:
        if is_aarch64():
            # Set nvbuf-memory-type=4 for aarch64 for file-loop (nvurisrcbin case)
            streammux.set_property('nvbuf-memory-type', 4)
        else:
            # Set nvbuf-memory-type=2 for x86 for file-loop (nvurisrcbin case)
            streammux.set_property('nvbuf-memory-type', 2)

    if no_display:
        print("Creating Fakesink \n")
        sink = Gst.ElementFactory.make("fakesink", "fakesink")
        sink.set_property('enable-last-sample', 0)
        sink.set_property('sync', 0)
    else:
        if is_aarch64():
            print("Creating nv3dsink \n")
            sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
            if not sink:
                sys.stderr.write(" Unable to create nv3dsink \n")
        else:
            print("Creating EGLSink \n")
            sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
            if not sink:
                sys.stderr.write(" Unable to create egl sink \n")
    if not sink:
        sys.stderr.write(" Unable to create sink element \n")

    if is_live:
        print("At least one of the sources is live")
        streammux.set_property('live-source', 1)
    
    print("setting streamux properties \n")
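    streammux.set_property('width', MUXER_OUTPUT_WIDTH)
    streammux.set_property('height', MUXER_OUTPUT_HEIGHT)
    # nvcompositor outputs a single composited stream, so one frame per batch
    streammux.set_property('batch-size', 1)
    streammux.set_property('batched-push-timeout', MUXER_BATCH_TIMEOUT_USEC)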
    
    if requested_pgie == "nvinferserver" and config != None:
        pgie.set_property('config-file-path', config)
    elif requested_pgie == "nvinferserver-grpc" and config != None:
        pgie.set_property('config-file-path', config)
    elif requested_pgie == "nvinfer" and config != None:
        pgie.set_property('config-file-path', config)
    else:
        pgie.set_property('config-file-path', "dstest3_pgie_config.txt")
   
    # tiler properties
    print("Setting tiler properties \n")
    tiler_rows=int(math.sqrt(number_sources))
    tiler_columns=int(math.ceil((1.0*number_sources)/tiler_rows))
    tiler.set_property("rows",tiler_rows)
    tiler.set_property("columns",tiler_columns)
    tiler.set_property("width", TILED_OUTPUT_WIDTH)
    tiler.set_property("height", TILED_OUTPUT_HEIGHT)
    sink.set_property("qos",0)

    print("Adding elements to Pipeline \n")
    # adding elements in the pipeline that have not been added above
    pipeline.add(compositor)
    pipeline.add(streammux)
    pipeline.add(pgie)
    pipeline.add(tiler)
    pipeline.add(nvosd)
    pipeline.add(sink)
    

    print("Linking elements in the Pipeline \n")
    converter1.link(compositor)
    converter2.link(compositor)
    compositor.link(streammux)
    streammux.link(pgie)
    pgie.link(tiler)
    tiler.link(nvosd)
    nvosd.link(sink)
    
    # create an event loop and feed gstreamer bus messages to it
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect ("message", bus_call, loop)
    pgie_src_pad=pgie.get_static_pad("src")
    if not pgie_src_pad:
        sys.stderr.write(" Unable to get src pad \n")
    else:
        if not disable_probe:
            pgie_src_pad.add_probe(Gst.PadProbeType.BUFFER, pgie_src_pad_buffer_probe, 0)
            # perf callback function to print fps every 5 sec
            GLib.timeout_add(5000, perf_data.perf_print_callback)

    # List the sources
    print("Now playing...")
    for i, source in enumerate(args):
        print(i, ": ", source)

    print("Starting pipeline \n")
    # start playback and listen to events
    pipeline.set_state(Gst.State.PLAYING)
    try:
        loop.run()
    except:
        pass
    # cleanup
    print("Exiting app\n")
    pipeline.set_state(Gst.State.NULL)

def parse_args():

    parser = argparse.ArgumentParser(prog="deepstream_test_3",
                    description="deepstream-test3 multi stream, multi model inference reference app")
    parser.add_argument(
        "-i",
        "--input",
        help="Path to input streams",
        nargs="+",
        metavar="URIs",
        default=["a"],
        required=True,
    )
    parser.add_argument(
        "-c",
        "--configfile",
        metavar="config_location.txt",
        default=None,
        help="Choose the config-file to be used with specified pgie",
    )
    parser.add_argument(
        "-g",
        "--pgie",
        default=None,
        help="Choose Primary GPU Inference Engine",
        choices=["nvinfer", "nvinferserver", "nvinferserver-grpc"],
    )
    parser.add_argument(
        "--no-display",
        action="store_true",
        default=False,
        dest='no_display',
        help="Disable display of video output",
    )
    parser.add_argument(
        "--file-loop",
        action="store_true",
        default=False,
        dest='file_loop',
        help="Loop the input file sources after EOS",
    )
    parser.add_argument(
        "--disable-probe",
        action="store_true",
        default=False,
        dest='disable_probe',
        help="Disable the probe function and use nvdslogger for FPS",
    )
    parser.add_argument(
        "-s",
        "--silent",
        action="store_true",
        default=False,
        dest='silent',
        help="Disable verbose output",
    )
    # Check input arguments
    if len(sys.argv) == 1:
        parser.print_help(sys.stderr)
        sys.exit(1)
    args = parser.parse_args()

    stream_paths = args.input
    pgie = args.pgie
    config = args.configfile
    disable_probe = args.disable_probe
    global no_display
    global silent
    global file_loop
    no_display = args.no_display
    silent = args.silent
    file_loop = args.file_loop

    if config and not pgie or pgie and not config:
        sys.stderr.write ("\nEither pgie or configfile is missing. Please specify both! Exiting...\n\n\n\n")
        parser.print_help()
        sys.exit(1)
    if config:
        config_path = Path(config)
        if not config_path.is_file():
            sys.stderr.write ("Specified config-file: %s doesn't exist. Exiting...\n\n" % config)
            sys.exit(1)

    print(vars(args))
    return stream_paths, pgie, config, disable_probe

if __name__ == '__main__':
    stream_paths, pgie, config, disable_probe = parse_args()
    sys.exit(main(stream_paths, pgie, config, disable_probe))

You need to add a capsfilter plugin after each nvvideoconvert plugin to convert the image to GRAY. You can refer to the caps documentation to learn how to create caps.
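Here is a minimal sketch of such a capsfilter in the Python app. It assumes the filter is created after `Gst.init(None)` and linked as `converter1 -> capsfilter -> compositor` (and likewise for `converter2`). `gray_caps_string` and `make_gray_capsfilter` are hypothetical helpers. Whether every downstream element (nvcompositor, nvstreammux) accepts GRAY8 in NVMM memory is something to verify on the target.

```python
def gray_caps_string(width: int, height: int) -> str:
    """Caps requesting single-channel 8-bit frames that stay in NVMM (GPU) memory."""
    return f"video/x-raw(memory:NVMM), format=GRAY8, width={width}, height={height}"


def make_gray_capsfilter(name: str, width: int = 640, height: int = 640):
    """Create a capsfilter element to place right after an nvvideoconvert."""
    # Imported lazily so the caps-string helper can be used without GStreamer.
    import gi
    gi.require_version("Gst", "1.0")
    from gi.repository import Gst

    capsfilter = Gst.ElementFactory.make("capsfilter", name)
    if not capsfilter:
        raise RuntimeError(f"Unable to create capsfilter {name}")
    capsfilter.set_property("caps", Gst.Caps.from_string(gray_caps_string(width, height)))
    return capsfilter
```

Setting width and height in the same caps also makes nvvideoconvert scale both sources to a common resolution.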

@yuweiw I did that, but I still got the following error.

0:00:00.209130188 24549     0x26489b90 WARN                 nvinfer gstnvinfer.cpp:887:gst_nvinfer_start:<primary-inference> warning: NvInfer output-tensor-meta is enabled but init_params auto increase memory (auto-inc-mem) is disabled. The bufferpool will not be automatically resized.
0:00:00.209245458 24549     0x26489b90 WARN                 nvinfer gstnvinfer.cpp:679:gst_nvinfer_logger:<primary-inference> NvDsInferContext[UID 1]: Warning from NvDsInferContextImpl::initialize() <nvdsinfer_context_impl.cpp:1174> [UID = 1]: Warning, OpenCV has been deprecated. Using NMS for clustering instead of cv::groupRectangles with topK = 20 and NMS Threshold = 0.5
WARNING: [TRT]: Using an engine plan file across different models of devices is not recommended and is likely to affect performance or even cause errors.
0:00:03.469997272 24549     0x26489b90 INFO                 nvinfer gstnvinfer.cpp:682:gst_nvinfer_logger:<primary-inference> NvDsInferContext[UID 1]: Info from NvDsInferContextImpl::deserializeEngineAndBackend() <nvdsinfer_context_impl.cpp:1988> [UID = 1]: deserialized trt engine from :/home/jetson/Faizan/fusion/Saferail-AI/tensorrt_files/tardal-dt.trt
WARNING: [TRT]: The getMaxBatchSize() function should not be used with an engine built from a network created with NetworkDefinitionCreationFlag::kEXPLICIT_BATCH flag. This function will always return 1.
INFO: [Implicit Engine Info]: layers num: 2
0   INPUT  kHALF  image           2x640x640       
1   OUTPUT kHALF  fused           1x640x640       

0:00:03.674593702 24549     0x26489b90 INFO                 nvinfer gstnvinfer.cpp:682:gst_nvinfer_logger:<primary-inference> NvDsInferContext[UID 1]: Info from NvDsInferContextImpl::generateBackendContext() <nvdsinfer_context_impl.cpp:2091> [UID = 1]: Use deserialized engine model: /home/jetson/Faizan/fusion/Saferail-AI/tensorrt_files/tardal-dt.trt
0:00:03.674661642 24549     0x26489b90 ERROR                nvinfer gstnvinfer.cpp:676:gst_nvinfer_logger:<primary-inference> NvDsInferContext[UID 1]: Error in NvDsInferContextImpl::preparePreprocess() <nvdsinfer_context_impl.cpp:983> [UID = 1]: GRAY input format specified but network input channels is not 1.
ERROR: Infer Context prepare preprocessing resource failed., nvinfer error:NVDSINFER_CONFIG_FAILED
0:00:03.682417822 24549     0x26489b90 WARN                 nvinfer gstnvinfer.cpp:898:gst_nvinfer_start:<primary-inference> error: Failed to create NvDsInferContext instance
0:00:03.682475521 24549     0x26489b90 WARN                 nvinfer gstnvinfer.cpp:898:gst_nvinfer_start:<primary-inference> error: Config file path: image_fusion_config.txt, NvDsInfer Error: NVDSINFER_CONFIG_FAILED
Warning: gst-library-error-quark: NvInfer output-tensor-meta is enabled but init_params auto increase memory (auto-inc-mem) is disabled. The bufferpool will not be automatically resized. (5): /dvs/git/dirty/git-master_linux/deepstream/sdk/src/gst-plugins/gst-nvinfer/gstnvinfer.cpp(887): gst_nvinfer_start (): /GstPipeline:pipeline0/GstNvInfer:primary-inference
Error: gst-resource-error-quark: Failed to create NvDsInferContext instance (1): /dvs/git/dirty/git-master_linux/deepstream/sdk/src/gst-plugins/gst-nvinfer/gstnvinfer.cpp(898): gst_nvinfer_start (): /GstPipeline:pipeline0/GstNvInfer:primary-inference:
Config file path: image_fusion_config.txt, NvDsInfer Error: NVDSINFER_CONFIG_FAILED
Exiting app

How can I debug the pipeline and check the frame shape and format at each stage? Also, my model takes a 2-channel input, which is neither grayscale, RGB, nor BGR. How would I handle that? I think the model takes RGB/BGR input by default.

Did you set the “model-color-format” to 2?
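The two errors above come from a channel-count check in nvinfer's preprocessing. The sketch below is a simplified Python restatement of that rule, not nvinfer's actual C++ code, using the documented mapping 0=RGB, 1=BGR, 2=GRAY. It shows why a 2-channel input (2x640x640 in the log) cannot satisfy any `model-color-format` value. `check_color_format` is a hypothetical name.

```python
# model-color-format values as documented for nvinfer: 0=RGB, 1=BGR, 2=GRAY.
CHANNELS_FOR_COLOR_FORMAT = {0: 3, 1: 3, 2: 1}


def check_color_format(color_format: int, network_input_dims: tuple) -> None:
    """Mimic the preprocessing sanity check behind the errors above.

    network_input_dims is the CHW shape printed by nvinfer, e.g. (2, 640, 640).
    """
    expected = CHANNELS_FOR_COLOR_FORMAT[color_format]
    actual = network_input_dims[0]
    if actual != expected:
        raise ValueError(
            f"model-color-format={color_format} implies {expected} channel(s), "
            f"but the network input has {actual}"
        )
```

Every entry in the table maps to 1 or 3 channels, so a 2-channel engine fails whichever value is configured.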

Our nvinfer plugin is open source: sources/gst-plugins/gst-nvinfer. If you want to get the input of each stage, you can refer to the Gst-nvinfer source code diagram.
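To see the format and resolution negotiated at each stage, one option is a buffer probe that logs a pad's caps whenever they change. This sketch assumes the PyGObject GStreamer bindings already used by the app. `make_caps_logger` and `attach_caps_logger` are hypothetical helpers. Alternatively, running an equivalent `gst-launch-1.0 -v` pipeline prints the negotiated caps for every link.

```python
try:
    import gi
    gi.require_version("Gst", "1.0")
    from gi.repository import Gst
    PROBE_OK = Gst.PadProbeReturn.OK
except (ImportError, ValueError):
    # Lets the helper be exercised without GStreamer installed (e.g. in tests).
    Gst = None
    PROBE_OK = None


def make_caps_logger(label, log=print):
    """Return a buffer probe that logs a pad's caps whenever they change."""
    last = {"caps": None}

    def probe(pad, info, user_data=None):
        caps = pad.get_current_caps()
        text = caps.to_string() if caps is not None else "<no caps yet>"
        if text != last["caps"]:  # only log on change, not on every buffer
            last["caps"] = text
            log(f"[{label}] {text}")
        return PROBE_OK

    return probe


def attach_caps_logger(element, pad_name="src", log=print):
    """Attach the caps logger to a static pad of an element."""
    pad = element.get_static_pad(pad_name)
    pad.add_probe(Gst.PadProbeType.BUFFER, make_caps_logger(element.get_name(), log), None)
```

For example, call `attach_caps_logger(converter1)` after the elements are created and before setting the pipeline to PLAYING. Caps only describe the frames flowing between elements; the tensor shape nvinfer builds internally is reported in its own log (the `INPUT kHALF image 2x640x640` line).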

@yuweiw Yes, and I have also set model-color-format to 1 (BGR), but that did not work either, since the model takes a 2-channel input rather than 3 or 1 (RGB/BGR or GRAY). Should I rebuild the engine file so that it takes a 3-channel input, using the first two channels in the model's forward method and discarding the last one? At least that would work. Or would you suggest writing a custom function?

So your requirement is not to merge the two images into one image and use that as the model input? Do you want to put gray image 1 into channel 1 and gray image 2 into channel 2?
We cannot support that scenario currently, but we'll discuss this requirement. Can you describe in detail what this model is used for?
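The distinction being asked about can be shown with two tiny arrays: a compositor lays frames out spatially in one larger image, whereas the fusion model needs them stacked along the channel axis. Side-by-side is only one assumed nvcompositor layout; the real one depends on the per-pad `xpos`/`ypos` settings.

```python
import numpy as np

H, W = 4, 5  # tiny stand-ins for 640x640 frames
ir = np.full((H, W), 1, dtype=np.uint8)   # grayscale infrared frame
vis = np.full((H, W), 2, dtype=np.uint8)  # grayscale visible frame

# What a compositor does: both frames laid out side by side in one larger image.
side_by_side = np.concatenate([ir, vis], axis=1)  # shape (H, 2W), still 1 channel

# What the fusion model needs: the frames stacked as channels of one tensor.
channel_stacked = np.stack([ir, vis], axis=0)     # shape (2, H, W)
```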

@yuweiw Basically, I am using TarDAL (an infrared and optical image fusion algorithm). The algorithm takes two frames/images as input. These images are RGB, but the model requires converting each image to grayscale and concatenating them along the channel dimension, as mentioned here. It loads the RGB data, converts it to grayscale, and passes the grayscale images to the model for inference to fuse the information from both modalities. The fused result is again grayscale; it is then converted back to RGB using a YCbCr color conversion scheme.
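The YCbCr step described here can be sketched as follows: take the optical frame's chroma (Cb, Cr), replace its luma with the fused grayscale output, and convert back to RGB. This assumes full-range BT.601 (JPEG-style) constants and float images in [0, 1]; TarDAL's own conversion code may use different constants or ranges. The function names are hypothetical.

```python
import numpy as np

# Full-range BT.601 (JPEG-style) RGB <-> YCbCr for float images in [0, 1].
# An assumption: TarDAL's own conversion may use slightly different constants.


def rgb_to_ycbcr(rgb: np.ndarray) -> np.ndarray:
    r, g, b = rgb[..., 0], rgb[..., 1], rgb[..., 2]
    y = 0.299 * r + 0.587 * g + 0.114 * b
    cb = 0.5 - 0.168736 * r - 0.331264 * g + 0.5 * b
    cr = 0.5 + 0.5 * r - 0.418688 * g - 0.081312 * b
    return np.stack([y, cb, cr], axis=-1)


def ycbcr_to_rgb(ycc: np.ndarray) -> np.ndarray:
    y, cb, cr = ycc[..., 0], ycc[..., 1] - 0.5, ycc[..., 2] - 0.5
    r = y + 1.402 * cr
    g = y - 0.344136 * cb - 0.714136 * cr
    b = y + 1.772 * cb
    return np.clip(np.stack([r, g, b], axis=-1), 0.0, 1.0)


def colorize_fused(fused_y: np.ndarray, vis_rgb: np.ndarray) -> np.ndarray:
    """Replace the visible image's luma with the fused output, keep its chroma."""
    ycc = rgb_to_ycbcr(vis_rgb)
    ycc[..., 0] = fused_y
    return ycbcr_to_rgb(ycc)
```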

Therefore, I want to put the grayscale infrared image in channel 1 and the grayscale optical image in channel 2, and I can put arbitrary data in channel 3. That channel can be discarded in the forward method, and the first two channels can be used as the model requires.
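A NumPy sketch of this layout, assuming nvinfer runs with `model-color-format=0` (RGB) so that R becomes channel 0: pack the two gray frames plus a zero filler into one 3-channel frame, then convert it to a scaled NCHW tensor roughly the way nvinfer does (nvinfer also subtracts any configured offsets, omitted here). Producing such a packed frame inside DeepStream would still need a custom step; the functions here are hypothetical and only illustrate the layout.

```python
import numpy as np


def pack_for_rgb_model(ir_gray: np.ndarray, vis_gray: np.ndarray) -> np.ndarray:
    """Pack two (H, W) uint8 gray frames into one (H, W, 3) frame.

    R = infrared, G = visible, B = filler (zeros); the filler channel is
    dropped by images[:, :2] in the modified forward().
    """
    filler = np.zeros_like(ir_gray)
    return np.stack([ir_gray, vis_gray, filler], axis=-1)


def to_nchw(frame_hwc: np.ndarray, scale: float = 1.0 / 255.0) -> np.ndarray:
    """Roughly what nvinfer does for an RGB model: HWC uint8 -> scaled NCHW float."""
    return (frame_hwc.astype(np.float32) * scale).transpose(2, 0, 1)[np.newaxis]
```

Slicing `[:, :2]` on the result recovers the infrared and visible channels in that order, which is what the modified `forward()` relies on.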

import torch
import torch.nn as nn
from torch import Tensor


class Generator(nn.Module):
    r"""
    Use to generate fused images.
    ir + vi -> fus
    """

    def __init__(self, dim: int = 32, depth: int = 3):
        super(Generator, self).__init__()
        self.depth = depth

        self.encoder = nn.Sequential(
            nn.Conv2d(2, dim, (3, 3), (1, 1), 1),
            nn.BatchNorm2d(dim),
            nn.ReLU()
        )

        self.dense = nn.ModuleList([
            nn.Sequential(
                nn.Conv2d(dim * (i + 1), dim, (3, 3), (1, 1), 1),
                nn.BatchNorm2d(dim),
                nn.ReLU()
            ) for i in range(depth)
        ])

        self.fuse = nn.Sequential(
            nn.Sequential(
                nn.Conv2d(dim * (depth + 1), dim * 4, (3, 3), (1, 1), 1),
                nn.ReLU()
            ),
            nn.Sequential(
                nn.Conv2d(dim * 4, dim * 2, (3, 3), (1, 1), 1),
                nn.BatchNorm2d(dim * 2),
                nn.ReLU()
            ),
            nn.Sequential(
                nn.Conv2d(dim * 2, dim, (3, 3), (1, 1), 1),
                nn.BatchNorm2d(dim),
                nn.ReLU()
            ),
            nn.Sequential(
                nn.Conv2d(dim, 1, (3, 3), (1, 1), 1),
                nn.Tanh()
            ),
        )

    def forward(self, images) -> Tensor:
        # images are 3 channel input but we need only the first two
        image = images[:, :2, :, :]
        x = self.encoder(image)
        for i in range(self.depth):
            t = self.dense[i](x)
            x = torch.cat([x, t], dim=1)
        fus = self.fuse(x)
        return fus

This way I can use DeepStream's RGB model path, because the input has 3 channels; otherwise, passing a 2-channel input will not work. What do you think? I will build the ONNX file again.

Yes, you can try that. You can also try using nvdspreprocess to customize your own tensor data without changing your model.
You can refer to our deepstream-pose-classification demo to learn how to customize your own tensor data with nvdspreprocess.


@yuweiw Thank you. The pipeline runs with the 3-channel input, as I suggested. However, the problem is that I can’t see anything in the output. Here are the output logs:

Now playing...
0 :  file:////home/jetson/Faizan/fusion/Saferail-AI/videos/2_optical.mp4
1 :  file:////home/jetson/Faizan/fusion/Saferail-AI/videos/2_thermal.mp4
Starting pipeline 

0:00:00.478708656  4193     0x1938d0f0 WARN                 nvinfer gstnvinfer.cpp:679:gst_nvinfer_logger:<primary-inference> NvDsInferContext[UID 1]: Warning from NvDsInferContextImpl::initialize() <nvdsinfer_context_impl.cpp:1174> [UID = 1]: Warning, OpenCV has been deprecated. Using NMS for clustering instead of cv::groupRectangles with topK = 20 and NMS Threshold = 0.5
WARNING: [TRT]: Using an engine plan file across different models of devices is not recommended and is likely to affect performance or even cause errors.
0:00:04.422968692  4193     0x1938d0f0 INFO                 nvinfer gstnvinfer.cpp:682:gst_nvinfer_logger:<primary-inference> NvDsInferContext[UID 1]: Info from NvDsInferContextImpl::deserializeEngineAndBackend() <nvdsinfer_context_impl.cpp:1988> [UID = 1]: deserialized trt engine from :/home/jetson/Faizan/fusion/Saferail-AI/tensorrt_files/tardal-dt-3c.trt
WARNING: [TRT]: The getMaxBatchSize() function should not be used with an engine built from a network created with NetworkDefinitionCreationFlag::kEXPLICIT_BATCH flag. This function will always return 1.
INFO: [Implicit Engine Info]: layers num: 2
0   INPUT  kHALF  image           3x640x640       
1   OUTPUT kHALF  fused           1x640x640       

0:00:04.644763547  4193     0x1938d0f0 INFO                 nvinfer gstnvinfer.cpp:682:gst_nvinfer_logger:<primary-inference> NvDsInferContext[UID 1]: Info from NvDsInferContextImpl::generateBackendContext() <nvdsinfer_context_impl.cpp:2091> [UID = 1]: Use deserialized engine model: /home/jetson/Faizan/fusion/Saferail-AI/tensorrt_files/tardal-dt-3c.trt
0:00:04.678277655  4193     0x1938d0f0 INFO                 nvinfer gstnvinfer_impl.cpp:328:notifyLoadModelStatus:<primary-inference> [UID 1]: Load new model:image_fusion_config.txt sucessfully
Decodebin child added: source 

Decodebin child added: decodebin0 

Decodebin child added: source 

Decodebin child added: decodebin1 

Decodebin child added: mpegpsdemux1 

Decodebin child added: mpegpsdemux0 

Decodebin child added: multiqueue0 

Decodebin child added: h265parse0 

Decodebin child added: capsfilter0 

Decodebin child added: multiqueue1 

Decodebin child added: h265parse1 

Decodebin child added: capsfilter1 

Decodebin child added: nvv4l2decoder1 
Decodebin child added: nvv4l2decoder0 


Opening in BLOCKING MODE 
Opening in BLOCKING MODE 
NvMMLiteOpen : Block : BlockType = 279 
NvMMLiteOpen : Block : BlockType = 279 
NvMMLiteBlockCreate : Block : BlockType = 279 
NvMMLiteBlockCreate : Block : BlockType = 279 
In cb_newpad

In cb_newpad

gstname= video/x-raw
gstname= video/x-raw
features= <Gst.CapsFeatures object at 0xffff8e646460 (GstCapsFeatures at 0xfffef805dce0)>
features= <Gst.CapsFeatures object at 0xffff8e646e80 (GstCapsFeatures at 0xffff04028ca0)>

**PERF:  {'stream0': 0.0, 'stream1': 0.0} 


**PERF:  {'stream0': 0.0, 'stream1': 0.0} 


The complete code file and config file are attached as well.
image_fusion_config.txt (2.7 KB)

The code:

#!/usr/bin/env python3

################################################################################
# SPDX-FileCopyrightText: Copyright (c) 2019-2022 NVIDIA CORPORATION & AFFILIATES. All rights reserved.
# SPDX-License-Identifier: Apache-2.0
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
################################################################################

import sys
sys.path.append('../')
from pathlib import Path
import gi
import configparser
import argparse
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst
from ctypes import *
import time
import math
import platform
from common.is_aarch_64 import is_aarch64
from common.bus_call import bus_call
from common.FPS import PERF_DATA

import pyds

no_display = False
silent = False
file_loop = False
perf_data = None

MAX_DISPLAY_LEN=64
MUXER_OUTPUT_WIDTH=1920
MUXER_OUTPUT_HEIGHT=1080
MUXER_BATCH_TIMEOUT_USEC=4000000
TILED_OUTPUT_WIDTH=1280
TILED_OUTPUT_HEIGHT=720
GST_CAPS_FEATURES_NVMM="memory:NVMM"
OSD_PROCESS_MODE= 0
OSD_DISPLAY_TEXT= 1


# pgie_src_pad_buffer_probe reads the batch metadata on the pad it is attached to
# (the nvstreammux src pad in this script), prints frame numbers and updates the FPS counters.
def pgie_src_pad_buffer_probe(pad,info,u_data):
    frame_number=0
    # num_rects=0
    # got_fps = False
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        return Gst.PadProbeReturn.OK
    # Retrieve batch metadata from the gst_buffer
    # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
    # C address of gst_buffer as input, which is obtained with hash(gst_buffer)
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            # Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
            # The casting is done by pyds.NvDsFrameMeta.cast()
            # The casting also keeps ownership of the underlying memory
            # in the C code, so the Python garbage collector will leave
            # it alone.
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        frame_number=frame_meta.frame_num
        
        if not silent:
            print("Frame Number=", frame_number)
            # print("Frame Number=", frame_number, "Number of Objects=",num_rects,"Vehicle_count=",obj_counter[PGIE_CLASS_ID_VEHICLE],"Person_count=",obj_counter[PGIE_CLASS_ID_PERSON])

        # Update frame rate through this probe
        stream_index = "stream{0}".format(frame_meta.pad_index)
        global perf_data
        perf_data.update_fps(stream_index)

        try:
            l_frame=l_frame.next
        except StopIteration:
            break

    return Gst.PadProbeReturn.OK



def cb_newpad(decodebin, decoder_src_pad,data):
    print("In cb_newpad\n")
    caps=decoder_src_pad.get_current_caps()
    if not caps:
        caps = decoder_src_pad.query_caps()
    gststruct=caps.get_structure(0)
    gstname=gststruct.get_name()
    source_bin=data
    features=caps.get_features(0)

    # Need to check if the pad created by the decodebin is for video and not
    # audio.
    print("gstname=",gstname)
    if(gstname.find("video")!=-1):
        # Link the decodebin pad only if decodebin has picked nvidia
        # decoder plugin nvdec_*. We do this by checking if the pad caps contain
        # NVMM memory features.
        print("features=",features)
        if features.contains("memory:NVMM"):
            # Get the source bin ghost pad
            bin_ghost_pad=source_bin.get_static_pad("src")
            if not bin_ghost_pad.set_target(decoder_src_pad):
                sys.stderr.write("Failed to link decoder src pad to source bin ghost pad\n")
        else:
            sys.stderr.write(" Error: Decodebin did not pick nvidia decoder plugin.\n")

def decodebin_child_added(child_proxy,Object,name,user_data):
    print("Decodebin child added:", name, "\n")
    if(name.find("decodebin") != -1):
        Object.connect("child-added",decodebin_child_added,user_data)

    if "source" in name:
        source_element = child_proxy.get_by_name("source")
        if source_element.find_property('drop-on-latency') != None:
            Object.set_property("drop-on-latency", True)



def create_source_bin(index, uri):
    print("Creating source bin")

    # Create a source GstBin to abstract this bin's content from the rest of the
    # pipeline
    bin_name="source-bin-%02d" %index
    print(bin_name)
    nbin=Gst.Bin.new(bin_name)
    if not nbin:
        sys.stderr.write(" Unable to create source bin \n")

    # Source element for reading from the uri.
    # We will use decodebin and let it figure out the container format of the
    # stream and the codec and plug the appropriate demux and decode plugins.
    if file_loop:
        # use nvurisrcbin to enable file-loop
        uri_decode_bin=Gst.ElementFactory.make("nvurisrcbin", "uri-decode-bin")
        uri_decode_bin.set_property("file-loop", 1)
        uri_decode_bin.set_property("cudadec-memtype", 0)
    else:
        uri_decode_bin=Gst.ElementFactory.make("uridecodebin", "uri-decode-bin")
    if not uri_decode_bin:
        sys.stderr.write(" Unable to create uri decode bin \n")
    # We set the input uri to the source element
    uri_decode_bin.set_property("uri",uri)
    # Connect to the "pad-added" signal of the decodebin which generates a
    # callback once a new pad for raw data has been created by the decodebin
    uri_decode_bin.connect("pad-added",cb_newpad,nbin)
    uri_decode_bin.connect("child-added",decodebin_child_added,nbin)

    # We need to create a ghost pad for the source bin which will act as a proxy
    # for the video decoder src pad. The ghost pad will not have a target right
    # now. Once the decode bin creates the video decoder and generates the
    # cb_newpad callback, we will set the ghost pad target to the video decoder
    # src pad.
    Gst.Bin.add(nbin,uri_decode_bin)
    bin_pad=nbin.add_pad(Gst.GhostPad.new_no_target("src",Gst.PadDirection.SRC))
    if not bin_pad:
        sys.stderr.write(" Failed to add ghost pad in source bin \n")
        return None
    return nbin



def main(args, requested_pgie=None, config=None, disable_probe=False):
    global perf_data
    perf_data = PERF_DATA(len(args))

    number_sources=len(args)
    enc_type = 0  # 0 = hardware (NVMM) path; set enc_type = 1 for the software path

    # Standard GStreamer initialization
    Gst.init(None)

    # Create gstreamer elements */
    # Create Pipeline element that will form a connection of other elements
    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()
    is_live = False

    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")
    
    # creating videoconverter elements
    print("Creating nvvconverter elements \n")
    converter1 = Gst.ElementFactory.make("nvvideoconvert", "converter1")
    converter2 = Gst.ElementFactory.make("nvvideoconvert", "converter2")
    if not converter1 or not converter2:
        sys.stderr.write(" Unable to create video converters \n")
    pipeline.add(converter1)
    pipeline.add(converter2)
    vid_converters = [converter1, converter2]
    
    # connecting sources to nvvideoconvert
    print("Creating source elements and linking them to the nvvideoconvert elements in the pipeline \n")
    for i in range(number_sources):
        print("Creating source_bin ",i," \n ")
        uri_name=args[i]
        if uri_name.find("rtsp://") == 0 :
            is_live = True
        source_bin=create_source_bin(i, uri_name)
        if not source_bin:
            sys.stderr.write("Unable to create source bin \n")
        pipeline.add(source_bin)
        sinkpad= vid_converters[i].get_static_pad("sink") 
        if not sinkpad:
            sys.stderr.write("Unable to create sink pad bin \n")
        srcpad=source_bin.get_static_pad("src")
        if not srcpad:
            sys.stderr.write("Unable to create src pad bin \n")
        srcpad.link(sinkpad)
        
    # creating caps to convert the video to GRAY8 from an RGB format
    caps1 = Gst.ElementFactory.make("capsfilter", "filter1")
    caps2 = Gst.ElementFactory.make("capsfilter", "filter2")
    caps3 = Gst.ElementFactory.make("capsfilter", "filter3")
    if enc_type == 0:
        caps1.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=GRAY8"))
        caps2.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=GRAY8"))
        caps3.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=GRAY8"))
    else:
        caps1.set_property("caps", Gst.Caps.from_string("video/x-raw, format=GRAY8"))
        caps2.set_property("caps", Gst.Caps.from_string("video/x-raw, format=GRAY8"))
        caps3.set_property("caps", Gst.Caps.from_string("video/x-raw, format=GRAY8"))
        
             
    # create the video compositor element
    print("Creating nvvcompositor \n ")
    compositor = Gst.ElementFactory.make("nvcompositor", "compositor")
    if not compositor:
        sys.stderr.write(" Unable to create nvcompositor \n")
    
        
    # Create nvstreammux instance to form batches from one or more sources.
    print("Creating streamux \n ")
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        sys.stderr.write(" Unable to create NvStreamMux \n")
   

    print("Creating Pgie \n ")
    if requested_pgie != None and (requested_pgie == 'nvinferserver' or requested_pgie == 'nvinferserver-grpc') :
        pgie = Gst.ElementFactory.make("nvinferserver", "primary-inference")
    elif requested_pgie != None and requested_pgie == 'nvinfer':
        pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    else:
        pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")

    if not pgie:
        sys.stderr.write(" Unable to create pgie :  %s\n" % requested_pgie)

    if disable_probe:
        # Use nvdslogger for perf measurement instead of probe function
        print ("Creating nvdslogger \n")
        nvdslogger = Gst.ElementFactory.make("nvdslogger", "nvdslogger")

    print("Creating tiler \n ")
    tiler=Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
    if not tiler:
        sys.stderr.write(" Unable to create tiler \n")
        
   
    print("Creating nvosd \n ")
    nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
    if not nvosd:
        sys.stderr.write(" Unable to create nvosd \n")
    nvosd.set_property('process-mode',OSD_PROCESS_MODE)
    nvosd.set_property('display-text',OSD_DISPLAY_TEXT)

    if file_loop:
        if is_aarch64():
            # Set nvbuf-memory-type=4 for aarch64 for file-loop (nvurisrcbin case)
            streammux.set_property('nvbuf-memory-type', 4)
        else:
            # Set nvbuf-memory-type=2 for x86 for file-loop (nvurisrcbin case)
            streammux.set_property('nvbuf-memory-type', 2)

    if no_display:
        print("Creating Fakesink \n")
        sink = Gst.ElementFactory.make("fakesink", "fakesink")
        sink.set_property('enable-last-sample', 0)
        sink.set_property('sync', 0)
    else:
        if is_aarch64():
            print("Creating nv3dsink \n")
            sink = Gst.ElementFactory.make("nv3dsink", "nv3d-sink")
            if not sink:
                sys.stderr.write(" Unable to create nv3dsink \n")
        else:
            print("Creating EGLSink \n")
            sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
            if not sink:
                sys.stderr.write(" Unable to create egl sink \n")
    if not sink:
        sys.stderr.write(" Unable to create sink element \n")

    if is_live:
        print("At least one of the sources is live")
        streammux.set_property('live-source', 1)
    
    print("setting streamux properties \n")
    streammux.set_property('width', 1920)
    streammux.set_property('height', 1080)
    streammux.set_property('batch-size', 1)
    streammux.set_property('batched-push-timeout', 4000000)
    
    if requested_pgie == "nvinferserver" and config != None:
        pgie.set_property('config-file-path', config)
    elif requested_pgie == "nvinferserver-grpc" and config != None:
        pgie.set_property('config-file-path', config)
    elif requested_pgie == "nvinfer" and config != None:
        pgie.set_property('config-file-path', config)
    else:
        pgie.set_property('config-file-path', "dstest3_pgie_config.txt")
   
    # tiler properties
    print("Setting tiler properties \n")
    tiler_rows=int(math.sqrt(number_sources))
    tiler_columns=int(math.ceil((1.0*number_sources)/tiler_rows))
    tiler.set_property("rows",tiler_rows)
    tiler.set_property("columns",tiler_columns)
    tiler.set_property("width", TILED_OUTPUT_WIDTH)
    tiler.set_property("height", TILED_OUTPUT_HEIGHT)
    sink.set_property("qos",0)

    print("Adding elements to Pipeline \n")
    # adding elements in the pipeline that have not been added above
    pipeline.add(caps1)
    pipeline.add(caps2)
    pipeline.add(caps3)
    pipeline.add(compositor)
    pipeline.add(streammux)
    pipeline.add(pgie)
    pipeline.add(tiler)
    pipeline.add(nvosd)
    pipeline.add(sink)
    

    print("Linking elements in the Pipeline \n")
    converter1.link(caps1)
    converter2.link(caps2)
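    # NOTE: converter1's src pad is already linked to caps1, so this second link
    # fails (link() returns False); splitting one stream into two branches needs a tee.
    converter1.link(caps3)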
    caps1.link(compositor)
    caps2.link(compositor)
    caps3.link(compositor)
    compositor.link(streammux)
    streammux.link(pgie)
    pgie.link(tiler)
    tiler.link(nvosd)
    nvosd.link(sink)
    
    # create an event loop and feed gstreamer bus messages to it
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect ("message", bus_call, loop)
    # pgie_src_pad=pgie.get_static_pad("src")
    streamux_src_pad = streammux.get_static_pad("src")
    if not streamux_src_pad:
        sys.stderr.write(" Unable to get src pad \n")
    else:
        if not disable_probe:
            streamux_src_pad.add_probe(Gst.PadProbeType.BUFFER, pgie_src_pad_buffer_probe, 0)
            # perf callback function to print fps every 5 sec
            GLib.timeout_add(5000, perf_data.perf_print_callback)

    # List the sources
    print("Now playing...")
    for i, source in enumerate(args):
        print(i, ": ", source)

    print("Starting pipeline \n")
    # start playback and listen to events
    pipeline.set_state(Gst.State.PLAYING)
    try:
        loop.run()
    except:
        pass
    
    # cleanup
    print("Exiting app\n")
    pipeline.set_state(Gst.State.NULL)

def parse_args():

    parser = argparse.ArgumentParser(prog="deepstream_test_3",
                    description="deepstream-test3 multi stream, multi model inference reference app")
    parser.add_argument(
        "-i",
        "--input",
        help="Path to input streams",
        nargs="+",
        metavar="URIs",
        default=["a"],
        required=True,
    )
    parser.add_argument(
        "-c",
        "--configfile",
        metavar="config_location.txt",
        default=None,
        help="Choose the config-file to be used with specified pgie",
    )
    parser.add_argument(
        "-g",
        "--pgie",
        default=None,
        help="Choose Primary GPU Inference Engine",
        choices=["nvinfer", "nvinferserver", "nvinferserver-grpc"],
    )
    parser.add_argument(
        "--no-display",
        action="store_true",
        default=False,
        dest='no_display',
        help="Disable display of video output",
    )
    parser.add_argument(
        "--file-loop",
        action="store_true",
        default=False,
        dest='file_loop',
        help="Loop the input file sources after EOS",
    )
    parser.add_argument(
        "--disable-probe",
        action="store_true",
        default=False,
        dest='disable_probe',
        help="Disable the probe function and use nvdslogger for FPS",
    )
    parser.add_argument(
        "-s",
        "--silent",
        action="store_true",
        default=False,
        dest='silent',
        help="Disable verbose output",
    )
    # Check input arguments
    if len(sys.argv) == 1:
        parser.print_help(sys.stderr)
        sys.exit(1)
    args = parser.parse_args()

    stream_paths = args.input
    pgie = args.pgie
    config = args.configfile
    disable_probe = args.disable_probe
    global no_display
    global silent
    global file_loop
    no_display = args.no_display
    silent = args.silent
    file_loop = args.file_loop

    if config and not pgie or pgie and not config:
        sys.stderr.write ("\nEither pgie or configfile is missing. Please specify both! Exiting...\n\n\n\n")
        parser.print_help()
        sys.exit(1)
    if config:
        config_path = Path(config)
        if not config_path.is_file():
            sys.stderr.write ("Specified config-file: %s doesn't exist. Exiting...\n\n" % config)
            sys.exit(1)

    print(vars(args))
    return stream_paths, pgie, config, disable_probe

if __name__ == '__main__':
    stream_paths, pgie, config, disable_probe = parse_args()
    sys.exit(main(stream_paths, pgie, config, disable_probe))
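Independently of the pipeline layout, note that the model ends in `nn.Tanh()`, so the `fused` output binding (1x640x640, kHALF in the log above) holds values in [-1, 1] rather than pixel intensities, and nvinfer does not render an arbitrary output tensor into the video frame by itself. A minimal sketch of turning that tensor into an 8-bit gray image, assuming it has already been copied to host memory (for example from the tensor meta nvinfer attaches when `output-tensor-meta=1` is set in its config), is shown here; `tanh_output_to_gray8` is a hypothetical helper.

```python
import numpy as np


def tanh_output_to_gray8(fused):
    """Map a Tanh output in [-1, 1] with shape (1, 1, H, W), (1, H, W) or (H, W)
    to an HxW uint8 image in [0, 255]."""
    arr = np.asarray(fused, dtype=np.float32)
    arr = arr.reshape(arr.shape[-2:])  # drop the size-1 batch/channel dimensions
    arr = np.clip(arr, -1.0, 1.0)      # clamp defensively before scaling
    # Linear map: -1 -> 0, 0 -> 127.5 (rounded to 128), 1 -> 255.
    return np.rint((arr + 1.0) * 127.5).astype(np.uint8)
```

The resulting array can then be written back into a frame or saved for inspection, which is a quick way to confirm the engine produces a sensible fused image before debugging the display path.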

The compositor plugin cannot meet your needs at present. It composites the two gray images into a single frame, and that frame cannot be split back into separate channels later.

You are unlikely to achieve this by simply modifying the Python demo code. You will need to customize a lot of our plugin code yourself, such as nvdspreprocess, to implement it.
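One part of that customization is pairing each optical frame with the matching thermal frame before building the tensor. The sketch below shows only that bookkeeping logic in plain Python; the real nvdspreprocess custom library is C++, and the class name `FramePairBuffer`, the source ids (0 = optical, 1 = thermal) and matching on frame number are all assumptions. Matching by frame number also assumes both streams start together and run at the same frame rate.

```python
from collections import OrderedDict


class FramePairBuffer:
    """Hypothetical pairing buffer: holds frames until both modalities for the
    same frame number have arrived, then releases them as (optical, thermal)."""

    def __init__(self, optical_id=0, thermal_id=1, max_pending=8):
        self.optical_id = optical_id
        self.thermal_id = thermal_id
        self.max_pending = max_pending
        self._pending = OrderedDict()  # frame_num -> {source_id: frame}

    def push(self, source_id, frame_num, frame):
        """Add one frame; return (optical, thermal) once the pair is complete, else None."""
        if source_id not in (self.optical_id, self.thermal_id):
            raise ValueError(f"unexpected source id {source_id}")
        slot = self._pending.setdefault(frame_num, {})
        slot[source_id] = frame
        if len(slot) == 2:
            del self._pending[frame_num]
            return slot[self.optical_id], slot[self.thermal_id]
        # Evict the oldest incomplete pairs so a stalled stream cannot grow memory unbounded.
        while len(self._pending) > self.max_pending:
            self._pending.popitem(last=False)
        return None
```

A complete pair is the point at which the custom tensor preparation would convert both frames to grayscale and pack them channel-wise for the engine.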

For example, you can use the pipeline below.
Make sure to feed the two images in sequence and process them yourself in the nvdspreprocess plugin, for instance by adding a buffer of size 2 in nvdspreprocess; when there are two images in the buffer, build your own tensor data.
I have previously attached an example of how to modify nvdspreprocess. You can refer to that.
source->nvvideoconvert->nvstreammux->nvdspreprocess->nvinfer->....
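The suggested pipeline can be written as a gst-launch style description and passed to `Gst.parse_launch()`. This is a sketch under assumptions: two file or RTSP URIs, an nvstreammux batch of 2 so both modalities reach nvdspreprocess together, a 640x640 muxer resolution, and placeholder config file names. `input-tensor-meta=1` tells nvinfer to use the tensor prepared by nvdspreprocess instead of doing its own preprocessing; the custom tensor preparation itself still has to be implemented in the nvdspreprocess custom library. The function name `build_fusion_pipeline` is hypothetical.

```python
def build_fusion_pipeline(optical_uri, thermal_uri, preprocess_config, infer_config,
                          width=640, height=640, sink="fakesink"):
    """Return a gst-launch-1.0 style description of
    source -> nvvideoconvert -> nvstreammux -> nvdspreprocess -> nvinfer -> sink."""
    # One decode branch per modality, linked to a request pad of the muxer.
    branches = [
        f"uridecodebin uri={uri} ! nvvideoconvert ! mux.sink_{idx}"
        for idx, uri in enumerate((optical_uri, thermal_uri))
    ]
    # Assumption: batch-size=2 so each batch carries one optical and one thermal frame.
    main = (
        f"nvstreammux name=mux batch-size=2 width={width} height={height} "
        f"! nvdspreprocess config-file={preprocess_config} "
        f"! nvinfer config-file-path={infer_config} input-tensor-meta=1 "
        f"! {sink}"
    )
    return " ".join([main] + branches)
```

After `Gst.init(None)`, the string can be used as `Gst.parse_launch(build_fusion_pipeline(...))`. For on-screen output on Jetson, `sink` could be replaced with a display chain such as `"nvmultistreamtiler ! nvvideoconvert ! nvdsosd ! nv3dsink"`, mirroring the elements used in the script above.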