Hello, I have a simple Deepstream application in Python on my Jetson Nano, it looks like this:
source → pgie → tracker → msgconv → msgbroker
I want to handle an internet outage, meaning when the kafka broker is unreachable the application should save the message in a JSON file for example, and when it’s reachable again it continues sending the new messages to the broker as usual.
Basically the application should “switch” between online and offline mode at runtime. Any thoughts on how to do it?
Any help would be greatly appreciated!
Hi,
when the connect_callback() within the caller (application or gst-msgbroker library) receives a connection error - you should first disconnect the existing connection handle, call reconnect()… reconnect() implementation should spawn a new thread, periodically call kafka adapter connect() and when success, it will return a new connection handle
So from what I understand, with this change, in the absence of an internet connection, the msgbroker will keep trying to reconnect instead of failing to start, which means that the application will not survive an internet outage (it will be down until the internet connection is restored) so all the objects that are passing by at the time of that internet outage will be missed, that’s why I wanted the broker to save the payload in JSON files. In other words here’s the behavior that I’m hoping to acheive:
IF
The application is starting but the kafka broker is unreachable
OR
The application started fine (broker was initially reachable and metadata was being sent to kafka topic) but the broker suddenly became unreachable
THEN
The nvmsgbroker should save the payload of any new object metadata in a JSON file and simultaniously keep trying to reconnect until the internet connection is restored
AND
When the internet connection is restored, it should return back to sending any new payload as usual without having to worry about the previous payloads that have been saved as JSON files.
I wish I could wait for a new release but I don’t have the luxury of time, I need to deploy the app on my Nano ASAP.
I already implemented the reconnect feature by creating a flag self->online
If this flag is false then all subsequent calls to the legacy_gst_nvmsgbroker_render function should try to reconnect and each time they fail to do so, they save the current payload in a JSON file, like this:
static GstFlowReturn
legacy_gst_nvmsgbroker_render (GstBaseSink * sink, GstBuffer * buf)
{
...
if (self->asyncSend) {
if (!self->online) {
g_print("TRYING TO RECONNECT--------------------------\n");
self->connHandle = self->nvds_msgapi_connect (self->connStr,
(nvds_msgapi_connect_cb_t) nvds_msgapi_connect_callback,
self->configFile);
if (self->connHandle) {
g_print("SWITCHED TO ONLINE MODE--------------------------\n");
self->online = TRUE;
}
else {
//SAVE PAYLOAD TO JSON FILE
i = i+1;
g_mutex_lock (&self->flowLock);
sprintf(full_path, "%s%ld_%ld_%d.json", "/opt/nvidia/deepstream/deepstream-5.1/sources/yolo/out/json/", ((NvDsFrameMeta*)frame_meta)->ntp_timestamp, ((NvDsFrameMeta*)frame_meta)->buf_pts, i);
if((file = fopen(full_path, "wb")) != NULL) {
fwrite((uint8_t *) payload->payload, 1, payload->payloadSize, file);
fclose(file);
}
g_mutex_unlock (&self->flowLock);
}
if (self->online) {
//SEND TO KAFKA BROKER AS USUAL
}
} //END ASYNC SENDING
Now I just need to somehow get the payload in the nvds_msgapi_send_callback() function so that I can do this:
if (status != NVDS_MSGAPI_OK) {
fwrite(...); // SAVE PAYLOAD TO FILE
g_print("switching to offline mode");
self->online = false;
GST_ERROR_OBJECT (self, "error(%d) in sending data", status);
}