Save message to file when Kafka broker is down

Hello, I have a simple DeepStream application written in Python running on my Jetson Nano. The pipeline looks like this:

source → pgie → tracker → msgconv → msgbroker

I want to handle an internet outage: when the Kafka broker is unreachable, the application should save each message to a file (JSON, for example), and when the broker is reachable again it should resume sending new messages to the broker as usual.

Basically the application should “switch” between online and offline mode at runtime. Any thoughts on how to do it?
Any help would be greatly appreciated!

• Hardware Platform: Jetson Nano 4GB
• DeepStream 5.1
• JetPack Version 4.5.1
• TensorRT Version 7.1.3
• Issue Type: Question

Hi,
When the connect_callback() in the caller (the application or the gst-msgbroker library) receives a connection error, you should first disconnect the existing connection handle and then call reconnect(). The reconnect() implementation should spawn a new thread that periodically calls the Kafka adapter's connect(); when it succeeds, it returns a new connection handle.
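
The retry part of that suggestion could look roughly like the sketch below. It only shows the loop that a reconnect thread would run; the thread creation itself is left out. The names connect_fn, reconnect_loop and fake_connect are hypothetical and are not part of the DeepStream API; in the real plugin the connect call would be the adapter's connect function.

```c
#define _POSIX_C_SOURCE 200809L
#include <assert.h>
#include <stddef.h>
#include <time.h>

/* Hypothetical stand-in for the adapter's connect call: returns a
 * connection handle on success, NULL on failure. */
typedef void *(*connect_fn)(void *ctx);

/* Retry do_connect() until it succeeds or max_attempts is exhausted,
 * sleeping delay_ms between attempts. Returns the new handle or NULL. */
static void *reconnect_loop(connect_fn do_connect, void *ctx,
                            int max_attempts, long delay_ms)
{
    assert(do_connect != NULL);
    struct timespec delay = { delay_ms / 1000, (delay_ms % 1000) * 1000000L };

    for (int attempt = 0; attempt < max_attempts; attempt++) {
        void *handle = do_connect(ctx);
        if (handle != NULL)
            return handle;          /* broker reachable again */
        nanosleep(&delay, NULL);    /* wait before the next attempt */
    }
    return NULL;                    /* still unreachable */
}

/* Demo stand-in: fails until *remaining_failures reaches zero. */
static void *fake_connect(void *ctx)
{
    int *remaining_failures = ctx;
    if (*remaining_failures > 0) {
        (*remaining_failures)--;
        return NULL;
    }
    return ctx;   /* any non-NULL pointer serves as a handle here */
}
```

Running this loop in its own thread keeps the streaming thread free, so buffers keep flowing while the broker is down.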

Hello,

So from what I understand, with this change the msgbroker will keep trying to reconnect when there is no internet connection, instead of failing to start. That means the application still will not survive an internet outage: it will be down until the connection is restored, so every object that passes by during the outage will be missed. That's why I wanted the broker to save the payload in JSON files. In other words, here's the behavior I'm hoping to achieve:

IF

The application is starting but the Kafka broker is unreachable

OR

The application started fine (the broker was initially reachable and metadata was being sent to the Kafka topic) but the broker suddenly became unreachable

THEN

The nvmsgbroker should save the payload of any new object metadata in a JSON file and simultaneously keep trying to reconnect until the internet connection is restored

AND

When the internet connection is restored, it should go back to sending any new payload as usual, without having to worry about the previous payloads that were saved as JSON files.
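
The behavior described above can be sketched as a small online/offline switch. This is only an illustration under some assumptions: Publisher, publish and the fake_* helpers are hypothetical names, and send is treated as synchronous here, whereas the real plugin reports async send failures through a callback.

```c
#include <assert.h>
#include <stdbool.h>
#include <stddef.h>

/* Hypothetical hooks; in the real plugin these would wrap the
 * adapter's connect/send calls and the JSON file writer. */
typedef struct {
    bool online;
    bool (*try_connect)(void *ctx);
    bool (*send)(void *ctx, const char *payload, size_t len);
    void (*save_to_file)(void *ctx, const char *payload, size_t len);
    void *ctx;
} Publisher;

/* Handle one payload: reconnect if offline, send if online, and
 * fall back to the file whenever the broker is unreachable. */
static void publish(Publisher *p, const char *payload, size_t len)
{
    assert(p && p->try_connect && p->send && p->save_to_file);

    if (!p->online)
        p->online = p->try_connect(p->ctx);

    if (p->online && p->send(p->ctx, payload, len))
        return;                       /* delivered to the broker */

    p->online = false;                /* connect or send failed */
    p->save_to_file(p->ctx, payload, len);
}

/* Demo stand-ins that count what happened to each payload. */
typedef struct { bool reachable; int sent; int saved; } FakeBroker;

static bool fake_try_connect(void *ctx)
{
    return ((FakeBroker *)ctx)->reachable;
}

static bool fake_send(void *ctx, const char *payload, size_t len)
{
    (void)payload; (void)len;
    FakeBroker *b = ctx;
    if (!b->reachable)
        return false;
    b->sent++;
    return true;
}

static void fake_save(void *ctx, const char *payload, size_t len)
{
    (void)payload; (void)len;
    ((FakeBroker *)ctx)->saved++;
}
```

Each payload ends up in exactly one place, the broker or a file, and previously saved files are never replayed, which matches the last condition above.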

We will have an auto-reconnect feature in a future release; I'd recommend waiting for it.

I wish I could wait for a new release, but I don't have the luxury of time. I need to deploy the app on my Nano ASAP.

I already implemented the reconnect feature by adding a flag, self->online.
If this flag is false, every subsequent call to the legacy_gst_nvmsgbroker_render function tries to reconnect, and each time it fails, it saves the current payload to a JSON file, like this:

static GstFlowReturn
legacy_gst_nvmsgbroker_render (GstBaseSink * sink, GstBuffer * buf)
{
...
          if (self->asyncSend) {
            if (!self->online) {
              /* Offline: try to reconnect on every render call. */
              g_print ("TRYING TO RECONNECT--------------------------\n");
              self->connHandle = self->nvds_msgapi_connect (self->connStr,
                  (nvds_msgapi_connect_cb_t) nvds_msgapi_connect_callback,
                  self->configFile);

              if (self->connHandle) {
                g_print ("SWITCHED TO ONLINE MODE--------------------------\n");
                self->online = TRUE;
              } else {
                /* Still offline: save the payload to a JSON file. */
                i = i + 1;
                g_mutex_lock (&self->flowLock);
                /* ntp_timestamp and buf_pts are guint64, so use the GLib format macro. */
                sprintf (full_path,
                    "%s%" G_GUINT64_FORMAT "_%" G_GUINT64_FORMAT "_%d.json",
                    "/opt/nvidia/deepstream/deepstream-5.1/sources/yolo/out/json/",
                    ((NvDsFrameMeta *) frame_meta)->ntp_timestamp,
                    ((NvDsFrameMeta *) frame_meta)->buf_pts, i);
                if ((file = fopen (full_path, "wb")) != NULL) {
                  fwrite ((uint8_t *) payload->payload, 1, payload->payloadSize, file);
                  fclose (file);
                }
                g_mutex_unlock (&self->flowLock);
              }
            } /* END OFFLINE HANDLING */
            if (self->online) {
              /* SEND TO KAFKA BROKER AS USUAL */
            }
          } /* END ASYNC SENDING */

Now I just need to somehow get the payload in the nvds_msgapi_send_callback() function so that I can do this:

  if (status != NVDS_MSGAPI_OK) {
    fwrite(...); // SAVE PAYLOAD TO FILE
    g_print ("switching to offline mode\n");
    self->online = FALSE;
    GST_ERROR_OBJECT (self, "error(%d) in sending data", status);
  }

The payload is already available a few lines before
if (self->asyncSend) {
as shown below. You can use it to save to a file.

    if (user_meta && user_meta->base_meta.meta_type == NVDS_PAYLOAD_META) {
      payload = (NvDsPayload *) user_meta->user_meta_data;
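
One possible way to make that payload reachable from nvds_msgapi_send_callback() is to hand the callback a small context that owns a copy of the bytes, assuming the async send call forwards a user pointer to the callback. The sketch below uses plain C stand-ins so it compiles on its own: SendCtx, send_ctx_new and on_send_done are hypothetical names, an int status replaces the msgapi error type (0 stands in for NVDS_MSGAPI_OK), and the callback returns an int only so the result can be checked, whereas the real callback returns nothing.

```c
#include <assert.h>
#include <stdio.h>
#include <stdlib.h>
#include <string.h>

/* Hypothetical context passed as the user pointer of the async send. */
typedef struct {
    void   *self;        /* the element instance, as before */
    char   *payload;     /* private copy of the payload bytes */
    size_t  size;
    char    path[256];   /* where to save the payload if the send fails */
} SendCtx;

static SendCtx *send_ctx_new(void *self, const void *payload, size_t size,
                             const char *path)
{
    assert(payload != NULL && path != NULL);
    SendCtx *ctx = calloc(1, sizeof *ctx);
    if (!ctx)
        return NULL;
    ctx->payload = malloc(size ? size : 1);
    if (!ctx->payload) {
        free(ctx);
        return NULL;
    }
    memcpy(ctx->payload, payload, size);
    ctx->self = self;
    ctx->size = size;
    snprintf(ctx->path, sizeof ctx->path, "%s", path);
    return ctx;
}

/* Shape of a send callback. On failure the copied payload is written
 * to ctx->path. Returns 1 if the payload was saved, 0 otherwise.
 * The context is always freed. */
static int on_send_done(void *user_ptr, int status)
{
    SendCtx *ctx = user_ptr;
    int saved = 0;
    if (status != 0) {
        FILE *f = fopen(ctx->path, "wb");
        if (f) {
            saved = fwrite(ctx->payload, 1, ctx->size, f) == ctx->size;
            fclose(f);
        }
    }
    free(ctx->payload);
    free(ctx);
    return saved;
}
```

Copying the payload matters because the buffer that carries it may already be released by the time the async callback runs.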
