[VOL-1862] rwCore waits for kafka and KV Store
This commit is a cherry pick into the master branch from the
voltha 2.1 branch of patch https://gerrit.opencord.org/#/c/15030/
Change-Id: I8a306c8b37ad700ef8234466919e0604e14787cd
diff --git a/db/kvstore/client.go b/db/kvstore/client.go
index 937eefe..67c9219 100644
--- a/db/kvstore/client.go
+++ b/db/kvstore/client.go
@@ -77,7 +77,7 @@
return evnt
}
-// Client represents the set of APIs a KV Client must implement
+// Client represents the set of APIs a KV Client must implement
type Client interface {
List(key string, timeout int, lock ...bool) (map[string]*KVPair, error)
Get(key string, timeout int, lock ...bool) (*KVPair, error)
@@ -90,6 +90,7 @@
Watch(key string) chan *Event
AcquireLock(lockName string, timeout int) error
ReleaseLock(lockName string) error
+ IsConnectionUp(timeout int) bool // timeout in second
CloseWatch(key string, ch chan *Event)
Close()
}
diff --git a/db/kvstore/consulclient.go b/db/kvstore/consulclient.go
index 4b25b5f..c4fa0af 100644
--- a/db/kvstore/consulclient.go
+++ b/db/kvstore/consulclient.go
@@ -63,6 +63,12 @@
return &ConsulClient{consul: consul, doneCh: &doneCh, watchedChannelsContext: wChannelsContext, keyReservations: reservations}, nil
}
+// IsConnectionUp returns whether the connection to the Consul KV store is up
+func (c *ConsulClient) IsConnectionUp(timeout int) bool {
+ log.Error("Unimplemented function")
+ return false
+}
+
// List returns an array of key-value pairs with key as a prefix. Timeout defines how long the function will
// wait for a response
func (c *ConsulClient) List(key string, timeout int, lock ...bool) (map[string]*KVPair, error) {
diff --git a/db/kvstore/etcdclient.go b/db/kvstore/etcdclient.go
index 7f6940a..f19f365 100644
--- a/db/kvstore/etcdclient.go
+++ b/db/kvstore/etcdclient.go
@@ -59,6 +59,16 @@
lockToSessionMap: lockSessionMap}, nil
}
+// IsConnectionUp returns whether the connection to the Etcd KV store is up. If a timeout occurs then
+// it is assumed the connection is down or unreachable.
+func (c *EtcdClient) IsConnectionUp(timeout int) bool {
+ // Let's try to get a non existent key. If the connection is up then there will be no error returned.
+ if _, err := c.Get("non-existent-key", timeout); err != nil {
+ return false
+ }
+ return true
+}
+
// List returns an array of key-value pairs with key as a prefix. Timeout defines how long the function will
// wait for a response
func (c *EtcdClient) List(key string, timeout int, lock ...bool) (map[string]*KVPair, error) {
diff --git a/db/model/node.go b/db/model/node.go
index fcd3b5f..c9815fa 100644
--- a/db/model/node.go
+++ b/db/model/node.go
@@ -457,7 +457,7 @@
n.mutex.Lock()
defer n.mutex.Unlock()
- log.Debugw("node-update-request", log.Fields{"path": path, "strict": strict, "txid": txid, "makeBranch": makeBranch})
+ log.Debugw("node-update-request", log.Fields{"path": path, "strict": strict, "txid": txid})
for strings.HasPrefix(path, "/") {
path = path[1:]
@@ -626,7 +626,7 @@
n.mutex.Lock()
defer n.mutex.Unlock()
- log.Debugw("node-add-request", log.Fields{"path": path, "txid": txid, "makeBranch": makeBranch})
+ log.Debugw("node-add-request", log.Fields{"path": path, "txid": txid})
for strings.HasPrefix(path, "/") {
path = path[1:]