Multithreaded GStreamer pipeline crashing for onboard cameras (TX1)

I have 6 Raspberry Pi camera modules interfaced via MIPI CSI. I am able to capture from all the cameras simultaneously by running GStreamer pipelines like the one below:

gst-launch-1.0 nvcamerasrc sensor-id=4 fpsRange="30 30" ! "video/x-raw(memory:NVMM), width=(int)820, height=(int)616, format=(string)I420, framerate=(fraction)30/1" ! nvvidconv ! nveglglessink -v

However, when I run the same pipeline from code, it throws the following error

Error received from element video-source: GStreamer error: state change failed and some element failed to post a proper error message with the reason for the failure.
Debugging information: gstbasesrc.c(3354): gst_base_src_start (): /GstPipeline:pipeline/GstNvCameraSrc:video-source:
Failed to start

NvCameraSrc: Trying To Set Default Camera Resolution. Selected 820x616 FrameRate = 30.000000 …

Socket read error. Camera Daemon stopped functioning…
gst_nvcamera_open() failed ret=0
Error received from element video-source: GStreamer error: state change failed and some element failed to post a proper error message with the reason for the failure.
Debugging information: gstbasesrc.c(3354): gst_base_src_start (): /GstPipeline:pipeline/GstNvCameraSrc:video-source:
Failed to start


randomly for some cameras. Below is my complete code:

#include <gst/gst.h>
#include <gst/app/gstappsink.h>
#include <bits/stdc++.h>
#include <unistd.h>
#include <sys/time.h>
#include <opencv2/opencv.hpp>
#include <pthread.h>
using namespace cv;
using namespace std;

typedef struct CameraPipeline
{

	GstElement *pipeline;
	GstElement *camera_source;
	GstElement *input_caps_filter;
	GstElement *output_caps_filter;
	GstElement *conversion_element;
	GstElement *sink;
	
	GstCaps *input_caps;
	GstCaps *output_caps;
	
	GstBus *bus;
	GstMessage *msg;
	
	int camera_index;
	int width;
	int height;
	int fps;
	
	
	unsigned char *frame;

}CameraPipeline;

static GstFlowReturn new_sample(GstElement *appsink, void *data)
{
	CameraPipeline *ptr = (CameraPipeline *)data;

	GstBuffer *buffer;
	GstMapInfo info;
	GstSample *sample;

	/* Retrieve the sample from the appsink */
	g_signal_emit_by_name (appsink, "pull-sample", &sample);

	if (sample)
	{
		/* Print a * to indicate a received buffer */
		g_print("*");

		/* The buffer is owned by the sample; do not unref it separately */
		buffer = gst_sample_get_buffer(sample);

		if (!gst_buffer_map(buffer, &info, GST_MAP_READ))
		{
			printf("GStreamer: unable to map buffer");
			gst_sample_unref (sample);
			return GST_FLOW_ERROR;
		}

		/* info.data is only valid while the buffer is mapped, so copy it out */
		size_t n = ptr->width * ptr->height * 3 / 2;
		memcpy(ptr->frame, info.data, info.size < n ? info.size : n);

		gst_buffer_unmap(buffer, &info);
		gst_sample_unref (sample);
	}

	return GST_FLOW_OK;
}

void CameraPipelineInit(CameraPipeline *p, int cameraIndex, int dstWidth, int dstHeight, int dstFPS)
{
p->camera_index = cameraIndex;
p->width = dstWidth;
p->height = dstHeight;
p->fps = dstFPS;
p->frame = (unsigned char *)malloc(dstWidth * dstHeight * 3 / 2); /* I420 frame size */
}
int cnt = 0;
void * CameraPipelineRun(void *p)
{

gst_init(NULL, NULL);

CameraPipeline *ptr = (CameraPipeline *)p;
ptr->pipeline = gst_pipeline_new("pipeline");
ptr->camera_source = gst_element_factory_make("nvcamerasrc", "video-source");
ptr->conversion_element = gst_element_factory_make("nvvidconv", "video-convert");
ptr->input_caps_filter = gst_element_factory_make("capsfilter", "input-caps-filter");
ptr->output_caps_filter = gst_element_factory_make("capsfilter", "output-caps-filter");
ptr->sink = gst_element_factory_make("nveglglessink", NULL);
	

ptr->input_caps =  gst_caps_from_string("video/x-raw(memory:NVMM), width=820, height=616, format=I420, framerate=(fraction)30/1"); 
						
							 
						
ptr->output_caps = gst_caps_new_simple("video/x-raw", 
							 "width", G_TYPE_INT, ptr->width,
							 "height", G_TYPE_INT, ptr->height,
							 "framerate", GST_TYPE_FRACTION, ptr->fps, 1,
							 "format", G_TYPE_STRING, "I420",
							 NULL);

g_object_set(G_OBJECT(ptr->camera_source), "sensor-id", ptr->camera_index, NULL);
g_object_set(G_OBJECT(ptr->input_caps_filter), "caps", ptr->input_caps, NULL); 
g_object_set(G_OBJECT(ptr->output_caps_filter), "caps", ptr->output_caps, NULL); 



gst_bin_add_many(GST_BIN(ptr->pipeline), ptr->camera_source, ptr->input_caps_filter, ptr->conversion_element, ptr->sink, NULL);
gst_element_link_many(ptr->camera_source, ptr->input_caps_filter, ptr->conversion_element,  ptr->sink, NULL);


//gst_app_sink_set_max_buffers (GST_APP_SINK(ptr->sink), 1);
//gst_app_sink_set_drop (GST_APP_SINK(ptr->sink), true);


//g_object_set(G_OBJECT(ptr->sink),"emit-signals",TRUE, NULL);
//g_object_set(G_OBJECT(ptr->sink),"enable-last-sample", FALSE, NULL);

//g_signal_connect (ptr->sink, "new-sample", G_CALLBACK (new_sample), (void *)ptr);

// gst_pipeline_set_clock((GstPipeline *)ptr->pipeline, NULL);

gst_element_set_state(ptr->pipeline, GST_STATE_PLAYING);


ptr->bus = gst_element_get_bus (ptr->pipeline);
ptr->msg = gst_bus_timed_pop_filtered (ptr->bus, GST_CLOCK_TIME_NONE,
		(GstMessageType)(GST_MESSAGE_ERROR | GST_MESSAGE_EOS));

/* Parse message */
if (ptr->msg != NULL) {
GError *err;
gchar *debug_info;

switch (GST_MESSAGE_TYPE (ptr->msg)) {
  case GST_MESSAGE_ERROR:
    gst_message_parse_error (ptr->msg, &err, &debug_info);
    g_printerr ("Error received from element %s: %s\n", GST_OBJECT_NAME (ptr->msg->src), err->message);
    g_printerr ("Debugging information: %s\n", debug_info ? debug_info : "none");
    g_clear_error (&err);
    g_free (debug_info);
    break;
  case GST_MESSAGE_EOS:
    g_print ("End-Of-Stream reached.\n");
    break;
  default:
    /* We should not reach here because we only asked for ERRORs and EOS */
    g_printerr ("Unexpected message received.\n");
    break;
}
gst_message_unref (ptr->msg);

}

gst_object_unref (ptr->bus);

gst_element_set_state (ptr->pipeline, GST_STATE_NULL);
gst_object_unref (ptr->pipeline);

return NULL;
}

int main()
{

int num_cameras = 6;
int width = 1920;
int height = 1080;
int fps = 30;



unsigned char **img;

img = (unsigned char **)malloc(num_cameras * sizeof(unsigned char *));


CameraPipeline **cameras;
pthread_t camera_pipeline_thread[num_cameras];

cameras = (CameraPipeline **)malloc(num_cameras * sizeof(CameraPipeline *));

for(int i = 0 ; i < num_cameras ; i++)
{
	cameras[i] = (CameraPipeline *)malloc(sizeof(CameraPipeline));
	CameraPipelineInit(cameras[i], i, width, height, fps);

	img[i] = (unsigned char *)malloc(width * height * 3 / 2);
}

pthread_attr_t attr_;
cpu_set_t cpus_;
pthread_attr_init(&attr_);

/* Pin each pipeline thread to a core. Note: the attribute must actually be
 * passed to pthread_create, otherwise the affinity setting has no effect. */
int core_ids[6] = {1, 2, 3, 1, 1, 1};

for(int i = 0 ; i < num_cameras ; i++)
{
	CPU_ZERO(&cpus_);
	CPU_SET(core_ids[i], &cpus_);
	pthread_attr_setaffinity_np(&attr_, sizeof(cpu_set_t), &cpus_);

	pthread_create(&camera_pipeline_thread[i], &attr_, CameraPipelineRun, (void *)cameras[i]);
}


    


int cnt = 0;
while(1)
{
	usleep(10000); /* sleep briefly instead of spinning a core */
	/*cnt++;
	for(int i = 0 ; i < num_cameras ; i++)
	{
			ostringstream str;
			str << i + 1;
			img[i] = cameras[i]->frame;
			
		
			Mat image(height * 1.5, width, CV_8UC1, img[i]);
			Mat imagedst(height, width, CV_8UC3);
			cvtColor(image, imagedst, CV_YUV2BGR_I420);
			imshow(str.str().c_str(), imagedst);
			waitKey(1);
			
	}*/
	
}
return 0;

}

Any help would be appreciated.

Hi,
Can you share the command to build the source?
Some OpenCV APIs are called in your code. Have you tried it without OpenCV?

Hi, in the above code the OpenCV part is commented out.

As I said, the gst-launch pipelines started in 6 separate terminals work fine. However, this code sometimes crashes with "Socket read error. Camera Daemon stopped functioning…", and sometimes works when I add a sleep before and after every thread launch.
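For reference, the workaround is just staggering the thread launches, roughly like this (the 0.5 s delay is an arbitrary value found by trial and error, not a real fix):

for(int i = 0 ; i < num_cameras ; i++)
{
	usleep(500000); /* arbitrary 0.5 s gap before each launch */
	pthread_create(&camera_pipeline_thread[i], &attr_, CameraPipelineRun, (void *)cameras[i]);
	usleep(500000); /* and after, so nvcamera-daemon handles one open at a time */
}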

Thanks

Hi,
gst_init() should be called only once but in your code it seems to be called six times.

Please also try launching the 6 pipelines from a single thread. Your implementation uses 6 threads, which may introduce a race condition.
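A minimal sketch of that suggestion, assuming the same pipeline string works for every sensor-id (error handling omitted; whether six nveglglessink windows from one process suit your setup is up to you):

#include <gst/gst.h>

int main(int argc, char *argv[])
{
    GstElement *pipelines[6];

    gst_init(&argc, &argv);  /* called exactly once, from the main thread */

    /* Build and start all six pipelines sequentially in this single thread */
    for (int i = 0; i < 6; i++) {
        gchar *desc = g_strdup_printf(
            "nvcamerasrc sensor-id=%d fpsRange=\"30 30\" ! "
            "video/x-raw(memory:NVMM), width=820, height=616, format=I420, framerate=30/1 ! "
            "nvvidconv ! nveglglessink", i);
        pipelines[i] = gst_parse_launch(desc, NULL);
        g_free(desc);
        gst_element_set_state(pipelines[i], GST_STATE_PLAYING);
    }

    /* A single main loop keeps the process alive for all pipelines */
    GMainLoop *loop = g_main_loop_new(NULL, FALSE);
    g_main_loop_run(loop);
    return 0;
}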

Hi, yes, regarding gst_init() I have written new code with it pulled out of the thread handler; it still crashes sometimes.

I will try launching the 6 pipelines in one thread.

Also, I would like to know if there are any samples available that I can use to capture six cameras simultaneously.

Please refer to the attachment.

$ g++ -Wall -std=c++11  multi.cpp -o multi $(pkg-config --cflags --libs gstreamer-1.0)

Also replace nvcamera-daemon with the one from r28.1:
https://devtalk.nvidia.com/default/topic/1023757/jetson-tx2/piecewiselinear-level-3-translation-fault-with-nvcamera-socket-read-error-and-daemon-stopped-functio-/post/5208774/#5208774
multi.cpp (1.81 KB)

Hi, thanks a lot for the help.
I have switched to the Argus API for multi-camera capture, which is eventually consumed by CUDA buffers for processing.
Currently I have modified the denoise sample, which works fine but shows a very large increase in latency (about 0.5 s, observed visually) when I increase the number of capture sessions. I would like to know if the implementation below is optimal.
Thank you. Below is the code.

/*
 * Copyright (c) 2016, NVIDIA CORPORATION. All rights reserved.
 *
 * Redistribution and use in source and binary forms, with or without
 * modification, are permitted provided that the following conditions
 * are met:
 *  * Redistributions of source code must retain the above copyright
 *    notice, this list of conditions and the following disclaimer.
 *  * Redistributions in binary form must reproduce the above copyright
 *    notice, this list of conditions and the following disclaimer in the
 *    documentation and/or other materials provided with the distribution.
 *  * Neither the name of NVIDIA CORPORATION nor the names of its
 *    contributors may be used to endorse or promote products derived
 *    from this software without specific prior written permission.
 *
 * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS ``AS IS'' AND ANY
 * EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT LIMITED TO, THE
 * IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR A PARTICULAR
 * PURPOSE ARE DISCLAIMED.  IN NO EVENT SHALL THE COPYRIGHT OWNER OR
 * CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL, SPECIAL,
 * EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT LIMITED TO,
 * PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE, DATA, OR
 * PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY THEORY
 * OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
 * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
 * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
 */

#include "Error.h"
#include "EGLGlobal.h"
#include "GLContext.h"
#include "Window.h"
#include "Thread.h"
#include "PreviewConsumer.h"

#include <Argus/Argus.h>
#include <EGLStream/EGLStream.h>

#include <unistd.h>
#include <stdlib.h>

using namespace Argus;

/*
 * This sample (modified from the denoise sample) outputs capture requests to
 * NUM_SESSIONS streams, one per camera, and renders them to a split-screen window.
 */

namespace ArgusSamples
{
	

#define NUM_SESSIONS 6

// Constants.
static const uint32_t       CAPTURE_TIME    = 10; // In seconds.
static const Size           STREAM_SIZE      (1920, 1080);
static const NormalizedRect SOURCE_CLIP_RECT (0.4f, 0.4f, 0.6f, 0.6f);

// Globals.
EGLDisplayHolder g_display;

// Debug print macros.
#define PRODUCER_PRINT(...) printf("PRODUCER: " __VA_ARGS__)

static bool execute()
{
    // Initialize the window and EGL display.
    Window &window = Window::getInstance();
    window.setWindowRect(0, 0, STREAM_SIZE.width, STREAM_SIZE.height);
    PROPAGATE_ERROR(g_display.initialize(window.getEGLNativeDisplay()));

    // Initialize the Argus camera provider.
    UniqueObj<CameraProvider> cameraProvider(CameraProvider::create());
    ICameraProvider *iCameraProvider = interface_cast<ICameraProvider>(cameraProvider);
    if (!iCameraProvider)
        ORIGINATE_ERROR("Failed to get ICameraProvider interface");

    // Create a capture session using the first available device.
    std::vector<CameraDevice*> cameraDevices;
    if (iCameraProvider->getCameraDevices(&cameraDevices) != STATUS_OK)
        ORIGINATE_ERROR("Failed to get CameraDevices");
    if (cameraDevices.size() < NUM_SESSIONS)
        ORIGINATE_ERROR("Not enough CameraDevices available");

	
	
	UniqueObj<CaptureSession> captureSession[NUM_SESSIONS];
	ICaptureSession *iCaptureSession[NUM_SESSIONS];

    for (int i = 0; i < NUM_SESSIONS; i++)
    {
		captureSession[i] = UniqueObj<CaptureSession>(iCameraProvider->createCaptureSession(cameraDevices[i]));
		iCaptureSession[i] = interface_cast<ICaptureSession>(captureSession[i]);
		if (!iCaptureSession[i])
			ORIGINATE_ERROR("Failed to create CaptureSession ");
	}

    // Create one output stream per capture session.
    PRODUCER_PRINT("Creating %d output streams\n", NUM_SESSIONS);
    
    UniqueObj<OutputStreamSettings> streamSettings[NUM_SESSIONS];
    IOutputStreamSettings *iStreamSettings[NUM_SESSIONS];
    
    for(int i = 0 ; i < NUM_SESSIONS ; i++)
    {
		streamSettings[i] = UniqueObj<OutputStreamSettings>(iCaptureSession[i]->createOutputStreamSettings());
        iStreamSettings[i] = interface_cast<IOutputStreamSettings>(streamSettings[i]);
		if (!iStreamSettings[i])
			ORIGINATE_ERROR("Failed to create OutputStreamSettings");
   
		iStreamSettings[i]->setPixelFormat(PIXEL_FMT_YCbCr_420_888);
		iStreamSettings[i]->setResolution(STREAM_SIZE);
		iStreamSettings[i]->setEGLDisplay(g_display.get());
	}

    UniqueObj<OutputStream> previewStream[NUM_SESSIONS];
    IStream *iPreviewStream[NUM_SESSIONS];
    for(int i = 0 ; i < NUM_SESSIONS ; i++)
    {
		
		previewStream[i] = UniqueObj<OutputStream>(iCaptureSession[i]->createOutputStream(streamSettings[i].get()));
        iPreviewStream[i] = interface_cast<IStream>(previewStream[i]);
		if (!iPreviewStream[i])
			ORIGINATE_ERROR("Failed to create preview stream");

	}

    // Connect a PreviewConsumer to the streams to render a split-screen, side-by-side view.
    
    PRODUCER_PRINT("Launching consumer thread\n");
    std::vector<EGLStreamKHR> eglStreams;
    
    for(int i = 0 ; i < NUM_SESSIONS ; i++)
    {
		eglStreams.push_back(iPreviewStream[i]->getEGLStream());
	}

    PreviewConsumerThread consumerThread(g_display.get(), eglStreams,
                                         PreviewConsumerThread::LAYOUT_SPLIT_VERTICAL,
                                         false /* Sync stream frames */);
                                         


    PROPAGATE_ERROR(consumerThread.initialize());

    //consumerThread.setLineWidth(1);
    //consumerThread.setLineColor(1.0f, 0.0f, 0.0f);

    // Wait until the consumer is connected to the streams.
    PROPAGATE_ERROR(consumerThread.waitRunning());
 
    // Create capture requests and enable output streams.
    UniqueObj<Request> request[NUM_SESSIONS];
    IRequest *iRequest[NUM_SESSIONS];

    for (int i = 0; i < NUM_SESSIONS; i++)
    {
		request[i] = UniqueObj<Request>(iCaptureSession[i]->createRequest());
		iRequest[i] = interface_cast<IRequest>(request[i]);
		if (!iRequest[i])
			ORIGINATE_ERROR("Failed to create Request");
			
		iRequest[i]->enableOutputStream(previewStream[i].get());
			
	}

    // Submit capture requests.
    PRODUCER_PRINT("Starting repeat capture requests.\n");
    
    for (int i = 0; i < NUM_SESSIONS; i++)
    {
        if (iCaptureSession[i]->repeat(request[i].get()) != STATUS_OK)
            ORIGINATE_ERROR("Failed to start repeat capture request");
    }
    // Wait for CAPTURE_TIME seconds.
    PROPAGATE_ERROR(window.pollingSleep(CAPTURE_TIME));

    // Stop the repeating requests and wait for idle.
    for (int i = 0; i < NUM_SESSIONS; i++)
    {
        iCaptureSession[i]->stopRepeat();
        iCaptureSession[i]->waitForIdle();

        // Destroy the output stream so the consumer thread can complete.
        previewStream[i].reset();
    }

    PROPAGATE_ERROR(consumerThread.shutdown());

    // Shut down Argus.
    cameraProvider.reset();

    // Shut down the window (destroys window's EGLSurface).
    window.shutdown();

    // Cleanup the EGL display
    PROPAGATE_ERROR(g_display.cleanup());

    PRODUCER_PRINT("Done -- exiting.\n");
    return true;

}

}; // namespace ArgusSamples

int main(int argc, const char *argv[])
{
    if (!ArgusSamples::execute())
        return EXIT_FAILURE;

    return EXIT_SUCCESS;
}

Argus has shorter latency than GStreamer; see our test results at
[url]https://devtalk.nvidia.com/default/topic/1026587/jetson-tx2/csi-latency-is-over-80-milliseconds-/post/5222727/#5222727[/url]

For running multiple cameras, you have to run 'sudo ~/jetson_clocks.sh' to get maximum performance.
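For example (assuming your L4T release's jetson_clocks.sh supports the --show option):

$ sudo ~/jetson_clocks.sh --show   # print the current clock configuration
$ sudo ~/jetson_clocks.sh          # lock CPU/GPU/EMC clocks to maximum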