Here's the GStreamer pipeline string I use:
rtspsrc location=rtsp://admin:@192.168.168.168 latency=0 ! rtph264depay ! h264parse ! omxh264dec ! nvvidconv ! video/x-raw, width=(int)3072, height=(int)1728, format=(string)BGRx ! videoconvert ! appsink drop=1
Here's the Python 3 code I use to create a cv2.VideoCapture with GStreamer:
def _create_jetson_nano_v_cap(self) -> cv2.VideoCapture:
    """Assemble the RTSP/H.264 GStreamer pipeline and open it with OpenCV.

    The pipeline string is built from ``self.uri``, ``self.latency``,
    ``self.width`` and ``self.height``. When ``self.has_watermark`` is true,
    crop properties are appended to the ``nvvidconv`` element.

    Returns:
        The ``cv2.VideoCapture`` created with the ``cv2.CAP_GSTREAMER``
        backend. Whether it actually opened is NOT checked here.
    """
    if self.has_watermark:
        # Fixed crop window; only the right edge follows the requested width.
        crop_top, crop_left, crop_right, crop_bottom = 140, 0, self.width, 2070
        converter = (
            f"nvvidconv top={crop_top} left={crop_left} "
            f"right={crop_right} bottom={crop_bottom}"
        )
    else:
        converter = "nvvidconv"

    # Elements in pipeline order; GStreamer links them with " ! ".
    elements = (
        f"rtspsrc location={self.uri} latency={self.latency}",
        "rtph264depay",
        "h264parse",
        "omxh264dec",
        converter,
        f"video/x-raw, width=(int){self.width}, height=(int){self.height}, format=(string)BGRx",
        "videoconvert",
        "appsink drop=1",
    )
    gst_str = " ! ".join(elements)

    self.logger.info(f"ip camera gst_str :\n{gst_str}\n")
    capture = cv2.VideoCapture(gst_str, cv2.CAP_GSTREAMER)
    self.logger.info("video capture created")
    return capture
The error occurs after the camera has been running for a long time.
The full version of the IPCamera Python class:
class IPCamera(ICHASEEdge):
    """Threaded RTSP camera reader built on a GStreamer-backed ``cv2.VideoCapture``.

    Constructing an instance opens the stream, reads a first frame and starts
    a background thread that keeps ``self.frame`` set to the latest result of
    ``VideoCapture.read()``. Call ``release()`` when the camera is no longer
    needed.
    """

    CLS_EMOJI = '📷'
    # Base frame size that ``scale_fact`` is multiplied with.
    DEFAULT_W = 3840
    DEFAULT_H = 2160
    # Seconds ``release()`` waits for the update thread before it releases the
    # capture anyway (``VideoCapture.read()`` may block on a dead stream).
    THREAD_JOIN_TIMEOUT = 5.0

    def __init__(self,
                 uri='rtsp://admin:@192.168.168.168',
                 width=3840, height=2160,
                 latency=0,
                 none_frame_thresh=15,
                 scale_fact=None,
                 has_watermark=False,
                 log_lv=AIBR.LOG.LV,
                 log_path: Union[pathlib.Path, str] = 'default'):
        """Open the camera and start the frame-update thread.

        Args:
            uri: RTSP location passed to ``rtspsrc``.
            width: Requested output width; overridden when ``scale_fact`` is set.
            height: Requested output height; overridden when ``scale_fact`` is set.
            latency: Value for the ``rtspsrc`` ``latency`` property.
            none_frame_thresh: Number of consecutive ``None`` frames tolerated
                before the camera is marked broken.
            scale_fact: When not ``None``, width/height become
                ``int(DEFAULT_W * scale_fact)`` / ``int(DEFAULT_H * scale_fact)``.
            has_watermark: When true, crop properties are added to ``nvvidconv``.
            log_lv: Log level forwarded to ``get_cls_instance_logger``.
            log_path: Log path forwarded to ``get_cls_instance_logger``.

        Raises:
            RuntimeError: The ``VideoCapture`` could not be opened.
            AssertionError: The capture opened but the first read gave no frame.
        """
        super().__init__()
        self.uri = uri
        self.width = width
        self.height = height
        self.latency = latency
        self.none_frame_thresh = none_frame_thresh
        self.scale_fact = scale_fact
        self.has_watermark = has_watermark
        self.log_lv = log_lv
        self.log_path = log_path
        if self.scale_fact is not None:
            self.width = int(self.DEFAULT_W * self.scale_fact)
            self.height = int(self.DEFAULT_H * self.scale_fact)

        # setting logger
        self.logger = get_cls_instance_logger(self, log_lv=self.log_lv, log_path=self.log_path)

        self.frame = None
        self.frame_width = None
        self.frame_height = None
        self.v_cap = None
        self.is_opened = False
        self.is_broken = False
        self.thread_run_flag = False
        self.thread = None
        self.release_flag = False
        # Must exist BEFORE _launch(): the update thread reads and increments
        # it as soon as it starts.
        self.none_frame_count = 0

        self._launch()
        self.logger.info('initialized')

    def _create_jetson_nano_v_cap(self) -> cv2.VideoCapture:
        """Build the GStreamer pipeline string and open it with OpenCV.

        Returns:
            The ``cv2.VideoCapture`` created with ``cv2.CAP_GSTREAMER``.
            Whether it actually opened is checked by the caller.
        """
        if self.has_watermark:
            # NOTE(review): top/bottom are fixed and only ``right`` follows
            # self.width, so the crop does not scale with scale_fact — confirm
            # this is intended.
            top = 140
            left = 0
            right = self.width
            bottom = 2070
            converter = f"nvvidconv top={top} left={left} right={right} bottom={bottom}"
        else:
            converter = "nvvidconv"

        # Elements in pipeline order; joined with the GStreamer link operator.
        gst_str = " ! ".join((
            f"rtspsrc location={self.uri} latency={self.latency}",
            "rtph264depay",
            "h264parse",
            "omxh264dec",
            converter,
            f"video/x-raw, width=(int){self.width}, height=(int){self.height}, format=(string)BGRx",
            "videoconvert",
            "appsink drop=1",
        ))
        self.logger.info(f"ip camera gst_str :\n{gst_str}\n")
        v_cap = cv2.VideoCapture(gst_str, cv2.CAP_GSTREAMER)
        self.logger.info("video capture created")
        return v_cap

    def _discard_v_cap(self):
        """Release a capture that failed to start and forget it so a retry is possible."""
        try:
            self.v_cap.release()
        except Exception:
            self.logger.exception('failed to release video capture')
        self.v_cap = None

    def _open_camera(self) -> bool:
        """Create the capture and read the first frame.

        Returns:
            ``True`` when a first frame was read, ``False`` when the capture
            opened but ``read()`` produced no image.

        Raises:
            RuntimeError: A capture already exists, or the new one is not opened.
        """
        if self.v_cap is not None:
            raise RuntimeError('camera is already opened!')

        self.v_cap = self._create_jetson_nano_v_cap()
        if not self.v_cap.isOpened():
            self.logger.error('video capture is not opened')
            self._discard_v_cap()
            raise RuntimeError('cannot open camera if VideoCapture is not opened!')

        _, self.frame = self.v_cap.read()
        if self.frame is None:
            self.logger.error('VideoCapture.read() returns no image')
            self._discard_v_cap()
            self.is_opened = False
            return False

        self.is_opened = True
        self.frame_height, self.frame_width = self.frame.shape[:2]
        return True

    def _launch(self):
        """Open the camera and start the background frame-update thread.

        Raises:
            AssertionError: The first frame could not be read, or the update
                thread is already flagged as running.
        """
        # Explicit checks instead of ``assert``: the open call has side effects
        # and must still run under ``python -O``.  AssertionError is kept so
        # callers see the same exception type as before.
        if not self._open_camera():
            raise AssertionError('camera returned no frame on open')
        if self.thread_run_flag:
            raise AssertionError('frame update thread is already running')

        self.thread_run_flag = True
        self.thread = threading.Thread(target=self._update_frame)
        self.thread.start()
        self.logger.debug('camera launched successfully')

    def _update_frame(self):
        """Thread body: keep ``self.frame`` updated until stopped or broken."""
        while self.thread_run_flag:
            _, self.frame = self.v_cap.read()
            if self.frame is not None:
                self.none_frame_count = 0
                continue
            if self.release_flag:
                # Camera is being released; empty reads are expected, don't count them.
                continue

            self.none_frame_count += 1
            self.logger.warning(f"fetched None frame count = {self.none_frame_count}")
            if self.none_frame_count > self.none_frame_thresh:
                self.is_broken = True
                self.logger.warning(
                    "fetched None frame count exceed thresh, prepare to stop update frame thread")
                break

        self.thread_run_flag = False
        self.logger.debug("updating frame loop is break")

    def _stop(self):
        """Ask the update thread to leave its loop."""
        if self.thread_run_flag:
            self.thread_run_flag = False

    def read(self):
        """Read a frame from the camera object.

        Returns None if the camera runs out of image or error.
        """
        if not self.is_opened:
            return None
        return self.frame

    def release(self):
        """Stop the update thread and release the underlying capture."""
        self.release_flag = True
        self._stop()

        # Wait for the worker to leave v_cap.read() before releasing the
        # capture underneath it; bounded so a stalled stream cannot hang us.
        worker = self.thread
        if (worker is not None
                and worker is not threading.current_thread()
                and worker.is_alive()):
            worker.join(timeout=self.THREAD_JOIN_TIMEOUT)
            if worker.is_alive():
                self.logger.warning('frame update thread did not stop before release')

        if self.v_cap is not None:
            try:
                self.v_cap.release()
            except Exception:
                # Best effort: releasing must never raise to the caller.
                self.logger.exception('failed to release video capture')

        self.is_opened = False
        self.logger.debug("camera released")

    def __del__(self):
        # ``release_flag`` is missing if __init__ failed very early; in that
        # case there is nothing to release.
        if not getattr(self, 'release_flag', True):
            self.release()

    def isOpened(self):
        """Return whether the camera is currently flagged as opened."""
        return self.is_opened

    @property
    def status(self) -> bool:
        """``True`` while the camera is opened, not broken and below the None-frame threshold."""
        if self.is_broken:
            return False
        return self.none_frame_count < self.none_frame_thresh and self.is_opened

    def get_curr_frame_eagerly(self, try_count=3):
        """Return the current frame, retrying ``read()`` up to ``try_count`` times.

        Returns:
            The first non-``None`` frame, or ``None`` after marking the camera
            broken when every attempt returned ``None``.
        """
        for _ in range(try_count):
            frame = self.read()
            if frame is not None:
                return frame
        self.is_broken = True
        return None