Python DeepStream with AMQP: issues with messages and tracking

Hello!

I am trying to build my own DeepStream app for Jetson Nano, which will send detection and tracking info to AMQP queue, using custom Yolo PGIE and KLT tracker. However, I am having a few problems with configuring.

  • I do not want to set additional metadata for the message, so I removed it from the original deepstream-test4 app. However, I am still getting messages with “vehicle” included (even though I don’t set this type anywhere), and I wouldn’t like to have it this way (I am using a model that classifies quite a lot more classes than just vehicles). What should I change in my code?
  • I cannot manage to set tracking id accurately - for each detection the resulting ID is -1. Is there some additional config to be performed to make that happen?
  • I receive Segmentation Faults after ~100-200 frames of processing - here, I have no idea what the cause is, however it occurs only when messaging is enabled (bare detection and RTSP output works with no faults)

I append my code for setting up messaging callbacks:

def metadata_deepcopy(data, user_data):
    """Callback function for deep-copying an NvDsEventMsgMeta struct"""
    # Cast data to pyds.NvDsUserMeta and pyds.NvDsEventMsgMeta
    user_meta = pyds.NvDsUserMeta.cast(data)
    src_meta_data = user_meta.user_meta_data
    src_meta = pyds.NvDsEventMsgMeta.cast(src_meta_data)

    # Duplicate the memory contents of src_meta to dst_meta
    dst_meta_ptr = pyds.memdup(
        pyds.get_ptr(src_meta), sys.getsizeof(pyds.NvDsEventMsgMeta)
    )
    dst_meta = pyds.NvDsEventMsgMeta.cast(dst_meta_ptr)
    if src_meta.objSignature.size > 0:
        dst_meta.objSignature.signature = pyds.memdup(
            src_meta.objSignature.signature, src_meta.objSignature.size
        )
        dst_meta.objSignature.size = src_meta.objSignature.size

    return dst_meta


def metadata_free(data, user_data):
    """Callback function for freeing an NvDsEventMsgMeta instance"""
    user_meta = pyds.NvDsUserMeta.cast(data)
    src_meta = pyds.NvDsEventMsgMeta.cast(user_meta.user_meta_data)

    if src_meta.objSignature.size > 0:
        pyds.free_buffer(src_meta.objSignature.signature)
        src_meta.objSignature.size = 0
    if src_meta.extMsgSize > 0:
        pyds.free_gbuffer(src_meta.extMsg)
        src_meta.extMsgSize = 0


def msg_osd_buffer_probe(pad, info, u_data, config: Config):
    """Probe function to extract metadata received on OSD sink pad.

    .. note: this is used for updating params to draw boxes, obj information etc.
    """
    now = time.time()
    start_times: Dict[str, float] = defaultdict(lambda: now)
    fps_streams: Dict[str, int] = defaultdict(int)

    gst_buffer = info.get_buffer()
    if not gst_buffer:
        logger.error("Unable to get GstBuffer")
        return

    # Retrieve batch metadata from the gst_buffer
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))  # obtain address
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        frame_number = frame_meta.frame_num
        l_obj = frame_meta.obj_meta_list
        num_rects = frame_meta.num_obj_meta
        while l_obj is not None:
            try:
                obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                continue

            msg_meta = pyds.alloc_nvds_event_msg_meta()
            msg_meta.bbox.top = obj_meta.rect_params.top
            msg_meta.bbox.left = obj_meta.rect_params.left
            msg_meta.bbox.width = obj_meta.rect_params.width
            msg_meta.bbox.height = obj_meta.rect_params.height
            msg_meta.frameId = frame_number
            msg_meta.trackingId = ctypes.c_int(obj_meta.object_id & 0xFFFFFFFF).value
            msg_meta.confidence = obj_meta.confidence
            msg_meta.sensorId = 0
            msg_meta.placeId = 0
            msg_meta.moduleId = 0
            msg_meta.objClassId = obj_meta.class_id
            user_event_meta = pyds.nvds_acquire_user_meta_from_pool(batch_meta)
            user_event_meta.user_meta_data = msg_meta
            user_event_meta.base_meta.meta_type = pyds.NvDsMetaType.NVDS_EVENT_MSG_META
            pyds.set_user_copyfunc(user_event_meta, metadata_deepcopy)
            pyds.set_user_releasefunc(user_event_meta, metadata_free)
            pyds.nvds_add_user_meta_to_frame(frame_meta, user_event_meta)

            try:
                l_obj = l_obj.next
            except StopIteration:
                break

        logger.debug("Frame %d; total detected objects %d", frame_number, num_rects)
        fps_streams[frame_meta.pad_index] += 1
        now = time.time()
        delta = now - start_times[frame_meta.pad_index]
        if delta > 5:
            logger.info(
                "Stream %d FPS: %f",
                frame_meta.pad_index,
                fps_streams[frame_meta.pad_index] / delta,
            )
            fps_streams[frame_meta.pad_index] = 0
            start_times[frame_meta.pad_index] = now

        try:
            l_frame = l_frame.next
        except StopIteration:
            break

    return Gst.PadProbeReturn.OK

What am I doing wrong? Thank you very much for any help.

• Hardware Platform: Jetson Nano
• DeepStream Version: 5.0.1
• JetPack Version: 4.4-b144
• Issue Type: questions

  • I do not want to set additional metadata for the message, so I removed it from the original deepstream-test4 app. However, I am still getting messages with “vehicle” included (even though I don’t set this type anywhere), and I wouldn’t like to have it this way (I am using a model that classifies quite a lot more classes than just vehicles). What should I change in my code?

json objects generated here, based on object meta data field objType, you can add your own customized extMsg and generated json objects based on it. sources/libs/nvmsgconv/nvmsgconv.cpp::generate_object_object

Tracking ID generated automatically, can you see tracking ID from the output?

Will try to repro the issue and get back to you.

there some run error, it would be better you give the whole source code for saving time.

I could not get RTSP stream to show tracking IDs. I can’t say if there really are tracking IDs or not, only that they are not sent through msg broker.

I am sorry, I provided code with my additional objects, imported from other modules. Here is the code necessary to setup messaging callbacks:

import ctypes
import logging
import platform
import sys
import time
from collections import defaultdict
from enum import Enum
from typing import Dict, Union

import pyds
from gi.repository import Gst


logger = logging.getLogger(__name__)


class DeepStreamError(Exception):
    """Exception raised when DeepStream module initialization failed."""


class PadType(str, Enum):
    """Gst pad type."""

    SRC = "src"
    SINK = "sink"


def get_static_pad(
    module: Gst.Element, pad_type: PadType, postfix: Union[int, str, None] = None
) -> Gst.Pad:
    """Get DeepStream static pad.

    .. note: pads are the element's interface to the outside world
             a static pad always exists

    :param module: DeepStream module to create the pad for
    :param pad_type: Gst pad type
    :param postfix: Gst pad type postfix
    :return: static pad
    """
    pad_name = pad_type.value if postfix is None else f"{pad_type.value}_{postfix}"
    pad = module.get_static_pad(pad_type.value)
    if not pad:
        raise DeepStreamError(f"Unable to create static pad {pad_name}")
    return pad


def metadata_deepcopy(data, user_data):
    """Callback function for deep-copying an NvDsEventMsgMeta struct"""
    # Cast data to pyds.NvDsUserMeta and pyds.NvDsEventMsgMeta
    user_meta = pyds.NvDsUserMeta.cast(data)
    src_meta_data = user_meta.user_meta_data
    src_meta = pyds.NvDsEventMsgMeta.cast(src_meta_data)

    # Duplicate the memory contents of src_meta to dst_meta
    dst_meta_ptr = pyds.memdup(
        pyds.get_ptr(src_meta), sys.getsizeof(pyds.NvDsEventMsgMeta)
    )
    dst_meta = pyds.NvDsEventMsgMeta.cast(dst_meta_ptr)
    if src_meta.objSignature.size > 0:
        dst_meta.objSignature.signature = pyds.memdup(
            src_meta.objSignature.signature, src_meta.objSignature.size
        )
        dst_meta.objSignature.size = src_meta.objSignature.size

    return dst_meta


def metadata_free(data, user_data):
    """Callback function for freeing an NvDsEventMsgMeta instance"""
    user_meta = pyds.NvDsUserMeta.cast(data)
    src_meta = pyds.NvDsEventMsgMeta.cast(user_meta.user_meta_data)

    if src_meta.objSignature.size > 0:
        pyds.free_buffer(src_meta.objSignature.signature)
        src_meta.objSignature.size = 0
    if src_meta.extMsgSize > 0:
        pyds.free_gbuffer(src_meta.extMsg)
        src_meta.extMsgSize = 0


def msg_osd_buffer_probe(pad, info, u_data):
    """Probe function to extract metadata received on OSD sink pad.

    .. note: this is used for updating params to draw boxes, obj information etc.
    """
    now = time.time()
    start_times: Dict[str, float] = defaultdict(lambda: now)
    fps_streams: Dict[str, int] = defaultdict(int)

    gst_buffer = info.get_buffer()
    if not gst_buffer:
        logger.error("Unable to get GstBuffer")
        return

    # Retrieve batch metadata from the gst_buffer
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))  # obtain address
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        try:
            frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        except StopIteration:
            break

        frame_number = frame_meta.frame_num
        l_obj = frame_meta.obj_meta_list
        num_rects = frame_meta.num_obj_meta
        while l_obj is not None:
            try:
                obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
            except StopIteration:
                continue

            msg_meta = pyds.alloc_nvds_event_msg_meta()
            msg_meta.bbox.top = obj_meta.rect_params.top
            msg_meta.bbox.left = obj_meta.rect_params.left
            msg_meta.bbox.width = obj_meta.rect_params.width
            msg_meta.bbox.height = obj_meta.rect_params.height
            msg_meta.frameId = frame_number
            msg_meta.trackingId = ctypes.c_int(obj_meta.object_id & 0xFFFFFFFF).value
            msg_meta.confidence = obj_meta.confidence
            msg_meta.sensorId = 0
            msg_meta.placeId = 0
            msg_meta.moduleId = 0
            msg_meta.objClassId = obj_meta.class_id
            user_event_meta = pyds.nvds_acquire_user_meta_from_pool(batch_meta)
            user_event_meta.user_meta_data = msg_meta
            user_event_meta.base_meta.meta_type = pyds.NvDsMetaType.NVDS_EVENT_MSG_META
            pyds.set_user_copyfunc(user_event_meta, metadata_deepcopy)
            pyds.set_user_releasefunc(user_event_meta, metadata_free)
            pyds.nvds_add_user_meta_to_frame(frame_meta, user_event_meta)

            try:
                l_obj = l_obj.next
            except StopIteration:
                break

        logger.debug("Frame %d; total detected objects %d", frame_number, num_rects)
        fps_streams[frame_meta.pad_index] += 1
        now = time.time()
        delta = now - start_times[frame_meta.pad_index]
        if delta > 5:
            logger.info(
                "Stream %d FPS: %f",
                frame_meta.pad_index,
                fps_streams[frame_meta.pad_index] / delta,
            )
            fps_streams[frame_meta.pad_index] = 0
            start_times[frame_meta.pad_index] = now

        try:
            l_frame = l_frame.next
        except StopIteration:
            break

    return Gst.PadProbeReturn.OK


def pad_callback(decodebin, decoder_src_pad, data):
    """Callback to link decoder source pad to ghost pad on creating."""
    caps = decoder_src_pad.get_current_caps()
    gst_struct = caps.get_structure(0)
    gst_name = gst_struct.get_name()
    source_bin = data
    features = caps.get_features(0)

    # Check if the pad is a video pad
    if gst_name.find("video") != -1:
        # filter only nvdec_* decoder plugin (NVMM memory features)
        if features.contains("memory:NVMM"):
            # Get the source bin ghost pad
            bin_ghost_pad = get_static_pad(source_bin, PadType.SRC)
            if not bin_ghost_pad.set_target(decoder_src_pad):
                raise DeepStreamError(
                    "Failed to link decoder src pad to source bin ghost pad"
                )
        else:
            raise DeepStreamError("Error: Decodebin did not pick nvidia decoder plugin")


def child_added_callback(child_proxy, obj, name, user_data, n_sources: int):
    """Callback for setting up decodebin properties."""
    logger.debug("Decodebin child added: %s", name)
    if n_sources > 1:
        obj.set_property("drop-frame-interval", n_sources)
    if name.find("decodebin") != -1:
        obj.connect("child-added", child_added_callback, user_data)
    if platform.uname()[4] == "aarch64" and name.find("nvv4l2decoder") != -1:
        logger.debug("Seting bufapi_version")
        obj.set_property("bufapi-version", True)

Then I get pgie static pad and add msg osd buffer probe as here:

    pgie_pad = get_static_pad(pgie, PadType.SRC)
    pgie_pad.add_probe(
        Gst.PadProbeType.BUFFER,
        partial(msg_osd_buffer_probe, config=config),
        0,
    )

Perhaps it should be added in to tracker’s pad?

Yeah, you should add after tracker, osd should be fine.

1 Like

Indeed, moving the probe after the tracker fixed the issue with tracking ID. But still, I am getting segmentation faults after ca. 200 frames

I had similar issues that I narrowed down to setting copy_func and releasefunc in python. As an experiment, could you comment out those two lines?

I ended up rewriting the code in C where I didn’t have this problem.

Hi,
Did you try Nors’s suggestion? how about the result?