VOL-2180 context changes in voltha-go

Passed context up as far as possible.
Where context reached the gRPC API, the context is passed through directly.
Where context reached the Kafka API, context.TODO() was used (as this NBI does not support context or request cancellation).
Anywhere a new goroutine is started, and the creating goroutine makes no attempt to wait, context.Background() was used.
Anywhere a new goroutine is started, and the creating goroutine waits for completion, the ctx is passed through from the creating goroutine.
Cancellation of gRPC NBI requests should recursively cancel all the way through to the KV store.

Change-Id: I7a65b49ae4e8c1d5263c27d2627e0ffe4d1eb71b
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 4053b46..913f9e4 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -170,12 +170,12 @@
 }
 
 // Load the most recent state from the KVStore for the device.
-func (agent *DeviceAgent) reconcileWithKVStore() {
+func (agent *DeviceAgent) reconcileWithKVStore(ctx context.Context) {
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
 	log.Debug("reconciling-device-agent-devicetype")
 	// TODO: context timeout
-	device, err := agent.clusterDataProxy.Get(context.Background(), "/devices/"+agent.deviceID, 1, true, "")
+	device, err := agent.clusterDataProxy.Get(ctx, "/devices/"+agent.deviceID, 1, true, "")
 	if err != nil {
 		log.Errorw("Failed to get device info from cluster data proxy", log.Fields{"error": err})
 		return
@@ -238,7 +238,7 @@
 	cloned.AdminState = voltha.AdminState_ENABLED
 	cloned.OperStatus = voltha.OperStatus_ACTIVATING
 
-	if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+	if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
 		return err
 	}
 
@@ -258,16 +258,16 @@
 	return nil
 }
 
-func (agent *DeviceAgent) sendBulkFlowsToAdapters(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata, response coreutils.Response) {
-	if err := agent.adapterProxy.UpdateFlowsBulk(device, flows, groups, flowMetadata); err != nil {
+func (agent *DeviceAgent) sendBulkFlowsToAdapters(ctx context.Context, device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups, flowMetadata *voltha.FlowMetadata, response coreutils.Response) {
+	if err := agent.adapterProxy.UpdateFlowsBulk(ctx, device, flows, groups, flowMetadata); err != nil {
 		log.Debugw("update-flow-bulk-error", log.Fields{"id": agent.deviceID, "error": err})
 		response.Error(err)
 	}
 	response.Done()
 }
 
-func (agent *DeviceAgent) sendIncrementalFlowsToAdapters(device *voltha.Device, flows *ofp.FlowChanges, groups *ofp.FlowGroupChanges, flowMetadata *voltha.FlowMetadata, response coreutils.Response) {
-	if err := agent.adapterProxy.UpdateFlowsIncremental(device, flows, groups, flowMetadata); err != nil {
+func (agent *DeviceAgent) sendIncrementalFlowsToAdapters(ctx context.Context, device *voltha.Device, flows *ofp.FlowChanges, groups *ofp.FlowGroupChanges, flowMetadata *voltha.FlowMetadata, response coreutils.Response) {
+	if err := agent.adapterProxy.UpdateFlowsIncremental(ctx, device, flows, groups, flowMetadata); err != nil {
 		log.Debugw("update-flow-incremental-error", log.Fields{"id": agent.deviceID, "error": err})
 		response.Error(err)
 	}
@@ -333,7 +333,7 @@
 	return newGroups, groupsToDelete, updatedAllGroups
 }
 
-func (agent *DeviceAgent) addFlowsAndGroupsToAdapter(newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+func (agent *DeviceAgent) addFlowsAndGroupsToAdapter(ctx context.Context, newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
 	log.Debugw("addFlowsAndGroups", log.Fields{"deviceId": agent.deviceID, "flows": newFlows, "groups": newGroups, "flowMetadata": flowMetadata})
 
 	if (len(newFlows) | len(newGroups)) == 0 {
@@ -377,7 +377,7 @@
 			log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": newFlows, "groups": newGroups})
 			return coreutils.DoneResponse(), nil
 		}
-		go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata, response)
+		go agent.sendBulkFlowsToAdapters(ctx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata, response)
 
 	} else {
 		flowChanges := &ofp.FlowChanges{
@@ -389,13 +389,13 @@
 			ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
 			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
 		}
-		go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, response)
+		go agent.sendIncrementalFlowsToAdapters(ctx, device, flowChanges, groupChanges, flowMetadata, response)
 	}
 
 	// store the changed data
 	device.Flows = &voltha.Flows{Items: updatedAllFlows}
 	device.FlowGroups = &voltha.FlowGroups{Items: updatedAllGroups}
-	if err := agent.updateDeviceWithoutLock(device); err != nil {
+	if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
 		return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
 	}
 
@@ -404,8 +404,8 @@
 
 //addFlowsAndGroups adds the "newFlows" and "newGroups" from the existing flows/groups and sends the update to the
 //adapters
-func (agent *DeviceAgent) addFlowsAndGroups(newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
-	response, err := agent.addFlowsAndGroupsToAdapter(newFlows, newGroups, flowMetadata)
+func (agent *DeviceAgent) addFlowsAndGroups(ctx context.Context, newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
+	response, err := agent.addFlowsAndGroupsToAdapter(ctx, newFlows, newGroups, flowMetadata)
 	if err != nil {
 		return err
 	}
@@ -416,7 +416,7 @@
 	return nil
 }
 
-func (agent *DeviceAgent) deleteFlowsAndGroupsFromAdapter(flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+func (agent *DeviceAgent) deleteFlowsAndGroupsFromAdapter(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
 	log.Debugw("deleteFlowsAndGroups", log.Fields{"deviceId": agent.deviceID, "flows": flowsToDel, "groups": groupsToDel})
 
 	if (len(flowsToDel) | len(groupsToDel)) == 0 {
@@ -476,7 +476,7 @@
 			log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flowsToDel": flowsToDel, "groupsToDel": groupsToDel})
 			return coreutils.DoneResponse(), nil
 		}
-		go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, flowMetadata, response)
+		go agent.sendBulkFlowsToAdapters(ctx, device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, flowMetadata, response)
 	} else {
 		flowChanges := &ofp.FlowChanges{
 			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
@@ -487,13 +487,13 @@
 			ToRemove: &voltha.FlowGroups{Items: groupsToDel},
 			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
 		}
-		go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, response)
+		go agent.sendIncrementalFlowsToAdapters(ctx, device, flowChanges, groupChanges, flowMetadata, response)
 	}
 
 	// store the changed data
 	device.Flows = &voltha.Flows{Items: flowsToKeep}
 	device.FlowGroups = &voltha.FlowGroups{Items: groupsToKeep}
-	if err := agent.updateDeviceWithoutLock(device); err != nil {
+	if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
 		return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
 	}
 
@@ -503,8 +503,8 @@
 
 //deleteFlowsAndGroups removes the "flowsToDel" and "groupsToDel" from the existing flows/groups and sends the update to the
 //adapters
-func (agent *DeviceAgent) deleteFlowsAndGroups(flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
-	response, err := agent.deleteFlowsAndGroupsFromAdapter(flowsToDel, groupsToDel, flowMetadata)
+func (agent *DeviceAgent) deleteFlowsAndGroups(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
+	response, err := agent.deleteFlowsAndGroupsFromAdapter(ctx, flowsToDel, groupsToDel, flowMetadata)
 	if err != nil {
 		return err
 	}
@@ -514,7 +514,7 @@
 	return nil
 }
 
-func (agent *DeviceAgent) updateFlowsAndGroupsToAdapter(updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+func (agent *DeviceAgent) updateFlowsAndGroupsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
 	log.Debugw("updateFlowsAndGroups", log.Fields{"deviceId": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
 
 	if (len(updatedFlows) | len(updatedGroups)) == 0 {
@@ -551,7 +551,7 @@
 
 	// Process bulk flow update differently than incremental update
 	if !dType.AcceptsAddRemoveFlowUpdates {
-		go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, nil, response)
+		go agent.sendBulkFlowsToAdapters(ctx, device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, nil, response)
 	} else {
 		var flowsToAdd []*ofp.OfpFlowStats
 		var flowsToDelete []*ofp.OfpFlowStats
@@ -606,13 +606,13 @@
 			ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
 			ToUpdate: &voltha.FlowGroups{Items: updatedGroups},
 		}
-		go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, flowMetadata, response)
+		go agent.sendIncrementalFlowsToAdapters(ctx, device, flowChanges, groupChanges, flowMetadata, response)
 	}
 
 	// store the updated data
 	device.Flows = &voltha.Flows{Items: updatedFlows}
 	device.FlowGroups = &voltha.FlowGroups{Items: updatedGroups}
-	if err := agent.updateDeviceWithoutLock(device); err != nil {
+	if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
 		return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
 	}
 
@@ -621,8 +621,8 @@
 
 //updateFlowsAndGroups replaces the existing flows and groups with "updatedFlows" and "updatedGroups" respectively. It
 //also sends the updates to the adapters
-func (agent *DeviceAgent) updateFlowsAndGroups(updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
-	response, err := agent.updateFlowsAndGroupsToAdapter(updatedFlows, updatedGroups, flowMetadata)
+func (agent *DeviceAgent) updateFlowsAndGroups(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
+	response, err := agent.updateFlowsAndGroupsToAdapter(ctx, updatedFlows, updatedGroups, flowMetadata)
 	if err != nil {
 		return err
 	}
@@ -653,7 +653,7 @@
 	// Update the Admin State and operational state before sending the request out
 	cloned.AdminState = voltha.AdminState_DISABLED
 	cloned.OperStatus = voltha.OperStatus_UNKNOWN
-	if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+	if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
 		return err
 	}
 	if err := agent.adapterProxy.DisableDevice(ctx, proto.Clone(cloned).(*voltha.Device)); err != nil {
@@ -663,7 +663,7 @@
 	return nil
 }
 
-func (agent *DeviceAgent) updateAdminState(adminState voltha.AdminState_Types) error {
+func (agent *DeviceAgent) updateAdminState(ctx context.Context, adminState voltha.AdminState_Types) error {
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
 	log.Debugw("updateAdminState", log.Fields{"id": agent.deviceID})
@@ -676,7 +676,7 @@
 	}
 	// Received an Ack (no error found above).  Now update the device in the model to the expected state
 	cloned.AdminState = adminState
-	if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+	if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
 		return err
 	}
 	return nil
@@ -721,13 +721,14 @@
 	//	Set the state to deleted after we receive an Ack - this will trigger some background process to clean up
 	//	the device as well as its association with the logical device
 	cloned.AdminState = voltha.AdminState_DELETED
-	if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+	if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
 		return err
 	}
 	//	If this is a child device then remove the associated peer ports on the parent device
 	if !cloned.Root {
 		go func() {
-			err := agent.deviceMgr.deletePeerPorts(cloned.ParentId, cloned.Id)
+			// since the caller does not wait for this to complete, use a background context
+			err := agent.deviceMgr.deletePeerPorts(context.Background(), cloned.ParentId, cloned.Id)
 			if err != nil {
 				log.Errorw("unable-to-delete-peer-ports", log.Fields{"error": err})
 			}
@@ -736,7 +737,7 @@
 	return nil
 }
 
-func (agent *DeviceAgent) setParentID(device *voltha.Device, parentID string) error {
+func (agent *DeviceAgent) setParentID(ctx context.Context, device *voltha.Device, parentID string) error {
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
 	log.Debugw("setParentId", log.Fields{"deviceId": device.Id, "parentId": parentID})
@@ -744,7 +745,7 @@
 	cloned := agent.getDeviceWithoutLock()
 	cloned.ParentId = parentID
 	// Store the device
-	if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+	if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
 		return err
 	}
 	return nil
@@ -758,7 +759,7 @@
 	cloned := agent.getDeviceWithoutLock()
 	cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
 	// Store the device
-	if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+	if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
 		return err
 	}
 	// Send the request to the adapter
@@ -769,7 +770,7 @@
 	return nil
 }
 
-func (agent *DeviceAgent) initPmConfigs(pmConfigs *voltha.PmConfigs) error {
+func (agent *DeviceAgent) initPmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
 	log.Debugw("initPmConfigs", log.Fields{"id": pmConfigs.Id})
@@ -777,7 +778,7 @@
 	cloned := agent.getDeviceWithoutLock()
 	cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
 	// Store the device
-	updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+	updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
 	afterUpdate, err := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceID, cloned, false, "")
 	if err != nil {
 		return status.Errorf(codes.Internal, "%s", agent.deviceID)
@@ -828,7 +829,7 @@
 			cloned.ImageDownloads = append(cloned.ImageDownloads, clonedImg)
 		}
 		cloned.AdminState = voltha.AdminState_DOWNLOADING_IMAGE
-		if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+		if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
 			return nil, err
 		}
 		// Send the request to the adapter
@@ -873,7 +874,7 @@
 	if device.AdminState == voltha.AdminState_DOWNLOADING_IMAGE {
 		// Set the device to Enabled
 		cloned.AdminState = voltha.AdminState_ENABLED
-		if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+		if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
 			return nil, err
 		}
 		// Send the request to the adapter
@@ -907,7 +908,7 @@
 	}
 	// Set the device to downloading_image
 	cloned.AdminState = voltha.AdminState_DOWNLOADING_IMAGE
-	if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+	if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
 		return nil, err
 	}
 
@@ -942,7 +943,7 @@
 		}
 	}
 
-	if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+	if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
 		return nil, err
 	}
 
@@ -967,7 +968,7 @@
 	return resp, nil
 }
 
-func (agent *DeviceAgent) updateImageDownload(img *voltha.ImageDownload) error {
+func (agent *DeviceAgent) updateImageDownload(ctx context.Context, img *voltha.ImageDownload) error {
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
 	log.Debugw("updateImageDownload", log.Fields{"id": agent.deviceID})
@@ -991,7 +992,7 @@
 		cloned.AdminState = voltha.AdminState_ENABLED
 	}
 
-	if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+	if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
 		return err
 	}
 	return nil
@@ -1023,7 +1024,7 @@
 func (agent *DeviceAgent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
 	log.Debugw("getPorts", log.Fields{"id": agent.deviceID, "portType": portType})
 	ports := &voltha.Ports{}
-	if device, _ := agent.deviceMgr.GetDevice(agent.deviceID); device != nil {
+	if device, _ := agent.deviceMgr.GetDevice(ctx, agent.deviceID); device != nil {
 		for _, port := range device.Ports {
 			if port.Type == portType {
 				ports.Items = append(ports.Items, port)
@@ -1037,7 +1038,7 @@
 // parent device
 func (agent *DeviceAgent) getSwitchCapability(ctx context.Context) (*ic.SwitchCapability, error) {
 	log.Debugw("getSwitchCapability", log.Fields{"deviceId": agent.deviceID})
-	device, err := agent.deviceMgr.GetDevice(agent.deviceID)
+	device, err := agent.deviceMgr.GetDevice(ctx, agent.deviceID)
 	if device == nil {
 		return nil, err
 	}
@@ -1053,7 +1054,7 @@
 // device
 func (agent *DeviceAgent) getPortCapability(ctx context.Context, portNo uint32) (*ic.PortCapability, error) {
 	log.Debugw("getPortCapability", log.Fields{"deviceId": agent.deviceID})
-	device, err := agent.deviceMgr.GetDevice(agent.deviceID)
+	device, err := agent.deviceMgr.GetDevice(ctx, agent.deviceID)
 	if device == nil {
 		return nil, err
 	}
@@ -1065,14 +1066,14 @@
 	return portCap, nil
 }
 
-func (agent *DeviceAgent) packetOut(outPort uint32, packet *ofp.OfpPacketOut) error {
+func (agent *DeviceAgent) packetOut(ctx context.Context, outPort uint32, packet *ofp.OfpPacketOut) error {
 	// If deviceType=="" then we must have taken ownership of this device.
 	// Fixes VOL-2226 where a core would take ownership and have stale data
 	if agent.deviceType == "" {
-		agent.reconcileWithKVStore()
+		agent.reconcileWithKVStore(ctx)
 	}
 	//	Send packet to adapter
-	if err := agent.adapterProxy.packetOut(agent.deviceType, agent.deviceID, outPort, packet); err != nil {
+	if err := agent.adapterProxy.packetOut(ctx, agent.deviceType, agent.deviceID, outPort, packet); err != nil {
 		log.Debugw("packet-out-error", log.Fields{
 			"id":     agent.deviceID,
 			"error":  err,
@@ -1084,7 +1085,7 @@
 }
 
 // processUpdate is a callback invoked whenever there is a change on the device manages by this device agent
-func (agent *DeviceAgent) processUpdate(args ...interface{}) interface{} {
+func (agent *DeviceAgent) processUpdate(ctx context.Context, args ...interface{}) interface{} {
 	//// Run this callback in its own go routine
 	go func(args ...interface{}) interface{} {
 		var previous *voltha.Device
@@ -1103,8 +1104,8 @@
 			log.Errorw("too-many-args-in-callback", log.Fields{"len": len(args)})
 			return nil
 		}
-		// Perform the state transition in it's own go routine
-		if err := agent.deviceMgr.processTransition(previous, current); err != nil {
+		// Perform the state transition in its own goroutine (since the caller doesn't wait for this, use a background context)
+		if err := agent.deviceMgr.processTransition(context.Background(), previous, current); err != nil {
 			log.Errorw("failed-process-transition", log.Fields{"deviceId": previous.Id,
 				"previousAdminState": previous.AdminState, "currentAdminState": current.AdminState})
 		}
@@ -1127,7 +1128,7 @@
 	cloned.Reason = device.Reason
 	return cloned, nil
 }
-func (agent *DeviceAgent) updateDeviceUsingAdapterData(device *voltha.Device) error {
+func (agent *DeviceAgent) updateDeviceUsingAdapterData(ctx context.Context, device *voltha.Device) error {
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
 	log.Debugw("updateDeviceUsingAdapterData", log.Fields{"deviceId": device.Id})
@@ -1137,16 +1138,16 @@
 		return status.Errorf(codes.Internal, "%s", err.Error())
 	}
 	cloned := proto.Clone(updatedDevice).(*voltha.Device)
-	return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
+	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
 }
 
-func (agent *DeviceAgent) updateDeviceWithoutLock(device *voltha.Device) error {
+func (agent *DeviceAgent) updateDeviceWithoutLock(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
 	cloned := proto.Clone(device).(*voltha.Device)
-	return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
+	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
 }
 
-func (agent *DeviceAgent) updateDeviceStatus(operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
+func (agent *DeviceAgent) updateDeviceStatus(ctx context.Context, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
 
@@ -1163,10 +1164,10 @@
 	}
 	log.Debugw("updateDeviceStatus", log.Fields{"deviceId": cloned.Id, "operStatus": cloned.OperStatus, "connectStatus": cloned.ConnectStatus})
 	// Store the device
-	return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
+	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
 }
 
-func (agent *DeviceAgent) enablePorts() error {
+func (agent *DeviceAgent) enablePorts(ctx context.Context) error {
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
 
@@ -1177,10 +1178,10 @@
 		port.OperStatus = voltha.OperStatus_ACTIVE
 	}
 	// Store the device
-	return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
+	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
 }
 
-func (agent *DeviceAgent) disablePorts() error {
+func (agent *DeviceAgent) disablePorts(ctx context.Context) error {
 	log.Debugw("disablePorts", log.Fields{"deviceid": agent.deviceID})
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
@@ -1190,10 +1191,10 @@
 		port.OperStatus = voltha.OperStatus_UNKNOWN
 	}
 	// Store the device
-	return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
+	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
 }
 
-func (agent *DeviceAgent) updatePortState(portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_Types) error {
+func (agent *DeviceAgent) updatePortState(ctx context.Context, portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_Types) error {
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
 	// Work only on latest data
@@ -1217,10 +1218,10 @@
 	}
 	log.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
 	// Store the device
-	return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
+	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
 }
 
-func (agent *DeviceAgent) deleteAllPorts() error {
+func (agent *DeviceAgent) deleteAllPorts(ctx context.Context) error {
 	log.Debugw("deleteAllPorts", log.Fields{"deviceId": agent.deviceID})
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
@@ -1240,10 +1241,10 @@
 	cloned.Ports = []*voltha.Port{}
 	log.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
 	// Store the device
-	return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
+	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
 }
 
-func (agent *DeviceAgent) addPort(port *voltha.Port) error {
+func (agent *DeviceAgent) addPort(ctx context.Context, port *voltha.Port) error {
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
 	log.Debugw("addPort", log.Fields{"deviceId": agent.deviceID})
@@ -1269,10 +1270,10 @@
 	}
 	cloned.Ports = append(cloned.Ports, cp)
 	// Store the device
-	return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
+	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
 }
 
-func (agent *DeviceAgent) addPeerPort(port *voltha.Port_PeerPort) error {
+func (agent *DeviceAgent) addPeerPort(ctx context.Context, port *voltha.Port_PeerPort) error {
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
 	log.Debug("addPeerPort")
@@ -1289,10 +1290,10 @@
 		}
 	}
 	// Store the device
-	return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
+	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
 }
 
-func (agent *DeviceAgent) deletePeerPorts(deviceID string) error {
+func (agent *DeviceAgent) deletePeerPorts(ctx context.Context, deviceID string) error {
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
 	log.Debug("deletePeerPorts")
@@ -1311,11 +1312,11 @@
 	}
 
 	// Store the device with updated peer ports
-	return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
+	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
 }
 
 // TODO: A generic device update by attribute
-func (agent *DeviceAgent) updateDeviceAttribute(name string, value interface{}) {
+func (agent *DeviceAgent) updateDeviceAttribute(ctx context.Context, name string, value interface{}) {
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
 	if value == nil {
@@ -1345,7 +1346,7 @@
 	log.Debugw("update-field-status", log.Fields{"deviceId": cloned.Id, "name": name, "updated": updated})
 	//	Save the data
 
-	if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+	if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
 		log.Warnw("attribute-update-failed", log.Fields{"attribute": name, "value": value})
 	}
 }
@@ -1367,8 +1368,8 @@
 
 //This is an update operation to model without Lock.This function must never be invoked by another function unless the latter holds a lock on the device.
 // It is an internal helper function.
-func (agent *DeviceAgent) updateDeviceInStoreWithoutLock(device *voltha.Device, strict bool, txid string) error {
-	updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+func (agent *DeviceAgent) updateDeviceInStoreWithoutLock(ctx context.Context, device *voltha.Device, strict bool, txid string) error {
+	updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
 	afterUpdate, err := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceID, device, strict, txid)
 	if err != nil {
 		return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceID)
@@ -1383,7 +1384,7 @@
 	return nil
 }
 
-func (agent *DeviceAgent) updateDeviceReason(reason string) error {
+func (agent *DeviceAgent) updateDeviceReason(ctx context.Context, reason string) error {
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
 
@@ -1391,5 +1392,5 @@
 	cloned.Reason = reason
 	log.Debugw("updateDeviceReason", log.Fields{"deviceId": cloned.Id, "reason": cloned.Reason})
 	// Store the device
-	return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
+	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
 }