A problem about the plugin of Gst-nvmsgbroker

When I added the plug-in gst-nvmsgbroker, the program was unable to accept the RTSP stream and blocked.My code is as follows:
gboolean
create_cloud_bin(NvDsCloudConfig * config, NvDsCloudBin * bin)
{
gboolean ret = FALSE;

bin->bin = gst_bin_new("cloud_bin");
if (!bin->bin) {
	NVGSTDS_ERR_MSG_V("Failed to create 'cloud_bin'");
	goto done;
}

bin->queue = 
	gst_element_factory_make("queue", "cloud_queue");
if (!bin->queue) {
	NVGSTDS_ERR_MSG_V("Failed to create 'queue'");
	goto done;
}

bin->msgconv =
	gst_element_factory_make("nvmsgconv", "msgconv");
if (!bin->msgconv) {
	NVGSTDS_ERR_MSG_V("Failed to create 'msgconv'");
	goto done;
}

bin->msgbroker =
	gst_element_factory_make("nvmsgbroker", "msgbroker");
if (!bin->msgbroker) {
	NVGSTDS_ERR_MSG_V("Failed to create 'msgbroker'");
	goto done;
}

g_object_set(G_OBJECT(bin->msgconv), "config", config->config,
	"payload-type", config->payload_type,
	NULL);

g_object_set(G_OBJECT(bin->msgbroker), "config", config->broker_config,
	"conn-str", config->conn_str,
	"proto-lib", config->proto_lib,
	"topic", config->topic, "sync", FALSE,
	NULL);

gst_bin_add_many(GST_BIN(bin->bin), bin->queue, bin->msgconv, bin->msgbroker, NULL);

NVGSTDS_LINK_ELEMENT(bin->queue, bin->msgconv);

NVGSTDS_LINK_ELEMENT(bin->msgconv, bin->msgbroker);

NVGSTDS_BIN_ADD_GHOST_PAD(bin->bin, bin->queue, "sink");

ret = TRUE;

GST_CAT_DEBUG(NVDS_APP, "cloud bin created successfully");

done:
if (!ret) {
NVGSTDS_ERR_MSG_V("%s failed", func);
}
return ret;
}

The contents of the configuration file I tested worked well in deepstream-test4, which was able to load RTSP streams and play smoothly while also sending data to the server.

I looked inside the plug-in nvmsgbroker when the function gst_nvmsgbroker_do_work is executed, it is blocked at g_cond_wait (&self->flowCond, &self->flowLock), as follows:
static gpointer
gst_nvmsgbroker_do_work (gpointer data)
{

GstNvMsgBroker *self = (GstNvMsgBroker *) data;

while (self->isRunning) {
g_mutex_lock (&self->flowLock);
while (self->isRunning && self->pendingCbCount <= 0) {
g_cond_wait (&self->flowCond, &self->flowLock);
}
g_mutex_unlock (&self->flowLock);

if (!self->isRunning) {
	return NULL;
}
self->nvds_msgapi_do_work (self->connHandle);
// wait 10ms.
g_usleep (10 * 1000);

}

return self;
}

Hi,
deepstream-test5 should be same as your usecase. Please refer to it. It can configure to RTSP source and also enable nvmsgbroker.

Hi DaneLLL
I compiled deepstream-test5 and ran the following command:

<i><b>./deepstream-test5-app -c configs/test5_dec_infer-resnet_tracker_sgie_tiled_display_int8.txt -p 0</b></i>

The program crashed while it was running

Using winsys: x11 
Creating LL OSD context new
0:00:00.798397027 16110   0x55955948d0 WARN                 nvinfer gstnvinfer.cpp:515:gst_nvinfer_logger:<secondary_gie_2> NvDsInferContext[UID 6]:useEngineFile(): Failed to read from model engine file
0:00:00.798466915 16110   0x55955948d0 INFO                 nvinfer gstnvinfer.cpp:519:gst_nvinfer_logger:<secondary_gie_2> NvDsInferContext[UID 6]:initialize(): Trying to create engine from model files
0:00:00.798631620 16110   0x55955948d0 WARN                 nvinfer gstnvinfer.cpp:515:gst_nvinfer_logger:<secondary_gie_2> NvDsInferContext[UID 6]:generateTRTModel(): INT8 not supported by platform. Trying FP16 mode.
Warning: Flatten layer ignored. TensorRT implicitly flattens input to FullyConnected layers, but in other circumstances this will result in undefined behavior.

0:01:06.287058986 16110   0x55955948d0 INFO                 nvinfer gstnvinfer.cpp:519:gst_nvinfer_logger:<secondary_gie_2> NvDsInferContext[UID 6]:generateTRTModel(): Storing the serialized cuda engine to file at /opt/nvidia/deepstream/deepstream-4.0/samples/models/Secondary_CarMake/resnet18.caffemodel_b16_fp16.engine
0:01:06.397680516 16110   0x55955948d0 WARN                 nvinfer gstnvinfer.cpp:515:gst_nvinfer_logger:<secondary_gie_1> NvDsInferContext[UID 5]:useEngineFile(): Failed to read from model engine file
0:01:06.397734564 16110   0x55955948d0 INFO                 nvinfer gstnvinfer.cpp:519:gst_nvinfer_logger:<secondary_gie_1> NvDsInferContext[UID 5]:initialize(): Trying to create engine from model files
0:01:06.397841348 16110   0x55955948d0 WARN                 nvinfer gstnvinfer.cpp:515:gst_nvinfer_logger:<secondary_gie_1> NvDsInferContext[UID 5]:generateTRTModel(): INT8 not supported by platform. Trying FP16 mode.
Warning: Flatten layer ignored. TensorRT implicitly flattens input to FullyConnected layers, but in other circumstances this will result in undefined behavior.

0:02:07.233649928 16110   0x55955948d0 INFO                 nvinfer gstnvinfer.cpp:519:gst_nvinfer_logger:<secondary_gie_1> NvDsInferContext[UID 5]:generateTRTModel(): Storing the serialized cuda engine to file at /opt/nvidia/deepstream/deepstream-4.0/samples/models/Secondary_CarColor/resnet18.caffemodel_b16_fp16.engine
0:02:07.302645265 16110   0x55955948d0 WARN                 nvinfer gstnvinfer.cpp:515:gst_nvinfer_logger:<secondary_gie_0> NvDsInferContext[UID 4]:useEngineFile(): Failed to read from model engine file
0:02:07.302699729 16110   0x55955948d0 INFO                 nvinfer gstnvinfer.cpp:519:gst_nvinfer_logger:<secondary_gie_0> NvDsInferContext[UID 4]:initialize(): Trying to create engine from model files
0:02:07.302825554 16110   0x55955948d0 WARN                 nvinfer gstnvinfer.cpp:515:gst_nvinfer_logger:<secondary_gie_0> NvDsInferContext[UID 4]:generateTRTModel(): INT8 not supported by platform. Trying FP16 mode.
Warning: Flatten layer ignored. TensorRT implicitly flattens input to FullyConnected layers, but in other circumstances this will result in undefined behavior.
0:03:06.541424653 16110   0x55955948d0 INFO                 nvinfer gstnvinfer.cpp:519:gst_nvinfer_logger:<secondary_gie_0> NvDsInferContext[UID 4]:generateTRTModel(): Storing the serialized cuda engine to file at /opt/nvidia/deepstream/deepstream-4.0/samples/models/Secondary_VehicleTypes/resnet18.caffemodel_b16_fp16.engine
gstnvtracker: Loading low-level lib at /opt/nvidia/deepstream/deepstream-4.0/lib/libnvds_mot_klt.so
gstnvtracker: Optional NvMOT_RemoveStreams not implemented
gstnvtracker: Batch processing is OFF
0:03:06.721991949 16110   0x55955948d0 WARN                 nvinfer gstnvinfer.cpp:515:gst_nvinfer_logger:<primary_gie_classifier> NvDsInferContext[UID 1]:useEngineFile(): Failed to read from model engine file
0:03:06.722112173 16110   0x55955948d0 INFO                 nvinfer gstnvinfer.cpp:519:gst_nvinfer_logger:<primary_gie_classifier> NvDsInferContext[UID 1]:initialize(): Trying to create engine from model files
0:03:06.722231629 16110   0x55955948d0 WARN                 nvinfer gstnvinfer.cpp:515:gst_nvinfer_logger:<primary_gie_classifier> NvDsInferContext[UID 1]:generateTRTModel(): INT8 not supported by platform. Trying FP16 mode.
0:03:48.372441958 16110   0x55955948d0 INFO                 nvinfer gstnvinfer.cpp:519:gst_nvinfer_logger:<primary_gie_classifier> NvDsInferContext[UID 1]:generateTRTModel(): Storing the serialized cuda engine to file at /opt/nvidia/deepstream/deepstream-4.0/samples/models/Primary_Detector/resnet10.caffemodel_b2_fp16.engine

Runtime commands:
	h: Print this help
	q: Quit

	p: Pause
	r: Resume

NOTE: To expand a source in the 2D tiled display and view object details, left-click on the source.
      To go back to the tiled display, right-click anywhere on the window.

**PERF: FPS 0 (Avg)	FPS 1 (Avg)	
**PERF: 0.00 (0.00)	0.00 (0.00)	
** INFO: <bus_callback:189>: Pipeline ready

Opening in BLOCKING MODE 
NvMMLiteOpen : Block : BlockType = 261 
NVMEDIA: Reading vendor.tegra.display-size : status: 6 
NvMMLiteBlockCreate : Block : BlockType = 261 
Opening in BLOCKING MODE 
NvMMLiteOpen : Block : BlockType = 261 
NVMEDIA: Reading vendor.tegra.display-size : status: 6 
NvMMLiteBlockCreate : Block : BlockType = 261 
Creating LL OSD context new
** INFO: <bus_callback:175>: Pipeline running

KLT Tracker Init
KLT Tracker Init
ERROR from src_elem0: Could not read from resource.
Debug info: gstrtspsrc.c(5216): gst_rtspsrc_loop_interleaved (): /GstPipeline:pipeline/GstBin:multi_src_bin/GstBin:src_sub_bin0/GstRTSPSrc:src_elem0:
Could not receive message. (Parse error)
ERROR from src_elem0: Internal data stream error.
Debug info: gstrtspsrc.c(5653): gst_rtspsrc_loop (): /GstPipeline:pipeline/GstBin:multi_src_bin/GstBin:src_sub_bin0/GstRTSPSrc:src_elem0:
streaming stopped, reason error (-5)
Reset source pipeline reset_source_pipeline 0x7f7c1e0080
,ERROR from src_elem0: Could not read from resource.
Debug info: gstrtspsrc.c(5216): gst_rtspsrc_loop_interleaved (): /GstPipeline:pipeline/GstBin:multi_src_bin/GstBin:src_sub_bin0/GstRTSPSrc:src_elem0:
Could not receive message. (Parse error)
ERROR from src_elem0: Internal data stream error.
Debug info: gstrtspsrc.c(5653): gst_rtspsrc_loop (): /GstPipeline:pipeline/GstBin:multi_src_bin/GstBin:src_sub_bin0/GstRTSPSrc:src_elem0:
streaming stopped, reason error (-5)
Opening in BLOCKING MODE 
NvMMLiteOpen : Block : BlockType = 261 
NVMEDIA: Reading vendor.tegra.display-size : status: 6 
NvMMLiteBlockCreate : Block : BlockType = 261 
** WARN: <bbox_generated_probe_after_analytics:554>: Source 0: NTP timestamps are backward in time. Current: 0 previous: 1960000000
**PERF: 20.48 (20.48)	25.24 (25.24)	
ERROR from src_elem0: Could not read from resource.
Debug info: gstrtspsrc.c(5216): gst_rtspsrc_loop_interleaved (): /GstPipeline:pipeline/GstBin:multi_src_bin/GstBin:src_sub_bin0/GstRTSPSrc:src_elem0:
Could not receive message. (Parse error)
ERROR from src_elem0: Internal data stream error.
Debug info: gstrtspsrc.c(5653): gst_rtspsrc_loop (): /GstPipeline:pipeline/GstBin:multi_src_bin/GstBin:src_sub_bin0/GstRTSPSrc:src_elem0:
streaming stopped, reason error (-5)
Reset source pipeline reset_source_pipeline 0x7f7c1e0080
,ERROR from src_elem0: Could not read from resource.
Debug info: gstrtspsrc.c(5216): gst_rtspsrc_loop_interleaved (): /GstPipeline:pipeline/GstBin:multi_src_bin/GstBin:src_sub_bin0/GstRTSPSrc:src_elem0:
Could not receive message. (Parse error)
ERROR from src_elem0: Internal data stream error.
Debug info: gstrtspsrc.c(5653): gst_rtspsrc_loop (): /GstPipeline:pipeline/GstBin:multi_src_bin/GstBin:src_sub_bin0/GstRTSPSrc:src_elem0:
streaming stopped, reason error (-5)
Opening in BLOCKING MODE 
NvMMLiteOpen : Block : BlockType = 261 
NVMEDIA: Reading vendor.tegra.display-size : status: 6 
NvMMLiteBlockCreate : Block : BlockType = 261 
** WARN: <bbox_generated_probe_after_analytics:554>: Source 0: NTP timestamps are backward in time. Current: 0 previous: 5721000000
**PERF: 21.70 (21.13)	25.43 (25.34)	
ERROR from src_elem0: Could not read from resource.
Debug info: gstrtspsrc.c(5216): gst_rtspsrc_loop_interleaved (): /GstPipeline:pipeline/GstBin:multi_src_bin/GstBin:src_sub_bin0/GstRTSPSrc:src_elem0:
Could not receive message. (Parse error)
ERROR from src_elem0: Internal data stream error.
Debug info: gstrtspsrc.c(5653): gst_rtspsrc_loop (): /GstPipeline:pipeline/GstBin:multi_src_bin/GstBin:src_sub_bin0/GstRTSPSrc:src_elem0:
streaming stopped, reason error (-5)
Reset source pipeline reset_source_pipeline 0x7f7c1e0080
,ERROR from src_elem0: Could not read from resource.
Debug info: gstrtspsrc.c(5216): gst_rtspsrc_loop_interleaved (): /GstPipeline:pipeline/GstBin:multi_src_bin/GstBin:src_sub_bin0/GstRTSPSrc:src_elem0:
Could not receive message. (Parse error)
ERROR from src_elem0: Internal data stream error.
Debug info: gstrtspsrc.c(5653): gst_rtspsrc_loop (): /GstPipeline:pipeline/GstBin:multi_src_bin/GstBin:src_sub_bin0/GstRTSPSrc:src_elem0:
streaming stopped, reason error (-5)
Opening in BLOCKING MODE 
NvMMLiteOpen : Block : BlockType = 261 
NVMEDIA: Reading vendor.tegra.display-size : status: 6 
NvMMLiteBlockCreate : Block : BlockType = 261 
** WARN: <bbox_generated_probe_after_analytics:554>: Source 0: NTP timestamps are backward in time. Current: 0 previous: 1960000000
ERROR from src_elem0: Could not read from resource.
Debug info: gstrtspsrc.c(5216): gst_rtspsrc_loop_interleaved (): /GstPipeline:pipeline/GstBin:multi_src_bin/GstBin:src_sub_bin0/GstRTSPSrc:src_elem0:
Could not receive message. (Parse error)
ERROR from src_elem0: Internal data stream error.
Debug info: gstrtspsrc.c(5653): gst_rtspsrc_loop (): /GstPipeline:pipeline/GstBin:multi_src_bin/GstBin:src_sub_bin0/GstRTSPSrc:src_elem0:
streaming stopped, reason error (-5)
Reset source pipeline reset_source_pipeline 0x7f7c1e0080
,ERROR from src_elem0: Could not read from resource.
Debug info: gstrtspsrc.c(5216): gst_rtspsrc_loop_interleaved (): /GstPipeline:pipeline/GstBin:multi_src_bin/GstBin:src_sub_bin0/GstRTSPSrc:src_elem0:
Could not receive message. (Parse error)
ERROR from src_elem0: Internal data stream error.
Debug info: gstrtspsrc.c(5653): gst_rtspsrc_loop (): /GstPipeline:pipeline/GstBin:multi_src_bin/GstBin:src_sub_bin0/GstRTSPSrc:src_elem0:
streaming stopped, reason error (-5)
**PERF: 18.98 (20.39)	24.98 (25.22)	
ERROR from src_elem0: Could not read from resource.
Debug info: gstrtspsrc.c(5216): gst_rtspsrc_loop_interleaved (): /GstPipeline:pipeline/GstBin:multi_src_bin/GstBin:src_sub_bin0/GstRTSPSrc:src_elem0:
Could not receive message. (Parse error)
ERROR from src_elem0: Internal data stream error.
Debug info: gstrtspsrc.c(5653): gst_rtspsrc_loop (): /GstPipeline:pipeline/GstBin:multi_src_bin/GstBin:src_sub_bin0/GstRTSPSrc:src_elem0:
streaming stopped, reason error (-5)
ERROR from decodebin_elem0: GStreamer encountered a general stream error.
Debug info: gstdecodebin2.c(4695): gst_decode_bin_expose (): /GstPipeline:pipeline/GstBin:multi_src_bin/GstBin:src_sub_bin0/GstDecodeBin:decodebin_elem0:
all streams without buffers
Quitting
App run failed

I changed the configuration file as follows:

[i][b][source0]
uri=rtsp://admin:1111@10.0.0.155:554
[sink1]
msg-broker-conn-str=39.97.109.63;9092;deepstream-data
topic=deepstream-data[/b][/i]

This is duplicate of https://devtalk.nvidia.com/default/topic/1065555/