Failed to test a dynamic pipeline in gstreamer

Hi,

My scenario of a pipeline is below.

gst-launch-1.0 -e -v rtmpsrc location='rtmp://192.168.0.11/live/s2' ! flvdemux name=dem \
    dem.video ! queue ! h264parse ! omxh264dec ! nvvidconv ! capsfilter  ! nvoverlaysink \
    dem.audio ! queue ! fakesink

And I tested to change a capsfilter element dynamically using python with my same scenario.

This is my test example below.

"""

Dynamic Pipeline Example.

rtmpsrc -> ... -> videosink.

"""

import sys

import gi
gi.require_version('Gst', '1.0')
gi.require_version('GLib', '2.0')
gi.require_version('GObject', '2.0')
from gi.repository import GObject, Gst, GLib

"""
Launcher.
"""


class Launcher:
    def __init__(self, args):
        self.pipeline = Gst.Pipeline.new()

        self.src = Gst.ElementFactory.make("rtmpsrc")
        self.demux = Gst.ElementFactory.make("flvdemux")
        self.demux_v_que = Gst.ElementFactory.make("queue")
        self.h264parse = Gst.ElementFactory.make("h264parse")

        self.vdec = Gst.ElementFactory.make("omxh264dec")
        self.vconv = Gst.ElementFactory.make("nvvidconv")
        self.caps = Gst.ElementFactory.make("capsfilter")
        self.vsink = Gst.ElementFactory.make("nvoverlaysink")

        # self.vdec = Gst.ElementFactory.make("avdec_h264")
        # self.vconv = Gst.ElementFactory.make("videoscale")
        # self.caps = Gst.ElementFactory.make("capsfilter")
        # self.vsink = Gst.ElementFactory.make("glimagesink")

        self.demux_a_que = Gst.ElementFactory.make("queue")
        self.asink = Gst.ElementFactory.make("fakesink")

        self.pipeline.add(self.src)
        self.pipeline.add(self.demux)
        self.pipeline.add(self.demux_v_que)
        self.pipeline.add(self.h264parse)
        self.pipeline.add(self.vdec)
        self.pipeline.add(self.vconv)
        self.pipeline.add(self.caps)
        self.pipeline.add(self.vsink)
        self.pipeline.add(self.demux_a_que)
        self.pipeline.add(self.asink)

        # take the commandline argument and ensure that it is a uri
        if Gst.uri_is_valid(args[1]):
            uri = args[1]
        else:
            uri = Gst.filename_to_uri(args[1])

        self.src.set_property('location', uri)
        # self.vdec.set_property('disable-dpb', True)

        # capsprop = Gst.Caps.from_string("video/x-raw(memory:NVMM)")
        # self.caps.set_property("caps", capsprop)

        self.src.link(self.demux)
        self.demux.connect("pad-added", self.pad_added_handler)

        self.demux_v_que.link(self.h264parse)
        self.h264parse.link(self.vdec)
        self.vdec.link(self.vconv)
        self.vconv.link(self.caps)
        self.caps.link(self.vsink)

        self.demux_a_que.link(self.asink)

        # create and event loop and feed gstreamer bus mesages to it
        self.loop = GObject.MainLoop()

        GLib.timeout_add_seconds(5, self.timeout_cb)

        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect("message", self.on_message)

        # start play back and listed to events
        self.pipeline.set_state(Gst.State.PLAYING)
        try:
            self.loop.run()

        except:
            pass

        # cleanup
        self.pipeline.set_state(Gst.State.NULL)

    def pad_added_handler(self, src, new_pad):
        print("Received new pad '%s' from '%s':" % (new_pad.get_name(),
              src.get_name()))

        # If our converter is already linked, we have nothing to do here
        if new_pad.is_linked():
            print("We are already linked. Ignoring.")
            return

        # Check the new pad's type
        new_pad_type = new_pad.query_caps(None).to_string()

        if new_pad_type.startswith("audio"):
            print("  It has type '%s' which is audio." %
                  new_pad_type)
            new_pad.link(self.demux_a_que.get_static_pad("sink"))

        elif new_pad_type.startswith("video"):
            print("  It has type '%s' which is video." %
                  new_pad_type)
            new_pad.link(self.demux_v_que.get_static_pad("sink"))
        else:
            print("error!")

        return

    def on_message(self, bus, message):
        t = message.type
        if t == Gst.MessageType.EOS:
            sys.stdout.write("End-of-stream\n")
            self.loop.quit()
        elif t == Gst.MessageType.ERROR:
            err, debug = message.parse_error()
            sys.stderr.write("Error: %s: %s\n" % (err, debug))
            self.loop.quit()
        return True

    def probe_cb(self, pad, info):
        print("called probe_cb")
        self.caps.set_state(Gst.State.NULL)
        self.pipeline.remove(self.caps)

        self.caps = Gst.ElementFactory.make('capsfilter')
        self.pipeline.add(self.caps)
        # capsprop = Gst.Caps.from_string(
        #     "video/x-raw(memory:NVMM), width=480, height=270")
        # self.caps.set_property("caps", capsprop)
        self.caps.sync_state_with_parent()

        self.vconv.link(self.caps)
        self.caps.link(self.vsink)
        self.caps.set_state(Gst.State.PLAYING)

        return Gst.PadProbeReturn.REMOVE

    def timeout_cb(self):
        print("called timeout_cb")
        srcpad = self.src.get_static_pad('src')
        srcpad.add_probe(Gst.PadProbeType.IDLE, self.probe_cb)
        return True


def main(args):
    if len(args) != 2:
        sys.stderr.write("usage: %s <media file or uri>\n" % args[0])
        sys.exit(1)

    GObject.threads_init()
    Gst.init(None)
    Launcher(args)


if __name__ == '__main__':
    sys.exit(main(sys.argv))

This example is failed when a probe_cb function is called!
There are two kinds of errors.

First, a process terminated.
An error messages is below.

# GST_DEBUG=3 python ./rtmpsrc_vsink.py rtmp://192.168.0.11/live/s2
0:00:00.044121315   869       0xb99f20 WARN                     omx gstomx.c:2826:plugin_init: Failed to load configuration file: Valid key file could not be found in search dirs (searched in: /root/.config:/etc/xdg as per GST_OMX_CONFIG_DIR environment variable, the xdg user config directory (or XDG_CONFIG_HOME) and the system config directory (or XDG_CONFIG_DIRS)
0:00:00.341570323   869       0x99a280 WARN                flvdemux gstflvdemux.c:627:gst_flv_demux_parse_tag_script:<flvdemux0> failed reading a tag, skipping
Received new pad 'audio' from 'flvdemux0':
  It has type 'audio/mpeg, mpegversion=(int)4, framed=(boolean)true, stream-format=(string)raw, rate=(int)44100, channels=(int)2, codec_data=(buffer)1208' which is audio.
Received new pad 'video' from 'flvdemux0':
  It has type 'video/x-h264, stream-format=(string)avc, pixel-aspect-ratio=(fraction)1/1, width=(int)1920, height=(int)1080, framerate=(fraction)30/1, codec_data=(buffer)01424028ffe1000b6742402895a01e0089f95001000468ce3c80' which is video.
0:00:00.408511751   869       0x99a190 FIXME           videodecoder gstvideodecoder.c:946:gst_video_decoder_drain_out:<omxh264dec-omxh264dec0> Sub-class should implement drain()
NvMMLiteOpen : Block : BlockType = 261
TVMR: NvMMLiteTVMRDecBlockOpen: 7647: NvMMLiteBlockOpen
NvMMLiteBlockCreate : Block : BlockType = 261
TVMR: cbBeginSequence: 1179: BeginSequence  1920x1088, bVPR = 0
TVMR: LowCorner Frequency = 180000
TVMR: cbBeginSequence: 1529: DecodeBuffers = 5, pnvsi->eCodec = 4, codec = 0
TVMR: cbBeginSequence: 1600: Display Resolution : (1920x1080)
TVMR: cbBeginSequence: 1601: Display Aspect Ratio : (1920x1080)
TVMR: cbBeginSequence: 1669: ColorFormat : 5
TVMR: cbBeginSequence:1683 ColorSpace = NvColorSpace_YCbCr601
TVMR: cbBeginSequence: 1809: SurfaceLayout = 3
TVMR: cbBeginSequence: 1902: NumOfSurfaces = 12, InteraceStream = 0, InterlaceEnabled = 0, bSecure = 0, MVC = 0 Semiplanar = 1, bReinit = 1, BitDepthForSurface = 8 LumaBitDepth = 8, ChromaBitDepth = 8, ChromaFormat = 5
TVMR: cbBeginSequence: 1904: BeginSequence  ColorPrimaries = 2, TransferCharacteristics = 2, MatrixCoefficients = 2
Allocating new output: 1920x1088 (x 12), ThumbnailMode = 0
OPENMAX: HandleNewStreamFormat: 3464: Send OMX_EventPortSettingsChanged : nFrameWidth = 1920, nFrameHeight = 1088
TVMR: FrameRate = 33
TVMR: NVDEC LowCorner Freq = (198000 * 1024)
---> TVMR: Video-conferencing detected !!!!!!!!!
TVMR: FrameRate = 29.992562
called timeout_cb
called probe_cb
0:00:05.109768786   869       0x99c2d0 WARN             omxvideodec gstomxvideodec.c:2904:gst_omx_video_dec_loop:<omxh264dec-omxh264dec0> error: Internal data stream error.
0:00:05.109920591   869       0x99c2d0 WARN             omxvideodec gstomxvideodec.c:2904:gst_omx_video_dec_loop:<omxh264dec-omxh264dec0> error: stream stopped, reason not-linked
Error: gst-stream-error-quark: Internal data stream error. (1): /dvs/git/dirty/git-master_linux/external/gstreamer/gst-omx/omx/gstomxvideodec.c(2904): gst_omx_video_dec_loop (): /GstPipeline:pipeline0/GstOMXH264Dec-omxh264dec:omxh264dec-omxh264dec0:
stream stopped, reason not-linked
TVMR: TVMRFrameStatusReporting: 6132: Closing TVMR Frame Status Thread -------------
TVMR: TVMRVPRFloorSizeSettingThread: 5942: Closing TVMRVPRFloorSizeSettingThread -------------
TVMR: TVMRFrameDelivery: 5982: Closing TVMR Frame Delivery Thread -------------
TVMR: NvMMLiteTVMRDecBlockClose: 7815: Done

Second, a process blocked.

# GST_DEBUG=3 python ./rtmpsrc_vsink.py rtmp://192.168.0.11/live/s2
0:00:00.043790894   830       0xb99f20 WARN                     omx gstomx.c:2826:plugin_init: Failed to load configuration file: Valid key file could not be found in search dirs (searched in: /root/.config:/etc/xdg as per GST_OMX_CONFIG_DIR environment variable, the xdg user config directory (or XDG_CONFIG_HOME) and the system config directory (or XDG_CONFIG_DIRS)
0:00:00.247391457   830       0x99a280 WARN                flvdemux gstflvdemux.c:627:gst_flv_demux_parse_tag_script:<flvdemux0> failed reading a tag, skipping
Received new pad 'audio' from 'flvdemux0':
  It has type 'audio/mpeg, mpegversion=(int)4, framed=(boolean)true, stream-format=(string)raw, rate=(int)44100, channels=(int)2, codec_data=(buffer)1208' which is audio.
Received new pad 'video' from 'flvdemux0':
  It has type 'video/x-h264, stream-format=(string)avc, pixel-aspect-ratio=(fraction)1/1, width=(int)1920, height=(int)1080, framerate=(fraction)30/1, codec_data=(buffer)01424028ffe1000b6742402895a01e0089f95001000468ce3c80' which is video.
0:00:00.257271286   830       0x99a190 FIXME           videodecoder gstvideodecoder.c:946:gst_video_decoder_drain_out:<omxh264dec-omxh264dec0> Sub-class should implement drain()
NvMMLiteOpen : Block : BlockType = 261
TVMR: NvMMLiteTVMRDecBlockOpen: 7647: NvMMLiteBlockOpen
NvMMLiteBlockCreate : Block : BlockType = 261
TVMR: cbBeginSequence: 1179: BeginSequence  1920x1088, bVPR = 0
TVMR: LowCorner Frequency = 180000
TVMR: cbBeginSequence: 1529: DecodeBuffers = 5, pnvsi->eCodec = 4, codec = 0
TVMR: cbBeginSequence: 1600: Display Resolution : (1920x1080)
TVMR: cbBeginSequence: 1601: Display Aspect Ratio : (1920x1080)
TVMR: cbBeginSequence: 1669: ColorFormat : 5
TVMR: cbBeginSequence:1683 ColorSpace = NvColorSpace_YCbCr601
TVMR: cbBeginSequence: 1809: SurfaceLayout = 3
TVMR: cbBeginSequence: 1902: NumOfSurfaces = 12, InteraceStream = 0, InterlaceEnabled = 0, bSecure = 0, MVC = 0 Semiplanar = 1, bReinit = 1, BitDepthForSurface = 8 LumaBitDepth = 8, ChromaBitDepth = 8, ChromaFormat = 5
TVMR: cbBeginSequence: 1904: BeginSequence  ColorPrimaries = 2, TransferCharacteristics = 2, MatrixCoefficients = 2
Allocating new output: 1920x1088 (x 12), ThumbnailMode = 0
OPENMAX: HandleNewStreamFormat: 3464: Send OMX_EventPortSettingsChanged : nFrameWidth = 1920, nFrameHeight = 1088
TVMR: FrameRate = 33
TVMR: NVDEC LowCorner Freq = (198000 * 1024)
---> TVMR: Video-conferencing detected !!!!!!!!!
TVMR: FrameRate = 29.970090
called timeout_cb
called probe_cb
called timeout_cb
called probe_cb
called timeout_cb
called probe_cb

I want to know what caused the problem and how to fix it.

Thanks.

Environment:
Jetson TX2, Jetpack 3.2.1, Gstreamer version 1.8.3

Hi iamlow,
Your case may not work because it always has to be ‘nvvidconv ! video/x-raw(memory:NVMM) ! nvoverlaysink’

Hi, DaneLLL

A gst-launch-1.0 script was successful without ‘nvvidconv ! video/x-raw(memory:NVMM) ! nvoverlaysink’.

But a python example was failed regardless ‘nvvidconv ! video/x-raw(memory:NVMM) ! nvoverlaysink’.

Please let me know how to fix.

Thanks.

Hi iamlow, we don’t have experience in running gstreamer in python. Other users can share their knowledge.

I rewrite an example in C.

#include <stdio.h>
#include <string.h>
#include <gst/gst.h>

static GMainLoop *loop;
static GstElement *pipeline;

static GstElement *src;
static GstElement *demux;

// for video
static GstElement *demux_v_que;
static GstElement *h264parse;
static GstElement *vdec;
static GstElement *vconv;
static GstElement *caps;
static GstElement *vsink;

// for audio
static GstElement *demux_a_que;
static GstElement *asink;

static GstPadProbeReturn
pad_probe_cb (GstPad * pad, GstPadProbeInfo * info, gpointer user_data) {
    printf("called pad_probe_cb\n");
    gst_element_set_state (caps, GST_STATE_NULL);
    gst_bin_remove (GST_BIN (pipeline), caps);
    caps = gst_element_factory_make ("capsfilter", NULL);
    gst_object_ref (caps);
    gst_bin_add_many (GST_BIN (pipeline), caps, NULL);
    gst_element_sync_state_with_parent (caps);
    gst_element_link_many (vconv, caps, vsink, NULL);
    gst_element_set_state (caps, GST_STATE_PLAYING);

    return GST_PAD_PROBE_REMOVE;
}

static gboolean
timeout_cb (gpointer user_data) {
    printf("called timeout_cb\n");
    GstPad *pad = gst_element_get_static_pad (src, "src");
    gst_pad_add_probe (pad, GST_PAD_PROBE_TYPE_IDLE, pad_probe_cb, NULL, NULL);

    return TRUE;
}

static void
pad_added_cb (GstElement * element, GstPad * pad, gpointer user_data) {
    GstCaps *caps;
    GstStructure *s;
    const gchar *name;

    caps = gst_pad_get_current_caps (pad);
    s = gst_caps_get_structure (caps, 0);
    name = gst_structure_get_name (s);

    printf("name = %s\n", name);

    if (strcmp (name, "audio/mpeg") == 0) {
        GstPad *sinkpad;

        sinkpad = gst_element_get_static_pad (demux_a_que, "sink");
        if (gst_pad_link (pad, sinkpad) != GST_PAD_LINK_OK) {
            g_printerr ("Failed to link demux with demux_a_que\n");
            g_main_loop_quit (loop);
        }
        gst_object_unref (sinkpad);
    }

    if (strcmp (name, "video/x-h264") == 0) {
        GstPad *sinkpad;

        sinkpad = gst_element_get_static_pad (demux_v_que, "sink");
        if (gst_pad_link (pad, sinkpad) != GST_PAD_LINK_OK) {
            g_printerr ("Failed to link demux with demux_v_que\n");
            g_main_loop_quit (loop);
        }
        gst_object_unref (sinkpad);
    }

    gst_caps_unref (caps);
}

static gboolean
message_cb (GstBus *bus, GstMessage *message, gpointer user_data) {
    switch (GST_MESSAGE_TYPE (message)) {
        case GST_MESSAGE_ERROR:{
            GError *err = NULL;
            gchar *name, *debug = NULL;

            name = gst_object_get_path_string (message->src);
            gst_message_parse_error (message, &err, &debug);

            g_printerr ("ERROR: from element %s: %s\n", name, err->message);
            if (debug != NULL)
                g_printerr ("Additional debug info:\n%s\n", debug);

            g_error_free (err);
            g_free (debug);
            g_free (name);

            g_main_loop_quit (loop);
            break;
        }

        case GST_MESSAGE_WARNING:{
            GError *err = NULL;
            gchar *name, *debug = NULL;

            name = gst_object_get_path_string (message->src);
            gst_message_parse_warning (message, &err, &debug);

            g_printerr ("ERROR: from element %s: %s\n", name, err->message);
            if (debug != NULL)
                g_printerr ("Additional debug info:\n%s\n", debug);

            g_error_free (err);
            g_free (debug);
            g_free (name);
            break;
        }

        case GST_MESSAGE_EOS:
            g_print ("Got EOS\n");
            g_main_loop_quit (loop);
            break;

        default:
            break;
    }

    return TRUE;
}

int main(int argc, char *argv[]) {
    GstBus *bus;

    gst_init (&argc, &argv);

    if (argc != 2) {
        g_error ("Usage: %s filename", argv[0]);
        return -1;
    }

    pipeline = gst_pipeline_new (NULL);

    src = gst_element_factory_make ("rtmpsrc", NULL);
    demux = gst_element_factory_make ("flvdemux", NULL);

    demux_v_que = gst_element_factory_make ("queue", NULL);
    h264parse = gst_element_factory_make ("h264parse", NULL);

    vdec = gst_element_factory_make ("omxh264dec", NULL);
    vconv = gst_element_factory_make ("nvvidconv", NULL);
    caps = gst_element_factory_make ("capsfilter", NULL);
    vsink = gst_element_factory_make ("nvoverlaysink", NULL);

    demux_a_que = gst_element_factory_make ("queue", NULL);
    asink = gst_element_factory_make ("fakesink", NULL);

    if (!pipeline 
            || !src 
            || !demux 
            || !demux_v_que 
            || !h264parse 
            || !vdec 
            || !vconv
            || !caps 
            || !vsink 
            || !demux_a_que 
            || !asink) {
        g_error ("Failed to create elements");
        return -2;
    }

    g_object_set (src, "location", argv[1], NULL);

    gst_bin_add_many (GST_BIN (pipeline), 
            src, demux, 
            demux_v_que, h264parse, vdec, vconv, caps, vsink, 
            demux_a_que, asink, NULL);
    if (!gst_element_link_many (src, demux, NULL)
            || !gst_element_link_many (demux_v_que, h264parse, 
            vdec, vconv, caps, vsink, NULL)
            || !gst_element_link_many (demux_a_que, asink, NULL)) {
        g_error ("Failed to link elements");
        return -3;
    }

    g_signal_connect (demux, "pad-added", G_CALLBACK (pad_added_cb), NULL);

    loop = g_main_loop_new (NULL, FALSE);

    bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
    gst_bus_add_signal_watch (bus);
    g_signal_connect (G_OBJECT (bus), "message", G_CALLBACK (message_cb), NULL);
    gst_object_unref (GST_OBJECT (bus));

    if (gst_element_set_state (pipeline,
            GST_STATE_PLAYING) == GST_STATE_CHANGE_FAILURE) {
        g_error ("Failed to go into PLAYING state");
        return -4;
    }

    g_timeout_add_seconds (5, timeout_cb, NULL);

    g_main_loop_run (loop);

    gst_element_set_state (pipeline, GST_STATE_NULL);

    g_main_loop_unref (loop);

    gst_object_unref (pipeline);

    return 0;
}

Please check again.^^