GPU Utilization Bottleneck: Single-Process with 8 Streams vs. Multi-Process with 1 Stream each (DeepStream 7.1 / PyServiceMaker)

I am building a live video analytics service using FastAPI and the DeepStream 7.1 Python ServiceMaker (pyservicemaker) API on Ubuntu 22.04. When I run multiple streams in a single process, GPU utilization plateaus at about 52% and does not go higher.

Environment:

  • OS: Ubuntu 22.04.1 LTS
  • GPU: 1 x NVIDIA GeForce RTX 3090 (Driver 590.48.01)
  • CUDA: 12.6
  • DeepStream: 7.1.0
  • Python: 3.10.20
  • Inference model: YOLOv5

Architecture Overview:

  1. The FastAPI app runs as a single-process, single-worker service.
  2. A single persistent DeepStream Pipeline (using nvmultiurisrcbin) runs in the background.
  3. Upon receiving a /detect POST request with multiple video paths, we use asyncio.to_thread to concurrently add the streams to the running pipeline via the nvmultiurisrcbin REST API.
  4. Pipeline structure: nvmultiurisrcbin (max-batch-size=8) → queue → nvinfer (YOLOv5) → queue → fakesink (with custom Metadata Operator/Probe to fetch detections).
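
For reference, a client request to the /detect endpoint from step 3 could be built roughly as sketched below. The field names follow the DetectRequest model in main.py and the port matches the --port default; the host and the example file paths are placeholders, not values from my setup.

```python
import json
from urllib import request


def build_detect_request(video_paths, model_choice, output_path,
                         timeout=None, base_url="http://127.0.0.1:5000"):
    """Build (but do not send) a POST request for the /detect endpoint."""
    payload = {
        "video_paths": list(video_paths),
        "output_path": output_path,
        "timeout": timeout,
        "model_choice": model_choice,  # "hor" or "ver"
    }
    return request.Request(
        url=f"{base_url}/detect",
        data=json.dumps(payload).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


# Example (placeholder paths; sending requires the service to be running):
# req = build_detect_request(
#     [f"/data/videos/clip_{i}.mp4" for i in range(8)],
#     model_choice="hor",
#     output_path="/tmp/detections.json",
# )
# with request.urlopen(req) as resp:
#     print(resp.read().decode("utf-8"))
```

Submitting all 8 paths in one request is what triggers the concurrent source adds in Scenario A.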

The Bottleneck (Comparison):

  • Scenario A (Single Process, 1 Pipeline with 8 Streams): We submit 8 video streams to the single background pipeline simultaneously. 👉 Result: GPU utilization as reported by nvidia-smi stays around 52% and does not scale up. Inference throughput appears capped.
  • Scenario B (8 Processes, 8 Pipelines with 1 Stream each + Nginx Load Balancing): We run 8 separate FastAPI processes on the same GPU, each hosting its own single-stream pipeline. Streams are load-balanced via Nginx. 👉 Result: GPU utilization easily hits 100%. The overall throughput is significantly higher.
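
The utilization figures above could be collected in a repeatable way with a small sampler like the one below. This is a minimal sketch, assuming nvidia-smi is on PATH and GPU index 0; the function names are made up for illustration. Note that utilization.gpu reports the percentage of time during which at least one kernel was running, so it indicates how busy the GPU is rather than how much work it completes.

```python
import subprocess
import time

# Standard nvidia-smi query flags; one CSV line per GPU, no header, no units.
QUERY = ["nvidia-smi", "--query-gpu=utilization.gpu,memory.used",
         "--format=csv,noheader,nounits"]


def parse_sample(line):
    """Parse one 'util, mem' CSV line into (gpu_util_percent, mem_used_mib)."""
    util, mem = (field.strip() for field in line.split(","))
    return int(util), int(mem)


def sample_gpu(gpu_index=0):
    """Query nvidia-smi once and return the parsed sample for one GPU."""
    out = subprocess.run(QUERY + ["-i", str(gpu_index)],
                         capture_output=True, text=True, check=True).stdout
    return parse_sample(out.strip().splitlines()[0])


def collect(duration_sec=10.0, interval_sec=0.5, gpu_index=0):
    """Sample the GPU repeatedly for roughly duration_sec seconds."""
    samples = []
    deadline = time.monotonic() + duration_sec
    while time.monotonic() < deadline:
        samples.append(sample_gpu(gpu_index))
        time.sleep(interval_sec)
    return samples


def summarize(samples):
    """Return mean and peak GPU utilization over a list of samples."""
    utils = [util for util, _mem in samples]
    return {"mean_util": sum(utils) / len(utils), "max_util": max(utils)}
```

Running collect() while each scenario is processing the same 8 videos, then comparing summarize() results, gives numbers that are easier to compare than watching nvidia-smi by eye.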

Questions:

Given that DeepStream’s core pipeline is written in C++ (GStreamer), running 8 streams in a single pipeline should theoretically have lower overhead and better batching efficiency than running 8 heavy Python processes.

Why is Scenario A underutilizing the GPU?

  1. Is the Python GIL a factor? Does the custom Python Probe / Metadata Operator (reading batch_meta in Python) throttle the pipeline, preventing it from feeding the GPU fast enough?
  2. Is it an issue with nvmultiurisrcbin or its REST API configuration (e.g., does adding sources dynamically cause sync or queuing bottlenecks)?
  3. Are there any PyServiceMaker-specific threading limitations?
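
One way to test the GIL hypothesis in question 1 might be to measure how much wall-clock time the Python probe callback consumes. The sketch below is a generic timing wrapper, not part of pyservicemaker; CallTimer and its method names are hypothetical. It only measures time spent inside the callback, so time spent waiting to acquire the GIL before the callback starts is not captured.

```python
import threading
import time
from functools import wraps


class CallTimer:
    """Accumulates wall-clock time spent inside a wrapped callable."""

    def __init__(self):
        self._lock = threading.Lock()
        self.calls = 0
        self.busy_sec = 0.0
        self._created = time.perf_counter()

    def wrap(self, func):
        """Return a wrapper that records the duration of every call."""
        @wraps(func)
        def wrapper(*args, **kwargs):
            t0 = time.perf_counter()
            try:
                return func(*args, **kwargs)
            finally:
                dt = time.perf_counter() - t0
                with self._lock:
                    self.calls += 1
                    self.busy_sec += dt
        return wrapper

    def report(self):
        """Summarize calls, mean duration, and share of elapsed wall time."""
        elapsed = time.perf_counter() - self._created
        with self._lock:
            calls, busy = self.calls, self.busy_sec
        return {
            "calls": calls,
            "busy_sec": busy,
            "mean_ms": 1000.0 * busy / calls if calls else 0.0,
            "busy_fraction": busy / elapsed if elapsed > 0 else 0.0,
        }
```

In main.py this could be applied by creating a module-level timer and decorating DetectionCollector.handle_metadata with @timer.wrap. If busy_fraction approaches 1.0 with 8 streams, the probe is a plausible bottleneck; if it stays small, the cause is more likely elsewhere in the pipeline.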

Any suggestions, workarounds, or profiling ideas would be highly appreciated. Thanks!

My FastAPI & DeepStream code (main.py)

import argparse
import asyncio
import json
import os
import socket
import threading
import time
import uuid
from dataclasses import dataclass, field
from pathlib import Path
from typing import Any, List, Optional, Literal
from urllib import error, request
from fastapi import FastAPI, HTTPException, Request
from pydantic import BaseModel
from contextlib import asynccontextmanager

from pyservicemaker import BatchMetadataOperator, DynamicSourceMessage, Pipeline, Probe


APP_DIR = Path(__file__).resolve().parent
HOR_INFER_CONFIG_PATH = APP_DIR / "config_infer_yolov5_hor.txt"
VER_INFER_CONFIG_PATH = APP_DIR / "config_infer_yolov5_ver.txt"
DEFAULT_OUTPUT_PATH = APP_DIR / "detections.json"

# Set the default maximum number of stream channels to 8
DEFAULT_MAX_STREAMS = 8
DEFAULT_SOURCE_IDLE_SEC = 1.0
DEFAULT_ADD_TIMEOUT_SEC = 10.0


def get_free_port() -> int:
    with socket.socket(socket.AF_INET, socket.SOCK_STREAM) as s:
        s.bind(('127.0.0.1', 0))
        return s.getsockname()[1]


@dataclass
class DetectionJob:
    request_id: str
    camera_id: str
    camera_name: str
    camera_url: str
    output_path: Path
    condition: threading.Condition = field(default_factory=threading.Condition)
    source_id: int | None = None
    frames: dict[int, dict[str, Any]] = field(default_factory=dict)
    last_frame_ts: float | None = None
    removed: bool = False
    error: str | None = None

    def bind_source(self, source_id: int) -> None:
        with self.condition:
            self.source_id = source_id
            self.condition.notify_all()

    def add_frame(self, frame: dict[str, Any]) -> None:
        with self.condition:
            self.frames[int(frame["frame_number"])] = frame
            self.last_frame_ts = time.monotonic()
            self.condition.notify_all()

    def mark_removed(self) -> None:
        with self.condition:
            self.removed = True
            self.condition.notify_all()

    def fail(self, message: str) -> None:
        with self.condition:
            self.error = message
            self.condition.notify_all()

    def wait_for_source(self, timeout_sec: float) -> int:
        deadline = time.monotonic() + timeout_sec
        with self.condition:
            while self.source_id is None and self.error is None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise TimeoutError(f"Timed out waiting source add ack: {self.camera_id}")
                self.condition.wait(timeout=remaining)
            if self.error is not None:
                raise RuntimeError(self.error)
            return int(self.source_id)

    def wait_for_stream_quiet(self, timeout_sec: float | None, idle_sec: float) -> None:
        deadline = None if timeout_sec is None else time.monotonic() + timeout_sec
        start_deadline = time.monotonic() + 15.0
        with self.condition:
            while self.error is None and not self.removed:
                now = time.monotonic()
                if self.last_frame_ts is not None:
                    if (now - self.last_frame_ts) >= idle_sec:
                        return
                else:
                    if now >= start_deadline:
                        raise TimeoutError(f"Timed out waiting for stream to start (first frame): {self.request_id}")
                if deadline is not None and now >= deadline:
                    raise TimeoutError(f"Timed out waiting inference results: {self.request_id}")
                wait_for = 0.2
                if deadline is not None:
                    wait_for = min(wait_for, max(0.0, deadline - now))
                self.condition.wait(timeout=wait_for)

            if self.error is not None:
                raise RuntimeError(self.error)

    def wait_removed(self, timeout_sec: float) -> None:
        deadline = time.monotonic() + timeout_sec
        with self.condition:
            while not self.removed and self.error is None:
                remaining = deadline - time.monotonic()
                if remaining <= 0:
                    raise TimeoutError(f"Timed out waiting source removal ack: {self.camera_id}")
                self.condition.wait(timeout=remaining)
            if self.error is not None:
                raise RuntimeError(self.error)

    def to_result(self, infer_config_path: Path) -> dict[str, Any]:
        frames = [self.frames[idx] for idx in sorted(self.frames)]
        result = {
            "request_id": self.request_id,
            "camera_id": self.camera_id,
            "source_id": self.source_id,
            "video": self.camera_url,
            "infer_config": str(infer_config_path),
            "frame_count": len(frames),
            "frames": frames,
        }
        self.output_path.parent.mkdir(parents=True, exist_ok=True)
        self.output_path.write_text(json.dumps(result, indent=2), encoding="utf-8")
        result["output_json"] = str(self.output_path)
        return result


class DetectionCollector(BatchMetadataOperator):
    def __init__(self, jobs_by_source: dict[int, DetectionJob], lock: threading.Lock) -> None:
        super().__init__()
        self._jobs_by_source = jobs_by_source
        self._lock = lock

    def _lookup_job(self, source_id: int) -> DetectionJob | None:
        with self._lock:
            return self._jobs_by_source.get(source_id)

    def handle_metadata(self, batch_meta: Any) -> None:
        for frame_meta in batch_meta.frame_items:
            source_id = int(frame_meta.source_id)
            job = self._lookup_job(source_id)
            if job is None:
                continue

            frame_result = {
                "frame_number": int(frame_meta.frame_number),
                "source_id": source_id,
                "width": int(frame_meta.source_width),
                "height": int(frame_meta.source_height),
                "pts_ns": int(frame_meta.buffer_pts),
                "detections": [],
            }
            for obj_meta in frame_meta.object_items:
                rect = obj_meta.rect_params
                left = float(rect.left)
                top = float(rect.top)
                width = float(rect.width)
                height = float(rect.height)
                frame_result["detections"].append(
                    {
                        "class_id": int(obj_meta.class_id),
                        "category": str(obj_meta.label or obj_meta.class_id),
                        "confidence": float(obj_meta.confidence),
                        "bbox": {
                            "left": left,
                            "top": top,
                            "width": width,
                            "height": height,
                            "x1": left,
                            "y1": top,
                            "x2": left + width,
                            "y2": top + height,
                        },
                    }
                )
            job.add_frame(frame_result)


class DynamicSourceDetectionService:
    """A persistent DeepStream pipeline on a single GPU that supports dynamically adding and removing multiple video sources in real time."""

    def __init__(
        self,
        infer_config_path: Path,
        max_streams: int,
        source_idle_sec: float,
    ) -> None:
        self.infer_config_path = infer_config_path.resolve()
        self.max_streams = max(1, int(max_streams))
        self.source_idle_sec = float(source_idle_sec)

        self.rest_port = get_free_port()
        print(f"[PID {os.getpid()}] Flow REST port: {self.rest_port}, max supported channels: {self.max_streams}")

        self.pipeline: Pipeline | None = None
        self._run_thread: threading.Thread | None = None
        self._start_lock = threading.Lock()
        self._started = threading.Event()
        self._pipeline_error: BaseException | None = None

        self._source_lock = threading.Lock()
        self._pending_lock = threading.Lock()
        self._jobs_by_source: dict[int, DetectionJob] = {}
        self._pending_jobs_by_camera: dict[str, DetectionJob] = {}
        
        self._active_semaphore = threading.BoundedSemaphore(self.max_streams)
        self._active_count = 0
        self._active_count_lock = threading.Lock()

        self.collector = DetectionCollector(self._jobs_by_source, self._source_lock)

    def _on_message(self, message: Any) -> None:
        if not isinstance(message, DynamicSourceMessage):
            return

        source_id = int(message.source_id)
        camera_id = str(message.sensor_id)

        if message.source_added:
            with self._pending_lock:
                job = self._pending_jobs_by_camera.get(camera_id)
            if job is None:
                return
            job.bind_source(source_id)
            with self._source_lock:
                self._jobs_by_source[source_id] = job
            return

        with self._source_lock:
            job = self._jobs_by_source.pop(source_id, None)
        if job is not None:
            job.mark_removed()

    def _create_and_start_pipeline(self) -> None:
        self.pipeline = Pipeline("persistent-dynamic-yolov5")
        self.pipeline.add(
            "nvmultiurisrcbin",
            "src",
            {
                "ip-address": "127.0.0.1",
                "port": self.rest_port,
                "max-batch-size": self.max_streams,
                "batched-push-timeout": 33333,
                "live-source": 0,
                "height": 384,
                "width": 640,
                "drop-pipeline-eos": 1,
                "async-handling": 1,
            },
        )
        self.pipeline.add(
            "nvinfer",
            "infer",
            {
                "config-file-path": str(self.infer_config_path),
                "batch-size": self.max_streams,
            },
        )
        self.pipeline.add(
            "fakesink",
            "sink",
            {
                "sync": 0,
                "async": 0,
                "qos": 0,
            },
        )
        self.pipeline.add("queue", "queue_pre_infer")
        self.pipeline.add("queue", "queue_post_infer")
        self.pipeline.link("src", "queue_pre_infer", "infer", "queue_post_infer", "sink")
        self.pipeline.attach("infer", Probe("json-detection-collector", self.collector))
        self.pipeline.start(self._on_message)
        self._pipeline_error = None

    def _recreate_and_restart_pipeline(self) -> None:
        with self._start_lock:
            if not self._started.is_set():
                return
            try:
                if self.pipeline is not None:
                    try:
                        self.pipeline.stop()
                    except Exception:
                        pass
                self._create_and_start_pipeline()
            except Exception as exc:
                self._pipeline_error = exc
                raise

    def _run_pipeline_wait(self) -> None:
        while self._started.is_set():
            try:
                if self.pipeline is None:
                    raise RuntimeError("Pipeline not initialized")
                self.pipeline.wait()
                if self._started.is_set():
                    print(f"[PID {os.getpid()}] Pipeline wait returned normally. Restarting...")
                    self._recreate_and_restart_pipeline()
                    time.sleep(1)
                else:
                    break
            except BaseException as exc:
                print(f"[PID {os.getpid()}] Pipeline crashed/exited: {exc}. Restarting...")
                self._pipeline_error = exc
                
                with self._source_lock:
                    live_jobs = list(self._jobs_by_source.values())
                    self._jobs_by_source.clear()
                for job in live_jobs:
                    job.fail(f"Pipeline stopped unexpectedly: {exc}")
                
                with self._pending_lock:
                    pending_jobs = list(self._pending_jobs_by_camera.values())
                    self._pending_jobs_by_camera.clear()
                for job in pending_jobs:
                    job.fail(f"Pipeline stopped unexpectedly during source add: {exc}")

                time.sleep(2)
                if not self._started.is_set():
                    break
                
                try:
                    self._recreate_and_restart_pipeline()
                except Exception as restart_exc:
                    print(f"[PID {os.getpid()}] Failed to restart pipeline: {restart_exc}")
                    time.sleep(3)

    def start(self) -> None:
        with self._start_lock:
            if self._started.is_set():
                return
            self._create_and_start_pipeline()
            self._started.set()

            self._run_thread = threading.Thread(
                target=self._run_pipeline_wait,
                name="dynamic-yolov5-pipeline-waiter",
                daemon=True,
            )
            self._run_thread.start()

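    def stop(self) -> None:
        with self._start_lock:
            if not self._started.is_set():
                return
            # Clear the flag before stopping the pipeline. Otherwise the waiter
            # thread sees wait() return while the service still looks started,
            # tries to restart the pipeline, and blocks on _start_lock until
            # the join below times out.
            self._started.clear()
            try:
                if self.pipeline is not None:
                    self.pipeline.stop()
            finally:
                if self._run_thread is not None:
                    self._run_thread.join(timeout=5)
                self._run_thread = None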

    def _rest_call(self, endpoint: str, payload: dict[str, Any]) -> dict[str, Any]:
        url = f"http://127.0.0.1:{self.rest_port}{endpoint}"
        body = json.dumps(payload).encode("utf-8")
        req = request.Request(
            url=url,
            data=body,
            headers={"Content-Type": "application/json"},
            method="POST",
        )
        try:
            with request.urlopen(req, timeout=10) as resp:
                text = resp.read().decode("utf-8", errors="replace")
                return {} if not text else json.loads(text)
        except error.HTTPError as exc:
            detail = exc.read().decode("utf-8", errors="replace")
            raise RuntimeError(f"REST {endpoint} failed: {exc.code} {detail}") from exc
        except error.URLError as exc:
            raise RuntimeError(f"REST {endpoint} failed: {exc.reason}") from exc

    def _add_source(self, job: DetectionJob) -> None:
        payload = {
            "key": "sensor",
            "value": {
                "camera_id": job.camera_id,
                "camera_name": job.camera_name,
                "camera_url": job.camera_url,
                "change": "camera_add",
            },
        }
        self._rest_call("/api/v1/stream/add", payload)

    def _remove_source(self, job: DetectionJob) -> None:
        payload = {
            "key": "sensor",
            "value": {
                "camera_id": job.camera_id,
                "camera_url": job.camera_url,
                "change": "camera_remove",
            },
        }
        self._rest_call("/api/v1/stream/remove", payload)

    def submit_single_video(
        self,
        video_path: str,
        output_path: str | os.PathLike[str],
        timeout_sec: float | None,
    ) -> dict[str, Any]:
        if not self._started.is_set():
            raise RuntimeError("Service has not been started")
        if self._pipeline_error is not None:
            raise RuntimeError(f"Pipeline failed: {self._pipeline_error}")

        v = Path(video_path).expanduser().resolve()
        out = Path(output_path).expanduser().resolve()
        if not v.exists():
            raise FileNotFoundError(f"Input video not found: {video_path}")

        camera_id = f"job-{uuid.uuid4()}"
        camera_url = v.as_uri() if "://" not in str(video_path) else str(video_path)

        job = DetectionJob(
            request_id=str(uuid.uuid4()),
            camera_id=camera_id,
            camera_name=camera_id,
            camera_url=camera_url,
            output_path=out,
        )

        acquired = self._active_semaphore.acquire(timeout=timeout_sec)
        if not acquired:
            raise TimeoutError("No available slot in DeepStream flow within timeout limit")

        with self._active_count_lock:
            self._active_count += 1

        try:
            with self._pending_lock:
                self._pending_jobs_by_camera[job.camera_id] = job

            try:
                self._add_source(job)

                job.wait_for_source(DEFAULT_ADD_TIMEOUT_SEC)

                job.wait_for_stream_quiet(timeout_sec=timeout_sec, idle_sec=self.source_idle_sec)

                try:
                    job.wait_removed(2.0)
                except TimeoutError:
                    if not job.removed:
                        try:
                            self._remove_source(job)
                        except Exception as e:
                            print(f"Manual removal request of job non-fatal: {e}")
                
                try:
                    job.wait_removed(5.0)
                except TimeoutError:
                    print(f"Warning: job {job.camera_id} removal confirmation timed out, proceeding.")

                # Aggregate this job's frames and save them to its own JSON file
                return job.to_result(self.infer_config_path)

            finally:
                with self._pending_lock:
                    self._pending_jobs_by_camera.pop(job.camera_id, None)
                with self._source_lock:
                    if job.source_id is not None:
                        self._jobs_by_source.pop(job.source_id, None)
        finally:
            with self._active_count_lock:
                self._active_count -= 1
            self._active_semaphore.release()

    def status(self) -> dict[str, Any]:
        with self._source_lock:
            active_sources = sorted(self._jobs_by_source.keys())
        with self._pending_lock:
            pending = len(self._pending_jobs_by_camera)
        with self._active_count_lock:
            busy = self._active_count
        return {
            "started": self._started.is_set(),
            "pipeline_alive": self._run_thread is not None and self._run_thread.is_alive(),
            "pipeline_error": None if self._pipeline_error is None else str(self._pipeline_error),
            "infer_config": str(self.infer_config_path),
            "rest_port": self.rest_port,
            "max_streams": self.max_streams,
            "busy_streams": busy,
            "pending_add_ack": pending,
            "active_source_ids": active_sources,
        }


# ==============================================================================
# FastAPI Configuration and Lifecycle (Lifespan)
# ==============================================================================

def parse_args() -> argparse.Namespace:
    parser = argparse.ArgumentParser(description="YOLOv5 Dynamic Source DeepStream API")
    parser.add_argument("--host", default="0.0.0.0")
    parser.add_argument("--port", default=5000, type=int)
    parser.add_argument("--debug", action="store_true")
    # Set the default maximum number of parallel streams to 8
    parser.add_argument("--parallel-streams", default=DEFAULT_MAX_STREAMS, type=int)
    parser.add_argument("--workers", default=1, type=int)
    parser.add_argument("--source-idle-sec", default=DEFAULT_SOURCE_IDLE_SEC, type=float)
    
    args, _ = parser.parse_known_args()
    return args


args = parse_args()


@asynccontextmanager
async def lifespan(app: FastAPI):
    print(f"[PID {os.getpid()}] Starting DeepStream service pipelines (Hor / Ver)...")
    app.state.detection_service_hor = DynamicSourceDetectionService(
        infer_config_path=HOR_INFER_CONFIG_PATH,
        max_streams=args.parallel_streams,
        source_idle_sec=args.source_idle_sec,
    )
    app.state.detection_service_ver = DynamicSourceDetectionService(
        infer_config_path=VER_INFER_CONFIG_PATH,
        max_streams=args.parallel_streams,
        source_idle_sec=args.source_idle_sec,
    )
    app.state.detection_service_hor.start()
    app.state.detection_service_ver.start()
    yield
    print(f"[PID {os.getpid()}] Stopping DeepStream service pipelines...")
    app.state.detection_service_hor.stop()
    app.state.detection_service_ver.stop()


app = FastAPI(
    title="YOLOv5 DeepStream Dynamic Source API",
    description="FastAPI service keeping a persistent DeepStream Flow pipeline online.",
    version="1.0.0",
    lifespan=lifespan,
)


# ==============================================================================
# Request/Response Pydantic Models
# ==============================================================================

class DetectRequest(BaseModel):
    video_paths: List[str]
    output_path: Optional[str] = None
    timeout: Optional[float] = None
    model_choice: Literal["ver", "hor"]


# ==============================================================================
# API Routes
# ==============================================================================

@app.get("/health")
async def health(request: Request):
    status_hor = request.app.state.detection_service_hor.status()
    status_ver = request.app.state.detection_service_ver.status()
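    # Nest each pipeline's status under its own key. Unpacking both dicts into
    # one level would let the "ver" values overwrite the "hor" ones.
    return {
        "status_hor": "ok" if status_hor["pipeline_error"] is None else "error",
        "hor": status_hor,
        "status_ver": "ok" if status_ver["pipeline_error"] is None else "error",
        "ver": status_ver,
    }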


@app.post("/detect")
async def detect(req: DetectRequest, request: Request):
    """
    Receive detection requests via FastAPI.
    Even when multiple videos are submitted in a single request, we use asyncio.to_thread
    to launch multiple independent detection tasks in parallel, which are concurrently
    injected into the running DeepStream pipeline.
    """
    video_paths = req.video_paths
    timeout_sec = req.timeout
    detection_service = (
        request.app.state.detection_service_hor 
        if req.model_choice == "hor" 
        else request.app.state.detection_service_ver
    )
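    # output_path is optional; fall back to the default instead of calling
    # Path(None), which raises TypeError.
    base_output_path = Path(req.output_path) if req.output_path else DEFAULT_OUTPUT_PATH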
    try:
        tasks = []
        for idx, video in enumerate(video_paths):
            unique_output_path = base_output_path.parent / f"{base_output_path.stem}_{idx}_{uuid.uuid4().hex[:6]}{base_output_path.suffix}"
            tasks.append(
                asyncio.to_thread(
                    detection_service.submit_single_video,
                    video_path=video,
                    output_path=unique_output_path,
                    timeout_sec=timeout_sec
                )
            )
        results = await asyncio.gather(*tasks)
        return {
            "status": "success",
            "results": results
        }
        
    except HTTPException:
        raise
    except Exception as exc:
        raise HTTPException(status_code=500, detail=str(exc))


if __name__ == "__main__":
    import uvicorn
    uvicorn.run("main:app", host=args.host, port=args.port, reload=args.debug, workers=args.workers)

Inference Configuration

The inference model is YOLOv5. I compiled the custom C++ bounding box parsing library following this open-source project: DeepStream-Yolo YOLOv5 Guide and configured my inference file accordingly.

Here are my configuration files:

config_infer_yolov5_hor.txt

[property]
gpu-id=0
net-scale-factor=0.0039215697906911373
model-color-format=0
onnx-file=/home/xietianchong/data_demo/deepyolo/models/yolov5m_640_640_h_train30.8w.onnx
model-engine-file=/home/xietianchong/data_demo/yolov5_deepstream/hor_model_b2_gpu0_fp16.engine
#int8-calib-file=calib.table
labelfile-path=/home/xietianchong/data_demo/yolov5_deepstream/labels.txt
batch-size=32
network-mode=2
num-detected-classes=2
interval=0
gie-unique-id=1
process-mode=1
network-type=0
cluster-mode=2
#maintain-aspect-ratio=1
#symmetric-padding=1
#workspace-size=2000
#parse-bbox-func-name=NvDsInferParseYolo
parse-bbox-func-name=NvDsInferParseYoloCuda
custom-lib-path=/home/xietianchong/data_demo/deepyolo/DeepStream-Yolo/nvdsinfer_custom_impl_Yolo/libnvdsinfer_custom_impl_Yolo.so
engine-create-func-name=NvDsInferYoloCudaEngineGet
[class-attrs-all]
nms-iou-threshold=0.45
pre-cluster-threshold=0.25
topk=300

config_infer_yolov5_ver.txt

[property]
gpu-id=0
net-scale-factor=0.0039215697906911373
model-color-format=0
onnx-file=/home/xietianchong/data_demo/deepyolo/models/yolov5m_20251226.onnx
model-engine-file=/home/xietianchong/data_demo/yolov5_deepstream/ver_model_b8_gpu0_fp16.engine
#int8-calib-file=calib.table
labelfile-path=/home/xietianchong/data_demo/yolov5_deepstream/labels.txt
batch-size=32
network-mode=2
num-detected-classes=2
interval=0
gie-unique-id=1
process-mode=1
network-type=0
cluster-mode=2
#maintain-aspect-ratio=1
#symmetric-padding=1
#workspace-size=2000
#parse-bbox-func-name=NvDsInferParseYolo
parse-bbox-func-name=NvDsInferParseYoloCuda
custom-lib-path=/home/xietianchong/data_demo/deepyolo/DeepStream-Yolo/nvdsinfer_custom_impl_Yolo/libnvdsinfer_custom_impl_Yolo.so
engine-create-func-name=NvDsInferYoloCudaEngineGet
[class-attrs-all]
nms-iou-threshold=0.45
pre-cluster-threshold=0.25
topk=300

How to Start the Project

cd /path/to/project   # directory containing main.py
uvicorn main:app --host 0.0.0.0 --port 5000

Python requirements.txt

annotated-doc==0.0.4
annotated-types==0.7.0
anyio==4.13.0
blinker==1.9.0
certifi==2026.5.20
charset-normalizer==3.4.7
click==8.4.1
colorama==0.4.6
coloredlogs==15.0.1
contourpy==1.3.2
cuda-bindings==12.9.4
cuda-pathfinder==1.2.2
cuda-toolkit==12.6.3
cycler==0.12.1
exceptiongroup==1.3.1
fastapi==0.136.3
filelock==3.29.0
Flask==3.1.3
flatbuffers==25.12.19
fonttools==4.63.0
fsspec==2026.4.0
gitdb==4.0.12
GitPython==3.1.50
h11==0.16.0
humanfriendly==10.0
idna==3.15
itsdangerous==2.2.0
Jinja2==3.1.6
kiwisolver==1.5.0
MarkupSafe==3.0.3
matplotlib==3.10.9
ml_dtypes==0.5.4
mpmath==1.3.0
networkx==3.4.2
numpy==2.2.6
nvidia-cublas-cu12==12.6.4.1
nvidia-cuda-cupti-cu12==12.6.80
nvidia-cuda-nvrtc-cu12==12.6.85
nvidia-cuda-runtime-cu12==12.6.77
nvidia-cudnn-cu12==9.10.2.21
nvidia-cufft-cu12==11.3.0.4
nvidia-cufile-cu12==1.11.1.6
nvidia-curand-cu12==10.3.7.77
nvidia-cusolver-cu12==11.7.1.2
nvidia-cusparse-cu12==12.5.4.2
nvidia-cusparselt-cu12==0.7.1
nvidia-nccl-cu12==2.29.3
nvidia-nvjitlink-cu12==12.6.85
nvidia-nvshmem-cu12==3.4.5
nvidia-nvtx-cu12==12.6.77
onnx==1.21.0
onnx-ir==0.2.1
onnxruntime==1.23.2
onnxscript==0.7.0
onnxslim==0.1.93
opencv-python==4.13.0.92
packaging==26.0
pandas==2.3.3
pillow==12.2.0
polars==1.40.1
polars-runtime-32==1.40.1
protobuf==7.35.0
psutil==7.2.2
pydantic==2.13.4
pydantic_core==2.46.4
pyparsing==3.3.2
pyservicemaker @ file:///opt/nvidia/deepstream/deepstream-7.1/service-maker/python/pyservicemaker-0.0.1-py3-none-linux_x86_64.whl#sha256=d12b29ce589853b57cf58e0e2a1981db584d556e40f41fdec3e9ee825da15706
python-dateutil==2.9.0.post0
pytz==2026.2
PyYAML==6.0.3
requests==2.34.2
scipy==1.15.3
seaborn==0.13.2
six==1.17.0
smmap==5.0.3
starlette==1.1.0
sympy==1.14.0
thop==0.1.1.post2209072238
torch==2.12.0+cu126
torchaudio==2.11.0+cu126
torchvision==0.27.0+cu126
tqdm==4.67.3
triton==3.7.0
typing-inspection==0.4.2
typing_extensions==4.15.0
tzdata==2026.2
ultralytics==8.4.52
ultralytics-thop==2.0.19
urllib3==2.7.0
uvicorn==0.48.0
Werkzeug==3.1.8

Where and how did you get these model engine files?

I followed the YOLOv5 instructions from the open-source DeepStream-Yolo project on GitHub. These are the steps I used to export my models:

git clone https://github.com/ultralytics/yolov5.git
cd yolov5
pip3 install -r requirements.txt
pip3 install onnx onnxslim onnxruntime

git clone https://github.com/marcoslucianops/DeepStream-Yolo.git

# Copy the export script into the yolov5 directory (cd cannot target a file)
cp DeepStream-Yolo/utils/export_yoloV5.py .

python export_yoloV5.py -w /path/to/my_ver_model.pt -s 384 640 --dynamic --simplify

python export_yoloV5.py -w /path/to/my_hor_model.pt -s 384 640 --dynamic --simplify

These commands export my YOLOv5 .pt models to ONNX format, which I can then use to generate the TensorRT engine files required by DeepStream.

No. I mean the TensorRT engine file, not the PyTorch or ONNX model.

Your nvinfer configurations are confusing. For example, in config_infer_yolov5_hor.txt you set model-engine-file=/home/xietianchong/data_demo/yolov5_deepstream/hor_model_b2_gpu0_fp16.engine while batch-size=32, so I cannot tell whether the TensorRT engine was built for batch size 32 or batch size 2. Please tell us how /home/xietianchong/data_demo/yolov5_deepstream/hor_model_b2_gpu0_fp16.engine was generated.
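
A mismatch like this can be surfaced with a quick check that compares the batch-size key against the batch size embedded in the engine file name. The sketch below assumes the file name follows the _b<N>_gpu<id>_<precision> pattern that nvinfer uses when it builds an engine itself; the name is only a hint, since an engine can be renamed or built separately. The function name is hypothetical. Also keep in mind that main.py sets the nvinfer batch-size property to max_streams, which is expected to take precedence over the value in the config file.

```python
import configparser
import re

# Matches the "_b<N>_gpu<id>_" part of names like hor_model_b2_gpu0_fp16.engine
ENGINE_BATCH_RE = re.compile(r"_b(\d+)_gpu\d+_")


def check_batch_consistency(config_text):
    """Return (config_batch_size, engine_name_batch_size) from nvinfer config text.

    Either value is None when it cannot be determined.
    """
    parser = configparser.ConfigParser(interpolation=None, strict=False)
    parser.read_string(config_text)
    prop = parser["property"]
    config_batch = prop.getint("batch-size", fallback=None)
    engine_path = prop.get("model-engine-file", fallback="")
    match = ENGINE_BATCH_RE.search(engine_path)
    engine_batch = int(match.group(1)) if match else None
    return config_batch, engine_batch


# Usage (reads the config file from disk):
# from pathlib import Path
# text = Path("config_infer_yolov5_hor.txt").read_text(encoding="utf-8")
# print(check_batch_consistency(text))
```

If the two numbers differ, nvinfer may fail to use the serialized engine and rebuild it at startup, so the engine actually running may not be the file named in the config.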

Can you elaborate on “The inference processing speed feels throttled.”? Is the video playback smooth? Do you mean the GPU is not fully used?

Running 8 pipelines means running 8 copies of the same model simultaneously, so the GPU is overloaded.

You are working with live streams, right?

In Scenario A, the GPU utilization reported by nvidia-smi stays at around 52%, so my concern is that the GPU does not appear to be fully utilized.

In Scenario B, we run 8 processes with 8 separate pipelines on the same GPU. The GPU memory does not overflow, and GPU utilization can easily reach 100%.

My question is why Scenario A cannot drive the GPU utilization close to 100%, while Scenario B can, even though both are processing the same workload on the same GPU.

Also, to clarify, I am not working with live streams. I am processing local MP4 video files, so the input source should not be limited by real-time frame rates.

This is why I suspect there may be some bottleneck in the single-process / single-pipeline architecture, but I have not yet identified where it is.
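
A throughput figure would make the comparison between the two scenarios more concrete than the utilization percentage alone. The sketch below sums the frame_count field from the result JSON files that to_result() writes and divides by a wall-clock duration. The function names are hypothetical, and the wall-clock time is assumed to be measured by the caller around the whole run.

```python
import json
from pathlib import Path


def total_frames(result_paths):
    """Sum the "frame_count" field over result JSON files written by to_result()."""
    total = 0
    for path in result_paths:
        data = json.loads(Path(path).read_text(encoding="utf-8"))
        total += int(data["frame_count"])
    return total


def aggregate_fps(frame_total, wall_sec):
    """Frames processed per second across all streams."""
    if wall_sec <= 0:
        raise ValueError("wall_sec must be positive")
    return frame_total / wall_sec
```

Computing aggregate_fps for the same 8 videos in Scenario A and Scenario B shows directly how much total throughput differs between the two setups.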

I get the ONNX file with:
python export_yoloV5.py -w /path/to/my_ver_model.pt -s 384 640 --dynamic --simplify
When I run python main.py, the engine file is generated automatically.