[VOL-3885] Add retry mechanism when connecting to the kafka event cluster

Change-Id: I38267923ba006ea099b25863e98ff286efa9bbd6
diff --git a/rw_core/core/kafka.go b/rw_core/core/kafka.go
index f8a956d..d45a84e 100644
--- a/rw_core/core/kafka.go
+++ b/rw_core/core/kafka.go
@@ -18,6 +18,7 @@
 
 import (
 	"context"
+	"github.com/opencord/voltha-lib-go/v4/pkg/events"
 	"time"
 
 	"github.com/opencord/voltha-go/rw_core/core/adapter"
@@ -32,7 +33,7 @@
 func startKafkInterContainerProxy(ctx context.Context, kafkaClient kafka.Client, address string, coreTopic string, connectionRetryInterval time.Duration) (kafka.InterContainerProxy, error) {
 	logger.Infow(ctx, "initialize-kafka-manager", log.Fields{"address": address, "topic": coreTopic})
 
-	probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPreparing)
+	probe.UpdateStatusFromContext(ctx, adapterMessageBus, probe.ServiceStatusPreparing)
 
 	// create the kafka RPC proxy
 	kmp := kafka.NewInterContainerProxy(
@@ -40,7 +41,7 @@
 		kafka.MsgClient(kafkaClient),
 		kafka.DefaultTopic(&kafka.Topic{Name: coreTopic}))
 
-	probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPrepared)
+	probe.UpdateStatusFromContext(ctx, adapterMessageBus, probe.ServiceStatusPrepared)
 
 	// wait for connectivity
 	logger.Infow(ctx, "starting-kafka-manager", log.Fields{"address": address, "topic": coreTopic})
@@ -51,8 +52,8 @@
 		if err := kmp.Start(ctx); err != nil {
 			// We failed to start. Delay and then try again later.
 			// Don't worry about liveness, as we can't be live until we've started.
-			probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
-			logger.Infow(ctx, "error-starting-kafka-messaging-proxy", log.Fields{"error": err})
+			probe.UpdateStatusFromContext(ctx, adapterMessageBus, probe.ServiceStatusNotReady)
+			logger.Warnw(ctx, "error-starting-kafka-messaging-proxy", log.Fields{"error": err})
 			select {
 			case <-time.After(connectionRetryInterval):
 			case <-ctx.Done():
@@ -68,6 +69,31 @@
 	return kmp, nil
 }
 
+func startEventProxy(ctx context.Context, kafkaClient kafka.Client, eventTopic string, connectionRetryInterval time.Duration) (*events.EventProxy, error) {
+	ep := events.NewEventProxy(events.MsgClient(kafkaClient), events.MsgTopic(kafka.Topic{Name: eventTopic}))
+	for {
+		if err := kafkaClient.Start(ctx); err != nil {
+			probe.UpdateStatusFromContext(ctx, clusterMessageBus, probe.ServiceStatusNotReady)
+			logger.Warnw(ctx, "failed-to-setup-kafka-connection-on-kafka-cluster-address", log.Fields{"error": err})
+			select {
+			case <-time.After(connectionRetryInterval):
+				continue
+			case <-ctx.Done():
+				return nil, ctx.Err()
+			}
+		}
+		logger.Info(ctx, "started-connection-on-kafka-cluster-address")
+		break
+	}
+	return ep, nil
+}
+
+// KafkaProxy is an interface satisfied by both EventProxy and InterContainerProxy.
+type KafkaProxy interface {
+	EnableLivenessChannel(ctx context.Context, enable bool) chan bool
+	SendLiveness(ctx context.Context) error
+}
+
 /*
  * monitorKafkaLiveness is responsible for monitoring the Kafka Interadapter Proxy connectivity state
  *
@@ -97,29 +123,29 @@
  * liveProbeInterval and notLiveProbeInterval can be configured separately,
  * though the current default is that both are set to 60 seconds.
  */
-func monitorKafkaLiveness(ctx context.Context, kmp kafka.InterContainerProxy, liveProbeInterval time.Duration, notLiveProbeInterval time.Duration) {
-	logger.Info(ctx, "started-kafka-message-proxy")
+func monitorKafkaLiveness(ctx context.Context, kmp KafkaProxy, liveProbeInterval time.Duration, notLiveProbeInterval time.Duration, serviceName string) {
+	logger.Infow(ctx, "started-kafka-message-proxy", log.Fields{"service": serviceName})
 
 	livenessChannel := kmp.EnableLivenessChannel(ctx, true)
 
-	logger.Info(ctx, "enabled-kafka-liveness-channel")
+	logger.Infow(ctx, "enabled-kafka-liveness-channel", log.Fields{"service": serviceName})
 
 	timeout := liveProbeInterval
 	for {
 		timeoutTimer := time.NewTimer(timeout)
 		select {
 		case liveness := <-livenessChannel:
-			logger.Infow(ctx, "kafka-manager-thread-liveness-event", log.Fields{"liveness": liveness})
+			logger.Infow(ctx, "kafka-manager-thread-liveness-event", log.Fields{"liveness": liveness, "service": serviceName})
 			// there was a state change in Kafka liveness
 			if !liveness {
-				probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
-				logger.Info(ctx, "kafka-manager-thread-set-server-notready")
+				probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
+				logger.Infow(ctx, "kafka-manager-thread-set-server-notready", log.Fields{"service": serviceName})
 
 				// retry frequently while life is bad
 				timeout = notLiveProbeInterval
 			} else {
-				probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
-				logger.Info(ctx, "kafka-manager-thread-set-server-ready")
+				probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
+				logger.Infow(ctx, "kafka-manager-thread-set-server-ready", log.Fields{"service": serviceName})
 
 				// retry infrequently while life is good
 				timeout = liveProbeInterval
@@ -128,14 +154,14 @@
 				<-timeoutTimer.C
 			}
 		case <-timeoutTimer.C:
-			logger.Info(ctx, "kafka-proxy-liveness-recheck")
+			logger.Infow(ctx, "kafka-proxy-liveness-recheck", log.Fields{"service": serviceName})
 			// send the liveness probe in a goroutine; we don't want to deadlock ourselves as
 			// the liveness probe may wait (and block) writing to our channel.
 			go func() {
 				err := kmp.SendLiveness(ctx)
 				if err != nil {
 					// Catch possible error case if sending liveness after Sarama has been stopped.
-					logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err})
+					logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err, "service": serviceName})
 				}
 			}()
 		case <-ctx.Done():