Using the Kafka protocol to retrieve data from a DeepStream pipeline

Hi,

I have tried to use the out-of-the-box Kafka setup, and I have also used a script to subscribe to the topic.

My config file looks like this; I have just added this section to the config after installing the Kafka protocol adapter.

Kindly note that I have commented out the config file for the message-broker.

[message-broker]
enable=1
broker-proto-lib=/usr/local/deepstream/libnvds_kafka_proto.so
broker-conn-str=127.0.0.1;9092:ds_meta

Instead of ‘127.0.0.1’ I have also tried ‘localhost’.
My script to receive data looks like this:

import json

from kafka import KafkaConsumer

class KConsumer():
    def __init__(self, ip, port):
        # The Kafka adapter publishes plain JSON strings, so decode with json.loads
        # (cPickle.loads would fail on these payloads).
        self.consumer = KafkaConsumer(
            bootstrap_servers='{}:{}'.format(ip, port),
            auto_offset_reset='earliest',
            consumer_timeout_ms=1000,
            value_deserializer=lambda m: json.loads(m.decode('utf-8')))

    def stop(self):
        self.consumer.close()

    def consume(self, key):
        print("KEY : ", key, type(key))
        # subscribe() returns None, which is why this line always prints None
        print(self.consumer.subscribe([key]))
        for message in self.consumer:
            print(type(message), "\n", message.value)
            print("MESSAGE   ::: ", message)

if __name__ == '__main__':
    qconsumer = KConsumer("127.0.0.1", 9092)
    qconsumer.consume("ds_meta")

This script always prints None and never receives any messages. I am not sure whether this is the right way to create the Kafka topic.

I am using the code from the deepstream-app’s source4**.txt sample.

Let me know how exactly to retrieve the data.

Can you refer to the analytics server: https://github.com/NVIDIA-AI-IOT/deepstream_360_d_smart_parking_application/

@abramjos have you figured it out? I am also recompiling the same example but unable to get the correct output.

Thanks in advance,

Hi @hidayat, no clue, man. Still struggling to get the output.

@abramjos

I was able to get the output. You have to follow these steps in order to receive messages. The following environment is Docker based, but you can do the same without Docker images.

First, you have to download and install ZooKeeper.

  1. sudo docker run -d --name zookeeper -p 2181:2181 --network kafka_net zookeeper:latest
    This will start ZooKeeper in detached mode (kafka_net is an arbitrary user-defined Docker network, so create it first with docker network create kafka_net). Then you have to download the Kafka image:

  2. sudo docker run -d --name kafka -p 9092:9092 advertised.listeners=“Ip_adress of your computer” --network kafka_net --env ZOOKEEPER_IP=zookeeper ches/kafka
    This will start the Kafka server / message broker. After that you have to create topics; this can be done using the following script:

  3. sudo docker run --rm --network kafka_net ches/kafka kafka-topics.sh --create --topic USER_CREATED_TOPIC --replication-factor 1 --partitions 1 --zookeeper zookeeper:2181
    This will create the topic to which messages will be published and from which they will be consumed (see the Python sketch right after these steps for a quick way to verify that the topic exists).
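
A quick way to double-check that the topic was actually created is to list the broker's topics from Python. This is only a sketch using kafka-python (the same library as the consumer script earlier in the thread); it assumes the broker's port 9092 is published to the host as localhost:9092, otherwise run it inside the kafka_net network against kafka:9092.

from kafka import KafkaConsumer

# Connect to the broker and list the topics it knows about.
# 'localhost:9092' is an assumption; adjust it to wherever the broker is reachable.
consumer = KafkaConsumer(bootstrap_servers='localhost:9092')
print(consumer.topics())   # should include 'USER_CREATED_TOPIC'
consumer.close()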

Now that you have set up the Kafka server and created the topic, what you need is a Kafka consumer, which will consume messages, and a producer, which will produce them. In our case the producer is DeepStream itself, so you can open another terminal and run the following:

sudo docker run --rm --network kafka_net ches/kafka kafka-console-consumer.sh --topic USER_CREATED_TOPIC --from-beginning --bootstrap-server kafka:9092
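
Before involving DeepStream at all, you can sanity-check the broker and the console consumer by publishing a test message yourself. This is only a sketch using kafka-python; the broker address localhost:9092 and the JSON test payload are assumptions, so adjust them to your setup.

import json
from kafka import KafkaProducer

# Publish a single JSON-encoded test message to the topic created above.
# 'localhost:9092' assumes the Kafka container's port is published to the host.
producer = KafkaProducer(bootstrap_servers='localhost:9092',
                         value_serializer=lambda v: json.dumps(v).encode('utf-8'))
producer.send('USER_CREATED_TOPIC', {'hello': 'from test producer'})
producer.flush()
producer.close()

If the console consumer prints this test message, the Kafka side is working and any remaining problem is on the DeepStream side.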

Now you are all set to receive messages produced by DeepStream. You can run the DeepStream image by passing --env ZOOKEEPER_IP=zookeeper to the DeepStream docker image, and in the connection string specify kafka;9092;USER_CREATED_TOPIC.

That's it, now you will be able to receive messages.

hi,

sudo docker run -d --name kafka -p 9092:9092 advertised.listeners=“XXX.XXX.XXX.XXX” --network kafka_net --env ZOOKEEPER_IP=zookeeper ches/kafka

This is throwing some error saying:
docker: invalid reference format.

‘kafka_net’ is an arbitrary network, right? And what is ches/kafka?

No need for advertised.listeners; you can try it without that and it should work. Also verify it by running the test scripts in the kafka protocol adapter folder of the libs directory.

Hi,

Now the data objects I am getting are not those of deepstream-test4, which should give the car make/model;
rather, I am getting the data objects of the deepstream-360 app:

"orientation" : 0,
    "vehicle" : {
      "type" : "sedan",
      "make" : "Bugatti",
      "model" : "M",
      "color" : "blue",
      "licenseState" : "CA",
      "license" : "XX1234",
      "confidence" : 0
    },
    "bbox" : {
      "topleftx" : 0,
      "toplefty" : 0,
      "bottomrightx" : 0,
      "bottomrighty" : 0
    },
    "location" : {
      "lat" : 0,
      "lon" : 0,
      "alt" : 0
    },
    "coordinate" : {
      "x" : 0,
      "y" : 0,
      "z" : 0
    }
  },
  "event" : {
    "id" : "8cfc0af7-c5de-46ff-9c70-a60bdb3fc959",
    "type" : "moving"
  },
  "videoPath" : ""
}
{
  "messageid" : "acc7de61-3704-461c-8e12-f027cbe52752",
  "mdsversion" : "1.0",
  "@timestamp" : "2019-01-27T06:13:42.203Z",
  "place" : {
    "id" : "1",
    "name" : "XYZ",
    "type" : "garage",
    "location" : {
      "lat" : 30.32,
      "lon" : -40.549999999999997,
      "alt" : 100
    },
    "aisle" : {
      "id" : "walsh",
      "name" : "lane1",
      "level" : "P2",
      "coordinate" : {
        "x" : 1,
        "y" : 2,
        "z" : 3
      }
    }
  },
  "sensor" : {
    "id" : "CAMERA_ID",
    "type" : "Camera",
    "description" : "\"Entrance of Garage Right Lane\"",
    "location" : {
      "lat" : 45.293701446999997,
      "lon" : -75.830391449900006,
      "alt" : 48.155747933800001
    },
    "coordinate" : {
      "x" : 5.2000000000000002,
      "y" : 10.1,
      "z" : 11.199999999999999
    }
  },
  "analyticsModule" : {
    "id" : "XYZ",
    "description" : "\"Vehicle Detection and License Plate Recognition\"",
    "source" : "OpenALR",
    "version" : "1.0",
    "confidence" : 0
  },
  "object" : {
    "id" : "0",
    "speed" : 0,
    "direction" : 0,
    "orientation" : 0,
    "vehicle" : {
      "type" : "sedan",
      "make" : "Bugatti",
      "model" : "M",
      "color" : "blue",
      "licenseState" : "CA",
      "license" : "XX1234",
      "confidence" : 0
    },
    "bbox" : {
      "topleftx" : 0,
      "toplefty" : 0,
      "bottomrightx" : 0,
      "bottomrighty" : 0
    },
    "location" : {
      "lat" : 0,
      "lon" : 0,
      "alt" : 0
    },
    "coordinate" : {
      "x" : 0,
      "y" : 0,
      "z" : 0
    }
  },
  "event" : {
    "id" : "ad2490a8-4089-45ef-b1e6-26bcab26c5c4",
    "type" : "moving"
  },
  "videoPath" : ""
}

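For reference, the payload above is plain JSON, so a consumer can decode it and pull the vehicle fields out directly; with the current output these are just the fixed sample values shown above ("Bugatti", "sedan", zeroed bbox). A minimal sketch, assuming kafka-python, a broker on localhost:9092 and the ds_meta topic from the first post:

import json
from kafka import KafkaConsumer

# Subscribe to the DeepStream topic and decode every payload as JSON.
consumer = KafkaConsumer('ds_meta',
                         bootstrap_servers='localhost:9092',
                         auto_offset_reset='earliest',
                         value_deserializer=lambda m: json.loads(m.decode('utf-8')))

for message in consumer:
    obj = message.value.get('object', {})
    vehicle = obj.get('vehicle', {})
    print(vehicle.get('make'), vehicle.get('model'), obj.get('bbox'))
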
Yes, that’s true. If you look into the code, they are hard-coding these values; check nvmsgconv.

Hmm, I see.
Yeah, in fact this is what I was getting when I ran the analytics server as well; I thought that was because the analytics server is for the nvidia-360 app. Now it looks like the problem is with the nvmsgconv plugin!

So @amycao, what can be done?

Hi, any update?
I saw that you got a fix for a single stream.
Got any idea about multiple streams?

[message-broker]
enable=1
broker-proto-lib=/usr/local/deepstream/libnvds_kafka_proto.so
broker-conn-str=127.0.0.1;9092:ds_meta

Where do you get these configs from? They are not mentioned in the documentation.