VOL-2180 context changes in voltha-go

Passed context up as far as possible.
Where context reached the gRPC API, the context is passed through directly.
Where context reached the Kafka API, context.TODO() was used (as this NBI does not support context or request cancelation).
Anywhere a new goroutine is started, and the creating goroutine makes no attempt to wait, context.Background() was used.
Anywhere a new goroutine is started, and the creating goroutine waits for completion, the ctx is passed through from the creating goroutine.
Cancelation of gRPC NBI requests should recursively cancel all the way through to the KV store.

Change-Id: I7a65b49ae4e8c1d5263c27d2627e0ffe4d1eb71b
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 3fd8740..80dcfd7 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -142,7 +142,7 @@
 		//	load from dB - the logical may not exist at this time.  On error, just return and the calling function
 		// will destroy this agent.
 		agent.lockLogicalDevice.Lock()
-		logicalDevice, err := agent.clusterDataProxy.Get(context.Background(), "/logical_devices/"+agent.logicalDeviceID, 0, true, "")
+		logicalDevice, err := agent.clusterDataProxy.Get(ctx, "/logical_devices/"+agent.logicalDeviceID, 0, true, "")
 		if err != nil {
 			return status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceID)
 		}
@@ -207,7 +207,7 @@
 
 	// Setup the device graph - run it in its own routine
 	if loadFromdB {
-		go agent.generateDeviceGraph()
+		go agent.generateDeviceGraph(context.Background())
 	}
 	return nil
 }
@@ -283,13 +283,13 @@
 }
 
 //updateLogicalDeviceFlowsWithoutLock updates the logical device with the latest flows in the model.
-func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowsWithoutLock(flows *ofp.Flows) error {
+func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowsWithoutLock(ctx context.Context, flows *ofp.Flows) error {
 	ld := agent.getLogicalDeviceWithoutLock()
 
 	log.Debugw("logical-device-before", log.Fields{"lports": len(ld.Ports)})
 	ld.Flows = flows
 
-	if err := agent.updateLogicalDeviceWithoutLock(ld); err != nil {
+	if err := agent.updateLogicalDeviceWithoutLock(ctx, ld); err != nil {
 		log.Errorw("error-updating-logical-device-with-flows", log.Fields{"error": err})
 		return err
 	}
@@ -297,13 +297,13 @@
 }
 
 //updateLogicalDeviceMetersWithoutLock updates the logical device with the meters info
-func (agent *LogicalDeviceAgent) updateLogicalDeviceMetersWithoutLock(meters *ofp.Meters) error {
+func (agent *LogicalDeviceAgent) updateLogicalDeviceMetersWithoutLock(ctx context.Context, meters *ofp.Meters) error {
 	ld := agent.getLogicalDeviceWithoutLock()
 
 	log.Debugw("logical-device-before", log.Fields{"lports": len(ld.Ports)})
 	ld.Meters = meters
 
-	if err := agent.updateLogicalDeviceWithoutLock(ld); err != nil {
+	if err := agent.updateLogicalDeviceWithoutLock(ctx, ld); err != nil {
 		log.Errorw("error-updating-logical-device-with-meters", log.Fields{"error": err})
 		return err
 	}
@@ -311,13 +311,13 @@
 }
 
 //updateLogicalDeviceFlowGroupsWithoutLock updates the logical device with the flow groups
-func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowGroupsWithoutLock(flowGroups *ofp.FlowGroups) error {
+func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowGroupsWithoutLock(ctx context.Context, flowGroups *ofp.FlowGroups) error {
 	ld := agent.getLogicalDeviceWithoutLock()
 
 	log.Debugw("logical-device-before", log.Fields{"lports": len(ld.Ports)})
 	ld.FlowGroups = flowGroups
 
-	if err := agent.updateLogicalDeviceWithoutLock(ld); err != nil {
+	if err := agent.updateLogicalDeviceWithoutLock(ctx, ld); err != nil {
 		log.Errorw("error-updating-logical-device-with-flowgroups", log.Fields{"error": err})
 		return err
 	}
@@ -330,22 +330,22 @@
 	return proto.Clone(agent.logicalDevice).(*voltha.LogicalDevice)
 }
 
-func (agent *LogicalDeviceAgent) updateLogicalPort(device *voltha.Device, port *voltha.Port) error {
+func (agent *LogicalDeviceAgent) updateLogicalPort(ctx context.Context, device *voltha.Device, port *voltha.Port) error {
 	log.Debugw("updateLogicalPort", log.Fields{"deviceId": device.Id, "port": port})
 	var err error
 	if port.Type == voltha.Port_ETHERNET_NNI {
-		if _, err = agent.addNNILogicalPort(device, port); err != nil {
+		if _, err = agent.addNNILogicalPort(ctx, device, port); err != nil {
 			return err
 		}
 		agent.addLogicalPortToMap(port.PortNo, true)
 	} else if port.Type == voltha.Port_ETHERNET_UNI {
-		if _, err = agent.addUNILogicalPort(device, port); err != nil {
+		if _, err = agent.addUNILogicalPort(ctx, device, port); err != nil {
 			return err
 		}
 		agent.addLogicalPortToMap(port.PortNo, false)
 	} else {
 		// Update the device graph to ensure all routes on the logical device have been calculated
-		if err = agent.updateRoutes(device, port); err != nil {
+		if err = agent.updateRoutes(ctx, device, port); err != nil {
 			log.Errorw("failed-to-update-routes", log.Fields{"deviceId": device.Id, "port": port, "error": err})
 			return err
 		}
@@ -359,13 +359,13 @@
 func (agent *LogicalDeviceAgent) setupLogicalPorts(ctx context.Context) error {
 	log.Infow("setupLogicalPorts", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 	// First add any NNI ports which could have been missing
-	if err := agent.setupNNILogicalPorts(context.TODO(), agent.rootDeviceID); err != nil {
+	if err := agent.setupNNILogicalPorts(ctx, agent.rootDeviceID); err != nil {
 		log.Errorw("error-setting-up-NNI-ports", log.Fields{"error": err, "deviceId": agent.rootDeviceID})
 		return err
 	}
 
 	// Now, set up the UNI ports if needed.
-	children, err := agent.deviceMgr.getAllChildDevices(agent.rootDeviceID)
+	children, err := agent.deviceMgr.getAllChildDevices(ctx, agent.rootDeviceID)
 	if err != nil {
 		log.Errorw("error-getting-child-devices", log.Fields{"error": err, "deviceId": agent.rootDeviceID})
 		return err
@@ -375,7 +375,7 @@
 		response := coreutils.NewResponse()
 		responses = append(responses, response)
 		go func(child *voltha.Device) {
-			if err = agent.setupUNILogicalPorts(context.TODO(), child); err != nil {
+			if err = agent.setupUNILogicalPorts(ctx, child); err != nil {
 				log.Error("setting-up-UNI-ports-failed", log.Fields{"deviceID": child.Id})
 				response.Error(status.Errorf(codes.Internal, "UNI-ports-setup-failed: %s", child.Id))
 			}
@@ -396,7 +396,7 @@
 	var err error
 
 	var device *voltha.Device
-	if device, err = agent.deviceMgr.GetDevice(deviceID); err != nil {
+	if device, err = agent.deviceMgr.GetDevice(ctx, deviceID); err != nil {
 		log.Errorw("error-retrieving-device", log.Fields{"error": err, "deviceId": deviceID})
 		return err
 	}
@@ -404,7 +404,7 @@
 	//Get UNI port number
 	for _, port := range device.Ports {
 		if port.Type == voltha.Port_ETHERNET_NNI {
-			if _, err = agent.addNNILogicalPort(device, port); err != nil {
+			if _, err = agent.addNNILogicalPort(ctx, device, port); err != nil {
 				log.Errorw("error-adding-UNI-port", log.Fields{"error": err})
 			}
 			agent.addLogicalPortToMap(port.PortNo, true)
@@ -414,7 +414,7 @@
 }
 
 // updatePortState updates the port state of the device
-func (agent *LogicalDeviceAgent) updatePortState(deviceID string, portNo uint32, operStatus voltha.OperStatus_Types) error {
+func (agent *LogicalDeviceAgent) updatePortState(ctx context.Context, deviceID string, portNo uint32, operStatus voltha.OperStatus_Types) error {
 	log.Infow("updatePortState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "portNo": portNo, "state": operStatus})
 	agent.lockLogicalDevice.Lock()
 	defer agent.lockLogicalDevice.Unlock()
@@ -430,7 +430,7 @@
 				cloned.Ports[idx].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
 			}
 			// Update the logical device
-			if err := agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
+			if err := agent.updateLogicalDeviceWithoutLock(ctx, cloned); err != nil {
 				log.Errorw("error-updating-logical-device", log.Fields{"error": err})
 				return err
 			}
@@ -441,7 +441,7 @@
 }
 
 // updatePortsState updates the ports state related to the device
-func (agent *LogicalDeviceAgent) updatePortsState(device *voltha.Device, state voltha.AdminState_Types) error {
+func (agent *LogicalDeviceAgent) updatePortsState(ctx context.Context, device *voltha.Device, state voltha.AdminState_Types) error {
 	log.Infow("updatePortsState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 	agent.lockLogicalDevice.Lock()
 	defer agent.lockLogicalDevice.Unlock()
@@ -462,7 +462,7 @@
 		}
 	}
 	// Updating the logical device will trigger the poprt change events to be populated to the controller
-	if err := agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
+	if err := agent.updateLogicalDeviceWithoutLock(ctx, cloned); err != nil {
 		log.Warnw("logical-device-update-failed", log.Fields{"ldeviceId": agent.logicalDeviceID, "error": err})
 		return err
 	}
@@ -478,7 +478,7 @@
 	//Get UNI port number
 	for _, port := range childDevice.Ports {
 		if port.Type == voltha.Port_ETHERNET_UNI {
-			if added, err = agent.addUNILogicalPort(childDevice, port); err != nil {
+			if added, err = agent.addUNILogicalPort(ctx, childDevice, port); err != nil {
 				log.Errorw("error-adding-UNI-port", log.Fields{"error": err})
 			}
 			if added {
@@ -490,7 +490,7 @@
 }
 
 // deleteAllLogicalPorts deletes all logical ports associated with this device
-func (agent *LogicalDeviceAgent) deleteAllLogicalPorts(device *voltha.Device) error {
+func (agent *LogicalDeviceAgent) deleteAllLogicalPorts(ctx context.Context, device *voltha.Device) error {
 	log.Infow("updatePortsState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 	agent.lockLogicalDevice.Lock()
 	defer agent.lockLogicalDevice.Unlock()
@@ -507,7 +507,7 @@
 	if len(updateLogicalPorts) < len(cloned.Ports) {
 		cloned.Ports = updateLogicalPorts
 		// Updating the logical device will trigger the poprt change events to be populated to the controller
-		if err := agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
+		if err := agent.updateLogicalDeviceWithoutLock(ctx, cloned); err != nil {
 			log.Warnw("logical-device-update-failed", log.Fields{"ldeviceId": agent.logicalDeviceID, "error": err})
 			return err
 		}
@@ -518,8 +518,8 @@
 }
 
 //updateLogicalDeviceWithoutLock updates the model with the logical device.  It clones the logicaldevice before saving it
-func (agent *LogicalDeviceAgent) updateLogicalDeviceWithoutLock(logicalDevice *voltha.LogicalDevice) error {
-	updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+func (agent *LogicalDeviceAgent) updateLogicalDeviceWithoutLock(ctx context.Context, logicalDevice *voltha.LogicalDevice) error {
+	updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
 	afterUpdate, err := agent.clusterDataProxy.Update(updateCtx, "/logical_devices/"+agent.logicalDeviceID, logicalDevice, false, "")
 	if err != nil {
 		log.Errorw("failed-to-update-logical-devices-to-cluster-proxy", log.Fields{"error": err})
@@ -534,7 +534,7 @@
 
 //generateDeviceGraphIfNeeded generates the device graph if the logical device has been updated since the last time
 //that device graph was generated.
-func (agent *LogicalDeviceAgent) generateDeviceGraphIfNeeded() error {
+func (agent *LogicalDeviceAgent) generateDeviceGraphIfNeeded(ctx context.Context) error {
 	ld := agent.GetLogicalDevice()
 	agent.lockDeviceGraph.Lock()
 	defer agent.lockDeviceGraph.Unlock()
@@ -542,7 +542,7 @@
 		return nil
 	}
 	log.Debug("Generation of device graph required")
-	agent.generateDeviceGraph()
+	agent.generateDeviceGraph(ctx)
 	return nil
 }
 
@@ -552,16 +552,16 @@
 	if flow == nil {
 		return nil
 	}
-	if err := agent.generateDeviceGraphIfNeeded(); err != nil {
+	if err := agent.generateDeviceGraphIfNeeded(ctx); err != nil {
 		return err
 	}
 	switch flow.GetCommand() {
 	case ofp.OfpFlowModCommand_OFPFC_ADD:
-		return agent.flowAdd(flow)
+		return agent.flowAdd(ctx, flow)
 	case ofp.OfpFlowModCommand_OFPFC_DELETE:
-		return agent.flowDelete(flow)
+		return agent.flowDelete(ctx, flow)
 	case ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT:
-		return agent.flowDeleteStrict(flow)
+		return agent.flowDeleteStrict(ctx, flow)
 	case ofp.OfpFlowModCommand_OFPFC_MODIFY:
 		return agent.flowModify(flow)
 	case ofp.OfpFlowModCommand_OFPFC_MODIFY_STRICT:
@@ -577,16 +577,16 @@
 	if groupMod == nil {
 		return nil
 	}
-	if err := agent.generateDeviceGraphIfNeeded(); err != nil {
+	if err := agent.generateDeviceGraphIfNeeded(ctx); err != nil {
 		return err
 	}
 	switch groupMod.GetCommand() {
 	case ofp.OfpGroupModCommand_OFPGC_ADD:
-		return agent.groupAdd(groupMod)
+		return agent.groupAdd(ctx, groupMod)
 	case ofp.OfpGroupModCommand_OFPGC_DELETE:
-		return agent.groupDelete(groupMod)
+		return agent.groupDelete(ctx, groupMod)
 	case ofp.OfpGroupModCommand_OFPGC_MODIFY:
-		return agent.groupModify(groupMod)
+		return agent.groupModify(ctx, groupMod)
 	}
 	return status.Errorf(codes.Internal,
 		"unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceID, groupMod.GetCommand())
@@ -598,23 +598,23 @@
 	if meterMod == nil {
 		return nil
 	}
-	if err := agent.generateDeviceGraphIfNeeded(); err != nil {
+	if err := agent.generateDeviceGraphIfNeeded(ctx); err != nil {
 		return err
 	}
 	switch meterMod.GetCommand() {
 	case ofp.OfpMeterModCommand_OFPMC_ADD:
-		return agent.meterAdd(meterMod)
+		return agent.meterAdd(ctx, meterMod)
 	case ofp.OfpMeterModCommand_OFPMC_DELETE:
-		return agent.meterDelete(meterMod)
+		return agent.meterDelete(ctx, meterMod)
 	case ofp.OfpMeterModCommand_OFPMC_MODIFY:
-		return agent.meterModify(meterMod)
+		return agent.meterModify(ctx, meterMod)
 	}
 	return status.Errorf(codes.Internal,
 		"unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceID, meterMod.GetCommand())
 
 }
 
-func (agent *LogicalDeviceAgent) meterAdd(meterMod *ofp.OfpMeterMod) error {
+func (agent *LogicalDeviceAgent) meterAdd(ctx context.Context, meterMod *ofp.OfpMeterMod) error {
 	log.Debugw("meterAdd", log.Fields{"metermod": *meterMod})
 	if meterMod == nil {
 		return nil
@@ -641,7 +641,7 @@
 	meterEntry := fu.MeterEntryFromMeterMod(meterMod)
 	meters = append(meters, meterEntry)
 	//Update model
-	if err := agent.updateLogicalDeviceMetersWithoutLock(&ofp.Meters{Items: meters}); err != nil {
+	if err := agent.updateLogicalDeviceMetersWithoutLock(ctx, &ofp.Meters{Items: meters}); err != nil {
 		log.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 		return err
 	}
@@ -649,7 +649,7 @@
 	return nil
 }
 
-func (agent *LogicalDeviceAgent) meterDelete(meterMod *ofp.OfpMeterMod) error {
+func (agent *LogicalDeviceAgent) meterDelete(ctx context.Context, meterMod *ofp.OfpMeterMod) error {
 	log.Debug("meterDelete", log.Fields{"meterMod": *meterMod})
 	if meterMod == nil {
 		return nil
@@ -685,7 +685,7 @@
 		if lDevice.Meters != nil {
 			metersToUpdate = &ofp.Meters{Items: meters}
 		}
-		if err := agent.updateLogicalDeviceMetersWithoutLock(metersToUpdate); err != nil {
+		if err := agent.updateLogicalDeviceMetersWithoutLock(ctx, metersToUpdate); err != nil {
 			log.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 			return err
 		}
@@ -694,7 +694,7 @@
 	}
 	if changedFow {
 		//Update model
-		if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: updatedFlows}); err != nil {
+		if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: updatedFlows}); err != nil {
 			log.Errorw("db-flow-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 			return err
 		}
@@ -705,7 +705,7 @@
 	return nil
 }
 
-func (agent *LogicalDeviceAgent) meterModify(meterMod *ofp.OfpMeterMod) error {
+func (agent *LogicalDeviceAgent) meterModify(ctx context.Context, meterMod *ofp.OfpMeterMod) error {
 	log.Debug("meterModify")
 	if meterMod == nil {
 		return nil
@@ -736,7 +736,7 @@
 		if lDevice.Meters != nil {
 			metersToUpdate = &ofp.Meters{Items: meters}
 		}
-		if err := agent.updateLogicalDeviceMetersWithoutLock(metersToUpdate); err != nil {
+		if err := agent.updateLogicalDeviceMetersWithoutLock(ctx, metersToUpdate); err != nil {
 			log.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 			return err
 		}
@@ -794,7 +794,7 @@
 }
 
 //flowAdd adds a flow to the flow table of that logical device
-func (agent *LogicalDeviceAgent) flowAdd(mod *ofp.OfpFlowMod) error {
+func (agent *LogicalDeviceAgent) flowAdd(ctx context.Context, mod *ofp.OfpFlowMod) error {
 	log.Debugw("flowAdd", log.Fields{"flow": mod})
 	if mod == nil {
 		return nil
@@ -859,16 +859,16 @@
 			log.Error("Meter-referred-in-flows-not-present")
 			return err
 		}
-		deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: updatedFlows}, *lDevice.FlowGroups)
+		deviceRules := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: updatedFlows}, *lDevice.FlowGroups)
 		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
 
-		if err := agent.addDeviceFlowsAndGroups(deviceRules, &flowMetadata); err != nil {
+		if err := agent.addDeviceFlowsAndGroups(ctx, deviceRules, &flowMetadata); err != nil {
 			log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
 			return err
 		}
 
 		//	Update model
-		if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: flows}); err != nil {
+		if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: flows}); err != nil {
 			log.Errorw("db-flow-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 			return err
 		}
@@ -880,7 +880,7 @@
 			}
 			if changedMeterStats {
 				//Update model
-				if err := agent.updateLogicalDeviceMetersWithoutLock(metersToUpdate); err != nil {
+				if err := agent.updateLogicalDeviceMetersWithoutLock(ctx, metersToUpdate); err != nil {
 					log.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 					return err
 				}
@@ -923,7 +923,7 @@
 }
 
 //flowDelete deletes a flow from the flow table of that logical device
-func (agent *LogicalDeviceAgent) flowDelete(mod *ofp.OfpFlowMod) error {
+func (agent *LogicalDeviceAgent) flowDelete(ctx context.Context, mod *ofp.OfpFlowMod) error {
 	log.Debug("flowDelete")
 	if mod == nil {
 		return nil
@@ -975,15 +975,15 @@
 			log.Error("Meter-referred-in-flows-not-present")
 			return errors.New("Meter-referred-in-flows-not-present")
 		}
-		deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: toDelete}, ofp.FlowGroups{Items: flowGroups})
+		deviceRules := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: toDelete}, ofp.FlowGroups{Items: flowGroups})
 		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
 
-		if err := agent.deleteDeviceFlowsAndGroups(deviceRules, &flowMetadata); err != nil {
+		if err := agent.deleteDeviceFlowsAndGroups(ctx, deviceRules, &flowMetadata); err != nil {
 			log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
 			return err
 		}
 
-		if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: toKeep}); err != nil {
+		if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: toKeep}); err != nil {
 			log.Errorw("cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 			return err
 		}
@@ -993,7 +993,7 @@
 	return nil
 }
 
-func (agent *LogicalDeviceAgent) addDeviceFlowsAndGroups(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
+func (agent *LogicalDeviceAgent) addDeviceFlowsAndGroups(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
 	log.Debugw("addDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceID, "deviceRules": deviceRules, "flowMetadata": flowMetadata})
 
 	responses := make([]coreutils.Response, 0)
@@ -1001,7 +1001,7 @@
 		response := coreutils.NewResponse()
 		responses = append(responses, response)
 		go func(deviceId string, value *fu.FlowsAndGroups) {
-			if err := agent.deviceMgr.addFlowsAndGroups(deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
+			if err := agent.deviceMgr.addFlowsAndGroups(ctx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
 				log.Errorw("flow-add-failed", log.Fields{"deviceID": deviceId, "error": err})
 				response.Error(status.Errorf(codes.Internal, "flow-add-failed: %s", deviceId))
 			}
@@ -1015,7 +1015,7 @@
 	return nil
 }
 
-func (agent *LogicalDeviceAgent) deleteDeviceFlowsAndGroups(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
+func (agent *LogicalDeviceAgent) deleteDeviceFlowsAndGroups(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
 	log.Debugw("deleteDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
 
 	responses := make([]coreutils.Response, 0)
@@ -1023,7 +1023,7 @@
 		response := coreutils.NewResponse()
 		responses = append(responses, response)
 		go func(deviceId string, value *fu.FlowsAndGroups) {
-			if err := agent.deviceMgr.deleteFlowsAndGroups(deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
+			if err := agent.deviceMgr.deleteFlowsAndGroups(ctx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
 				log.Error("flow-delete-failed", log.Fields{"deviceID": deviceId, "error": err})
 				response.Error(status.Errorf(codes.Internal, "flow-delete-failed: %s", deviceId))
 			}
@@ -1037,7 +1037,7 @@
 	return nil
 }
 
-func (agent *LogicalDeviceAgent) updateDeviceFlowsAndGroups(deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
+func (agent *LogicalDeviceAgent) updateDeviceFlowsAndGroups(ctx context.Context, deviceRules *fu.DeviceRules, flowMetadata *voltha.FlowMetadata) error {
 	log.Debugw("updateDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceID})
 
 	responses := make([]coreutils.Response, 0)
@@ -1045,7 +1045,7 @@
 		response := coreutils.NewResponse()
 		responses = append(responses, response)
 		go func(deviceId string, value *fu.FlowsAndGroups) {
-			if err := agent.deviceMgr.updateFlowsAndGroups(deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
+			if err := agent.deviceMgr.updateFlowsAndGroups(ctx, deviceId, value.ListFlows(), value.ListGroups(), flowMetadata); err != nil {
 				log.Error("flow-update-failed", log.Fields{"deviceID": deviceId, "error": err})
 				response.Error(status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId))
 			}
@@ -1060,7 +1060,7 @@
 }
 
 //flowDeleteStrict deletes a flow from the flow table of that logical device
-func (agent *LogicalDeviceAgent) flowDeleteStrict(mod *ofp.OfpFlowMod) error {
+func (agent *LogicalDeviceAgent) flowDeleteStrict(ctx context.Context, mod *ofp.OfpFlowMod) error {
 	log.Debug("flowDeleteStrict")
 	if mod == nil {
 		return nil
@@ -1102,7 +1102,7 @@
 		if lDevice.Meters != nil {
 			metersToUpdate = &ofp.Meters{Items: meters}
 		}
-		if err := agent.updateLogicalDeviceMetersWithoutLock(metersToUpdate); err != nil {
+		if err := agent.updateLogicalDeviceMetersWithoutLock(ctx, metersToUpdate); err != nil {
 			log.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 			return err
 		}
@@ -1114,15 +1114,15 @@
 			log.Error("meter-referred-in-flows-not-present")
 			return err
 		}
-		deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: flowsToDelete}, ofp.FlowGroups{Items: flowGroups})
+		deviceRules := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: flowsToDelete}, ofp.FlowGroups{Items: flowGroups})
 		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
 
-		if err := agent.deleteDeviceFlowsAndGroups(deviceRules, &flowMetadata); err != nil {
+		if err := agent.deleteDeviceFlowsAndGroups(ctx, deviceRules, &flowMetadata); err != nil {
 			log.Errorw("failure-deleting-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
 			return err
 		}
 
-		if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: flows}); err != nil {
+		if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: flows}); err != nil {
 			log.Errorw("cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 			return err
 		}
@@ -1140,7 +1140,7 @@
 	return errors.New("flowModifyStrict not implemented")
 }
 
-func (agent *LogicalDeviceAgent) groupAdd(groupMod *ofp.OfpGroupMod) error {
+func (agent *LogicalDeviceAgent) groupAdd(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
 	log.Debug("groupAdd")
 	if groupMod == nil {
 		return nil
@@ -1161,12 +1161,12 @@
 		deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
 
 		log.Debugw("rules", log.Fields{"rules for group-add": deviceRules.String()})
-		if err := agent.addDeviceFlowsAndGroups(deviceRules, &voltha.FlowMetadata{}); err != nil {
+		if err := agent.addDeviceFlowsAndGroups(ctx, deviceRules, &voltha.FlowMetadata{}); err != nil {
 			log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
 			return err
 		}
 
-		if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(&ofp.FlowGroups{Items: groups}); err != nil {
+		if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(ctx, &ofp.FlowGroups{Items: groups}); err != nil {
 			log.Errorw("cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 			return err
 		}
@@ -1176,7 +1176,7 @@
 	return nil
 }
 
-func (agent *LogicalDeviceAgent) groupDelete(groupMod *ofp.OfpGroupMod) error {
+func (agent *LogicalDeviceAgent) groupDelete(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
 	log.Debug("groupDelete")
 	if groupMod == nil {
 		return nil
@@ -1205,23 +1205,23 @@
 		groupsChanged = true
 	}
 	if flowsChanged || groupsChanged {
-		deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: flows}, ofp.FlowGroups{Items: groups})
+		deviceRules := agent.flowDecomposer.DecomposeRules(ctx, agent, ofp.Flows{Items: flows}, ofp.FlowGroups{Items: groups})
 		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
 
-		if err := agent.updateDeviceFlowsAndGroups(deviceRules, nil); err != nil {
+		if err := agent.updateDeviceFlowsAndGroups(ctx, deviceRules, nil); err != nil {
 			log.Errorw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
 			return err
 		}
 	}
 
 	if groupsChanged {
-		if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(&ofp.FlowGroups{Items: groups}); err != nil {
+		if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(ctx, &ofp.FlowGroups{Items: groups}); err != nil {
 			log.Errorw("cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 			return err
 		}
 	}
 	if flowsChanged {
-		if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: flows}); err != nil {
+		if err := agent.updateLogicalDeviceFlowsWithoutLock(ctx, &ofp.Flows{Items: flows}); err != nil {
 			log.Errorw("cannot-update-flow", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 			return err
 		}
@@ -1229,7 +1229,7 @@
 	return nil
 }
 
-func (agent *LogicalDeviceAgent) groupModify(groupMod *ofp.OfpGroupMod) error {
+func (agent *LogicalDeviceAgent) groupModify(ctx context.Context, groupMod *ofp.OfpGroupMod) error {
 	log.Debug("groupModify")
 	if groupMod == nil {
 		return nil
@@ -1257,13 +1257,13 @@
 		deviceRules.AddFlowsAndGroup(agent.rootDeviceID, fg)
 
 		log.Debugw("rules", log.Fields{"rules for group-modify": deviceRules.String()})
-		if err := agent.updateDeviceFlowsAndGroups(deviceRules, &voltha.FlowMetadata{}); err != nil {
+		if err := agent.updateDeviceFlowsAndGroups(ctx, deviceRules, &voltha.FlowMetadata{}); err != nil {
 			log.Errorw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
 			return err
 		}
 
 		//lDevice.FlowGroups.Items = groups
-		if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(&ofp.FlowGroups{Items: groups}); err != nil {
+		if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(ctx, &ofp.FlowGroups{Items: groups}); err != nil {
 			log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 			return err
 		}
@@ -1272,7 +1272,7 @@
 }
 
 // deleteLogicalPort removes the logical port
-func (agent *LogicalDeviceAgent) deleteLogicalPort(lPort *voltha.LogicalPort) error {
+func (agent *LogicalDeviceAgent) deleteLogicalPort(ctx context.Context, lPort *voltha.LogicalPort) error {
 	agent.lockLogicalDevice.Lock()
 	defer agent.lockLogicalDevice.Unlock()
 
@@ -1290,18 +1290,18 @@
 		logicalDevice.Ports[len(logicalDevice.Ports)-1] = nil
 		logicalDevice.Ports = logicalDevice.Ports[:len(logicalDevice.Ports)-1]
 		log.Debugw("logical-port-deleted", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
-		if err := agent.updateLogicalDeviceWithoutLock(logicalDevice); err != nil {
+		if err := agent.updateLogicalDeviceWithoutLock(ctx, logicalDevice); err != nil {
 			log.Errorw("logical-device-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 			return err
 		}
 		// Reset the logical device graph
-		go agent.generateDeviceGraph()
+		go agent.generateDeviceGraph(context.Background())
 	}
 	return nil
 }
 
 // deleteLogicalPorts removes the logical ports associated with that deviceId
-func (agent *LogicalDeviceAgent) deleteLogicalPorts(deviceID string) error {
+func (agent *LogicalDeviceAgent) deleteLogicalPorts(ctx context.Context, deviceID string) error {
 	agent.lockLogicalDevice.Lock()
 	defer agent.lockLogicalDevice.Unlock()
 
@@ -1314,18 +1314,18 @@
 	}
 	logicalDevice.Ports = updatedLPorts
 	log.Debugw("updated-logical-ports", log.Fields{"ports": updatedLPorts})
-	if err := agent.updateLogicalDeviceWithoutLock(logicalDevice); err != nil {
+	if err := agent.updateLogicalDeviceWithoutLock(ctx, logicalDevice); err != nil {
 		log.Errorw("logical-device-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 		return err
 	}
 	// Reset the logical device graph
-	go agent.generateDeviceGraph()
+	go agent.generateDeviceGraph(context.Background())
 
 	return nil
 }
 
 // enableLogicalPort enables the logical port
-func (agent *LogicalDeviceAgent) enableLogicalPort(lPortID string) error {
+func (agent *LogicalDeviceAgent) enableLogicalPort(ctx context.Context, lPortID string) error {
 	agent.lockLogicalDevice.Lock()
 	defer agent.lockLogicalDevice.Unlock()
 
@@ -1340,13 +1340,13 @@
 	}
 	if index >= 0 {
 		logicalDevice.Ports[index].OfpPort.Config = logicalDevice.Ports[index].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
-		return agent.updateLogicalDeviceWithoutLock(logicalDevice)
+		return agent.updateLogicalDeviceWithoutLock(ctx, logicalDevice)
 	}
 	return status.Errorf(codes.NotFound, "Port %s on Logical Device %s", lPortID, agent.logicalDeviceID)
 }
 
 // disableLogicalPort disabled the logical port
-func (agent *LogicalDeviceAgent) disableLogicalPort(lPortID string) error {
+func (agent *LogicalDeviceAgent) disableLogicalPort(ctx context.Context, lPortID string) error {
 	agent.lockLogicalDevice.Lock()
 	defer agent.lockLogicalDevice.Unlock()
 
@@ -1361,7 +1361,7 @@
 	}
 	if index >= 0 {
 		logicalDevice.Ports[index].OfpPort.Config = (logicalDevice.Ports[index].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)) | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
-		return agent.updateLogicalDeviceWithoutLock(logicalDevice)
+		return agent.updateLogicalDeviceWithoutLock(ctx, logicalDevice)
 	}
 	return status.Errorf(codes.NotFound, "Port %s on Logical Device %s", lPortID, agent.logicalDeviceID)
 }
@@ -1472,7 +1472,7 @@
 }
 
 //updateRoutes rebuilds the device graph if not done already
-func (agent *LogicalDeviceAgent) updateRoutes(device *voltha.Device, port *voltha.Port) error {
+func (agent *LogicalDeviceAgent) updateRoutes(ctx context.Context, device *voltha.Device, port *voltha.Port) error {
 	log.Debugf("updateRoutes", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "device": device.Id, "port": port})
 	agent.lockLogicalDevice.Lock()
 	defer agent.lockLogicalDevice.Unlock()
@@ -1483,25 +1483,25 @@
 	lDevice := agent.getLogicalDeviceWithoutLock()
 
 	//TODO:  Find a better way to refresh only missing routes
-	agent.deviceGraph.ComputeRoutes(lDevice.Ports)
+	agent.deviceGraph.ComputeRoutes(ctx, lDevice.Ports)
 	agent.deviceGraph.Print()
 	return nil
 }
 
 //updateDeviceGraph updates the device graph if not done already and setup the default rules as well
-func (agent *LogicalDeviceAgent) updateDeviceGraph(lp *voltha.LogicalPort) {
+func (agent *LogicalDeviceAgent) updateDeviceGraph(ctx context.Context, lp *voltha.LogicalPort) {
 	log.Debugf("updateDeviceGraph", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 	agent.lockLogicalDevice.Lock()
 	defer agent.lockLogicalDevice.Unlock()
 	if agent.deviceGraph == nil {
 		agent.deviceGraph = graph.NewDeviceGraph(agent.logicalDeviceID, agent.deviceMgr.GetDevice)
 	}
-	agent.deviceGraph.AddPort(lp)
+	agent.deviceGraph.AddPort(ctx, lp)
 	agent.deviceGraph.Print()
 }
 
 //generateDeviceGraph regenerates the device graph
-func (agent *LogicalDeviceAgent) generateDeviceGraph() {
+func (agent *LogicalDeviceAgent) generateDeviceGraph(ctx context.Context) {
 	log.Debugw("generateDeviceGraph", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 	agent.lockLogicalDevice.Lock()
 	defer agent.lockLogicalDevice.Unlock()
@@ -1511,7 +1511,7 @@
 	if agent.deviceGraph == nil {
 		agent.deviceGraph = graph.NewDeviceGraph(agent.logicalDeviceID, agent.deviceMgr.GetDevice)
 	}
-	agent.deviceGraph.ComputeRoutes(ld.Ports)
+	agent.deviceGraph.ComputeRoutes(ctx, ld.Ports)
 	agent.deviceGraph.Print()
 }
 
@@ -1555,7 +1555,7 @@
 // portUpdated is invoked when a port is updated on the logical device.  Until
 // the POST_ADD notification is fixed, we will use the logical device to
 // update that data.
-func (agent *LogicalDeviceAgent) portUpdated(args ...interface{}) interface{} {
+func (agent *LogicalDeviceAgent) portUpdated(ctx context.Context, args ...interface{}) interface{} {
 	log.Debugw("portUpdated-callback", log.Fields{"argsLen": len(args)})
 
 	var oldLD *voltha.LogicalDevice
@@ -1600,7 +1600,7 @@
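-// added and an eror in case a valid error is encountered. If the port was successfully added it will return
+// added and an error in case a valid error is encountered. If the port was successfully added it will return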
 // (true, nil).   If the device is not in the correct state it will return (false, nil) as this is a valid
 // scenario. This also applies to the case where the port was already added.
-func (agent *LogicalDeviceAgent) addNNILogicalPort(device *voltha.Device, port *voltha.Port) (bool, error) {
+func (agent *LogicalDeviceAgent) addNNILogicalPort(ctx context.Context, device *voltha.Device, port *voltha.Port) (bool, error) {
 	log.Debugw("addNNILogicalPort", log.Fields{"NNI": port})
 	if device.AdminState != voltha.AdminState_ENABLED || device.OperStatus != voltha.OperStatus_ACTIVE {
 		log.Infow("device-not-ready", log.Fields{"deviceId": device.Id, "admin": device.AdminState, "oper": device.OperStatus})
@@ -1617,7 +1617,7 @@
 	var portCap *ic.PortCapability
 	var err error
 	// First get the port capability
-	if portCap, err = agent.deviceMgr.getPortCapability(context.TODO(), device.Id, port.PortNo); err != nil {
+	if portCap, err = agent.deviceMgr.getPortCapability(ctx, device.Id, port.PortNo); err != nil {
 		log.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
 		return false, err
 	}
@@ -1646,14 +1646,14 @@
 	}
 	cloned.Ports = append(cloned.Ports, lp)
 
-	if err = agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
+	if err = agent.updateLogicalDeviceWithoutLock(ctx, cloned); err != nil {
 		log.Errorw("error-updating-logical-device", log.Fields{"error": err})
 		return false, err
 	}
 
 	// Update the device graph with this new logical port
 	clonedLP := (proto.Clone(lp)).(*voltha.LogicalPort)
-	go agent.updateDeviceGraph(clonedLP)
+	go agent.updateDeviceGraph(context.Background(), clonedLP)
 
 	return true, nil
 }
@@ -1672,7 +1672,7 @@
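-// added and an eror in case a valid error is encountered. If the port was successfully added it will return
+// added and an error in case a valid error is encountered. If the port was successfully added it will return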
 // (true, nil).   If the device is not in the correct state it will return (false, nil) as this is a valid
 // scenario. This also applies to the case where the port was already added.
-func (agent *LogicalDeviceAgent) addUNILogicalPort(childDevice *voltha.Device, port *voltha.Port) (bool, error) {
+func (agent *LogicalDeviceAgent) addUNILogicalPort(ctx context.Context, childDevice *voltha.Device, port *voltha.Port) (bool, error) {
 	log.Debugw("addUNILogicalPort", log.Fields{"port": port})
 	if childDevice.AdminState != voltha.AdminState_ENABLED || childDevice.OperStatus != voltha.OperStatus_ACTIVE {
 		log.Infow("device-not-ready", log.Fields{"deviceId": childDevice.Id, "admin": childDevice.AdminState, "oper": childDevice.OperStatus})
@@ -1688,7 +1688,7 @@
 	var portCap *ic.PortCapability
 	var err error
 	// First get the port capability
-	if portCap, err = agent.deviceMgr.getPortCapability(context.TODO(), childDevice.Id, port.PortNo); err != nil {
+	if portCap, err = agent.deviceMgr.getPortCapability(ctx, childDevice.Id, port.PortNo); err != nil {
 		log.Errorw("error-retrieving-port-capabilities", log.Fields{"error": err})
 		return false, err
 	}
@@ -1713,16 +1713,16 @@
 		cloned.Ports = make([]*voltha.LogicalPort, 0)
 	}
 	cloned.Ports = append(cloned.Ports, portCap.Port)
-	if err := agent.updateLogicalDeviceWithoutLock(cloned); err != nil {
+	if err := agent.updateLogicalDeviceWithoutLock(ctx, cloned); err != nil {
 		return false, err
 	}
 	// Update the device graph with this new logical port
 	clonedLP := (proto.Clone(portCap.Port)).(*voltha.LogicalPort)
-	go agent.updateDeviceGraph(clonedLP)
+	go agent.updateDeviceGraph(context.Background(), clonedLP)
 	return true, nil
 }
 
-func (agent *LogicalDeviceAgent) packetOut(packet *ofp.OfpPacketOut) {
+func (agent *LogicalDeviceAgent) packetOut(ctx context.Context, packet *ofp.OfpPacketOut) {
 	log.Debugw("packet-out", log.Fields{
 		"packet": hex.EncodeToString(packet.Data),
 		"inPort": packet.GetInPort(),
@@ -1730,7 +1730,7 @@
 	outPort := fu.GetPacketOutPort(packet)
 	//frame := packet.GetData()
 	//TODO: Use a channel between the logical agent and the device agent
-	if err := agent.deviceMgr.packetOut(agent.rootDeviceID, outPort, packet); err != nil {
+	if err := agent.deviceMgr.packetOut(ctx, agent.rootDeviceID, outPort, packet); err != nil {
 		log.Error("packetout-failed", log.Fields{"logicalDeviceID": agent.rootDeviceID})
 	}
 }