Gstreamer - appsrc plugin - RTSP stream
Nvidia Jetson Orin NX,
Python 3.8.10
Below I presented code of simple script, that allow to read frames saved in some directory and stream them out through RTSP, based on Gstreamer and appsrc plugin.
Problem
Stream is unavailable. When trying to connect, the logs shown below appear.
Logs:
0:00:23.079044717 6221 0x1a6d5980 WARN rtspmedia rtsp-media.c:3272:wait_preroll: failed to preroll pipeline
0:00:23.079120655 6221 0x1a6d5980 WARN rtspmedia rtsp-media.c:3652:gst_rtsp_media_prepare: failed to preroll pipeline
0:00:23.082524155 6221 0x1a6d5980 ERROR rtspclient rtsp-client.c:1077:find_media: client 0x1a6ca4c0: can’t prepare media
0:00:23.083076039 6221 0x1a6d5980 ERROR rtspclient rtsp-client.c:2963:handle_describe_request: client 0x1a6ca4c0: no media
Problem could be related to incorrectness pipeline or data delivery to appsrc Gstreamer plugin? Please about any suggestions.
'''
#!/usr/bin/env python3
import cv2
import gi
import signal
from typing import List
import os
import time
import logging
gi.require_version('Gst', '1.0')
gi.require_version('GstRtspServer', '1.0')
from gi.repository import Gst, GLib, GObject, GstRtspServer
GObject.threads_init()
Gst.init(None)
Gst.debug_set_default_threshold(Gst.DebugLevel.DEBUG)
# logs
logging.basicConfig(level=logging.DEBUG, filename='app.log', filemode='w')
logger = logging.getLogger(__name__)
class FrameProvider:
def __init__(self, folder_path: str, img_format: str = 'jpg'):
self.folder_path = folder_path
self.img_format = img_format
self.image_index = 0
self.files = self.get_files(folder_path)
# function generate full path to images in folder
def get_files(self, folder_path) -> List[str]:
files_path_list = []
start_time = time.time()
for i in os.listdir(folder_path):
if i.endswith(f'.{self.img_format}'):
full_path = os.path.join(folder_path, i)
files_path_list.append(full_path)
#print(full_path)
end_time = time.time()
print(f'Number of loaded paths : {len(files_path_list)}, loading time: {end_time-start_time}')
return files_path_list
def get_frame_tobytes(self) -> bytes:
logger.debug('Wywolanie get_frame_tobytes()')
if self.image_index < len(self.files):
frame = cv2.imread(self.files[self.image_index])
self.image_index += 1
flag_jpg, encoded_image = cv2.imencode(f'.{self.img_format}', frame)
if not flag_jpg:
logger.error('FrameProvider - get_frame_tobytes() : Unable to frame encode.')
raise ValueError('FrameProvider - get_frame_tobytes() : Unable to frame encode.')
encoded_image_bytes = encoded_image.tobytes()
logger.debug(f'FrameProvider - get_frame_tobytes() : Successfully frame encoded, buffer size: {len(encoded_image_bytes)} .')
return encoded_image_bytes
else:
logger.debug('FrameProvider - get_frame_tobytes() : All frames have been processed. ')
return None # return none if all fiels have been processed
class CustomRTSPMediaFactory(GstRtspServer.RTSPMediaFactory):
def __init__(self, fps, frame_provider, **properties):
# super(CustomRTSPMediaFactory, self).__init__(**properties)
super().__init__(**properties)
self.frame_provider = frame_provider
self.duration = 1/fps * Gst.SECOND
self.number_frames = 0
self.launch_string = f'appsrc name=source is-live=true block=true format=GST_FORMAT_TIME caps=video/x-raw,format=BGR,width=1936,height=1216,framerate={fps}/1 ! videoconvert ! video/x-raw,format=NV12 ! x264enc speed-preset=ultrafast tune=zerolatency ! rtph264pay name=pay0'
def do_create_element(self, url):
return Gst.parse_launch(self.launch_string)
def do_create(self, rtsp_media):
appsrc = rtsp_media.get_element().get_child_by_name('source')
appsrc.connect('need-data', self.on_need_data)
def on_need_data(self, src, length):
logger.debug('CustomRTSPMediaFactory - on_need_data() : method calling')
try:
frame = self.frame_provider.get_frame_tobytes()
if frame:
buf = Gst.Buffer.new_allocate(None, len(frame), None)
buf.fill(0, frame)
buf.duration = self.duration
timestamp = self.number_frames * self.duration
buf.pts = buf.dts = int(timestamp)
buf.offset = timestamp
self.number_frames += 1
retval = src.emit('push-buffer', buf)
if retval != Gst.FlowReturn.OK:
logger.error(f'CustomRTSPMediaFactory - on_need_data() : Error pushing buffer {retval}')
else:
logger.debug('CustomRTSPMediaFactory - on_need_data() : Buffer pushed successfully')
except Exception as e:
logger.error(f'CustomRTSPMediaFactory - on_need_data() : Error : {e}')
def main():
dir_path = '/home/recomputer/test_images'
frame_provider = FrameProvider(dir_path, 'jpg') # Set 10 fps for example frame provider
server = GstRtspServer.RTSPServer()
factory = CustomRTSPMediaFactory(10, frame_provider)
factory.set_shared(True)
server.get_mount_points().add_factory("/stream", factory) # Stream available under rtsp://<server_address>:8554/stream
server.attach(None)
signal.signal(signal.SIGINT, signal.SIG_DFL) # Allow to close server with using Ctrl+C
GLib.MainLoop().run()
if __name__ == "__main__":
main()
'''