VOL-2180 context changes in voltha-go

Passed context up as far as possible.
Where context reached the gRPC API, the context is passed through directly.
Where context reached the Kafka API, context.TODO() was used (as this NBI does not support context or request cancelation).
Anywhere a new thread is started, and the creating thread makes no attempt to wait, context.Background() was used.
Anywhere a new thread is started, and the creating thread waits for completion, the ctx is passed through from the creating thread.
Cancelation of gRPC NBI requests should recursively cancel all the way through to the KV.

Change-Id: I7a65b49ae4e8c1d5263c27d2627e0ffe4d1eb71b
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 0378260..fd54a50 100755
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -150,13 +150,13 @@
 	var acquired bool
 	if id != nil {
 		var ownedByMe bool
-		if ownedByMe, err = handler.core.deviceOwnership.OwnedByMe(id); err != nil {
+		if ownedByMe, err = handler.core.deviceOwnership.OwnedByMe(ctx, id); err != nil {
 			log.Warnw("getting-ownership-failed", log.Fields{"deviceId": id, "error": err})
 			return nil, errorIDNotFound
 		}
-		acquired, err = txn.Acquired(timeout, ownedByMe)
+		acquired, err = txn.Acquired(ctx, timeout, ownedByMe)
 	} else {
-		acquired, err = txn.Acquired(timeout)
+		acquired, err = txn.Acquired(ctx, timeout)
 	}
 	if err == nil && acquired {
 		log.Debugw("transaction-acquired", log.Fields{"transactionId": txn.txnID})
@@ -252,9 +252,9 @@
 		if err != nil {
 			return &voltha.LogicalPort{}, err
 		}
-		defer txn.Close()
+		defer txn.Close(ctx)
 	}
-	return handler.logicalDeviceMgr.getLogicalPort(id)
+	return handler.logicalDeviceMgr.getLogicalPort(ctx, id)
 }
 
 // EnableLogicalDevicePort enables logical device port
@@ -269,7 +269,7 @@
 		if err != nil {
 			return &empty.Empty{}, err
 		}
-		defer txn.Close()
+		defer txn.Close(ctx)
 	}
 
 	ch := make(chan interface{})
@@ -290,7 +290,7 @@
 		if err != nil {
 			return &empty.Empty{}, err
 		}
-		defer txn.Close()
+		defer txn.Close(ctx)
 	}
 
 	ch := make(chan interface{})
@@ -311,7 +311,7 @@
 		if err != nil {
 			return &empty.Empty{}, err
 		}
-		defer txn.Close()
+		defer txn.Close(ctx)
 	}
 
 	ch := make(chan interface{})
@@ -332,7 +332,7 @@
 		if err != nil {
 			return &empty.Empty{}, err
 		}
-		defer txn.Close()
+		defer txn.Close(ctx)
 	}
 
 	ch := make(chan interface{})
@@ -344,7 +344,7 @@
 // GetDevice must be implemented in the read-only containers - should it also be implemented here?
 func (handler *APIHandler) GetDevice(ctx context.Context, id *voltha.ID) (*voltha.Device, error) {
 	log.Debugw("GetDevice-request", log.Fields{"id": id})
-	return handler.deviceMgr.GetDevice(id.Id)
+	return handler.deviceMgr.GetDevice(ctx, id.Id)
 }
 
 // GetDevice must be implemented in the read-only containers - should it also be implemented here?
@@ -352,7 +352,7 @@
 // ListDevices retrieves the latest devices from the data model
 func (handler *APIHandler) ListDevices(ctx context.Context, empty *empty.Empty) (*voltha.Devices, error) {
 	log.Debug("ListDevices")
-	devices, err := handler.deviceMgr.ListDevices()
+	devices, err := handler.deviceMgr.ListDevices(ctx)
 	if err != nil {
 		log.Errorw("Failed to list devices", log.Fields{"error": err})
 		return nil, err
@@ -392,9 +392,9 @@
 		if err != nil {
 			return &voltha.LogicalDevice{}, err
 		}
-		defer txn.Close()
+		defer txn.Close(ctx)
 	}
-	return handler.logicalDeviceMgr.getLogicalDevice(id.Id)
+	return handler.logicalDeviceMgr.getLogicalDevice(ctx, id.Id)
 }
 
 // ListLogicalDevices returns the list of all logical devices
@@ -405,14 +405,14 @@
 		if err != nil {
 			return &voltha.LogicalDevices{}, err
 		}
-		defer txn.Close()
+		defer txn.Close(ctx)
 		if handler.isOFControllerRequest(ctx) {
 			//	Since an OF controller is only interested in the set of logical devices managed by thgis Core then return
 			//	only logical devices managed/monitored by this Core.
 			return handler.logicalDeviceMgr.listManagedLogicalDevices()
 		}
 	}
-	return handler.logicalDeviceMgr.listLogicalDevices()
+	return handler.logicalDeviceMgr.listLogicalDevices(ctx)
 }
 
 // ListAdapters returns the contents of all adapters known to the system
@@ -429,7 +429,7 @@
 		if err != nil {
 			return &openflow_13.Flows{}, err
 		}
-		defer txn.Close()
+		defer txn.Close(ctx)
 	}
 	return handler.logicalDeviceMgr.ListLogicalDeviceFlows(ctx, id.Id)
 }
@@ -442,7 +442,7 @@
 		if err != nil {
 			return &openflow_13.FlowGroups{}, err
 		}
-		defer txn.Close()
+		defer txn.Close(ctx)
 	}
 	return handler.logicalDeviceMgr.ListLogicalDeviceFlowGroups(ctx, id.Id)
 }
@@ -455,7 +455,7 @@
 		if err != nil {
 			return &voltha.LogicalPorts{}, err
 		}
-		defer txn.Close()
+		defer txn.Close(ctx)
 	}
 	return handler.logicalDeviceMgr.ListLogicalDevicePorts(ctx, id.Id)
 }
@@ -477,7 +477,7 @@
 		if err != nil {
 			return &voltha.Device{}, err
 		}
-		defer txn.Close()
+		defer txn.Close(ctx)
 	}
 
 	ch := make(chan interface{})
@@ -491,7 +491,7 @@
 				return nil, err
 			}
 			if d, ok := res.(*voltha.Device); ok {
-				_, err := handler.core.deviceOwnership.OwnedByMe(&utils.DeviceID{ID: d.Id})
+				_, err := handler.core.deviceOwnership.OwnedByMe(ctx, &utils.DeviceID{ID: d.Id})
 				if err != nil {
 					log.Errorw("unable-to-find-core-instance-active-owns-this-device", log.Fields{"error": err})
 				}
@@ -519,7 +519,7 @@
 		if err != nil {
 			return &empty.Empty{}, err
 		}
-		defer txn.Close()
+		defer txn.Close(ctx)
 	}
 
 	ch := make(chan interface{})
@@ -540,7 +540,7 @@
 		if err != nil {
 			return &empty.Empty{}, err
 		}
-		defer txn.Close()
+		defer txn.Close(ctx)
 	}
 
 	ch := make(chan interface{})
@@ -561,7 +561,7 @@
 		if err != nil {
 			return &empty.Empty{}, err
 		}
-		defer txn.Close()
+		defer txn.Close(ctx)
 	}
 
 	ch := make(chan interface{})
@@ -581,14 +581,14 @@
 		txn, err := handler.takeRequestOwnership(ctx, &utils.DeviceID{ID: id.Id})
 		if err != nil {
 			if err == errorTransactionNotAcquired {
-				if ownedByMe, err := handler.core.deviceOwnership.OwnedByMe(&utils.DeviceID{ID: id.Id}); !ownedByMe && err == nil {
+				if ownedByMe, err := handler.core.deviceOwnership.OwnedByMe(ctx, &utils.DeviceID{ID: id.Id}); !ownedByMe && err == nil {
 					// Remove the device in memory
-					handler.deviceMgr.stopManagingDevice(id.Id)
+					handler.deviceMgr.stopManagingDevice(ctx, id.Id)
 				}
 			}
 			return &empty.Empty{}, err
 		}
-		defer txn.Close()
+		defer txn.Close(ctx)
 	}
 
 	ch := make(chan interface{})
@@ -605,10 +605,10 @@
 		if err != nil {
 			return &voltha.Ports{}, err
 		}
-		defer txn.Close()
+		defer txn.Close(ctx)
 	}
 
-	device, err := handler.deviceMgr.GetDevice(id.Id)
+	device, err := handler.deviceMgr.GetDevice(ctx, id.Id)
 	if err != nil {
 		return &voltha.Ports{}, err
 	}
@@ -625,10 +625,10 @@
 		if err != nil {
 			return &openflow_13.Flows{}, err
 		}
-		defer txn.Close()
+		defer txn.Close(ctx)
 	}
 
-	device, err := handler.deviceMgr.GetDevice(id.Id)
+	device, err := handler.deviceMgr.GetDevice(ctx, id.Id)
 	if err != nil {
 		return &openflow_13.Flows{}, err
 	}
@@ -641,7 +641,7 @@
 func (handler *APIHandler) ListDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*voltha.FlowGroups, error) {
 	log.Debugw("ListDeviceFlowGroups", log.Fields{"deviceid": id})
 
-	if device, _ := handler.deviceMgr.GetDevice(id.Id); device != nil {
+	if device, _ := handler.deviceMgr.GetDevice(ctx, id.Id); device != nil {
 		return device.GetFlowGroups(), nil
 	}
 	return &voltha.FlowGroups{}, status.Errorf(codes.NotFound, "device-%s", id.Id)
@@ -712,7 +712,7 @@
 		if err != nil {
 			return &common.OperationResp{}, err
 		}
-		defer txn.Close()
+		defer txn.Close(ctx)
 	}
 
 	failedresponse := &common.OperationResp{Code: voltha.OperationResp_OPERATION_FAILURE}
@@ -808,7 +808,7 @@
 		if err != nil {
 			return failedresponse, err
 		}
-		defer txn.Close()
+		defer txn.Close(ctx)
 	}
 
 	ch := make(chan interface{})
@@ -871,7 +871,7 @@
 // GetImages returns all images for a specific device entry
 func (handler *APIHandler) GetImages(ctx context.Context, id *voltha.ID) (*voltha.Images, error) {
 	log.Debugw("GetImages", log.Fields{"deviceid": id.Id})
-	device, err := handler.deviceMgr.GetDevice(id.Id)
+	device, err := handler.deviceMgr.GetDevice(ctx, id.Id)
 	if err != nil {
 		return &voltha.Images{}, err
 	}
@@ -889,7 +889,7 @@
 		if err != nil {
 			return &empty.Empty{}, err
 		}
-		defer txn.Close()
+		defer txn.Close(ctx)
 	}
 
 	ch := make(chan interface{})
@@ -906,7 +906,7 @@
 		if err != nil {
 			return &voltha.PmConfigs{}, err
 		}
-		defer txn.Close()
+		defer txn.Close(ctx)
 	}
 	return handler.deviceMgr.listPmConfigs(ctx, id.Id)
 }
@@ -947,14 +947,14 @@
 	return &voltha.SelfTestResponse{}, errors.New("UnImplemented")
 }
 
-func (handler *APIHandler) forwardPacketOut(packet *openflow_13.PacketOut) {
+func (handler *APIHandler) forwardPacketOut(ctx context.Context, packet *openflow_13.PacketOut) {
 	log.Debugw("forwardPacketOut-request", log.Fields{"packet": packet})
 	//TODO: Update this logic once the OF Controller (OFAgent in this case) can include a transaction Id in its
 	// request.  For performance reason we can let both Cores in a Core-Pair forward the Packet to the adapters and
 	// let once of the shim layer (kafka proxy or adapter request handler filters out the duplicate packet)
-	if ownedByMe, err := handler.core.deviceOwnership.OwnedByMe(&utils.LogicalDeviceID{ID: packet.Id}); ownedByMe && err == nil {
-		if agent := handler.logicalDeviceMgr.getLogicalDeviceAgent(packet.Id); agent != nil {
-			agent.packetOut(packet.PacketOut)
+	if ownedByMe, err := handler.core.deviceOwnership.OwnedByMe(ctx, &utils.LogicalDeviceID{ID: packet.Id}); ownedByMe && err == nil {
+		if agent := handler.logicalDeviceMgr.getLogicalDeviceAgent(ctx, packet.Id); agent != nil {
+			agent.packetOut(ctx, packet.PacketOut)
 		} else {
 			log.Errorf("No logical device agent present", log.Fields{"logicaldeviceID": packet.Id})
 		}
@@ -985,7 +985,7 @@
 			continue
 		}
 
-		handler.forwardPacketOut(packet)
+		handler.forwardPacketOut(packets.Context(), packet)
 	}
 
 	log.Debugw("StreamPacketsOut-request-done", log.Fields{"packets": packets})
@@ -1142,7 +1142,7 @@
 		if err != nil {
 			return &openflow_13.Meters{}, err // TODO: Return empty meter entry
 		}
-		defer txn.Close()
+		defer txn.Close(ctx)
 	}
 	return handler.logicalDeviceMgr.ListLogicalDeviceMeters(ctx, id.Id)
 }
@@ -1176,7 +1176,7 @@
 			failedresponse := &common.OperationResp{Code: voltha.OperationResp_OPERATION_FAILURE}
 			return failedresponse, err
 		}
-		defer txn.Close()
+		defer txn.Close(ctx)
 	}
 
 	ch := make(chan interface{})
@@ -1198,7 +1198,7 @@
 		if err != nil {
 			return &empty.Empty{}, err
 		}
-		defer txn.Close()
+		defer txn.Close(ctx)
 	}
 
 	ch := make(chan interface{})