I am trying to read a drone's RTSP stream through an OpenCV VideoCapture-style GStreamer appsink, add overlay text to each frame, and push the processed frames to an RTMP endpoint using cv2.VideoWriter.
To decode the RTSP source I use h264parse and nvv4l2decoder. Sample code is attached below for reference.
#!/usr/bin/env python
import cv2
import gi
import numpy as np
from datetime import datetime
import json
import time
import hashlib
gi.require_version('Gst', '1.0')
from gi.repository import Gst
class Video():
    """Capture BGR frames from an RTSP H.264 stream via a GStreamer appsink.

    The pipeline hardware-decodes the stream (nvv4l2decoder) and converts it
    to BGR for OpenCV. The latest decoded frame is cached in ``self._frame``
    by the appsink ``new-sample`` callback, which runs on the GStreamer
    streaming thread; consumers poll :meth:`frame_available` / :meth:`frame`.
    """

    def __init__(self, port=5600):
        """Initialise GStreamer and immediately start the RTSP pipeline.

        Args:
            port: kept for interface compatibility; the RTSP URL below is
                currently hard-coded and does not use it.
        """
        Gst.init(None)
        self.port = port
        self._frame = None
        # rtspsrc -> RTP depayload/parse -> NVIDIA HW decode -> BGR for OpenCV
        self.video_source = 'rtspsrc location=rtsp://10.0.1.0:5504'
        self.video_codec = '! application/x-rtp, payload=96 ! rtph264depay ! h264parse '
        self.video_decode = \
            '! nvv4l2decoder ! nvvidconv ! video/x-raw, format=BGRx ! videoconvert ! video/x-raw,format=BGR'
        # drop=true + max-buffers=1 keeps only the newest frame (low latency);
        # sync=false pushes frames as fast as they decode.
        self.video_sink_conf = \
            '! appsink emit-signals=true sync=false max-buffers=1 drop=true'
        self.video_pipe = None
        self.video_sink = None
        self.run()

    def start_gst(self, config=None):
        """Build and start a pipeline from `config` (list of launch fragments).

        Falls back to a videotestsrc pipeline when no config is given.
        """
        if not config:
            config = [
                'videotestsrc ! decodebin',
                '! videoconvert ! video/x-raw,format=(string)BGR ! videoconvert',
                '! appsink',
            ]
        command = ' '.join(config)
        print(command)
        self.video_pipe = Gst.parse_launch(command)
        self.video_pipe.set_state(Gst.State.PLAYING)
        self.video_sink = self.video_pipe.get_by_name('appsink0')

    @staticmethod
    def gst_to_opencv(sample):
        """Convert a Gst.Sample into a writable HxWx3 uint8 BGR ndarray.

        Returns None when the mapped buffer is smaller than the geometry
        advertised in the caps (e.g. a truncated buffer after a transient
        stream glitch) instead of constructing a corrupt array -- building
        an ndarray over a short buffer raises and kills the streaming
        thread, which matches the reported "random failure" symptom.
        """
        buf = sample.get_buffer()
        structure = sample.get_caps().get_structure(0)
        height = structure.get_value('height')
        width = structure.get_value('width')
        expected = height * width * 3
        data = buf.extract_dup(0, buf.get_size())
        if data is None or len(data) < expected:
            return None  # incomplete buffer: skip this frame
        # .copy() yields a WRITEABLE array: an ndarray built directly over a
        # bytes buffer is read-only and cv2.putText on it fails.
        return np.frombuffer(data[:expected], dtype=np.uint8) \
                 .reshape(height, width, 3).copy()

    def frame(self):
        """Return the most recent decoded frame (ndarray), or None."""
        return self._frame

    def frame_available(self):
        """Return True once at least one frame has been decoded."""
        return self._frame is not None

    def run(self):
        """Assemble the RTSP pipeline and register the sample callback."""
        self.start_gst([
            self.video_source,
            self.video_codec,
            self.video_decode,
            self.video_sink_conf,
        ])
        self.video_sink.connect('new-sample', self.callback)

    def callback(self, sink):
        """appsink ``new-sample`` handler; runs on the streaming thread."""
        sample = sink.emit('pull-sample')
        if sample is None:
            # pull-sample returns None on EOS/flush; report it rather than
            # crashing on sample.get_buffer() below.
            return Gst.FlowReturn.EOS
        new_frame = self.gst_to_opencv(sample)
        # NOTE: the per-frame md5 print was removed -- hashing every frame on
        # the streaming thread is pure debug overhead.
        if new_frame is not None:
            self._frame = new_frame
        return Gst.FlowReturn.OK
if __name__ == '__main__':
    # Overlay styling constants.
    scale = 1
    font = cv2.FONT_HERSHEY_PLAIN
    xMargin = 10
    yMarginBottom = 10
    yMarginTop = 20
    updatedTime = datetime.now()
    video = Video()
    # Hardware-encode BGR frames back to H.264 and publish over RTMP.
    # NOTE(review): the writer is fixed at 1280x720@13fps -- frames of any
    # other size are silently dropped by cv2.VideoWriter; confirm the RTSP
    # source geometry matches.
    out_send = cv2.VideoWriter(
        "appsrc ! video/x-raw, format=BGR ! videoconvert ! nvvidconv "
        "! nvv4l2h264enc ! video/x-h264, stream-format=byte-stream "
        "! h264parse ! flvmux "
        "! rtmpsink location=rtmp://*.*.*.*:1935/live",
        cv2.CAP_GSTREAMER, 0, 13.0, (1280, 720))
    while True:
        # Wait for the next frame without spinning a core at 100%.
        if not video.frame_available():
            time.sleep(0.001)
            continue
        # Copy: the cached frame is rewritten concurrently by the GStreamer
        # streaming thread, and the overlay must not race with it; the copy
        # is also guaranteed writable for cv2.putText.
        frame = video.frame().copy()
        # putText requires a string -- passing the datetime object raises
        # TypeError. The previous coordinates (vechileBatAndSatAlignX/Y)
        # were undefined names; use the margin constants defined above.
        label = updatedTime.strftime('%Y-%m-%d %H:%M:%S')
        cv2.putText(frame, label, (xMargin, yMarginTop), font, scale,
                    (255, 255, 255), 1, cv2.LINE_4)
        out_send.write(frame)
        if cv2.waitKey(1) & 0xFF == ord('q'):
            break
While flying the drone, the code randomly fails and returns the error below.
The failure occurs at random times; CPU load was stable when it happened and there was no network interruption. If I restart the program, everything works again until it breaks at another random time.