With these settings the delay is 2 seconds.
class JetsonCameraStreamLoader:
def __init__(self, img_size=640, stride=32, auto=True, transforms=None):
self.img_size = img_size
self.stride = stride
self.auto = auto
self.transforms = transforms
self.sources = ['Jetson Camera']
self.mode = 'stream'
self.pipeline = None
self.stopped = False
# Build the GStreamer pipeline
self.build_pipeline()
# Start the pipeline
self.pipeline.set_state(Gst.State.PLAYING)
# Create the bus and set up message handling
self.bus = self.pipeline.get_bus()
self.bus.add_signal_watch()
self.bus.connect("message", self.on_bus_message)
def build_pipeline(self):
# Create a new GStreamer pipeline
pipeline = Gst.Pipeline()
# Create elements
source = create_element('nvarguscamerasrc', 'source')
capsfilter = create_element('capsfilter', 'capsfilter')
capsfilter.set_property('caps', Gst.Caps.from_string(
'video/x-raw(memory:NVMM),width=960,height=600,format=NV12,framerate=10/1'))
tee = create_element('tee', 'tee')
queue1 = create_element('queue', 'queue1')
nvvidconv = create_element('nvvidconv', 'nvvidconv')
capsfilter2 = create_element('capsfilter', 'capsfilter2')
capsfilter2.set_property('caps', Gst.Caps.from_string('video/x-raw, format=BGRx'))
videoconvert = create_element('videoconvert', 'videoconvert')
capsfilter3 = create_element('capsfilter', 'capsfilter3')
capsfilter3.set_property('caps', Gst.Caps.from_string('video/x-raw, format=RGB'))
appsink = create_element('appsink', 'appsink')
appsink.set_property('emit-signals', False)
appsink.set_property('sync', False)
appsink.set_property('drop', False)
appsink.set_property('max-buffers', 1)
appsink.set_property('wait-on-eos', False)
appsink.set_property('enable-last-sample', False)
queue2 = create_element('queue', 'queue2')
encoder = create_element('nvv4l2h264enc', 'encoder')
encoder.set_property('bitrate', 8000000)
h264parse = create_element('h264parse', 'h264parse')
rtsp_sink = create_element('rtspclientsink', 'rtsp_sink')
rtsp_sink.set_property('location', 'rtsp://redirect-rtsp:8554/main_cam')
rtsp_sink.set_property('protocols', 'tcp')
elements = [
source, capsfilter, tee,
queue1, nvvidconv, capsfilter2, videoconvert, capsfilter3, appsink,
queue2, encoder, h264parse, rtsp_sink
]
for elem in elements:
if not elem:
print(f"Failed to create element {elem.get_name()}")
sys.exit(1)
pipeline.add(elem)
# Link elements in the main pipeline
if not link_elements([source, capsfilter, tee]):
print("Failed to link main pipeline elements")
sys.exit(1)
# Link tee to queue1 (appsink branch)
tee_pad1 = tee.get_request_pad('src_%u')
queue1_pad = queue1.get_static_pad('sink')
if tee_pad1.link(queue1_pad) != Gst.PadLinkReturn.OK:
print('Failed to link tee to queue1')
sys.exit(1)
if not link_elements([queue1, nvvidconv, capsfilter2, videoconvert, capsfilter3, appsink]):
print("Failed to link appsink branch")
sys.exit(1)
# Link tee to queue2 (RTSP branch)
tee_pad2 = tee.get_request_pad('src_%u')
queue2_pad = queue2.get_static_pad('sink')
if tee_pad2.link(queue2_pad) != Gst.PadLinkReturn.OK:
print('Failed to link tee to queue2')
sys.exit(1)
if not link_elements([queue2, encoder, h264parse, rtsp_sink]):
print("Failed to link RTSP branch")
sys.exit(1)
self.pipeline = pipeline
def on_bus_message(self, bus, message):
t = message.type
if t == Gst.MessageType.EOS:
print("End-Of-Stream reached")
self.stopped = True
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
print(f"Error received from element {message.src.get_name()}: {err.message}")
print(f"Debugging information: {debug if debug else 'None'}")
self.stopped = True
def __iter__(self):
return self
def __next__(self):
if self.stopped:
raise StopIteration
# Poll the bus for messages
while True:
message = self.bus.timed_pop_filtered(0, Gst.MessageType.ANY)
if not message:
break
self.on_bus_message(self.bus, message)
# Pull a sample from the appsink
appsink = self.pipeline.get_by_name('appsink')
sample = appsink.emit('pull-sample')
caps_format = sample.get_caps().get_structure(0)
width = caps_format.get_value('width')
height = caps_format.get_value('height')
format_str = caps_format.get_value('format')
print(f"Received sample with format: {format_str}, width: {width}, height: {height}")
if not sample:
# No sample, possibly EOS
self.stopped = True
raise StopIteration
# Process the sample
buffer = sample.get_buffer()
caps_format = sample.get_caps().get_structure(0)
width = caps_format.get_value('width')
height = caps_format.get_value('height')
success, map_info = buffer.map(Gst.MapFlags.READ)
if success:
frame_data = bytes(map_info.data)
buffer.unmap(map_info)
frame = np.frombuffer(frame_data, np.uint8).reshape((height, width, 3)) # BGRA format
im0 = frame
if self.transforms:
im = self.transforms(im0)
else:
# Apply letterbox resize
im = letterbox(im0, self.img_size, stride=self.stride, auto=self.auto)[0]
print(f"---___--__- {im.shape}, {im.dtype}")
# cv2.imwrite(f'/factorymonitor/after/processed_{datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}.png', im)
im = im[..., ::-1].transpose((2, 0, 1)) # BGR to RGB, HWC to CHW
#cv2.imwrite(f'/factorymonitor/after/processed_{datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")}.png', im[..., ::-1].transpose((2,0,1)))
# im = im.astype(np.float32) / 255.0 # Normalize to [0, 1]
im = np.ascontiguousarray(im)
#im = np.expand_dims(im, axis=0) # Add batch dimension
im0 = [im0] # Make im0 a list to match expected input format
return self.sources, im, im0, None, ''
else:
# Failed to map buffer
self.stopped = True
raise StopIteration
def __len__(self):
return len(self.sources)
def stop(self):
self.stopped = True
self.pipeline.set_state(Gst.State.NULL)
gstreamer version
$ gst-launch-1.0 --version
gst-launch-1.0 version 1.20.3
GStreamer 1.20.3
https://launchpad.net/distros/ubuntu/+source/gstreamer1.0
Connected via a ZED Link Duo Capture Card over GMSL2
I’m operating in 15W mode to meet the power requirements of my setup.
I’m getting the delay in appsink too