Why does running DeepStream on Orin Nano result in CPU usage exceeding 500%?

Hi, junshengy:

Thank you very much for the detailed example and the zip file.

I have analyzed the code and confirmed that it avoids re-encoding entirely – the forwarding branch simply prepends SEI NALs to the original H.264 stream.

I have two follow‑up questions regarding integration into DeepStream:

  1. DeepStream integration

Can I implement this directly inside the create_rtsp_src_bin function (or create_pipeline) without using two separate sink bins?

I plan to add a tee after rtph264depay, keep the original decoding/inference branch, and create a new branch for SEI injection + rtph264pay + udpsink. Is this the intended way?

  2. SEI verification

What is the simplest method to verify that the SEI messages are correctly injected on the receiving side?

I see that h264parse can print debug logs, but is there a more direct way (e.g., a probe to dump SEI payloads)?
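For reference, the tee layout described in question 1 can be written as a gst_parse_launch() description. The sketch below is hypothetical (make_tee_pipeline is not part of any sample in this thread, and the property values are only illustrative). It shows one tee after rtph264depay/h264parse, an inference branch through nvv4l2decoder/nvstreammux/nvinfer, and a forwarding branch that reaches rtph264pay without decoding or re-encoding:

```cpp
#include <cassert>
#include <string>

// Hypothetical helper: builds a gst_parse_launch() description with a tee after
// rtph264depay/h264parse. Branch A decodes and runs nvinfer; branch B forwards
// the original H.264 access units to rtph264pay, where a buffer probe on the
// "pay" sink pad can prepend SEI. Property values are illustrative only.
std::string make_tee_pipeline(const std::string& url, const std::string& pgie_cfg,
                              int udp_port) {
  assert(udp_port > 0 && udp_port < 65536);
  std::string d;
  // Inference tail, declared first so "m.sink_0" below can refer to it.
  d += "nvstreammux name=m batch-size=1 width=1280 height=720 live-source=1 ! ";
  d += "nvinfer config-file-path=" + pgie_cfg + " ! fakesink sync=false ";
  // Source: original H.264 as byte-stream access units, split by the tee.
  d += "rtspsrc location=" + url + " protocols=tcp ! rtph264depay ! ";
  d += "h264parse config-interval=-1 ! ";
  d += "video/x-h264,stream-format=byte-stream,alignment=au ! tee name=t ";
  // Branch A: decode only for inference; a leaky queue so it cannot stall branch B.
  d += "t. ! queue leaky=downstream max-size-buffers=2 ! nvv4l2decoder ! m.sink_0 ";
  // Branch B: no decode, no re-encode; SEI would be added by a probe on pay's sink pad.
  d += "t. ! queue ! rtph264pay name=pay config-interval=1 pt=96 ! ";
  d += "udpsink host=127.0.0.1 port=" + std::to_string(udp_port);
  return d;
}
```

The string would be passed to gst_parse_launch(), and the payloader retrieved with gst_bin_get_by_name(GST_BIN(pipeline), "pay") to attach the probe.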

Any further advice would be greatly appreciated.

If you only need one pgie, modifying the sample code I provided above should be a simpler solution. The deepstream-app code is quite long and complex to modify. If you require the ability to reconnect or dynamically add sources, consider replacing the source element in the sample code with nvurisrcbin or nvmultiurisrcbin.

The sample code I provided includes complete code for SEI injection and verification.
sei_decode_example.cpp is used to check whether SEI has been injected.
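The contents of sei_decode_example.cpp are not reproduced in this thread. As an illustration of what such a check involves, here is a self-contained sketch. strip_epb and find_user_data_sei are hypothetical names. The sketch scans one Annex-B access unit, removes emulation-prevention bytes from each SEI NAL (type 6), walks the SEI messages, and returns the body of every user_data_unregistered message (payload_type 5) whose UUID matches:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <vector>

// Undo emulation prevention: drop each 0x03 that follows two zero bytes.
std::vector<uint8_t> strip_epb(const uint8_t* p, size_t n) {
  std::vector<uint8_t> out;
  out.reserve(n);
  int zeros = 0;
  for (size_t i = 0; i < n; ++i) {
    if (zeros >= 2 && p[i] == 0x03) { zeros = 0; continue; }
    out.push_back(p[i]);
    zeros = (p[i] == 0x00) ? zeros + 1 : 0;
  }
  return out;
}

// Scan one Annex-B access unit and return the bytes after the 16-byte UUID of
// every user_data_unregistered SEI (payload_type 5) whose UUID matches `uuid`.
std::vector<std::vector<uint8_t>>
find_user_data_sei(const uint8_t* data, size_t len, const uint8_t uuid[16]) {
  std::vector<std::vector<uint8_t>> found;
  size_t i = 0;
  while (i + 3 < len) {
    if (!(data[i] == 0 && data[i + 1] == 0 && data[i + 2] == 1)) { ++i; continue; }
    size_t start = i + 3, end = len;                 // NAL begins after 00 00 01
    for (size_t j = start; j + 2 < len; ++j)
      if (data[j] == 0 && data[j + 1] == 0 && data[j + 2] == 1) { end = j; break; }
    size_t nal_end = end;                            // trim zero of a 4-byte start code
    while (nal_end > start && data[nal_end - 1] == 0) --nal_end;
    if (nal_end > start + 1 && (data[start] & 0x1F) == 6) {  // nal_unit_type 6 = SEI
      std::vector<uint8_t> r = strip_epb(data + start + 1, nal_end - start - 1);
      size_t k = 0;
      while (k + 1 < r.size()) {                     // last byte: rbsp_trailing_bits
        size_t type = 0, size = 0;
        while (k < r.size() && r[k] == 0xFF) { type += 255; ++k; }
        if (k >= r.size()) break;
        type += r[k++];
        while (k < r.size() && r[k] == 0xFF) { size += 255; ++k; }
        if (k >= r.size()) break;
        size += r[k++];
        if (size > r.size() - k) break;              // truncated or malformed
        if (type == 5 && size >= 16 && std::memcmp(&r[k], uuid, 16) == 0)
          found.emplace_back(r.begin() + k + 16, r.begin() + k + size);
        k += size;
      }
    }
    i = end;
  }
  return found;
}
```

In a buffer probe downstream of h264parse (byte-stream, alignment=au), the buffer can be mapped with gst_buffer_map(buf, &map, GST_MAP_READ), passed as map.data/map.size, and then released with gst_buffer_unmap().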

Hi, junshengy:

Thank you very much for your reply.

After optimization on Orin Nano, the test program (based on sei_inject_demo.cpp) is now configured to forward the original H.264 stream without any SEI injection and without waiting for inference results (a pure forwarding test). However, the end‑to‑end latency is still around 800 ms.

On Orin NX, DeepStream runs with hardware encoding (nvv4l2h264enc) and latency is around 300 ms.

Is there any further optimization to reduce the latency on Orin Nano to below 400 ms?

Here is my code:

#include <gst/gst.h>
#include <gst/rtsp-server/rtsp-server.h>
#include <nvdsmeta.h>
#include <gstnvdsmeta.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <map>
#include <mutex>
#include <optional>
#include <string>
#include <vector>
#include <time.h>
#include <sys/ioctl.h>

// ---------------------------------------------------------------------------
// Detection cache: PTS -> detections, thread-safe with timed wait
// ---------------------------------------------------------------------------
struct Detection {
  uint16_t class_id;
  float    confidence;
  uint16_t x, y, w, h;
};

class MetaCache {
 public:
  void put(uint64_t pts, std::vector<Detection> d) {
    std::lock_guard<std::mutex> lk(m_);
    cache_[pts] = std::move(d);
    while (cache_.size() > kCap) cache_.erase(cache_.begin()); // bound memory
    cv_.notify_all();
  }
  std::optional<std::vector<Detection>>
  wait_pop(uint64_t pts, std::chrono::milliseconds to) {
    std::unique_lock<std::mutex> lk(m_);
    cv_.wait_for(lk, to, [&] { return cache_.count(pts); });
    auto it = cache_.find(pts);
    if (it == cache_.end()) return std::nullopt;
    auto v = std::move(it->second);
    cache_.erase(it);
    return v;
  }

 private:
  static constexpr size_t kCap = 256;
  std::mutex m_;
  std::condition_variable cv_;
  std::map<uint64_t, std::vector<Detection>> cache_;
};

static MetaCache g_cache;
static std::atomic<uint64_t> g_n_inserted{0};
static std::atomic<uint64_t> g_n_passthrough{0};
static std::atomic<uint64_t> g_n_inference{0};

uint64_t get_monotonic_time_us() {
    struct timespec mono_ts;

    if (clock_gettime(CLOCK_MONOTONIC, &mono_ts) < 0) {
        return 0;
    }

    return mono_ts.tv_sec * 1000000 + mono_ts.tv_nsec / 1000;
}

// ---------------------------------------------------------------------------
// SEI builder (Annex-B, user_data_unregistered, payload_type = 5)
// ---------------------------------------------------------------------------
static const uint8_t SEI_UUID[16] = {
    0xde, 0xad, 0xbe, 0xef, 0x12, 0x34, 0x56, 0x78,
    0x9a, 0xbc, 0xde, 0xf0, 0x11, 0x22, 0x33, 0x44};

static void put_be16(std::vector<uint8_t>& v, uint16_t x) {
  v.push_back(uint8_t(x >> 8)); v.push_back(uint8_t(x));
}
static void put_be32(std::vector<uint8_t>& v, uint32_t x) {
  v.push_back(uint8_t(x >> 24)); v.push_back(uint8_t(x >> 16));
  v.push_back(uint8_t(x >>  8)); v.push_back(uint8_t(x));
}
static void put_f32(std::vector<uint8_t>& v, float f) {
  uint32_t u; std::memcpy(&u, &f, 4); put_be32(v, u);
}

// Insert 0x03 after any "00 00 [<=03]" pair so the SEI body cannot fake a
// start code inside the bitstream.
static std::vector<uint8_t> emul_prev(const std::vector<uint8_t>& in) {
  std::vector<uint8_t> out;
  out.reserve(in.size() + in.size() / 8);
  int z = 0;
  for (uint8_t b : in) {
    if (z >= 2 && b <= 0x03) { out.push_back(0x03); z = 0; }
    out.push_back(b);
    z = (b == 0) ? z + 1 : 0;
  }
  return out;
}

// Annex-B SEI NAL = [00 00 00 01][06] + EPB([05][payload_size][UUID + payload][80])
// payload_size counts the payload bytes before emulation prevention, so EPB is
// applied to the whole SEI RBSP after the size has been written.
static std::vector<uint8_t>
build_sei_annexb(const std::vector<Detection>& dets, uint64_t pts_ns) {
  std::vector<uint8_t> rbsp;
  rbsp.insert(rbsp.end(), SEI_UUID, SEI_UUID + 16);
  put_be32(rbsp, uint32_t(pts_ns >> 32));
  put_be32(rbsp, uint32_t(pts_ns));
  put_be32(rbsp, uint32_t(dets.size()));
  for (const auto& d : dets) {
    put_be16(rbsp, d.class_id);
    put_f32 (rbsp, d.confidence);
    put_be16(rbsp, d.x); put_be16(rbsp, d.y);
    put_be16(rbsp, d.w); put_be16(rbsp, d.h);
  }

  std::vector<uint8_t> sei;                        // SEI RBSP, not yet escaped
  sei.push_back(0x05);                             // payload_type = 5
  size_t sz = rbsp.size();
  while (sz >= 255) { sei.push_back(0xFF); sz -= 255; }
  sei.push_back(uint8_t(sz));                      // payload_size (unescaped bytes)
  sei.insert(sei.end(), rbsp.begin(), rbsp.end()); // payload
  sei.push_back(0x80);                             // RBSP trailing bits
  auto epb = emul_prev(sei);

  std::vector<uint8_t> nal;
  nal.insert(nal.end(), {0x00, 0x00, 0x00, 0x01}); // start code
  nal.push_back(0x06);                             // NAL header: SEI
  nal.insert(nal.end(), epb.begin(), epb.end());   // escaped SEI RBSP
  return nal;
}


static GstPadProbeReturn
nvinfer_src_probe(GstPad*, GstPadProbeInfo* info, gpointer) {
  auto* buf = static_cast<GstBuffer*>(info->data);
  NvDsBatchMeta* batch = gst_buffer_get_nvds_batch_meta(buf);
  if (!batch) return GST_PAD_PROBE_OK;

  uint64_t key_pts = uint64_t(GST_BUFFER_PTS(buf));

  for (NvDsMetaList* lf = batch->frame_meta_list; lf; lf = lf->next) {
    auto* fm = static_cast<NvDsFrameMeta*>(lf->data);
    std::vector<Detection> dets;
    for (NvDsMetaList* lo = fm->obj_meta_list; lo; lo = lo->next) {
      auto* om = static_cast<NvDsObjectMeta*>(lo->data);
      Detection d;
      d.class_id   = uint16_t(om->class_id);
      d.confidence = om->confidence;
      d.x = uint16_t(om->rect_params.left);
      d.y = uint16_t(om->rect_params.top);
      d.w = uint16_t(om->rect_params.width);
      d.h = uint16_t(om->rect_params.height);
      dets.push_back(d);
    }
    g_cache.put(key_pts, std::move(dets));
    g_n_inference.fetch_add(1, std::memory_order_relaxed);
  }
  static auto last_time = std::chrono::steady_clock::now();
  auto now = std::chrono::steady_clock::now();
  auto delta_us = std::chrono::duration_cast<std::chrono::microseconds>(now - last_time).count();
  last_time = now;
  g_print("[infer interval:%lu] %lld us\n",  get_monotonic_time_us(), (long long)delta_us);
  return GST_PAD_PROBE_OK;
}

static GstPadProbeReturn
sei_inject_probe(GstPad*, GstPadProbeInfo* info, gpointer user_data) {
#if 0
  GstBuffer* orig = GST_PAD_PROBE_INFO_BUFFER(info);
  if (!orig) return GST_PAD_PROBE_OK;

  uint64_t pts = uint64_t(GST_BUFFER_PTS(orig));
  int wait_ms  = GPOINTER_TO_INT(user_data);
  auto meta = g_cache.wait_pop(pts, std::chrono::milliseconds(wait_ms));
  if (!meta) {
    g_n_passthrough.fetch_add(1, std::memory_order_relaxed);
    return GST_PAD_PROBE_OK; // forward unchanged
  }

  auto sei = build_sei_annexb(*meta, pts);
  gsize osz = gst_buffer_get_size(orig);

  GstBuffer* nb = gst_buffer_new_allocate(nullptr, sei.size() + osz, nullptr);
  GstMapInfo mi;
  gst_buffer_map(nb, &mi, GST_MAP_WRITE);
  std::memcpy(mi.data, sei.data(), sei.size());
  gst_buffer_extract(orig, 0, mi.data + sei.size(), osz);
  gst_buffer_unmap(nb, &mi);

  GST_BUFFER_PTS(nb)      = GST_BUFFER_PTS(orig);
  GST_BUFFER_DTS(nb)      = GST_BUFFER_DTS(orig);
  GST_BUFFER_DURATION(nb) = GST_BUFFER_DURATION(orig);
  GST_BUFFER_FLAGS(nb)    = GST_BUFFER_FLAGS(orig);

  info->data = nb;
  gst_buffer_unref(orig);
  g_n_inserted.fetch_add(1, std::memory_order_relaxed);
#endif
  return GST_PAD_PROBE_OK;
}

static void on_rtsp_pad_added(GstElement*, GstPad* pad, gpointer user) {
  auto* depay = static_cast<GstElement*>(user);
  GstPad* sink = gst_element_get_static_pad(depay, "sink");
  if (!gst_pad_is_linked(sink)) {
    GstCaps* caps = gst_pad_get_current_caps(pad);
    if (!caps) caps = gst_pad_query_caps(pad, nullptr);
    if (caps) {
      const gchar* enc = gst_structure_get_string(
          gst_caps_get_structure(caps, 0), "encoding-name");
      g_print("[rtspsrc] new pad encoding-name=%s\n", enc ? enc : "(null)");
      if (enc && g_str_equal(enc, "H264")) gst_pad_link(pad, sink);
      gst_caps_unref(caps);
    }
  }
  gst_object_unref(sink);
}

// ---------------------------------------------------------------------------
// Periodic stats / bus
// ---------------------------------------------------------------------------
static gboolean stats_tick(gpointer) {
  g_print("[stats] inference=%lu sei_inserted=%lu passthrough=%lu\n",
          (unsigned long)g_n_inference.load(),
          (unsigned long)g_n_inserted.load(),
          (unsigned long)g_n_passthrough.load());
  return TRUE;
}

static gboolean bus_cb(GstBus*, GstMessage* msg, gpointer loop) {
  switch (GST_MESSAGE_TYPE(msg)) {
    case GST_MESSAGE_EOS:
      g_print("[bus] EOS\n");
      g_main_loop_quit(static_cast<GMainLoop*>(loop));
      break;
    case GST_MESSAGE_ERROR: {
      GError* e = nullptr; gchar* d = nullptr;
      gst_message_parse_error(msg, &e, &d);
      g_printerr("[bus] ERROR: %s | %s\n", e->message, d ? d : "");
      g_error_free(e); g_free(d);
      g_main_loop_quit(static_cast<GMainLoop*>(loop));
      break;
    }
    case GST_MESSAGE_WARNING: {
      GError* e = nullptr; gchar* d = nullptr;
      gst_message_parse_warning(msg, &e, &d);
      g_printerr("[bus] WARN: %s | %s\n", e->message, d ? d : "");
      g_error_free(e); g_free(d);
      break;
    }
    default: break;
  }
  return TRUE;
}


struct Args {
  const char* rtsp_url    = "rtsp://192.168.0.119:554/live/stream";
  const char* pgie_cfg    = "/vendor_app/bin/output/chcnav_algo/fvs_ai_core/model/det/0/config_infer_primary_yolo11.txt";
  const char* rtsp_listen    = "192.168.11.1";
  int         rtsp_port    = 8554;
  int         width       = 1280;
  int         height      = 720;
  int         delay_ms    = 0;   // forwarding queue min-threshold-time
  int         wait_ms     = 100;   // SEI probe wait_pop timeout
};

static void usage(const char* a0) {
  g_printerr(
    "usage: %s [-r rtsp_url] [-c pgie_cfg] [-h rtsp_listen] [-p rtsp_port]\n"
    "          [-W width] [-H height] [-d delay_ms] [-w wait_ms]\n", a0);
}

int main(int argc, char** argv) {
  std::setvbuf(stdout, nullptr, _IOLBF, 0);
  Args A;
  for (int i = 1; i < argc; ++i) {
    auto next = [&](const char* def)->const char*{ return (i + 1 < argc) ? argv[++i] : def; };
    std::string s = argv[i];
    if      (s == "-r") A.rtsp_url = next(A.rtsp_url);
    else if (s == "-c") A.pgie_cfg = next(A.pgie_cfg);
    else if (s == "-h") A.rtsp_listen = next(A.rtsp_listen);
    else if (s == "-p") A.rtsp_port = std::atoi(next("8554"));
    else if (s == "-W") A.width    = std::atoi(next("1280"));
    else if (s == "-H") A.height   = std::atoi(next("720"));
    else if (s == "-d") A.delay_ms = std::atoi(next("0"));
    else if (s == "-w") A.wait_ms  = std::atoi(next("100"));
    else { usage(argv[0]); return 1; }
  }

  g_print("[cfg] rtsp=%s\n      pgie=%s\n      rtsp server=%s:%d  wh=%dx%d delay=%dms wait=%dms\n",
          A.rtsp_url, A.pgie_cfg, A.rtsp_listen, A.rtsp_port,
          A.width, A.height, A.delay_ms, A.wait_ms);

  gst_init(&argc, &argv);

  GstElement* pipe  = gst_pipeline_new("sei-inject");
  GstElement* src   = gst_element_factory_make("rtspsrc",        "src");
  GstElement* depay = gst_element_factory_make("rtph264depay",   "depay");
  GstElement* parse = gst_element_factory_make("h264parse",      "parse");
  GstElement* capsf = gst_element_factory_make("capsfilter",     "bsf");
  GstElement* tee   = gst_element_factory_make("tee",            "t");

  GstElement* qa    = gst_element_factory_make("queue",          "qa");
  GstElement* dec   = gst_element_factory_make("nvv4l2decoder",  "dec");
  GstElement* mux   = gst_element_factory_make("nvstreammux",    "mux");
  GstElement* infer = gst_element_factory_make("nvinfer",        "pgie");
  GstElement* dmx   = gst_element_factory_make("nvstreamdemux",  "dmx");
  GstElement* qa2   = gst_element_factory_make("queue",          "qa2");
  GstElement* fa    = gst_element_factory_make("fakesink",       "fa");

  GstElement* qb    = gst_element_factory_make("queue",          "qb");
  GstElement* pay   = gst_element_factory_make("rtph264pay",     "pay");
  GstElement* udp   = gst_element_factory_make("udpsink",        "out");

  if (!pipe||!src||!depay||!parse||!capsf||!tee||!qa||!dec||!mux
      ||!infer||!dmx||!qa2||!fa||!qb||!pay||!udp) {
    g_printerr("element creation failed\n"); return 1;
  }

  g_object_set(src,   "location", A.rtsp_url,
                      "latency",  50,
                      "protocols", 4 /* GST_RTSP_LOWER_TRANS_TCP */,
                      "drop-on-latency", TRUE, 
                      nullptr);
  g_object_set(parse, "config-interval", -1, nullptr);

  GstCaps* bs = gst_caps_from_string(
      "video/x-h264,stream-format=byte-stream,alignment=au");
  g_object_set(capsf, "caps", bs, nullptr);
  gst_caps_unref(bs);

  g_object_set(tee, "allow-not-linked", TRUE, nullptr);

  g_object_set(qa, "leaky", 2, "max-size-buffers", 2,
                   "max-size-bytes", 0, "max-size-time", (guint64)0, nullptr);

  g_object_set(mux, "batch-size", 1,
                    "width",  A.width, "height", A.height,
                    "live-source", 1,
                    "batched-push-timeout", 20000, nullptr);
  g_object_set(infer, "config-file-path", A.pgie_cfg, "batch-size", 1, "interval", 2,  nullptr);
  g_object_set(fa, "sync", FALSE, "async", FALSE, nullptr);

  g_object_set(qb, "max-size-buffers", 2,
                   "max-size-bytes", 0,
                   "max-size-time", (guint64)0,
                   "leaky", 2,
                   nullptr);
  g_object_set(pay, "config-interval", 1, "pt", 96, "mtu", 1400, nullptr);
  int udp_port = 5400;
  g_object_set(udp, "host", "127.0.0.1", "port", udp_port,
                    "sync", FALSE, "async", FALSE,
                    "qos", TRUE,
                    "buffer-size", 65536,
                    nullptr);

  gst_bin_add_many(GST_BIN(pipe), src, depay, parse, capsf, tee,
                                  qa, dec, mux, infer, dmx, qa2, fa,
                                  qb, pay, udp, nullptr);

  if (!gst_element_link_many(depay, parse, capsf, tee, nullptr)) {
    g_printerr("link depay->parse->caps->tee failed\n"); return 1;
  }
  if (!gst_element_link(tee, qa) || !gst_element_link(qa, dec)) {
    g_printerr("link branch-A pre-mux failed\n"); return 1;
  }
  GstPad* dec_src  = gst_element_get_static_pad(dec, "src");
  //GstPad* mux_sink = gst_element_request_pad_simple(mux, "sink_0");
  GstPad* mux_sink = gst_element_get_request_pad(mux, "sink_0");
  if (!dec_src || !mux_sink ||
      gst_pad_link(dec_src, mux_sink) != GST_PAD_LINK_OK) {
    g_printerr("dec.src -> mux.sink_0 failed\n"); return 1;
  }
  gst_object_unref(dec_src); gst_object_unref(mux_sink);
  if (!gst_element_link_many(mux, infer, dmx, nullptr)) {
    g_printerr("link mux->infer->dmx failed\n"); return 1;
  }
  //GstPad* dmx_src  = gst_element_request_pad_simple(dmx, "src_0");
  GstPad* dmx_src  = gst_element_get_request_pad(dmx, "src_0");
  GstPad* qa2_sink = gst_element_get_static_pad(qa2, "sink");
  if (!dmx_src || !qa2_sink ||
      gst_pad_link(dmx_src, qa2_sink) != GST_PAD_LINK_OK) {
    g_printerr("dmx.src_0 -> qa2 failed\n"); return 1;
  }
  gst_object_unref(dmx_src); gst_object_unref(qa2_sink);
  if (!gst_element_link(qa2, fa)) {
    g_printerr("link qa2 -> fa failed\n"); return 1;
  }
  if (!gst_element_link_many(tee, qb, pay, udp, nullptr)) {
    g_printerr("link branch-B failed\n"); return 1;
  }

  g_signal_connect(src, "pad-added", G_CALLBACK(on_rtsp_pad_added), depay);

  GstPad* p = gst_element_get_static_pad(qa2, "sink");
  gst_pad_add_probe(p, GST_PAD_PROBE_TYPE_BUFFER, nvinfer_src_probe, nullptr, nullptr);
  gst_object_unref(p);

  p = gst_element_get_static_pad(pay, "sink");
  gst_pad_add_probe(p, GST_PAD_PROBE_TYPE_BUFFER,
                    sei_inject_probe, GINT_TO_POINTER(A.wait_ms), nullptr);
  gst_object_unref(p);

  if (gst_element_set_state(pipe, GST_STATE_PLAYING)
      == GST_STATE_CHANGE_FAILURE) {
    g_printerr("pipeline start failed\n"); return 1;
  }

    // ==================== Start the RTSP server: re-serve the local UDP stream over RTSP
  char rtsp_launch[512];
  /*snprintf(rtsp_launch, sizeof(rtsp_launch),
      "( udpsrc port=%d buffer-size=131072 caps=\"application/x-rtp,media=video,clock-rate=90000,encoding-name=H264,payload=96\" )",
      udp_port);*/
  snprintf(rtsp_launch, sizeof(rtsp_launch),
    "( udpsrc port=%d caps=\"application/x-rtp,media=video,encoding-name=H264\" ! "
    "rtph264depay ! h264parse ! rtph264pay name=pay0 pt=96 config-interval=1 )",
    udp_port);

  GstRTSPServer *rtsp_server = gst_rtsp_server_new();
  g_object_set(rtsp_server, "address", A.rtsp_listen,
                            "service", std::to_string(A.rtsp_port).c_str(),
                            NULL);
  GstRTSPMountPoints *mounts = gst_rtsp_server_get_mount_points(rtsp_server);
  GstRTSPMediaFactory *rtsp_factory = gst_rtsp_media_factory_new();
  gst_rtsp_media_factory_set_launch(rtsp_factory, rtsp_launch);
  gst_rtsp_media_factory_set_shared(rtsp_factory, TRUE);
  gst_rtsp_mount_points_add_factory(mounts, "/aibox", rtsp_factory);
  g_object_unref(mounts);
  if (gst_rtsp_server_attach(rtsp_server, NULL) == 0) {
    g_printerr("Failed to attach RTSP server\n");
    return 1;
  }
  g_print("RTSP server running at rtsp://%s:%d/aibox\n", A.rtsp_listen, A.rtsp_port);
  g_print("Pipeline started, forwarding to local UDP port %d\n", udp_port);

  GMainLoop* loop = g_main_loop_new(nullptr, FALSE);
  GstBus* bus = gst_element_get_bus(pipe);
  gst_bus_add_watch(bus, bus_cb, loop);
  g_object_unref(bus);

  g_timeout_add_seconds(2, stats_tick, nullptr);

  g_main_loop_run(loop);

  gst_element_set_state(pipe, GST_STATE_NULL);
  g_object_unref(pipe);
  g_object_unref(rtsp_server);
  g_main_loop_unref(loop);
  return 0;
}

1. How did you obtain the end-to-end latency figures? Was it based on a clock display captured through the camera feed, or on SEI timestamps? I consider other measurement methods inaccurate.
2. How did you compare the measurements? Were they taken in the same network environment at the same time? This program performs no encoding, so nvv4l2h264enc provides no benefit here.
3. With the rtspsrc latency set to 50 ms, network fluctuations can cause issues such as picture corruption and jitter.

Refer to this FAQ to determine the latency introduced by each GStreamer/DeepStream element; however, I believe 800 ms is currently not a bad result.

Hi, junshengy:

Thank you for your detailed response and the FAQ link.

My latency measurement method:
I ran a millisecond‑resolution stopwatch on a PC monitor, pointed a camera at the monitor, and captured the camera’s RTSP stream through Orin Nano. On the same PC, I used ffplay to pull the forwarded RTSP stream from Orin Nano. I then compared the timestamp displayed on the camera feed (stopwatch) with the timestamp shown in the playback window. The difference was approximately 733 ms (running DeepStream on Orin NX brings the latency down to 300 ms).

Question 1:
In my current pipeline (based on DeepStream, without any encoder, e.g., no x264enc), why is the latency still very high (733 ms)? Moreover, the video stream shows stuttering and mosaic artifacts, even though no encoding is performed. What could be the root cause?

Question 2:
I also implemented a test program that adds a software encoder (x264enc). The latency is around 700 ms (similar to the no‑encode case), but the video is smooth without any artifacts. Does re‑encoding improve video quality and stability? If so, why would re‑encoding help when the original stream is already H.264?

Thank you very much for your guidance. Any suggestions to reduce latency and eliminate artifacts in the no‑encode case would be greatly appreciated.

Best regards.

Are you using sei_inject_demo to measure latency on both devices? If the comparison is not performed using the same program, the data will differ.

Regarding Issues 1 and 2, I suspect they stem from the same underlying problem.

The latency property of rtspsrc is set too low, and the sync property of udpsink is set to false; this causes data packets to be consumed as quickly as possible, rendering the system unable to effectively mitigate network jitter.

Re-encoding will not result in any improvement in video quality; however, since re-encoding introduces latency into the pipeline, it causes buffers to accumulate in upstream elements—which actually improves the system’s resilience against jitter.

Finally, please try configuring rtspsrc to use the TCP protocol.
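The two settings discussed above can be illustrated with a pipeline description. This is a hypothetical helper (make_forward_pipeline), and the 200 ms latency is only an example value to be tuned against measured jitter. A larger rtspsrc latency gives the jitter buffer room to wait for late packets. protocols=tcp requests interleaved TCP transport. sync=true makes udpsink release buffers according to their timestamps instead of as fast as possible:

```cpp
#include <cassert>
#include <string>

// Hypothetical helper: forwarding pipeline (rtspsrc -> depay -> parse -> pay ->
// udpsink) with a configurable jitter-buffer latency, TCP transport, and a
// clock-synchronised sink. Values are examples, not recommendations.
std::string make_forward_pipeline(const std::string& url, int latency_ms,
                                  const std::string& host, int port) {
  assert(latency_ms >= 0);
  return "rtspsrc location=" + url +
         " latency=" + std::to_string(latency_ms) +   // jitter buffer depth (ms)
         " protocols=tcp"                              // RTP interleaved over RTSP/TCP
         " ! rtph264depay ! h264parse config-interval=-1"
         " ! rtph264pay config-interval=1 pt=96"
         " ! udpsink host=" + host + " port=" + std::to_string(port) +
         " sync=true";                                 // release buffers on their timestamps
}
```

The trade-off is direct: every millisecond of rtspsrc latency is added to end-to-end latency, in exchange for tolerating that much arrival jitter.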

Thank you for your detailed analysis.

I have performed additional tests based on your suggestions:

  1. UDP direct forwarding (without any RTSP server):

End‑to‑end latency measured on the PC is ~133 ms.

Artifacts are greatly reduced, but not completely eliminated – occasional minor tearing/mosaics still appear.

This confirms that the forwarding pipeline itself (rtspsrc → rtph264depay → rtph264pay → udpsink) is not the main bottleneck.

  2. RTSP-server-induced latency:

When I add the RTSP server component (udpsrc → rtph264depay → h264parse → rtph264pay), the latency increases dramatically to >700 ms and the video becomes choppy.

Requirement:

The stream must be accessible by multiple clients simultaneously via RTSP/RTMP (UDP only is not acceptable for production).

I need to reduce the RTSP server latency while keeping the necessary elements for SEI injection.

Could you please provide specific recommendations to optimize the RTSP server launch string? For example:

Critical properties for udpsrc (e.g., buffer-size, do-timestamp) to reduce buffering.

Proper configuration of h264parse and rtph264pay to minimize added latency while preserving the ability to insert SEI via pad probe.

Any other pipeline architecture that allows low‑latency multi‑client streaming with SEI injection (e.g., using rtspclientsink with an external lightweight RTSP server like Mediamtx).

Thank you for your guidance.

sei_inject_demo.cpp (22.8 KB)

I compared the latency of the UDP sink and the RTSP sink; in my network environment, the latency is around 200 ms, with the RTSP sink adding about 20 ms compared to the UDP sink.

My input is 1280x720 with an IDR interval of 30, using a Baseline profile RTSP stream. With the command-line arguments below, I set the nvinfer interval to 99999; this effectively skips inference, although nvstreammux/nvstreamdemux still process every frame.

./sei_inject_demo -r rtsp://xxxx -c config_infer_primary.local.txt -o udp -u xxx  -P 5400 -W 1280 -H 720 -d 200 -w 100 -I 999999

./sei_inject_demo -r rtsp://xxxx -c config_infer_primary.local.txt -o rtsp -h 0.0.0.0 -p 8555 -m /sei -W 1280 -H 720 -d 200 -w 100 -I 999999

I insert an SEI NALU containing a timestamp into the input source and parse the SEI at the playback end to measure latency.
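A minimal sketch of the timestamp approach is shown below. It assumes a hypothetical payload layout: a 16-byte UUID followed by a big-endian 64-bit capture time in microseconds. It also assumes that sender and receiver clocks are synchronised, for example both running on the same host or both disciplined by NTP/PTP. encode_ts_payload, decode_ts_payload, and latency_ms are illustrative names, not functions from sei_inject_demo.cpp. The payload would be wrapped in a payload_type 5 SEI in the same way as build_sei_annexb does:

```cpp
#include <cassert>
#include <cstdint>
#include <optional>
#include <vector>

// Hypothetical layout: 16-byte UUID + big-endian uint64 send time (microseconds).
std::vector<uint8_t> encode_ts_payload(const uint8_t uuid[16], uint64_t ts_us) {
  std::vector<uint8_t> p(uuid, uuid + 16);
  for (int s = 56; s >= 0; s -= 8) p.push_back(uint8_t(ts_us >> s));
  return p;
}

// Returns the embedded send time, or nullopt if the UUID/length do not match.
std::optional<uint64_t> decode_ts_payload(const std::vector<uint8_t>& p,
                                          const uint8_t uuid[16]) {
  if (p.size() != 24) return std::nullopt;
  for (int i = 0; i < 16; ++i)
    if (p[i] != uuid[i]) return std::nullopt;
  uint64_t ts = 0;
  for (int i = 16; i < 24; ++i) ts = (ts << 8) | p[i];
  return ts;
}

// End-to-end latency in ms: receive time minus embedded send time (same clock).
double latency_ms(uint64_t sent_us, uint64_t recv_us) {
  return (double(recv_us) - double(sent_us)) / 1000.0;
}
```

Unlike the stopwatch method, this measures per-frame latency inside the pipeline and player. It therefore excludes camera exposure and display scan-out, and it only works if both clocks agree.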

Your pipeline is as follows; the player’s buffering for UDP/RTSP may result in varying latency.

rtsp in --> forward --> udp/rtsp sink --> player

Try adjusting the GOP size and player parameters for your RTSP camera, as well as the RTSP server API settings below.

gst_rtsp_media_factory_set_buffer_size(factory, 65536);
gst_rtsp_media_factory_set_latency(factory, 0);
gst_rtsp_media_factory_set_do_retransmission(factory, FALSE);
gst_rtsp_media_factory_set_retransmission_time(factory, 0);

Introducing rtspclientsink and Mediamtx will only result in higher latency, as the Mediamtx server introduces additional buffering.

Hi, junshengy:

Thank you very much for your detailed analysis and the updated test program.

I have run the program with the parameters you provided. The end-to-end latency indeed dropped to around 220 ms, which is a significant improvement. However, the video now suffers from severe stuttering (frame drops) and heavy artifacts (blocking / mosaics), making it unusable for real‑time monitoring.

Here is my PC command:

gst-launch-1.0 rtspsrc location=rtsp://192.168.11.1:8554/aibox latency=0 ! rtph264depay ! avdec_h264 ! timeoverlay ! videoconvert ! autovideosink sync=false

Could you please advise on how to improve the video quality while maintaining low latency? Specifically:

  1. What could be the main causes of such stuttering and artifacts?
  2. Are there any additional GStreamer/DeepStream parameters that can reduce artifacts without increasing latency significantly?
  3. In your test environment, what video quality (bitrate, frame rate, resolution) and network conditions allowed you to achieve smooth 200 ms latency? Do you have a sample configuration or gst-launch command that demonstrates both low latency and good visual quality?

Any specific guidance would be greatly appreciated. Thank you again for your support.

Try setting sync to true.

As I have repeatedly explained above, if the sink consumes buffers too quickly, rtspsrc may drop packets, resulting in stuttering and mosaic artifacts.

You need to adjust this based on network conditions and RTSP camera parameters, balancing latency and jitter resistance.
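The balance between latency and jitter resistance can be illustrated with a deliberately simplified model. This is an assumption for illustration, not rtpjitterbuffer's actual algorithm: a packet is usable only if its extra network delay, relative to the fastest packet, does not exceed the jitter-buffer latency. Anything later is dropped and shows up as corruption. count_late is a hypothetical helper:

```cpp
#include <cassert>
#include <cstddef>
#include <vector>

// Simplified model: count packets whose delay beyond the minimum observed delay
// exceeds the jitter-buffer latency, i.e. packets that would arrive too late.
std::size_t count_late(const std::vector<double>& delay_ms, double latency_ms) {
  if (delay_ms.empty()) return 0;
  double base = delay_ms[0];
  for (double d : delay_ms)
    if (d < base) base = d;                       // fastest packet sets the baseline
  std::size_t late = 0;
  for (double d : delay_ms)
    if (d - base > latency_ms) ++late;            // missed its playout deadline
  return late;
}
```

In this model, raising the latency reduces late packets at the cost of fixed added delay. When a reference frame is damaged, the error typically persists until the next IDR, which is up to about one second with -g 30 at 30 fps. This is why GOP size also matters.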

I am using FFmpeg to publish a 1280x720 stream to SRS, simulating the behavior of an RTSP camera; here are my encoding parameters:

-c:v libx264 -g 30 -keyint_min 30 -preset ultrafast -tune zerolatency -profile:v baseline

Hi, junshengy:
Thank you for your previous guidance. I have made significant changes to my implementation based on your suggestions and my own testing, and I would like to share the updates and the remaining issues I am facing.

I made some modifications:

  1. I switched the inference plugin from nvinfer to nvtracker, using a low-level tracker library with its own configuration file.
  2. I changed the SEI injection strategy: SEI insertion no longer waits for inference results and is now asynchronous.

However, I have observed two critical issues in detailed logs that I need help understanding.

  1. Both the forwarding branch and the inference branch process frames in bursts of two rather than at a steady 33 ms rhythm. I added monotonic timestamp logs in both sei_inject_probe and tracker_src_probe. The logs always show two frames processed within a few milliseconds, followed by a gap of approximately 66 ms before the next burst. For example, sei_inject_probe may process one frame and then the next frame only 3 ms later, far from the expected 33 ms interval. I want to understand why GStreamer processes frames in this burst mode instead of delivering them evenly, one after another.
  2. The second and more critical issue is that the frame with a given PTS arrives at sei_inject_probe approximately 50 to 70 ms earlier than it arrives at tracker_src_probe.
[sei_inject_probe] pts:1689339161  mono=93430589850
[sei_inject_probe] pts:1719795555  mono=93430586603   ← only 3.3ms later!
[tracker_src_probe] pts:1689339161 mono=93430641369   ← 51.5ms after sei
[tracker_src_probe] pts:1719795555 mono=93430657543   ← 70.9ms after sei
  3. Can I remove udpsink and feed the RTSP server directly from the forwarding branch? For example:
 queue (delay)  -> [SEI prepend probe] -> h264parse -> rtph264pay -> rtsp server
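To make the burst pattern in item 1 measurable and comparable across pads, the logged monotonic timestamps can be turned into inter-frame intervals. This is a hypothetical diagnostic sketch (intervals_ms and burst_ratio are illustrative names), and it does not explain the cause. Logging the same intervals at an earlier pad, for example the rtph264depay src pad before the tee, would show whether frames already arrive in pairs from the source or whether the pairing is introduced downstream:

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

// Inter-frame intervals in ms from per-frame monotonic timestamps in us (for
// example values printed via get_monotonic_time_us()). Signed arithmetic, so an
// out-of-order sample shows up as a negative interval instead of wrapping.
std::vector<double> intervals_ms(const std::vector<uint64_t>& mono_us) {
  std::vector<double> out;
  for (size_t i = 1; i < mono_us.size(); ++i)
    out.push_back(double(int64_t(mono_us[i]) - int64_t(mono_us[i - 1])) / 1000.0);
  return out;
}

// Fraction of intervals shorter than short_ms. A steady 30 fps stream gives a
// value near 0; the "two frames, then ~66 ms" pattern gives a value near 0.5.
double burst_ratio(const std::vector<double>& iv, double short_ms) {
  if (iv.empty()) return 0.0;
  size_t n = 0;
  for (double v : iv)
    if (v < short_ms) ++n;
  return double(n) / double(iv.size());
}
```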

To diagnose this, I added timing probes at the nvstreammux sink pad and at the qa2 sink pad (the output of nvstreamdemux). The inference pipeline itself, from nvstreammux through nvtracker to nvstreamdemux, takes only about 19 ms. Since the total observed delay between sei_inject_probe and tracker_src_probe for the same PTS is 50 to 70 ms, the remaining 30 to 50 ms must be added somewhere before the inference pipeline starts.
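The same bisection can be continued with more probe points. One option is to place point A on the tee src pad feeding qa and point B on the nvv4l2decoder src pad, then move B to nvstreammux sink_0. The helper below is a hypothetical sketch (ProbeDelay is not part of the posted code) that pairs the two timestamps by PTS. Fed with the four log lines from item 2, it reproduces the 51.5 ms and 70.9 ms deltas:

```cpp
#include <cassert>
#include <cstdint>
#include <map>
#include <mutex>
#include <optional>

// Hypothetical diagnostic: remembers when each PTS first passes probe point A
// and probe point B (monotonic microseconds) and reports B - A. The mutex is
// needed because the two probes run on different streaming threads. The maps
// are unbounded, so this is meant for short diagnostic runs only.
class ProbeDelay {
 public:
  void mark_a(uint64_t pts, uint64_t mono_us) { mark(a_, pts, mono_us); }
  void mark_b(uint64_t pts, uint64_t mono_us) { mark(b_, pts, mono_us); }

  std::optional<int64_t> delay_us(uint64_t pts) {
    std::lock_guard<std::mutex> lk(m_);
    auto ia = a_.find(pts);
    auto ib = b_.find(pts);
    if (ia == a_.end() || ib == b_.end()) return std::nullopt;
    return int64_t(ib->second) - int64_t(ia->second);
  }

 private:
  void mark(std::map<uint64_t, uint64_t>& t, uint64_t pts, uint64_t us) {
    std::lock_guard<std::mutex> lk(m_);
    t.emplace(pts, us);  // emplace keeps the first timestamp seen for a PTS
  }
  std::mutex m_;
  std::map<uint64_t, uint64_t> a_, b_;
};
```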

My ultimate goal is to achieve inference lagging behind forwarding by at most one frame, so that MetaCache can perform accurate frame-level SEI injection. Currently the two to three frame gap makes this impossible. I would like recommendations on how to configure nvstreammux, nvv4l2decoder, and the queue elements to minimize the inference branch latency and eliminate the burst pattern.
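A related option for the MetaCache lookup is to match the nearest cached PTS within a tolerance, instead of requiring an exact key. A tolerance of about half a frame period (roughly 16.7 ms at 30 fps) is a natural choice, and anything older than the match can be pruned. take_nearest is a hypothetical helper. It assumes PTS increases monotonically, as in a Baseline-profile stream without B-frames. It does not reduce the inference-branch lag itself; it only keeps matching robust once the lag is small:

```cpp
#include <cassert>
#include <cstdint>
#include <iterator>
#include <map>
#include <optional>
#include <utility>

// Hypothetical helper: take the cached value whose PTS is nearest to `pts`, if
// it lies within `tol_ns`, and erase it together with every older entry
// (assumes monotonically increasing PTS). The caller must hold the cache lock.
template <typename V>
std::optional<V> take_nearest(std::map<uint64_t, V>& cache, uint64_t pts,
                              uint64_t tol_ns) {
  if (cache.empty()) return std::nullopt;
  auto hi = cache.lower_bound(pts);                  // first key >= pts
  auto best = cache.end();
  uint64_t best_d = UINT64_MAX;
  if (hi != cache.end()) { best = hi; best_d = hi->first - pts; }
  if (hi != cache.begin()) {                         // last key < pts
    auto lo = std::prev(hi);
    if (pts - lo->first < best_d) { best = lo; best_d = pts - lo->first; }
  }
  if (best_d > tol_ns) return std::nullopt;
  V v = std::move(best->second);
  cache.erase(cache.begin(), std::next(best));       // drop the match and older entries
  return v;
}
```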

Here is my code:

// SPDX-License-Identifier: MIT
//
// sei_inject_demo
//
// Run primary inference on Jetson with DeepStream and embed each frame's
// detection results as an H.264 user_data_unregistered SEI NAL into the
// **original** elementary stream, then publish RTP/H264 through udpsink. In RTSP
// mode the RTSP server reads that local RTP stream back with udpsrc. The original
// H.264 is never decoded for forwarding and never re-encoded.
//
// Pipeline:
//
//   rtspsrc -> rtph264depay -> h264parse -> capsfilter -> tee
//                                                          |-> queue (leaky) -> nvv4l2decoder -> nvstreammux -> nvinfer -> nvstreamdemux -> queue -> [meta-extract probe] -> fakesink
//                                                          |-> queue (delay)  -> [SEI prepend probe] -> h264parse -> rtph264pay -> udpsink
//                                                                                                                     |-> receiver UDP (-o udp)
//                                                                                                                     \-> 127.0.0.1:<port> -> RTSP server udpsrc (-o rtsp)
//
// The inference branch publishes detections (keyed by frame PTS) into a
// thread-safe cache. The forwarding branch waits up to N ms for the matching
// PTS, builds an Annex-B SEI NAL, and prepends it to the original H.264
// access unit before RTP packetization.

#include <gst/gst.h>
#include <gst/rtsp-server/rtsp-server.h>
#include <nvdsmeta.h>
#include <gstnvdsmeta.h>

#include <atomic>
#include <chrono>
#include <condition_variable>
#include <cstdlib>
#include <cstdint>
#include <cstdio>
#include <cstring>
#include <mutex>
#include <optional>
#include <string>
#include <map>
#include <vector>
#include <sys/ioctl.h>
#include <time.h>  // clock_gettime, CLOCK_MONOTONIC

//#define USE_NVINFER 1

// ---------------------------------------------------------------------------
// Detection cache: PTS -> detections, thread-safe with timed wait
// ---------------------------------------------------------------------------
struct Detection {
  uint16_t class_id;
  float    confidence;
  uint16_t x, y, w, h;
  uint64_t tracking_id;  // MOT/SOT object_id from nvtracker
};

using DetectionMap = std::map<uint64_t, std::vector<Detection>>;
using DetectionNode = DetectionMap::node_type;


uint64_t get_monotonic_time_us() {
    struct timespec mono_ts;

    if (clock_gettime(CLOCK_MONOTONIC, &mono_ts) < 0) {
        return 0;
    }

    return mono_ts.tv_sec * 1000000 + mono_ts.tv_nsec / 1000;
}

// nvtracker low-level path: confidence may be -0.1; real score in tracker_confidence.
static float object_confidence(const NvDsObjectMeta* om) {
  if (om->confidence >= 0.f && om->confidence <= 1.f)
    return om->confidence;
  if (om->tracker_confidence >= 0.f && om->tracker_confidence <= 1.f)
    return om->tracker_confidence;
  if (om->tracker_confidence > 1e-6f)
    return om->tracker_confidence;
  return om->confidence;
}

// Ordered map keyed by PTS: begin() is always the oldest entry, which put()
// relies on for eviction and pop() relies on to scan oldest-first.  put()
// takes the detection vector by value, so a caller's std::move(dets) moves it
// in without a copy.  pop() uses extract() to splice the matched node out
// without a second lookup or copy.
class MetaCache {
 public:
  void put(uint64_t pts, std::vector<Detection> d) {
    std::lock_guard<std::mutex> lk(m_mtx);
    // If m_max_queue frames of results are already cached, drop the smallest (oldest) PTS first.
    while (m_cache.size() >= m_max_queue) {
      g_print("[MetaCache] queue full (%zu/%zu), drop oldest pts=%lu\n",
              m_cache.size(), m_max_queue, m_cache.begin()->first);
      m_cache.erase(m_cache.begin());
    }
    //g_print("[%s:%d]nvtracker pts:%lu, det:%d, cache size:%d\n", __FUNCTION__, __LINE__, pts, d.size(), m_cache.size());
    m_cache[pts] = std::move(d);
    while (m_cache.size() > m_max_obj) m_cache.erase(m_cache.begin()); // bound memory
  }

  // Returns the oldest cached entry whose PTS is within m_timeout (500 ms) of
  // `pts`. This is not an exact-PTS match: entries outside the window are
  // discarded on the way, and the first one inside it is returned.
  std::optional<DetectionNode> pop(uint64_t pts) {
    std::unique_lock<std::mutex> lk(m_mtx);
    if (m_cache.empty()) {
      return std::nullopt;
    }
    auto it = m_cache.begin();
    while (it != m_cache.end()) {
      uint64_t cached_pts = it->first;
      int64_t diff = static_cast<int64_t>(pts) - static_cast<int64_t>(cached_pts);  // ns

      g_print("[MetaCache] stream_pts=%lu, cached_pts=%lu, diff=%ld us\n",
              pts, cached_pts, (long)(diff / 1000));

      if (std::abs(diff) > m_timeout) {
        // Outside the 500 ms window (normally stale data): discard this entry.
        g_print("[MetaCache] timeout drop: pts=%lu diff=%ld ms > 500ms\n",
                cached_pts, (long)std::abs(diff / 1000000));
        it = m_cache.erase(it);  // erase() returns the next iterator
        continue;
      }

      // Match found: extract the node, removing it from the cache.
      auto result = m_cache.extract(it);
      /*g_print("[MetaCache] matched: pts=%lu obj=%zu remaining=%zu\n",
              result.key(), result.mapped().size(), m_cache.size());*/

      return std::optional<DetectionNode>(std::move(result));
    }

    g_print("[MetaCache] no match within 500ms for pts=%lu\n", pts);
    return std::nullopt;
  }

  bool empty() {
    std::unique_lock<std::mutex> lk(m_mtx);
    return m_cache.empty();
  }

 private:
  static constexpr size_t m_max_obj = 128;
  static constexpr size_t m_max_queue = 15;
  static constexpr int64_t m_timeout = 500 * 1000000LL;  // 500 ms, in ns
  std::mutex m_mtx;
  DetectionMap m_cache;
};

static MetaCache g_cache;
static std::atomic<uint64_t> g_n_inserted{0};
static std::atomic<uint64_t> g_n_passthrough{0};
static std::atomic<uint64_t> g_n_inference{0};
static std::atomic<uint64_t> g_n_udp_packets{0};

static bool is_rtsp_output(const std::string& mode) { return mode == "rtsp"; }
static bool is_udp_output(const std::string& mode) { return mode == "udp"; }

// ---------------------------------------------------------------------------
// SEI builder (Annex-B, user_data_unregistered, payload_type = 5)
// ---------------------------------------------------------------------------
static const uint8_t SEI_UUID[16] = {
    0xde, 0xad, 0xbe, 0xef, 0x12, 0x34, 0x56, 0x78,
    0x9a, 0xbc, 0xde, 0xf0, 0x11, 0x22, 0x33, 0x44};

static void put_be16(std::vector<uint8_t>& v, uint16_t x) {
  v.push_back(uint8_t(x >> 8)); v.push_back(uint8_t(x));
}
static void put_be32(std::vector<uint8_t>& v, uint32_t x) {
  v.push_back(uint8_t(x >> 24)); v.push_back(uint8_t(x >> 16));
  v.push_back(uint8_t(x >>  8)); v.push_back(uint8_t(x));
}
static void put_f32(std::vector<uint8_t>& v, float f) {
  uint32_t u; std::memcpy(&u, &f, 4); put_be32(v, u);
}

// Emulation prevention: insert 0x03 before any byte <= 0x03 that follows two
// zero bytes, so the SEI body cannot fake a start code inside the bitstream.
static std::vector<uint8_t> emul_prev(const std::vector<uint8_t>& in) {
  std::vector<uint8_t> out;
  out.reserve(in.size() + in.size() / 8);
  int z = 0;
  for (uint8_t b : in) {
    if (z >= 2 && b <= 0x03) { out.push_back(0x03); z = 0; }
    out.push_back(b);
    z = (b == 0) ? z + 1 : 0;
  }
  return out;
}

// Annex-B SEI NAL = [00 00 00 01][06][05][size_VLI][UUID + payload (with EPB)][80]
static std::vector<uint8_t>
build_sei_annexb(const std::vector<Detection>& dets, uint64_t pts_ns) {
  std::vector<uint8_t> rbsp;
  rbsp.insert(rbsp.end(), SEI_UUID, SEI_UUID + 16);
  put_be32(rbsp, uint32_t(pts_ns >> 32));
  put_be32(rbsp, uint32_t(pts_ns));
  put_be32(rbsp, uint32_t(dets.size()));
  for (const auto& d : dets) {
    put_be16(rbsp, d.class_id);
    put_f32 (rbsp, d.confidence);
    put_be16(rbsp, d.x); put_be16(rbsp, d.y);
    put_be16(rbsp, d.w); put_be16(rbsp, d.h);
  }
  auto epb = emul_prev(rbsp);

  std::vector<uint8_t> nal;
  nal.insert(nal.end(), {0x00, 0x00, 0x00, 0x01}); // start code
  nal.push_back(0x06);                             // NAL header: SEI
  nal.push_back(0x05);                             // payload_type = 5
  size_t sz = rbsp.size();                         // payload_size is RBSP bytes, excluding EPB bytes
  while (sz >= 255) { nal.push_back(0xFF); sz -= 255; }
  nal.push_back(uint8_t(sz));                      // payload_size
  nal.insert(nal.end(), epb.begin(), epb.end());   // payload
  nal.push_back(0x80);                             // RBSP trailing bits
  return nal;
}

static GstPadProbeReturn
mux_sink_probe(GstPad*, GstPadProbeInfo* info, gpointer) {
  auto* buf = static_cast<GstBuffer*>(info->data);
  uint64_t pts = uint64_t(GST_BUFFER_PTS(buf));

  //g_print("[%s:%d:%lu]mux input pts:%lu\n", __FUNCTION__, __LINE__, get_monotonic_time_us(), pts);
  /*std::lock_guard<std::mutex> lk(g_mux_time_mtx);
  g_mux_in_times[pts] = now;

  // Capacity guard: keep the map from growing without bound (e.g. when PTS values fail to match)
  while (g_mux_in_times.size() > 64) {
      g_mux_in_times.erase(g_mux_in_times.begin());
  }*/

  return GST_PAD_PROBE_OK;
}

// ---------------------------------------------------------------------------
// Probes
// ---------------------------------------------------------------------------

// On the demuxed src pad (after nvstreamdemux). GST_BUFFER_PTS here is the
// original input PTS, which matches branch B's H.264 buffers exactly.
// Important: we *cannot* key the cache with frame_meta->buf_pts when nvstreammux
// runs with live-source=1, because the muxer rewrites the outgoing batched
// PTS from its own clock. nvstreamdemux restores the per-source PTS.
#if USE_NVINFER
static GstPadProbeReturn
nvinfer_src_probe(GstPad*, GstPadProbeInfo* info, gpointer) {
  auto* buf = static_cast<GstBuffer*>(info->data);
  NvDsBatchMeta* batch = gst_buffer_get_nvds_batch_meta(buf);
  if (!batch) return GST_PAD_PROBE_OK;

  uint64_t key_pts = uint64_t(GST_BUFFER_PTS(buf));

  for (NvDsMetaList* lf = batch->frame_meta_list; lf; lf = lf->next) {
    auto* fm = static_cast<NvDsFrameMeta*>(lf->data);
    std::vector<Detection> dets;
    for (NvDsMetaList* lo = fm->obj_meta_list; lo; lo = lo->next) {
      auto* om = static_cast<NvDsObjectMeta*>(lo->data);
      Detection d{};  // value-initialize: this path sets no tracking_id
      d.class_id   = uint16_t(om->class_id);
      d.confidence = om->confidence;
      d.x = uint16_t(om->rect_params.left);
      d.y = uint16_t(om->rect_params.top);
      d.w = uint16_t(om->rect_params.width);
      d.h = uint16_t(om->rect_params.height);
      dets.push_back(d);
    }
    if (dets.size() > 0) {
      g_cache.put(key_pts, std::move(dets));
      g_n_inference.fetch_add(1, std::memory_order_relaxed);
    }
  }
  return GST_PAD_PROBE_OK;
}
#else
static GstPadProbeReturn
tracker_src_probe(GstPad*, GstPadProbeInfo* info, gpointer) {
  auto* buf = static_cast<GstBuffer*>(info->data);
  NvDsBatchMeta* batch = gst_buffer_get_nvds_batch_meta(buf);
  if (!batch) return GST_PAD_PROBE_OK;

  uint64_t key_pts = uint64_t(GST_BUFFER_PTS(buf)); //unit:ns
  g_print("[%s:%d:%lu]tracker pts:%lu\n", __FUNCTION__, __LINE__, get_monotonic_time_us(), key_pts);
  for (NvDsMetaList* lf = batch->frame_meta_list; lf; lf = lf->next) {
    auto* fm = static_cast<NvDsFrameMeta*>(lf->data);
    std::vector<Detection> dets;
    for (NvDsMetaList* lo = fm->obj_meta_list; lo; lo = lo->next) {
      auto* om = static_cast<NvDsObjectMeta*>(lo->data);
      Detection d;
      d.class_id    = uint16_t(om->class_id);
      d.confidence  = object_confidence(om);
      d.x = uint16_t(om->rect_params.left);
      d.y = uint16_t(om->rect_params.top);
      d.w = uint16_t(om->rect_params.width);
      d.h = uint16_t(om->rect_params.height);
      d.tracking_id = om->object_id;
      dets.push_back(d);
    }
    if (dets.size() > 0) {
      g_cache.put(key_pts, std::move(dets));
      g_n_inference.fetch_add(1, std::memory_order_relaxed);
    }
  }

  return GST_PAD_PROBE_OK;
}
#endif
static std::vector<uint8_t> build_sei_annexb_hello() {
  const std::string hello = "Hello World";
  const uint8_t test_uuid[16] = {
    0xaa, 0xbb, 0xcc, 0xdd, 0xee, 0xff, 0x00, 0x11,
    0x22, 0x33, 0x44, 0x55, 0x66, 0x77, 0x88, 0x99
  };
  std::vector<uint8_t> rbsp;
  rbsp.insert(rbsp.end(), test_uuid, test_uuid + 16);
  rbsp.insert(rbsp.end(), hello.begin(), hello.end());

  uint8_t *pbuf = rbsp.data();
  /*g_print("send data(%zu):", rbsp.size());
  for (int i = 0; i < rbsp.size(); i++) {
    g_print("%02X ", pbuf[i]);
  }
  g_print("\n\n");*/
  auto epb = emul_prev(rbsp);
  std::vector<uint8_t> nal;
  nal.insert(nal.end(), {0x00, 0x00, 0x00, 0x01}); // start code
  nal.push_back(0x06);                             // NAL type: SEI
  nal.push_back(0x05);                             // payload_type = 5 (user_data_unregistered)
  size_t sz = rbsp.size();                         // payload_size counts RBSP bytes, excluding EPB bytes
  while (sz >= 255) { nal.push_back(0xFF); sz -= 255; }
  nal.push_back(uint8_t(sz));
  nal.insert(nal.end(), epb.begin(), epb.end());
  nal.push_back(0x80);                             // RBSP trailing bits
  return nal;
}
// On forwarding branch sink pad: prepend the SEI NAL to the original H.264 buffer.
// The original buffer is shared with branch A via tee (refcounted), so we must
// build a *new* GstBuffer rather than mutating in place.
#if 1
static GstPadProbeReturn
sei_inject_probe(GstPad*, GstPadProbeInfo* info, gpointer user_data) {
  GstBuffer* orig = GST_PAD_PROBE_INFO_BUFFER(info);
  if (!orig) return GST_PAD_PROBE_OK;

  uint64_t pts = uint64_t(GST_BUFFER_PTS(orig));
  g_print("[%s:%d:%lu]sei pts:%lu\n", __FUNCTION__, __LINE__, get_monotonic_time_us(), pts);
  if (g_cache.empty()) {
    g_n_passthrough.fetch_add(1, std::memory_order_relaxed);
    return GST_PAD_PROBE_OK; // forward unchanged
  }

  auto track_result = g_cache.pop(pts);
  if (!track_result) {
    return GST_PAD_PROBE_OK;
  }

  uint64_t result_pts = track_result->key();  // PTS of the frame the detections were computed on
  std::vector<Detection> meta = std::move(track_result->mapped());
  //g_print("[%s:%d:%lu]sei pts:%lu(%lu)\n", __FUNCTION__, __LINE__, get_monotonic_time_us(), pts, result_pts);
  /*g_print("meta count:%zu\n", meta.size());
  for (auto &dd : meta) {
    g_print("class_id:%u,conf:%f, rect:[%u %u %u %u], track:%lu\n",
      dd.class_id, dd.confidence, dd.x, dd.y, dd.w, dd.h, dd.tracking_id);
  }*/
  // Embed the detections' own PTS (not the carrying buffer's PTS) so the
  // receiver can match them to the frame they were computed on.
  auto sei = build_sei_annexb(meta, result_pts);
  gsize osz = gst_buffer_get_size(orig);

  GstBuffer* nb = gst_buffer_new_allocate(nullptr, sei.size() + osz, nullptr);
  GstMapInfo mi;
  gst_buffer_map(nb, &mi, GST_MAP_WRITE);
  std::memcpy(mi.data, sei.data(), sei.size());
  gst_buffer_extract(orig, 0, mi.data + sei.size(), osz);
  gst_buffer_unmap(nb, &mi);

  GST_BUFFER_PTS(nb)      = GST_BUFFER_PTS(orig);
  GST_BUFFER_DTS(nb)      = GST_BUFFER_DTS(orig);
  GST_BUFFER_DURATION(nb) = GST_BUFFER_DURATION(orig);
  GST_BUFFER_FLAGS(nb)    = GST_BUFFER_FLAGS(orig);

  info->data = nb;
  gst_buffer_unref(orig);
  g_n_inserted.fetch_add(1, std::memory_order_relaxed);
  return GST_PAD_PROBE_OK;
}
#else
static GstPadProbeReturn
sei_inject_probe(GstPad*, GstPadProbeInfo* info, gpointer user_data) {
  GstBuffer* orig = GST_PAD_PROBE_INFO_BUFFER(info);
  if (!orig) return GST_PAD_PROBE_OK;

  // 构建一个包含 "Hello World" 字符串的 SEI NAL(user_data_unregistered)
  std::vector<uint8_t> sei = build_sei_annexb_hello();
  if (sei.empty()) {
    return GST_PAD_PROBE_OK;
  }

  gsize osz = gst_buffer_get_size(orig);
  GstBuffer* nb = gst_buffer_new_allocate(nullptr, sei.size() + osz, nullptr);
  GstMapInfo mi;
  gst_buffer_map(nb, &mi, GST_MAP_WRITE);
  std::memcpy(mi.data, sei.data(), sei.size());
  gst_buffer_extract(orig, 0, mi.data + sei.size(), osz);
  gst_buffer_unmap(nb, &mi);

  GST_BUFFER_PTS(nb)      = GST_BUFFER_PTS(orig);
  GST_BUFFER_DTS(nb)      = GST_BUFFER_DTS(orig);
  GST_BUFFER_DURATION(nb) = GST_BUFFER_DURATION(orig);
  GST_BUFFER_FLAGS(nb)    = GST_BUFFER_FLAGS(orig);

  info->data = nb;
  gst_buffer_unref(orig);

  return GST_PAD_PROBE_OK;
}
#endif
// ---------------------------------------------------------------------------
// rtspsrc dynamic pad -> rtph264depay
// ---------------------------------------------------------------------------
static void on_rtsp_pad_added(GstElement*, GstPad* pad, gpointer user) {
  auto* depay = static_cast<GstElement*>(user);
  GstPad* sink = gst_element_get_static_pad(depay, "sink");
  if (!gst_pad_is_linked(sink)) {
    GstCaps* caps = gst_pad_get_current_caps(pad);
    if (!caps) caps = gst_pad_query_caps(pad, nullptr);
    if (caps) {
      const gchar* enc = gst_structure_get_string(
          gst_caps_get_structure(caps, 0), "encoding-name");
      g_print("[rtspsrc] new pad encoding-name=%s\n", enc ? enc : "(null)");
      if (enc && g_str_equal(enc, "H264")) gst_pad_link(pad, sink);
      gst_caps_unref(caps);
    }
  }
  gst_object_unref(sink);
}

static GstPadProbeReturn udp_packet_probe(GstPad*, GstPadProbeInfo* info, gpointer) {
  if (GST_PAD_PROBE_INFO_BUFFER(info)) {
    g_n_udp_packets.fetch_add(1, std::memory_order_relaxed);
  }
  return GST_PAD_PROBE_OK;
}

// ---------------------------------------------------------------------------
// Periodic stats / bus
// ---------------------------------------------------------------------------
static gboolean stats_tick(gpointer) {
  g_print("[stats] inference=%lu sei_inserted=%lu passthrough=%lu udp_packets=%lu\n",
          (unsigned long)g_n_inference.load(),
          (unsigned long)g_n_inserted.load(),
          (unsigned long)g_n_passthrough.load(),
          (unsigned long)g_n_udp_packets.load());
  return TRUE;
}

static gboolean bus_cb(GstBus*, GstMessage* msg, gpointer loop) {
  switch (GST_MESSAGE_TYPE(msg)) {
    case GST_MESSAGE_EOS:
      g_print("[bus] EOS\n");
      g_main_loop_quit(static_cast<GMainLoop*>(loop));
      break;
    case GST_MESSAGE_ERROR: {
      GError* e = nullptr; gchar* d = nullptr;
      gst_message_parse_error(msg, &e, &d);
      g_printerr("[bus] ERROR: %s | %s\n", e->message, d ? d : "");
      g_error_free(e); g_free(d);
      g_main_loop_quit(static_cast<GMainLoop*>(loop));
      break;
    }
    case GST_MESSAGE_WARNING: {
      GError* e = nullptr; gchar* d = nullptr;
      gst_message_parse_warning(msg, &e, &d);
      g_printerr("[bus] WARN: %s | %s\n", e->message, d ? d : "");
      g_error_free(e); g_free(d);
      break;
    }
    default: break;
  }
  return TRUE;
}

// ---------------------------------------------------------------------------
// CLI
// ---------------------------------------------------------------------------
struct Args {
  const char* rtsp_url    = "rtsp://192.168.0.13:554/aibox_transfer_fpv";
  const char* pgie_cfg    = "/vendor_app/bin/output/chcnav_algo/fvs_ai_core/model/det/0/config_infer_primary_yolo11.txt";
  const char* ll_lib_file   = "/vendor_app/bin/output/chcnav_algo/fvs_ai_core/libs/libByteTracker.so";
  const char* ll_config_file = "/vendor_app/bin/output/chcnav_algo/fvs_ai_core/config/config_sot.yml";
  const char* output_mode = "rtsp";
  const char* out_host    = "192.168.11.1";
  int         out_port    = 8554;
  const char* out_mount   = "/aibox";
  const char* udp_host    = "127.0.0.1";
  int         udp_port    = 5400;
  int         width       = 1280;
  int         height      = 720;
  int         tracker_width  = 960;
  int         tracker_height = 544;
  int         infer_interval = 0; // nvinfer interval; a large value skips most inference batches.
};

static void usage(const char* a0) {
  g_printerr(
    "usage: %s [-r input_rtsp_url] [-c pgie_cfg] [-o udp|rtsp]\n"
    "          [-u udp_host] [-P udp_port]\n"
    "          [-h rtsp_bind_host] [-p rtsp_port] [-m rtsp_mount]\n"
    "          [-W width] [-H height] [-I nvinfer_interval]\n", a0);
}

int main(int argc, char** argv) {
  std::setvbuf(stdout, nullptr, _IOLBF, 0);
  Args A;
  for (int i = 1; i < argc; ++i) {
    auto next = [&](const char* def)->const char*{ return (i + 1 < argc) ? argv[++i] : def; };
    std::string s = argv[i];
    if      (s == "-r") A.rtsp_url = next(A.rtsp_url);
    else if (s == "-c") A.pgie_cfg = next(A.pgie_cfg);  // pgie_cfg is only used when USE_NVINFER is set
    else if (s == "-o") A.output_mode = next(A.output_mode);
    else if (s == "-u") A.udp_host = next(A.udp_host);
    else if (s == "-P") A.udp_port = std::atoi(next("5400"));
    else if (s == "-h") A.out_host = next(A.out_host);
    else if (s == "-p") A.out_port = std::atoi(next("8554"));
    else if (s == "-m") A.out_mount = next(A.out_mount);
    else if (s == "-W") A.width    = std::atoi(next("1280"));
    else if (s == "-H") A.height   = std::atoi(next("720"));
    else if (s == "-I") A.infer_interval = std::atoi(next("0"));
    else if (s == "--help") { usage(argv[0]); return 0; }
    else { usage(argv[0]); return 1; }
  }

  std::string output_mode = A.output_mode;
  if (!is_udp_output(output_mode) && !is_rtsp_output(output_mode)) {
    g_printerr("invalid output mode: %s\n", A.output_mode);
    usage(argv[0]);
    return 1;
  }

  if (A.infer_interval < 0) A.infer_interval = 0;

  g_print("[cfg] input=%s\n      ll_config_file=%s\n      mode=%s wh=%dx%d nvinfer_interval=%d\n",
          A.rtsp_url, A.ll_config_file, A.output_mode, A.width, A.height,
          A.infer_interval);
  if (is_udp_output(output_mode)) {
    g_print("      output=rtp://%s:%d payload=H264 pt=96\n", A.udp_host, A.udp_port);
  } else {
    g_print("      output=rtsp://%s:%d%s\n", A.out_host, A.out_port, A.out_mount);
    g_print("      bridge=rtp://127.0.0.1:%d payload=H264 pt=96\n", A.udp_port);
  }

  gst_init(&argc, &argv);

  GstRTSPServer* rtsp_server = nullptr;
  guint rtsp_source_id = 0;

  if (is_rtsp_output(output_mode)) {
    // RTSP mode only: re-serve the local RTP/UDP bridge (udpsink -> udp_port) over RTSP.
    rtsp_server = gst_rtsp_server_new();
    std::string out_service = std::to_string(A.out_port);
    g_object_set(rtsp_server, "address", A.out_host,
                              "service", out_service.c_str(),
                              nullptr);

    GstRTSPMountPoints* mounts = gst_rtsp_server_get_mount_points(rtsp_server);
    GstRTSPMediaFactory* factory = gst_rtsp_media_factory_new();
    std::string rtsp_launch =
      "( udpsrc port=" + std::to_string(A.udp_port) +
      " caps=\"application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)H264,payload=(int)96\" "
      "! queue leaky=downstream max-size-buffers=120 max-size-bytes=0 max-size-time=0 "
      "! rtph264depay "
      "! h264parse config-interval=-1 "
      "! rtph264pay name=pay0 pt=96 config-interval=-1 mtu=1400 )";
    gst_rtsp_media_factory_set_launch(factory, rtsp_launch.c_str());
    gst_rtsp_media_factory_set_shared(factory, TRUE);
    gst_rtsp_mount_points_add_factory(mounts, A.out_mount, factory);
    g_object_unref(mounts);

    rtsp_source_id = gst_rtsp_server_attach(rtsp_server, nullptr);
    if (!rtsp_source_id) {
      g_printerr("RTSP server attach failed for rtsp://%s:%d%s\n", A.out_host, A.out_port, A.out_mount);
      g_object_unref(rtsp_server);
      return 1;
    }
    g_print("[rtsp] serving rtsp://%s:%d%s from local UDP port %d\n",
            A.out_host, A.out_port, A.out_mount, A.udp_port);
  }


  GstElement* pipe  = gst_pipeline_new("sei-inject");
  GstElement* src   = gst_element_factory_make("rtspsrc",        "src");
  GstElement* depay = gst_element_factory_make("rtph264depay",   "depay");
  GstElement* parse = gst_element_factory_make("h264parse",      "parse");
  GstElement* capsf = gst_element_factory_make("capsfilter",     "bsf");
  GstElement* tee   = gst_element_factory_make("tee",            "t");

  // Branch A = inference: decode + nvinfer + meta extract
  GstElement* qa    = gst_element_factory_make("queue",          "qa");
  GstElement* dec   = gst_element_factory_make("nvv4l2decoder",  "dec");
  GstElement* mux   = gst_element_factory_make("nvstreammux",    "mux");
#if USE_NVINFER
  GstElement* infer = gst_element_factory_make("nvinfer",        "pgie");
#else
  GstElement* tracker = gst_element_factory_make("nvtracker",     "tracker");
#endif
  GstElement* dmx   = gst_element_factory_make("nvstreamdemux",  "dmx");
  GstElement* qa2   = gst_element_factory_make("queue",          "qa2");
  GstElement* fa    = gst_element_factory_make("fakesink",       "fa");

  // Branch B = forwarding: queue with delay -> SEI prepend probe -> parse -> pay -> UDP/RTSP sink
  GstElement* qb    = gst_element_factory_make("queue",          "qb");
  GstElement* out_parse = gst_element_factory_make("h264parse", "udp_parse");
  GstElement* out_pay = gst_element_factory_make("rtph264pay", "udp_pay");
  GstElement* sink  = gst_element_factory_make("udpsink", "udp_sink");

#if USE_NVINFER
  if (!pipe||!src||!depay||!parse||!capsf||!tee||!qa||!dec||!mux
      ||!infer||!dmx||!qa2||!fa||!qb||!out_parse||!out_pay||!sink) {
    g_printerr("element creation failed\n"); return 1;
  }
#else
  if (!pipe||!src||!depay||!parse||!capsf||!tee||!qa||!dec||!mux
      ||!tracker||!dmx||!qa2||!fa||!qb||!out_parse||!out_pay||!sink) {
    g_printerr("element creation failed\n"); return 1;
  }
#endif

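  g_object_set(src,   "location", A.rtsp_url,
                      "latency",  80,              // rtspsrc jitterbuffer latency, in ms
                      "drop-on-latency", TRUE,
                      "protocols", 4,              // GST_RTSP_LOWER_TRANS_TCP: RTP interleaved over the RTSP TCP connection
                      nullptr);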
  g_object_set(parse, "config-interval", -1, nullptr);

  // Force Annex-B byte-stream into both branches so the SEI prepend (which uses
  // 00 00 00 01 start codes) and nvv4l2decoder both consume the same format.
  GstCaps* bs = gst_caps_from_string(
      "video/x-h264,stream-format=byte-stream,alignment=au");
  g_object_set(capsf, "caps", bs, nullptr);
  gst_caps_unref(bs);

  // Keep the tee alive even if branch B's downstream tears down.
  g_object_set(tee, "allow-not-linked", TRUE, nullptr);

  // Branch A: drop oldest decoder input if GPU briefly stalls. This keeps
  // backpressure off branch B (forwarding) at the cost of inferring fewer frames.
  g_object_set(qa, "leaky", 2,
                   "max-size-buffers", 4,
                   "max-size-bytes", 0,
                   "max-size-time", (guint64)0,
                   nullptr);

  g_object_set(mux, "batch-size", 1,
                    "width",  A.width, "height", A.height,
                    "live-source", 1,
                    "batched-push-timeout", 40000, nullptr);
  // nvinfer interval is the number of consecutive batches skipped between
  // inference runs. Use -I 999999 to keep the element in the pipeline while
  // effectively disabling per-frame inference after the initial batch.
#if USE_NVINFER
  g_object_set(infer,
               "config-file-path", A.pgie_cfg,
               "batch-size", 1,
               "interval", (guint)A.infer_interval,
               nullptr);
#else
  g_object_set(tracker,
               "ll-lib-file", A.ll_lib_file,
               "ll-config-file", A.ll_config_file,
               "tracker-width", A.tracker_width,
               "tracker-height", A.tracker_height,
               "gpu-id", 0,
               "compute-hw", 1,
               nullptr);
#endif
  g_object_set(fa, "sync", FALSE, "async", FALSE, nullptr);

  // Branch B: a plain bounded queue (up to 60 buffers, no byte/time limit). It
  // adds no fixed delay by itself; it gives the forwarding branch its own
  // streaming thread, decoupled from the tee.
  g_object_set(qb, "max-size-buffers", 60,
                   "max-size-bytes", 0,
                   "max-size-time", (guint64)0,
                   nullptr);
  g_object_set(out_parse, "config-interval", -1, nullptr);
  g_object_set(out_pay, "pt", 96,
                        "config-interval", 1,
                        "mtu", 1400,
                        nullptr);
  g_object_set(sink, "host", A.udp_host,
                     "port", A.udp_port,
                     "sync", FALSE,
                     "async", FALSE,
                     "buffer-size", 1048576,
                     nullptr);
#if USE_NVINFER
  gst_bin_add_many(GST_BIN(pipe), src, depay, parse, capsf, tee,
                                  qa, dec, mux, infer, dmx, qa2, fa,
                                  qb, nullptr);
#else
  gst_bin_add_many(GST_BIN(pipe), src, depay, parse, capsf, tee,
                                  qa, dec, mux, tracker, dmx, qa2, fa,
                                  qb, nullptr);
#endif
  gst_bin_add_many(GST_BIN(pipe), out_parse, out_pay, sink, nullptr);

  if (!gst_element_link_many(depay, parse, capsf, tee, nullptr)) {
    g_printerr("link depay->parse->caps->tee failed\n"); return 1;
  }
  if (!gst_element_link(tee, qa) || !gst_element_link(qa, dec)) {
    g_printerr("link branch-A pre-mux failed\n"); return 1;
  }
  GstPad* dec_src  = gst_element_get_static_pad(dec, "src");
  //GstPad* mux_sink = gst_element_request_pad_simple(mux, "sink_0");
  GstPad* mux_sink = gst_element_get_request_pad(mux, "sink_0");
  if (!dec_src || !mux_sink ||
      gst_pad_link(dec_src, mux_sink) != GST_PAD_LINK_OK) {
    g_printerr("dec.src -> mux.sink_0 failed\n"); return 1;
  }
  gst_pad_add_probe(mux_sink, GST_PAD_PROBE_TYPE_BUFFER, mux_sink_probe, nullptr, nullptr);
  gst_object_unref(dec_src); gst_object_unref(mux_sink);
#if USE_NVINFER
  if (!gst_element_link_many(mux, infer, dmx, nullptr)) {
    g_printerr("link mux->infer->dmx failed\n"); return 1;
  }
#else
  if (!gst_element_link_many(mux, tracker, dmx, nullptr)) {
    g_printerr("link mux->tracker->dmx failed\n"); return 1;
  }
#endif
  //GstPad* dmx_src  = gst_element_request_pad_simple(dmx, "src_0");
  GstPad* dmx_src  = gst_element_get_request_pad(dmx, "src_0");
  GstPad* qa2_sink = gst_element_get_static_pad(qa2, "sink");
  if (!dmx_src || !qa2_sink ||
      gst_pad_link(dmx_src, qa2_sink) != GST_PAD_LINK_OK) {
    g_printerr("dmx.src_0 -> qa2 failed\n"); return 1;
  }
  gst_object_unref(dmx_src); gst_object_unref(qa2_sink);
  if (!gst_element_link(qa2, fa)) {
    g_printerr("link qa2 -> fa failed\n"); return 1;
  }
  if (!gst_element_link_many(tee, qb, out_parse, out_pay, sink, nullptr)) {
    g_printerr("link branch-B RTP/UDP failed\n"); return 1;
  }

  g_signal_connect(src, "pad-added", G_CALLBACK(on_rtsp_pad_added), depay);

  GstPad* p = gst_element_get_static_pad(qa2, "sink");
#if USE_NVINFER
  gst_pad_add_probe(p, GST_PAD_PROBE_TYPE_BUFFER, nvinfer_src_probe, nullptr, nullptr);
#else
  gst_pad_add_probe(p, GST_PAD_PROBE_TYPE_BUFFER, tracker_src_probe, nullptr, nullptr);
#endif
  gst_object_unref(p);

  p = gst_element_get_static_pad(out_parse, "sink");
  gst_pad_add_probe(p, GST_PAD_PROBE_TYPE_BUFFER,
                    sei_inject_probe, nullptr, nullptr);
  gst_object_unref(p);

  p = gst_element_get_static_pad(sink, "sink");
  gst_pad_add_probe(p, GST_PAD_PROBE_TYPE_BUFFER, udp_packet_probe, nullptr, nullptr);
  gst_object_unref(p);

  if (gst_element_set_state(pipe, GST_STATE_PLAYING)
      == GST_STATE_CHANGE_FAILURE) {
    g_printerr("pipeline start failed\n"); return 1;
  }

  GMainLoop* loop = g_main_loop_new(nullptr, FALSE);
  GstBus* bus = gst_element_get_bus(pipe);
  gst_bus_add_watch(bus, bus_cb, loop);
  g_object_unref(bus);

  g_timeout_add_seconds(2, stats_tick, nullptr);

  g_main_loop_run(loop);

  gst_element_set_state(pipe, GST_STATE_NULL);
  g_object_unref(pipe);
  if (rtsp_source_id) g_source_remove(rtsp_source_id);
  if (rtsp_server) g_object_unref(rtsp_server);
  g_main_loop_unref(loop);
  return 0;
}
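One way to check the injection on the receiving side, without relying on h264parse debug logs, is to decode the SEI directly from each access unit, for example in a buffer probe placed after a capsfilter that forces stream-format=byte-stream,alignment=au (as the sender does). The sketch below assumes the payload layout written by build_sei_annexb() above (UUID, 64-bit big-endian PTS, 32-bit count, then 14 bytes per detection) and a single message per SEI NAL. find_detection_sei, SeiPayload, SeiDetection and kSeiUuid are hypothetical names, not DeepStream or GStreamer APIs; the code is plain C++17 so it can be unit-tested without a pipeline.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <cstring>
#include <optional>
#include <vector>

// Same 16-byte UUID as SEI_UUID on the sender side.
static const uint8_t kSeiUuid[16] = {
    0xde, 0xad, 0xbe, 0xef, 0x12, 0x34, 0x56, 0x78,
    0x9a, 0xbc, 0xde, 0xf0, 0x11, 0x22, 0x33, 0x44};

// Fields that build_sei_annexb() serializes (tracking_id is not sent).
struct SeiDetection {
  uint16_t class_id;
  float confidence;
  uint16_t x, y, w, h;
};

struct SeiPayload {
  uint64_t pts_ns;                 // PTS written by the sender
  std::vector<SeiDetection> dets;
};

// Drop each emulation-prevention 0x03 that follows two zero bytes.
static std::vector<uint8_t> unescape_rbsp(const uint8_t* p, size_t n) {
  std::vector<uint8_t> out;
  out.reserve(n);
  int zeros = 0;
  for (size_t i = 0; i < n; ++i) {
    if (zeros >= 2 && p[i] == 0x03) { zeros = 0; continue; }
    out.push_back(p[i]);
    zeros = (p[i] == 0) ? zeros + 1 : 0;
  }
  return out;
}

// Scans an Annex-B access unit and decodes the first user_data_unregistered
// SEI message (payload_type 5) that starts with `uuid`. Only the first message
// of each SEI NAL is examined, matching how the sender builds its NAL.
static std::optional<SeiPayload>
find_detection_sei(const uint8_t* au, size_t n, const uint8_t uuid[16]) {
  auto is_start = [&](size_t i) { return au[i] == 0 && au[i + 1] == 0 && au[i + 2] == 1; };
  size_t i = 0;
  while (i + 3 <= n) {
    if (!is_start(i)) { ++i; continue; }
    size_t start = i + 3, end = start;
    while (end + 3 <= n && !is_start(end)) ++end;       // NAL ends at the next start code
    if (end + 3 > n) end = n;
    i = end;
    size_t len = end - start;
    while (len > 0 && au[start + len - 1] == 0) --len;  // zero belonging to a 4-byte start code
    if (len < 2 || (au[start] & 0x1F) != 6) continue;   // not an SEI NAL
    std::vector<uint8_t> r = unescape_rbsp(au + start + 1, len - 1);
    size_t k = 0, type = 0, size = 0;
    while (k < r.size() && r[k] == 0xFF) type += r[k++];
    if (k >= r.size()) continue;
    type += r[k++];
    while (k < r.size() && r[k] == 0xFF) size += r[k++];
    if (k >= r.size()) continue;
    size += r[k++];
    // UUID(16) + PTS(8) + count(4) is the minimum payload the sender writes.
    if (type != 5 || size < 28 || k + size > r.size()) continue;
    const uint8_t* p = r.data() + k;
    if (std::memcmp(p, uuid, 16) != 0) continue;
    auto be = [p](size_t off, int bytes) {
      uint64_t v = 0;
      for (int b = 0; b < bytes; ++b) v = (v << 8) | p[off + b];
      return v;
    };
    uint64_t count = be(24, 4);
    if (28 + count * 14 > size) continue;               // 14 bytes per detection
    SeiPayload out{be(16, 8), {}};
    for (uint64_t d = 0; d < count; ++d) {
      size_t o = 28 + size_t(d) * 14;
      SeiDetection det;
      det.class_id = uint16_t(be(o, 2));
      uint32_t bits = uint32_t(be(o + 2, 4));
      std::memcpy(&det.confidence, &bits, sizeof bits);
      det.x = uint16_t(be(o + 6, 2));
      det.y = uint16_t(be(o + 8, 2));
      det.w = uint16_t(be(o + 10, 2));
      det.h = uint16_t(be(o + 12, 2));
      out.dets.push_back(det);
    }
    return out;
  }
  return std::nullopt;
}
```

In a probe, the buffer would be mapped with gst_buffer_map(buf, &map, GST_MAP_READ), passed as find_detection_sei(map.data, map.size, kSeiUuid), and then unmapped. Printing the decoded count and boxes per frame is usually enough to confirm that every access unit carries the SEI.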


Thank you for your continued support. I look forward to your insights on the burst pattern and the sources of latency in the inference branch.

I don’t understand what you mean. Without nvinfer providing bounding boxes, what can nvtracker do?

In fast-moving scenes, the bounding boxes carried in the SEI will differ significantly from the object positions in the frame they are attached to, rendering them unusable.
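To put a rough number on that mismatch: under the simplifying assumption that an object moves at a constant speed in pixels per second, a box computed lag_frames earlier is off by about speed × lag_frames / fps pixels. bbox_drift_px is a hypothetical helper used only for this arithmetic.

```cpp
#include <cassert>

// Approximate pixel offset between a box computed on an earlier frame and the
// object's position in the frame that finally carries it in the SEI.
// Assumes constant object speed; real motion is rarely that regular.
double bbox_drift_px(double speed_px_per_s, int lag_frames, double fps) {
  return speed_px_per_s * lag_frames / fps;  // (speed * lag) / fps
}
```

For example, an object moving at 600 px/s with a 2-frame lag at 30 fps is drawn about 40 px away from where it actually is.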

The bottleneck of the pipeline is its slowest element, which I assume is the inference model. If the interval is 10 and nvinfer takes 70 ms to infer one frame, there will be a spike every 10 frames. You need to optimize the model, for example with INT8 precision or pruning, so that the inference time drops below one frame interval (1/fps).
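The budget argument as arithmetic, sketched under the assumptions of a steady input rate and a constant time per inference run: at 30 fps each frame has about 33.3 ms, so a 70 ms run spans three frame intervals. For comparison, the 25 ms lower bound of the inference-branch latency reported in this thread fits the budget, while the 45 ms upper bound does not. The helper names are hypothetical.

```cpp
#include <cassert>
#include <cmath>

// Time available per frame at a steady input rate, in milliseconds.
double frame_budget_ms(double fps) { return 1000.0 / fps; }

// True if one inference run completes within a single frame interval.
bool fits_frame_budget(double infer_ms, double fps) {
  return infer_ms <= frame_budget_ms(fps);
}

// How many frame intervals one inference run occupies. A value above 1 means
// frames arriving meanwhile wait in the upstream queue (or are dropped by a
// leaky one) and are then released together, which shows up as a spike.
int frame_slots_used(double infer_ms, double fps) {
  return static_cast<int>(std::ceil(infer_ms / frame_budget_ms(fps)));
}
```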

Please measure the delay first, instead of guessing.

Hi, junshengy:

After investigation, our algorithm team found that nvtracker is more flexible than nvinfer and easier to adapt to our business requirements. Specifically:

  • nvtracker supports custom low-level tracker libraries (e.g., ByteTrack, DeepSORT), allowing us to integrate our own detection and tracking logic directly into the tracker plugin.
  • nvtracker can still provide object class IDs and bounding box coordinates through the NvDsObjectMeta structure, which meets our needs for detection result extraction.
  • The ll-lib-file and ll-config-file properties enable us to load custom tracker implementations that perform both detection and tracking in a single step, eliminating the dependency on nvinfer.

In our use case, the low-level tracker library we integrated performs object detection internally and outputs class_id, confidence, and rect_params just like nvinfer would. Therefore, nvtracker alone is sufficient for our pipeline.

We acknowledge that asynchronous SEI injection introduces a temporal mismatch between the bounding box and the video frame. However:

  • In our application scenario, a 1-2 frame delay is acceptable. The SEI payload includes the original PTS, allowing the receiver to correlate the metadata with the correct frame if needed.

  • The primary goal of our pipeline is to embed detection results into the original H.264 stream without re-encoding, rather than guaranteeing frame-perfect synchronization. The slight latency trade-off is acceptable for our use case.

  • If stricter synchronization becomes necessary in the future, we can reduce the qb queue delay or switch to synchronous insertion at the cost of higher latency.
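The synchronous option in the last bullet can be sketched with a condition variable: the forwarding probe waits, for at most a fixed timeout, until detections for exactly its PTS are published. This is a minimal sketch under stated assumptions. SyncMetaCache, Box (a stand-in for the Detection struct) and the 64-entry bound are hypothetical, and exact-PTS matching assumes both branches see the same buffer PTS.

```cpp
#include <cassert>
#include <chrono>
#include <condition_variable>
#include <cstddef>
#include <cstdint>
#include <map>
#include <mutex>
#include <optional>
#include <utility>
#include <vector>

// Hypothetical subset of the Detection struct.
struct Box { uint16_t class_id; uint16_t x, y, w, h; };

// Hypothetical synchronous variant of MetaCache.
class SyncMetaCache {
 public:
  void put(uint64_t pts, std::vector<Box> dets) {
    {
      std::lock_guard<std::mutex> lk(mtx_);
      cache_[pts] = std::move(dets);
      while (cache_.size() > kMaxEntries) cache_.erase(cache_.begin());  // drop oldest PTS
    }
    cv_.notify_all();  // wake a forwarding thread blocked in wait_pop()
  }

  // Returns the detections for `pts`, or std::nullopt if they did not arrive
  // in time (the caller then forwards the buffer without SEI).
  std::optional<std::vector<Box>> wait_pop(uint64_t pts, std::chrono::milliseconds timeout) {
    std::unique_lock<std::mutex> lk(mtx_);
    if (!cv_.wait_for(lk, timeout, [&] { return cache_.count(pts) > 0; }))
      return std::nullopt;
    auto node = cache_.extract(pts);
    return std::move(node.mapped());
  }

 private:
  static constexpr std::size_t kMaxEntries = 64;
  std::mutex mtx_;
  std::condition_variable cv_;
  std::map<uint64_t, std::vector<Box>> cache_;
};
```

Because the SEI probe runs on qb's streaming thread, waiting there pauses the forwarding branch (and, once qb's 60-buffer limit is reached, the tee as well). The timeout is therefore the latency price per frame and would need to sit just above the 25-45 ms measured for the inference branch.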

We have already measured the end-to-end latency of the inference branch using probe-based timing:

  • Measurement method: Monotonic timestamps recorded at qa (queue before decoder) sink and qb (queue before SEI injection) sink.
  • Result: The inference branch latency ranges from 25 ms to 45 ms (tracker plugin alone: 19-40 ms).
  • Input frame rate: 30 FPS (~33ms per frame).
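Pairing two probe timestamps by buffer PTS gives a per-frame latency between any two pads. A sketch, assuming both probes see the same GST_BUFFER_PTS (which the comment above tracker_src_probe says holds after nvstreamdemux) and that timestamps come from a monotonic clock such as get_monotonic_time_us(). PtsLatencyMeter is a hypothetical name. Measuring from the qa sink pad to the qa2 sink pad (where tracker_src_probe is attached) would cover decode, nvstreammux, the tracker and nvstreamdemux.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <mutex>
#include <optional>
#include <unordered_map>

// Hypothetical helper: enter() is called from the first probe and leave() from
// the second, both with the buffer PTS and a monotonic timestamp in us.
class PtsLatencyMeter {
 public:
  void enter(uint64_t pts, uint64_t now_us) {
    std::lock_guard<std::mutex> lk(mtx_);
    if (t_in_.size() >= kMaxPending) t_in_.clear();  // PTS stopped matching: start over
    t_in_[pts] = now_us;
  }

  // Elapsed time between the two probes for this PTS, or std::nullopt if the
  // buffer never passed the first probe (or was already measured).
  std::optional<uint64_t> leave(uint64_t pts, uint64_t now_us) {
    std::lock_guard<std::mutex> lk(mtx_);
    auto it = t_in_.find(pts);
    if (it == t_in_.end()) return std::nullopt;
    uint64_t elapsed_us = now_us - it->second;
    t_in_.erase(it);
    return elapsed_us;
  }

 private:
  static constexpr std::size_t kMaxPending = 256;  // frames dropped downstream never call leave()
  std::mutex mtx_;
  std::unordered_map<uint64_t, uint64_t> t_in_;
};
```

Frames dropped by the leaky qa queue never reach the second probe, which is why the pending map is bounded.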

Since the measured latency (25–45ms) is close to but not consistently exceeding the frame interval, we do not believe the inference model is the sole bottleneck causing the burst pattern.

Our actual observation is:

Both the forwarding branch and inference branch exhibit burst processing: two frames processed within a few milliseconds, followed by a ~66ms gap.

The same PTS arrives at sei_inject_probe approximately 50–70ms earlier than at tracker_src_probe, indicating significant buffering/queuing behavior rather than pure inference delay.
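To quantify the burst pattern rather than eyeball it, the monotonic timestamps printed at one pad (for example by sei_inject_probe or tracker_src_probe) can be reduced to gap statistics. A sketch; analyze_arrivals and both thresholds are hypothetical, not DeepStream values.

```cpp
#include <cassert>
#include <cstddef>
#include <cstdint>
#include <vector>

struct BurstStats {
  std::size_t short_gaps = 0;  // gaps below burst_gap_us (frames arriving back to back)
  std::size_t long_gaps = 0;   // gaps above 1.5x the nominal frame interval
  uint64_t max_gap_us = 0;
};

// Classifies inter-arrival gaps of monotonic timestamps (microseconds) taken
// at a single pad. Both thresholds are illustrative.
BurstStats analyze_arrivals(const std::vector<uint64_t>& t_us, uint64_t frame_us,
                            uint64_t burst_gap_us) {
  BurstStats s;
  for (std::size_t i = 1; i < t_us.size(); ++i) {
    const uint64_t gap = t_us[i] - t_us[i - 1];
    if (gap < burst_gap_us) ++s.short_gaps;
    if (2 * gap > 3 * frame_us) ++s.long_gaps;  // gap > 1.5 * frame_us, in integers
    if (gap > s.max_gap_us) s.max_gap_us = gap;
  }
  return s;
}
```

Running the same analysis on timestamps taken at the rtph264depay src pad, upstream of the tee, would show whether the two-frames-then-66 ms pattern already exists before the stream splits. If it does, the bursts originate upstream of both branches (camera, network or rtspsrc) rather than in the inference branch.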

Current inference branch:

queue (leaky) -> nvv4l2decoder -> nvstreammux -> nvinfer -> nvstreamdemux -> queue -> [meta-extract probe] -> fakesink

There is only one input. Can I remove nvstreammux/nvstreamdemux?

Current RTSP server code:

  rtsp_server = gst_rtsp_server_new();
  std::string out_service = std::to_string(A.out_port);
  g_object_set(rtsp_server, "address", A.out_host,
                            "service", out_service.c_str(),
                            nullptr);

  GstRTSPMountPoints* mounts = gst_rtsp_server_get_mount_points(rtsp_server);
  GstRTSPMediaFactory* factory = gst_rtsp_media_factory_new();
  std::string rtsp_launch =
    "( udpsrc port=" + std::to_string(A.udp_port) +
    " caps=\"application/x-rtp,media=(string)video,clock-rate=(int)90000,encoding-name=(string)H264,payload=(int)96\" "
    "! queue leaky=downstream max-size-buffers=120 max-size-bytes=0 max-size-time=0 "
    "! rtph264depay "
    "! h264parse config-interval=-1 "
    "! rtph264pay name=pay0 pt=96 config-interval=-1 mtu=1400 )";
  gst_rtsp_media_factory_set_launch(factory, rtsp_launch.c_str());
  gst_rtsp_media_factory_set_shared(factory, TRUE);
  gst_rtsp_mount_points_add_factory(mounts, A.out_mount, factory);

Can we simplify this by moving the tee inside the RTSP server pipeline? Specifically:

rtspsrc (input) -> rtph264depay -> h264parse -> tee
                                               |-> nvv4l2decoder -> nvstreammux -> nvinfer -> fakesink
                                               \-> h264parse -> rtph264pay -> RTSP client