Python GStreamer Binding Slow on Appsrc NvJpeg Appsink

JP5.1.4, Ubuntu 20.04, Jetson Orin NX, flashed from SDK Manager with DeepStream

I run gst-launch-1.0 videotestsrc ! video/x-raw, width=3840, height=2160 ! nvjpegenc ! filesink location=output.jpeg and the app reads ROS at around 0.03 seconds per frame.

However, when I write it in Python with appsrc in and appsink out, it becomes 0.07 s per frame, which is even slower than OpenCV imencode or the software jpegenc in GStreamer.

I guess it is some GStreamer or Python/NumPy issue.

Also, I tested with pynvjpeg, a pip package that binds NvJpeg, but got the same result: very slow.

import gi
import numpy as np
import time

gi.require_version('Gst', '1.0')
from gi.repository import Gst

Gst.init(None)

class GStreamerApp:
    def __init__(self):
        # format=3 on appsrc selects GST_FORMAT_TIME
        self.pipeline_str = (
            "appsrc name=source format=3 ! videoconvert ! video/x-raw, width=3840, height=2160 ! "
            "videoconvert ! "
            "nvjpegenc ! "
            "appsink name=sink emit-signals=true"
        )
        self.pipeline = Gst.parse_launch(self.pipeline_str)
        self.appsrc = self.pipeline.get_by_name('source')
        self.appsink = self.pipeline.get_by_name('sink')
        self.appsink.set_property('emit-signals', True)
        self.appsink.connect('new-sample', self.on_new_sample)

        # Set appsrc properties
        self.appsrc.set_property('caps', Gst.Caps.from_string('video/x-raw, format=RGB, width=3840, height=2160'))

    def on_new_sample(self, sink):
        sample = sink.emit('pull-sample')
        # buf = sample.get_buffer()

        return Gst.FlowReturn.OK

    def push_frame(self, frame):
        # Wrap the NumPy array in a GStreamer buffer; no PTS/duration is set
        buffer = Gst.Buffer.new_wrapped(frame.tobytes())
        self.appsrc.emit('push-buffer', buffer)

    def run(self):
        self.pipeline.set_state(Gst.State.PLAYING)
        try:
            # Simulate pushing frames into appsrc
            while True:
                # Create a dummy NumPy frame (e.g., a solid color)
                frame = np.full((2160, 3840, 3), fill_value=255, dtype=np.uint8)  # White frame
                self.push_frame(frame)
                # time.sleep(1)  # Adjust the sleep time as needed
        except KeyboardInterrupt:
            pass
        finally:
            self.pipeline.set_state(Gst.State.NULL)

if __name__ == "__main__":
    app = GStreamerApp()
    app.run()
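
The post doesn't show how the 0.07 s was measured, so here is a minimal sketch of how I would time each frame with this class. The TimedApp subclass is my own, hypothetical addition, and it assumes only one frame is in flight at a time:

class TimedApp(GStreamerApp):
    def push_frame(self, frame):
        # Record when the frame enters appsrc
        self._t0 = time.perf_counter()
        super().push_frame(frame)

    def on_new_sample(self, sink):
        ret = super().on_new_sample(sink)
        # Elapsed time from push to the encoded sample arriving at appsink
        print('encode latency:', time.perf_counter() - self._t0)
        return ret

Replacing GStreamerApp() with TimedApp() in the main block then prints the per-frame encode latency.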

What is ROS?

There are two videoconvert elements in the second pipeline. Why do you compare two different pipelines?

Found the cause. The root cause is off topic here.
At least I want to share my code and contribute to the community, since I have seen few examples of the Python GStreamer bindings.

For your app, use two Python threads to feed the encoder or decoder, then use two more threads to wait for the results. You can also apply a FIFO queue to improve the code; see the sketch after the full listing below.

import numpy as np
import cv2
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst

from threading import Event

class NvJpegEnc:
    def __init__(self):
        Gst.init(None)

        pipeline_str = (
            "appsrc name=source block=1 ! "
            "videoconvert ! "
            "nvvidconv ! nvjpegenc Enableperf=1 ! "
            "appsink name=sink emit-signals=true"
        )

        self.pipeline = Gst.parse_launch(pipeline_str)

        self.width = None
        self.height = None
        self.result = None
        self.received = Event()

        self.appsrc = self.pipeline.get_by_name("source")
        # appsrc caps are set dynamically in push() once the frame size is known

        self.appsink = self.pipeline.get_by_name('sink')
        self.appsink.connect('new-sample', self.on_new_sample)

        #self.pipeline.set_state(Gst.State.PAUSED)
        self.pipeline.set_state(Gst.State.PLAYING)

    def on_new_sample(self, appsink):
        """Callback invoked for each encoded JPEG sample."""
        sample = appsink.emit('pull-sample')
        print("Received an encoded sample")
        if sample:
            buffer = sample.get_buffer()
            success, map_info = buffer.map(Gst.MapFlags.READ)
            if success:
                # Copy the bytes before unmapping; map_info.data is only
                # valid while the buffer is mapped
                self.result = bytes(map_info.data)
                buffer.unmap(map_info)
            else:
                self.result = None

            self.received.set()
        return Gst.FlowReturn.OK

    def push(self, image):
        h, w, c = image.shape
        if h != self.height or w != self.width:
            # Update the appsrc caps when the frame size changes
            self.height = h
            self.width = w
            self.appsrc.set_property("caps", Gst.Caps.from_string(
                f"video/x-raw,format=RGB,width={w},height={h},framerate=1/1"
            ))

        buffer = Gst.Buffer.new_wrapped(image.tobytes())
        self.appsrc.emit('push-buffer', buffer)
    
    def get(self, timeout=1):
        # Wait for on_new_sample to deliver a result, then reset the event
        if not self.received.wait(timeout):
            return None
        self.received.clear()
        return self.result

    def clean(self):
        self.result = None
        self.received.set()  # release any thread blocked in get()
        self.appsrc.emit('end-of-stream')
        self.pipeline.set_state(Gst.State.NULL)

class NvJpegDec:
    def __init__(self):
        # Initialize GStreamer
        Gst.init(None)

        pipeline_str = (
            "appsrc name=source block=1 ! "
            "nvjpegdec Enableperf=1 ! nvvidconv ! video/x-raw,format=RGBA ! "
            "appsink name=sink emit-signals=true"
        )

        self.result = None
        self.received = Event()

        self.pipeline = Gst.parse_launch(pipeline_str)
        self.appsrc = self.pipeline.get_by_name("source")
        self.appsrc.set_property("caps", Gst.Caps.from_string(
            f"image/jpeg"
        ))
        #self.appsrc.set_property("block", True)

        self.appsink = self.pipeline.get_by_name('sink')
        self.appsink.connect('new-sample', self.on_new_sample)

        #self.pipeline.set_state(Gst.State.PAUSED)
        self.pipeline.set_state(Gst.State.PLAYING)


    def on_new_sample(self, appsink):
        sample = appsink.emit('pull-sample')
        print("Received a decoded sample")
        if sample:
            buffer = sample.get_buffer()
            success, map_info = buffer.map(Gst.MapFlags.READ)
            if success:
                structure = sample.get_caps().get_structure(0)
                width = structure.get_value("width")
                height = structure.get_value("height")
                fmt = structure.get_value("format")

                if fmt in ('RGB', 'BGR'):
                    # np.ndarray(buffer=...) is only a view of the mapped
                    # memory, so copy before unmapping
                    image = np.ndarray(
                        shape=(height, width, 3),
                        buffer=map_info.data,
                        dtype=np.uint8
                    ).copy()
                else:
                    # nvvidconv outputs RGBA; cvtColor allocates a new array
                    image = np.ndarray(
                        shape=(height, width, 4),
                        buffer=map_info.data,
                        dtype=np.uint8
                    )
                    image = cv2.cvtColor(image, cv2.COLOR_RGBA2RGB)

                buffer.unmap(map_info)
                self.result = image
            else:
                self.result = None

            self.received.set()
        return Gst.FlowReturn.OK

    def push(self, data):
        buffer = Gst.Buffer.new_wrapped(data)
        self.appsrc.emit('push-buffer', buffer)
    
    def get(self, timeout=1):
        # Wait for on_new_sample to deliver a result, then reset the event
        if not self.received.wait(timeout):
            return None
        self.received.clear()
        return self.result
    
    def clean(self):
        self.result = None
        self.received.set()  # release any thread blocked in get()
        self.appsrc.emit('end-of-stream')
        self.pipeline.set_state(Gst.State.NULL)

# Example usage
if __name__ == "__main__":
    import time
    import datetime

    def test():
        enc = NvJpegEnc()
        dec = NvJpegDec()

        img = np.zeros((2160, 3840, 3), dtype=np.uint8)
        #img = cv2.imread('')
        img = cv2.resize(img, (3840, 2160))  # cv2.resize takes (width, height)
        for _ in range(300):

            #print(img.shape)
            _img = img.copy()
            print(_img.shape)
            nowstr = datetime.datetime.now().strftime("%Y-%m-%d %H:%M:%S")
            cv2.putText(_img, nowstr, (200,300), 4, 4, (0))

            # Time the full push-to-result round trip
            start = time.perf_counter()
            #rtn, encoded = cv2.imencode('.jpg', img)
            enc.push(_img)
            encoded = enc.get()
            end = time.perf_counter()

            if encoded is None:
                print('encoded none')
                continue

            print('encode fps / time:', 1/(end-start), end-start)

            start = time.perf_counter()
            #decoded = cv2.imdecode(encoded, cv2.IMREAD_ANYCOLOR)
            dec.push(encoded)
            decoded = dec.get()
            end = time.perf_counter()
            print('decode fps / time:', 1/(end-start), end-start)

            #print(decoded.shape)
            if decoded is None:
                print('decoded none')
                continue

            cv2.imshow('raw1', cv2.resize(_img, (640, 480)))
            cv2.imshow('decoded1', cv2.resize(decoded, (640, 480)))
            cv2.waitKey(1)

            #time.sleep(1/15)

        enc.clean()
        dec.clean()
        cv2.destroyAllWindows()

    test()
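
To illustrate the threading advice above, here is a minimal sketch (my own, untested addition) that feeds NvJpegEnc from one thread and collects results in another through queue.Queue FIFOs. The queue sizes and frame count are assumptions, and a semaphore keeps only one frame in flight, since the class stores a single result:

import queue
import threading

def threaded_encode_demo(num_frames=100):
    enc = NvJpegEnc()
    in_q = queue.Queue(maxsize=4)      # FIFO of raw frames waiting to be pushed
    out_q = queue.Queue()              # FIFO of encoded JPEG byte strings
    inflight = threading.Semaphore(1)  # NvJpegEnc holds only one result at a time

    def feeder():
        # Push frames into the encoder once the previous result is consumed
        while True:
            frame = in_q.get()
            if frame is None:          # sentinel: no more frames
                break
            inflight.acquire()
            enc.push(frame)

    def collector():
        # Wait for each encoded result and hand it to the consumer
        for _ in range(num_frames):
            out_q.put(enc.get())
            inflight.release()

    threads = [threading.Thread(target=feeder), threading.Thread(target=collector)]
    for t in threads:
        t.start()

    for _ in range(num_frames):
        in_q.put(np.zeros((2160, 3840, 3), dtype=np.uint8))
    in_q.put(None)                     # stop the feeder

    for _ in range(num_frames):
        encoded = out_q.get()          # encoded frame bytes, or None on timeout

    for t in threads:
        t.join()
    enc.clean()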

As I see, in
appsrc name=source block=1 ! videoconvert ! nvvidconv ! nvjpegenc Enableperf=1 ! appsink name=sink emit-signals=true
the videoconvert slows down the whole pipeline. However, my app uses RGB/BGR as input instead of RGBA. Is there any way to remove the videoconvert but keep using RGB?

The nvvideoconvert element alone is enough to convert RGB to RGBA(memory:NVMM):

appsrc name=source block=1 ! nvvideoconvert ! nvjpegenc ...
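
For reference, a minimal sketch of the encoder setup with that change applied (the caps values are my assumptions, taken from the code above):

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst

Gst.init(None)

# videoconvert removed: nvvideoconvert accepts system-memory RGB directly
# and outputs RGBA in NVMM memory for nvjpegenc
pipeline = Gst.parse_launch(
    "appsrc name=source block=1 ! "
    "nvvideoconvert ! "
    "nvjpegenc ! "
    "appsink name=sink emit-signals=true"
)
appsrc = pipeline.get_by_name("source")
appsrc.set_property("caps", Gst.Caps.from_string(
    "video/x-raw,format=RGB,width=3840,height=2160,framerate=1/1"
))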