I want to pull images from an RTSP address, process them, and then push them through the appsrc plugin to udpsink. However, the latency is 17 seconds

I want to pull a video stream from an RTSP address, use the appsink plugin to obtain each buffer, and convert it into a cv::Mat. I save the cv::Mat in a thread-safe queue A whose size is always kept at 2 or fewer, which ensures that each time I retrieve an image from this queue I get the latest frame. Another thread constantly pulls cv::Mat objects from queue A, processes each one in approximately 50 ms, and places the processed frame into another queue B. I then convert the images in queue B into buffers, push them to the appsrc plugin, and finally transmit the encoded images to udpsink. The latency of my stream pulling is around 480 ms, and the image processing takes about 50 ms. The latency of the push side has not been measured directly. However, when I pull, with the udpsrc plugin, the UDP stream I am pushing, I observe a delay of about 17 seconds and a low frame rate.
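CThreadSafeQueue is not shown below; a minimal sketch of the kind of thread-safe queue I mean might look like this (the class and method names mirror main.cpp, but this implementation is illustrative only; the size cap itself is enforced by the producer thread, as in ThreadFunc below):

#include <condition_variable>
#include <mutex>
#include <queue>

// Illustrative thread-safe queue: safe_push/safe_pop guarded by a mutex,
// with safe_pop blocking until an element is available.
template <typename T>
class CThreadSafeQueue {
public:
    void safe_push(const T &item) {
        std::lock_guard<std::mutex> lock(mutex_);
        queue_.push(item);
        cond_.notify_one();
    }

    T safe_pop() {
        std::unique_lock<std::mutex> lock(mutex_);
        cond_.wait(lock, [this] { return !queue_.empty(); });
        T item = queue_.front();
        queue_.pop();
        return item;
    }

    size_t size() {
        std::lock_guard<std::mutex> lock(mutex_);
        return queue_.size();
    }

private:
    std::queue<T> queue_;
    std::mutex mutex_;
    std::condition_variable cond_;
};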
This is my main.cpp.

#include <iostream>
#include <stdio.h>
#include <pthread.h>
#include <mutex>
#include <unistd.h>   // usleep
#include <gst/gst.h>
#include "opencv2/opencv.hpp"
#include "pullStream.h"
#include "cuda_help.h"
#include "gpu_defog_darkchannel.h"
#include "loadConfig.h"


using std::cout;
using std::endl;
using std::cerr;

#define VIDEO_WIDTH   1920
#define VIDEO_HEIGHT  1080
#define VIDEO_FORMAT  "BGR"
#define PIXEL_SIZE    3
static GstClockTime timestamp = 0;

void *ThreadFunc(void *arg);
struct task_data
{
    CThreadSafeQueue<cv::Mat>* frame_queue;
    pullStream* pull_pipeline;
    GpuDarkChannelDefog* defog_coder;
    channel_para* ch_para;
};
bool getMImage(cv::Mat &_frame, CThreadSafeQueue<cv::Mat>* frame_queue){
    bool flag = false;
    if(frame_queue->size() > 0){
        _frame = frame_queue->safe_pop();
        flag = true;
    }
    return flag;
}
// appsrc "need-data" callback: fetch the latest frame and push it downstream
// as a GstBuffer.
static void cb_need_data(GstElement *appsrc, guint unused_size, gpointer user_data) {
    pullStream *pullpipeline = (pullStream *)user_data;

    GstBuffer *buffer;
    GstFlowReturn ret;

    // create OpenCV Mat; note cv::Mat takes (rows, cols), i.e. (height, width)
    cv::Mat frame(VIDEO_HEIGHT, VIDEO_WIDTH, CV_8UC3);
    bool ans = pullpipeline->getMat(frame);

    // Copy the OpenCV Mat into a newly allocated GstBuffer. Wrapping
    // frame.data with gst_buffer_new_wrapped_full() instead would leak the
    // buffer allocated here and hand downstream a pointer into a Mat that is
    // destroyed when this callback returns.
    buffer = gst_buffer_new_allocate(NULL, frame.total() * frame.elemSize(), NULL);
    gst_buffer_fill(buffer, 0, frame.data, frame.total() * frame.elemSize());

//    GST_BUFFER_PTS(buffer) = timestamp;
//    GST_BUFFER_DURATION(buffer) = gst_util_uint64_scale_int(1, GST_SECOND, 5);
//    timestamp += GST_BUFFER_DURATION(buffer);

    g_signal_emit_by_name(appsrc, "push-buffer", buffer, &ret);
    gst_buffer_unref(buffer);

    if (ret != GST_FLOW_OK) {
        // handle the error
    }
}

// Worker thread: pull decoded frames, run GPU defogging, and keep only the
// two most recent results in the output queue.
void *ThreadFunc(void *arg)
{
    task_data *task = (task_data *)arg;
    while (true)
    {
        cv::Mat imgD;
        bool flag = task->pull_pipeline->getMat(imgD);
        if (!flag) {
            task->pull_pipeline->countNUM();
            usleep(25 * 1000);
            continue;
        }
        if (!imgD.empty()) {
//            cv::resize(imgD, imgD, cv::Size(1920, 1080));
            cv::Mat res = task->defog_coder->defog(imgD, task->ch_para->p1, task->ch_para->p2);
            // Drop stale frames so the queue never holds more than two.
            while (task->frame_queue->size() >= 2) {
                task->frame_queue->safe_pop();
            }
            task->frame_queue->safe_push(res);
        }
    }
    return nullptr;
}
int main()
{
    channel_para *ch_para = new channel_para;
    loadFile ld("config.json");
    ld.get_ch_para(ch_para);
    gst_init(NULL, NULL);
    bool ans = hasCudaDevice();
    std::cout << ans << std::endl;

//    GpuDarkChannelDefog *defog_coder = new GpuDarkChannelDefog(ch_para->image_height, ch_para->image_width);
    GpuDarkChannelDefog *defog_coder = new GpuDarkChannelDefog(VIDEO_HEIGHT, VIDEO_WIDTH);
//    std::string video_path = "file://" + ch_para->channel_path;
    std::string stream_path = ch_para->channel_path;
    pullStream *pipelineTest = new pullStream(stream_path.c_str());
//    pipelineTest->defog_coder = defog_coder;
    pipelineTest->start();
    CThreadSafeQueue<cv::Mat> *frame_queue = new CThreadSafeQueue<cv::Mat>;
    task_data *t = new task_data;
    t->frame_queue = frame_queue;
    t->pull_pipeline = pipelineTest;
    t->defog_coder = defog_coder;
    t->ch_para = ch_para;
    pthread_t tid;
    pthread_create(&tid, NULL, ThreadFunc, (void *)t);
    pthread_detach(tid);
    usleep(400 * 1000);
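    // Push pipeline built below:
    //   appsrc -> videoconvert -> capsfilter(NV12) -> queue -> x264enc
    //          -> h264parse -> rtph264pay -> udpsink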
    GstElement *pipeline, *appsrc, *conv;
    GMainLoop *loop;

    loop = g_main_loop_new(NULL, FALSE);

    pipeline = gst_pipeline_new("pipeline");
    appsrc = gst_element_factory_make("appsrc", "source");
    conv = gst_element_factory_make("videoconvert", "conv");
    GstElement *capsfilter = gst_element_factory_make("capsfilter", "caps-filter");
    GstElement *queue = gst_element_factory_make("queue", "queue");
    GstElement *x264enc = gst_element_factory_make("x264enc", "x264-encoder");
//    GstElement *x264enc = gst_element_factory_make("omxh264enc", "x264-encoder");
    GstElement *h264parse = gst_element_factory_make("h264parse", "h264-parser");
    GstElement *rtph264pay = gst_element_factory_make("rtph264pay", "rtp-h264-pay");
    GstElement *udpsink = gst_element_factory_make("udpsink", "udp-sink");
//    GstElement *videosink = gst_element_factory_make("xvimagesink", "videosink");
    g_object_set(G_OBJECT(appsrc), "caps",
        gst_caps_new_simple("video/x-raw",
            "format", G_TYPE_STRING, VIDEO_FORMAT,
            "width", G_TYPE_INT, VIDEO_WIDTH,
            "height", G_TYPE_INT, VIDEO_HEIGHT,
            "framerate", GST_TYPE_FRACTION, 0, 1,
            NULL), NULL);
    g_object_set(G_OBJECT(udpsink), "host", "127.0.0.1", "port", 5000, "sync", FALSE, NULL);
    GstCaps *caps;
    int framerate_numerator = 0;
    int framerate_denominator = 1;
    char caps_string[256];
    snprintf(caps_string, sizeof(caps_string),
             "video/x-raw,format=(string)NV12,width=%d,height=%d,framerate=(fraction)%d/%d",
             VIDEO_WIDTH, VIDEO_HEIGHT, framerate_numerator, framerate_denominator);
    caps = gst_caps_from_string(caps_string);
    g_object_set(G_OBJECT(capsfilter), "caps", caps, NULL);
    gst_caps_unref(caps);
    gst_bin_add_many(GST_BIN(pipeline), appsrc, conv, capsfilter, queue, x264enc, h264parse, rtph264pay, udpsink, NULL);

    gst_element_link_many(appsrc, conv, capsfilter, queue, x264enc, h264parse, rtph264pay, udpsink, NULL);
    g_object_set(G_OBJECT(appsrc),
        "stream-type", 0,
        "format", GST_FORMAT_TIME, NULL);
    g_signal_connect(appsrc, "need-data", G_CALLBACK(cb_need_data), t->pull_pipeline);

    gst_element_set_state(pipeline, GST_STATE_PLAYING);
    g_main_loop_run(loop);

    gst_element_set_state(pipeline, GST_STATE_NULL);
    gst_object_unref(GST_OBJECT(pipeline));
    g_main_loop_unref(loop);

    return 0;
}
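To observe the 17-second delay, I play the stream back on the same machine. A minimal receive-side test matching the sender above could look like this (a sketch, not my exact test program; avdec_h264 and the RTP caps are assumptions that match rtph264pay's defaults):

#include <gst/gst.h>

// Depacketize RTP/H.264 arriving on UDP port 5000 and display it. The caps
// on udpsrc are required so rtph264depay can negotiate.
int main(int argc, char *argv[]) {
    gst_init(&argc, &argv);
    GError *error = NULL;
    GstElement *pipeline = gst_parse_launch(
        "udpsrc port=5000 caps=\"application/x-rtp,media=video,"
        "encoding-name=H264,payload=96\" ! rtph264depay ! h264parse ! "
        "avdec_h264 ! videoconvert ! autovideosink sync=false",
        &error);
    if (!pipeline) {
        g_printerr("Failed to build pipeline: %s\n", error->message);
        g_error_free(error);
        return -1;
    }
    gst_element_set_state(pipeline, GST_STATE_PLAYING);
    GMainLoop *loop = g_main_loop_new(NULL, FALSE);
    g_main_loop_run(loop);
    gst_element_set_state(pipeline, GST_STATE_NULL);
    gst_object_unref(pipeline);
    g_main_loop_unref(loop);
    return 0;
}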

I'd like to ask whether there is an issue with how I fill data into appsrc, or whether some of the intermediate plugins are misconfigured. My pull pipeline consists of uridecodebin ! queue ! videoconvert, where uridecodebin is configured with NVMM caps, i.e. it uses hardware decoding.
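pullStream's internals are not shown here; on Jetson, a pull pipeline matching that description could be built roughly like this (a sketch assuming nvvidconv copies frames out of NVMM memory and an appsink delivers BGR frames; the element layout is illustrative, not my exact code):

#include <gst/gst.h>

// Sketch of the pull side: hardware decode via uridecodebin, copy out of
// NVMM memory with nvvidconv, convert to BGR, and hand frames to appsink.
GstElement *build_pull_pipeline(const char *rtsp_uri) {
    gchar *desc = g_strdup_printf(
        "uridecodebin uri=%s ! queue ! nvvidconv ! "
        "video/x-raw,format=BGRx ! videoconvert ! video/x-raw,format=BGR ! "
        "appsink name=sink emit-signals=true max-buffers=2 drop=true",
        rtsp_uri);
    GError *error = NULL;
    GstElement *pipeline = gst_parse_launch(desc, &error);
    g_free(desc);
    if (!pipeline) {
        g_printerr("Failed to build pull pipeline: %s\n", error->message);
        g_error_free(error);
        return NULL;
    }
    // A "new-sample" callback on the appsink maps each sample's buffer and
    // wraps it in a cv::Mat before it enters queue A.
    return pipeline;
}

Setting drop=true with a small max-buffers on the appsink serves the same purpose as my size cap on queue A: only the most recent frames are kept.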

Hi,
OpenCV uses significant CPU resources. Please execute sudo nvpmodel -m 2 and sudo jetson_clocks. These two commands make the CPU cores run at maximum frequency.
