import cv2
import numpy as np
import pycuda.driver as cuda
import pycuda.autoinit
import tensorrt as trt
import time
# Initialize TensorRT logger (shared by every Runtime created in this module).
TRT_LOGGER = trt.Logger(trt.Logger.INFO)
def load_engine(engine_path):
    """Deserialize a TensorRT engine from a serialized engine file.

    Args:
        engine_path: Path of the serialized engine file on disk.

    Returns:
        The engine object produced by ``Runtime.deserialize_cuda_engine``.

    Raises:
        OSError: If the file cannot be opened or read.
        RuntimeError: If deserialization yields no engine.
    """
    with open(engine_path, "rb") as f:
        serialized_engine = f.read()

    runtime = trt.Runtime(TRT_LOGGER)
    engine = runtime.deserialize_cuda_engine(serialized_engine)
    if engine is None:
        # Fail here with a clear message instead of an AttributeError later
        # when the caller tries to use the missing engine.
        raise RuntimeError(f"Failed to deserialize TensorRT engine: {engine_path}")
    return engine
def allocate_buffers(engine):
    """Allocate a host/device buffer pair for every binding of ``engine``.

    Args:
        engine: Engine whose bindings are iterated to size the buffers.

    Returns:
        A tuple ``(inputs, outputs, bindings, stream)`` where ``inputs`` and
        ``outputs`` are lists of ``{"host": ..., "device": ...}`` dicts,
        ``bindings`` is the list of device addresses (as ints) in binding
        order, and ``stream`` is a newly created CUDA stream.

    Raises:
        ValueError: If a binding reports a negative element count.
    """
    inputs = []
    outputs = []
    bindings = []
    stream = cuda.Stream()

    for binding in engine:
        size = trt.volume(engine.get_binding_shape(binding)) * engine.max_batch_size
        if size < 0:
            # NOTE(review): a negative volume presumably comes from a dynamic
            # (-1) dimension; such shapes must be fixed before allocation.
            raise ValueError(
                f"Binding {binding!r} has a negative element count ({size}); "
                "cannot allocate buffers for it"
            )
        dtype = trt.nptype(engine.get_binding_dtype(binding))

        # Page-locked host memory paired with a device allocation of equal size.
        host_mem = cuda.pagelocked_empty(size, dtype)
        device_mem = cuda.mem_alloc(host_mem.nbytes)
        bindings.append(int(device_mem))

        buffer_pair = {"host": host_mem, "device": device_mem}
        if engine.binding_is_input(binding):
            inputs.append(buffer_pair)
        else:
            outputs.append(buffer_pair)

    return inputs, outputs, bindings, stream
def preprocess_image(image, input_shape):
    """Resize, scale and re-layout an image for the network input.

    Args:
        image: Image array in height-width-channel layout.
        input_shape: ``(height, width)`` the image is resized to.

    Returns:
        A contiguous ``float32`` array in channel-height-width layout whose
        values are the resized pixels divided by 255.
    """
    h, w = input_shape
    # cv2.resize takes the target size as (width, height).
    resized = cv2.resize(image, (w, h))
    normalized = resized / 255.0
    # NOTE(review): channel order is left as delivered by the caller; confirm
    # whether the model expects RGB rather than OpenCV's BGR.
    transposed = normalized.transpose(2, 0, 1)  # HWC to CHW
    return np.ascontiguousarray(transposed, dtype=np.float32)
def postprocess_output(output, original_shape, confidence_threshold=0.8):
    """Convert confident detections into pixel-space corner boxes.

    Each row of ``output`` is read as ``(x, y, box_w, box_h, confidence, ...)``.
    ``x``/``y`` are treated as the box centre and all four box values as
    fractions of the image size, since they are multiplied by the image
    width/height.

    Args:
        output: Iterable of detection rows with at least five values each.
        original_shape: ``(height, width, channels)`` of the original image.
        confidence_threshold: Rows are kept only when their confidence is
            strictly greater than this value.

    Returns:
        A list of ``(x_min, y_min, x_max, y_max)`` integer tuples.
    """
    h, w, _ = original_shape
    detections = []
    for det in output:
        if not det[4] > confidence_threshold:  # Confidence threshold
            continue
        x, y, box_w, box_h = det[:4]
        half_w = box_w / 2
        half_h = box_h / 2
        detections.append(
            (
                int((x - half_w) * w),
                int((y - half_h) * h),
                int((x + half_w) * w),
                int((y + half_h) * h),
            )
        )
    return detections
def detect_objects(context, buffers, image, input_shape):
    """Run one inference pass on ``image`` and return the raw output rows.

    Args:
        context: Execution context used to launch inference.
        buffers: The ``(inputs, outputs, bindings, stream)`` tuple as returned
            by ``allocate_buffers``.
        image: Image passed to ``preprocess_image``.
        input_shape: ``(height, width)`` passed to ``preprocess_image``.

    Returns:
        The first output's host buffer reshaped to ``(-1, 6)``. The result is
        derived from the reusable host buffer, so consume it before the next
        call.
    """
    inputs, outputs, bindings, stream = buffers
    preprocessed_image = preprocess_image(image, input_shape)

    # Stage the flattened input in the host buffer, then upload it.
    np.copyto(inputs[0]["host"], preprocessed_image.ravel())
    cuda.memcpy_htod_async(inputs[0]["device"], inputs[0]["host"], stream)

    context.execute_async_v2(bindings, stream.handle, None)

    # Download the result and wait for all queued work on the stream.
    cuda.memcpy_dtoh_async(outputs[0]["host"], outputs[0]["device"], stream)
    stream.synchronize()

    return outputs[0]["host"].reshape(-1, 6)  # Adjust this based on your model output
def main():
    """Read frames from the video source, run detection and display results.

    Loads the engine, allocates buffers, then loops over frames: each frame is
    run through ``detect_objects``, the boxes from ``postprocess_output`` are
    drawn, and an FPS figure is overlaid. The loop ends when a frame cannot be
    read or when 'q' is pressed. The capture and windows are released even if
    the loop raises.
    """
    engine_path = "/home/proxpc/Documents/yolo11n_new.engine"  # Path to your TensorRT engine file
    input_shape = (640, 640)  # Update based on your model input shape

    engine = load_engine(engine_path)
    context = engine.create_execution_context()
    buffers = allocate_buffers(engine)

    # Change this to your CCTV feed URL or device ID
    cap = cv2.VideoCapture('rtsp://abcdd:xxxxx%40dxx@192.xxx.xx.xx:554/Streaming/Channels/101')
    try:
        while cap.isOpened():
            start_time = time.time()
            ret, frame = cap.read()
            if not ret:
                break

            # Preprocess and run inference
            original_shape = frame.shape
            output = detect_objects(context, buffers, frame, input_shape)
            detections = postprocess_output(output, original_shape)

            # Draw bounding boxes
            for x_min, y_min, x_max, y_max in detections:
                cv2.rectangle(frame, (x_min, y_min), (x_max, y_max), (0, 255, 0), 2)

            # Calculate FPS; guard against a zero elapsed time from a coarse clock.
            inference_time = time.time() - start_time
            fps = 1 / inference_time if inference_time > 0 else 0.0
            cv2.putText(frame, f"FPS: {fps:.2f}", (10, 30), cv2.FONT_HERSHEY_SIMPLEX, 1, (0, 255, 0), 2)

            # Display the frame
            cv2.imshow("Person Detection", frame)
            if cv2.waitKey(1) & 0xFF == ord('q'):
                break
    finally:
        # Always release the capture and close windows, even on an exception.
        cap.release()
        cv2.destroyAllWindows()
# Run the detection loop only when this file is executed as a script.
if __name__ == "__main__":
    main()