Device: Jetson Orin NX 16GB
JetPack: 5.1.1 and 5.1.2
hello everyone
I use NVDEC for decoding on the Jetson platform and found that, with a very small probability, the decoder blocks. Is the decoder thread-safe? Does it support multiple threads creating and deleting decoders simultaneously?
The issue occurs in the decode_proc thread at the pthread_join on dec_tid, while it waits for dec_capture_loop_fcn to exit. If while (dec->capture_plane.getNumQueuedBuffers() > 0)
is changed to while (1),
the issue reproduces more often.
Most importantly: calling dqBuffer on capture_plane
when no buffer is available blocks. Is this expected?
I already know that JetPack 5.1.1 has a memory leak in the NvJPEG encoder, so I would like to know whether calling capture_plane.dqBuffer
when no buffer is available can cause a deadlock, and whether this is the expected behavior.
My workload involves 10-12 threads creating or releasing decoders simultaneously.
/* Dequeue a valid capture_plane buffer that contains YUV BL data */
if (dec->capture_plane.dqBuffer(v4l2_buf, &dec_buffer, NULL, 0))
|
| // jump into NvV4l2ElementPlane::dqBuffer
|-> ret = v4l2_ioctl(fd, VIDIOC_DQBUF, &v4l2_buf); // will deadlock
decode_proc
/**
 * Decoder feeding thread.
 *
 * Creates and configures the decoder, starts the capture-plane thread
 * (dec_capture_loop_fcn), then keeps the decoder's output plane filled with
 * data from read_decoder_input_nalu() until an empty buffer (EOS) has been
 * queued, m_abort is set, or the decoder reports an error. Finally it drains
 * the output plane, joins the capture thread and releases the decoder and the
 * destination DMA buffer.
 *
 * @param arg  Pointer to the owning JetsonDec instance.
 * @return     Always NULL.
 */
void *JetsonDec::decode_proc(void *arg)
{
    JetsonDec *self = (JetsonDec *)arg;
    context_t &ctx = self->ctx;
    int ret = 0;
    /* Only written in the code visible here; presumably also referenced by
       the TEST_ERROR* macros, so the name is kept — confirm. */
    int error = 0;
    uint32_t i;
    /* True only once pthread_create() succeeded in THIS invocation, so that
       cleanup never joins a stale or never-created thread handle. Declared up
       front because no initialisation may sit between a goto and `cleanup`. */
    bool capture_started = false;

    /* Set default values for decoder context members */
    set_defaults(&ctx);
#if 0
    ctx.disable_rendering=true;
#endif
    ctx.decoder_pixfmt = self->dec_pixfmt;
    ctx.out_pixfmt = 1; // NV12
    ctx.input_nalu = true;
#if 0
    if (ctx.enable_osd || ctx.enable_osd_text)
        ctx.nvosd_context = nvosd_create_context();
    if (ctx.enable_osd) {
        cout << "ctx.osd_file_path:" << ctx.osd_file_path << endl;
        ctx.osd_file = new ifstream(ctx.osd_file_path);
        TEST_ERROR(!ctx.osd_file->is_open(), "Error opening osd file", cleanup);
    }
#endif

    /* Create and initialize video decoder
       more about decoder, refer to 00_video_decode sample */
    ctx.dec = NvVideoDecoder::createVideoDecoder("dec0");
    TEST_ERROR_SPDLOG(!ctx.dec, "ctx error, Could not create decoder", cleanup);

    /* Subscribe to Resolution change event */
    ret = ctx.dec->subscribeEvent(V4L2_EVENT_RESOLUTION_CHANGE, 0, 0);
    TEST_ERROR_SPDLOG(ret < 0, "ctx error, Could not subscribe to V4L2_EVENT_RESOLUTION_CHANGE",
                      cleanup);

    /* Subscribe to the EOS event; reported through the same logging macro as
       every other setup step. */
    ret = ctx.dec->subscribeEvent(V4L2_EVENT_EOS, 0, 0);
    TEST_ERROR_SPDLOG(ret < 0, "Could not subscribe to V4L2_EVENT_EOS", cleanup);

    /* Set the max size of the outputPlane buffers, here is
       CHUNK_SIZE, which contains the encoded data in bytes */
    ret = ctx.dec->setOutputPlaneFormat(ctx.decoder_pixfmt, CHUNK_SIZE);
    TEST_ERROR_SPDLOG(ret < 0, "ctx error, Could not set output plane format", cleanup);

    ret = ctx.dec->setFrameInputMode(0);
    TEST_ERROR_SPDLOG(ret < 0, "ctx error, Error in decoder setFrameInputMode", cleanup);

    ret = ctx.dec->disableDPB();
    TEST_ERROR_SPDLOG(ret < 0, "ctx error, Error in decoder disableDPB", cleanup);

    ret = ctx.dec->setMaxPerfMode(1);
    TEST_ERROR_SPDLOG(ret < 0, "ctx error, Error while setting decoder to max perf", cleanup);

    /* Request MMAP buffers for writing encoded video data */
    ret = ctx.dec->output_plane.setupPlane(V4L2_MEMORY_MMAP, 10, true, false);
    TEST_ERROR_SPDLOG(ret < 0, "ctx error, Error while setting up output plane", cleanup);

    /* Start streaming on decoder output_plane */
    ret = ctx.dec->output_plane.setStreamStatus(true);
    TEST_ERROR_SPDLOG(ret < 0, "ctx error, Error in output plane stream on", cleanup);

    /* Start the capture-plane thread. pthread_create() leaves the handle
       undefined on failure, so the result must be checked before the handle
       is named or joined. */
    ret = pthread_create(&self->dec_tid, NULL, self->dec_capture_loop_fcn, self);
    if (ret != 0)
    {
        self->dec_tid = 0;
        self->LOGGER_ERROR("ctx error, Could not create capture thread {0}", ret);
        goto cleanup;
    }
    capture_started = true;
    pthread_setname_np(self->dec_tid, "CapturePlane");

    /* Read encoded data and enqueue all the output plane buffers.
       Exit loop in case end of file */
    i = 0;
    while (!self->m_abort && !ctx.dec->isInError() &&
           i < ctx.dec->output_plane.getNumBuffers())
    {
        struct v4l2_buffer v4l2_buf;
        struct v4l2_plane planes[MAX_PLANES];
        NvBuffer *buffer;
        /* Zero-initialised so a reader that does not fill it in cannot leak
           stack garbage into the buffer timestamp. */
        struct timeval time_now = {};

        memset(&v4l2_buf, 0, sizeof(v4l2_buf));
        memset(planes, 0, sizeof(planes));

        buffer = ctx.dec->output_plane.getNthBuffer(i);
        self->read_decoder_input_nalu(buffer, NULL, CHUNK_SIZE, time_now);

        v4l2_buf.index = i;
        v4l2_buf.m.planes = planes;
        v4l2_buf.m.planes[0].bytesused = buffer->planes[0].bytesused;
        /* It is necessary to queue an empty buffer to signal EOS to the decoder
           i.e. set v4l2_buf.m.planes[0].bytesused = 0 and queue the buffer */
        v4l2_buf.timestamp = time_now;

        ret = ctx.dec->output_plane.qBuffer(v4l2_buf, NULL);
        if (ret < 0)
        {
            self->LOGGER_ERROR("ctx error , Error Qing buffer at output plane");
            break;
        }
        if (v4l2_buf.m.planes[0].bytesused == 0)
        {
            /* Empty buffer queued: input is finished. */
            self->m_abort_cap = true;
            self->LOGGER_INFO("Input file read complete");
            break;
        }
        i++;
    }

    /* Since all the output plane buffers have been queued in above loop,
       in this loop, firstly dequeue a empty buffer, then read encoded data
       into this buffer, enqueue it back for decoding at last */
    while (!self->m_abort_cap && !self->m_abort && !ctx.dec->isInError())
    {
        struct v4l2_buffer v4l2_buf;
        struct v4l2_plane planes[MAX_PLANES];
        NvBuffer *buffer;
        struct timeval time_now = {};

        memset(&v4l2_buf, 0, sizeof(v4l2_buf));
        memset(planes, 0, sizeof(planes));
        v4l2_buf.m.planes = planes;

        ret = ctx.dec->output_plane.dqBuffer(v4l2_buf, &buffer, NULL, -1);
        if (ret < 0)
        {
            cerr << "Error DQing buffer at output plane" << endl;
            self->LOGGER_ERROR("ctx error, Error DQing buffer at output plane");
            break;
        }

        self->read_decoder_input_nalu(buffer, NULL, CHUNK_SIZE, time_now);
        v4l2_buf.m.planes[0].bytesused = buffer->planes[0].bytesused;
        v4l2_buf.timestamp = time_now;

        ret = ctx.dec->output_plane.qBuffer(v4l2_buf, NULL);
        if (ret < 0)
        {
            self->LOGGER_ERROR("ctx error, Error Qing buffer at output plane");
            break;
        }
        if (v4l2_buf.m.planes[0].bytesused == 0)
        {
            /* Empty buffer queued: input is finished. */
            self->m_abort_cap = true;
            self->LOGGER_INFO("Input file read complete");
            break;
        }
    }

    /* As EOS, dequeue all the output planes */
    while (ctx.dec->output_plane.getNumQueuedBuffers() > 0 &&
           !ctx.dec->isInError())
    {
        struct v4l2_buffer v4l2_buf;
        struct v4l2_plane planes[MAX_PLANES];

        memset(&v4l2_buf, 0, sizeof(v4l2_buf));
        memset(planes, 0, sizeof(planes));
        v4l2_buf.m.planes = planes;

        ret = ctx.dec->output_plane.dqBuffer(v4l2_buf, NULL, NULL, -1);
        if (ret < 0)
        {
            self->LOGGER_ERROR("ctx error, Error DQing buffer at output plane");
            break;
        }
    }

cleanup:
    if (capture_started)
    {
        /* The feed loops can also end because m_abort was set or a queue
           operation failed; in those cases nothing has asked the capture
           thread to stop yet. Request it on every path before joining,
           otherwise the join below can wait forever.
           NOTE(review): this flag is only observed between capture-plane
           dequeues. If the capture thread is already blocked inside a
           capture-plane dqBuffer() and no EOS buffer was queued, something
           must still wake it (e.g. queueing an empty EOS buffer or aborting
           the decoder) — confirm against the Multimedia API docs. */
        self->m_abort_cap = true;

        self->LOGGER_INFO("wait dec_capture_loop");
        pthread_join(self->dec_tid, NULL);
        self->dec_tid = 0; /* handle is invalid once joined */
        self->LOGGER_INFO("dec_capture_loop end");
    }

    /* The decoder destructor does all the cleanup i.e set streamoff on output
       and capture planes, unmap buffers, tell decoder to deallocate buffer
       (reqbufs ioctl with count = 0), and finally call v4l2_close on the fd */
    delete ctx.dec;
    ctx.dec = nullptr; /* do not leave a dangling pointer in the shared context */

    if (ctx.dst_dma_fd != -1)
    {
        ret = NvBufSurf::NvDestroy(ctx.dst_dma_fd);
        ctx.dst_dma_fd = -1;
        if (ret < 0)
        {
            self->LOGGER_ERROR("Error in BufferDestroy");
            error = 1;
        }
    }

    self->LOGGER_INFO("Exiting decode_proc thread");
    return NULL;
}
dec_capture_loop_fcn
/**
 * Capture-plane thread.
 *
 * Waits for the first V4L2_EVENT_RESOLUTION_CHANGE, configures the capture
 * plane through query_and_set_capture(), then repeatedly dequeues decoded
 * frames, transforms each one into ctx->dst_dma_fd and hands the buffer back
 * to the decoder. The loop ends when the decoder reports an error or
 * m_abort_cap is set (by decode_proc or by a V4L2_EVENT_EOS).
 *
 * @param arg  Pointer to the owning JetsonDec instance.
 * @return     Always NULL.
 */
void *JetsonDec::dec_capture_loop_fcn(void *arg)
{
    JetsonDec *self = (JetsonDec *)arg;

    /* Hold off until the owner signals readiness or asks us to stop.
       NOTE(review): proc_ready is not assigned anywhere in the code visible
       here — confirm where it is set. */
    while (!self->m_abort_cap && !self->proc_ready)
    {
        usleep(1000);
    }

    context_t *ctx = &self->ctx;
    NvVideoDecoder *dec = ctx->dec;
    struct v4l2_event ev;
    int ret;

    /* ev.type is inspected in the do/while condition even when dqEvent()
       timed out and wrote nothing, so it must start from a defined value. */
    memset(&ev, 0, sizeof(ev));

    cout << "Starting decoder capture loop thread" << endl;

    /* dec is read once above and never reassigned, so one check suffices. */
    if (dec == nullptr)
    {
        printf("error dec is null\n");
        return NULL;
    }

    /* Wait for the first Resolution change event as decoder needs
       to know the stream resolution for allocating appropriate
       buffers when calling REQBUFS */
    do
    {
        /* VIDIOC_DQEVENT, max_wait_ms = 1000ms */
        ret = dec->dqEvent(ev, 1000);
        if (ret < 0)
        {
            if (errno == EAGAIN)
            {
                cerr << "Timed out waiting for first V4L2_EVENT_RESOLUTION_CHANGE"
                     << endl;
                continue;
            }
            self->LOGGER_ERROR("Error in dequeueing decoder event {0}", errno);
            abort(ctx);
            break;
        }
    } while (!self->m_abort_cap && ev.type != V4L2_EVENT_RESOLUTION_CHANGE);

    /* Received the resolution change event, now can do query_and_set_capture.
       Skipped when the decoder is already in error: the main loop below will
       not run in that case, so configuring the capture plane (and the 3 s
       retry) would only delay shutdown. */
    if (!self->m_abort_cap && !dec->isInError())
    {
        if (!query_and_set_capture(ctx, self))
        {
            self->LOGGER_ERROR("query_and_set_capture failed,retry again 3 s later");
            std::this_thread::sleep_for(std::chrono::seconds(3));
            if (!query_and_set_capture(ctx, self))
            {
                self->LOGGER_ERROR("query_and_set_capture failed,abort");
                abort(ctx);
            }
        }
    }

    /* Exit on error or EOS which is signalled in main() */
    while (!(dec->isInError() || self->m_abort_cap))
    {
        NvBuffer *dec_buffer;

        /* Check for resolution change again (max_wait_ms = 0: do not wait) */
        ret = dec->dqEvent(ev, 0);
        if (ret == 0)
        {
            switch (ev.type)
            {
            case V4L2_EVENT_RESOLUTION_CHANGE:
                self->LOGGER_INFO("Got V4L2_EVENT_RESOLUTION_CHANGE\n");
                if (!query_and_set_capture(ctx, self))
                {
                    self->LOGGER_ERROR("query_and_set_capture failed,retry again 3 s later");
                    std::this_thread::sleep_for(std::chrono::seconds(3));
                    if (!query_and_set_capture(ctx, self))
                    {
                        self->LOGGER_ERROR("query_and_set_capture failed,abort");
                        abort(ctx);
                    }
                }
                continue;
            case V4L2_EVENT_EOS:
                self->LOGGER_INFO("Got V4L2_EVENT_EOS\n");
                self->m_abort_cap = true;
                /* Re-evaluate the loop condition and leave. A plain `break`
                   here only leaves the switch and falls into one more
                   capture-plane dqBuffer(); per the report above that call
                   can block when no buffer is available, which would stall
                   the pthread_join() in decode_proc. */
                continue;
            }
        }

        /* Decoder capture loop */
        while (dec->capture_plane.getNumQueuedBuffers() > 0)
        {
            struct v4l2_buffer v4l2_buf;
            struct v4l2_plane planes[MAX_PLANES];

            memset(&v4l2_buf, 0, sizeof(v4l2_buf));
            memset(planes, 0, sizeof(planes));
            v4l2_buf.m.planes = planes;

            /* Dequeue a valid capture_plane buffer that contains YUV BL data.
               NOTE(review): the report above observes this call blocking when
               no buffer is available; m_abort_cap is not consulted while it
               is pending. */
            if (dec->capture_plane.dqBuffer(v4l2_buf, &dec_buffer, NULL, 0))
            {
                if (errno == EAGAIN)
                {
                    printf("EAGAIN\n");
                    usleep(1000); // 1ms
                }
                else
                {
                    self->LOGGER_ERROR("Error while calling dequeue at capture plane {0}", errno);
                    abort(ctx);
                }
                break;
            }

            /* Clip & Stitch can be done by adjusting rectangle. */
            NvBufSurf::NvCommonTransformParams transform_params;
            transform_params.src_top = 0;
            transform_params.src_left = 0;
            transform_params.src_width = ctx->dec_width;
            transform_params.src_height = ctx->dec_height;
            transform_params.dst_top = 0;
            transform_params.dst_left = 0;
            transform_params.dst_width = ctx->dec_width;
            transform_params.dst_height = ctx->dec_height;
            transform_params.flag = NVBUFSURF_TRANSFORM_FILTER;
            transform_params.flip = NvBufSurfTransform_None;
            transform_params.filter = NvBufSurfTransformInter_Nearest;

            /* Perform Blocklinear to PitchLinear conversion. */
            // nv12 to rgba
            ret = NvBufSurf::NvTransform(&transform_params, dec_buffer->planes[0].fd,
                                         ctx->dst_dma_fd);
            const bool transform_failed = (ret == -1);
            if (transform_failed)
            {
                self->LOGGER_ERROR("Transform failed");
            }
            else
            {
                /*code */
            }

        return_buffer:
            /* Always hand the dequeued buffer back, even when the transform
               failed; otherwise every failure permanently removes one buffer
               from the capture queue. */
            if (dec->capture_plane.qBuffer(v4l2_buf, NULL) < 0)
            {
                self->LOGGER_ERROR("Error while queueing buffer at decoder capture plane");
                break;
            }
            if (transform_failed)
            {
                break;
            }
        }

        if (dec->capture_plane.getNumQueuedBuffers() == 0)
        {
            printf("no buffer\n");
            /* Nothing is queued on the capture plane: back off briefly
               instead of spinning through the outer loop at full speed. */
            usleep(1000);
        }
    }

    cout << "Exiting decoder capture loop thread" << endl;
    /* Regular shutdown, not a failure: log at info level. */
    self->LOGGER_INFO("Exiting decoder capture loop thread");
    return NULL;
}