[VOL-3885] Add retry mechanism when connecting to the kafka event cluster
Change-Id: I38267923ba006ea099b25863e98ff286efa9bbd6
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 4cd1e85..0b3dd94 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -26,7 +26,6 @@
"github.com/opencord/voltha-go/rw_core/core/api"
"github.com/opencord/voltha-go/rw_core/core/device"
conf "github.com/opencord/voltha-lib-go/v4/pkg/config"
- "github.com/opencord/voltha-lib-go/v4/pkg/events"
grpcserver "github.com/opencord/voltha-lib-go/v4/pkg/grpc"
"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
@@ -42,17 +41,29 @@
stopped chan struct{}
}
+const (
+ adapterMessageBus = "adapter-message-bus"
+ clusterMessageBus = "cluster-message-bus"
+)
+
// NewCore creates instance of rw core
func NewCore(ctx context.Context, id string, cf *config.RWCoreFlags) *Core {
// If the context has a probe then fetch it and register our services
if p := probe.GetProbeFromContext(ctx); p != nil {
p.RegisterService(
ctx,
- "message-bus",
+ adapterMessageBus,
"kv-store",
"adapter-manager",
"grpc-service",
)
+
+ if cf.KafkaAdapterAddress != cf.KafkaClusterAddress {
+ p.RegisterService(
+ ctx,
+ clusterMessageBus,
+ )
+ }
}
// new threads will be given a new cancelable context, so that they can be aborted later when Stop() is called
@@ -121,12 +132,17 @@
kafka.AutoCreateTopic(true),
kafka.MetadatMaxRetries(15),
)
+
// create event proxy
- eventProxy := events.NewEventProxy(events.MsgClient(kafkaClientEvent), events.MsgTopic(kafka.Topic{Name: cf.EventTopic}))
- if err := kafkaClientEvent.Start(ctx); err != nil {
- logger.Warn(ctx, "failed-to-setup-kafka-connection-on-kafka-cluster-address")
+ eventProxy, err := startEventProxy(ctx, kafkaClientEvent, cf.EventTopic, cf.ConnectionRetryInterval)
+ if err != nil {
+ logger.Warn(ctx, "failed-to-setup-kafka-event-proxy-connection")
return
}
+ if cf.KafkaAdapterAddress != cf.KafkaClusterAddress {
+ // the cluster bus differs from the adapter bus, so monitor its liveness separately; with a single shared kafka cluster one liveness probe suffices
+ go monitorKafkaLiveness(ctx, eventProxy, cf.LiveProbeInterval, cf.NotLiveProbeInterval, clusterMessageBus)
+ }
defer kafkaClientEvent.Stop(ctx)
@@ -141,11 +157,11 @@
// core.kmp must be created before deviceMgr and adapterMgr
kmp, err := startKafkInterContainerProxy(ctx, kafkaClient, cf.KafkaAdapterAddress, cf.CoreTopic, cf.ConnectionRetryInterval)
if err != nil {
- logger.Warn(ctx, "failed-to-setup-kafka-connection")
+ logger.Warn(ctx, "failed-to-setup-kafka-adapter-proxy-connection")
return
}
defer kmp.Stop(ctx)
- go monitorKafkaLiveness(ctx, kmp, cf.LiveProbeInterval, cf.NotLiveProbeInterval)
+ go monitorKafkaLiveness(ctx, kmp, cf.LiveProbeInterval, cf.NotLiveProbeInterval, adapterMessageBus)
// create the core of the system, the device managers
endpointMgr := kafka.NewEndpointManager(backend)
diff --git a/rw_core/core/kafka.go b/rw_core/core/kafka.go
index f8a956d..d45a84e 100644
--- a/rw_core/core/kafka.go
+++ b/rw_core/core/kafka.go
@@ -18,6 +18,7 @@
import (
"context"
+ "github.com/opencord/voltha-lib-go/v4/pkg/events"
"time"
"github.com/opencord/voltha-go/rw_core/core/adapter"
@@ -32,7 +33,7 @@
func startKafkInterContainerProxy(ctx context.Context, kafkaClient kafka.Client, address string, coreTopic string, connectionRetryInterval time.Duration) (kafka.InterContainerProxy, error) {
logger.Infow(ctx, "initialize-kafka-manager", log.Fields{"address": address, "topic": coreTopic})
- probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPreparing)
+ probe.UpdateStatusFromContext(ctx, adapterMessageBus, probe.ServiceStatusPreparing)
// create the kafka RPC proxy
kmp := kafka.NewInterContainerProxy(
@@ -40,7 +41,7 @@
kafka.MsgClient(kafkaClient),
kafka.DefaultTopic(&kafka.Topic{Name: coreTopic}))
- probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPrepared)
+ probe.UpdateStatusFromContext(ctx, adapterMessageBus, probe.ServiceStatusPrepared)
// wait for connectivity
logger.Infow(ctx, "starting-kafka-manager", log.Fields{"address": address, "topic": coreTopic})
@@ -51,8 +52,8 @@
if err := kmp.Start(ctx); err != nil {
// We failed to start. Delay and then try again later.
// Don't worry about liveness, as we can't be live until we've started.
- probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
- logger.Infow(ctx, "error-starting-kafka-messaging-proxy", log.Fields{"error": err})
+ probe.UpdateStatusFromContext(ctx, adapterMessageBus, probe.ServiceStatusNotReady)
+ logger.Warnw(ctx, "error-starting-kafka-messaging-proxy", log.Fields{"error": err})
select {
case <-time.After(connectionRetryInterval):
case <-ctx.Done():
@@ -68,6 +69,31 @@
return kmp, nil
}
+func startEventProxy(ctx context.Context, kafkaClient kafka.Client, eventTopic string, connectionRetryInterval time.Duration) (*events.EventProxy, error) {
+ ep := events.NewEventProxy(events.MsgClient(kafkaClient), events.MsgTopic(kafka.Topic{Name: eventTopic}))
+ for {
+ if err := kafkaClient.Start(ctx); err != nil {
+ probe.UpdateStatusFromContext(ctx, clusterMessageBus, probe.ServiceStatusNotReady)
+ logger.Warnw(ctx, "failed-to-setup-kafka-connection-on-kafka-cluster-address", log.Fields{"error": err})
+ select {
+ case <-time.After(connectionRetryInterval):
+ continue
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ }
+ }
+ logger.Info(ctx, "started-connection-on-kafka-cluster-address")
+ break
+ }
+ return ep, nil
+}
+
+// KafkaProxy is the liveness interface implemented by both EventProxy and InterContainerProxy.
+type KafkaProxy interface {
+ EnableLivenessChannel(ctx context.Context, enable bool) chan bool
+ SendLiveness(ctx context.Context) error
+}
+
/*
* monitorKafkaLiveness is responsible for monitoring the Kafka Interadapter Proxy connectivity state
*
@@ -97,29 +123,29 @@
* liveProbeInterval and notLiveProbeInterval can be configured separately,
* though the current default is that both are set to 60 seconds.
*/
-func monitorKafkaLiveness(ctx context.Context, kmp kafka.InterContainerProxy, liveProbeInterval time.Duration, notLiveProbeInterval time.Duration) {
- logger.Info(ctx, "started-kafka-message-proxy")
+func monitorKafkaLiveness(ctx context.Context, kmp KafkaProxy, liveProbeInterval time.Duration, notLiveProbeInterval time.Duration, serviceName string) {
+ logger.Infow(ctx, "started-kafka-message-proxy", log.Fields{"service": serviceName})
livenessChannel := kmp.EnableLivenessChannel(ctx, true)
- logger.Info(ctx, "enabled-kafka-liveness-channel")
+ logger.Infow(ctx, "enabled-kafka-liveness-channel", log.Fields{"service": serviceName})
timeout := liveProbeInterval
for {
timeoutTimer := time.NewTimer(timeout)
select {
case liveness := <-livenessChannel:
- logger.Infow(ctx, "kafka-manager-thread-liveness-event", log.Fields{"liveness": liveness})
+ logger.Infow(ctx, "kafka-manager-thread-liveness-event", log.Fields{"liveness": liveness, "service": serviceName})
// there was a state change in Kafka liveness
if !liveness {
- probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
- logger.Info(ctx, "kafka-manager-thread-set-server-notready")
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
+ logger.Infow(ctx, "kafka-manager-thread-set-server-notready", log.Fields{"service": serviceName})
// retry frequently while life is bad
timeout = notLiveProbeInterval
} else {
- probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
- logger.Info(ctx, "kafka-manager-thread-set-server-ready")
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
+ logger.Infow(ctx, "kafka-manager-thread-set-server-ready", log.Fields{"service": serviceName})
// retry infrequently while life is good
timeout = liveProbeInterval
@@ -128,14 +154,14 @@
<-timeoutTimer.C
}
case <-timeoutTimer.C:
- logger.Info(ctx, "kafka-proxy-liveness-recheck")
+ logger.Infow(ctx, "kafka-proxy-liveness-recheck", log.Fields{"service": serviceName})
// send the liveness probe in a goroutine; we don't want to deadlock ourselves as
// the liveness probe may wait (and block) writing to our channel.
go func() {
err := kmp.SendLiveness(ctx)
if err != nil {
// Catch possible error case if sending liveness after Sarama has been stopped.
- logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err})
+ logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err, "service": serviceName})
}
}()
case <-ctx.Done():