The following code throws an error. The objective is to capture a bit of video, play it back in one thread and write to disk in another. The error occurs just on opening the camera, but the camera works as expected with gst-launch. ( gst-launch-1.0 v4l2src device=/dev/video0 ! image/jpeg, width=1280, height=720, framerate=90/1, format=MJPG ! nvv4l2decoder mjpeg=1 ! nvvidconv ! xvimagesink )
The error is:
Traceback (most recent call last):
File “recordplayback.py”, line 123, in
processor = VideoProcessor()
File “recordplayback.py”, line 19, in init
self.src.set_property(“device”, “/dev/video0”)
I can’t figure out what i am missing.
Jetson TX2, USB Camera
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GObject
import threading
import cv2
# Initialize GStreamer
Gst.init(None)
class VideoProcessor:
def __init__(self):
self.pipeline = Gst.Pipeline()
# Video source (nvv4l2src for NVIDIA Jetson)
self.src = Gst.ElementFactory.make("nvv4l2src", "src")
self.src.set_property("device", "/dev/video0")
# Video processing elements
self.queue1 = Gst.ElementFactory.make("queue", "queue1")
self.queue2 = Gst.ElementFactory.make("queue", "queue2")
self.videoconvert1 = Gst.ElementFactory.make("videoconvert", "videoconvert1")
self.videoconvert2 = Gst.ElementFactory.make("videoconvert", "videoconvert2")
# Video sink for display (nvvideosink for NVIDIA Jetson)
self.sink = Gst.ElementFactory.make("nvvideosink", "sink")
# Video sink for writing to file
self.filesink = Gst.ElementFactory.make("filesink", "filesink")
self.filesink.set_property("location", "output.mp4")
# Add elements to the pipeline
self.pipeline.add(self.src)
self.pipeline.add(self.queue1)
self.pipeline.add(self.videoconvert1)
self.pipeline.add(self.queue2)
self.pipeline.add(self.videoconvert2)
self.pipeline.add(self.sink)
self.pipeline.add(self.filesink)
# Link elements
self.src.link(self.queue1)
self.queue1.link(self.videoconvert1)
self.videoconvert1.link(self.queue2)
self.queue2.link(self.videoconvert2)
tee = Gst.ElementFactory.make("tee", "tee")
self.pipeline.add(tee)
self.videoconvert2.link(tee)
tee.link(self.sink)
tee.link(self.filesink)
# Set up bus to handle messages
bus = self.pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", self.on_message)
# Create threads for playback and file writing
self.playback_thread = threading.Thread(target=self.playback)
self.file_write_thread = threading.Thread(target=self.write_to_file)
def start(self):
# Set the pipeline to the playing state
self.pipeline.set_state(Gst.State.PLAYING)
# Start threads
self.playback_thread.start()
self.file_write_thread.start()
# Run the main loop
loop = GObject.MainLoop()
loop.run()
def playback(self):
# Create a VideoCapture object using OpenCV for playback
cap = cv2.VideoCapture("appsrc ! videoconvert ! autovideosink")
while True:
ret, frame = cap.read()
if not ret:
break
cv2.imshow("Playback", frame)
cv2.waitKey(1)
cap.release()
def write_to_file(self):
# Create a VideoWriter object using OpenCV for writing to file
fourcc = cv2.VideoWriter_fourcc(*"mp4v")
out = cv2.VideoWriter("output.mp4", fourcc, 20.0, (640, 480))
# Create a VideoCapture object using OpenCV for file writing
cap = cv2.VideoCapture("appsrc ! videoconvert ! autovideosink")
while True:
ret, frame = cap.read()
if not ret:
break
out.write(frame)
out.release()
cap.release()
def on_message(self, bus, message):
t = message.type
if t == Gst.MessageType.EOS:
print("End of Stream")
self.pipeline.set_state(Gst.State.NULL)
GObject.MainLoop().quit()
elif t == Gst.MessageType.ERROR:
err, debug = message.parse_error()
print(f"Error: {err}, {debug}")
self.pipeline.set_state(Gst.State.NULL)
GObject.MainLoop().quit()
if __name__ == "__main__":
processor = VideoProcessor()
processor.start()