Dropping old item buffer

• Hardware Platform (Jetson / GPU) Jetson AGX Orin Dev Kit
• DeepStream Version DeepStream 7.1
• JetPack Version (valid for Jetson only) 6.2

I have a problem when running my script in “raw_mode” and I don’t know how to resolve it.

Below is a piece of the script and the GStreamer log.

        
    def run(self, appsrc):
        logger.debug("Thread da Câmera OAK V3 iniciada.")
        try:
            def callback(x: dai.Device.ReconnectionStatus):
                print(f"Reconnecting state {x}")
            
            # --- MONITORED CONNECTION BLOCK ---
            try:
                if self.device_id_or_ip:
                    logger.info(f"Tentando conectar ao dispositivo específico: {self.device_id_or_ip}")
                    device_info = dai.DeviceInfo(self.device_id_or_ip)
                    device = dai.Device(device_info)
                else:
                    logger.info("Nenhum dispositivo especificado. Tentando conectar ao primeiro disponível...")
                    device = dai.Device()
                
                logger.info(f"Dispositivo {device.getMxId()} conectado com sucesso.")
                device.setMaxReconnectionAttempts(10, callback)
                
                # SIGNAL SUCCESS! The PipelineManager can stop waiting.
                self.init_event.set()
                
            except Exception as e:
                # SIGNAL ERROR! Record the error and release the PipelineManager.
                logger.error(f"Failed to initialize OAK: {e}")
                self.init_error = e
                self.init_event.set() 
                return

            controls = self.camera_state.get_all_controls()

            is_raw_mode = controls.get('use_raw_stream', False)
            
            # 1. Get the preset keys selected in the UI
            res_key = controls.get('resolucao_fps', "1080p @ 30 FPS")
            crop_key = controls.get('tamanho_corte', "1024x1024")

            # 2. Look up the corresponding values in the dictionaries
            SENSOR_WIDTH, SENSOR_HEIGHT, FPS = PRESETS_RESOLUCAO[res_key]
            CROP_WIDTH, CROP_HEIGHT = PRESETS_CORTE[crop_key]['size']

            logger.info(f"Usando preset de sensor: {res_key} ({SENSOR_WIDTH}x{SENSOR_HEIGHT} @ {FPS} FPS)")
            logger.info(f"Usando preset de corte: {crop_key} ({CROP_WIDTH}x{CROP_HEIGHT})")
            logger.info(f"Modo de Stream OAK: {'Bruto (NV12)' if is_raw_mode else 'Encodado (H.265)'}")

            # The 'with' block now manages the pipeline's lifecycle.
            with dai.Pipeline(defaultDevice=device) as pipeline:
                
                device = pipeline.getDefaultDevice()                
                logger.debug(f'Connected cameras: {device.getConnectedCameras()}')
                # The node definitions stay the same
                cam_rgb = pipeline.create(dai.node.Camera).build(
                    boardSocket=dai.CameraBoardSocket.CAM_A,
                    sensorResolution=[SENSOR_WIDTH, SENSOR_HEIGHT],
                    sensorFps=FPS,
                )

                isp_out = cam_rgb.requestOutput((SENSOR_WIDTH, SENSOR_HEIGHT), dai.ImgFrame.Type.NV12, fps=FPS)

                # 1. Create the ImageManip node.
                manip = pipeline.create(dai.node.ImageManip)

                # 2. Read the initial crop coordinates from our centralized state.
                controls = self.camera_state.get_all_controls()
                resize_mode = controls.get('resize_mode', 'Cortar')
                is_depth_enabled = controls.get('depth_enabled', False)
                q_depth = None
                if resize_mode == 'Esticar':
                    logger.info(f"Configurando modo 'Esticar'. Redimensionando de {SENSOR_WIDTH}x{SENSOR_HEIGHT} para {CROP_WIDTH}x{CROP_HEIGHT}.")
                    manip.initialConfig.setOutputSize(CROP_WIDTH, CROP_HEIGHT, dai.ImageManipConfig.ResizeMode.LETTERBOX)
                else: # O comportamento padrão e 'Cortar'
                    default_crop_x = (SENSOR_WIDTH - CROP_WIDTH) // 2
                    default_crop_y = (SENSOR_HEIGHT - CROP_HEIGHT) // 2
                    crop_x = controls.get('crop_x', default_crop_x)
                    crop_y = controls.get('crop_y', default_crop_y)
                    logger.info(f"Configurando modo 'Cortar' em ({crop_x}, {crop_y}) com tamanho {CROP_WIDTH}x{CROP_HEIGHT}")
                    manip.initialConfig.addCrop(crop_x, crop_y, CROP_WIDTH, CROP_HEIGHT)
                
                manip.setMaxOutputFrameSize(CROP_WIDTH * CROP_HEIGHT * 3 // 2)  # NV12 uses 1.5 bytes per pixel
                
                manip.initialConfig.setFrameType(dai.ImgFrame.Type.NV12)

                isp_out.link(manip.inputImage)  # The ISP output feeds the ImageManip

                if is_raw_mode:
                    q_out = manip.out.createOutputQueue(maxSize=2, blocking=True)
                else:
                    video_enc = pipeline.create(dai.node.VideoEncoder)
                    video_enc.setDefaultProfilePreset(fps=FPS, profile=dai.VideoEncoderProperties.Profile.H265_MAIN)
                    manip.out.link(video_enc.input)
                    q_out = video_enc.bitstream.createOutputQueue(maxSize=24, blocking=False)
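                # Note: in raw mode the host output queue above is small
                # (maxSize=2) and blocking, so a slow GStreamer consumer stalls
                # frame delivery instead of dropping frames; in encoded mode the
                # queue is larger and non-blocking, so old packets are dropped.
                # (My reading of the depthai output-queue semantics, stated as
                # an assumption.)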
                
                if is_depth_enabled:
                    logger.info("Profundidade habilitada. Configurando pipeline estéreo...")

                    mono_left = pipeline.create(dai.node.Camera).build(
                        boardSocket=dai.CameraBoardSocket.CAM_B,
                        sensorFps=2,
                    )

                    mono_right = pipeline.create(dai.node.Camera).build(
                        boardSocket=dai.CameraBoardSocket.CAM_C,
                        sensorFps=2,
                    )

                    stereo = pipeline.create(dai.node.StereoDepth)
                    stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.HIGH_DETAIL)
                    stereo.setDepthAlign(dai.CameraBoardSocket.CAM_A)
                    
                    # LeftRightCheck is KEPT ENABLED because depth alignment requires it
                    stereo.setLeftRightCheck(True)
                    stereo.setSubpixel(False)
                    
                    mono_left_out = mono_left.requestOutput(size=(640, 400), type=dai.ImgFrame.Type.GRAY8, fps=2)
                    mono_right_out = mono_right.requestOutput(size=(640, 400), type=dai.ImgFrame.Type.GRAY8, fps=2)

                    mono_left_out.link(stereo.left)
                    mono_right_out.link(stereo.right)

                    # The queue is only created if depth is enabled
                    q_depth = stereo.depth.createOutputQueue(maxSize=4, blocking=False)
                else:
                    logger.info("Profundidade desabilitada. Pulando a criação dos nós estéreo.")

                logger.info("Aplicando configurações iniciais da câmera...")
                self._apply_controls(cam_rgb.initialControl)

                control_in = cam_rgb.inputControl.createInputQueue()
                
                sys_logger = pipeline.create(dai.node.SystemLogger)
                sys_logger.setRate(1)  # Sends data once per second
                
                q_sys_info = sys_logger.out.createOutputQueue(maxSize=4, blocking=False)
                
                pipeline.start()
                logger.info("Pipeline V3 iniciado no dispositivo.")

                pts = 0
                duration_ns = int(10**9 / FPS)  # GStreamer timestamps are integer nanoseconds

                while self.is_running and pipeline.isRunning():
                    self._process_commands(control_in)
                    
                    packet = q_out.tryGet()
                    sys_info = q_sys_info.tryGet()

                    if q_depth:
                        depth_frame = q_depth.tryGet()
                        if depth_frame is not None:
                            depth_cv_frame = depth_frame.getFrame()
                            
                            # Get the region of interest (ROI) from the central state
                            controls = self.camera_state.get_all_controls()
                            roi_x = int(controls['selection_region_x'])
                            roi_y = int(controls['selection_region_y'])
                            roi_w = int(controls['selection_region_w'])
                            roi_h = int(controls['selection_region_h'])

                            # Compute the center of the ROI
                            center_x = roi_x + roi_w // 2
                            center_y = roi_y + roi_h // 2
                            
                            # Make sure the coordinates are within the frame bounds
                            if 0 <= center_y < CROP_HEIGHT and 0 <= center_x < CROP_WIDTH:
                                # Get the distance value (in mm) at the center pixel
                                dist_mm = depth_cv_frame[center_y, center_x]
                                
                                # Update the central state with the new value
                                self.camera_state.update_depth_info({'center_depth_mm': dist_mm})

                                
                    if sys_info:
                        m = 1024 * 1024 # MiB
                        ddr_mem_info = {'used': sys_info.ddrMemoryUsage.used / m, 'total': sys_info.ddrMemoryUsage.total / m}
                        cmx_mem_info = {'used': sys_info.cmxMemoryUsage.used / m, 'total': sys_info.cmxMemoryUsage.total / m}
                        css_mem_info = {'used': sys_info.leonCssMemoryUsage.used / m, 'total': sys_info.leonCssMemoryUsage.total / m}
                        mss_mem_info = {'used': sys_info.leonMssMemoryUsage.used / m, 'total': sys_info.leonMssMemoryUsage.total / m}

                        self.camera_state.update_system_info({
                            'temp': sys_info.chipTemperature.average,
                            'css_cpu': sys_info.leonCssCpuUsage.average,
                            'mss_cpu': sys_info.leonMssCpuUsage.average,
                            'ddr_memory': ddr_mem_info,
                            'cmx_memory': cmx_mem_info,
                            'css_memory': css_mem_info,
                            'mss_memory': mss_mem_info
                        })
                        
                    if packet:                        
                        metadata = {
                            "lens_pos": packet.getLensPosition(), # [cite: 4276]
                            "lens_pos_raw": packet.getLensPositionRaw(), # [cite: 4277]
                            "exposure_us": packet.getExposureTime().microseconds, # [cite: 4275]
                            "iso": packet.getSensitivity(), # [cite: 4278]
                            "color_temp": packet.getColorTemperature() # [cite: 4275]
                        }
                        with self.metadata_lock:
                            self.shared_metadata.update(metadata)

                        current_time = time.time()
                        if current_time - self.last_ui_update_time > 1:  # Throttle to at most once per second
                            self.camera_state.update_from_camera_metadata(metadata)
                            self.last_ui_update_time = current_time

                        buf = Gst.Buffer.new_wrapped(packet.getData().tobytes())
                        buf.pts = pts
                        buf.duration = duration_ns
                        pts += duration_ns
                        
                        retval = appsrc.emit('push-buffer', buf)
                        if retval != Gst.FlowReturn.OK:
                            logger.warning("Appsrc (GStreamer) rejected the buffer. Stopping.")
                            break
            
            # When the 'with pipeline' block exits, the pipeline is stopped.
            logger.info("The pipeline has been stopped.")


        except Exception as e:
            logger.critical(f"ERRO CRÍTICO na thread da câmera: {e}", exc_info=True)
        finally:
            self.is_running = False
            if appsrc:
                appsrc.emit('end-of-stream')
            logger.info("Thread da câmera finalizada e EOS enviado.")

    def stop(self):
        logger.info("Sinal de parada recebido pela OakCameraManager.")
        self.is_running = False



    def start(self, pgie_config_path: str, source_type: str, source_address: str):
        if self.is_running:
            logger.warning("Pipeline já está em execução.")
            return
                
        controls = self.camera_state.get_all_controls()

        crop_key = controls.get('tamanho_corte', "1280x720 (720p)")
        CROP_WIDTH, CROP_HEIGHT = PRESETS_CORTE[crop_key]['size']

        if source_type == 'Câmera OAK':
            res_key = controls.get('resolucao_fps', "1080p @ 30 FPS")
            _, _, FPS = PRESETS_RESOLUCAO[res_key]            
        else:
            FPS = 30.0

        model_selecionado = controls.get('model')
        is_capture_mode = (model_selecionado == "Modo Captura")
        is_raw_mode_oak = controls.get('use_raw_stream', False)

        self.current_source_type = source_type
        self.pipeline_start_time = time.time()
        self.last_fps_calc_time = time.time()  # Start the FPS counter
        self.frame_count_for_fps = 0
        self.calculated_fps = 0

        logger.info(f"Iniciando pipeline com fonte '{source_type}' no endereço '{source_address}'")
        self.pipeline = Gst.Pipeline.new("deepstream-pipeline")
        
        common_elements = {
            "streammux": Gst.ElementFactory.make("nvstreammux", "stream-muxer"),
            "pgie": Gst.ElementFactory.make("nvinfer", "primary-inference"),
            "tracker": Gst.ElementFactory.make("nvtracker", "object-tracker"),
            "nvconv": Gst.ElementFactory.make("nvvideoconvert", "nvvid-converter"),
            "nvosd": Gst.ElementFactory.make("nvdsosd", "onscreen-display"),
            "sink": Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
        }

        common_elements["display_queue"] = Gst.ElementFactory.make("queue", "display-queue")
        if not common_elements["display_queue"]:
            raise Exception("Falha ao criar o elemento GStreamer: queue")

        # 1. Make sure no element is null (the queue was already checked)
        for name, elem in common_elements.items():
            if not elem: raise Exception(f"Failed to create GStreamer element: {name}")

        # 2. Add the elements to the pipeline
        self.pipeline.add(common_elements['streammux'])
        self.pipeline.add(common_elements['nvconv'])
        self.pipeline.add(common_elements['nvosd'])
        self.pipeline.add(common_elements['sink'])
        self.pipeline.add(common_elements['display_queue']) 

        # 3. Add the inference elements ONLY if not in capture mode
        if not is_capture_mode:
            logger.debug("Adicionando pgie e tracker ao pipeline.")
            self.pipeline.add(common_elements['pgie'])
            self.pipeline.add(common_elements['tracker'])
                
        if source_type == 'Câmera OAK':
            appsrc = Gst.ElementFactory.make("appsrc", "oak-video-source")
            if not appsrc: raise Exception("Failed to create appsrc")
            self.pipeline.add(appsrc)

            appsrc.set_property("format", Gst.Format.TIME)
            appsrc.set_property("block", False)
            appsrc.set_property("do-timestamp", True)
            appsrc.set_property("leaky-type", 2)
            appsrc.set_property("max-buffers", 1)
            
            if is_raw_mode_oak:
                logger.debug("Construindo fonte GStreamer para stream BRUTO (NV12).")
                source_conv = Gst.ElementFactory.make("nvvideoconvert", "source-converter")
                caps_filter = Gst.ElementFactory.make("capsfilter", "source-caps-filter")
                
                if not source_conv or not caps_filter: 
                    raise Exception("Falha ao criar nvvideoconvert ou capsfilter da fonte")

                self.pipeline.add(source_conv)
                self.pipeline.add(caps_filter)

                source_conv.set_property("nvbuf-memory-type", 4)
                source_conv.set_property("compute-hw", 2)
                source_conv.set_property("bl-output", True)

                # Caps for the appsrc (raw video in CPU memory)
                appsrc_caps_str = f"video/x-raw, format=NV12, width={CROP_WIDTH}, height={CROP_HEIGHT}, framerate={int(FPS)}/1"
                appsrc.set_property("caps", Gst.Caps.from_string(appsrc_caps_str))

                # Caps for the capsfilter (forces NVMM memory output toward the streammux)
                filter_caps_str = f"video/x-raw(memory:NVMM), format=NV12, width={CROP_WIDTH}, height={CROP_HEIGHT}"
                caps_filter.set_property("caps", Gst.Caps.from_string(filter_caps_str))

                # Link the elements in the correct order
                appsrc.link(source_conv)
                source_conv.link(caps_filter)
                
                sinkpad = common_elements['streammux'].get_request_pad("sink_0")
                srcpad = caps_filter.get_static_pad("src")
                srcpad.link(sinkpad)

            else:
                # ENCODED MODE: appsrc -> parser -> decoder -> streammux
                logger.debug("Building the GStreamer source for the ENCODED (H.265) stream.")
                parser = Gst.ElementFactory.make("h265parse", "h265-parser")
                queue = Gst.ElementFactory.make("queue", "h265-decoder-queue")
                decoder = Gst.ElementFactory.make("nvv4l2decoder", "nvv4l2-decoder")

                decoder.set_property("enable-max-performance", True) # (No Jetson) Força os clocks de GPU mais altos
                decoder.set_property("num-extra-surfaces", 1) # 
                
                if not parser or not queue or not decoder: 
                    raise Exception("Falha ao criar parser, queue ou decoder H.265")
                
                self.pipeline.add(parser)
                self.pipeline.add(queue)
                self.pipeline.add(decoder)

                caps = Gst.Caps.from_string("video/x-h265, stream-format=(string)byte-stream, alignment=(string)au")
                appsrc.set_property("caps", caps)
                
                # Corrected linking with the queue in the middle
                appsrc.link(parser)
                parser.link(queue)
                queue.link(decoder)
                
                sinkpad = common_elements['streammux'].get_request_pad("sink_0")
                srcpad = decoder.get_static_pad("src")
                srcpad.link(sinkpad)
            
            try:
                self.oak_camera = OakCameraManager(source_address, self.camera_state, self.latest_metadata, self.metadata_lock)
                self.camera_thread = threading.Thread(target=self.oak_camera.run, args=(appsrc,))
                self.camera_thread.start()

                if source_type == 'Câmera OAK':
                    logger.info("Aguardando inicialização da câmera OAK...")
                    # Espera até 10 segundos pela conexão
                    if not self.oak_camera.init_event.wait(timeout=10.0):
                        raise RuntimeError("Timeout: A câmera OAK demorou muito para responder.")
                    
                    # If the thread reported an error, re-raise it here on the main thread
                    if self.oak_camera.init_error:
                        raise self.oak_camera.init_error
                    
                    logger.info("Câmera OAK confirmada e rodando.")
                # -----------------------------------------
                
            except RuntimeError as e:
                logger.error(f"Falha ao conectar à Câmera OAK {source_address}. {e}")
                # Importante: Parar o pipeline que acabamos de criar para não deixar lixo
                self.stop() 
                # Re-raise the error so the UI can catch it
                raise e

        elif source_type == 'Stream RTSP':
            source_elements = {
                "rtspsrc": Gst.ElementFactory.make("rtspsrc", "rtsp-source"),
                "depay": Gst.ElementFactory.make("rtph265depay", "rtp-h265-depay"),
                "parser": Gst.ElementFactory.make("h265parse", "h265-parser"),
                "decoder": Gst.ElementFactory.make("nvv4l2decoder", "nvv4l2-decoder"),
            }
            for name, elem in source_elements.items():
                if not elem: raise Exception(f"Falha ao criar o elemento da fonte RTSP: {name}")
                self.pipeline.add(elem)

            # Configure rtspsrc
            source_elements['rtspsrc'].set_property('location', source_address)
            source_elements['rtspsrc'].set_property('latency', 100)
            source_elements['rtspsrc'].set_property('protocols', 'tcp')

            # Link the RTSP source (with a dynamic pad-added callback)
            depay_sink_pad = source_elements['depay'].get_static_pad("sink")
            source_elements['rtspsrc'].connect("pad-added", self.on_pad_added, depay_sink_pad)
            
            source_elements['depay'].link(source_elements['parser'])
            source_elements['parser'].link(source_elements['decoder'])

            # Link the source to the streammux
            sinkpad = common_elements['streammux'].get_request_pad("sink_0")
            srcpad = source_elements['decoder'].get_static_pad("src")
            srcpad.link(sinkpad)

        # --- Conditional linking logic ---
        # The final part of the chain (converter, OSD, sink) is common to both modes
        common_elements['nvconv'].link(common_elements['nvosd'])
        common_elements['nvosd'].link(common_elements['sink'])

        if not is_capture_mode:
            logger.info("Construindo pipeline no MODO DE INFERÊNCIA (completo).")
            
            if not pgie_config_path or not os.path.exists(pgie_config_path):
                logger.critical(f"Caminho do arquivo de configuração do modelo é nulo ou não existe: '{pgie_config_path}'")
                self.stop()
                return

            common_elements['pgie'].set_property("config-file-path", pgie_config_path)            
            common_elements['tracker'].set_property('ll-lib-file', '/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so')
            common_elements['tracker'].set_property('ll-config-file', '/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_tracker_NvDCF_max_perf.yml')
            # Integer division: the tracker dimensions are integer properties
            common_elements['tracker'].set_property('tracker-width', CROP_WIDTH // 4)
            print(f"Configuring tracker with width: {CROP_WIDTH // 4}")
            common_elements['tracker'].set_property('tracker-height', CROP_HEIGHT // 4)
            print(f"Configuring tracker with height: {CROP_HEIGHT // 4}")

            common_elements['streammux'].link(common_elements['pgie'])
            common_elements['pgie'].link(common_elements['tracker'])
            logger.info("Inserindo queue para desacoplar inferência do OSD.")
            common_elements['tracker'].link(common_elements['display_queue'])
            common_elements['display_queue'].link(common_elements['nvconv'])
        else:
            logger.info("Construindo pipeline no MODO DE CAPTURA (simplificado).")
            common_elements['streammux'].link(common_elements['nvconv'])
        
        common_elements['streammux'].set_property("width", CROP_WIDTH)
        common_elements['streammux'].set_property("height", CROP_HEIGHT)
        common_elements['streammux'].set_property("batch-size", 1)
        common_elements['streammux'].set_property("live-source", True)
        common_elements['streammux'].set_property("batched-push-timeout",25000)
        common_elements['streammux'].set_property("nvbuf-memory-type", 4)
        if FPS > 0: frame_duration_ns = int(1_000_000_000 / FPS)
        else: frame_duration_ns = 33333333
            
        logger.info(f"Configurando streammux para {FPS:.2f} FPS (duração do frame: {frame_duration_ns} ns)")
        common_elements['streammux'].set_property("frame-duration", frame_duration_ns)

        common_elements['nvosd'].set_property('display-clock', 1)
        common_elements['nvosd'].set_property('x-clock-offset', 30)
        common_elements['nvosd'].set_property('y-clock-offset', 30)
        common_elements['nvosd'].set_property('clock-font-size', 24)
        
        common_elements['sink'].set_property("max-lateness", -1)
        common_elements['sink'].set_property("async", True)

        osdsinkpad = common_elements['nvosd'].get_static_pad("sink")
        osdsinkpad.add_probe(Gst.PadProbeType.BUFFER, self.osd_sink_pad_buffer_probe, 0)
        
        bus = self.pipeline.get_bus()
        bus.add_signal_watch()
        bus.connect("message", self.bus_call, self.loop)

        logger.info("Mudando estado da pipeline para PLAYING...")
        self.pipeline.set_state(Gst.State.PLAYING)
        self.is_running = True


    def stop(self):
        if not self.is_running:
            return
            
        logger.info("Parando pipeline...")
        # Para a thread da câmera primeiro, se ela existir
        if self.oak_camera:
            self.oak_camera.stop()
            self.camera_thread.join(timeout=2)
            self.oak_camera = None
            self.camera_thread = None

        if self.pipeline:
            self.pipeline.set_state(Gst.State.NULL)
            self.pipeline = None

        if self.capture_thread and self.capture_thread.is_alive():
            self.stop_capture_event.set()
            self.capture_thread.join(timeout=2)

        # Reset state for the next run
        self.trajectories.clear()
        self.counted_object_ids.clear()
        self.line_crossing_class_counter.clear()
        self.dynamic_class_colors.clear()
        self.line_crossing_counter = 0
        if self.counter_callback: self.counter_callback(0)
        self.is_running = False
        self.pipeline_start_time = None
        logger.info("Pipeline parado.")

gstreamer.log (56.4 KB)