Outputting the original input image to Kafka

Hi everyone,

I’m trying to send the original input image, along with the inference output, to a Kafka queue. Here is what I have tried so far:

I can extract the inference input image by adding two elements after nvinferserver (nvvidconv+filter_caps), just like in deepstream_imagedata-multistream.py, and attaching a probe to the source pad of the filter. But this gives me the image resized for the inference model, not the original input image. My videos are 1920x1080, but this method gives me 256x256, which is my model’s input size.

The second thing I tried is creating a pipeline like this:

nvstreammux -> tee ->              pgie            -> funnel
                   -> nvvidconv -> filter -> queue ->

I’ve added a tee to the output of the streammux element and created two branches. The first branch contains only the nvinferserver element. The second branch contains nvvidconv+filter+queue. I merge the two branches with a funnel, and the pipeline continues to the Kafka message broker element.

I’ve added a pad probe to the funnel output, but I cannot get the image at all. DeepStream gives me this error:
RuntimeError: get_nvds_buf_Surface: Currently we only support RGBA color Format

This is the same error I get when I don’t add nvvidconv+filter to my pipeline, so my guess is that the pgie output overwrites the filter output. How can I achieve this? Any ideas?

I’m using Jetson Nano, DeepStream 5.0.1, JetPack 4.5, TensorRT 7.1.3


Please check the following topic for an example on accessing the raw image data:

For the second question, pgie doesn’t change the buffer; it only reads the data to compute the DNN output.
A possible cause is nvvidconv. Could you share the detailed parameter settings of nvvidconv+filter?


Hi @AastaLLL ,

Thanks for your reply.

Here is my pipeline:

	pgie = make_elm_or_print_err("nvinferserver", "primary-inference", "Nvinferserver")
	nvvidconv1 = Gst.ElementFactory.make("nvvideoconvert", "convertor1")
	if not nvvidconv1:
		sys.stderr.write(" Unable to create nvvidconv1 \n")
	print("Creating filter1 \n ")
	caps1 = Gst.Caps.from_string("video/x-raw(memory:NVMM), format=RGBA")
	filter1 = Gst.ElementFactory.make("capsfilter", "filter1")
	if not filter1:
		sys.stderr.write(" Unable to get the caps filter1 \n")
	filter1.set_property("caps", caps1)
	filtersrcpad = filter1.get_static_pad("src")
	if not filtersrcpad:
		sys.stderr.write(" Unable to get src pad of filter1 \n")
	filtersrcpad.add_probe(Gst.PadProbeType.BUFFER, filter_src_pad_buffer_probe, 0)

And this is my filter_src_pad_buffer_probe function:

def filter_src_pad_buffer_probe(pad, info, u_data):
	gst_buffer = info.get_buffer()
	if not gst_buffer:
		print("Unable to get GstBuffer ")
		return Gst.PadProbeReturn.OK

	global count
	global start_date
	# Print the average fps once every 30 buffers, then restart the counter
	if count == 30:
		end_date = time.time()
		print(f"fps: {count / (end_date - start_date)}")
		count = 0
		start_date = end_date
	count += 1

	# Retrieve batch metadata from the gst_buffer
	# Note that pyds.gst_buffer_get_nvds_batch_meta() expects the
	# C address of gst_buffer as input, which is obtained with hash(gst_buffer)
	batch_meta = pyds.gst_buffer_get_nvds_batch_meta(hash(gst_buffer))
	l_frame = batch_meta.frame_meta_list

	detection_params = DetectionParam(CLASS_NB, ACCURACY_ALL_CLASS)
	box_size_param = BoxSizeParam(IMAGE_HEIGHT, IMAGE_WIDTH,
	                              MIN_BOX_WIDTH, MIN_BOX_HEIGHT)
	nms_param = NmsParam(TOP_K, IOU_THRESHOLD)

	while l_frame is not None:
		try:
			# Note that l_frame.data needs a cast to pyds.NvDsFrameMeta
			# The casting also keeps ownership of the underlying memory
			# in the C code, so the Python garbage collector will leave
			# it alone.
			frame_meta = pyds.NvDsFrameMeta.cast(l_frame.data)
		except StopIteration:
			break

		n_frame = pyds.get_nvds_buf_surface(hash(gst_buffer), frame_meta.batch_id)
		# convert the python array into numpy array format.
		frame_image = np.array(n_frame, copy=True, order='C')
		# convert the array into cv2 default color format
		frame_image = cv2.cvtColor(frame_image, cv2.COLOR_RGBA2BGRA)

		all_poses = []
		l_user = frame_meta.frame_user_meta_list
		while l_user is not None:
			try:
				# Note that l_user.data needs a cast to pyds.NvDsUserMeta
				# The casting also keeps ownership of the underlying memory
				# in the C code, so the Python garbage collector will leave
				# it alone.
				user_meta = pyds.NvDsUserMeta.cast(l_user.data)
			except StopIteration:
				break

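			# Skip user meta that is not inference tensor output
			if (
				user_meta.base_meta.meta_type
				!= pyds.NvDsMetaType.NVDSINFER_TENSOR_OUTPUT_META
			):
				try:
					l_user = l_user.next
				except StopIteration:
					break
				continue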

			tensor_meta = pyds.NvDsInferTensorMeta.cast(user_meta.user_meta_data)

			# Boxes in the tensor meta should be in network resolution which is
			# found in tensor_meta.network_info. Use this info to scale boxes to
			# the input frame resolution.
			layers_info = []

			for i in range(tensor_meta.num_output_layers):
				layer = pyds.get_nvds_LayerInfo(tensor_meta, i)
				layers_info.append(layer)

			frame_object_list = nvds_infer_parse_custom_tf_ssd(
				layers_info, detection_params, box_size_param, nms_param
			)
			try:
				l_user = l_user.next
			except StopIteration:
				break

			all_poses += frame_object_list

		add_obj_meta_to_frame(frame_image, all_poses, batch_meta, frame_meta)
		try:
			l_frame = l_frame.next
		except StopIteration:
			break
	return Gst.PadProbeReturn.OK
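
The comment in the probe about scaling boxes from network resolution to frame resolution could look roughly like the sketch below. `scale_box` is a hypothetical helper, not part of pyds, and it assumes the frame was stretched to the network size without letterboxing or aspect-ratio padding.

```python
def scale_box(box, net_size, frame_size):
    """Map a (left, top, width, height) box from network pixels to frame pixels.

    Assumption: the frame was resized to the network input by a plain
    stretch, so x and y each get their own scale factor.
    """
    net_w, net_h = net_size
    frame_w, frame_h = frame_size
    sx = frame_w / net_w  # horizontal scale factor
    sy = frame_h / net_h  # vertical scale factor
    left, top, width, height = box
    return (left * sx, top * sy, width * sx, height * sy)


# A box found on the 256x256 network input, mapped back to a 1920x1080 frame
scaled = scale_box((64, 128, 32, 16), (256, 256), (1920, 1080))
print(scaled)
```

If the model input keeps the aspect ratio with padding, the padding offsets would have to be removed before scaling.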

Another weird thing is that I can’t get any of my layer outputs inside layers_info if I attach the probe to the filter. If I attach this function to the pgie src pad, I can get the layer outputs.

Thanks again!


That’s because the layer output is metadata generated by TensorRT.
Do you get the raw image with the sample shared above?



Thank you very much. My problem was the output shape of streammux. That’s why I was getting a small image. Totally missed that :)
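
For anyone hitting the same symptom, here is a minimal sketch of the fix, assuming the streammux width and height had been set to the model input size (256x256) instead of the video size. `configure_streammux` and `FakeElement` are hypothetical names used only for illustration. The property names `width`, `height`, `batch-size` and `batched-push-timeout` are the ones nvstreammux exposes, and the values are placeholders for my 1920x1080 videos.

```python
def configure_streammux(streammux, width=1920, height=1080,
                        batch_size=1, batched_push_timeout_us=4000000):
    """Set the nvstreammux output resolution to the source video size.

    Buffers downstream of streammux carry frames at this resolution,
    so a probe after it sees 1920x1080 frames instead of 256x256.
    """
    streammux.set_property("width", width)
    streammux.set_property("height", height)
    streammux.set_property("batch-size", batch_size)
    streammux.set_property("batched-push-timeout", batched_push_timeout_us)
    return streammux


class FakeElement:
    """Stand-in for a Gst.Element so the sketch runs without GStreamer."""

    def __init__(self):
        self.props = {}

    def set_property(self, name, value):
        self.props[name] = value


# In the real pipeline, pass the element created with
# Gst.ElementFactory.make("nvstreammux", ...) instead of FakeElement().
mux = configure_streammux(FakeElement())
print(mux.props)
```

The inference element still resizes frames internally to the network input size, so the streammux resolution does not need to match the model.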

Thanks again.

Good to know this.
Is everything working after correcting the streammux issue?