Hi, I hope you are doing well.
I have modified the code for custom labels and added different colors for each class.
Now I want to set the properties of the label text: its font, its background color, and the position of the label.
I also want to add a prediction threshold so that only confident predictions are kept. I tested the same model and the same video on the Python sample app and on the reference app: the reference app's results were very good, while the Python app's object detection was also good but showed some uncertain objects. Therefore, I want to set thresholds equal to the object-detection thresholds of the DeepStream reference app to avoid uncertain predictions.
Here is my code for the custom labels and custom colors:
# Bounding-box border colour per detector label, as (red, green, blue) with
# each channel in 0..1.  Every key ends in "\r"; the probe looks colours up by
# the raw object label, so presumably the labels come from a file with CRLF
# line endings (verify against the label file in use).
color_list = {
    "balcony_with_railing\r": (0, 0, 1),
    "balcony_without_railing\r": (1, 0.8, 0),
    "helmet\r": (0.6, 0.2, 1),
    "incomplete_railing\r": (1, 0, 0),
    "person\r": (0.2, 0.8, 0.2),
    "rail\r": (0.4, 0.4, 0.2),
}

# Run-time bookkeeping shared across callbacks.
count_frames = 0
last_time = time.perf_counter()
stopping_flag = False

# ------------- newly added --------------
# Currently disabled settings, kept for reference:
#no_display = False
#silent = False
#file_loop = False
#perf_data = None
#MUXER_OUTPUT_WIDTH=1920
#MUXER_OUTPUT_HEIGHT=1080
#MUXER_BATCH_TIMEOUT_USEC=4000000
#TILED_OUTPUT_WIDTH=1280
#TILED_OUTPUT_HEIGHT=720
#GST_CAPS_FEATURES_NVMM="memory:NVMM"
#OSD_PROCESS_MODE= 0
#OSD_DISPLAY_TEXT= 1

MAX_DISPLAY_LEN = 64

# Display name of each detector class, indexed by class id.
pgie_classes_str = [
    "balcony_with_railing",
    "balcony_without_railing",
    "helmet",
    "incomplete_railing",
    "person",
    "rail",
]

# Numeric class ids, in the same order as pgie_classes_str (0..5).
(
    PGIE_CLASS_ID_balcony_with_railing,
    PGIE_CLASS_ID_balcony_without_railing,
    PGIE_CLASS_ID_helmet,
    PGIE_CLASS_ID_incomplete_railing,
    PGIE_CLASS_ID_person,
    PGIE_CLASS_ID_rail,
) = range(len(pgie_classes_str))
# pgie_src_pad_buffer_probe extracts the metadata carried by buffers on the
# pad it is attached to and updates the parameters used for drawing
# rectangles, object information, etc.
def pgie_src_pad_buffer_probe(pad,info,u_data):
    """Relabel and recolour detected objects and log per-frame counts.

    For every frame in the batch attached to the probed buffer, each object's
    display text is set to its class name from ``pgie_classes_str`` and its
    bounding-box border colour is taken from ``color_list``.  One summary line
    per frame is appended to ``logfile.txt``.

    Args:
        pad: Pad the probe is attached to (unused).
        info: Probe info; ``info.get_buffer()`` supplies the buffer.
        u_data: User data given when the probe was registered (unused).

    Returns:
        ``Gst.PadProbeReturn.OK`` in all cases.
    """
    # NOTE(review): the counters are created once per buffer, so when a batch
    # holds several frames the logged counts accumulate across those frames.
    obj_counter = {
        PGIE_CLASS_ID_balcony_with_railing: 0,
        PGIE_CLASS_ID_balcony_without_railing: 0,
        PGIE_CLASS_ID_helmet: 0,
        PGIE_CLASS_ID_incomplete_railing: 0,
        PGIE_CLASS_ID_person: 0,
        PGIE_CLASS_ID_rail: 0,
    }
    # Whitespace-insensitive view of color_list, so the lookup works whether
    # or not the labels / keys carry a trailing carriage return.
    colors_by_label = {label.strip(): rgb for label, rgb in color_list.items()}

    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        return Gst.PadProbeReturn.OK

    # Retrieve batch metadata from the gst_buffer.
    # Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
    # C address of gst_buffer as input, which is obtained with hash(gst_buffer)
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))

    log_lines = []
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            # l_frame.data needs a cast to pyds.NvDsFrameMeta.  The cast keeps
            # ownership of the underlying memory in the C code, so the Python
            # garbage collector will leave it alone.
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        frame_number = frame_meta.frame_num
        num_rects = frame_meta.num_obj_meta

        l_obj = frame_meta.obj_meta_list
        while l_obj is not None:
            try:
                obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                break

            # NOTE(review): no confidence filtering happens here; detection
            # thresholds are presumably configured on the inference element
            # (nvinfer config file) -- confirm against the reference app.
            class_id = obj_meta.class_id
            if 0 <= class_id < len(pgie_classes_str):
                # Any existing display_text string will be freed by the
                # bindings module.
                # NOTE(review): font, background colour and offsets of this
                # label can presumably be set on obj_meta.text_params the same
                # way analytics_probe styles its label -- TODO confirm.
                obj_meta.text_params.display_text = pgie_classes_str[class_id]
            # Count unknown class ids too instead of raising KeyError.
            obj_counter[class_id] = obj_counter.get(class_id, 0) + 1

            # Per-class border colour; objects whose label has no entry keep
            # the colour they already had.
            label = obj_meta.obj_label.split("\\")[0].strip()
            color = colors_by_label.get(label)
            if color is not None:
                obj_meta.rect_params.border_color.set(*color, 1)

            try:
                l_obj = l_obj.next
            except StopIteration:
                break

        log_lines.append(
            "Frame Number={} Number of Objects={} Helmet_count={} Person_count={} \n".format(
                frame_number,
                num_rects,
                obj_counter[PGIE_CLASS_ID_helmet],
                obj_counter[PGIE_CLASS_ID_person],
            )
        )

        try:
            l_frame = l_frame.next
        except StopIteration:
            break

    # One append per buffer instead of reopening the file for every frame.
    if log_lines:
        with open("logfile.txt", "a") as file:
            file.writelines(log_lines)

    return Gst.PadProbeReturn.OK
#------------------------------end of new fun-------------
def bus_call(bus, message, manager):
    """Handle a single message from the pipeline bus.

    End-of-stream and error messages set the module-level ``stopping_flag``
    and quit ``manager.loop``; warnings are only printed.  Any other message
    type is ignored.

    Args:
        bus: The bus that delivered the message (unused).
        message: The message to handle.
        manager: Object exposing the main loop as ``manager.loop``.

    Returns:
        Always ``True``.
    """
    global stopping_flag
    msg_type = message.type

    # Warnings never stop the pipeline, so deal with them first.
    if msg_type == Gst.MessageType.WARNING:
        warning, details = message.parse_warning()
        print("Warning: %s: %s\n" % (warning, details))
        return True

    if msg_type == Gst.MessageType.EOS:
        print("End-of-stream\n")
        stopping_flag = True
        manager.loop.quit()
    elif msg_type == Gst.MessageType.ERROR:
        error, details = message.parse_error()
        stopping_flag = True
        print("Error: %s: %s\n" % (error, details))
        manager.loop.quit()

    return True
def filesink_out_bin(pipeline, manager, index):
    """Add a queue -> convert -> H.265 encode -> parse -> split-file branch.

    Creates the five elements, adds them to ``pipeline``, links them in order
    and attaches a buffer probe to the queue's src pad:
    ``control_flow_raw_probe`` when ``index > 0``, otherwise
    ``control_flow_inferred_probe``.  Creation and link failures are reported
    on stderr.

    Args:
        pipeline: Pipeline (bin) the elements are added to.
        manager: Object providing ``video_output_dir``; also passed to the
            probe as user data.
        index: Stream index, used in element names and the output file name.

    Returns:
        The queue's static ``sink`` pad, i.e. the entry point of the branch.
    """
    print("creating filesink out bin")
    queue = Gst.ElementFactory.make("queue", f"queue_file_{index}")
    print("Creating nvvidconv1")
    nvvidconv1 = Gst.ElementFactory.make("nvvideoconvert", f"convertor1_file_{index}")
    nvv4l2h265enc = Gst.ElementFactory.make("nvv4l2h265enc", f"h265-enc_file_{index}")
    # nvv4l2h265enc.set_property("bufapi-version", "true")
    h265parse = Gst.ElementFactory.make("h265parse", f"h265-parse_file_{index}")
    print(f"saving file at {os.path.join(manager.video_output_dir, f'camera-{index}_video_xx.mp4')}")
    file_sink = Gst.ElementFactory.make("splitmuxsink", f"filesink_file_{index}")

    # Elements in link order, each paired with a readable name for messages.
    chain = (queue, nvvidconv1, nvv4l2h265enc, h265parse, file_sink)
    labels = ("queue", "nvvidconv1", "nvv4l2h265enc", "h265parse", "file_sink")
    for label, element in zip(labels, chain):
        # ElementFactory.make yields None when the element cannot be created
        # (e.g. the plugin is not installed).
        if element is None:
            sys.stderr.write(f"Error: Unable to create {label}\n")

    file_sink.set_property("location", os.path.join(manager.video_output_dir, f"camera-{index}_video_%d.mp4"))
    # max-size-time is given in nanoseconds: start a new file every minute.
    file_sink.set_property("max-size-time", 1 * 60_000_000_000)

    for element in chain:
        pipeline.add(element)

    # link() returns False on failure; report it instead of leaving a
    # silently broken branch.
    for label_pair, upstream, downstream in zip(zip(labels, labels[1:]), chain, chain[1:]):
        if not upstream.link(downstream):
            sys.stderr.write(f"Error: Unable to link {label_pair[0]} to {label_pair[1]} for stream-{index}\n")

    sinkpad = queue.get_static_pad('sink')
    srcpad = queue.get_static_pad('src')
    if not sinkpad:
        sys.stderr.write(f"Error: Unable to create file sink pad for stream-{index}\n")

    if index > 0:
        srcpad.add_probe(Gst.PadProbeType.BUFFER, control_flow_raw_probe, manager)
    else:
        srcpad.add_probe(Gst.PadProbeType.BUFFER, control_flow_inferred_probe, manager)
    return sinkpad
def _collect_roi_hits(frame_meta, analytics_meta_type):
    """Return the set of ROI names flagged on any object of one frame."""
    roi_hits = set()
    l_obj = frame_meta.obj_meta_list
    while l_obj:
        try:
            obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
        except StopIteration:
            break

        l_user_meta = obj_meta.obj_user_meta_list
        while l_user_meta:
            try:
                user_meta = pyds.NvDsUserMeta.cast(l_user_meta.data)
                # Only the per-object analytics meta carries roiStatus.
                if user_meta.base_meta.meta_type == analytics_meta_type:
                    obj_info = pyds.NvDsAnalyticsObjInfo.cast(user_meta.user_meta_data)
                    if obj_info.roiStatus:
                        roi_hits.update(obj_info.roiStatus)
            except StopIteration:
                break
            try:
                l_user_meta = l_user_meta.next
            except StopIteration:
                break

        try:
            l_obj = l_obj.next
        except StopIteration:
            break
    return roi_hits


def _attach_danger_label(batch_meta, frame_meta, text):
    """Attach *text* to the frame as a red-on-black label at offset (10, 12)."""
    display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
    display_meta.num_labels = 1
    label = display_meta.text_params[0]
    label.display_text = text
    label.x_offset = 10
    label.y_offset = 12
    label.font_params.font_name = "Serif"
    label.font_params.font_size = 15
    # set(red, green, blue, alpha): red text ...
    label.font_params.font_color.set(1.0, 0.0, 0.0, 1.0)
    # ... on an opaque black background.
    label.set_bg_clr = 1
    label.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)
    pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)


def analytics_probe(pad, info, manager):
    """Overlay a warning on every frame in which an object is inside an ROI.

    For each frame of the batch, the ROI names found in the objects'
    analytics user meta are collected and shown as one
    ``DANGER! humans in <roi>`` line per ROI, in sorted order.  Frames
    without any ROI hit get no label.

    NOTE(review): the object's class is not checked here, so any object type
    inside an ROI triggers the "humans" message unless the analytics config
    already restricts the ROI to the person class -- confirm.

    Args:
        pad: Pad the probe is attached to (unused).
        info: Probe info; ``info.get_buffer()`` supplies the buffer.
        manager: User data given when the probe was registered (unused).

    Returns:
        ``Gst.PadProbeReturn.OK`` in all cases.
    """
    gst_buffer = info.get_buffer()
    if not gst_buffer:
        print("Unable to get GstBuffer ")
        return Gst.PadProbeReturn.OK

    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
    # Loop-invariant: resolve the analytics meta type once per buffer.
    analytics_meta_type = pyds.nvds_get_user_meta_type("NVIDIA.DSANALYTICSOBJ.USER_META")

    l_frame = batch_meta.frame_meta_list
    while l_frame:
        try:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        balconys = _collect_roi_hits(frame_meta, analytics_meta_type)
        if balconys:
            # Sorted so the line order does not change from frame to frame.
            display_text = "\n".join(
                f"DANGER! humans in {balcony}" for balcony in sorted(balconys)
            )
            _attach_danger_label(batch_meta, frame_meta, display_text)

        try:
            l_frame = l_frame.next
        except StopIteration:
            break
    return Gst.PadProbeReturn.OK
Please give me some advice regarding this issue.