Deepstream-ipc-test with nvarguscamera feed getting mozaic output

• Hardware Platform (Jetson / GPU)

sudo dmesg | grep "Machine model"
[    0.000000] Machine model: NVIDIA Jetson Orin NX Engineering Reference Developer Kit
cat /etc/nv_tegra_release
# R36 (release), REVISION: 3.0, GCID: 36191598, BOARD: generic, EABI: aarch64, DATE: Mon May  6 17:34:21 UTC 2024
# KERNEL_VARIANT: oot
TARGET_USERSPACE_LIB_DIR=nvidia
TARGET_USERSPACE_LIB_DIR_PATH=usr/lib/aarch64-linux-gnu/nvidia

• DeepStream Version
DeepStream 7.0, docker container with guide from: GitHub - NVIDIA-AI-IOT/deepstream_dockers: A project demonstrating how to make DeepStream docker images.
• Issue Type( questions, new requirements, bugs)
Question:

I’m trying to run deepstream-ipc-test with csi nvarguscamera feed. For default version of this sample app with rtsp input everything is fine, but for csi input there is NVMAP_IOC_READ issue with client side and video output is mozaic. The modification I made to the sample app is only related to function in server side create_source_bin which I changed to create_source_bin_csi. For the client side everything is the same:

#include <gst/gst.h>
#include <glib.h>
#include <stdio.h>
#include <math.h>
#include <string.h>
#include <sys/time.h>
#include <cuda_runtime_api.h>

#include "gstnvdsmeta.h"
#include "nvds_yml_parser.h"
#include "gst-nvmessage.h"
#include "deepstream_common.h"

#include <errno.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>
#include <gst/app/gstappsink.h>
#include <gst/app/gstappsrc.h>

#define CONFIG_CSI_SOURCE_WIDTH 1920
#define CONFIG_CSI_SOURCE_HEIGHT 1080
#define CONFIG_CSI_SOURCE_FPS_N 30
#define CONFIG_CSI_SOURCE_FPS_D 1

#undef MAX_SOURCE_BINS
#define MAX_SOURCE_BINS 1

#define PGIE_CLASS_ID_VEHICLE 0
#define PGIE_CLASS_ID_PERSON 2

/* By default, OSD process-mode is set to GPU_MODE. To change mode, set as:
 * 0: CPU mode
 * 1: GPU mode
 */
#define OSD_PROCESS_MODE 1

/* By default, OSD will not display text. To display text, change this to 1 */
#define OSD_DISPLAY_TEXT 0

/* The muxer output resolution must be set if the input streams will be of
 * different resolution. The muxer will scale all the input frames to this
 * resolution. */
#define MUXER_OUTPUT_WIDTH 1920
#define MUXER_OUTPUT_HEIGHT 1080

/* Muxer batch formation timeout, for e.g. 40 millisec. Should ideally be set
 * based on the fastest source's framerate. */
#define MUXER_BATCH_TIMEOUT_USEC 40000

#define TILED_OUTPUT_WIDTH 1280
#define TILED_OUTPUT_HEIGHT 720

/* NVIDIA Decoder source pad memory feature. This feature signifies that source
 * pads having this capability will push GstBuffers containing cuda buffers. */
#define GST_CAPS_FEATURES_NVMM "memory:NVMM"

gchar pgie_classes_str[4][32] = { "Vehicle", "TwoWheeler", "Person",
  "RoadSign"
};

typedef struct
{
  int fd;
  guint bus_id;
  GstElement *pipeline;
  GstElement *appsrc;
  GMainLoop *loop;
  gboolean quit;
  pthread_t thread_id;
  void *ipcserver;
} NvIpcSinkPipeline;

typedef struct
{
  int fd;
  gchar *uri;
  gchar *socket_path;
  gchar *caps_string;
  GMutex caps_mutex;
  GCond caps_cond;
  gboolean is_caps;
  pthread_t thread_id;
  guint bus_id;
  GstElement *pipeline;
  GQueue *ipc_sink_queue;
  GMutex ipc_sink_queue_lock;
  void *appCtx;
} NvIpcServerPipeline;

typedef struct
{
  int fd[MAX_SOURCE_BINS];
  gchar *uri[MAX_SOURCE_BINS];
  gchar *socket_path[MAX_SOURCE_BINS];
  guint bus_id;
  GstElement *pipeline;
  void *appCtx;
} NvIpcClientPipeline;

typedef struct
{
  GMainLoop *loop;
  NvIpcServerPipeline ipcserver[MAX_SOURCE_BINS];
  NvIpcClientPipeline ipcclient;
  gboolean quit;
} AppCtx;

/* tiler_sink_pad_buffer_probe  will extract metadata received on OSD sink pad
 * and update params for drawing rectangle, object information etc. */

static GstPadProbeReturn
tiler_src_pad_buffer_probe (GstPad * pad, GstPadProbeInfo * info,
    gpointer u_data)
{
    GstBuffer *buf = (GstBuffer *) info->data;
    guint num_rects = 0;
    NvDsObjectMeta *obj_meta = NULL;
    guint vehicle_count = 0;
    guint person_count = 0;
    NvDsMetaList * l_frame = NULL;
    NvDsMetaList * l_obj = NULL;
    //NvDsDisplayMeta *display_meta = NULL;

    NvDsBatchMeta *batch_meta = gst_buffer_get_nvds_batch_meta (buf);

    for (l_frame = batch_meta->frame_meta_list; l_frame != NULL;
      l_frame = l_frame->next) {
        NvDsFrameMeta *frame_meta = (NvDsFrameMeta *) (l_frame->data);
        //int offset = 0;
        for (l_obj = frame_meta->obj_meta_list; l_obj != NULL;
                l_obj = l_obj->next) {
            obj_meta = (NvDsObjectMeta *) (l_obj->data);
            if (obj_meta->class_id == PGIE_CLASS_ID_VEHICLE) {
                vehicle_count++;
                num_rects++;
            }
            if (obj_meta->class_id == PGIE_CLASS_ID_PERSON) {
                person_count++;
                num_rects++;
            }
        }
          g_print ("Frame Number = %d Number of objects = %d "
            "Vehicle Count = %d Person Count = %d\n",
            frame_meta->frame_num, num_rects, vehicle_count, person_count);
    }
    return GST_PAD_PROBE_OK;
}

static gboolean
bus_call (GstBus * bus, GstMessage * msg, gpointer data)
{
  GMainLoop *loop = (GMainLoop *) data;
  switch (GST_MESSAGE_TYPE (msg)) {
    case GST_MESSAGE_EOS:
      g_print ("End of stream\n");
      g_main_loop_quit (loop);
      break;
    case GST_MESSAGE_WARNING:
    {
      gchar *debug;
      GError *error;
      gst_message_parse_warning (msg, &error, &debug);
      g_printerr ("WARNING from element %s: %s\n",
          GST_OBJECT_NAME (msg->src), error->message);
      g_free (debug);
      g_printerr ("Warning: %s\n", error->message);
      g_error_free (error);
      break;
    }
    case GST_MESSAGE_ERROR:
    {
      gchar *debug;
      GError *error;
      gst_message_parse_error (msg, &error, &debug);
      g_printerr ("ERROR from element %s: %s\n",
          GST_OBJECT_NAME (msg->src), error->message);
      if (debug)
        g_printerr ("Error details: %s\n", debug);
      g_free (debug);
      g_error_free (error);
      g_main_loop_quit (loop);
      break;
    }
    case GST_MESSAGE_ELEMENT:
    {
      if (gst_nvmessage_is_stream_eos (msg)) {
        guint stream_id;
        if (gst_nvmessage_parse_stream_eos (msg, &stream_id)) {
          g_print ("Got EOS from stream %d\n", stream_id);
        }
      }
      break;
    }
    case GST_MESSAGE_STATE_CHANGED:
    {
      GstState oldstate, newstate;
      gst_message_parse_state_changed (msg, &oldstate, &newstate, NULL);
        switch (newstate) {
          case GST_STATE_PLAYING:
            g_print ("Pipeline running\n");
            break;
          case GST_STATE_PAUSED:
            if (oldstate == GST_STATE_PLAYING) {
              g_print ("Pipeline paused\n");
            }
            break;
          case GST_STATE_READY:
            if (oldstate == GST_STATE_NULL) {
              g_print ("Pipeline ready\n");
            } else {
              g_print ("Pipeline stopped\n");
            }
            break;
          case GST_STATE_NULL:
            g_print ("Pipeline Null\n");
            g_main_loop_quit (loop);
            return FALSE;
            break;
          default:
            break;
        }
      break;
    }
    default:
      break;
  }
  return TRUE;
}

static void
cb_newpad (GstElement * decodebin, GstPad * decoder_src_pad, gpointer data)
{
  GstCaps *caps = gst_pad_get_current_caps (decoder_src_pad);
  if (!caps) {
    caps = gst_pad_query_caps (decoder_src_pad, NULL);
  }

  const GstStructure *str = gst_caps_get_structure (caps, 0);
  const gchar *name = gst_structure_get_name (str);
  GstElement *source_bin = (GstElement *) data;
  GstCapsFeatures *features = gst_caps_get_features (caps, 0);

  /* Need to check if the pad created by the decodebin is for video and not
   * audio. */
  if (!strncmp (name, "video", 5)) {
    /* Link the decodebin pad only if decodebin has picked nvidia
     * decoder plugin nvdec_*. We do this by checking if the pad caps contain
     * NVMM memory features. */
    if (gst_caps_features_contains (features, GST_CAPS_FEATURES_NVMM)) {
      /* Get the source bin ghost pad */
      GstPad *bin_ghost_pad = gst_element_get_static_pad (source_bin, "src");
      if (!gst_ghost_pad_set_target (GST_GHOST_PAD (bin_ghost_pad),
              decoder_src_pad)) {
        g_printerr ("Failed to link decoder src pad to source bin ghost pad\n");
      }
      gst_object_unref (bin_ghost_pad);
    } else {
      g_printerr ("Error: Decodebin did not pick nvidia decoder plugin.\n");
    }
  }
}

static void
decodebin_child_added (GstChildProxy * child_proxy, GObject * object,
    gchar * name, gpointer user_data)
{
  g_print ("Decodebin child added: %s\n", name);
  if (g_strrstr (name, "decodebin") == name) {
    g_signal_connect (G_OBJECT (object), "child-added",
        G_CALLBACK (decodebin_child_added), user_data);
  }
  if (g_strrstr (name, "source") == name) {
    g_object_set(G_OBJECT(object),"drop-on-latency",true,NULL);
  }
}

static GstElement *
create_source_bin (guint index, gchar * uri, int fd)
{
  GstElement *bin = NULL, *uri_decode_bin = NULL;
  gchar bin_name[16] = { };

  g_snprintf (bin_name, 15, "source-bin-%02d", index);
  /* Create a source GstBin to abstract this bin's content from the rest of the
   * pipeline */
  bin = gst_bin_new (bin_name);

  /* Source element for reading from the uri.
   * We will use decodebin and let it figure out the container format of the
   * stream and the codec and plug the appropriate demux and decode plugins. */
  uri_decode_bin = gst_element_factory_make ("nvurisrcbin", NULL);
  g_object_set (G_OBJECT (uri_decode_bin), "file-loop", TRUE, NULL);
  g_object_set (G_OBJECT (uri_decode_bin), "cudadec-memtype", 1, NULL);
  if (fd != -1) {
    g_object_set (G_OBJECT (uri_decode_bin), "sock-fd", fd, NULL);
  }

  if (!bin || !uri_decode_bin) {
    g_printerr ("One element in source bin could not be created.\n");
    return NULL;
  }

  /* We set the input uri to the source element */
  g_object_set (G_OBJECT (uri_decode_bin), "uri", uri, NULL);

  /* Connect to the "pad-added" signal of the decodebin which generates a
   * callback once a new pad for raw data has beed created by the decodebin */
  g_signal_connect (G_OBJECT (uri_decode_bin), "pad-added",
      G_CALLBACK (cb_newpad), bin);
  g_signal_connect (G_OBJECT (uri_decode_bin), "child-added",
      G_CALLBACK (decodebin_child_added), bin);

  gst_bin_add (GST_BIN (bin), uri_decode_bin);

  /* We need to create a ghost pad for the source bin which will act as a proxy
   * for the video decoder src pad. The ghost pad will not have a target right
   * now. Once the decode bin creates the video decoder and generates the
   * cb_newpad callback, we will set the ghost pad target to the video decoder
   * src pad. */
  if (!gst_element_add_pad (bin, gst_ghost_pad_new_no_target ("src",
              GST_PAD_SRC))) {
    g_printerr ("Failed to add ghost pad in source bin\n");
    return NULL;
  }

  return bin;
}

static GstElement
*create_source_bin_csi(guint index, gchar *cfg_file_path) {
  GstElement *bin = NULL, *src_elem = NULL, *caps_filter = NULL;
  GstCaps *caps = NULL;
  gchar bin_name[16] = {};
  
  g_snprintf(bin_name, 15, "source-bin-%02d", index);
  /* Create a source GstBin to abstract this bin's content from the rest of the
   * pipeline */
  bin = gst_bin_new(bin_name);

  src_elem = gst_element_factory_make ("nvarguscamerasrc", "csi_src_elem");

  if (!bin || !src_elem) {
    g_printerr("One element in source bin could not be created.\n");
    return NULL;
  }

  caps_filter = gst_element_factory_make("capsfilter", "src_cap_filter");
  if (!caps_filter) {
    g_printerr("Could not create 'src_cap_filter'");
  }
  caps = gst_caps_new_simple(
                             "video/x-raw", "format", G_TYPE_STRING, "NV12", "width", G_TYPE_INT,
                             CONFIG_CSI_SOURCE_WIDTH, "height", G_TYPE_INT, CONFIG_CSI_SOURCE_HEIGHT,
                             "framerate", GST_TYPE_FRACTION, CONFIG_CSI_SOURCE_FPS_N,
                             CONFIG_CSI_SOURCE_FPS_D, NULL);
    
  GstCapsFeatures *feature = NULL;
  feature = gst_caps_features_new ("memory:NVMM", NULL);
  gst_caps_set_features (caps, 0, feature);
    
  g_object_set (G_OBJECT (caps_filter), "caps", caps, NULL);

  gst_bin_add_many (GST_BIN (bin), src_elem, caps_filter, NULL);

  NVGSTDS_LINK_ELEMENT (src_elem, caps_filter);
  NVGSTDS_BIN_ADD_GHOST_PAD (bin, caps_filter, "src");

done:
  if (caps)
    gst_caps_unref (caps);
  
  return bin;
}

static int
UnixSocketConnect(const char *socket_name) {
  int sock_fd = -1;
  struct sockaddr_un sock_addr;
  int wait_loop = 0;

  sock_fd = socket(PF_UNIX, SOCK_STREAM, 0);
  if (sock_fd < 0) {
    g_printerr("%s: socket create failed.\n", __func__);
    return -1;
  }

  memset(&sock_addr, 0, sizeof(struct sockaddr_un));
  sock_addr.sun_family = AF_UNIX;
  strncpy(sock_addr.sun_path, socket_name, sizeof(sock_addr.sun_path) - 1);

  while (connect(sock_fd, (const struct sockaddr *)&sock_addr,
                 sizeof(struct sockaddr_un))) {
    /* wait loop runs 60 times after sleep of 100ms each */
    if (wait_loop < 60) {
      g_print("Waiting for producer count %d\n", wait_loop);
      usleep(100000);
      wait_loop++;
    } else {
      g_printerr("\n%s: Waiting timed out\n", __func__);
      close (sock_fd);
      sock_fd = -1;
      return -1;
    }
  }

  return sock_fd;
}

static int
create_client_pipeline (int argc, char *argv[])
{
  AppCtx appCtx;
  GMainLoop *loop = NULL;
  GstElement *pipeline = NULL, *streammux = NULL, *sink = NULL, *pgie = NULL,
      *queue1, *queue2, *queue3, *queue4, *queue5, *nvvidconv = NULL,
      *nvosd = NULL, *tiler = NULL, *nvdslogger = NULL;
  GstBus *bus = NULL;
  guint bus_watch_id;
  GstPad *tiler_src_pad = NULL;
  guint i =0, num_sources = 0;
  guint tiler_rows, tiler_columns;
  guint pgie_batch_size;
  NvDsGieType pgie_type = NVDS_GIE_PLUGIN_INFER;
  gboolean PERF_MODE = g_getenv("NV_IPC_TEST_PERF_MODE") &&
      !g_strcmp0(g_getenv("NV_IPC_TEST_PERF_MODE"), "1");

  memset(&appCtx, 0, sizeof(AppCtx));
  appCtx.loop = loop = g_main_loop_new (NULL, FALSE);

  /* Create gstreamer elements */
  /* Create Pipeline element that will form a connection of other elements */
  pipeline = gst_pipeline_new ("ipc-client-pipeline");

  /* Create nvstreammux instance to form batches from one or more sources. */
  streammux = gst_element_factory_make ("nvstreammux", "stream-muxer");

  if (!pipeline || !streammux) {
    g_printerr ("One element could not be created. Exiting.\n");
    return -1;
  }
  gst_bin_add (GST_BIN (pipeline), streammux);

  num_sources = argc - 2;

  for (i = 0; i < num_sources; i++) {
    GstPad *sinkpad, *srcpad;
    gchar pad_name[16] = { };

    const char *prefix = "ipc://";
    const char *socket_path = argv[i + 2] + strlen(prefix);
    int sock_fd = UnixSocketConnect(socket_path);
    if (sock_fd == -1) {
      g_printerr ("UnixSocketConnect failed\n");
      return -1;
    }
    appCtx.ipcclient.fd[i] = sock_fd;
    appCtx.ipcclient.uri[i] = strdup(argv[i + 2]);
    appCtx.ipcclient.socket_path[i] = strdup(socket_path);
    g_print("client is connected uri: %s path: %s fd: %d\n",
    appCtx.ipcclient.uri[i], appCtx.ipcclient.socket_path[i],
    appCtx.ipcclient.fd[i]);

    GstElement *source_bin= NULL;
    source_bin = create_source_bin (i, argv[i + 2], sock_fd);
    if (!source_bin) {
      g_printerr ("Failed to create source bin. Exiting.\n");
      return -1;
    }

    gst_bin_add (GST_BIN (pipeline), source_bin);

    g_snprintf (pad_name, 15, "sink_%u", i);
    sinkpad = gst_element_request_pad_simple (streammux, pad_name);
    if (!sinkpad) {
      g_printerr ("Streammux request sink pad failed. Exiting.\n");
      return -1;
    }

    srcpad = gst_element_get_static_pad (source_bin, "src");
    if (!srcpad) {
      g_printerr ("Failed to get src pad of source bin. Exiting.\n");
      return -1;
    }

    if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) {
      g_printerr ("Failed to link source bin to stream muxer. Exiting.\n");
      return -1;
    }

    gst_object_unref (srcpad);
    gst_object_unref (sinkpad);

  }

  /* Use nvinfer or nvinferserver to infer on batched frame. */
  if (pgie_type == NVDS_GIE_PLUGIN_INFER_SERVER) {
    pgie = gst_element_factory_make ("nvinferserver", "primary-nvinference-engine");
  } else {
    pgie = gst_element_factory_make ("nvinfer", "primary-nvinference-engine");
  }

  /* Add queue elements between every two elements */
  queue1 = gst_element_factory_make ("queue", "queue1");
  queue2 = gst_element_factory_make ("queue", "queue2");
  queue3 = gst_element_factory_make ("queue", "queue3");
  queue4 = gst_element_factory_make ("queue", "queue4");
  queue5 = gst_element_factory_make ("queue", "queue5");

  /* Use nvdslogger for perf measurement. */
  nvdslogger = gst_element_factory_make ("nvdslogger", "nvdslogger");

  /* Use nvtiler to composite the batched frames into a 2D tiled array based
   * on the source of the frames. */
  tiler = gst_element_factory_make ("nvmultistreamtiler", "nvtiler");

  /* Use convertor to convert from NV12 to RGBA as required by nvosd */
  nvvidconv = gst_element_factory_make ("nvvideoconvert", "nvvideo-converter");

  /* Create OSD to draw on the converted RGBA buffer */
  nvosd = gst_element_factory_make ("nvdsosd", "nv-onscreendisplay");

  if (PERF_MODE) {
    sink = gst_element_factory_make ("fakesink", "nvvideo-renderer");
    g_object_set (G_OBJECT (sink), "sync", FALSE, NULL);
  } else {
    sink = gst_element_factory_make ("nv3dsink", "nv3d-sink");
    g_object_set (G_OBJECT (sink), "sync", FALSE, NULL);
  }

  if (!pgie || !nvdslogger || !tiler || !nvvidconv || !nvosd || !sink) {
    g_printerr ("One element could not be created. Exiting.\n");
    return -1;
  }

  g_object_set (G_OBJECT (streammux), "batch-size", num_sources, NULL);

  g_object_set (G_OBJECT (streammux), "width", MUXER_OUTPUT_WIDTH, "height",
      MUXER_OUTPUT_HEIGHT,
      "batched-push-timeout", MUXER_BATCH_TIMEOUT_USEC, NULL);

  /* Configure the nvinfer element using the nvinfer config file. */
  g_object_set (G_OBJECT (pgie),
      "config-file-path", "dsipctest_pgie_config.txt", NULL);

  /* Override the batch-size set in the config file with the number of sources. */
  g_object_get (G_OBJECT (pgie), "batch-size", &pgie_batch_size, NULL);
  if (pgie_batch_size != num_sources) {
    g_printerr
        ("WARNING: Overriding infer-config batch-size (%d) with number of sources (%d)\n",
        pgie_batch_size, num_sources);
    g_object_set (G_OBJECT (pgie), "batch-size", num_sources, NULL);
  }

  tiler_rows = (guint) sqrt (num_sources);
  tiler_columns = (guint) ceil (1.0 * num_sources / tiler_rows);
  /* we set the tiler properties here */
  g_object_set (G_OBJECT (tiler), "rows", tiler_rows, "columns", tiler_columns,
      "width", TILED_OUTPUT_WIDTH, "height", TILED_OUTPUT_HEIGHT, NULL);

  g_object_set (G_OBJECT (nvosd), "process-mode", OSD_PROCESS_MODE,
      "display-text", OSD_DISPLAY_TEXT, NULL);

  g_object_set (G_OBJECT (sink), "qos", 0, NULL);
  g_object_set (G_OBJECT (streammux), "nvbuf-memory-type", 4, NULL);

  /* we add a message handler */
  bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
  bus_watch_id = gst_bus_add_watch (bus, bus_call, loop);
  gst_object_unref (bus);

  /* Set up the pipeline */
  /* we add all elements into the pipeline */
  gst_bin_add_many (GST_BIN (pipeline), queue1, pgie, queue2, nvdslogger, tiler,
      queue3, nvvidconv, queue4, nvosd, queue5, sink, NULL);
  /* we link the elements together
  * nvstreammux -> nvinfer -> nvdslogger -> nvtiler -> nvvidconv -> nvosd
  * -> video-renderer */
  if (!gst_element_link_many (streammux, queue1, pgie, queue2, nvdslogger, tiler,
        queue3, nvvidconv, queue4, nvosd, queue5, sink, NULL)) {
    g_printerr ("Elements could not be linked. Exiting.\n");
    return -1;
  }

  /* Lets add probe to get informed of the meta data generated, we add probe to
   * the sink pad of the osd element, since by that time, the buffer would have
   * had got all the metadata. */
  tiler_src_pad = gst_element_get_static_pad (pgie, "src");
  if (!tiler_src_pad)
    g_print ("Unable to get src pad\n");
  else
    gst_pad_add_probe (tiler_src_pad, GST_PAD_PROBE_TYPE_BUFFER,
        tiler_src_pad_buffer_probe, NULL, NULL);
  gst_object_unref (tiler_src_pad);

  /* Set the pipeline to "playing" state */
  g_print ("Now playing:");
  for (i = 0; i < num_sources; i++) {
    g_print (" %s,", argv[i + 2]);
  }
  g_print ("\n");
  gst_element_set_state (pipeline, GST_STATE_PLAYING);

  /* Wait till pipeline encounters an error or EOS */
  g_print ("Running...\n");
  g_main_loop_run (loop);
  appCtx.quit = TRUE;

  for (i = 0; i < num_sources; i++) {
    g_print("client is disconnected uri: %s path: %s fd: %d\n",
      appCtx.ipcclient.uri[i], appCtx.ipcclient.socket_path[i],
      appCtx.ipcclient.fd[i]);
    close(appCtx.ipcclient.fd[i]);
    g_free(appCtx.ipcclient.uri[i]);
    g_free(appCtx.ipcclient.socket_path[i]);
  }

  /* Out of the main loop, clean up nicely */
  g_print ("Returned, stopping playback\n");
  gst_element_set_state (pipeline, GST_STATE_NULL);
  g_print ("Deleting pipeline\n");
  gst_object_unref (GST_OBJECT (pipeline));
  g_source_remove (bus_watch_id);
  g_main_loop_unref (loop);

  return 0;
}

static int
UnixSocketCreate(const char *socket_name) {
  int listen_fd;
  struct sockaddr_un sock_addr;

  listen_fd = socket(PF_UNIX, SOCK_STREAM, 0);
  if (listen_fd < 0) {
    g_printerr("%s: socket create failed", __func__);
    return -1;
  }

  unlink(socket_name);

  memset(&sock_addr, 0, sizeof(struct sockaddr_un));
  sock_addr.sun_family = AF_UNIX;
  strncpy(sock_addr.sun_path, socket_name, sizeof(sock_addr.sun_path) - 1);
  sock_addr.sun_path[sizeof(sock_addr.sun_path) - 1] = '\0';

  if (bind(listen_fd, (const struct sockaddr *)&sock_addr,
           sizeof(struct sockaddr_un))) {
    g_printerr("%s: bind error", __func__);
    close(listen_fd);
    return -1;
  }

  if (listen(listen_fd, 16)) {
    g_printerr("%s: listen error", __func__);
    close(listen_fd);
    return -1;
  }

  return listen_fd;
}

static int
UnixSocketAccept(int listen_fd) {
  /* connect_fd -2 represents timeout */
  int connect_fd = -2;
  struct sockaddr_un connect_addr;
  socklen_t connect_addr_len = 0;
  struct timeval timeout;
  fd_set read_fds;

  FD_ZERO(&read_fds);
  FD_SET(listen_fd, &read_fds);
  timeout.tv_sec = 1;
  timeout.tv_usec = 0;

  int activity = select(listen_fd + 1, &read_fds, NULL, NULL, &timeout);

  if (activity > 0) {
    connect_fd =
        accept(listen_fd, (struct sockaddr *)&connect_addr, &connect_addr_len);

    if (connect_fd < 0) {
      g_printerr("%s: accept failed\n", __func__);
      return -1;
    }
  }

  return connect_fd;
}

static void*
client_handler(void *user_data)
{
  NvIpcSinkPipeline *ipcsink = (NvIpcSinkPipeline *)user_data;
  NvIpcServerPipeline *ipcserver = (NvIpcServerPipeline *)ipcsink->ipcserver;

  g_main_loop_run(ipcsink->loop);
  ipcsink->quit = TRUE;

  gst_element_set_state (ipcsink->pipeline, GST_STATE_NULL);
  gst_object_unref (GST_OBJECT (ipcsink->pipeline));
  g_source_remove (ipcsink->bus_id);
  close(ipcsink->fd);
  g_main_loop_unref (ipcsink->loop);

  g_print("client is disconnected uri: %s path: %s fd: %d\n",
  ipcserver->uri, ipcserver->socket_path, ipcsink->fd);

  return NULL;
}

static int
create_ipc_sink_pipeline (NvIpcSinkPipeline *ipcsink)
{
  GstElement *pipeline = NULL;
  GstBus *bus = NULL;
  guint bus_watch_id;
  GstElement *appsrc=NULL, *caps_filter=NULL, *ipcpipelinesink= NULL;
  GstCaps *caps=NULL;
  GMainLoop *loop = NULL;
  NvIpcServerPipeline *ipcserver = (NvIpcServerPipeline *)ipcsink->ipcserver;

  ipcsink->loop = loop = g_main_loop_new (NULL, FALSE);

  /* Create gstreamer elements */
  ipcsink->pipeline = pipeline = gst_pipeline_new ("ipc-sink-pipeline");
  if (!pipeline) {
    g_printerr ("Failed to create pipeline. Exiting.\n");
    return -1;
  }

  ipcsink->appsrc = appsrc = gst_element_factory_make ("appsrc", NULL);
  if (!appsrc) {
    g_printerr ("Failed to create appsrc. Exiting.\n");
    return -1;
  }
  gst_bin_add (GST_BIN (pipeline), appsrc);

  caps_filter = gst_element_factory_make ("capsfilter", NULL);
  if (!caps_filter) {
    g_printerr ("Failed to create caps_filter. Exiting.\n");
    return -1;
  }
  gst_bin_add (GST_BIN (pipeline), caps_filter);

  ipcpipelinesink = gst_element_factory_make ("nvipcpipelinesink", NULL);
  if (!ipcpipelinesink) {
    g_printerr ("Failed to create ipcpipelinesink. Exiting.\n");
    return -1;
  }
  gst_bin_add (GST_BIN (pipeline), ipcpipelinesink);

  if (ipcserver->caps_string) {
    caps = gst_caps_from_string (ipcserver->caps_string);
  } else {
    caps = gst_caps_from_string ("video/x-raw(memory:NVMM),format=NV12,width=1920,height=1080");
  }
  g_object_set (G_OBJECT(caps_filter), "caps", caps, NULL);
  gst_caps_unref (caps);

  g_object_set (G_OBJECT(appsrc), "format", 3, NULL);
  g_object_set (G_OBJECT(appsrc), "is-live", true, NULL);
  g_object_set (G_OBJECT(ipcpipelinesink), "ack-time", 10000000, NULL);
  g_object_set (G_OBJECT(ipcpipelinesink), "fdin", ipcsink->fd, "fdout", ipcsink->fd, NULL);

  /* link the elements together */
  if (!gst_element_link_many (appsrc, caps_filter, ipcpipelinesink, NULL)) {
    g_printerr ("Elements could not be linked. Exiting.\n");
    return -1;
  }

  /* we add a message handler */
  bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
  ipcsink->bus_id = bus_watch_id = gst_bus_add_watch (bus, bus_call, loop);
  gst_object_unref (bus);

  gst_element_set_state (ipcsink->pipeline, GST_STATE_PLAYING);

  pthread_create(&ipcsink->thread_id, NULL,
                  client_handler, (void*)ipcsink);
  return 0;
}

static void*
connection_handler(void *user_data)
{
  NvIpcServerPipeline *ipcserver = (NvIpcServerPipeline *)user_data;
  AppCtx *appCtx = (AppCtx *)ipcserver->appCtx;
  int listen_fd = ipcserver->fd;
  int connect_fd;
  int ret;

  while (!appCtx->quit) {
    connect_fd = UnixSocketAccept(listen_fd);
    if (connect_fd == -1) {
      g_printerr("UnixSocketAccept failed\n");
      break;
    } else if (connect_fd == -2) {
      /*Accept timeout to check if loop quits due to any error */
      continue;
    }
    NvIpcSinkPipeline *ipcsink = g_new0 (NvIpcSinkPipeline, 1);
    ipcsink->fd = connect_fd;
    ipcsink->ipcserver = (void *)ipcserver;

    ret = create_ipc_sink_pipeline(ipcsink);
    if (ret == -1) {
      g_printerr("create_ipc_sink_pipeline failed\n");
      close(connect_fd);
      g_free(ipcsink);
      break;
    }
    g_print("client is connected uri: %s path: %s fd: %d\n",
    ipcserver->uri, ipcserver->socket_path, connect_fd);
    g_mutex_lock (&ipcserver->ipc_sink_queue_lock);
    g_queue_push_tail (ipcserver->ipc_sink_queue, ipcsink);
    g_mutex_unlock (&ipcserver->ipc_sink_queue_lock);
  }

  if (!appCtx->quit) {
    g_main_loop_quit (appCtx->loop);
  }

  g_mutex_lock (&ipcserver->ipc_sink_queue_lock);
  while (!g_queue_is_empty(ipcserver->ipc_sink_queue))
  {
    NvIpcSinkPipeline *ipcsink =
      (NvIpcSinkPipeline *)g_queue_pop_head (ipcserver->ipc_sink_queue);
    if (!ipcsink->quit) {
      g_main_loop_quit (ipcsink->loop);
    }
    pthread_join(ipcsink->thread_id, NULL);
    g_free(ipcsink);
  }
  g_mutex_unlock (&ipcserver->ipc_sink_queue_lock);

  return NULL;
}

static void
gst_mem_free_cb(GstSample *sample, GstMiniObject *obj)
{
  if (sample) {
    gst_sample_unref (sample);
  }
}

static GstFlowReturn
on_new_sample (GstElement *appsink, gpointer user_data)
{
  NvIpcServerPipeline *ipcserver = (NvIpcServerPipeline *)user_data;
  GstSample *sample = NULL;
  GstBuffer *srcbuffer = NULL;
  GstMapInfo srcmap;

  if (gst_app_sink_is_eos((GstAppSink *)appsink)) {
    g_printerr("EOS Received on app sink element\n");
    return GST_FLOW_EOS;
  }

  /* Get the sample from appsink */
  sample = gst_app_sink_pull_sample (GST_APP_SINK (appsink));
  if (sample == NULL) {
    g_printerr("Sample NULL received on app sink element\n");
    return GST_FLOW_OK;
  }

  /* Get the buffer from sample */
  srcbuffer = gst_sample_get_buffer (sample);
  if (srcbuffer == NULL) {
    g_printerr("No more buffers available from app sink element\n");
    gst_sample_unref (sample);
    return GST_FLOW_ERROR;
  }

  /* Map the gst buffer */
  if (gst_buffer_map (srcbuffer, &srcmap, GST_MAP_READ) == false) {
    g_printerr("Map the gst buffer Failed\n");
    gst_sample_unref (sample);
    return GST_FLOW_ERROR;
  }

  g_mutex_lock (&ipcserver->ipc_sink_queue_lock);
  if (g_queue_is_empty(ipcserver->ipc_sink_queue)) {
    g_mutex_unlock (&ipcserver->ipc_sink_queue_lock);
    gst_buffer_unmap (srcbuffer, &srcmap);
    gst_sample_unref (sample);
    return GST_FLOW_OK;
  }
  GList *current = ipcserver->ipc_sink_queue->head;
  while (current != NULL) {
    NvIpcSinkPipeline *ipcsink = (NvIpcSinkPipeline *)(current->data);
    if (!ipcsink->quit) {
      /* Allocate a new Gst Buffer */
      GstBuffer *dstbuffer = gst_buffer_new_allocate (NULL, srcmap.size, NULL);
      GstMapInfo dstmap;

      /* Map the Gst Buffer to write the data */
      gst_buffer_map (dstbuffer, &dstmap, GST_MAP_WRITE);

      int copy_flags = (GST_BUFFER_COPY_META | GST_BUFFER_COPY_FLAGS | GST_BUFFER_COPY_TIMESTAMPS);
      gst_buffer_copy_into (dstbuffer, srcbuffer, (GstBufferCopyFlags)copy_flags, 0, -1);

      memcpy (dstmap.data, srcmap.data, srcmap.size);
      dstmap.size = srcmap.size;

      /* Unmap the Gst Buffer */
      gst_buffer_unmap (dstbuffer, &dstmap);

      GstMemory *mem = NULL;
      mem = gst_buffer_peek_memory (dstbuffer, 0);
      gst_mini_object_weak_ref(GST_MINI_OBJECT(mem), (GstMiniObjectNotify)gst_mem_free_cb, sample);
      gst_sample_ref(sample);
      gst_app_src_push_buffer((GstAppSrc*)ipcsink->appsrc, dstbuffer);
    }
    current = current->next;
  }
  g_mutex_unlock (&ipcserver->ipc_sink_queue_lock);

  gst_buffer_unmap (srcbuffer, &srcmap);

  /* Unref the sample */
  gst_sample_unref (sample);

  return GST_FLOW_OK;
}

static GstPadProbeReturn
event_probe(GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
{
  NvIpcServerPipeline *ipcserver = (NvIpcServerPipeline *)user_data;
  GstEvent *event = NULL;
  event = GST_PAD_PROBE_INFO_EVENT(info);
  if (event)
  {
    if (GST_EVENT_CAPS == GST_EVENT_TYPE(event))
    {
      GstCaps * caps;
      gst_event_parse_caps(event, &caps);
      if (caps) {
        ipcserver->caps_string = gst_caps_to_string (caps);
        g_print("event_probe uri: %s caps: %s\n", ipcserver->uri, ipcserver->caps_string);
        g_mutex_lock (&ipcserver->caps_mutex);
        ipcserver->is_caps = TRUE;
        g_cond_signal (&ipcserver->caps_cond);
        g_mutex_unlock (&ipcserver->caps_mutex);
      } else {
        ipcserver->caps_string = NULL;
      }
    }
  }
  return GST_PAD_PROBE_OK;
}

static int
create_server_pipeline (int argc, char *argv[])
{
  AppCtx appCtx;
  guint i =0, num_sources = 0;
  num_sources = (argc - 2)/2;
  GMainLoop *loop = NULL;

  memset(&appCtx, 0, sizeof(AppCtx));
  appCtx.loop = loop = g_main_loop_new (NULL, FALSE);

  for (i = 0; i < num_sources; i++) {
    GstElement *pipeline = NULL;
    GstBus *bus = NULL;
    guint bus_watch_id;
    GstElement *source_bin=NULL, *caps_filter=NULL, *queue, *appsink= NULL;
    GstCaps *caps=NULL;

    /* Create gstreamer elements */
    /* Create Pipeline element that will form a connection of other elements */
    appCtx.ipcserver[i].pipeline = pipeline = gst_pipeline_new ("ipc-server-pipeline");
    if (!pipeline) {
      g_printerr ("Failed to create pipeline. Exiting.\n");
      return -1;
    }

    source_bin = create_source_bin_csi (i, argv[(i*2) + 2]);
    if (!source_bin) {
      g_printerr ("Failed to create source bin. Exiting.\n");
      return -1;
    }
    gst_bin_add (GST_BIN (pipeline), source_bin);
    appCtx.ipcserver[i].uri = strdup(argv[(i*2) + 2]);
    GstPad* srcpad = gst_element_get_static_pad (GST_ELEMENT(G_OBJECT(source_bin)), "src");
    gst_pad_add_probe(srcpad, GST_PAD_PROBE_TYPE_EVENT_BOTH, event_probe, &appCtx.ipcserver[i], NULL);
    gst_object_unref(srcpad);

    caps_filter = gst_element_factory_make ("capsfilter", NULL);
    if (!caps_filter) {
      g_printerr ("Failed to create caps_filter. Exiting.\n");
      return -1;
    }
    gst_bin_add (GST_BIN (pipeline), caps_filter);

    queue = gst_element_factory_make ("queue", NULL);
    if (!queue) {
      g_printerr ("Failed to create queue. Exiting.\n");
      return -1;
    }
    gst_bin_add (GST_BIN (pipeline), queue);

    appsink = gst_element_factory_make ("appsink", NULL);
    if (!appsink) {
      g_printerr ("Failed to create appsink. Exiting.\n");
      return -1;
    }
    gst_bin_add (GST_BIN (pipeline), appsink);

    caps = gst_caps_from_string ("video/x-raw(memory:NVMM),format=NV12");
    g_object_set (G_OBJECT(caps_filter), "caps", caps, NULL);
    gst_caps_unref (caps);

    g_object_set (G_OBJECT(appsink), "emit-signals", TRUE, "sync", FALSE, NULL);
    g_signal_connect (G_OBJECT(appsink), "new-sample", G_CALLBACK (on_new_sample), &appCtx.ipcserver[i]);

    /* link the elements together */
    if (!gst_element_link_many (source_bin, caps_filter, queue, appsink, NULL)) {
      g_printerr ("Elements could not be linked. Exiting.\n");
      return -1;
    }

    /* we add a message handler */
    bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
    appCtx.ipcserver[i].bus_id = bus_watch_id = gst_bus_add_watch (bus, bus_call, loop);
    gst_object_unref (bus);

    appCtx.ipcserver[i].appCtx = &appCtx;
  }

  /* Set the pipeline to "playing" state */
  g_print ("Now playing:");
  for (i = 0; i < num_sources; i++) {
    gint64 end_time;
    g_mutex_init (&appCtx.ipcserver[i].caps_mutex);
    g_cond_init (&appCtx.ipcserver[i].caps_cond);
    appCtx.ipcserver[i].ipc_sink_queue = g_queue_new ();
    g_queue_init (appCtx.ipcserver[i].ipc_sink_queue);
    g_mutex_init (&appCtx.ipcserver[i].ipc_sink_queue_lock);
    gst_element_set_state (appCtx.ipcserver[i].pipeline, GST_STATE_PLAYING);
    g_mutex_lock (&appCtx.ipcserver[i].caps_mutex);
    end_time = g_get_monotonic_time () + 1 * G_TIME_SPAN_SECOND;
    while (!appCtx.ipcserver[i].is_caps) {
      g_cond_wait_until (&appCtx.ipcserver[i].caps_cond, &appCtx.ipcserver[i].caps_mutex, end_time);
    }
    g_mutex_unlock (&appCtx.ipcserver[i].caps_mutex);
  }

  for (i = 0; i < num_sources; i++) {
    appCtx.ipcserver[i].fd = UnixSocketCreate(argv[(i*2) + 3]);
    if (appCtx.ipcserver[i].fd == -1) {
      g_printerr("UnixSocketCreate failed\n");
      return -1;
    }
    appCtx.ipcserver[i].socket_path = strdup(argv[(i*2) + 3]);
    g_print("server is started uri: %s path: %s fd: %d\n",
    appCtx.ipcserver[i].uri, appCtx.ipcserver[i].socket_path, appCtx.ipcserver[i].fd);
    pthread_create(&appCtx.ipcserver[i].thread_id, NULL,
                    connection_handler, (void*)&appCtx.ipcserver[i]);
    GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (appCtx.ipcserver[i].pipeline),
      GST_DEBUG_GRAPH_SHOW_ALL,"ipc-server-pipeline-added");
  }

  /* Wait till pipeline encounters an error or EOS */
  g_print ("Running...\n");
  g_main_loop_run (loop);
  appCtx.quit = TRUE;

  /* Out of the main loop, clean up nicely */
  g_print ("Returned, stopping playback\n");
  g_print ("Deleting pipeline\n");
  for (i = 0; i < num_sources; i++) {
    pthread_join(appCtx.ipcserver[i].thread_id, NULL);
    gst_element_set_state (appCtx.ipcserver[i].pipeline, GST_STATE_NULL);
    gst_object_unref (GST_OBJECT (appCtx.ipcserver[i].pipeline));
    g_source_remove (appCtx.ipcserver[i].bus_id);
    g_print("server is closed uri: %s path: %s fd: %d\n",
    appCtx.ipcserver[i].uri, appCtx.ipcserver[i].socket_path, appCtx.ipcserver[i].fd);
    close(appCtx.ipcserver[i].fd);
    g_free(appCtx.ipcserver[i].uri);
    g_free(appCtx.ipcserver[i].socket_path);
    g_free(appCtx.ipcserver[i].caps_string);
    g_mutex_clear (&appCtx.ipcserver[i].caps_mutex);
    g_cond_clear (&appCtx.ipcserver[i].caps_cond);
    g_mutex_clear (&appCtx.ipcserver[i].ipc_sink_queue_lock);
    g_queue_clear (appCtx.ipcserver[i].ipc_sink_queue);
    g_queue_free(appCtx.ipcserver[i].ipc_sink_queue);
  }

  g_main_loop_unref (loop);
  return 0;
}

int
main (int argc, char *argv[])
{
  int ret = 0;

  /* Check input arguments */
  if (argc < 3) {
    g_printerr ("Usage: %s <server> <rtsp_url> <socket_path>\n", argv[0]);
    g_printerr ("OR: %s <client> <ipc_url>\n", argv[0]);
    return -1;
  }

  /* Standard GStreamer initialization */
  gst_init (&argc, &argv);

  if (strcmp(argv[1], "client") == 0) {
    signal(SIGPIPE, SIG_IGN);
    ret = create_client_pipeline(argc, argv);
  } else {
    signal(SIGPIPE, SIG_IGN);
    ret = create_server_pipeline(argc, argv);
  }

  return ret;
}

NVMAP_IOC_READ:1077431815 failed [22]
[1077431815]: Offset 3416064 SrcStride 8192 pDst 0xffff1fffc448 DstStride 8192 Count 1
NVMAP_IOC_READ:1077431815 failed [22]
[1077431815]: Offset 3424256 SrcStride 8192 pDst 0xffff1fffc448 DstStride 8192 Count 1
NVMAP_IOC_READ:1077431815 failed [22]
[1077431815]: Offset 3432448 SrcStride 8192 pDst 0xffff1fffc448 DstStride 8192 Count 1
NVMAP_IOC_READ:1077431815 failed [22]
[1077431815]: Offset 3440640 SrcStride 8192 pDst 0xffff1fffc448 DstStride 8192 Count 1
NVMAP_IOC_READ:1077431815 failed [22]
[1077431815]: Offset 3448832 SrcStride 8192 pDst 0xffff1fffc448 DstStride 8192 Count 1
NVMAP_IOC_READ:1077431815 failed [22]
[1077431815]: Offset 3457024 SrcStride 8192 pDst 0xffff1fffc448 DstStride 8192 Count 1
NVMAP_IOC_READ:1077431815 failed [22]
[1077431815]: Offset 3465216 SrcStride 8192 pDst 0xffff1fffc448 DstStride 8192 Count 1
NVMAP_IOC_READ:1077431815 failed [22]
[1077431815]: Offset 3473408 SrcStride 8192 pDst 0xffff1fffc448 DstStride 8192 Count 1
NVMAP_IOC_READ:1077431815 failed [22]
[1077431815]: Offset 3481600 SrcStride 8192 pDst 0xffff1fffc448 DstStride 8192 Count 1
NVMAP_IOC_READ:1077431815 failed [22]
[1077431815]: Offset 3489792 SrcStride 8192 pDst 0xffff1fffc448 DstStride 8192 Count 1
NVMAP_IOC_READ:1077431815 failed [22]
[1077431815]: Offset 3497984 SrcStride 8192 pDst 0xffff1fffc448 DstStride 8192 Count 1
NVMAP_IOC_READ:1077431815 failed [22]
[1077431815]: Offset 3506176 SrcStride 8192 pDst 0xffff1fffc448 DstStride 8192 Count 1
NVMAP_IOC_READ:1077431815 failed [22]
[1077431815]: Offset 3514368 SrcStride 8192 pDst 0xffff1fffc448 DstStride 8192 Count 1
NVMAP_IOC_READ:1077431815 failed [22]
[1077431815]: Offset 3522560 SrcStride 8192 pDst 0xffff1fffc448 DstStride 8192 Count 1
NVMAP_IOC_READ:1077431815 failed [22]
[1077431815]: Offset 3530752 SrcStride 8192 pDst 0xffff1fffc448 DstStride 8192 Count 1
Frame Number = 744 Number of objects = 20 Vehicle Count = 20 Person Count = 0
NVMAP_IOC_READ:1077431815 failed [22]

We will check if the argus camera source is supported in the ipc scenario currently.

Hi @yuweiw, kindly asking if there are any updates about this issue. I have checked nvipcpipelinesink with file source, it’s also working as expected.

The demo does not currently support argus camera source. It will be added to our roadmap. Thanks