Appsrc correct format? Video files are not saved properly

I'm using a Jetson Nano B01 to capture, save, and stream frames from a FLIR Hadron camera that provides RGB/IR data, using GStreamer in Python 3.
I have added temperature metadata to my IR frames to display the hot and cold spots in each frame, but so far I can only display them, while I can save, display, and stream the RGB data. I have been trying to save and stream the IR frames with temperature overlays through a second appsrc, but I keep getting the error shown at the bottom of this post, and I don't know how to solve it.
The code below displays the IR frames with temperatures but saves an empty file. I've also tried converting the GRAY8 frame to BGR before pushing it to the encoding appsrc, and then converting it to BGRx inside the GStreamer pipeline before encoding, but that only displays a black screen and still produces an empty video file (a sketch of that attempt is just below). I would appreciate any advice, thank you.
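
For reference, this is roughly what that BGR attempt lookedked like (a reconstruction, not the exact code; the placeholder file name and the exact caps string are from memory):

# In new_sample, before pushing to the encoding appsrc:
frame_bgr = cv.cvtColor(frame_8bit, cv.COLOR_GRAY2BGR)  # expand GRAY8 to 3 channels
frame_bytes = frame_bgr.tobytes()

# ...with the encoding pipeline taking BGR and converting to BGRx:
encode_save_pipeline_desc = (
    'appsrc name=encode_src caps="video/x-raw,format=BGR,width=640,height=512,framerate=30/1" ! '
    "videoconvert ! video/x-raw,format=BGRx ! nvvidconv ! "
    "nvv4l2h264enc bitrate=5000000 ! h264parse ! qtmux ! filesink location=ir_test.mp4"
)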

The code:

import RPi.GPIO as GPIO
import time
import datetime
import gi
import numpy as np
import cv2 as cv
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib

# Initialize GStreamer

Gst.init(None)

# Variables to manage the video capture

is_recording = False
rgb_pipeline = None
ir_pipeline = None
loop = GLib.MainLoop()
display_pipeline = None
appsrc = None
encode_src = None
width, height = 640, 512

def convert_y16_to_8bit(frame):
    """Converts a Y16 16-bit thermal image frame to 8-bit."""
    min_val = np.min(frame)
    max_val = np.max(frame)
    frame_8bit = ((frame - min_val) / (max_val - min_val) * 255).astype(np.uint8)
    return frame_8bit, min_val, max_val

def raw_to_celsius(raw_val):
    """Converts a raw thermal camera value to degrees Celsius."""
    kelvin = raw_val * 0.01  # Example conversion factor, replace with the correct one
    celsius = kelvin - 273.15
    return celsius

def handle_frame(frame):
    """Handle the thermal frame: convert to 8-bit, extract metadata, and display min/max temperatures."""
    frame_8bit, min_raw, max_raw = convert_y16_to_8bit(frame)
    min_temp = raw_to_celsius(min_raw)
    max_temp = raw_to_celsius(max_raw)
    print(f"Min Temperature: {min_temp:.2f}°C, Max Temperature: {max_temp:.2f}°C")

    # Draw hot/cold spots on the frame.
    # Note: frame_8bit is single-channel (GRAY8), so only the first value of
    # each BGR tuple is actually used as the gray level.
    min_loc = np.unravel_index(np.argmin(frame), frame.shape)
    max_loc = np.unravel_index(np.argmax(frame), frame.shape)
    cv.circle(frame_8bit, min_loc[::-1], 5, (255, 0, 0), 2)
    cv.circle(frame_8bit, max_loc[::-1], 5, (0, 0, 255), 2)
    cv.putText(frame_8bit, f"Cold: {min_temp:.2f}C", min_loc[::-1], cv.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 1)
    cv.putText(frame_8bit, f"Hot: {max_temp:.2f}C", max_loc[::-1], cv.FONT_HERSHEY_SIMPLEX, 0.5, (0, 0, 255), 1)

    return frame_8bit

def create_ir_pipeline(device, width, height, stream_url, file_prefix, timestamp):
    """Create the IR processing pipeline."""
    pipeline_description = (
        f"v4l2src device={device} ! video/x-raw,format=GRAY16_LE,width={width},height={height},framerate=30/1 ! "
        "videoconvert ! appsink name=sink emit-signals=True sync=false"
    )
    return Gst.parse_launch(pipeline_description)

def create_rgb_pipeline(device, width, height, stream_url, file_prefix, timestamp):
    """Create the RGB processing pipeline."""
    pipeline_description = (
        f"v4l2src io-mode=4 device={device} do-timestamp=true ! "
        f"video/x-raw,width={width},height={height},framerate=30/1 ! "
        'clockoverlay halignment=left valignment=bottom time-format="%Y-%m-%d %H:%M:%S" font-desc="Sans, 36" ! '
        'timeoverlay halignment=right valignment=bottom text="Stream time:" font-desc="Sans, 24" ! '
        "tee name=t ! "
        "queue ! nvvidconv ! nvv4l2h264enc bitrate=5000000 ! h264parse ! "
        "tee name=l ! "
        f'queue ! flvmux ! rtmpsink location="{stream_url} live=1" '
        "l. ! "
        f"queue ! qtmux ! filesink location=/home/nvidia/Desktop/{file_prefix}_{timestamp}.mp4 "
        "t. ! "
        "queue leaky=1 ! xvimagesink sync=false"
    )
    return Gst.parse_launch(pipeline_description)

def on_message_rgb(bus, message, loop):
    msg_type = message.type
    if msg_type == Gst.MessageType.ERROR:
        err, debug_info = message.parse_error()
        print(f"Error received from element {message.src.get_name()}: {err.message}")
        print(f"Debugging information: {debug_info if debug_info else 'none'}")
        rgb_pipeline.set_state(Gst.State.NULL)
    elif msg_type == Gst.MessageType.EOS:
        print("End-Of-Stream reached")
        rgb_pipeline.set_state(Gst.State.NULL)

def on_message_ir(bus, message, loop):
    msg_type = message.type
    if msg_type == Gst.MessageType.ERROR:
        err, debug_info = message.parse_error()
        print(f"Error received from element {message.src.get_name()}: {err.message}")
        print(f"Debugging information: {debug_info if debug_info else 'none'}")
        ir_pipeline.set_state(Gst.State.NULL)
    elif msg_type == Gst.MessageType.EOS:
        print("End-Of-Stream reached")
        ir_pipeline.set_state(Gst.State.NULL)

def on_message_encode_save(bus, message, loop):
    msg_type = message.type
    if msg_type == Gst.MessageType.ERROR:
        err, debug_info = message.parse_error()
        print(f"Error received from element {message.src.get_name()}: {err.message}")
        print(f"Debugging information: {debug_info if debug_info else 'none'}")
        encode_save_pipeline.set_state(Gst.State.NULL)
    elif msg_type == Gst.MessageType.EOS:
        print("End-Of-Stream reached")
        encode_save_pipeline.set_state(Gst.State.NULL)

def new_sample(sink, data):
    """Handle new samples from the IR pipeline."""
    global appsrc, width, height

    sample = sink.emit('pull-sample')
    buf = sample.get_buffer()

    # Extract the frame data from the buffer as a 16-bit grayscale array
    array = np.ndarray(
        (height, width),
        buffer=buf.extract_dup(0, buf.get_size()),
        dtype=np.uint16
    )

    # Handle the frame (convert, annotate, etc.)
    frame_8bit = handle_frame(array)

    # Convert the frame to bytes
    frame_bytes = frame_8bit.tobytes()

    # Wrap the frame bytes in a Gst.Buffer
    buffer_size = len(frame_bytes)
    gst_buffer = Gst.Buffer.new_allocate(None, buffer_size, None)
    gst_buffer.fill(0, frame_bytes)

    # Push the buffer to appsrc for display
    appsrc.emit("push-buffer", gst_buffer)

    # Push the same buffer to encode_src for encoding and saving
    encode_src.emit("push-buffer", gst_buffer)

    return Gst.FlowReturn.OK

def start_recording():
    global is_recording, rgb_pipeline, ir_pipeline, loop, display_pipeline, appsrc, encode_src, encode_save_pipeline

    current_timestamp = datetime.datetime.now().strftime("%Y-%m-%d_%H-%M-%S")

    # Create the RGB and IR pipelines
    rgb_pipeline = create_rgb_pipeline('/dev/video0', 1920, 1080, 'www.google.com', 'RGB', current_timestamp)
    ir_pipeline = create_ir_pipeline('/dev/video1', width, height, 'www.google.com', 'IR', current_timestamp)

    # Create the encode-and-save pipeline
    file_path = f"/home/nvidia/Desktop/{current_timestamp}.mp4"
    encode_save_pipeline_desc = (
        f"appsrc name=encode_src ! videoconvert ! nvvidconv ! nvv4l2h264enc bitrate=5000000 ! h264parse ! "
        f"qtmux ! filesink location={file_path}"
    )
    encode_save_pipeline = Gst.parse_launch(encode_save_pipeline_desc)

    # Set up bus connections for the pipelines
    bus_rgb = rgb_pipeline.get_bus()
    bus_ir = ir_pipeline.get_bus()
    bus_encode_save = encode_save_pipeline.get_bus()

    bus_rgb.add_signal_watch()
    bus_ir.add_signal_watch()
    bus_encode_save.add_signal_watch()

    bus_rgb.connect("message", on_message_rgb, loop)
    bus_ir.connect("message", on_message_ir, loop)
    bus_encode_save.connect("message", on_message_encode_save, loop)

    # Connect the IR pipeline's appsink to handle new samples
    ir_sink = ir_pipeline.get_by_name('sink')
    ir_sink.connect('new-sample', new_sample, None)

    # Start the pipelines
    rgb_pipeline.set_state(Gst.State.PLAYING)
    ir_pipeline.set_state(Gst.State.PLAYING)
    encode_save_pipeline.set_state(Gst.State.PLAYING)

    # Set up the display pipeline
    display_pipeline = Gst.parse_launch("appsrc name=src ! tee name=t ! queue leaky=1 ! videoconvert ! xvimagesink sync=false")
    appsrc = display_pipeline.get_by_name("src")
    caps_str = f"video/x-raw,format=GRAY8,width={width},height={height}"
    caps = Gst.Caps.from_string(caps_str)
    appsrc.set_property("caps", caps)
    display_pipeline.set_state(Gst.State.PLAYING)

    # Get encode_src from the encode/save pipeline
    encode_src = encode_save_pipeline.get_by_name("encode_src")

    print("Starting RGB, IR, and Encode/Save pipelines...")

def stop_recording():
    global is_recording, rgb_pipeline, ir_pipeline, display_pipeline, encode_save_pipeline

    if rgb_pipeline:
        rgb_pipeline.send_event(Gst.Event.new_eos())
        time.sleep(1)
        rgb_pipeline.set_state(Gst.State.NULL)
        print("RGB stopped")

    if ir_pipeline:
        ir_pipeline.send_event(Gst.Event.new_eos())
        time.sleep(1)
        ir_pipeline.set_state(Gst.State.NULL)
        print("IR stopped")

    if display_pipeline:
        display_pipeline.send_event(Gst.Event.new_eos())
        time.sleep(1)
        display_pipeline.set_state(Gst.State.NULL)
        display_pipeline = None
        print("Display pipeline stopped")

    if encode_save_pipeline:
        encode_save_pipeline.send_event(Gst.Event.new_eos())
        time.sleep(1)
        encode_save_pipeline.set_state(Gst.State.NULL)
        encode_save_pipeline = None
        print("Encode/Save pipeline stopped")

def main():
    print("Starting demo now! Press CTRL+C to exit")
    start_recording()
    try:
        loop.run()
    except KeyboardInterrupt:
        print("Stopping pipelines...")
        stop_recording()
        loop.quit()
        print("Pipelines stopped.")

if __name__ == '__main__':
    main()

The error that I get:

Error received from element encode_src: Internal data stream error.
Debugging information: gstbasesrc.c(3055): gst_base_src_loop (): /GstPipeline:pipeline2/GstAppSrc:encode_src:
streaming stopped, reason not-negotiated (-4)
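
My current guess is that encode_src never negotiates caps: the display appsrc gets explicit GRAY8 caps, but encode_src does not, which seems to match "not-negotiated". A minimal sketch of what I think setting them might look like (untested; the caps mirror the display appsrc, and the is-live/do-timestamp/format settings are my assumption):

# Give encode_src explicit caps and live-source properties, mirroring the
# display appsrc; these are standard GstAppSrc/GstBaseSrc properties.
encode_src = encode_save_pipeline.get_by_name("encode_src")
caps = Gst.Caps.from_string(
    f"video/x-raw,format=GRAY8,width={width},height={height},framerate=30/1"
)
encode_src.set_property("caps", caps)        # tell downstream what appsrc produces
encode_src.set_property("format", Gst.Format.TIME)
encode_src.set_property("is-live", True)
encode_src.set_property("do-timestamp", True)  # stamp buffers so qtmux gets timestamps

Is that the right direction, or does qtmux also require me to set PTS/duration on each pushed buffer myself?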