Hi junshengy,
Thank you for your previous guidance. I have made significant changes to my implementation based on your suggestions and my own testing, and I would like to share the updates and the remaining issues I am facing.
I made the following modifications:
- I switched the inference plugin from nvinfer to nvtracker, using a low-level tracker library with its own configuration file.
- I changed the SEI injection strategy: SEI insertion no longer waits for inference results and is now asynchronous.
However, detailed logging has revealed two critical issues that I need help understanding.
- Both the forwarding branch and the inference branch process frames in bursts of two rather than at a steady 33 ms cadence. I added monotonic-timestamp logs in both sei_inject_probe and tracker_src_probe. The logs consistently show two frames processed within a few milliseconds of each other, followed by a gap of approximately 66 ms before the next burst. For example, sei_inject_probe may process one frame and then the next frame only 3 ms later, far from the expected 33 ms interval. I want to understand why GStreamer processes frames in this burst pattern instead of delivering them at an even pace.
- The second, more critical issue is that a frame with a given PTS reaches sei_inject_probe roughly 50 to 70 ms earlier than it reaches tracker_src_probe (mono values are in microseconds):
```
[sei_inject_probe] pts:1689339161 mono=93430589850
[sei_inject_probe] pts:1719795555 mono=93430586603 ← only 3.3ms later!
[tracker_src_probe] pts:1689339161 mono=93430641369 ← 51.5ms after sei
[tracker_src_probe] pts:1719795555 mono=93430657543 ← 70.9ms after sei
```
- Separately, can I remove udpsink entirely and serve the forwarding branch directly through an RTSP server, like this?
queue (delay) -> [SEI prepend probe] -> h264parse -> rtph264pay -> rtsp server
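Regarding the RTSP question: as far as I understand gst-rtsp-server, an RTSP server is not an element that can be linked at the end of an existing pipeline. Each media factory builds its own pipeline from a launch string, and the payloader inside it must be named pay0. A common alternative to the udpsink/udpsrc loopback is to end the forwarding branch in an appsink and feed an appsrc inside the factory pipeline (looked up by name from a media-configure signal handler). The sketch below only builds such a launch string. The helper name and the appsrc name sei_src are hypothetical, and using do-timestamp=true to rebase timestamps onto the RTSP media's clock is an assumption to verify; the original PTS would still travel inside the SEI payload.

```cpp
#include <cassert>
#include <string>

// Hypothetical helper: a gst-rtsp-server launch string whose source is an
// appsrc instead of udpsrc + rtph264depay. The forwarding branch of the main
// pipeline would end in an appsink after h264parse, and its SEI-tagged access
// units would be pushed into the appsrc named `appsrc_name`.
std::string make_appsrc_launch(const std::string& appsrc_name, int pt) {
    // Dynamic RTP payload types are 96..127.
    assert(!appsrc_name.empty() && pt >= 96 && pt <= 127);
    return "( appsrc name=" + appsrc_name +
           " is-live=true format=time do-timestamp=true"
           " caps=\"video/x-h264,stream-format=byte-stream,alignment=au\""
           " ! h264parse config-interval=-1"
           " ! rtph264pay name=pay0 pt=" + std::to_string(pt) +
           " config-interval=-1 mtu=1400 )";
}
```

The result would be passed to gst_rtsp_media_factory_set_launch() in place of the current udpsrc-based string, which removes one depayload/re-payload round trip and the loopback socket.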
To diagnose this, I added timing probes at the nvstreammux sink pad (its input) and at the qa2 sink pad (the output of nvstreamdemux). The inference path itself, from nvstreammux through nvtracker to nvstreamdemux, takes only about 19 ms. Since the total observed delay between the SEI probe and the tracker probe for the same PTS is 50 to 70 ms, the remaining 30 to 50 ms must accumulate before the inference path starts, that is, in the qa queue and nvv4l2decoder between the tee and the nvstreammux sink pad.
My ultimate goal is for inference to lag forwarding by at most one frame, so that MetaCache can perform accurate frame-level SEI injection. The current gap of two to three frames makes this impossible. I would appreciate recommendations on how to configure nvstreammux, nvv4l2decoder, and the queue elements to minimize inference-branch latency and eliminate the burst pattern.
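On the MetaCache side of this goal: the current pop() returns the oldest cached entry within ±500 ms of the requested PTS, so detections from a neighbouring frame can be attached to the current one. A frame-accurate lookup could instead accept only an entry within a small tolerance of the requested PTS and discard entries that are already older than it. This is a hypothetical sketch (FrameMatchCache and Det are illustrative stand-ins, not DeepStream types). Because sei_inject_probe never waits, such a lookup can only find a match if the forwarding branch is held back by at least the inference lag, for example via a min-threshold-time on the qb queue; otherwise it returns std::nullopt and the frame is forwarded without SEI.

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <mutex>
#include <optional>
#include <utility>
#include <vector>

struct Det { uint16_t class_id; float confidence; };  // trimmed stand-in for Detection

// Hypothetical frame-accurate cache keyed by PTS (ns). A PTS within tol_ns of
// the request counts as the same frame. Forwarded PTS values increase
// monotonically, so entries older than the request can never match later and
// are dropped instead of being attached to the wrong frame. A frame with no
// cached entry (tracker_src_probe only calls put() when dets is non-empty)
// simply yields no match.
class FrameMatchCache {
public:
    explicit FrameMatchCache(uint64_t tol_ns) : tol_ns_(tol_ns) {}

    void put(uint64_t pts, std::vector<Det> dets) {
        std::lock_guard<std::mutex> lk(mtx_);
        cache_[pts] = std::move(dets);
        while (cache_.size() > kMaxEntries) cache_.erase(cache_.begin());
    }

    std::optional<std::vector<Det>> take(uint64_t pts) {
        std::lock_guard<std::mutex> lk(mtx_);
        // Discard results too old to ever match this or any later frame.
        while (!cache_.empty() && cache_.begin()->first + tol_ns_ < pts)
            cache_.erase(cache_.begin());
        // Oldest remaining entry is newer than the window: not ready yet.
        if (cache_.empty() || cache_.begin()->first > pts + tol_ns_)
            return std::nullopt;
        auto node = cache_.extract(cache_.begin());
        return std::move(node.mapped());
    }

    size_t size() {
        std::lock_guard<std::mutex> lk(mtx_);
        return cache_.size();
    }

private:
    static constexpr size_t kMaxEntries = 32;
    uint64_t tol_ns_;
    std::mutex mtx_;
    std::map<uint64_t, std::vector<Det>> cache_;
};
```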
Here is my code:
// SPDX-License-Identifier: MIT
//
// sei_inject_demo
//
// Run primary inference on Jetson with DeepStream and embed each frame's
// detection results as an H.264 user_data_unregistered SEI NAL into the
// **original** elementary stream, then publish RTP/H264 through udpsink. In RTSP
// mode the RTSP server reads that local RTP stream back with udpsrc. The original
// H.264 is never decoded for forwarding and never re-encoded.
//
// Pipeline:
//
// rtspsrc -> rtph264depay -> h264parse -> capsfilter -> tee
// |-> queue (leaky) -> nvv4l2decoder -> nvstreammux -> nvinfer or nvtracker (USE_NVINFER) -> nvstreamdemux -> queue -> [meta-extract probe] -> fakesink
// |-> queue (delay) -> [SEI prepend probe] -> h264parse -> rtph264pay -> udpsink
// |-> receiver UDP (-o udp)
// \-> 127.0.0.1:<port> -> RTSP server udpsrc (-o rtsp)
//
// The inference branch publishes detections (keyed by frame PTS) into a
// thread-safe cache. The forwarding branch does not wait for inference: it
// takes the oldest cached result within 500 ms of the buffer's PTS (if any),
// builds an Annex-B SEI NAL, and prepends it to the original H.264 access
// unit before RTP packetization.
#include <gst/gst.h>
#include <gst/rtsp-server/rtsp-server.h>
#include <nvdsmeta.h>
#include <gstnvdsmeta.h>
#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <mutex>
#include <optional>
#include <string>
#include <map>
#include <vector>
#include <time.h> // clock_gettime, CLOCK_MONOTONIC
//#define USE_NVINFER 1
// ---------------------------------------------------------------------------
// Detection cache: PTS -> detections, thread-safe with timed wait
// ---------------------------------------------------------------------------
struct Detection {
uint16_t class_id;
float confidence;
uint16_t x, y, w, h;
uint64_t tracking_id; // MOT/SOT object_id from nvtracker
};
using DetectionMap = std::map<uint64_t, std::vector<Detection>>;
using DetectionNode = DetectionMap::node_type;
static uint64_t get_monotonic_time_us() {
struct timespec mono_ts;
if (clock_gettime(CLOCK_MONOTONIC, &mono_ts) < 0) {
return 0;
}
return uint64_t(mono_ts.tv_sec) * 1000000ULL + uint64_t(mono_ts.tv_nsec) / 1000;
}
// nvtracker low-level path: confidence may be -0.1; real score in tracker_confidence.
static float object_confidence(const NvDsObjectMeta* om) {
if (om->confidence >= 0.f && om->confidence <= 1.f)
return om->confidence;
if (om->tracker_confidence >= 0.f && om->tracker_confidence <= 1.f)
return om->tracker_confidence;
if (om->tracker_confidence > 1e-6f)
return om->tracker_confidence;
return om->confidence;
}
// Ordered map keyed by PTS (ns): begin() is always the oldest entry, which
// put() relies on to evict and pop() relies on to scan from oldest to newest.
// put() takes the detection vector by value (callers std::move into it);
// pop() uses extract() to splice the selected node out without copying its
// vector.
class MetaCache {
public:
void put(uint64_t pts, std::vector<Detection> d) {
std::lock_guard<std::mutex> lk(m_mtx);
// If the cache already holds m_max_queue (15) results, drop the ones with the smallest PTS
while (m_cache.size() >= m_max_queue) {
g_print("[MetaCache] queue full (%zu/15), drop oldest pts=%lu\n",
m_cache.size(), m_cache.begin()->first);
m_cache.erase(m_cache.begin());
}
//g_print("[%s:%d]nvtracker pts:%lu, det:%d, cache size:%d\n", __FUNCTION__, __LINE__, pts, d.size(), m_cache.size());
m_cache[pts] = std::move(d);
while (m_cache.size() > m_max_obj) m_cache.erase(m_cache.begin()); // bound memory
}
std::optional<DetectionNode> pop(uint64_t pts) {
std::unique_lock<std::mutex> lk(m_mtx);
if (m_cache.empty()) {
return std::nullopt;
}
//
auto it = m_cache.begin();
while(it != m_cache.end()) {
uint64_t cached_pts = it->first;
int64_t diff = static_cast<int64_t>(pts) - static_cast<int64_t>(cached_pts);
g_print("[MetaCache] stream_pts=%lu, cached_pts=%lu, diff=%ld us\n",
pts, cached_pts, diff / 1000);
if (std::abs(diff) > m_timeout) {
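// Outside the +/-500 ms window: discard this stale entry
g_print("[MetaCache] timeout drop: pts=%lu diff=%ld ms > 500ms\n",
cached_pts, std::abs(diff / 1000000));
it = m_cache.erase(it); // erase() returns the next iterator
continue;
}
// First entry inside the window (not necessarily an exact PTS match): extract and remove it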
//auto result = std::move(it->second);
auto result = m_cache.extract(it);
/*g_print("[MetaCache] matched: pts=%lu obj=%zu remaining=%zu\n",
result.key(), result.mapped().size(), m_cache.size() - 1);*/
return std::optional<DetectionNode>(std::move(result));
}
g_print("[MetaCache] no match within 500ms for pts=%lu\n", pts);
return std::nullopt;
}
bool empty() {
std::unique_lock<std::mutex> lk(m_mtx);
return m_cache.empty();
}
private:
static constexpr size_t m_max_obj = 128;
static constexpr size_t m_max_queue = 15;
static constexpr int64_t m_timeout = 500 * 1000000LL;
std::mutex m_mtx;
std::map<uint64_t, std::vector<Detection>> m_cache;
};
static MetaCache g_cache;
static std::atomic<uint64_t> g_n_inserted{0};
static std::atomic<uint64_t> g_n_passthrough{0};
static std::atomic<uint64_t> g_n_inference{0};
static std::atomic<uint64_t> g_n_udp_packets{0};
static bool is_rtsp_output(const std::string& mode) { return mode == "rtsp"; }
static bool is_udp_output(const std::string& mode) { return mode == "udp"; }
// ---------------------------------------------------------------------------
// SEI builder (Annex-B, user_data_unregistered, payload_type = 5)
// ---------------------------------------------------------------------------
static const uint8_t SEI_UUID[16] = {
0xde, 0xad, 0xbe, 0xef, 0x12, 0x34, 0x56, 0x78,
0x9a, 0xbc, 0xde, 0xf0, 0x11, 0x22, 0x33, 0x44};
static void put_be16(std::vector<uint8_t>& v, uint16_t x) {
v.push_back(uint8_t(x >> 8)); v.push_back(uint8_t(x));
}
static void put_be32(std::vector<uint8_t>& v, uint32_t x) {
v.push_back(uint8_t(x >> 24)); v.push_back(uint8_t(x >> 16));
v.push_back(uint8_t(x >> 8)); v.push_back(uint8_t(x));
}
static void put_f32(std::vector<uint8_t>& v, float f) {
uint32_t u; std::memcpy(&u, &f, 4); put_be32(v, u);
}
// Insert an emulation-prevention byte (0x03) whenever two consecutive zero
// bytes are followed by a byte <= 0x03, so the SEI body cannot emulate a
// start code inside the bitstream.
static std::vector<uint8_t> emul_prev(const std::vector<uint8_t>& in) {
std::vector<uint8_t> out;
out.reserve(in.size() + in.size() / 8);
int z = 0;
for (uint8_t b : in) {
if (z >= 2 && b <= 0x03) { out.push_back(0x03); z = 0; }
out.push_back(b);
z = (b == 0) ? z + 1 : 0;
}
return out;
}
// Annex-B SEI NAL = [00 00 00 01][06][05][size_VLI][UUID + payload (with EPB)][80]
static std::vector<uint8_t>
build_sei_annexb(const std::vector<Detection>& dets, uint64_t pts_ns) {
std::vector<uint8_t> rbsp;
rbsp.insert(rbsp.end(), SEI_UUID, SEI_UUID + 16);
put_be32(rbsp, uint32_t(pts_ns >> 32));
put_be32(rbsp, uint32_t(pts_ns));
put_be32(rbsp, uint32_t(dets.size()));
for (const auto& d : dets) {
put_be16(rbsp, d.class_id);
put_f32 (rbsp, d.confidence);
put_be16(rbsp, d.x); put_be16(rbsp, d.y);
put_be16(rbsp, d.w); put_be16(rbsp, d.h);
}
auto epb = emul_prev(rbsp);
std::vector<uint8_t> nal;
nal.insert(nal.end(), {0x00, 0x00, 0x00, 0x01}); // start code
nal.push_back(0x06); // NAL header: SEI
nal.push_back(0x05); // payload_type = 5
size_t sz = rbsp.size(); // payload_size is RBSP bytes, excluding EPB bytes
while (sz >= 255) { nal.push_back(0xFF); sz -= 255; }
nal.push_back(uint8_t(sz)); // payload_size
nal.insert(nal.end(), epb.begin(), epb.end()); // payload
nal.push_back(0x80); // RBSP trailing bits
return nal;
}
static GstPadProbeReturn
mux_sink_probe(GstPad*, GstPadProbeInfo* info, gpointer) {
auto* buf = static_cast<GstBuffer*>(info->data);
uint64_t pts = uint64_t(GST_BUFFER_PTS(buf));
//g_print("[%s:%d:%lu]mux input pts:%lu\n", __FUNCTION__, __LINE__, get_monotonic_time_us(), pts);
/*std::lock_guard<std::mutex> lk(g_mux_time_mtx);
g_mux_in_times[pts] = now;
// 容量保护,防止内存无限增长(异常时 pts 匹配不上)
while (g_mux_in_times.size() > 64) {
g_mux_in_times.erase(g_mux_in_times.begin());
}*/
return GST_PAD_PROBE_OK;
}
// ---------------------------------------------------------------------------
// Probes
// ---------------------------------------------------------------------------
// On the demuxed src pad (after nvstreamdemux). GST_BUFFER_PTS here is the
// original input PTS, which matches branch B's H.264 buffers exactly.
// Important: we *cannot* key the cache with frame_meta->buf_pts when nvstreammux
// runs with live-source=1, because the muxer rewrites the outgoing batched
// PTS from its own clock. nvstreamdemux restores the per-source PTS.
#if USE_NVINFER
static GstPadProbeReturn
nvinfer_src_probe(GstPad*, GstPadProbeInfo* info, gpointer) {
auto* buf = static_cast<GstBuffer*>(info->data);
NvDsBatchMeta* batch = gst_buffer_get_nvds_batch_meta(buf);
if (!batch) return GST_PAD_PROBE_OK;
uint64_t key_pts = uint64_t(GST_BUFFER_PTS(buf));
for (NvDsMetaList* lf = batch->frame_meta_list; lf; lf = lf->next) {
auto* fm = static_cast<NvDsFrameMeta*>(lf->data);
std::vector<Detection> dets;
for (NvDsMetaList* lo = fm->obj_meta_list; lo; lo = lo->next) {
auto* om = static_cast<NvDsObjectMeta*>(lo->data);
Detection d;
d.class_id = uint16_t(om->class_id);
d.confidence = om->confidence;
d.x = uint16_t(om->rect_params.left);
d.y = uint16_t(om->rect_params.top);
d.w = uint16_t(om->rect_params.width);
d.h = uint16_t(om->rect_params.height);
dets.push_back(d);
}
if (dets.size() > 0) {
g_cache.put(key_pts, std::move(dets));
g_n_inference.fetch_add(1, std::memory_order_relaxed);
}
}
return GST_PAD_PROBE_OK;
}
#else
static GstPadProbeReturn
tracker_src_probe(GstPad*, GstPadProbeInfo* info, gpointer) {
auto* buf = static_cast<GstBuffer*>(info->data);
NvDsBatchMeta* batch = gst_buffer_get_nvds_batch_meta(buf);
if (!batch) return GST_PAD_PROBE_OK;
uint64_t key_pts = uint64_t(GST_BUFFER_PTS(buf)); //unit:ns
g_print("[%s:%d:%lu]tracker pts:%lu\n", __FUNCTION__, __LINE__, get_monotonic_time_us(), key_pts);
for (NvDsMetaList* lf = batch->frame_meta_list; lf; lf = lf->next) {
auto* fm = static_cast<NvDsFrameMeta*>(lf->data);
std::vector<Detection> dets;
for (NvDsMetaList* lo = fm->obj_meta_list; lo; lo = lo->next) {
auto* om = static_cast<NvDsObjectMeta*>(lo->data);
Detection d;
d.class_id = uint16_t(om->class_id);
d.confidence = object_confidence(om);
d.x = uint16_t(om->rect_params.left);
d.y = uint16_t(om->rect_params.top);
d.w = uint16_t(om->rect_params.width);
d.h = uint16_t(om->rect_params.height);
d.tracking_id = om->object_id;
dets.push_back(d);
}
if (dets.size() > 0) {
g_cache.put(key_pts, std::move(dets));
g_n_inference.fetch_add(1, std::memory_order_relaxed);
}
}
return GST_PAD_PROBE_OK;
}
#endif
static std::vector<uint8_t> build_sei_annexb_hello() {
const std::string hello = "Hello World";
const uint8_t test_uuid[16] = {
0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff, 0x00, 0x11,
0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99
};
std::vector<uint8_t> rbsp;
rbsp.insert(rbsp.end(), test_uuid, test_uuid + 16);
rbsp.insert(rbsp.end(), hello.begin(), hello.end());
uint8_t *pbuf = rbsp.data();
/*g_print("send data(%zu):", rbsp.size());
for (int i = 0; i < rbsp.size(); i++) {
g_print("%02X ", pbuf[i]);
}
g_print("\n\n");*/
auto epb = emul_prev(rbsp);
std::vector<uint8_t> nal;
nal.insert(nal.end(), {0x00, 0x00, 0x00, 0x01}); // start code
nal.push_back(0x06); // NAL type: SEI
nal.push_back(0x05); // payload_type = 5 (user_data_unregistered)
size_t sz = rbsp.size(); // payload_size counts RBSP bytes, excluding EPB bytes
while (sz >= 255) { nal.push_back(0xFF); sz -= 255; }
nal.push_back(uint8_t(sz));
nal.insert(nal.end(), epb.begin(), epb.end());
nal.push_back(0x80); // RBSP trailing bits
return nal;
}
// On forwarding branch sink pad: prepend the SEI NAL to the original H.264 buffer.
// The original buffer is shared with branch A via tee (refcounted), so we must
// build a *new* GstBuffer rather than mutating in place.
#if 1
static GstPadProbeReturn
sei_inject_probe(GstPad*, GstPadProbeInfo* info, gpointer user_data) {
GstBuffer* orig = GST_PAD_PROBE_INFO_BUFFER(info);
if (!orig) return GST_PAD_PROBE_OK;
uint64_t pts = uint64_t(GST_BUFFER_PTS(orig));
g_print("[%s:%d:%lu]sei pts:%lu\n", __FUNCTION__, __LINE__, get_monotonic_time_us(), pts);
if (g_cache.empty()) {
g_n_passthrough.fetch_add(1, std::memory_order_relaxed);
return GST_PAD_PROBE_OK; // forward unchanged
}
auto track_result = g_cache.pop(pts);
if (!track_result) {
return GST_PAD_PROBE_OK;
}
uint64_t result_pts = track_result->key();
std::vector<Detection> meta = std::move(track_result->mapped());
//g_print("[%s:%d:%lu]sei pts:%lu(%lu)\n", __FUNCTION__, __LINE__, get_monotonic_time_us(), pts, result_pts);
/*g_print("meta count:%d\n", meta->size());
for (auto &dd : *meta) {
g_print("class_id:%u,conf:%f, rect:[%u %u %u %u], track:%lu\n",
dd.class_id, dd.confidence, dd.x, dd.y, dd.w, dd.h, dd.tracking_id);
}*/
auto sei = build_sei_annexb(meta, result_pts); // PTS of the frame the detections belong to (pop() may return a neighbouring frame)
gsize osz = gst_buffer_get_size(orig);
GstBuffer* nb = gst_buffer_new_allocate(nullptr, sei.size() + osz, nullptr);
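GstMapInfo mi;
if (!gst_buffer_map(nb, &mi, GST_MAP_WRITE)) {
gst_buffer_unref(nb);
return GST_PAD_PROBE_OK; // mapping failed: forward the original buffer unchanged
}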
std::memcpy(mi.data, sei.data(), sei.size());
gst_buffer_extract(orig, 0, mi.data + sei.size(), osz);
gst_buffer_unmap(nb, &mi);
GST_BUFFER_PTS(nb) = GST_BUFFER_PTS(orig);
GST_BUFFER_DTS(nb) = GST_BUFFER_DTS(orig);
GST_BUFFER_DURATION(nb) = GST_BUFFER_DURATION(orig);
GST_BUFFER_FLAGS(nb) = GST_BUFFER_FLAGS(orig);
info->data = nb;
gst_buffer_unref(orig);
g_n_inserted.fetch_add(1, std::memory_order_relaxed);
return GST_PAD_PROBE_OK;
}
#else
static GstPadProbeReturn
sei_inject_probe(GstPad*, GstPadProbeInfo* info, gpointer user_data) {
GstBuffer* orig = GST_PAD_PROBE_INFO_BUFFER(info);
if (!orig) return GST_PAD_PROBE_OK;
// Build an SEI NAL (user_data_unregistered) carrying the string "Hello World"
std::vector<uint8_t> sei = build_sei_annexb_hello();
if (sei.empty()) {
return GST_PAD_PROBE_OK;
}
gsize osz = gst_buffer_get_size(orig);
GstBuffer* nb = gst_buffer_new_allocate(nullptr, sei.size() + osz, nullptr);
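GstMapInfo mi;
if (!gst_buffer_map(nb, &mi, GST_MAP_WRITE)) {
gst_buffer_unref(nb);
return GST_PAD_PROBE_OK; // mapping failed: forward the original buffer unchanged
}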
std::memcpy(mi.data, sei.data(), sei.size());
gst_buffer_extract(orig, 0, mi.data + sei.size(), osz);
gst_buffer_unmap(nb, &mi);
GST_BUFFER_PTS(nb) = GST_BUFFER_PTS(orig);
GST_BUFFER_DTS(nb) = GST_BUFFER_DTS(orig);
GST_BUFFER_DURATION(nb) = GST_BUFFER_DURATION(orig);
GST_BUFFER_FLAGS(nb) = GST_BUFFER_FLAGS(orig);
info->data = nb;
gst_buffer_unref(orig);
return GST_PAD_PROBE_OK;
}
#endif
// ---------------------------------------------------------------------------
// rtspsrc dynamic pad -> rtph264depay
// ---------------------------------------------------------------------------
static void on_rtsp_pad_added(GstElement*, GstPad* pad, gpointer user) {
auto* depay = static_cast<GstElement*>(user);
GstPad* sink = gst_element_get_static_pad(depay, "sink");
if (!gst_pad_is_linked(sink)) {
GstCaps* caps = gst_pad_get_current_caps(pad);
if (!caps) caps = gst_pad_query_caps(pad, nullptr);
if (caps) {
const gchar* enc = gst_structure_get_string(
gst_caps_get_structure(caps, 0), "encoding-name");
g_print("[rtspsrc] new pad encoding-name=%s\n", enc ? enc : "(null)");
if (enc && g_str_equal(enc, "H264")) gst_pad_link(pad, sink);
gst_caps_unref(caps);
}
}
gst_object_unref(sink);
}
static GstPadProbeReturn udp_packet_probe(GstPad*, GstPadProbeInfo* info, gpointer) {
if (GST_PAD_PROBE_INFO_BUFFER(info)) {
g_n_udp_packets.fetch_add(1, std::memory_order_relaxed);
}
return GST_PAD_PROBE_OK;
}
// ---------------------------------------------------------------------------
// Periodic stats / bus
// ---------------------------------------------------------------------------
static gboolean stats_tick(gpointer) {
g_print("[stats] inference=%lu sei_inserted=%lu passthrough=%lu udp_packets=%lu\n",
(unsigned long)g_n_inference.load(),
(unsigned long)g_n_inserted.load(),
(unsigned long)g_n_passthrough.load(),
(unsigned long)g_n_udp_packets.load());
return TRUE;
}
static gboolean bus_cb(GstBus*, GstMessage* msg, gpointer loop) {
switch (GST_MESSAGE_TYPE(msg)) {
case GST_MESSAGE_EOS:
g_print("[bus] EOS\n");
g_main_loop_quit(static_cast<GMainLoop*>(loop));
break;
case GST_MESSAGE_ERROR: {
GError* e = nullptr; gchar* d = nullptr;
gst_message_parse_error(msg, &e, &d);
g_printerr("[bus] ERROR: %s | %s\n", e->message, d ? d : "");
g_error_free(e); g_free(d);
g_main_loop_quit(static_cast<GMainLoop*>(loop));
break;
}
case GST_MESSAGE_WARNING: {
GError* e = nullptr; gchar* d = nullptr;
gst_message_parse_warning(msg, &e, &d);
g_printerr("[bus] WARN: %s | %s\n", e->message, d ? d : "");
g_error_free(e); g_free(d);
break;
}
default: break;
}
return TRUE;
}
// ---------------------------------------------------------------------------
// CLI
// ---------------------------------------------------------------------------
struct Args {
const char* rtsp_url = "rtsp://192.168.0.13:554/aibox_transfer_fpv";
const char* pgie_cfg = "/vendor_app/bin/output/chcnav_algo/fvs_ai_core/model/det/0/config_infer_primary_yolo11.txt";
const char* ll_lib_file = "/vendor_app/bin/output/chcnav_algo/fvs_ai_core/libs/libByteTracker.so";
const char* ll_config_file = "/vendor_app/bin/output/chcnav_algo/fvs_ai_core/config/config_sot.yml";
const char* output_mode = "rtsp";
const char* out_host = "192.168.11.1";
int out_port = 8554;
const char* out_mount = "/aibox";
const char* udp_host = "127.0.0.1";
int udp_port = 5400;
int width = 1280;
int height = 720;
int tracker_width = 960;
int tracker_height = 544;
int infer_interval = 0; // nvinfer interval; a large value skips most inference batches.
};
static void usage(const char* a0) {
g_printerr(
"usage: %s [-r input_rtsp_url] [-c pgie_cfg] [-o udp|rtsp]\n"
" [-u udp_host] [-P udp_port]\n"
" [-h rtsp_bind_host] [-p rtsp_port] [-m rtsp_mount]\n"
" [-W width] [-H height] [-I nvinfer_interval]\n", a0);
}
int main(int argc, char** argv) {
std::setvbuf(stdout, nullptr, _IOLBF, 0);
Args A;
for (int i = 1; i < argc; ++i) {
auto next = [&](const char* def)->const char*{ return (i + 1 < argc) ? argv[++i] : def; };
std::string s = argv[i];
if (s == "-r") A.rtsp_url = next(A.rtsp_url);
else if (s == "-c") A.pgie_cfg = next(A.pgie_cfg); // advertised in usage(); only used when USE_NVINFER is set
else if (s == "-o") A.output_mode = next(A.output_mode);
else if (s == "-u") A.udp_host = next(A.udp_host);
else if (s == "-P") A.udp_port = std::atoi(next("5400"));
else if (s == "-h") A.out_host = next(A.out_host);
else if (s == "-p") A.out_port = std::atoi(next("8554"));
else if (s == "-m") A.out_mount = next(A.out_mount);
else if (s == "-W") A.width = std::atoi(next("1280"));
else if (s == "-H") A.height = std::atoi(next("720"));
else if (s == "-I") A.infer_interval = std::atoi(next("0"));
else if (s == "--help") { usage(argv[0]); return 0; }
else { usage(argv[0]); return 1; }
}
std::string output_mode = A.output_mode;
if (!is_udp_output(output_mode) && !is_rtsp_output(output_mode)) {
g_printerr("invalid output mode: %s\n", A.output_mode);
usage(argv[0]);
return 1;
}
if (A.infer_interval < 0) A.infer_interval = 0;
g_print("[cfg] input=%s\n ll_config_file=%s\n mode=%s wh=%dx%d nvinfer_interval=%d\n",
A.rtsp_url, A.ll_config_file, A.output_mode, A.width, A.height,
A.infer_interval);
if (is_udp_output(output_mode)) {
g_print(" output=rtp://%s:%d payload=H264 pt=96\n", A.udp_host, A.udp_port);
} else {
g_print(" output=rtsp://%s:%d%s\n", A.out_host, A.out_port, A.out_mount);
g_print(" bridge=rtp://127.0.0.1:%d payload=H264 pt=96\n", A.udp_port);
}
gst_init(&argc, &argv);
GstRTSPServer* rtsp_server = nullptr;
guint rtsp_source_id = 0;
// The RTSP server and its udpsrc bridge are only needed in RTSP output mode.
if (is_rtsp_output(output_mode)) {
rtsp_server = gst_rtsp_server_new();
std::string out_service = std::to_string(A.out_port);
g_object_set(rtsp_server, "address", A.out_host,
"service", out_service.c_str(),
nullptr);
GstRTSPMountPoints* mounts = gst_rtsp_server_get_mount_points(rtsp_server);
GstRTSPMediaFactory* factory = gst_rtsp_media_factory_new();
std::string rtsp_launch =
"( udpsrc port=" + std::to_string(A.udp_port) +
" caps=\"application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)H264,payload=(int)96\" "
"! queue leaky=downstream max-size-buffers=120 max-size-bytes=0 max-size-time=0 "
"! rtph264depay "
"! h264parse config-interval=-1 "
"! rtph264pay name=pay0 pt=96 config-interval=-1 mtu=1400 )";
gst_rtsp_media_factory_set_launch(factory, rtsp_launch.c_str());
gst_rtsp_media_factory_set_shared(factory, TRUE);
gst_rtsp_mount_points_add_factory(mounts, A.out_mount, factory);
g_object_unref(mounts);
rtsp_source_id = gst_rtsp_server_attach(rtsp_server, nullptr);
if (!rtsp_source_id) {
g_printerr("RTSP server attach failed for rtsp://%s:%d%s\n", A.out_host, A.out_port, A.out_mount);
g_object_unref(rtsp_server);
return 1;
}
g_print("[rtsp] serving rtsp://%s:%d%s from local UDP port %d\n",
A.out_host, A.out_port, A.out_mount, A.udp_port);
}
GstElement* pipe = gst_pipeline_new("sei-inject");
GstElement* src = gst_element_factory_make("rtspsrc", "src");
GstElement* depay = gst_element_factory_make("rtph264depay", "depay");
GstElement* parse = gst_element_factory_make("h264parse", "parse");
GstElement* capsf = gst_element_factory_make("capsfilter", "bsf");
GstElement* tee = gst_element_factory_make("tee", "t");
// Branch A = inference: decode + nvinfer + meta extract
GstElement* qa = gst_element_factory_make("queue", "qa");
GstElement* dec = gst_element_factory_make("nvv4l2decoder", "dec");
GstElement* mux = gst_element_factory_make("nvstreammux", "mux");
#if USE_NVINFER
GstElement* infer = gst_element_factory_make("nvinfer", "pgie");
#else
GstElement* tracker = gst_element_factory_make("nvtracker", "tracker");
#endif
GstElement* dmx = gst_element_factory_make("nvstreamdemux", "dmx");
GstElement* qa2 = gst_element_factory_make("queue", "qa2");
GstElement* fa = gst_element_factory_make("fakesink", "fa");
// Branch B = forwarding: queue with delay -> SEI prepend probe -> parse -> pay -> UDP/RTSP sink
GstElement* qb = gst_element_factory_make("queue", "qb");
GstElement* out_parse = gst_element_factory_make("h264parse", "udp_parse");
GstElement* out_pay = gst_element_factory_make("rtph264pay", "udp_pay");
GstElement* sink = gst_element_factory_make("udpsink", "udp_sink");
#if USE_NVINFER
if (!pipe||!src||!depay||!parse||!capsf||!tee||!qa||!dec||!mux
||!infer||!dmx||!qa2||!fa||!qb||!out_parse||!out_pay||!sink) {
g_printerr("element creation failed\n"); return 1;
}
#else
if (!pipe||!src||!depay||!parse||!capsf||!tee||!qa||!dec||!mux
||!tracker||!dmx||!qa2||!fa||!qb||!out_parse||!out_pay||!sink) {
g_printerr("element creation failed\n"); return 1;
}
#endif
g_object_set(src, "location", A.rtsp_url,
"latency", 80,
"drop-on-latency", TRUE,
"protocols", 4,
nullptr);
g_object_set(parse, "config-interval", -1, nullptr);
// Force Annex-B byte-stream into both branches so the SEI prepend (which uses
// 00 00 00 01 start codes) and nvv4l2decoder both consume the same format.
GstCaps* bs = gst_caps_from_string(
"video/x-h264,stream-format=byte-stream,alignment=au");
g_object_set(capsf, "caps", bs, nullptr);
gst_caps_unref(bs);
// Keep the tee alive even if branch B's downstream tears down.
g_object_set(tee, "allow-not-linked", TRUE, nullptr);
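// Branch A: drop the oldest decoder input if the GPU briefly stalls. This keeps
// backpressure off branch B (forwarding) at the cost of inferring fewer frames.
// Dropped H.264 access units break the decoder's reference chain, so decoded
// frames may be corrupted until the next IDR frame.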
g_object_set(qa, "leaky", 2,
"max-size-buffers", 4,
"max-size-bytes", 0,
"max-size-time", (guint64)0,
nullptr);
g_object_set(mux, "batch-size", 1,
"width", A.width, "height", A.height,
"live-source", 1,
"batched-push-timeout", 40000, nullptr); // batched-push-timeout is in microseconds (40 ms)
// nvinfer interval is the number of consecutive batches skipped between
// inference runs. Use -I 999999 to keep the element in the pipeline while
// effectively disabling per-frame inference after the initial batch.
#if USE_NVINFER
g_object_set(infer,
"config-file-path", A.pgie_cfg,
"batch-size", 1,
"interval", (guint)A.infer_interval,
nullptr);
#else
g_object_set(tracker,
"ll-lib-file", A.ll_lib_file,
"ll-config-file", A.ll_config_file,
"tracker-width", A.tracker_width,
"tracker-height", A.tracker_height,
"gpu-id", 0,
"compute-hw", 1,
nullptr);
#endif
g_object_set(fa, "sync", FALSE, "async", FALSE, nullptr);
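// Branch B: the max-size-* limits below only cap how much qb may hold; they do
// not delay buffers. Holding forwarding back until the tracker has published
// meta for the same PTS would additionally need a minimum fill level on qb
// (for example the queue's min-threshold-time property, in nanoseconds).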
g_object_set(qb, "max-size-buffers", 60,
"max-size-bytes", 0,
"max-size-time", (guint64)0,
nullptr);
g_object_set(out_parse, "config-interval", -1, nullptr);
g_object_set(out_pay, "pt", 96,
"config-interval", 1,
"mtu", 1400,
nullptr);
g_object_set(sink, "host", A.udp_host,
"port", A.udp_port,
"sync", FALSE,
"async", FALSE,
"buffer-size", 1048576,
nullptr);
#if USE_NVINFER
gst_bin_add_many(GST_BIN(pipe), src, depay, parse, capsf, tee,
qa, dec, mux, infer, dmx, qa2, fa,
qb, nullptr);
#else
gst_bin_add_many(GST_BIN(pipe), src, depay, parse, capsf, tee,
qa, dec, mux, tracker, dmx, qa2, fa,
qb, nullptr);
#endif
gst_bin_add_many(GST_BIN(pipe), out_parse, out_pay, sink, nullptr);
if (!gst_element_link_many(depay, parse, capsf, tee, nullptr)) {
g_printerr("link depay->parse->caps->tee failed\n"); return 1;
}
if (!gst_element_link(tee, qa) || !gst_element_link(qa, dec)) {
g_printerr("link branch-A pre-mux failed\n"); return 1;
}
GstPad* dec_src = gst_element_get_static_pad(dec, "src");
//GstPad* mux_sink = gst_element_request_pad_simple(mux, "sink_0");
GstPad* mux_sink = gst_element_get_request_pad(mux, "sink_0");
if (!dec_src || !mux_sink ||
gst_pad_link(dec_src, mux_sink) != GST_PAD_LINK_OK) {
g_printerr("dec.src -> mux.sink_0 failed\n"); return 1;
}
gst_pad_add_probe(mux_sink, GST_PAD_PROBE_TYPE_BUFFER, mux_sink_probe, nullptr, nullptr);
gst_object_unref(dec_src); gst_object_unref(mux_sink);
#if USE_NVINFER
if (!gst_element_link_many(mux, infer, dmx, nullptr)) {
g_printerr("link mux->infer->dmx failed\n"); return 1;
}
#else
if (!gst_element_link_many(mux, tracker, dmx, nullptr)) {
g_printerr("link mux->tracker->dmx failed\n"); return 1;
}
#endif
//GstPad* dmx_src = gst_element_request_pad_simple(dmx, "src_0");
GstPad* dmx_src = gst_element_get_request_pad(dmx, "src_0");
GstPad* qa2_sink = gst_element_get_static_pad(qa2, "sink");
if (!dmx_src || !qa2_sink ||
gst_pad_link(dmx_src, qa2_sink) != GST_PAD_LINK_OK) {
g_printerr("dmx.src_0 -> qa2 failed\n"); return 1;
}
gst_object_unref(dmx_src); gst_object_unref(qa2_sink);
if (!gst_element_link(qa2, fa)) {
g_printerr("link qa2 -> fa failed\n"); return 1;
}
if (!gst_element_link_many(tee, qb, out_parse, out_pay, sink, nullptr)) {
g_printerr("link branch-B RTP/UDP failed\n"); return 1;
}
g_signal_connect(src, "pad-added", G_CALLBACK(on_rtsp_pad_added), depay);
GstPad* p = gst_element_get_static_pad(qa2, "sink");
#if USE_NVINFER
gst_pad_add_probe(p, GST_PAD_PROBE_TYPE_BUFFER, nvinfer_src_probe, nullptr, nullptr);
#else
gst_pad_add_probe(p, GST_PAD_PROBE_TYPE_BUFFER, tracker_src_probe, nullptr, nullptr);
#endif
gst_object_unref(p);
p = gst_element_get_static_pad(out_parse, "sink");
gst_pad_add_probe(p, GST_PAD_PROBE_TYPE_BUFFER,
sei_inject_probe, nullptr, nullptr);
gst_object_unref(p);
p = gst_element_get_static_pad(sink, "sink");
gst_pad_add_probe(p, GST_PAD_PROBE_TYPE_BUFFER, udp_packet_probe, nullptr, nullptr);
gst_object_unref(p);
if (gst_element_set_state(pipe, GST_STATE_PLAYING)
== GST_STATE_CHANGE_FAILURE) {
g_printerr("pipeline start failed\n"); return 1;
}
GMainLoop* loop = g_main_loop_new(nullptr, FALSE);
GstBus* bus = gst_element_get_bus(pipe);
gst_bus_add_watch(bus, bus_cb, loop);
g_object_unref(bus);
g_timeout_add_seconds(2, stats_tick, nullptr);
g_main_loop_run(loop);
gst_element_set_state(pipe, GST_STATE_NULL);
g_object_unref(pipe);
if (rtsp_source_id) g_source_remove(rtsp_source_id);
if (rtsp_server) g_object_unref(rtsp_server);
g_main_loop_unref(loop);
return 0;
}
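To confirm on the receiving side which PTS each SEI actually carries, a stdlib-only parser for the layout written by build_sei_annexb() might look like the sketch below. It rests on stated assumptions: it scans the access unit for NAL type 6, handles a single user_data_unregistered message (payload_type 5) per SEI NAL, and expects the UUID and field order used above. tracking_id is not serialized by build_sei_annexb(), so it is not decoded. SeiBox, ParsedSei, and find_sei are hypothetical names.

```cpp
#include <cassert>
#include <cstdint>
#include <cstring>
#include <optional>
#include <vector>

// One detection as serialized by build_sei_annexb(): 14 bytes, big-endian.
struct SeiBox { uint16_t class_id; float confidence; uint16_t x, y, w, h; };

struct ParsedSei {
    uint64_t pts_ns = 0;
    std::vector<SeiBox> boxes;
};

// Removes emulation-prevention bytes: a 0x03 that follows two zero bytes.
static std::vector<uint8_t> strip_epb(const uint8_t* p, size_t n) {
    std::vector<uint8_t> out;
    out.reserve(n);
    int zeros = 0;
    for (size_t i = 0; i < n; ++i) {
        if (zeros >= 2 && p[i] == 0x03) { zeros = 0; continue; }
        out.push_back(p[i]);
        zeros = (p[i] == 0) ? zeros + 1 : 0;
    }
    return out;
}

static uint16_t rd16(const uint8_t* p) { return uint16_t((p[0] << 8) | p[1]); }
static uint32_t rd32(const uint8_t* p) {
    return (uint32_t(p[0]) << 24) | (uint32_t(p[1]) << 16) | (uint32_t(p[2]) << 8) | uint32_t(p[3]);
}

// p points just past the SEI NAL header byte; n bytes remain in the access unit.
static std::optional<ParsedSei> parse_sei_nal(const uint8_t* p, size_t n, const uint8_t uuid[16]) {
    if (n < 2 || p[0] != 0x05) return std::nullopt;  // payload_type 5 only
    size_t i = 1, size = 0;
    while (i < n && p[i] == 0xFF) { size += 255; ++i; }
    if (i >= n) return std::nullopt;
    size += p[i++];  // payload_size counts RBSP bytes (EPB excluded)
    std::vector<uint8_t> rbsp = strip_epb(p + i, n - i);
    if (size < 28 || rbsp.size() < size) return std::nullopt;  // UUID + PTS + count
    if (std::memcmp(rbsp.data(), uuid, 16) != 0) return std::nullopt;
    ParsedSei s;
    s.pts_ns = (uint64_t(rd32(&rbsp[16])) << 32) | rd32(&rbsp[20]);
    uint32_t count = rd32(&rbsp[24]);
    if (size < 28 + size_t(count) * 14) return std::nullopt;
    for (uint32_t k = 0; k < count; ++k) {
        const uint8_t* d = &rbsp[28 + size_t(k) * 14];
        SeiBox b;
        b.class_id = rd16(d);
        uint32_t bits = rd32(d + 2);
        std::memcpy(&b.confidence, &bits, 4);
        b.x = rd16(d + 6); b.y = rd16(d + 8); b.w = rd16(d + 10); b.h = rd16(d + 12);
        s.boxes.push_back(b);
    }
    return s;
}

// Scans an Annex-B access unit for the first SEI NAL carrying `uuid`.
std::optional<ParsedSei> find_sei(const std::vector<uint8_t>& au, const uint8_t uuid[16]) {
    for (size_t i = 0; i + 3 < au.size(); ++i) {
        if (au[i] != 0 || au[i + 1] != 0 || au[i + 2] != 1) continue;  // 00 00 01
        size_t h = i + 3;                    // NAL header byte
        if ((au[h] & 0x1F) != 6) continue;   // not an SEI NAL
        if (auto s = parse_sei_nal(au.data() + h + 1, au.size() - h - 1, uuid)) return s;
    }
    return std::nullopt;
}
```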
Thank you for your continued support. I look forward to your insights on the burst pattern and the sources of latency in the inference branch.