Hi,
I implemented the probe following your example, deepstream-test3.py, with a modification to get the box locations and write them to XML.
I see that I read my properties file every time the probe runs. That is wrong of course; a sketch of a fix is included after the probe function below.
import os
import gi
import sys
import cv2
import math
import time
import pyds
import hashlib
import logging
import configparser
import numpy as np
import xml.etree.cElementTree as ET
from threading import Thread
from common.FPS import PERF_DATA
gi.require_version('Gst', '1.0')
from gi.repository import GLib, Gst
from datetime import datetime as dt
from common.bus_call import bus_call
current_time = dt.now().strftime("%m:%d:%Y_%H:%M:%S").encode('utf-8')
UNIQUE_IDENTIFIER = hashlib.md5(current_time).hexdigest()
pgie_classes_str = []
frame_number = 0
perf_data = None
LOT_ID = "NoLotID"
# Generate XML file and save image upon detection
def createXML(img_object, frame_copy, line, output_folder, _lot_id):
fileString = img_object["filename"]
root = ET.Element("annotation")
ET.SubElement(root, "machine").text = line
ET.SubElement(root, "timestamp").text = img_object["timestamp"]
ET.SubElement(root, "lotid").text = _lot_id
ET.SubElement(root, "folder").text = "Live"
ET.SubElement(root, "filename").text = fileString
ET.SubElement(root, "path").text = output_folder
source = ET.SubElement(root, "source")
ET.SubElement(source, "database").text = str(img_object["database"])
size = ET.SubElement(root, "size")
ET.SubElement(size, "width").text = str(img_object["width"])
ET.SubElement(size, "height").text = str(img_object["height"])
ET.SubElement(size, "depth").text = "1"
ET.SubElement(root, "segmented").text = "0"
    nrBoxes = 0
for box in img_object['annotations']:
if (box["conf"] >= 0.0):
objelement = ET.SubElement(root, "object")
ET.SubElement(objelement, "name").text = box["class"]
ET.SubElement(objelement, "pose").text = "Unspecified"
ET.SubElement(objelement, "truncated").text = "0"
ET.SubElement(objelement, "difficult").text = "0"
ET.SubElement(objelement, "conf").text = str(box["conf"])
bndBox = ET.SubElement(objelement, "bndbox")
ET.SubElement(bndBox, "xmin").text = str(box["box_coordinates"]["Xmin"])
ET.SubElement(bndBox, "ymin").text = str(box["box_coordinates"]["Ymin"])
ET.SubElement(bndBox, "xmax").text = str(box["box_coordinates"]["Xmax"])
ET.SubElement(bndBox, "ymax").text = str(box["box_coordinates"]["Ymax"])
            nrBoxes += 1
    if nrBoxes > 0:
        tree = ET.ElementTree(root)
        full_filename = os.path.join(output_folder, fileString + ".xml")
        tree.write(full_filename)
        img_path = os.path.join(output_folder, fileString + ".jpg")
        cv2.imwrite(img_path, frame_copy)
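# For reference, the annotation written above follows the Pascal VOC layout;
# one detection produces XML roughly like this (values are illustrative):
#
#   <annotation>
#       <machine>line-1</machine>
#       <lotid>NoLotID</lotid>
#       <filename>ID0_abc_1000000042</filename>
#       <size><width>1920</width><height>1080</height><depth>1</depth></size>
#       <object>
#           <name>scratch</name>
#           <conf>0.93</conf>
#           <bndbox><xmin>10</xmin><ymin>20</ymin><xmax>110</xmax><ymax>120</ymax></bndbox>
#       </object>
#   </annotation>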
# Probe the pipeline and get image and inference information
def pgie_src_pad_buffer_probe(pad, info, u_data):
    global frame_number, perf_data, LOT_ID
    # NOTE: this re-reads the properties file on every buffer; see the sketch
    # below this function for reading it once and passing it in via u_data.
    aiProperties = configparser.ConfigParser()
    aiProperties.read('default.properties')
    OUTPUT_FOLDER = aiProperties.get('AI', 'output_folder')
    LINE = aiProperties.get('MQTT', 'machineID')
    num_rects = 0
gst_buffer = info.get_buffer()
if not gst_buffer:
logging.debug("Unable to get GstBuffer ")
return
# Retrieve batch metadata from the gst_buffer
# Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
# C address of gst_buffer as input, which is obtained with hash(gst_buffer)
batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
l_frame = batch_meta.frame_meta_list
while l_frame is not None:
try:
# Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
# The casting is done by pyds.NvDsFrameMeta.cast()
# The casting also keeps ownership of the underlying memory
# in the C code, so the Python garbage collector will leave
# it alone.
frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
except StopIteration:
break
frame_number=frame_meta.frame_num
l_obj=frame_meta.obj_meta_list
num_rects = frame_meta.num_obj_meta
filename="ID"+str(frame_meta.source_id) + "_" + str(UNIQUE_IDENTIFIER) + "_" + str(1000000000+frame_number)
timestamp = dt.now().strftime("%Y-%m-%d %H:%M:%S")
new_item ={
"filename":filename,
"timestamp":timestamp,
"width":frame_meta.source_frame_width,
"height":frame_meta.source_frame_height,
"database" : frame_meta.source_id,
"annotations":[]
}
while l_obj is not None:
try:
# Casting l_obj.data to pyds.NvDsObjectMeta
obj_meta=pyds.NvDsObjectMeta.cast(l_obj.data)
except StopIteration:
break
new_annotation={
"class":pgie_classes_str[obj_meta.class_id],
"conf":obj_meta.confidence,
"box_coordinates":{
"Xmin":int(obj_meta.rect_params.left),
"Ymin":int(obj_meta.rect_params.top),
"Xmax":int(obj_meta.rect_params.left+obj_meta.rect_params.width),
"Ymax":int(obj_meta.rect_params.top+obj_meta.rect_params.height),
}
}
new_item["annotations"].append(new_annotation)
try:
l_obj=l_obj.next
except StopIteration:
break
n_frame = pyds.get_nvds_buf_surface(hash(gst_buffer), frame_meta.batch_id)
frame_copy = np.array(n_frame, copy=True, order='C')
        # The buffer is RGBA (per the nvmm caps filter); convert to BGRA for cv2.imwrite
        frame_copy = cv2.cvtColor(frame_copy, cv2.COLOR_RGBA2BGRA)
pyds.unmap_nvds_buf_surface(hash(gst_buffer), frame_meta.batch_id)
if new_item["annotations"]:
# print(frame_meta.source_id) #Add this to the XLXML for camera information
createXML(new_item, frame_copy,LINE, OUTPUT_FOLDER,LOT_ID)
#logging.debug("Frame Number=", frame_number, "Number of Objects=",num_rects)
# Update frame rate through this probe
stream_index = "stream{0}".format(frame_meta.pad_index)
perf_data.update_fps(stream_index)
try:
l_frame=l_frame.next
        except StopIteration:
            # End of the frame meta list, not an error
            break
return Gst.PadProbeReturn.OK
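# A minimal sketch of reading the properties only once: build the values in
# run() and hand them to the probe through the u_data argument of add_probe,
# instead of re-reading the file per buffer (probe_settings is a name made up
# for this example):
#
#   probe_settings = {
#       "output_folder": self.aiProperties.get('AI', 'output_folder'),
#       "line": self.aiProperties.get('MQTT', 'machineID'),
#   }
#   pgie_src_pad.add_probe(Gst.PadProbeType.BUFFER,
#                          pgie_src_pad_buffer_probe, probe_settings)
#
# and inside the probe, the configparser block becomes:
#
#   OUTPUT_FOLDER = u_data["output_folder"]
#   LINE = u_data["line"]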
# Read the object classes from file
def readClassesFile(filename):
    with open(filename, 'r') as file:
        # One class name per line; strip trailing newlines
        labels = [label.strip() for label in file.readlines()]
    return labels
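# The class label file is expected to contain one class name per line, in the
# same order as the model's class IDs (names here are illustrative):
#
#   scratch
#   dent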
class AIDetectionManager:
    def __init__(self):
        logging.debug("AI Detection Manager initiated")
self.aiProperties = configparser.ConfigParser()
self.aiProperties.read('default.properties')
        self.number_sources = self.aiProperties.getint('AI', 'number_sources')
        self.runAIDM = None
        self.CLASS_LABEL_FILE = self.aiProperties.get('AI', 'class_label_file')
        self.INPUT_FPS = self.aiProperties.getint('AI', 'input_fps')
        self.STREAMMUX_WIDTH = self.aiProperties.getint('AI', 'streammux_width')
        self.STREAMMUX_HEIGHT = self.aiProperties.getint('AI', 'streammux_height')
        self.CONFIG_FILE_PATH = self.aiProperties.get('AI', 'config_file_path')
        self.OSD_PROCESS_MODE = self.aiProperties.getint('AI', 'osd_process_mode')
        self.OSD_DISPLAY_TEXT = self.aiProperties.getint('AI', 'osd_display_text')
        self.TILED_OUTPUT_WIDTH = self.aiProperties.getint('AI', 'tiled_output_width')
        self.TILED_OUTPUT_HEIGHT = self.aiProperties.getint('AI', 'tiled_output_height')
        # getboolean instead of eval(): never eval configuration input
        self.NO_DISPLAY = self.aiProperties.getboolean('AI', 'no_display')
self.loop = None
self.pipeline = None
    # For verbose GStreamer logs: export GST_DEBUG=4
def run(self):
global perf_data, pgie_classes_str, LOT_ID
number_sources=self.number_sources
pgie_classes_str = readClassesFile(self.CLASS_LABEL_FILE)
perf_data = PERF_DATA(number_sources)
logging.debug("Starting deepstream")
# Standard GStreamer initialization
Gst.init(None)
logging.debug("Creating Pipeline \n ")
pipeline = Gst.Pipeline()
logging.debug("Creating sources pipeline")
sources=[]
caps_v4l2srcs=[]
vidconvsrcs=[]
nvvidconvsrcs=[]
caps_vidconvsrcs=[]
        for s in range(number_sources):
            # The cameras enumerate on even /dev/video nodes, hence s*2
            source = Gst.ElementFactory.make("v4l2src", f"usb-cam-source-{s*2}")
            source.set_property('device', f"/dev/video{s*2}")
            caps_v4l2src = Gst.ElementFactory.make("capsfilter", f"v4l2src_caps{s*2}")
            caps_v4l2src.set_property('caps', Gst.Caps.from_string(f"video/x-raw, framerate={self.INPUT_FPS}/1"))
            vidconvsrc = Gst.ElementFactory.make("videoconvert", f"convertor_src1_{s*2}")
            nvvidconvsrc = Gst.ElementFactory.make("nvvideoconvert", f"convertor_src2_{s*2}")
            caps_vidconvsrc = Gst.ElementFactory.make("capsfilter", f"nvmm_caps{s*2}")
            caps_vidconvsrc.set_property('caps', Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA"))
pipeline.add(source)
pipeline.add(caps_v4l2src)
pipeline.add(vidconvsrc)
pipeline.add(nvvidconvsrc)
pipeline.add(caps_vidconvsrc)
sources.append(source)
caps_v4l2srcs.append(caps_v4l2src)
vidconvsrcs.append(vidconvsrc)
nvvidconvsrcs.append(nvvidconvsrc)
caps_vidconvsrcs.append(caps_vidconvsrc)
streammux = Gst.ElementFactory.make("nvstreammux", "Stream-muxer")
queue1=Gst.ElementFactory.make("queue","queue1")
queue2=Gst.ElementFactory.make("queue","queue2")
queue3=Gst.ElementFactory.make("queue","queue3")
queue4=Gst.ElementFactory.make("queue","queue4")
queue5=Gst.ElementFactory.make("queue","queue5")
pgie = Gst.ElementFactory.make("nvinfer", "primary-inference")
tiler=Gst.ElementFactory.make("nvmultistreamtiler", "nvtiler")
nvvidconv = Gst.ElementFactory.make("nvvideoconvert", "convertor")
nvosd = Gst.ElementFactory.make("nvdsosd", "onscreendisplay")
transform = Gst.ElementFactory.make("nvegltransform", "nvegl-transform")
sink = None
if self.NO_DISPLAY:
logging.debug("Creating Fakesink \n")
sink = Gst.ElementFactory.make("fakesink", "nvvideo-renderer")
sink.set_property('enable-last-sample', 0)
sink.set_property('sync', 0)
else:
sink = Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
sink.set_property('sync', False)
logging.debug("Setting common properties")
streammux.set_property('width', self.STREAMMUX_WIDTH)
streammux.set_property('height', self.STREAMMUX_HEIGHT)
streammux.set_property('batch-size', number_sources)
streammux.set_property('batched-push-timeout', 4000000)
streammux.set_property('live-source',1)
        # Use the config path from the properties file rather than a hardcoded one
        pgie.set_property('config-file-path', self.CONFIG_FILE_PATH)
        pgie.set_property('batch-size', number_sources)
nvosd.set_property('process-mode',self.OSD_PROCESS_MODE)
nvosd.set_property('display-text',self.OSD_DISPLAY_TEXT)
tiler_rows=int(math.sqrt(number_sources))
tiler_columns=int(math.ceil((1.0*number_sources)/tiler_rows))
tiler.set_property("rows",tiler_rows)
tiler.set_property("columns",tiler_columns)
tiler.set_property("width", self.TILED_OUTPUT_WIDTH)
tiler.set_property("height", self.TILED_OUTPUT_HEIGHT)
sink.set_property("qos",0)
logging.debug(f"Playing {number_sources} cameras")
pipeline.add(streammux)
pipeline.add(queue1)
pipeline.add(queue2)
pipeline.add(queue3)
pipeline.add(queue4)
pipeline.add(queue5)
pipeline.add(pgie)
pipeline.add(tiler)
pipeline.add(nvvidconv)
pipeline.add(nvosd)
pipeline.add(sink)
if not self.NO_DISPLAY:
pipeline.add(transform)
srcpads = []
sinkpads = []
for s in range(number_sources):
srcpad = caps_vidconvsrcs[s].get_static_pad("src")
sinkpad = streammux.request_pad_simple("sink_"+str(s))
logging.debug("sink_number:"+str(s))
srcpads.append(srcpad)
sinkpads.append(sinkpad)
for i in range(number_sources):
sources[i].link(caps_v4l2srcs[i])
caps_v4l2srcs[i].link(vidconvsrcs[i])
vidconvsrcs[i].link(nvvidconvsrcs[i])
nvvidconvsrcs[i].link(caps_vidconvsrcs[i])
srcpads[i].link(sinkpads[i])
streammux.link(queue1)
queue1.link(pgie)
        pgie.link(queue2)
        queue2.link(tiler)
        tiler.link(queue3)
        queue3.link(nvvidconv)
        nvvidconv.link(queue4)
        queue4.link(nvosd)
        nvosd.link(queue5)
if self.NO_DISPLAY:
queue5.link(sink)
else:
queue5.link(transform)
transform.link(sink)
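        # Resulting topology per source:
        #   v4l2src -> capsfilter -> videoconvert -> nvvideoconvert -> capsfilter(NVMM, RGBA)
        #     -> nvstreammux -> queue -> nvinfer -> queue -> nvmultistreamtiler
        #     -> queue -> nvvideoconvert -> queue -> nvdsosd -> queue -> [nvegltransform ->] sink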
        # Create an event loop and feed GStreamer bus messages to it
self.loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect ("message", bus_call, self.loop)
        # Add a probe to get informed of the generated metadata. The probe is added
        # to the src pad of the pgie element, since by then the buffer has all the
        # inference metadata attached.
        pgie_src_pad = pgie.get_static_pad("src")
        if not pgie_src_pad:
            sys.stderr.write("Unable to get src pad of pgie \n")
        pgie_src_pad.add_probe(Gst.PadProbeType.BUFFER, pgie_src_pad_buffer_probe, 0)
GLib.timeout_add(5000, perf_data.perf_print_callback)
        # Start playback and listen to events
logging.debug("Starting pipeline \n")
pipeline.set_state(Gst.State.PLAYING)
        try:
            self.loop.run()
        except Exception:
            logging.exception("Main loop exited with an exception")
# cleanup
pipeline.set_state(Gst.State.NULL)
def start(self):
logging.debug("AI Detection Manager started")
Thread(target=self.run, args=(), name="AI Detection Manager").start()
return self
def stop(self):
if self.loop is not None:
self.loop.quit()
logging.debug("AI Detection Manager stopping")
    def isRunning(self):
        if self.loop is None:
            logging.warning("AI detection manager is not initialized")
            return False
        if self.loop.is_running():
            logging.debug("AI Detection is Running")
        return self.loop.is_running()
def setLotID(self, lotID):
global LOT_ID
logging.debug("Settings lot ID")
LOT_ID = lotID
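For completeness, this is the shape of default.properties that the code above expects. The keys are the ones the script actually reads; the values are placeholders to adapt:

[AI]
number_sources = 1
class_label_file = labels.txt
input_fps = 30
streammux_width = 1920
streammux_height = 1080
config_file_path = config_infer_primary_yoloV8.txt
osd_process_mode = 0
osd_display_text = 1
tiled_output_width = 1280
tiled_output_height = 720
no_display = true
output_folder = output

[MQTT]
machineID = line-1

And a minimal way to drive the manager from a main program, assuming logging is already configured:

manager = AIDetectionManager().start()
# ... main application work ...
manager.stop()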