deepstream-app version 5.1.0
DeepStreamSDK 5.1.0
CUDA Driver Version: 11.4
CUDA Runtime Version: 11.1
TensorRT Version: 7.2
cuDNN Version: 8.0
libNVWarp360 Version: 2.0.1d3
device : jetson nx
jetpack : 4.5
Hi,everyone
I have a RTSP output application ,RTSP can preview through VLC , but WebRTC can’t preview.The output video is paused for a long time, and the video content will refresh occasionally,same RTSP output application code can be work in X86
when i am using “deepstream-app” application output RTSP stream , can preview through WebRTC,The video content is very smooth.
Do you have any ideas?
Looking forward to your reply ,thank
- RTSP output application
#include "gstnvdsmeta.h"
#include <cuda_runtime_api.h>
#include <glib.h>
#include <gst/gst.h>
#include <gst/rtsp-server/rtsp-server.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>
#include <sys/stat.h>
#include <sys/types.h>
#define GST_CAPS_FEATURES_NVMM "memory:NVMM"
#define GPU_ID 0
#define CONFIG_GPU_ID "gpu-id"
#define CONFIG_GROUP_TRACKER "tracker"
#define CONFIG_GROUP_TRACKER_WIDTH "tracker-width"
#define CONFIG_GROUP_TRACKER_HEIGHT "tracker-height"
#define TRACKER_CONFIG_FILE "dsnvanalytics_tracker_config.txt"
#define CONFIG_GROUP_TRACKER_LL_CONFIG_FILE "ll-config-file"
#define CONFIG_GROUP_TRACKER_LL_LIB_FILE "ll-lib-file"
#define CONFIG_GROUP_TRACKER_ENABLE_BATCH_PROCESS "enable-batch-process"
#define FILE_NAME_SIZE (64)
static GstElement *pipeline=NULL , *streammux = NULL, *tee = NULL, *tee_file = NULL,
*nvvidconv = NULL, *nvosd = NULL, *nvosd_post = NULL,
*filter = NULL, *encoder = NULL, *rtppay = NULL, *sink = NULL,
*nvvidconv_file = NULL, *h264enc_file = NULL, *parse = NULL,
*qtmux = NULL, *filesink = NULL, *queue1 = NULL,
*queue2 = NULL, *queue3 = NULL, *queue4 = NULL,
*queue5 = NULL, *queue6 = NULL, *pgie = NULL, *tracker = NULL,
*nvdsanalytics = NULL, *h264enc_0 = NULL, *qtmux_0 = NULL,
*filesink_0 = NULL, *parse_0 = NULL, *h264enc_1 = NULL,
*qtmux_1 = NULL, *filesink_1 = NULL, *parse_1 = NULL;
#define CHECK_ERROR(error) \
if (error) { \
g_printerr("Error while parsing config file: %s\n", error->message); \
goto done; \
}
static gchar *get_absolute_file_path(gchar *cfg_file_path, gchar *file_path) {
gchar abs_cfg_path[PATH_MAX + 1];
gchar *abs_file_path;
gchar *delim;
if (file_path && file_path[0] == '/') {
return file_path;
}
if (!realpath(cfg_file_path, abs_cfg_path)) {
g_free(file_path);
return NULL;
}
/* Return absolute path of config file if file_path is NULL. */
if (!file_path) {
abs_file_path = g_strdup(abs_cfg_path);
return abs_file_path;
}
delim = g_strrstr(abs_cfg_path, "/");
*(delim + 1) = '\0';
abs_file_path = g_strconcat(abs_cfg_path, file_path, NULL);
g_free(file_path);
return abs_file_path;
}
static void cb_newpad(GstElement *decodebin, GstPad *decoder_src_pad,
gpointer data) {
g_print("In cb_newpad\n");
GstCaps *caps = gst_pad_get_current_caps(decoder_src_pad);
const GstStructure *str = gst_caps_get_structure(caps, 0);
const gchar *name = gst_structure_get_name(str);
GstElement *source_bin = (GstElement *)data;
GstCapsFeatures *features = gst_caps_get_features(caps, 0);
/* Need to check if the pad created by the decodebin is for video and not
* audio. */
if (!strncmp(name, "video", 5)) {
/* Link the decodebin pad only if decodebin has picked nvidia
* decoder plugin nvdec_*. We do this by checking if the pad caps contain
* NVMM memory features. */
if (gst_caps_features_contains(features, GST_CAPS_FEATURES_NVMM)) {
/* Get the source bin ghost pad */
GstPad *bin_ghost_pad = gst_element_get_static_pad(source_bin, "src");
if (!gst_ghost_pad_set_target(GST_GHOST_PAD(bin_ghost_pad),
decoder_src_pad)) {
g_printerr("Failed to link decoder src pad to source bin ghost pad\n");
}
gst_object_unref(bin_ghost_pad);
} else {
g_printerr("Error: Decodebin did not pick nvidia decoder plugin.\n");
}
}
}
static void decodebin_child_added(GstChildProxy *child_proxy, GObject *object,
gchar *name, gpointer user_data) {
g_print("Decodebin child added: %s\n", name);
if (g_strrstr(name, "decodebin") == name) {
g_signal_connect(G_OBJECT(object), "child-added",
G_CALLBACK(decodebin_child_added), user_data);
}
}
static GstElement *create_source_bin(guint index, gchar *uri) {
GstElement *bin = NULL, *uri_decode_bin = NULL;
gchar bin_name[16] = {};
g_snprintf(bin_name, 15, "source-bin-%02d", index);
/* Create a source GstBin to abstract this bin's content from the rest of the
* pipeline */
bin = gst_bin_new(bin_name);
/* Source element for reading from the uri.
* We will use decodebin and let it figure out the container format of the
* stream and the codec and plug the appropriate demux and decode plugins. */
uri_decode_bin = gst_element_factory_make("uridecodebin", "uri-decode-bin");
if (!bin || !uri_decode_bin) {
g_printerr("One element in source bin could not be created.\n");
return NULL;
}
/* We set the input uri to the source element */
g_object_set(G_OBJECT(uri_decode_bin), "uri", uri, NULL);
/* Connect to the "pad-added" signal of the decodebin which generates a
* callback once a new pad for raw data has beed created by the decodebin */
g_signal_connect(G_OBJECT(uri_decode_bin), "pad-added", G_CALLBACK(cb_newpad),
bin);
g_signal_connect(G_OBJECT(uri_decode_bin), "child-added",
G_CALLBACK(decodebin_child_added), bin);
gst_bin_add(GST_BIN(bin), uri_decode_bin);
/* We need to create a ghost pad for the source bin which will act as a proxy
* for the video decoder src pad. The ghost pad will not have a target right
* now. Once the decode bin creates the video decoder and generates the
* cb_newpad callback, we will set the ghost pad target to the video decoder
* src pad. */
if (!gst_element_add_pad(bin,
gst_ghost_pad_new_no_target("src", GST_PAD_SRC))) {
g_printerr("Failed to add ghost pad in source bin\n");
return NULL;
}
return bin;
}
static gboolean bus_call(GstBus *bus, GstMessage *msg, gpointer data) {
GMainLoop *loop = (GMainLoop *)data;
g_print("type:%d\n", GST_MESSAGE_TYPE(msg));
g_print("src:%s\n", GST_OBJECT_NAME(msg->src));
switch (GST_MESSAGE_TYPE(msg)) {
case GST_MESSAGE_EOS:
g_print("End of stream\n");
g_main_loop_quit(loop);
break;
case GST_MESSAGE_WARNING: {
gchar *debug;
GError *error;
gst_message_parse_warning(msg, &error, &debug);
g_printerr("WARNING from element %s: %s\n", GST_OBJECT_NAME(msg->src),
error->message);
g_free(debug);
g_printerr("Warning: %s\n", error->message);
g_error_free(error);
break;
}
case GST_MESSAGE_ERROR: {
gchar *debug;
GError *error;
gst_message_parse_error(msg, &error, &debug);
g_printerr("ERROR from element %s: %s\n", GST_OBJECT_NAME(msg->src),
error->message);
if (debug)
g_printerr("Error details: %s\n", debug);
g_free(debug);
g_error_free(error);
g_main_loop_quit(loop);
break;
}
case GST_MESSAGE_QOS: {
g_print("\nQos\n");
g_print("type:%d\n", GST_MESSAGE_TYPE(msg));
g_print("src:%s\n", GST_OBJECT_NAME(msg->src));
break;
}
#ifndef PLATFORM_TEGRA
case GST_MESSAGE_ELEMENT: {
if (gst_nvmessage_is_stream_eos(msg)) {
guint stream_id;
if (gst_nvmessage_parse_stream_eos(msg, &stream_id)) {
g_print("Got EOS from stream %d\n", stream_id);
}
}
break;
}
#endif
default:
break;
}
return TRUE;
}
static gboolean set_tracker_properties(GstElement *nvtracker) {
gboolean ret = FALSE;
GError *error = NULL;
gchar **keys = NULL;
gchar **key = NULL;
GKeyFile *key_file = g_key_file_new();
if (!g_key_file_load_from_file(key_file, TRACKER_CONFIG_FILE, G_KEY_FILE_NONE,
&error)) {
g_printerr("Failed to load config file: %s\n", error->message);
return FALSE;
}
keys = g_key_file_get_keys(key_file, CONFIG_GROUP_TRACKER, NULL, &error);
CHECK_ERROR(error);
for (key = keys; *key; key++) {
if (!g_strcmp0(*key, CONFIG_GROUP_TRACKER_WIDTH)) {
gint width = g_key_file_get_integer(key_file, CONFIG_GROUP_TRACKER,
CONFIG_GROUP_TRACKER_WIDTH, &error);
CHECK_ERROR(error);
g_object_set(G_OBJECT(nvtracker), "tracker-width", width, NULL);
} else if (!g_strcmp0(*key, CONFIG_GROUP_TRACKER_HEIGHT)) {
gint height = g_key_file_get_integer(key_file, CONFIG_GROUP_TRACKER,
CONFIG_GROUP_TRACKER_HEIGHT, &error);
CHECK_ERROR(error);
g_object_set(G_OBJECT(nvtracker), "tracker-height", height, NULL);
} else if (!g_strcmp0(*key, CONFIG_GPU_ID)) {
guint gpu_id = g_key_file_get_integer(key_file, CONFIG_GROUP_TRACKER,
CONFIG_GPU_ID, &error);
CHECK_ERROR(error);
g_object_set(G_OBJECT(nvtracker), "gpu_id", gpu_id, NULL);
} else if (!g_strcmp0(*key, CONFIG_GROUP_TRACKER_LL_CONFIG_FILE)) {
char *ll_config_file = get_absolute_file_path(
TRACKER_CONFIG_FILE,
g_key_file_get_string(key_file, CONFIG_GROUP_TRACKER,
CONFIG_GROUP_TRACKER_LL_CONFIG_FILE, &error));
CHECK_ERROR(error);
g_object_set(G_OBJECT(nvtracker), "ll-config-file", ll_config_file, NULL);
} else if (!g_strcmp0(*key, CONFIG_GROUP_TRACKER_LL_LIB_FILE)) {
char *ll_lib_file = get_absolute_file_path(
TRACKER_CONFIG_FILE,
g_key_file_get_string(key_file, CONFIG_GROUP_TRACKER,
CONFIG_GROUP_TRACKER_LL_LIB_FILE, &error));
CHECK_ERROR(error);
g_object_set(G_OBJECT(nvtracker), "ll-lib-file", ll_lib_file, NULL);
} else if (!g_strcmp0(*key, CONFIG_GROUP_TRACKER_ENABLE_BATCH_PROCESS)) {
gboolean enable_batch_process = g_key_file_get_integer(
key_file, CONFIG_GROUP_TRACKER,
CONFIG_GROUP_TRACKER_ENABLE_BATCH_PROCESS, &error);
CHECK_ERROR(error);
g_object_set(G_OBJECT(nvtracker), "enable_batch_process",
enable_batch_process, NULL);
} else {
g_printerr("Unknown key '%s' for group [%s]", *key, CONFIG_GROUP_TRACKER);
}
}
ret = TRUE;
done:
if (error) {
g_error_free(error);
}
if (keys) {
g_strfreev(keys);
}
if (!ret) {
g_printerr("%s failed", __func__);
}
return ret;
}
int main(int argc, char *argv[]) {
GMainLoop *loop = NULL;
GstPad *sinkpad, *srcpad, *nvosd_src_pad;
GstBus *bus = NULL;
// check device
int current_device = -1;
cudaGetDevice(¤t_device);
struct cudaDeviceProp prop;
cudaGetDeviceProperties(&prop, current_device);
// gst_init
gst_init(&argc, &argv);
loop = g_main_loop_new(NULL, FALSE);
pipeline = gst_pipeline_new("NN_passenger_flow");
// make some element
streammux = gst_element_factory_make("nvstreammux", "stream");
GstElement *source_bin = create_source_bin(
0, "rtsp://wowzaec2demo.streamlock.net/vod/mp4:BigBuckBunny_115k.mov");
pgie = gst_element_factory_make("nvinfer", "pgie-nvinfer");
tracker = gst_element_factory_make("nvtracker", "tracker");
nvdsanalytics = gst_element_factory_make("nvdsanalytics", "analytics");
nvvidconv = gst_element_factory_make("nvvideoconvert", "nvvideo-converter");
nvosd = gst_element_factory_make("nvdsosd", "nv-onscreendisplay");
tee = gst_element_factory_make("tee", "tee");
// to rtsp
nvosd_post = gst_element_factory_make("nvvideoconvert", "convertor_postosd");
encoder = gst_element_factory_make("nvv4l2h264enc", "encoder");
filter = gst_element_factory_make("capsfilter", "filter");
rtppay = gst_element_factory_make("rtph264pay", "rtppay");
sink = gst_element_factory_make("udpsink", "udpsink");
// queue
queue1 = gst_element_factory_make("queue", "queue1");
queue2 = gst_element_factory_make("queue", "queue2");
queue3 = gst_element_factory_make("queue", "queue3");
g_object_set(G_OBJECT(streammux), "width", 1920, "height", 1080, "batch-size",
1, "live-source", 1, "batched-push-timeout", 40000, NULL);
g_object_set(G_OBJECT(pgie), "config-file-path",
"nvdsanalytics_pgie_config.txt", NULL);
if (!set_tracker_properties(tracker)) {
g_printerr("Failed to set tracker properties. Exiting.\n");
return -1;
}
g_object_set(G_OBJECT(nvdsanalytics), "config-file",
"config_nvdsanalytics.txt", NULL);
g_object_set(G_OBJECT(encoder), "bitrate", 4000000,"qp-range","0,51:0,51:0,51","control-rate",1 ,"iframeinterval",30,"preset-level",0,"ratecontrol-enable",true,"quant-p-frames",85, NULL);
#ifdef PLATFORM_TEGRA
g_object_set(G_OBJECT(encoder),"preset-level",1,"insert-sps-pps",1,"bufapi-version",1,NULL);
#endif
g_object_set(G_OBJECT(sink), "host", "127.0.0.1", "port", 18000,
"async", false, "sync", 1, NULL);
GstCaps *caps = NULL;
caps = gst_caps_from_string("video/x-raw(memory:NVMM),format=I420");
g_object_set(G_OBJECT(filter), "caps", caps, NULL);
gst_caps_unref(caps);
// add elements to pipeline and link
gst_bin_add_many(GST_BIN(pipeline), source_bin, streammux, pgie,
queue1, tracker, nvdsanalytics, nvvidconv, queue2, nvosd,
tee, queue3, nvosd_post, filter, encoder, rtppay, sink,
NULL);
gst_element_link_many(streammux, pgie, queue1, tracker, nvdsanalytics,
nvvidconv, queue2, nvosd, queue3, nvosd_post, filter, encoder, rtppay, sink, NULL);
// link source to streammux
sinkpad = gst_element_get_request_pad(streammux, "sink_0");
srcpad = gst_element_get_static_pad(source_bin, "src");
if (!srcpad) {
g_printerr("Failed to get src pad of source bin. Exiting.\n");
return -1;
}
if (gst_pad_link(srcpad, sinkpad) != GST_PAD_LINK_OK) {
g_printerr("Failed to link source bin to stream muxer. Exiting.\n");
return -1;
}
gst_object_unref(srcpad);
gst_object_unref(sinkpad);
// rtsp server
GstRTSPServer *server;
GstRTSPMountPoints *mounts;
GstRTSPMediaFactory *factory;
server = gst_rtsp_server_new();
mounts = gst_rtsp_server_get_mount_points(server);
factory = gst_rtsp_media_factory_new();
g_object_set(server, "service", "8555", NULL);
gst_rtsp_media_factory_set_launch(
factory, "( udpsrc name=pay0 port=18000 buffer-size=524288 "
"caps=\"application/x-rtp, media=video, clock-rate=90000, "
"encoding-name=(string)H264, payload=96 \" )");
gst_rtsp_media_factory_set_shared(factory, TRUE);
gst_rtsp_mount_points_add_factory(mounts, "/ds-test", factory);
g_object_unref(mounts);
gst_rtsp_server_attach(server, NULL);
gst_element_set_state(pipeline, GST_STATE_PLAYING);
g_print("Running...\n");
g_main_loop_run(loop);
/* Out of the main loop, clean up nicely */
g_print("Returned, stopping playback\n");
gst_element_set_state(pipeline, GST_STATE_NULL);
g_print("Deleting pipeline\n");
gst_object_unref(GST_OBJECT(pipeline));
g_main_loop_unref(loop);
return 0;
}
- WebRTC
WebRTC application
import random
import ssl
import websockets
import asyncio
import os
import sys
import json
import argparse
import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst
gi.require_version('GstWebRTC', '1.0')
from gi.repository import GstWebRTC
gi.require_version('GstSdp', '1.0')
from gi.repository import GstSdp
PIPELINE_DESC = '''
webrtcbin name=sendrecv stun-server=stun://stun.l.google.com:19302
rtspsrc location=rtsp://192.168.100.46:8555/ds-test latency=0 name=demuxer
parsebin name=parse
demuxer. ! rtph264depay ! h264parse ! capsfilter name=caps ! rtph264pay ! application/x-rtp,media=video,encoding-name=H264,payload=96 ! sendrecv.
'''
def decodebin_child_added(child_proxy,Object,name,user_data):
print("Decodebin child added:", name, "\n")
if ("capsfilter" in name):
print("caps:",Object.get_property("caps").to_string());
if(name.find("decodebin") != -1):
Object.connect("child-added",decodebin_child_added,user_data)
if(name.find("nvv4l2decoder") != -1):
if (is_aarch64()):
Object.set_property("enable-max-performance", True)
Object.set_property("drop-frame-interval", 0)
Object.set_property("num-extra-surfaces", 0)
else:
Object.set_property("gpu_id", GPU_ID)
# ! interpipesink name=a interpipesrc listen-to=a
class WebRTCClient:
def __init__(self, id_, peer_id, server):
self.id_ = id_
self.conn = None
self.pipe = None
self.webrtc = None
self.peer_id = peer_id
self.parse = None
self.server = server or 'wss://webrtc.nirbheek.in:8443'
#self.server = server or 'wss://192.168.100.26:8443'
async def connect(self):
sslctx = ssl.create_default_context(purpose=ssl.Purpose.CLIENT_AUTH)
self.conn = await websockets.connect(self.server, ssl=sslctx)
await self.conn.send('HELLO %d' % our_id)
async def setup_call(self):
await self.conn.send('SESSION {}'.format(self.peer_id))
def send_sdp_offer(self, offer):
text = offer.sdp.as_text()
#print ('Sending offer:\n%s' % text)
msg = json.dumps({'sdp': {'type': 'offer', 'sdp': text}})
loop = asyncio.new_event_loop()
loop.run_until_complete(self.conn.send(msg))
def on_offer_created(self, promise, _, __):
promise.wait()
reply = promise.get_reply()
print(type(reply))
offer = reply.get_value('offer')
print(type(offer))
promise = Gst.Promise.new()
self.webrtc.emit('set-local-description', offer, promise)
promise.interrupt()
print(type(offer))
print(offer.sdp.as_text())
self.send_sdp_offer(offer)
def on_negotiation_needed(self, element):
promise = Gst.Promise.new_with_change_func(self.on_offer_created, element, None)
element.emit('create-offer', None, promise)
def send_ice_candidate_message(self, _, mlineindex, candidate):
icemsg = json.dumps({'ice': {'candidate': candidate, 'sdpMLineIndex': mlineindex}})
#print('Send ice:\n%s'%(icemsg))
loop = asyncio.new_event_loop()
loop.run_until_complete(self.conn.send(icemsg))
def on_incoming_stream(self, _, pad):
print("into \n"*10);
if pad.direction != Gst.PadDirection.SRC:
return
caps=pad.get_current_caps()
structure = caps.get_structure(0)
print(structure.get_name())
fakesink = Gst.ElementFactory.make('fakesink')
self.pipe.add(fakesink)
fakesink.sync_state_with_parent()
self.webrtc.link(fakesink)
def decodebin_child_added(self,child_proxy,Object,name,user_data):
print("="*100)
print("Decodebin child added:", name, "\n")
if(name.find("decodebin") != -1):
Object.connect("child-added",decodebin_child_added,user_data)
if(name.find("source") != -1):
# Object.connect("child-added",decodebin_child_added,user_data)
Object.set_property("latency",100)
def start_pipeline(self):
self.pipe = Gst.parse_launch(PIPELINE_DESC)
self.webrtc = self.pipe.get_by_name('sendrecv')
print("webrtc send",self.webrtc)
#parsebin =self.pipe.get_by_name("parsebin");
#parsebin.connect("child-added",decodebin_child_added,10);
caps = self.pipe.get_by_name("caps");
caps.set_property("caps", Gst.Caps.from_string(" video/x-h264, stream-format=(string)byte-stream, alignment=(string)au, parsed=(boolean)true; video/x-h264, alignment=(string)au, stream-format=(string){ avc, byte-stream }, parsed=(boolean)true; video/x-h264, parsed=(boolean)true, stream-format=(string){ avc, avc3, byte-stream }, alignment=(string){ au, nal }"))
self.webrtc.connect('on-negotiation-needed', self.on_negotiation_needed)
self.webrtc.connect('on-ice-candidate', self.send_ice_candidate_message)
self.webrtc.connect('pad-added', self.on_incoming_stream)
self.pipe.set_state(Gst.State.PLAYING)
async def handle_sdp(self, message):
assert (self.webrtc)
msg = json.loads(message)
print("offer:",msg)
if 'sdp' in msg:
sdp = msg['sdp']
assert(sdp['type'] == 'answer')
sdp = sdp['sdp']
#print ('Received answer:\n%s' % sdp)
res, sdpmsg = GstSdp.SDPMessage.new()
GstSdp.sdp_message_parse_buffer(bytes(sdp.encode()), sdpmsg)
answer = GstWebRTC.WebRTCSessionDescription.new(GstWebRTC.WebRTCSDPType.ANSWER, sdpmsg)
promise = Gst.Promise.new()
self.webrtc.emit('set-remote-description', answer, promise)
promise.interrupt()
elif 'ice' in msg:
#print("Received sdp:\n%s"%(msg['ice']))
ice = msg['ice']
candidate = ice['candidate']
sdpmlineindex = ice['sdpMLineIndex']
self.webrtc.emit('add-ice-candidate', sdpmlineindex, candidate)
async def loop(self):
assert self.conn
async for message in self.conn:
if message == 'HELLO':
await self.setup_call()
elif message == 'SESSION_OK':
self.start_pipeline()
elif message.startswith('ERROR'):
return 1
else:
await self.handle_sdp(message)
return 0
def check_plugins():
needed = ["opus", "vpx", "nice", "webrtc", "dtls", "srtp", "rtp",
"rtpmanager", "videotestsrc"]
missing = list(filter(lambda p: Gst.Registry.get().find_plugin(p) is None, needed))
if len(missing):
print('Missing gstreamer plugins:', missing)
return False
return True
if __name__=='__main__':
Gst.init(None)
if not check_plugins():
sys.exit(1)
parser = argparse.ArgumentParser()
parser.add_argument('peerid', help='String ID of the peer to connect to')
parser.add_argument('--server', help='Signalling server to connect to, eg "wss://127.0.0.1:8443"')
args = parser.parse_args()
our_id = random.randrange(10, 10000)
c = WebRTCClient(our_id, args.peerid, args.server)
asyncio.get_event_loop().run_until_complete(c.connect())
res = asyncio.get_event_loop().run_until_complete(c.loop())
sys.exit(res)