USB camera works with a GStreamer pipeline in the terminal but not in Python with OpenCV

Right now I'm enrolled in a machine learning project with computer vision. One of the challenges is to reduce the capture time of the camera, which is why I'm trying to use cap = cv2.VideoCapture(pipeline, cv2.CAP_GSTREAMER) to take real-time pictures. But when I check whether the camera is running with

if not cap.isOpened():
    print("Error: The camera could not be opened")
    exit()

I always get that error message. I'm using this pipeline, which works for me when I run it in bash:

~$ gst-launch-1.0 v4l2src device=/dev/video0 ! image/jpeg, width=960, height=720, framerate=90/1 ! jpegdec ! videoconvert ! autovideosink

That pipeline works perfectly and displays real-time video; I just want to use it with VideoCapture(pipeline, cv2.CAP_GSTREAMER) so I can get frames with cap.read().

One important detail: I'm using the MJPG video format because it's how I can reach the highest fps with my camera. Here is the device info:

ioctl: VIDIOC_ENUM_FMT
Type: Video Capture

[0]: 'MJPG' (Motion-JPEG, compressed)

	Size: Discrete 960x720
		Interval: Discrete 0.011s (90.000 fps)
		Interval: Discrete 0.017s (60.000 fps)
		Interval: Discrete 0.033s (30.000 fps)
		Interval: Discrete 0.040s (25.000 fps)
		Interval: Discrete 0.050s (20.000 fps)
		Interval: Discrete 0.067s (15.000 fps)
		Interval: Discrete 0.100s (10.000 fps)
		Interval: Discrete 0.200s (5.000 fps)

If someone could help me write a simple Python script to open the camera and display a frame, it would be a great help.

I'm on a Jetson Orin Nano Super Developer Kit with JetPack 6.2.

Hello @aristi0207,

Hope everything is going great.

We took the liberty of writing a sample Python script for you.
Unfortunately, I did not have access to a Jetson board over the weekend, so I tried it on my Linux PC. Still, I would not expect the script to need any major changes to work on your Jetson board. Please just make sure to set the correct device for the v4l2src element according to your setup.

If everything works as expected on your end, please let us know if you need further assistance optimizing the script for the NVIDIA Jetson platform. There might be some improvements that could be made to the pipeline so it runs more smoothly.
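For reference, one such improvement often used on Jetson (untested by us on the Orin Nano, so please treat it as an assumption) is offloading the MJPEG decode to the hardware decoder via JetPack's nvv4l2decoder and nvvidconv elements. A sketch of what that pipeline string could look like:

```python
# Sketch of a Jetson-accelerated variant of the capture pipeline
# (untested here; assumes JetPack's nvv4l2decoder / nvvidconv elements
# are available). nvv4l2decoder mjpeg=1 decodes MJPEG on the hardware
# engine, and nvvidconv copies the frame out of NVMM memory; only the
# final BGRx-to-BGR conversion stays on the CPU.
jetson_pipeline = (
    "v4l2src device=/dev/video0 ! "
    "image/jpeg, width=960, height=720, framerate=90/1 ! "
    "nvv4l2decoder mjpeg=1 ! "
    "nvvidconv ! video/x-raw, format=BGRx ! "
    "videoconvert ! video/x-raw, format=BGR ! "
    "appsink name=sink emit-signals=true max-buffers=1 drop=true"
)
print(jetson_pipeline)
```

On a Jetson it should be usable as a drop-in replacement for the pipeline string in the sample script, but please verify the element names against your JetPack install first.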

Please give it a test and keep us posted on the results.

import gi
import cv2
import numpy as np
import sys

gi.require_version("Gst", "1.0")
gi.require_version("GLib", "2.0")
from gi.repository import Gst, GLib

class Camera:
    def __init__(self, width=640, height=480, fps=30):
        """Initialize the GStreamer pipeline and elements."""
        Gst.init(None)  # Initialize GStreamer

        self.width = width
        self.height = height
        self.fps = fps
        self.frame = None  # Placeholder for the latest frame
        self.running = True  # Flag to track if the loop should run

        # Define GStreamer pipeline
        self.pipeline_str = (
            f"v4l2src device=/dev/video6 ! "  # Change the device node to match your setup (e.g. /dev/video0)
            f"image/jpeg, width={self.width}, height={self.height}, framerate={self.fps}/1 ! "
            "jpegdec ! "
            "videoconvert ! "
            "videoscale ! "
            f"video/x-raw,format=BGR,width={self.width},height={self.height} ! "
            "appsink name=sink emit-signals=true max-buffers=1 drop=true"
        )

        # Create pipeline
        self.pipeline = Gst.parse_launch(self.pipeline_str)
        self.appsink = self.pipeline.get_by_name("sink")

        if not self.appsink:
            print("Error: Could not find appsink element.")
            sys.exit(1)

        # Connect new-sample signal
        self.appsink.connect("new-sample", self.on_new_sample)

        # Start pipeline
        self.pipeline.set_state(Gst.State.PLAYING)

        # Start GLib main loop
        self.loop = GLib.MainLoop()

        # Poll for new frames and refresh the OpenCV window every 30 ms
        GLib.timeout_add(30, self.update_display)

    def on_new_sample(self, sink):
        """Callback function for processing new samples."""
        sample = sink.emit("pull-sample")
        if sample is None:
            return Gst.FlowReturn.ERROR

        buffer = sample.get_buffer()
        success, map_info = buffer.map(Gst.MapFlags.READ)
        if not success:
            return Gst.FlowReturn.ERROR

        # Copy the buffer contents into a NumPy array before unmapping;
        # the mapped memory is only valid while the buffer is mapped.
        frame_data = np.frombuffer(map_info.data, dtype=np.uint8).copy()
        buffer.unmap(map_info)

        try:
            self.frame = frame_data.reshape((self.height, self.width, 3))
        except ValueError:
            print("Frame size mismatch, skipping frame.")

        return Gst.FlowReturn.OK

    def update_display(self):
        """Update OpenCV display and check if the window is closed."""
        if self.frame is not None:
            cv2.imshow("GStreamer Video", self.frame)
            
            # Check if OpenCV window is closed
            if cv2.getWindowProperty("GStreamer Video", cv2.WND_PROP_VISIBLE) < 1:
                self.stop()
                return False  # Stop updating

            if cv2.waitKey(1) & 0xFF == ord("q"):
                self.stop()
                return False  # Stop updating

        return True  # Continue updating

    def start(self):
        """Run the GStreamer pipeline loop."""
        try:
            self.loop.run()
        except KeyboardInterrupt:
            self.stop()

    def stop(self):
        """Stop the GStreamer pipeline and cleanup."""
        if self.running:
            self.running = False
            print("Stopping camera...")
            self.pipeline.set_state(Gst.State.NULL)
            cv2.destroyAllWindows()
            self.loop.quit()

if __name__ == "__main__":
    camera = Camera(width=640, height=480, fps=30)
    camera.start()

best regards,
Andrew
Embedded Software Engineer at ProventusNova