VOL-2180 context changes in voltha-go

Passed context up as far as possible.
Where context reached the gRPC api, the context is passed through directly.
Where context reached the kafka api, context.TODO() was used (as this NBI does not support context or request cancelation)
Anywhere a new thread is started, and the creating thread makes no attempt to wait, context.Background() was used.
Anywhere a new thread is started, and the creating thread waits for completion, the ctx is passed through from the creating thread.
Cancelation of gRPC NBI requests should recursively cancel all the way through to the KV.

Change-Id: I7a65b49ae4e8c1d5263c27d2627e0ffe4d1eb71b
diff --git a/rw_core/core/transaction.go b/rw_core/core/transaction.go
index 93dd28f..fb51d2e 100644
--- a/rw_core/core/transaction.go
+++ b/rw_core/core/transaction.go
@@ -30,6 +30,7 @@
 package core
 
 import (
+	"context"
 	"time"
 
 	"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
@@ -173,7 +174,7 @@
  */
 
 // Acquired aquires transaction status
-func (c *KVTransaction) Acquired(minDuration int64, ownedByMe ...bool) (bool, error) {
+func (c *KVTransaction) Acquired(ctx context.Context, minDuration int64, ownedByMe ...bool) (bool, error) {
 	var acquired bool
 	var currOwner string
 	var res int
@@ -197,15 +198,15 @@
 		// Keep the reservation longer that the minDuration (which is really the request timeout) to ensure the
 		// transaction key stays in the KV store until after the Core finalize a request timeout condition (which is
 		// a success from a request completion perspective).
-		if err := c.tryToReserveTxn(durationInSecs * 2); err == nil {
+		if err := c.tryToReserveTxn(ctx, durationInSecs*2); err == nil {
 			res = SeizedBySelf
 		} else {
 			log.Debugw("watch-other-server",
 				log.Fields{"transactionId": c.txnID, "owner": currOwner, "timeout": durationInSecs})
-			res = c.Watch(durationInSecs)
+			res = c.Watch(ctx, durationInSecs)
 		}
 	} else {
-		res = c.Watch(durationInSecs)
+		res = c.Watch(ctx, durationInSecs)
 	}
 	switch res {
 	case SeizedBySelf, AbandonedByOther:
@@ -217,11 +218,11 @@
 	return acquired, nil
 }
 
-func (c *KVTransaction) tryToReserveTxn(durationInSecs int64) error {
+func (c *KVTransaction) tryToReserveTxn(ctxt context.Context, durationInSecs int64) error {
 	var currOwner string
 	var res int
 	var err error
-	value, _ := ctx.kvClient.Reserve(c.txnKey, ctx.owner, durationInSecs)
+	value, _ := ctx.kvClient.Reserve(ctxt, c.txnKey, ctx.owner, durationInSecs)
 	if value != nil {
 		if currOwner, err = kvstore.ToString(value); err != nil { // This should never happen
 			log.Errorw("unexpected-owner-type", log.Fields{"transactionId": c.txnID, "error": err})
@@ -231,7 +232,7 @@
 			log.Debugw("acquired-transaction", log.Fields{"transactionId": c.txnID, "result": txnState[res]})
 			// Setup the monitoring channel
 			c.monitorCh = make(chan int)
-			go c.holdOnToTxnUntilProcessingCompleted(c.txnKey, ctx.owner, durationInSecs)
+			go c.holdOnToTxnUntilProcessingCompleted(ctxt, c.txnKey, ctx.owner, durationInSecs)
 			return nil
 		}
 	}
@@ -239,22 +240,21 @@
 }
 
 // Watch watches transaction
-func (c *KVTransaction) Watch(durationInSecs int64) int {
+func (c *KVTransaction) Watch(ctxt context.Context, durationInSecs int64) int {
 	var res int
-
-	events := ctx.kvClient.Watch(c.txnKey)
+	events := ctx.kvClient.Watch(ctxt, c.txnKey)
 	defer ctx.kvClient.CloseWatch(c.txnKey, events)
 
 	transactionWasAcquiredByOther := false
 
 	//Check whether the transaction was already completed by the other Core before we got here.
-	if kvp, _ := ctx.kvClient.Get(c.txnKey, ctx.kvOperationTimeout); kvp != nil {
+	if kvp, _ := ctx.kvClient.Get(ctxt, c.txnKey); kvp != nil {
 		transactionWasAcquiredByOther = true
 		if val, err := kvstore.ToString(kvp.Value); err == nil {
 			if val == TransactionComplete {
 				res = CompletedByOther
 				// Do an immediate delete of the transaction in the KV Store to free up KV Storage faster
-				err = c.Delete()
+				err = c.Delete(ctxt)
 				if err != nil {
 					log.Errorw("unable-to-delete-the-transaction", log.Fields{"error": err})
 				}
@@ -283,7 +283,7 @@
 					if val == TransactionComplete {
 						res = CompletedByOther
 						// Successful request completion has been detected. Remove the transaction key
-						err := c.Delete()
+						err := c.Delete(ctxt)
 						if err != nil {
 							log.Errorw("unable-to-delete-the-transaction", log.Fields{"error": err})
 						}
@@ -315,12 +315,12 @@
 }
 
 // Close closes transaction
-func (c *KVTransaction) Close() error {
+func (c *KVTransaction) Close(ctxt context.Context) error {
 	log.Debugw("close", log.Fields{"txn": c.txnID})
 	// Stop monitoring the key (applies only when there has been no transaction switch over)
 	if c.monitorCh != nil {
 		close(c.monitorCh)
-		err := ctx.kvClient.Put(c.txnKey, TransactionComplete, ctx.kvOperationTimeout)
+		err := ctx.kvClient.Put(ctxt, c.txnKey, TransactionComplete)
 
 		if err != nil {
 			log.Errorw("unable-to-write-a-key-value-pair-to-the-KV-store", log.Fields{"error": err})
@@ -330,15 +330,15 @@
 }
 
 // Delete deletes transaction
-func (c *KVTransaction) Delete() error {
+func (c *KVTransaction) Delete(ctxt context.Context) error {
 	log.Debugw("delete", log.Fields{"txn": c.txnID})
-	return ctx.kvClient.Delete(c.txnKey, ctx.kvOperationTimeout)
+	return ctx.kvClient.Delete(ctxt, c.txnKey)
 }
 
 // holdOnToTxnUntilProcessingCompleted renews the transaction lease until the transaction is complete.  durationInSecs
 // is used to calculate the frequency at which the Core processing the transaction renews the lease.  This function
 // exits only when the transaction is Closed, i.e completed.
-func (c *KVTransaction) holdOnToTxnUntilProcessingCompleted(key string, owner string, durationInSecs int64) {
+func (c *KVTransaction) holdOnToTxnUntilProcessingCompleted(ctxt context.Context, key string, owner string, durationInSecs int64) {
 	log.Debugw("holdOnToTxnUntilProcessingCompleted", log.Fields{"txn": c.txnID})
 	renewInterval := durationInSecs / NumTxnRenewalPerRequest
 	if renewInterval < MinTxnRenewalIntervalInSec {
@@ -351,7 +351,7 @@
 			log.Debugw("transaction-renewal-exits", log.Fields{"txn": c.txnID})
 			break forLoop
 		case <-time.After(time.Duration(renewInterval) * time.Second):
-			if err := ctx.kvClient.RenewReservation(c.txnKey); err != nil {
+			if err := ctx.kvClient.RenewReservation(ctxt, c.txnKey); err != nil {
 				// Log and continue.
 				log.Warnw("transaction-renewal-failed", log.Fields{"txnId": c.txnKey, "error": err})
 			}