VOL-2180 context changes in voltha-go

Passed context up as far as possible.
Where context reached the gRPC api, the context is passed through directly.
Where context reached the kafka api, context.TODO() was used (as this NBI does not support context or request cancelation)
Anywhere a new thread is started, and the creating thread makes no attempt to wait, context.Background() was used.
Anywhere a new thread is started, and the creating thread waits for completion, the ctx is passed through from the creating thread.
Cancelation of gRPC NBI requests should recursively cancel all the way through to the KV.

Change-Id: I7a65b49ae4e8c1d5263c27d2627e0ffe4d1eb71b
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go
index 7096748..a0f39cd 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore/etcdclient.go
@@ -19,11 +19,12 @@
 	"context"
 	"errors"
 	"fmt"
+	"sync"
+
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	v3Client "go.etcd.io/etcd/clientv3"
 	v3Concurrency "go.etcd.io/etcd/clientv3/concurrency"
 	v3rpcTypes "go.etcd.io/etcd/etcdserver/api/v3rpc/rpctypes"
-	"sync"
 )
 
 // EtcdClient represents the Etcd KV store client
@@ -64,23 +65,19 @@
 
 // IsConnectionUp returns whether the connection to the Etcd KV store is up.  If a timeout occurs then
 // it is assumed the connection is down or unreachable.
-func (c *EtcdClient) IsConnectionUp(timeout int) bool {
+func (c *EtcdClient) IsConnectionUp(ctx context.Context) bool {
 	// Let's try to get a non existent key.  If the connection is up then there will be no error returned.
-	if _, err := c.Get("non-existent-key", timeout); err != nil {
+	if _, err := c.Get(ctx, "non-existent-key"); err != nil {
 		return false
 	}
+	//cancel()
 	return true
 }
 
 // List returns an array of key-value pairs with key as a prefix.  Timeout defines how long the function will
 // wait for a response
-func (c *EtcdClient) List(key string, timeout int) (map[string]*KVPair, error) {
-	duration := GetDuration(timeout)
-
-	ctx, cancel := context.WithTimeout(context.Background(), duration)
-
+func (c *EtcdClient) List(ctx context.Context, key string) (map[string]*KVPair, error) {
 	resp, err := c.ectdAPI.Get(ctx, key, v3Client.WithPrefix())
-	cancel()
 	if err != nil {
 		logger.Error(err)
 		return nil, err
@@ -94,13 +91,10 @@
 
 // Get returns a key-value pair for a given key. Timeout defines how long the function will
 // wait for a response
-func (c *EtcdClient) Get(key string, timeout int) (*KVPair, error) {
-	duration := GetDuration(timeout)
-
-	ctx, cancel := context.WithTimeout(context.Background(), duration)
+func (c *EtcdClient) Get(ctx context.Context, key string) (*KVPair, error) {
 
 	resp, err := c.ectdAPI.Get(ctx, key)
-	cancel()
+
 	if err != nil {
 		logger.Error(err)
 		return nil, err
@@ -115,7 +109,7 @@
 // Put writes a key-value pair to the KV store.  Value can only be a string or []byte since the etcd API
 // accepts only a string as a value for a put operation. Timeout defines how long the function will
 // wait for a response
-func (c *EtcdClient) Put(key string, value interface{}, timeout int) error {
+func (c *EtcdClient) Put(ctx context.Context, key string, value interface{}) error {
 
 	// Validate that we can convert value to a string as etcd API expects a string
 	var val string
@@ -124,10 +118,6 @@
 		return fmt.Errorf("unexpected-type-%T", value)
 	}
 
-	duration := GetDuration(timeout)
-
-	ctx, cancel := context.WithTimeout(context.Background(), duration)
-
 	c.writeLock.Lock()
 	defer c.writeLock.Unlock()
 
@@ -139,7 +129,7 @@
 	} else {
 		_, err = c.ectdAPI.Put(ctx, key, val)
 	}
-	cancel()
+
 	if err != nil {
 		switch err {
 		case context.Canceled:
@@ -158,13 +148,7 @@
 
 // Delete removes a key from the KV store. Timeout defines how long the function will
 // wait for a response
-func (c *EtcdClient) Delete(key string, timeout int) error {
-
-	duration := GetDuration(timeout)
-
-	ctx, cancel := context.WithTimeout(context.Background(), duration)
-
-	defer cancel()
+func (c *EtcdClient) Delete(ctx context.Context, key string) error {
 
 	c.writeLock.Lock()
 	defer c.writeLock.Unlock()
@@ -183,7 +167,7 @@
 // defines how long that reservation is valid.  When TTL expires the key is unreserved by the KV store itself.
 // If the key is acquired then the value returned will be the value passed in.  If the key is already acquired
 // then the value assigned to that key will be returned.
-func (c *EtcdClient) Reserve(key string, value interface{}, ttl int64) (interface{}, error) {
+func (c *EtcdClient) Reserve(ctx context.Context, key string, value interface{}, ttl int64) (interface{}, error) {
 	// Validate that we can convert value to a string as etcd API expects a string
 	var val string
 	var er error
@@ -191,12 +175,6 @@
 		return nil, fmt.Errorf("unexpected-type%T", value)
 	}
 
-	duration := GetDuration(connTimeout)
-
-	// Create a lease
-	ctx, cancel := context.WithTimeout(context.Background(), duration)
-	defer cancel()
-
 	resp, err := c.ectdAPI.Grant(ctx, ttl)
 	if err != nil {
 		logger.Error(err)
@@ -211,7 +189,7 @@
 	reservationSuccessful := false
 	defer func() {
 		if !reservationSuccessful {
-			if err = c.ReleaseReservation(key); err != nil {
+			if err = c.ReleaseReservation(context.Background(), key); err != nil {
 				logger.Error("cannot-release-lease")
 			}
 		}
@@ -241,7 +219,7 @@
 		}
 	} else {
 		// Read the Key to ensure this is our Key
-		m, err := c.Get(key, defaultKVGetTimeout)
+		m, err := c.Get(ctx, key)
 		if err != nil {
 			return nil, err
 		}
@@ -260,12 +238,9 @@
 }
 
 // ReleaseAllReservations releases all key reservations previously made (using Reserve API)
-func (c *EtcdClient) ReleaseAllReservations() error {
+func (c *EtcdClient) ReleaseAllReservations(ctx context.Context) error {
 	c.writeLock.Lock()
 	defer c.writeLock.Unlock()
-	duration := GetDuration(connTimeout)
-	ctx, cancel := context.WithTimeout(context.Background(), duration)
-	defer cancel()
 
 	for key, leaseID := range c.keyReservations {
 		_, err := c.ectdAPI.Revoke(ctx, *leaseID)
@@ -279,7 +254,7 @@
 }
 
 // ReleaseReservation releases reservation for a specific key.
-func (c *EtcdClient) ReleaseReservation(key string) error {
+func (c *EtcdClient) ReleaseReservation(ctx context.Context, key string) error {
 	// Get the leaseid using the key
 	logger.Debugw("Release-reservation", log.Fields{"key": key})
 	var ok bool
@@ -289,9 +264,6 @@
 	if leaseID, ok = c.keyReservations[key]; !ok {
 		return nil
 	}
-	duration := GetDuration(connTimeout)
-	ctx, cancel := context.WithTimeout(context.Background(), duration)
-	defer cancel()
 
 	if leaseID != nil {
 		_, err := c.ectdAPI.Revoke(ctx, *leaseID)
@@ -306,7 +278,7 @@
 
 // RenewReservation renews a reservation.  A reservation will go stale after the specified TTL (Time To Live)
 // period specified when reserving the key
-func (c *EtcdClient) RenewReservation(key string) error {
+func (c *EtcdClient) RenewReservation(ctx context.Context, key string) error {
 	// Get the leaseid using the key
 	var ok bool
 	var leaseID *v3Client.LeaseID
@@ -315,9 +287,6 @@
 	if leaseID, ok = c.keyReservations[key]; !ok {
 		return errors.New("key-not-reserved")
 	}
-	duration := GetDuration(connTimeout)
-	ctx, cancel := context.WithTimeout(context.Background(), duration)
-	defer cancel()
 
 	if leaseID != nil {
 		_, err := c.ectdAPI.KeepAliveOnce(ctx, *leaseID)
@@ -333,9 +302,9 @@
 
 // Watch provides the watch capability on a given key.  It returns a channel onto which the callee needs to
 // listen to receive Events.
-func (c *EtcdClient) Watch(key string) chan *Event {
+func (c *EtcdClient) Watch(ctx context.Context, key string) chan *Event {
 	w := v3Client.NewWatcher(c.ectdAPI)
-	ctx, cancel := context.WithCancel(context.Background())
+	ctx, cancel := context.WithCancel(ctx)
 	channel := w.Watch(ctx, key)
 
 	// Create a new channel
@@ -490,14 +459,11 @@
 	return lock, session
 }
 
-func (c *EtcdClient) AcquireLock(lockName string, timeout int) error {
-	duration := GetDuration(timeout)
-	ctx, cancel := context.WithTimeout(context.Background(), duration)
-	defer cancel()
+func (c *EtcdClient) AcquireLock(ctx context.Context, lockName string, timeout int) error {
 	session, _ := v3Concurrency.NewSession(c.ectdAPI, v3Concurrency.WithContext(ctx))
 	mu := v3Concurrency.NewMutex(session, "/devicelock_"+lockName)
 	if err := mu.Lock(context.Background()); err != nil {
-		cancel()
+		//cancel()
 		return err
 	}
 	c.addLockName(lockName, mu, session)