gi.overrides.Gst.AddError: <Gst.Bin object>

I am trying to run the pipeline of gstreamer using test.py:

import os
import logging
from app.pipeline import Pipeline
from app.utils.config import CONFIGS_DIR, LOGLEVEL
import time

if __name__ == '__main__':

   DP = Pipeline(
           pgie_config_path="/home/velens/jetson-orin/configs/pgies/yolov8_pom.txt",
           tracker_config_path="/home/velens/jetson-orin/configs/trackers/nvdcf.txt",
         #   output_format="disp",
           host='192.168.124.214',
           port=5000
           )
   DP.run()


   time.sleep(30)

   DP.set_farmid(20)

   print("debug--------------------------------> adding  source")
   #exit(0)
   status = DP.start_playing()
   print("debug--------------------------------> start playing first time ", status)

while running i am getting this error:

inside mp4 sink
sink start at 192.168.124.214 with port 5000
gstnvtracker: Loading low-level lib at /opt/nvidia/deepstream/deepstream-6.4/lib/libnvds_nvmultiobjecttracker.so
[NvTrackerParams::getConfigRoot()] !!![WARNING] File doesn't exist. Will go ahead with default values
[NvTrackerParams::getConfigRoot()] !!![WARNING] File doesn't exist. Will go ahead with default values
[NvMultiObjectTracker] Initialized
WARNING: [TRT]: Using an engine plan file across different models of devices is not recommended and is likely to affect performance or even cause errors.
0:00:09.669003788 74561 0xfffe8bc6a0a0 INFO                 nvinfer gstnvinfer.cpp:682:gst_nvinfer_logger:<primary-inference> NvDsInferContext[UID 1]: Info from NvDsInferContextImpl::deserializeEngineAndBackend() <nvdsinfer_context_impl.cpp:2092> [UID = 1]: deserialized trt engine from :/home/velens/jetson-orin/configs/pgies/yolo_v8/model_b1_gpu0_fp32.engine
WARNING: [TRT]: The getMaxBatchSize() function should not be used with an engine built from a network created with NetworkDefinitionCreationFlag::kEXPLICIT_BATCH flag. This function will always return 1.
INFO: [Implicit Engine Info]: layers num: 4
0   INPUT  kFLOAT input           3x640x640       
1   OUTPUT kFLOAT boxes           8400x4          
2   OUTPUT kFLOAT scores          8400x1          
3   OUTPUT kFLOAT classes         8400x1          

0:00:10.113825531 74561 0xfffe8bc6a0a0 INFO                 nvinfer gstnvinfer.cpp:682:gst_nvinfer_logger:<primary-inference> NvDsInferContext[UID 1]: Info from NvDsInferContextImpl::generateBackendContext() <nvdsinfer_context_impl.cpp:2195> [UID = 1]: Use deserialized engine model: /home/velens/jetson-orin/configs/pgies/yolo_v8/model_b1_gpu0_fp32.engine
0:00:10.130586620 74561 0xfffe8bc6a0a0 INFO                 nvinfer gstnvinfer_impl.cpp:328:notifyLoadModelStatus:<primary-inference> [UID 1]: Load new model:/home/velens/jetson-orin/configs/pgies/yolov8_pom.txt sucessfully
debug--------------------------------> adding  source
bin_ghost_src_pad: <Gst.GhostPad object at 0xffff8eec1a40 (GstGhostPad at 0xaaaae80b31a0)>
bin_ghost_src_pad name : src
Created  src ghost pad and linked to : conv src pad, <Gst.Pad object at 0xffff8ff01f40 (GstPad at 0xaaaae80a3aa0)>
Traceback (most recent call last):
  File "/home/velens/jetson-orin/test.py", line 25, in <module>
    status = DP.start_playing()
  File "/home/velens/jetson-orin/app/pipeline.py", line 636, in start_playing
    self.pipeline.add(self.rs_source_bin)
  File "/usr/lib/python3/dist-packages/gi/overrides/Gst.py", line 73, in add
    raise AddError(arg)
gi.overrides.Gst.AddError: <Gst.Bin object at 0xffff8ef35e40 (GstBin at 0xaaaae80850a0)>

And my pipeline.py is:

"""This module contains all Gst pipeline related logic."""
# pipeline stuck at creating flush stop 0 after stopping second time

import csv
import json
import os
import sys
import math
import logging
import configparser
from functools import partial
from inspect import signature
import time
import shutil
from typing import List
import threading
from collections import defaultdict
import signal
import cv2
import numpy as np
import app.utils.gst_realsense_meta as gst_realsense_meta
sys.path.append('../')
import gi

os.environ["GST_DEBUG_DUMP_DOT_DIR"] = "/tmp"
os.putenv('GST_DEBUG_DUMP_DIR_DIR', '/tmp')

gi.require_version('Gst', '1.0')
gi.require_version('GstRtspServer', '1.0')
from gi.repository import GLib, Gst
import pyds

from app.utils.bus_call import bus_call
from app.utils.is_aarch_64 import is_aarch64
from app.utils.fps import PERF_DATA
from app.utils.bbox import rect_params_to_coords
from app.utils.config import CONFIGS_DIR, OUTPUT_DIR, CROPS_DIR,CONSTANTS

PGIE_CLASS_ID_PARTIAL = 1
PGIE_CLASS_ID_FULL = 0
STREAM_TYPE = 2
ALIGN = 1
IMU_ON = False
perf_data = None


class Pipeline:

    def __init__(self, *,pgie_config_path: str = os.path.join(CONFIGS_DIR, "pgies/pgie.txt"),
                 tracker_config_path: str = os.path.join(CONFIGS_DIR, "trackers/nvdcf.txt"),
                 host: str = None,
                 port=5000):
        """Create a Deepstream Pipeline.

        Args:
            pgie_config_path (str): The configuration file path of the primary inference engine.
            tracker_config_path (str): The configuration file path of the tracker.
        """
        self.logger = logging.getLogger(__name__ + "." + self.__class__.__name__)
        self.pgie_config_path = pgie_config_path
        self.tracker_config_path = tracker_config_path
        self.output_format = CONSTANTS['OUTPUT_FORMAT']
        self.rtsp_codec = CONSTANTS['RTSP_CODEC']
        self.host = host
        self.port = port
        self.input_shape = (CONSTANTS['IMAGE_RES_WIDTH'], CONSTANTS['IMAGE_RES_HEIGHT'])
        self.input_width = self.input_shape[0]
        self.input_height = self.input_shape[1]
        self.num_sources = 1  # TODO: to support multiple sources in the future
        self.fps_streams = {}
        self.farmID = None
        self.raw_video_path = os.path.join(OUTPUT_DIR, 'out.mp4')
        self.video_dir = None
        self.farm_dir = None
        self.crop_dir = None
        self.sdepth_image = np.zeros((self.input_height, self.input_width, 3))
        self.videoID = 1
        global perf_data
        perf_data = PERF_DATA(self.num_sources)
        self.track_scores = defaultdict(list)

        # self.logger.info(f"Playing from URI {self.video_uri}")
        Gst.init(None)
        self.logger.info("Creating Pipeline")
        self.pipeline = Gst.Pipeline()
        if not self.pipeline:
            self.logger.error("Failed to create Pipeline")

        self.elements = []
        self._create_elements()
        self._link_elements()
        self._add_probes()
        Gst.debug_bin_to_dot_file(self.pipeline, Gst.DebugGraphDetails.ALL, "pipeline")

    def set_farmid(self, farmid):
        self.farmID = farmid
        self.video_dir = os.path.join(OUTPUT_DIR, str(self.farmID),'videos')
        self.crop_dir = os.path.join(OUTPUT_DIR, str(self.farmID),'crop_data')
        self.farm_dir = os.path.join(OUTPUT_DIR, str(self.farmID))
        os.makedirs(self.crop_dir,exist_ok=True)
        os.makedirs(self.video_dir,exist_ok=True)

    def __str__(self):
        return " -> ".join([elm.name for elm in self.elements])

    def _add_element(self, element, idx=None):
        if idx:
            self.elements.insert(idx, element)
        else:
            self.elements.append(element)

        self.pipeline.add(element)

    def _create_element(self, factory_name, name, print_name, detail="", add=True):
        """Creates an element with Gst Element Factory make.

        Return the element if successfully created, otherwise print to stderr and return None.
        """
        self.logger.info(f"Creating {print_name}")
        elm = Gst.ElementFactory.make(factory_name, name)

        if not elm:
            self.logger.error(f"Unable to create {print_name}")
            if detail:
                self.logger.error(detail)

        if add:
            self._add_element(elm)

        return elm

    def _create_source(self, add=False):
        rssrc = self._create_element("realsensesrc", "realsense-source", "Realsense Source", add=add)
        rssrc.set_property('stream-type', STREAM_TYPE)
        rssrc.set_property('align', ALIGN)
        rssrc.set_property('imu_on', IMU_ON)
        return rssrc

    def demuxer_callback(self, demuxer, pad):
        print(f'pad template: {pad.get_property("template").name_template}')
        if pad.get_property("template").name_template == "color":
            qc_pad = self.queue_color.get_static_pad("sink")
            linked = pad.link(qc_pad)
            if linked != Gst.PadLinkReturn.OK:
                print('failed to link demux to color queue')
        elif pad.get_property("template").name_template == "depth":
            # print("not saving sink from depth")
            qd_pad = self.queue_depth.get_static_pad("sink")
            linked = pad.link(qd_pad)
            if linked != Gst.PadLinkReturn.OK:
                print('failed to link demux to depth queue')
        elif IMU_ON and pad.get_property("template").name_template == "imu":
            qi_pad = self.queue_imu.get_static_pad("sink")
            linked = pad.link(qi_pad)
            if linked != Gst.PadLinkReturn.OK:
                print('failed to link demux to IMU queue')

    def _create_streammux(self):
        streammux = self._create_element("nvstreammux", "stream-muxer", "Stream mux")
        streammux.set_property('width', self.input_width)
        streammux.set_property('height', self.input_height)
        streammux.set_property('batch-size', 1)
        streammux.set_property('batched-push-timeout', 40000)
        streammux.set_property('live-source', 1)
        return streammux

    def _create_demux(self, add=False):
        rsdemux = self._create_element("rsdemux", "realsense-demux", "Realsense Demux", add=add)
        rsdemux.connect('pad-added', self.demuxer_callback)
        return rsdemux

    def _create_tracker(self):
        tracker = self._create_element("nvtracker", "tracker", "Tracker")

        config = configparser.ConfigParser()
        config.read(self.tracker_config_path)
        config.sections()

        for key in config['tracker']:
            if key == 'tracker-width':
                tracker_width = config.getint('tracker', key)
                tracker.set_property('tracker-width', tracker_width)
            if key == 'tracker-height':
                tracker_height = config.getint('tracker', key)
                tracker.set_property('tracker-height', tracker_height)
            if key == 'gpu-id':
                tracker_gpu_id = config.getint('tracker', key)
                tracker.set_property('gpu_id', tracker_gpu_id)
            if key == 'll-lib-file':
                tracker_ll_lib_file = config.get('tracker', key)
                tracker.set_property('ll-lib-file', tracker_ll_lib_file)
            if key == 'll-config-file':
                tracker_ll_config_file = config.get('tracker', key)
                tracker.set_property('ll-config-file', tracker_ll_config_file)
            if key == 'enable-past-frame':
                tracker_enable_past_frame = config.getint('tracker', key)
                tracker.set_property('enable_past_frame', tracker_enable_past_frame)
            tracker.set_property('user-meta-pool-size', 64)

        return tracker

    def _create_rtsp_sink(self):
        nvvidconv3 = self._create_element("nvvideoconvert", "convertor3", "Converter 3")
        nvvidconv3.set_property("compute-hw", 1)
        nvvidconv3.set_property("nvbuf-memory-type", 2)
        capsfilter2 = self._create_element("capsfilter", "capsfilter2", "Caps filter 2")
        capsfilter2.set_property("caps",
                                 Gst.Caps.from_string("video/x-raw,width=(int)1280, height=(int)720, format=I420"))
        if self.rtsp_codec not in ["264", "265"]:
            raise ValueError(f"Invalid codec '{self.rtsp_codec}'")

        # Make the encoder
        encoder = self._create_element(f"x{self.rtsp_codec.lower()}enc", "encoder",
                                       f"{self.rtsp_codec} encoder")
        encoder.set_property('bitrate', 500)

        if is_aarch64():
            # encoder.set_property('speed-preset', "ultrafast")
            encoder.set_property('tune', "zerolatency")
            # encoder.set_property('bufapi-version', 1)

        # Make the payload-encode video into RTP packets
        rtppay = self._create_element(f"rtph{self.rtsp_codec.lower()}pay", "rtppay",
                                      f"{self.rtsp_codec} rtppay")

        rtppay.set_property('config-interval', 1)
        rtppay.set_property('pt', 96)

        # Make the UDP sink
        sink = self._create_element("udpsink", "udpsink", "UDP sink")
        sink.set_property('host', self.host)
        sink.set_property('port', self.port)
        sink.set_property('async', False)
        sink.set_property('sync', False)
        print(f"sink start at {self.host} with port {self.port}")
        return [self.queue_rtsp, nvvidconv3, capsfilter2, encoder, rtppay, sink]

    def _create_mp4_sink(self):
        print("inside mp4 sink")
        nvvidconv = self._create_element("nvvideoconvert", "nvvidconv", "NV Video Converter")
        nvvidconv.set_property("compute-hw", 1)
        nvvidconv.set_property("nvbuf-memory-type", 2)
        capsfilter = self._create_element("capsfilter", "capsfilter mp4", "Caps Filter mp4")
        capsfilter.set_property("caps", Gst.Caps.from_string("video/x-raw, format=I420"))

        # On Jetson, there is a problem with the encoder failing to initialize due to limitation on
        # TLS usage. To work around this, preload libgomp.
        preload_reminder = "If the following error is encountered:\n" + \
                           "/usr/lib/aarch64-linux-gnu/libgomp.so.1: cannot allocate memory in " \
                           "static TLS block\n" + \
                           "Preload the offending library:\n" + \
                           "export LD_PRELOAD=/usr/lib/aarch64-linux-gnu/libgomp.so.1\n"
        encoder = self._create_element("avenc_mpeg4", "avenc_mpeg4", "MPEG-4 Encoder",
                                       detail=preload_reminder)
        encoder.set_property("bitrate", 2000000)
        codeparser = self._create_element("mpeg4videoparse", "mpeg4-parser", "MPEG-4 Parser")
        container = self._create_element("qtmux", "qtmux", "QuickTime Muxer")
        #mkv = Gst.ElementFactory.make("matroskamux", "mkv")
        filesink = self._create_element("filesink", "filesink", "File Sink")
        filesink.set_property("location", self.raw_video_path)
        filesink.set_property("sync", False)
        filesink.set_property("async", False)
        return [self.queue_mp4, nvvidconv, capsfilter, encoder, codeparser, container, filesink]

    def _create_source_bin(self, index=0):
        def _cb_newpad(_, pad, data):
            self.logger.info("In cb_newpad\n")
            if not pad.is_linked():
                # Get a sink pad from the streammux, link to decodebin
                sinkpad = self._get_request_pad(self.streammux, "sink_0")
                if pad.link(sinkpad) == Gst.PadLinkReturn.OK:
                    print("Decodebin linked to pipeline")
                else:
                    sys.stderr.write("Failed to link decodebin to pipeline\n")

        self.logger.info("Creating Source bin")
        bin_name = "source-bin-%02d" % index
        source_bin = Gst.Bin.new(bin_name)
        if not source_bin:
            self.logger.error("Unable to create source bin")

        self.rs_source = self._create_source(add=False)
        self.queue_color = self._create_element('queue', 'queue_color', 'queue color', add=False)
        vidconvert_color = self._create_element("videoconvert", "convert-color", "convert-color", add=False)
        nvvidconvsrc = self._create_element("nvvidconv", "nv convert-color", "nv convert-color", add=False)
        caps_vidconvsrc = self._create_element("capsfilter", "nvmm_caps", "nvmm_caps", add=False)
        caps_vidconvsrc.set_property('caps', Gst.Caps.from_string("video/x-raw(memory:NVMM),format=(string)NV12"))
        nvvidconvsrc1 = self._create_element("nvvidconv", "nv convert-color1", "nv convert-color1", add=False)

        vidconvert_depth = self._create_element("videoconvert", "convert-depth", "convert-depth", add=False)
        self.queue_depth = self._create_element('queue', 'queue_depth', 'queue depth', add=False)
        sink_depth = self._create_element('fakesink', 'sink-depth', 'Sink Depth', add=False)
        self.depth_enc_elements = [self.queue_depth, vidconvert_depth, sink_depth]
        rs_demux = self._create_demux(add=False)
        self.rs_elements = [self.rs_source, rs_demux]
        self.rgb_enc_elements = [self.queue_color, vidconvert_color, nvvidconvsrc, caps_vidconvsrc, nvvidconvsrc1]

        Gst.Bin.add(source_bin, self.rs_source)
        Gst.Bin.add(source_bin, rs_demux)
        Gst.Bin.add(source_bin, self.queue_color)
        Gst.Bin.add(source_bin, vidconvert_color)
        Gst.Bin.add(source_bin, nvvidconvsrc)
        Gst.Bin.add(source_bin, caps_vidconvsrc)
        Gst.Bin.add(source_bin, nvvidconvsrc1)

        Gst.Bin.add(source_bin, vidconvert_depth)
        Gst.Bin.add(source_bin, self.queue_depth)
        Gst.Bin.add(source_bin, sink_depth)

        self._link_sequential(self.rs_elements)
        self._link_sequential(self.depth_enc_elements)
        self._link_sequential(self.rgb_enc_elements)

        srcpad = self._get_static_pad(nvvidconvsrc1, "src")

        ### Add signal handler
        # self.rs_source.connect("pad-added", _cb_newpad, srcpad)
        bin_pad = source_bin.add_pad(Gst.GhostPad.new("src", srcpad))
        if not bin_pad:
            self.logger.error("Failed to add ghost pad in source bin")
            return None
        else:
            bin_ghost_src_pad = source_bin.get_static_pad("src")
            bin_ghost_src_pad.set_active(True)
            print(f'bin_ghost_src_pad: {bin_ghost_src_pad}')
            print(f'bin_ghost_src_pad name : {bin_ghost_src_pad.get_name()}')
            print(f'Created  src ghost pad and linked to : conv src pad, {srcpad}')
            if (not bin_ghost_src_pad):
                print("Failed to get recently added src ghost pad src of bin. Exiting.\n")
                return None

        # release the src
        # srcpad.unref()                # GST_OBJECT/GObject might cause error
        self._add_element(source_bin)
        return source_bin

    def _create_elements(self):
        self.streammux = self._create_streammux()
        pgie = self._create_element("nvinfer", "primary-inference", "PGIE")
        pgie.set_property('config-file-path', self.pgie_config_path)
        tracker = self._create_tracker()

        # Use convertor to convert from NV12 to RGBA (easier to work with in Python)
        nvvidconv1 = self._create_element("nvvideoconvert", "convertor1", "Converter 1")
        capsfilter1 = self._create_element("capsfilter", "capsfilter1", "Caps filter 1")
        capsfilter1.set_property("caps", Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA"))
        self.nvvidconv2 = self._create_element("nvvideoconvert", "convertor2", "Converter 2")
        self.queue_rtsp = self._create_element("queue", "queue-rtsp", "Queue rtsp sink")
        nvosd = self._create_element("nvdsosd", "onscreendisplay", "OSD")
        nvosd.set_property('display-bbox', 1)
        if self.output_format.lower() == "rtsp":
            self.queue_mp4 = self._create_element("queue", "queue-mp4", "Queue mp4 sink")
            self.tee = self._create_element("tee", "nvsink-tee", "nvsink tee")
            self.dl_elements = [self.streammux, pgie, tracker, nvvidconv1, capsfilter1, self.nvvidconv2, nvosd,
                                self.tee]
        else:
            self.dl_elements = [self.streammux, pgie, tracker, nvvidconv1, capsfilter1, self.nvvidconv2, nvosd,
                                self.queue_rtsp]

        if self.output_format.lower() == "rtsp":
            self.mp4_elements = self._create_mp4_sink()
            self.rtsp_elements = self._create_rtsp_sink()
        else:
            sink_display = self._create_element("nv3dsink", "nv3d-sink", "nv3d sink")
            sink_display.set_property('sync', 0)
            self.render_elements = [self.queue_rtsp, sink_display]

        if not is_aarch64():
            # Use CUDA unified memory so frames can be easily accessed on CPU in Python.
            mem_type = int(pyds.NVBUF_MEM_CUDA_UNIFIED)
            nvvidconv1.set_property("nvbuf-memory-type", mem_type)
            self.nvvidconv2.set_property("nvbuf-memory-type", mem_type)

    @staticmethod
    def _link_sequential(elements: list):
        for i in range(0, len(elements) - 1):
            elements[i].link(elements[i + 1])

    def _link_elements(self):
        # ptvsd.debug_this_thread()
        self.logger.info(f"Linking elements in the Pipeline: {self}")
        # detecte/track pipeline
        self._link_sequential(self.dl_elements)
        if self.output_format.lower() == "rtsp":
            # parallel sinking to rtsp and local
            tee_q1_pad = self.tee.get_request_pad("src_%u")
            q1_pad = self.queue_mp4.get_static_pad('sink')
            tee_q1_pad.link(q1_pad)

            tee_q2_pad = self.tee.get_request_pad("src_%u")
            q2_pad = self.queue_rtsp.get_static_pad('sink')
            tee_q2_pad.link(q2_pad)

            # render/stream/save video
            self._link_sequential(self.mp4_elements)
            self._link_sequential(self.rtsp_elements)
        else:
            self._link_sequential(self.render_elements)

    def _write_osd_analytics(self, batch_meta, l_frame_meta: List, ll_obj_meta: List[List]):
        obj_counter = defaultdict(int)

        for frame_meta, l_obj_meta in zip(l_frame_meta, ll_obj_meta):
            frame_number = frame_meta.frame_num
            num_rects = frame_meta.num_obj_meta

            for obj_meta in l_obj_meta:
                obj_counter[obj_meta.class_id] += 1
                obj_meta.rect_params.border_color.set(0.0, 0.0, 1.0, 0.0)

            stream_index = "stream{0}".format(frame_meta.pad_index)
            global perf_data
            fps = perf_data.update_fps(stream_index)
            print("fps:-", fps)
            display_meta = pyds.nvds_acquire_display_meta_from_pool(batch_meta)
            display_meta.num_labels = 1
            py_nvosd_text_params = display_meta.text_params[0]
            py_nvosd_text_params.display_text = \
                "Frame Number={} Number of Objects={} Full_pom_count={} Partial_pom_count={} FPS={}".format(
                    frame_number, num_rects, obj_counter[PGIE_CLASS_ID_FULL],
                    obj_counter[PGIE_CLASS_ID_PARTIAL], fps)

            py_nvosd_text_params.x_offset = 10
            py_nvosd_text_params.y_offset = 12
            py_nvosd_text_params.font_params.font_name = "Serif"
            py_nvosd_text_params.font_params.font_size = 10
            py_nvosd_text_params.font_params.font_color.set(1.0, 1.0, 1.0, 1.0)
            py_nvosd_text_params.set_bg_clr = 1
            py_nvosd_text_params.text_bg_clr.set(0.0, 0.0, 0.0, 1.0)

            self.logger.info(pyds.get_string(py_nvosd_text_params.display_text))
            pyds.nvds_add_display_meta_to_frame(frame_meta, display_meta)

    def _calculate_crop_score(self, track_id, crop):
        # implement selection logic
        return None

    def _save_crops(self, frames, _, l_frame_meta: List, ll_obj_meta: List[List]):
        # ptvsd.debug_this_thread()
        self.logger.info(f"Saving crops to '{os.path.realpath(CROPS_DIR)}'")
        for frame_data, frame_meta, l_obj_meta in zip(frames, l_frame_meta, ll_obj_meta):
            frame_copy = np.array(frame_data[0], copy=True, order='C')
            frame_copy = cv2.cvtColor(frame_copy, cv2.COLOR_RGBA2BGRA)
            if frame_data[1].shape[1]:
                depth_copy = np.array(frame_data[1], copy=True, order='C')
                # print("herer shape of data is ",frame_copy.shape,depth_copy.shape)
                self.sdepth_image[:, :, 0] = depth_copy // 10
                self.sdepth_image[:, :, 1] = depth_copy % 10
                # cv2.imwrite(os.path.join(CROPS_DIR, f"depth_{frame_meta.frame_num}_full.jpg"), self.sdepth_image)
                # cv2.imwrite(os.path.join(CROPS_DIR, f"color_{frame_meta.frame_num}_full.jpg"), frame_copy)

            for obj_meta in l_obj_meta:
                track_id = obj_meta.object_id
                x1, y1, x2, y2 = rect_params_to_coords(obj_meta.tracker_bbox_info.org_bbox_coords)
                crop = frame_copy[y1:y2, x1:x2]
                if frame_data[1].shape[1]:
                    cropd = self.sdepth_image[y1:y2, x1:x2, :]
                crop_score = obj_meta.tracker_confidence

                if not self.track_scores[track_id] or crop_score > max(self.track_scores[track_id]):
                    os.makedirs(self.crop_dir, exist_ok=True)
                    for f in os.listdir(self.crop_dir):
                        os.remove(os.path.join(self.crop_dir, f))
                    crop_path = os.path.join(self.crop_dir, f"color_fruit_{obj_meta.object_id}.png")
                    cv2.imwrite(crop_path, crop)
                    if frame_data[1].shape[1]:
                        cv2.imwrite(crop_path.replace("color_", "depth_"), cropd)
                    self.logger.debug(f"Saved crop to '{crop_path}'")
                    with open(os.path.join(self.farm_dir, f"tracking_video.csv"),'a') as csvfile:
                        # creating a csv writer object
                        csvwriter = csv.writer(csvfile)
                        # writing the data rows
                        csvwriter.writerows([[f"color_fruit_{str(obj_meta.object_id)}.png",str(x1),str(x2),str(y1),str(y2),str(x2-x1),str(y2-y1)]])

                self.track_scores[track_id].append(crop_score)

    def _probe_fn_wrapper(self, _, info, probe_fn, get_frames=False):
        gst_buffer = info.get_buffer()
        if not gst_buffer:
            self.logger.error("Unable to get GstBuffer")
            return

        frames = []
        l_frame_meta = []
        ll_obj_meta = []
        batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
        l_frame = batch_meta.frame_meta_list
        while l_frame is not None:
            try:
                frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)

            except StopIteration:
                break

            intrs = gst_realsense_meta.get_color_intrinsics(gst_buffer)
            if not os.path.isfile(os.path.join(self.farm_dir,'intrinsic.json')):
                data_dict = {
                    'width': intrs.width,
                    'height': intrs.height,
                    'cameraIntrinsics': [[intrs.fx, intrs.ppx, ], [0, intrs.fx, intrs.ppy],
                                         [0, 0, 1]],
                }
                with open(os.path.join(self.farm_dir, 'intrinsic.json'), 'w') as json_file:
                    json.dump(data_dict, json_file)

            if get_frames:
                frame = pyds.get_nvds_buf_surface(hash(gst_buffer), frame_meta.batch_id)
                if frame_meta.frame_num > 0:
                    depth = gst_realsense_meta.get_depth_data(gst_buffer)
                else:
                    depth = np.array([[]])
                    # print("shape of depth data is  ============ ",depth.shape)
                frames.append((frame, depth))

            l_frame_meta.append(frame_meta)
            l_obj_meta = []

            l_obj = frame_meta.obj_meta_list
            while l_obj is not None:
                try:
                    obj_meta = pyds.NvDsObjectMeta.cast(l_obj.data)
                except StopIteration:
                    break

                l_obj_meta.append(obj_meta)

                try:
                    l_obj = l_obj.next
                except StopIteration:
                    break

            ll_obj_meta.append(l_obj_meta)

            try:
                l_frame = l_frame.next
            except StopIteration:
                break

        if get_frames:
            probe_fn(frames, batch_meta, l_frame_meta, ll_obj_meta)
        else:
            probe_fn(batch_meta, l_frame_meta, ll_obj_meta)

        return Gst.PadProbeReturn.OK

    def _wrap_probe(self, probe_fn):
        get_frames = "frames" in signature(probe_fn).parameters
        return partial(self._probe_fn_wrapper, probe_fn=probe_fn, get_frames=get_frames)

    @staticmethod
    def _get_static_pad(element, pad_name: str = "sink"):
        pad = element.get_static_pad(pad_name)
        if not pad:
            raise AttributeError(f"Unable to get {pad_name} pad of {element.name}")

        return pad

    @staticmethod
    def _get_request_pad(element, pad_name: str = "sink"):
        pad = element.get_request_pad(pad_name)
        if not pad:
            raise AttributeError(f"Unable to get {pad_name} pad of {element.name}")

        return pad

    def _add_probes(self):
        sinkpad = self._get_static_pad(self.nvvidconv2)
        sinkpad.add_probe(Gst.PadProbeType.BUFFER, self._wrap_probe(self._write_osd_analytics))
        sinkpad.add_probe(Gst.PadProbeType.BUFFER, self._wrap_probe(self._save_crops))

    def set_video_name(self):
        if os.path.isfile(self.raw_video_path):
            shutil.move(self.raw_video_path, os.path.join(self.video_dir, f'video{self.videoID}.mp4'))
            shutil.copy('/tmp/out.mp4',self.raw_video_path)
            self.logger.error(
                f"Saved video at location {os.path.join(self.video_dir, f'video{self.videoID}.mp4')}")
            self.videoID += 1
            return True
        else:
            self.logger.error(f"File {self.raw_video_path} does not exist.")
            return False

    def copy_video_name(self):
        if os.path.isfile(self.raw_video_path):
            shutil.copy(self.raw_video_path,'/tmp/out.mp4')
            return True
        else:
            self.logger.error(f"File {self.raw_video_path} does not exist.")
            return False

    def stop_playing(self):
        # Attempt to change status of source to be released
        self.pipeline.set_state(Gst.State.PAUSED)
        state_return = self.rs_source_bin.set_state(Gst.State.NULL)
        if state_return == Gst.StateChangeReturn.SUCCESS:
            state_return = self.streammux.set_state(Gst.State.PAUSED)
            pad_name = "sink_0"
            # Retrieve sink pad to be released
            sinkpad = self.streammux.get_static_pad(pad_name)
            # Send flush stop event to the sink pad, then release from the streammux
            sinkpad.send_event(Gst.Event.new_flush_stop(False))
            self.streammux.release_request_pad(sinkpad)
            self.logger.info("STATE CHANGE SUCCESS\n")
            # Remove the source bin from the pipeline
            self.pipeline.remove(self.rs_source_bin)

        elif state_return == Gst.StateChangeReturn.FAILURE:
            self.logger.info("STATE CHANGE FAILURE\n")
            return False

        elif state_return == Gst.StateChangeReturn.ASYNC:
            state_return = self.streammux.set_state(Gst.State.PAUSED)
            state_return = self.rs_source_bin.get_state(Gst.CLOCK_TIME_NONE)
            pad_name = "sink_0"
            sinkpad = self.streammux.get_static_pad(pad_name)
            sinkpad.send_event(Gst.Event.new_flush_stop(False))
            self.streammux.release_request_pad(sinkpad)
            self.logger.info("STATE CHANGE ASYNC\n")
            self.pipeline.remove(self.rs_source_bin)

        #self.pipeline.set_state(Gst.State.READY)
        #time.sleep(1)
        self.pipeline.set_state(Gst.State.PLAYING)
        self.set_video_name()
        return True

    def start_playing(self):
        if not self.copy_video_name():
            return False

        self.rs_source_bin = self._create_source_bin()
        if not self.rs_source_bin:
            self.logger.error("Failed to create source bin. Exiting.")
            return False
        time.sleep(2)
        self.pipeline.add(self.rs_source_bin)
        sinkpad = self._get_request_pad(self.streammux, "sink_0")
        srcpad = self._get_static_pad(self.rs_source_bin, "src")
        if not sinkpad.is_linked():
            self._link_sequential([srcpad, sinkpad])
            self.logger.info("srcpad is linked to sinkpad of streamux")
        else:
            self.logger.error("Already linked sikpad sink_0 of streamux")
            return False
        # Set state of source bin to playing
        state_return = self.rs_source_bin.set_state(Gst.State.PLAYING)
        if state_return == Gst.StateChangeReturn.SUCCESS:
            self.logger.info("STATE CHANGE SUCCESS\n")
        elif state_return == Gst.StateChangeReturn.FAILURE:
            self.logger.info("STATE CHANGE FAILURE\n")
            return False
        elif state_return == Gst.StateChangeReturn.ASYNC:
            state_return = self.rs_source_bin.get_state(Gst.CLOCK_TIME_NONE)
            self.logger.info("STATE CHANGE ASYNC\n")
        elif state_return == Gst.StateChangeReturn.NO_PREROLL:
            self.logger.info("STATE CHANGE NO PREROLL\n")
            return False
        return True

    def kill_pipeline(self):
        if not self.set_video_name():
            return False
        self.pipeline.set_state(Gst.State.NULL)
        self.loop.quit()
        print("All sources stopped quitting")
        return True

    def thread_pipeline(self):
        # Create an event loop and feed gstreamer bus messages to it
        self.loop = GLib.MainLoop()
        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect("message", bus_call, self.loop)
        self.logger.info("Starting pipeline")
        self.pipeline.set_state(Gst.State.PLAYING)
        try:
            self.loop.run()
        except:
            pass
        self.pipeline.set_state(Gst.State.NULL)

    def run(self):
        signal.signal(
            signal.SIGINT,
            lambda n, f: self.pipeline.send_event(Gst.Event.new_eos()),
        )
        self.loop_thread = threading.Thread(target=self.thread_pipeline)
        self.loop_thread.start()

Hi,
We would suggest break down the pipeline to clarify which part triggers the issue. There are two sinks: RTSP and MP4. You may replace the sinks with fakesink for a try.

This topic was automatically closed 14 days after the last reply. New replies are no longer allowed.