[VOL-3981] Voltha Core restarts if it can't subscribe to Kafka

This commit fixes the following issues:
1) It adds a retry loop to resubscribe to Kafka on error. This failure
occurs randomly, especially when the Kafka broker is up and running but
not yet ready to create a new topic.
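
A minimal sketch of the retry idea, assuming a hypothetical subscribe
helper (the real change below retries
kmp.SubscribeWithRequestHandlerInterface with an exponential backoff
from github.com/cenkalti/backoff/v3):

    // Sketch only: keep retrying the subscription until it succeeds or
    // the context is cancelled. subscribe and retryInterval are
    // hypothetical stand-ins for the real call and backoff duration.
    for {
        if err := subscribe(ctx); err == nil {
            break // subscribed successfully
        }
        select {
        case <-time.After(retryInterval):
            // broker may not be ready yet; try again
        case <-ctx.Done():
            return ctx.Err()
        }
    }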

2) It fixes an issue where the event proxy start was incorrectly setting
the cluster messaging bus probe to out of service and never setting it
to running. This caused the Core to wait forever for the probe to become
ready.
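
A minimal sketch of the probe handling fix (mirrors the diff below; the
identifiers are taken from the patch itself):

    // Sketch only: the probe must transition to Running after a
    // successful start, otherwise the Core's readiness check blocks
    // forever on the cluster-message-bus service.
    if err := kafkaClient.Start(ctx); err != nil {
        probe.UpdateStatusFromContext(ctx, clusterMessageBus, probe.ServiceStatusNotReady)
        // ...wait and retry...
    } else {
        probe.UpdateStatusFromContext(ctx, clusterMessageBus, probe.ServiceStatusRunning)
    }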

Change-Id: Idf22481f85e4b576440301f2859da7ddf2d8c688
diff --git a/rw_core/core/kafka.go b/rw_core/core/kafka.go
index d45a84e..ae33090 100644
--- a/rw_core/core/kafka.go
+++ b/rw_core/core/kafka.go
@@ -18,7 +18,10 @@
 
 import (
 	"context"
+	"github.com/cenkalti/backoff/v3"
+	"github.com/opencord/voltha-go/rw_core/config"
 	"github.com/opencord/voltha-lib-go/v4/pkg/events"
+
 	"time"
 
 	"github.com/opencord/voltha-go/rw_core/core/adapter"
@@ -69,11 +72,13 @@
 	return kmp, nil
 }
 
-func startEventProxy(ctx context.Context, kafkaClient kafka.Client, eventTopic string, connectionRetryInterval time.Duration) (*events.EventProxy, error) {
+func startEventProxy(ctx context.Context, kafkaClient kafka.Client, eventTopic string, connectionRetryInterval time.Duration, updateProbeService bool) (*events.EventProxy, error) {
 	ep := events.NewEventProxy(events.MsgClient(kafkaClient), events.MsgTopic(kafka.Topic{Name: eventTopic}))
 	for {
 		if err := kafkaClient.Start(ctx); err != nil {
-			probe.UpdateStatusFromContext(ctx, clusterMessageBus, probe.ServiceStatusNotReady)
+			if updateProbeService {
+				probe.UpdateStatusFromContext(ctx, clusterMessageBus, probe.ServiceStatusNotReady)
+			}
 			logger.Warnw(ctx, "failed-to-setup-kafka-connection-on-kafka-cluster-address", log.Fields{"error": err})
 			select {
 			case <-time.After(connectionRetryInterval):
@@ -82,6 +87,9 @@
 				return nil, ctx.Err()
 			}
 		}
+		if updateProbeService {
+			probe.UpdateStatusFromContext(ctx, clusterMessageBus, probe.ServiceStatusRunning)
+		}
 		logger.Info(ctx, "started-connection-on-kafka-cluster-address")
 		break
 	}
@@ -170,13 +178,45 @@
 	}
 }
 
-func registerAdapterRequestHandlers(ctx context.Context, kmp kafka.InterContainerProxy, dMgr *device.Manager, aMgr *adapter.Manager, coreTopic string) {
+func registerAdapterRequestHandlers(ctx context.Context, kmp kafka.InterContainerProxy, dMgr *device.Manager,
+	aMgr *adapter.Manager, cf *config.RWCoreFlags, serviceName string) {
+	logger.Infow(ctx, "registering-request-handler", log.Fields{"topic": cf.CoreTopic})
+
+	// Set the exponential backoff params
+	kafkaRetryBackoff := backoff.NewExponentialBackOff()
+	kafkaRetryBackoff.InitialInterval = cf.BackoffRetryInitialInterval
+	kafkaRetryBackoff.MaxElapsedTime = cf.BackoffRetryMaxElapsedTime
+	kafkaRetryBackoff.MaxInterval = cf.BackoffRetryMaxInterval
+
+	// For the initial request, do not wait.
+	backoffTimer := time.NewTimer(0)
+
+	probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
 	requestProxy := api.NewAdapterRequestHandlerProxy(dMgr, aMgr)
-
-	// Register the broadcast topic to handle any core-bound broadcast requests
-	if err := kmp.SubscribeWithRequestHandlerInterface(ctx, kafka.Topic{Name: coreTopic}, requestProxy); err != nil {
-		logger.Fatalw(ctx, "Failed-registering-broadcast-handler", log.Fields{"topic": coreTopic})
+	for {
+		select {
+		case <-backoffTimer.C:
+			// Register the broadcast topic to handle any core-bound broadcast requests
+			err := kmp.SubscribeWithRequestHandlerInterface(ctx, kafka.Topic{Name: cf.CoreTopic}, requestProxy)
+			if err == nil {
+				logger.Infow(ctx, "request-handler-registered", log.Fields{"topic": cf.CoreTopic})
+				probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
+				return
+			}
+			logger.Errorw(ctx, "failed-registering-broadcast-handler-retrying", log.Fields{"topic": cf.CoreTopic, "error": err})
+			duration := kafkaRetryBackoff.NextBackOff()
+			// This should never occur with the default config: the backoff max elapsed time defaults to 0, so NextBackOff never returns backoff.Stop.
+			if duration == backoff.Stop {
+				// If we reach a maximum then warn and reset the backoff timer and keep attempting.
+				logger.Warnw(ctx, "maximum-kafka-retry-backoff-reached-resetting",
+					log.Fields{"max-kafka-retry-backoff": kafkaRetryBackoff.MaxElapsedTime})
+				kafkaRetryBackoff.Reset()
+				duration = kafkaRetryBackoff.NextBackOff()
+			}
+			backoffTimer = time.NewTimer(duration)
+		case <-ctx.Done():
+			logger.Infow(ctx, "context-closed", log.Fields{"topic": cf.CoreTopic})
+			return
+		}
 	}
-
-	logger.Info(ctx, "request-handler-registered")
 }