• Hardware Platform (GPU): Quadro T2000
• DeepStream Version: 7.1
• TensorRT Version: 10.03.00
• NVIDIA GPU Driver Version: 570
• How to set up the DeepStream Triton Docker container, do inferencing over the Triton server on custom models, and handle input/output streams using DeepStream.
• Programming Language: Python
- Please refer to this link for how to set up the DeepStream Triton Docker container.
- DeepStream provides the nvinferserver plugin to do inference over Triton. Please refer to the nvinferserver explanation.
- If using Python, please refer to this link for how to use the Python bindings to develop a DeepStream application. In short, after starting the Docker container, you can use /opt/nvidia/deepstream/deepstream/user_deepstream_python_apps_install.sh to install the Python bindings (see the snippet after this list).
- Please refer to the ready-made nvinferserver sample deepstream-test3.
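For example, inside the running Triton container (the script path is the one noted above; depending on the release the script may take options, so check the setup link first):

cd /opt/nvidia/deepstream/deepstream
./user_deepstream_python_apps_install.sh
python3 -c "import pyds"   # sanity check: the bindings module is pyds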
Thank you for the reply! I have set up DeepStream with Triton. Is it possible to run a Python file containing a GStreamer pipeline with custom inference using Triton, without using the config text files?
- Please refer to my last comment for how to use the Python bindings, and to this FAQ for the binding installation.
- Please refer to deepstream-test3 mentioned above for a Python nvinferserver sample. The config text files are needed: one is the model configuration for Triton, and the other is the configuration file for nvinferserver. The README is helpful for running this sample (see the example command below).
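For example, running the sample against the Triton backend looks roughly like this (the config file name and flags here are from memory of the sample's README and vary by release, so verify there first):

cd /opt/nvidia/deepstream/deepstream/sources/deepstream_python_apps/apps/deepstream-test3
python3 deepstream_test_3.py -i file:///opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.mp4 --pgie nvinferserver -c config_triton_infer_primary_peoplenet.txt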
Here is my sample code. Is this the right way to do it, given that I do not want to use the config files? Please suggest a better approach.
import gi
import numpy as np
import cv2
import threading
import os
import socket
import json
from loguru import logger
# from confluent_kafka import Producer
from ultralytics import YOLO
gi.require_version('Gst', '1.0')
gi.require_version('GstBase', '1.0')
gi.require_version('GstVideo', '1.0')
gi.require_version('GLib', '2.0')
from gi.repository import Gst, GstVideo, GLib, GstBase, GObject
Gst.init(None)
# Configure Kafka producer with better performance settings
# kafka_conf = {
#     'bootstrap.servers': 'localhost:9092',
#     'client.id': 'gst_inference',
#     'queue.buffering.max.messages': 100000,
#     'queue.buffering.max.ms': 500,
#     'batch.num.messages': 10000,
#     'linger.ms': 500,
#     'compression.type': 'lz4'
# }
# producer = Producer(kafka_conf)
class InferenceElement(GstBase.BaseTransform):
__gstmetadata__ = ('Custom Inference Element', 'Filter', 'Performs custom ML inference on video frames', 'rtspInference')
__gsttemplates__ = (
Gst.PadTemplate.new("sink", Gst.PadDirection.SINK, Gst.PadPresence.ALWAYS, Gst.Caps.from_string("video/x-raw,format=BGR,width=640,height=480")),
Gst.PadTemplate.new("src", Gst.PadDirection.SRC, Gst.PadPresence.ALWAYS, Gst.Caps.from_string("video/x-raw,format=BGR,width=640,height=480"))
)
def __init__(self):
super(InferenceElement, self).__init__()
self.model = None
self.frame_count = 0
self.load_model()
def load_model(self):
try:
logger.info("Loading YOLO model...")
self.model = YOLO("http://localhost:8000/yolov8_attendence", task='detect')
logger.info("YOLO model loaded successfully.")
except Exception as e:
logger.error(f"Failed to load model: {str(e)}")
raise
def do_transform_ip(self, buf):
try:
caps = self.sinkpad.get_current_caps()
structure = caps.get_structure(0)
self.width = structure.get_int("width")[1] # Extract width
self.height = structure.get_int("height")[1] # Extract height
logger.info(f"Processing frame with original shape: ({self.height}, {self.width}, 3)")
with buf.map(Gst.MapFlags.READ | Gst.MapFlags.WRITE) as map_info:
frame = np.ndarray(shape=(self.height, self.width, 3), dtype=np.uint8, buffer=map_info.data)
# Run inference
detections = self.run_inference(frame)
if detections:
logger.info(f"Detected {len(detections)} objects")
self.draw_detections(frame, detections)
# Send Kafka messages one by one
# if detections:
# for detection in detections:
# self.send_kafka_message(detection)
                # `frame` is a view over the mapped buffer, so draw_detections()
                # above already wrote into the buffer in place; no copy-back needed
                # (np.frombuffer can return a read-only array, which would break copyto).
except Exception as e:
logger.error(f"Processing error: {str(e)}")
return Gst.FlowReturn.ERROR
return Gst.FlowReturn.OK
def run_inference(self, frame):
try:
logger.debug("Running inference...")
res = self.model.predict(frame, conf=0.5, verbose=False)
return self.parse_detections(res)
except Exception as e:
logger.error(f"Inference error: {str(e)}")
return []
def parse_detections(self, res):
detections = []
for pred in res[0].boxes.data.cpu().numpy():
x1, y1, x2, y2 = map(int, pred[:4])
confidence = float(pred[4])
class_id = int(pred[5])
detections.append({
"bbox": [x1, y1, x2, y2],
"confidence": confidence,
"class_id": class_id
})
return detections
def draw_detections(self, frame, detections):
for det in detections:
x1, y1, x2, y2 = det["bbox"]
cv2.rectangle(frame, (x1, y1), (x2, y2), (0, 255, 0), 2)
label = f'{det["class_id"]}: {det["confidence"]:.2f}'
cv2.putText(frame, label, (x1, y1 - 10), cv2.FONT_HERSHEY_SIMPLEX, 0.5, (255, 0, 0), 2)
def send_kafka_message(self, detection):
try:
# Send each detection as a separate Kafka message
producer.produce(
"test_results",
key=str(self.frame_count),
value=json.dumps(detection)
)
producer.poll(0)
logger.info(f"Sent detection: {detection}")
except Exception as e:
logger.error(f"Kafka send error: {str(e)}")
# Register custom element
GObject.type_register(InferenceElement)
Gst.Element.register(None, "inference_rtsp", Gst.Rank.NONE, InferenceElement)
class VideoViewer:
def __init__(self, source, http_port):
self.source = str(source) if isinstance(source, int) else source
self.http_port = http_port
self.pipeline = None
self.main_loop = None
self.cleanup_flag = threading.Event()
def is_file_source(self):
return self.source.startswith("file://") or os.path.isfile(self.source)
def get_local_ip(self):
try:
with socket.socket(socket.AF_INET, socket.SOCK_DGRAM) as s:
s.connect(("8.8.8.8", 80))
return s.getsockname()[0]
except Exception:
return "0.0.0.0"
def create_pipeline(self):
common_pipeline = (
"videoconvert ! videoscale ! video/x-raw,width=640,height=480,format=BGR ! "
"inference_rtsp name=rtspInference ! "
"videoconvert ! jpegenc ! "
"multipartmux boundary=mjpegstream ! "
f"tcpserversink host=0.0.0.0 port={self.http_port} sync=false"
)
if self.is_file_source():
file_path = os.path.abspath(self.source.replace("file://", ""))
pipeline_str = (
f"filesrc location={file_path} ! qtdemux name=demux "
f"demux.video_0 ! h264parse ! avdec_h264 ! {common_pipeline}" # videoconvert ! autovideosink"
)
elif self.source.startswith("rtsp://"):
pipeline_str = (
f'rtspsrc location={self.source} latency=0 ! '
'queue max-size-buffers=3 ! rtph264depay ! h264parse ! '
'avdec_h264 ! videoconvert ! '
f'queue max-size-buffers=3 ! {common_pipeline}'
)
else:
device = f"/dev/video{self.source}" if self.source.isdigit() else self.source
pipeline_str = f"v4l2src device={device} ! video/x-raw,format=YUY2,width=640,height=480 ! videoconvert ! queue max-size-buffers=3 ! {common_pipeline}"
self.pipeline = Gst.parse_launch(pipeline_str)
bus = self.pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message", self.on_message)
def on_message(self, bus, message):
if message.type == Gst.MessageType.ERROR:
err, debug = message.parse_error()
logger.error(f"Error: {err}, {debug}")
elif message.type == Gst.MessageType.EOS:
logger.info("End of stream reached")
def start(self):
try:
self.create_pipeline()
self.pipeline.set_state(Gst.State.PLAYING)
self.main_loop = GLib.MainLoop()
self.main_loop.run()
except Exception as e:
logger.error(f"Failed to start pipeline: {str(e)}")
if __name__ == "__main__":
    viewer = VideoViewer('/dev/video0', 5001)
    viewer.start()
You may use gst-launch to debug first, then port the pipeline to code. Please refer to the following command, and add nvstreammux and nvinferserver in the code for inference (a Python port with a metadata probe is sketched after the command).
gst-launch-1.0 filesrc location=/opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.mp4 ! qtdemux ! h264parse ! nvv4l2decoder ! mux.sink_0 nvstreammux name=mux batch-size=1 width=1920 height=1080 ! nvinferserver config-file-path=./ds_image_meta_pgie_config.txt ! nvvideoconvert ! 'video/x-raw(memory:NVMM),format=RGBA' ! nvdsosd ! nvvideoconvert ! nvv4l2h264enc ! h264parse ! qtmux ! filesink location=./out.mp4
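To port that command to Python and hook in custom postprocessing, you can build the same pipeline with Gst.parse_launch and attach a pad probe that reads the object metadata nvinferserver produces. A minimal sketch, assuming the pyds bindings are installed and the same config file as above; the nvdsosd element is named "osd" here so the probe can find it:

import gi
gi.require_version('Gst', '1.0')
from gi.repository import Gst, GLib
import pyds  # DeepStream Python bindings

PIPELINE = (
    "filesrc location=/opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.mp4 ! "
    "qtdemux ! h264parse ! nvv4l2decoder ! mux.sink_0 "
    "nvstreammux name=mux batch-size=1 width=1920 height=1080 ! "
    "nvinferserver config-file-path=./ds_image_meta_pgie_config.txt ! "
    "nvvideoconvert ! video/x-raw(memory:NVMM),format=RGBA ! nvdsosd name=osd ! "
    "nvvideoconvert ! nvv4l2h264enc ! h264parse ! qtmux ! filesink location=./out.mp4"
)

def osd_sink_probe(pad, info, user_data):
    # Walk the batch -> frame -> object metadata attached by nvinferserver;
    # custom postprocessing goes where the print() is.
    buf = info.get_buffer()
    if not buf:
        return Gst.PadProbeReturn.OK
    batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(buf))
    l_frame = batch_meta.frame_meta_list
    while l_frame is not None:
        frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
        l_obj = frame_meta.obj_meta_list
        while l_obj is not None:
            obj = pyds.NvDsObjectMeta.cast(l_obj.data)
            r = obj.rect_params
            print(f"frame {frame_meta.frame_num}: class {obj.class_id} "
                  f"conf {obj.confidence:.2f} "
                  f"bbox ({r.left:.0f}, {r.top:.0f}, {r.width:.0f}, {r.height:.0f})")
            try:
                l_obj = l_obj.next
            except StopIteration:
                break
        try:
            l_frame = l_frame.next
        except StopIteration:
            break
    return Gst.PadProbeReturn.OK

Gst.init(None)
pipeline = Gst.parse_launch(PIPELINE)
pipeline.get_by_name("osd").get_static_pad("sink").add_probe(
    Gst.PadProbeType.BUFFER, osd_sink_probe, None)
pipeline.set_state(Gst.State.PLAYING)
loop = GLib.MainLoop()
bus = pipeline.get_bus()
bus.add_signal_watch()
bus.connect("message::eos", lambda *args: loop.quit())
bus.connect("message::error", lambda *args: loop.quit())
try:
    loop.run()
finally:
    pipeline.set_state(Gst.State.NULL)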
- What I am trying to do here is create a custom plugin that performs preprocessing, inferencing (via Triton), and postprocessing.
- Also, is it possible to skip the config.txt files and still achieve the same performance?
Why don't you want to use the config.txt? Please refer to my last comments. The nvinferserver plugin needs a config file; please refer to /opt/nvidia/deepstream/deepstream-7.1/sources/apps/sample_apps/deepstream-test3/dstest3_pgie_nvinferserver_config.txt. Triton also needs a config file; please refer to /opt/nvidia/deepstream/deepstream/samples/triton_model_repo/Primary_Detector/config.pbtxt. Trimmed sketches of both files are shown below.
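For reference, illustrative sketches of the two files (values are loosely based on the Primary_Detector sample and are placeholders; always check the actual files at the paths above):

# nvinferserver configuration (protobuf text), cf. dstest3_pgie_nvinferserver_config.txt
infer_config {
  unique_id: 1
  gpu_ids: [0]
  max_batch_size: 30
  backend {
    triton {
      model_name: "Primary_Detector"
      version: -1
      model_repo {
        root: "/opt/nvidia/deepstream/deepstream/samples/triton_model_repo"
        strict_model_config: true
      }
    }
  }
  postprocess {
    labelfile_path: "/opt/nvidia/deepstream/deepstream/samples/models/Primary_Detector/labels.txt"
    detection {
      num_detected_classes: 4
      nms { confidence_threshold: 0.2 topk: 20 iou_threshold: 0.5 }
    }
  }
}
input_control {
  process_mode: PROCESS_MODE_FULL_FRAME
  interval: 0
}

# Triton model configuration, cf. triton_model_repo/Primary_Detector/config.pbtxt
# (input/output tensor sections omitted; the engine filename is a placeholder)
name: "Primary_Detector"
platform: "tensorrt_plan"
max_batch_size: 30
default_model_filename: "model_b30_gpu0_int8.engine"

The first file tells the nvinferserver plugin which Triton model to run and how to pre/postprocess its tensors; the second tells Triton itself how to load and batch the model. That is why both are needed.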
Thanks, got it.
I want to add a webrtc plugin to my GStreamer pipeline, whose input source is an RTSP link.
Please refer to my comment on Mar 3. If inputting RTSP, you only need to replace the source part; please refer to the new command.
gst-launch-1.0 uridecodebin uri=rtsp://xxx ! nvvideoconvert ! mux.sink_0 nvstreammux name=mux batch-size=1 width=1920 height=1080 ! nvinferserver config-file-path=./ds_image_meta_pgie_config.txt ! nvvideoconvert ! 'video/x-raw(memory:NVMM),format=RGBA' ! nvdsosd ! nvvideoconvert ! nvv4l2h264enc ! h264parse ! qtmux ! filesink location=./out.mp4
I understood that, but is there a way to add a webrtc plugin to the pipeline so that the output link can be viewed directly in the browser?
Does DeepStream provide webrtc plugins by default?
DeepStream is based on GStreamer; please check whether GStreamer supports a webrtc plugin. Could you share the desired media pipeline? What is the webrtc plugin used to do?
Yes, sure:
pipeline_str = f"""
    filesrc location={container_path} ! qtdemux name=demux
    webrtcbin name=webrtcbin bundle-policy=max-bundle stun-server=stun://stun.l.google.com:19302
    demux.video_0 ! h264parse ! rtph264pay config-interval=-1 ! queue ! application/x-rtp,media=video,encoding-name=H264,payload=96 ! webrtcbin.
"""
reference: Mp4 to WebRTC — A python implementation | by Hithesh Jayawardana | Medium
I want to use WebRTC to get an HTTP link for the camera directly, so that I can view it in the browser.
Thanks for sharing! From the DeepStream guide, DeepStream does not provide webrtc plugins. You may check the GStreamer code, which is open source (a rough webrtcbin sketch is shown below).
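For completeness, GStreamer's webrtcbin (from gst-plugins-bad) can terminate such a pipeline. A rough, untested sketch of the signal wiring in Python; the RTSP URL is a placeholder, and the signaling exchange with the browser (SDP offer/answer and ICE, typically over a websocket) is application-specific and only stubbed here:

import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstWebRTC', '1.0')
from gi.repository import Gst, GLib, GstWebRTC

Gst.init(None)

RTSP_URL = "rtsp://camera-ip/stream"  # placeholder

pipeline = Gst.parse_launch(
    f"rtspsrc location={RTSP_URL} latency=0 ! rtph264depay ! h264parse ! "
    "rtph264pay config-interval=-1 ! queue ! "
    "application/x-rtp,media=video,encoding-name=H264,payload=96 ! "
    "webrtcbin name=webrtcbin bundle-policy=max-bundle "
    "stun-server=stun://stun.l.google.com:19302")
webrtc = pipeline.get_by_name("webrtcbin")

def on_offer_created(promise, element, _):
    # Apply the generated offer locally, then hand its SDP to the browser.
    reply = promise.get_reply()
    offer = reply.get_value("offer")
    element.emit("set-local-description", offer, Gst.Promise.new())
    print(offer.sdp.as_text())  # send this over your signaling channel

def on_negotiation_needed(element):
    promise = Gst.Promise.new_with_change_func(on_offer_created, element, None)
    element.emit("create-offer", None, promise)

def on_ice_candidate(element, mlineindex, candidate):
    pass  # forward each local ICE candidate to the browser via signaling

webrtc.connect("on-negotiation-needed", on_negotiation_needed)
webrtc.connect("on-ice-candidate", on_ice_candidate)
# The browser's SDP answer comes back via:
#   webrtc.emit("set-remote-description", answer, Gst.Promise.new())
pipeline.set_state(Gst.State.PLAYING)
GLib.MainLoop().run()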
Hi, can anyone help with GStreamer? I am trying out RTSP in and RTSP out, but I want to add authentication (username, password) to the RTSP out link.
GstRtspServer is used for RTSP out. gst-rtsp-server is a GStreamer open-source plugin, so adding RTSP authentication would be outside of DeepStream; please refer to its code (a minimal Python sketch follows below). Thanks!
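For reference, a minimal sketch of HTTP basic authentication with the gst-rtsp-server Python bindings; the launch string, mount point, and credentials are placeholders:

import gi
gi.require_version('Gst', '1.0')
gi.require_version('GstRtspServer', '1.0')
from gi.repository import Gst, GLib, GstRtspServer

Gst.init(None)

server = GstRtspServer.RTSPServer()
factory = GstRtspServer.RTSPMediaFactory()
# Placeholder launch string; put your real re-streaming pipeline here.
factory.set_launch("( videotestsrc ! x264enc ! rtph264pay name=pay0 pt=96 )")
factory.set_shared(True)

# Only clients holding the "user" role may access/construct this media.
permissions = GstRtspServer.RTSPPermissions()
permissions.add_role_from_structure(Gst.Structure.new_from_string(
    "user, media.factory.access=(boolean)true, media.factory.construct=(boolean)true"))
factory.set_permissions(permissions)

# Map basic credentials ("admin"/"secret", placeholders) to that role.
auth = GstRtspServer.RTSPAuth()
token = GstRtspServer.RTSPToken()
token.set_string("media.factory.role", "user")
basic = GstRtspServer.RTSPAuth.make_basic("admin", "secret")
auth.add_basic(basic, token)
server.set_auth(auth)

server.get_mount_points().add_factory("/stream", factory)
server.attach(None)
# Clients then connect with: rtsp://admin:secret@<host>:8554/stream
GLib.MainLoop().run()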
There has been no update from you for a while, so we assume this is no longer an issue and are closing this topic. If you need further support, please open a new one. Thanks!