#!/usr/bin/env python3
from ctypes import util
from distutils.command.config import config
from os import stat
import sys
import gi
gi.require_version('Gst', '1.0')
from gi.repository import GObject, Gst
from ctypes import *
import sys
import math
from common.bus_call import bus_call
from common.FPS import GETFPS
import common.utils as UTILS
import face_recognition_utils as FACE_UTILS
import pyds
fps_streams={}
PGIE_ALIGN_ID = 2
PGIE_RECOG_ID = 3
GOOD_FOR_RECOG_ID=2
DEBUG_CODE=True
MUXER_OUTPUT_WIDTH=1920
MUXER_OUTPUT_HEIGHT=1080
MUXER_BATCH_TIMEOUT_USEC=4000000
TILED_OUTPUT_WIDTH=1280
TILED_OUTPUT_HEIGHT=720
GST_CAPS_FEATURES_NVMM="memory:NVMM"
OSD_PROCESS_MODE=0
OSD_DISPLAY_TEXT=1
def pgie_align_src_pad_buffer_probe(pad, info, u_data):
    """Pad probe on the alignment nvinfer's src pad.

    Walks the batch -> frame -> object -> user-meta lists and, for every
    tensor-output meta produced by the alignment model (PGIE_ALIGN_ID),
    computes and prints the face orientation angles.

    Args:
        pad:    the Gst.Pad the probe is attached to (unused).
        info:   Gst.PadProbeInfo carrying the GstBuffer.
        u_data: opaque user data (unused).

    Returns:
        Gst.PadProbeReturn.OK so the buffer continues downstream.
    """
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        return
    # pyds needs the raw C address of the buffer, obtained via hash().
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break
        camara_id = frame_meta.source_id
        # Detected faces for this frame.
        l_obj = frame_meta.obj_meta_list
        while l_obj is not None:
            try:
                obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break
            tracker_id = obj_meta.object_id
            # Accessing the object's user meta list.
            l_user = obj_meta.obj_user_meta_list
            while l_user is not None:
                try:
                    user_meta = pyds.NvDsUserMeta.cast(l_user.data)
                except StopIteration:
                    break
                # BUG FIX: the original code executed `continue` here for
                # non-tensor meta WITHOUT advancing l_user, which spun this
                # loop forever. The advance now happens on every iteration.
                if user_meta.base_meta.meta_type == pyds.NvDsMetaType.NVDSINFER_TENSOR_OUTPUT_META:
                    tensor_meta = pyds.NvDsInferTensorMeta.cast(user_meta.user_meta_data)
                    # Process only the alignment model's output.
                    if tensor_meta.unique_id == PGIE_ALIGN_ID:
                        almost_frontal, (roll, pitch, yaw) = FACE_UTILS.get_face_orientation(tensor_meta=tensor_meta)
                        print("CameraId", camara_id, "FaceID:", tracker_id, "Angles:", roll, pitch, yaw)
                try:
                    l_user = l_user.next
                except StopIteration:
                    break
            try:
                l_obj = l_obj.next
            except StopIteration:
                break
        try:
            l_frame = l_frame.next
        except StopIteration:
            break
    return Gst.PadProbeReturn.OK
def run(stream_sources):
    """Build and run the DeepStream face pipeline for the given sources.

    Pipeline: N source bins -> nvstreammux -> face detector (nvinfer)
    -> tracker (nvtracker) -> alignment net (nvinfer) -> sink, where the
    sink is a fakesink when DEBUG_CODE is False, or a
    tiler -> nvvideoconvert -> nvdsosd -> nveglglessink chain when True.

    Args:
        stream_sources: list of stream URIs (file:// or rtsp://),
            one per input stream; rtsp:// sources mark the mux live.
    """
    number_sources = len(stream_sources)
    for i in range(number_sources):
        fps_streams["stream{0}".format(i)] = GETFPS(i)
    # Standard GStreamer initialization.
    # NOTE(review): GObject.threads_init() is a deprecated no-op on modern
    # PyGObject; kept for compatibility with older installs.
    GObject.threads_init()
    Gst.init(None)
    # Create the Pipeline element that will hold and connect all others.
    print("Creating Pipeline \n ")
    pipeline = Gst.Pipeline()
    is_live = False
    if not pipeline:
        sys.stderr.write(" Unable to create Pipeline \n")
        return
    # nvstreammux batches frames from all sources into a single buffer.
    streammux = UTILS.create_gst_element("nvstreammux", "Stream-muxer")
    pipeline.add(streammux)
    for i in range(number_sources):
        print("Creating source_bin ", i, " \n ")
        uri_name = stream_sources[i]
        if uri_name.find("rtsp://") == 0:
            is_live = True
        source_bin = UTILS.create_source_bin(i, uri_name)
        if not source_bin:
            sys.stderr.write("Unable to create source bin \n")
        pipeline.add(source_bin)
        # Request a mux sink pad per source and link the bin into it.
        padname = "sink_%u" % i
        sinkpad = streammux.get_request_pad(padname)
        if not sinkpad:
            sys.stderr.write("Unable to create sink pad bin \n")
        srcpad = source_bin.get_static_pad("src")
        if not srcpad:
            sys.stderr.write("Unable to create src pad bin \n")
        srcpad.link(sinkpad)
    if is_live:
        print("Atleast one of the sources is live")
        streammux.set_property('live-source', 1)
    # Inference/tracking chain, with queues decoupling each stage.
    queue1 = UTILS.create_gst_element("queue", "queue1")
    pgie_det = UTILS.create_gst_element("nvinfer", "pgie_det")
    queue2 = UTILS.create_gst_element("queue", "queue2")
    pgie_tracker = UTILS.create_gst_element("nvtracker", "pgie_tracker")
    queue3 = UTILS.create_gst_element("queue", "queue3")
    pgie_align = UTILS.create_gst_element("nvinfer", "pgie_align")
    queue4 = UTILS.create_gst_element("queue", "queue4")
    # Configure the muxer.
    streammux.set_property('width', MUXER_OUTPUT_WIDTH)
    streammux.set_property('height', MUXER_OUTPUT_HEIGHT)
    streammux.set_property('batch-size', number_sources)
    streammux.set_property('batched-push-timeout', MUXER_BATCH_TIMEOUT_USEC)
    # Configure the inference engines and tracker.
    pgie_det.set_property('config-file-path', "face_det_ULFD.txt")
    pgie_batch_size = pgie_det.get_property("batch-size")
    pgie_align.set_property('config-file-path', "onet_align.txt")
    UTILS.configure_tracker(tracker=pgie_tracker, yml_file='tracker_config.txt')
    # TODO(rbt): avoid change the batch size with ONNX models
    if pgie_batch_size != number_sources:
        print("WARNING: Overriding infer-config batch-size", pgie_batch_size,
              " with number of sources ", number_sources, " \n")
        pgie_det.set_property("batch-size", number_sources)
    # Add elements to the pipeline.
    pipeline.add(queue1)
    pipeline.add(pgie_det)
    pipeline.add(queue2)
    pipeline.add(pgie_tracker)
    pipeline.add(queue3)
    pipeline.add(pgie_align)
    pipeline.add(queue4)
    # Link mux -> detector -> tracker -> aligner.
    streammux.link(queue1)
    queue1.link(pgie_det)
    pgie_det.link(queue2)
    queue2.link(pgie_tracker)
    pgie_tracker.link(queue3)
    queue3.link(pgie_align)
    pgie_align.link(queue4)
    if not DEBUG_CODE:
        # Headless run: drop buffers after inference.
        fakesink = UTILS.create_gst_element("fakesink", "fakesink")
        pipeline.add(fakesink)
        queue4.link(fakesink)
    else:
        # Visual debug run: tile all streams, draw OSD, render to screen.
        tiler = UTILS.create_gst_element("nvmultistreamtiler", "nvtiler")
        nvvidconv = UTILS.create_gst_element("nvvideoconvert", "convertor")
        nvosd = UTILS.create_gst_element("nvdsosd", "onscreendisplay")
        sink = UTILS.create_gst_element("nveglglessink", "nvvideo-renderer")
        queue6 = UTILS.create_gst_element("queue", "queue6")
        queue7 = UTILS.create_gst_element("queue", "queue7")
        queue8 = UTILS.create_gst_element("queue", "queue8")
        nvosd.set_property('process-mode', OSD_PROCESS_MODE)
        nvosd.set_property('display-text', OSD_DISPLAY_TEXT)
        # Near-square tiling layout for N sources.
        tiler_rows = int(math.sqrt(number_sources))
        tiler_columns = int(math.ceil((1.0 * number_sources) / tiler_rows))
        tiler.set_property("rows", tiler_rows)
        tiler.set_property("columns", tiler_columns)
        tiler.set_property("width", TILED_OUTPUT_WIDTH)
        tiler.set_property("height", TILED_OUTPUT_HEIGHT)
        sink.set_property("qos", 0)
        pipeline.add(tiler)
        pipeline.add(nvvidconv)
        pipeline.add(nvosd)
        pipeline.add(sink)
        pipeline.add(queue6)
        pipeline.add(queue7)
        pipeline.add(queue8)
        queue4.link(tiler)
        tiler.link(queue6)
        queue6.link(nvvidconv)
        nvvidconv.link(queue7)
        queue7.link(nvosd)
        nvosd.link(queue8)
        queue8.link(sink)
    # Create an event loop and feed gstreamer bus messages to it.
    loop = GObject.MainLoop()
    bus = pipeline.get_bus()
    bus.add_signal_watch()
    bus.connect("message", bus_call, loop)
    # Tap the aligner's src pad to read per-face orientation tensors.
    UTILS.add_probe_callback(element=pgie_align, pad_name="src",
                             funct=pgie_align_src_pad_buffer_probe)
    # List the sources.
    print("Now playing...")
    for i, source in enumerate(stream_sources):
        print(i, ": ", source)
    print("Starting pipeline \n")
    pipeline.set_state(Gst.State.PLAYING)
    try:
        loop.run()
    except KeyboardInterrupt:
        # Ctrl+C is the expected way to stop. BUG FIX: the original bare
        # `except: pass` silently swallowed every error; unexpected
        # exceptions now propagate after the pipeline is cleaned up.
        pass
    finally:
        print("Exiting app\n")
        pipeline.set_state(Gst.State.NULL)
if __name__ == '__main__':
    # Demo entry point: four bundled sample videos fed in as file:// URIs.
    sample_uris = [
        "file:///app/samplevideos/face-demographics-walking-and-pause.mp4",
        "file:///app/samplevideos/head-pose-face-detection-female-and-male.mp4",
        "file:///app/samplevideos/head-pose-face-detection-male.mp4",
        "file:///app/samplevideos/face-demographics-walking.mp4",
    ]
    run(stream_sources=sample_uris)