Found the cause. The root cause turned out to be unrelated to this topic.
Still, I want to share my code as a contribution to the community, since I have seen very few examples for the Python GStreamer bindings.
For your app, use two Python threads to feed the encoder and the decoder, then use another two threads to wait for the results. You can also use a FIFO queue to improve the code.
import numpy as np
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
from threading import Event
class NvJpegEnc:
    """JPEG encoder built on an ``appsrc ! ... ! nvjpegenc ! appsink`` pipeline.

    Intended use is request/response: call :meth:`push` with one frame, then
    :meth:`get` to wait for the encoded bytes. Only the most recent result is
    kept (``self.result``); ``self.received`` signals that it is available.
    """

    def __init__(self, ):
        """Build the pipeline, hook the appsink callback and start playing."""
        Gst.init(None)
        pipeline_str = (
            "appsrc name=source block=1 ! "
            "videoconvert ! "
            "nvvidconv ! nvjpegenc Enableperf=1 ! "
            "appsink name=sink emit-signals=true"
        )
        self.pipeline = Gst.parse_launch(pipeline_str)
        # Geometry of the caps currently set on appsrc; None until the first
        # push() so that the first frame always installs caps.
        self.width = None
        self.height = None
        self.result = None
        self.received = Event()
        self.appsrc = self.pipeline.get_by_name("source")
        self.appsink = self.pipeline.get_by_name('sink')
        self.appsink.connect('new-sample', self.on_new_sample)
        self.pipeline.set_state(Gst.State.PLAYING)

    def on_new_sample(self, appsink):
        """Callback function to handle new samples.

        Pulls the sample from ``appsink``, copies the encoded payload out of
        the mapped buffer and publishes it for :meth:`get`. Publishes ``None``
        when no sample is available or the buffer cannot be mapped.

        Returns:
            ``Gst.FlowReturn.OK`` in every case.
        """
        sample = appsink.emit('pull-sample')
        print("Received a encode new sample")
        encoded = None
        if sample:
            buffer = sample.get_buffer()
            success, map_info = buffer.map(Gst.MapFlags.READ)
            if success:
                try:
                    # Copy while the buffer is still mapped; the mapped
                    # memory must not be touched after unmap().
                    encoded = bytes(map_info.data)
                finally:
                    buffer.unmap(map_info)
        self.result = encoded
        # Leave the event set so a get() that starts after this callback has
        # finished still returns immediately; push() re-arms it.
        self.received.set()
        return Gst.FlowReturn.OK

    def push(self, image):
        """Submit one frame for encoding.

        Args:
            image: array of shape ``(height, width, 3)``. The caps declare
                packed RGB, so the data is expected to be 8-bit per channel
                (assumes dtype uint8 -- not checked here).

        Raises:
            ValueError: if ``image`` is not 3-dimensional or does not have
                exactly 3 channels.
        """
        h, w, c = image.shape
        if c != 3:
            raise ValueError(
                f"expected an image with 3 channels for RGB caps, got {c}"
            )
        if h != self.height or w != self.width:
            # Frame size changed (or first frame): update the appsrc caps.
            self.height = h
            self.width = w
            self.appsrc.set_property("caps", Gst.Caps.from_string(
                f"video/x-raw,format=RGB ,width={w},height={h},framerate=1/1"
            ))
        # Re-arm before pushing so the result of THIS frame is what get()
        # waits for, and a previous result is never mistaken for it.
        self.result = None
        self.received.clear()
        buffer = Gst.Buffer.new_wrapped(image.tobytes())
        self.appsrc.emit('push-buffer', buffer)

    def get(self, timeout=1):
        """Wait for the pending result.

        Args:
            timeout: maximum number of seconds to wait.

        Returns:
            The encoded JPEG as ``bytes``, or ``None`` if nothing arrived
            within ``timeout`` (or the sample could not be read).
        """
        if not self.received.wait(timeout):
            return None
        return self.result

    def clean(self):
        """Drop any result, wake waiters, send EOS and stop the pipeline."""
        self.result = None
        # Wake any thread blocked in get(); it will observe result None.
        self.received.set()
        self.appsrc.emit('end-of-stream')
        self.pipeline.set_state(Gst.State.NULL)
class NvJpegDec:
    """JPEG decoder built on an ``appsrc ! nvjpegdec ! nvvidconv ! appsink`` pipeline.

    Intended use is request/response: call :meth:`push` with one JPEG, then
    :meth:`get` to wait for the decoded image. Only the most recent result is
    kept (``self.result``); ``self.received`` signals that it is available.
    """

    def __init__(self):
        """Build the pipeline, hook the appsink callback and start playing."""
        # Initialize GStreamer
        Gst.init(None)
        pipeline_str = (
            "appsrc name=source block=1 ! "
            "nvjpegdec Enableperf=1 ! nvvidconv ! video/x-raw,format=RGBA ! "
            "appsink name=sink emit-signals=true"
        )
        self.result = None
        self.received = Event()
        self.pipeline = Gst.parse_launch(pipeline_str)
        self.appsrc = self.pipeline.get_by_name("source")
        self.appsrc.set_property("caps", Gst.Caps.from_string("image/jpeg"))
        self.appsink = self.pipeline.get_by_name('sink')
        self.appsink.connect('new-sample', self.on_new_sample)
        self.pipeline.set_state(Gst.State.PLAYING)

    def on_new_sample(self, appsink):
        """Callback function to handle new samples.

        Pulls the sample from ``appsink``, reads width/height/format from its
        caps and publishes a 3-channel ``uint8`` array copied out of the
        mapped buffer. Publishes ``None`` when no sample is available or the
        buffer cannot be mapped.

        Returns:
            ``Gst.FlowReturn.OK`` in every case.
        """
        sample = appsink.emit('pull-sample')
        print("Received a decode new sample")
        decoded = None
        if sample:
            buffer = sample.get_buffer()
            success, map_info = buffer.map(Gst.MapFlags.READ)
            if success:
                try:
                    structure = sample.get_caps().get_structure(0)
                    width = structure.get_value("width")
                    height = structure.get_value("height")
                    pixel_format = structure.get_value("format")
                    # Anything other than RGB/BGR is treated as a 4-byte
                    # format (the pipeline caps request RGBA from nvvidconv).
                    channels = 3 if pixel_format in ('RGB', 'BGR') else 4
                    # NOTE(review): assumes rows are tightly packed (no
                    # stride padding) -- confirm for 3-byte formats.
                    frame = np.ndarray(
                        shape=(height, width, channels),
                        buffer=map_info.data,
                        dtype=np.uint8
                    )
                    # Drop the 4th channel if present and COPY: the array
                    # above is only a view over memory that becomes invalid
                    # after unmap(). Plain slicing also avoids depending on
                    # cv2, which this module does not import at top level.
                    decoded = frame[..., :3].copy()
                finally:
                    buffer.unmap(map_info)
        self.result = decoded
        # Leave the event set so a get() that starts after this callback has
        # finished still returns immediately; push() re-arms it.
        self.received.set()
        return Gst.FlowReturn.OK

    def push(self, bytes):
        """Submit one encoded JPEG for decoding.

        Args:
            bytes: the JPEG payload (parameter name kept for compatibility
                even though it shadows the builtin).
        """
        # Re-arm before pushing so the result of THIS image is what get()
        # waits for, and a previous result is never mistaken for it.
        self.result = None
        self.received.clear()
        buffer = Gst.Buffer.new_wrapped(bytes)
        self.appsrc.emit('push-buffer', buffer)

    def get(self, timeout=1):
        """Wait for the pending result.

        Args:
            timeout: maximum number of seconds to wait.

        Returns:
            The decoded image as an ``(height, width, 3)`` ``uint8`` array,
            or ``None`` if nothing arrived within ``timeout`` (or the sample
            could not be read).
        """
        if not self.received.wait(timeout):
            return None
        return self.result

    def clean(self):
        """Drop any result, wake waiters, send EOS and stop the pipeline."""
        self.result = None
        # Wake any thread blocked in get(); it will observe result None.
        self.received.set()
        self.appsrc.emit('end-of-stream')
        self.pipeline.set_state(Gst.State.NULL)
# Example usage
if __name__ == "__main__":
    import time
    import cv2
    import datetime

    def test():
        """Round-trip 300 frames through the encoder and decoder.

        Each iteration stamps the current time on a copy of the frame,
        encodes it, decodes the result and shows both images. The printed
        timings cover push + wait for the result, i.e. the full latency of
        one encode / decode call. Pipelines and windows are released even if
        the loop raises.
        """
        enc = NvJpegEnc()
        dec = NvJpegDec()
        try:
            img = np.zeros((2160, 3840, 3), dtype=np.uint8)
            #img = cv2.imread('')
            # NOTE(review): cv2.resize takes dsize as (width, height), so the
            # working frame has shape (3840, 2160, 3) -- confirm intended.
            img = cv2.resize(img, (2160, 3840))
            for _ in range(300):
                _img = img.copy()
                print(_img.shape)
                nowstr = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
                cv2.putText(_img, nowstr, (200,300), 4, 4, (0))

                # For comparison: rtn, encoded = cv2.imencode('.jpg', img)
                start = time.perf_counter()
                enc.push(_img)
                encoded = enc.get()
                # Stop the clock only after the result arrived; stopping
                # right after push() would time the hand-off to appsrc only.
                end = time.perf_counter()
                if encoded is None:
                    print('encoded none')
                    continue
                print('encoded used', 1/(end-start), (end-start))

                # For comparison: decoded = cv2.imdecode(encoded, cv2.IMREAD_ANYCOLOR)
                start = time.perf_counter()
                dec.push(encoded)
                decoded = dec.get()
                end = time.perf_counter()
                print('decoded used', 1/(end-start), (end-start))
                if decoded is None:
                    print('decoded none')
                    continue

                cv2.imshow('raw1', cv2.resize(_img, (640, 480)))
                cv2.imshow('decoded1', cv2.resize(decoded, (640, 480)))
                cv2.waitKey(1)
                #time.sleep(1/15)
        finally:
            enc.clean()
            dec.clean()
            cv2.destroyAllWindows()

    test()