• Hardware Platform (Jetson / GPU) Jetson AGX Orion Dev kit
• DeepStream Version Deepstream 7.1
• JetPack Version (valid for Jetson only) 6.2
I’m have a problem when run my script in “raw_mode“, i don’t know how resolved.
bellow have a peace of script and the gstreamer log.
def run(self, appsrc):
logger.debug("Thread da Câmera OAK V3 iniciada.")
try:
def callback(x: dai.Device.ReconnectionStatus):
print(f"Reconnecting state {x}")
# --- BLOCO DE CONEXÃO MONITORADO ---
try:
if self.device_id_or_ip:
logger.info(f"Tentando conectar ao dispositivo específico: {self.device_id_or_ip}")
device_info = dai.DeviceInfo(self.device_id_or_ip)
device = dai.Device(device_info)
else:
logger.info("Nenhum dispositivo especificado. Tentando conectar ao primeiro disponível...")
device = dai.Device()
logger.info(f"Dispositivo {device.getMxId()} conectado com sucesso.")
device.setMaxReconnectionAttempts(10, callback)
# SINALIZA SUCESSO! O PipelineManager pode parar de esperar.
self.init_event.set()
except Exception as e:
# SINALIZA ERRO! Grava o erro e libera o PipelineManager.
logger.error(f"Falha ao inicializar OAK: {e}")
self.init_error = e
self.init_event.set()
return
controls = self.camera_state.get_all_controls()
is_raw_mode = controls.get('use_raw_stream', False)
# 1. Pega as chaves dos presets selecionados na UI
res_key = controls.get('resolucao_fps', "1080p @ 30 FPS")
crop_key = controls.get('tamanho_corte', "1024x1024")
# 2. Busca os valores correspondentes nos dicionários
SENSOR_WIDTH, SENSOR_HEIGHT, FPS = PRESETS_RESOLUCAO[res_key]
CROP_WIDTH, CROP_HEIGHT = PRESETS_CORTE[crop_key]['size']
logger.info(f"Usando preset de sensor: {res_key} ({SENSOR_WIDTH}x{SENSOR_HEIGHT} @ {FPS} FPS)")
logger.info(f"Usando preset de corte: {crop_key} ({CROP_WIDTH}x{CROP_HEIGHT})")
logger.info(f"Modo de Stream OAK: {'Bruto (NV12)' if is_raw_mode else 'Encodado (H.265)'}")
# O 'with' agora gerencia o ciclo de vida do pipeline.
with dai.Pipeline(defaultDevice=device) as pipeline:
device = pipeline.getDefaultDevice()
logger.debug(f'Câmeras conectadas: {device.getConnectedCameras()}')
# A definição dos nós permanece a mesma
cam_rgb = pipeline.create(dai.node.Camera).build(
boardSocket=dai.CameraBoardSocket.CAM_A,
sensorResolution= [SENSOR_WIDTH,SENSOR_HEIGHT],
sensorFps=FPS,
)
isp_out = cam_rgb.requestOutput((SENSOR_WIDTH, SENSOR_HEIGHT), dai.ImgFrame.Type.NV12, fps=FPS)
# 1. Criamos o nó ImageManip.
manip = pipeline.create(dai.node.ImageManip)
# 2. Lemos as coordenadas de corte iniciais do nosso estado centralizado.
controls = self.camera_state.get_all_controls()
resize_mode = controls.get('resize_mode', 'Cortar')
is_depth_enabled = controls.get('depth_enabled', False)
q_depth = None
if resize_mode == 'Esticar':
logger.info(f"Configurando modo 'Esticar'. Redimensionando de {SENSOR_WIDTH}x{SENSOR_HEIGHT} para {CROP_WIDTH}x{CROP_HEIGHT}.")
manip.initialConfig.setOutputSize(CROP_WIDTH, CROP_HEIGHT, dai.ImageManipConfig.ResizeMode.LETTERBOX)
else: # O comportamento padrão e 'Cortar'
default_crop_x = (SENSOR_WIDTH - CROP_WIDTH) // 2
default_crop_y = (SENSOR_HEIGHT - CROP_HEIGHT) // 2
crop_x = controls.get('crop_x', default_crop_x)
crop_y = controls.get('crop_y', default_crop_y)
logger.info(f"Configurando modo 'Cortar' em ({crop_x}, {crop_y}) com tamanho {CROP_WIDTH}x{CROP_HEIGHT}")
manip.initialConfig.addCrop(crop_x, crop_y, CROP_WIDTH, CROP_HEIGHT)
manip.setMaxOutputFrameSize(CROP_WIDTH * CROP_HEIGHT * 3 // 2) # NV12 tem 1.5 bytes por pixel
manip.initialConfig.setFrameType(dai.ImgFrame.Type.NV12)
isp_out.link(manip.inputImage) # Saída ISP vai para o ImageManip
if is_raw_mode:
q_out = manip.out.createOutputQueue(maxSize=2, blocking=True)
else:
video_enc = pipeline.create(dai.node.VideoEncoder)
video_enc.setDefaultProfilePreset(fps=FPS, profile=dai.VideoEncoderProperties.Profile.H265_MAIN)
manip.out.link(video_enc.input)
q_out = video_enc.bitstream.createOutputQueue(maxSize=24, blocking=False)
if is_depth_enabled:
logger.info("Profundidade habilitada. Configurando pipeline estéreo...")
mono_left = pipeline.create(dai.node.Camera).build(
boardSocket=dai.CameraBoardSocket.CAM_B,
sensorFps=2,
)
mono_right = pipeline.create(dai.node.Camera).build(
boardSocket=dai.CameraBoardSocket.CAM_C,
sensorFps=2,
)
stereo = pipeline.create(dai.node.StereoDepth)
stereo.setDefaultProfilePreset(dai.node.StereoDepth.PresetMode.HIGH_DETAIL)
stereo.setDepthAlign(dai.CameraBoardSocket.CAM_A)
# LeftRightCheck é MANTIDO ATIVO devido à exigência do alinhamento
stereo.setLeftRightCheck(True)
stereo.setSubpixel(False)
mono_left_out = mono_left.requestOutput(size=(640, 400), type=dai.ImgFrame.Type.GRAY8, fps=2)
mono_right_out = mono_right.requestOutput(size=(640, 400), type=dai.ImgFrame.Type.GRAY8, fps=2)
mono_left_out.link(stereo.left)
mono_right_out.link(stereo.right)
# A fila só é criada se a profundidade estiver habilitada
q_depth = stereo.depth.createOutputQueue(maxSize=4, blocking=False)
else:
logger.info("Profundidade desabilitada. Pulando a criação dos nós estéreo.")
logger.info("Aplicando configurações iniciais da câmera...")
self._apply_controls(cam_rgb.initialControl)
control_in = cam_rgb.inputControl.createInputQueue()
sys_logger = pipeline.create(dai.node.SystemLogger)
sys_logger.setRate(1) # Envia dados a cada 1 segundo
q_sys_info = sys_logger.out.createOutputQueue(maxSize=4, blocking=False)
pipeline.start()
logger.info("Pipeline V3 iniciado no dispositivo.")
pts = 0
duration_ns = 10**9 / FPS
while self.is_running and pipeline.isRunning():
self._process_commands(control_in)
packet = q_out.tryGet()
sys_info = q_sys_info.tryGet()
if q_depth:
depth_frame = q_depth.tryGet()
if depth_frame is not None:
depth_cv_frame = depth_frame.getFrame()
# Obter a região de interesse (ROI) do estado central
controls = self.camera_state.get_all_controls()
roi_x = int(controls['selection_region_x'])
roi_y = int(controls['selection_region_y'])
roi_w = int(controls['selection_region_w'])
roi_h = int(controls['selection_region_h'])
# Calcular o centro da ROI
center_x = roi_x + roi_w // 2
center_y = roi_y + roi_h // 2
# Garantir que as coordenadas estão dentro dos limites do frame
if 0 <= center_y < CROP_HEIGHT and 0 <= center_x < CROP_WIDTH:
# Obter o valor da distância (em mm) no pixel central
dist_mm = depth_cv_frame[center_y, center_x]
# Atualizar o estado central com o novo valor
self.camera_state.update_depth_info({'center_depth_mm': dist_mm})
if sys_info:
m = 1024 * 1024 # MiB
ddr_mem_info = {'used': sys_info.ddrMemoryUsage.used / m, 'total': sys_info.ddrMemoryUsage.total / m}
cmx_mem_info = {'used': sys_info.cmxMemoryUsage.used / m, 'total': sys_info.cmxMemoryUsage.total / m}
css_mem_info = {'used': sys_info.leonCssMemoryUsage.used / m, 'total': sys_info.leonCssMemoryUsage.total / m}
mss_mem_info = {'used': sys_info.leonMssMemoryUsage.used / m, 'total': sys_info.leonMssMemoryUsage.total / m}
self.camera_state.update_system_info({
'temp': sys_info.chipTemperature.average,
'css_cpu': sys_info.leonCssCpuUsage.average,
'mss_cpu': sys_info.leonMssCpuUsage.average,
'ddr_memory': ddr_mem_info,
'cmx_memory': cmx_mem_info,
'css_memory': css_mem_info,
'mss_memory': mss_mem_info
})
if packet:
metadata = {
"lens_pos": packet.getLensPosition(), # [cite: 4276]
"lens_pos_raw": packet.getLensPositionRaw(), # [cite: 4277]
"exposure_us": packet.getExposureTime().microseconds, # [cite: 4275]
"iso": packet.getSensitivity(), # [cite: 4278]
"color_temp": packet.getColorTemperature() # [cite: 4275]
}
with self.metadata_lock:
self.shared_metadata.update(metadata)
current_time = time.time()
if current_time - self.last_ui_update_time > 1: # Throttle a 2Hz
self.camera_state.update_from_camera_metadata(metadata)
self.last_ui_update_time = current_time
buf = Gst.Buffer.new_wrapped(packet.getData().tobytes())
buf.pts = pts
buf.duration = duration_ns
pts += duration_ns
retval = appsrc.emit('push-buffer', buf)
if retval != Gst.FlowReturn.OK:
logger.warning("Appsrc (GStreamer) rejeitou o buffer. Parando.")
pass
# Quando o 'with pipeline' termina, o pipeline é parado.
logger.info("O pipeline foi parado.")
except Exception as e:
logger.critical(f"ERRO CRÍTICO na thread da câmera: {e}", exc_info=True)
finally:
self.is_running = False
if appsrc:
appsrc.emit('end-of-stream')
logger.info("Thread da câmera finalizada e EOS enviado.")
def stop(self):
logger.info("Sinal de parada recebido pela OakCameraManager.")
self.is_running = False
def start(self, pgie_config_path: str, source_type: str, source_address: str):
if self.is_running:
logger.warning("Pipeline já está em execução.")
return
controls = self.camera_state.get_all_controls()
crop_key = controls.get('tamanho_corte', "1280x720 (720p)")
CROP_WIDTH, CROP_HEIGHT = PRESETS_CORTE[crop_key]['size']
if source_type == 'Câmera OAK':
res_key = controls.get('resolucao_fps', "1080p @ 30 FPS")
_, _, FPS = PRESETS_RESOLUCAO[res_key]
else:
FPS = 30.0
model_selecionado = controls.get('model')
is_capture_mode = (model_selecionado == "Modo Captura")
is_raw_mode_oak = controls.get('use_raw_stream', False)
self.current_source_type = source_type
self.pipeline_start_time = time.time()
self.last_fps_calc_time = time.time() # Inicia o contador de FPS
self.frame_count_for_fps = 0
self.calculated_fps = 0
logger.info(f"Iniciando pipeline com fonte '{source_type}' no endereço '{source_address}'")
self.pipeline = Gst.Pipeline.new("deepstream-pipeline")
common_elements = {
"streammux": Gst.ElementFactory.make("nvstreammux", "stream-muxer"),
"pgie": Gst.ElementFactory.make("nvinfer", "primary-inference"),
"tracker": Gst.ElementFactory.make("nvtracker", "object-tracker"),
"nvconv": Gst.ElementFactory.make("nvvideoconvert", "nvvid-converter"),
"nvosd": Gst.ElementFactory.make("nvdsosd", "onscreen-display"),
"sink": Gst.ElementFactory.make("nveglglessink", "nvvideo-renderer")
}
common_elements["display_queue"] = Gst.ElementFactory.make("queue", "display-queue")
if not common_elements["display_queue"]:
raise Exception("Falha ao criar o elemento GStreamer: queue")
# 1. Garante que nenhum elemento seja nulo (o queue já foi verificado)
for name, elem in common_elements.items():
if not elem: raise Exception(f"Falha ao criar o elemento GStreamer: {name}")
# 2. Adiciona os elementos ao pipeline
self.pipeline.add(common_elements['streammux'])
self.pipeline.add(common_elements['nvconv'])
self.pipeline.add(common_elements['nvosd'])
self.pipeline.add(common_elements['sink'])
self.pipeline.add(common_elements['display_queue'])
# 3. Adiciona os elementos de inferência SOMENTE se não for modo de captura
if not is_capture_mode:
logger.debug("Adicionando pgie e tracker ao pipeline.")
self.pipeline.add(common_elements['pgie'])
self.pipeline.add(common_elements['tracker'])
if source_type == 'Câmera OAK':
appsrc = Gst.ElementFactory.make("appsrc", "oak-video-source")
if not appsrc: raise Exception("Falha ao criar appsrc")
self.pipeline.add(appsrc)
appsrc.set_property("format", Gst.Format.TIME)
appsrc.set_property("block", False)
appsrc.set_property("do-timestamp", True)
appsrc.set_property("leaky-type", 2)
appsrc.set_property("max-buffers", 1)
if is_raw_mode_oak:
logger.debug("Construindo fonte GStreamer para stream BRUTO (NV12).")
source_conv = Gst.ElementFactory.make("nvvideoconvert", "source-converter")
caps_filter = Gst.ElementFactory.make("capsfilter", "source-caps-filter")
if not source_conv or not caps_filter:
raise Exception("Falha ao criar nvvideoconvert ou capsfilter da fonte")
self.pipeline.add(source_conv)
self.pipeline.add(caps_filter)
source_conv.set_property("nvbuf-memory-type", 4)
source_conv.set_property("compute-hw", 2)
source_conv.set_property("bl-output", True)
# Caps para o appsrc (vídeo bruto da CPU)
appsrc_caps_str = f"video/x-raw, format=NV12, width={CROP_WIDTH}, height={CROP_HEIGHT}, framerate={int(FPS)}/1"
appsrc.set_property("caps", Gst.Caps.from_string(appsrc_caps_str))
# Caps para o capsfilter (força a saída em memória NVMM para o streammux)
filter_caps_str = f"video/x-raw(memory:NVMM), format=NV12, width={CROP_WIDTH}, height={CROP_HEIGHT}"
caps_filter.set_property("caps", Gst.Caps.from_string(filter_caps_str))
# Liga os elementos na ordem correta
appsrc.link(source_conv)
source_conv.link(caps_filter)
sinkpad = common_elements['streammux'].get_request_pad("sink_0")
srcpad = caps_filter.get_static_pad("src")
srcpad.link(sinkpad)
else:
# MODO ENCODADO: appsrc -> parser -> decoder -> streammux
logger.debug("Construindo fonte GStreamer para stream ENCODADO (H.265).")
parser = Gst.ElementFactory.make("h265parse", "h265-parser")
queue = Gst.ElementFactory.make("queue", "h265-decoder-queue")
decoder = Gst.ElementFactory.make("nvv4l2decoder", "nvv4l2-decoder")
decoder.set_property("enable-max-performance", True) # (No Jetson) Força os clocks de GPU mais altos
decoder.set_property("num-extra-surfaces", 1) #
if not parser or not queue or not decoder:
raise Exception("Falha ao criar parser, queue ou decoder H.265")
self.pipeline.add(parser)
self.pipeline.add(queue)
self.pipeline.add(decoder)
caps = Gst.Caps.from_string("video/x-h265, stream-format=(string)byte-stream, alignment=(string)au")
appsrc.set_property("caps", caps)
# Ligação corrigida com a queue no meio
appsrc.link(parser)
parser.link(queue)
queue.link(decoder)
sinkpad = common_elements['streammux'].get_request_pad("sink_0")
srcpad = decoder.get_static_pad("src")
srcpad.link(sinkpad)
try:
self.oak_camera = OakCameraManager(source_address, self.camera_state, self.latest_metadata, self.metadata_lock)
self.camera_thread = threading.Thread(target=self.oak_camera.run, args=(appsrc,))
self.camera_thread.start()
if source_type == 'Câmera OAK':
logger.info("Aguardando inicialização da câmera OAK...")
# Espera até 10 segundos pela conexão
if not self.oak_camera.init_event.wait(timeout=10.0):
raise RuntimeError("Timeout: A câmera OAK demorou muito para responder.")
# Se a thread relatou erro, lançamos aqui na thread principal
if self.oak_camera.init_error:
raise self.oak_camera.init_error
logger.info("Câmera OAK confirmada e rodando.")
# -----------------------------------------
except RuntimeError as e:
logger.error(f"Falha ao conectar à Câmera OAK {source_address}. {e}")
# Importante: Parar o pipeline que acabamos de criar para não deixar lixo
self.stop()
# Relançar o erro para a UI pegar
raise e
elif source_type == 'Stream RTSP':
source_elements = {
"rtspsrc": Gst.ElementFactory.make("rtspsrc", "rtsp-source"),
"depay": Gst.ElementFactory.make("rtph265depay", "rtp-h265-depay"),
"parser": Gst.ElementFactory.make("h265parse", "h265-parser"),
"decoder": Gst.ElementFactory.make("nvv4l2decoder", "nvv4l2-decoder"),
}
for name, elem in source_elements.items():
if not elem: raise Exception(f"Falha ao criar o elemento da fonte RTSP: {name}")
self.pipeline.add(elem)
# Configura rtspsrc
source_elements['rtspsrc'].set_property('location', source_address)
source_elements['rtspsrc'].set_property('latency', 100)
source_elements['rtspsrc'].set_property('protocols', 'tcp')
# Link da fonte RTSP (com callback dinâmico)
depay_sink_pad = source_elements['depay'].get_static_pad("sink")
source_elements['rtspsrc'].connect("pad-added", self.on_pad_added, depay_sink_pad)
source_elements['depay'].link(source_elements['parser'])
source_elements['parser'].link(source_elements['decoder'])
# Link da fonte para o streammux
sinkpad = common_elements['streammux'].get_request_pad("sink_0")
srcpad = source_elements['decoder'].get_static_pad("src")
srcpad.link(sinkpad)
# --- Lógica de ligação condicional ---
# A parte final da cadeia (conversor, OSD, sink) é comum a ambos os modos
common_elements['nvconv'].link(common_elements['nvosd'])
common_elements['nvosd'].link(common_elements['sink'])
if not is_capture_mode:
logger.info("Construindo pipeline no MODO DE INFERÊNCIA (completo).")
if not pgie_config_path or not os.path.exists(pgie_config_path):
logger.critical(f"Caminho do arquivo de configuração do modelo é nulo ou não existe: '{pgie_config_path}'")
self.stop()
return
common_elements['pgie'].set_property("config-file-path", pgie_config_path)
common_elements['tracker'].set_property('ll-lib-file', '/opt/nvidia/deepstream/deepstream/lib/libnvds_nvmultiobjecttracker.so')
common_elements['tracker'].set_property('ll-config-file', '/opt/nvidia/deepstream/deepstream/samples/configs/deepstream-app/config_tracker_NvDCF_max_perf.yml')
common_elements['tracker'].set_property('tracker-width', CROP_WIDTH/4)
print(f"Configuring tracker with width: {CROP_WIDTH/4}")
common_elements['tracker'].set_property('tracker-height', CROP_HEIGHT/4)
print(f"Configuring tracker with height: {CROP_HEIGHT/4}")
common_elements['streammux'].link(common_elements['pgie'])
common_elements['pgie'].link(common_elements['tracker'])
logger.info("Inserindo queue para desacoplar inferência do OSD.")
common_elements['tracker'].link(common_elements['display_queue'])
common_elements['display_queue'].link(common_elements['nvconv'])
else:
logger.info("Construindo pipeline no MODO DE CAPTURA (simplificado).")
common_elements['streammux'].link(common_elements['nvconv'])
common_elements['streammux'].set_property("width", CROP_WIDTH)
common_elements['streammux'].set_property("height", CROP_HEIGHT)
common_elements['streammux'].set_property("batch-size", 1)
common_elements['streammux'].set_property("live-source", True)
common_elements['streammux'].set_property("batched-push-timeout",25000)
common_elements['streammux'].set_property("nvbuf-memory-type", 4)
if FPS > 0: frame_duration_ns = int(1_000_000_000 / FPS)
else: frame_duration_ns = 33333333
logger.info(f"Configurando streammux para {FPS:.2f} FPS (duração do frame: {frame_duration_ns} ns)")
common_elements['streammux'].set_property("frame-duration", frame_duration_ns)
common_elements['nvosd'].set_property('display-clock', 1)
common_elements['nvosd'].set_property('x-clock-offset', 30)
common_elements['nvosd'].set_property('y-clock-offset', 30)
common_elements['nvosd'].set_property('clock-font-size', 24)
common_elements['sink'].set_property("max-lateness", -1)
common_elements['sink'].set_property("async", True)
osdsinkpad = common_elements['nvosd'].get_static_pad("sink")
osdsinkpad.add_probe(Gst.PadProbeType.BUFFER, self.osd_sink_pad_buffer_probe, 0)
bus = self.pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", self.bus_call, self.loop)
logger.info("Mudando estado da pipeline para PLAYING...")
self.pipeline.set_state(Gst.State.PLAYING)
self.is_running = True
def stop(self):
if not self.is_running:
return
logger.info("Parando pipeline...")
# Para a thread da câmera primeiro, se ela existir
if self.oak_camera:
self.oak_camera.stop()
self.camera_thread.join(timeout=2)
self.oak_camera = None
self.camera_thread = None
if self.pipeline:
self.pipeline.set_state(Gst.State.NULL)
self.pipeline = None
if self.capture_thread and self.capture_thread.is_alive():
self.stop_capture_event.set()
self.capture_thread.join(timeout=2)
# Reseta o estado para a próxima execução
self.trajectories.clear()
self.counted_object_ids.clear()
self.line_crossing_class_counter.clear()
self.dynamic_class_colors.clear()
self.line_crossing_counter = 0
if self.counter_callback: self.counter_callback(0)
self.is_running = False
self.pipeline_start_time = None
logger.info("Pipeline parado.")
gstreamer.log (56.4 KB)
