Using Kafka protocol for retrieving data from a deepstream pipeline.

Hi Everyone,

I have been trying to get DeepStream and Kafka working together from a very long time.
I am writing all the steps here that I followed to receive the data from DS to my python consumer.

Background on Kafka
Kafka is a distributed streaming platform that is used to publish and subscribe to streams of records.

Step 1: Start Kafka broker on Host machine
Make sure you have docker and docker-compose installed on your host machine.
create a file namely docker-compose.yml and put below content.

version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.4.3
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:5.4.3
    depends_on:
      - zookeeper
    ports:
     - 9092:9092
    environment:
      KAFKA_BROKER_ID: 0
      KAFKA_LOG_DIRS: /kafka/logs
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://<ADD-YOUR-HOST-IP-HERE>:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

Yes you need to have zookeper container running in order for kafka to work, please update <ADD-YOUR-HOST-IP-HERE> in the above file otherwise deepstream will not be able to connect to your host pc. Start the containers using below command

sudo docker-compose up

Step 2: Test if kafka is working
Very basic thing you can do to make sure Kafka is running is by checking the running containers, open another terminal on Host machine and run

sudo docker ps

Output:

CONTAINER ID        IMAGE                             COMMAND                  CREATED             STATUS              PORTS                          NAMES
0e9c253875b4        confluentinc/cp-kafka:5.4.3       "/etc/confluent/dock…"   7 hours ago         Up 52 minutes       0.0.0.0:9092->9092/tcp         kafka-exp_kafka_1
58b24793db15        confluentinc/cp-zookeeper:5.4.3   "/etc/confluent/dock…"   7 hours ago         Up 52 minutes       2181/tcp, 2888/tcp, 3888/tcp   kafka-exp_zookeeper_1

Now test if you can publish / subscribe to kafka, I am using python for this.
Install python client using:

pip3 install kafka-python==2.0.2

Create a file called producer.py and put in below content.

import time
from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='<ADD-YOUR-HOST-IP-HERE>:9092',
    value_serializer=lambda x: dumps(x).encode('utf-8')
)

idx = 0
while True:
    data = {'idx': idx}
    producer.send('test', value=data)
    print(data)
    time.sleep(0.03)
    idx += 1

Run producer.py

python3 producer.py

Output:

{'idx': 0}
{'idx': 1}
.
.
.

This means we are able to publish data to kafka. Now let’s try to consume.
Create another file consumer.py and put in below content.

import time
from kafka import KafkaConsumer
from json import loads
import uuid 

consumer = KafkaConsumer(
    'test',
    bootstrap_servers='<ADD-YOUR-HOST-IP-HERE>:9092',
    auto_offset_reset='latest',from latest
    enable_auto_commit=True,
    group_id=str(uuid.uuid1()),
    value_deserializer=lambda x: loads(x.decode('utf-8'))
)

# do a dummy poll to retrieve some message
consumer.poll()

# go to end of the stream
consumer.seek_to_end()

for event in consumer:
    event_data = event.value
    print(event_data)

Note I am using random group_id so that I can have Independent consumers receiving the same data.
Run consumer.py

python3 consumer.py

Output:

{'idx': 30}
{'idx': 31}
.
.
.

If you are able to receive the messages, that means your kafka broker is working. just to make sure that deepstream will be able to connect, try to run the producer from Jetson and consumer from Host machine. This will confirm that the kafka is reachable from other machines too.

Step 3: Setup kafka dependency on Jetson
We are going to run deepstream-test4 in this example. First let’s install librdkafka

cd /opt/nvidia/deepstream/deepstream/sources/libs/kafka_protocol_adaptor
sudo git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
sudo git reset --hard 7101c2310341ab3f4675fc565f64f0967e135a6a
sudo ./configure
sudo make
sudo make install
sudo cp /usr/local/lib/librdkafka* /opt/nvidia/deepstream/deepstream/lib/
sudo ldconfig
sudo apt-get install libglib2.0 libglib2.0-dev
sudo apt-get install  libjansson4  libjansson-dev

Now librdkafka is installed, we will start out deepstream-application with proper configuration.

cd /opt/nvidia/deepstream/deepstream/sources/apps/sample_apps/deepstream-test4
sudo make
./deepstream-test4-app -i /opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.h264 \
    -p /opt/nvidia/deepstream/deepstream/lib/libnvds_kafka_proto.so \
    --conn-str="<ADD-YOUR-HOST-IP-HERE>;9092" \
    --topic="test" -s 0

Output:

Now playing: /opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.h264

Using winsys: x11 
Opening in BLOCKING MODE 
ERROR: Deserialize engine failed because file path: /opt/nvidia/deepstream/deepstream-5.0/sources/apps/sample_apps/deepstream-test4/../../../../samples/models/Primary_Detector/resnet10.caffemodel_b1_gpu0_int8.engine open error
0:00:01.507939438 10139   0x55afb59a90 WARN                 nvinfer gstnvinfer.cpp:616:gst_nvinfer_logger:<primary-nvinference-engine> NvDsInferContext[UID 1]: Warning from NvDsInferContextImpl::deserializeEngineAndBackend() <nvdsinfer_context_impl.cpp:1690> [UID = 1]: deserialize engine from file :/opt/nvidia/deepstream/deepstream-5.0/sources/apps/sample_apps/deepstream-test4/../../../../samples/models/Primary_Detector/resnet10.caffemodel_b1_gpu0_int8.engine failed
0:00:01.508133807 10139   0x55afb59a90 WARN                 nvinfer gstnvinfer.cpp:616:gst_nvinfer_logger:<primary-nvinference-engine> NvDsInferContext[UID 1]: Warning from NvDsInferContextImpl::generateBackendContext() <nvdsinfer_context_impl.cpp:1797> [UID = 1]: deserialize backend context from engine from file :/opt/nvidia/deepstream/deepstream-5.0/sources/apps/sample_apps/deepstream-test4/../../../../samples/models/Primary_Detector/resnet10.caffemodel_b1_gpu0_int8.engine failed, try rebuild
0:00:01.508171504 10139   0x55afb59a90 INFO                 nvinfer gstnvinfer.cpp:619:gst_nvinfer_logger:<primary-nvinference-engine> NvDsInferContext[UID 1]: Info from NvDsInferContextImpl::buildModel() <nvdsinfer_context_impl.cpp:1715> [UID = 1]: Trying to create engine from model files
INFO: [TRT]: Reading Calibration Cache for calibrator: EntropyCalibration2
INFO: [TRT]: Generated calibration scales using calibration cache. Make sure that calibration cache has latest scales.
INFO: [TRT]: To regenerate calibration cache, please delete the existing one. TensorRT will generate a new calibration cache.
INFO: [TRT]: 
INFO: [TRT]: --------------- Layers running on DLA: 
INFO: [TRT]: 
INFO: [TRT]: --------------- Layers running on GPU: 
INFO: [TRT]: conv1 + activation_1/Relu, block_1a_conv_1 + activation_2/Relu, block_1a_conv_2, block_1a_conv_shortcut + add_1 + activation_3/Relu, block_2a_conv_1 + activation_4/Relu, block_2a_conv_2, block_2a_conv_shortcut + add_2 + activation_5/Relu, block_3a_conv_1 + activation_6/Relu, block_3a_conv_2, block_3a_conv_shortcut + add_3 + activation_7/Relu, block_4a_conv_1 + activation_8/Relu, block_4a_conv_2, block_4a_conv_shortcut + add_4 + activation_9/Relu, conv2d_cov, conv2d_cov/Sigmoid, conv2d_bbox, 
INFO: [TRT]: Detected 1 inputs and 2 output network tensors.
ERROR: Serialize engine failed because of file path: /opt/nvidia/deepstream/deepstream-5.0/samples/models/Primary_Detector/resnet10.caffemodel_b1_gpu0_int8.engine opened error
0:00:15.193611535 10139   0x55afb59a90 WARN                 nvinfer gstnvinfer.cpp:616:gst_nvinfer_logger:<primary-nvinference-engine> NvDsInferContext[UID 1]: Warning from NvDsInferContextImpl::buildModel() <nvdsinfer_context_impl.cpp:1743> [UID = 1]: failed to serialize cude engine to file: /opt/nvidia/deepstream/deepstream-5.0/samples/models/Primary_Detector/resnet10.caffemodel_b1_gpu0_int8.engine
INFO: [Implicit Engine Info]: layers num: 3
0   INPUT  kFLOAT input_1         3x368x640       
1   OUTPUT kFLOAT conv2d_bbox     16x23x40        
2   OUTPUT kFLOAT conv2d_cov/Sigmoid 4x23x40         

0:00:15.204773224 10139   0x55afb59a90 INFO                 nvinfer gstnvinfer_impl.cpp:313:notifyLoadModelStatus:<primary-nvinference-engine> [UID 1]: Load new model:dstest4_pgie_config.txt sucessfully
Running...
NvMMLiteOpen : Block : BlockType = 261 
NVMEDIA: Reading vendor.tegra.display-size : status: 6 
NvMMLiteBlockCreate : Block : BlockType = 261 
Frame Number = 0 Vehicle Count = 3 Person Count = 2
Frame Number = 1 Vehicle Count = 3 Person Count = 2
Frame Number = 2 Vehicle Count = 3 Person Count = 2
Frame Number = 3 Vehicle Count = 4 Person Count = 3
Frame Number = 4 Vehicle Count = 4 Person Count = 2
Frame Number = 5 Vehicle Count = 3 Person Count = 2
Frame Number = 6 Vehicle Count = 3 Person Count = 2
Frame Number = 7 Vehicle Count = 4 Person Count = 2
Frame Number = 8 Vehicle Count = 4 Person Count = 2
Frame Number = 9 Vehicle Count = 4 Person Count = 2
Frame Number = 10 Vehicle Count = 4 Person Count = 2
Frame Number = 11 Vehicle Count = 5 Person Count = 2
Frame Number = 12 Vehicle Count = 4 Person Count = 2
Frame Number = 13 Vehicle Count = 4 Person Count = 2
Frame Number = 14 Vehicle Count = 5 Person Count = 2
Frame Number = 15 Vehicle Count = 5 Person Count = 2
Frame Number = 16 Vehicle Count = 4 Person Count = 2
Frame Number = 17 Vehicle Count = 5 Person Count = 2

If everything works fine, the application will start. You can use consumer.py to receive messages.

python3 consumer.py

Output:

{
   "messageid":"bf45b525-1f51-4018-b559-5d9017c32d31",
   "mdsversion":"1.0",
   "@timestamp":"2020-12-09T14:32:56.178Z",
   "place":{
      "id":"1",
      "name":"XYZ",
      "type":"garage",
      "location":{
         "lat":30.32,
         "lon":-40.55,
         "alt":100.0
      },
      "aisle":{
         "id":"walsh",
         "name":"lane1",
         "level":"P2",
         "coordinate":{
            "x":1.0,
            "y":2.0,
            "z":3.0
         }
      }
   },
   "sensor":{
      "id":"CAMERA_ID",
      "type":"Camera",
      "description":"\"Entrance of Garage Right Lane\"",
      "location":{
         "lat":45.293701447,
         "lon":-75.8303914499,
         "alt":48.1557479338
      },
      "coordinate":{
         "x":5.2,
         "y":10.1,
         "z":11.2
      }
   },
   "analyticsModule":{
      "id":"XYZ",
      "description":"\"Vehicle Detection and License Plate Recognition\"",
      "source":"OpenALR",
      "version":"1.0",
      "confidence":-0.10000000149011612
   },
   "object":{
      "id":"-1",
      "speed":0.0,
      "direction":0.0,
      "orientation":0.0,
      "vehicle":{
         "type":"sedan",
         "make":"Bugatti",
         "model":"M",
         "color":"blue",
         "licenseState":"CA",
         "license":"XX1234",
         "confidence":-0.10000000149011612
      },
      "bbox":{
         "topleftx":1062,
         "toplefty":472,
         "bottomrightx":1107,
         "bottomrighty":498
      },
      "location":{
         "lat":0.0,
         "lon":0.0,
         "alt":0.0
      },
      "coordinate":{
         "x":0.0,
         "y":0.0,
         "z":0.0
      }
   },
   "event":{
      "id":"f6836b64-ba79-48fe-9487-6872b5e9dfd7",
      "type":"moving"
   },
   "videoPath":""
}

Notes:

  • Please read about kafka basics before trying to create your infrastructure, it will help a lot in debugging.
  • Use different group ids if you are planning to subscribe to same data from different consumers.
  • In case you want to balance the message in between consumers such that no two consumers get the same message, use same group id for both. However you need to create partitions on the topic.

I hope this will help others.
Thanks!

15 Likes