I have boosted clocks, it does not affect the speed. My code is as follows:
import cv2
import threading
import numpy as np
import time
class CSI_Camera:
def init(self):
self.video_capture = None
self.frame = None
self.grabbed = False
self.read_thread = None
self.read_lock = threading.Lock()
self.running = False
self.zoom_factor = 1.0
self.pan_x = 0
self.pan_y = 0
self.last_time = time.time()
self.fps = 0
self.writer = None
self.is_recording = False
def open(self, gstreamer_pipeline_string):
try:
self.video_capture = cv2.VideoCapture(
gstreamer_pipeline_string, cv2.CAP_GSTREAMER
)
self.grabbed, self.frame = self.video_capture.read()
if self.grabbed:
self.frame_height, self.frame_width = self.frame.shape[:2]
except RuntimeError:
self.video_capture = None
print("Unable to open camera")
print("Pipeline: " + gstreamer_pipeline_string)
def start(self):
if self.running:
print('Video capturing is already running')
return None
if self.video_capture is not None:
self.running = True
self.read_thread = threading.Thread(target=self.updateCamera)
self.read_thread.start()
return self
def stop(self):
self.running = False
if self.read_thread is not None:
self.read_thread.join()
self.read_thread = None
if self.is_recording:
self.stop_recording()
def updateCamera(self):
while self.running:
try:
grabbed, frame = self.video_capture.read()
if grabbed:
with self.read_lock:
self.grabbed = grabbed
self.frame = frame
current_time = time.time()
self.fps = int(1 / (current_time - self.last_time))
self.last_time = current_time
if self.is_recording and self.writer is not None:
self.writer.write(frame)
except RuntimeError:
print("Could not read image from camera")
def read(self):
with self.read_lock:
frame = self.frame.copy() if self.frame is not None else None
grabbed = self.grabbed
return grabbed, frame
def release(self):
if self.video_capture is not None:
self.video_capture.release()
self.video_capture = None
if self.read_thread is not None:
self.read_thread.join()
def start_recording(self, filename):
with self.read_lock:
fourcc = cv2.VideoWriter_fourcc(*'MJPG')
self.writer = cv2.VideoWriter(filename, fourcc, self.fps, (self.frame_width, self.frame_height))
self.is_recording = True
def stop_recording(self):
with self.read_lock:
if self.writer:
self.writer.release()
self.writer = None
self.is_recording = False
time.sleep(0.1)
def apply_zoom_pan(self):
with self.read_lock:
if self.frame is not None:
center_x, center_y = int(self.frame.shape[1] / 2), int(self.frame.shape[0] / 2)
new_width = int(self.frame.shape[1] / self.zoom_factor)
new_height = int(self.frame.shape[0] / self.zoom_factor)
x1 = max(min(center_x - new_width // 2 + self.pan_x, self.frame.shape[1] - new_width), 0)
y1 = max(min(center_y - new_height // 2 + self.pan_y, self.frame.shape[0] - new_height), 0)
x2 = x1 + new_width
y2 = y1 + new_height
cropped_frame = self.frame[y1:y2, x1:x2]
resized_frame = cv2.resize(cropped_frame, (self.frame.shape[1], self.frame.shape[0]))
font = cv2.FONT_HERSHEY_SIMPLEX
cv2.putText(resized_frame, f'FPS: {self.fps}', (10, 30), font, 1, (0, 0, 255), 2, cv2.LINE_AA)
return resized_frame
else:
return None
def gstreamer_pipeline(
sensor_id=0,
capture_width=4056,
capture_height=3040,
display_width=1920,
display_height=1080,
framerate=60,
flip_method=0,
):
return (
f"nvarguscamerasrc sensor-id={sensor_id} ! "
f"video/x-raw(memory:NVMM), width=(int){capture_width}, height=(int){capture_height}, "
f"framerate=(fraction){framerate}/1 ! nvvidconv flip-method={flip_method} ! "
f"video/x-raw, width=(int){display_width}, height=(int){display_height}, format=(string)BGRx ! "
f"videoconvert ! video/x-raw, format=(string)BGR ! appsink"
)
def run_cameras():
window_title = “Dual CSI Cameras”
display_width = 1920
display_height = 2160
left_camera = CSI_Camera()
right_camera = CSI_Camera()
left_camera.open(
gstreamer_pipeline(
sensor_id=0,
capture_width=4056,
capture_height=3040,
display_width=display_width,
display_height=display_height,
)
)
right_camera.open(
gstreamer_pipeline(
sensor_id=1,
capture_width=4056,
capture_height=3040,
display_width=display_width,
display_height=display_height,
)
)
left_camera.start()
right_camera.start()
if left_camera.video_capture.isOpened() and right_camera.video_capture.isOpened():
cv2.namedWindow(window_title, cv2.WINDOW_NORMAL)
cv2.resizeWindow(window_title, 3840, 2160)
try:
while True:
_, left_image = left_camera.read()
_, right_image = right_camera.read()
left_image = left_camera.apply_zoom_pan()
right_image = right_camera.apply_zoom_pan()
if left_image is not None and right_image is not None:
combined_image = np.hstack((left_image, right_image))
cv2.imshow(window_title, combined_image)
keyCode = cv2.waitKey(30) & 0xFF
if keyCode == ord('z'):
left_camera.zoom_factor *= 1.1
right_camera.zoom_factor *= 1.1
elif keyCode == ord('x'):
left_camera.zoom_factor /= 1.1
right_camera.zoom_factor /= 1.1
elif keyCode == ord('w'):
left_camera.pan_y -= 10
right_camera.pan_y -= 10
elif keyCode == ord('s'):
left_camera.pan_y += 10
right_camera.pan_y += 10
elif keyCode == ord('a'):
left_camera.pan_x -= 10
right_camera.pan_x -= 10
elif keyCode == ord('d'):
left_camera.pan_x += 10
right_camera.pan_x += 10
elif keyCode == ord('r'):
if not left_camera.is_recording:
left_camera.start_recording("output.avi")
elif keyCode == ord('c'):
if left_camera.is_recording:
left_camera.stop_recording()
elif keyCode == 27: # ESC key
break
finally:
left_camera.stop()
left_camera.release()
right_camera.stop()
right_camera.release()
cv2.destroyAllWindows()
else:
print("Error: Unable to open both cameras")
left_camera.stop()
left_camera.release()
right_camera.stop()
right_camera.release()
if name == “main”:
run_cameras()