• Hardware Platform (Jetson / GPU)
sudo dmesg | grep "Machine model"
[ 0.000000] Machine model: NVIDIA Jetson Orin NX Engineering Reference Developer Kit
cat /etc/nv_tegra_release
# R36 (release), REVISION: 3.0, GCID: 36191598, BOARD: generic, EABI: aarch64, DATE: Mon May 6 17:34:21 UTC 2024
# KERNEL_VARIANT: oot
TARGET_USERSPACE_LIB_DIR=nvidia
TARGET_USERSPACE_LIB_DIR_PATH=usr/lib/aarch64-linux-gnu/nvidia
• DeepStream Version
DeepStream 7.0, docker container with guide from: GitHub - NVIDIA-AI-IOT/deepstream_dockers: A project demonstrating how to make DeepStream docker images.
• Issue Type( questions, new requirements, bugs)
Question:
I’m trying to run deepstream-ipc-test with a CSI camera feed (nvarguscamerasrc). The default version of this sample app with RTSP input works fine, but with CSI input the client side reports NVMAP_IOC_READ failures and the video output is a mosaic. The only modification I made to the sample app is on the server side, where the call to create_source_bin was replaced with a new function, create_source_bin_csi. The client side is unchanged:
#include <gst/gst.h>
#include <glib.h>
#include <stdio.h>
#include <math.h>
#include <string.h>
#include <sys/time.h>
#include <cuda_runtime_api.h>
#include "gstnvdsmeta.h"
#include "nvds_yml_parser.h"
#include "gst-nvmessage.h"
#include "deepstream_common.h"
#include <errno.h>
#include <sys/socket.h>
#include <sys/types.h>
#include <sys/un.h>
#include <unistd.h>
#include <gst/app/gstappsink.h>
#include <gst/app/gstappsrc.h>
/* Capture mode requested from the CSI camera by create_source_bin_csi. */
#define CONFIG_CSI_SOURCE_WIDTH 1920
#define CONFIG_CSI_SOURCE_HEIGHT 1080
/* Frame rate expressed as the fraction FPS_N / FPS_D. */
#define CONFIG_CSI_SOURCE_FPS_N 30
#define CONFIG_CSI_SOURCE_FPS_D 1
/* Drop any earlier definition of MAX_SOURCE_BINS and size the per-source
 * arrays in the context structs below for a single source. */
#undef MAX_SOURCE_BINS
#define MAX_SOURCE_BINS 1
/* Class ids counted by the buffer probe; they match the positions of
 * "Vehicle" and "Person" in pgie_classes_str. */
#define PGIE_CLASS_ID_VEHICLE 0
#define PGIE_CLASS_ID_PERSON 2
/* By default, OSD process-mode is set to GPU_MODE. To change mode, set as:
* 0: CPU mode
* 1: GPU mode
*/
#define OSD_PROCESS_MODE 1
/* By default, OSD will not display text. To display text, change this to 1 */
#define OSD_DISPLAY_TEXT 0
/* The muxer output resolution must be set if the input streams will be of
* different resolution. The muxer will scale all the input frames to this
* resolution. */
#define MUXER_OUTPUT_WIDTH 1920
#define MUXER_OUTPUT_HEIGHT 1080
/* Muxer batch formation timeout, for e.g. 40 millisec. Should ideally be set
* based on the fastest source's framerate. */
#define MUXER_BATCH_TIMEOUT_USEC 40000
/* Size of the composited output produced by the tiler element. */
#define TILED_OUTPUT_WIDTH 1280
#define TILED_OUTPUT_HEIGHT 720
/* NVIDIA Decoder source pad memory feature. This feature signifies that source
* pads having this capability will push GstBuffers containing cuda buffers. */
#define GST_CAPS_FEATURES_NVMM "memory:NVMM"
/* Human-readable label for each primary-inference class id (index == class id).
 * Not referenced by the code visible in this file. */
gchar pgie_classes_str[4][32] = { "Vehicle", "TwoWheeler", "Person",
"RoadSign"
};
/* Per-client state on the server side: one appsrc -> capsfilter ->
 * nvipcpipelinesink pipeline for each accepted socket connection. */
typedef struct
{
/* Connected client socket returned by UnixSocketAccept; handed to the IPC sink
 * as both "fdin" and "fdout" and closed by client_handler. */
int fd;
/* Id of the bus watch installed on `pipeline`. */
guint bus_id;
GstElement *pipeline;
/* Entry point into which on_new_sample pushes the copied buffers. */
GstElement *appsrc;
/* Main loop run by client_handler on this client's thread. */
GMainLoop *loop;
/* Set to TRUE by client_handler once its loop has returned; read from other
 * threads without synchronisation. */
gboolean quit;
/* Thread running client_handler for this client. */
pthread_t thread_id;
/* Back pointer to the owning NvIpcServerPipeline (stored as void *). */
void *ipcserver;
} NvIpcSinkPipeline;
/* Per-source state on the server side: the capture pipeline, its listening
 * socket and the list of client pipelines fed from it. */
typedef struct
{
/* Listening Unix socket created by UnixSocketCreate. */
int fd;
/* Heap copy of the source argument given on the command line. */
gchar *uri;
/* Heap copy of the socket path given on the command line. */
gchar *socket_path;
/* Negotiated caps serialised by event_probe; NULL until a caps event is seen.
 * Used by create_ipc_sink_pipeline to configure each client pipeline. */
gchar *caps_string;
/* caps_mutex / caps_cond / is_caps let create_server_pipeline wait until
 * event_probe has observed the first caps event. */
GMutex caps_mutex;
GCond caps_cond;
gboolean is_caps;
/* Thread running connection_handler for this source. */
pthread_t thread_id;
/* Id of the bus watch installed on `pipeline`. */
guint bus_id;
GstElement *pipeline;
/* Queue of NvIpcSinkPipeline*, one per connected client; guarded by
 * ipc_sink_queue_lock. */
GQueue *ipc_sink_queue;
GMutex ipc_sink_queue_lock;
/* Back pointer to the owning AppCtx (stored as void *). */
void *appCtx;
} NvIpcServerPipeline;
/* Client-side bookkeeping filled in by create_client_pipeline; the arrays hold
 * one entry per source, up to MAX_SOURCE_BINS. */
typedef struct
{
/* Connected socket for each source, as returned by UnixSocketConnect. */
int fd[MAX_SOURCE_BINS];
/* Heap copy of each full command-line URI. */
gchar *uri[MAX_SOURCE_BINS];
/* Heap copy of each URI with the leading "ipc://" skipped. */
gchar *socket_path[MAX_SOURCE_BINS];
/* bus_id, pipeline and appCtx are not assigned by the code visible in this
 * file. */
guint bus_id;
GstElement *pipeline;
void *appCtx;
} NvIpcClientPipeline;
/* Top-level application state; lives on the stack of create_client_pipeline or
 * create_server_pipeline. */
typedef struct
{
/* Main loop of the process. */
GMainLoop *loop;
/* Server mode: one entry per source. */
NvIpcServerPipeline ipcserver[MAX_SOURCE_BINS];
/* Client mode: connection bookkeeping. */
NvIpcClientPipeline ipcclient;
/* Set to TRUE after the main loop returns; connection_handler polls it from
 * another thread without synchronisation. */
gboolean quit;
} AppCtx;
/* tiler_src_pad_buffer_probe walks the batch metadata attached to a buffer and
 * prints, for every frame in the batch, the number of vehicle and person
 * objects found. In this file it is installed on the src pad of the primary
 * inference element.
 *
 * pad, u_data: unused.
 * info:        probe info whose data is the GstBuffer being inspected.
 * Returns GST_PAD_PROBE_OK in every case so the buffer always flows on. */
static GstPadProbeReturn
tiler_src_pad_buffer_probe (GstPad * pad, GstPadProbeInfo * info,
    gpointer u_data)
{
  GstBuffer *buf = (GstBuffer *) info->data;
  guint num_rects = 0;
  guint vehicle_count = 0;
  guint person_count = 0;
  NvDsObjectMeta *obj_meta = NULL;
  NvDsMetaList *l_frame = NULL;
  NvDsMetaList *l_obj = NULL;
  NvDsBatchMeta *batch_meta = NULL;

  if (!buf)
    return GST_PAD_PROBE_OK;

  /* A buffer without DeepStream batch metadata has nothing to count; bail out
   * instead of dereferencing a NULL pointer. */
  batch_meta = gst_buffer_get_nvds_batch_meta (buf);
  if (!batch_meta)
    return GST_PAD_PROBE_OK;

  for (l_frame = batch_meta->frame_meta_list; l_frame != NULL;
      l_frame = l_frame->next) {
    NvDsFrameMeta *frame_meta = (NvDsFrameMeta *) (l_frame->data);

    for (l_obj = frame_meta->obj_meta_list; l_obj != NULL;
        l_obj = l_obj->next) {
      obj_meta = (NvDsObjectMeta *) (l_obj->data);
      if (obj_meta->class_id == PGIE_CLASS_ID_VEHICLE) {
        vehicle_count++;
        num_rects++;
      }
      if (obj_meta->class_id == PGIE_CLASS_ID_PERSON) {
        person_count++;
        num_rects++;
      }
    }
    /* The counters are not reset between frames, so with more than one frame
     * per batch the printed totals are cumulative across the batch. */
    g_print ("Frame Number = %d Number of objects = %u "
        "Vehicle Count = %u Person Count = %u\n",
        frame_meta->frame_num, num_rects, vehicle_count, person_count);
  }
  return GST_PAD_PROBE_OK;
}
/* Bus watch shared by every pipeline in this file.
 *
 * data is the GMainLoop to stop on EOS, on an error, or when a state-changed
 * message reports the NULL state. Returns FALSE (removing the watch) only in
 * the NULL-state case; TRUE otherwise. */
static gboolean
bus_call (GstBus * bus, GstMessage * msg, gpointer data)
{
  GMainLoop *main_loop = (GMainLoop *) data;
  const GstMessageType msg_type = GST_MESSAGE_TYPE (msg);

  if (msg_type == GST_MESSAGE_EOS) {
    g_print ("End of stream\n");
    g_main_loop_quit (main_loop);
    return TRUE;
  }

  if (msg_type == GST_MESSAGE_WARNING) {
    gchar *dbg_info = NULL;
    GError *warn = NULL;

    gst_message_parse_warning (msg, &warn, &dbg_info);
    g_printerr ("WARNING from element %s: %s\n",
        GST_OBJECT_NAME (msg->src), warn->message);
    g_free (dbg_info);
    g_printerr ("Warning: %s\n", warn->message);
    g_error_free (warn);
    return TRUE;
  }

  if (msg_type == GST_MESSAGE_ERROR) {
    gchar *dbg_info = NULL;
    GError *err = NULL;

    gst_message_parse_error (msg, &err, &dbg_info);
    g_printerr ("ERROR from element %s: %s\n",
        GST_OBJECT_NAME (msg->src), err->message);
    if (dbg_info)
      g_printerr ("Error details: %s\n", dbg_info);
    g_free (dbg_info);
    g_error_free (err);
    g_main_loop_quit (main_loop);
    return TRUE;
  }

  if (msg_type == GST_MESSAGE_ELEMENT) {
    /* Per-stream EOS notification posted as an element message. */
    if (gst_nvmessage_is_stream_eos (msg)) {
      guint stream_id;
      if (gst_nvmessage_parse_stream_eos (msg, &stream_id)) {
        g_print ("Got EOS from stream %d\n", stream_id);
      }
    }
    return TRUE;
  }

  if (msg_type == GST_MESSAGE_STATE_CHANGED) {
    GstState prev_state, next_state;

    gst_message_parse_state_changed (msg, &prev_state, &next_state, NULL);
    if (next_state == GST_STATE_PLAYING) {
      g_print ("Pipeline running\n");
    } else if (next_state == GST_STATE_PAUSED) {
      if (prev_state == GST_STATE_PLAYING) {
        g_print ("Pipeline paused\n");
      }
    } else if (next_state == GST_STATE_READY) {
      if (prev_state == GST_STATE_NULL) {
        g_print ("Pipeline ready\n");
      } else {
        g_print ("Pipeline stopped\n");
      }
    } else if (next_state == GST_STATE_NULL) {
      g_print ("Pipeline Null\n");
      g_main_loop_quit (main_loop);
      return FALSE;
    }
    return TRUE;
  }

  /* Every other message type is ignored. */
  return TRUE;
}
/* "pad-added" handler for the URI source element.
 *
 * When the new pad carries video in NVMM memory, it becomes the target of the
 * source bin's "src" ghost pad. Non-video pads are ignored; video pads without
 * the NVMM feature are reported as an error.
 *
 * decodebin:       element that exposed the pad (unused).
 * decoder_src_pad: the newly added pad.
 * data:            the enclosing source bin (GstElement *). */
static void
cb_newpad (GstElement * decodebin, GstPad * decoder_src_pad, gpointer data)
{
  GstElement *source_bin = (GstElement *) data;
  const GstStructure *str = NULL;
  const gchar *name = NULL;
  GstCapsFeatures *features = NULL;
  GstCaps *caps = gst_pad_get_current_caps (decoder_src_pad);

  if (!caps) {
    caps = gst_pad_query_caps (decoder_src_pad, NULL);
  }
  if (!caps) {
    return;
  }

  /* str, name and features are borrowed from caps and stay valid only until
   * the caps are released at the end of this function. */
  str = gst_caps_get_structure (caps, 0);
  name = gst_structure_get_name (str);
  features = gst_caps_get_features (caps, 0);

  /* Need to check if the pad created by the decodebin is for video and not
   * audio. */
  if (!strncmp (name, "video", 5)) {
    /* Link the decodebin pad only if decodebin has picked nvidia
     * decoder plugin nvdec_*. We do this by checking if the pad caps contain
     * NVMM memory features. */
    if (gst_caps_features_contains (features, GST_CAPS_FEATURES_NVMM)) {
      /* Get the source bin ghost pad */
      GstPad *bin_ghost_pad = gst_element_get_static_pad (source_bin, "src");
      if (!gst_ghost_pad_set_target (GST_GHOST_PAD (bin_ghost_pad),
              decoder_src_pad)) {
        g_printerr ("Failed to link decoder src pad to source bin ghost pad\n");
      }
      gst_object_unref (bin_ghost_pad);
    } else {
      g_printerr ("Error: Decodebin did not pick nvidia decoder plugin.\n");
    }
  }

  /* Both caps getters return a reference owned by the caller; release it so
   * the caps are not leaked on every new pad. */
  gst_caps_unref (caps);
}
/* "child-added" handler: logs each child and reacts to two name patterns.
 *
 * A child whose name begins with "decodebin" gets this same handler attached,
 * so its own children are reported too. A child whose name begins with
 * "source" has its "drop-on-latency" property switched on. */
static void
decodebin_child_added (GstChildProxy * child_proxy, GObject * object,
    gchar * name, gpointer user_data)
{
  g_print ("Decodebin child added: %s\n", name);

  /* g_strrstr returns the last occurrence; it equals `name` only when that
   * occurrence is at the very start of the string. */
  const gboolean is_nested_decodebin = (name == g_strrstr (name, "decodebin"));
  const gboolean is_source_element = (name == g_strrstr (name, "source"));

  if (is_nested_decodebin) {
    g_signal_connect (G_OBJECT (object), "child-added",
        G_CALLBACK (decodebin_child_added), user_data);
  }

  if (is_source_element) {
    g_object_set (G_OBJECT (object), "drop-on-latency", true, NULL);
  }
}
/* Builds a bin named "source-bin-NN" that wraps an nvurisrcbin element and
 * exposes a target-less "src" ghost pad; cb_newpad sets the target once the
 * source element exposes its video pad.
 *
 * index: source index, used only in the bin name.
 * uri:   URI handed to the source element.
 * fd:    connected socket passed as "sock-fd", or -1 to leave it unset.
 *
 * Returns the new bin (floating reference), or NULL on failure. */
static GstElement *
create_source_bin (guint index, gchar * uri, int fd)
{
  GstElement *bin = NULL, *uri_decode_bin = NULL;
  gchar bin_name[16] = { };

  g_snprintf (bin_name, sizeof (bin_name), "source-bin-%02u", index);

  /* Create a source GstBin to abstract this bin's content from the rest of the
   * pipeline */
  bin = gst_bin_new (bin_name);

  /* Source element for reading from the uri.
   * We will use decodebin and let it figure out the container format of the
   * stream and the codec and plug the appropriate demux and decode plugins. */
  uri_decode_bin = gst_element_factory_make ("nvurisrcbin", NULL);

  /* Validate before touching any property: g_object_set on a NULL element is
   * a programming error. Release whichever half was created. */
  if (!bin || !uri_decode_bin) {
    g_printerr ("One element in source bin could not be created.\n");
    if (bin)
      gst_object_unref (bin);
    if (uri_decode_bin)
      gst_object_unref (uri_decode_bin);
    return NULL;
  }

  g_object_set (G_OBJECT (uri_decode_bin), "file-loop", TRUE, NULL);
  g_object_set (G_OBJECT (uri_decode_bin), "cudadec-memtype", 1, NULL);
  if (fd != -1) {
    g_object_set (G_OBJECT (uri_decode_bin), "sock-fd", fd, NULL);
  }

  /* We set the input uri to the source element */
  g_object_set (G_OBJECT (uri_decode_bin), "uri", uri, NULL);

  /* Connect to the "pad-added" signal of the decodebin which generates a
   * callback once a new pad for raw data has been created by the decodebin */
  g_signal_connect (G_OBJECT (uri_decode_bin), "pad-added",
      G_CALLBACK (cb_newpad), bin);
  g_signal_connect (G_OBJECT (uri_decode_bin), "child-added",
      G_CALLBACK (decodebin_child_added), bin);

  /* From here on the bin owns the source element. */
  gst_bin_add (GST_BIN (bin), uri_decode_bin);

  /* We need to create a ghost pad for the source bin which will act as a proxy
   * for the video decoder src pad. The ghost pad will not have a target right
   * now. Once the decode bin creates the video decoder and generates the
   * cb_newpad callback, we will set the ghost pad target to the video decoder
   * src pad. */
  if (!gst_element_add_pad (bin, gst_ghost_pad_new_no_target ("src",
              GST_PAD_SRC))) {
    g_printerr ("Failed to add ghost pad in source bin\n");
    gst_object_unref (bin);
    return NULL;
  }

  return bin;
}
/* Builds a bin named "source-bin-NN" containing
 *   nvarguscamerasrc -> capsfilter
 * and exposes the capsfilter's src pad as the bin's "src" ghost pad. The
 * capsfilter pins the output to NV12 in NVMM memory at the CONFIG_CSI_SOURCE_*
 * resolution and frame rate.
 *
 * index:         source index, used only in the bin name.
 * cfg_file_path: accepted for interface compatibility; not used here.
 *
 * Returns the new bin (floating reference), or NULL when an element cannot be
 * created, linked, or exposed through the ghost pad. */
static GstElement *
create_source_bin_csi (guint index, gchar * cfg_file_path)
{
  /* Everything is declared up front: the NVGSTDS_* macros below jump to the
   * `done` label on failure, and a jump may not cross an initialisation. */
  GstElement *bin = NULL, *src_elem = NULL, *caps_filter = NULL;
  GstCaps *caps = NULL;
  GstCapsFeatures *feature = NULL;
  gboolean ok = FALSE;
  gchar bin_name[16] = { };

  (void) cfg_file_path;

  g_snprintf (bin_name, sizeof (bin_name), "source-bin-%02u", index);

  /* Create a source GstBin to abstract this bin's content from the rest of the
   * pipeline */
  bin = gst_bin_new (bin_name);
  src_elem = gst_element_factory_make ("nvarguscamerasrc", "csi_src_elem");
  if (!bin || !src_elem) {
    g_printerr ("One element in source bin could not be created.\n");
    if (bin)
      gst_object_unref (bin);
    if (src_elem)
      gst_object_unref (src_elem);
    return NULL;
  }

  caps_filter = gst_element_factory_make ("capsfilter", "src_cap_filter");
  if (!caps_filter) {
    /* Stop here instead of configuring and linking a NULL element. */
    g_printerr ("Could not create 'src_cap_filter'");
    gst_object_unref (src_elem);
    gst_object_unref (bin);
    return NULL;
  }

  caps = gst_caps_new_simple ("video/x-raw",
      "format", G_TYPE_STRING, "NV12",
      "width", G_TYPE_INT, CONFIG_CSI_SOURCE_WIDTH,
      "height", G_TYPE_INT, CONFIG_CSI_SOURCE_HEIGHT,
      "framerate", GST_TYPE_FRACTION, CONFIG_CSI_SOURCE_FPS_N,
      CONFIG_CSI_SOURCE_FPS_D, NULL);
  feature = gst_caps_features_new ("memory:NVMM", NULL);
  gst_caps_set_features (caps, 0, feature);
  g_object_set (G_OBJECT (caps_filter), "caps", caps, NULL);

  /* From here on the bin owns both elements. */
  gst_bin_add_many (GST_BIN (bin), src_elem, caps_filter, NULL);

  NVGSTDS_LINK_ELEMENT (src_elem, caps_filter);
  NVGSTDS_BIN_ADD_GHOST_PAD (bin, caps_filter, "src");
  ok = TRUE;

done:
  if (caps)
    gst_caps_unref (caps);
  if (!ok) {
    /* A macro above bailed out: report failure instead of handing back a
     * half-built bin that the caller would treat as usable. */
    gst_object_unref (bin);
    bin = NULL;
  }
  return bin;
}
/* Connects a stream-type Unix domain socket to socket_name.
 *
 * A failed connect() is retried after a 100 ms sleep, up to 60 retries, to
 * give the producer time to start listening.
 *
 * Returns the connected descriptor, or -1 when the socket cannot be created
 * or the retries are exhausted. */
static int
UnixSocketConnect (const char *socket_name)
{
  const int max_retries = 60;
  struct sockaddr_un addr;
  int attempt;
  int fd = socket (PF_UNIX, SOCK_STREAM, 0);

  if (fd < 0) {
    g_printerr ("%s: socket create failed.\n", __func__);
    return -1;
  }

  /* Zero-fill first so the path stays NUL-terminated after the bounded copy. */
  memset (&addr, 0, sizeof (struct sockaddr_un));
  addr.sun_family = AF_UNIX;
  strncpy (addr.sun_path, socket_name, sizeof (addr.sun_path) - 1);

  for (attempt = 0;; attempt++) {
    if (connect (fd, (const struct sockaddr *) &addr,
            sizeof (struct sockaddr_un)) == 0) {
      return fd;
    }

    if (attempt >= max_retries) {
      g_printerr ("\n%s: Waiting timed out\n", __func__);
      close (fd);
      return -1;
    }

    g_print ("Waiting for producer count %d\n", attempt);
    usleep (100000);
  }
}
static int
create_client_pipeline (int argc, char *argv[])
{
AppCtx appCtx;
GMainLoop *loop = NULL;
GstElement *pipeline = NULL, *streammux = NULL, *sink = NULL, *pgie = NULL,
*queue1, *queue2, *queue3, *queue4, *queue5, *nvvidconv = NULL,
*nvosd = NULL, *tiler = NULL, *nvdslogger = NULL;
GstBus *bus = NULL;
guint bus_watch_id;
GstPad *tiler_src_pad = NULL;
guint i =0, num_sources = 0;
guint tiler_rows, tiler_columns;
guint pgie_batch_size;
NvDsGieType pgie_type = NVDS_GIE_PLUGIN_INFER;
gboolean PERF_MODE = g_getenv("NV_IPC_TEST_PERF_MODE") &&
!g_strcmp0(g_getenv("NV_IPC_TEST_PERF_MODE"), "1");
memset(&appCtx, 0, sizeof(AppCtx));
appCtx.loop = loop = g_main_loop_new (NULL, FALSE);
/* Create gstreamer elements */
/* Create Pipeline element that will form a connection of other elements */
pipeline = gst_pipeline_new ("ipc-client-pipeline");
/* Create nvstreammux instance to form batches from one or more sources. */
streammux = gst_element_factory_make ("nvstreammux", "stream-muxer");
if (!pipeline || !streammux) {
g_printerr ("One element could not be created. Exiting.\n");
return -1;
}
gst_bin_add (GST_BIN (pipeline), streammux);
num_sources = argc - 2;
for (i = 0; i < num_sources; i++) {
GstPad *sinkpad, *srcpad;
gchar pad_name[16] = { };
const char *prefix = "ipc://";
const char *socket_path = argv[i + 2] + strlen(prefix);
int sock_fd = UnixSocketConnect(socket_path);
if (sock_fd == -1) {
g_printerr ("UnixSocketConnect failed\n");
return -1;
}
appCtx.ipcclient.fd[i] = sock_fd;
appCtx.ipcclient.uri[i] = strdup(argv[i + 2]);
appCtx.ipcclient.socket_path[i] = strdup(socket_path);
g_print("client is connected uri: %s path: %s fd: %d\n",
appCtx.ipcclient.uri[i], appCtx.ipcclient.socket_path[i],
appCtx.ipcclient.fd[i]);
GstElement *source_bin= NULL;
source_bin = create_source_bin (i, argv[i + 2], sock_fd);
if (!source_bin) {
g_printerr ("Failed to create source bin. Exiting.\n");
return -1;
}
gst_bin_add (GST_BIN (pipeline), source_bin);
g_snprintf (pad_name, 15, "sink_%u", i);
sinkpad = gst_element_request_pad_simple (streammux, pad_name);
if (!sinkpad) {
g_printerr ("Streammux request sink pad failed. Exiting.\n");
return -1;
}
srcpad = gst_element_get_static_pad (source_bin, "src");
if (!srcpad) {
g_printerr ("Failed to get src pad of source bin. Exiting.\n");
return -1;
}
if (gst_pad_link (srcpad, sinkpad) != GST_PAD_LINK_OK) {
g_printerr ("Failed to link source bin to stream muxer. Exiting.\n");
return -1;
}
gst_object_unref (srcpad);
gst_object_unref (sinkpad);
}
/* Use nvinfer or nvinferserver to infer on batched frame. */
if (pgie_type == NVDS_GIE_PLUGIN_INFER_SERVER) {
pgie = gst_element_factory_make ("nvinferserver", "primary-nvinference-engine");
} else {
pgie = gst_element_factory_make ("nvinfer", "primary-nvinference-engine");
}
/* Add queue elements between every two elements */
queue1 = gst_element_factory_make ("queue", "queue1");
queue2 = gst_element_factory_make ("queue", "queue2");
queue3 = gst_element_factory_make ("queue", "queue3");
queue4 = gst_element_factory_make ("queue", "queue4");
queue5 = gst_element_factory_make ("queue", "queue5");
/* Use nvdslogger for perf measurement. */
nvdslogger = gst_element_factory_make ("nvdslogger", "nvdslogger");
/* Use nvtiler to composite the batched frames into a 2D tiled array based
* on the source of the frames. */
tiler = gst_element_factory_make ("nvmultistreamtiler", "nvtiler");
/* Use convertor to convert from NV12 to RGBA as required by nvosd */
nvvidconv = gst_element_factory_make ("nvvideoconvert", "nvvideo-converter");
/* Create OSD to draw on the converted RGBA buffer */
nvosd = gst_element_factory_make ("nvdsosd", "nv-onscreendisplay");
if (PERF_MODE) {
sink = gst_element_factory_make ("fakesink", "nvvideo-renderer");
g_object_set (G_OBJECT (sink), "sync", FALSE, NULL);
} else {
sink = gst_element_factory_make ("nv3dsink", "nv3d-sink");
g_object_set (G_OBJECT (sink), "sync", FALSE, NULL);
}
if (!pgie || !nvdslogger || !tiler || !nvvidconv || !nvosd || !sink) {
g_printerr ("One element could not be created. Exiting.\n");
return -1;
}
g_object_set (G_OBJECT (streammux), "batch-size", num_sources, NULL);
g_object_set (G_OBJECT (streammux), "width", MUXER_OUTPUT_WIDTH, "height",
MUXER_OUTPUT_HEIGHT,
"batched-push-timeout", MUXER_BATCH_TIMEOUT_USEC, NULL);
/* Configure the nvinfer element using the nvinfer config file. */
g_object_set (G_OBJECT (pgie),
"config-file-path", "dsipctest_pgie_config.txt", NULL);
/* Override the batch-size set in the config file with the number of sources. */
g_object_get (G_OBJECT (pgie), "batch-size", &pgie_batch_size, NULL);
if (pgie_batch_size != num_sources) {
g_printerr
("WARNING: Overriding infer-config batch-size (%d) with number of sources (%d)\n",
pgie_batch_size, num_sources);
g_object_set (G_OBJECT (pgie), "batch-size", num_sources, NULL);
}
tiler_rows = (guint) sqrt (num_sources);
tiler_columns = (guint) ceil (1.0 * num_sources / tiler_rows);
/* we set the tiler properties here */
g_object_set (G_OBJECT (tiler), "rows", tiler_rows, "columns", tiler_columns,
"width", TILED_OUTPUT_WIDTH, "height", TILED_OUTPUT_HEIGHT, NULL);
g_object_set (G_OBJECT (nvosd), "process-mode", OSD_PROCESS_MODE,
"display-text", OSD_DISPLAY_TEXT, NULL);
g_object_set (G_OBJECT (sink), "qos", 0, NULL);
g_object_set (G_OBJECT (streammux), "nvbuf-memory-type", 4, NULL);
/* we add a message handler */
bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
bus_watch_id = gst_bus_add_watch (bus, bus_call, loop);
gst_object_unref (bus);
/* Set up the pipeline */
/* we add all elements into the pipeline */
gst_bin_add_many (GST_BIN (pipeline), queue1, pgie, queue2, nvdslogger, tiler,
queue3, nvvidconv, queue4, nvosd, queue5, sink, NULL);
/* we link the elements together
* nvstreammux -> nvinfer -> nvdslogger -> nvtiler -> nvvidconv -> nvosd
* -> video-renderer */
if (!gst_element_link_many (streammux, queue1, pgie, queue2, nvdslogger, tiler,
queue3, nvvidconv, queue4, nvosd, queue5, sink, NULL)) {
g_printerr ("Elements could not be linked. Exiting.\n");
return -1;
}
/* Lets add probe to get informed of the meta data generated, we add probe to
* the sink pad of the osd element, since by that time, the buffer would have
* had got all the metadata. */
tiler_src_pad = gst_element_get_static_pad (pgie, "src");
if (!tiler_src_pad)
g_print ("Unable to get src pad\n");
else
gst_pad_add_probe (tiler_src_pad, GST_PAD_PROBE_TYPE_BUFFER,
tiler_src_pad_buffer_probe, NULL, NULL);
gst_object_unref (tiler_src_pad);
/* Set the pipeline to "playing" state */
g_print ("Now playing:");
for (i = 0; i < num_sources; i++) {
g_print (" %s,", argv[i + 2]);
}
g_print ("\n");
gst_element_set_state (pipeline, GST_STATE_PLAYING);
/* Wait till pipeline encounters an error or EOS */
g_print ("Running...\n");
g_main_loop_run (loop);
appCtx.quit = TRUE;
for (i = 0; i < num_sources; i++) {
g_print("client is disconnected uri: %s path: %s fd: %d\n",
appCtx.ipcclient.uri[i], appCtx.ipcclient.socket_path[i],
appCtx.ipcclient.fd[i]);
close(appCtx.ipcclient.fd[i]);
g_free(appCtx.ipcclient.uri[i]);
g_free(appCtx.ipcclient.socket_path[i]);
}
/* Out of the main loop, clean up nicely */
g_print ("Returned, stopping playback\n");
gst_element_set_state (pipeline, GST_STATE_NULL);
g_print ("Deleting pipeline\n");
gst_object_unref (GST_OBJECT (pipeline));
g_source_remove (bus_watch_id);
g_main_loop_unref (loop);
return 0;
}
static int
UnixSocketCreate(const char *socket_name) {
int listen_fd;
struct sockaddr_un sock_addr;
listen_fd = socket(PF_UNIX, SOCK_STREAM, 0);
if (listen_fd < 0) {
g_printerr("%s: socket create failed", __func__);
return -1;
}
unlink(socket_name);
memset(&sock_addr, 0, sizeof(struct sockaddr_un));
sock_addr.sun_family = AF_UNIX;
strncpy(sock_addr.sun_path, socket_name, sizeof(sock_addr.sun_path) - 1);
sock_addr.sun_path[sizeof(sock_addr.sun_path) - 1] = '\0';
if (bind(listen_fd, (const struct sockaddr *)&sock_addr,
sizeof(struct sockaddr_un))) {
g_printerr("%s: bind error", __func__);
close(listen_fd);
return -1;
}
if (listen(listen_fd, 16)) {
g_printerr("%s: listen error", __func__);
close(listen_fd);
return -1;
}
return listen_fd;
}
/* Waits up to one second for a client on listen_fd and accepts it.
 *
 * Returns:
 *   >= 0  descriptor of the accepted connection
 *   -2    nothing arrived within the timeout (or the wait was interrupted by
 *         a signal); the caller is expected to poll again
 *   -1    select() or accept() failed */
static int
UnixSocketAccept (int listen_fd)
{
  struct sockaddr_un connect_addr;
  /* accept() reads this as the capacity of connect_addr, so it must hold the
   * buffer size on entry, not zero. */
  socklen_t connect_addr_len = sizeof (connect_addr);
  struct timeval timeout;
  fd_set read_fds;
  int activity;
  int connect_fd;

  FD_ZERO (&read_fds);
  FD_SET (listen_fd, &read_fds);

  timeout.tv_sec = 1;
  timeout.tv_usec = 0;

  activity = select (listen_fd + 1, &read_fds, NULL, NULL, &timeout);
  if (activity < 0) {
    /* An interrupted wait is harmless; any other failure must be reported,
     * otherwise the caller would keep polling a broken descriptor forever. */
    if (errno == EINTR) {
      return -2;
    }
    g_printerr ("%s: select failed\n", __func__);
    return -1;
  }
  if (activity == 0) {
    /* connect_fd -2 represents timeout */
    return -2;
  }

  connect_fd =
      accept (listen_fd, (struct sockaddr *) &connect_addr, &connect_addr_len);
  if (connect_fd < 0) {
    g_printerr ("%s: accept failed\n", __func__);
    return -1;
  }
  return connect_fd;
}
static void*
client_handler(void *user_data)
{
NvIpcSinkPipeline *ipcsink = (NvIpcSinkPipeline *)user_data;
NvIpcServerPipeline *ipcserver = (NvIpcServerPipeline *)ipcsink->ipcserver;
g_main_loop_run(ipcsink->loop);
ipcsink->quit = TRUE;
gst_element_set_state (ipcsink->pipeline, GST_STATE_NULL);
gst_object_unref (GST_OBJECT (ipcsink->pipeline));
g_source_remove (ipcsink->bus_id);
close(ipcsink->fd);
g_main_loop_unref (ipcsink->loop);
g_print("client is disconnected uri: %s path: %s fd: %d\n",
ipcserver->uri, ipcserver->socket_path, ipcsink->fd);
return NULL;
}
/* Builds and starts the per-client pipeline
 *   appsrc -> capsfilter -> nvipcpipelinesink
 * for the connection stored in ipcsink->fd, then starts a thread running
 * client_handler for it.
 *
 * The capsfilter uses the caps string recorded on the owning server pipeline,
 * or a fixed 1920x1080 NV12 NVMM default when none has been recorded yet.
 *
 * On success ipcsink->loop, pipeline, appsrc, bus_id and thread_id are filled
 * in and 0 is returned. On failure everything created here is released, the
 * pointers in ipcsink are reset and -1 is returned; ipcsink->fd stays open
 * and remains the caller's responsibility. */
static int
create_ipc_sink_pipeline (NvIpcSinkPipeline *ipcsink)
{
  GstElement *pipeline = NULL;
  GstBus *bus = NULL;
  GstElement *appsrc = NULL, *caps_filter = NULL, *ipcpipelinesink = NULL;
  GstCaps *caps = NULL;
  GMainLoop *loop = NULL;
  NvIpcServerPipeline *ipcserver = (NvIpcServerPipeline *) ipcsink->ipcserver;
  int thread_err = 0;

  ipcsink->loop = loop = g_main_loop_new (NULL, FALSE);

  /* Create gstreamer elements */
  ipcsink->pipeline = pipeline = gst_pipeline_new ("ipc-sink-pipeline");
  if (!pipeline) {
    g_printerr ("Failed to create pipeline. Exiting.\n");
    goto fail;
  }

  /* Each element is added to the pipeline as soon as it exists, so dropping
   * the pipeline on failure releases all of them. */
  ipcsink->appsrc = appsrc = gst_element_factory_make ("appsrc", NULL);
  if (!appsrc) {
    g_printerr ("Failed to create appsrc. Exiting.\n");
    goto fail;
  }
  gst_bin_add (GST_BIN (pipeline), appsrc);

  caps_filter = gst_element_factory_make ("capsfilter", NULL);
  if (!caps_filter) {
    g_printerr ("Failed to create caps_filter. Exiting.\n");
    goto fail;
  }
  gst_bin_add (GST_BIN (pipeline), caps_filter);

  ipcpipelinesink = gst_element_factory_make ("nvipcpipelinesink", NULL);
  if (!ipcpipelinesink) {
    g_printerr ("Failed to create ipcpipelinesink. Exiting.\n");
    goto fail;
  }
  gst_bin_add (GST_BIN (pipeline), ipcpipelinesink);

  if (ipcserver->caps_string) {
    caps = gst_caps_from_string (ipcserver->caps_string);
  } else {
    caps = gst_caps_from_string ("video/x-raw(memory:NVMM),format=NV12,width=1920,height=1080");
  }
  if (!caps) {
    g_printerr ("Failed to parse caps. Exiting.\n");
    goto fail;
  }
  g_object_set (G_OBJECT (caps_filter), "caps", caps, NULL);
  gst_caps_unref (caps);

  /* 3 is the numeric value of GST_FORMAT_TIME. */
  g_object_set (G_OBJECT (appsrc), "format", 3, NULL);
  g_object_set (G_OBJECT (appsrc), "is-live", true, NULL);
  g_object_set (G_OBJECT (ipcpipelinesink), "ack-time", 10000000, NULL);
  /* The same connected socket is used for both directions. */
  g_object_set (G_OBJECT (ipcpipelinesink), "fdin", ipcsink->fd, "fdout", ipcsink->fd, NULL);

  /* link the elements together */
  if (!gst_element_link_many (appsrc, caps_filter, ipcpipelinesink, NULL)) {
    g_printerr ("Elements could not be linked. Exiting.\n");
    goto fail;
  }

  /* we add a message handler */
  bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
  ipcsink->bus_id = gst_bus_add_watch (bus, bus_call, loop);
  gst_object_unref (bus);

  gst_element_set_state (ipcsink->pipeline, GST_STATE_PLAYING);

  thread_err = pthread_create (&ipcsink->thread_id, NULL,
      client_handler, (void *) ipcsink);
  if (thread_err != 0) {
    /* Without the handler thread nobody would ever stop this pipeline, so
     * undo the start-up before reporting the failure. */
    g_printerr ("Failed to create client handler thread (error %d). Exiting.\n",
        thread_err);
    gst_element_set_state (pipeline, GST_STATE_NULL);
    g_source_remove (ipcsink->bus_id);
    ipcsink->bus_id = 0;
    goto fail;
  }

  return 0;

fail:
  if (pipeline) {
    gst_object_unref (GST_OBJECT (pipeline));
  }
  g_main_loop_unref (loop);
  ipcsink->pipeline = NULL;
  ipcsink->appsrc = NULL;
  ipcsink->loop = NULL;
  return -1;
}
static void*
connection_handler(void *user_data)
{
NvIpcServerPipeline *ipcserver = (NvIpcServerPipeline *)user_data;
AppCtx *appCtx = (AppCtx *)ipcserver->appCtx;
int listen_fd = ipcserver->fd;
int connect_fd;
int ret;
while (!appCtx->quit) {
connect_fd = UnixSocketAccept(listen_fd);
if (connect_fd == -1) {
g_printerr("UnixSocketAccept failed\n");
break;
} else if (connect_fd == -2) {
/*Accept timeout to check if loop quits due to any error */
continue;
}
NvIpcSinkPipeline *ipcsink = g_new0 (NvIpcSinkPipeline, 1);
ipcsink->fd = connect_fd;
ipcsink->ipcserver = (void *)ipcserver;
ret = create_ipc_sink_pipeline(ipcsink);
if (ret == -1) {
g_printerr("create_ipc_sink_pipeline failed\n");
close(connect_fd);
g_free(ipcsink);
break;
}
g_print("client is connected uri: %s path: %s fd: %d\n",
ipcserver->uri, ipcserver->socket_path, connect_fd);
g_mutex_lock (&ipcserver->ipc_sink_queue_lock);
g_queue_push_tail (ipcserver->ipc_sink_queue, ipcsink);
g_mutex_unlock (&ipcserver->ipc_sink_queue_lock);
}
if (!appCtx->quit) {
g_main_loop_quit (appCtx->loop);
}
g_mutex_lock (&ipcserver->ipc_sink_queue_lock);
while (!g_queue_is_empty(ipcserver->ipc_sink_queue))
{
NvIpcSinkPipeline *ipcsink =
(NvIpcSinkPipeline *)g_queue_pop_head (ipcserver->ipc_sink_queue);
if (!ipcsink->quit) {
g_main_loop_quit (ipcsink->loop);
}
pthread_join(ipcsink->thread_id, NULL);
g_free(ipcsink);
}
g_mutex_unlock (&ipcserver->ipc_sink_queue_lock);
return NULL;
}
/* Weak-reference notification registered by on_new_sample on a forwarded
 * buffer's memory: drops the reference on the source sample that was taken
 * when that buffer was pushed. obj is unused. */
static void
gst_mem_free_cb (GstSample *sample, GstMiniObject *obj)
{
  if (!sample) {
    return;
  }
  gst_sample_unref (sample);
}
/* appsink "new-sample" handler on the server pipeline.
 *
 * Pulls the sample, maps its buffer for reading and, for every client that
 * has not quit, pushes a freshly allocated buffer holding a byte copy of the
 * mapped data (plus metadata, flags and timestamps) into that client's appsrc.
 *
 * appsink:   the emitting appsink.
 * user_data: the NvIpcServerPipeline owning the client queue.
 *
 * Returns GST_FLOW_EOS at end of stream, GST_FLOW_ERROR when the sample has
 * no buffer or the buffer cannot be mapped, GST_FLOW_OK otherwise. A client
 * whose copy cannot be allocated or mapped is skipped for this frame. */
static GstFlowReturn
on_new_sample (GstElement *appsink, gpointer user_data)
{
  NvIpcServerPipeline *ipcserver = (NvIpcServerPipeline *) user_data;
  GstSample *sample = NULL;
  GstBuffer *srcbuffer = NULL;
  GstMapInfo srcmap;
  GList *current = NULL;
  const int copy_flags = (GST_BUFFER_COPY_META | GST_BUFFER_COPY_FLAGS |
      GST_BUFFER_COPY_TIMESTAMPS);

  if (gst_app_sink_is_eos ((GstAppSink *) appsink)) {
    g_printerr ("EOS Received on app sink element\n");
    return GST_FLOW_EOS;
  }

  /* Get the sample from appsink */
  sample = gst_app_sink_pull_sample (GST_APP_SINK (appsink));
  if (sample == NULL) {
    g_printerr ("Sample NULL received on app sink element\n");
    return GST_FLOW_OK;
  }

  /* Get the buffer from sample */
  srcbuffer = gst_sample_get_buffer (sample);
  if (srcbuffer == NULL) {
    g_printerr ("No more buffers available from app sink element\n");
    gst_sample_unref (sample);
    return GST_FLOW_ERROR;
  }

  /* Map the gst buffer.
   * NOTE(review): the upstream caps are memory:NVMM, so the mapped bytes are
   * presumably a surface descriptor rather than pixel data - confirm. */
  if (!gst_buffer_map (srcbuffer, &srcmap, GST_MAP_READ)) {
    g_printerr ("Map the gst buffer Failed\n");
    gst_sample_unref (sample);
    return GST_FLOW_ERROR;
  }

  g_mutex_lock (&ipcserver->ipc_sink_queue_lock);
  /* An empty queue simply yields zero iterations. */
  for (current = ipcserver->ipc_sink_queue->head; current != NULL;
      current = current->next) {
    NvIpcSinkPipeline *ipcsink = (NvIpcSinkPipeline *) (current->data);
    GstBuffer *dstbuffer = NULL;
    GstMemory *mem = NULL;
    GstMapInfo dstmap;

    if (ipcsink->quit) {
      continue;
    }

    /* Allocate a new Gst Buffer */
    dstbuffer = gst_buffer_new_allocate (NULL, srcmap.size, NULL);
    if (dstbuffer == NULL) {
      g_printerr ("Failed to allocate buffer for ipc sink\n");
      continue;
    }

    /* Map the Gst Buffer to write the data; without a valid mapping the
     * memcpy below would write through an undefined pointer. */
    if (!gst_buffer_map (dstbuffer, &dstmap, GST_MAP_WRITE)) {
      g_printerr ("Failed to map buffer for ipc sink\n");
      gst_buffer_unref (dstbuffer);
      continue;
    }

    gst_buffer_copy_into (dstbuffer, srcbuffer,
        (GstBufferCopyFlags) copy_flags, 0, -1);
    memcpy (dstmap.data, srcmap.data, srcmap.size);

    /* Unmap the Gst Buffer */
    gst_buffer_unmap (dstbuffer, &dstmap);

    /* Hold one extra reference on the source sample for as long as the copy's
     * memory lives; gst_mem_free_cb drops it when that memory is finalised. */
    mem = gst_buffer_peek_memory (dstbuffer, 0);
    gst_mini_object_weak_ref (GST_MINI_OBJECT (mem),
        (GstMiniObjectNotify) gst_mem_free_cb, sample);
    gst_sample_ref (sample);

    /* appsrc takes ownership of dstbuffer. */
    gst_app_src_push_buffer ((GstAppSrc *) ipcsink->appsrc, dstbuffer);
  }
  g_mutex_unlock (&ipcserver->ipc_sink_queue_lock);

  gst_buffer_unmap (srcbuffer, &srcmap);
  /* Unref the sample */
  gst_sample_unref (sample);
  return GST_FLOW_OK;
}
/* Event probe on the source bin's src pad (user_data is the owning
 * NvIpcServerPipeline).
 *
 * On a caps event, the caps are serialised into ipcserver->caps_string and
 * any thread waiting on caps_cond is woken with is_caps set. A caps event that
 * carries no caps resets caps_string to NULL. All other events are ignored.
 * Always returns GST_PAD_PROBE_OK, so events are never dropped.
 *
 * NOTE(review): caps_string is overwritten without freeing the previous value
 * and outside caps_mutex; a second caps event would leak the old string. */
static GstPadProbeReturn
event_probe (GstPad *pad, GstPadProbeInfo *info, gpointer user_data)
{
  NvIpcServerPipeline *server_ctx = (NvIpcServerPipeline *) user_data;
  GstEvent *evt = GST_PAD_PROBE_INFO_EVENT (info);
  GstCaps *new_caps;

  if (!evt) {
    return GST_PAD_PROBE_OK;
  }
  if (GST_EVENT_TYPE (evt) != GST_EVENT_CAPS) {
    return GST_PAD_PROBE_OK;
  }

  gst_event_parse_caps (evt, &new_caps);
  if (!new_caps) {
    server_ctx->caps_string = NULL;
    return GST_PAD_PROBE_OK;
  }

  server_ctx->caps_string = gst_caps_to_string (new_caps);
  g_print ("event_probe uri: %s caps: %s\n", server_ctx->uri,
      server_ctx->caps_string);

  /* Wake the thread waiting for the first caps. */
  g_mutex_lock (&server_ctx->caps_mutex);
  server_ctx->is_caps = TRUE;
  g_cond_signal (&server_ctx->caps_cond);
  g_mutex_unlock (&server_ctx->caps_mutex);

  return GST_PAD_PROBE_OK;
}
static int
create_server_pipeline (int argc, char *argv[])
{
AppCtx appCtx;
guint i =0, num_sources = 0;
num_sources = (argc - 2)/2;
GMainLoop *loop = NULL;
memset(&appCtx, 0, sizeof(AppCtx));
appCtx.loop = loop = g_main_loop_new (NULL, FALSE);
for (i = 0; i < num_sources; i++) {
GstElement *pipeline = NULL;
GstBus *bus = NULL;
guint bus_watch_id;
GstElement *source_bin=NULL, *caps_filter=NULL, *queue, *appsink= NULL;
GstCaps *caps=NULL;
/* Create gstreamer elements */
/* Create Pipeline element that will form a connection of other elements */
appCtx.ipcserver[i].pipeline = pipeline = gst_pipeline_new ("ipc-server-pipeline");
if (!pipeline) {
g_printerr ("Failed to create pipeline. Exiting.\n");
return -1;
}
source_bin = create_source_bin_csi (i, argv[(i*2) + 2]);
if (!source_bin) {
g_printerr ("Failed to create source bin. Exiting.\n");
return -1;
}
gst_bin_add (GST_BIN (pipeline), source_bin);
appCtx.ipcserver[i].uri = strdup(argv[(i*2) + 2]);
GstPad* srcpad = gst_element_get_static_pad (GST_ELEMENT(G_OBJECT(source_bin)), "src");
gst_pad_add_probe(srcpad, GST_PAD_PROBE_TYPE_EVENT_BOTH, event_probe, &appCtx.ipcserver[i], NULL);
gst_object_unref(srcpad);
caps_filter = gst_element_factory_make ("capsfilter", NULL);
if (!caps_filter) {
g_printerr ("Failed to create caps_filter. Exiting.\n");
return -1;
}
gst_bin_add (GST_BIN (pipeline), caps_filter);
queue = gst_element_factory_make ("queue", NULL);
if (!queue) {
g_printerr ("Failed to create queue. Exiting.\n");
return -1;
}
gst_bin_add (GST_BIN (pipeline), queue);
appsink = gst_element_factory_make ("appsink", NULL);
if (!appsink) {
g_printerr ("Failed to create appsink. Exiting.\n");
return -1;
}
gst_bin_add (GST_BIN (pipeline), appsink);
caps = gst_caps_from_string ("video/x-raw(memory:NVMM),format=NV12");
g_object_set (G_OBJECT(caps_filter), "caps", caps, NULL);
gst_caps_unref (caps);
g_object_set (G_OBJECT(appsink), "emit-signals", TRUE, "sync", FALSE, NULL);
g_signal_connect (G_OBJECT(appsink), "new-sample", G_CALLBACK (on_new_sample), &appCtx.ipcserver[i]);
/* link the elements together */
if (!gst_element_link_many (source_bin, caps_filter, queue, appsink, NULL)) {
g_printerr ("Elements could not be linked. Exiting.\n");
return -1;
}
/* we add a message handler */
bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
appCtx.ipcserver[i].bus_id = bus_watch_id = gst_bus_add_watch (bus, bus_call, loop);
gst_object_unref (bus);
appCtx.ipcserver[i].appCtx = &appCtx;
}
/* Set the pipeline to "playing" state */
g_print ("Now playing:");
for (i = 0; i < num_sources; i++) {
gint64 end_time;
g_mutex_init (&appCtx.ipcserver[i].caps_mutex);
g_cond_init (&appCtx.ipcserver[i].caps_cond);
appCtx.ipcserver[i].ipc_sink_queue = g_queue_new ();
g_queue_init (appCtx.ipcserver[i].ipc_sink_queue);
g_mutex_init (&appCtx.ipcserver[i].ipc_sink_queue_lock);
gst_element_set_state (appCtx.ipcserver[i].pipeline, GST_STATE_PLAYING);
g_mutex_lock (&appCtx.ipcserver[i].caps_mutex);
end_time = g_get_monotonic_time () + 1 * G_TIME_SPAN_SECOND;
while (!appCtx.ipcserver[i].is_caps) {
g_cond_wait_until (&appCtx.ipcserver[i].caps_cond, &appCtx.ipcserver[i].caps_mutex, end_time);
}
g_mutex_unlock (&appCtx.ipcserver[i].caps_mutex);
}
for (i = 0; i < num_sources; i++) {
appCtx.ipcserver[i].fd = UnixSocketCreate(argv[(i*2) + 3]);
if (appCtx.ipcserver[i].fd == -1) {
g_printerr("UnixSocketCreate failed\n");
return -1;
}
appCtx.ipcserver[i].socket_path = strdup(argv[(i*2) + 3]);
g_print("server is started uri: %s path: %s fd: %d\n",
appCtx.ipcserver[i].uri, appCtx.ipcserver[i].socket_path, appCtx.ipcserver[i].fd);
pthread_create(&appCtx.ipcserver[i].thread_id, NULL,
connection_handler, (void*)&appCtx.ipcserver[i]);
GST_DEBUG_BIN_TO_DOT_FILE_WITH_TS (GST_BIN (appCtx.ipcserver[i].pipeline),
GST_DEBUG_GRAPH_SHOW_ALL,"ipc-server-pipeline-added");
}
/* Wait till pipeline encounters an error or EOS */
g_print ("Running...\n");
g_main_loop_run (loop);
appCtx.quit = TRUE;
/* Out of the main loop, clean up nicely */
g_print ("Returned, stopping playback\n");
g_print ("Deleting pipeline\n");
for (i = 0; i < num_sources; i++) {
pthread_join(appCtx.ipcserver[i].thread_id, NULL);
gst_element_set_state (appCtx.ipcserver[i].pipeline, GST_STATE_NULL);
gst_object_unref (GST_OBJECT (appCtx.ipcserver[i].pipeline));
g_source_remove (appCtx.ipcserver[i].bus_id);
g_print("server is closed uri: %s path: %s fd: %d\n",
appCtx.ipcserver[i].uri, appCtx.ipcserver[i].socket_path, appCtx.ipcserver[i].fd);
close(appCtx.ipcserver[i].fd);
g_free(appCtx.ipcserver[i].uri);
g_free(appCtx.ipcserver[i].socket_path);
g_free(appCtx.ipcserver[i].caps_string);
g_mutex_clear (&appCtx.ipcserver[i].caps_mutex);
g_cond_clear (&appCtx.ipcserver[i].caps_cond);
g_mutex_clear (&appCtx.ipcserver[i].ipc_sink_queue_lock);
g_queue_clear (appCtx.ipcserver[i].ipc_sink_queue);
g_queue_free(appCtx.ipcserver[i].ipc_sink_queue);
}
g_main_loop_unref (loop);
return 0;
}
int
main (int argc, char *argv[])
{
int ret = 0;
/* Check input arguments */
if (argc < 3) {
g_printerr ("Usage: %s <server> <rtsp_url> <socket_path>\n", argv[0]);
g_printerr ("OR: %s <client> <ipc_url>\n", argv[0]);
return -1;
}
/* Standard GStreamer initialization */
gst_init (&argc, &argv);
if (strcmp(argv[1], "client") == 0) {
signal(SIGPIPE, SIG_IGN);
ret = create_client_pipeline(argc, argv);
} else {
signal(SIGPIPE, SIG_IGN);
ret = create_server_pipeline(argc, argv);
}
return ret;
}
NVMAP_IOC_READ:1077431815 failed [22]
[1077431815]: Offset 3416064 SrcStride 8192 pDst 0xffff1fffc448 DstStride 8192 Count 1
NVMAP_IOC_READ:1077431815 failed [22]
[1077431815]: Offset 3424256 SrcStride 8192 pDst 0xffff1fffc448 DstStride 8192 Count 1
NVMAP_IOC_READ:1077431815 failed [22]
[1077431815]: Offset 3432448 SrcStride 8192 pDst 0xffff1fffc448 DstStride 8192 Count 1
NVMAP_IOC_READ:1077431815 failed [22]
[1077431815]: Offset 3440640 SrcStride 8192 pDst 0xffff1fffc448 DstStride 8192 Count 1
NVMAP_IOC_READ:1077431815 failed [22]
[1077431815]: Offset 3448832 SrcStride 8192 pDst 0xffff1fffc448 DstStride 8192 Count 1
NVMAP_IOC_READ:1077431815 failed [22]
[1077431815]: Offset 3457024 SrcStride 8192 pDst 0xffff1fffc448 DstStride 8192 Count 1
NVMAP_IOC_READ:1077431815 failed [22]
[1077431815]: Offset 3465216 SrcStride 8192 pDst 0xffff1fffc448 DstStride 8192 Count 1
NVMAP_IOC_READ:1077431815 failed [22]
[1077431815]: Offset 3473408 SrcStride 8192 pDst 0xffff1fffc448 DstStride 8192 Count 1
NVMAP_IOC_READ:1077431815 failed [22]
[1077431815]: Offset 3481600 SrcStride 8192 pDst 0xffff1fffc448 DstStride 8192 Count 1
NVMAP_IOC_READ:1077431815 failed [22]
[1077431815]: Offset 3489792 SrcStride 8192 pDst 0xffff1fffc448 DstStride 8192 Count 1
NVMAP_IOC_READ:1077431815 failed [22]
[1077431815]: Offset 3497984 SrcStride 8192 pDst 0xffff1fffc448 DstStride 8192 Count 1
NVMAP_IOC_READ:1077431815 failed [22]
[1077431815]: Offset 3506176 SrcStride 8192 pDst 0xffff1fffc448 DstStride 8192 Count 1
NVMAP_IOC_READ:1077431815 failed [22]
[1077431815]: Offset 3514368 SrcStride 8192 pDst 0xffff1fffc448 DstStride 8192 Count 1
NVMAP_IOC_READ:1077431815 failed [22]
[1077431815]: Offset 3522560 SrcStride 8192 pDst 0xffff1fffc448 DstStride 8192 Count 1
NVMAP_IOC_READ:1077431815 failed [22]
[1077431815]: Offset 3530752 SrcStride 8192 pDst 0xffff1fffc448 DstStride 8192 Count 1
Frame Number = 744 Number of objects = 20 Vehicle Count = 20 Person Count = 0
NVMAP_IOC_READ:1077431815 failed [22]