VOL-2180 context changes in voltha-go

Passed context up as far as possible.
Where context reached the gRPC API, the context is passed through directly.
Where context reached the kafka API, context.TODO() was used (as this NBI does not support context or request cancellation).
Anywhere a new goroutine is started, and the creating goroutine makes no attempt to wait, context.Background() was used.
Anywhere a new goroutine is started, and the creating goroutine waits for completion, the ctx is passed through from the creating goroutine.
Cancellation of gRPC NBI requests should recursively cancel all the way through to the KV.

Change-Id: I7a65b49ae4e8c1d5263c27d2627e0ffe4d1eb71b
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 07db0be..dbdcc31 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -66,7 +66,7 @@
 }
 
 // This is a helper function that attempts to acquire the request by using the device ownership model
-func (rhp *AdapterRequestHandlerProxy) takeRequestOwnership(transactionID string, devID string, maxTimeout ...int64) (*KVTransaction, error) {
+func (rhp *AdapterRequestHandlerProxy) takeRequestOwnership(ctx context.Context, transactionID string, devID string, maxTimeout ...int64) (*KVTransaction, error) {
 	timeout := rhp.defaultRequestTimeout
 	if len(maxTimeout) > 0 {
 		timeout = maxTimeout[0]
@@ -80,13 +80,13 @@
 	var err error
 	if devID != "" {
 		var ownedByMe bool
-		if ownedByMe, err = rhp.core.deviceOwnership.OwnedByMe(&utils.DeviceID{ID: devID}); err != nil {
+		if ownedByMe, err = rhp.core.deviceOwnership.OwnedByMe(ctx, &utils.DeviceID{ID: devID}); err != nil {
 			log.Warnw("getting-ownership-failed", log.Fields{"deviceId": devID, "error": err})
 			return nil, kafka.ErrorTransactionInvalidId
 		}
-		acquired, err = txn.Acquired(timeout, ownedByMe)
+		acquired, err = txn.Acquired(ctx, timeout, ownedByMe)
 	} else {
-		acquired, err = txn.Acquired(timeout)
+		acquired, err = txn.Acquired(ctx, timeout)
 	}
 	if err == nil && acquired {
 		log.Debugw("transaction-acquired", log.Fields{"transactionId": txn.txnID})
@@ -135,16 +135,16 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(transactionID.Val, "")
+		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, "")
 		if err != nil {
 			if err.Error() == kafka.ErrorTransactionNotAcquired.Error() {
 				log.Debugw("Another core handled the request", log.Fields{"transactionId": transactionID})
 				// Update our adapters in memory
-				go rhp.adapterMgr.updateAdaptersAndDevicetypesInMemory(adapter)
+				go rhp.adapterMgr.updateAdaptersAndDevicetypesInMemory(context.TODO(), adapter)
 			}
 			return nil, err
 		}
-		defer txn.Close()
+		defer txn.Close(context.TODO())
 	}
 
 	if rhp.TestMode { // Execute only for test cases
@@ -181,12 +181,12 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id)
+		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, pID.Id)
 		if err != nil {
 			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
 			return nil, err
 		}
-		defer txn.Close()
+		defer txn.Close(context.TODO())
 	}
 
 	if rhp.TestMode { // Execute only for test cases
@@ -194,7 +194,7 @@
 	}
 
 	// Get the device via the device manager
-	device, err := rhp.deviceMgr.GetDevice(pID.Id)
+	device, err := rhp.deviceMgr.GetDevice(context.TODO(), pID.Id)
 	if err != nil {
 		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
 	}
@@ -230,12 +230,12 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(transactionID.Val, device.Id)
+		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, device.Id)
 		if err != nil {
 			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
 			return nil, err
 		}
-		defer txn.Close()
+		defer txn.Close(context.TODO())
 	}
 
 	log.Debugw("DeviceUpdate got txn", log.Fields{"deviceID": device.Id, "transactionID": transactionID.Val})
@@ -243,7 +243,7 @@
 		return new(empty.Empty), nil
 	}
 	go func() {
-		err := rhp.deviceMgr.updateDeviceUsingAdapterData(device)
+		err := rhp.deviceMgr.updateDeviceUsingAdapterData(context.TODO(), device)
 		if err != nil {
 			log.Errorw("unable-to-update-device-using-adapter-data", log.Fields{"error": err})
 		}
@@ -297,18 +297,18 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id)
+		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, pID.Id)
 		if err != nil {
 			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
 			return nil, err
 		}
-		defer txn.Close()
+		defer txn.Close(context.TODO())
 	}
 
 	if rhp.TestMode { // Execute only for test cases
 		return &voltha.Device{Id: pID.Id}, nil
 	}
-	return rhp.deviceMgr.GetChildDevice(pID.Id, serialNumber.Val, onuID.Val, parentPortNo.Val)
+	return rhp.deviceMgr.GetChildDevice(context.TODO(), pID.Id, serialNumber.Val, onuID.Val, parentPortNo.Val)
 }
 
 // GetChildDeviceWithProxyAddress returns details of child device with proxy address
@@ -339,18 +339,18 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(transactionID.Val, proxyAddress.DeviceId)
+		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, proxyAddress.DeviceId)
 		if err != nil {
 			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
 			return nil, err
 		}
-		defer txn.Close()
+		defer txn.Close(context.TODO())
 	}
 
 	if rhp.TestMode { // Execute only for test cases
 		return &voltha.Device{Id: proxyAddress.DeviceId}, nil
 	}
-	return rhp.deviceMgr.GetChildDeviceWithProxyAddress(proxyAddress)
+	return rhp.deviceMgr.GetChildDeviceWithProxyAddress(context.TODO(), proxyAddress)
 }
 
 // GetPorts returns the ports information of the device based on the port type.
@@ -391,12 +391,12 @@
 	}
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceID.Id)
+		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
 		if err != nil {
 			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
 			return nil, err
 		}
-		defer txn.Close()
+		defer txn.Close(context.TODO())
 	}
 
 	return rhp.deviceMgr.getPorts(context.TODO(), deviceID.Id, voltha.Port_PortType(pt.Val))
@@ -430,19 +430,19 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id)
+		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, pID.Id)
 		if err != nil {
 			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
 			return nil, err
 		}
-		defer txn.Close()
+		defer txn.Close(context.TODO())
 	}
 
 	if rhp.TestMode { // Execute only for test cases
 		return &voltha.Devices{Items: nil}, nil
 	}
 
-	return rhp.deviceMgr.getAllChildDevices(pID.Id)
+	return rhp.deviceMgr.getAllChildDevices(context.TODO(), pID.Id)
 }
 
 // ChildDeviceDetected is invoked when a child device is detected.  The following
@@ -513,18 +513,18 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(transactionID.Val, pID.Id)
+		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, pID.Id)
 		if err != nil {
 			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
 			return nil, err
 		}
-		defer txn.Close()
+		defer txn.Close(context.TODO())
 	}
 
 	if rhp.TestMode { // Execute only for test cases
 		return nil, nil
 	}
-	device, err := rhp.deviceMgr.childDeviceDetected(pID.Id, portNo.Val, dt.Val, chnlID.Val, vendorID.Val, serialNumber.Val, onuID.Val)
+	device, err := rhp.deviceMgr.childDeviceDetected(context.TODO(), pID.Id, portNo.Val, dt.Val, chnlID.Val, vendorID.Val, serialNumber.Val, onuID.Val)
 	if err != nil {
 		log.Errorw("child-detection-failed", log.Fields{"parentID": pID.Id, "onuID": onuID.Val, "error": err})
 		return nil, err
@@ -573,12 +573,12 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceID.Id)
+		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
 		if err != nil {
 			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
 			return nil, err
 		}
-		defer txn.Close()
+		defer txn.Close(context.TODO())
 	}
 
 	if rhp.TestMode { // Execute only for test cases
@@ -586,7 +586,7 @@
 	}
 	// When the enum is not set (i.e. -1), Go still convert to the Enum type with the value being -1
 	go func() {
-		err := rhp.deviceMgr.updateDeviceStatus(deviceID.Id, voltha.OperStatus_Types(operStatus.Val),
+		err := rhp.deviceMgr.updateDeviceStatus(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val),
 			voltha.ConnectStatus_Types(connStatus.Val))
 		if err != nil {
 			log.Errorw("unable-to-update-device-status", log.Fields{"error": err})
@@ -636,12 +636,12 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceID.Id)
+		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
 		if err != nil {
 			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
 			return nil, err
 		}
-		defer txn.Close()
+		defer txn.Close(context.TODO())
 	}
 
 	if rhp.TestMode { // Execute only for test cases
@@ -650,7 +650,7 @@
 
 	// When the enum is not set (i.e. -1), Go still convert to the Enum type with the value being -1
 	go func() {
-		err := rhp.deviceMgr.updateChildrenStatus(deviceID.Id, voltha.OperStatus_Types(operStatus.Val),
+		err := rhp.deviceMgr.updateChildrenStatus(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val),
 			voltha.ConnectStatus_Types(connStatus.Val))
 		if err != nil {
 			log.Errorw("unable-to-update-children-status", log.Fields{"error": err})
@@ -697,12 +697,12 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceID.Id)
+		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
 		if err != nil {
 			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
 			return nil, err
 		}
-		defer txn.Close()
+		defer txn.Close(context.TODO())
 	}
 
 	if rhp.TestMode { // Execute only for test cases
@@ -710,7 +710,7 @@
 	}
 
 	go func() {
-		err := rhp.deviceMgr.updatePortsState(deviceID.Id, voltha.OperStatus_Types(operStatus.Val))
+		err := rhp.deviceMgr.updatePortsState(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val))
 		if err != nil {
 			log.Errorw("unable-to-update-ports-state", log.Fields{"error": err})
 		}
@@ -765,12 +765,12 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceID.Id)
+		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
 		if err != nil {
 			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
 			return nil, err
 		}
-		defer txn.Close()
+		defer txn.Close(context.TODO())
 	}
 
 	if rhp.TestMode { // Execute only for test cases
@@ -778,7 +778,7 @@
 	}
 
 	go func() {
-		err := rhp.deviceMgr.updatePortState(deviceID.Id, voltha.Port_PortType(portType.Val), uint32(portNo.Val),
+		err := rhp.deviceMgr.updatePortState(context.TODO(), deviceID.Id, voltha.Port_PortType(portType.Val), uint32(portNo.Val),
 			voltha.OperStatus_Types(operStatus.Val))
 		if err != nil {
 			log.Errorw("unable-to-update-port-state", log.Fields{"error": err})
@@ -819,12 +819,12 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceID.Id)
+		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
 		if err != nil {
 			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
 			return nil, err
 		}
-		defer txn.Close()
+		defer txn.Close(context.TODO())
 	}
 
 	if rhp.TestMode { // Execute only for test cases
@@ -832,7 +832,7 @@
 	}
 
 	go func() {
-		err := rhp.deviceMgr.deleteAllPorts(deviceID.Id)
+		err := rhp.deviceMgr.deleteAllPorts(context.TODO(), deviceID.Id)
 		if err != nil {
 			log.Errorw("unable-to-delete-ports", log.Fields{"error": err})
 		}
@@ -869,12 +869,12 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(transactionID.Val, parentDeviceID.Id)
+		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, parentDeviceID.Id)
 		if err != nil {
 			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
 			return nil, err
 		}
-		defer txn.Close()
+		defer txn.Close(context.TODO())
 	}
 
 	if rhp.TestMode { // Execute only for test cases
@@ -882,7 +882,7 @@
 	}
 
 	go func() {
-		err := rhp.deviceMgr.childDevicesLost(parentDeviceID.Id)
+		err := rhp.deviceMgr.childDevicesLost(context.TODO(), parentDeviceID.Id)
 		if err != nil {
 			log.Errorw("unable-to-disable-child-devices", log.Fields{"error": err})
 		}
@@ -919,19 +919,19 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(transactionID.Val, parentDeviceID.Id)
+		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, parentDeviceID.Id)
 		if err != nil {
 			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
 			return nil, err
 		}
-		defer txn.Close()
+		defer txn.Close(context.TODO())
 	}
 
 	if rhp.TestMode { // Execute only for test cases
 		return nil, nil
 	}
 
-	if err := rhp.deviceMgr.childDevicesDetected(parentDeviceID.Id); err != nil {
+	if err := rhp.deviceMgr.childDevicesDetected(context.TODO(), parentDeviceID.Id); err != nil {
 		log.Errorw("child-devices-dection-failed", log.Fields{"parentID": parentDeviceID.Id, "error": err})
 		return nil, err
 	}
@@ -972,19 +972,19 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceID.Id)
+		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
 		if err != nil {
 			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
 			return nil, err
 		}
-		defer txn.Close()
+		defer txn.Close(context.TODO())
 	}
 
 	if rhp.TestMode { // Execute only for test cases
 		return nil, nil
 	}
 	go func() {
-		err := rhp.deviceMgr.addPort(deviceID.Id, port)
+		err := rhp.deviceMgr.addPort(context.TODO(), deviceID.Id, port)
 		if err != nil {
 			log.Errorw("unable-to-add-port", log.Fields{"error": err})
 		}
@@ -1021,12 +1021,12 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(transactionID.Val, pmConfigs.Id)
+		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, pmConfigs.Id)
 		if err != nil {
 			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
 			return nil, err
 		}
-		defer txn.Close()
+		defer txn.Close(context.TODO())
 	}
 
 	if rhp.TestMode { // Execute only for test cases
@@ -1034,7 +1034,7 @@
 	}
 
 	go func() {
-		err := rhp.deviceMgr.initPmConfigs(pmConfigs.Id, pmConfigs)
+		err := rhp.deviceMgr.initPmConfigs(context.TODO(), pmConfigs.Id, pmConfigs)
 		if err != nil {
 			log.Errorw("unable-to-initialize-pm-configs", log.Fields{"error": err})
 		}
@@ -1085,18 +1085,18 @@
 	// TODO: If this adds too much latencies then needs to remove transaction and let OFAgent filter out
 	// duplicates.
 	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceID.Id)
+		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
 		if err != nil {
 			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
 			return nil, err
 		}
-		defer txn.Close()
+		defer txn.Close(context.TODO())
 	}
 	if rhp.TestMode { // Execute only for test cases
 		return nil, nil
 	}
 	go func() {
-		err := rhp.deviceMgr.PacketIn(deviceID.Id, uint32(portNo.Val), transactionID.Val, packet.Payload)
+		err := rhp.deviceMgr.PacketIn(context.TODO(), deviceID.Id, uint32(portNo.Val), transactionID.Val, packet.Payload)
 		if err != nil {
 			log.Errorw("unable-to-receive-packet-from-adapter", log.Fields{"error": err})
 		}
@@ -1139,19 +1139,19 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceID.Id)
+		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
 		if err != nil {
 			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
 			return nil, err
 		}
-		defer txn.Close()
+		defer txn.Close(context.TODO())
 	}
 
 	if rhp.TestMode { // Execute only for test cases
 		return nil, nil
 	}
 	go func() {
-		err := rhp.deviceMgr.updateImageDownload(deviceID.Id, img)
+		err := rhp.deviceMgr.updateImageDownload(context.TODO(), deviceID.Id, img)
 		if err != nil {
 			log.Errorw("unable-to-update-image-download", log.Fields{"error": err})
 		}
@@ -1189,12 +1189,12 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(transactionID.Val, parentDeviceID.Id)
+		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, parentDeviceID.Id)
 		if err != nil {
 			log.Debugw("Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
 			return nil, err
 		}
-		defer txn.Close()
+		defer txn.Close(context.TODO())
 	}
 
 	if rhp.TestMode { // Execute only for test cases
@@ -1203,7 +1203,7 @@
 
 	// Run it in its own routine
 	go func() {
-		err := rhp.deviceMgr.reconcileChildDevices(parentDeviceID.Id)
+		err := rhp.deviceMgr.reconcileChildDevices(context.TODO(), parentDeviceID.Id)
 		if err != nil {
 			log.Errorw("unable-to-reconcile-child-devices", log.Fields{"error": err})
 		}
@@ -1246,21 +1246,21 @@
 
 	// Try to grab the transaction as this core may be competing with another Core
 	if rhp.competeForTransaction() {
-		txn, err := rhp.takeRequestOwnership(transactionID.Val, deviceID.Id)
+		txn, err := rhp.takeRequestOwnership(context.TODO(), transactionID.Val, deviceID.Id)
 		if err != nil {
 			log.Debugw("DeviceReasonUpdate: Core did not process request", log.Fields{"transactionID": transactionID, "error": err})
 			return nil, err
 		}
-		defer txn.Close()
+		defer txn.Close(context.TODO())
 	}
 
 	if rhp.TestMode { // Execute only for test cases
 		return nil, nil
 	}
 
-	// Run it in its own routine
+	// Run it in its own routine (w/ context.TODO())
 	go func() {
-		err := rhp.deviceMgr.updateDeviceReason(deviceID.Id, reason.Val)
+		err := rhp.deviceMgr.updateDeviceReason(context.TODO(), deviceID.Id, reason.Val)
 		if err != nil {
 			log.Errorw("unable-to-update-device-reason", log.Fields{"error": err})
 		}