[VOL-4194] Add an Etcd Client Pool

This commit adds an Etcd client pool with a configurable capacity
as well as a maximum number of concurrent requests per client. While
tests were run locally, they did not cover all components because of
dependencies that need to be merged first. Once those dependencies
are merged, this change can be fully tested.
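
A minimal configuration sketch (the endpoint, sizes, and log level below
are illustrative only, not part of this change):

    // Assumes context, os, time, kvstore and log are imported.
    os.Setenv("VOLTHA_ETCD_CLIENT_POOL_CAPACITY", "50") // default: 1000
    os.Setenv("VOLTHA_ETCD_CLIENT_MAX_USAGE", "20")     // default: 100
    client, err := kvstore.NewEtcdClient(ctx, "localhost:2379", time.Second, log.ErrorLevel)
    if err != nil {
        // handle the error
    }
    defer client.Close(ctx)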

Change-Id: I7e8c8953bd3871056a721de68990e3d85df8b688
diff --git a/VERSION b/VERSION
index 6b244dc..a1ef0ca 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-5.0.1
+5.0.2
diff --git a/pkg/db/kvstore/client.go b/pkg/db/kvstore/client.go
index b35f1f3..e4b1fff 100644
--- a/pkg/db/kvstore/client.go
+++ b/pkg/db/kvstore/client.go
@@ -79,14 +79,17 @@
 	Put(ctx context.Context, key string, value interface{}) error
 	Delete(ctx context.Context, key string) error
 	DeleteWithPrefix(ctx context.Context, prefixKey string) error
+	Watch(ctx context.Context, key string, withPrefix bool) chan *Event
+	IsConnectionUp(ctx context.Context) bool // timeout is handled through the passed ctx
+	CloseWatch(ctx context.Context, key string, ch chan *Event)
+	Close(ctx context.Context)
+
+	// These APIs are no longer used.  They will be removed in the Voltha 2.9 release.
+	// They are kept for now to limit the changes required across all components.
 	Reserve(ctx context.Context, key string, value interface{}, ttl time.Duration) (interface{}, error)
 	ReleaseReservation(ctx context.Context, key string) error
 	ReleaseAllReservations(ctx context.Context) error
 	RenewReservation(ctx context.Context, key string) error
-	Watch(ctx context.Context, key string, withPrefix bool) chan *Event
 	AcquireLock(ctx context.Context, lockName string, timeout time.Duration) error
 	ReleaseLock(lockName string) error
-	IsConnectionUp(ctx context.Context) bool // timeout in second
-	CloseWatch(ctx context.Context, key string, ch chan *Event)
-	Close(ctx context.Context)
 }
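
For reference, a caller-side sketch of the watch flow that remains fully
supported (the key below is illustrative):

    // Watch a key prefix and consume events until the watch is closed.
    events := client.Watch(ctx, "service/voltha/devices", true)
    go func() {
        for evt := range events {
            fmt.Printf("kv event: %+v\n", evt)
        }
    }()
    // ... later, when updates are no longer needed:
    client.CloseWatch(ctx, "service/voltha/devices", events)
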
diff --git a/pkg/db/kvstore/etcdclient.go b/pkg/db/kvstore/etcdclient.go
index c2a38c6..96ffc2f 100644
--- a/pkg/db/kvstore/etcdclient.go
+++ b/pkg/db/kvstore/etcdclient.go
@@ -19,54 +19,75 @@
 	"context"
 	"errors"
 	"fmt"
-	"sync"
-	"time"
-
 	"github.com/opencord/voltha-lib-go/v5/pkg/log"
 	v3Client "go.etcd.io/etcd/clientv3"
-
-	v3Concurrency "go.etcd.io/etcd/clientv3/concurrency"
 	v3rpcTypes "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
+	"os"
+	"strconv"
+	"sync"
+	"time"
+)
+
+const (
+	poolCapacityEnvName = "VOLTHA_ETCD_CLIENT_POOL_CAPACITY"
+	maxUsageEnvName     = "VOLTHA_ETCD_CLIENT_MAX_USAGE"
+)
+
+const (
+	defaultMaxPoolCapacity = 1000 // Default size of an Etcd Client pool
+	defaultMaxPoolUsage    = 100  // Maximum concurrent requests an Etcd Client is allowed to process
 )
 
 // EtcdClient represents the Etcd KV store client
 type EtcdClient struct {
-	ectdAPI             *v3Client.Client
-	keyReservations     map[string]*v3Client.LeaseID
-	watchedChannels     sync.Map
-	keyReservationsLock sync.RWMutex
-	lockToMutexMap      map[string]*v3Concurrency.Mutex
-	lockToSessionMap    map[string]*v3Concurrency.Session
-	lockToMutexLock     sync.Mutex
+	pool               EtcdClientAllocator
+	watchedChannels    sync.Map
+	watchedClients     map[string]*v3Client.Client
+	watchedClientsLock sync.RWMutex
 }
 
 // NewEtcdCustomClient returns a new client for the Etcd KV store allowing
 // the caller to specify the etcd client configuration
-func NewEtcdCustomClient(ctx context.Context, config *v3Client.Config) (*EtcdClient, error) {
-	c, err := v3Client.New(*config)
-	if err != nil {
-		logger.Error(ctx, err)
-		return nil, err
+func NewEtcdCustomClient(ctx context.Context, addr string, timeout time.Duration, level log.LogLevel) (*EtcdClient, error) {
+	// Get the capacity and max usage from the environment
+	capacity := defaultMaxPoolCapacity
+	maxUsage := defaultMaxPoolUsage
+	if capacityStr, present := os.LookupEnv(poolCapacityEnvName); present {
+		if val, err := strconv.Atoi(capacityStr); err == nil {
+			capacity = val
+			logger.Infow(ctx, "env-variable-set", log.Fields{"pool-capacity": capacity})
+		} else {
+			logger.Warnw(ctx, "invalid-capacity-value", log.Fields{"error": err, "capacity": capacityStr})
+		}
+	}
+	if maxUsageStr, present := os.LookupEnv(maxUsageEnvName); present {
+		if val, err := strconv.Atoi(maxUsageStr); err == nil {
+			maxUsage = val
+			logger.Infow(ctx, "env-variable-set", log.Fields{"max-usage": maxUsage})
+		} else {
+			logger.Warnw(ctx, "invalid-max-usage-value", log.Fields{"error": err, "max-usage": maxUsageStr})
+		}
 	}
 
-	reservations := make(map[string]*v3Client.LeaseID)
-	lockMutexMap := make(map[string]*v3Concurrency.Mutex)
-	lockSessionMap := make(map[string]*v3Concurrency.Session)
+	var err error
 
-	return &EtcdClient{ectdAPI: c, keyReservations: reservations, lockToMutexMap: lockMutexMap,
-		lockToSessionMap: lockSessionMap}, nil
+	pool, err := NewRoundRobinEtcdClientAllocator([]string{addr}, timeout, capacity, maxUsage, level)
+	if err != nil {
+		logger.Errorw(ctx, "failed-to-create-rr-client", log.Fields{
+			"error": err,
+		})
+		return nil, err
+	}
+
+	logger.Infow(ctx, "etcd-pool-created", log.Fields{"capacity": capacity, "max-usage": maxUsage})
+
+	return &EtcdClient{pool: pool,
+		watchedClients: make(map[string]*v3Client.Client),
+	}, nil
 }
 
 // NewEtcdClient returns a new client for the Etcd KV store
 func NewEtcdClient(ctx context.Context, addr string, timeout time.Duration, level log.LogLevel) (*EtcdClient, error) {
-	logconfig := log.ConstructZapConfig(log.JSON, level, log.Fields{})
-
-	return NewEtcdCustomClient(
-		ctx,
-		&v3Client.Config{
-			Endpoints:   []string{addr},
-			DialTimeout: timeout,
-			LogConfig:   &logconfig})
+	return NewEtcdCustomClient(ctx, addr, timeout, level)
 }
 
 // IsConnectionUp returns whether the connection to the Etcd KV store is up.  If a timeout occurs then
@@ -76,14 +97,19 @@
 	if _, err := c.Get(ctx, "non-existent-key"); err != nil {
 		return false
 	}
-	//cancel()
 	return true
 }
 
 // List returns an array of key-value pairs with key as a prefix.  Timeout defines how long the function will
 // wait for a response
 func (c *EtcdClient) List(ctx context.Context, key string) (map[string]*KVPair, error) {
-	resp, err := c.ectdAPI.Get(ctx, key, v3Client.WithPrefix())
+	client, err := c.pool.Get(ctx)
+	if err != nil {
+		return nil, err
+	}
+	defer c.pool.Put(client)
+	resp, err := client.Get(ctx, key, v3Client.WithPrefix())
 	if err != nil {
 		logger.Error(ctx, err)
 		return nil, err
@@ -98,11 +124,17 @@
 // Get returns a key-value pair for a given key. Timeout defines how long the function will
 // wait for a response
 func (c *EtcdClient) Get(ctx context.Context, key string) (*KVPair, error) {
+	client, err := c.pool.Get(ctx)
+	if err != nil {
+		return nil, err
+	}
+	defer c.pool.Put(client)
 
 	attempt := 0
 startLoop:
 	for {
-		resp, err := c.ectdAPI.Get(ctx, key)
+		resp, err := client.Get(ctx, key)
 		if err != nil {
 			switch err {
 			case context.Canceled:
@@ -145,25 +177,21 @@
 
 	// Validate that we can convert value to a string as etcd API expects a string
 	var val string
-	var er error
-	if val, er = ToString(value); er != nil {
+	var err error
+	if val, err = ToString(value); err != nil {
 		return fmt.Errorf("unexpected-type-%T", value)
 	}
 
-	// Check if there is already a lease for this key - if there is then use it, otherwise a PUT will make
-	// that KV key permanent instead of automatically removing it after a lease expiration
-	c.keyReservationsLock.RLock()
-	leaseID, ok := c.keyReservations[key]
-	c.keyReservationsLock.RUnlock()
+	client, err := c.pool.Get(ctx)
+	if err != nil {
+		return err
+	}
+	defer c.pool.Put(client)
+
 	attempt := 0
 startLoop:
 	for {
-		var err error
-		if ok {
-			_, err = c.ectdAPI.Put(ctx, key, val, v3Client.WithLease(*leaseID))
-		} else {
-			_, err = c.ectdAPI.Put(ctx, key, val)
-		}
+		_, err = client.Put(ctx, key, val)
 		if err != nil {
 			switch err {
 			case context.Canceled:
@@ -197,11 +225,16 @@
 // Delete removes a key from the KV store. Timeout defines how long the function will
 // wait for a response
 func (c *EtcdClient) Delete(ctx context.Context, key string) error {
+	client, err := c.pool.Get(ctx)
+	if err != nil {
+		return err
+	}
+	defer c.pool.Put(client)
 
 	attempt := 0
 startLoop:
 	for {
-		_, err := c.ectdAPI.Delete(ctx, key)
+		_, err = client.Delete(ctx, key)
 		if err != nil {
 			switch err {
 			case context.Canceled:
@@ -235,8 +268,14 @@
 
 func (c *EtcdClient) DeleteWithPrefix(ctx context.Context, prefixKey string) error {
 
+	client, err := c.pool.Get(ctx)
+	if err != nil {
+		return err
+	}
+	defer c.pool.Put(client)
+
 	//delete the prefix
-	if _, err := c.ectdAPI.Delete(ctx, prefixKey, v3Client.WithPrefix()); err != nil {
+	if _, err := client.Delete(ctx, prefixKey, v3Client.WithPrefix()); err != nil {
 		logger.Errorw(ctx, "failed-to-delete-prefix-key", log.Fields{"key": prefixKey, "error": err})
 		return err
 	}
@@ -244,150 +283,25 @@
 	return nil
 }
 
-// Reserve is invoked to acquire a key and set it to a given value. Value can only be a string or []byte since
-// the etcd API accepts only a string.  Timeout defines how long the function will wait for a response.  TTL
-// defines how long that reservation is valid.  When TTL expires the key is unreserved by the KV store itself.
-// If the key is acquired then the value returned will be the value passed in.  If the key is already acquired
-// then the value assigned to that key will be returned.
-func (c *EtcdClient) Reserve(ctx context.Context, key string, value interface{}, ttl time.Duration) (interface{}, error) {
-	// Validate that we can convert value to a string as etcd API expects a string
-	var val string
-	var er error
-	if val, er = ToString(value); er != nil {
-		return nil, fmt.Errorf("unexpected-type%T", value)
-	}
-
-	resp, err := c.ectdAPI.Grant(ctx, int64(ttl.Seconds()))
-	if err != nil {
-		logger.Error(ctx, err)
-		return nil, err
-	}
-	// Register the lease id
-	c.keyReservationsLock.Lock()
-	c.keyReservations[key] = &resp.ID
-	c.keyReservationsLock.Unlock()
-
-	// Revoke lease if reservation is not successful
-	reservationSuccessful := false
-	defer func() {
-		if !reservationSuccessful {
-			if err = c.ReleaseReservation(context.Background(), key); err != nil {
-				logger.Error(ctx, "cannot-release-lease")
-			}
-		}
-	}()
-
-	// Try to grap the Key with the above lease
-	c.ectdAPI.Txn(context.Background())
-	txn := c.ectdAPI.Txn(context.Background())
-	txn = txn.If(v3Client.Compare(v3Client.Version(key), "=", 0))
-	txn = txn.Then(v3Client.OpPut(key, val, v3Client.WithLease(resp.ID)))
-	txn = txn.Else(v3Client.OpGet(key))
-	result, er := txn.Commit()
-	if er != nil {
-		return nil, er
-	}
-
-	if !result.Succeeded {
-		// Verify whether we are already the owner of that Key
-		if len(result.Responses) > 0 &&
-			len(result.Responses[0].GetResponseRange().Kvs) > 0 {
-			kv := result.Responses[0].GetResponseRange().Kvs[0]
-			if string(kv.Value) == val {
-				reservationSuccessful = true
-				return value, nil
-			}
-			return kv.Value, nil
-		}
-	} else {
-		// Read the Key to ensure this is our Key
-		m, err := c.Get(ctx, key)
-		if err != nil {
-			return nil, err
-		}
-		if m != nil {
-			if m.Key == key && isEqual(m.Value, value) {
-				// My reservation is successful - register it.  For now, support is only for 1 reservation per key
-				// per session.
-				reservationSuccessful = true
-				return value, nil
-			}
-			// My reservation has failed.  Return the owner of that key
-			return m.Value, nil
-		}
-	}
-	return nil, nil
-}
-
-// ReleaseAllReservations releases all key reservations previously made (using Reserve API)
-func (c *EtcdClient) ReleaseAllReservations(ctx context.Context) error {
-	c.keyReservationsLock.Lock()
-	defer c.keyReservationsLock.Unlock()
-
-	for key, leaseID := range c.keyReservations {
-		_, err := c.ectdAPI.Revoke(ctx, *leaseID)
-		if err != nil {
-			logger.Errorw(ctx, "cannot-release-reservation", log.Fields{"key": key, "error": err})
-			return err
-		}
-		delete(c.keyReservations, key)
-	}
-	return nil
-}
-
-// ReleaseReservation releases reservation for a specific key.
-func (c *EtcdClient) ReleaseReservation(ctx context.Context, key string) error {
-	// Get the leaseid using the key
-	logger.Debugw(ctx, "Release-reservation", log.Fields{"key": key})
-	var ok bool
-	var leaseID *v3Client.LeaseID
-	c.keyReservationsLock.Lock()
-	defer c.keyReservationsLock.Unlock()
-	if leaseID, ok = c.keyReservations[key]; !ok {
-		return nil
-	}
-
-	if leaseID != nil {
-		_, err := c.ectdAPI.Revoke(ctx, *leaseID)
-		if err != nil {
-			logger.Error(ctx, err)
-			return err
-		}
-		delete(c.keyReservations, key)
-	}
-	return nil
-}
-
-// RenewReservation renews a reservation.  A reservation will go stale after the specified TTL (Time To Live)
-// period specified when reserving the key
-func (c *EtcdClient) RenewReservation(ctx context.Context, key string) error {
-	// Get the leaseid using the key
-	var ok bool
-	var leaseID *v3Client.LeaseID
-	c.keyReservationsLock.RLock()
-	leaseID, ok = c.keyReservations[key]
-	c.keyReservationsLock.RUnlock()
-
-	if !ok {
-		return errors.New("key-not-reserved")
-	}
-
-	if leaseID != nil {
-		_, err := c.ectdAPI.KeepAliveOnce(ctx, *leaseID)
-		if err != nil {
-			logger.Errorw(ctx, "lease-may-have-expired", log.Fields{"error": err})
-			return err
-		}
-	} else {
-		return errors.New("lease-expired")
-	}
-	return nil
-}
-
 // Watch provides the watch capability on a given key.  It returns a channel onto which the caller needs to
 // listen to receive Events.
 func (c *EtcdClient) Watch(ctx context.Context, key string, withPrefix bool) chan *Event {
-	w := v3Client.NewWatcher(c.ectdAPI)
+	var err error
+	// Reuse the Etcd client when multiple callers are watching the same key.
+	c.watchedClientsLock.Lock()
+	client, exist := c.watchedClients[key]
+	if !exist {
+		client, err = c.pool.Get(ctx)
+		if err != nil {
+			logger.Errorw(ctx, "failed-to-get-an-etcd-client", log.Fields{"key": key, "error": err})
+			c.watchedClientsLock.Unlock()
+			return nil
+		}
+		c.watchedClients[key] = client
+	}
+	c.watchedClientsLock.Unlock()
+
+	w := v3Client.NewWatcher(client)
 	ctx, cancel := context.WithCancel(ctx)
 	var channel v3Client.WatchChan
 	if withPrefix {
@@ -402,7 +316,6 @@
 	// Keep track of the created channels so they can be closed when required
 	channelMap := make(map[chan *Event]v3Client.Watcher)
 	channelMap[ch] = w
-
 	channelMaps := c.addChannelMap(key, channelMap)
 
 	// Changing the log field (from channelMaps) as the underlying logger cannot format the map of channels into a
@@ -484,6 +397,17 @@
 	if pos >= 0 {
 		channelMaps = c.removeChannelMap(key, pos)
 	}
+
+	// If no channels are left watching this key then return the Etcd client to the pool
+	if len(channelMaps) == 0 {
+		c.watchedClientsLock.Lock()
+		// Sanity check
+		if client, ok := c.watchedClients[key]; ok {
+			c.pool.Put(client)
+			delete(c.watchedClients, key)
+		}
+		c.watchedClientsLock.Unlock()
+	}
 	logger.Infow(ctx, "watcher-channel-exiting", log.Fields{"key": key, "channel": channelMaps})
 }
 
@@ -509,66 +433,41 @@
 	return UNKNOWN
 }
 
-// Close closes the KV store client
+// Close closes all the connections in the client pool
 func (c *EtcdClient) Close(ctx context.Context) {
-	if err := c.ectdAPI.Close(); err != nil {
-		logger.Errorw(ctx, "error-closing-client", log.Fields{"error": err})
-	}
+	logger.Debug(ctx, "closing-etcd-pool")
+	c.pool.Close(ctx)
 }
 
-func (c *EtcdClient) addLockName(lockName string, lock *v3Concurrency.Mutex, session *v3Concurrency.Session) {
-	c.lockToMutexLock.Lock()
-	defer c.lockToMutexLock.Unlock()
-	c.lockToMutexMap[lockName] = lock
-	c.lockToSessionMap[lockName] = session
+// The APIs below are deprecated and will be removed in the Voltha 2.9 release
+var errUnimplemented = errors.New("deprecated")
+
+// Reserve is deprecated
+func (c *EtcdClient) Reserve(ctx context.Context, key string, value interface{}, ttl time.Duration) (interface{}, error) {
+	return nil, errUnimplemented
 }
 
-func (c *EtcdClient) deleteLockName(lockName string) {
-	c.lockToMutexLock.Lock()
-	defer c.lockToMutexLock.Unlock()
-	delete(c.lockToMutexMap, lockName)
-	delete(c.lockToSessionMap, lockName)
+// ReleaseAllReservations is deprecated
+func (c *EtcdClient) ReleaseAllReservations(ctx context.Context) error {
+	return errUnimplemented
 }
 
-func (c *EtcdClient) getLock(lockName string) (*v3Concurrency.Mutex, *v3Concurrency.Session) {
-	c.lockToMutexLock.Lock()
-	defer c.lockToMutexLock.Unlock()
-	var lock *v3Concurrency.Mutex
-	var session *v3Concurrency.Session
-	if l, exist := c.lockToMutexMap[lockName]; exist {
-		lock = l
-	}
-	if s, exist := c.lockToSessionMap[lockName]; exist {
-		session = s
-	}
-	return lock, session
+// ReleaseReservation is deprecated
+func (c *EtcdClient) ReleaseReservation(ctx context.Context, key string) error {
+	return errUnimplemented
 }
 
+// RenewReservation is deprecated
+func (c *EtcdClient) RenewReservation(ctx context.Context, key string) error {
+	return errUnimplemented
+}
+
+// AcquireLock is deprecated
 func (c *EtcdClient) AcquireLock(ctx context.Context, lockName string, timeout time.Duration) error {
-	session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
-	mu := v3Concurrency.NewMutex(session, "/devicelock_"+lockName)
-	if err := mu.Lock(context.Background()); err != nil {
-		//cancel()
-		return err
-	}
-	c.addLockName(lockName, mu, session)
-	return nil
+	return errUnimplemented
 }
 
+// ReleaseLock is deprecated
 func (c *EtcdClient) ReleaseLock(lockName string) error {
-	lock, session := c.getLock(lockName)
-	var err error
-	if lock != nil {
-		if e := lock.Unlock(context.Background()); e != nil {
-			err = e
-		}
-	}
-	if session != nil {
-		if e := session.Close(); e != nil {
-			err = e
-		}
-	}
-	c.deleteLockName(lockName)
-
-	return err
+	return errUnimplemented
 }
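
Every KV operation above follows the same pool discipline; an illustrative
sketch of that shape (the method name is hypothetical):

    // Borrow a client from the pool, run the operation against it, and
    // always return the client so other callers can proceed.
    func (c *EtcdClient) example(ctx context.Context, key string) error {
        client, err := c.pool.Get(ctx) // may block until a client frees up
        if err != nil {
            return err
        }
        defer c.pool.Put(client)
        _, err = client.Get(ctx, key)
        return err
    }
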
diff --git a/pkg/db/kvstore/etcdpool.go b/pkg/db/kvstore/etcdpool.go
new file mode 100644
index 0000000..6af7d3d
--- /dev/null
+++ b/pkg/db/kvstore/etcdpool.go
@@ -0,0 +1,239 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kvstore
+
+import (
+	"container/list"
+	"context"
+	"errors"
+	"github.com/opencord/voltha-lib-go/v5/pkg/log"
+	"go.etcd.io/etcd/clientv3"
+	"sync"
+	"time"
+)
+
+// EtcdClientAllocator represents a generic interface to allocate an Etcd Client
+type EtcdClientAllocator interface {
+	Get(context.Context) (*clientv3.Client, error)
+	Put(*clientv3.Client)
+	Close(ctx context.Context)
+}
+
+// NewRoundRobinEtcdClientAllocator creates a new ETCD Client Allocator using a Round Robin scheme
+func NewRoundRobinEtcdClientAllocator(endpoints []string, timeout time.Duration, capacity, maxUsage int, level log.LogLevel) (EtcdClientAllocator, error) {
+	return &roundRobin{
+		all:       make(map[*clientv3.Client]*rrEntry),
+		full:      make(map[*clientv3.Client]*rrEntry),
+		waitList:  list.New(),
+		max:       maxUsage,
+		capacity:  capacity,
+		timeout:   timeout,
+		endpoints: endpoints,
+		logLevel:  level,
+		closingCh: make(chan struct{}, capacity*maxUsage),
+		stopCh:    make(chan struct{}),
+	}, nil
+}
+
+type rrEntry struct {
+	client *clientv3.Client
+	count  int
+	age    time.Time
+}
+
+type roundRobin struct {
+	sync.Mutex
+	available []*rrEntry
+	all       map[*clientv3.Client]*rrEntry
+	full      map[*clientv3.Client]*rrEntry
+	waitList  *list.List
+	max       int
+	capacity  int
+	timeout   time.Duration
+	endpoints []string
+	size      int
+	logLevel  log.LogLevel
+	closing   bool
+	closingCh chan struct{}
+	stopCh    chan struct{}
+}
+
+// Get returns an Etcd client. If none is available, it will create one,
+// up to the maximum allowed capacity. If the maximum capacity has been
+// reached then it will wait until a used one is freed.
+func (r *roundRobin) Get(ctx context.Context) (*clientv3.Client, error) {
+	r.Lock()
+
+	if r.closing {
+		r.Unlock()
+		return nil, errors.New("pool-is-closing")
+	}
+
+	// first determine if we need to block, which would mean the
+	// available queue is empty and we are at capacity
+	if len(r.available) == 0 && r.size >= r.capacity {
+
+		// create a channel on which to wait and
+		// add it to the list
+		ch := make(chan struct{})
+		element := r.waitList.PushBack(ch)
+		r.Unlock()
+
+		// block until it is our turn or context
+		// expires or is canceled
+		select {
+		case <-r.stopCh:
+			logger.Info(ctx, "stop-waiting-pool-is-closing")
+			r.waitList.Remove(element)
+			return nil, errors.New("stop-waiting-pool-is-closing")
+		case <-ch:
+			r.waitList.Remove(element)
+		case <-ctx.Done():
+			r.waitList.Remove(element)
+			return nil, ctx.Err()
+		}
+		r.Lock()
+	}
+
+	defer r.Unlock()
+	if len(r.available) > 0 {
+		// pull off back end as it is operationally quicker
+		last := len(r.available) - 1
+		entry := r.available[last]
+		entry.count++
+		if entry.count >= r.max {
+			r.available = r.available[:last]
+			r.full[entry.client] = entry
+		}
+		entry.age = time.Now()
+		return entry.client, nil
+	}
+
+	logConfig := log.ConstructZapConfig(log.JSON, r.logLevel, log.Fields{})
+	// increase capacity
+	client, err := clientv3.New(clientv3.Config{
+		Endpoints:   r.endpoints,
+		DialTimeout: r.timeout,
+		LogConfig:   &logConfig,
+	})
+	if err != nil {
+		return nil, err
+	}
+	entry := &rrEntry{
+		client: client,
+		count:  1,
+	}
+	r.all[entry.client] = entry
+
+	if r.max > 1 {
+		r.available = append(r.available, entry)
+	} else {
+		r.full[entry.client] = entry
+	}
+	r.size++
+	return client, nil
+}
+
+// Put returns the Etcd Client back to the pool
+func (r *roundRobin) Put(client *clientv3.Client) {
+	r.Lock()
+
+	entry := r.all[client]
+	entry.count--
+
+	if r.closing {
+		// Close client if count is 0
+		if entry.count == 0 {
+			if err := entry.client.Close(); err != nil {
+				logger.Warnw(context.Background(), "error-closing-client", log.Fields{"error": err})
+			}
+			delete(r.all, entry.client)
+		}
+		// Notify Close function that a client was returned to the pool
+		r.closingCh <- struct{}{}
+		r.Unlock()
+		return
+	}
+
+	// This entry is now available for use, so
+	// if in full map add it to available and
+	// remove from full
+	if _, ok := r.full[client]; ok {
+		r.available = append(r.available, entry)
+		delete(r.full, client)
+	}
+
+	front := r.waitList.Front()
+	if front != nil {
+		ch := r.waitList.Remove(front)
+		r.Unlock()
+		// need to unblock if someone is waiting
+		ch.(chan struct{}) <- struct{}{}
+		return
+	}
+	r.Unlock()
+}
+
+func (r *roundRobin) Close(ctx context.Context) {
+	r.Lock()
+	r.closing = true
+
+	// Notify anyone waiting for a client to stop waiting
+	close(r.stopCh)
+
+	// Clean-up unused clients
+	for i := 0; i < len(r.available); i++ {
+		// Count 0 means no one is using that client
+		if r.available[i].count == 0 {
+			if err := r.available[i].client.Close(); err != nil {
+				logger.Warnw(ctx, "failure-closing-client", log.Fields{"client": r.available[i].client, "error": err})
+			}
+			// Remove the client from the 'all' map
+			delete(r.all, r.available[i].client)
+		}
+	}
+
+	// Figure out how many clients are in use
+	numberInUse := 0
+	for _, rrEntry := range r.all {
+		numberInUse += rrEntry.count
+	}
+	r.Unlock()
+
+	if numberInUse == 0 {
+		logger.Info(ctx, "no-connection-in-use")
+		return
+	}
+
+	logger.Infow(ctx, "waiting-for-clients-return", log.Fields{"count": numberInUse})
+
+	// Wait for notifications when a client is returned to the pool
+	for {
+		select {
+		case <-r.closingCh:
+			numberInUse--
+			if numberInUse == 0 {
+				logger.Info(ctx, "all-connections-closed")
+				return
+			}
+		case <-ctx.Done():
+			logger.Warnw(ctx, "context-done", log.Fields{"error": ctx.Err()})
+			return
+		}
+	}
+}
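
Direct use of the allocator mirrors what the pool-backed EtcdClient does
internally; a sketch under the assumption of a reachable endpoint (sizes
are illustrative):

    pool, err := kvstore.NewRoundRobinEtcdClientAllocator(
        []string{"localhost:2379"}, time.Second, 10, 5, log.ErrorLevel)
    if err != nil {
        return err
    }
    c, err := pool.Get(ctx) // blocks once all 10 clients serve 5 requests each
    if err != nil {
        return err
    }
    // c is a *clientv3.Client and can be used directly
    pool.Put(c)     // return the slot; may unblock a waiting Get
    pool.Close(ctx) // waits until all borrowed clients are returned
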
diff --git a/pkg/db/kvstore/etcdpool_test.go b/pkg/db/kvstore/etcdpool_test.go
new file mode 100644
index 0000000..55ea6ef
--- /dev/null
+++ b/pkg/db/kvstore/etcdpool_test.go
@@ -0,0 +1,250 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kvstore
+
+import (
+	"context"
+	"github.com/opencord/voltha-lib-go/v5/pkg/log"
+	mocks "github.com/opencord/voltha-lib-go/v5/pkg/mocks/etcd"
+	"github.com/phayes/freeport"
+	"github.com/stretchr/testify/assert"
+	"os"
+	"strconv"
+	"sync"
+	"testing"
+	"time"
+)
+
+const (
+	embedEtcdServerHost = "localhost"
+	defaultTimeout      = 1 * time.Second
+)
+
+var (
+	embedEtcdServerPort int
+)
+
+func TestMain(m *testing.M) {
+	ctx := context.Background()
+	var err error
+	embedEtcdServerPort, err = freeport.GetFreePort()
+	if err != nil {
+		logger.Fatal(ctx, err)
+	}
+	peerPort, err := freeport.GetFreePort()
+	if err != nil {
+		logger.Fatal(ctx, err)
+	}
+	etcdServer := mocks.StartEtcdServer(ctx, mocks.MKConfig(ctx,
+		"voltha.db.kvstore.test",
+		embedEtcdServerPort,
+		peerPort,
+		"voltha.lib.db.kvstore",
+		"error"))
+	res := m.Run()
+
+	etcdServer.Stop(ctx)
+	os.Exit(res)
+}
+
+func TestNewRoundRobinEtcdClientAllocator(t *testing.T) {
+	address := embedEtcdServerHost + ":" + strconv.Itoa(embedEtcdServerPort)
+	capacity := 20
+	maxUsage := 10
+
+	pool, err := NewRoundRobinEtcdClientAllocator([]string{address}, defaultTimeout, capacity, maxUsage, log.ErrorLevel)
+	assert.NotNil(t, pool)
+	assert.Nil(t, err)
+}
+
+func TestRoundRobin_Get_Put(t *testing.T) {
+	address := embedEtcdServerHost + ":" + strconv.Itoa(embedEtcdServerPort)
+	capacity := 20
+	maxUsage := 10
+
+	pool, err := NewRoundRobinEtcdClientAllocator([]string{address}, defaultTimeout, capacity, maxUsage, log.ErrorLevel)
+	assert.NotNil(t, pool)
+	assert.Nil(t, err)
+
+	// Verify we can obtain the expected number of clients with no errors or waiting time
+	var wg sync.WaitGroup
+	for i := 0; i < capacity*maxUsage; i++ {
+		wg.Add(1)
+		go func() {
+			ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+			defer cancel()
+			c, err := pool.Get(ctx)
+			assert.NotNil(t, c)
+			assert.Nil(t, err)
+			time.Sleep(5 * time.Second)
+			pool.Put(c)
+			wg.Done()
+		}()
+	}
+	wg.Wait()
+}
+
+func TestRoundRobin_Get_Put_various_capacity(t *testing.T) {
+	// Test single client with 1 concurrent usage
+	getPutVaryingCapacity(t, 1, 1)
+
+	// Test single client with multiple concurrent usage
+	getPutVaryingCapacity(t, 1, 100)
+
+	// Test multiple clients with single concurrent usage
+	getPutVaryingCapacity(t, 10, 1)
+
+	// Test multiple clients with multiple concurrent usage
+	getPutVaryingCapacity(t, 10, 10)
+}
+
+func getPutVaryingCapacity(t *testing.T, capacity, maxUsage int) {
+	address := embedEtcdServerHost + ":" + strconv.Itoa(embedEtcdServerPort)
+
+	pool, err := NewRoundRobinEtcdClientAllocator([]string{address}, defaultTimeout, capacity, maxUsage, log.ErrorLevel)
+	assert.NotNil(t, pool)
+	assert.Nil(t, err)
+
+	// Verify we can obtain the expected number of clients with no errors or waiting time
+	var wg sync.WaitGroup
+	totalSize := capacity * maxUsage
+	ch := make(chan struct{})
+	for i := 0; i < totalSize; i++ {
+		wg.Add(1)
+		go func() {
+			ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+			defer cancel()
+			c, err := pool.Get(ctx)
+			assert.NotNil(t, c)
+			assert.Nil(t, err)
+			// Inform the waiting loop that a client has been allocated
+			ch <- struct{}{}
+			// Keep the client for 5s and then return it to the pool
+			time.Sleep(5 * time.Second)
+			pool.Put(c)
+			wg.Done()
+		}()
+	}
+
+	// Wait until all clients are allocated
+	allocated := 0
+	for range ch {
+		allocated++
+		if allocated == totalSize {
+			break
+		}
+	}
+
+	// Try to get above capacity/usage with a short timeout; this should fail
+	ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+	c, err := pool.Get(ctx)
+	assert.NotNil(t, err)
+	assert.Nil(t, c)
+	cancel()
+
+	// Try to get above capacity/usage with a longer timeout; a client frees up within 5s
+	ctx, cancel = context.WithTimeout(context.Background(), 6*time.Second)
+	c, err = pool.Get(ctx)
+	assert.NotNil(t, c)
+	assert.Nil(t, err)
+	pool.Put(c)
+	cancel()
+
+	wg.Wait()
+
+	// Close the connection
+	pool.Close(context.Background())
+}
+
+func TestRoundRobin_Close_various_capacity(t *testing.T) {
+	// Test single client with 1 concurrent usage
+	closeWithVaryingCapacity(t, 1, 1)
+
+	// Test single client with multiple concurrent usage
+	closeWithVaryingCapacity(t, 1, 100)
+
+	// Test multiple clients with single concurrent usage
+	closeWithVaryingCapacity(t, 10, 1)
+
+	// Test multiple clients with multiple concurrent usage
+	closeWithVaryingCapacity(t, 10, 10)
+}
+
+func closeWithVaryingCapacity(t *testing.T, capacity, maxUsage int) {
+	address := embedEtcdServerHost + ":" + strconv.Itoa(embedEtcdServerPort)
+
+	pool, err := NewRoundRobinEtcdClientAllocator([]string{address}, defaultTimeout, capacity, maxUsage, log.ErrorLevel)
+	assert.NotNil(t, pool)
+	assert.Nil(t, err)
+
+	// Verify we can obtain the expected number of clients with no errors or waiting time
+	var wg sync.WaitGroup
+	totalSize := capacity * maxUsage
+	ch := make(chan struct{})
+	for i := 0; i < totalSize; i++ {
+		wg.Add(1)
+		go func() {
+			ctx, cancel := context.WithTimeout(context.Background(), 2*time.Second)
+			defer cancel()
+			c, err := pool.Get(ctx)
+			assert.NotNil(t, c)
+			assert.Nil(t, err)
+			// Inform the waiting loop that a client has been allocated
+			ch <- struct{}{}
+			// Keep the client for 5s and then return it to the pool
+			time.Sleep(5 * time.Second)
+			pool.Put(c)
+			wg.Done()
+		}()
+	}
+
+	// Wait until all clients are allocated
+	allocated := 0
+	for range ch {
+		allocated++
+		if allocated == totalSize {
+			break
+		}
+	}
+	// Try to get above capacity/usage with a longer timeout; this waiter is released when the pool closes
+	wg.Add(1)
+	go func() {
+		ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
+		c, err := pool.Get(ctx)
+		assert.NotNil(t, err)
+		expected := err.Error() == "pool-is-closing" || err.Error() == "stop-waiting-pool-is-closing"
+		assert.True(t, expected)
+		assert.Nil(t, c)
+		cancel()
+		wg.Done()
+	}()
+
+	// Invoke close on the pool
+	wg.Add(1)
+	go func() {
+		pool.Close(context.Background())
+		wg.Done()
+	}()
+
+	// Try to get a new client and ensure we can't get one
+	c, err := pool.Get(context.Background())
+	assert.NotNil(t, err)
+	expected := err.Error() == "pool-is-closing" || err.Error() == "stop-waiting-pool-is-closing"
+	assert.True(t, expected)
+	assert.Nil(t, c)
+
+	wg.Wait()
+}
diff --git a/pkg/db/kvstore/kvutils.go b/pkg/db/kvstore/kvutils.go
index 946dbf2..ca57542 100644
--- a/pkg/db/kvstore/kvutils.go
+++ b/pkg/db/kvstore/kvutils.go
@@ -16,7 +16,6 @@
 package kvstore
 
 import (
-	"bytes"
 	"context"
 	"fmt"
 	"math"
@@ -57,17 +56,6 @@
 	}
 }
 
-// Helper function to verify mostly whether the content of two interface types are the same.  Focus is []byte and
-// string types
-func isEqual(val1 interface{}, val2 interface{}) bool {
-	b1, err := ToByte(val1)
-	b2, er := ToByte(val2)
-	if err == nil && er == nil {
-		return bytes.Equal(b1, b2)
-	}
-	return val1 == val2
-}
-
 // backoff waits an amount of time that is proportional to the attempt value.  The wait time in a range of
 // minRetryInterval and maxRetryInterval.
 func backoff(ctx context.Context, attempt int) error {
diff --git a/pkg/mocks/etcd/etcd_server_test.go b/pkg/mocks/etcd/etcd_server_test.go
index ba7ed5b..a6eeb7d 100644
--- a/pkg/mocks/etcd/etcd_server_test.go
+++ b/pkg/mocks/etcd/etcd_server_test.go
@@ -67,15 +67,6 @@
 	assert.Equal(t, value, val)
 }
 
-func TestEtcdServerReserve(t *testing.T) {
-	assert.NotNil(t, client)
-	txId := "tnxId-1"
-	val, err := client.Reserve(context.Background(), "transactions", txId, 10)
-	assert.Nil(t, err)
-	assert.NotNil(t, val)
-	assert.Equal(t, val, txId)
-}
-
 func shutdown() {
 	if client != nil {
 		client.Close(context.Background())