[VOL-3885] Adding retry mechanism when connecting to the kafka event cluster
Change-Id: I38267923ba006ea099b25863e98ff286efa9bbd6
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 4cd1e85..0b3dd94 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -26,7 +26,6 @@
"github.com/opencord/voltha-go/rw_core/core/api"
"github.com/opencord/voltha-go/rw_core/core/device"
conf "github.com/opencord/voltha-lib-go/v4/pkg/config"
- "github.com/opencord/voltha-lib-go/v4/pkg/events"
grpcserver "github.com/opencord/voltha-lib-go/v4/pkg/grpc"
"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
@@ -42,17 +41,29 @@
stopped chan struct{}
}
+const (
+ adapterMessageBus = "adapter-message-bus"
+ clusterMessageBus = "cluster-message-bus"
+)
+
// NewCore creates instance of rw core
func NewCore(ctx context.Context, id string, cf *config.RWCoreFlags) *Core {
// If the context has a probe then fetch it and register our services
if p := probe.GetProbeFromContext(ctx); p != nil {
p.RegisterService(
ctx,
- "message-bus",
+ adapterMessageBus,
"kv-store",
"adapter-manager",
"grpc-service",
)
+
+ if cf.KafkaAdapterAddress != cf.KafkaClusterAddress {
+ p.RegisterService(
+ ctx,
+ clusterMessageBus,
+ )
+ }
}
// new threads will be given a new cancelable context, so that they can be aborted later when Stop() is called
@@ -121,12 +132,17 @@
kafka.AutoCreateTopic(true),
kafka.MetadatMaxRetries(15),
)
+
// create event proxy
- eventProxy := events.NewEventProxy(events.MsgClient(kafkaClientEvent), events.MsgTopic(kafka.Topic{Name: cf.EventTopic}))
- if err := kafkaClientEvent.Start(ctx); err != nil {
- logger.Warn(ctx, "failed-to-setup-kafka-connection-on-kafka-cluster-address")
+ eventProxy, err := startEventProxy(ctx, kafkaClientEvent, cf.EventTopic, cf.ConnectionRetryInterval)
+ if err != nil {
+ logger.Warn(ctx, "failed-to-setup-kafka-event-proxy-connection")
return
}
+ if cf.KafkaAdapterAddress != cf.KafkaClusterAddress {
+ // if we're using a single kafka cluster we don't need two liveliness probes on the same cluster
+ go monitorKafkaLiveness(ctx, eventProxy, cf.LiveProbeInterval, cf.NotLiveProbeInterval, clusterMessageBus)
+ }
defer kafkaClientEvent.Stop(ctx)
@@ -141,11 +157,11 @@
// core.kmp must be created before deviceMgr and adapterMgr
kmp, err := startKafkInterContainerProxy(ctx, kafkaClient, cf.KafkaAdapterAddress, cf.CoreTopic, cf.ConnectionRetryInterval)
if err != nil {
- logger.Warn(ctx, "failed-to-setup-kafka-connection")
+ logger.Warn(ctx, "failed-to-setup-kafka-adapter-proxy-connection")
return
}
defer kmp.Stop(ctx)
- go monitorKafkaLiveness(ctx, kmp, cf.LiveProbeInterval, cf.NotLiveProbeInterval)
+ go monitorKafkaLiveness(ctx, kmp, cf.LiveProbeInterval, cf.NotLiveProbeInterval, adapterMessageBus)
// create the core of the system, the device managers
endpointMgr := kafka.NewEndpointManager(backend)