[VOL-3885] Add a retry mechanism when connecting to the Kafka event cluster
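
The connection to the Kafka event cluster is now retried until it comes up
(or the core is stopped) instead of failing rw-core start-up on the first
error, and the cluster bus gets its own readiness/liveness probe when it
runs on a separate Kafka instance. A minimal, self-contained sketch of the
retry shape (connect() is a hypothetical stand-in for kafkaClient.Start;
the interval plays the role of the ConnectionRetryInterval flag):

    package main

    import (
        "context"
        "errors"
        "log"
        "time"
    )

    // connect stands in for kafkaClient.Start(ctx).
    func connect(ctx context.Context) error { return errors.New("broker not ready") }

    // retryUntilConnected blocks until connect succeeds or ctx is cancelled.
    func retryUntilConnected(ctx context.Context, interval time.Duration) error {
        for {
            if err := connect(ctx); err != nil {
                log.Printf("kafka connect failed, retrying in %v: %v", interval, err)
                select {
                case <-time.After(interval):
                    continue
                case <-ctx.Done():
                    return ctx.Err()
                }
            }
            return nil
        }
    }

    func main() {
        ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
        defer cancel()
        if err := retryUntilConnected(ctx, time.Second); err != nil {
            log.Printf("gave up: %v", err)
        }
    }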
Change-Id: I38267923ba006ea099b25863e98ff286efa9bbd6
diff --git a/go.mod b/go.mod
index deb76c4..021f5f3 100644
--- a/go.mod
+++ b/go.mod
@@ -6,7 +6,7 @@
github.com/gogo/protobuf v1.3.2
github.com/golang/protobuf v1.3.2
github.com/google/uuid v1.1.1
- github.com/opencord/voltha-lib-go/v4 v4.0.10
+ github.com/opencord/voltha-lib-go/v4 v4.0.11
github.com/opencord/voltha-protos/v4 v4.0.13
github.com/opentracing/opentracing-go v1.1.0
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
diff --git a/go.sum b/go.sum
index 9c5a447..4037463 100644
--- a/go.sum
+++ b/go.sum
@@ -141,8 +141,8 @@
github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I=
github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
-github.com/opencord/voltha-lib-go/v4 v4.0.10 h1:mSi9e3TD+liit5NbV+AKEJ2dza3n+DpzdExJog2LtTo=
-github.com/opencord/voltha-lib-go/v4 v4.0.10/go.mod h1:K7lDkSkJ97EyfvX8fQtBmBvpj7n6MmwnAtD8Jz79HcQ=
+github.com/opencord/voltha-lib-go/v4 v4.0.11 h1:qBG+tOzJI/Iwd00sr3rkx+OfN0zxd8+Tou7hI6qumXk=
+github.com/opencord/voltha-lib-go/v4 v4.0.11/go.mod h1:K7lDkSkJ97EyfvX8fQtBmBvpj7n6MmwnAtD8Jz79HcQ=
github.com/opencord/voltha-protos/v4 v4.0.12 h1:x8drb8inaUByjVLFbXSiQwRTU//dfde0MKIHyKb1JMw=
github.com/opencord/voltha-protos/v4 v4.0.12/go.mod h1:W/OIFIyvFh/C0vchRUuarIsMylEhzCRM9pNxLvkPtKc=
github.com/opencord/voltha-protos/v4 v4.0.13 h1:4D6jZLrNDwWC3dhRxNtTGeXMv3GKzRUaSkm0aDQwINQ=
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 4cd1e85..0b3dd94 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -26,7 +26,6 @@
"github.com/opencord/voltha-go/rw_core/core/api"
"github.com/opencord/voltha-go/rw_core/core/device"
conf "github.com/opencord/voltha-lib-go/v4/pkg/config"
- "github.com/opencord/voltha-lib-go/v4/pkg/events"
grpcserver "github.com/opencord/voltha-lib-go/v4/pkg/grpc"
"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
@@ -42,17 +41,29 @@
stopped chan struct{}
}
+const (
+ adapterMessageBus = "adapter-message-bus"
+ clusterMessageBus = "cluster-message-bus"
+)
+
// NewCore creates instance of rw core
func NewCore(ctx context.Context, id string, cf *config.RWCoreFlags) *Core {
// If the context has a probe then fetch it and register our services
if p := probe.GetProbeFromContext(ctx); p != nil {
p.RegisterService(
ctx,
- "message-bus",
+ adapterMessageBus,
"kv-store",
"adapter-manager",
"grpc-service",
)
+
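+		// The cluster bus only gets its own probe service when it is a
+		// separate Kafka instance; otherwise one probe covers both buses.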
+ if cf.KafkaAdapterAddress != cf.KafkaClusterAddress {
+ p.RegisterService(
+ ctx,
+ clusterMessageBus,
+ )
+ }
}
// new threads will be given a new cancelable context, so that they can be aborted later when Stop() is called
@@ -121,12 +132,17 @@
kafka.AutoCreateTopic(true),
kafka.MetadatMaxRetries(15),
)
+
// create event proxy
- eventProxy := events.NewEventProxy(events.MsgClient(kafkaClientEvent), events.MsgTopic(kafka.Topic{Name: cf.EventTopic}))
- if err := kafkaClientEvent.Start(ctx); err != nil {
- logger.Warn(ctx, "failed-to-setup-kafka-connection-on-kafka-cluster-address")
+ eventProxy, err := startEventProxy(ctx, kafkaClientEvent, cf.EventTopic, cf.ConnectionRetryInterval)
+ if err != nil {
+ logger.Warn(ctx, "failed-to-setup-kafka-event-proxy-connection")
return
}
+	// When a single Kafka instance serves both buses, one liveness probe is
+	// enough, so the cluster bus is only monitored when it has its own address.
+	if cf.KafkaAdapterAddress != cf.KafkaClusterAddress {
+		go monitorKafkaLiveness(ctx, eventProxy, cf.LiveProbeInterval, cf.NotLiveProbeInterval, clusterMessageBus)
+ }
defer kafkaClientEvent.Stop(ctx)
@@ -141,11 +157,11 @@
// core.kmp must be created before deviceMgr and adapterMgr
kmp, err := startKafkInterContainerProxy(ctx, kafkaClient, cf.KafkaAdapterAddress, cf.CoreTopic, cf.ConnectionRetryInterval)
if err != nil {
- logger.Warn(ctx, "failed-to-setup-kafka-connection")
+ logger.Warn(ctx, "failed-to-setup-kafka-adapter-proxy-connection")
return
}
defer kmp.Stop(ctx)
- go monitorKafkaLiveness(ctx, kmp, cf.LiveProbeInterval, cf.NotLiveProbeInterval)
+ go monitorKafkaLiveness(ctx, kmp, cf.LiveProbeInterval, cf.NotLiveProbeInterval, adapterMessageBus)
// create the core of the system, the device managers
endpointMgr := kafka.NewEndpointManager(backend)
diff --git a/rw_core/core/kafka.go b/rw_core/core/kafka.go
index f8a956d..d45a84e 100644
--- a/rw_core/core/kafka.go
+++ b/rw_core/core/kafka.go
@@ -18,6 +18,7 @@
import (
"context"
+ "github.com/opencord/voltha-lib-go/v4/pkg/events"
"time"
"github.com/opencord/voltha-go/rw_core/core/adapter"
@@ -32,7 +33,7 @@
func startKafkInterContainerProxy(ctx context.Context, kafkaClient kafka.Client, address string, coreTopic string, connectionRetryInterval time.Duration) (kafka.InterContainerProxy, error) {
logger.Infow(ctx, "initialize-kafka-manager", log.Fields{"address": address, "topic": coreTopic})
- probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPreparing)
+ probe.UpdateStatusFromContext(ctx, adapterMessageBus, probe.ServiceStatusPreparing)
// create the kafka RPC proxy
kmp := kafka.NewInterContainerProxy(
@@ -40,7 +41,7 @@
kafka.MsgClient(kafkaClient),
kafka.DefaultTopic(&kafka.Topic{Name: coreTopic}))
- probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPrepared)
+ probe.UpdateStatusFromContext(ctx, adapterMessageBus, probe.ServiceStatusPrepared)
// wait for connectivity
logger.Infow(ctx, "starting-kafka-manager", log.Fields{"address": address, "topic": coreTopic})
@@ -51,8 +52,8 @@
if err := kmp.Start(ctx); err != nil {
// We failed to start. Delay and then try again later.
// Don't worry about liveness, as we can't be live until we've started.
- probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
- logger.Infow(ctx, "error-starting-kafka-messaging-proxy", log.Fields{"error": err})
+ probe.UpdateStatusFromContext(ctx, adapterMessageBus, probe.ServiceStatusNotReady)
+ logger.Warnw(ctx, "error-starting-kafka-messaging-proxy", log.Fields{"error": err})
select {
case <-time.After(connectionRetryInterval):
case <-ctx.Done():
@@ -68,6 +69,31 @@
return kmp, nil
}
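+
+// startEventProxy creates the event proxy and retries the initial connection
+// to the cluster Kafka instance every connectionRetryInterval until it
+// succeeds or the context is cancelled.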
+func startEventProxy(ctx context.Context, kafkaClient kafka.Client, eventTopic string, connectionRetryInterval time.Duration) (*events.EventProxy, error) {
+ ep := events.NewEventProxy(events.MsgClient(kafkaClient), events.MsgTopic(kafka.Topic{Name: eventTopic}))
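+	// Only the client connection is retried; the proxy itself can be built up front.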
+ for {
+ if err := kafkaClient.Start(ctx); err != nil {
+ probe.UpdateStatusFromContext(ctx, clusterMessageBus, probe.ServiceStatusNotReady)
+ logger.Warnw(ctx, "failed-to-setup-kafka-connection-on-kafka-cluster-address", log.Fields{"error": err})
+ select {
+ case <-time.After(connectionRetryInterval):
+ continue
+ case <-ctx.Done():
+ return nil, ctx.Err()
+ }
+ }
+ logger.Info(ctx, "started-connection-on-kafka-cluster-address")
+ break
+ }
+ return ep, nil
+}
+
+// KafkaProxy is the subset of behaviour common to EventProxy and
+// kafka.InterContainerProxy that monitorKafkaLiveness needs, so a single
+// monitor goroutine can watch either message bus.
+type KafkaProxy interface {
+ EnableLivenessChannel(ctx context.Context, enable bool) chan bool
+ SendLiveness(ctx context.Context) error
+}
+
/*
- * monitorKafkaLiveness is responsible for monitoring the Kafka Interadapter Proxy connectivity state
+ * monitorKafkaLiveness is responsible for monitoring the connectivity state of a Kafka proxy
*
@@ -97,29 +123,29 @@
* liveProbeInterval and notLiveProbeInterval can be configured separately,
* though the current default is that both are set to 60 seconds.
*/
-func monitorKafkaLiveness(ctx context.Context, kmp kafka.InterContainerProxy, liveProbeInterval time.Duration, notLiveProbeInterval time.Duration) {
- logger.Info(ctx, "started-kafka-message-proxy")
+func monitorKafkaLiveness(ctx context.Context, kmp KafkaProxy, liveProbeInterval time.Duration, notLiveProbeInterval time.Duration, serviceName string) {
+ logger.Infow(ctx, "started-kafka-message-proxy", log.Fields{"service": serviceName})
livenessChannel := kmp.EnableLivenessChannel(ctx, true)
- logger.Info(ctx, "enabled-kafka-liveness-channel")
+ logger.Infow(ctx, "enabled-kafka-liveness-channel", log.Fields{"service": serviceName})
timeout := liveProbeInterval
for {
timeoutTimer := time.NewTimer(timeout)
select {
case liveness := <-livenessChannel:
- logger.Infow(ctx, "kafka-manager-thread-liveness-event", log.Fields{"liveness": liveness})
+ logger.Infow(ctx, "kafka-manager-thread-liveness-event", log.Fields{"liveness": liveness, "service": serviceName})
// there was a state change in Kafka liveness
if !liveness {
- probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusNotReady)
- logger.Info(ctx, "kafka-manager-thread-set-server-notready")
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
+ logger.Infow(ctx, "kafka-manager-thread-set-server-notready", log.Fields{"service": serviceName})
// retry frequently while life is bad
timeout = notLiveProbeInterval
} else {
- probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusRunning)
- logger.Info(ctx, "kafka-manager-thread-set-server-ready")
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
+ logger.Infow(ctx, "kafka-manager-thread-set-server-ready", log.Fields{"service": serviceName})
// retry infrequently while life is good
timeout = liveProbeInterval
@@ -128,14 +154,14 @@
<-timeoutTimer.C
}
case <-timeoutTimer.C:
- logger.Info(ctx, "kafka-proxy-liveness-recheck")
+ logger.Infow(ctx, "kafka-proxy-liveness-recheck", log.Fields{"service": serviceName})
// send the liveness probe in a goroutine; we don't want to deadlock ourselves as
// the liveness probe may wait (and block) writing to our channel.
go func() {
err := kmp.SendLiveness(ctx)
if err != nil {
// Catch possible error case if sending liveness after Sarama has been stopped.
- logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err})
+ logger.Warnw(ctx, "error-kafka-send-liveness", log.Fields{"error": err, "service": serviceName})
}
}()
case <-ctx.Done():
diff --git a/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/eventif/events_proxy_if.go b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/eventif/events_proxy_if.go
index 7418ea1..35f821f 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/eventif/events_proxy_if.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/eventif/events_proxy_if.go
@@ -29,6 +29,8 @@
subCategory EventSubCategory, raisedTs int64) error
SendRPCEvent(ctx context.Context, id string, deviceEvent *voltha.RPCEvent, category EventCategory,
subCategory *EventSubCategory, raisedTs int64) error
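+	// Liveness-probe hooks, mirroring kafka.InterContainerProxy, so the event
+	// proxy connection can be monitored the same way.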
+ EnableLivenessChannel(ctx context.Context, enable bool) chan bool
+ SendLiveness(ctx context.Context) error
}
const (
diff --git a/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/events_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/events_proxy.go
index c4014ee..2301f43 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/events_proxy.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v4/pkg/events/events_proxy.go
@@ -187,3 +187,11 @@
return nil
}
+
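+// EnableLivenessChannel enables (or disables) the liveness channel on the
+// underlying kafka client and returns it.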
+func (ep *EventProxy) EnableLivenessChannel(ctx context.Context, enable bool) chan bool {
+ return ep.kafkaClient.EnableLivenessChannel(ctx, enable)
+}
+
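+// SendLiveness sends a liveness probe through the underlying kafka client.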
+func (ep *EventProxy) SendLiveness(ctx context.Context) error {
+ return ep.kafkaClient.SendLiveness(ctx)
+}
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 93a59a8..fab3fa9 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -84,7 +84,7 @@
github.com/modern-go/concurrent
# github.com/modern-go/reflect2 v1.0.1
github.com/modern-go/reflect2
-# github.com/opencord/voltha-lib-go/v4 v4.0.10
+# github.com/opencord/voltha-lib-go/v4 v4.0.11
github.com/opencord/voltha-lib-go/v4/pkg/adapters
github.com/opencord/voltha-lib-go/v4/pkg/adapters/adapterif
github.com/opencord/voltha-lib-go/v4/pkg/adapters/common