Admittedly, it took me some time to learn libgstreamer and get it running.
/*
* gstEncoder
*/
#include "gstEncoder.h"
#include "sysTime.h"
#include "sysXML.h"
#include <gst/gst.h>
#include <gst/app/gstappsrc.h>
// gst_message_print
static gboolean gst_message_print(GstBus* bus, GstMessage* message, gpointer user_data)
{
printf(LOG_HYDRA "gstreamer pipeline msg %s\n", gst_message_type_get_name(GST_MESSAGE_TYPE(message)));
switch (GST_MESSAGE_TYPE (message))
{
case GST_MESSAGE_ERROR:
{
GError *err = NULL;
gchar *dbg_info = NULL;
gst_message_parse_error (message, &err, &dbg_info);
printf(LOG_HYDRA "gstreamer ERROR from element %s: %s\n", GST_OBJECT_NAME (message->src), err->message);
printf(LOG_HYDRA "gstreamer Debugging info: %s\n", (dbg_info) ? dbg_info : "none");
g_error_free(err);
g_free(dbg_info);
//g_main_loop_quit (app->loop);
break;
}
case GST_MESSAGE_EOS:
{
printf(LOG_HYDRA "gstreamer recieved EOS signal...\n");
//g_main_loop_quit (app->loop); // TODO trigger plugin Close() upon error
break;
}
default:
break;
}
return TRUE;
}
// constructor
gstEncoder::gstEncoder()
{
mAppSrc = NULL;
mBus = NULL;
mBufferCaps = NULL;
mPipeline = NULL;
mNeedData = false;
AutoThread(); // TODO see if encoder can run in primary rendering thread, saving CPU
}
// destructor
gstEncoder::~gstEncoder()
{
}
// onNeed
void gstEncoder::onNeed(GstElement * pipeline, guint size, gpointer user_data)
{
//printf(LOG_HYDRA "gstreamer appsrc requesting data (%u bytes)\n", size);
if( !user_data )
return;
gstEncoder* enc = (gstEncoder*)user_data;
enc->mNeedData = true;
}
// onEnough
void gstEncoder::onEnough(GstElement * pipeline, gpointer user_data)
{
printf(LOG_HYDRA "gstreamer appsrc signalling enough data\n");
if( !user_data )
return;
gstEncoder* enc = (gstEncoder*)user_data;
enc->mNeedData = false;
}
// ProcessBuffer
bool gstEncoder::ProcessBuffer( Buffer* buffer )
{
if( !buffer )
return false;
if( !mNeedData )
{
buffer->Release();
return true;
}
// convert hydra buffer to GstBuffer
GstBuffer* gstBuffer = gst_buffer_new();
const size_t size = buffer->GetSize();
GST_BUFFER_MALLOCDATA(gstBuffer) = (guint8*)g_malloc(size);
GST_BUFFER_DATA(gstBuffer) = GST_BUFFER_MALLOCDATA(gstBuffer);
GST_BUFFER_SIZE(gstBuffer) = size;
//static size_t num_frame = 0;
//GST_BUFFER_TIMESTAMP(gstBuffer) = (GstClockTime)((num_frame / 30.0) * 1e9);
//num_frame++;
if( mBufferCaps != NULL )
gst_buffer_set_caps(gstBuffer, mBufferCaps);
memcpy(GST_BUFFER_DATA(gstBuffer), buffer->GetCPU(), size);
buffer->Release();
// queue buffer to gstreamer
//GstFlowReturn ret = gst_app_src_push_buffer(GST_APP_SRC(mAppSrc), gstBuffer);
GstFlowReturn ret;
g_signal_emit_by_name(mAppSrc, "push-buffer", gstBuffer, &ret);
gst_buffer_unref(gstBuffer);
if( ret != 0 )
printf(LOG_HYDRA "gstreamer -- AppSrc pushed buffer (result %u)\n", ret);
return true;
}
// ProcessEmit
void gstEncoder::ProcessEmit()
{
while(true)
{
GstMessage* msg = gst_bus_pop(mBus);
if( !msg )
break;
gst_message_print(mBus, msg, this);
gst_message_unref(msg);
}
}
//#define CAPS_STR "video/x-raw-rgb,width=640,height=480,bpp=24,depth=24"
//#define CAPS_STR "video/x-raw-yuv,width=640,height=480,format=(fourcc)I420"
#define CAPS_STR "video/x-raw-yuv,width=1280,height=1024,format=(fourcc)I420,framerate=30/1"
//#define CAPS_STR "video/x-raw-gray,width=640,height=480,bpp=8,depth=8,framerate=30/1"
#define GST_LAUNCH_FROM_STRING
// Open
bool gstEncoder::Open()
{
printf(LOG_HYDRA "gstEncoder::Open()\n");
// parse pipeline
const char* launchStr = "appsrc name=mysource ! " CAPS_STR " ! "
//"nvvidconv ! nv_omx_h264enc quality-level=2 ! "
"nv_omx_h264enc ! "
"video/x-h264 ! matroskamux ! queue ! "
"filesink location=/media/ubuntu/SDU11/test.mkv";
GError* err = NULL;
mPipeline = gst_parse_launch(launchStr, &err);
if( err != NULL )
{
printf(LOG_HYDRA "gstreamer failed to create pipeline\n");
printf(LOG_HYDRA " (%s)\n", err->message);
g_error_free(err);
return false;
}
GstPipeline* pipeline = GST_PIPELINE(mPipeline);
if( !pipeline )
{
printf(LOG_HYDRA "gstreamer failed to cast GstElement into GstPipeline\n");
return false;
}
// retrieve pipeline bus
/*GstBus**/ mBus = gst_pipeline_get_bus(pipeline);
if( !mBus )
{
printf(LOG_HYDRA "gstreamer failed to retrieve GstBus from pipeline\n");
return false;
}
// add watch for messages (disabled when we poll the bus ourselves, instead of gmainloop)
//gst_bus_add_watch(mBus, (GstBusFunc)gst_message_print, NULL);
// get the appsrc
GstElement* appsrcElement = gst_bin_get_by_name(GST_BIN(pipeline), "mysource");
GstAppSrc* appsrc = GST_APP_SRC(appsrcElement);
if( !appsrcElement || !appsrc )
{
printf(LOG_HYDRA "gstreamer failed to retrieve AppSrc element from pipeline\n");
return false;
}
mAppSrc = appsrcElement;
g_signal_connect(appsrcElement, "need-data", G_CALLBACK(onNeed), this);
g_signal_connect(appsrcElement, "enough-data", G_CALLBACK(onEnough), this);
/*GstCaps* caps = gst_caps_new_simple("video/x-raw-rgb",
"bpp",G_TYPE_INT,24,
"depth",G_TYPE_INT,24,
"width", G_TYPE_INT, 640,
"height", G_TYPE_INT, 480,
NULL);*/
mBufferCaps = gst_caps_from_string(CAPS_STR);
if( !mBufferCaps )
{
printf(LOG_HYDRA "gstreamer failed to parse caps from string\n");
return false;
}
gst_app_src_set_caps(appsrc, mBufferCaps);
//gst_app_src_set_size(appsrc, 640*480*10);
//gst_app_src_set_max_bytes(appsrc, 640*480*20);
gst_app_src_set_stream_type(appsrc, GST_APP_STREAM_TYPE_STREAM);
//gst_app_src_set_latency(appsrc, 1, 20);
//g_object_set(G_OBJECT(m_pAppSrc), "caps", m_pCaps, NULL);
//g_object_set(G_OBJECT(mAppSrc), "is-live", TRUE, NULL);
//g_object_set(G_OBJECT(mAppSrc), "block", FALSE, NULL);
g_object_set(G_OBJECT(mAppSrc), "do-timestamp", TRUE, NULL);
/*typedef enum {
GST_STATE_CHANGE_FAILURE = 0,
GST_STATE_CHANGE_SUCCESS = 1,
GST_STATE_CHANGE_ASYNC = 2,
GST_STATE_CHANGE_NO_PREROLL = 3
} GstStateChangeReturn;*/
printf(LOG_HYDRA "gstreamer transitioning pipeline to GST_STATE_PLAYING\n");
const GstStateChangeReturn result = gst_element_set_state(mPipeline, GST_STATE_PLAYING);
if( result == GST_STATE_CHANGE_ASYNC )
{
#if 0
GstMessage* asyncMsg = gst_bus_timed_pop_filtered(mBus, 5 * GST_SECOND,
(GstMessageType)(GST_MESSAGE_ASYNC_DONE|GST_MESSAGE_ERROR));
if( asyncMsg != NULL )
{
gst_message_print(mBus, asyncMsg, this);
gst_message_unref(asyncMsg);
}
else
printf(LOG_HYDRA "gstreamer NULL message after transitioning pipeline to PLAYING...\n");
#endif
}
else if( result != GST_STATE_CHANGE_SUCCESS )
{
printf(LOG_HYDRA "gstreamer failed to set pipeline state to PLAYING (error %u)\n", result);
return false;
}
return Node::Open();
}
// Close
bool gstEncoder::Close()
{
// send EOS
mNeedData = false;
printf(LOG_HYDRA "gstreamer sending encoder EOS\n");
GstFlowReturn eos_result = gst_app_src_end_of_stream(GST_APP_SRC(mAppSrc));
if( eos_result != 0 )
printf(LOG_HYDRA "gstreamer failed sending appsrc EOS (result %u)\n", eos_result);
sysSleepMs(250);
// stop pipeline
printf(LOG_HYDRA "gstreamer transitioning pipeline to GST_STATE_NULL\n");
const GstStateChangeReturn result = gst_element_set_state(mPipeline, GST_STATE_NULL);
if( result != GST_STATE_CHANGE_SUCCESS )
printf(LOG_HYDRA "gstreamer failed to set pipeline state to PLAYING (error %u)\n", result);
sysSleepMs(250);
// stop node and polling thread
if( !Node::Close() )
return false;
return true;
}
Originally I tried using nvvidconv for the YUV-I420 conversion required by nv_omx_h264enc, but it kept breaking my pipeline with an “internal flow error”, so I used NPP to do the colorspace conversion instead. There was also the issue that a gstreamer pipeline prefers to run inside a GMainLoop, but this code had to be integrated into an application with its own existing main loop, so the pipeline’s bus needed to be popped routinely instead.