[VOL-3885] Add retry mechanism when connecting to the Kafka event cluster

Change-Id: I38267923ba006ea099b25863e98ff286efa9bbd6
diff --git a/go.mod b/go.mod
index deb76c4..021f5f3 100644
--- a/go.mod
+++ b/go.mod
@@ -6,7 +6,7 @@
 	github.com/gogo/protobuf v1.3.2
 	github.com/golang/protobuf v1.3.2
 	github.com/google/uuid v1.1.1
-	github.com/opencord/voltha-lib-go/v4 v4.0.10
+	github.com/opencord/voltha-lib-go/v4 v4.0.11
 	github.com/opencord/voltha-protos/v4 v4.0.13
 	github.com/opentracing/opentracing-go v1.1.0
 	github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
diff --git a/go.sum b/go.sum
index 9c5a447..4037463 100644
--- a/go.sum
+++ b/go.sum
@@ -141,8 +141,8 @@
 github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I=
 github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
-github.com/opencord/voltha-lib-go/v4 v4.0.10 h1:mSi9e3TD+liit5NbV+AKEJ2dza3n+DpzdExJog2LtTo=
-github.com/opencord/voltha-lib-go/v4 v4.0.10/go.mod h1:K7lDkSkJ97EyfvX8fQtBmBvpj7n6MmwnAtD8Jz79HcQ=
+github.com/opencord/voltha-lib-go/v4 v4.0.11 h1:qBG+tOzJI/Iwd00sr3rkx+OfN0zxd8+Tou7hI6qumXk=
+github.com/opencord/voltha-lib-go/v4 v4.0.11/go.mod h1:K7lDkSkJ97EyfvX8fQtBmBvpj7n6MmwnAtD8Jz79HcQ=
 github.com/opencord/voltha-protos/v4 v4.0.12 h1:x8drb8inaUByjVLFbXSiQwRTU//dfde0MKIHyKb1JMw=
 github.com/opencord/voltha-protos/v4 v4.0.12/go.mod h1:W/OIFIyvFh/C0vchRUuarIsMylEhzCRM9pNxLvkPtKc=
 github.com/opencord/voltha-protos/v4 v4.0.13 h1:4D6jZLrNDwWC3dhRxNtTGeXMv3GKzRUaSkm0aDQwINQ=
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 4cd1e85..0b3dd94 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -26,7 +26,6 @@
 	"github.com/opencord/voltha-go/rw_core/core/api"
 	"github.com/opencord/voltha-go/rw_core/core/device"
 	conf "github.com/opencord/voltha-lib-go/v4/pkg/config"
-	"github.com/opencord/voltha-lib-go/v4/pkg/events"
 	grpcserver "github.com/opencord/voltha-lib-go/v4/pkg/grpc"
 	"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v4/pkg/log"
@@ -42,17 +41,29 @@
 	stopped  chan struct{}
 }
 
+const (
+	adapterMessageBus = "adapter-message-bus"
+	clusterMessageBus = "cluster-message-bus"
+)
+
 // NewCore creates instance of rw core
 func NewCore(ctx context.Context, id string, cf *config.RWCoreFlags) *Core {
 	// If the context has a probe then fetch it and register our services
 	if p := probe.GetProbeFromContext(ctx); p != nil {
 		p.RegisterService(
 			ctx,
-			"message-bus",
+			adapterMessageBus,
 			"kv-store",
 			"adapter-manager",
 			"grpc-service",
 		)
+
+		if cf.KafkaAdapterAddress != cf.KafkaClusterAddress {
+			p.RegisterService(
+				ctx,
+				clusterMessageBus,
+			)
+		}
 	}
 
 	// new threads will be given a new cancelable context, so that they can be aborted later when Stop() is called
@@ -121,12 +132,17 @@
 		kafka.AutoCreateTopic(true),
 		kafka.MetadatMaxRetries(15),
 	)
+
 	// create event proxy
-	eventProxy := events.NewEventProxy(events.MsgClient(kafkaClientEvent), events.MsgTopic(kafka.Topic{Name: cf.EventTopic}))
-	if err := kafkaClientEvent.Start(ctx); err != nil {
-		logger.Warn(ctx, "failed-to-setup-kafka-connection-on-kafka-cluster-address")
+	eventProxy, err := startEventProxy(ctx, kafkaClientEvent, cf.EventTopic, cf.ConnectionRetryInterval)
+	if err != nil {
+		logger.Warn(ctx, "failed-to-setup-kafka-event-proxy-connection")
 		return
 	}
+	if cf.KafkaAdapterAddress != cf.KafkaClusterAddress {
+		// if we're using a single kafka cluster we don't need two liveness probes on the same cluster
+		go monitorKafkaLiveness(ctx, eventProxy, cf.LiveProbeInterval, cf.NotLiveProbeInterval, clusterMessageBus)
+	}
 
 	defer kafkaClientEvent.Stop(ctx)
 
@@ -141,11 +157,11 @@
 	// core.kmp must be created before deviceMgr and adapterMgr
 	kmp, err := startKafkInterContainerProxy(ctx, kafkaClient, cf.KafkaAdapterAddress, cf.CoreTopic, cf.ConnectionRetryInterval)
 	if err != nil {
-		logger.Warn(ctx, "failed-to-setup-kafka-connection")
+		logger.Warn(ctx, "failed-to-setup-kafka-adapter-proxy-connection")
 		return
 	}
 	defer kmp.Stop(ctx)
-	go monitorKafkaLiveness(ctx, kmp, cf.LiveProbeInterval, cf.NotLiveProbeInterval)
+	go monitorKafkaLiveness(ctx, kmp, cf.LiveProbeInterval, cf.NotLiveProbeInterval, adapterMessageBus)
 
 	// create the core of the system, the device managers
 	endpointMgr := kafka.NewEndpointManager(backend)
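
Note: the core.go change above replaces the single kafkaClientEvent.Start call with startEventProxy (defined in rw_core/core/kafka.go below) and only starts a second liveness monitor when the adapter and cluster buses use different addresses. The following is a minimal standalone sketch of the retry-until-connected pattern being applied, using only the standard library; startWithRetry and the fake starter are illustrative stand-ins, not voltha-lib-go APIs.

package main

import (
	"context"
	"errors"
	"fmt"
	"time"
)

// startWithRetry keeps calling start until it succeeds or the context is
// cancelled, sleeping retryInterval between attempts. This mirrors the loop
// added in startEventProxy: log the failure, wait, retry.
func startWithRetry(ctx context.Context, start func(context.Context) error, retryInterval time.Duration) error {
	for {
		if err := start(ctx); err != nil {
			fmt.Println("start failed, retrying:", err)
			select {
			case <-time.After(retryInterval):
				continue
			case <-ctx.Done():
				return ctx.Err()
			}
		}
		return nil
	}
}

func main() {
	attempts := 0
	// Fake starter that fails twice before succeeding, standing in for
	// kafkaClient.Start on the cluster bus.
	start := func(ctx context.Context) error {
		attempts++
		if attempts < 3 {
			return errors.New("kafka not reachable")
		}
		return nil
	}

	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()

	if err := startWithRetry(ctx, start, 100*time.Millisecond); err != nil {
		fmt.Println("gave up:", err)
		return
	}
	fmt.Println("connected after", attempts, "attempts")
}
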
diff --git a/rw_core/core/kafka.go b/rw_core/core/kafka.go
index f8a956d..d45a84e 100644
--- a/rw_core/core/kafka.go
+++ b/rw_core/core/kafka.go
@@ -18,6 +18,7 @@
 
 import (
 	"context"
+	"github.com/opencord/voltha-lib-go/v4/pkg/events"
 	"time"
 
 	"github.com/opencord/voltha-go/rw_core/core/adapter"
@@ -32,7 +33,7 @@
 func startKafkInterContainerProxy(ctx context.Context, kafkaClient kafka.Client, address string, coreTopic string, connectionRetryInterval time.Duration) (kafka.InterContainerProxy, error) {
 	logger.Infow(ctx, "initialize-kafka-manager", log.Fields{"address": address, "topic": coreTopic})
 
-	probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPreparing)
+	probe.UpdateStatusFromContext(ctx, adapterMessageBus, probe.ServiceStatusPreparing)
 
 	// create the kafka RPC proxy
 	kmp := kafka.NewInterContainerProxy(
@@ -40,7 +41,7 @@
 		kafka.MsgClient(kafkaClient),
 		kafka.DefaultTopic(&kafka.Topic{Name: coreTopic}))
 
-	probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPrepared)
+	probe.UpdateStatusFromContext(ctx, adapterMessageBus, probe.ServiceStatusPrepared)
 
 	// wait for connectivity
 	logger.Infow(ctx, "starting-kafka-manager", log.Fields{"address": address, "topic": coreTopic})
@@ -51,8 +52,8 @@
 		if err := kmp.Start(ctx); err != nil {
 			// We failed to start. Delay and then try again later.
 			// Don't worry about liveness, as we can't be live until we've started.
-			probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
-			logger.Infow(ctx, "error-starting-kafka-messaging-proxy", log.Fields{"error": err})
+			probe.UpdateStatusFromContext(ctx, adapterMessageBus, probe.ServiceStatusNotReady)
+			logger.Warnw(ctx, "error-starting-kafka-messaging-proxy", log.Fields{"error": err})
 			select {
 			case <-time.After(connectionRetryInterval):
 			case <-ctx.Done():
@@ -68,6 +69,31 @@
 	return kmp, nil
 }
 
+func startEventProxy(ctx context.Context, kafkaClient kafka.Client, eventTopic string, connectionRetryInterval time.Duration) (*events.EventProxy, error) {
+	ep := events.NewEventProxy(events.MsgClient(kafkaClient), events.MsgTopic(kafka.Topic{Name: eventTopic}))
+	for {
+		if err := kafkaClient.Start(ctx); err != nil {
+			probe.UpdateStatusFromContext(ctx, clusterMessageBus, probe.ServiceStatusNotReady)
+			logger.Warnw(ctx, "failed-to-setup-kafka-connection-on-kafka-cluster-address", log.Fields{"error": err})
+			select {
+			case <-time.After(connectionRetryInterval):
+				continue
+			case <-ctx.Done():
+				return nil, ctx.Err()
+			}
+		}
+		logger.Info(ctx, "started-connection-on-kafka-cluster-address")
+		break
+	}
+	return ep, nil
+}
+
+// KafkaProxy is the liveness interface satisfied by both EventProxy and InterContainerProxy
+type KafkaProxy interface {
+	EnableLivenessChannel(ctx context.Context, enable bool) chan bool
+	SendLiveness(ctx context.Context) error
+}
+
 /*
  * monitorKafkaLiveness is responsible for monitoring the Kafka Interadapter Proxy connectivity state
  *
@@ -97,29 +123,29 @@
  * liveProbeInterval and notLiveProbeInterval can be configured separately,
  * though the current default is that both are set to 60 seconds.
  */
-func monitorKafkaLiveness(ctx context.Context, kmp kafka.InterContainerProxy, liveProbeInterval time.Duration, notLiveProbeInterval time.Duration) {
-	logger.Info(ctx, "started-kafka-message-proxy")
+func monitorKafkaLiveness(ctx context.Context, kmp KafkaProxy, liveProbeInterval time.Duration, notLiveProbeInterval time.Duration, serviceName string) {
+	logger.Infow(ctx, "started-kafka-message-proxy", log.Fields{"service": serviceName})
 
 	livenessChannel := kmp.EnableLivenessChannel(ctx, true)
 
-	logger.Info(ctx, "enabled-kafka-liveness-channel")
+	logger.Infow(ctx, "enabled-kafka-liveness-channel", log.Fields{"service": serviceName})
 
 	timeout := liveProbeInterval
 	for {
 		timeoutTimer := time.NewTimer(timeout)
 		select {
 		case liveness := <-livenessChannel:
-			logger.Infow(ctx, "kafka-manager-thread-liveness-event", log.Fields{"liveness": liveness})
+			logger.Infow(ctx, "kafka-manager-thread-liveness-event", log.Fields{"liveness": liveness, "service": serviceName})
 			// there was a state change in Kafka liveness
 			if !liveness {
-				probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
-				logger.Info(ctx, "kafka-manager-thread-set-server-notready")
+				probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
+				logger.Infow(ctx, "kafka-manager-thread-set-server-notready", log.Fields{"service": serviceName})
 
 				// retry frequently while life is bad
 				timeout = notLiveProbeInterval
 			} else {
-				probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
-				logger.Info(ctx, "kafka-manager-thread-set-server-ready")
+				probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
+				logger.Infow(ctx, "kafka-manager-thread-set-server-ready", log.Fields{"service": serviceName})
 
 				// retry infrequently while life is good
 				timeout = liveProbeInterval
@@ -128,14 +154,14 @@
 				<-timeoutTimer.C
 			}
 		case <-timeoutTimer.C:
-			logger.Info(ctx, "kafka-proxy-liveness-recheck")
+			logger.Infow(ctx, "kafka-proxy-liveness-recheck", log.Fields{"service": serviceName})
 			// send the liveness probe in a goroutine; we don't want to deadlock ourselves as
 			// the liveness probe may wait (and block) writing to our channel.
 			go func() {
 				err := kmp.SendLiveness(ctx)
 				if err != nil {
 					// Catch possible error case if sending liveness after Sarama has been stopped.
-					logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err})
+					logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err, "service": serviceName})
 				}
 			}()
 		case <-ctx.Done():
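
Note: the new KafkaProxy interface lets the existing monitorKafkaLiveness loop watch either bus, now tagged with a probe service name. Below is a self-contained sketch of that shared-monitor shape, with illustrative names (livenessProxy, fakeProxy, monitor) rather than the real voltha-lib-go types; the real loop additionally sends the periodic liveness probe from a goroutine so it cannot block itself.

package main

import (
	"context"
	"fmt"
	"time"
)

// livenessProxy captures the two methods the monitor needs, so the same loop
// can watch both the inter-container proxy and the event proxy.
type livenessProxy interface {
	EnableLivenessChannel(ctx context.Context, enable bool) chan bool
	SendLiveness(ctx context.Context) error
}

// fakeProxy emits a fixed liveness sequence for the demo.
type fakeProxy struct{ events []bool }

func (f *fakeProxy) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
	ch := make(chan bool, len(f.events))
	for _, e := range f.events {
		ch <- e
	}
	return ch
}

func (f *fakeProxy) SendLiveness(ctx context.Context) error { return nil }

// monitor drains liveness events and rechecks on a timer, switching between a
// long interval while live and a short one while not live, the same shape as
// the monitorKafkaLiveness loop in this change.
func monitor(ctx context.Context, p livenessProxy, live, notLive time.Duration, service string) {
	ch := p.EnableLivenessChannel(ctx, true)
	timeout := live
	for {
		timer := time.NewTimer(timeout)
		select {
		case alive := <-ch:
			fmt.Println(service, "liveness:", alive)
			if alive {
				timeout = live
			} else {
				timeout = notLive
			}
			if !timer.Stop() {
				<-timer.C
			}
		case <-timer.C:
			_ = p.SendLiveness(ctx)
		case <-ctx.Done():
			return
		}
	}
}

func main() {
	ctx, cancel := context.WithTimeout(context.Background(), 300*time.Millisecond)
	defer cancel()
	go monitor(ctx, &fakeProxy{events: []bool{true, false, true}}, 100*time.Millisecond, 20*time.Millisecond, "cluster-message-bus")
	go monitor(ctx, &fakeProxy{events: []bool{true}}, 100*time.Millisecond, 20*time.Millisecond, "adapter-message-bus")
	<-ctx.Done()
}
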
diff --git a/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/eventif/events_proxy_if.go b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/eventif/events_proxy_if.go
index 7418ea1..35f821f 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/eventif/events_proxy_if.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/eventif/events_proxy_if.go
@@ -29,6 +29,8 @@
 		subCategory EventSubCategory, raisedTs int64) error
 	SendRPCEvent(ctx context.Context, id string, deviceEvent *voltha.RPCEvent, category EventCategory,
 		subCategory *EventSubCategory, raisedTs int64) error
+	EnableLivenessChannel(ctx context.Context, enable bool) chan bool
+	SendLiveness(ctx context.Context) error
 }
 
 const (
diff --git a/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/events_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/events_proxy.go
index c4014ee..2301f43 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/events_proxy.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/events_proxy.go
@@ -187,3 +187,11 @@
 
 	return nil
 }
+
+func (ep *EventProxy) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
+	return ep.kafkaClient.EnableLivenessChannel(ctx, enable)
+}
+
+func (ep *EventProxy) SendLiveness(ctx context.Context) error {
+	return ep.kafkaClient.SendLiveness(ctx)
+}
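
Note: the vendored EventProxy change simply forwards the two liveness calls to its underlying Kafka client, which is what lets it satisfy the KafkaProxy interface used by the monitor. Below is a small illustrative sketch of that delegation plus a compile-time interface assertion; the type names are stand-ins, not the vendored code.

package main

import (
	"context"
	"fmt"
)

// kafkaLiveness mirrors the KafkaProxy interface introduced in
// rw_core/core/kafka.go; the concrete types below are stand-ins.
type kafkaLiveness interface {
	EnableLivenessChannel(ctx context.Context, enable bool) chan bool
	SendLiveness(ctx context.Context) error
}

// client plays the role of the underlying kafka.Client.
type client struct{}

func (c *client) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
	return make(chan bool, 1)
}
func (c *client) SendLiveness(ctx context.Context) error { return nil }

// eventProxy plays the role of events.EventProxy: it forwards the two
// liveness calls to its embedded client, just as the vendored change does.
type eventProxy struct{ kafkaClient *client }

func (ep *eventProxy) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
	return ep.kafkaClient.EnableLivenessChannel(ctx, enable)
}
func (ep *eventProxy) SendLiveness(ctx context.Context) error {
	return ep.kafkaClient.SendLiveness(ctx)
}

// The forwarding methods are all that is needed for eventProxy to satisfy the
// interface; a compile-time assertion makes that explicit.
var _ kafkaLiveness = (*eventProxy)(nil)

func main() {
	ep := &eventProxy{kafkaClient: &client{}}
	fmt.Println("liveness channel enabled:", ep.EnableLivenessChannel(context.Background(), true) != nil)
}
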
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 93a59a8..fab3fa9 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -84,7 +84,7 @@
 github.com/modern-go/concurrent
 # github.com/modern-go/reflect2 v1.0.1
 github.com/modern-go/reflect2
-# github.com/opencord/voltha-lib-go/v4 v4.0.10
+# github.com/opencord/voltha-lib-go/v4 v4.0.11
 github.com/opencord/voltha-lib-go/v4/pkg/adapters
 github.com/opencord/voltha-lib-go/v4/pkg/adapters/adapterif
 github.com/opencord/voltha-lib-go/v4/pkg/adapters/common