MjstreamingError: Internal data flow error

I used this pipeline to build an appsrc program:

gst-launch-1.0 videotestsrc ! 'video/x-raw, format=(string)RGB, width=(int)1504, height=(int)1504, framerate=(fraction)120/1' ! videoconvert ! omxh264enc control-rate=2 bitrate=15000000 ! 'video/x-h264, stream-format=(string)byte-stream' ! h264parse ! rtph264pay mtu=1400 ! udpsink host=$CLIENT_IP port=5000

I changed the caps from raw RGB to Bayer rggb like this:

gst-launch-1.0 videotestsrc ! video/x-bayer,format=rggb,width=1504,height=1504,framerate=120/1 ! bayer2rgb ! videoconvert ! omxh264enc control-rate=2 bitrate=15000000 ! 'video/x-h264, stream-format=(string)byte-stream' ! h264parse ! rtph264pay mtu=1400 ! udpsink host=$CLIENT_IP port=5000

The above pipeline runs fine but now I get the following error in my appsrc program:

Inside NvxLiteH264DecoderLowLatencyInitNvxLiteH264DecoderLowLatencyInit set DPB and MjstreamingInside NvxLiteH265DecoderLowLatencyInitNvxLiteH265DecoderLowLatencyInit set DPB and MjstreamingError: Internal data flow error.

Any suggestions on what’s going on or how I can troubleshoot this?

gst-app-src_bayer.cpp (15.6 KB)

Hi Dan,
Please use nvvidconv instead of videoconvert
gst-launch-1.0 videotestsrc num-buffers=300 ! video/x-bayer,format=rggb,width=1504,height=1504,framerate=120/1 ! bayer2rgb ! nvvidconv ! 'video/x-raw(memory:NVMM),format=I420' ! omxh264enc control-rate=2 bitrate=15000000 ! 'video/x-h264, stream-format=(string)byte-stream' ! fakesink

The input to omxh264enc is video/x-raw(memory:NVMM)

Thanks for the suggestion. I tried the provided pipeline and it worked in the terminal, but when I incorporate it into my code, I get a slightly different (but similar) error:

Inside NvxLiteH264DecoderLowLatencyInitNvxLiteH264DecoderLowLatencyInit set DPB and MjstreamingInside NvxLiteH265DecoderLowLatencyInitNvxLiteH265DecoderLowLatencyInit set DPB and MjstreamingFramerate set to : 120 at NvxVideoEncoderSetParameterNvMMLiteOpen : Block : BlockType = 4 
===== MSENC =====
NvMMLiteBlockCreate : Block : BlockType = 4 
===== MSENC blits (mode: 1) into tiled surfaces =====
Error: Internal data stream error.

I was using gst_caps_new_simple but swapped it for gst_caps_from_string to initialize the YUV_filter caps.
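
If I understand it correctly, the reason this matters is that "(memory:NVMM)" is a caps feature rather than part of the media type, so gst_caps_new_simple cannot express it. A rough sketch of the two ways I believe these caps can be built (the second form with gst_caps_set_features is an assumption on my part, not something taken from my attached file):

/* Option 1: parse the whole caps string, including the (memory:NVMM) feature */
GstCaps *nvmm_caps = gst_caps_from_string ("video/x-raw(memory:NVMM), format=(string)I420");

/* Option 2 (assumed equivalent): build plain caps, then attach the feature explicitly */
GstCaps *nvmm_caps2 = gst_caps_new_simple ("video/x-raw",
    "format", G_TYPE_STRING, "I420",
    NULL);
gst_caps_set_features (nvmm_caps2, 0, gst_caps_features_new ("memory:NVMM", NULL));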

Hi Dan,
Please refer to the source code of nvgstcapture-1.0 in
https://developer.nvidia.com/embedded/dlc/l4t-sources-24-2-1

I searched the 110 MB of source code and found a single instance of this error string, but I don’t understand it. In sources/gstegl_src/gst-egl/po/gst-plugins-bad-1.0.pot the following appears at lines 69-71:

#: ext/sndfile/gstsfsink.c:439 gst/mpegtsdemux/mpegtsbase.c:1306
#: gst/mpegtsdemux/mpegtsbase.c:1310 gst/nuvdemux/gstnuvdemux.c:737
msgid "Internal data stream error."
msgstr ""

Is this related to an MPEG decoder?
My other question is: how could my error be related to nvgstcapture-1.0 when I don’t even access the onboard CSI camera, just videotestsrc?
Thanks!

Hi Dan,
We can see what you shared in https://devtalk.nvidia.com/default/topic/1003598/

Is this still an issue for you? If yes, could you share your Makefile so that we can build and try gst-app-src_bayer.cpp?

Probably not. This error came from some code that I modified from realdealneal (it worked before I modified it). The code I posted yesterday was put together from scratch; it does not use any appsrc components, but it also does not produce this error. My expectation is that I can modify that working code to include the external camera. I have not yet figured out what caused this data stream error. Based on the source of the traceback I provided above, what is your suspicion?

Hi Dan,
By comparing gst-app-src_bayer.cpp and the source in
https://devtalk.nvidia.com/default/topic/1003598/jetson-tx1/code-to-send-a-bayer-video-feed-from-a-tx1-to-an-h-264-encoder-to-an-rtp-sink-/post/5125667/#5125667

It looks like nvvidconv replaces videoconvert, and the issue probably happens when linking videoconvert to omxh264enc. Since nvvidconv is a HW converter, it should also give better performance.

Once again, thank you for sharing a successful use case.

I am having a similar problem to Dan_Goodrick’s. My goal is to push raw RGB image buffers to GStreamer for H264/H265 encoding on the TX1 HW prior to streaming via UDP. When I compile the attached code with:
g++ -Wall gstreamer_nvidia_appsrcLoopEx.cpp -o gstNvidiaApparcLoop `pkg-config --cflags --libs gstreamer-1.0` -lgstapp-1.0

I get the same error:
Inside NvxLiteH264DecoderLowLatencyInitNvxLiteH264DecoderLowLatencyInit set DPB and MjstreamingInside NvxLiteH265DecoderLowLatencyInitNvxLiteH265DecoderLowLatencyInit set DPB and MjstreamingError: Internal data flow error

Ideally, I’d like to eliminate the main loop and push buffers to GStreamer as needed, because I am already running an OpenGL loop.

Please see code below:

// g++ -Wall gstreamer_nvidia_appsrcLoopEx.cpp -o gstNvidiaApparcLoop `pkg-config --cflags --libs gstreamer-1.0` -lgstapp-1.0
// gst-launch-1.0 udpsrc port=5000 ! application/x-rtp,encoding-name=H264, payload = 96 ! rtph264depay ! h264parse ! queue ! avdec_h264 ! xvimagesink sync=false async=false -e
#include <gst/gst.h>
#include <gst/app/gstappsrc.h>
#include <stdio.h>
#include <iostream>
#include <sstream>
#include <stdint.h>
#include <signal.h>
#include <string.h>

//#define CLIENT_IP "130.134.235.179"
#define CLIENT_IP "127.0.0.1"
#define TARGET_BITRATE 15000000

//GSTREAMER COMPONENTS
static GMainLoop *loop;
GstElement *pipeline, *appsrc, *Nvidia_vid_conv, *encoder, *YUV_filter, *Ethernet_filter, *parser, *rtp264, *sink;
GstCaps *appsrc_caps, *YUV_filter_caps, *Ethernet_filter_caps;
GstBus *bus;

GstStateChangeReturn ret;
guint bus_watch_id;

//APPSRC BUFFER FUNCTIONS
//int want = 1;

//uint16_t b_white[385*288];
//uint16_t b_black[385*288];

/*
static void prepare_buffer(GstAppSrc* appsrc)
{

  static gboolean white = FALSE;
  static GstClockTime timestamp = 0;
  GstBuffer *buffer;
  guint size;
  GstFlowReturn ret;

  //if (!want) return;
  //want = 0;

  size = 385 * 288 * 2;

  buffer = gst_buffer_new_wrapped_full( GST_MEMORY_FLAG_READONLY, (gpointer)(white?b_white:b_black), size, 0, size, NULL, NULL );

  white = !white;

  GST_BUFFER_PTS (buffer) = timestamp;
  GST_BUFFER_DURATION (buffer) = gst_util_uint64_scale_int (1, GST_SECOND, 4);

  timestamp += GST_BUFFER_DURATION (buffer);

  ret = gst_app_src_push_buffer(appsrc, buffer);

  if (ret != GST_FLOW_OK) {
    // something wrong, stop pushing
    g_main_loop_quit (loop);
  }
}
*/

/*
static void cb_need_data (GstElement *appsrc, guint unused_size, gpointer user_data) {
  prepare_buffer((GstAppSrc*)appsrc);
  //want = 1;
}
*/

static void
cb_need_data (GstElement *appsrc,
          guint       unused_size,
          gpointer    user_data)
{
  static gboolean white = FALSE;
  static GstClockTime timestamp = 0;
  GstBuffer *buffer;
  guint size;
  GstFlowReturn ret;

  size = 385 * 288 * 3;

  buffer = gst_buffer_new_allocate (NULL, size, NULL);

  /* this makes the image black/white */
  gst_buffer_memset (buffer, 0, white ? 0xff : 0x0, size);

  white = !white;

  GST_BUFFER_PTS (buffer) = timestamp;
  GST_BUFFER_DURATION (buffer) = gst_util_uint64_scale_int (1, GST_SECOND, 2);

  timestamp += GST_BUFFER_DURATION (buffer);

  g_signal_emit_by_name (appsrc, "push-buffer", buffer, &ret);
  gst_buffer_unref (buffer);

  if (ret != GST_FLOW_OK) {
    /* something wrong, stop pushing */
    g_main_loop_quit (loop);
  }
}
//EVENT HANDLERS
static void sigint_restore (void)
{
  struct sigaction action;

  memset (&action, 0, sizeof (action));
  action.sa_handler = SIG_DFL;

  sigaction (SIGINT, &action, NULL);
}

/* Signal handler for ctrl+c */
void intHandler(int dummy) {  
  //! Emit the EOS signal which tells all the elements to shut down properly:
  printf("Sending EOS signal to shutdown pipeline cleanly\n");
  gst_element_send_event(pipeline, gst_event_new_eos());
  sigint_restore();
  return;
}

static gboolean bus_call  (GstBus     *bus, GstMessage *msg, gpointer data)
{
  GMainLoop *loop = (GMainLoop *) data;

  switch (GST_MESSAGE_TYPE (msg)) {

    case GST_MESSAGE_EOS:
      g_print ("End of stream\n");
      g_main_loop_quit (loop);
    break;

    case GST_MESSAGE_ERROR: {
      gchar  *debug;
      GError *error;

      gst_message_parse_error (msg, &error, &debug);
      g_free (debug);

      g_printerr ("Error: %s\n", error->message);
      g_error_free (error);

      g_main_loop_quit (loop);
    break;
    }
    
    default:
    break;
  }

  return TRUE;
}

int watcher_make()
{
  /* we add a message handler */
  bus = gst_pipeline_get_bus (GST_PIPELINE (pipeline));
  bus_watch_id = gst_bus_add_watch (bus, bus_call, loop);
  gst_object_unref (bus); 
  return 0;
}

int main(int argc, char *argv[]) {
  signal(SIGINT, intHandler);

  /* Initialize GStreamer */
  gst_init (&argc, &argv);
  loop = g_main_loop_new (NULL, FALSE);

  //Initialize black and white buffers
  //for (int i = 0; i < 385*288; i++) { b_black[i] = 0; b_white[i] = 0xFFFF; }

  /* Create the elements */
  pipeline        = gst_pipeline_new ("pipeline");
  appsrc          = gst_element_factory_make ("appsrc", "source");
  Nvidia_vid_conv = gst_element_factory_make ("nvvidconv", "Nvidia_vid_conv");
  YUV_filter      = gst_element_factory_make ("capsfilter", "YUV_filter"); 
  encoder         = gst_element_factory_make ("omxh264enc", "encoder");
  Ethernet_filter = gst_element_factory_make ("capsfilter", "Ethernet_filter");
  parser          = gst_element_factory_make ("h264parse", "parser");
  rtp264          = gst_element_factory_make ("rtph264pay", "rtp264");
  sink            = gst_element_factory_make ("udpsink", "sink");
  
  if (!pipeline)
  {
    g_printerr ("Pipeline not created.\n");
    return -1;
  }

  if (!appsrc)
  {
    g_printerr ("Appsrc not created\n");
    return -1;
  }

  if (!YUV_filter)
  {
    g_printerr ("YUV filter not created\n");
    return -1;
  }

  if (!Ethernet_filter || !parser || !rtp264 || !sink)
  {
    g_printerr("Stream Elements not created\n");
    return -1;
  }

  if (!encoder)
  {
    g_printerr("Encoder not created\n");
    return -1;
  }

  if (!Nvidia_vid_conv)
  {
    g_printerr ("Nvvidconv not created\n");
    return -1;
  }
  
  /* Modify the caps properties */
  appsrc_caps = gst_caps_new_simple ("video/x-raw",
             "format", G_TYPE_STRING, "RGB",
             "width", G_TYPE_INT, 384,
             "height", G_TYPE_INT, 288,
             "framerate", GST_TYPE_FRACTION, 0, 1,
             NULL);

  YUV_filter_caps = gst_caps_from_string("video/x-raw(memory:NVMM),format=I420");

  Ethernet_filter_caps = gst_caps_new_simple ("video/x-h264",
            "stream-format", G_TYPE_STRING, "byte-stream",
            NULL);

  g_object_set (G_OBJECT (appsrc), "caps", appsrc_caps, NULL);
  g_object_set (G_OBJECT (YUV_filter), "caps", YUV_filter_caps, NULL);
  g_object_set (G_OBJECT (Ethernet_filter), "caps", Ethernet_filter_caps, NULL);
  g_object_set (G_OBJECT (encoder), "bitrate", TARGET_BITRATE, "control-rate", 2, NULL);
  g_object_set( G_OBJECT(rtp264), "pt", 96, "config-interval",1,NULL);
  g_object_set( G_OBJECT(sink), "host", CLIENT_IP, "port", 5000, "sync",FALSE, "async", FALSE, NULL); 

  /* setup appsrc */
  g_object_set (G_OBJECT (appsrc),
    "stream-type", 0, // GST_APP_STREAM_TYPE_STREAM
    "format", GST_FORMAT_TIME,
    NULL);
    //"is-live", TRUE,
    //NULL);

  g_signal_connect (appsrc, "need-data", G_CALLBACK (cb_need_data), NULL);

  /* Add function to watch bus */
  if(watcher_make() != 0)
  {
    g_printerr("Failed to make watcher\n");
    return -1; 
  }

  /* Build the pipeline */
  gst_bin_add_many (GST_BIN (pipeline), appsrc, Nvidia_vid_conv, YUV_filter, encoder, Ethernet_filter, parser, rtp264, sink, NULL);

  /* Link the elements together */
  gst_element_link_many (appsrc, Nvidia_vid_conv, YUV_filter, encoder, Ethernet_filter, parser, rtp264, sink, NULL);  

  /* play */
  ret = gst_element_set_state (pipeline, GST_STATE_PLAYING);
  if (ret == GST_STATE_CHANGE_FAILURE)
  {
    g_printerr ("Unable to set the pipeline to the playing state.\n");
    gst_object_unref (pipeline);
    return -1;
  }
  g_main_loop_run(loop);

  /* clean up */
  gst_caps_unref (appsrc_caps); 
  gst_caps_unref (YUV_filter_caps); 
  gst_caps_unref (Ethernet_filter_caps);
  gst_element_set_state (pipeline, GST_STATE_NULL);
  gst_object_unref (GST_OBJECT (pipeline));
  g_main_loop_unref (loop);

  return 0;
}

I believe I found the problem. The HW video converter nvvidconv only accepts a limited number of input formats (I420, UYVY, and GRAY8). Once I switched my toy example to GRAY8, everything worked. To Dan_Goodrick’s question: I don’t think nvvidconv can do the Bayer transformation you want. In my case, I was hoping to put in RGB buffers and convert them to I420 prior to encoding. Now it looks like I’ll use a kernel to convert RGB to I420 and then push directly into the encoder.
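
For reference, a rough sketch of the GRAY8 change in my toy example (assuming the same 384x288 geometry as the code above; only the appsrc caps and the pushed buffer size change):

/* appsrc caps: GRAY8 is one of the formats nvvidconv accepted for me */
appsrc_caps = gst_caps_new_simple ("video/x-raw",
           "format", G_TYPE_STRING, "GRAY8",
           "width", G_TYPE_INT, 384,
           "height", G_TYPE_INT, 288,
           "framerate", GST_TYPE_FRACTION, 0, 1,
           NULL);

/* in cb_need_data: GRAY8 is 1 byte per pixel, so the buffer size must match */
size = 384 * 288;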

My question now is regarding the formatting of the I420 buffer. Should it be formatted like the NvBuffer in the "read_video_frame" function found in tegra_multimedia_api/samples/common/classes/NVUtils.cpp?
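
For context, this is my understanding of a tightly packed I420 buffer; what I’m unsure about is whether the NvBuffer path expects pitch-aligned planes instead:

/* Tightly packed I420 (my understanding; NvBuffer may use pitch-aligned planes):
 *   Y plane : width x height bytes
 *   U plane : (width/2) x (height/2) bytes, follows Y
 *   V plane : (width/2) x (height/2) bytes, follows U
 */
guint width     = 384;
guint height    = 288;
guint y_size    = width * height;
guint uv_size   = (width / 2) * (height / 2);
guint i420_size = y_size + 2 * uv_size;   /* = width * height * 3 / 2 */
guint u_offset  = y_size;
guint v_offset  = y_size + uv_size;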

Hi jdl2,
Please use nvvidconv to get 'video/x-raw(memory:NVMM),format=NV12' input to omxh264enc:
gst-launch-1.0 videotestsrc num-buffers=300 ! 'video/x-raw,format=RGB,width=1280,height=720' ! videoconvert ! 'video/x-raw,format=I420' ! nvvidconv ! 'video/x-raw(memory:NVMM),format=NV12' ! omxh264enc ! qtmux ! filesink location=a.mp4

And just like you said, nvvidconv supports RGBA and BGRx, but does not support RGB.
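
In an appsrc program, the equivalent chain would look roughly like this (a sketch only; the element variable names are illustrative):

/* Sketch: appsrc (RGB) -> videoconvert -> I420 -> nvvidconv -> NVMM NV12 -> omxh264enc */
GstElement *sw_conv     = gst_element_factory_make ("videoconvert", "sw_conv");
GstElement *i420_filter = gst_element_factory_make ("capsfilter",   "i420_filter");
GstElement *nvmm_filter = gst_element_factory_make ("capsfilter",   "nvmm_filter");

GstCaps *i420_caps = gst_caps_from_string ("video/x-raw, format=(string)I420");
GstCaps *nvmm_caps = gst_caps_from_string ("video/x-raw(memory:NVMM), format=(string)NV12");
g_object_set (G_OBJECT (i420_filter), "caps", i420_caps, NULL);
g_object_set (G_OBJECT (nvmm_filter), "caps", nvmm_caps, NULL);
gst_caps_unref (i420_caps);
gst_caps_unref (nvmm_caps);

/* Link order: appsrc -> sw_conv -> i420_filter -> nvvidconv -> nvmm_filter -> encoder -> ... */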

Hi Dan,
You should use gst_caps_from_string("…") instead.

Regards,
Trung.

Hi Trung, thanks for sharing.