[VOL-3885] Adding retry mechanism when connecting to the kafka event cluster
Change-Id: I38267923ba006ea099b25863e98ff286efa9bbd6
diff --git a/rw_core/core/kafka.go b/rw_core/core/kafka.go
index f8a956d..d45a84e 100644
--- a/rw_core/core/kafka.go
+++ b/rw_core/core/kafka.go
@@ -18,6 +18,7 @@
import (
"context"
+ "github.com/opencord/voltha-lib-go/v4/pkg/events"
"time"
"github.com/opencord/voltha-go/rw_core/core/adapter"
@@ -32,7 +33,7 @@
func startKafkInterContainerProxy(ctx context.Context, kafkaClient kafka.Client, address string, coreTopic string, connectionRetryInterval time.Duration) (kafka.InterContainerProxy, error) {
logger.Infow(ctx, "initialize-kafka-manager", log.Fields{"address": address, "topic": coreTopic})
- probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPreparing)
+ probe.UpdateStatusFromContext(ctx, adapterMessageBus, probe.ServiceStatusPreparing)
// create the kafka RPC proxy
kmp := kafka.NewInterContainerProxy(
@@ -40,7 +41,7 @@
kafka.MsgClient(kafkaClient),
kafka.DefaultTopic(&kafka.Topic{Name: coreTopic}))
- probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPrepared)
+ probe.UpdateStatusFromContext(ctx, adapterMessageBus, probe.ServiceStatusPrepared)
// wait for connectivity
logger.Infow(ctx, "starting-kafka-manager", log.Fields{"address": address, "topic": coreTopic})
@@ -51,8 +52,8 @@
if err := kmp.Start(ctx); err != nil {
// We failed to start. Delay and then try again later.
// Don't worry about liveness, as we can't be live until we've started.
- probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
- logger.Infow(ctx, "error-starting-kafka-messaging-proxy", log.Fields{"error": err})
+ probe.UpdateStatusFromContext(ctx, adapterMessageBus, probe.ServiceStatusNotReady)
+ logger.Warnw(ctx, "error-starting-kafka-messaging-proxy", log.Fields{"error": err})
select {
case <-time.After(connectionRetryInterval):
case <-ctx.Done():
@@ -68,6 +69,31 @@
return kmp, nil
}
+// startEventProxy creates an EventProxy for eventTopic, then retries kafkaClient.Start every connectionRetryInterval until it succeeds or ctx is done.
+func startEventProxy(ctx context.Context, kafkaClient kafka.Client, eventTopic string, connectionRetryInterval time.Duration) (*events.EventProxy, error) {
+	proxy := events.NewEventProxy(events.MsgClient(kafkaClient), events.MsgTopic(kafka.Topic{Name: eventTopic}))
+	for {
+		startErr := kafkaClient.Start(ctx)
+		if startErr == nil {
+			logger.Info(ctx, "started-connection-on-kafka-cluster-address")
+			return proxy, nil
+		}
+		probe.UpdateStatusFromContext(ctx, clusterMessageBus, probe.ServiceStatusNotReady)
+		logger.Warnw(ctx, "failed-to-setup-kafka-connection-on-kafka-cluster-address", log.Fields{"error": startErr})
+		select {
+		case <-time.After(connectionRetryInterval): // wait out the interval, then loop for another attempt
+		case <-ctx.Done():
+			return nil, ctx.Err()
+		}
+	}
+}
+
+// KafkaProxy is the liveness-related method set valid for both EventProxy and InterContainerProxy, so monitorKafkaLiveness can watch either one.
+type KafkaProxy interface {
+ EnableLivenessChannel(ctx context.Context, enable bool) chan bool
+ SendLiveness(ctx context.Context) error
+}
+
/*
* monitorKafkaLiveness is responsible for monitoring the Kafka Interadapter Proxy connectivity state
*
@@ -97,29 +123,29 @@
* liveProbeInterval and notLiveProbeInterval can be configured separately,
* though the current default is that both are set to 60 seconds.
*/
-func monitorKafkaLiveness(ctx context.Context, kmp kafka.InterContainerProxy, liveProbeInterval time.Duration, notLiveProbeInterval time.Duration) {
- logger.Info(ctx, "started-kafka-message-proxy")
+func monitorKafkaLiveness(ctx context.Context, kmp KafkaProxy, liveProbeInterval time.Duration, notLiveProbeInterval time.Duration, serviceName string) {
+ logger.Infow(ctx, "started-kafka-message-proxy", log.Fields{"service": serviceName})
livenessChannel := kmp.EnableLivenessChannel(ctx, true)
- logger.Info(ctx, "enabled-kafka-liveness-channel")
+ logger.Infow(ctx, "enabled-kafka-liveness-channel", log.Fields{"service": serviceName})
timeout := liveProbeInterval
for {
timeoutTimer := time.NewTimer(timeout)
select {
case liveness := <-livenessChannel:
- logger.Infow(ctx, "kafka-manager-thread-liveness-event", log.Fields{"liveness": liveness})
+ logger.Infow(ctx, "kafka-manager-thread-liveness-event", log.Fields{"liveness": liveness, "service": serviceName})
// there was a state change in Kafka liveness
if !liveness {
- probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
- logger.Info(ctx, "kafka-manager-thread-set-server-notready")
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
+ logger.Infow(ctx, "kafka-manager-thread-set-server-notready", log.Fields{"service": serviceName})
// retry frequently while life is bad
timeout = notLiveProbeInterval
} else {
- probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
- logger.Info(ctx, "kafka-manager-thread-set-server-ready")
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
+ logger.Infow(ctx, "kafka-manager-thread-set-server-ready", log.Fields{"service": serviceName})
// retry infrequently while life is good
timeout = liveProbeInterval
@@ -128,14 +154,14 @@
<-timeoutTimer.C
}
case <-timeoutTimer.C:
- logger.Info(ctx, "kafka-proxy-liveness-recheck")
+ logger.Infow(ctx, "kafka-proxy-liveness-recheck", log.Fields{"service": serviceName})
// send the liveness probe in a goroutine; we don't want to deadlock ourselves as
// the liveness probe may wait (and block) writing to our channel.
go func() {
err := kmp.SendLiveness(ctx)
if err != nil {
// Catch possible error case if sending liveness after Sarama has been stopped.
- logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err})
+ logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err, "service": serviceName})
}
}()
case <-ctx.Done():