Using Kafka protocol for retrieving data from a deepstream pipeline.

Hi,

I have tried to use the out-of-the-box Kafka support, and I also wrote a script to subscribe to the topic.

My config file looks like this. I just added this section to the config after installing the Kafka protocol.

Kindly note that I have commented out the config file for the message-broker.

[message-broker]
enable=1
broker-proto-lib=/usr/local/deepstream/libnvds_kafka_proto.so
broker-conn-str=127.0.0.1;9092:ds_meta

I have also tried ‘localhost’ instead of ‘127.0.0.1’.
My script to receive the data looks like this:

import time, json, cPickle

from kafka import KafkaConsumer

class KConsumer():
	def __init__(self,ip,port):
		self.consumer = KafkaConsumer(bootstrap_servers='{}:{}'.format(ip,port), auto_offset_reset='earliest',consumer_timeout_ms=1000, value_deserializer=lambda m: cPickle.loads(m))
		
	def stop(self):
		self.consumer.close()
	
	def consume(self, key):
		print "KEY : ", key, type(key)
		print self.consumer.subscribe([key])
		print type(self.consumer.subscribe([key]))
		for message in self.consumer:
			print type(message),"\n", message.value
			print "MESSAGE   ::: ", message

if __name__ == '__main__':
	qconsumer = KConsumer("127.0.0.1", 9092)
	qconsumer.consume("ds_meta")

This script always returns None values. I am not sure whether this is the right way to create the Kafka topic.

I have employed the code in the deepstream-app’s source4**.txt sample.

Let me know how exactly to retrieve the data.

Can you refer to the analytics server: https://github.com/NVIDIA-AI-IOT/deepstream_360_d_smart_parking_application/

@abramjos have you figured it out? I am also recompiling the same example but am unable to get the correct output.

Thanks in advance,

Hi @hidayat, No clue man, Still struggling to get the output.

@bramjos

I was able to get the output. You have to follow the steps below in order to receive messages. This environment is Docker-based, but you can do the same without Docker images.

First, you have to download and run ZooKeeper:

  1. sudo docker run -d --name zookeeper -p 2181:2181 --network kafka_net zookeeper:latest
    This starts ZooKeeper in detached mode. Then you have to download the Kafka image:

  2. sudo docker run -d --name kafka -p 9092:9092 advertised.listeners=“Ip_adress of your computer” --network kafka_net --env ZOOKEEPER_IP=zookeeper ches/kafka
    This starts the Kafka server (message broker). After that, you have to create a topic, which can be done with the following command:

  3. sudo docker run --rm --network kafka_net ches/kafka kafka-topics.sh --create --topic USER_CREATED_TOPIC --replication-factor 1 --partitions 1 --zookeeper zookeeper:2181
    This creates the topic to which messages will be published and from which they will be consumed.

Now that you have set up the Kafka server and created the topic, you need a Kafka consumer to consume messages and a producer to produce them. In our case the producer is DeepStream, so open another terminal and run the following command:

sudo docker run --rm --network kafka_net ches/kafka kafka-console-consumer.sh --topic USER_CREATED_TOPIC --from-beginning --bootstrap-server kafka:9092

Now you are all set to receive the messages produced by DeepStream. Run the DeepStream image, passing --env ZOOKEEPER_IP=zookeeper to the DeepStream Docker container,
and specify kafka;9092;USER_CREATED_TOPIC in the connection string.

That's it; you should now be able to receive messages.
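
For anyone checking their own connection string: the one in this reply uses semicolons throughout (kafka;9092;USER_CREATED_TOPIC), whereas the config in the first post mixes a semicolon and a colon (127.0.0.1;9092:ds_meta). Below is a minimal sketch of a sanity check, assuming the 'host;port' or 'host;port;topic' layout seen in this thread. The helper name parse_conn_str and the BrokerConn type are made up for illustration and are not part of DeepStream.

```python
from typing import NamedTuple, Optional


class BrokerConn(NamedTuple):
    host: str
    port: int
    topic: Optional[str]


def parse_conn_str(conn_str: str) -> BrokerConn:
    """Split a 'host;port[;topic]' string (hypothetical helper, not a DeepStream API)."""
    parts = conn_str.split(";")
    if len(parts) not in (2, 3):
        raise ValueError(f"expected 'host;port' or 'host;port;topic', got {conn_str!r}")
    host, port = parts[0], parts[1]
    if not host:
        raise ValueError("host is empty")
    if not port.isdigit():
        # Catches mixed delimiters such as '9092:ds_meta'
        raise ValueError(f"port must be numeric, got {port!r}")
    topic = parts[2] if len(parts) == 3 else None
    return BrokerConn(host, int(port), topic)
```

With this check, a string such as 127.0.0.1;9092:ds_meta is rejected because the second field is not a plain port number.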

hi,

sudo docker run -d --name kafka -p 9092:9092 advertised.listeners=“XXX.XXX.XXX.XXX” --network kafka_net --env ZOOKEEPER_IP=zookeeper ches/kafka

This throws the following error:
docker: invalid reference format.

‘kafka_net’ is an arbitrary network, right? And what is ches/kafka?

There is no need for advertised.listeners; try it without that and it should work. Also verify it by running the test scripts in the libs directory named kafka protocol adapter.

Hi,

Now the data objects I am getting are not those of deepstream-test4, which should give the car make/model;
rather, I am getting the data objects of the deepstream-360 app:

"orientation" : 0,
    "vehicle" : {
      "type" : "sedan",
      "make" : "Bugatti",
      "model" : "M",
      "color" : "blue",
      "licenseState" : "CA",
      "license" : "XX1234",
      "confidence" : 0
    },
    "bbox" : {
      "topleftx" : 0,
      "toplefty" : 0,
      "bottomrightx" : 0,
      "bottomrighty" : 0
    },
    "location" : {
      "lat" : 0,
      "lon" : 0,
      "alt" : 0
    },
    "coordinate" : {
      "x" : 0,
      "y" : 0,
      "z" : 0
    }
  },
  "event" : {
    "id" : "8cfc0af7-c5de-46ff-9c70-a60bdb3fc959",
    "type" : "moving"
  },
  "videoPath" : ""
}
{
  "messageid" : "acc7de61-3704-461c-8e12-f027cbe52752",
  "mdsversion" : "1.0",
  "@timestamp" : "2019-01-27T06:13:42.203Z",
  "place" : {
    "id" : "1",
    "name" : "XYZ",
    "type" : "garage",
    "location" : {
      "lat" : 30.32,
      "lon" : -40.549999999999997,
      "alt" : 100
    },
    "aisle" : {
      "id" : "walsh",
      "name" : "lane1",
      "level" : "P2",
      "coordinate" : {
        "x" : 1,
        "y" : 2,
        "z" : 3
      }
    }
  },
  "sensor" : {
    "id" : "CAMERA_ID",
    "type" : "Camera",
    "description" : "\"Entrance of Garage Right Lane\"",
    "location" : {
      "lat" : 45.293701446999997,
      "lon" : -75.830391449900006,
      "alt" : 48.155747933800001
    },
    "coordinate" : {
      "x" : 5.2000000000000002,
      "y" : 10.1,
      "z" : 11.199999999999999
    }
  },
  "analyticsModule" : {
    "id" : "XYZ",
    "description" : "\"Vehicle Detection and License Plate Recognition\"",
    "source" : "OpenALR",
    "version" : "1.0",
    "confidence" : 0
  },
  "object" : {
    "id" : "0",
    "speed" : 0,
    "direction" : 0,
    "orientation" : 0,
    "vehicle" : {
      "type" : "sedan",
      "make" : "Bugatti",
      "model" : "M",
      "color" : "blue",
      "licenseState" : "CA",
      "license" : "XX1234",
      "confidence" : 0
    },
    "bbox" : {
      "topleftx" : 0,
      "toplefty" : 0,
      "bottomrightx" : 0,
      "bottomrighty" : 0
    },
    "location" : {
      "lat" : 0,
      "lon" : 0,
      "alt" : 0
    },
    "coordinate" : {
      "x" : 0,
      "y" : 0,
      "z" : 0
    }
  },
  "event" : {
    "id" : "ad2490a8-4089-45ef-b1e6-26bcab26c5c4",
    "type" : "moving"
  },
  "videoPath" : ""

Yes, that's true. If you look into the code, they are hard-coding these values; check nvmsgconv.

Hmm, I see.
Yes. In fact, I was getting this same output when I was running the analytics server too; I thought that was because the analytics server is for nvidia-360. So the problem is with the nvmsg plugin!

So @amycao, what can be done?

Hi, any update?
I saw that you got a fix for a single stream.
Do you have any idea about multiple streams?

[message-broker]
enable=1
broker-proto-lib=/usr/local/deepstream/libnvds_kafka_proto.so
broker-conn-str=127.0.0.1;9092:ds_meta

Where do you get these configs from? They are not mentioned in the documentation.

Hi Everyone,

I have been trying to get DeepStream and Kafka working together for a very long time.
Here are all the steps I followed to receive data from DS in my Python consumer.

Background on Kafka
Kafka is a distributed streaming platform that is used to publish and subscribe to streams of records.

Step 1: Start the Kafka broker on the host machine
Make sure you have docker and docker-compose installed on your host machine.
Create a file named docker-compose.yml with the content below.

version: "3"
services:
  zookeeper:
    image: confluentinc/cp-zookeeper:5.4.3
    environment:
      ZOOKEEPER_CLIENT_PORT: 2181
      ZOOKEEPER_TICK_TIME: 2000

  kafka:
    image: confluentinc/cp-kafka:5.4.3
    depends_on:
      - zookeeper
    ports:
     - 9092:9092
    environment:
      KAFKA_BROKER_ID: 0
      KAFKA_LOG_DIRS: /kafka/logs
      KAFKA_ZOOKEEPER_CONNECT: zookeeper:2181
      KAFKA_LISTENER_SECURITY_PROTOCOL_MAP: PLAINTEXT:PLAINTEXT,PLAINTEXT_HOST:PLAINTEXT
      KAFKA_INTER_BROKER_LISTENER_NAME: PLAINTEXT
      KAFKA_ADVERTISED_LISTENERS: PLAINTEXT://kafka:29092,PLAINTEXT_HOST://<ADD-YOUR-HOST-IP-HERE>:9092
      KAFKA_AUTO_CREATE_TOPICS_ENABLE: "true"
      KAFKA_OFFSETS_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_TRANSACTION_STATE_LOG_MIN_ISR: 1
      KAFKA_TRANSACTION_STATE_LOG_REPLICATION_FACTOR: 1
      KAFKA_GROUP_INITIAL_REBALANCE_DELAY_MS: 100
      KAFKA_CONFLUENT_LICENSE_TOPIC_REPLICATION_FACTOR: 1
      KAFKA_CONFLUENT_BALANCER_TOPIC_REPLICATION_FACTOR: 1
      CONFLUENT_METRICS_REPORTER_BOOTSTRAP_SERVERS: kafka:29092
      CONFLUENT_METRICS_REPORTER_TOPIC_REPLICAS: 1
      CONFLUENT_METRICS_ENABLE: 'true'
      CONFLUENT_SUPPORT_CUSTOMER_ID: 'anonymous'

Yes, you need the ZooKeeper container running in order for Kafka to work. Please update <ADD-YOUR-HOST-IP-HERE> in the above file; otherwise DeepStream will not be able to connect to your host PC. Start the containers using the command below:

sudo docker-compose up

Step 2: Test if Kafka is working
The most basic way to make sure Kafka is running is to check the running containers. Open another terminal on the host machine and run:

sudo docker ps

Output:

CONTAINER ID        IMAGE                             COMMAND                  CREATED             STATUS              PORTS                          NAMES
0e9c253875b4        confluentinc/cp-kafka:5.4.3       "/etc/confluent/dock…"   7 hours ago         Up 52 minutes       0.0.0.0:9092->9092/tcp         kafka-exp_kafka_1
58b24793db15        confluentinc/cp-zookeeper:5.4.3   "/etc/confluent/dock…"   7 hours ago         Up 52 minutes       2181/tcp, 2888/tcp, 3888/tcp   kafka-exp_zookeeper_1

Now test whether you can publish and subscribe to Kafka. I am using Python for this.
Install the Python client using:

pip3 install kafka-python==2.0.2

Create a file called producer.py with the content below.

import time
from json import dumps
from kafka import KafkaProducer

producer = KafkaProducer(
    bootstrap_servers='<ADD-YOUR-HOST-IP-HERE>:9092',
    value_serializer=lambda x: dumps(x).encode('utf-8')
)

idx = 0
while True:
    data = {'idx': idx}
    producer.send('test', value=data)
    print(data)
    time.sleep(0.03)
    idx += 1

Run producer.py

python3 producer.py

Output:

{'idx': 0}
{'idx': 1}
.
.
.

This means we are able to publish data to Kafka. Now let’s try to consume.
Create another file, consumer.py, with the content below.

import time
from kafka import KafkaConsumer
from json import loads
import uuid 

consumer = KafkaConsumer(
    'test',
    bootstrap_servers='<ADD-YOUR-HOST-IP-HERE>:9092',
    auto_offset_reset='latest',  # start from the latest offset
    enable_auto_commit=True,
    group_id=str(uuid.uuid1()),
    value_deserializer=lambda x: loads(x.decode('utf-8'))
)

# do a dummy poll to retrieve some message
consumer.poll()

# go to end of the stream
consumer.seek_to_end()

for event in consumer:
    event_data = event.value
    print(event_data)

Note that I am using a random group_id so that I can have independent consumers receiving the same data.
Run consumer.py

python3 consumer.py

Output:

{'idx': 30}
{'idx': 31}
.
.
.

If you are able to receive the messages, your Kafka broker is working. To make sure that DeepStream will be able to connect, try running the producer on the Jetson and the consumer on the host machine. This confirms that Kafka is reachable from other machines too.
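
Before running the producer on the Jetson, a quick TCP check can rule out basic network or firewall problems. This is only a sketch: the function name broker_reachable is made up here, and it only confirms that the port accepts TCP connections. It does not prove that the advertised listener in docker-compose.yml is correct, so the producer/consumer test above is still the real confirmation.

```python
import socket


def broker_reachable(host: str, port: int = 9092, timeout: float = 3.0) -> bool:
    """Return True if a TCP connection to host:port can be opened within the timeout."""
    try:
        with socket.create_connection((host, port), timeout=timeout):
            return True
    except OSError:
        # Covers refused connections, timeouts and name-resolution failures
        return False
```

On the Jetson you would call it with your host IP, for example broker_reachable("<ADD-YOUR-HOST-IP-HERE>", 9092).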

Step 3: Set up the Kafka dependency on the Jetson
We are going to run deepstream-test4 in this example. First, let’s install librdkafka:

cd /opt/nvidia/deepstream/deepstream/sources/libs/kafka_protocol_adaptor
sudo git clone https://github.com/edenhill/librdkafka.git
cd librdkafka
sudo git reset --hard 7101c2310341ab3f4675fc565f64f0967e135a6a
sudo ./configure
sudo make
sudo make install
sudo cp /usr/local/lib/librdkafka* /opt/nvidia/deepstream/deepstream/lib/
sudo ldconfig
sudo apt-get install libglib2.0 libglib2.0-dev
sudo apt-get install  libjansson4  libjansson-dev

Now that librdkafka is installed, we will start our DeepStream application with the proper configuration.

cd /opt/nvidia/deepstream/deepstream/sources/apps/sample_apps/deepstream-test4
sudo make
./deepstream-test4-app -i /opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.h264 \
    -p /opt/nvidia/deepstream/deepstream/lib/libnvds_kafka_proto.so \
    --conn-str="<ADD-YOUR-HOST-IP-HERE>;9092" \
    --topic="test" -s 0

Output:

Now playing: /opt/nvidia/deepstream/deepstream/samples/streams/sample_720p.h264

Using winsys: x11 
Opening in BLOCKING MODE 
ERROR: Deserialize engine failed because file path: /opt/nvidia/deepstream/deepstream-5.0/sources/apps/sample_apps/deepstream-test4/../../../../samples/models/Primary_Detector/resnet10.caffemodel_b1_gpu0_int8.engine open error
0:00:01.507939438 10139   0x55afb59a90 WARN                 nvinfer gstnvinfer.cpp:616:gst_nvinfer_logger:<primary-nvinference-engine> NvDsInferContext[UID 1]: Warning from NvDsInferContextImpl::deserializeEngineAndBackend() <nvdsinfer_context_impl.cpp:1690> [UID = 1]: deserialize engine from file :/opt/nvidia/deepstream/deepstream-5.0/sources/apps/sample_apps/deepstream-test4/../../../../samples/models/Primary_Detector/resnet10.caffemodel_b1_gpu0_int8.engine failed
0:00:01.508133807 10139   0x55afb59a90 WARN                 nvinfer gstnvinfer.cpp:616:gst_nvinfer_logger:<primary-nvinference-engine> NvDsInferContext[UID 1]: Warning from NvDsInferContextImpl::generateBackendContext() <nvdsinfer_context_impl.cpp:1797> [UID = 1]: deserialize backend context from engine from file :/opt/nvidia/deepstream/deepstream-5.0/sources/apps/sample_apps/deepstream-test4/../../../../samples/models/Primary_Detector/resnet10.caffemodel_b1_gpu0_int8.engine failed, try rebuild
0:00:01.508171504 10139   0x55afb59a90 INFO                 nvinfer gstnvinfer.cpp:619:gst_nvinfer_logger:<primary-nvinference-engine> NvDsInferContext[UID 1]: Info from NvDsInferContextImpl::buildModel() <nvdsinfer_context_impl.cpp:1715> [UID = 1]: Trying to create engine from model files
INFO: [TRT]: Reading Calibration Cache for calibrator: EntropyCalibration2
INFO: [TRT]: Generated calibration scales using calibration cache. Make sure that calibration cache has latest scales.
INFO: [TRT]: To regenerate calibration cache, please delete the existing one. TensorRT will generate a new calibration cache.
INFO: [TRT]: 
INFO: [TRT]: --------------- Layers running on DLA: 
INFO: [TRT]: 
INFO: [TRT]: --------------- Layers running on GPU: 
INFO: [TRT]: conv1 + activation_1/Relu, block_1a_conv_1 + activation_2/Relu, block_1a_conv_2, block_1a_conv_shortcut + add_1 + activation_3/Relu, block_2a_conv_1 + activation_4/Relu, block_2a_conv_2, block_2a_conv_shortcut + add_2 + activation_5/Relu, block_3a_conv_1 + activation_6/Relu, block_3a_conv_2, block_3a_conv_shortcut + add_3 + activation_7/Relu, block_4a_conv_1 + activation_8/Relu, block_4a_conv_2, block_4a_conv_shortcut + add_4 + activation_9/Relu, conv2d_cov, conv2d_cov/Sigmoid, conv2d_bbox, 
INFO: [TRT]: Detected 1 inputs and 2 output network tensors.
ERROR: Serialize engine failed because of file path: /opt/nvidia/deepstream/deepstream-5.0/samples/models/Primary_Detector/resnet10.caffemodel_b1_gpu0_int8.engine opened error
0:00:15.193611535 10139   0x55afb59a90 WARN                 nvinfer gstnvinfer.cpp:616:gst_nvinfer_logger:<primary-nvinference-engine> NvDsInferContext[UID 1]: Warning from NvDsInferContextImpl::buildModel() <nvdsinfer_context_impl.cpp:1743> [UID = 1]: failed to serialize cude engine to file: /opt/nvidia/deepstream/deepstream-5.0/samples/models/Primary_Detector/resnet10.caffemodel_b1_gpu0_int8.engine
INFO: [Implicit Engine Info]: layers num: 3
0   INPUT  kFLOAT input_1         3x368x640       
1   OUTPUT kFLOAT conv2d_bbox     16x23x40        
2   OUTPUT kFLOAT conv2d_cov/Sigmoid 4x23x40         

0:00:15.204773224 10139   0x55afb59a90 INFO                 nvinfer gstnvinfer_impl.cpp:313:notifyLoadModelStatus:<primary-nvinference-engine> [UID 1]: Load new model:dstest4_pgie_config.txt sucessfully
Running...
NvMMLiteOpen : Block : BlockType = 261 
NVMEDIA: Reading vendor.tegra.display-size : status: 6 
NvMMLiteBlockCreate : Block : BlockType = 261 
Frame Number = 0 Vehicle Count = 3 Person Count = 2
Frame Number = 1 Vehicle Count = 3 Person Count = 2
Frame Number = 2 Vehicle Count = 3 Person Count = 2
Frame Number = 3 Vehicle Count = 4 Person Count = 3
Frame Number = 4 Vehicle Count = 4 Person Count = 2
Frame Number = 5 Vehicle Count = 3 Person Count = 2
Frame Number = 6 Vehicle Count = 3 Person Count = 2
Frame Number = 7 Vehicle Count = 4 Person Count = 2
Frame Number = 8 Vehicle Count = 4 Person Count = 2
Frame Number = 9 Vehicle Count = 4 Person Count = 2
Frame Number = 10 Vehicle Count = 4 Person Count = 2
Frame Number = 11 Vehicle Count = 5 Person Count = 2
Frame Number = 12 Vehicle Count = 4 Person Count = 2
Frame Number = 13 Vehicle Count = 4 Person Count = 2
Frame Number = 14 Vehicle Count = 5 Person Count = 2
Frame Number = 15 Vehicle Count = 5 Person Count = 2
Frame Number = 16 Vehicle Count = 4 Person Count = 2
Frame Number = 17 Vehicle Count = 5 Person Count = 2

If everything works fine, the application will start. You can use consumer.py to receive messages.

python3 consumer.py

Output:

{
   "messageid":"bf45b525-1f51-4018-b559-5d9017c32d31",
   "mdsversion":"1.0",
   "@timestamp":"2020-12-09T14:32:56.178Z",
   "place":{
      "id":"1",
      "name":"XYZ",
      "type":"garage",
      "location":{
         "lat":30.32,
         "lon":-40.55,
         "alt":100.0
      },
      "aisle":{
         "id":"walsh",
         "name":"lane1",
         "level":"P2",
         "coordinate":{
            "x":1.0,
            "y":2.0,
            "z":3.0
         }
      }
   },
   "sensor":{
      "id":"CAMERA_ID",
      "type":"Camera",
      "description":"\"Entrance of Garage Right Lane\"",
      "location":{
         "lat":45.293701447,
         "lon":-75.8303914499,
         "alt":48.1557479338
      },
      "coordinate":{
         "x":5.2,
         "y":10.1,
         "z":11.2
      }
   },
   "analyticsModule":{
      "id":"XYZ",
      "description":"\"Vehicle Detection and License Plate Recognition\"",
      "source":"OpenALR",
      "version":"1.0",
      "confidence":-0.10000000149011612
   },
   "object":{
      "id":"-1",
      "speed":0.0,
      "direction":0.0,
      "orientation":0.0,
      "vehicle":{
         "type":"sedan",
         "make":"Bugatti",
         "model":"M",
         "color":"blue",
         "licenseState":"CA",
         "license":"XX1234",
         "confidence":-0.10000000149011612
      },
      "bbox":{
         "topleftx":1062,
         "toplefty":472,
         "bottomrightx":1107,
         "bottomrighty":498
      },
      "location":{
         "lat":0.0,
         "lon":0.0,
         "alt":0.0
      },
      "coordinate":{
         "x":0.0,
         "y":0.0,
         "z":0.0
      }
   },
   "event":{
      "id":"f6836b64-ba79-48fe-9487-6872b5e9dfd7",
      "type":"moving"
   },
   "videoPath":""
}
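
Most of the fields in this payload are placeholder values, so in practice you may only care about a few of them. Here is a minimal sketch that pulls out the timestamp, sensor id, object id, event type and bounding box. It assumes the message has already been decoded to a dict (as consumer.py does with its value_deserializer), and the function name summarize_event is just an illustration.

```python
from typing import Any, Dict


def summarize_event(msg: Dict[str, Any]) -> Dict[str, Any]:
    """Pick a few fields out of a decoded message; missing keys come back as None."""
    obj = msg.get("object", {})
    bbox = obj.get("bbox", {})
    return {
        "timestamp": msg.get("@timestamp"),
        "sensor_id": msg.get("sensor", {}).get("id"),
        "object_id": obj.get("id"),
        "event_type": msg.get("event", {}).get("type"),
        # (left, top, right, bottom) in pixels
        "bbox": (
            bbox.get("topleftx"),
            bbox.get("toplefty"),
            bbox.get("bottomrightx"),
            bbox.get("bottomrighty"),
        ),
    }
```

In the consumer loop this would be used as print(summarize_event(event.value)).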

Notes:

  • Please read up on Kafka basics before trying to create your infrastructure; it will help a lot with debugging.
  • Use different group IDs if you plan to subscribe to the same data from different consumers.
  • If you want to balance messages between consumers so that no two consumers get the same message, use the same group ID for both. However, you then need to create multiple partitions on the topic.
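
To illustrate the point about partitions: within one consumer group, each partition is read by only one consumer at a time, so a topic with a single partition cannot be shared between two consumers of the same group. The toy model below uses a simple round-robin split to show this. It is not Kafka's actual assignment logic (the real assignors differ in detail), and the function name assign_partitions is made up.

```python
from typing import Dict, List


def assign_partitions(num_partitions: int, consumers: List[str]) -> Dict[str, List[int]]:
    """Toy round-robin split of partitions among consumers of one group."""
    if not consumers:
        return {}
    assignment: Dict[str, List[int]] = {c: [] for c in consumers}
    for p in range(num_partitions):
        # Each partition goes to exactly one consumer in the group
        assignment[consumers[p % len(consumers)]].append(p)
    return assignment


print(assign_partitions(1, ["consumer-a", "consumer-b"]))
print(assign_partitions(4, ["consumer-a", "consumer-b"]))
```

With one partition, the second consumer is left with nothing to read; with four partitions, each consumer gets two.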

I hope this will help others.
Thanks!

Hi, I want to send my custom JSON data to Kafka; do you know how we can add that?
Also, can we use KafkaProducer from DeepStream instead of using msgbroker? I tested it and it does not affect the FPS. What do you suggest?

Hello, have you tried test5?
deepstream-app -c test5_dec_infer-resnet_tracker_sgie_tiled_display_int8
I tried it and found that consumer.py does not print out messages the way it does for test4.