[VOL-1862] rwCore waits for Kafka and KV Store
This commit consists of the following:
1) Make the rw_core wait for Kafka and the KV store to be up before
proceeding with its processing. Previously, rw_core bailed out if
Kafka or the KV store was not available. The number of retries
(default = -1, i.e., retry forever) and the retry interval are both
configurable.
2) Remove a variable that cannot be printed as JSON from the debug
log in the model.
Change-Id: I8d1ef90bf5d202a6f8ae59610aa780292b2844ce
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index 41d96f4..133b1a4 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -52,6 +52,8 @@
default_CoreTimeout = int64(500)
default_CoreBindingKey = "voltha_backend_name"
default_CorePairTopic = "rwcore_1"
+ default_MaxConnectionRetries = -1 // retries forever
+ default_ConnectionRetryInterval = 2 // in seconds
)
// RWCoreFlags represents the set of configurations used by the read-write core service
@@ -84,6 +86,8 @@
DefaultCoreTimeout int64
CoreBindingKey string
CorePairTopic string
+ MaxConnectionRetries int
+ ConnectionRetryInterval int
}
func init() {
@@ -120,6 +124,8 @@
DefaultCoreTimeout: default_CoreTimeout,
CoreBindingKey: default_CoreBindingKey,
CorePairTopic: default_CorePairTopic,
+ MaxConnectionRetries: default_MaxConnectionRetries,
+ ConnectionRetryInterval: default_ConnectionRetryInterval,
}
return &rwCoreFlag
}
@@ -201,5 +207,11 @@
help = fmt.Sprintf("Core pairing group topic")
flag.StringVar(&cf.CorePairTopic, "core_pair_topic", default_CorePairTopic, help)
+ help = fmt.Sprintf("The number of retries to connect to a dependent component")
+ flag.IntVar(&(cf.MaxConnectionRetries), "max_connection_retries", default_MaxConnectionRetries, help)
+
+ help = fmt.Sprintf("The number of seconds between each connection retry attempt ")
+ flag.IntVar(&(cf.ConnectionRetryInterval), "connection_retry_interval", default_ConnectionRetryInterval, help)
+
flag.Parse()
}
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index fca8cce..4ddea05 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -25,6 +25,9 @@
"github.com/opencord/voltha-go/rw_core/config"
"github.com/opencord/voltha-protos/go/voltha"
"google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "time"
)
type Core struct {
@@ -59,9 +62,6 @@
core.kafkaClient = kafkaClient
// Setup the KV store
- // Do not call NewBackend constructor; it creates its own KV client
- // Commented the backend for now until the issue between the model and the KV store
- // is resolved.
backend := model.Backend{
Client: kvClient,
StoreType: cf.KVStoreType,
@@ -77,10 +77,17 @@
}
func (core *Core) Start(ctx context.Context) {
- log.Info("starting-adaptercore", log.Fields{"coreId": core.instanceId})
- if err := core.startKafkaMessagingProxy(ctx); err != nil {
+ log.Info("starting-core-services", log.Fields{"coreId": core.instanceId})
+
+ // Wait until connection to KV Store is up
+ if err := core.waitUntilKVStoreReachableOrMaxTries(ctx, core.config.MaxConnectionRetries, core.config.ConnectionRetryInterval); err != nil {
+ log.Fatal("Unable-to-connect-to-KV-store")
+ }
+
+ if err := core.waitUntilKafkaMessagingProxyIsUpOrMaxTries(ctx, core.config.MaxConnectionRetries, core.config.ConnectionRetryInterval); err != nil {
log.Fatal("Failure-starting-kafkaMessagingProxy")
}
+
log.Debugw("values", log.Fields{"kmp": core.kmp})
core.deviceMgr = newDeviceManager(core)
core.adapterMgr = newAdapterManager(core.clusterDataProxy, core.instanceId, core.deviceMgr)
@@ -90,6 +97,7 @@
if err := core.registerAdapterRequestHandlers(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
log.Fatal("Failure-registering-adapterRequestHandler")
}
+
go core.startDeviceManager(ctx)
go core.startLogicalDeviceManager(ctx)
go core.startGRPCService(ctx)
@@ -99,7 +107,7 @@
core.deviceOwnership = NewDeviceOwnership(core.instanceId, core.kvClient, core.deviceMgr, core.logicalDeviceMgr,
"service/voltha/owns_device", 10)
- log.Info("adaptercore-started")
+ log.Info("core-services-started")
}
func (core *Core) Stop(ctx context.Context) {
@@ -138,7 +146,7 @@
log.Info("grpc-server-started")
}
-func (core *Core) startKafkaMessagingProxy(ctx context.Context) error {
+func (core *Core) waitUntilKafkaMessagingProxyIsUpOrMaxTries(ctx context.Context, maxRetries int, retryInterval int) error {
log.Infow("starting-kafka-messaging-proxy", log.Fields{"host": core.config.KafkaAdapterHost,
"port": core.config.KafkaAdapterPort, "topic": core.config.CoreTopic})
var err error
@@ -151,16 +159,58 @@
log.Errorw("fail-to-create-kafka-proxy", log.Fields{"error": err})
return err
}
-
- if err = core.kmp.Start(); err != nil {
- log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})
- return err
+ count := 0
+ for {
+ if err = core.kmp.Start(); err != nil {
+ log.Infow("error-starting-kafka-messaging-proxy", log.Fields{"error": err})
+ if maxRetries != -1 {
+ if count >= maxRetries {
+ return err
+ }
+ }
+ count += 1
+ log.Infow("retry-starting-kafka-messaging-proxy", log.Fields{"retryCount": count, "maxRetries": maxRetries, "retryInterval": retryInterval})
+ // Take a nap before retrying
+ time.Sleep(time.Duration(retryInterval) * time.Second)
+ } else {
+ break
+ }
}
-
log.Info("kafka-messaging-proxy-created")
return nil
}
+// waitUntilKVStoreReachableOrMaxTries blocks until core.kvClient.IsConnectionUp succeeds, sleeping retryInterval seconds between attempts; it returns a codes.Unavailable error once the initial attempt and maxRetries retries have all failed (maxRetries == -1 retries forever). ctx is currently unused.
+func (core *Core) waitUntilKVStoreReachableOrMaxTries(ctx context.Context, maxRetries int, retryInterval int) error {
+ log.Infow("verifying-KV-store-connectivity", log.Fields{"host": core.config.KVStoreHost,
+ "port": core.config.KVStorePort, "retries": maxRetries, "retryInterval": retryInterval})
+ // Integer-divide DefaultCoreTimeout by 1000 (presumably milliseconds to seconds -- confirm) and clamp the result to at least 1
+ timeout := int(core.config.DefaultCoreTimeout / 1000)
+ if timeout < 1 {
+ timeout = 1
+ }
+ count := 0
+ for {
+ if !core.kvClient.IsConnectionUp(timeout) {
+ log.Info("KV-store-unreachable")
+ if maxRetries != -1 {
+ if count >= maxRetries {
+ return status.Error(codes.Unavailable, "kv store unreachable")
+ }
+ }
+ count += 1
+ // Pause for retryInterval seconds before the next connectivity check
+ time.Sleep(time.Duration(retryInterval) * time.Second)
+ log.Infow("retry-KV-store-connectivity", log.Fields{"retryCount": count, "maxRetries": maxRetries, "retryInterval": retryInterval})
+
+ } else {
+ break
+ }
+ }
+ log.Info("KV-store-reachable")
+ return nil
+}
+
func (core *Core) registerAdapterRequestHandlers(ctx context.Context, coreInstanceId string, dMgr *DeviceManager,
ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy,
) error {