i’m using a jetson nano B01 to capture save and stream frames from flir hadron camera that provides RGB/IR data using gstreamer in python3.
i have added tempature metadata to my ir frames to display hot/cold spot in each frame but till now i can only display them while saving displaying and streaming rgb data . i have been trying to save and stream the ir frames with tempature using a second Appsrc but i keep getting this error down below that i don’t know how to solve it .
this code below display ir frames with tempature and saves an empty file , i’v tried to convert the GRAY8 to BGR before it is pushed to the encoding appsrc and then convert it to BGRx to encode in the gstreamer pipeline but then that only displays a black screen and an empty video file . I will appreciat any advice thank you
the code:
[import RPi.GPIO as GPIO
import time
import datetime
import gi
import numpy as np
import cv2 as cv
gi.require_version(‘Gst’, ‘1.0’)
from gi.repository import Gst, GLibInitialize GStreamer
Gst.init(None)
Variables to manage the video capture
is_recording = False
rgb_pipeline = None
ir_pipeline = None
loop = GLib.MainLoop()
display_pipeline = None
appsrc = None
encode_src = None
width, height = 640, 512def convert_y16_to_8bit(frame):
“”“Converts Y16 16-bit thermal image frame to 8-bit.”“”
min_val = np.min(frame)
max_val = np.max(frame)
frame_8bit = ((frame - min_val) / (max_val - min_val) * 255).astype(np.uint8)
return frame_8bit, min_val, max_valdef raw_to_celsius(raw_val):
“”“Converts raw thermal camera value to degrees Celsius.”“”
kelvin = raw_val * 0.01 # Example conversion factor, replace with correct one
celsius = kelvin - 273.15
return celsiusdef handle_frame(frame):
“”“Handle the thermal frame: convert to 8-bit, extract metadata, and display min/max temperatures.”“”
frame_8bit, min_raw, max_raw = convert_y16_to_8bit(frame)
min_temp = raw_to_celsius(min_raw)
max_temp = raw_to_celsius(max_raw)
print(f"Min Temperature: {min_temp:.2f}°C, Max Temperature: {max_temp:.2f}°C")# Draw hotspots on the frame min_loc = np.unravel_index(np.argmin(frame), frame.shape) max_loc = np.unravel_index(np.argmax(frame), frame.shape) cv.circle(frame_8bit, min_loc[::-1], 5, (255, 0, 0), 2) cv.circle(frame_8bit, max_loc[::-1], 5, (0, 0, 255), 2) cv.putText(frame_8bit, f"Cold: {min_temp:.2f}C", min_loc[::-1], cv.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 1) cv.putText(frame_8bit, f"Hot: {max_temp:.2f}C", max_loc[::-1], cv.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1) return frame_8bit
def create_ir_pipeline(device, width, height, stream_url, file_prefix, timestamp):
“”“Create IR processing pipeline.”“”
pipeline_description = (
f"v4l2src device={device} ! video/x-raw,format=GRAY16_LE,width={width},height={height},framerate=30/1 ! "
“videoconvert ! appsink name=sink emit-signals=True sync=false”
)
return Gst.parse_launch(pipeline_description)def create_rgb_pipeline(device, width, height, stream_url, file_prefix, timestamp):
“”“Create RGB processing pipeline.”“”
pipeline_description = (
f"v4l2src io-mode=4 device={device} do-timestamp=true ! "
f"video/x-raw, width={width}, height={height}, framerate=30/1 ! clockoverlay halignment=left valignment=bottom time-format= "%Y-%m-%d %H:%M:%S" font-desc=‘Sans, 36’! timeoverlay halignment=right valignment=bottom text= "Stream time:" font-desc=‘Sans, 24’ ! "
"tee name=t ! "
"queue ! nvvidconv ! nvv4l2h264enc bitrate=5000000 ! h264parse ! "
"tee name=l ! "
f"queue ! flvmux ! rtmpsink location=‘{stream_url} live=1’ "
"l. ! "
f"queue ! qtmux ! filesink location=/home/nvidia/Desktop/{file_prefix}_{timestamp}.mp4 "
"t. ! "
“queue leaky=1 ! xvimagesink sync=false”
)
return Gst.parse_launch(pipeline_description)def on_message_rgb(bus, message, loop):
msg_type = message.type
if msg_type == Gst.MessageType.ERROR:
err, debug_info = message.parse_error()
print(f"Error received from element {message.src.get_name()}: {err.message}“)
print(f"Debugging information: {debug_info if debug_info else ‘none’}”)
rgb_pipeline.set_state(Gst.State.NULL)
elif msg_type == Gst.MessageType.EOS:
print(“End-Of-Stream reached”)
rgb_pipeline.set_state(Gst.State.NULL)def on_message_ir(bus, message, loop):
msg_type = message.type
if msg_type == Gst.MessageType.ERROR:
err, debug_info = message.parse_error()
print(f"Error received from element {message.src.get_name()}: {err.message}“)
print(f"Debugging information: {debug_info if debug_info else ‘none’}”)
ir_pipeline.set_state(Gst.State.NULL)
elif msg_type == Gst.MessageType.EOS:
print(“End-Of-Stream reached”)
ir_pipeline.set_state(Gst.State.NULL)def on_message_encode_save(bus, message, loop):
msg_type = message.type
if msg_type == Gst.MessageType.ERROR:
err, debug_info = message.parse_error()
print(f"Error received from element {message.src.get_name()}: {err.message}“)
print(f"Debugging information: {debug_info if debug_info else ‘none’}”)
encode_save_pipeline.set_state(Gst.State.NULL)
elif msg_type == Gst.MessageType.EOS:
print(“End-Of-Stream reached”)
encode_save_pipeline.set_state(Gst.State.NULL)def new_sample(sink, data):
“”“Handle new samples from IR pipeline.”“”
global appsrc, width, heightsample = sink.emit('pull-sample') buf = sample.get_buffer() # Extract frame data from buffer array = np.ndarray( (height, width), buffer=buf.extract_dup(0, buf.get_size()), dtype=np.uint16 ) # Handle the frame (convert, annotate, etc.) frame_8bit = handle_frame(array) # Convert frame to bytes frame_bytes = frame_8bit.tobytes() # Push the frame bytes to appsrc for display buffer_size = len(frame_bytes) gst_buffer = Gst.Buffer.new_allocate(None, buffer_size, None) gst_buffer.fill(0, frame_bytes) # Push the buffer to appsrc appsrc.emit("push-buffer", gst_buffer) # Push the frame bytes to encode_src for encoding and saving encode_src.emit("push-buffer", gst_buffer) return Gst.FlowReturn.OK
def start_recording():
global is_recording, rgb_pipeline, ir_pipeline, loop, display_pipeline, appsrc, encode_src, encode_save_pipelinecurrent_timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S") # Create RGB and IR pipelines rgb_pipeline = create_rgb_pipeline('/dev/video0', 1920, 1080, www.google.com', 'RGB', current_timestamp) ir_pipeline = create_ir_pipeline('/dev/video1', width, height, www.google.com', 'IR', current_timestamp) # Create encode and save pipeline file_path = f"/home/nvidia/Desktop/{current_timestamp}.mp4" encode_save_pipeline_desc = ( f"appsrc name=encode_src ! videoconvert ! nvvidconv ! nvv4l2h264enc bitrate=5000000 ! h264parse ! " f"qtmux ! filesink location={file_path}" ) encode_save_pipeline = Gst.parse_launch(encode_save_pipeline_desc) # Set up bus connections for pipelines bus_rgb = rgb_pipeline.get_bus() bus_ir = ir_pipeline.get_bus() bus_encode_save = encode_save_pipeline.get_bus() bus_rgb.add_signal_watch() bus_ir.add_signal_watch() bus_encode_save.add_signal_watch() bus_rgb.connect("message", on_message_rgb, loop) bus_ir.connect("message", on_message_ir, loop) bus_encode_save.connect("message", on_message_encode_save, loop) # Connect IR pipeline to handle new samples ir_sink = ir_pipeline.get_by_name('sink') ir_sink.connect('new-sample', new_sample, None) # Start pipelines rgb_pipeline.set_state(Gst.State.PLAYING) ir_pipeline.set_state(Gst.State.PLAYING) encode_save_pipeline.set_state(Gst.State.PLAYING) # Setup display pipeline display_pipeline = Gst.parse_launch("appsrc name=src ! tee name=t ! queue leaky=1 ! videoconvert ! xvimagesink sync=false") appsrc = display_pipeline.get_by_name("src") caps_str = f"video/x-raw,format=GRAY8,width={width},height={height}" caps = Gst.Caps.from_string(caps_str) appsrc.set_property("caps", caps) display_pipeline.set_state(Gst.State.PLAYING) # Get encode_src from encode_save_pipeline encode_src = encode_save_pipeline.get_by_name("encode_src") print("Starting RGB, IR, and Encode/Save pipelines...")
def stop_recording():
global is_recording, rgb_pipeline, ir_pipeline, display_pipeline, encode_save_pipelineif rgb_pipeline: rgb_pipeline.send_event(Gst.Event.new_eos()) time.sleep(1) rgb_pipeline.set_state(Gst.State.NULL) print("RGB stopped") if ir_pipeline: ir_pipeline.send_event(Gst.Event.new_eos()) time.sleep(1) ir_pipeline.set_state(Gst.State.NULL) print("IR stopped") if display_pipeline: display_pipeline.send_event(Gst.Event.new_eos()) time.sleep(1) display_pipeline.set_state(Gst.State.NULL) display_pipeline = None print("Display pipeline stopped") if encode_save_pipeline: encode_save_pipeline.send_event(Gst.Event.new_eos()) time.sleep(1) encode_save_pipeline.set_state(Gst.State.NULL) encode_save_pipeline = None print("Encode/Save pipeline stopped")
def main():
print("Starting demo now! Press CTRL+C to exit") start_recording() try: loop.run() except KeyboardInterrupt: print("Stopping pipelines...") stop_recording() loop.quit() print("Pipelines stopped.")
if name == ‘main’:
main()
Blockquote
Blockquote
the Error that i get:
Error received from element encode_src: Internal data stream error.
Debugging information: gstbasesrc.c(3055): gst_base_src_loop (): /GstPipeline:pipeline2/GstAppSrc:encode_src:
streaming stopped, reason not-negotiated (-4)