[VOL-3885] Add retry mechanism when connecting to the Kafka event cluster

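Instead of returning after a single failed connection attempt, the
rw-core now retries the connection to the Kafka event cluster at
ConnectionRetryInterval, matching the pattern already used for the
adapter inter-container proxy. The readiness probe distinguishes the
two buses (adapter-message-bus and cluster-message-bus); the second
service and its liveness monitor are registered only when the event
cluster address differs from the adapter cluster address.
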
Change-Id: I38267923ba006ea099b25863e98ff286efa9bbd6
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 4cd1e85..0b3dd94 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -26,7 +26,6 @@
 	"github.com/opencord/voltha-go/rw_core/core/api"
 	"github.com/opencord/voltha-go/rw_core/core/device"
 	conf "github.com/opencord/voltha-lib-go/v4/pkg/config"
-	"github.com/opencord/voltha-lib-go/v4/pkg/events"
 	grpcserver "github.com/opencord/voltha-lib-go/v4/pkg/grpc"
 	"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v4/pkg/log"
@@ -42,17 +41,30 @@
 	stopped  chan struct{}
 }
 
+const (
+	adapterMessageBus = "adapter-message-bus"
+	clusterMessageBus = "cluster-message-bus"
+)
+
 // NewCore creates instance of rw core
 func NewCore(ctx context.Context, id string, cf *config.RWCoreFlags) *Core {
 	// If the context has a probe then fetch it and register our services
 	if p := probe.GetProbeFromContext(ctx); p != nil {
 		p.RegisterService(
 			ctx,
-			"message-bus",
+			adapterMessageBus,
 			"kv-store",
 			"adapter-manager",
 			"grpc-service",
 		)
+
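+		// a second readiness probe is only needed when the event cluster is separate from the adapter message bus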
+		if cf.KafkaAdapterAddress != cf.KafkaClusterAddress {
+			p.RegisterService(
+				ctx,
+				clusterMessageBus,
+			)
+		}
 	}
 
 	// new threads will be given a new cancelable context, so that they can be aborted later when Stop() is called
@@ -121,12 +132,18 @@
 		kafka.AutoCreateTopic(true),
 		kafka.MetadatMaxRetries(15),
 	)
+
 	// create event proxy
-	eventProxy := events.NewEventProxy(events.MsgClient(kafkaClientEvent), events.MsgTopic(kafka.Topic{Name: cf.EventTopic}))
-	if err := kafkaClientEvent.Start(ctx); err != nil {
-		logger.Warn(ctx, "failed-to-setup-kafka-connection-on-kafka-cluster-address")
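+	// retry the event-cluster connection at ConnectionRetryInterval instead of giving up after one failed attempt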
+	eventProxy, err := startEventProxy(ctx, kafkaClientEvent, cf.EventTopic, cf.ConnectionRetryInterval)
+	if err != nil {
+		logger.Warn(ctx, "failed-to-setup-kafka-event-proxy-connection")
 		return
 	}
+	if cf.KafkaAdapterAddress != cf.KafkaClusterAddress {
+		// a single Kafka cluster needs only one liveness probe, so the event cluster gets its own monitor only when its address differs
+		go monitorKafkaLiveness(ctx, eventProxy, cf.LiveProbeInterval, cf.NotLiveProbeInterval, clusterMessageBus)
+	}
 
 	defer kafkaClientEvent.Stop(ctx)
 
@@ -141,11 +157,11 @@
 	// core.kmp must be created before deviceMgr and adapterMgr
 	kmp, err := startKafkInterContainerProxy(ctx, kafkaClient, cf.KafkaAdapterAddress, cf.CoreTopic, cf.ConnectionRetryInterval)
 	if err != nil {
-		logger.Warn(ctx, "failed-to-setup-kafka-connection")
+		logger.Warn(ctx, "failed-to-setup-kafka-adapter-proxy-connection")
 		return
 	}
 	defer kmp.Stop(ctx)
-	go monitorKafkaLiveness(ctx, kmp, cf.LiveProbeInterval, cf.NotLiveProbeInterval)
+	go monitorKafkaLiveness(ctx, kmp, cf.LiveProbeInterval, cf.NotLiveProbeInterval, adapterMessageBus)
 
 	// create the core of the system, the device managers
 	endpointMgr := kafka.NewEndpointManager(backend)
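
Note on the retry helper: the body of startEventProxy is added elsewhere
in this change and is not visible in this hunk. Below is a minimal sketch
of such a helper, assuming it mirrors the existing
startKafkInterContainerProxy pattern, reports readiness through the probe
package, and reuses the file's package-level logger and the
clusterMessageBus constant defined above; it is illustrative only, not
the committed implementation:

	import (
		"context"
		"time"

		"github.com/opencord/voltha-lib-go/v4/pkg/events"
		"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
		"github.com/opencord/voltha-lib-go/v4/pkg/log"
		"github.com/opencord/voltha-lib-go/v4/pkg/probe"
	)

	// startEventProxy builds the event proxy and then retries the Kafka
	// connection every connectionRetryInterval until it succeeds or the
	// context is cancelled, updating the cluster-message-bus probe status.
	func startEventProxy(ctx context.Context, kafkaClient kafka.Client, eventTopic string, connectionRetryInterval time.Duration) (*events.EventProxy, error) {
		ep := events.NewEventProxy(events.MsgClient(kafkaClient), events.MsgTopic(kafka.Topic{Name: eventTopic}))
		for {
			if err := kafkaClient.Start(ctx); err != nil {
				// mark the service not ready and retry instead of failing on the first error
				probe.UpdateStatusFromContext(ctx, clusterMessageBus, probe.ServiceStatusNotReady)
				logger.Warnw(ctx, "failed-to-connect-to-kafka-cluster", log.Fields{"error": err})
				select {
				case <-time.After(connectionRetryInterval):
					continue
				case <-ctx.Done():
					return nil, ctx.Err()
				}
			}
			probe.UpdateStatusFromContext(ctx, clusterMessageBus, probe.ServiceStatusRunning)
			return ep, nil
		}
	}

The serviceName parameter added to monitorKafkaLiveness lets the same
helper report liveness for either bus, which is why the call sites above
now pass adapterMessageBus and clusterMessageBus explicitly.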