[VOL-1862] rwCore waits for Kafka and KV Store
This commit consists of the following:
1) Make the rw_core wait for Kafka and the KV Store to be up before
proceeding with its processing. Previously, rw_core bailed out
if Kafka or the KV Store was not available. The number of retries
(default=-1, i.e., retry forever) and the retry interval are both
configurable.
2) Remove a variable that cannot be printed as JSON from the debug
log in the model.
Change-Id: I8d1ef90bf5d202a6f8ae59610aa780292b2844ce
diff --git a/db/kvstore/client.go b/db/kvstore/client.go
index 937eefe..67c9219 100644
--- a/db/kvstore/client.go
+++ b/db/kvstore/client.go
@@ -77,7 +77,7 @@
return evnt
}
-// Client represents the set of APIs a KV Client must implement
+// Client represents the set of APIs a KV Client must implement
type Client interface {
List(key string, timeout int, lock ...bool) (map[string]*KVPair, error)
Get(key string, timeout int, lock ...bool) (*KVPair, error)
@@ -90,6 +90,7 @@
Watch(key string) chan *Event
AcquireLock(lockName string, timeout int) error
ReleaseLock(lockName string) error
+ IsConnectionUp(timeout int) bool // timeout in seconds
CloseWatch(key string, ch chan *Event)
Close()
}
diff --git a/db/kvstore/consulclient.go b/db/kvstore/consulclient.go
index 4b25b5f..c4fa0af 100644
--- a/db/kvstore/consulclient.go
+++ b/db/kvstore/consulclient.go
@@ -63,6 +63,12 @@
return &ConsulClient{consul: consul, doneCh: &doneCh, watchedChannelsContext: wChannelsContext, keyReservations: reservations}, nil
}
+// IsConnectionUp is not implemented for Consul yet: it logs an error and always returns false.
+func (c *ConsulClient) IsConnectionUp(timeout int) bool {
+ log.Error("Unimplemented function")
+ return false
+}
+
// List returns an array of key-value pairs with key as a prefix. Timeout defines how long the function will
// wait for a response
func (c *ConsulClient) List(key string, timeout int, lock ...bool) (map[string]*KVPair, error) {
diff --git a/db/kvstore/etcdclient.go b/db/kvstore/etcdclient.go
index 7f6940a..f19f365 100644
--- a/db/kvstore/etcdclient.go
+++ b/db/kvstore/etcdclient.go
@@ -59,6 +59,16 @@
lockToSessionMap: lockSessionMap}, nil
}
+// IsConnectionUp reports whether the Etcd KV store answers a Get within timeout (seconds, per the
+// Client interface). Any error from Get, a timeout included, is reported as the connection being down.
+func (c *EtcdClient) IsConnectionUp(timeout int) bool {
+ // Probe with a key that is not expected to exist; per the original author, Get returns no error when the connection is up.
+ if _, err := c.Get("non-existent-key", timeout); err != nil {
+ return false
+ }
+ return true
+}
+
// List returns an array of key-value pairs with key as a prefix. Timeout defines how long the function will
// wait for a response
func (c *EtcdClient) List(key string, timeout int, lock ...bool) (map[string]*KVPair, error) {