Hi,
I am sharing a simple script that roughly simulates the congestion-control behaviour (periodic bitrate and resolution changes) to help test this out. Exactly matching the original test scenario with live congestion control would involve a lot of custom setup.
Run the script as follows:
./test-encoders.py --encoder gpu
# For testing via cpu encoder.
./test-encoders.py --encoder cpu
Running the script below with the GPU encoder should cause a continuous increase in memory usage, which can be profiled by watching jtop.
#!/usr/bin/env python3
"""
A GStreamer script to test dynamic bitrate and resolution changes.
This script simulates adaptive streaming by cycling through a predefined ladder
of resolutions and bitrates. It can be configured to use either a
CPU-based encoder (x264enc) or a GPU-based NVIDIA encoder (NVENC).
"""
from __future__ import annotations
import argparse
import sys
import signal
import gi
gi.require_version("Gst", "1.0")
from gi.repository import Gst, GLib
Gst.init(None)
# ---------------------------------------------------------------------------
# Helpers
# ---------------------------------------------------------------------------
def make_element(factory: str, name: str | None = None, **props) -> Gst.Element:
    """Create a GStreamer element and apply the given properties.

    Args:
        factory: Name of the element factory (e.g. ``"capsfilter"``).
        name: Optional element name, used later for ``get_by_name`` lookups.
        **props: Property values to set on the new element, passed as
            keyword arguments (e.g. ``is_live=True``).

    Returns:
        The newly created element.

    If the factory cannot create the element, an error is printed to stderr
    and the process exits with status 1.
    """
    elem = Gst.ElementFactory.make(factory, name)
    if elem is None:
        # Both lines must be f-strings: implicit concatenation does not carry
        # the f-prefix over, and the second line names the element too.
        print(
            f"[error] Required element '{factory}' is missing.\n"
            f" Please check your GStreamer installation (gst-inspect-1.0 {factory}).",
            file=sys.stderr,
        )
        sys.exit(1)
    for key, value in props.items():
        elem.set_property(key, value)
    return elem
def get_encoder_pipeline_elements(encoder_type: str) -> tuple[str, str, str]:
    """
    Resolve the converter, encoder-input caps prefix, and encoder to use.

    ``encoder_type`` is expected to be "gpu" or "cpu". The result is a tuple
    of (converter_name, caps_string_format, encoder_name). When the type is
    unknown, or a required element factory cannot be found, an error is
    printed to stderr and the process exits with status 1.
    """
    def first_available(candidates):
        # Return the first candidate with a registered factory, else None.
        for candidate in candidates:
            if Gst.ElementFactory.find(candidate):
                return candidate
        return None

    def fail(message):
        print(message, file=sys.stderr)
        sys.exit(1)

    if encoder_type not in ("gpu", "cpu"):
        # This case is for safety; argparse should prevent it.
        fail(f"[error] Invalid encoder type specified: {encoder_type}")

    if encoder_type == "cpu":
        # CPU path: standard converter, x264enc, plain system-memory caps.
        if not Gst.ElementFactory.find("videoconvert"):
            fail("[error] 'videoconvert' element not found. CPU encoding is not possible.")
        if not Gst.ElementFactory.find("x264enc"):
            fail("[error] 'x264enc' element not found. Please install gst-plugins-ugly.")
        return "videoconvert", "video/x-raw,format=I420", "x264enc"

    # GPU path: probe the NVIDIA converter first, then the NVIDIA encoder.
    converter = first_available(("nvvconv", "nvvidconv", "nvvideoconvert"))
    if not converter:
        fail("[error] No NVIDIA video converter found (e.g., nvvideoconvert). GPU encoding is not possible.")

    encoder = first_available(("nvv4l2h264enc", "nvh264enc"))
    if not encoder:
        fail("[error] No NVIDIA H.264 encoder found (nvv4l2h264enc, nvh264enc).")

    # NVMM caps are used for whichever encoder was found.
    # NOTE(review): confirm nvh264enc accepts memory:NVMM input on the target.
    return converter, "video/x-raw(memory:NVMM),format=I420", encoder
# ---------------------------------------------------------------------------
# Pipeline builder
# ---------------------------------------------------------------------------
def build_pipeline(
    use_device: str | None,
    pattern: int,
    size: str,
    conv_name: str,
    encoder_name: str,
    caps_format_str: str,
) -> Gst.Pipeline:
    """Build the source -> convert -> encode -> parse -> fakesink pipeline.

    Args:
        use_device: V4L2 device path; when falsy a live ``videotestsrc`` is
            used instead.
        pattern: ``videotestsrc`` pattern ID (ignored when a device is used).
        size: Initial resolution as ``"WxH"`` (case-insensitive), with both
            dimensions positive integers.
        conv_name: Factory name of the video converter element.
        encoder_name: Factory name of the H.264 encoder element.
        caps_format_str: Caps prefix for the encoder input; width, height and
            framerate are appended to it.

    Returns:
        The assembled, fully linked pipeline. The elements are named
        ``src``, ``caps_raw``, ``conv``, ``caps_enc_in``, ``enc``, ``parse``
        and ``sink`` so callers can fetch them with ``get_by_name``.

    On a malformed size or a failed link, an error is printed to stderr and
    the process exits with status 1.
    """
    try:
        width, height = (int(part) for part in size.lower().split("x"))
        # Zero or negative dimensions parse as ints but cannot describe a
        # frame; reject them here rather than failing later in negotiation.
        if width <= 0 or height <= 0:
            raise ValueError("dimensions must be positive")
    except ValueError:
        print(f"[error] Invalid size format: {size}. Use WxH format (e.g., 640x480)", file=sys.stderr)
        sys.exit(1)

    # Source: a capture device when requested, otherwise a live test pattern.
    if use_device:
        src = make_element("v4l2src", "src", device=use_device)
    else:
        src = make_element("videotestsrc", "src", is_live=True, pattern=pattern)

    # Raw caps for the source output.
    caps_raw = make_element(
        "capsfilter",
        "caps_raw",
        caps=Gst.Caps.from_string(
            f"video/x-raw,format=NV12,width={width},height={height},framerate=30/1"
        ),
    )

    # Converter and the caps it must produce for the encoder.
    conv = make_element(conv_name, "conv")
    caps_enc_in = make_element(
        "capsfilter",
        "caps_enc_in",
        caps=Gst.Caps.from_string(
            f"{caps_format_str},width={width},height={height},framerate=30/1"
        ),
    )

    # Encoder: nvv4l2h264enc takes its bitrate in bits/sec, the others in kbps.
    if encoder_name == "nvv4l2h264enc":
        enc = make_element(encoder_name, "enc", bitrate=3_000_000, insert_sps_pps=True, idrinterval=30)
    elif encoder_name == "nvh264enc":
        enc = make_element(encoder_name, "enc", bitrate=3000, gop_size=30)
    else:  # x264enc
        enc = make_element(encoder_name, "enc", bitrate=3000, key_int_max=30, tune="zerolatency")

    parse = make_element("h264parse", "parse")
    sink = make_element("fakesink", "sink", sync=False)

    pipeline = Gst.Pipeline.new("adaptive-pipeline")
    for elem in (src, caps_raw, conv, caps_enc_in, enc, parse, sink):
        pipeline.add(elem)

    # Link pair by pair so a failure names the exact hop that broke.
    links = (
        (src, caps_raw, "source to raw caps filter"),
        (caps_raw, conv, "raw caps filter to converter"),
        (conv, caps_enc_in, "converter to encoder input caps filter"),
        (caps_enc_in, enc, "encoder input caps filter to encoder"),
        (enc, parse, "encoder to parser"),
        (parse, sink, "parser to sink"),
    )
    for upstream, downstream, description in links:
        if not upstream.link(downstream):
            print(f"[error] Failed to link {description}", file=sys.stderr)
            sys.exit(1)

    print(f"[info] Using converter: {conv_name}")
    print(f"[info] Using encoder: {encoder_name}")
    return pipeline
def setup_bus_watch(pipeline: Gst.Pipeline, main_loop: GLib.MainLoop | None = None) -> None:
    """Log bus messages and stop ``main_loop`` on ERROR or EOS.

    Args:
        pipeline: Pipeline whose bus should be watched.
        main_loop: The loop the application is running. It is quit when an
            ERROR or EOS message arrives. When omitted, messages are only
            logged and the caller remains responsible for stopping its loop.
    """
    bus = pipeline.get_bus()
    bus.add_signal_watch()

    def stop_loop() -> None:
        # Quit the loop that is actually running; constructing a fresh
        # GLib.MainLoop here and quitting it would have no effect on it.
        if main_loop is not None:
            main_loop.quit()

    def on_message(_bus, message):
        msg_type = message.type
        if msg_type == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            print(f"[error] {err}: {debug}", file=sys.stderr)
            stop_loop()
        elif msg_type == Gst.MessageType.WARNING:
            warn, debug = message.parse_warning()
            print(f"[warning] {warn}: {debug}", file=sys.stderr)
        elif msg_type == Gst.MessageType.EOS:
            print("[info] End of stream")
            stop_loop()

    bus.connect("message", on_message)


def next_ladder_level(level: int, direction: int, ladder_size: int) -> tuple[int, int]:
    """Advance one step along the ladder, bouncing at either end.

    Args:
        level: Current index into the ladder.
        direction: ``1`` to move up, ``-1`` to move down.
        ladder_size: Number of entries in the ladder.

    Returns:
        ``(new_level, new_direction)``. When the step would leave the ladder
        the direction is reversed and the step is taken the other way; the
        result is always clamped to ``0 .. ladder_size - 1``.
    """
    level += direction
    if not 0 <= level < ladder_size:
        direction *= -1
        # Undo the overshoot and take one step in the reversed direction.
        level += 2 * direction
    level = max(0, min(level, ladder_size - 1))
    return level, direction


# ---------------------------------------------------------------------------
# Main
# ---------------------------------------------------------------------------
def main() -> None:
    """Parse arguments, build the pipeline and cycle through the ladder.

    A GLib timer periodically moves to the next ladder entry and applies its
    bitrate and/or resolution (per ``--mode``) to the running pipeline. The
    loop ends on Ctrl-C, on a bus ERROR, or on EOS; the pipeline is always
    set to NULL before returning.
    """
    parser = argparse.ArgumentParser(description="Dynamic bitrate and resolution adaptation test.")
    parser.add_argument("--device", help="/dev/videoX to use instead of test pattern")
    parser.add_argument("--encoder", type=str, choices=["cpu", "gpu"], default="gpu", help="Encoder to use: 'cpu' (x264enc) or 'gpu' (NVENC)")
    parser.add_argument("--period", type=float, default=3.0, help="Seconds between adaptations")
    parser.add_argument("--pattern", type=int, default=0, help="videotestsrc pattern ID")
    parser.add_argument("--mode", choices=["bitrate", "resolution", "both"], default="both",
                        help="What to adapt: bitrate only, resolution only, or both")
    args = parser.parse_args()

    # Bitrates are in bits/sec; they are converted per encoder when applied.
    adaptation_ladder = [
        {"width": 320, "height": 240, "bitrate": 150_000, "name": "QVGA"},
        {"width": 426, "height": 240, "bitrate": 300_000, "name": "240p"},
        {"width": 640, "height": 360, "bitrate": 600_000, "name": "360p"},
        {"width": 854, "height": 480, "bitrate": 1_200_000, "name": "480p"},
        {"width": 1280, "height": 720, "bitrate": 2_500_000, "name": "720p"},
        {"width": 1920, "height": 1080, "bitrate": 4_000_000, "name": "1080p"},
    ]
    current_level = 2
    direction = 1  # 1 for up, -1 for down

    # Determine encoder-specific elements before building the pipeline.
    conv_name, caps_format_str, encoder_name = get_encoder_pipeline_elements(args.encoder)

    # Build the initial pipeline at the starting ladder level.
    initial_config = adaptation_ladder[current_level]
    initial_size = f"{initial_config['width']}x{initial_config['height']}"
    pipeline = build_pipeline(args.device, args.pattern, initial_size, conv_name, encoder_name, caps_format_str)

    # Elements that are reconfigured while the pipeline is running.
    enc: Gst.Element = pipeline.get_by_name("enc")
    caps_raw: Gst.Element = pipeline.get_by_name("caps_raw")
    caps_enc_in: Gst.Element = pipeline.get_by_name("caps_enc_in")

    # Create the loop before wiring the bus so ERROR/EOS can stop this loop.
    main_loop = GLib.MainLoop()
    setup_bus_watch(pipeline, main_loop)

    # nvv4l2h264enc takes bits/sec; the other encoders take kbits/sec.
    uses_bps = encoder_name == "nvv4l2h264enc"

    def apply_bitrate(bits_per_second: int) -> None:
        enc.set_property("bitrate", bits_per_second if uses_bps else bits_per_second // 1000)

    initial_bitrate = initial_config["bitrate"]
    apply_bitrate(initial_bitrate)
    print(f"[init] Starting with {initial_config['name']} - {initial_config['width']}x{initial_config['height']} @ {initial_bitrate/1000:.1f} kbps")

    change_bitrate = args.mode in ("bitrate", "both")
    change_resolution = args.mode in ("resolution", "both")

    def adapt_stream() -> bool:
        nonlocal current_level, direction
        current_level, direction = next_ladder_level(current_level, direction, len(adaptation_ladder))
        config = adaptation_ladder[current_level]
        new_bitrate = config["bitrate"]
        new_width, new_height = config["width"], config["height"]

        if change_bitrate:
            apply_bitrate(new_bitrate)

        if not change_resolution:
            print(f"[adapt] → {config['name']} bitrate: {new_bitrate/1000:.1f} kbps")
            return True  # Keep the timer running

        try:
            # Update resolution by changing both capsfilters dynamically.
            new_caps_raw = Gst.Caps.from_string(f"video/x-raw,format=NV12,width={new_width},height={new_height},framerate=30/1")
            caps_raw.set_property("caps", new_caps_raw)
            # Use the correct caps format string (with or without NVMM).
            new_caps_enc_in = Gst.Caps.from_string(f"{caps_format_str},width={new_width},height={new_height},framerate=30/1")
            caps_enc_in.set_property("caps", new_caps_enc_in)
        except Exception as e:
            # Best effort: a failed caps update must not kill the timer.
            print(f"[warning] Resolution change failed: {e}. Adapting bitrate only.", file=sys.stderr)
            print(f"[adapt] → bitrate only: {new_bitrate/1000:.1f} kbps")
        else:
            # Only report the bitrate when it was actually applied.
            if change_bitrate:
                print(f"[adapt] → {config['name']} ({new_width}x{new_height} @ {new_bitrate/1000:.1f} kbps)")
            else:
                print(f"[adapt] → {config['name']} ({new_width}x{new_height})")
        return True  # Keep the timer running

    # The adaptation period is clamped to a minimum of one second.
    GLib.timeout_add(max(int(args.period * 1000), 1000), adapt_stream)

    def sigint_handler(_sig, _frame):
        print("\nStopping…")
        main_loop.quit()

    signal.signal(signal.SIGINT, sigint_handler)

    try:
        # Start inside the try block so the pipeline is torn down on failure too.
        ret = pipeline.set_state(Gst.State.PLAYING)
        if ret == Gst.StateChangeReturn.FAILURE:
            print("[error] Failed to start pipeline.", file=sys.stderr)
            sys.exit(1)
        print("Pipeline running. Press Ctrl‑C to stop.")
        print("Adaptation ladder: QVGA ↔ 240p ↔ 360p ↔ 480p ↔ 720p ↔ 1080p")
        main_loop.run()
    except KeyboardInterrupt:
        sigint_handler(None, None)
    finally:
        pipeline.set_state(Gst.State.NULL)
# Run the adaptation test only when this file is executed as a script.
if __name__ == "__main__":
    main()