Unable to implement inferencing in my deepstream python application

Board:
* Name: NVIDIA Jetson NANO/TX1
* Type: NANO/TX1
* Jetpack: 4.3 [L4T 32.3.1]
* GPU-Arch: 5.3

  • Libraries:
    • CUDA: 10.0.326
    • cuDNN: 7.6.3.28-1+cuda10.0
    • TensorRT: 6.0.1.10-1+cuda10.0
    • VisionWorks: 1.6.0.500n
    • OpenCV: 4.1.1 compiled CUDA: YES

Python 2.7.17
Python 3.6.9

Hello!

I built a GStreamer application that uses the nvinfer plugin to perform inferencing in one of it’s branches. However, I get various errors when trying to run it.

Initially I got a segfault error by the nvarguscamerasrc element, but I fixed it like mentioned here: https://devtalk.nvidia.com/default/topic/1070455/bug-nvarguscamerasrc-segmentation-fault/?offset=10.

But the fix caused following error: https://devtalk.nvidia.com/default/topic/1064294/jetson-nano/started-getting-an-quot-argus-quot-error-when-running-camera-script-/post/5389296/

The nvinfer element requries the use of the nvstreammux element. I use nvstreammux in my architecture but it causes my pipeline to crash with this error: “ERROR: from element /GstPipeline:pipeline0/GstNvStreamMux:m: Input buffer number of surfaces (0) must be equal to mux->num_surfaces_per_frame (1) \ Set nvstreammux property num-surfaces-per-frame appropriately”.

To fix this, I set “bufapi-version”=1 of nvarguscamerasrc and tried to remove nvvidconv because it should not be used in DS-4.0 as mentioned here: https://devtalk.nvidia.com/default/topic/1069620/jetson-nano/using-nvvidconv-in-two-different-pipelines-getting-quot-nvbuf_utils-nvbuffer-payload-type-not-supported-quot-/

It fixes the streammux error but causes this error: https://devtalk.nvidia.com/default/topic/1064294/jetson-nano/started-getting-an-quot-argus-quot-error-when-running-camera-script-/post/5389296/

If I run this test pipeline, no errors are thrown. I just cannot integrate it into my existing architecture and I am really all out of ideas. Even the debug output of deepstream doesn’t mention anything useful.

gst-launch-1.0 nvarguscamerasrc ! m.sink_0 nvstreammux name=m width=1920 height=1080 batch_size=1 ! nvinfer config-file-path="./config_infer_primary_yoloV3_tiny.txt" ! fakesink

Here is my stripped-down code.

import gi
gi.require_version("Gst", "1.0")
from gi.repository import GLib, GObject, Gst
import threading
import csv
from PIL import Image
from io import BytesIO
from picar import car
# Lanedetection
from picar.modules.inferencing.src.start import steering
# AI driving
from picar.modules.inferencing.src.ai_drive.implement_model import steer_without_yolo
import numpy as np
import os
import datetime

class GStreamer(threading.Thread):
    """
    Read images from the camera and process them.
    It uses GStreamer to accomplish various manipulations of the images.
    First it gets images from the camera, then it encodes those with h265.
    After that, the images are split to three branches.
    1. The "training-data" branch, which used to gather training-data for the ML and DL algorithms
    2. The "streaming" branch, which is used to stream the images to a rtsp server.
    3. The "inferencing" branch, which is used to supply the ML and DL algorithms with images, so they can perform inferencing on them.

    Nr 1 is not always on and is dynamically added when needed.

    """
    def __init__(self):
        threading.Thread.__init__(self)
        GObject.threads_init()
        Gst.init(None)

        # Counter for gathering testdata
        self.counter = 0
        
        self.loop = GObject.MainLoop()
        self.pipeline = Gst.Pipeline()
        self._modes = []

        if not self.pipeline:
            raise ValueError("Gstreamer pipeline could not be created")

        print("Creating elements")

        # Camera capture elements
        source = Gst.ElementFactory.make("nvarguscamerasrc", "nvarguscamerasrc")
        capsfilter = Gst.ElementFactory.make("capsfilter", "capsfilter")
        nvvidconv = Gst.ElementFactory.make("nvvidconv", "nvvidconv")
        tee = Gst.ElementFactory.make("tee", "tee")

        elements = [source, capsfilter, nvvidconv, tee]
        for element in elements:
            if not element:
                raise ValueError(element.name, "could not be created")

        # Bin for gather-testdata module
        self.testdata = self._create_testdata_branch()
        # Bin for streaming module
        self.streaming = self._create_streaming_branch()
        # Bin for inferencing module
        self.inferencing = self._create_inferencing_branch()    

        # Configure elements
        source.set_property("maxperf", True)
        #source.set_property("bufapi-version", 1)
        caps = Gst.caps_from_string("video/x-raw(memory:NVMM), width=(int)1920, height=(int)1080, framerate=(fraction)30/1, format=(string)NV12")
        capsfilter.set_property("caps", caps)
        nvvidconv.set_property("flip-method", 2)
        print("Adding elements to pipeline")

        self.pipeline.add(source)
        self.pipeline.add(capsfilter)
        self.pipeline.add(nvvidconv)
        self.pipeline.add(tee)
        self.pipeline.add(self.streaming)
        self.pipeline.add(self.inferencing)

        # Link all elements that can be automatically linked
        source.link(capsfilter)
        capsfilter.link(nvvidconv)
        nvvidconv.link(tee)
        print("Linking tee elements")

        # Manually link the tee_pads because they are "request"-pads
        tee_streaming_pad = tee.get_request_pad("src_%u")
        streaming_pad = self.streaming.get_static_pad("sink")
        tee_streaming_pad.link(streaming_pad)

bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect("message", self.on_message)

    def _create_streaming_branch(self):
        streaming_bin = Gst.Bin.new("streaming")

        # Create elements
        queue1 = Gst.ElementFactory.make("queue", "queue1")
        nvstreammux = Gst.ElementFactory.make("nvstreammux", "nvstreammux")
        nvinfer = Gst.ElementFactory.make("nvinfer", "infer")
        convert1 = Gst.ElementFactory.make("nvvideoconvert", "convert1")
        nvdsosd = Gst.ElementFactory.make("nvdsosd", "nvdsosd")
        convert2 = Gst.ElementFactory.make("nvvideoconvert", "convert2")
        encoder_capsfilter = Gst.ElementFactory.make("capsfilter", "encoder_capsfilter")
        encoder = Gst.ElementFactory.make("nvv4l2h265enc", "nvv4l2h265enc")
        udpsink = Gst.ElementFactory.make("udpsink", "udpsink")

        elements = [queue1, nvstreammux, nvinfer, convert1, nvdsosd, convert2, encoder_capsfilter, encoder, udpsink]
        for element in elements:
            if not element:
                raise ValueError(element.name, "could not be created")

        # Configure elements
        queue1.set_property("flush-on-eos", True)
        nvinfer.set_property("config-file-path", car.config.TRT_CONFIG)
        caps = Gst.caps_from_string("video/x-raw(memory:NVMM), format=(string)NV12, width=(int)1920, height=(int)1080")
        encoder_capsfilter.set_property("caps", caps)
        encoder.set_property("maxperf-enable", True)
        encoder.set_property("bitrate", 1000000)
        encoder.set_property("insert-sps-pps", True)
        encoder.set_property("preset-level", 1)
        nvstreammux.set_property("width", 1920)
        nvstreammux.set_property("height", 1080)
        nvstreammux.set_property("batch-size", 1)

        # Add elements to bin
        streaming_bin.add(queue1)
        streaming_bin.add(nvstreammux)
        streaming_bin.add(nvinfer)
        streaming_bin.add(convert1)
        streaming_bin.add(nvdsosd)
        streaming_bin.add(convert2)
        streaming_bin.add(encoder_capsfilter)
        streaming_bin.add(encoder)
        streaming_bin.add(udpsink)
        pad = queue1.get_static_pad("sink")
        ghostpad = Gst.GhostPad.new("sink", pad)
        streaming_bin.add_pad(ghostpad)

        # Link elements; NVStreammux has request sinks
        mux_pad = nvstreammux.get_request_pad("sink_0")
        queue_pad = queue1.get_static_pad("src")
        # The other elements have always pads
        queue_pad.link(mux_pad)
        nvstreammux.link(nvinfer)
        nvinfer.link(convert1)
        convert1.link(nvdsosd)
        nvdsosd.link(convert2)
        convert2.link(encoder_capsfilter)
        encoder_capsfilter.link(encoder)
        encoder.link(udpsink)

        return streaming_bin

    def on_message(self, bus, message):
        t = message.type
        if t == Gst.MessageType.EOS:
            self.pipeline.set_state(Gst.State.NULL)
            print("End of Stream")
        elif t == Gst.MessageType.ERROR:
            self.pipeline.set_state(Gst.State.NULL)
            err, debug = message.parse_error()
            print("Error: %s" % err, debug)
        elif t == Gst.MessageType.WARNING:
            warn, debug = message.parse_warning()
            print("Warning: ", warn, " Debug is", debug)
        elif t == Gst.MessageType.INFO:
            info, debug = message.parse_info()
            print("Info: ", info, "Debug is:", debug)
        elif t == Gst.MessageType.ELEMENT:
            if message.has_name("GstBinForwarded"):
                struct = message.get_structure()
                forwarded_message = struct.get_value("message")
                if forwarded_message.type == Gst.MessageType.EOS:
                    print("EOS from testdata-bin")
                    # Remove the testdata-bin from the branch because it reached EOS
                    self.testdata.set_state(Gst.State.NULL)
                    self.pipeline.remove(self.testdata)
                    # Rename the testdata directory folder and create a new one
                    postfix = datetime.datetime.now().strftime('%d-%m-%Y_%X').replace(':', '-')
                    folder = os.path.basename(os.path.normpath(car.config.TESTDATA_DIRECTORY_LOCATION))
                    old_dir = car.config.TESTDATA_DIRECTORY_LOCATION
                    path_to_dir = os.path.dirname(car.config.TESTDATA_DIRECTORY_LOCATION)
                    new_dir = path_to_dir[:path_to_dir.rfind('/')] + '/' + folder + postfix
                    os.rename(old_dir, new_dir)
                    os.mkdir(old_dir)

    def run(self): 
        try:
            print("GStreaming!")
            self.pipeline.set_state(Gst.State.PLAYING)
            self.loop.run()
        except KeyboardInterrupt:
            print("GStreaming interrupted")
        finally:
            self.pipeline.set_state(Gst.State.NULL)
            self.loop.quit()
            print("GStreaming ended")

Any help is greatly appreciated!

Hi,
For gaining optimal performance on Jetson Nano, suggest you use DeepStream SDK.
https://devtalk.nvidia.com/default/topic/1068639/deepstream-sdk/announcing-deepstream-sdk-4-0-2/
Please install the package through SDKManager and give it a try.

Hello,

I already installed DeepStream through the use of the SDKManager. Otherwise I wouldn’t be able to use the nvinfer or nvvideoconvert elements, for example.

I also followed your documentation and I tried your sample apps. I don’t really understand why the above code is crashing. The error logs aren’t really helpful either and I tried fixing it by myself by searching through other posts, which I linked in my original post.

Hi,
We have deepstream python app in
https://github.com/NVIDIA-AI-IOT/deepstream_python_apps

There are several samples. Please take a look and see if you can choose one sample similar to your usecase and apply customization.