GStreamer - high delay, low fps

I have written code that streams video from a camera connected to the Orin. The camera is set to 20 fps. The stream starts and reaches 20 fps on the sender’s side, but on the receiver’s side it appears to run at about 1 fps, with a delay of approximately 25 to 35 seconds.

Command used on the receiver’s side:

gst-launch-1.0 -v udpsrc port=3445 ! application/x-rtp ! rtph264depay ! avdec_h264 ! videoconvert ! autovideosink

I also tried this command, but it doesn’t work:

gst-launch-1.0 -v udpsrc port=3445 ! application/x-rtp ! rtph264depay ! h264parse ! nvv4l2decoder ! nv3dsink -e

It gives this error:

NVMEDIA: NvMMLiteNVMEDIAProcessVES: 1824: NvMediaParserParse Unsupported Codec

Sender side (this part seems to work as expected).
Pipeline created inside the application:
appsrc → queue → videoconvert → x264enc → rtph264pay → udpsink

Code:
GstreamerPipeline.hpp

#pragma once
#include "data_source/stream/gstreamer/Gstreamer.hpp"
#include "data_source/stream/gstreamer/GstreamerPipelineElement.hpp"

#include <memory>
#include <string>
#include <vector>

#include <gstreamer-1.0/gst/gst.h>
#include <ss-utils/Logger.hpp>

namespace speed_camera
{
    class GstreamerPipeline
    {
    public:
        explicit GstreamerPipeline(const StreamGstreamerConfig& new_stream_config);
        void run(Gstreamer* callback_object);
        virtual ~GstreamerPipeline();

    private:
        virtual auto prepareElements(Gstreamer* callback_object)
            -> std::vector<GstreamerPipelineElement>;
        virtual void handleMessages();
        virtual void beginPlaying();

        auto prepareAppSrcPart(Gstreamer* _appsrc) -> std::vector<GstreamerPipelineElement>;
        auto prepareStreamingToUdp() -> std::vector<GstreamerPipelineElement>;
        void createPipeline(std::vector<GstreamerPipelineElement> elements);
        void linkInOrder(std::vector<GstreamerPipelineElement> elements);
        void throwingAssertion(bool condition, const std::string& message);
        static void errorCallback(GstBus* bus, GstMessage* msg, GMainLoop* loop);

        std::shared_ptr<spdlog::logger> log = ss::log::instance();
        StreamGstreamerConfig stream_config;
        GstElement* pipeline;
        GstBus* bus;
        GMainLoop* loop;
    };
} // namespace speed_camera

GstreamerPipeline.cpp

#include "data_source/stream/gstreamer/GstreamerPipeline.hpp"
#include "speed_camera/utils/Sliding2.hpp"

#include <algorithm>
#include <iterator>
#include <stdexcept>


GstreamerPipeline::GstreamerPipeline(const StreamGstreamerConfig& new_stream_config) :
    stream_config(new_stream_config)
{
}

void GstreamerPipeline::run(Gstreamer* callback_object)
{
    gst_init(nullptr, nullptr);
    createPipeline(prepareElements(callback_object));
    // Create the main loop first: handleMessages() passes it to the error callback.
    loop = g_main_loop_new(nullptr, FALSE);
    beginPlaying();
    handleMessages();
    g_main_loop_run(loop);
}

GstreamerPipeline::~GstreamerPipeline()
{
    gst_object_unref(bus);
    gst_element_set_state(pipeline, GST_STATE_NULL);
    gst_object_unref(pipeline);
    g_main_loop_quit(loop);
}

auto GstreamerPipeline::prepareElements(Gstreamer* callback_object)
    -> std::vector<GstreamerPipelineElement>
{
    //  Pipeline: appsrc -> queue -> videoconvert -> x264enc -> rtph264pay -> udpsink
    std::vector<GstreamerPipelineElement> pipeline_elements;
    std::ranges::copy(prepareAppSrcPart(callback_object), std::back_inserter(pipeline_elements));
    std::ranges::copy(prepareStreamingToUdp(), std::back_inserter(pipeline_elements));

    return pipeline_elements;
}

auto GstreamerPipeline::prepareAppSrcPart(Gstreamer* callback_object)
    -> std::vector<GstreamerPipelineElement>
{
    auto appsrc = GstreamerPipelineElement("appsrc");
    // clang-format off
    g_object_set(appsrc.ptr,
                 "caps",gst_caps_new_simple("video/x-raw",
                                            "format", G_TYPE_STRING,"GRAY8",
                                            "width",G_TYPE_INT,stream_config.width,
                                            "height",G_TYPE_INT,stream_config.height,
                                            "framerate",GST_TYPE_FRACTION,stream_config.frame_rate,1,
                                            nullptr),
                 nullptr);
    // clang-format on
    int raw_binary_data = 0;
    static auto dispatcher =
        [](GstElement* _appsrc, [[maybe_unused]] guint size, gpointer _callback_object)
    {
        static_cast<Gstreamer*>(_callback_object)->gstreamerCallback(_appsrc);
    };
    g_object_set(appsrc.ptr, "stream-type", raw_binary_data, "format", GST_FORMAT_TIME, nullptr);
    g_signal_connect(appsrc.ptr,
                     "need-data",
                     // This cast is typical; the GStreamer tutorials do the same.
                     reinterpret_cast<GCallback>(+dispatcher),
                     callback_object);

    return std::vector{appsrc};
}

auto GstreamerPipeline::prepareStreamingToUdp() -> std::vector<GstreamerPipelineElement>
{
    auto queue_to_udp = GstreamerPipelineElement("queue", "queue_to_udp");
    const auto drop_oldest_when_full = 2;
    // clang-format off
    g_object_set(queue_to_udp.ptr,
                 "leaky", drop_oldest_when_full,
                 nullptr);
    // clang-format on
    auto videoconvert = GstreamerPipelineElement("videoconvert", "videoconvert1");
    auto x264enc = GstreamerPipelineElement("x264enc");
    auto rtph264pay = GstreamerPipelineElement("rtph264pay");
    auto udpsink = GstreamerPipelineElement("udpsink");
    auto rtp_payload_type = 96;
    g_object_set(rtph264pay.ptr, "pt", rtp_payload_type, nullptr);
    g_object_set(
        udpsink.ptr, "host", stream_config.address.c_str(), "port", stream_config.port, nullptr);
    log->info("INFO: Command to receive udp stream:\n `gst-launch-1.0 udpsrc port={} "
              "! application/x-rtp ! rtph264depay ! avdec_h264 ! videoconvert ! autovideosink`\n",
              stream_config.port);
    log->info("INFO: Command to receive udp stream and save to a file:\n "
              "gst-launch-1.0 udpsrc port={} ! application/x-rtp ! rtph264depay ! avdec_h264 ! "
              "videoconvert ! tee  name=show_and_save show_and_save. ! queue ! autovideosink "
              "show_and_save. ! queue ! matroskamux ! filesink location=received_video.mkv -e\n",
              stream_config.port);
    return std::vector{queue_to_udp, videoconvert, x264enc, rtph264pay, udpsink};
}

void GstreamerPipeline::createPipeline(std::vector<GstreamerPipelineElement> elements)
{
    pipeline = gst_pipeline_new("stream_pipeline");
    throwingAssertion(pipeline, "pipeline couldn't be created");
    for (auto element : elements)
    {
        throwingAssertion(element.ptr, "element couldn't be created");
        gst_bin_add(GST_BIN(pipeline), element.ptr);
    }
    linkInOrder(std::move(elements));
}

void GstreamerPipeline::throwingAssertion(bool condition, const std::string& message)
{
    if (!condition)
    {
        throw std::runtime_error(message);
    }
}

void GstreamerPipeline::linkInOrder(std::vector<GstreamerPipelineElement> elements)
{
    for (const auto& [first, second] : sliding2(elements))
    {
        bool ok = gst_element_link(first.ptr, second.ptr);
        throwingAssertion(ok, fmt::format("can't link {} and {}\n", first.name, second.name));
    }
}

void GstreamerPipeline::beginPlaying()
{
    GstStateChangeReturn ret = gst_element_set_state(pipeline, GST_STATE_PLAYING);
    throwingAssertion(ret != GST_STATE_CHANGE_FAILURE,
                      "Unable to set the pipeline to the playing state.\n");
}

void GstreamerPipeline::handleMessages()
{
    // Fetch the pipeline's bus; the destructor releases this reference.
    bus = gst_element_get_bus(pipeline);
    gst_bus_add_signal_watch(bus);
    g_signal_connect(G_OBJECT(bus),
                     "message::error",
                     // This cast is typical; the GStreamer tutorials do the same.
                     reinterpret_cast<GCallback>(GstreamerPipeline::errorCallback),
                     loop);
}

void GstreamerPipeline::errorCallback([[maybe_unused]] GstBus* bus,
                                      GstMessage* msg,
                                      GMainLoop* loop)
{
    GError* err;
    gchar* debug_info;

    gst_message_parse_error(msg, &err, &debug_info);
    g_printerr("Error received from element %s: %s\n", GST_OBJECT_NAME(msg->src), err->message);
    g_printerr("Debugging information: %s\n", debug_info ? debug_info : "none");
    g_clear_error(&err);
    g_free(debug_info);

    g_main_loop_quit(loop);
}

GstreamerPipelineElement.hpp

#pragma once

#include <string>

#include <gstreamer-1.0/gst/gst.h>

namespace speed_camera
{
    class GstreamerPipelineElement
    {
    public:
        explicit GstreamerPipelineElement(const std::string& factory_name);
        explicit GstreamerPipelineElement(const std::string& factory_name,
                                          const std::string& element_name);
        GstElement* ptr;
        std::string name;
    };
}

GstreamerPipelineElements.cpp

#include "data_source/stream/gstreamer/GstreamerPipelineElement.hpp"


GstreamerPipelineElement::GstreamerPipelineElement(const std::string& factory_name) :
    GstreamerPipelineElement{factory_name, factory_name}
{
}

GstreamerPipelineElement::GstreamerPipelineElement(const std::string& factory_name,
                                                   const std::string& element_name) :
    ptr{gst_element_factory_make(factory_name.c_str(), element_name.c_str())},
    name{element_name}
{
}

Gstreamer.hpp

#pragma once
#include "data_source/camera/FrameMerger.hpp"
#include "data_source/config/StreamConfig.hpp"
#include "data_source/storage/OutputManager.hpp"
#include "data_source/stream/Streamer.hpp"
#include "data_source/utils/MultiResourceManager.hpp"

#include <condition_variable>
#include <functional>
#include <memory>
#include <mutex>

#include <gstreamer-1.0/gst/gst.h>
#include <opencv2/core/mat.hpp>
#include <ss-utils/Logger.hpp>

namespace speed_camera
{
    class Gstreamer : public Streamer
    {
    public:
        Gstreamer(StreamGstreamerConfig new_stream_config,
                  std::function<std::unique_ptr<FrameMerger>()> merger_factory);
        void gstreamerCallback(GstElement* appsrc);
        virtual void stream(const TimestampedFrames& frames) override;
        ~Gstreamer() override = default;

    private:
        auto pushFrameToPipeline(GstElement* appsrc, const cv::Mat& frame) -> GstFlowReturn;
        auto allocateBuffer(const cv::Mat& frame) -> GstBuffer*;

        StreamGstreamerConfig stream_config;
        MultiResourceManager<FrameMerger> merger_manager;
        std::shared_ptr<spdlog::logger> log = ss::log::instance();
        std::mutex frame_mutex;
        std::condition_variable frame_condition;
        bool new_frame_available = false;
        cv::Mat frame_for_gstreamer;
    };

} // namespace speed_camera

Gstreamer.cpp

#include "data_source/stream/gstreamer/Gstreamer.hpp"

#include "data_source/camera/BaslerCameraLayout.hpp"
#include "data_source/camera/frames/TimestampedFrames.hpp"
#include "data_source/stream/gstreamer/GstreamerPipeline.hpp"
#include "data_source/utils/MultiResourceManager.hpp"

#include <chrono>
#include <cstring>
#include <iostream>
#include <opencv2/imgproc.hpp>
#include <thread>


Gstreamer::Gstreamer(
    [[maybe_unused]] StreamGstreamerConfig new_stream_config,
    [[maybe_unused]] std::function<std::unique_ptr<FrameMerger>()> merger_factory) :
    stream_config{new_stream_config},
    merger_manager{merger_factory, std::thread::hardware_concurrency()}
{
    std::thread gstreamer(
        [new_stream_config, this]()
        {
            g_setenv("GST_DEBUG", "*:3", TRUE);
            auto pipeline = std::make_unique<GstreamerPipeline>(new_stream_config);
            pipeline->run(this);
        });
    gstreamer.detach();
}

void Gstreamer::gstreamerCallback(GstElement* appsrc)
{
    log->debug("need data");
    std::unique_lock<std::mutex> lock(frame_mutex);
    using namespace std::chrono_literals;
    if (!frame_condition.wait_for(lock, 10s, [this] { return this->new_frame_available; }))
    {
        handleDeadlock(__PRETTY_FUNCTION__);
    }
    pushFrameToPipeline(appsrc, frame_for_gstreamer);
    new_frame_available = false;
}

void Gstreamer::stream(const TimestampedFrames& frames)
{
    auto merger = merger_manager.getResource();
    Frame merged;
    merger->use([&frames, &merged](FrameMerger& m) { merged = m.mergeFrames(frames.frames); });
    {
        std::lock_guard<std::mutex> lock(frame_mutex);
        resize(merged.cvMatView(),
               frame_for_gstreamer,
               cv::Size2i{stream_config.width, stream_config.height},
               cv::INTER_NEAREST);
        new_frame_available = true;
    }
    log->debug("push data");
    frame_condition.notify_one();
}

auto Gstreamer::pushFrameToPipeline(GstElement* appsrc, const cv::Mat& frame) -> GstFlowReturn
{
    GstBuffer* buffer = allocateBuffer(frame);
    GstFlowReturn ret;
    g_signal_emit_by_name(appsrc, "push-buffer", buffer, &ret);
    gst_buffer_unref(buffer);
    if (ret != GST_FLOW_OK)
    {
        std::cerr << "Image pushing ends with problems" << std::endl;
    }
    return ret;
}

auto Gstreamer::allocateBuffer(const cv::Mat& frame) -> GstBuffer*
{
    auto size = static_cast<unsigned int>(frame.total() * frame.elemSize());
    GstBuffer* buffer = gst_buffer_new_allocate(nullptr, size, nullptr);
    GstMapInfo map;
    gst_buffer_map(buffer, &map, GST_MAP_WRITE);
    memcpy(map.data, frame.data, gst_buffer_get_size(buffer));

    static GstClockTime timestamp = 0;
    GST_BUFFER_PTS(buffer) = timestamp;
    GST_BUFFER_DURATION(buffer) = gst_util_uint64_scale_int(1, GST_SECOND, 3);
    timestamp += GST_BUFFER_DURATION(buffer);
    gst_buffer_unmap(buffer, &map);

    return buffer;
}
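
A note on the timestamps in allocateBuffer: each buffer's duration is hard-coded to one third of a second, while the appsrc caps advertise stream_config.frame_rate (20 fps in this report). Whether that mismatch contributes to the delay is not confirmed in this thread, so treat the following only as a sketch of how the PTS and duration could be derived from the frame rate instead. The helper name frameTiming and the struct FrameTiming are hypothetical, and the code uses plain integers rather than GStreamer types (kNanosPerSecond has the same value as GST_SECOND).

```cpp
#include <cassert>
#include <cstdint>

// Nanoseconds per second; GStreamer's GST_SECOND has the same value.
constexpr std::uint64_t kNanosPerSecond = 1'000'000'000ULL;

// Timing of a single frame, in nanoseconds.
struct FrameTiming
{
    std::uint64_t pts;      // presentation timestamp of the frame
    std::uint64_t duration; // time until the next frame starts
};

// Hypothetical helper: timing of frame number frame_index (starting at 0)
// for a constant frame rate given in frames per second.
// Computing both edges from the index avoids accumulating rounding error.
FrameTiming frameTiming(std::uint64_t frame_index, std::uint32_t frame_rate)
{
    assert(frame_rate > 0);
    const std::uint64_t start = frame_index * kNanosPerSecond / frame_rate;
    const std::uint64_t end = (frame_index + 1) * kNanosPerSecond / frame_rate;
    return {start, end - start};
}
```

With a frame rate of 20, consecutive frames are 50 ms apart, so 20 pushed frames cover exactly one second of stream time. With the divisor 3 used in allocateBuffer, the same 20 frames would cover more than six seconds of stream time.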

Hi,
The UDP setup is described in the Jetson AGX Orin FAQ.

Please take a look and give it a try. We suggest using the gst-launch-1.0 commands as a reference setup. The pipeline should behave the same way once it is implemented in C code.

Could you please review the post once more? I’ve made some edits, and I believe the most important information is now at the top.

Hi,
Please dump the H.264 stream and attach it so that we can investigate why the hardware decoder cannot decode it:

$ gst-launch-1.0 -v udpsrc port=3445 ! application/x-rtp ! rtph264depay ! h264parse ! video/x-h264,stream-format=byte-stream ! filesink location=dump.h264

I got this file: dump.h264 - Google Drive
There is something in the file, but it doesn’t look right.
The metadata says the video has 2 frames, and the file size is 5.4 MB.
dump.zip (1.1 KB)

Hi,
The profile of the h264 stream is

Format profile : High 4:4:4 Predictive@L2.1

It is not supported. Please check whether the source can generate an H.264 stream in High 4:4:4 or 4:2:0.
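
For reference, the profile reported above is stored in the stream's sequence parameter set (SPS): the byte right after the SPS NAL header is profile_idc, and the value 244 means High 4:4:4 Predictive. The following is a minimal sketch of reading it, assuming an Annex B byte-stream dump such as the dump.h264 produced by the command above, loaded into memory. The function names firstSpsProfileIdc and profileName are hypothetical, and only the common profiles are mapped.

```cpp
#include <cstddef>
#include <cstdint>
#include <optional>
#include <string>
#include <vector>

// Returns profile_idc of the first SPS NAL unit found in an Annex B
// byte stream, or std::nullopt if no SPS is present.
std::optional<std::uint8_t> firstSpsProfileIdc(const std::vector<std::uint8_t>& data)
{
    for (std::size_t i = 0; i + 4 < data.size(); ++i)
    {
        // Start code 00 00 01 (the four-byte form 00 00 00 01 ends with it too).
        if (data[i] == 0 && data[i + 1] == 0 && data[i + 2] == 1)
        {
            const std::uint8_t nal_header = data[i + 3];
            if ((nal_header & 0x1F) == 7) // nal_unit_type 7 is an SPS
            {
                return data[i + 4];       // first SPS payload byte: profile_idc
            }
        }
    }
    return std::nullopt;
}

// Maps the common profile_idc values to their names.
std::string profileName(std::uint8_t profile_idc)
{
    switch (profile_idc)
    {
        case 66:  return "Baseline";
        case 77:  return "Main";
        case 100: return "High";
        case 110: return "High 10";
        case 122: return "High 4:2:2";
        case 244: return "High 4:4:4 Predictive";
        default:  return "other";
    }
}
```

A stream whose SPS starts with the bytes 00 00 00 01 67 F4 would be reported as High 4:4:4 Predictive, which matches the profile named in this reply.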


I just tested 4:4:4. It works, but nothing has changed: the saved file still has 2 frames, and the metadata is the same.
File: dump_4_4_4.h264 - Google Drive

4:2:2 gives the same result as 4:4:4.

4:2:0 does not exist in the x264enc src capabilities.

Hi,
When encoding YUV420, the profile will be one of these three:

(string)high, (string)main, (string)baseline
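
To illustrate what YUV420 input means for the GRAY8 frames in this thread: in the planar I420 layout, a full-resolution luma plane is followed by two chroma planes at half resolution in each direction. The sketch below packs a GRAY8 image into such a buffer with neutral chroma. It is only an illustration under stated assumptions: the function name gray8ToI420 is hypothetical, width and height are assumed even, and the planes are tightly packed with no row padding, which may differ from the strides GStreamer uses for some widths. In an actual pipeline, videoconvert performs this conversion when the caps between it and the encoder request I420.

```cpp
#include <algorithm>
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Hypothetical helper: packs a GRAY8 image into a tightly packed I420 buffer.
// Layout: Y plane (width * height), then U and V planes
// ((width / 2) * (height / 2) bytes each).
std::vector<std::uint8_t> gray8ToI420(const std::vector<std::uint8_t>& gray,
                                      std::size_t width,
                                      std::size_t height)
{
    assert(width % 2 == 0 && height % 2 == 0);
    assert(gray.size() == width * height);

    const std::size_t luma_size = width * height;
    const std::size_t chroma_size = (width / 2) * (height / 2);

    // Start with every byte at 128, the neutral chroma value (no color).
    std::vector<std::uint8_t> i420(luma_size + 2 * chroma_size, 128);
    // The gray samples become the luma plane unchanged.
    std::copy(gray.begin(), gray.end(), i420.begin());
    return i420;
}
```

The resulting buffer is 1.5 times the size of the GRAY8 input, compared with 3 times for a 4:4:4 layout.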