[VOL-3005] Separate Flows from Device

Also some unit test functions moved to a test util class.
New loaders and Proxy implementation are applied.

Change-Id: Icf5a6f0a42a2dbaeff768fdb108f5e9b46644977
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 918432f..fe379d6 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -22,18 +22,21 @@
 	"errors"
 	"fmt"
 	"reflect"
+	"strconv"
 	"sync"
 	"time"
 
 	"github.com/golang/protobuf/ptypes"
 	"github.com/opencord/voltha-go/rw_core/core/adapter"
+	"github.com/opencord/voltha-go/rw_core/core/device/flow"
+	"github.com/opencord/voltha-go/rw_core/core/device/group"
 	"github.com/opencord/voltha-go/rw_core/core/device/remote"
+	fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
 
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/db/model"
 	coreutils "github.com/opencord/voltha-go/rw_core/utils"
-	fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
 	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
 	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
@@ -59,10 +62,13 @@
 	startOnce      sync.Once
 	stopOnce       sync.Once
 	stopped        bool
+
+	flowLoader  *flow.Loader
+	groupLoader *group.Loader
 }
 
 //newAgent creates a new device agent. The device will be initialized when start() is called.
-func newAgent(ap *remote.AdapterProxy, device *voltha.Device, deviceMgr *Manager, deviceProxy *model.Proxy, timeout time.Duration) *Agent {
+func newAgent(ap *remote.AdapterProxy, device *voltha.Device, deviceMgr *Manager, dbProxy *model.Path, deviceProxy *model.Proxy, timeout time.Duration) *Agent {
 	var agent Agent
 	agent.adapterProxy = ap
 	if device.Id == "" {
@@ -81,6 +87,9 @@
 	agent.defaultTimeout = timeout
 	agent.device = proto.Clone(device).(*voltha.Device)
 	agent.requestQueue = coreutils.NewRequestQueue()
+	agent.flowLoader = flow.NewLoader(dbProxy.SubPath("flows").Proxy(device.Id))
+	agent.groupLoader = group.NewLoader(dbProxy.SubPath("groups").Proxy(device.Id))
+
 	return &agent
 }
 
@@ -114,6 +123,9 @@
 
 		agent.deviceType = device.Adapter
 		agent.device = proto.Clone(device).(*voltha.Device)
+		// load the flows and groups from KV to cache
+		agent.flowLoader.Load(ctx)
+		agent.groupLoader.Load(ctx)
 
 		logger.Infow("device-loaded-from-dB", log.Fields{"device-id": agent.deviceID})
 	} else {
@@ -138,7 +150,6 @@
 		}
 		agent.device = device
 	}
-
 	startSucceeded = true
 	logger.Debugw("device-agent-started", log.Fields{"device-id": agent.deviceID})
 
@@ -191,6 +202,8 @@
 
 	agent.deviceType = device.Adapter
 	agent.device = device
+	agent.flowLoader.Load(ctx)
+	agent.groupLoader.Load(ctx)
 	logger.Debugw("reconciled-device-agent-devicetype", log.Fields{"device-id": agent.deviceID, "type": agent.deviceType})
 }
 
@@ -317,6 +330,15 @@
 	}
 }
 
+//replaceFlowInList removes the old flow from list and adds the new one.
+func replaceFlowInList(flowList []*ofp.OfpFlowStats, oldFlow *ofp.OfpFlowStats, newFlow *ofp.OfpFlowStats) []*ofp.OfpFlowStats {
+	if idx := fu.FindFlows(flowList, oldFlow); idx != -1 {
+		flowList = deleteFlowWithoutPreservingOrder(flowList, idx)
+	}
+	flowList = append(flowList, newFlow)
+	return flowList
+}
+
 //deleteFlowWithoutPreservingOrder removes a flow specified by index from the flows slice.  This function will
 //panic if the index is out of range.
 func deleteFlowWithoutPreservingOrder(flows []*ofp.OfpFlowStats, index int) []*ofp.OfpFlowStats {
@@ -325,6 +347,15 @@
 	return flows[:len(flows)-1]
 }
 
+//replaceGroupInList removes the old group from list and adds the new one.
+func replaceGroupInList(groupList []*ofp.OfpGroupEntry, oldGroup *ofp.OfpGroupEntry, newGroup *ofp.OfpGroupEntry) []*ofp.OfpGroupEntry {
+	if idx := fu.FindGroup(groupList, oldGroup.Desc.GroupId); idx != -1 {
+		groupList = deleteGroupWithoutPreservingOrder(groupList, idx)
+	}
+	groupList = append(groupList, newGroup)
+	return groupList
+}
+
 //deleteGroupWithoutPreservingOrder removes a group specified by index from the groups slice.  This function will
 //panic if the index is out of range.
 func deleteGroupWithoutPreservingOrder(groups []*ofp.OfpGroupEntry, index int) []*ofp.OfpGroupEntry {
@@ -333,100 +364,70 @@
 	return groups[:len(groups)-1]
 }
 
-func flowsToUpdateToDelete(newFlows, existingFlows []*ofp.OfpFlowStats) (updatedNewFlows, flowsToDelete, updatedAllFlows []*ofp.OfpFlowStats) {
-	// Process flows
-	for _, flow := range existingFlows {
-		if idx := fu.FindFlows(newFlows, flow); idx == -1 {
-			updatedAllFlows = append(updatedAllFlows, flow)
-		} else {
-			// We have a matching flow (i.e. the following field matches: "TableId", "Priority", "Flags", "Cookie",
-			// "Match".  If this is an exact match (i.e. all other fields matches as well) then this flow will be
-			// ignored.  Otherwise, the previous flow will be deleted and the new one added
-			if proto.Equal(newFlows[idx], flow) {
-				// Flow already exist, remove it from the new flows but keep it in the updated flows slice
-				newFlows = deleteFlowWithoutPreservingOrder(newFlows, idx)
-				updatedAllFlows = append(updatedAllFlows, flow)
-			} else {
-				// Minor change to flow, delete old and add new one
-				flowsToDelete = append(flowsToDelete, flow)
-			}
-		}
-	}
-	updatedAllFlows = append(updatedAllFlows, newFlows...)
-	return newFlows, flowsToDelete, updatedAllFlows
-}
+func (agent *Agent) addFlowsToAdapter(ctx context.Context, newFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+	logger.Debugw("add-flows-to-adapters", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
 
-func groupsToUpdateToDelete(newGroups, existingGroups []*ofp.OfpGroupEntry) (updatedNewGroups, groupsToDelete, updatedAllGroups []*ofp.OfpGroupEntry) {
-	for _, group := range existingGroups {
-		if idx := fu.FindGroup(newGroups, group.Desc.GroupId); idx == -1 { // does not exist now
-			updatedAllGroups = append(updatedAllGroups, group)
-		} else {
-			// Follow same logic as flows
-			if proto.Equal(newGroups[idx], group) {
-				// Group already exist, remove it from the new groups
-				newGroups = deleteGroupWithoutPreservingOrder(newGroups, idx)
-				updatedAllGroups = append(updatedAllGroups, group)
-			} else {
-				// Minor change to group, delete old and add new one
-				groupsToDelete = append(groupsToDelete, group)
-			}
-		}
-	}
-	updatedAllGroups = append(updatedAllGroups, newGroups...)
-	return newGroups, groupsToDelete, updatedAllGroups
-}
-
-func (agent *Agent) addFlowsAndGroupsToAdapter(ctx context.Context, newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
-	logger.Debugw("add-flows-groups-to-adapters", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "groups": newGroups, "flow-metadata": flowMetadata})
-
-	if (len(newFlows) | len(newGroups)) == 0 {
-		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "groups": newGroups})
+	if (len(newFlows)) == 0 {
+		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
 		return coreutils.DoneResponse(), nil
 	}
-
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return coreutils.DoneResponse(), err
-	}
-	defer agent.requestQueue.RequestComplete()
-
 	device := agent.getDeviceWithoutLock()
 	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
 	if err != nil {
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
 	}
+	updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		flowIDs := agent.flowLoader.List()
+		for flowID := range flowIDs {
+			if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+				updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
+				flowHandle.Unlock()
+			}
+		}
+	}
+	flowsToAdd := make([]*ofp.OfpFlowStats, 0)
+	flowsToDelete := make([]*ofp.OfpFlowStats, 0)
+	for _, flow := range newFlows {
+		flowHandle, created, err := agent.flowLoader.LockOrCreate(ctx, flow)
+		if err != nil {
+			return coreutils.DoneResponse(), err
+		}
 
-	existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
-	existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
+		if created {
+			flowsToAdd = append(flowsToAdd, flow)
+			updatedAllFlows = append(updatedAllFlows, flow)
+		} else {
+			flowToReplace := flowHandle.GetReadOnly()
+			if !proto.Equal(flowToReplace, flow) {
+				//Flow needs to be updated.
+				if err := flowHandle.Update(ctx, flow); err != nil {
+					flowHandle.Unlock()
+					return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-flow-%d-to-device-%s", flow.Id, agent.deviceID)
+				}
+				flowsToDelete = append(flowsToDelete, flowToReplace)
+				flowsToAdd = append(flowsToAdd, flow)
+				updatedAllFlows = replaceFlowInList(updatedAllFlows, flowToReplace, flow)
+			} else {
+				//No need to change the flow. It is already exist.
+				logger.Debugw("No-need-to-change-already-existing-flow", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
+			}
+		}
 
-	// Process flows
-	newFlows, flowsToDelete, updatedAllFlows := flowsToUpdateToDelete(newFlows, existingFlows.Items)
-
-	// Process groups
-	newGroups, groupsToDelete, updatedAllGroups := groupsToUpdateToDelete(newGroups, existingGroups.Items)
-
-	// Sanity check
-	if (len(updatedAllFlows) | len(flowsToDelete) | len(updatedAllGroups) | len(groupsToDelete)) == 0 {
-		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "groups": newGroups})
-		return coreutils.DoneResponse(), nil
+		flowHandle.Unlock()
 	}
 
-	// store the changed data
-	device.Flows = &voltha.Flows{Items: updatedAllFlows}
-	device.FlowGroups = &voltha.FlowGroups{Items: updatedAllGroups}
-	if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
-		return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-device-%s", agent.deviceID)
+	// Sanity check
+	if (len(flowsToAdd)) == 0 {
+		logger.Debugw("no-flows-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
+		return coreutils.DoneResponse(), nil
 	}
 
 	// Send update to adapters
 	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
 	response := coreutils.NewResponse()
 	if !dType.AcceptsAddRemoveFlowUpdates {
-		if len(updatedAllGroups) != 0 && reflect.DeepEqual(existingGroups.Items, updatedAllGroups) && len(updatedAllFlows) != 0 && reflect.DeepEqual(existingFlows.Items, updatedAllFlows) {
-			logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "groups": newGroups})
-			cancel()
-			return coreutils.DoneResponse(), nil
-		}
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &ofp.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}}, flowMetadata)
 		if err != nil {
 			cancel()
 			return coreutils.DoneResponse(), err
@@ -434,11 +435,102 @@
 		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
 	} else {
 		flowChanges := &ofp.FlowChanges{
-			ToAdd:    &voltha.Flows{Items: newFlows},
+			ToAdd:    &voltha.Flows{Items: flowsToAdd},
 			ToRemove: &voltha.Flows{Items: flowsToDelete},
 		}
 		groupChanges := &ofp.FlowGroupChanges{
-			ToAdd:    &voltha.FlowGroups{Items: newGroups},
+			ToAdd:    &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+		}
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	}
+	return response, nil
+}
+
+func (agent *Agent) addGroupsToAdapter(ctx context.Context, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+	logger.Debugw("add-groups-to-adapters", log.Fields{"device-id": agent.deviceID, "groups": newGroups, "flow-metadata": flowMetadata})
+
+	if (len(newGroups)) == 0 {
+		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": newGroups})
+		return coreutils.DoneResponse(), nil
+	}
+
+	device := agent.getDeviceWithoutLock()
+	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+	if err != nil {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+	}
+	updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		groupIDs := agent.groupLoader.List()
+		for groupID := range groupIDs {
+			if grpHandle, have := agent.groupLoader.Lock(groupID); have {
+				updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
+				grpHandle.Unlock()
+			}
+		}
+	}
+
+	groupsToAdd := make([]*ofp.OfpGroupEntry, 0)
+	groupsToDelete := make([]*ofp.OfpGroupEntry, 0)
+	for _, group := range newGroups {
+
+		groupHandle, created, err := agent.groupLoader.LockOrCreate(ctx, group)
+		if err != nil {
+			return coreutils.DoneResponse(), err
+		}
+
+		if created {
+			groupsToAdd = append(groupsToAdd, group)
+			updatedAllGroups = append(updatedAllGroups, group)
+		} else {
+			groupToChange := groupHandle.GetReadOnly()
+			if !proto.Equal(groupToChange, group) {
+				//Group needs to be updated.
+				if err := groupHandle.Update(ctx, group); err != nil {
+					groupHandle.Unlock()
+					return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-group-%s-to-device-%s", strconv.Itoa(int(group.Desc.GroupId)), agent.deviceID)
+				}
+				groupsToDelete = append(groupsToDelete, groupToChange)
+				groupsToAdd = append(groupsToAdd, group)
+				updatedAllGroups = replaceGroupInList(updatedAllGroups, groupToChange, group)
+			} else {
+				//No need to change the group. It is already exist.
+				logger.Debugw("No-need-to-change-already-existing-group", log.Fields{"device-id": agent.deviceID, "group": newGroups, "flow-metadata": flowMetadata})
+			}
+		}
+
+		groupHandle.Unlock()
+	}
+	// Sanity check
+	if (len(groupsToAdd)) == 0 {
+		logger.Debugw("no-groups-to-update", log.Fields{"device-id": agent.deviceID, "groups": newGroups})
+		return coreutils.DoneResponse(), nil
+	}
+
+	// Send update to adapters
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	response := coreutils.NewResponse()
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	} else {
+		flowChanges := &ofp.FlowChanges{
+			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+			ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+		}
+		groupChanges := &ofp.FlowGroupChanges{
+			ToAdd:    &voltha.FlowGroups{Items: groupsToAdd},
 			ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
 			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
 		}
@@ -455,88 +547,66 @@
 //addFlowsAndGroups adds the "newFlows" and "newGroups" from the existing flows/groups and sends the update to the
 //adapters
 func (agent *Agent) addFlowsAndGroups(ctx context.Context, newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
-	response, err := agent.addFlowsAndGroupsToAdapter(ctx, newFlows, newGroups, flowMetadata)
-	if err != nil {
+	var flwResponse, grpResponse coreutils.Response
+	var err error
+	//if new flow list is empty then the called function returns quickly
+	if flwResponse, err = agent.addFlowsToAdapter(ctx, newFlows, flowMetadata); err != nil {
 		return err
 	}
-	if errs := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); errs != nil {
+	//if new group list is empty then the called function returns quickly
+	if grpResponse, err = agent.addGroupsToAdapter(ctx, newGroups, flowMetadata); err != nil {
+		return err
+	}
+	if errs := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, flwResponse, grpResponse); errs != nil {
 		logger.Warnw("no-adapter-response", log.Fields{"device-id": agent.deviceID, "result": errs})
 		return status.Errorf(codes.Aborted, "flow-failure-device-%s", agent.deviceID)
 	}
 	return nil
 }
 
-func (agent *Agent) deleteFlowsAndGroupsFromAdapter(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
-	logger.Debugw("delete-flows-groups-from-adapter", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel, "groups": groupsToDel})
+func (agent *Agent) deleteFlowsFromAdapter(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+	logger.Debugw("delete-flows-from-adapter", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
 
-	if (len(flowsToDel) | len(groupsToDel)) == 0 {
-		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel, "groups": groupsToDel})
+	if (len(flowsToDel)) == 0 {
+		logger.Debugw("nothing-to-delete", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
 		return coreutils.DoneResponse(), nil
 	}
 
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return coreutils.DoneResponse(), err
-	}
-	defer agent.requestQueue.RequestComplete()
-
 	device := agent.getDeviceWithoutLock()
 	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
 	if err != nil {
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
 	}
-
-	existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
-	existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
-
-	var flowsToKeep []*ofp.OfpFlowStats
-	var groupsToKeep []*ofp.OfpGroupEntry
-
-	// Process flows
-	for _, flow := range existingFlows.Items {
-		if idx := fu.FindFlows(flowsToDel, flow); idx == -1 {
-			flowsToKeep = append(flowsToKeep, flow)
+	updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		flowIDs := agent.flowLoader.List()
+		for flowID := range flowIDs {
+			if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+				updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
+				flowHandle.Unlock()
+			}
 		}
 	}
-
-	// Process groups
-	for _, group := range existingGroups.Items {
-		if fu.FindGroup(groupsToDel, group.Desc.GroupId) == -1 { // does not exist now
-			groupsToKeep = append(groupsToKeep, group)
+	for _, flow := range flowsToDel {
+		if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
+			// Update the store and cache
+			flowToDelete := flowHandle.GetReadOnly()
+			if err := flowHandle.Delete(ctx); err != nil {
+				flowHandle.Unlock()
+				return coreutils.DoneResponse(), err
+			}
+			if idx := fu.FindFlows(updatedAllFlows, flowToDelete); idx != -1 {
+				updatedAllFlows = deleteFlowWithoutPreservingOrder(updatedAllFlows, idx)
+			}
+			flowHandle.Unlock()
 		}
 	}
 
-	logger.Debugw("deleteFlowsAndGroups",
-		log.Fields{
-			"device-id":      agent.deviceID,
-			"flows-to-del":   len(flowsToDel),
-			"flows-to-keep":  len(flowsToKeep),
-			"groups-to-del":  len(groupsToDel),
-			"groups-to-keep": len(groupsToKeep),
-		})
-
-	// Sanity check
-	if (len(flowsToKeep) | len(flowsToDel) | len(groupsToKeep) | len(groupsToDel)) == 0 {
-		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows-to-del": flowsToDel, "groups-to-del": groupsToDel})
-		return coreutils.DoneResponse(), nil
-	}
-
-	// store the changed data
-	device.Flows = &voltha.Flows{Items: flowsToKeep}
-	device.FlowGroups = &voltha.FlowGroups{Items: groupsToKeep}
-	if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
-		return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
-	}
-
 	// Send update to adapters
 	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
 	response := coreutils.NewResponse()
 	if !dType.AcceptsAddRemoveFlowUpdates {
-		if len(groupsToKeep) != 0 && reflect.DeepEqual(existingGroups.Items, groupsToKeep) && len(flowsToKeep) != 0 && reflect.DeepEqual(existingFlows.Items, flowsToKeep) {
-			logger.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flowsToDel": flowsToDel, "groupsToDel": groupsToDel})
-			cancel()
-			return coreutils.DoneResponse(), nil
-		}
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, flowMetadata)
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}}, flowMetadata)
 		if err != nil {
 			cancel()
 			return coreutils.DoneResponse(), err
@@ -549,6 +619,73 @@
 		}
 		groupChanges := &ofp.FlowGroupChanges{
 			ToAdd:    &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+		}
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	}
+	return response, nil
+}
+
+func (agent *Agent) deleteGroupsFromAdapter(ctx context.Context, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+	logger.Debugw("delete-groups-from-adapter", log.Fields{"device-id": agent.deviceID, "groups": groupsToDel})
+
+	if (len(groupsToDel)) == 0 {
+		logger.Debugw("nothing-to-delete", log.Fields{"device-id": agent.deviceID})
+		return coreutils.DoneResponse(), nil
+	}
+	device := agent.getDeviceWithoutLock()
+	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+	if err != nil {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+	}
+	updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		groupIDs := agent.groupLoader.List()
+		for groupID := range groupIDs {
+			if grpHandle, have := agent.groupLoader.Lock(groupID); have {
+				updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
+				grpHandle.Unlock()
+			}
+		}
+	}
+
+	for _, group := range groupsToDel {
+		if groupHandle, have := agent.groupLoader.Lock(group.Desc.GroupId); have {
+			// Update the store and cache
+			if err := groupHandle.Delete(ctx); err != nil {
+				groupHandle.Unlock()
+				return coreutils.DoneResponse(), err
+			}
+			if idx := fu.FindGroup(updatedAllGroups, group.Desc.GroupId); idx != -1 {
+				updatedAllGroups = deleteGroupWithoutPreservingOrder(updatedAllGroups, idx)
+			}
+			groupHandle.Unlock()
+		}
+	}
+
+	// Send update to adapters
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	response := coreutils.NewResponse()
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	} else {
+		flowChanges := &ofp.FlowChanges{
+			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+			ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+		}
+		groupChanges := &ofp.FlowGroupChanges{
+			ToAdd:    &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
 			ToRemove: &voltha.FlowGroups{Items: groupsToDel},
 			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
 		}
@@ -565,11 +702,16 @@
 //deleteFlowsAndGroups removes the "flowsToDel" and "groupsToDel" from the existing flows/groups and sends the update to the
 //adapters
 func (agent *Agent) deleteFlowsAndGroups(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
-	response, err := agent.deleteFlowsAndGroupsFromAdapter(ctx, flowsToDel, groupsToDel, flowMetadata)
-	if err != nil {
+	var flwResponse, grpResponse coreutils.Response
+	var err error
+	if flwResponse, err = agent.deleteFlowsFromAdapter(ctx, flowsToDel, flowMetadata); err != nil {
 		return err
 	}
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
+	if grpResponse, err = agent.deleteGroupsFromAdapter(ctx, groupsToDel, flowMetadata); err != nil {
+		return err
+	}
+
+	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, flwResponse, grpResponse); res != nil {
 		return status.Errorf(codes.Aborted, "errors-%s", res)
 	}
 	return nil
@@ -577,25 +719,24 @@
 
 //filterOutFlows removes flows from a device using the uni-port as filter
 func (agent *Agent) filterOutFlows(ctx context.Context, uniPort uint32, flowMetadata *voltha.FlowMetadata) error {
-	device, err := agent.getDevice(ctx)
-	if err != nil {
-		return err
-	}
-	existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
 	var flowsToDelete []*ofp.OfpFlowStats
-
 	// If an existing flow has the uniPort as an InPort or OutPort or as a Tunnel ID then it needs to be removed
-	for _, flow := range existingFlows.Items {
-		if fu.GetInPort(flow) == uniPort || fu.GetOutPort(flow) == uniPort || fu.GetTunnelId(flow) == uint64(uniPort) {
-			flowsToDelete = append(flowsToDelete, flow)
+	for flowID := range agent.flowLoader.List() {
+		if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+			flow := flowHandle.GetReadOnly()
+			if flow != nil && (fu.GetInPort(flow) == uniPort || fu.GetOutPort(flow) == uniPort || fu.GetTunnelId(flow) == uint64(uniPort)) {
+				flowsToDelete = append(flowsToDelete, flow)
+			}
+			flowHandle.Unlock()
 		}
 	}
+
 	logger.Debugw("flows-to-delete", log.Fields{"device-id": agent.deviceID, "uni-port": uniPort, "flows": flowsToDelete})
 	if len(flowsToDelete) == 0 {
 		return nil
 	}
 
-	response, err := agent.deleteFlowsAndGroupsFromAdapter(ctx, flowsToDelete, []*ofp.OfpGroupEntry{}, flowMetadata)
+	response, err := agent.deleteFlowsFromAdapter(ctx, flowsToDelete, flowMetadata)
 	if err != nil {
 		return err
 	}
@@ -605,19 +746,14 @@
 	return nil
 }
 
-func (agent *Agent) updateFlowsAndGroupsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
-	logger.Debugw("updateFlowsAndGroups", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
+func (agent *Agent) updateFlowsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+	logger.Debugw("updateFlowsToAdapter", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
 
-	if (len(updatedFlows) | len(updatedGroups)) == 0 {
-		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
+	if (len(updatedFlows)) == 0 {
+		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
 		return coreutils.DoneResponse(), nil
 	}
 
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return coreutils.DoneResponse(), err
-	}
-	defer agent.requestQueue.RequestComplete()
-
 	device := agent.getDeviceWithoutLock()
 	if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states")
@@ -626,81 +762,55 @@
 	if err != nil {
 		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
 	}
-
-	existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
-	existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
-
-	if len(updatedGroups) != 0 && reflect.DeepEqual(existingGroups.Items, updatedGroups) && len(updatedFlows) != 0 && reflect.DeepEqual(existingFlows.Items, updatedFlows) {
-		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
-		return coreutils.DoneResponse(), nil
+	updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		flowIDs := agent.flowLoader.List()
+		for flowID := range flowIDs {
+			if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+				updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
+				flowHandle.Unlock()
+			}
+		}
 	}
+	flowsToAdd := make([]*ofp.OfpFlowStats, 0)
+	flowsToDelete := make([]*ofp.OfpFlowStats, 0)
 
-	logger.Debugw("updating-flows-and-groups",
-		log.Fields{
-			"device-id":      agent.deviceID,
-			"updated-flows":  updatedFlows,
-			"updated-groups": updatedGroups,
-		})
+	for _, flow := range updatedFlows {
+		if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
+			flowToDelete := flowHandle.GetReadOnly()
+			// Update the store and cache
+			if err := flowHandle.Update(ctx, flow); err != nil {
+				flowHandle.Unlock()
+				return coreutils.DoneResponse(), err
+			}
 
-	// store the updated data
-	device.Flows = &voltha.Flows{Items: updatedFlows}
-	device.FlowGroups = &voltha.FlowGroups{Items: updatedGroups}
-	if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
-		return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
+			flowsToDelete = append(flowsToDelete, flowToDelete)
+			flowsToAdd = append(flowsToAdd, flow)
+			updatedAllFlows = replaceFlowInList(updatedAllFlows, flowToDelete, flow)
+			flowHandle.Unlock()
+		}
 	}
 
 	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
 	response := coreutils.NewResponse()
 	// Process bulk flow update differently than incremental update
 	if !dType.AcceptsAddRemoveFlowUpdates {
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, nil)
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}}, nil)
 		if err != nil {
 			cancel()
 			return coreutils.DoneResponse(), err
 		}
 		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
 	} else {
-		var flowsToAdd []*ofp.OfpFlowStats
-		var flowsToDelete []*ofp.OfpFlowStats
-		var groupsToAdd []*ofp.OfpGroupEntry
-		var groupsToDelete []*ofp.OfpGroupEntry
-
-		// Process flows
-		for _, flow := range updatedFlows {
-			if idx := fu.FindFlows(existingFlows.Items, flow); idx == -1 {
-				flowsToAdd = append(flowsToAdd, flow)
-			}
-		}
-		for _, flow := range existingFlows.Items {
-			if idx := fu.FindFlows(updatedFlows, flow); idx != -1 {
-				flowsToDelete = append(flowsToDelete, flow)
-			}
-		}
-
-		// Process groups
-		for _, g := range updatedGroups {
-			if fu.FindGroup(existingGroups.Items, g.Desc.GroupId) == -1 { // does not exist now
-				groupsToAdd = append(groupsToAdd, g)
-			}
-		}
-		for _, group := range existingGroups.Items {
-			if fu.FindGroup(updatedGroups, group.Desc.GroupId) != -1 { // does not exist now
-				groupsToDelete = append(groupsToDelete, group)
-			}
-		}
-
 		logger.Debugw("updating-flows-and-groups",
 			log.Fields{
-				"device-id":        agent.deviceID,
-				"flows-to-add":     flowsToAdd,
-				"flows-to-delete":  flowsToDelete,
-				"groups-to-add":    groupsToAdd,
-				"groups-to-delete": groupsToDelete,
+				"device-id":       agent.deviceID,
+				"flows-to-add":    flowsToAdd,
+				"flows-to-delete": flowsToDelete,
 			})
-
 		// Sanity check
-		if (len(flowsToAdd) | len(flowsToDelete) | len(groupsToAdd) | len(groupsToDelete) | len(updatedGroups)) == 0 {
-			logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
+		if (len(flowsToAdd) | len(flowsToDelete)) == 0 {
+			logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
 			cancel()
 			return coreutils.DoneResponse(), nil
 		}
@@ -710,9 +820,94 @@
 			ToRemove: &voltha.Flows{Items: flowsToDelete},
 		}
 		groupChanges := &ofp.FlowGroupChanges{
-			ToAdd:    &voltha.FlowGroups{Items: groupsToAdd},
-			ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
-			ToUpdate: &voltha.FlowGroups{Items: updatedGroups},
+			ToAdd:    &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+		}
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	}
+
+	return response, nil
+}
+
+func (agent *Agent) updateGroupsToAdapter(ctx context.Context, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+	logger.Debugw("updateGroupsToAdapter", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
+
+	if (len(updatedGroups)) == 0 {
+		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
+		return coreutils.DoneResponse(), nil
+	}
+
+	device := agent.getDeviceWithoutLock()
+	if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states-oper-%s-connect-%s-admin-%s", device.OperStatus, device.ConnectStatus, device.AdminState)
+	}
+	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+	if err != nil {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+	}
+	updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		groupIDs := agent.groupLoader.List()
+		for groupID := range groupIDs {
+			if grpHandle, have := agent.groupLoader.Lock(groupID); have {
+				updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
+				grpHandle.Unlock()
+			}
+		}
+	}
+	groupsToUpdate := make([]*ofp.OfpGroupEntry, 0)
+
+	for _, group := range updatedGroups {
+		if groupHandle, have := agent.groupLoader.Lock(group.Desc.GroupId); have {
+			// Update the store and cache
+			if err := groupHandle.Update(ctx, group); err != nil {
+				groupHandle.Unlock()
+				return coreutils.DoneResponse(), err
+			}
+			groupsToUpdate = append(groupsToUpdate, group)
+			updatedAllGroups = replaceGroupInList(updatedAllGroups, groupHandle.GetReadOnly(), group)
+			groupHandle.Unlock()
+		}
+	}
+
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	response := coreutils.NewResponse()
+	// Process bulk flow update differently than incremental update
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, nil)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	} else {
+		logger.Debugw("updating-groups",
+			log.Fields{
+				"device-id":        agent.deviceID,
+				"groups-to-update": groupsToUpdate,
+			})
+
+		// Sanity check
+		if (len(groupsToUpdate)) == 0 {
+			logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": groupsToUpdate})
+			cancel()
+			return coreutils.DoneResponse(), nil
+		}
+
+		flowChanges := &ofp.FlowChanges{
+			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+			ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+		}
+		groupChanges := &ofp.FlowGroupChanges{
+			ToAdd:    &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToUpdate: &voltha.FlowGroups{Items: groupsToUpdate},
 		}
 		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
 		if err != nil {
@@ -728,11 +923,16 @@
 //updateFlowsAndGroups replaces the existing flows and groups with "updatedFlows" and "updatedGroups" respectively. It
 //also sends the updates to the adapters
 func (agent *Agent) updateFlowsAndGroups(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
-	response, err := agent.updateFlowsAndGroupsToAdapter(ctx, updatedFlows, updatedGroups, flowMetadata)
-	if err != nil {
+	var flwResponse, grpResponse coreutils.Response
+	var err error
+	if flwResponse, err = agent.updateFlowsToAdapter(ctx, updatedFlows, flowMetadata); err != nil {
 		return err
 	}
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
+	if grpResponse, err = agent.updateGroupsToAdapter(ctx, updatedGroups, flowMetadata); err != nil {
+		return err
+	}
+
+	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, flwResponse, grpResponse); res != nil {
 		return status.Errorf(codes.Aborted, "errors-%s", res)
 	}
 	return nil
@@ -741,17 +941,17 @@
 //deleteAllFlows deletes all flows in the device table
 func (agent *Agent) deleteAllFlows(ctx context.Context) error {
 	logger.Debugw("deleteAllFlows", log.Fields{"deviceId": agent.deviceID})
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return err
-	}
-	defer agent.requestQueue.RequestComplete()
 
-	device := agent.getDeviceWithoutLock()
-	// purge all flows on the device by setting it to nil
-	device.Flows = &ofp.Flows{Items: nil}
-	if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
-		// The caller logs the error
-		return err
+	for flowID := range agent.flowLoader.List() {
+		if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+			// Update the store and cache
+			if err := flowHandle.Delete(ctx); err != nil {
+				flowHandle.Unlock()
+				logger.Errorw("unable-to-delete-flow", log.Fields{"device-id": agent.deviceID, "flowID": flowID})
+				continue
+			}
+			flowHandle.Unlock()
+		}
 	}
 	return nil
 }
@@ -1296,13 +1496,6 @@
 	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
 }
 
-func (agent *Agent) updateDeviceWithoutLock(ctx context.Context, device *voltha.Device) error {
-	logger.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
-	//cloned := proto.Clone(device).(*voltha.Device)
-	cloned := device
-	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
-}
-
 func (agent *Agent) updateDeviceStatus(ctx context.Context, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err