[VOL-1862] rwCore waits for kafka and KV Store
This commit cherry-picks patch https://gerrit.opencord.org/#/c/15030/
from the voltha 2.1 branch into the master branch.
Change-Id: I8a306c8b37ad700ef8234466919e0604e14787cd
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index fca8cce..4ddea05 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -25,6 +25,9 @@
"github.com/opencord/voltha-go/rw_core/config"
"github.com/opencord/voltha-protos/go/voltha"
"google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "time"
)
type Core struct {
@@ -59,9 +62,6 @@
core.kafkaClient = kafkaClient
// Setup the KV store
- // Do not call NewBackend constructor; it creates its own KV client
- // Commented the backend for now until the issue between the model and the KV store
- // is resolved.
backend := model.Backend{
Client: kvClient,
StoreType: cf.KVStoreType,
@@ -77,10 +77,17 @@
}
func (core *Core) Start(ctx context.Context) {
- log.Info("starting-adaptercore", log.Fields{"coreId": core.instanceId})
- if err := core.startKafkaMessagingProxy(ctx); err != nil {
+ log.Info("starting-core-services", log.Fields{"coreId": core.instanceId})
+
+ // Wait until connection to KV Store is up
+ if err := core.waitUntilKVStoreReachableOrMaxTries(ctx, core.config.MaxConnectionRetries, core.config.ConnectionRetryInterval); err != nil {
+ log.Fatal("Unable-to-connect-to-KV-store")
+ }
+
+ if err := core.waitUntilKafkaMessagingProxyIsUpOrMaxTries(ctx, core.config.MaxConnectionRetries, core.config.ConnectionRetryInterval); err != nil {
log.Fatal("Failure-starting-kafkaMessagingProxy")
}
+
log.Debugw("values", log.Fields{"kmp": core.kmp})
core.deviceMgr = newDeviceManager(core)
core.adapterMgr = newAdapterManager(core.clusterDataProxy, core.instanceId, core.deviceMgr)
@@ -90,6 +97,7 @@
if err := core.registerAdapterRequestHandlers(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
log.Fatal("Failure-registering-adapterRequestHandler")
}
+
go core.startDeviceManager(ctx)
go core.startLogicalDeviceManager(ctx)
go core.startGRPCService(ctx)
@@ -99,7 +107,7 @@
core.deviceOwnership = NewDeviceOwnership(core.instanceId, core.kvClient, core.deviceMgr, core.logicalDeviceMgr,
"service/voltha/owns_device", 10)
- log.Info("adaptercore-started")
+ log.Info("core-services-started")
}
func (core *Core) Stop(ctx context.Context) {
@@ -138,7 +146,7 @@
log.Info("grpc-server-started")
}
-func (core *Core) startKafkaMessagingProxy(ctx context.Context) error {
+func (core *Core) waitUntilKafkaMessagingProxyIsUpOrMaxTries(ctx context.Context, maxRetries int, retryInterval int) error {
log.Infow("starting-kafka-messaging-proxy", log.Fields{"host": core.config.KafkaAdapterHost,
"port": core.config.KafkaAdapterPort, "topic": core.config.CoreTopic})
var err error
@@ -151,16 +159,58 @@
log.Errorw("fail-to-create-kafka-proxy", log.Fields{"error": err})
return err
}
-
- if err = core.kmp.Start(); err != nil {
- log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})
- return err
+ count := 0
+ for {
+ if err = core.kmp.Start(); err != nil {
+ log.Infow("error-starting-kafka-messaging-proxy", log.Fields{"error": err})
+ if maxRetries != -1 {
+ if count >= maxRetries {
+ return err
+ }
+ }
+ count += 1
+ log.Infow("retry-starting-kafka-messaging-proxy", log.Fields{"retryCount": count, "maxRetries": maxRetries, "retryInterval": retryInterval})
+ // Take a nap before retrying
+ time.Sleep(time.Duration(retryInterval) * time.Second)
+ } else {
+ break
+ }
}
-
log.Info("kafka-messaging-proxy-created")
return nil
}
+// waitUntilKVStoreReachableOrMaxTries polls core.kvClient.IsConnectionUp until it succeeds, sleeping retryInterval seconds between attempts; it returns a codes.Unavailable error once the initial attempt and maxRetries retries have all failed, and retries indefinitely when maxRetries is -1.
+func (core *Core) waitUntilKVStoreReachableOrMaxTries(ctx context.Context, maxRetries int, retryInterval int) error {
+ log.Infow("verifying-KV-store-connectivity", log.Fields{"host": core.config.KVStoreHost,
+ "port": core.config.KVStorePort, "retries": maxRetries, "retryInterval": retryInterval})
+ // Divide DefaultCoreTimeout by 1000 to get the per-check timeout in seconds (presumably it is configured in milliseconds - confirm), with 1 second as the minimum
+ timeout := int(core.config.DefaultCoreTimeout / 1000)
+ if timeout < 1 {
+ timeout = 1
+ }
+ count := 0
+ for {
+ if !core.kvClient.IsConnectionUp(timeout) {
+ log.Info("KV-store-unreachable")
+ if maxRetries != -1 {
+ if count >= maxRetries {
+ return status.Error(codes.Unavailable, "kv store unreachable")
+ }
+ }
+ count += 1
+ // Sleep retryInterval seconds before the next attempt. NOTE(review): ctx is not consulted here, so this wait cannot be cancelled.
+ time.Sleep(time.Duration(retryInterval) * time.Second)
+ log.Infow("retry-KV-store-connectivity", log.Fields{"retryCount": count, "maxRetries": maxRetries, "retryInterval": retryInterval})
+
+ } else {
+ break
+ }
+ }
+ log.Info("KV-store-reachable")
+ return nil
+}
+
func (core *Core) registerAdapterRequestHandlers(ctx context.Context, coreInstanceId string, dMgr *DeviceManager,
ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy,
) error {