On the topic of getting the image buffer from GStreamer in Python, try this:
import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstVideo', '1.0')
gi.require_version('GstBase', '1.0')

from gi.repository import GLib, GObject, Gst, GstBase, GstVideo
import numpy as np
Gst.init(None)
Then define the pipeline:
pipeline = ('nvarguscamerasrc name=src_camera sensor_id=%d ! '
            'video/x-raw(memory:NVMM), width=(int)%d, height=(int)%d, framerate=(fraction)%d/1 ! '
            'nvvidconv output-buffers=1 ! '
            'video/x-raw, format=(string)BGRx ! videoconvert ! '
            'video/x-raw, format=(string)BGR ! '
            'appsink max-buffers=1 drop=true name=appsink emit-signals=True'
            % (camera_number, frame_width, frame_height, frame_rate))
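The format arguments are whatever your camera and application need; as a reference point only (these values are not from the original), they might be defined like this:

camera_number = 0      # sensor_id of the CSI camera
frame_width = 1280
frame_height = 720
frame_rate = 30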
Then set up the pipeline:
gst_pipeline = Gst.parse_launch(pipeline)
Define the callback function:
def on_new_sample(sink):
    sample = sink.emit('pull-sample')
    buffer = sample.get_buffer()
    caps = sample.get_caps()
    height = caps.get_structure(0).get_value("height")
    width = caps.get_structure(0).get_value("width")
    success, map_info = buffer.map(Gst.MapFlags.READ)
    if not success:
        print("Buffer data could not be mapped.")
        return Gst.FlowReturn.ERROR
    # The array wraps the mapped memory; copy it if you need the pixels after unmap()
    image = np.ndarray(shape=(height, width, 3), dtype=np.uint8,
                       buffer=map_info.data)
    buffer.unmap(map_info)
    return Gst.FlowReturn.OK
Attach the callback function:
appsink = gst_pipeline.get_by_name('appsink')
appsink.connect("new-sample", on_new_sample)
Start the pipeline:
gst_pipeline.set_state(Gst.State.PLAYING)
These are just the functions you need to get running; there is a lot more for you to play around with and explore. With this code, every time the pipeline receives a new buffer the callback fires and puts the BGR image into a NumPy array. The callback runs asynchronously on GStreamer's streaming thread, so handing the frame over to the rest of your program should be made thread safe.
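One rough way to make that hand-off thread safe (a sketch only; the queue, the on_new_sample_threadsafe name, and the consumer loop are not from the original) is to copy the frame inside the callback and push it onto a queue.Queue that the main thread reads from:

import queue

frame_queue = queue.Queue(maxsize=1)   # hands frames from the streaming thread to the main thread

def on_new_sample_threadsafe(sink):    # connect this instead of on_new_sample above
    sample = sink.emit('pull-sample')
    if sample is None:
        return Gst.FlowReturn.ERROR
    buffer = sample.get_buffer()
    caps = sample.get_caps()
    height = caps.get_structure(0).get_value("height")
    width = caps.get_structure(0).get_value("width")
    success, map_info = buffer.map(Gst.MapFlags.READ)
    if not success:
        return Gst.FlowReturn.ERROR
    # copy so the pixels stay valid after the buffer is unmapped
    image = np.ndarray(shape=(height, width, 3), dtype=np.uint8,
                       buffer=map_info.data).copy()
    buffer.unmap(map_info)
    try:
        frame_queue.put_nowait(image)  # drop the frame if the consumer has not caught up
    except queue.Full:
        pass
    return Gst.FlowReturn.OK

# on the main thread, after starting the pipeline:
while True:
    frame = frame_queue.get()          # blocks until the next frame arrives
    # ... process the BGR frame here ...

Using maxsize=1 and dropping frames when the queue is full keeps the consumer working on the most recent frame instead of falling behind the camera.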