Conflict when encoding full frame from pipeline to jpeg and send to Kafka server

I am developing an application using the following setup.

• Hardware Platform (Jetson / GPU): Quadro RTX 4000
• DeepStream Version: 6.0.1
• TensorRT Version: 8.4.1.5
• NVIDIA GPU Driver Version (valid for GPU only): 510.47.03

My application requires reading data from a rtsp source and sending full images to Kafka Server. Initially, I used the following pipeline: rtspsrc location=<src> ! rtph265depay ! h265parse ! nvv4l2decoder ! nvstreammux ! nvinfer ! nvmsgconv ! nvmsgbroker

I attached a probe function to pgie src in which, OpenCV function cv::imencode() was used to encode raw frame data to jpeg format before packing it into a message to send to Kafka Server via an NVDS_EVENT_MSG_META event. My probe function is shown below

static GstPadProbeReturn pgie_src_pad_buffer_probe(GstPad *pad, GstPadProbeInfo *info, gpointer _udata)
{
GstBuffer *buf = reinterpret_cast<GstBuffer *>(info->data);
GST_ASSERT(buf);
if (!buf)
{
return GST_PAD_PROBE_OK;
}
GstMapInfo in_map_info;
NvDsBatchMeta *batch_meta = gst_buffer_get_nvds_batch_meta(buf);
user_callback_data *callback_data = reinterpret_cast<user_callback_data *>(_udata);

for (NvDsMetaList *l_frame = batch_meta->frame_meta_list; l_frame != NULL; l_frame = l_frame->next)
{
    NvDsFrameMeta *frame_meta = reinterpret_cast<NvDsFrameMeta *>(l_frame->data);
    const auto p1 = std::chrono::system_clock::now();
    cv::Mat frame = cv::Mat(frame_meta->source_frame_height, frame_meta->source_frame_width, CV_8UC3, in_map_info.data);
    cv::resize(frame, frame, cv::Size(1280, 720));
    std::vector<int> encode_param;
    std::vector<uchar> encoded_buf;
    encode_param.push_back(cv::IMWRITE_JPEG_QUALITY);
    encode_param.push_back(80);
    cv::imencode(".jpg", frame, encoded_buf, encode_param); // This line causes problem
    // Problem remains even when commenting the following section meaning no NVDS_EVENT_MSG_META is created
    FaceVisualMsg *msg_meta_content = (FaceVisualMsg *)g_malloc0(sizeof(FaceVisualMsg));
    msg_meta_content->timestamp = std::chrono::duration_cast<std::chrono::milliseconds>(p1.time_since_epoch()).count();
    msg_meta_content->full_img = g_strdup(b64encode((uint8_t *)in_map_info.data, in_map_info.size));
    NvDsEventMsgMeta *visual_event_msg = (NvDsEventMsgMeta *)g_malloc0(sizeof(NvDsEventMsgMeta));
    visual_event_msg->extMsg = (void *)msg_meta_content;
    visual_event_msg->extMsgSize = sizeof(FaceVisualMsg);
    visual_event_msg->componentId = 2;
    // Pack EventMsgMeta into UserMeta
    NvDsUserMeta *user_event_visual = nvds_acquire_user_meta_from_pool(batch_meta);
    if (user_event_visual)
    {
        user_event_visual->user_meta_data = (void *)visual_event_msg;
        user_event_visual->base_meta.meta_type = NVDS_EVENT_MSG_META;
        user_event_visual->base_meta.copy_func = (NvDsMetaCopyFunc)face_msg_visual_copy_func;
        user_event_visual->base_meta.release_func = (NvDsMetaReleaseFunc)face_msg_visual_release_func;

        nvds_add_user_meta_to_frame(frame_meta, user_event_visual);
    }
}

return GST_PAD_PROBE_OK;

}

When executing the pipeline I got the following error

#0 0x00007ffff1c49e87 in __GI_raise (sig=sig@entry=6) at …/sysdeps/unix/sysv/linux/raise.c:51
#1 0x00007ffff1c4b7f1 in __GI_abort () at abort.c:79
#2 0x00007ffff1c94837 in __libc_message (action=action@entry=do_abort, fmt=fmt@entry=0x7ffff1dc1a7b “%s\n”) at …/sysdeps/posix/libc_fatal.c:181
#3 0x00007ffff1c9b8ba in malloc_printerr (str=str@entry=0x7ffff1dbfc76 “free(): invalid pointer”) at malloc.c:5342
#4 0x00007ffff1ca2dec in _int_free (have_lock=0, p=0x7ffed0008412, av=0x7ffff1ff6c40 <main_arena>) at malloc.c:4167
#5 0x00007ffff1ca2dec in __GI___libc_free (mem=0x7ffed0008422) at malloc.c:3134
#6 0x00007fff4c296f91 in json_delete () at /usr/lib/x86_64-linux-gnu/libjansson.so.4
#7 0x00007fff4c76d848 in json_get_key_value(char const*, int, char const*, char*, int) () at src/kafka_protocol_adaptor/libnvds_kafka_proto.so
#8 0x00007fff4c76e915 in nvds_msgapi_send_async () at src/kafka_protocol_adaptor/libnvds_kafka_proto.so
#9 0x00007fffc58b450f in legacy_gst_nvmsgbroker_render(GstBaseSink*, GstBuffer*) (sink=0x55555683fc60, buf=0x7ffec0244480) at gstnvmsgbroker.cpp:516
#10 0x00007fffc58b427d in gst_nvmsgbroker_render(GstBaseSink*, GstBuffer*) (sink=0x55555683fc60, buf=0x7ffec0244480) at gstnvmsgbroker.cpp:463
#11 0x00007fffc7dbd1ce in gst_base_sink_chain_unlocked (basesink=basesink@entry=0x55555683fc60, obj=obj@entry=0x7ffec0244480, is_list=is_list@entry=0, pad=)
at …/libs/gst/base/gstbasesink.c:3952
#12 0x00007fffc7dbe520 in gst_base_sink_chain_main (basesink=0x55555683fc60, pad=, obj=0x7ffec0244480, is_list=0) at …/libs/gst/base/gstbasesink.c:4078
#13 0x00007ffff62d009b in gst_pad_chain_data_unchecked (data=0x7ffec0244480, type=4112, pad=0x55555682ef30) at …/gst/gstpad.c:4447
#14 0x00007ffff62d009b in gst_pad_push_data (pad=pad@entry=0x55555682ece0, type=type@entry=4112, data=data@entry=0x7ffec0244480) at …/gst/gstpad.c:4711
#15 0x00007ffff62d8b03 in gst_pad_push (pad=0x55555682ece0, buffer=0x7ffec0244480) at …/gst/gstpad.c:4830
#16 0x00007fffc7dc91af in gst_base_transform_chain (pad=, parent=0x555555d2ecb0, buffer=) at …/libs/gst/base/gstbasetransform.c:2377
#17 0x00007ffff62d009b in gst_pad_chain_data_unchecked (data=0x7ffec0244480, type=4112, pad=0x55555682ea90) at …/gst/gstpad.c:4447
#18 0x00007ffff62d009b in gst_pad_push_data (pad=pad@entry=0x55555682f3d0, type=type@entry=4112, data=data@entry=0x7ffec0244480) at …/gst/gstpad.c:4711
#19 0x00007ffff62d8b03 in gst_pad_push (pad=0x55555682f3d0, buffer=buffer@entry=0x7ffec0244480) at …/gst/gstpad.c:4830
#20 0x00007fffcc14f509 in gst_queue_push_one (queue=0x555555d23550) at …/subprojects/gstreamer/plugins/elements/gstqueue.c:1388
#21 0x00007fffcc14f509 in gst_queue_loop (pad=) at …/subprojects/gstreamer/plugins/elements/gstqueue.c:1541
#22 0x00007ffff63076b9 in gst_task_func (task=0x555555d5f4d0) at …/gst/gsttask.c:384
#23 0x00007ffff6606c70 in () at /usr/lib/x86_64-linux-gnu/libglib-2.0.so.0
#24 0x00007ffff66062a5 in () at /usr/lib/x86_64-linux-gnu/libglib-2.0.so.0
#25 0x00007ffff20036db in start_thread (arg=0x7ffef1ffb700) at pthread_create.c:463
#26 0x00007ffff1d2c61f in clone () at …/sysdeps/unix/sysv/linux/x86_64/clone.S:95

It is an free(): invalid pointer error found in libnvds_kafka_proto.so. If I comment out the line performing cv::imencode() and pack dummy message, it works fine. Besides, If I remove the nvmsgconv and nvmsgbroker elements and do cv::imwrite() in the probe function to save images to disk, it still works. Basically, cv::imwrite() consisting of 2 phases which are image encoding and file writing. Even when I comment out the section that creates the NVDS_EVENT_MSG_META event while keeping the encoding function, the prolem still holds. Therefore, it seems like there is an underlying conflict between libjpeg used by cv::imencode() and the nvmsgbroker module of deepstream. I have also used other jpeg encode library like nvJPEG but still not working. I tried a workaround by using jpegenc element of gstreamer and place it before the nvmsgconv element but those 2 elements are not able to link properly. Is there anyone familiar with these issues can help me ?

Do you still have this issue if writing to other format (such as .png)?

I tried to encode to .png but the problem stills arise. Besides, I want to encode to jpeg format to minimize bandwidth sent to Kafka server

can you test deepstream native sample deepstream-test5 by adding cv::imencode() in probe function?

I found a workaround to this problem by removing nvmsgconv and nvmsgbroker elements from deepstream pipeline. Instead, I generate json message and use librdkafka to send message to Kafka Server. Basically, I rewrite nvmsgconv and nvmsgbroker but instead of encapsulating them into gstreamer elements, I do it all in the pgie probe function

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.