VOL-1497 : Add more control to kv/memory access
- Added kv locking mechanism (etcd only)
- (watch) control path access whenever possible
- (watch) use a transaction for updates and merge with memory
- cleaned up vendoring
- misc changes to fix exceptions found along the way
Amendments:
- Copyright header got removed in auto-generated file
- Changed default locking to false for KV list operation
- Updated backend api to allow the passing of locking parameter
Change-Id: Ie1a55d3ca8b9d92ae71a85ce42bb22fcf1419e2c
diff --git a/db/kvstore/client.go b/db/kvstore/client.go
index a7cbf2b..a8e6311 100644
--- a/db/kvstore/client.go
+++ b/db/kvstore/client.go
@@ -75,10 +75,10 @@
// Client represents the set of APIs a KV Client must implement
type Client interface {
- List(key string, timeout int) (map[string]*KVPair, error)
- Get(key string, timeout int) (*KVPair, error)
- Put(key string, value interface{}, timeout int) error
- Delete(key string, timeout int) error
+ List(key string, timeout int, lock ...bool) (map[string]*KVPair, error)
+ Get(key string, timeout int, lock ...bool) (*KVPair, error)
+ Put(key string, value interface{}, timeout int, lock ...bool) error
+ Delete(key string, timeout int, lock ...bool) error
Reserve(key string, value interface{}, ttl int64) (interface{}, error)
ReleaseReservation(key string) error
ReleaseAllReservations() error
diff --git a/db/kvstore/consulclient.go b/db/kvstore/consulclient.go
index e4f6baf..a5c71ac 100644
--- a/db/kvstore/consulclient.go
+++ b/db/kvstore/consulclient.go
@@ -65,7 +65,7 @@
// List returns an array of key-value pairs with key as a prefix. Timeout defines how long the function will
// wait for a response
-func (c *ConsulClient) List(key string, timeout int) (map[string]*KVPair, error) {
+func (c *ConsulClient) List(key string, timeout int, lock ...bool) (map[string]*KVPair, error) {
duration := GetDuration(timeout)
kv := c.consul.KV()
@@ -86,7 +86,7 @@
// Get returns a key-value pair for a given key. Timeout defines how long the function will
// wait for a response
-func (c *ConsulClient) Get(key string, timeout int) (*KVPair, error) {
+func (c *ConsulClient) Get(key string, timeout int, lock ...bool) (*KVPair, error) {
duration := GetDuration(timeout)
@@ -109,7 +109,7 @@
// Put writes a key-value pair to the KV store. Value can only be a string or []byte since the consul API
// accepts only a []byte as a value for a put operation. Timeout defines how long the function will
// wait for a response
-func (c *ConsulClient) Put(key string, value interface{}, timeout int) error {
+func (c *ConsulClient) Put(key string, value interface{}, timeout int, lock ...bool) error {
// Validate that we can create a byte array from the value as consul API expects a byte array
var val []byte
@@ -135,7 +135,7 @@
// Delete removes a key from the KV store. Timeout defines how long the function will
// wait for a response
-func (c *ConsulClient) Delete(key string, timeout int) error {
+func (c *ConsulClient) Delete(key string, timeout int, lock ...bool) error {
kv := c.consul.KV()
var writeOptions consulapi.WriteOptions
c.writeLock.Lock()
diff --git a/db/kvstore/etcdclient.go b/db/kvstore/etcdclient.go
index 490a477..9ecddca 100644
--- a/db/kvstore/etcdclient.go
+++ b/db/kvstore/etcdclient.go
@@ -16,14 +16,12 @@
package kvstore
import (
- //log "../common"
"context"
"errors"
"fmt"
- //v3Client "github.com/coreos/etcd/clientv3"
- //v3rpcTypes "github.com/coreos/etcd/etcdserver/api/v3rpc/rpctypes"
- log "github.com/opencord/voltha-go/common/log"
+ "github.com/opencord/voltha-go/common/log"
v3Client "go.etcd.io/etcd/clientv3"
+ v3Concurrency "go.etcd.io/etcd/clientv3/concurrency"
v3rpcTypes "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
"sync"
)
@@ -39,7 +37,6 @@
// NewEtcdClient returns a new client for the Etcd KV store
func NewEtcdClient(addr string, timeout int) (*EtcdClient, error) {
-
duration := GetDuration(timeout)
c, err := v3Client.New(v3Client.Config{
@@ -52,15 +49,26 @@
}
wc := make(map[string][]map[chan *Event]v3Client.Watcher)
reservations := make(map[string]*v3Client.LeaseID)
+
return &EtcdClient{ectdAPI: c, watchedChannels: wc, keyReservations: reservations}, nil
}
// List returns an array of key-value pairs with key as a prefix. Timeout defines how long the function will
// wait for a response
-func (c *EtcdClient) List(key string, timeout int) (map[string]*KVPair, error) {
+func (c *EtcdClient) List(key string, timeout int, lock ...bool) (map[string]*KVPair, error) {
duration := GetDuration(timeout)
ctx, cancel := context.WithTimeout(context.Background(), duration)
+
+ // DO NOT lock by default; otherwise lock per instructed value
+ if len(lock) > 0 && lock[0] {
+ session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
+ mu := v3Concurrency.NewMutex(session, "/lock" + key)
+ mu.Lock(context.Background())
+ defer mu.Unlock(context.Background())
+ defer session.Close()
+ }
+
resp, err := c.ectdAPI.Get(ctx, key, v3Client.WithPrefix())
cancel()
if err != nil {
@@ -76,10 +84,20 @@
// Get returns a key-value pair for a given key. Timeout defines how long the function will
// wait for a response
-func (c *EtcdClient) Get(key string, timeout int) (*KVPair, error) {
+func (c *EtcdClient) Get(key string, timeout int, lock ...bool) (*KVPair, error) {
duration := GetDuration(timeout)
ctx, cancel := context.WithTimeout(context.Background(), duration)
+
+ // Lock by default; otherwise lock per instructed value
+ if len(lock) == 0 || lock[0] {
+ session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
+ mu := v3Concurrency.NewMutex(session, "/lock" + key)
+ mu.Lock(context.Background())
+ defer mu.Unlock(context.Background())
+ defer session.Close()
+ }
+
resp, err := c.ectdAPI.Get(ctx, key)
cancel()
if err != nil {
@@ -96,7 +114,7 @@
// Put writes a key-value pair to the KV store. Value can only be a string or []byte since the etcd API
// accepts only a string as a value for a put operation. Timeout defines how long the function will
// wait for a response
-func (c *EtcdClient) Put(key string, value interface{}, timeout int) error {
+func (c *EtcdClient) Put(key string, value interface{}, timeout int, lock ...bool) error {
// Validate that we can convert value to a string as etcd API expects a string
var val string
@@ -108,6 +126,16 @@
duration := GetDuration(timeout)
ctx, cancel := context.WithTimeout(context.Background(), duration)
+
+ // Lock by default; otherwise lock per instructed value
+ if len(lock) == 0 || lock[0] {
+ session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
+ mu := v3Concurrency.NewMutex(session, "/lock" + key)
+ mu.Lock(context.Background())
+ defer mu.Unlock(context.Background())
+ defer session.Close()
+ }
+
c.writeLock.Lock()
defer c.writeLock.Unlock()
_, err := c.ectdAPI.Put(ctx, key, val)
@@ -130,11 +158,21 @@
// Delete removes a key from the KV store. Timeout defines how long the function will
// wait for a response
-func (c *EtcdClient) Delete(key string, timeout int) error {
+func (c *EtcdClient) Delete(key string, timeout int, lock ...bool) error {
duration := GetDuration(timeout)
ctx, cancel := context.WithTimeout(context.Background(), duration)
+
+ // Lock by default; otherwise lock per instructed value
+ if len(lock) == 0 || lock[0] {
+ session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
+ mu := v3Concurrency.NewMutex(session, "/lock" + key)
+ mu.Lock(context.Background())
+ defer mu.Unlock(context.Background())
+ defer session.Close()
+ }
+
defer cancel()
c.writeLock.Lock()
@@ -228,7 +266,7 @@
}
} else {
// Read the Key to ensure this is our Key
- m, err := c.Get(key, defaultKVGetTimeout)
+ m, err := c.Get(key, defaultKVGetTimeout, false)
if err != nil {
return nil, err
}