GStreamer - high delay, low fps

I have written code that enables streaming from a camera connected to the Orin. The camera is set to 20 fps. The stream starts and achieves 20 fps on the sender's side, but on the receiver's side it appears to run at about 1 fps with a delay of approximately 25-35 seconds.

Command used on the receiver’s side:

gst-launch-1.0 -v udpsrc port=3445 ! application/x-rtp ! rtph264depay ! avdec_h264 ! videoconvert ! autovideosink

I also tried this command, but it doesn't work:

gst-launch-1.0 -v udpsrc port=3445 ! application/x-rtp ! rtph264depay ! h264parse ! nvv4l2decoder ! nv3dsink -e

It gives this error:

NVMEDIA: NvMMLiteNVMEDIAProcessVES: 1824: NvMediaParserParse Unsupported Codec

Sender side (this part seems to work as expected); the pipeline created inside the code:
appsrc → queue → videoconvert → x264enc → rtph264pay → udpsink

Code:
GstreamerPipeline.hpp

#pragma once
#include "data_source/stream/gstreamer/Gstreamer.hpp"
#include "data_source/stream/gstreamer/GstreamerPipelineElement.hpp"

#include <gstreamer-1.0/gst/gst.h>
#include <ss-utils/Logger.hpp>

namespace speed_camera
{
    class GstreamerPipeline
    {
    public:
        explicit GstreamerPipeline(const StreamGstreamerConfig& new_stream_config);
        void run(Gstreamer* callback_object);
        virtual ~GstreamerPipeline();

    private:
        virtual auto prepareElements(Gstreamer* callback_object)
            -> std::vector<GstreamerPipelineElement>;
        virtual void handleMessages();
        virtual void beginPlaying();

        auto prepareAppSrcPart(Gstreamer* callback_object) -> std::vector<GstreamerPipelineElement>;
        auto prepareStreamingToUdp() -> std::vector<GstreamerPipelineElement>;
        void createPipeline(std::vector<GstreamerPipelineElement> elements);
        void linkInOrder(std::vector<GstreamerPipelineElement> elements);
        void throwingAssertion(bool condition, const std::string& message);
        static void errorCallback(GstBus* bus, GstMessage* msg, GMainLoop* loop);

        std::shared_ptr<spdlog::logger> log = ss::log::instance();
        StreamGstreamerConfig stream_config;
        GstElement* pipeline;
        GstBus* bus;
        GMainLoop* loop;
    };
} // namespace speed_camera

GstreamerPipeline.cpp

#include "data_source/stream/gstreamer/GstreamerPipeline.hpp"
#include "speed_camera/utils/Sliding2.hpp"


namespace speed_camera
{
GstreamerPipeline::GstreamerPipeline(const StreamGstreamerConfig& new_stream_config) :
    stream_config(new_stream_config)
{
}

void GstreamerPipeline::run(Gstreamer* callback_object)
{
    gst_init(nullptr, nullptr);
    createPipeline(prepareElements(callback_object));
    // The loop must exist before handleMessages() captures it as callback data.
    loop = g_main_loop_new(nullptr, FALSE);
    handleMessages();
    beginPlaying();
    g_main_loop_run(loop);
}

GstreamerPipeline::~GstreamerPipeline()
{
    gst_element_set_state(pipeline, GST_STATE_NULL);
    g_main_loop_quit(loop);
    gst_bus_remove_signal_watch(bus);
    gst_object_unref(bus);
    gst_object_unref(pipeline);
    g_main_loop_unref(loop);
}

auto GstreamerPipeline::prepareElements(Gstreamer* callback_object)
    -> std::vector<GstreamerPipelineElement>
{
    //  Pipeline: appsrc -> queue -> videoconvert -> x264enc -> rtph264pay -> udpsink
    std::vector<GstreamerPipelineElement> pipeline_elements;
    std::ranges::copy(prepareAppSrcPart(callback_object), std::back_inserter(pipeline_elements));
    std::ranges::copy(prepareStreamingToUdp(), std::back_inserter(pipeline_elements));

    return pipeline_elements;
}

auto GstreamerPipeline::prepareAppSrcPart(Gstreamer* callback_object)
    -> std::vector<GstreamerPipelineElement>
{
    auto appsrc = GstreamerPipelineElement("appsrc");
    // clang-format off
    g_object_set(appsrc.ptr,
                 "caps", gst_caps_new_simple("video/x-raw",
                                             "format", G_TYPE_STRING, "GRAY8",
                                             "width", G_TYPE_INT, stream_config.width,
                                             "height", G_TYPE_INT, stream_config.height,
                                             // GST_TYPE_FRACTION expects two ints; the cast guards
                                             // against a double sneaking through the varargs call
                                             // (see the fix list at the end of the thread).
                                             "framerate", GST_TYPE_FRACTION,
                                             static_cast<int>(stream_config.frame_rate), 1,
                                             nullptr),
                 nullptr);
    // clang-format on
    const int raw_binary_data = 0; // GST_APP_STREAM_TYPE_STREAM: a continuous stream
    static auto dispatcher =
        [](GstElement* _appsrc, [[maybe_unused]] guint size, gpointer _callback_object)
    {
        static_cast<Gstreamer*>(_callback_object)->gstreamerCallback(_appsrc);
    };
    g_object_set(appsrc.ptr, "stream-type", raw_binary_data, "format", GST_FORMAT_TIME, nullptr);
    g_signal_connect(appsrc.ptr,
                     "need-data",
                     // such casting is typical; the GStreamer tutorials do the same.
                     reinterpret_cast<GCallback>(+dispatcher),
                     callback_object);

    return std::vector{appsrc};
}

auto GstreamerPipeline::prepareStreamingToUdp() -> std::vector<GstreamerPipelineElement>
{
    auto queue_to_udp = GstreamerPipelineElement("queue", "queue_to_udp");
    const auto drop_oldest_when_full = 2;
    // clang-format off
    g_object_set(queue_to_udp.ptr,
                 "leaky", drop_oldest_when_full,
                 nullptr);
    // clang-format on
    auto videoconvert = GstreamerPipelineElement("videoconvert", "videoconvert1");
    auto x264enc = GstreamerPipelineElement("x264enc");
    auto rtph264pay = GstreamerPipelineElement("rtph264pay");
    auto udpsink = GstreamerPipelineElement("udpsink");
    auto rtp_payload_type = 96;
    g_object_set(rtph264pay.ptr, "pt", rtp_payload_type, nullptr);
    g_object_set(
        udpsink.ptr, "host", stream_config.address.c_str(), "port", stream_config.port, nullptr);
    log->info("INFO: Command to receive udp stream:\n `gst-launch-1.0 udpsrc port={} "
              "! application/x-rtp ! rtph264depay ! avdec_h264 ! videoconvert ! autovideosink`\n",
              stream_config.port);
    log->info("INFO: Command to receive udp stream and save to a file:\n "
              "gst-launch-1.0 udpsrc port={} ! application/x-rtp ! rtph264depay ! avdec_h264 ! "
              "videoconvert ! tee  name=show_and_save show_and_save. ! queue ! autovideosink "
              "show_and_save. ! queue ! matroskamux ! filesink location=received_video.mkv -e\n",
              stream_config.port);
    return std::vector{queue_to_udp, videoconvert, x264enc, rtph264pay, udpsink};
}

void GstreamerPipeline::createPipeline(std::vector<GstreamerPipelineElement> elements)
{
    pipeline = gst_pipeline_new("stream_pipeline");
    throwingAssertion(pipeline, "pipeline couldn't be created");
    for (auto element : elements)
    {
        throwingAssertion(element.ptr, "element couldn't be created");
        gst_bin_add(GST_BIN(pipeline), element.ptr);
    }
    linkInOrder(std::move(elements));
}

void GstreamerPipeline::throwingAssertion(bool condition, const std::string& message)
{
    if (!condition)
    {
        throw std::runtime_error(message);
    }
}

void GstreamerPipeline::linkInOrder(std::vector<GstreamerPipelineElement> elements)
{
    for (const auto& [first, second] : sliding2(elements))
    {
        bool ok = gst_element_link(first.ptr, second.ptr);
        throwingAssertion(ok, fmt::format("can't link {} and {}\n", first.name, second.name));
    }
}

void GstreamerPipeline::beginPlaying()
{
    GstStateChangeReturn ret = gst_element_set_state(pipeline, GST_STATE_PLAYING);
    throwingAssertion(ret != GST_STATE_CHANGE_FAILURE,
                      "Unable to set the pipeline to the playing state.\n");
}

void GstreamerPipeline::handleMessages()
{
    // The bus must be fetched from the pipeline; it is released in the destructor.
    bus = gst_element_get_bus(pipeline);
    gst_bus_add_signal_watch(bus);
    g_signal_connect(G_OBJECT(bus),
                     "message::error",
                     // such casting is typical; the GStreamer tutorials do the same.
                     reinterpret_cast<GCallback>(GstreamerPipeline::errorCallback),
                     loop);
}

void GstreamerPipeline::errorCallback([[maybe_unused]] GstBus* bus,
                                      GstMessage* msg,
                                      GMainLoop* loop)
{
    GError* err;
    gchar* debug_info;

    gst_message_parse_error(msg, &err, &debug_info);
    g_printerr("Error received from element %s: %s\n", GST_OBJECT_NAME(msg->src), err->message);
    g_printerr("Debugging information: %s\n", debug_info ? debug_info : "none");
    g_clear_error(&err);
    g_free(debug_info);

    g_main_loop_quit(loop);
}

} // namespace speed_camera

GstreamerPipelineElement.hpp

#pragma once

#include <string>

#include <gstreamer-1.0/gst/gst.h>

namespace speed_camera
{
    class GstreamerPipelineElement
    {
    public:
        explicit GstreamerPipelineElement(const std::string& factory_name);
        explicit GstreamerPipelineElement(const std::string& factory_name,
                                          const std::string& element_name);
        GstElement* ptr;
        std::string name;
    };
} // namespace speed_camera

GstreamerPipelineElement.cpp

#include "data_source/stream/gstreamer/GstreamerPipelineElement.hpp"


namespace speed_camera
{
GstreamerPipelineElement::GstreamerPipelineElement(const std::string& factory_name) :
    GstreamerPipelineElement{factory_name, factory_name}
{
}

GstreamerPipelineElement::GstreamerPipelineElement(const std::string& factory_name,
                                                   const std::string& element_name) :
    ptr{gst_element_factory_make(factory_name.c_str(), element_name.c_str())},
    name{element_name} // std::move on a const reference would copy anyway
{
}

} // namespace speed_camera

Gstreamer.hpp

#pragma once
#include "data_source/camera/FrameMerger.hpp"
#include "data_source/config/StreamConfig.hpp"
#include "data_source/storage/OutputManager.hpp"
#include "data_source/stream/Streamer.hpp"
#include "data_source/utils/MultiResourceManager.hpp"

#include <condition_variable>
#include <mutex>

#include <gstreamer-1.0/gst/gst.h>
#include <opencv2/core/mat.hpp>
#include <ss-utils/Logger.hpp>

namespace speed_camera
{
    class Gstreamer : public Streamer
    {
    public:
        Gstreamer(StreamGstreamerConfig new_stream_config,
                  std::function<std::unique_ptr<FrameMerger>()> merger_factory);
        void gstreamerCallback(GstElement* appsrc);
        virtual void stream(const TimestampedFrames& frames) override;
        ~Gstreamer() override = default;

    private:
        auto pushFrameToPipeline(GstElement* appsrc, const cv::Mat& frame) -> GstFlowReturn;
        auto allocateBuffer(const cv::Mat& frame) -> GstBuffer*;

        StreamGstreamerConfig stream_config;
        MultiResourceManager<FrameMerger> merger_manager;
        std::shared_ptr<spdlog::logger> log = ss::log::instance();
        std::mutex frame_mutex;
        std::condition_variable frame_condition;
        bool new_frame_available = false;
        cv::Mat frame_for_gstreamer;
    };

} // namespace speed_camera

Gstreamer.cpp

#include "data_source/stream/gstreamer/Gstreamer.hpp"

#include "data_source/camera/BaslerCameraLayout.hpp"
#include "data_source/camera/frames/TimestampedFrames.hpp"
#include "data_source/stream/gstreamer/GstreamerPipeline.hpp"
#include "data_source/utils/MultiResourceManager.hpp"

#include <cstring>
#include <iostream>
#include <opencv2/imgproc.hpp>
#include <thread>


namespace speed_camera
{
Gstreamer::Gstreamer(StreamGstreamerConfig new_stream_config,
                     std::function<std::unique_ptr<FrameMerger>()> merger_factory) :
    stream_config{new_stream_config},
    merger_manager{merger_factory, std::thread::hardware_concurrency()}
{
    std::thread gstreamer(
        [new_stream_config, this]()
        {
            g_setenv("GST_DEBUG", "*:3", TRUE);
            auto pipeline = std::make_unique<GstreamerPipeline>(new_stream_config);
            pipeline->run(this);
        });
    gstreamer.detach();
}

void Gstreamer::gstreamerCallback(GstElement* appsrc)
{
    log->debug("need data");
    std::unique_lock<std::mutex> lock(frame_mutex);
    using namespace std::chrono_literals;
    if (!frame_condition.wait_for(lock, 10s, [this] { return this->new_frame_available; }))
    {
        handleDeadlock(__PRETTY_FUNCTION__);
    }
    pushFrameToPipeline(appsrc, frame_for_gstreamer);
    new_frame_available = false;
}

void Gstreamer::stream(const TimestampedFrames& frames)
{
    auto merger = merger_manager.getResource();
    Frame merged;
    merger->use([&frames, &merged](FrameMerger& m) { merged = m.mergeFrames(frames.frames); });
    {
        std::lock_guard<std::mutex> lock(frame_mutex);
        resize(merged.cvMatView(),
               frame_for_gstreamer,
               cv::Size2i{stream_config.width, stream_config.height},
               cv::INTER_NEAREST);
        new_frame_available = true;
    }
    log->debug("push data");
    frame_condition.notify_one();
}

auto Gstreamer::pushFrameToPipeline(GstElement* appsrc, const cv::Mat& frame) -> GstFlowReturn
{
    GstBuffer* buffer = allocateBuffer(frame);
    GstFlowReturn ret;
    g_signal_emit_by_name(appsrc, "push-buffer", buffer, &ret);
    gst_buffer_unref(buffer);
    if (ret != GST_FLOW_OK)
    {
        std::cerr << "Image pushing ends with problems" << std::endl;
    }
    return ret;
}

auto Gstreamer::allocateBuffer(const cv::Mat& frame) -> GstBuffer*
{
    auto size = static_cast<unsigned int>(frame.total() * frame.elemSize());
    GstBuffer* buffer = gst_buffer_new_allocate(nullptr, size, nullptr);
    GstMapInfo map;
    gst_buffer_map(buffer, &map, GST_MAP_WRITE);
    memcpy(map.data, frame.data, gst_buffer_get_size(buffer));

    static GstClockTime timestamp = 0;
    GST_BUFFER_PTS(buffer) = timestamp;
    // The duration must match the configured frame rate; the original code
    // hard-coded 1/3 s here, which skews the timeline for a 20 fps source.
    GST_BUFFER_DURATION(buffer) =
        gst_util_uint64_scale_int(1, GST_SECOND, static_cast<int>(stream_config.frame_rate));
    timestamp += GST_BUFFER_DURATION(buffer);
    gst_buffer_unmap(buffer, &map);

    return buffer;
}

} // namespace speed_camera
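As a side note (a sketch, not part of the original code): gst_buffer_fill is a stock GStreamer helper that performs the same copy as the map/memcpy/unmap sequence in allocateBuffer():

// Equivalent to the explicit map + memcpy + unmap in allocateBuffer().
GstBuffer* buffer = gst_buffer_new_allocate(nullptr, size, nullptr);
gst_buffer_fill(buffer, 0, frame.data, size);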

Hi,
We have a UDP setup example in the Jetson AGX Orin FAQ

Please take a look and give it a try. We would suggest trying the gst-launch-1.0 commands as a reference setup; implementing the same pipeline in C code should then achieve the same result.

Could you please review the post once more? I’ve made some edits, and I believe the most important information is now at the top.

Hi,
Please dump the H264 stream and attach it so that we can investigate why the hardware decoder cannot decode it:

$ gst-launch-1.0 -v udpsrc port=3445 ! application/x-rtp ! rtph264depay ! h264parse ! video/x-h264,stream-format=byte-stream ! filesink location=dump.h264

I got this file: dump.h264 - Google Drive
Something is in the file, but it doesn't look right.
The metadata says the video has 2 frames and the size is 5.4 MB.
dump.zip (1.1 KB)

Hi,
The profile of the h264 stream is

Format profile : High 4:4:4 Predictive@L2.1

It is not supported. Please check whether the source can generate the H264 stream in high 4:2:2 or 4:2:0.


I just tested 4:4:4: it works, but nothing has changed. There are still 2 frames in the saved file, and the metadata is the same.
File: dump_4_4_4.h264 - Google Drive

4:2:2 gives the same result as 4:4:4.

4:2:0 does not exist in the x264enc src capabilities.

Hi,
For encoding YUV420, the profile will be one of these three:

(string)high, (string)main, (string)baseline
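For example, one way to steer x264enc toward one of those profiles on the sender is to force 4:2:0 input with a capsfilter between videoconvert and the encoder. This is only a sketch (it reuses the GstreamerPipelineElement wrapper from the code above; the element and caps names are stock GStreamer):

// A capsfilter that forces I420 (4:2:0) into x264enc, so the encoder
// negotiates one of the supported high/main/baseline profiles instead of
// High 4:4:4. Link order: ... videoconvert -> to_i420 -> x264enc ...
auto to_i420 = GstreamerPipelineElement("capsfilter", "to_i420");
GstCaps* i420_caps = gst_caps_new_simple("video/x-raw",
                                         "format", G_TYPE_STRING, "I420",
                                         nullptr);
g_object_set(to_i420.ptr, "caps", i420_caps, nullptr);
gst_caps_unref(i420_caps);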

I managed to fix a couple of issues: streaming to a file and UDP streaming. However, I still have a delay of approximately 3 seconds when streaming at 20 frames per second (fps). It seems like buffering might be the culprit here: the delay scales inversely with the frame rate (10 fps = 6 seconds, 5 fps = 12 seconds), which corresponds to a constant buffer of about 60 frames. Can anyone help me locate where this buffering of 60 frames is occurring?
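As an editorial aside (not from the thread): GStreamer ships a latency tracer that can help locate buffering like this. It is enabled through environment variables, the same mechanism the code above already uses for GST_DEBUG:

// Sketch: set these before gst_init() to log pipeline latency measurements.
g_setenv("GST_TRACERS", "latency", TRUE);
g_setenv("GST_DEBUG", "GST_TRACER:7", TRUE);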

Hi,
Please set the property to nvv4l2decoder for a try:

  disable-dpb         : Set to disable DPB buffer for low latency
                        flags: readable, writable
                        Boolean. Default: false

To run in this mode, the source H264 stream should ideally have an IDR P P … structure with reference frames=1; otherwise the display order after decoding may be wrong. Please make sure the source has this structure and enable the mode for a try.
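A sketch of how the sender's x264enc could be constrained to such a structure (bframes and ref are stock x264enc properties; that this alone resolves the decode issue is an assumption, not something the thread confirms):

g_object_set(x264enc.ptr,
             "bframes", 0u, // no B-frames: decode order == display order (IDR P P ...)
             "ref", 1u,     // a single reference frame, as suggested above
             nullptr);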

I can’t run this command (with nvv4l2decoder)

gst-launch-1.0 -v udpsrc port=3445 ! application/x-rtp ! rtph264depay ! h264parse ! nvv4l2decoder disable-dpb=true ! nv3dsink -e

My stream works only with avdec_h264. Can you help me fix the `Video Dec Unsupported Stream` error, or tell me how to turn off buffering in `avdec_h264`?

Hello,
I've achieved low latency and the desired frames per second. However, I'm still encountering the issue described above (Unsupported Stream). Since I don't currently require better performance, the thread can be closed.
DaneLLL - Thank you so much for your efforts to assist me.

To summarize:
On the sender side, the simplified view (C++ code): appsrc → queue → videoconvert → x264enc → rtph264pay → udpsink
On the receiver side, the command is: gst-launch-1.0 udpsrc port=3445 ! application/x-rtp ! rtph264depay ! avdec_h264 ! videoconvert ! autovideosink

Fixes:

  • The camera FPS was different from the rate specified in GStreamer.
  • A bug with GST_TYPE_FRACTION: doubles were unintentionally passed as arguments, so the raw double data was reinterpreted as integers. The pipeline still ran, but it dropped many frames.
  • The queue element can complicate things when the real frame rate differs from the one specified in GStreamer: it adds its maximum latency or can cause frame skipping (at least it does with leaky=downstream).
  • A delay of a few seconds (~60 frames) can be removed by using tune=zerolatency (an alternative way to set it is sketched after this list):
    auto zerolatency = 0x00000004;
    g_object_set(x264enc.ptr, "tune", zerolatency, nullptr);
  • Additionally, speed-preset can be used for further improvement, though the gains are measured in milliseconds.
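For reference, the same settings can be applied without the magic number. This is a sketch, not code from the thread; gst_util_set_object_arg is a stock GStreamer helper that parses enum and flag values from their string nicks:

// Equivalent to the raw 0x00000004 flag above; "ultrafast" for speed-preset
// is only an illustration of the same mechanism, not a value used in the thread.
gst_util_set_object_arg(G_OBJECT(x264enc.ptr), "tune", "zerolatency");
gst_util_set_object_arg(G_OBJECT(x264enc.ptr), "speed-preset", "ultrafast");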

The command you proposed now creates a proper file :)
gst-launch-1.0 -v udpsrc port=3445 ! application/x-rtp ! rtph264depay ! h264parse ! video/x-h264,stream-format=byte-stream ! filesink location=dump.h264
Thus, there might not be any problem with the profile in such a setup.

Finally, the delay is 20-50 ms.
