@yuweiw I am attaching the main function.
I separate the streams using nvstreamdemux.
My code is based on deepstream-imagedata-multistream-redaction.py.
def main(uri_inputs, codec, bitrate):
    """Build and run the multi-stream inference pipeline with RTSP output.

    Pipeline layout::

        source bins -> nvstreammux -> queue -> pgie -> tracker
            -> nvvideoconvert -> capsfilter(RGBA) -> sgie1 -> sgie2 -> tee
        tee -> queue1 -> nvmsgconv -> nvmsgbroker
        tee -> queue2 -> nvstreamdemux
        nvstreamdemux.src_<i> -> queue -> nvvideoconvert -> nvdsosd
            -> nvvideoconvert -> capsfilter(I420) -> encoder -> rtppay
            -> udpsink

    For every stream a separate RTSP server is started that re-serves the
    stream's UDP output. The function blocks in the GLib main loop until the
    loop quits or the user interrupts it, then sets the pipeline to NULL.

    Args:
        uri_inputs: Sequence of source URI strings, one per stream. A URI
            starting with "rtsp://" marks the pipeline as live.
        codec: Either "H264" or "H265"; selects the encoder and RTP payloader.
        bitrate: Value assigned to the encoder's 'bitrate' property.

    Raises:
        ValueError: If ``codec`` is not "H264" or "H265".
    """
    global NUM_STREAMS, perf_data, folder_name
    global MSCONV_CONFIG_FILE, schema_type, proto_lib, conn_str

    # Validate the codec up front; previously an unknown value left
    # `encoder`/`rtppay` unbound and failed deep inside pipeline construction.
    codec_elements = {
        "H264": ("nvv4l2h264enc", "rtph264pay"),
        "H265": ("nvv4l2h265enc", "rtph265pay"),
    }
    if codec not in codec_elements:
        raise ValueError(
            "Unsupported codec %r; expected one of: %s"
            % (codec, ", ".join(sorted(codec_elements)))
        )
    encoder_factory, rtppay_factory = codec_elements[codec]

    # Port numbering: stream i uses base + i. For i in 0..9 this yields the
    # same ports as the former string concatenation ("540" + str(i)), but it
    # stays inside the valid port range for larger stream counts.
    udp_port_base = 5400
    rtsp_port_base = 8550

    number_sources = len(uri_inputs)
    NUM_STREAMS = number_sources
    perf_data = PERF_DATA(number_sources)
    folder_name = "out_crops"

    # Standard GStreamer initialization
    Gst.init(None)

    # Create Pipeline element that will form a connection of other elements
    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()
    is_live = False
    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")

    print("Creating streamux \n ")
    # Create nvstreammux instance to form batches from one or more sources.
    streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
    if not streammux:
        sys.stderr.write(" Unable to create NvStreamMux \n")
    pipeline.add(streammux)

    # One source bin per input URI, each linked to its own streammux sink pad.
    for i, uri_name in enumerate(uri_inputs):
        frame_count["stream_" + str(i)] = 0
        saved_count["stream_" + str(i)] = 0
        print("Creating source_bin ", i, " \n ")
        if uri_name.startswith("rtsp://"):
            is_live = True
        source_bin = create_source_bin(i, uri_name)
        if not source_bin:
            sys.stderr.write("Unable to create source bin \n")
        pipeline.add(source_bin)
        padname = "sink_%u" % i
        sinkpad = streammux.get_request_pad(padname)
        if not sinkpad:
            sys.stderr.write("Unable to create sink pad bin \n")
        srcpad = source_bin.get_static_pad("src")
        if not srcpad:
            sys.stderr.write("Unable to create src pad bin \n")
        srcpad.link(sinkpad)

    q = Gst.ElementFactory.make("queue", "queue1")
    if not q:
        sys.stderr.write(" Unable to create queue \n")

    # Primary detector
    print("Creating Pgie \n ")
    pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
    if not pgie:
        sys.stderr.write(" Unable to create pgie \n")

    # Tracker
    tracker = Gst.ElementFactory.make("nvtracker", "tracker")
    if not tracker:
        sys.stderr.write(" Unable to create tracker \n")

    # Secondary detector
    sgie1 = Gst.ElementFactory.make("nvinfer", "secondary1-nvinference-engine")
    if not sgie1:
        sys.stderr.write(" Unable to make sgie1 \n")

    # Third detector
    sgie2 = Gst.ElementFactory.make("nvinfer", "secondary2-nvinference-engine")
    if not sgie2:
        sys.stderr.write(" Unable to make sgie2 \n")

    print("Creating nvstreamdemux \n ")
    nvstreamdemux = Gst.ElementFactory.make("nvstreamdemux", "nvstreamdemux")
    if not nvstreamdemux:
        sys.stderr.write(" Unable to create nvstreamdemux \n")

    if is_live:
        print("Atleast one of the sources is live")
        streammux.set_property("live-source", 1)

    print("Playing file {} ".format(uri_inputs))
    streammux.set_property('width', 1920)
    streammux.set_property('height', 1080)
    streammux.set_property('batch-size', number_sources)
    streammux.set_property('batched-push-timeout', 4000000)

    pgie.set_property('config-file-path', "config_infer_primary.txt")
    sgie1.set_property('config-file-path', "config_infer_secondary.txt")
    sgie2.set_property('config-file-path', "config_infer_third.txt")

    pgie_batch_size = pgie.get_property("batch-size")
    if pgie_batch_size != number_sources:
        print("WARNING: Overriding infer-config batch-size", pgie_batch_size, " with number of sources ",
              number_sources, " \n")
        pgie.set_property("batch-size", number_sources)

    # Set properties of tracker from the [tracker] section of its config file.
    # Maps config key -> (element property name, typed getter).
    config = configparser.ConfigParser()
    config.read('dstest_tracker_config.txt')
    tracker_options = {
        'tracker-width': ('tracker-width', config.getint),
        'tracker-height': ('tracker-height', config.getint),
        'gpu-id': ('gpu_id', config.getint),
        'll-lib-file': ('ll-lib-file', config.get),
        'll-config-file': ('ll-config-file', config.get),
        'enable-batch-process': ('enable_batch_process', config.getint),
        'enable-past-frame': ('enable_past_frame', config.getint),
    }
    for key in config['tracker']:
        if key in tracker_options:
            prop_name, getter = tracker_options[key]
            tracker.set_property(prop_name, getter('tracker', key))

    if not is_aarch64():
        # Use CUDA unified memory in the pipeline so frames
        # can be easily accessed on CPU in Python.
        mem_type = int(pyds.NVBUF_MEM_CUDA_UNIFIED)
        streammux.set_property("nvbuf-memory-type", mem_type)

    # Converter + caps filter that force RGBA between tracker and sgie1.
    print("Creating nvvidconv1 \n ")
    nvvidconvert_pre = Gst.ElementFactory.make("nvvideoconvert", "nvvidconvert_pre")
    if not nvvidconvert_pre:
        sys.stderr.write(" Unable to create nvvidconvert_pre \n")

    print("Creating pre filter1 \n ")
    caps1_pre = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA")
    filter1_pre = Gst.ElementFactory.make("capsfilter", "filter1_pre")
    if not filter1_pre:
        sys.stderr.write(" Unable to get the caps filter1_pre \n")
    filter1_pre.set_property("caps", caps1_pre)

    # queue1 feeds the message branch, queue2 feeds the demux/render branch.
    queue1 = Gst.ElementFactory.make("queue", "nvtee-que1")
    if not queue1:
        sys.stderr.write(" Unable to create queue1 \n")
    queue2 = Gst.ElementFactory.make("queue", "nvtee-que2")
    if not queue2:
        sys.stderr.write(" Unable to create queue2 \n")

    msgconv = Gst.ElementFactory.make("nvmsgconv", "nvmsg-converter")
    if not msgconv:
        sys.stderr.write(" Unable to create msgconv \n")
    msgbroker = Gst.ElementFactory.make("nvmsgbroker", "nvmsg-broker")
    if not msgbroker:
        sys.stderr.write(" Unable to create msgbroker \n")

    msgconv.set_property('config', MSCONV_CONFIG_FILE)
    msgconv.set_property('payload-type', schema_type)
    msgbroker.set_property('proto-lib', proto_lib)
    msgbroker.set_property('conn-str', conn_str)
    msgbroker.set_property('sync', False)

    tee = Gst.ElementFactory.make("tee", "nvsink-tee")
    if not tee:
        sys.stderr.write(" Unable to create tee \n")

    print("Adding elements to Pipeline \n")
    for element in (q, pgie, tracker, sgie1, sgie2, nvstreamdemux,
                    nvvidconvert_pre, filter1_pre, queue1, queue2, tee,
                    msgconv, msgbroker):
        pipeline.add(element)

    # Shared (batched) part of the pipeline.
    streammux.link(q)
    q.link(pgie)
    pgie.link(tracker)
    tracker.link(nvvidconvert_pre)
    nvvidconvert_pre.link(filter1_pre)
    filter1_pre.link(sgie1)
    sgie1.link(sgie2)
    sgie2.link(tee)
    # Message branch and demux branch; their tee pads are linked further down.
    queue1.link(msgconv)
    msgconv.link(msgbroker)
    queue2.link(nvstreamdemux)

    # Per-stream output chain: demux pad -> OSD -> encoder -> RTP -> UDP,
    # plus one RTSP server per stream that re-serves the UDP packets.
    for i in range(number_sources):
        # Make the UDP sink
        updsink_port_num = udp_port_base + i
        sink = make_element("udpsink", i)
        if not sink:
            sys.stderr.write(" Unable to create udpsink")
        sink.set_property('host', '224.224.255.255')
        sink.set_property('port', updsink_port_num)
        sink.set_property('async', False)
        sink.set_property('sync', 0)

        queue = make_element("queue", i)
        pipeline.add(queue)

        print("Creating nvvidconv1 \n ")
        nvvidconv1 = make_element("nvvideoconvert", "nvvideoconvert" + str(i))
        if not nvvidconv1:
            sys.stderr.write(" Unable to create nvvidconv1 \n")

        print("Creating nvvidconv \n ")
        nvvidconv = make_element("nvvideoconvert", "nvvidconv" + str(i))
        if not nvvidconv:
            sys.stderr.write(" Unable to create nvvidconv \n")

        if not is_aarch64():
            # Use CUDA unified memory in the pipeline so frames
            # can be easily accessed on CPU in Python.
            mem_type = int(pyds.NVBUF_MEM_CUDA_UNIFIED)
            nvvidconv.set_property("nvbuf-memory-type", mem_type)
            nvvidconv1.set_property("nvbuf-memory-type", mem_type)

        nvvidconv_postosd = make_element("nvvideoconvert", "nvvidconv_postosd" + str(i))
        if not nvvidconv_postosd:
            sys.stderr.write(" Unable to create nvvidconv_postosd \n")

        print("Creating nvosd \n ")
        nvosd = make_element("nvdsosd", "nvdsosd" + str(i))
        if not nvosd:
            sys.stderr.write(" Unable to create nvosd \n")
        nvosd.set_property("process-mode", OSD_PROCESS_MODE)
        nvosd.set_property("display-text", 0)
        nvosd.set_property("display-bbox", 0)

        # connect nvstreamdemux -> queue
        padname = "src_%u" % i
        demuxsrcpad = nvstreamdemux.get_request_pad(padname)
        if not demuxsrcpad:
            sys.stderr.write("Unable to create demux src pad \n")
        queuesinkpad = queue.get_static_pad("sink")
        if not queuesinkpad:
            sys.stderr.write("Unable to create queue sink pad \n")
        demuxsrcpad.link(queuesinkpad)

        print("Creating filter1 \n ")
        caps1 = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA")
        filter1 = make_element("capsfilter", "filter1" + str(i))
        if not filter1:
            sys.stderr.write(" Unable to get the caps filter1 \n")
        filter1.set_property("caps", caps1)

        # Caps filter that forces I420 in front of the encoder.
        caps = make_element("capsfilter", "caps" + str(i))
        if not caps:
            sys.stderr.write(" Unable to create caps filter \n")
        caps.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=I420"))

        # Make the encoder
        encoder = make_element(encoder_factory, i)
        print("Creating %s Encoder" % codec)
        if not encoder:
            sys.stderr.write(" Unable to create encoder")
        encoder.set_property('bitrate', bitrate)
        if is_aarch64():
            # NOTE(review): the pasted source lost its indentation; both
            # properties are kept under the aarch64 guard here - confirm
            # that 'insert-sps-pps' was not meant to be set on all platforms.
            encoder.set_property('preset-level', 1)
            encoder.set_property('insert-sps-pps', 1)

        # Make the payloader that packs the encoded video into RTP packets
        rtppay = make_element(rtppay_factory, i)
        print("Creating %s rtppay" % codec)
        if not rtppay:
            sys.stderr.write(" Unable to create rtppay")

        # NOTE(review): nvvidconv and filter1 are added to the pipeline but
        # are not linked into the chain below; kept to preserve the existing
        # pipeline contents.
        for element in (nvvidconv, filter1, nvvidconv1, nvosd,
                        nvvidconv_postosd, caps, encoder, rtppay, sink):
            pipeline.add(element)

        # linking elements
        queue.link(nvvidconv1)
        nvvidconv1.link(nvosd)
        nvosd.link(nvvidconv_postosd)
        nvvidconv_postosd.link(caps)
        caps.link(encoder)
        encoder.link(rtppay)
        rtppay.link(sink)
        sink.set_property("qos", 0)

        # Start streaming: one RTSP server per stream, fed from the UDP port.
        rtsp_port_num = rtsp_port_base + i
        server = GstRtspServer.RTSPServer.new()
        server.props.service = "%d" % rtsp_port_num
        server.attach(None)
        factory = GstRtspServer.RTSPMediaFactory.new()
        factory.set_launch("( udpsrc name=pay0 port=%d buffer-size=524288 caps=\"application/x-rtp, media=video, clock-rate=90000, encoding-name=(string)%s, payload=96 \" )" % (updsink_port_num, codec))
        factory.set_shared(True)
        fac = "/ds-test" + str(i)
        server.get_mount_points().add_factory(fac, factory)
        # `fac` already starts with "/", so no extra separator in the URL.
        print("\n *** DeepStream: Launched RTSP Streaming at rtsp://localhost:%d%s ***\n\n" % (rtsp_port_num, fac))

    # Link the two tee branches. This is done once, after the per-stream
    # loop, because queue1/queue2 each have a single sink pad.
    # NOTE(review): placement reconstructed from the flattened paste - confirm.
    tee_msg_pad = tee.get_request_pad('src_%u')
    tee_render_pad = tee.get_request_pad("src_%u")
    if not tee_msg_pad or not tee_render_pad:
        sys.stderr.write("Unable to get request pads\n")
    tee_msg_pad.link(queue1.get_static_pad("sink"))
    tee_render_pad.link(queue2.get_static_pad("sink"))

    print("Linking elements in the Pipeline \n")
    # create an event loop and feed gstreamer bus messages to it
    loop = GLib.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)

    # The buffer probe is attached to the src pad of sgie2, i.e. it sees the
    # batched buffers before they reach the tee.
    sgie2_src_pad = sgie2.get_static_pad("src")
    if not sgie2_src_pad:
        sys.stderr.write(" Unable to get sink pad \n")
    else:
        sgie2_src_pad.add_probe(Gst.PadProbeType.BUFFER, osd_sink_pad_buffer_probe, 0)
        # perf callback function, scheduled with a 4000 ms timeout
        GLib.timeout_add(4000, perf_data.perf_print_callback)

    print("Starting pipeline \n")
    # start playback and listen to events
    pipeline.set_state(Gst.State.PLAYING)
    try:
        loop.run()
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the application.
        pass
    except Exception as err:
        # Report instead of silently swallowing unexpected failures.
        sys.stderr.write("Main loop terminated unexpectedly: %s\n" % err)
    finally:
        # cleanup always runs, whatever ended the loop
        print("Exiting app\n")
        pyds.unset_callback_funcs()
        pipeline.set_state(Gst.State.NULL)