I have written a code that enables streaming from camera connected to the Orin. The camera is set to 20fps. The stream starts, achieving 20fps on the sender’s side, but on the receiver’s side, it appears to be 1fps with a delay of approximately 25/35 seconds.
Command used on the receiver’s side:
gst-launch-1.0 -v udpsrc port=3445 ! application/x-rtp ! rtph264depay ! avdec_h264 ! videoconvert ! autovideosink
I also tried this command and it doesn’t work:
gst-launch-1.0 -v udpsrc port=3445 ! application/x-rtp ! rtph264depay ! h264parse ! nvv4l2decoder ! nv3dsink -e
it gives such an error:
NVMEDIA: NvMMLiteNVMEDIAProcessVES: 1824: NvMediaParserParse Unsupported Codec
Sender side (but it seems to work as expected):
pipeline created inside:
appsrc → queue → videoconvert → x264enc → rtph264pay → udpsink
Code:
GstreamerPipeline.hpp
#pragma once
#include "data_source/stream/gstreamer/Gstreamer.hpp"
#include "data_source/stream/gstreamer/GstreamerPipelineElement.hpp"
#include <gstreamer-1.0/gst/gst.h>
#include <ss-utils/Logger.hpp>
class GstreamerPipeline
{
public:
explicit GstreamerPipeline(const StreamGstreamerConfig& new_stream_config);
void run(Gstreamer* callback_object);
virtual ~GstreamerPipeline();
private:
virtual auto prepareElements(Gstreamer* callback_object)
-> std::vector<GstreamerPipelineElement>;
virtual void handleMessages();
virtual void beginPlaying();
auto prepareAppSrcPart(Gstreamer* _appsrc) -> std::vector<GstreamerPipelineElement>;
auto prepareStreamingToUdp() -> std::vector<GstreamerPipelineElement>;
void createPipeline(std::vector<GstreamerPipelineElement> elements);
void linkInOrder(std::vector<GstreamerPipelineElement> elements);
void throwingAssertion(bool condition, const std::string& message);
static void errorCallback(GstBus* bus, GstMessage* msg, GMainLoop* loop);
std::shared_ptr<spdlog::logger> log = ss::log::instance();
StreamGstreamerConfig stream_config;
GstElement* pipeline;
GstBus* bus;
GMainLoop* loop;
};
}
GstreamerPipeline.cpp
#include "data_source/stream/gstreamer/GstreamerPipeline.hpp"
#include "speed_camera/utils/Sliding2.hpp"
GstreamerPipeline::GstreamerPipeline(const StreamGstreamerConfig& new_stream_config) :
stream_config(new_stream_config)
{
}
void GstreamerPipeline::run(Gstreamer* callback_object)
{
gst_init(nullptr, nullptr);
createPipeline(prepareElements(callback_object));
beginPlaying();
handleMessages();
loop = g_main_loop_new(nullptr, FALSE);
g_main_loop_run(loop);
}
GstreamerPipeline::~GstreamerPipeline()
{
gst_object_unref(bus);
gst_element_set_state(pipeline, GST_STATE_NULL);
gst_object_unref(pipeline);
g_main_loop_quit(loop);
}
auto GstreamerPipeline::prepareElements(Gstreamer* callback_object)
-> std::vector<GstreamerPipelineElement>
{
// Pipeline: appsrc -> queue -> videoconvert -> x264enc -> rtph264pay -> udpsink
std::vector<GstreamerPipelineElement> pipeline_elements;
std::ranges::copy(prepareAppSrcPart(callback_object), std::back_inserter(pipeline_elements));
std::ranges::copy(prepareStreamingToUdp(), std::back_inserter(pipeline_elements));
return pipeline_elements;
}
auto GstreamerPipeline::prepareAppSrcPart(Gstreamer* callback_object)
-> std::vector<GstreamerPipelineElement>
{
auto appsrc = GstreamerPipelineElement("appsrc");
// clang-format off
g_object_set(appsrc.ptr,
"caps",gst_caps_new_simple("video/x-raw",
"format", G_TYPE_STRING,"GRAY8",
"width",G_TYPE_INT,stream_config.width,
"height",G_TYPE_INT,stream_config.height,
"framerate",GST_TYPE_FRACTION,stream_config.frame_rate,1,
nullptr),
nullptr);
// clang-format on
int raw_binary_data = 0;
static auto dispatcher =
[](GstElement* _appsrc, [[maybe_unused]] guint size, gpointer _callback_object)
{
static_cast<Gstreamer*>(_callback_object)->gstreamerCallback(_appsrc);
};
g_object_set(appsrc.ptr, "stream-type", raw_binary_data, "format", GST_FORMAT_TIME, nullptr);
g_signal_connect(appsrc.ptr,
"need-data",
// such casting is typical, gstreamer tutorial do the same.
reinterpret_cast<GCallback>(+dispatcher),
callback_object);
return std::vector{appsrc};
}
auto GstreamerPipeline::prepareStreamingToUdp() -> std::vector<GstreamerPipelineElement>
{
auto queue_to_udp = GstreamerPipelineElement("queue", "queue_to_udp");
const auto drop_oldest_when_full = 2;
// clang-format off
g_object_set(queue_to_udp.ptr,
"leaky", drop_oldest_when_full,
nullptr);
// clang-format on
auto videoconvert = GstreamerPipelineElement("videoconvert", "videoconvert1");
auto x264enc = GstreamerPipelineElement("x264enc");
auto rtph264pay = GstreamerPipelineElement("rtph264pay");
auto udpsink = GstreamerPipelineElement("udpsink");
auto rtp_payload_type = 96;
g_object_set(rtph264pay.ptr, "pt", rtp_payload_type, nullptr);
g_object_set(
udpsink.ptr, "host", stream_config.address.c_str(), "port", stream_config.port, nullptr);
log->info("INFO: Command to receive udp stream:\n `gst-launch-1.0 udpsrc port={} "
"! application/x-rtp ! rtph264depay ! avdec_h264 ! videoconvert ! autovideosink`\n",
stream_config.port);
log->info("INFO: Command to receive udp stream and save to a file:\n "
"gst-launch-1.0 udpsrc port={} ! application/x-rtp ! rtph264depay ! avdec_h264 ! "
"videoconvert ! tee name=show_and_save show_and_save. ! queue ! autovideosink "
"show_and_save. ! queue ! matroskamux ! filesink location=received_video.mkv -e\n",
stream_config.port);
return std::vector{queue_to_udp, videoconvert, x264enc, rtph264pay, udpsink};
}
void GstreamerPipeline::createPipeline(std::vector<GstreamerPipelineElement> elements)
{
pipeline = gst_pipeline_new("stream_pipeline");
throwingAssertion(pipeline, "pipeline couldn't be created");
for (auto element : elements)
{
throwingAssertion(element.ptr, "element couldn't be created");
gst_bin_add(GST_BIN(pipeline), element.ptr);
}
linkInOrder(std::move(elements));
}
void GstreamerPipeline::throwingAssertion(bool condition, const std::string& message)
{
if (!condition)
{
throw std::runtime_error(message);
}
}
void GstreamerPipeline::linkInOrder(std::vector<GstreamerPipelineElement> elements)
{
for (const auto& [first, second] : sliding2(elements))
{
bool ok = gst_element_link(first.ptr, second.ptr);
throwingAssertion(ok, fmt::format("can't link {} and {}\n", first.name, second.name));
}
}
void GstreamerPipeline::beginPlaying()
{
GstStateChangeReturn ret = gst_element_set_state(pipeline, GST_STATE_PLAYING);
throwingAssertion(ret != GST_STATE_CHANGE_FAILURE,
"Unable to set the pipeline to the playing state.\n");
}
void GstreamerPipeline::handleMessages()
{
gst_bus_add_signal_watch(bus);
g_signal_connect(G_OBJECT(bus),
"message::error",
// such casting is typical, gstreamer tutorial do the same.
reinterpret_cast<GCallback>(GstreamerPipeline::errorCallback),
loop);
gst_object_unref(bus);
}
void GstreamerPipeline::errorCallback([[maybe_unused]] GstBus* bus,
GstMessage* msg,
GMainLoop* loop)
{
GError* err;
gchar* debug_info;
gst_message_parse_error(msg, &err, &debug_info);
g_printerr("Error received from element %s: %s\n", GST_OBJECT_NAME(msg->src), err->message);
g_printerr("Debugging information: %s\n", debug_info ? debug_info : "none");
g_clear_error(&err);
g_free(debug_info);
g_main_loop_quit(loop);
}
GstreamerPipelineElement.hpp
#pragma once
#include <string>
#include <gstreamer-1.0/gst/gst.h>
{
class GstreamerPipelineElement
{
public:
explicit GstreamerPipelineElement(const std::string& factory_name);
explicit GstreamerPipelineElement(const std::string& factory_name,
const std::string& element_name);
GstElement* ptr;
std::string name;
};
}
GstreamerPipelineElements.cpp
#include "data_source/stream/gstreamer/GstreamerPipelineElement.hpp"
GstreamerPipelineElement::GstreamerPipelineElement(const std::string& factory_name) :
GstreamerPipelineElement{factory_name, factory_name}
{
}
GstreamerPipelineElement::GstreamerPipelineElement(const std::string& factory_name,
const std::string& element_name) :
ptr{gst_element_factory_make(factory_name.c_str(), element_name.c_str())},
name{std::move(element_name)}
{
}
Gstreamer.hpp
#pragma once
#include "data_source/camera/FrameMerger.hpp"
#include "data_source/config/StreamConfig.hpp"
#include "data_source/storage/OutputManager.hpp"
#include "data_source/stream/Streamer.hpp"
#include "data_source/utils/MultiResourceManager.hpp"
#include <condition_variable>
#include <mutex>
#include <gstreamer-1.0/gst/gst.h>
#include <opencv2/core/mat.hpp>
#include <ss-utils/Logger.hpp>
{
class Gstreamer : public Streamer
{
public:
Gstreamer(StreamGstreamerConfig new_stream_config,
std::function<std::unique_ptr<FrameMerger>()> merger_factory);
void gstreamerCallback(GstElement* appsrc);
virtual void stream(const TimestampedFrames& frames) override;
~Gstreamer() override = default;
private:
auto pushFrameToPipeline(GstElement* appsrc, const cv::Mat& frame) -> GstFlowReturn;
auto allocateBuffer(const cv::Mat& frame) -> GstBuffer*;
StreamGstreamerConfig stream_config;
MultiResourceManager<FrameMerger> merger_manager;
std::shared_ptr<spdlog::logger> log = ss::log::instance();
std::mutex frame_mutex;
std::condition_variable frame_condition;
bool new_frame_available = false;
cv::Mat frame_for_gstreamer;
};
} // namespace speed_camera
Gstreamer.cpp
#include "data_source/stream/gstreamer/Gstreamer.hpp"
#include "data_source/camera/BaslerCameraLayout.hpp"
#include "data_source/camera/frames/TimestampedFrames.hpp"
#include "data_source/stream/gstreamer/GstreamerPipeline.hpp"
#include "data_source/utils/MultiResourceManager.hpp"
#include <iostream>
#include <opencv2/imgproc.hpp>
#include <thread>
Gstreamer::Gstreamer(
[[maybe_unused]] StreamGstreamerConfig new_stream_config,
[[maybe_unused]] std::function<std::unique_ptr<FrameMerger>()> merger_factory) :
stream_config{new_stream_config},
merger_manager{merger_factory, std::thread::hardware_concurrency()}
{
std::thread gstreamer(
[new_stream_config, this]()
{
g_setenv("GST_DEBUG", "*:3", TRUE);
auto pipeline = std::make_unique<GstreamerPipeline>(new_stream_config);
pipeline->run(this);
});
gstreamer.detach();
}
void Gstreamer::gstreamerCallback(GstElement* appsrc)
{
log->debug("need data");
std::unique_lock<std::mutex> lock(frame_mutex);
using namespace std::chrono_literals;
if (!frame_condition.wait_for(lock, 10s, [this] { return this->new_frame_available; }))
{
handleDeadlock(__PRETTY_FUNCTION__);
}
pushFrameToPipeline(appsrc, frame_for_gstreamer);
new_frame_available = false;
}
void Gstreamer::stream(const TimestampedFrames& frames)
{
auto merger = merger_manager.getResource();
Frame merged;
merger->use([&frames, &merged](FrameMerger& m) { merged = m.mergeFrames(frames.frames); });
{
std::lock_guard<std::mutex> lock(frame_mutex);
resize(merged.cvMatView(),
frame_for_gstreamer,
cv::Size2i{stream_config.width, stream_config.height},
cv::INTER_NEAREST);
new_frame_available = true;
}
log->debug("push data");
frame_condition.notify_one();
}
auto Gstreamer::pushFrameToPipeline(GstElement* appsrc, const cv::Mat& frame) -> GstFlowReturn
{
GstBuffer* buffer = allocateBuffer(frame);
GstFlowReturn ret;
g_signal_emit_by_name(appsrc, "push-buffer", buffer, &ret);
gst_buffer_unref(buffer);
if (ret != GST_FLOW_OK)
{
std::cerr << "Image pushing ends with problems" << std::endl;
}
return ret;
}
auto Gstreamer::allocateBuffer(const cv::Mat& frame) -> GstBuffer*
{
auto size = static_cast<unsigned int>(frame.total() * frame.elemSize());
GstBuffer* buffer = gst_buffer_new_allocate(nullptr, size, nullptr);
GstMapInfo map;
gst_buffer_map(buffer, &map, GST_MAP_WRITE);
memcpy(map.data, frame.data, gst_buffer_get_size(buffer));
static GstClockTime timestamp = 0;
GST_BUFFER_PTS(buffer) = timestamp;
GST_BUFFER_DURATION(buffer) = gst_util_uint64_scale_int(1, GST_SECOND, 3);
timestamp += GST_BUFFER_DURATION(buffer);
gst_buffer_unmap(buffer, &map);
return buffer;
}