[VOL-1862] rwCore waits for Kafka and KV Store
This commit consists of the following:
1) Make the rw_core wait for kafka and KV Store to be up before
proceeding with its processing. Previously, rw_core would have
bailed out if kafka/KV was not available. The number of retries
(default=-1, i.e retry forever) and retry interval are both
configurable.
2) Remove a non-json printable variable from the debug log in the
model.
Change-Id: I8d1ef90bf5d202a6f8ae59610aa780292b2844ce
diff --git a/db/kvstore/client.go b/db/kvstore/client.go
index 937eefe..67c9219 100644
--- a/db/kvstore/client.go
+++ b/db/kvstore/client.go
@@ -77,7 +77,7 @@
return evnt
}
-// Client represents the set of APIs a KV Client must implement
+// Client represents the set of APIs a KV Client must implement
type Client interface {
List(key string, timeout int, lock ...bool) (map[string]*KVPair, error)
Get(key string, timeout int, lock ...bool) (*KVPair, error)
@@ -90,6 +90,7 @@
Watch(key string) chan *Event
AcquireLock(lockName string, timeout int) error
ReleaseLock(lockName string) error
+ IsConnectionUp(timeout int) bool // timeout in second
CloseWatch(key string, ch chan *Event)
Close()
}
diff --git a/db/kvstore/consulclient.go b/db/kvstore/consulclient.go
index 4b25b5f..c4fa0af 100644
--- a/db/kvstore/consulclient.go
+++ b/db/kvstore/consulclient.go
@@ -63,6 +63,12 @@
return &ConsulClient{consul: consul, doneCh: &doneCh, watchedChannelsContext: wChannelsContext, keyReservations: reservations}, nil
}
+// IsConnectionUp returns whether the connection to the Consul KV store is up
+func (c *ConsulClient) IsConnectionUp(timeout int) bool {
+ log.Error("Unimplemented function")
+ return false
+}
+
// List returns an array of key-value pairs with key as a prefix. Timeout defines how long the function will
// wait for a response
func (c *ConsulClient) List(key string, timeout int, lock ...bool) (map[string]*KVPair, error) {
diff --git a/db/kvstore/etcdclient.go b/db/kvstore/etcdclient.go
index 7f6940a..f19f365 100644
--- a/db/kvstore/etcdclient.go
+++ b/db/kvstore/etcdclient.go
@@ -59,6 +59,16 @@
lockToSessionMap: lockSessionMap}, nil
}
+// IsConnectionUp returns whether the connection to the Etcd KV store is up. If a timeout occurs then
+// it is assumed the connection is down or unreachable.
+func (c *EtcdClient) IsConnectionUp(timeout int) bool {
+ // Let's try to get a non existent key. If the connection is up then there will be no error returned.
+ if _, err := c.Get("non-existent-key", timeout); err != nil {
+ return false
+ }
+ return true
+}
+
// List returns an array of key-value pairs with key as a prefix. Timeout defines how long the function will
// wait for a response
func (c *EtcdClient) List(key string, timeout int, lock ...bool) (map[string]*KVPair, error) {
diff --git a/db/model/node.go b/db/model/node.go
index fcd3b5f..c9815fa 100644
--- a/db/model/node.go
+++ b/db/model/node.go
@@ -457,7 +457,7 @@
n.mutex.Lock()
defer n.mutex.Unlock()
- log.Debugw("node-update-request", log.Fields{"path": path, "strict": strict, "txid": txid, "makeBranch": makeBranch})
+ log.Debugw("node-update-request", log.Fields{"path": path, "strict": strict, "txid": txid})
for strings.HasPrefix(path, "/") {
path = path[1:]
@@ -626,7 +626,7 @@
n.mutex.Lock()
defer n.mutex.Unlock()
- log.Debugw("node-add-request", log.Fields{"path": path, "txid": txid, "makeBranch": makeBranch})
+ log.Debugw("node-add-request", log.Fields{"path": path, "txid": txid})
for strings.HasPrefix(path, "/") {
path = path[1:]
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index 0576da9..8037002 100755
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -228,6 +228,13 @@
var err error
+ // Add a cleanup in case of failure to startup
+ defer func() {
+ if err != nil {
+ sc.Stop()
+ }
+ }()
+
// Create the Cluster Admin
if err = sc.createClusterAdmin(); err != nil {
log.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index 41d96f4..133b1a4 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -52,6 +52,8 @@
default_CoreTimeout = int64(500)
default_CoreBindingKey = "voltha_backend_name"
default_CorePairTopic = "rwcore_1"
+ default_MaxConnectionRetries = -1 // retries forever
+ default_ConnectionRetryInterval = 2 // in seconds
)
// RWCoreFlags represents the set of configurations used by the read-write core service
@@ -84,6 +86,8 @@
DefaultCoreTimeout int64
CoreBindingKey string
CorePairTopic string
+ MaxConnectionRetries int
+ ConnectionRetryInterval int
}
func init() {
@@ -120,6 +124,8 @@
DefaultCoreTimeout: default_CoreTimeout,
CoreBindingKey: default_CoreBindingKey,
CorePairTopic: default_CorePairTopic,
+ MaxConnectionRetries: default_MaxConnectionRetries,
+ ConnectionRetryInterval: default_ConnectionRetryInterval,
}
return &rwCoreFlag
}
@@ -201,5 +207,11 @@
help = fmt.Sprintf("Core pairing group topic")
flag.StringVar(&cf.CorePairTopic, "core_pair_topic", default_CorePairTopic, help)
+ help = fmt.Sprintf("The number of retries to connect to a dependent component")
+ flag.IntVar(&(cf.MaxConnectionRetries), "max_connection_retries", default_MaxConnectionRetries, help)
+
+ help = fmt.Sprintf("The number of seconds between each connection retry attempt ")
+ flag.IntVar(&(cf.ConnectionRetryInterval), "connection_retry_interval", default_ConnectionRetryInterval, help)
+
flag.Parse()
}
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index fca8cce..4ddea05 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -25,6 +25,9 @@
"github.com/opencord/voltha-go/rw_core/config"
"github.com/opencord/voltha-protos/go/voltha"
"google.golang.org/grpc"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "time"
)
type Core struct {
@@ -59,9 +62,6 @@
core.kafkaClient = kafkaClient
// Setup the KV store
- // Do not call NewBackend constructor; it creates its own KV client
- // Commented the backend for now until the issue between the model and the KV store
- // is resolved.
backend := model.Backend{
Client: kvClient,
StoreType: cf.KVStoreType,
@@ -77,10 +77,17 @@
}
func (core *Core) Start(ctx context.Context) {
- log.Info("starting-adaptercore", log.Fields{"coreId": core.instanceId})
- if err := core.startKafkaMessagingProxy(ctx); err != nil {
+ log.Info("starting-core-services", log.Fields{"coreId": core.instanceId})
+
+ // Wait until connection to KV Store is up
+ if err := core.waitUntilKVStoreReachableOrMaxTries(ctx, core.config.MaxConnectionRetries, core.config.ConnectionRetryInterval); err != nil {
+ log.Fatal("Unable-to-connect-to-KV-store")
+ }
+
+ if err := core.waitUntilKafkaMessagingProxyIsUpOrMaxTries(ctx, core.config.MaxConnectionRetries, core.config.ConnectionRetryInterval); err != nil {
log.Fatal("Failure-starting-kafkaMessagingProxy")
}
+
log.Debugw("values", log.Fields{"kmp": core.kmp})
core.deviceMgr = newDeviceManager(core)
core.adapterMgr = newAdapterManager(core.clusterDataProxy, core.instanceId, core.deviceMgr)
@@ -90,6 +97,7 @@
if err := core.registerAdapterRequestHandlers(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.adapterMgr, core.clusterDataProxy, core.localDataProxy); err != nil {
log.Fatal("Failure-registering-adapterRequestHandler")
}
+
go core.startDeviceManager(ctx)
go core.startLogicalDeviceManager(ctx)
go core.startGRPCService(ctx)
@@ -99,7 +107,7 @@
core.deviceOwnership = NewDeviceOwnership(core.instanceId, core.kvClient, core.deviceMgr, core.logicalDeviceMgr,
"service/voltha/owns_device", 10)
- log.Info("adaptercore-started")
+ log.Info("core-services-started")
}
func (core *Core) Stop(ctx context.Context) {
@@ -138,7 +146,7 @@
log.Info("grpc-server-started")
}
-func (core *Core) startKafkaMessagingProxy(ctx context.Context) error {
+func (core *Core) waitUntilKafkaMessagingProxyIsUpOrMaxTries(ctx context.Context, maxRetries int, retryInterval int) error {
log.Infow("starting-kafka-messaging-proxy", log.Fields{"host": core.config.KafkaAdapterHost,
"port": core.config.KafkaAdapterPort, "topic": core.config.CoreTopic})
var err error
@@ -151,16 +159,58 @@
log.Errorw("fail-to-create-kafka-proxy", log.Fields{"error": err})
return err
}
-
- if err = core.kmp.Start(); err != nil {
- log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})
- return err
+ count := 0
+ for {
+ if err = core.kmp.Start(); err != nil {
+ log.Infow("error-starting-kafka-messaging-proxy", log.Fields{"error": err})
+ if maxRetries != -1 {
+ if count >= maxRetries {
+ return err
+ }
+ }
+ count += 1
+ log.Infow("retry-starting-kafka-messaging-proxy", log.Fields{"retryCount": count, "maxRetries": maxRetries, "retryInterval": retryInterval})
+ // Take a nap before retrying
+ time.Sleep(time.Duration(retryInterval) * time.Second)
+ } else {
+ break
+ }
}
-
log.Info("kafka-messaging-proxy-created")
return nil
}
+// waitUntilKVStoreReachableOrMaxTries will wait until it can connect to a KV store or until maxtries has been reached
+func (core *Core) waitUntilKVStoreReachableOrMaxTries(ctx context.Context, maxRetries int, retryInterval int) error {
+ log.Infow("verifying-KV-store-connectivity", log.Fields{"host": core.config.KVStoreHost,
+ "port": core.config.KVStorePort, "retries": maxRetries, "retryInterval": retryInterval})
+ // Get timeout in seconds with 1 second set as minimum
+ timeout := int(core.config.DefaultCoreTimeout / 1000)
+ if timeout < 1 {
+ timeout = 1
+ }
+ count := 0
+ for {
+ if !core.kvClient.IsConnectionUp(timeout) {
+ log.Info("KV-store-unreachable")
+ if maxRetries != -1 {
+ if count >= maxRetries {
+ return status.Error(codes.Unavailable, "kv store unreachable")
+ }
+ }
+ count += 1
+ // Take a nap before retrying
+ time.Sleep(time.Duration(retryInterval) * time.Second)
+ log.Infow("retry-KV-store-connectivity", log.Fields{"retryCount": count, "maxRetries": maxRetries, "retryInterval": retryInterval})
+
+ } else {
+ break
+ }
+ }
+ log.Info("KV-store-reachable")
+ return nil
+}
+
func (core *Core) registerAdapterRequestHandlers(ctx context.Context, coreInstanceId string, dMgr *DeviceManager,
ldMgr *LogicalDeviceManager, aMgr *AdapterManager, cdProxy *model.Proxy, ldProxy *model.Proxy,
) error {