[VOL-3215] Reorganize functions in agent.go
Functions moved without any change in contents

Change-Id: I42ba327e648bacf25e5d328743835b36be89f4b4
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 00a054e..dbaeb2f 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -22,7 +22,6 @@
 	"errors"
 	"fmt"
 	"reflect"
-	"strconv"
 	"sync"
 	"time"
 
@@ -31,7 +30,6 @@
 	"github.com/opencord/voltha-go/rw_core/core/device/flow"
 	"github.com/opencord/voltha-go/rw_core/core/device/group"
 	"github.com/opencord/voltha-go/rw_core/core/device/remote"
-	fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
 
 	"github.com/gogo/protobuf/proto"
@@ -330,220 +328,6 @@
 	}
 }
 
-//replaceFlowInList removes the old flow from list and adds the new one.
-func replaceFlowInList(flowList []*ofp.OfpFlowStats, oldFlow *ofp.OfpFlowStats, newFlow *ofp.OfpFlowStats) []*ofp.OfpFlowStats {
-	if idx := fu.FindFlows(flowList, oldFlow); idx != -1 {
-		flowList = deleteFlowWithoutPreservingOrder(flowList, idx)
-	}
-	flowList = append(flowList, newFlow)
-	return flowList
-}
-
-//deleteFlowWithoutPreservingOrder removes a flow specified by index from the flows slice.  This function will
-//panic if the index is out of range.
-func deleteFlowWithoutPreservingOrder(flows []*ofp.OfpFlowStats, index int) []*ofp.OfpFlowStats {
-	flows[index] = flows[len(flows)-1]
-	flows[len(flows)-1] = nil
-	return flows[:len(flows)-1]
-}
-
-//replaceGroupInList removes the old group from list and adds the new one.
-func replaceGroupInList(groupList []*ofp.OfpGroupEntry, oldGroup *ofp.OfpGroupEntry, newGroup *ofp.OfpGroupEntry) []*ofp.OfpGroupEntry {
-	if idx := fu.FindGroup(groupList, oldGroup.Desc.GroupId); idx != -1 {
-		groupList = deleteGroupWithoutPreservingOrder(groupList, idx)
-	}
-	groupList = append(groupList, newGroup)
-	return groupList
-}
-
-//deleteGroupWithoutPreservingOrder removes a group specified by index from the groups slice.  This function will
-//panic if the index is out of range.
-func deleteGroupWithoutPreservingOrder(groups []*ofp.OfpGroupEntry, index int) []*ofp.OfpGroupEntry {
-	groups[index] = groups[len(groups)-1]
-	groups[len(groups)-1] = nil
-	return groups[:len(groups)-1]
-}
-
-func (agent *Agent) addFlowsToAdapter(ctx context.Context, newFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
-	logger.Debugw("add-flows-to-adapters", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
-
-	if (len(newFlows)) == 0 {
-		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
-		return coreutils.DoneResponse(), nil
-	}
-	device := agent.getDeviceWithoutLock()
-	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
-	if err != nil {
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
-	}
-	updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
-	if !dType.AcceptsAddRemoveFlowUpdates {
-		flowIDs := agent.flowLoader.ListIDs()
-		for flowID := range flowIDs {
-			if flowHandle, have := agent.flowLoader.Lock(flowID); have {
-				updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
-				flowHandle.Unlock()
-			}
-		}
-	}
-	flowsToAdd := make([]*ofp.OfpFlowStats, 0)
-	flowsToDelete := make([]*ofp.OfpFlowStats, 0)
-	for _, flow := range newFlows {
-		flowHandle, created, err := agent.flowLoader.LockOrCreate(ctx, flow)
-		if err != nil {
-			return coreutils.DoneResponse(), err
-		}
-
-		if created {
-			flowsToAdd = append(flowsToAdd, flow)
-			updatedAllFlows = append(updatedAllFlows, flow)
-		} else {
-			flowToReplace := flowHandle.GetReadOnly()
-			if !proto.Equal(flowToReplace, flow) {
-				//Flow needs to be updated.
-				if err := flowHandle.Update(ctx, flow); err != nil {
-					flowHandle.Unlock()
-					return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-flow-%d-to-device-%s", flow.Id, agent.deviceID)
-				}
-				flowsToDelete = append(flowsToDelete, flowToReplace)
-				flowsToAdd = append(flowsToAdd, flow)
-				updatedAllFlows = replaceFlowInList(updatedAllFlows, flowToReplace, flow)
-			} else {
-				//No need to change the flow. It is already exist.
-				logger.Debugw("No-need-to-change-already-existing-flow", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
-			}
-		}
-
-		flowHandle.Unlock()
-	}
-
-	// Sanity check
-	if (len(flowsToAdd)) == 0 {
-		logger.Debugw("no-flows-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
-		return coreutils.DoneResponse(), nil
-	}
-
-	// Send update to adapters
-	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
-	response := coreutils.NewResponse()
-	if !dType.AcceptsAddRemoveFlowUpdates {
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &ofp.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}}, flowMetadata)
-		if err != nil {
-			cancel()
-			return coreutils.DoneResponse(), err
-		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
-	} else {
-		flowChanges := &ofp.FlowChanges{
-			ToAdd:    &voltha.Flows{Items: flowsToAdd},
-			ToRemove: &voltha.Flows{Items: flowsToDelete},
-		}
-		groupChanges := &ofp.FlowGroupChanges{
-			ToAdd:    &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
-			ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
-			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
-		}
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
-		if err != nil {
-			cancel()
-			return coreutils.DoneResponse(), err
-		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
-	}
-	return response, nil
-}
-
-func (agent *Agent) addGroupsToAdapter(ctx context.Context, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
-	logger.Debugw("add-groups-to-adapters", log.Fields{"device-id": agent.deviceID, "groups": newGroups, "flow-metadata": flowMetadata})
-
-	if (len(newGroups)) == 0 {
-		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": newGroups})
-		return coreutils.DoneResponse(), nil
-	}
-
-	device := agent.getDeviceWithoutLock()
-	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
-	if err != nil {
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
-	}
-	updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
-	if !dType.AcceptsAddRemoveFlowUpdates {
-		groupIDs := agent.groupLoader.ListIDs()
-		for groupID := range groupIDs {
-			if grpHandle, have := agent.groupLoader.Lock(groupID); have {
-				updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
-				grpHandle.Unlock()
-			}
-		}
-	}
-
-	groupsToAdd := make([]*ofp.OfpGroupEntry, 0)
-	groupsToDelete := make([]*ofp.OfpGroupEntry, 0)
-	for _, group := range newGroups {
-
-		groupHandle, created, err := agent.groupLoader.LockOrCreate(ctx, group)
-		if err != nil {
-			return coreutils.DoneResponse(), err
-		}
-
-		if created {
-			groupsToAdd = append(groupsToAdd, group)
-			updatedAllGroups = append(updatedAllGroups, group)
-		} else {
-			groupToChange := groupHandle.GetReadOnly()
-			if !proto.Equal(groupToChange, group) {
-				//Group needs to be updated.
-				if err := groupHandle.Update(ctx, group); err != nil {
-					groupHandle.Unlock()
-					return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-group-%s-to-device-%s", strconv.Itoa(int(group.Desc.GroupId)), agent.deviceID)
-				}
-				groupsToDelete = append(groupsToDelete, groupToChange)
-				groupsToAdd = append(groupsToAdd, group)
-				updatedAllGroups = replaceGroupInList(updatedAllGroups, groupToChange, group)
-			} else {
-				//No need to change the group. It is already exist.
-				logger.Debugw("No-need-to-change-already-existing-group", log.Fields{"device-id": agent.deviceID, "group": newGroups, "flow-metadata": flowMetadata})
-			}
-		}
-
-		groupHandle.Unlock()
-	}
-	// Sanity check
-	if (len(groupsToAdd)) == 0 {
-		logger.Debugw("no-groups-to-update", log.Fields{"device-id": agent.deviceID, "groups": newGroups})
-		return coreutils.DoneResponse(), nil
-	}
-
-	// Send update to adapters
-	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
-	response := coreutils.NewResponse()
-	if !dType.AcceptsAddRemoveFlowUpdates {
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
-		if err != nil {
-			cancel()
-			return coreutils.DoneResponse(), err
-		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
-	} else {
-		flowChanges := &ofp.FlowChanges{
-			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
-			ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
-		}
-		groupChanges := &ofp.FlowGroupChanges{
-			ToAdd:    &voltha.FlowGroups{Items: groupsToAdd},
-			ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
-			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
-		}
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
-		if err != nil {
-			cancel()
-			return coreutils.DoneResponse(), err
-		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
-	}
-	return response, nil
-}
-
 //addFlowsAndGroups adds the "newFlows" and "newGroups" from the existing flows/groups and sends the update to the
 //adapters
 func (agent *Agent) addFlowsAndGroups(ctx context.Context, newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
@@ -564,141 +348,6 @@
 	return nil
 }
 
-func (agent *Agent) deleteFlowsFromAdapter(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
-	logger.Debugw("delete-flows-from-adapter", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
-
-	if (len(flowsToDel)) == 0 {
-		logger.Debugw("nothing-to-delete", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
-		return coreutils.DoneResponse(), nil
-	}
-
-	device := agent.getDeviceWithoutLock()
-	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
-	if err != nil {
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
-	}
-	updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
-	if !dType.AcceptsAddRemoveFlowUpdates {
-		flowIDs := agent.flowLoader.ListIDs()
-		for flowID := range flowIDs {
-			if flowHandle, have := agent.flowLoader.Lock(flowID); have {
-				updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
-				flowHandle.Unlock()
-			}
-		}
-	}
-	for _, flow := range flowsToDel {
-		if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
-			// Update the store and cache
-			flowToDelete := flowHandle.GetReadOnly()
-			if err := flowHandle.Delete(ctx); err != nil {
-				flowHandle.Unlock()
-				return coreutils.DoneResponse(), err
-			}
-			if idx := fu.FindFlows(updatedAllFlows, flowToDelete); idx != -1 {
-				updatedAllFlows = deleteFlowWithoutPreservingOrder(updatedAllFlows, idx)
-			}
-			flowHandle.Unlock()
-		}
-	}
-
-	// Send update to adapters
-	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
-	response := coreutils.NewResponse()
-	if !dType.AcceptsAddRemoveFlowUpdates {
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}}, flowMetadata)
-		if err != nil {
-			cancel()
-			return coreutils.DoneResponse(), err
-		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
-	} else {
-		flowChanges := &ofp.FlowChanges{
-			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
-			ToRemove: &voltha.Flows{Items: flowsToDel},
-		}
-		groupChanges := &ofp.FlowGroupChanges{
-			ToAdd:    &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
-			ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
-			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
-		}
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
-		if err != nil {
-			cancel()
-			return coreutils.DoneResponse(), err
-		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
-	}
-	return response, nil
-}
-
-func (agent *Agent) deleteGroupsFromAdapter(ctx context.Context, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
-	logger.Debugw("delete-groups-from-adapter", log.Fields{"device-id": agent.deviceID, "groups": groupsToDel})
-
-	if (len(groupsToDel)) == 0 {
-		logger.Debugw("nothing-to-delete", log.Fields{"device-id": agent.deviceID})
-		return coreutils.DoneResponse(), nil
-	}
-	device := agent.getDeviceWithoutLock()
-	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
-	if err != nil {
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
-	}
-	updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
-	if !dType.AcceptsAddRemoveFlowUpdates {
-		groupIDs := agent.groupLoader.ListIDs()
-		for groupID := range groupIDs {
-			if grpHandle, have := agent.groupLoader.Lock(groupID); have {
-				updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
-				grpHandle.Unlock()
-			}
-		}
-	}
-
-	for _, group := range groupsToDel {
-		if groupHandle, have := agent.groupLoader.Lock(group.Desc.GroupId); have {
-			// Update the store and cache
-			if err := groupHandle.Delete(ctx); err != nil {
-				groupHandle.Unlock()
-				return coreutils.DoneResponse(), err
-			}
-			if idx := fu.FindGroup(updatedAllGroups, group.Desc.GroupId); idx != -1 {
-				updatedAllGroups = deleteGroupWithoutPreservingOrder(updatedAllGroups, idx)
-			}
-			groupHandle.Unlock()
-		}
-	}
-
-	// Send update to adapters
-	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
-	response := coreutils.NewResponse()
-	if !dType.AcceptsAddRemoveFlowUpdates {
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
-		if err != nil {
-			cancel()
-			return coreutils.DoneResponse(), err
-		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
-	} else {
-		flowChanges := &ofp.FlowChanges{
-			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
-			ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
-		}
-		groupChanges := &ofp.FlowGroupChanges{
-			ToAdd:    &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
-			ToRemove: &voltha.FlowGroups{Items: groupsToDel},
-			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
-		}
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
-		if err != nil {
-			cancel()
-			return coreutils.DoneResponse(), err
-		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
-	}
-	return response, nil
-}
-
 //deleteFlowsAndGroups removes the "flowsToDel" and "groupsToDel" from the existing flows/groups and sends the update to the
 //adapters
 func (agent *Agent) deleteFlowsAndGroups(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
@@ -717,209 +366,6 @@
 	return nil
 }
 
-//filterOutFlows removes flows from a device using the uni-port as filter
-func (agent *Agent) filterOutFlows(ctx context.Context, uniPort uint32, flowMetadata *voltha.FlowMetadata) error {
-	var flowsToDelete []*ofp.OfpFlowStats
-	// If an existing flow has the uniPort as an InPort or OutPort or as a Tunnel ID then it needs to be removed
-	for flowID := range agent.flowLoader.ListIDs() {
-		if flowHandle, have := agent.flowLoader.Lock(flowID); have {
-			flow := flowHandle.GetReadOnly()
-			if flow != nil && (fu.GetInPort(flow) == uniPort || fu.GetOutPort(flow) == uniPort || fu.GetTunnelId(flow) == uint64(uniPort)) {
-				flowsToDelete = append(flowsToDelete, flow)
-			}
-			flowHandle.Unlock()
-		}
-	}
-
-	logger.Debugw("flows-to-delete", log.Fields{"device-id": agent.deviceID, "uni-port": uniPort, "flows": flowsToDelete})
-	if len(flowsToDelete) == 0 {
-		return nil
-	}
-
-	response, err := agent.deleteFlowsFromAdapter(ctx, flowsToDelete, flowMetadata)
-	if err != nil {
-		return err
-	}
-	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
-		return status.Errorf(codes.Aborted, "errors-%s", res)
-	}
-	return nil
-}
-
-func (agent *Agent) updateFlowsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
-	logger.Debugw("updateFlowsToAdapter", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
-
-	if (len(updatedFlows)) == 0 {
-		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
-		return coreutils.DoneResponse(), nil
-	}
-
-	device := agent.getDeviceWithoutLock()
-	if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states")
-	}
-	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
-	if err != nil {
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
-	}
-	updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
-	if !dType.AcceptsAddRemoveFlowUpdates {
-		flowIDs := agent.flowLoader.ListIDs()
-		for flowID := range flowIDs {
-			if flowHandle, have := agent.flowLoader.Lock(flowID); have {
-				updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
-				flowHandle.Unlock()
-			}
-		}
-	}
-	flowsToAdd := make([]*ofp.OfpFlowStats, 0)
-	flowsToDelete := make([]*ofp.OfpFlowStats, 0)
-
-	for _, flow := range updatedFlows {
-		if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
-			flowToDelete := flowHandle.GetReadOnly()
-			// Update the store and cache
-			if err := flowHandle.Update(ctx, flow); err != nil {
-				flowHandle.Unlock()
-				return coreutils.DoneResponse(), err
-			}
-
-			flowsToDelete = append(flowsToDelete, flowToDelete)
-			flowsToAdd = append(flowsToAdd, flow)
-			updatedAllFlows = replaceFlowInList(updatedAllFlows, flowToDelete, flow)
-			flowHandle.Unlock()
-		}
-	}
-
-	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
-	response := coreutils.NewResponse()
-	// Process bulk flow update differently than incremental update
-	if !dType.AcceptsAddRemoveFlowUpdates {
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}}, nil)
-		if err != nil {
-			cancel()
-			return coreutils.DoneResponse(), err
-		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
-	} else {
-		logger.Debugw("updating-flows-and-groups",
-			log.Fields{
-				"device-id":       agent.deviceID,
-				"flows-to-add":    flowsToAdd,
-				"flows-to-delete": flowsToDelete,
-			})
-		// Sanity check
-		if (len(flowsToAdd) | len(flowsToDelete)) == 0 {
-			logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
-			cancel()
-			return coreutils.DoneResponse(), nil
-		}
-
-		flowChanges := &ofp.FlowChanges{
-			ToAdd:    &voltha.Flows{Items: flowsToAdd},
-			ToRemove: &voltha.Flows{Items: flowsToDelete},
-		}
-		groupChanges := &ofp.FlowGroupChanges{
-			ToAdd:    &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
-			ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
-			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
-		}
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
-		if err != nil {
-			cancel()
-			return coreutils.DoneResponse(), err
-		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
-	}
-
-	return response, nil
-}
-
-func (agent *Agent) updateGroupsToAdapter(ctx context.Context, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
-	logger.Debugw("updateGroupsToAdapter", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
-
-	if (len(updatedGroups)) == 0 {
-		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
-		return coreutils.DoneResponse(), nil
-	}
-
-	device := agent.getDeviceWithoutLock()
-	if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states-oper-%s-connect-%s-admin-%s", device.OperStatus, device.ConnectStatus, device.AdminState)
-	}
-	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
-	if err != nil {
-		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
-	}
-	updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
-	if !dType.AcceptsAddRemoveFlowUpdates {
-		groupIDs := agent.groupLoader.ListIDs()
-		for groupID := range groupIDs {
-			if grpHandle, have := agent.groupLoader.Lock(groupID); have {
-				updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
-				grpHandle.Unlock()
-			}
-		}
-	}
-	groupsToUpdate := make([]*ofp.OfpGroupEntry, 0)
-
-	for _, group := range updatedGroups {
-		if groupHandle, have := agent.groupLoader.Lock(group.Desc.GroupId); have {
-			// Update the store and cache
-			if err := groupHandle.Update(ctx, group); err != nil {
-				groupHandle.Unlock()
-				return coreutils.DoneResponse(), err
-			}
-			groupsToUpdate = append(groupsToUpdate, group)
-			updatedAllGroups = replaceGroupInList(updatedAllGroups, groupHandle.GetReadOnly(), group)
-			groupHandle.Unlock()
-		}
-	}
-
-	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
-	response := coreutils.NewResponse()
-	// Process bulk flow update differently than incremental update
-	if !dType.AcceptsAddRemoveFlowUpdates {
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, nil)
-		if err != nil {
-			cancel()
-			return coreutils.DoneResponse(), err
-		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
-	} else {
-		logger.Debugw("updating-groups",
-			log.Fields{
-				"device-id":        agent.deviceID,
-				"groups-to-update": groupsToUpdate,
-			})
-
-		// Sanity check
-		if (len(groupsToUpdate)) == 0 {
-			logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": groupsToUpdate})
-			cancel()
-			return coreutils.DoneResponse(), nil
-		}
-
-		flowChanges := &ofp.FlowChanges{
-			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
-			ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
-		}
-		groupChanges := &ofp.FlowGroupChanges{
-			ToAdd:    &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
-			ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
-			ToUpdate: &voltha.FlowGroups{Items: groupsToUpdate},
-		}
-		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
-		if err != nil {
-			cancel()
-			return coreutils.DoneResponse(), err
-		}
-		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
-	}
-
-	return response, nil
-}
-
 //updateFlowsAndGroups replaces the existing flows and groups with "updatedFlows" and "updatedGroups" respectively. It
 //also sends the updates to the adapters
 func (agent *Agent) updateFlowsAndGroups(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
@@ -938,24 +384,6 @@
 	return nil
 }
 
-//deleteAllFlows deletes all flows in the device table
-func (agent *Agent) deleteAllFlows(ctx context.Context) error {
-	logger.Debugw("deleteAllFlows", log.Fields{"deviceId": agent.deviceID})
-
-	for flowID := range agent.flowLoader.ListIDs() {
-		if flowHandle, have := agent.flowLoader.Lock(flowID); have {
-			// Update the store and cache
-			if err := flowHandle.Delete(ctx); err != nil {
-				flowHandle.Unlock()
-				logger.Errorw("unable-to-delete-flow", log.Fields{"device-id": agent.deviceID, "flowID": flowID})
-				continue
-			}
-			flowHandle.Unlock()
-		}
-	}
-	return nil
-}
-
 //disableDevice disable a device
 func (agent *Agent) disableDevice(ctx context.Context) error {
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
@@ -1058,323 +486,6 @@
 	return nil
 }
 
-func (agent *Agent) updatePmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return err
-	}
-	defer agent.requestQueue.RequestComplete()
-	logger.Debugw("updatePmConfigs", log.Fields{"device-id": pmConfigs.Id})
-
-	cloned := agent.getDeviceWithoutLock()
-	cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
-	// Store the device
-	if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
-		return err
-	}
-	// Send the request to the adapter
-	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
-	ch, err := agent.adapterProxy.UpdatePmConfigs(subCtx, cloned, pmConfigs)
-	if err != nil {
-		cancel()
-		return err
-	}
-	go agent.waitForAdapterResponse(subCtx, cancel, "updatePmConfigs", ch, agent.onSuccess, agent.onFailure)
-	return nil
-}
-
-func (agent *Agent) initPmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return err
-	}
-	defer agent.requestQueue.RequestComplete()
-	logger.Debugw("initPmConfigs", log.Fields{"device-id": pmConfigs.Id})
-
-	cloned := agent.getDeviceWithoutLock()
-	cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
-	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
-}
-
-func (agent *Agent) listPmConfigs(ctx context.Context) (*voltha.PmConfigs, error) {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return nil, err
-	}
-	defer agent.requestQueue.RequestComplete()
-	logger.Debugw("listPmConfigs", log.Fields{"device-id": agent.deviceID})
-
-	return agent.getDeviceWithoutLock().PmConfigs, nil
-}
-
-func (agent *Agent) downloadImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return nil, err
-	}
-	defer agent.requestQueue.RequestComplete()
-
-	logger.Debugw("downloadImage", log.Fields{"device-id": agent.deviceID})
-
-	device := agent.getDeviceWithoutLock()
-
-	if device.AdminState != voltha.AdminState_ENABLED {
-		return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, expected-admin-state:%s", agent.deviceID, voltha.AdminState_ENABLED)
-	}
-	// Save the image
-	clonedImg := proto.Clone(img).(*voltha.ImageDownload)
-	clonedImg.DownloadState = voltha.ImageDownload_DOWNLOAD_REQUESTED
-	cloned := proto.Clone(device).(*voltha.Device)
-	if cloned.ImageDownloads == nil {
-		cloned.ImageDownloads = []*voltha.ImageDownload{clonedImg}
-	} else {
-		if device.AdminState != voltha.AdminState_ENABLED {
-			logger.Debugw("device-not-enabled", log.Fields{"id": agent.deviceID})
-			return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, expected-admin-state:%s", agent.deviceID, voltha.AdminState_ENABLED)
-		}
-		// Save the image
-		clonedImg := proto.Clone(img).(*voltha.ImageDownload)
-		clonedImg.DownloadState = voltha.ImageDownload_DOWNLOAD_REQUESTED
-		if device.ImageDownloads == nil {
-			device.ImageDownloads = []*voltha.ImageDownload{clonedImg}
-		} else {
-			device.ImageDownloads = append(device.ImageDownloads, clonedImg)
-		}
-		if err := agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_DOWNLOADING_IMAGE, device.ConnectStatus, device.OperStatus); err != nil {
-			return nil, err
-		}
-
-		// Send the request to the adapter
-		subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
-		ch, err := agent.adapterProxy.DownloadImage(ctx, cloned, clonedImg)
-		if err != nil {
-			cancel()
-			return nil, err
-		}
-		go agent.waitForAdapterResponse(subCtx, cancel, "downloadImage", ch, agent.onSuccess, agent.onFailure)
-	}
-	return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
-}
-
-// isImageRegistered is a helper method to figure out if an image is already registered
-func isImageRegistered(img *voltha.ImageDownload, device *voltha.Device) bool {
-	for _, image := range device.ImageDownloads {
-		if image.Id == img.Id && image.Name == img.Name {
-			return true
-		}
-	}
-	return false
-}
-
-func (agent *Agent) cancelImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return nil, err
-	}
-	defer agent.requestQueue.RequestComplete()
-
-	logger.Debugw("cancelImageDownload", log.Fields{"device-id": agent.deviceID})
-
-	device := agent.getDeviceWithoutLock()
-
-	// Verify whether the Image is in the list of image being downloaded
-	if !isImageRegistered(img, device) {
-		return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, image-not-registered:%s", agent.deviceID, img.Name)
-	}
-
-	// Update image download state
-	for _, image := range device.ImageDownloads {
-		if image.Id == img.Id && image.Name == img.Name {
-			image.DownloadState = voltha.ImageDownload_DOWNLOAD_CANCELLED
-		}
-	}
-
-	if device.AdminState == voltha.AdminState_DOWNLOADING_IMAGE {
-		// Set the device to Enabled
-		if err := agent.updateDeviceStateInStoreWithoutLock(ctx, device, voltha.AdminState_ENABLED, device.ConnectStatus, device.OperStatus); err != nil {
-			return nil, err
-		}
-		subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
-		ch, err := agent.adapterProxy.CancelImageDownload(subCtx, device, img)
-		if err != nil {
-			cancel()
-			return nil, err
-		}
-		go agent.waitForAdapterResponse(subCtx, cancel, "cancelImageDownload", ch, agent.onSuccess, agent.onFailure)
-	}
-	return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
-}
-
-func (agent *Agent) activateImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return nil, err
-	}
-	defer agent.requestQueue.RequestComplete()
-	logger.Debugw("activateImage", log.Fields{"device-id": agent.deviceID})
-	cloned := agent.getDeviceWithoutLock()
-
-	// Verify whether the Image is in the list of image being downloaded
-	if !isImageRegistered(img, cloned) {
-		return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, image-not-registered:%s", agent.deviceID, img.Name)
-	}
-
-	if cloned.AdminState == voltha.AdminState_DOWNLOADING_IMAGE {
-		return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, device-in-downloading-state:%s", agent.deviceID, img.Name)
-	}
-	// Update image download state
-	for _, image := range cloned.ImageDownloads {
-		if image.Id == img.Id && image.Name == img.Name {
-			image.ImageState = voltha.ImageDownload_IMAGE_ACTIVATING
-		}
-	}
-	// Set the device to downloading_image
-	if err := agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_DOWNLOADING_IMAGE, cloned.ConnectStatus, cloned.OperStatus); err != nil {
-		return nil, err
-	}
-
-	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
-	ch, err := agent.adapterProxy.ActivateImageUpdate(subCtx, proto.Clone(cloned).(*voltha.Device), img)
-	if err != nil {
-		cancel()
-		return nil, err
-	}
-	go agent.waitForAdapterResponse(subCtx, cancel, "activateImageUpdate", ch, agent.onSuccess, agent.onFailure)
-
-	// The status of the AdminState will be changed following the update_download_status response from the adapter
-	// The image name will also be removed from the device list
-	return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
-}
-
-func (agent *Agent) revertImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return nil, err
-	}
-	defer agent.requestQueue.RequestComplete()
-	logger.Debugw("revertImage", log.Fields{"device-id": agent.deviceID})
-
-	cloned := agent.getDeviceWithoutLock()
-
-	// Verify whether the Image is in the list of image being downloaded
-	if !isImageRegistered(img, cloned) {
-		return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, image-not-registered:%s", agent.deviceID, img.Name)
-	}
-
-	if cloned.AdminState != voltha.AdminState_ENABLED {
-		return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, device-not-enabled-state:%s", agent.deviceID, img.Name)
-	}
-	// Update image download state
-	for _, image := range cloned.ImageDownloads {
-		if image.Id == img.Id && image.Name == img.Name {
-			image.ImageState = voltha.ImageDownload_IMAGE_REVERTING
-		}
-	}
-
-	if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
-		return nil, err
-	}
-
-	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
-	ch, err := agent.adapterProxy.RevertImageUpdate(subCtx, proto.Clone(cloned).(*voltha.Device), img)
-	if err != nil {
-		cancel()
-		return nil, err
-	}
-	go agent.waitForAdapterResponse(subCtx, cancel, "revertImageUpdate", ch, agent.onSuccess, agent.onFailure)
-
-	return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
-}
-
-func (agent *Agent) getImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
-	logger.Debugw("getImageDownloadStatus", log.Fields{"device-id": agent.deviceID})
-
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return nil, err
-	}
-	device := agent.getDeviceWithoutLock()
-	ch, err := agent.adapterProxy.GetImageDownloadStatus(ctx, device, img)
-	agent.requestQueue.RequestComplete()
-	if err != nil {
-		return nil, err
-	}
-	// Wait for the adapter response
-	rpcResponse, ok := <-ch
-	if !ok {
-		return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
-	}
-	if rpcResponse.Err != nil {
-		return nil, rpcResponse.Err
-	}
-	// Successful response
-	imgDownload := &voltha.ImageDownload{}
-	if err := ptypes.UnmarshalAny(rpcResponse.Reply, imgDownload); err != nil {
-		return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
-	}
-	return imgDownload, nil
-}
-
-func (agent *Agent) updateImageDownload(ctx context.Context, img *voltha.ImageDownload) error {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return err
-	}
-	defer agent.requestQueue.RequestComplete()
-	logger.Debugw("updating-image-download", log.Fields{"device-id": agent.deviceID, "img": img})
-
-	cloned := agent.getDeviceWithoutLock()
-
-	// Update the image as well as remove it if the download was cancelled
-	clonedImages := make([]*voltha.ImageDownload, len(cloned.ImageDownloads))
-	for _, image := range cloned.ImageDownloads {
-		if image.Id == img.Id && image.Name == img.Name {
-			if image.DownloadState != voltha.ImageDownload_DOWNLOAD_CANCELLED {
-				clonedImages = append(clonedImages, img)
-			}
-		}
-	}
-	cloned.ImageDownloads = clonedImages
-	// Set the Admin state to enabled if required
-	if (img.DownloadState != voltha.ImageDownload_DOWNLOAD_REQUESTED &&
-		img.DownloadState != voltha.ImageDownload_DOWNLOAD_STARTED) ||
-		(img.ImageState != voltha.ImageDownload_IMAGE_ACTIVATING) {
-		return agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_ENABLED, cloned.ConnectStatus, cloned.OperStatus)
-	}
-	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
-}
-
-func (agent *Agent) getImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return nil, err
-	}
-	defer agent.requestQueue.RequestComplete()
-	logger.Debugw("getImageDownload", log.Fields{"device-id": agent.deviceID})
-
-	cloned := agent.getDeviceWithoutLock()
-	for _, image := range cloned.ImageDownloads {
-		if image.Id == img.Id && image.Name == img.Name {
-			return image, nil
-		}
-	}
-	return nil, status.Errorf(codes.NotFound, "image-not-found:%s", img.Name)
-}
-
-func (agent *Agent) listImageDownloads(ctx context.Context, deviceID string) (*voltha.ImageDownloads, error) {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return nil, err
-	}
-	defer agent.requestQueue.RequestComplete()
-	logger.Debugw("listImageDownloads", log.Fields{"device-id": agent.deviceID})
-
-	return &voltha.ImageDownloads{Items: agent.getDeviceWithoutLock().ImageDownloads}, nil
-}
-
-// getPorts retrieves the ports information of the device based on the port type.
-func (agent *Agent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
-	logger.Debugw("getPorts", log.Fields{"device-id": agent.deviceID, "port-type": portType})
-	ports := &voltha.Ports{}
-	if device, _ := agent.deviceMgr.getDevice(ctx, agent.deviceID); device != nil {
-		for _, port := range device.Ports {
-			if port.Type == portType {
-				ports.Items = append(ports.Items, port)
-			}
-		}
-	}
-	return ports
-}
-
 // getSwitchCapability retrieves the switch capability of a parent device
 func (agent *Agent) getSwitchCapability(ctx context.Context) (*ic.SwitchCapability, error) {
 	logger.Debugw("getSwitchCapability", log.Fields{"device-id": agent.deviceID})
@@ -1404,33 +515,6 @@
 	return switchCap, nil
 }
 
-// getPortCapability retrieves the port capability of a device
-func (agent *Agent) getPortCapability(ctx context.Context, portNo uint32) (*ic.PortCapability, error) {
-	logger.Debugw("getPortCapability", log.Fields{"device-id": agent.deviceID})
-	device, err := agent.getDevice(ctx)
-	if err != nil {
-		return nil, err
-	}
-	ch, err := agent.adapterProxy.GetOfpPortInfo(ctx, device, portNo)
-	if err != nil {
-		return nil, err
-	}
-	// Wait for adapter response
-	rpcResponse, ok := <-ch
-	if !ok {
-		return nil, status.Errorf(codes.Aborted, "channel-closed")
-	}
-	if rpcResponse.Err != nil {
-		return nil, rpcResponse.Err
-	}
-	// Successful response
-	portCap := &ic.PortCapability{}
-	if err := ptypes.UnmarshalAny(rpcResponse.Reply, portCap); err != nil {
-		return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
-	}
-	return portCap, nil
-}
-
 func (agent *Agent) onPacketFailure(rpc string, response interface{}, args ...interface{}) {
 	// packet data is encoded in the args param as the first parameter
 	var packet []byte
@@ -1519,145 +603,6 @@
 	return agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, cloned.AdminState, newConnStatus, newOperStatus)
 }
 
-func (agent *Agent) updatePortsOperState(ctx context.Context, operStatus voltha.OperStatus_Types) error {
-	logger.Debugw("updatePortsOperState", log.Fields{"device-id": agent.deviceID})
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return err
-	}
-	defer agent.requestQueue.RequestComplete()
-	cloned := agent.getDeviceWithoutLock()
-	for _, port := range cloned.Ports {
-		port.OperStatus = operStatus
-	}
-	// Store the device
-	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
-}
-
-func (agent *Agent) updatePortState(ctx context.Context, portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_Types) error {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return err
-	}
-	defer agent.requestQueue.RequestComplete()
-	// Work only on latest data
-	// TODO: Get list of ports from device directly instead of the entire device
-	cloned := agent.getDeviceWithoutLock()
-
-	// Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
-	if _, ok := voltha.Port_PortType_value[portType.String()]; !ok {
-		return status.Errorf(codes.InvalidArgument, "%s", portType)
-	}
-	for _, port := range cloned.Ports {
-		if port.Type == portType && port.PortNo == portNo {
-			port.OperStatus = operStatus
-		}
-	}
-	logger.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
-	// Store the device
-	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
-}
-
-func (agent *Agent) deleteAllPorts(ctx context.Context) error {
-	logger.Debugw("deleteAllPorts", log.Fields{"deviceId": agent.deviceID})
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return err
-	}
-	defer agent.requestQueue.RequestComplete()
-
-	cloned := agent.getDeviceWithoutLock()
-
-	if cloned.AdminState != voltha.AdminState_DISABLED && cloned.AdminState != voltha.AdminState_DELETED {
-		err := status.Error(codes.FailedPrecondition, fmt.Sprintf("invalid-state-%v", cloned.AdminState))
-		logger.Warnw("invalid-state-removing-ports", log.Fields{"state": cloned.AdminState, "error": err})
-		return err
-	}
-	if len(cloned.Ports) == 0 {
-		logger.Debugw("no-ports-present", log.Fields{"deviceId": agent.deviceID})
-		return nil
-	}
-
-	cloned.Ports = []*voltha.Port{}
-	logger.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
-	// Store the device
-	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
-}
-
-func (agent *Agent) addPort(ctx context.Context, port *voltha.Port) error {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return err
-	}
-	defer agent.requestQueue.RequestComplete()
-	logger.Debugw("addPort", log.Fields{"deviceId": agent.deviceID})
-
-	cloned := agent.getDeviceWithoutLock()
-	updatePort := false
-	if cloned.Ports == nil {
-		//	First port
-		logger.Debugw("addPort-first-port-to-add", log.Fields{"deviceId": agent.deviceID})
-		cloned.Ports = make([]*voltha.Port, 0)
-	} else {
-		for _, p := range cloned.Ports {
-			if p.Type == port.Type && p.PortNo == port.PortNo {
-				if p.Label == "" && p.Type == voltha.Port_PON_OLT {
-					//Creation of OLT PON port is being processed after a default PON port was created.  Just update it.
-					logger.Infow("update-pon-port-created-by-default", log.Fields{"default-port": p, "port-to-add": port})
-					p.Label = port.Label
-					p.OperStatus = port.OperStatus
-					updatePort = true
-					break
-				}
-				logger.Debugw("port already exists", log.Fields{"port": port})
-				return nil
-			}
-		}
-	}
-	if !updatePort {
-		cp := proto.Clone(port).(*voltha.Port)
-		// Set the admin state of the port to ENABLE
-		cp.AdminState = voltha.AdminState_ENABLED
-		cloned.Ports = append(cloned.Ports, cp)
-	}
-	// Store the device
-	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
-}
-
-func (agent *Agent) addPeerPort(ctx context.Context, peerPort *voltha.Port_PeerPort) error {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return err
-	}
-	defer agent.requestQueue.RequestComplete()
-	logger.Debugw("adding-peer-peerPort", log.Fields{"device-id": agent.deviceID, "peer-peerPort": peerPort})
-
-	cloned := agent.getDeviceWithoutLock()
-
-	// Get the peer port on the device based on the peerPort no
-	found := false
-	for _, port := range cloned.Ports {
-		if port.PortNo == peerPort.PortNo { // found peerPort
-			cp := proto.Clone(peerPort).(*voltha.Port_PeerPort)
-			port.Peers = append(port.Peers, cp)
-			logger.Debugw("found-peer", log.Fields{"device-id": agent.deviceID, "portNo": peerPort.PortNo, "deviceId": agent.deviceID})
-			found = true
-			break
-		}
-	}
-	if !found && agent.isRootdevice {
-		// An ONU PON port has been created before the corresponding creation of the OLT PON port.  Create the OLT PON port
-		// with default values which will be updated once the OLT PON port creation is processed.
-		ponPort := &voltha.Port{
-			PortNo:     peerPort.PortNo,
-			Type:       voltha.Port_PON_OLT,
-			AdminState: voltha.AdminState_ENABLED,
-			DeviceId:   agent.deviceID,
-			Peers:      []*voltha.Port_PeerPort{proto.Clone(peerPort).(*voltha.Port_PeerPort)},
-		}
-		cloned.Ports = append(cloned.Ports, ponPort)
-		logger.Infow("adding-default-pon-port", log.Fields{"device-id": agent.deviceID, "peer": peerPort, "pon-port": ponPort})
-	}
-
-	// Store the device
-	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
-}
-
 // TODO: A generic device update by attribute
 func (agent *Agent) updateDeviceAttribute(ctx context.Context, name string, value interface{}) {
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
@@ -1768,88 +713,6 @@
 	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
 }
 
-func (agent *Agent) disablePort(ctx context.Context, Port *voltha.Port) error {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return err
-	}
-	defer agent.requestQueue.RequestComplete()
-	logger.Debugw("disablePort", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo})
-	var cp *voltha.Port
-	// Get the most up to date the device info
-	device := agent.getDeviceWithoutLock()
-	for _, port := range device.Ports {
-		if port.PortNo == Port.PortNo {
-			port.AdminState = voltha.AdminState_DISABLED
-			cp = proto.Clone(port).(*voltha.Port)
-			break
-
-		}
-	}
-	if cp == nil {
-		return status.Errorf(codes.InvalidArgument, "%v", Port.PortNo)
-	}
-
-	if cp.Type != voltha.Port_PON_OLT {
-		return status.Errorf(codes.InvalidArgument, "Disabling of Port Type %v unimplemented", cp.Type)
-	}
-	// Store the device
-	if err := agent.updateDeviceInStoreWithoutLock(ctx, device, false, ""); err != nil {
-		logger.Debugw("updateDeviceInStoreWithoutLock error ", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo, "error": err})
-		return err
-	}
-
-	//send request to adapter
-	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
-	ch, err := agent.adapterProxy.DisablePort(ctx, device, cp)
-	if err != nil {
-		cancel()
-		return err
-	}
-	go agent.waitForAdapterResponse(subCtx, cancel, "disablePort", ch, agent.onSuccess, agent.onFailure)
-	return nil
-}
-
-func (agent *Agent) enablePort(ctx context.Context, Port *voltha.Port) error {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return err
-	}
-	defer agent.requestQueue.RequestComplete()
-	logger.Debugw("enablePort", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo})
-
-	var cp *voltha.Port
-	// Get the most up to date the device info
-	device := agent.getDeviceWithoutLock()
-	for _, port := range device.Ports {
-		if port.PortNo == Port.PortNo {
-			port.AdminState = voltha.AdminState_ENABLED
-			cp = proto.Clone(port).(*voltha.Port)
-			break
-		}
-	}
-
-	if cp == nil {
-		return status.Errorf(codes.InvalidArgument, "%v", Port.PortNo)
-	}
-
-	if cp.Type != voltha.Port_PON_OLT {
-		return status.Errorf(codes.InvalidArgument, "Enabling of Port Type %v unimplemented", cp.Type)
-	}
-	// Store the device
-	if err := agent.updateDeviceInStoreWithoutLock(ctx, device, false, ""); err != nil {
-		logger.Debugw("updateDeviceInStoreWithoutLock error ", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo, "error": err})
-		return err
-	}
-	//send request to adapter
-	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
-	ch, err := agent.adapterProxy.EnablePort(ctx, device, cp)
-	if err != nil {
-		cancel()
-		return err
-	}
-	go agent.waitForAdapterResponse(subCtx, cancel, "enablePort", ch, agent.onSuccess, agent.onFailure)
-	return nil
-}
-
 func (agent *Agent) ChildDeviceLost(ctx context.Context, device *voltha.Device) error {
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
diff --git a/rw_core/core/device/agent_flow.go b/rw_core/core/device/agent_flow.go
index 58f9aa9..2af224c 100644
--- a/rw_core/core/device/agent_flow.go
+++ b/rw_core/core/device/agent_flow.go
@@ -17,7 +17,16 @@
 package device
 
 import (
+	"context"
+
+	"github.com/gogo/protobuf/proto"
+	coreutils "github.com/opencord/voltha-go/rw_core/utils"
+	fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+	"github.com/opencord/voltha-protos/v3/go/voltha"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
 // listDeviceFlows returns device flows
@@ -32,3 +41,313 @@
 	}
 	return flows
 }
+
+func (agent *Agent) addFlowsToAdapter(ctx context.Context, newFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+	logger.Debugw("add-flows-to-adapters", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
+
+	if (len(newFlows)) == 0 {
+		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
+		return coreutils.DoneResponse(), nil
+	}
+	device := agent.getDeviceWithoutLock()
+	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+	if err != nil {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+	}
+	updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		flowIDs := agent.flowLoader.ListIDs()
+		for flowID := range flowIDs {
+			if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+				updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
+				flowHandle.Unlock()
+			}
+		}
+	}
+	flowsToAdd := make([]*ofp.OfpFlowStats, 0)
+	flowsToDelete := make([]*ofp.OfpFlowStats, 0)
+	for _, flow := range newFlows {
+		flowHandle, created, err := agent.flowLoader.LockOrCreate(ctx, flow)
+		if err != nil {
+			return coreutils.DoneResponse(), err
+		}
+
+		if created {
+			flowsToAdd = append(flowsToAdd, flow)
+			updatedAllFlows = append(updatedAllFlows, flow)
+		} else {
+			flowToReplace := flowHandle.GetReadOnly()
+			if !proto.Equal(flowToReplace, flow) {
+				//Flow needs to be updated.
+				if err := flowHandle.Update(ctx, flow); err != nil {
+					flowHandle.Unlock()
+					return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-flow-%d-to-device-%s", flow.Id, agent.deviceID)
+				}
+				flowsToDelete = append(flowsToDelete, flowToReplace)
+				flowsToAdd = append(flowsToAdd, flow)
+				updatedAllFlows = replaceFlowInList(updatedAllFlows, flowToReplace, flow)
+			} else {
+				//No need to change the flow. It is already exist.
+				logger.Debugw("No-need-to-change-already-existing-flow", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
+			}
+		}
+
+		flowHandle.Unlock()
+	}
+
+	// Sanity check
+	if (len(flowsToAdd)) == 0 {
+		logger.Debugw("no-flows-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
+		return coreutils.DoneResponse(), nil
+	}
+
+	// Send update to adapters
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	response := coreutils.NewResponse()
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &ofp.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}}, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	} else {
+		flowChanges := &ofp.FlowChanges{
+			ToAdd:    &voltha.Flows{Items: flowsToAdd},
+			ToRemove: &voltha.Flows{Items: flowsToDelete},
+		}
+		groupChanges := &ofp.FlowGroupChanges{
+			ToAdd:    &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+		}
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	}
+	return response, nil
+}
+
+func (agent *Agent) deleteFlowsFromAdapter(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+	logger.Debugw("delete-flows-from-adapter", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
+
+	if (len(flowsToDel)) == 0 {
+		logger.Debugw("nothing-to-delete", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
+		return coreutils.DoneResponse(), nil
+	}
+
+	device := agent.getDeviceWithoutLock()
+	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+	if err != nil {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+	}
+	updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		flowIDs := agent.flowLoader.ListIDs()
+		for flowID := range flowIDs {
+			if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+				updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
+				flowHandle.Unlock()
+			}
+		}
+	}
+	for _, flow := range flowsToDel {
+		if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
+			// Update the store and cache
+			flowToDelete := flowHandle.GetReadOnly()
+			if err := flowHandle.Delete(ctx); err != nil {
+				flowHandle.Unlock()
+				return coreutils.DoneResponse(), err
+			}
+			if idx := fu.FindFlows(updatedAllFlows, flowToDelete); idx != -1 {
+				updatedAllFlows = deleteFlowWithoutPreservingOrder(updatedAllFlows, idx)
+			}
+			flowHandle.Unlock()
+		}
+	}
+
+	// Send update to adapters
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	response := coreutils.NewResponse()
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}}, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	} else {
+		flowChanges := &ofp.FlowChanges{
+			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+			ToRemove: &voltha.Flows{Items: flowsToDel},
+		}
+		groupChanges := &ofp.FlowGroupChanges{
+			ToAdd:    &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+		}
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	}
+	return response, nil
+}
+
+func (agent *Agent) updateFlowsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+	logger.Debugw("updateFlowsToAdapter", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
+
+	if (len(updatedFlows)) == 0 {
+		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
+		return coreutils.DoneResponse(), nil
+	}
+
+	device := agent.getDeviceWithoutLock()
+	if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states")
+	}
+	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+	if err != nil {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+	}
+	updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		flowIDs := agent.flowLoader.ListIDs()
+		for flowID := range flowIDs {
+			if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+				updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
+				flowHandle.Unlock()
+			}
+		}
+	}
+	flowsToAdd := make([]*ofp.OfpFlowStats, 0)
+	flowsToDelete := make([]*ofp.OfpFlowStats, 0)
+
+	for _, flow := range updatedFlows {
+		if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
+			flowToDelete := flowHandle.GetReadOnly()
+			// Update the store and cache
+			if err := flowHandle.Update(ctx, flow); err != nil {
+				flowHandle.Unlock()
+				return coreutils.DoneResponse(), err
+			}
+
+			flowsToDelete = append(flowsToDelete, flowToDelete)
+			flowsToAdd = append(flowsToAdd, flow)
+			updatedAllFlows = replaceFlowInList(updatedAllFlows, flowToDelete, flow)
+			flowHandle.Unlock()
+		}
+	}
+
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	response := coreutils.NewResponse()
+	// Process bulk flow update differently than incremental update
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}}, nil)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	} else {
+		logger.Debugw("updating-flows-and-groups",
+			log.Fields{
+				"device-id":       agent.deviceID,
+				"flows-to-add":    flowsToAdd,
+				"flows-to-delete": flowsToDelete,
+			})
+		// Sanity check
+		if (len(flowsToAdd) | len(flowsToDelete)) == 0 {
+			logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
+			cancel()
+			return coreutils.DoneResponse(), nil
+		}
+
+		flowChanges := &ofp.FlowChanges{
+			ToAdd:    &voltha.Flows{Items: flowsToAdd},
+			ToRemove: &voltha.Flows{Items: flowsToDelete},
+		}
+		groupChanges := &ofp.FlowGroupChanges{
+			ToAdd:    &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+		}
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	}
+
+	return response, nil
+}
+
+//replaceFlowInList removes the old flow from list and adds the new one.
+func replaceFlowInList(flowList []*ofp.OfpFlowStats, oldFlow *ofp.OfpFlowStats, newFlow *ofp.OfpFlowStats) []*ofp.OfpFlowStats {
+	if idx := fu.FindFlows(flowList, oldFlow); idx != -1 {
+		flowList = deleteFlowWithoutPreservingOrder(flowList, idx)
+	}
+	flowList = append(flowList, newFlow)
+	return flowList
+}
+
+//deleteFlowWithoutPreservingOrder removes a flow specified by index from the flows slice.  This function will
+//panic if the index is out of range.
+func deleteFlowWithoutPreservingOrder(flows []*ofp.OfpFlowStats, index int) []*ofp.OfpFlowStats {
+	flows[index] = flows[len(flows)-1]
+	flows[len(flows)-1] = nil
+	return flows[:len(flows)-1]
+}
+
+//filterOutFlows removes flows from a device using the uni-port as filter
+func (agent *Agent) filterOutFlows(ctx context.Context, uniPort uint32, flowMetadata *voltha.FlowMetadata) error {
+	var flowsToDelete []*ofp.OfpFlowStats
+	// If an existing flow has the uniPort as an InPort or OutPort or as a Tunnel ID then it needs to be removed
+	for flowID := range agent.flowLoader.ListIDs() {
+		if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+			flow := flowHandle.GetReadOnly()
+			if flow != nil && (fu.GetInPort(flow) == uniPort || fu.GetOutPort(flow) == uniPort || fu.GetTunnelId(flow) == uint64(uniPort)) {
+				flowsToDelete = append(flowsToDelete, flow)
+			}
+			flowHandle.Unlock()
+		}
+	}
+
+	logger.Debugw("flows-to-delete", log.Fields{"device-id": agent.deviceID, "uni-port": uniPort, "flows": flowsToDelete})
+	if len(flowsToDelete) == 0 {
+		return nil
+	}
+
+	response, err := agent.deleteFlowsFromAdapter(ctx, flowsToDelete, flowMetadata)
+	if err != nil {
+		return err
+	}
+	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
+		return status.Errorf(codes.Aborted, "errors-%s", res)
+	}
+	return nil
+}
+
+//deleteAllFlows deletes all flows in the device table
+func (agent *Agent) deleteAllFlows(ctx context.Context) error {
+	logger.Debugw("deleteAllFlows", log.Fields{"deviceId": agent.deviceID})
+
+	for flowID := range agent.flowLoader.ListIDs() {
+		if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+			// Update the store and cache
+			if err := flowHandle.Delete(ctx); err != nil {
+				flowHandle.Unlock()
+				logger.Errorw("unable-to-delete-flow", log.Fields{"device-id": agent.deviceID, "flowID": flowID})
+				continue
+			}
+			flowHandle.Unlock()
+		}
+	}
+	return nil
+}
diff --git a/rw_core/core/device/agent_group.go b/rw_core/core/device/agent_group.go
index cb9557c..41bd9ac 100644
--- a/rw_core/core/device/agent_group.go
+++ b/rw_core/core/device/agent_group.go
@@ -17,7 +17,17 @@
 package device
 
 import (
+	"context"
+	"strconv"
+
+	"github.com/gogo/protobuf/proto"
+	coreutils "github.com/opencord/voltha-go/rw_core/utils"
+	fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+	"github.com/opencord/voltha-protos/v3/go/voltha"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
 // listDeviceGroups returns logical device flow groups
@@ -32,3 +42,263 @@
 	}
 	return groups
 }
+
+func (agent *Agent) addGroupsToAdapter(ctx context.Context, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+	logger.Debugw("add-groups-to-adapters", log.Fields{"device-id": agent.deviceID, "groups": newGroups, "flow-metadata": flowMetadata})
+
+	if (len(newGroups)) == 0 {
+		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": newGroups})
+		return coreutils.DoneResponse(), nil
+	}
+
+	device := agent.getDeviceWithoutLock()
+	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+	if err != nil {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+	}
+	updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		groupIDs := agent.groupLoader.ListIDs()
+		for groupID := range groupIDs {
+			if grpHandle, have := agent.groupLoader.Lock(groupID); have {
+				updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
+				grpHandle.Unlock()
+			}
+		}
+	}
+
+	groupsToAdd := make([]*ofp.OfpGroupEntry, 0)
+	groupsToDelete := make([]*ofp.OfpGroupEntry, 0)
+	for _, group := range newGroups {
+
+		groupHandle, created, err := agent.groupLoader.LockOrCreate(ctx, group)
+		if err != nil {
+			return coreutils.DoneResponse(), err
+		}
+
+		if created {
+			groupsToAdd = append(groupsToAdd, group)
+			updatedAllGroups = append(updatedAllGroups, group)
+		} else {
+			groupToChange := groupHandle.GetReadOnly()
+			if !proto.Equal(groupToChange, group) {
+				//Group needs to be updated.
+				if err := groupHandle.Update(ctx, group); err != nil {
+					groupHandle.Unlock()
+					return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-group-%s-to-device-%s", strconv.Itoa(int(group.Desc.GroupId)), agent.deviceID)
+				}
+				groupsToDelete = append(groupsToDelete, groupToChange)
+				groupsToAdd = append(groupsToAdd, group)
+				updatedAllGroups = replaceGroupInList(updatedAllGroups, groupToChange, group)
+			} else {
+				//No need to change the group. It is already exist.
+				logger.Debugw("No-need-to-change-already-existing-group", log.Fields{"device-id": agent.deviceID, "group": newGroups, "flow-metadata": flowMetadata})
+			}
+		}
+
+		groupHandle.Unlock()
+	}
+	// Sanity check
+	if (len(groupsToAdd)) == 0 {
+		logger.Debugw("no-groups-to-update", log.Fields{"device-id": agent.deviceID, "groups": newGroups})
+		return coreutils.DoneResponse(), nil
+	}
+
+	// Send update to adapters
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	response := coreutils.NewResponse()
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	} else {
+		flowChanges := &ofp.FlowChanges{
+			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+			ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+		}
+		groupChanges := &ofp.FlowGroupChanges{
+			ToAdd:    &voltha.FlowGroups{Items: groupsToAdd},
+			ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
+			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+		}
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	}
+	return response, nil
+}
+
+func (agent *Agent) deleteGroupsFromAdapter(ctx context.Context, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+	logger.Debugw("delete-groups-from-adapter", log.Fields{"device-id": agent.deviceID, "groups": groupsToDel})
+
+	if (len(groupsToDel)) == 0 {
+		logger.Debugw("nothing-to-delete", log.Fields{"device-id": agent.deviceID})
+		return coreutils.DoneResponse(), nil
+	}
+	device := agent.getDeviceWithoutLock()
+	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+	if err != nil {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+	}
+	updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		groupIDs := agent.groupLoader.ListIDs()
+		for groupID := range groupIDs {
+			if grpHandle, have := agent.groupLoader.Lock(groupID); have {
+				updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
+				grpHandle.Unlock()
+			}
+		}
+	}
+
+	for _, group := range groupsToDel {
+		if groupHandle, have := agent.groupLoader.Lock(group.Desc.GroupId); have {
+			// Update the store and cache
+			if err := groupHandle.Delete(ctx); err != nil {
+				groupHandle.Unlock()
+				return coreutils.DoneResponse(), err
+			}
+			if idx := fu.FindGroup(updatedAllGroups, group.Desc.GroupId); idx != -1 {
+				updatedAllGroups = deleteGroupWithoutPreservingOrder(updatedAllGroups, idx)
+			}
+			groupHandle.Unlock()
+		}
+	}
+
+	// Send update to adapters
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	response := coreutils.NewResponse()
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	} else {
+		flowChanges := &ofp.FlowChanges{
+			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+			ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+		}
+		groupChanges := &ofp.FlowGroupChanges{
+			ToAdd:    &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToRemove: &voltha.FlowGroups{Items: groupsToDel},
+			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+		}
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	}
+	return response, nil
+}
+
+func (agent *Agent) updateGroupsToAdapter(ctx context.Context, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+	logger.Debugw("updateGroupsToAdapter", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
+
+	if (len(updatedGroups)) == 0 {
+		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
+		return coreutils.DoneResponse(), nil
+	}
+
+	device := agent.getDeviceWithoutLock()
+	if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states-oper-%s-connect-%s-admin-%s", device.OperStatus, device.ConnectStatus, device.AdminState)
+	}
+	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+	if err != nil {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+	}
+	updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		groupIDs := agent.groupLoader.ListIDs()
+		for groupID := range groupIDs {
+			if grpHandle, have := agent.groupLoader.Lock(groupID); have {
+				updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
+				grpHandle.Unlock()
+			}
+		}
+	}
+	groupsToUpdate := make([]*ofp.OfpGroupEntry, 0)
+
+	for _, group := range updatedGroups {
+		if groupHandle, have := agent.groupLoader.Lock(group.Desc.GroupId); have {
+			// Update the store and cache
+			if err := groupHandle.Update(ctx, group); err != nil {
+				groupHandle.Unlock()
+				return coreutils.DoneResponse(), err
+			}
+			groupsToUpdate = append(groupsToUpdate, group)
+			updatedAllGroups = replaceGroupInList(updatedAllGroups, groupHandle.GetReadOnly(), group)
+			groupHandle.Unlock()
+		}
+	}
+
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	response := coreutils.NewResponse()
+	// Process bulk flow update differently than incremental update
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, nil)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	} else {
+		logger.Debugw("updating-groups",
+			log.Fields{
+				"device-id":        agent.deviceID,
+				"groups-to-update": groupsToUpdate,
+			})
+
+		// Sanity check
+		if (len(groupsToUpdate)) == 0 {
+			logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": groupsToUpdate})
+			cancel()
+			return coreutils.DoneResponse(), nil
+		}
+
+		flowChanges := &ofp.FlowChanges{
+			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+			ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+		}
+		groupChanges := &ofp.FlowGroupChanges{
+			ToAdd:    &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToUpdate: &voltha.FlowGroups{Items: groupsToUpdate},
+		}
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	}
+
+	return response, nil
+}
+
+//replaceGroupInList removes the old group from list and adds the new one.
+func replaceGroupInList(groupList []*ofp.OfpGroupEntry, oldGroup *ofp.OfpGroupEntry, newGroup *ofp.OfpGroupEntry) []*ofp.OfpGroupEntry {
+	if idx := fu.FindGroup(groupList, oldGroup.Desc.GroupId); idx != -1 {
+		groupList = deleteGroupWithoutPreservingOrder(groupList, idx)
+	}
+	groupList = append(groupList, newGroup)
+	return groupList
+}
+
+//deleteGroupWithoutPreservingOrder removes a group specified by index from the groups slice.  This function will
+//panic if the index is out of range.
+func deleteGroupWithoutPreservingOrder(groups []*ofp.OfpGroupEntry, index int) []*ofp.OfpGroupEntry {
+	groups[index] = groups[len(groups)-1]
+	groups[len(groups)-1] = nil
+	return groups[:len(groups)-1]
+}
diff --git a/rw_core/core/device/agent_image.go b/rw_core/core/device/agent_image.go
new file mode 100644
index 0000000..208d817
--- /dev/null
+++ b/rw_core/core/device/agent_image.go
@@ -0,0 +1,285 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package device
+
+import (
+	"context"
+
+	"github.com/gogo/protobuf/proto"
+	"github.com/golang/protobuf/ptypes"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	"github.com/opencord/voltha-protos/v3/go/voltha"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+func (agent *Agent) downloadImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+	defer agent.requestQueue.RequestComplete()
+
+	logger.Debugw("downloadImage", log.Fields{"device-id": agent.deviceID})
+
+	device := agent.getDeviceWithoutLock()
+
+	if device.AdminState != voltha.AdminState_ENABLED {
+		return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, expected-admin-state:%s", agent.deviceID, voltha.AdminState_ENABLED)
+	}
+	// Save the image
+	clonedImg := proto.Clone(img).(*voltha.ImageDownload)
+	clonedImg.DownloadState = voltha.ImageDownload_DOWNLOAD_REQUESTED
+	cloned := proto.Clone(device).(*voltha.Device)
+	if cloned.ImageDownloads == nil {
+		cloned.ImageDownloads = []*voltha.ImageDownload{clonedImg}
+	} else {
+		if device.AdminState != voltha.AdminState_ENABLED {
+			logger.Debugw("device-not-enabled", log.Fields{"id": agent.deviceID})
+			return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, expected-admin-state:%s", agent.deviceID, voltha.AdminState_ENABLED)
+		}
+		// Save the image
+		clonedImg := proto.Clone(img).(*voltha.ImageDownload)
+		clonedImg.DownloadState = voltha.ImageDownload_DOWNLOAD_REQUESTED
+		if device.ImageDownloads == nil {
+			device.ImageDownloads = []*voltha.ImageDownload{clonedImg}
+		} else {
+			device.ImageDownloads = append(device.ImageDownloads, clonedImg)
+		}
+		if err := agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_DOWNLOADING_IMAGE, device.ConnectStatus, device.OperStatus); err != nil {
+			return nil, err
+		}
+
+		// Send the request to the adapter
+		subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+		ch, err := agent.adapterProxy.DownloadImage(ctx, cloned, clonedImg)
+		if err != nil {
+			cancel()
+			return nil, err
+		}
+		go agent.waitForAdapterResponse(subCtx, cancel, "downloadImage", ch, agent.onSuccess, agent.onFailure)
+	}
+	return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
+}
+
+// isImageRegistered is a helper method to figure out if an image is already registered
+func isImageRegistered(img *voltha.ImageDownload, device *voltha.Device) bool {
+	for _, image := range device.ImageDownloads {
+		if image.Id == img.Id && image.Name == img.Name {
+			return true
+		}
+	}
+	return false
+}
+
+func (agent *Agent) cancelImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+	defer agent.requestQueue.RequestComplete()
+
+	logger.Debugw("cancelImageDownload", log.Fields{"device-id": agent.deviceID})
+
+	device := agent.getDeviceWithoutLock()
+
+	// Verify whether the Image is in the list of image being downloaded
+	if !isImageRegistered(img, device) {
+		return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, image-not-registered:%s", agent.deviceID, img.Name)
+	}
+
+	// Update image download state
+	for _, image := range device.ImageDownloads {
+		if image.Id == img.Id && image.Name == img.Name {
+			image.DownloadState = voltha.ImageDownload_DOWNLOAD_CANCELLED
+		}
+	}
+
+	if device.AdminState == voltha.AdminState_DOWNLOADING_IMAGE {
+		// Set the device to Enabled
+		if err := agent.updateDeviceStateInStoreWithoutLock(ctx, device, voltha.AdminState_ENABLED, device.ConnectStatus, device.OperStatus); err != nil {
+			return nil, err
+		}
+		subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+		ch, err := agent.adapterProxy.CancelImageDownload(subCtx, device, img)
+		if err != nil {
+			cancel()
+			return nil, err
+		}
+		go agent.waitForAdapterResponse(subCtx, cancel, "cancelImageDownload", ch, agent.onSuccess, agent.onFailure)
+	}
+	return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
+}
+
+func (agent *Agent) activateImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+	defer agent.requestQueue.RequestComplete()
+	logger.Debugw("activateImage", log.Fields{"device-id": agent.deviceID})
+	cloned := agent.getDeviceWithoutLock()
+
+	// Verify whether the Image is in the list of image being downloaded
+	if !isImageRegistered(img, cloned) {
+		return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, image-not-registered:%s", agent.deviceID, img.Name)
+	}
+
+	if cloned.AdminState == voltha.AdminState_DOWNLOADING_IMAGE {
+		return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, device-in-downloading-state:%s", agent.deviceID, img.Name)
+	}
+	// Update image download state
+	for _, image := range cloned.ImageDownloads {
+		if image.Id == img.Id && image.Name == img.Name {
+			image.ImageState = voltha.ImageDownload_IMAGE_ACTIVATING
+		}
+	}
+	// Set the device to downloading_image
+	if err := agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_DOWNLOADING_IMAGE, cloned.ConnectStatus, cloned.OperStatus); err != nil {
+		return nil, err
+	}
+
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	ch, err := agent.adapterProxy.ActivateImageUpdate(subCtx, proto.Clone(cloned).(*voltha.Device), img)
+	if err != nil {
+		cancel()
+		return nil, err
+	}
+	go agent.waitForAdapterResponse(subCtx, cancel, "activateImageUpdate", ch, agent.onSuccess, agent.onFailure)
+
+	// The status of the AdminState will be changed following the update_download_status response from the adapter
+	// The image name will also be removed from the device list
+	return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
+}
+
+func (agent *Agent) revertImage(ctx context.Context, img *voltha.ImageDownload) (*voltha.OperationResp, error) {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+	defer agent.requestQueue.RequestComplete()
+	logger.Debugw("revertImage", log.Fields{"device-id": agent.deviceID})
+
+	cloned := agent.getDeviceWithoutLock()
+
+	// Verify whether the Image is in the list of image being downloaded
+	if !isImageRegistered(img, cloned) {
+		return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, image-not-registered:%s", agent.deviceID, img.Name)
+	}
+
+	if cloned.AdminState != voltha.AdminState_ENABLED {
+		return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, device-not-enabled-state:%s", agent.deviceID, img.Name)
+	}
+	// Update image download state
+	for _, image := range cloned.ImageDownloads {
+		if image.Id == img.Id && image.Name == img.Name {
+			image.ImageState = voltha.ImageDownload_IMAGE_REVERTING
+		}
+	}
+
+	if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
+		return nil, err
+	}
+
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	ch, err := agent.adapterProxy.RevertImageUpdate(subCtx, proto.Clone(cloned).(*voltha.Device), img)
+	if err != nil {
+		cancel()
+		return nil, err
+	}
+	go agent.waitForAdapterResponse(subCtx, cancel, "revertImageUpdate", ch, agent.onSuccess, agent.onFailure)
+
+	return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
+}
+
+func (agent *Agent) getImageDownloadStatus(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+	logger.Debugw("getImageDownloadStatus", log.Fields{"device-id": agent.deviceID})
+
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+	device := agent.getDeviceWithoutLock()
+	ch, err := agent.adapterProxy.GetImageDownloadStatus(ctx, device, img)
+	agent.requestQueue.RequestComplete()
+	if err != nil {
+		return nil, err
+	}
+	// Wait for the adapter response
+	rpcResponse, ok := <-ch
+	if !ok {
+		return nil, status.Errorf(codes.Aborted, "channel-closed-device-id-%s", agent.deviceID)
+	}
+	if rpcResponse.Err != nil {
+		return nil, rpcResponse.Err
+	}
+	// Successful response
+	imgDownload := &voltha.ImageDownload{}
+	if err := ptypes.UnmarshalAny(rpcResponse.Reply, imgDownload); err != nil {
+		return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+	}
+	return imgDownload, nil
+}
+
+func (agent *Agent) updateImageDownload(ctx context.Context, img *voltha.ImageDownload) error {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+	logger.Debugw("updating-image-download", log.Fields{"device-id": agent.deviceID, "img": img})
+
+	cloned := agent.getDeviceWithoutLock()
+
+	// Update the image as well as remove it if the download was cancelled
+	clonedImages := make([]*voltha.ImageDownload, len(cloned.ImageDownloads))
+	for _, image := range cloned.ImageDownloads {
+		if image.Id == img.Id && image.Name == img.Name {
+			if image.DownloadState != voltha.ImageDownload_DOWNLOAD_CANCELLED {
+				clonedImages = append(clonedImages, img)
+			}
+		}
+	}
+	cloned.ImageDownloads = clonedImages
+	// Set the Admin state to enabled if required
+	if (img.DownloadState != voltha.ImageDownload_DOWNLOAD_REQUESTED &&
+		img.DownloadState != voltha.ImageDownload_DOWNLOAD_STARTED) ||
+		(img.ImageState != voltha.ImageDownload_IMAGE_ACTIVATING) {
+		return agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_ENABLED, cloned.ConnectStatus, cloned.OperStatus)
+	}
+	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+}
+
+func (agent *Agent) getImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+	defer agent.requestQueue.RequestComplete()
+	logger.Debugw("getImageDownload", log.Fields{"device-id": agent.deviceID})
+
+	cloned := agent.getDeviceWithoutLock()
+	for _, image := range cloned.ImageDownloads {
+		if image.Id == img.Id && image.Name == img.Name {
+			return image, nil
+		}
+	}
+	return nil, status.Errorf(codes.NotFound, "image-not-found:%s", img.Name)
+}
+
+func (agent *Agent) listImageDownloads(ctx context.Context, deviceID string) (*voltha.ImageDownloads, error) {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+	defer agent.requestQueue.RequestComplete()
+	logger.Debugw("listImageDownloads", log.Fields{"device-id": agent.deviceID})
+
+	return &voltha.ImageDownloads{Items: agent.getDeviceWithoutLock().ImageDownloads}, nil
+}
diff --git a/rw_core/core/device/agent_pm_config.go b/rw_core/core/device/agent_pm_config.go
new file mode 100644
index 0000000..8432b9b
--- /dev/null
+++ b/rw_core/core/device/agent_pm_config.go
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package device
+
+import (
+	"context"
+
+	"github.com/gogo/protobuf/proto"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	"github.com/opencord/voltha-protos/v3/go/voltha"
+)
+
+func (agent *Agent) updatePmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+	logger.Debugw("updatePmConfigs", log.Fields{"device-id": pmConfigs.Id})
+
+	cloned := agent.getDeviceWithoutLock()
+	cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
+	// Store the device
+	if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
+		return err
+	}
+	// Send the request to the adapter
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	ch, err := agent.adapterProxy.UpdatePmConfigs(subCtx, cloned, pmConfigs)
+	if err != nil {
+		cancel()
+		return err
+	}
+	go agent.waitForAdapterResponse(subCtx, cancel, "updatePmConfigs", ch, agent.onSuccess, agent.onFailure)
+	return nil
+}
+
+func (agent *Agent) initPmConfigs(ctx context.Context, pmConfigs *voltha.PmConfigs) error {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+	logger.Debugw("initPmConfigs", log.Fields{"device-id": pmConfigs.Id})
+
+	cloned := agent.getDeviceWithoutLock()
+	cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
+	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+}
+
+func (agent *Agent) listPmConfigs(ctx context.Context) (*voltha.PmConfigs, error) {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return nil, err
+	}
+	defer agent.requestQueue.RequestComplete()
+	logger.Debugw("listPmConfigs", log.Fields{"device-id": agent.deviceID})
+
+	return agent.getDeviceWithoutLock().PmConfigs, nil
+}
diff --git a/rw_core/core/device/agent_port.go b/rw_core/core/device/agent_port.go
new file mode 100644
index 0000000..2d7d210
--- /dev/null
+++ b/rw_core/core/device/agent_port.go
@@ -0,0 +1,291 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package device
+
+import (
+	"context"
+	"fmt"
+
+	"github.com/gogo/protobuf/proto"
+	"github.com/golang/protobuf/ptypes"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+	"github.com/opencord/voltha-protos/v3/go/voltha"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+// getPorts retrieves the ports information of the device based on the port type.
+func (agent *Agent) getPorts(ctx context.Context, portType voltha.Port_PortType) *voltha.Ports {
+	logger.Debugw("getPorts", log.Fields{"device-id": agent.deviceID, "port-type": portType})
+	ports := &voltha.Ports{}
+	if device, _ := agent.deviceMgr.getDevice(ctx, agent.deviceID); device != nil {
+		for _, port := range device.Ports {
+			if port.Type == portType {
+				ports.Items = append(ports.Items, port)
+			}
+		}
+	}
+	return ports
+}
+
+// getPortCapability retrieves the port capability of a device
+func (agent *Agent) getPortCapability(ctx context.Context, portNo uint32) (*ic.PortCapability, error) {
+	logger.Debugw("getPortCapability", log.Fields{"device-id": agent.deviceID})
+	device, err := agent.getDevice(ctx)
+	if err != nil {
+		return nil, err
+	}
+	ch, err := agent.adapterProxy.GetOfpPortInfo(ctx, device, portNo)
+	if err != nil {
+		return nil, err
+	}
+	// Wait for adapter response
+	rpcResponse, ok := <-ch
+	if !ok {
+		return nil, status.Errorf(codes.Aborted, "channel-closed")
+	}
+	if rpcResponse.Err != nil {
+		return nil, rpcResponse.Err
+	}
+	// Successful response
+	portCap := &ic.PortCapability{}
+	if err := ptypes.UnmarshalAny(rpcResponse.Reply, portCap); err != nil {
+		return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
+	}
+	return portCap, nil
+}
+func (agent *Agent) updatePortsOperState(ctx context.Context, operStatus voltha.OperStatus_Types) error {
+	logger.Debugw("updatePortsOperState", log.Fields{"device-id": agent.deviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+	cloned := agent.getDeviceWithoutLock()
+	for _, port := range cloned.Ports {
+		port.OperStatus = operStatus
+	}
+	// Store the device
+	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+}
+
+func (agent *Agent) updatePortState(ctx context.Context, portType voltha.Port_PortType, portNo uint32, operStatus voltha.OperStatus_Types) error {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+	// Work only on latest data
+	// TODO: Get list of ports from device directly instead of the entire device
+	cloned := agent.getDeviceWithoutLock()
+
+	// Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
+	if _, ok := voltha.Port_PortType_value[portType.String()]; !ok {
+		return status.Errorf(codes.InvalidArgument, "%s", portType)
+	}
+	for _, port := range cloned.Ports {
+		if port.Type == portType && port.PortNo == portNo {
+			port.OperStatus = operStatus
+		}
+	}
+	logger.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
+	// Store the device
+	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+}
+
+func (agent *Agent) deleteAllPorts(ctx context.Context) error {
+	logger.Debugw("deleteAllPorts", log.Fields{"deviceId": agent.deviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+
+	cloned := agent.getDeviceWithoutLock()
+
+	if cloned.AdminState != voltha.AdminState_DISABLED && cloned.AdminState != voltha.AdminState_DELETED {
+		err := status.Error(codes.FailedPrecondition, fmt.Sprintf("invalid-state-%v", cloned.AdminState))
+		logger.Warnw("invalid-state-removing-ports", log.Fields{"state": cloned.AdminState, "error": err})
+		return err
+	}
+	if len(cloned.Ports) == 0 {
+		logger.Debugw("no-ports-present", log.Fields{"deviceId": agent.deviceID})
+		return nil
+	}
+
+	cloned.Ports = []*voltha.Port{}
+	logger.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
+	// Store the device
+	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+}
+
+func (agent *Agent) addPort(ctx context.Context, port *voltha.Port) error {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+	logger.Debugw("addPort", log.Fields{"deviceId": agent.deviceID})
+
+	cloned := agent.getDeviceWithoutLock()
+	updatePort := false
+	if cloned.Ports == nil {
+		//	First port
+		logger.Debugw("addPort-first-port-to-add", log.Fields{"deviceId": agent.deviceID})
+		cloned.Ports = make([]*voltha.Port, 0)
+	} else {
+		for _, p := range cloned.Ports {
+			if p.Type == port.Type && p.PortNo == port.PortNo {
+				if p.Label == "" && p.Type == voltha.Port_PON_OLT {
+					//Creation of OLT PON port is being processed after a default PON port was created.  Just update it.
+					logger.Infow("update-pon-port-created-by-default", log.Fields{"default-port": p, "port-to-add": port})
+					p.Label = port.Label
+					p.OperStatus = port.OperStatus
+					updatePort = true
+					break
+				}
+				logger.Debugw("port already exists", log.Fields{"port": port})
+				return nil
+			}
+		}
+	}
+	if !updatePort {
+		cp := proto.Clone(port).(*voltha.Port)
+		// Set the admin state of the port to ENABLE
+		cp.AdminState = voltha.AdminState_ENABLED
+		cloned.Ports = append(cloned.Ports, cp)
+	}
+	// Store the device
+	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+}
+
+func (agent *Agent) addPeerPort(ctx context.Context, peerPort *voltha.Port_PeerPort) error {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+	logger.Debugw("adding-peer-peerPort", log.Fields{"device-id": agent.deviceID, "peer-peerPort": peerPort})
+
+	cloned := agent.getDeviceWithoutLock()
+
+	// Get the peer port on the device based on the peerPort no
+	found := false
+	for _, port := range cloned.Ports {
+		if port.PortNo == peerPort.PortNo { // found peerPort
+			cp := proto.Clone(peerPort).(*voltha.Port_PeerPort)
+			port.Peers = append(port.Peers, cp)
+			logger.Debugw("found-peer", log.Fields{"device-id": agent.deviceID, "portNo": peerPort.PortNo, "deviceId": agent.deviceID})
+			found = true
+			break
+		}
+	}
+	if !found && agent.isRootdevice {
+		// An ONU PON port has been created before the corresponding creation of the OLT PON port.  Create the OLT PON port
+		// with default values which will be updated once the OLT PON port creation is processed.
+		ponPort := &voltha.Port{
+			PortNo:     peerPort.PortNo,
+			Type:       voltha.Port_PON_OLT,
+			AdminState: voltha.AdminState_ENABLED,
+			DeviceId:   agent.deviceID,
+			Peers:      []*voltha.Port_PeerPort{proto.Clone(peerPort).(*voltha.Port_PeerPort)},
+		}
+		cloned.Ports = append(cloned.Ports, ponPort)
+		logger.Infow("adding-default-pon-port", log.Fields{"device-id": agent.deviceID, "peer": peerPort, "pon-port": ponPort})
+	}
+
+	// Store the device
+	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+}
+
+func (agent *Agent) disablePort(ctx context.Context, Port *voltha.Port) error {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+	logger.Debugw("disablePort", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo})
+	var cp *voltha.Port
+	// Get the most up to date the device info
+	device := agent.getDeviceWithoutLock()
+	for _, port := range device.Ports {
+		if port.PortNo == Port.PortNo {
+			port.AdminState = voltha.AdminState_DISABLED
+			cp = proto.Clone(port).(*voltha.Port)
+			break
+
+		}
+	}
+	if cp == nil {
+		return status.Errorf(codes.InvalidArgument, "%v", Port.PortNo)
+	}
+
+	if cp.Type != voltha.Port_PON_OLT {
+		return status.Errorf(codes.InvalidArgument, "Disabling of Port Type %v unimplemented", cp.Type)
+	}
+	// Store the device
+	if err := agent.updateDeviceInStoreWithoutLock(ctx, device, false, ""); err != nil {
+		logger.Debugw("updateDeviceInStoreWithoutLock error ", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo, "error": err})
+		return err
+	}
+
+	//send request to adapter
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	ch, err := agent.adapterProxy.DisablePort(ctx, device, cp)
+	if err != nil {
+		cancel()
+		return err
+	}
+	go agent.waitForAdapterResponse(subCtx, cancel, "disablePort", ch, agent.onSuccess, agent.onFailure)
+	return nil
+}
+
+func (agent *Agent) enablePort(ctx context.Context, Port *voltha.Port) error {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+	logger.Debugw("enablePort", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo})
+
+	var cp *voltha.Port
+	// Get the most up to date the device info
+	device := agent.getDeviceWithoutLock()
+	for _, port := range device.Ports {
+		if port.PortNo == Port.PortNo {
+			port.AdminState = voltha.AdminState_ENABLED
+			cp = proto.Clone(port).(*voltha.Port)
+			break
+		}
+	}
+
+	if cp == nil {
+		return status.Errorf(codes.InvalidArgument, "%v", Port.PortNo)
+	}
+
+	if cp.Type != voltha.Port_PON_OLT {
+		return status.Errorf(codes.InvalidArgument, "Enabling of Port Type %v unimplemented", cp.Type)
+	}
+	// Store the device
+	if err := agent.updateDeviceInStoreWithoutLock(ctx, device, false, ""); err != nil {
+		logger.Debugw("updateDeviceInStoreWithoutLock error ", log.Fields{"device-id": agent.deviceID, "port-no": Port.PortNo, "error": err})
+		return err
+	}
+	//send request to adapter
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	ch, err := agent.adapterProxy.EnablePort(ctx, device, cp)
+	if err != nil {
+		cancel()
+		return err
+	}
+	go agent.waitForAdapterResponse(subCtx, cancel, "enablePort", ch, agent.onSuccess, agent.onFailure)
+	return nil
+}