Order within the Triton Inference Server Python backend

Hi @fanzh

I understand that NvDsBatchMeta carries everything we need to fix the order within the Triton Inference Server Python backend. Please let me know whether I can access it from within the Python backend, e.g. read and write it for a stream from the execute method, as follows:

import cv2
import numpy as np
import cupy as cp
import triton_python_backend_utils as pb_utils

class TritonPythonModel:
    """Your Python model must use the same class name. Every Python model
    that is created must have "TritonPythonModel" as the class name.
    """

    # def initialize(self, args):
    #     """`initialize` is called only once when the model is being loaded.
    #     Implementing `initialize` function is optional. This function allows
    #     the model to initialize any state associated with this model.

    #     Parameters
    #     ----------
    #     args : dict
    #       Both keys and values are strings. The dictionary keys and values are:
    #       * model_config: A JSON string containing the model configuration
    #       * model_instance_kind: A string containing model instance kind
    #       * model_instance_device_id: A string containing model instance device ID
    #       * model_repository: Model repository path
    #       * model_version: Model version
    #       * model_name: Model name
    #     """

    #     # You must parse model_config. JSON string is not parsed here
    #     self.model_config = model_config = json.loads(args["model_config"])

    #     # Get OUTPUT0 configuration
    #     output0_config = pb_utils.get_output_config_by_name(model_config, "OUTPUT_0")

    #     # Convert Triton types to numpy types
    #     self.output0_dtype = pb_utils.triton_string_to_numpy(
    #         output0_config["data_type"]
    #     )
 
    def initialize(self, args):
        pass
    def execute(self, requests):
        """`execute` MUST be implemented in every Python model. `execute`
        function receives a list of pb_utils.InferenceRequest as the only
        argument. This function is called when an inference request is made
        for this model. Depending on the batching configuration (e.g. Dynamic
        Batching) used, `requests` may contain multiple requests. Every
        Python model, must create one pb_utils.InferenceResponse for every
        pb_utils.InferenceRequest in `requests`. If there is an error, you can
        set the error argument when creating a pb_utils.InferenceResponse

        Parameters
        ----------
        requests : list
          A list of pb_utils.InferenceRequest

        Returns
        -------
        list
          A list of pb_utils.InferenceResponse. The length of this list must
          be the same as `requests`
        """
        responses = []

        logger = pb_utils.Logger
        # logger.log_error(f"Info Msg!:::::::::{test_module.value}")
        # logger.log_warn("Warning Msg!")
        # logger.log_error("Error Msg!")
        # logger.log_verbose("Verbose Msg!")
        for request in requests:
            
            input_tensor = pb_utils.get_input_tensor_by_name(request, "INPUT0")
            frame_cp = cp.fromDlpack(input_tensor.to_dlpack())
            logger.log_warn(f"Warning Msg!{frame_cp.device}")
            # NvDsBatchMeta: this is where we would like to read the
            # source_id of each frame in the batch.

            # frame = input_tensor.as_numpy()
            print(f"frame size is {frame_cp.shape}")
            frame = cp.asnumpy(frame_cp)
            batch_size = frame.shape[0]
            out_tensor = pb_utils.Tensor.from_dlpack(
                "OUTPUT0", input_tensor.to_dlpack()
            )

            try:
                frame_0 = frame[0].astype(np.uint8)
                cv2.imshow('Video_0', frame_0)
                
                if batch_size>1:
                    frame_1 = frame[1].astype(np.uint8)
                    cv2.imshow('Video_1', frame_1)
                cv2.waitKey(1)
            except Exception as e:
                logger.log_warn(f"exception:{e}")

            stats = np.array([
                [360, 780, 360, 360, -1]  # Middle rectangle
            ])
            

            #replicated_array=np.array([stats1,stats2])
            replicated_array = np.tile(stats, (batch_size, 1, 1))

            logger.log_warn(f"Warning Msg:{replicated_array.shape}")
            stats = replicated_array.astype(np.float32)

            out_tensor_1 = pb_utils.Tensor(
                "OUTPUT1", stats
            )
            # self.MEM1.Set(frame[0,:,:200,:200])

            responses.append(pb_utils.InferenceResponse([out_tensor,out_tensor_1]))
        return responses

    def finalize(self):
        """`finalize` is called only once when the model is being unloaded.
        Implementing `finalize` function is OPTIONAL. This function allows
        the model to perform any necessary clean ups before exit.
        """
        print("Cleaning up...")

DeepStream does not pass this information to Triton. You can check Triton's interface.

Hi @fanzh

I am able to read the source_id from a pgie sink pad probe.
Is there any recommended way to transfer this data to the Triton server with the Python backend? It should be synchronous, and if it could be passed as an extra tensor that can be specified with the Gst-nvinferserver Configuration File Specifications, that would be great. (A sketch of the backend side follows the probe code below.)


def pgie_sink_pad_buffer_probe(pad, info, u_data):
    global Counter  # Counter is assumed to be a module-level buffer counter
    Counter = Counter + 1
    source_id_List = []
    print("\n\n")
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        return

    # Retrieve batch metadata from the gst_buffer
    # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
    # C address of gst_buffer as input, which is obtained with hash(gst_buffer)
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list

    detection_params = DetectionParam(CLASS_NB, ACCURACY_ALL_CLASS)
    box_size_param = BoxSizeParam(IMAGE_HEIGHT, IMAGE_WIDTH,
                                  MIN_BOX_WIDTH, MIN_BOX_HEIGHT)
    nms_param = NmsParam(TOP_K, IOU_THRESHOLD)

    label_names = get_label_names_from_file("labels.txt")

    while l_frame is not None:
        try:
            # Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
            # The casting also keeps ownership of the underlying memory
            # in the C code, so the Python garbage collector will leave
            # it alone.
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
            source_id = frame_meta.source_id
            source_id_List.append(source_id)
            print(f"source is is {source_id}")
        except StopIteration:
            break

        l_user = frame_meta.frame_user_meta_list
        while l_user is not None:
            try:
                # Note that l_user.data needs a cast to pyds.NvDsUserMeta
                # The casting also keeps ownership of the underlying memory
                # in the C code, so the Python garbage collector will leave
                # it alone.
                user_meta = pyds.NvDsUserMeta.cast(l_user.data)
            except StopIteration:
                break

            if (user_meta.base_meta.meta_type
                    != pyds.NvDsMetaType.NVDSINFER_TENSOR_OUTPUT_META):
                # Advance before skipping, so non-tensor user meta cannot
                # leave the loop spinning forever.
                try:
                    l_user = l_user.next
                except StopIteration:
                    break
                continue

            tensor_meta = pyds.NvDsInferTensorMeta.cast(user_meta.user_meta_data)

            # Boxes in the tensor meta should be in network resolution which is
            # found in tensor_meta.network_info. Use this info to scale boxes to
            # the input frame resolution.
            layers_info = []

            for i in range(tensor_meta.num_output_layers):
                layer = pyds.get_nvds_LayerInfo(tensor_meta, i)
                layers_info.append(layer)

            frame_object_list = nvds_infer_parse_custom_tf_ssd(
                layers_info, detection_params, box_size_param, nms_param
            )
            try:
                l_user = l_user.next
            except StopIteration:
                break

            for frame_object in frame_object_list:
                add_obj_meta_to_frame(frame_object, batch_meta, frame_meta, label_names)

        try:
            # indicate inference is performed on the frame
            frame_meta.bInferDone = True
            l_frame = l_frame.next
        except StopIteration:
            break
    send_to_Triton(source_id_List)  # trying to send the batch order to the Triton Python backend
    return Gst.PadProbeReturn.OK
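
On the backend side, if such an extra tensor could ever be attached to the request, picking it up in execute would be straightforward. Below is a minimal sketch; the tensor name SOURCE_IDS, its INT32 dtype and shape [batch_size], the idea that Gst-nvinferserver could attach it, and the helper name are all assumptions, not an existing option:

import numpy as np
import triton_python_backend_utils as pb_utils


def reorder_batch_by_source(request):
    # Hypothetical: assumes an extra input "SOURCE_IDS" (INT32, shape [batch_size])
    # accompanies "INPUT0" in the same request. No such option exists today.
    frames = pb_utils.get_input_tensor_by_name(request, "INPUT0").as_numpy()
    ids_tensor = pb_utils.get_input_tensor_by_name(request, "SOURCE_IDS")
    if ids_tensor is None:
        # No ordering information attached; return the batch unchanged.
        return frames, None
    source_ids = ids_tensor.as_numpy().reshape(-1)
    order = np.argsort(source_ids, kind="stable")  # source 0 first, then 1, ...
    return frames[order], source_ids[order]

The stable argsort keeps the relative order of frames that share a source_id, and the reordered array could replace the frame array in the existing execute loop.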

It depends on Triton's interface. Passing a tensor uses the TRITONSERVER_InferenceRequestAddInput and TRITONSERVER_InferenceRequestAppendInputData functions; neither has a parameter to pass source_id. As I wrote on Apr 3, you can reorder the data by source_id in TrtServerRequest::setInputs.

Please point me to the source of TrtServerRequest::setInputs. I assume it is C++, but let me estimate what I can do with it.

The nvinferserver low-level lib is open source. TrtServerRequest::setInputs is in /opt/nvidia/deepstream/deepstream/sources/libs/nvdsinferserver/infer_trtis_server.cpp.

OK, thanks.

I see in Gst-nvstreammux New — DeepStream 6.4 documentation:

priority:

The priority of this stream. Deprecated (shall remove support from next release). Please use algorithm-type instead; uint.

Default: 0 (highest priority). A higher value is a lower priority.

How can I use this property to control the order of frames within a batch?

Or will I be able to use any of the new nvstreammux features to sort the issue out?

The new streammux is open source. I tested deepstream-test3 with only some logs added; batch 0 always corresponds to source 0. Please refer to the log: log.txt (91.8 KB)
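
For anyone who wants to reproduce that check, a small pad probe along these lines can print the batch slot against the source for every buffer. This is only a sketch, not the exact logging used in the test; the function name batch_order_probe is made up, and it is assumed to be attached to the nvstreammux src pad:

import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst

import pyds


def batch_order_probe(pad, info, u_data):
    """Log which source landed in which batch slot for every pushed buffer."""
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        return Gst.PadProbeReturn.OK

    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        # With the new nvstreammux the expectation is batch_id == source_id.
        print(f"batch_id={frame_meta.batch_id} source_id={frame_meta.source_id} "
              f"frame_num={frame_meta.frame_num}")
        try:
            l_frame = l_frame.next
        except StopIteration:
            break
    return Gst.PadProbeReturn.OK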

Thanks, let me take a look.

Hi @fanzh

This actually fixed the issue; however, the pipeline hangs after receiving an EOS from any of the streams, with the following error:

[ERROR push 353] push failed [-5]
[ERROR push 353] push failed [-5]
[ERROR push 353] push failed [-5]
[ERROR push 353] push failed [-5]
[ERROR push 353] push failed [-5]
[ERROR push 353] push failed [-5]
[ERROR push 353] push failed [-5]
[ERROR push 353] push failed [-5]
[ERROR push 353] push failed [-5]
[ERROR push 353] push failed [-5]

Anyway, I will create a new topic for this issue. Thank you very much.
