Hi @fanzh
I believe NvDsBatchMeta carries everything we need to fix the frame order within the Triton Inference Server Python backend. Could you let me know whether it can be accessed from the Python backend, e.g. read and written per stream from the `execute` method, as in the code below? (For reference, a standard pyds pad probe that reads `source_id` is sketched after the listing.)
import cv2
import numpy as np
import cupy as cp

import triton_python_backend_utils as pb_utils


class TritonPythonModel:
    """Your Python model must use the same class name. Every Python model
    that is created must have "TritonPythonModel" as the class name.
    """
    # def initialize(self, args):
    #     """`initialize` is called only once when the model is being loaded.
    #     Implementing the `initialize` function is optional. This function
    #     allows the model to initialize any state associated with this model.
    #
    #     Parameters
    #     ----------
    #     args : dict
    #       Both keys and values are strings. The dictionary keys and values are:
    #       * model_config: A JSON string containing the model configuration
    #       * model_instance_kind: A string containing model instance kind
    #       * model_instance_device_id: A string containing model instance device ID
    #       * model_repository: Model repository path
    #       * model_version: Model version
    #       * model_name: Model name
    #     """
    #     # You must parse model_config. JSON string is not parsed here
    #     self.model_config = model_config = json.loads(args["model_config"])
    #
    #     # Get OUTPUT0 configuration
    #     output0_config = pb_utils.get_output_config_by_name(model_config, "OUTPUT0")
    #
    #     # Convert Triton types to numpy types
    #     self.output0_dtype = pb_utils.triton_string_to_numpy(
    #         output0_config["data_type"]
    #     )
    def initialize(self, args):
        pass
    def execute(self, requests):
        """`execute` MUST be implemented in every Python model. `execute`
        receives a list of pb_utils.InferenceRequest as its only argument
        and is called when an inference request is made for this model.
        Depending on the batching configuration used (e.g. dynamic
        batching), `requests` may contain multiple requests. Every Python
        model must create one pb_utils.InferenceResponse for every
        pb_utils.InferenceRequest in `requests`. If there is an error, you
        can set the error argument when creating a pb_utils.InferenceResponse.

        Parameters
        ----------
        requests : list
            A list of pb_utils.InferenceRequest

        Returns
        -------
        list
            A list of pb_utils.InferenceResponse. The length of this list
            must be the same as `requests`.
        """
        responses = []
        logger = pb_utils.Logger
        # logger.log_info("Info Msg!")
        # logger.log_warn("Warning Msg!")
        # logger.log_error("Error Msg!")
        # logger.log_verbose("Verbose Msg!")
        for request in requests:
            input_tensor = pb_utils.get_input_tensor_by_name(request, "INPUT0")
            # Zero-copy view of the input tensor on the GPU via DLPack
            frame_cp = cp.fromDlpack(input_tensor.to_dlpack())
            logger.log_warn(f"input tensor device: {frame_cp.device}")
            # NvDsBatchMeta: read the metadata here to get the source_id of
            # each frame in the batch
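            # Hypothetical sketch of what we want at this point. As far as I
            # can tell, pb_utils exposes no NvDsBatchMeta accessor, so the
            # names `get_nvds_batch_meta` and `frame_meta_list` below are
            # made up for illustration only:
            #
            #   batch_meta = pb_utils.get_nvds_batch_meta(request)  # hypothetical API
            #   source_ids = [fm.source_id for fm in batch_meta.frame_meta_list]
            #   # ...reorder/route the batch per stream using source_ids...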
            # frame = input_tensor.as_numpy()
            print(f"frame shape is {frame_cp.shape}")
            frame = cp.asnumpy(frame_cp)  # device-to-host copy for the OpenCV preview
            batch_size = frame.shape[0]
            # Pass the input straight through as OUTPUT0 via DLPack
            out_tensor = pb_utils.Tensor.from_dlpack(
                "OUTPUT0", input_tensor.to_dlpack()
            )
            try:
                # Debug preview of the first two frames in the batch
                # (assumes frames arrive in an HWC layout that imshow accepts)
                frame_0 = frame[0].astype(np.uint8)
                cv2.imshow('Video_0', frame_0)
                if batch_size > 1:
                    frame_1 = frame[1].astype(np.uint8)
                    cv2.imshow('Video_1', frame_1)
                cv2.waitKey(1)
            except Exception as e:
                logger.log_warn(f"exception: {e}")
            # Placeholder stats: one 5-value row per frame (values are dummies)
            stats = np.array([
                [360, 780, 360, 360, -1]  # middle rectangle
            ])
            # replicated_array = np.array([stats1, stats2])
            # Replicate the single row across the whole batch
            replicated_array = np.tile(stats, (batch_size, 1, 1))
            logger.log_warn(f"OUTPUT1 shape: {replicated_array.shape}")
            stats = replicated_array.astype(np.float32)
            out_tensor_1 = pb_utils.Tensor("OUTPUT1", stats)
            # self.MEM1.Set(frame[0, :, :200, :200])
            responses.append(
                pb_utils.InferenceResponse([out_tensor, out_tensor_1])
            )
        return responses
    def finalize(self):
        """`finalize` is called only once when the model is being unloaded.
        Implementing `finalize` is OPTIONAL. This function allows the model
        to perform any necessary clean-ups before exit.
        """
        print("Cleaning up...")