1). Regarding Batch Order in Triton Inference Server with Python Backend:
We’re developing a high-performance video analytics system using DeepStream with Triton Inference Server and a Python backend. Our requirement is to maintain a fixed order of channels within a batch. For example, camera 1 should correspond to batch[0], camera 2 to batch[1], and so on. However, we’re encountering issues where the batch length varies randomly and the order changes. What solutions or configurations can ensure a consistent batch order?
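To make the expected mapping concrete, here is a minimal sketch (not our current code) of how we would like to consume the batch inside execute, assuming a hypothetical per-frame SOURCE_ID input tensor existed so the frames could be reordered deterministically; no such input is available in our pipeline today:

import cupy as cp
import triton_python_backend_utils as pb_utils

def reorder_batch_by_source(request):
    # Frames as delivered by the pipeline; their order within the batch is currently not guaranteed.
    frames = cp.fromDlpack(pb_utils.get_input_tensor_by_name(request, "INPUT0").to_dlpack())
    # Hypothetical extra input carrying the camera/source id of each frame in the batch.
    source_ids = cp.fromDlpack(pb_utils.get_input_tensor_by_name(request, "SOURCE_ID").to_dlpack()).ravel()
    order = cp.argsort(source_ids)  # camera 1 -> batch[0], camera 2 -> batch[1], ...
    return frames[order], source_ids[order]

If there is a configuration on the DeepStream/Triton side that already guarantees a fixed source-to-slot mapping, we would of course prefer that over reordering inside the model.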
2). Mitigating Video Stuttering with Long Inference Times:
In our system, the execute method in model.py takes up to 100 ms to process a single batch of frames, which leads to stuttering in the output video. Are there mechanisms, such as asynchronous inference or asynchronous overlay drawing using a buffer, that would let us process all frames while avoiding the stuttering? A sketch of the kind of scheme we have in mind follows below.
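To illustrate the kind of asynchronous scheme we have in mind (this is only a sketch with illustrative names, not our implementation), the slow work could be handed to a background thread so that execute returns quickly and the overlay is always drawn from the most recently completed result:

import threading
import queue

class AsyncOverlayWorker:
    """Runs the slow (~100 ms) processing off the caller's thread and keeps the latest result."""

    def __init__(self):
        self._jobs = queue.Queue(maxsize=4)  # bounded: drop batches rather than fall behind
        self._latest = None                  # most recently completed detections/overlay
        self._lock = threading.Lock()
        threading.Thread(target=self._run, daemon=True).start()

    def submit(self, batch):
        try:
            self._jobs.put_nowait(batch)     # non-blocking: skip this batch if the worker is busy
        except queue.Full:
            pass

    def latest(self):
        with self._lock:
            return self._latest

    def _run(self):
        while True:
            batch = self._jobs.get()
            result = self._process(batch)    # the slow step (inference / overlay drawing)
            with self._lock:
                self._latest = result

    def _process(self, batch):
        return batch                         # stand-in; real code would run the heavy model

With something like this, execute would only call submit() and build its response from latest() (or pass the frames through unchanged when no result is ready yet), so the pipeline would never block for the full 100 ms. Is this pattern workable inside the Triton Python backend, or is there a recommended built-in mechanism?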
Please find below the model.py used in the DeepStream application.
# Copyright 2021-2023, NVIDIA CORPORATION & AFFILIATES. All rights reserved.
#
# Redistribution and use in source and binary forms, with or without
# modification, are permitted provided that the following conditions
# are met:
# * Redistributions of source code must retain the above copyright
# notice, this list of conditions and the following disclaimer.
# * Redistributions in binary form must reproduce the above copyright
# notice, this list of conditions and the following disclaimer in the
# documentation and/or other materials provided with the distribution.
# * Neither the name of NVIDIA CORPORATION nor the names of its
# contributors may be used to endorse or promote products derived
# from this software without specific prior written permission.
#
# THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
# EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
# IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
# PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT OWNER OR
# CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
# EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
# PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
# PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
# OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
# (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
# OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
import numpy as np
import cv2
import cupy as cp
import time
# triton_python_backend_utils is available in every Triton Python model. You
# need to use this module to create inference requests and responses. It also
# contains some utility functions for extracting information from model_config
# and converting Triton input/output types to numpy types.
import triton_python_backend_utils as pb_utils
class TritonPythonModel:
    """Your Python model must use the same class name. Every Python model
    that is created must have "TritonPythonModel" as the class name.
    """

    # def initialize(self, args):
    #     """`initialize` is called only once when the model is being loaded.
    #     Implementing `initialize` function is optional. This function allows
    #     the model to initialize any state associated with this model.
    #     Parameters
    #     ----------
    #     args : dict
    #       Both keys and values are strings. The dictionary keys and values are:
    #       * model_config: A JSON string containing the model configuration
    #       * model_instance_kind: A string containing model instance kind
    #       * model_instance_device_id: A string containing model instance device ID
    #       * model_repository: Model repository path
    #       * model_version: Model version
    #       * model_name: Model name
    #     """
    #     # You must parse model_config. JSON string is not parsed here
    #     self.model_config = model_config = json.loads(args["model_config"])
    #     # Get OUTPUT0 configuration
    #     output0_config = pb_utils.get_output_config_by_name(model_config, "OUTPUT_0")
    #     # Convert Triton types to numpy types
    #     self.output0_dtype = pb_utils.triton_string_to_numpy(
    #         output0_config["data_type"]
    #     )

    def initialize(self, args):
        pass
    def execute(self, requests):
        """`execute` MUST be implemented in every Python model. `execute`
        function receives a list of pb_utils.InferenceRequest as the only
        argument. This function is called when an inference request is made
        for this model. Depending on the batching configuration (e.g. Dynamic
        Batching) used, `requests` may contain multiple requests. Every
        Python model must create one pb_utils.InferenceResponse for every
        pb_utils.InferenceRequest in `requests`. If there is an error, you can
        set the error argument when creating a pb_utils.InferenceResponse.
        Parameters
        ----------
        requests : list
            A list of pb_utils.InferenceRequest
        Returns
        -------
        list
            A list of pb_utils.InferenceResponse. The length of this list must
            be the same as `requests`
        """
        responses = []
        logger = pb_utils.Logger
        # logger.log_error(f"Info Msg!:::::::::{test_module.value}")
        # logger.log_warn("Warning Msg!")
        # logger.log_error("Error Msg!")
        # logger.log_verbose("Verbose Msg!")
        for request in requests:
            input_tensor = pb_utils.get_input_tensor_by_name(request, "INPUT0")
            frame_cp = cp.fromDlpack(input_tensor.to_dlpack())
            logger.log_warn(f"Warning Msg!{frame_cp.device}")
            # frame = input_tensor.as_numpy()
            print(f"frame size is {frame_cp.shape}")
            frame = cp.asnumpy(frame_cp)
            batch_size = frame.shape[0]
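            # Pass the input frames straight through as OUTPUT0 (the same tensor is
            # re-exposed via DLPack without modification).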
            out_tensor = pb_utils.Tensor.from_dlpack(
                "OUTPUT0", input_tensor.to_dlpack()
            )
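            # Debug visualization: show the first one or two frames of the batch in
            # local OpenCV windows (requires a display; not intended for production).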
            try:
                frame_0 = frame[0].astype(np.uint8)
                cv2.imshow('Video_0', frame_0)
                if batch_size > 1:
                    frame_1 = frame[1].astype(np.uint8)
                    cv2.imshow('Video_1', frame_1)
                cv2.waitKey(1)
            except Exception as e:
                logger.log_warn(f"exception:{e}")
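            # Emit one fixed placeholder rectangle per frame as OUTPUT1
            # (shape: batch_size x 1 x 5).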
            stats = np.array([
                [360, 780, 360, 360, -1]  # Middle rectangle
            ])
            # replicated_array = np.array([stats1, stats2])
            replicated_array = np.tile(stats, (batch_size, 1, 1))
            logger.log_warn(f"Warning Msg:{replicated_array.shape}")
            stats = replicated_array.astype(np.float32)
            out_tensor_1 = pb_utils.Tensor(
                "OUTPUT1", stats
            )
            # self.MEM1.Set(frame[0, :, :200, :200])
            responses.append(pb_utils.InferenceResponse([out_tensor, out_tensor_1]))
        return responses
    def finalize(self):
        """`finalize` is called only once when the model is being unloaded.
        Implementing `finalize` function is OPTIONAL. This function allows
        the model to perform any necessary clean ups before exit.
        """
        print("Cleaning up...")
I will attach the minimum reproducible code via a GitHub URL as well.
The reference application is deepstream-rtsp-in-rtsp-out; please check out the ensemble branch.
github link