[VOL-4194] Add an Etcd Client Pool

This commit adds an Etcd client pool with a configurable
capacity and a configurable maximum number of concurrent requests
per client.  Although tests were run locally, they did not cover all
components because of dependencies that need to be merged first.
Once those dependencies are merged, this change can be fully tested.

Change-Id: I7e8c8953bd3871056a721de68990e3d85df8b688
diff --git a/pkg/db/kvstore/etcdclient.go b/pkg/db/kvstore/etcdclient.go
index c2a38c6..96ffc2f 100644
--- a/pkg/db/kvstore/etcdclient.go
+++ b/pkg/db/kvstore/etcdclient.go
@@ -19,54 +19,75 @@
 	"context"
 	"errors"
 	"fmt"
-	"sync"
-	"time"
-
 	"github.com/opencord/voltha-lib-go/v5/pkg/log"
 	v3Client "go.etcd.io/etcd/clientv3"
-
-	v3Concurrency "go.etcd.io/etcd/clientv3/concurrency"
 	v3rpcTypes "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
+	"os"
+	"strconv"
+	"sync"
+	"time"
+)
+
+const (
+	poolCapacityEnvName = "VOLTHA_ETCD_CLIENT_POOL_CAPACITY"
+	maxUsageEnvName     = "VOLTHA_ETCD_CLIENT_MAX_USAGE"
+)
+
+const (
+	defaultMaxPoolCapacity = 1000 // Default size of an Etcd Client pool
+	defaultMaxPoolUsage    = 100  // Maximum concurrent request an Etcd Client is allowed to process
 )
 
 // EtcdClient represents the Etcd KV store client
 type EtcdClient struct {
-	ectdAPI             *v3Client.Client
-	keyReservations     map[string]*v3Client.LeaseID
-	watchedChannels     sync.Map
-	keyReservationsLock sync.RWMutex
-	lockToMutexMap      map[string]*v3Concurrency.Mutex
-	lockToSessionMap    map[string]*v3Concurrency.Session
-	lockToMutexLock     sync.Mutex
+	pool               EtcdClientAllocator
+	watchedChannels    sync.Map
+	watchedClients     map[string]*v3Client.Client
+	watchedClientsLock sync.RWMutex
 }
 
 // NewEtcdCustomClient returns a new client for the Etcd KV store allowing
 // the called to specify etcd client configuration
-func NewEtcdCustomClient(ctx context.Context, config *v3Client.Config) (*EtcdClient, error) {
-	c, err := v3Client.New(*config)
-	if err != nil {
-		logger.Error(ctx, err)
-		return nil, err
+func NewEtcdCustomClient(ctx context.Context, addr string, timeout time.Duration, level log.LogLevel) (*EtcdClient, error) {
+	// Get the capacity and max usage from the environment
+	capacity := defaultMaxPoolCapacity
+	maxUsage := defaultMaxPoolUsage
+	if capacityStr, present := os.LookupEnv(poolCapacityEnvName); present {
+		if val, err := strconv.Atoi(capacityStr); err == nil {
+			capacity = val
+			logger.Infow(ctx, "env-variable-set", log.Fields{"pool-capacity": capacity})
+		} else {
+			logger.Warnw(ctx, "invalid-capacity-value", log.Fields{"error": err, "capacity": capacityStr})
+		}
+	}
+	if maxUsageStr, present := os.LookupEnv(maxUsageEnvName); present {
+		if val, err := strconv.Atoi(maxUsageStr); err == nil {
+			maxUsage = val
+			logger.Infow(ctx, "env-variable-set", log.Fields{"max-usage": maxUsage})
+		} else {
+			logger.Warnw(ctx, "invalid-max-usage-value", log.Fields{"error": err, "max-usage": maxUsageStr})
+		}
 	}
 
-	reservations := make(map[string]*v3Client.LeaseID)
-	lockMutexMap := make(map[string]*v3Concurrency.Mutex)
-	lockSessionMap := make(map[string]*v3Concurrency.Session)
+	var err error
 
-	return &EtcdClient{ectdAPI: c, keyReservations: reservations, lockToMutexMap: lockMutexMap,
-		lockToSessionMap: lockSessionMap}, nil
+	pool, err := NewRoundRobinEtcdClientAllocator([]string{addr}, timeout, capacity, maxUsage, level)
+	if err != nil {
+		// Return the failure instead of handing back a client whose pool was never created.
+		logger.Errorw(ctx, "failed-to-create-rr-client", log.Fields{"error": err})
+		return nil, err
+	}
+
+	logger.Infow(ctx, "etcd-pool-created", log.Fields{"capacity": capacity, "max-usage": maxUsage})
+
+	return &EtcdClient{pool: pool,
+		watchedClients: make(map[string]*v3Client.Client),
+	}, nil
 }
 
 // NewEtcdClient returns a new client for the Etcd KV store
 func NewEtcdClient(ctx context.Context, addr string, timeout time.Duration, level log.LogLevel) (*EtcdClient, error) {
-	logconfig := log.ConstructZapConfig(log.JSON, level, log.Fields{})
-
-	return NewEtcdCustomClient(
-		ctx,
-		&v3Client.Config{
-			Endpoints:   []string{addr},
-			DialTimeout: timeout,
-			LogConfig:   &logconfig})
+	return NewEtcdCustomClient(ctx, addr, timeout, level)
 }
 
 // IsConnectionUp returns whether the connection to the Etcd KV store is up.  If a timeout occurs then
@@ -76,14 +97,19 @@
 	if _, err := c.Get(ctx, "non-existent-key"); err != nil {
 		return false
 	}
-	//cancel()
 	return true
 }
 
 // List returns an array of key-value pairs with key as a prefix.  Timeout defines how long the function will
 // wait for a response
 func (c *EtcdClient) List(ctx context.Context, key string) (map[string]*KVPair, error) {
-	resp, err := c.ectdAPI.Get(ctx, key, v3Client.WithPrefix())
+	client, err := c.pool.Get(ctx)
+	if err != nil {
+		return nil, err
+	}
+	defer c.pool.Put(client)
+	resp, err := client.Get(ctx, key, v3Client.WithPrefix())
+
 	if err != nil {
 		logger.Error(ctx, err)
 		return nil, err
@@ -98,11 +124,17 @@
 // Get returns a key-value pair for a given key. Timeout defines how long the function will
 // wait for a response
 func (c *EtcdClient) Get(ctx context.Context, key string) (*KVPair, error) {
+	client, err := c.pool.Get(ctx)
+	if err != nil {
+		return nil, err
+	}
+	defer c.pool.Put(client)
 
 	attempt := 0
+
 startLoop:
 	for {
-		resp, err := c.ectdAPI.Get(ctx, key)
+		resp, err := client.Get(ctx, key)
 		if err != nil {
 			switch err {
 			case context.Canceled:
@@ -145,25 +177,21 @@
 
 	// Validate that we can convert value to a string as etcd API expects a string
 	var val string
-	var er error
-	if val, er = ToString(value); er != nil {
+	var err error
+	if val, err = ToString(value); err != nil {
 		return fmt.Errorf("unexpected-type-%T", value)
 	}
 
-	// Check if there is already a lease for this key - if there is then use it, otherwise a PUT will make
-	// that KV key permanent instead of automatically removing it after a lease expiration
-	c.keyReservationsLock.RLock()
-	leaseID, ok := c.keyReservations[key]
-	c.keyReservationsLock.RUnlock()
+	client, err := c.pool.Get(ctx)
+	if err != nil {
+		return err
+	}
+	defer c.pool.Put(client)
+
 	attempt := 0
 startLoop:
 	for {
-		var err error
-		if ok {
-			_, err = c.ectdAPI.Put(ctx, key, val, v3Client.WithLease(*leaseID))
-		} else {
-			_, err = c.ectdAPI.Put(ctx, key, val)
-		}
+		_, err = client.Put(ctx, key, val)
 		if err != nil {
 			switch err {
 			case context.Canceled:
@@ -197,11 +225,16 @@
 // Delete removes a key from the KV store. Timeout defines how long the function will
 // wait for a response
 func (c *EtcdClient) Delete(ctx context.Context, key string) error {
+	client, err := c.pool.Get(ctx)
+	if err != nil {
+		return err
+	}
+	defer c.pool.Put(client)
 
 	attempt := 0
 startLoop:
 	for {
-		_, err := c.ectdAPI.Delete(ctx, key)
+		_, err = client.Delete(ctx, key)
 		if err != nil {
 			switch err {
 			case context.Canceled:
@@ -235,8 +268,14 @@
 
 func (c *EtcdClient) DeleteWithPrefix(ctx context.Context, prefixKey string) error {
 
+	client, err := c.pool.Get(ctx)
+	if err != nil {
+		return err
+	}
+	defer c.pool.Put(client)
+
 	//delete the prefix
-	if _, err := c.ectdAPI.Delete(ctx, prefixKey, v3Client.WithPrefix()); err != nil {
+	if _, err := client.Delete(ctx, prefixKey, v3Client.WithPrefix()); err != nil {
 		logger.Errorw(ctx, "failed-to-delete-prefix-key", log.Fields{"key": prefixKey, "error": err})
 		return err
 	}
@@ -244,150 +283,25 @@
 	return nil
 }
 
-// Reserve is invoked to acquire a key and set it to a given value. Value can only be a string or []byte since
-// the etcd API accepts only a string.  Timeout defines how long the function will wait for a response.  TTL
-// defines how long that reservation is valid.  When TTL expires the key is unreserved by the KV store itself.
-// If the key is acquired then the value returned will be the value passed in.  If the key is already acquired
-// then the value assigned to that key will be returned.
-func (c *EtcdClient) Reserve(ctx context.Context, key string, value interface{}, ttl time.Duration) (interface{}, error) {
-	// Validate that we can convert value to a string as etcd API expects a string
-	var val string
-	var er error
-	if val, er = ToString(value); er != nil {
-		return nil, fmt.Errorf("unexpected-type%T", value)
-	}
-
-	resp, err := c.ectdAPI.Grant(ctx, int64(ttl.Seconds()))
-	if err != nil {
-		logger.Error(ctx, err)
-		return nil, err
-	}
-	// Register the lease id
-	c.keyReservationsLock.Lock()
-	c.keyReservations[key] = &resp.ID
-	c.keyReservationsLock.Unlock()
-
-	// Revoke lease if reservation is not successful
-	reservationSuccessful := false
-	defer func() {
-		if !reservationSuccessful {
-			if err = c.ReleaseReservation(context.Background(), key); err != nil {
-				logger.Error(ctx, "cannot-release-lease")
-			}
-		}
-	}()
-
-	// Try to grap the Key with the above lease
-	c.ectdAPI.Txn(context.Background())
-	txn := c.ectdAPI.Txn(context.Background())
-	txn = txn.If(v3Client.Compare(v3Client.Version(key), "=", 0))
-	txn = txn.Then(v3Client.OpPut(key, val, v3Client.WithLease(resp.ID)))
-	txn = txn.Else(v3Client.OpGet(key))
-	result, er := txn.Commit()
-	if er != nil {
-		return nil, er
-	}
-
-	if !result.Succeeded {
-		// Verify whether we are already the owner of that Key
-		if len(result.Responses) > 0 &&
-			len(result.Responses[0].GetResponseRange().Kvs) > 0 {
-			kv := result.Responses[0].GetResponseRange().Kvs[0]
-			if string(kv.Value) == val {
-				reservationSuccessful = true
-				return value, nil
-			}
-			return kv.Value, nil
-		}
-	} else {
-		// Read the Key to ensure this is our Key
-		m, err := c.Get(ctx, key)
-		if err != nil {
-			return nil, err
-		}
-		if m != nil {
-			if m.Key == key && isEqual(m.Value, value) {
-				// My reservation is successful - register it.  For now, support is only for 1 reservation per key
-				// per session.
-				reservationSuccessful = true
-				return value, nil
-			}
-			// My reservation has failed.  Return the owner of that key
-			return m.Value, nil
-		}
-	}
-	return nil, nil
-}
-
-// ReleaseAllReservations releases all key reservations previously made (using Reserve API)
-func (c *EtcdClient) ReleaseAllReservations(ctx context.Context) error {
-	c.keyReservationsLock.Lock()
-	defer c.keyReservationsLock.Unlock()
-
-	for key, leaseID := range c.keyReservations {
-		_, err := c.ectdAPI.Revoke(ctx, *leaseID)
-		if err != nil {
-			logger.Errorw(ctx, "cannot-release-reservation", log.Fields{"key": key, "error": err})
-			return err
-		}
-		delete(c.keyReservations, key)
-	}
-	return nil
-}
-
-// ReleaseReservation releases reservation for a specific key.
-func (c *EtcdClient) ReleaseReservation(ctx context.Context, key string) error {
-	// Get the leaseid using the key
-	logger.Debugw(ctx, "Release-reservation", log.Fields{"key": key})
-	var ok bool
-	var leaseID *v3Client.LeaseID
-	c.keyReservationsLock.Lock()
-	defer c.keyReservationsLock.Unlock()
-	if leaseID, ok = c.keyReservations[key]; !ok {
-		return nil
-	}
-
-	if leaseID != nil {
-		_, err := c.ectdAPI.Revoke(ctx, *leaseID)
-		if err != nil {
-			logger.Error(ctx, err)
-			return err
-		}
-		delete(c.keyReservations, key)
-	}
-	return nil
-}
-
-// RenewReservation renews a reservation.  A reservation will go stale after the specified TTL (Time To Live)
-// period specified when reserving the key
-func (c *EtcdClient) RenewReservation(ctx context.Context, key string) error {
-	// Get the leaseid using the key
-	var ok bool
-	var leaseID *v3Client.LeaseID
-	c.keyReservationsLock.RLock()
-	leaseID, ok = c.keyReservations[key]
-	c.keyReservationsLock.RUnlock()
-
-	if !ok {
-		return errors.New("key-not-reserved")
-	}
-
-	if leaseID != nil {
-		_, err := c.ectdAPI.KeepAliveOnce(ctx, *leaseID)
-		if err != nil {
-			logger.Errorw(ctx, "lease-may-have-expired", log.Fields{"error": err})
-			return err
-		}
-	} else {
-		return errors.New("lease-expired")
-	}
-	return nil
-}
-
 // Watch provides the watch capability on a given key.  It returns a channel onto which the callee needs to
 // listen to receive Events.
 func (c *EtcdClient) Watch(ctx context.Context, key string, withPrefix bool) chan *Event {
-	w := v3Client.NewWatcher(c.ectdAPI)
+	var err error
+	// Reuse the Etcd client when multiple callers are watching the same key.
+	c.watchedClientsLock.Lock()
+	client, exist := c.watchedClients[key]
+	if !exist {
+		client, err = c.pool.Get(ctx)
+		if err != nil {
+			logger.Errorw(ctx, "failed-to-an-etcd-client", log.Fields{"key": key, "error": err})
+			c.watchedClientsLock.Unlock()
+			return nil
+		}
+		c.watchedClients[key] = client
+	}
+	c.watchedClientsLock.Unlock()
+
+	w := v3Client.NewWatcher(client)
 	ctx, cancel := context.WithCancel(ctx)
 	var channel v3Client.WatchChan
 	if withPrefix {
@@ -402,7 +316,6 @@
 	// Keep track of the created channels so they can be closed when required
 	channelMap := make(map[chan *Event]v3Client.Watcher)
 	channelMap[ch] = w
-
 	channelMaps := c.addChannelMap(key, channelMap)
 
 	// Changing the log field (from channelMaps) as the underlying logger cannot format the map of channels into a
@@ -484,6 +397,17 @@
 	if pos >= 0 {
 		channelMaps = c.removeChannelMap(key, pos)
 	}
+
+	// If we don't have any keys being watched then return the Etcd client to the pool
+	if len(channelMaps) == 0 {
+		c.watchedClientsLock.Lock()
+		// Sanity
+		if client, ok := c.watchedClients[key]; ok {
+			c.pool.Put(client)
+			delete(c.watchedClients, key)
+		}
+		c.watchedClientsLock.Unlock()
+	}
 	logger.Infow(ctx, "watcher-channel-exiting", log.Fields{"key": key, "channel": channelMaps})
 }
 
@@ -509,66 +433,41 @@
 	return UNKNOWN
 }
 
-// Close closes the KV store client
+// Close closes the Etcd client pool used by this KV store client
 func (c *EtcdClient) Close(ctx context.Context) {
-	if err := c.ectdAPI.Close(); err != nil {
-		logger.Errorw(ctx, "error-closing-client", log.Fields{"error": err})
-	}
+	logger.Debug(ctx, "closing-etcd-pool")
+	c.pool.Close(ctx)
 }
 
-func (c *EtcdClient) addLockName(lockName string, lock *v3Concurrency.Mutex, session *v3Concurrency.Session) {
-	c.lockToMutexLock.Lock()
-	defer c.lockToMutexLock.Unlock()
-	c.lockToMutexMap[lockName] = lock
-	c.lockToSessionMap[lockName] = session
+// The APIs below are not used
+var errUnimplemented = errors.New("deprecated")
+
+// Reserve is deprecated
+func (c *EtcdClient) Reserve(ctx context.Context, key string, value interface{}, ttl time.Duration) (interface{}, error) {
+	return nil, errUnimplemented
 }
 
-func (c *EtcdClient) deleteLockName(lockName string) {
-	c.lockToMutexLock.Lock()
-	defer c.lockToMutexLock.Unlock()
-	delete(c.lockToMutexMap, lockName)
-	delete(c.lockToSessionMap, lockName)
+// ReleaseAllReservations is deprecated
+func (c *EtcdClient) ReleaseAllReservations(ctx context.Context) error {
+	return errUnimplemented
 }
 
-func (c *EtcdClient) getLock(lockName string) (*v3Concurrency.Mutex, *v3Concurrency.Session) {
-	c.lockToMutexLock.Lock()
-	defer c.lockToMutexLock.Unlock()
-	var lock *v3Concurrency.Mutex
-	var session *v3Concurrency.Session
-	if l, exist := c.lockToMutexMap[lockName]; exist {
-		lock = l
-	}
-	if s, exist := c.lockToSessionMap[lockName]; exist {
-		session = s
-	}
-	return lock, session
+// ReleaseReservation is deprecated
+func (c *EtcdClient) ReleaseReservation(ctx context.Context, key string) error {
+	return errUnimplemented
 }
 
+// RenewReservation is deprecated
+func (c *EtcdClient) RenewReservation(ctx context.Context, key string) error {
+	return errUnimplemented
+}
+
+// AcquireLock is deprecated
 func (c *EtcdClient) AcquireLock(ctx context.Context, lockName string, timeout time.Duration) error {
-	session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
-	mu := v3Concurrency.NewMutex(session, "/devicelock_"+lockName)
-	if err := mu.Lock(context.Background()); err != nil {
-		//cancel()
-		return err
-	}
-	c.addLockName(lockName, mu, session)
-	return nil
+	return errUnimplemented
 }
 
+// ReleaseLock is deprecated
 func (c *EtcdClient) ReleaseLock(lockName string) error {
-	lock, session := c.getLock(lockName)
-	var err error
-	if lock != nil {
-		if e := lock.Unlock(context.Background()); e != nil {
-			err = e
-		}
-	}
-	if session != nil {
-		if e := session.Close(); e != nil {
-			err = e
-		}
-	}
-	c.deleteLockName(lockName)
-
-	return err
+	return errUnimplemented
 }