[VOL-3981] Voltha Core restarts if it can't subscribe to Kafka
This commit fixes the following issues:
1) It adds a loop that retries the Kafka subscription, with exponential
backoff, on error. The subscription failure occurs randomly, especially
when the Kafka broker is up and running but not yet ready to create a
new topic.
2) Fixes an issue where starting the event proxy incorrectly set the
cluster message bus probe to out of service and never set it back to
running. This caused the Core to wait forever for the probe to become
ready.
Change-Id: Idf22481f85e4b576440301f2859da7ddf2d8c688
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index 2b5017e..6510964 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -24,104 +24,113 @@
// RW Core service default constants
const (
- EtcdStoreName = "etcd"
- defaultGrpcAddress = ":50057"
- defaultKafkaAdapterAddress = "127.0.0.1:9092"
- defaultKafkaClusterAddress = "127.0.0.1:9094"
- defaultKVStoreType = EtcdStoreName
- defaultKVStoreTimeout = 5 * time.Second
- defaultKVStoreAddress = "127.0.0.1:2379" // Etcd = 2379
- defaultKVTxnKeyDelTime = 60
- defaultLogLevel = "WARN"
- defaultBanner = false
- defaultDisplayVersionOnly = false
- defaultCoreTopic = "rwcore"
- defaultEventTopic = "voltha.events"
- defaultRWCoreEndpoint = "rwcore"
- defaultRWCoreKey = "pki/voltha.key"
- defaultRWCoreCert = "pki/voltha.crt"
- defaultRWCoreCA = "pki/voltha-CA.pem"
- defaultLongRunningRequestTimeout = 2000 * time.Millisecond
- defaultDefaultRequestTimeout = 1000 * time.Millisecond
- defaultCoreTimeout = 1000 * time.Millisecond
- defaultCoreBindingKey = "voltha_backend_name"
- defaultMaxConnectionRetries = -1 // retries forever
- defaultConnectionRetryInterval = 2 * time.Second
- defaultLiveProbeInterval = 60 * time.Second
- defaultNotLiveProbeInterval = 5 * time.Second // Probe more frequently when not alive
- defaultProbeAddress = ":8080"
- defaultTraceEnabled = false
- defaultTraceAgentAddress = "127.0.0.1:6831"
- defaultLogCorrelationEnabled = true
- defaultVolthaStackID = "voltha"
+ EtcdStoreName = "etcd"
+ defaultGrpcAddress = ":50057"
+ defaultKafkaAdapterAddress = "127.0.0.1:9092"
+ defaultKafkaClusterAddress = "127.0.0.1:9094"
+ defaultKVStoreType = EtcdStoreName
+ defaultKVStoreTimeout = 5 * time.Second
+ defaultKVStoreAddress = "127.0.0.1:2379" // Etcd = 2379
+ defaultKVTxnKeyDelTime = 60
+ defaultLogLevel = "WARN"
+ defaultBanner = false
+ defaultDisplayVersionOnly = false
+ defaultCoreTopic = "rwcore"
+ defaultEventTopic = "voltha.events"
+ defaultRWCoreEndpoint = "rwcore"
+ defaultRWCoreKey = "pki/voltha.key"
+ defaultRWCoreCert = "pki/voltha.crt"
+ defaultRWCoreCA = "pki/voltha-CA.pem"
+ defaultLongRunningRequestTimeout = 2000 * time.Millisecond
+ defaultDefaultRequestTimeout = 1000 * time.Millisecond
+ defaultCoreTimeout = 1000 * time.Millisecond
+ defaultCoreBindingKey = "voltha_backend_name"
+ defaultMaxConnectionRetries = -1 // retries forever
+ defaultConnectionRetryInterval = 2 * time.Second
+ defaultLiveProbeInterval = 60 * time.Second
+ defaultNotLiveProbeInterval = 5 * time.Second // Probe more frequently when not alive
+ defaultProbeAddress = ":8080"
+ defaultTraceEnabled = false
+ defaultTraceAgentAddress = "127.0.0.1:6831"
+ defaultLogCorrelationEnabled = true
+ defaultVolthaStackID = "voltha"
+ defaultBackoffRetryInitialInterval = 500 * time.Millisecond // first wait before retrying a failed Kafka subscription
+ defaultBackoffRetryMaxElapsedTime = 0 // 0 = no limit: the backoff never returns Stop, so retries continue indefinitely
+ defaultBackoffRetryMaxInterval = 1 * time.Minute // upper bound on any single backoff wait
)
// RWCoreFlags represents the set of configurations used by the read-write core service
type RWCoreFlags struct {
// Command line parameters
- RWCoreEndpoint string
- GrpcAddress string
- KafkaAdapterAddress string
- KafkaClusterAddress string
- KVStoreType string
- KVStoreTimeout time.Duration
- KVStoreAddress string
- KVTxnKeyDelTime int
- CoreTopic string
- EventTopic string
- LogLevel string
- Banner bool
- DisplayVersionOnly bool
- RWCoreKey string
- RWCoreCert string
- RWCoreCA string
- LongRunningRequestTimeout time.Duration
- DefaultRequestTimeout time.Duration
- DefaultCoreTimeout time.Duration
- CoreBindingKey string
- MaxConnectionRetries int
- ConnectionRetryInterval time.Duration
- LiveProbeInterval time.Duration
- NotLiveProbeInterval time.Duration
- ProbeAddress string
- TraceEnabled bool
- TraceAgentAddress string
- LogCorrelationEnabled bool
- VolthaStackID string
+ RWCoreEndpoint string
+ GrpcAddress string
+ KafkaAdapterAddress string
+ KafkaClusterAddress string
+ KVStoreType string
+ KVStoreTimeout time.Duration
+ KVStoreAddress string
+ KVTxnKeyDelTime int
+ CoreTopic string
+ EventTopic string
+ LogLevel string
+ Banner bool
+ DisplayVersionOnly bool
+ RWCoreKey string
+ RWCoreCert string
+ RWCoreCA string
+ LongRunningRequestTimeout time.Duration
+ DefaultRequestTimeout time.Duration
+ DefaultCoreTimeout time.Duration
+ CoreBindingKey string
+ MaxConnectionRetries int
+ ConnectionRetryInterval time.Duration
+ LiveProbeInterval time.Duration
+ NotLiveProbeInterval time.Duration
+ ProbeAddress string
+ TraceEnabled bool
+ TraceAgentAddress string
+ LogCorrelationEnabled bool
+ VolthaStackID string
+ BackoffRetryInitialInterval time.Duration // first exponential-backoff wait; flag backoff_retry_initial_interval
+ BackoffRetryMaxElapsedTime time.Duration // total backoff time before Stop; 0 means no limit; flag backoff_retry_max_elapsed_time
+ BackoffRetryMaxInterval time.Duration // cap on a single backoff wait; flag backoff_retry_max_interval
}
// NewRWCoreFlags returns a new RWCore config
func NewRWCoreFlags() *RWCoreFlags {
var rwCoreFlag = RWCoreFlags{ // Default values
- RWCoreEndpoint: defaultRWCoreEndpoint,
- GrpcAddress: defaultGrpcAddress,
- KafkaAdapterAddress: defaultKafkaAdapterAddress,
- KafkaClusterAddress: defaultKafkaClusterAddress,
- KVStoreType: defaultKVStoreType,
- KVStoreTimeout: defaultKVStoreTimeout,
- KVStoreAddress: defaultKVStoreAddress,
- KVTxnKeyDelTime: defaultKVTxnKeyDelTime,
- CoreTopic: defaultCoreTopic,
- EventTopic: defaultEventTopic,
- LogLevel: defaultLogLevel,
- Banner: defaultBanner,
- DisplayVersionOnly: defaultDisplayVersionOnly,
- RWCoreKey: defaultRWCoreKey,
- RWCoreCert: defaultRWCoreCert,
- RWCoreCA: defaultRWCoreCA,
- DefaultRequestTimeout: defaultDefaultRequestTimeout,
- LongRunningRequestTimeout: defaultLongRunningRequestTimeout,
- DefaultCoreTimeout: defaultCoreTimeout,
- CoreBindingKey: defaultCoreBindingKey,
- MaxConnectionRetries: defaultMaxConnectionRetries,
- ConnectionRetryInterval: defaultConnectionRetryInterval,
- LiveProbeInterval: defaultLiveProbeInterval,
- NotLiveProbeInterval: defaultNotLiveProbeInterval,
- ProbeAddress: defaultProbeAddress,
- TraceEnabled: defaultTraceEnabled,
- TraceAgentAddress: defaultTraceAgentAddress,
- LogCorrelationEnabled: defaultLogCorrelationEnabled,
- VolthaStackID: defaultVolthaStackID,
+ RWCoreEndpoint: defaultRWCoreEndpoint,
+ GrpcAddress: defaultGrpcAddress,
+ KafkaAdapterAddress: defaultKafkaAdapterAddress,
+ KafkaClusterAddress: defaultKafkaClusterAddress,
+ KVStoreType: defaultKVStoreType,
+ KVStoreTimeout: defaultKVStoreTimeout,
+ KVStoreAddress: defaultKVStoreAddress,
+ KVTxnKeyDelTime: defaultKVTxnKeyDelTime,
+ CoreTopic: defaultCoreTopic,
+ EventTopic: defaultEventTopic,
+ LogLevel: defaultLogLevel,
+ Banner: defaultBanner,
+ DisplayVersionOnly: defaultDisplayVersionOnly,
+ RWCoreKey: defaultRWCoreKey,
+ RWCoreCert: defaultRWCoreCert,
+ RWCoreCA: defaultRWCoreCA,
+ DefaultRequestTimeout: defaultDefaultRequestTimeout,
+ LongRunningRequestTimeout: defaultLongRunningRequestTimeout,
+ DefaultCoreTimeout: defaultCoreTimeout,
+ CoreBindingKey: defaultCoreBindingKey,
+ MaxConnectionRetries: defaultMaxConnectionRetries,
+ ConnectionRetryInterval: defaultConnectionRetryInterval,
+ LiveProbeInterval: defaultLiveProbeInterval,
+ NotLiveProbeInterval: defaultNotLiveProbeInterval,
+ ProbeAddress: defaultProbeAddress,
+ TraceEnabled: defaultTraceEnabled,
+ TraceAgentAddress: defaultTraceAgentAddress,
+ LogCorrelationEnabled: defaultLogCorrelationEnabled,
+ VolthaStackID: defaultVolthaStackID,
+ BackoffRetryInitialInterval: defaultBackoffRetryInitialInterval,
+ BackoffRetryMaxElapsedTime: defaultBackoffRetryMaxElapsedTime,
+ BackoffRetryMaxInterval: defaultBackoffRetryMaxInterval,
}
return &rwCoreFlag
}
@@ -209,5 +218,14 @@
help = fmt.Sprintf("ID for the current voltha stack")
flag.StringVar(&cf.VolthaStackID, "stack_id", defaultVolthaStackID, help)
+ help = "Initial wait before the first exponential-backoff retry, as a duration (e.g. 500ms)"
+ flag.DurationVar(&(cf.BackoffRetryInitialInterval), "backoff_retry_initial_interval", defaultBackoffRetryInitialInterval, help)
+
+ help = "Maximum total elapsed time of an exponential backoff, as a duration (e.g. 10m); 0 means no limit"
+ flag.DurationVar(&(cf.BackoffRetryMaxElapsedTime), "backoff_retry_max_elapsed_time", defaultBackoffRetryMaxElapsedTime, help)
+
+ help = "Maximum wait between exponential-backoff retries, as a duration (e.g. 1m)"
+ flag.DurationVar(&(cf.BackoffRetryMaxInterval), "backoff_retry_max_interval", defaultBackoffRetryMaxInterval, help)
+
flag.Parse()
}
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 459c2fa..4b864de 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -56,6 +56,7 @@
"kv-store",
"adapter-manager",
"grpc-service",
+ "adapter-request-handler", // same name is passed to registerAdapterRequestHandlers below, which updates this service's probe status
)
if cf.KafkaAdapterAddress != cf.KafkaClusterAddress {
@@ -134,7 +135,8 @@
)
// create event proxy
- eventProxy, err := startEventProxy(ctx, kafkaClientEvent, cf.EventTopic, cf.ConnectionRetryInterval)
+ updateProbeClusterService := cf.KafkaAdapterAddress != cf.KafkaClusterAddress // NOTE(review): mirrors the check above; presumably clusterMessageBus is only a probe service in that case — confirm
+ eventProxy, err := startEventProxy(ctx, kafkaClientEvent, cf.EventTopic, cf.ConnectionRetryInterval, updateProbeClusterService)
if err != nil {
logger.Warn(ctx, "failed-to-setup-kafka-event-proxy-connection")
return
@@ -168,7 +170,7 @@
deviceMgr, logicalDeviceMgr := device.NewManagers(dbPath, adapterMgr, kmp, endpointMgr, cf.CoreTopic, id, cf.DefaultCoreTimeout, eventProxy, cf.VolthaStackID)
// register kafka RPC handler
- registerAdapterRequestHandlers(ctx, kmp, deviceMgr, adapterMgr, cf.CoreTopic)
+ registerAdapterRequestHandlers(ctx, kmp, deviceMgr, adapterMgr, cf, "adapter-request-handler") // blocks until subscribed or ctx is done
// start gRPC handler
grpcServer := grpcserver.NewGrpcServer(cf.GrpcAddress, nil, false, probe.GetProbeFromContext(ctx))
diff --git a/rw_core/core/kafka.go b/rw_core/core/kafka.go
index d45a84e..ae33090 100644
--- a/rw_core/core/kafka.go
+++ b/rw_core/core/kafka.go
@@ -18,7 +18,10 @@
import (
"context"
+ "github.com/cenkalti/backoff/v3"
+ "github.com/opencord/voltha-go/rw_core/config"
"github.com/opencord/voltha-lib-go/v4/pkg/events"
+
"time"
"github.com/opencord/voltha-go/rw_core/core/adapter"
@@ -69,11 +72,13 @@
return kmp, nil
}
-func startEventProxy(ctx context.Context, kafkaClient kafka.Client, eventTopic string, connectionRetryInterval time.Duration) (*events.EventProxy, error) {
+func startEventProxy(ctx context.Context, kafkaClient kafka.Client, eventTopic string, connectionRetryInterval time.Duration, updateProbeService bool) (*events.EventProxy, error) {
ep := events.NewEventProxy(events.MsgClient(kafkaClient), events.MsgTopic(kafka.Topic{Name: eventTopic}))
for {
if err := kafkaClient.Start(ctx); err != nil {
- probe.UpdateStatusFromContext(ctx, clusterMessageBus, probe.ServiceStatusNotReady)
+ if updateProbeService { // only touch the clusterMessageBus probe when the caller asks for it
+ probe.UpdateStatusFromContext(ctx, clusterMessageBus, probe.ServiceStatusNotReady)
+ }
logger.Warnw(ctx, "failed-to-setup-kafka-connection-on-kafka-cluster-address", log.Fields{"error": err})
select {
case <-time.After(connectionRetryInterval):
@@ -82,6 +87,9 @@
return nil, ctx.Err()
}
}
+ if updateProbeService { // connection established: report the clusterMessageBus probe as running
+ probe.UpdateStatusFromContext(ctx, clusterMessageBus, probe.ServiceStatusRunning)
+ }
logger.Info(ctx, "started-connection-on-kafka-cluster-address")
break
}
@@ -170,13 +178,52 @@
}
}
-func registerAdapterRequestHandlers(ctx context.Context, kmp kafka.InterContainerProxy, dMgr *device.Manager, aMgr *adapter.Manager, coreTopic string) {
+// registerAdapterRequestHandlers subscribes the adapter request handler to cf.CoreTopic,
+// retrying with exponential backoff (tuned by the cf.BackoffRetry* settings) until the
+// subscription succeeds or ctx is cancelled. The serviceName probe is reported NotReady
+// while retrying and Running once subscribed. The call blocks until one of those outcomes.
+func registerAdapterRequestHandlers(ctx context.Context, kmp kafka.InterContainerProxy, dMgr *device.Manager,
+ aMgr *adapter.Manager, cf *config.RWCoreFlags, serviceName string) {
+ logger.Infow(ctx, "registering-request-handler", log.Fields{"topic": cf.CoreTopic})
+
+ // Set the exponential backoff params
+ kafkaRetryBackoff := backoff.NewExponentialBackOff()
+ kafkaRetryBackoff.InitialInterval = cf.BackoffRetryInitialInterval
+ kafkaRetryBackoff.MaxElapsedTime = cf.BackoffRetryMaxElapsedTime
+ kafkaRetryBackoff.MaxInterval = cf.BackoffRetryMaxInterval
+
+ // A zero-duration timer makes the first subscription attempt happen immediately.
+ // The same timer is re-armed with Reset after each failure, so one deferred Stop releases it on exit.
+ backoffTimer := time.NewTimer(0)
+ defer backoffTimer.Stop()
+
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusNotReady)
requestProxy := api.NewAdapterRequestHandlerProxy(dMgr, aMgr)
-
- // Register the broadcast topic to handle any core-bound broadcast requests
- if err := kmp.SubscribeWithRequestHandlerInterface(ctx, kafka.Topic{Name: coreTopic}, requestProxy); err != nil {
- logger.Fatalw(ctx, "Failed-registering-broadcast-handler", log.Fields{"topic": coreTopic})
+ for {
+ select {
+ case <-backoffTimer.C:
+ // Register the broadcast topic to handle any core-bound broadcast requests
+ err := kmp.SubscribeWithRequestHandlerInterface(ctx, kafka.Topic{Name: cf.CoreTopic}, requestProxy)
+ if err == nil {
+ logger.Infow(ctx, "request-handler-registered", log.Fields{"topic": cf.CoreTopic})
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
+ return
+ }
+ logger.Errorw(ctx, "failed-registering-broadcast-handler-retrying", log.Fields{"topic": cf.CoreTopic, "error": err})
+ duration := kafkaRetryBackoff.NextBackOff()
+ // With the default MaxElapsedTime of 0 the backoff never returns Stop; this only triggers when a limit is configured.
+ if duration == backoff.Stop {
+ // If we reach a maximum then warn and reset the backoff timer and keep attempting.
+ logger.Warnw(ctx, "maximum-kafka-retry-backoff-reached-resetting",
+ log.Fields{"max-kafka-retry-backoff": kafkaRetryBackoff.MaxElapsedTime})
+ kafkaRetryBackoff.Reset()
+ duration = kafkaRetryBackoff.NextBackOff()
+ }
+ // The timer has fired and its channel was drained by this case, so Reset is safe.
+ backoffTimer.Reset(duration)
+ case <-ctx.Done():
+ logger.Infow(ctx, "context-closed", log.Fields{"topic": cf.CoreTopic})
+ return
+ }
}
-
- logger.Info(ctx, "request-handler-registered")
}