VPI in a GStreamer pipeline

Hello,
I am trying to perform multiple transformations on multiple camera streams using VPI in a GStreamer pipeline.

I implemented a custom nvivafilter starting from the example nvsample_cudaprocess_src.

Here is my gpu_process function:

/* VPI Resources are global and initialized in the `init` function. */

VPIImage vpi_image;
VPIImage inter_vpi_image;
VPIStream vpi_stream;
VPIWarpMap map;
VPIPayload warp;

void gpu_process (EGLImageKHR egl_image, void ** usrptr)
{
  if (!vpi_image){
    vpiImageCreateEGLImageWrapper (egl_image, NULL, VPI_BACKEND_CUDA, &vpi_image);
  } else {
    vpiImageSetWrappedEGLImage (vpi_image, egl_image);
  }

  CHECK_VPI_STATUS(vpiSubmitRemap(vpi_stream, 0, warp, vpi_image, inter_vpi_image, VPI_INTERP_LINEAR, VPI_BORDER_ZERO, 0));
  CHECK_VPI_STATUS(vpiSubmitConvertImageFormat(vpi_stream, VPI_BACKEND_CUDA, inter_vpi_image, vpi_image, NULL));
  CHECK_VPI_STATUS(vpiStreamSync(vpi_stream));
}

It works well with a single stream. However, when I start two pipelines concurrently, the frames are randomly swapped between the two streams.
How can I handle multiple nvivafilter instances running in parallel? Is it possible to pass data through the void ** usrptr parameter so that the instances don’t need to share the same global resources?
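For illustration, here is a minimal sketch of the lazy-init-via-usrptr pattern I have in mind. InstanceCtx and gpu_process_sketch are hypothetical stand-ins for the real VPI resources and callback; whether nvivafilter actually hands each filter instance a distinct usrptr is exactly what I am unsure about:

```cpp
#include <cassert>

// Hypothetical per-instance context standing in for the VPI handles
// (stream, warp payload, images). Names are illustrative only.
struct InstanceCtx {
    int frames_processed = 0;
};

// Pattern sketch: lazily allocate a context on the first call and park it
// in *usrptr, so each filter instance would keep its own resources.
// This only helps if the library passes a distinct usrptr per instance.
void gpu_process_sketch(void **usrptr)
{
    if (*usrptr == nullptr)
        *usrptr = new InstanceCtx();          // first frame: create resources
    InstanceCtx *ctx = static_cast<InstanceCtx *>(*usrptr);
    ctx->frames_processed++;                  // per-instance work goes here
}
```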

I also implemented a GStreamer plugin starting from gst-dsexample. It works well with multiple streams, but I realized it only works with the new buffer API from DeepStream.
How can one adapt the gst-dsexample plugin to work with non-DeepStream elements?

Hi,

1)
May I know more about your use case?
Do you launch the pipelines in threads or in different processes?

Also, could you share a simple reproducible source with us?
We would like to reproduce and check it internally.

2)
dsexample can be launched with gst-launch-1.0.
For example:

$ gst-launch-1.0 filesrc location=./streams/sample_1080p_h264.mp4 ! qtdemux ! h264parse ! nvv4l2decoder \
! m.sink_0 nvstreammux name=m width=1280 height=720 batch-size=1 ! nvinfer config-file-path=./configs/deepstream-app/config_infer_primary.txt \
! dsexample full-frame=1 ! nvvideoconvert ! nvdsosd ! nvegltransform ! nveglglessink sync=0

So you can add the GStreamer component based on your use case.

Thanks.

Hi,

I am working on an existing GStreamer application. For each camera, a pipeline is launched in a new thread; it captures, encodes, and sends the frames with low latency.
I need to apply multiple transformations (rotation + fisheye-to-equirectangular + custom warping) to the frames before encoding.
I combined these transformations into a single xy mapping that I want to apply using VPI.
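For reference, composing the per-stage maps into one lookup can be sketched as plain C++, independent of VPI. Each stage is written in the dest→src convention that remap keypoints use (an output pixel stores the input coordinate it samples); the rotation helper mirrors the convention of my filter code, and compose/rotation are illustrative names:

```cpp
#include <cassert>
#include <cmath>
#include <functional>
#include <utility>

using Pt = std::pair<float, float>;
using XYMap = std::function<Pt(float, float)>;

// Applying stage `first` to the image and then stage `second` is equivalent
// to the single coordinate map first∘second: the final output pixel (x, y)
// samples second's source, which is then traced back through first.
XYMap compose(XYMap first, XYMap second)
{
    return [=](float x, float y) {
        Pt p = second(x, y);                  // where the last stage samples
        return first(p.first, p.second);      // trace back through the first stage
    };
}

// Dest→src rotation by `angle` around (cx, cy).
XYMap rotation(float angle, float cx, float cy)
{
    return [=](float x, float y) {
        float dx = x - cx, dy = y - cy;
        return Pt{dx * std::cos(angle) - dy * std::sin(angle) + cx,
                  dx * std::sin(angle) + dy * std::cos(angle) + cy};
    };
}
```

The fused XYMap can then be sampled on the dense keypoint grid once, so a single vpiSubmitRemap applies all stages per frame.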

1)
I attached a simplified example of the issue. Here is how I run it:

$ cd gst-nvivafilter && make
$ cd ../gst-app && make

# works perfectly
$ ./single_pipeline

# flashes between streams and maps since both pipelines share the same objects
$ ./dual_pipeline

concurrent-filters.zip (5.6 KB)
This example captures the frame (nvarguscamerasrc), applies the custom filter (nvivafilter), and displays the result (nv3dsink).
The custom filter uses VPI’s remap function to apply a dummy rotation.

2)
I customized dsexample in an attempt to work around the issue described in 1).
It works well in a pipeline with DeepStream plugins (new bufapi), but doesn’t work with the elements used in my existing pipeline.

Thank you for your help.

Hello,
Any update on this?
What would be the easiest way to apply a VPI map on multiple streams launched in parallel threads? (My existing pipeline is not a DeepStream pipeline)

You may have to register clients and manage resources for each.
So far I assume the same thread runs the gpu_process calls for each running instance, so checking the thread id there can help.
I’ve quickly tried it; the following is far from clean code, but it seems to work as a starting point when running two pipelines. I haven’t tried dynamically creating/destroying pipelines, nor evaluated the overhead and limitations. You may improve it and share.

rotation_filter.cu
/*
 * Copyright (c) 2016, NVIDIA CORPORATION. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *  * Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 *  * Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *  * Neither the name of NVIDIA CORPORATION nor the names of its
 *    contributors may be used to endorse or promote products derived
 *    from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */
#include <sys/types.h>
#include <unistd.h>
#include <sys/syscall.h>
#include <map>

#include <stdio.h>
#include <stdlib.h>
#include <math.h>

#include <vpi/Context.h>
#include <vpi/Image.h>
#include <vpi/Status.h>
#include <vpi/Stream.h>
#include <vpi/algo/ConvertImageFormat.h>
#include <vpi/WarpMap.h>
#include <vpi/algo/Remap.h>
#include <vpi/EGLInterop.h>

#include "cudaEGL.h"

#define CHECK_VPI_STATUS(STMT) do {                        \
  VPIStatus status = (STMT);                               \
  if (status != VPI_SUCCESS)                               \
  {                                                        \
    char buffer[VPI_MAX_STATUS_MESSAGE_LENGTH];            \
    vpiGetLastStatusMessage(buffer, sizeof(buffer));       \
    printf ("Error: %s in %s at line %d (%s)\n",           \
    vpiStatusGetName(status), __FILE__, __LINE__, buffer); \
  }                                                        \
} while (0)

extern "C" void Handle_EGLImage (EGLImageKHR image);
extern "C" {

  typedef enum {
    COLOR_FORMAT_Y8 = 0,
    COLOR_FORMAT_U8_V8,
    COLOR_FORMAT_RGBA,
    COLOR_FORMAT_NONE
  } ColorFormat;

  typedef struct {
    /**
    * cuda-process API
    *
    * @param image   : EGL Image to process
    * @param userPtr : point to user alloc data, should be free by user
    */
    void (*fGPUProcess) (EGLImageKHR image, void ** userPtr);

    void (*fPreProcess)(void **sBaseAddr, unsigned int *smemsize, unsigned int *swidth, unsigned int *sheight, unsigned int *spitch, ColorFormat *sformat, unsigned int nsurfcount, void ** userPtr);

    void (*fPostProcess)(void **sBaseAddr, unsigned int *smemsize, unsigned int *swidth, unsigned int *sheight, unsigned int *spitch, ColorFormat *sformat, unsigned int nsurfcount, void ** userPtr);
  } CustomerFunction;

  void init (CustomerFunction * pFuncs);
  void deinit ();
}



typedef struct  {
  VPIImage   vpi_image;
  VPIImage   inter_img;
  VPIStream  vpi_stream;
  VPIWarpMap map;
  VPIPayload warp;
} clientData;

static std::map<pid_t,clientData*> clientsMap;

clientData *
init_resources() {
  int width = 1280;
  int height = 720;

  clientData *c = new clientData();

  /* Create an identity map with a single dense region of size 1280x720. */
  memset(&c->map, 0, sizeof(c->map));
  c->map.grid.numHorizRegions  = 1;
  c->map.grid.numVertRegions   = 1;
  c->map.grid.regionWidth[0]   = width;
  c->map.grid.regionHeight[0]  = height;
  c->map.grid.horizInterval[0] = 1;
  c->map.grid.vertInterval[0]  = 1;
  CHECK_VPI_STATUS(vpiWarpMapAllocData(&c->map));
  CHECK_VPI_STATUS(vpiWarpMapGenerateIdentity(&c->map));

  for (int i = 0; i < c->map.numVertPoints; ++i)
  {
      VPIKeypoint *row = (VPIKeypoint *)((uint8_t *)c->map.keypoints + c->map.pitchBytes * i);
      for (int j = 0; j < c->map.numHorizPoints; ++j)
      {
          const float x = j - width / 2.0f;
          const float y = i - height / 2.0f;
          const float angle = M_PI / 3.0f;
          
          row[j].x = x * cos(angle) - y * sin(angle) + width / 2.0f;
          row[j].y = x * sin(angle) + y * cos(angle) + height / 2.0f;
      }
  }

  CHECK_VPI_STATUS(vpiStreamCreate(VPI_BACKEND_CUDA, &c->vpi_stream));
  CHECK_VPI_STATUS(vpiCreateRemap(VPI_BACKEND_CUDA, &c->map, &c->warp));
  CHECK_VPI_STATUS(vpiImageCreate(width, height, VPI_IMAGE_FORMAT_NV12, 0, &c->inter_img));

  printf("#### VPIImageImpl inter_img address: %p ####\n", c->inter_img);

  return c;
}


/**
  * Performs CUDA Operations on egl image.
  *
  * @param image : EGL image
  */
void
gpu_process (EGLImageKHR image, void ** usrptr)
{
  pid_t tid = syscall(__NR_gettid);
  /* NOTE: clientsMap is read and written from several streaming threads
   * without a lock; concurrent first-frame insertions could race, so
   * consider guarding the map with a mutex in production code. */
  clientData *c = clientsMap[tid];
  if (!c) {
    c = init_resources();
    clientsMap[tid] = c;
  }
  
  if (!c->vpi_image) {
    //printf("Creating EGL Image wrapper\n");fflush(stdout);
    vpiImageCreateEGLImageWrapper (image, NULL, VPI_BACKEND_CUDA, &c->vpi_image);
  }
  else { 
    //printf("Wrapping EGL Image\n");fflush(stdout);
    vpiImageSetWrappedEGLImage (c->vpi_image, image);
  }

  CHECK_VPI_STATUS(vpiSubmitRemap(c->vpi_stream, 0, c->warp, c->vpi_image, c->inter_img, VPI_INTERP_LINEAR, VPI_BORDER_ZERO, 0));
  CHECK_VPI_STATUS(vpiSubmitConvertImageFormat(c->vpi_stream, VPI_BACKEND_CUDA, c->inter_img, c->vpi_image, NULL));
  CHECK_VPI_STATUS(vpiStreamSync(c->vpi_stream));
}


extern "C" void
init (CustomerFunction * pFuncs)
{
  pFuncs->fGPUProcess = gpu_process;

}

extern "C" void
deinit ()
{
  for (auto &entry : clientsMap) {
    clientData *c = entry.second;
    if (c->vpi_stream) {
        vpiStreamDestroy (c->vpi_stream);
        c->vpi_stream = NULL;
    }
    if (c->warp) {
        vpiPayloadDestroy (c->warp);
        c->warp = NULL;
    }
    if (c->inter_img) {
        vpiImageDestroy (c->inter_img);
        c->inter_img = NULL;
    }
    if (c->vpi_image) {
        /* Also release the EGL image wrapper created in gpu_process. */
        vpiImageDestroy (c->vpi_image);
        c->vpi_image = NULL;
    }
    vpiWarpMapFreeData (&c->map);

    delete c;
  }
  clientsMap.clear();
}

Otherwise, you may get the public source code of the nvvidconv plugin, which has an implementation similar to nvivafilter, so you would be able to further customize that plugin for your processing.

Hi,

Sorry for the late update.

We are still checking the source you shared on 16 Jul.
Will get back to you later.

Thanks.

Hi,

We can get the correct dual-pipeline result with the source shared by Honey_Patouceul.
Please also give it a try.

Thanks.

Hello,
Thank you @Honey_Patouceul, using the thread id looks like a viable solution. I was afraid GStreamer might regularly create new threads, but it doesn’t seem to.
Thank you @AastaLLL for the update.
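A possibly cleaner variant of the same per-thread idea, untested with nvivafilter: thread_local storage gives each streaming thread its own resources without the global map or any locking. ClientData here is a hypothetical stand-in for the VPI handles; note that cleanup then happens at thread exit rather than in deinit(), which may or may not suit the filter's lifecycle:

```cpp
#include <thread>

// Hypothetical stand-in for the per-thread VPI resources
// (stream, warp payload, images). id == -1 means "not yet initialized".
struct ClientData {
    int id = -1;
};

// thread_local: each thread that calls this gets its own ClientData,
// constructed on the thread's first use and destroyed at thread exit.
ClientData *get_client_data()
{
    thread_local ClientData data;
    return &data;
}
```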

This topic was automatically closed 60 days after the last reply. New replies are no longer allowed.