This is the code I am building by pipeline with
def cb_newpad(decodebin, decoder_src_pad, nvvidconv_in_bin):
"""
Callback function invoked when a new pad is added to the uridecodebin.
It links the uridecodebin's new video source pad to the sink pad of the
nvvideoconvert element provided in 'nvvidconv_in_bin' (user_data).
"""
print(f"In cb_newpad: Pad '{decoder_src_pad.get_name()}' added to '{decodebin.get_name()}'")
caps = decoder_src_pad.get_current_caps()
if not caps:
print(f"Pad '{decoder_src_pad.get_name()}' has no current caps, querying...")
caps = decoder_src_pad.query_caps()
if not caps:
sys.stderr.write(f"Error: Failed to get caps from pad '{decoder_src_pad.get_name()}' of element '{decodebin.get_name()}'.\n")
return
gststruct = caps.get_structure(0)
gstname = gststruct.get_name()
print(f"Pad '{decoder_src_pad.get_name()}' has caps: {caps.to_string()}, gstname = {gstname}")
if "video" in gstname.lower():
sink_pad_of_nvvidconv = nvvidconv_in_bin.get_static_pad("sink")
if not sink_pad_of_nvvidconv:
sys.stderr.write(f"Error: Unable to get sink pad of '{nvvidconv_in_bin.get_name()}'.\n")
return
if sink_pad_of_nvvidconv.is_linked():
print(f"Info: Sink pad of '{nvvidconv_in_bin.get_name()}' is already linked. Unlinking before relinking.")
print(f"Attempting to link '{decoder_src_pad.get_name()}' of '{decodebin.get_name()}' to sink of '{nvvidconv_in_bin.get_name()}'")
link_result = decoder_src_pad.link(sink_pad_of_nvvidconv)
if link_result != Gst.PadLinkReturn.OK:
sys.stderr.write(f"Error: Failed to link pad '{decoder_src_pad.get_name()}' of '{decodebin.get_name()}' to sink pad of '{nvvidconv_in_bin.get_name()}'. Result: {link_result}\n")
else:
print(f"Successfully linked pad '{decoder_src_pad.get_name()}' of '{decodebin.get_name()}' to sink pad of '{nvvidconv_in_bin.get_name()}'")
else:
print(f"Info: Pad '{decoder_src_pad.get_name()}' is not a video pad (type: {gstname}). Skipping link.")
def decodebin_child_added(child_proxy, obj, name, user_data):
"""
Callback for 'child-added' signal of uridecodebin.
Used to configure properties of internal elements like rtspsrc.
"""
print(f"Decodebin child added: '{name}' to element '{child_proxy.get_name()}' (obj type: {type(obj)})")
if name.find("decodebin") != -1:
obj.connect("child-added", decodebin_child_added, user_data)
if "source" in name:
if obj and obj.find_property('drop-on-latency') is not None:
print(f"Setting 'drop-on-latency' to True for '{name}'")
obj.set_property("drop-on-latency", True)
elif obj and obj.find_property('latency') is not None:
print(f"Setting 'latency' to 2000 for '{name}' (rtspsrc default can be high)")
obj.set_property("latency", 2000)
def create_source_bin(index, uri):
"""
Creates a source bin for a given URI. This bin includes:
uridecodebin -> nvvideoconvert -> videorate -> capsfilter (1 FPS, NVMM)
Returns the Gst.Bin.
"""
print(f"Creating source bin for index {index}, URI: {uri}")
bin_name = f"source-bin-{index:02d}"
nbin = Gst.Bin.new(bin_name)
if not nbin:
sys.stderr.write(f"Error: Unable to create source bin '{bin_name}'.\n")
return None
# Create uridecodebin or nvurisrcbin based on file_loop
if file_loop:
uri_decode_bin = Gst.ElementFactory.make("nvurisrcbin", f"uri-decode-bin-{index}")
if uri_decode_bin:
uri_decode_bin.set_property("file-loop", True) # For nvurisrcbin, use True/False
uri_decode_bin.set_property("cudadec-memtype", 0) # 0 for NVMM memory
else:
sys.stderr.write(f"Error: Unable to create nvurisrcbin for source {index}.\n")
return None
else:
uri_decode_bin = Gst.ElementFactory.make("nvurisrcbin", f"uri-decode-bin-{index}")
uri_decode_bin.set_property("cudadec-memtype", 0)
if not uri_decode_bin:
sys.stderr.write(f"Error: Unable to create uridecodebin for source {index}.\n")
return None
uri_decode_bin.set_property("uri", uri)
# Create the elements for frame rate control
# These elements will be internal to the source bin
nvvidconv_in_bin = Gst.ElementFactory.make("nvvideoconvert", f"nvvidconv-src-{index}")
videorate_in_bin = Gst.ElementFactory.make("videorate", f"videorate-src-{index}")
capsfilter_in_bin = Gst.ElementFactory.make("capsfilter", f"capsfilter-src-{index}")
if not nvvidconv_in_bin or not videorate_in_bin or not capsfilter_in_bin:
sys.stderr.write(f"Error: Failed to create conversion/rate elements for source_bin {index}.\n")
return None
# Configure the capsfilter for 1 FPS and NVMM memory
caps_str = "video/x-raw(memory:NVMM), framerate=1/1"
rate_caps = Gst.Caps.from_string(caps_str)
capsfilter_in_bin.set_property("caps", rate_caps)
print(f"Source bin {index}: capsfilter for 1FPS set to {caps_str}")
# Add all elements to the bin
nbin.add(uri_decode_bin)
nbin.add(nvvidconv_in_bin)
nbin.add(videorate_in_bin)
nbin.add(capsfilter_in_bin)
if not nvvidconv_in_bin.link(videorate_in_bin):
sys.stderr.write(f"Error: Could not link nvvidconv to videorate in {bin_name}.\n")
return None
if not videorate_in_bin.link(capsfilter_in_bin):
sys.stderr.write(f"Error: Could not link videorate to capsfilter in {bin_name}.\n")
return None
ghost_pad = Gst.GhostPad.new("src", capsfilter_in_bin.get_static_pad("src"))
if not ghost_pad:
sys.stderr.write(f"Error: Failed to create ghost pad in {bin_name}.\n")
return None
nbin.add_pad(ghost_pad)
# Connect signals:
uri_decode_bin.connect("pad-added", cb_newpad, nvvidconv_in_bin)
uri_decode_bin.connect("child-added", decodebin_child_added, nbin)
print(f"Successfully created source bin '{bin_name}' with 1 FPS processing.")
return nbin
def create_pipeline(app_config):
global log_file, stream_metadata, snapshot_writer
log_file = app_config["output_log"]
pgie_config_file = app_config["pgie_config"]
batch_size = len(stream_configs)
stream_metadata = {i: stream_configs[i] for i in range(batch_size)}
# Initialize GStreamer
Gst.init(None)
pipeline = Gst.Pipeline.new("main-pipeline")
if not pipeline:
sys.stderr.write("Error: Unable to create Gst.Pipeline.\n")
sys.exit(1)
# nvstreammux
streammux = Gst.ElementFactory.make("nvstreammux", "stream-muxer")
if not streammux:
sys.stderr.write("Error: Could not create nvstreammux.\n")
sys.exit(1)
streammux.set_property("batch-size", batch_size)
streammux.set_property("batched-push-timeout", 40000)
streammux.set_property("width", 1280)
streammux.set_property("height", 720)
streammux.set_property("live-source", 1 if any("rtsp://" in s["source"] for s in stream_configs) else 0) # Set if any RTSP streams
pipeline.add(streammux)
# Source bins
for i, stream_conf in enumerate(stream_configs):
uri = stream_conf["source"]
print(f"Pipeline: Processing stream {i}: URI = {uri}")
source_bin = create_source_bin(i, uri)
if not source_bin:
sys.stderr.write(f"Error: Failed creating source bin for stream {i}. Skipping this source.\n")
continue
pipeline.add(source_bin)
# Request a sink pad from the streammuxer for this source
muxer_sink_pad_template = streammux.get_pad_template("sink_%u")
if not muxer_sink_pad_template:
sys.stderr.write(f"CRITICAL Error: Unable to get sink pad template 'sink_%u' from nvstreammux.\n")
sys.exit(1) # This is a fundamental issue with streammux setup
requested_pad_name = f"sink_{i}"
muxer_sink_pad = streammux.request_pad(muxer_sink_pad_template, requested_pad_name, None)
if not muxer_sink_pad:
sys.stderr.write(f"Error: Unable to request sink pad '{requested_pad_name}' from nvstreammux.\n")
continue # Skip linking this source
# Get the source pad from the source_bin (which is the ghost pad)
bin_src_pad = source_bin.get_static_pad("src")
if not bin_src_pad:
sys.stderr.write(f"Error: Failed to get src ghost pad from source_bin '{source_bin.get_name()}'.\n")
continue # Skip linking this source
print(f"Pipeline: Linking src pad of '{source_bin.get_name()}' to sink pad '{muxer_sink_pad.get_name()}' of nvstreammux.")
if bin_src_pad.link(muxer_sink_pad) != Gst.PadLinkReturn.OK:
sys.stderr.write(f"Error: Failed linking source_bin '{source_bin.get_name()}' to nvstreammux pad '{muxer_sink_pad.get_name()}'.\n")
else:
print(f"Pipeline: Successfully linked source_bin '{source_bin.get_name()}' to nvstreammux sink_{i}.")
# --- Downstream elements (PGIE, Tracker, etc.) ---
pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
tracker = Gst.ElementFactory.make("nvtracker", "tracker")
nvvidconv_postmux = Gst.ElementFactory.make("nvvideoconvert", "nvvideo-converter-postmux")
nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
sink = Gst.ElementFactory.make("fakesink", "final-sink")
# sink = Gst.ElementFactory.make("nveglglessink", "display_sink") # For actual display
elements_to_add = [pgie, tracker, nvvidconv_postmux, nvosd, sink]
element_names = ["pgie", "tracker", "nvvidconv_postmux", "nvosd", "sink"] # For error messages
for idx, elem_factory_result in enumerate(elements_to_add):
if not elem_factory_result:
sys.stderr.write(f"Error: Unable to create element: {element_names[idx]}.\n")
sys.exit(1)
pipeline.add(elem_factory_result) # Add the actual element object
# Configure PGIE
pgie.set_property("config-file-path", pgie_config_file)
# Configure Tracker from tracker.txt
tracker_config_file_path = app_config['tracking_config']
tracker_config_parser = configparser.ConfigParser()
if not tracker_config_parser.read(tracker_config_file_path):
sys.stderr.write(f"Warning: Could not read tracker config file: {tracker_config_file_path}\n")
else:
if 'tracker' in tracker_config_parser:
for key, value in tracker_config_parser['tracker'].items():
print(f"Tracker Config: Setting '{key}' to '{value}'")
if key == 'tracker-width':
tracker.set_property('tracker-width', int(value))
elif key == 'tracker-height':
tracker.set_property('tracker-height', int(value))
elif key == 'gpu-id':
tracker.set_property('gpu-id', int(value))
elif key == 'll-lib-file':
tracker.set_property('ll-lib-file', value)
elif key == 'll-config-file':
tracker.set_property('ll-config-file', value)
# Add other tracker properties as needed
else:
sys.stderr.write(f"Warning: 'tracker' section not found in {tracker_config_file_path}\n")
# Link elements downstream of streammux
print("Pipeline: Linking streammux -> pgie -> tracker -> nvvidconv_postmux -> nvosd -> sink")
if not streammux.link(pgie): sys.exit("CRITICAL Error: Failed to link streammux to pgie.")
if not pgie.link(tracker): sys.exit("CRITICAL Error: Failed to link pgie to tracker.")
if not tracker.link(nvvidconv_postmux): sys.exit("CRITICAL Error: Failed to link tracker to nvvidconv_postmux.")
if not nvvidconv_postmux.link(nvosd): sys.exit("CRITICAL Error: Failed to link nvvidconv_postmux to nvosd.")
if not nvosd.link(sink): sys.exit("CRITICAL Error: Failed to link nvosd to sink.")
# Attach buffer probes
pgie_srcpad = pgie.get_static_pad("src")
if not pgie_srcpad:
sys.stderr.write("Error: Unable to get pgie src pad.\n")
else:
pgie_srcpad.add_probe(Gst.PadProbeType.BUFFER, snapshot_writer.roi_object_counts_probe)
print("Pipeline: Added probe to PGIE src pad.")
tracker_srcpad = tracker.get_static_pad("src")
if not tracker_srcpad:
sys.stderr.write("Error: Unable to get tracker src pad.\n")
else:
tracker_srcpad.add_probe(Gst.PadProbeType.BUFFER, snapshot_writer.update_tracking_data_probe)
print("Pipeline: Added probe to Tracker src pad.")
return pipeline
I can run the command
gst-launch-1.0 uridecodebin uri=rtsp://"uri address" ! fakesink
Somehow I run into this issue when executing that command
That is also weird because I don’t get this problem with the code I provided (maybe because I only link video sources with cb_newpad function) .
I tried this and didn’t have this problem
gst-launch-1.0 playbin uri=rtsp://rtsp_url
The code I provided is based on the deepstream-test3 part of creating components to read sources (functions in uridecode are practically equal). Besides, that code works for all situations when executed in the Jetson Orin Nano.