[VOL-3215] Reorganize functions in agent.go
Functions moved without any change in contents

Change-Id: I42ba327e648bacf25e5d328743835b36be89f4b4
diff --git a/rw_core/core/device/agent_group.go b/rw_core/core/device/agent_group.go
index cb9557c..41bd9ac 100644
--- a/rw_core/core/device/agent_group.go
+++ b/rw_core/core/device/agent_group.go
@@ -17,7 +17,17 @@
 package device
 
 import (
+	"context"
+	"strconv"
+
+	"github.com/gogo/protobuf/proto"
+	coreutils "github.com/opencord/voltha-go/rw_core/utils"
+	fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+	"github.com/opencord/voltha-protos/v3/go/voltha"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
 // listDeviceGroups returns logical device flow groups
@@ -32,3 +42,263 @@
 	}
 	return groups
 }
+
+func (agent *Agent) addGroupsToAdapter(ctx context.Context, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+	logger.Debugw("add-groups-to-adapters", log.Fields{"device-id": agent.deviceID, "groups": newGroups, "flow-metadata": flowMetadata})
+
+	if (len(newGroups)) == 0 {
+		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": newGroups})
+		return coreutils.DoneResponse(), nil
+	}
+
+	device := agent.getDeviceWithoutLock()
+	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+	if err != nil {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+	}
+	updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		groupIDs := agent.groupLoader.ListIDs()
+		for groupID := range groupIDs {
+			if grpHandle, have := agent.groupLoader.Lock(groupID); have {
+				updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
+				grpHandle.Unlock()
+			}
+		}
+	}
+
+	groupsToAdd := make([]*ofp.OfpGroupEntry, 0)
+	groupsToDelete := make([]*ofp.OfpGroupEntry, 0)
+	for _, group := range newGroups {
+
+		groupHandle, created, err := agent.groupLoader.LockOrCreate(ctx, group)
+		if err != nil {
+			return coreutils.DoneResponse(), err
+		}
+
+		if created {
+			groupsToAdd = append(groupsToAdd, group)
+			updatedAllGroups = append(updatedAllGroups, group)
+		} else {
+			groupToChange := groupHandle.GetReadOnly()
+			if !proto.Equal(groupToChange, group) {
+				//Group needs to be updated.
+				if err := groupHandle.Update(ctx, group); err != nil {
+					groupHandle.Unlock()
+					return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-group-%s-to-device-%s", strconv.Itoa(int(group.Desc.GroupId)), agent.deviceID)
+				}
+				groupsToDelete = append(groupsToDelete, groupToChange)
+				groupsToAdd = append(groupsToAdd, group)
+				updatedAllGroups = replaceGroupInList(updatedAllGroups, groupToChange, group)
+			} else {
+				//No need to change the group. It is already exist.
+				logger.Debugw("No-need-to-change-already-existing-group", log.Fields{"device-id": agent.deviceID, "group": newGroups, "flow-metadata": flowMetadata})
+			}
+		}
+
+		groupHandle.Unlock()
+	}
+	// Sanity check
+	if (len(groupsToAdd)) == 0 {
+		logger.Debugw("no-groups-to-update", log.Fields{"device-id": agent.deviceID, "groups": newGroups})
+		return coreutils.DoneResponse(), nil
+	}
+
+	// Send update to adapters
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	response := coreutils.NewResponse()
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	} else {
+		flowChanges := &ofp.FlowChanges{
+			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+			ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+		}
+		groupChanges := &ofp.FlowGroupChanges{
+			ToAdd:    &voltha.FlowGroups{Items: groupsToAdd},
+			ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
+			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+		}
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	}
+	return response, nil
+}
+
+func (agent *Agent) deleteGroupsFromAdapter(ctx context.Context, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+	logger.Debugw("delete-groups-from-adapter", log.Fields{"device-id": agent.deviceID, "groups": groupsToDel})
+
+	if (len(groupsToDel)) == 0 {
+		logger.Debugw("nothing-to-delete", log.Fields{"device-id": agent.deviceID})
+		return coreutils.DoneResponse(), nil
+	}
+	device := agent.getDeviceWithoutLock()
+	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+	if err != nil {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+	}
+	updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		groupIDs := agent.groupLoader.ListIDs()
+		for groupID := range groupIDs {
+			if grpHandle, have := agent.groupLoader.Lock(groupID); have {
+				updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
+				grpHandle.Unlock()
+			}
+		}
+	}
+
+	for _, group := range groupsToDel {
+		if groupHandle, have := agent.groupLoader.Lock(group.Desc.GroupId); have {
+			// Update the store and cache
+			if err := groupHandle.Delete(ctx); err != nil {
+				groupHandle.Unlock()
+				return coreutils.DoneResponse(), err
+			}
+			if idx := fu.FindGroup(updatedAllGroups, group.Desc.GroupId); idx != -1 {
+				updatedAllGroups = deleteGroupWithoutPreservingOrder(updatedAllGroups, idx)
+			}
+			groupHandle.Unlock()
+		}
+	}
+
+	// Send update to adapters
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	response := coreutils.NewResponse()
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	} else {
+		flowChanges := &ofp.FlowChanges{
+			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+			ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+		}
+		groupChanges := &ofp.FlowGroupChanges{
+			ToAdd:    &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToRemove: &voltha.FlowGroups{Items: groupsToDel},
+			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+		}
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	}
+	return response, nil
+}
+
+func (agent *Agent) updateGroupsToAdapter(ctx context.Context, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+	logger.Debugw("updateGroupsToAdapter", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
+
+	if (len(updatedGroups)) == 0 {
+		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
+		return coreutils.DoneResponse(), nil
+	}
+
+	device := agent.getDeviceWithoutLock()
+	if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states-oper-%s-connect-%s-admin-%s", device.OperStatus, device.ConnectStatus, device.AdminState)
+	}
+	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+	if err != nil {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+	}
+	updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		groupIDs := agent.groupLoader.ListIDs()
+		for groupID := range groupIDs {
+			if grpHandle, have := agent.groupLoader.Lock(groupID); have {
+				updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
+				grpHandle.Unlock()
+			}
+		}
+	}
+	groupsToUpdate := make([]*ofp.OfpGroupEntry, 0)
+
+	for _, group := range updatedGroups {
+		if groupHandle, have := agent.groupLoader.Lock(group.Desc.GroupId); have {
+			// Update the store and cache
+			if err := groupHandle.Update(ctx, group); err != nil {
+				groupHandle.Unlock()
+				return coreutils.DoneResponse(), err
+			}
+			groupsToUpdate = append(groupsToUpdate, group)
+			updatedAllGroups = replaceGroupInList(updatedAllGroups, groupHandle.GetReadOnly(), group)
+			groupHandle.Unlock()
+		}
+	}
+
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	response := coreutils.NewResponse()
+	// Process bulk flow update differently than incremental update
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, nil)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	} else {
+		logger.Debugw("updating-groups",
+			log.Fields{
+				"device-id":        agent.deviceID,
+				"groups-to-update": groupsToUpdate,
+			})
+
+		// Sanity check
+		if (len(groupsToUpdate)) == 0 {
+			logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": groupsToUpdate})
+			cancel()
+			return coreutils.DoneResponse(), nil
+		}
+
+		flowChanges := &ofp.FlowChanges{
+			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+			ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+		}
+		groupChanges := &ofp.FlowGroupChanges{
+			ToAdd:    &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToUpdate: &voltha.FlowGroups{Items: groupsToUpdate},
+		}
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	}
+
+	return response, nil
+}
+
+//replaceGroupInList removes the old group from list and adds the new one.
+func replaceGroupInList(groupList []*ofp.OfpGroupEntry, oldGroup *ofp.OfpGroupEntry, newGroup *ofp.OfpGroupEntry) []*ofp.OfpGroupEntry {
+	if idx := fu.FindGroup(groupList, oldGroup.Desc.GroupId); idx != -1 {
+		groupList = deleteGroupWithoutPreservingOrder(groupList, idx)
+	}
+	groupList = append(groupList, newGroup)
+	return groupList
+}
+
+//deleteGroupWithoutPreservingOrder removes a group specified by index from the groups slice.  This function will
+//panic if the index is out of range.
+func deleteGroupWithoutPreservingOrder(groups []*ofp.OfpGroupEntry, index int) []*ofp.OfpGroupEntry {
+	groups[index] = groups[len(groups)-1]
+	groups[len(groups)-1] = nil
+	return groups[:len(groups)-1]
+}