[VOL-3215] Reorganize functions in agent.go
Functions moved without any change in contents

Change-Id: I42ba327e648bacf25e5d328743835b36be89f4b4
diff --git a/rw_core/core/device/agent_flow.go b/rw_core/core/device/agent_flow.go
index 58f9aa9..2af224c 100644
--- a/rw_core/core/device/agent_flow.go
+++ b/rw_core/core/device/agent_flow.go
@@ -17,7 +17,16 @@
 package device
 
 import (
+	"context"
+
+	"github.com/gogo/protobuf/proto"
+	coreutils "github.com/opencord/voltha-go/rw_core/utils"
+	fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+	"github.com/opencord/voltha-protos/v3/go/voltha"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
 )
 
 // listDeviceFlows returns device flows
@@ -32,3 +41,313 @@
 	}
 	return flows
 }
+
+func (agent *Agent) addFlowsToAdapter(ctx context.Context, newFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+	logger.Debugw("add-flows-to-adapters", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
+
+	if (len(newFlows)) == 0 {
+		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
+		return coreutils.DoneResponse(), nil
+	}
+	device := agent.getDeviceWithoutLock()
+	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+	if err != nil {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+	}
+	updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		flowIDs := agent.flowLoader.ListIDs()
+		for flowID := range flowIDs {
+			if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+				updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
+				flowHandle.Unlock()
+			}
+		}
+	}
+	flowsToAdd := make([]*ofp.OfpFlowStats, 0)
+	flowsToDelete := make([]*ofp.OfpFlowStats, 0)
+	for _, flow := range newFlows {
+		flowHandle, created, err := agent.flowLoader.LockOrCreate(ctx, flow)
+		if err != nil {
+			return coreutils.DoneResponse(), err
+		}
+
+		if created {
+			flowsToAdd = append(flowsToAdd, flow)
+			updatedAllFlows = append(updatedAllFlows, flow)
+		} else {
+			flowToReplace := flowHandle.GetReadOnly()
+			if !proto.Equal(flowToReplace, flow) {
+				//Flow needs to be updated.
+				if err := flowHandle.Update(ctx, flow); err != nil {
+					flowHandle.Unlock()
+					return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-flow-%d-to-device-%s", flow.Id, agent.deviceID)
+				}
+				flowsToDelete = append(flowsToDelete, flowToReplace)
+				flowsToAdd = append(flowsToAdd, flow)
+				updatedAllFlows = replaceFlowInList(updatedAllFlows, flowToReplace, flow)
+			} else {
+				//No need to change the flow. It is already exist.
+				logger.Debugw("No-need-to-change-already-existing-flow", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
+			}
+		}
+
+		flowHandle.Unlock()
+	}
+
+	// Sanity check
+	if (len(flowsToAdd)) == 0 {
+		logger.Debugw("no-flows-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
+		return coreutils.DoneResponse(), nil
+	}
+
+	// Send update to adapters
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	response := coreutils.NewResponse()
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &ofp.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}}, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	} else {
+		flowChanges := &ofp.FlowChanges{
+			ToAdd:    &voltha.Flows{Items: flowsToAdd},
+			ToRemove: &voltha.Flows{Items: flowsToDelete},
+		}
+		groupChanges := &ofp.FlowGroupChanges{
+			ToAdd:    &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+		}
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	}
+	return response, nil
+}
+
+func (agent *Agent) deleteFlowsFromAdapter(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+	logger.Debugw("delete-flows-from-adapter", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
+
+	if (len(flowsToDel)) == 0 {
+		logger.Debugw("nothing-to-delete", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
+		return coreutils.DoneResponse(), nil
+	}
+
+	device := agent.getDeviceWithoutLock()
+	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+	if err != nil {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+	}
+	updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		flowIDs := agent.flowLoader.ListIDs()
+		for flowID := range flowIDs {
+			if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+				updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
+				flowHandle.Unlock()
+			}
+		}
+	}
+	for _, flow := range flowsToDel {
+		if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
+			// Update the store and cache
+			flowToDelete := flowHandle.GetReadOnly()
+			if err := flowHandle.Delete(ctx); err != nil {
+				flowHandle.Unlock()
+				return coreutils.DoneResponse(), err
+			}
+			if idx := fu.FindFlows(updatedAllFlows, flowToDelete); idx != -1 {
+				updatedAllFlows = deleteFlowWithoutPreservingOrder(updatedAllFlows, idx)
+			}
+			flowHandle.Unlock()
+		}
+	}
+
+	// Send update to adapters
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	response := coreutils.NewResponse()
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}}, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	} else {
+		flowChanges := &ofp.FlowChanges{
+			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+			ToRemove: &voltha.Flows{Items: flowsToDel},
+		}
+		groupChanges := &ofp.FlowGroupChanges{
+			ToAdd:    &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+		}
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	}
+	return response, nil
+}
+
+func (agent *Agent) updateFlowsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+	logger.Debugw("updateFlowsToAdapter", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
+
+	if (len(updatedFlows)) == 0 {
+		logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
+		return coreutils.DoneResponse(), nil
+	}
+
+	device := agent.getDeviceWithoutLock()
+	if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states")
+	}
+	dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+	if err != nil {
+		return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+	}
+	updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		flowIDs := agent.flowLoader.ListIDs()
+		for flowID := range flowIDs {
+			if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+				updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
+				flowHandle.Unlock()
+			}
+		}
+	}
+	flowsToAdd := make([]*ofp.OfpFlowStats, 0)
+	flowsToDelete := make([]*ofp.OfpFlowStats, 0)
+
+	for _, flow := range updatedFlows {
+		if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
+			flowToDelete := flowHandle.GetReadOnly()
+			// Update the store and cache
+			if err := flowHandle.Update(ctx, flow); err != nil {
+				flowHandle.Unlock()
+				return coreutils.DoneResponse(), err
+			}
+
+			flowsToDelete = append(flowsToDelete, flowToDelete)
+			flowsToAdd = append(flowsToAdd, flow)
+			updatedAllFlows = replaceFlowInList(updatedAllFlows, flowToDelete, flow)
+			flowHandle.Unlock()
+		}
+	}
+
+	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+	response := coreutils.NewResponse()
+	// Process bulk flow update differently than incremental update
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}}, nil)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	} else {
+		logger.Debugw("updating-flows-and-groups",
+			log.Fields{
+				"device-id":       agent.deviceID,
+				"flows-to-add":    flowsToAdd,
+				"flows-to-delete": flowsToDelete,
+			})
+		// Sanity check
+		if (len(flowsToAdd) | len(flowsToDelete)) == 0 {
+			logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
+			cancel()
+			return coreutils.DoneResponse(), nil
+		}
+
+		flowChanges := &ofp.FlowChanges{
+			ToAdd:    &voltha.Flows{Items: flowsToAdd},
+			ToRemove: &voltha.Flows{Items: flowsToDelete},
+		}
+		groupChanges := &ofp.FlowGroupChanges{
+			ToAdd:    &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+		}
+		rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+		if err != nil {
+			cancel()
+			return coreutils.DoneResponse(), err
+		}
+		go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+	}
+
+	return response, nil
+}
+
+//replaceFlowInList removes the old flow from list and adds the new one.
+func replaceFlowInList(flowList []*ofp.OfpFlowStats, oldFlow *ofp.OfpFlowStats, newFlow *ofp.OfpFlowStats) []*ofp.OfpFlowStats {
+	if idx := fu.FindFlows(flowList, oldFlow); idx != -1 {
+		flowList = deleteFlowWithoutPreservingOrder(flowList, idx)
+	}
+	flowList = append(flowList, newFlow)
+	return flowList
+}
+
+//deleteFlowWithoutPreservingOrder removes a flow specified by index from the flows slice.  This function will
+//panic if the index is out of range.
+func deleteFlowWithoutPreservingOrder(flows []*ofp.OfpFlowStats, index int) []*ofp.OfpFlowStats {
+	flows[index] = flows[len(flows)-1]
+	flows[len(flows)-1] = nil
+	return flows[:len(flows)-1]
+}
+
+//filterOutFlows removes flows from a device using the uni-port as filter
+func (agent *Agent) filterOutFlows(ctx context.Context, uniPort uint32, flowMetadata *voltha.FlowMetadata) error {
+	var flowsToDelete []*ofp.OfpFlowStats
+	// If an existing flow has the uniPort as an InPort or OutPort or as a Tunnel ID then it needs to be removed
+	for flowID := range agent.flowLoader.ListIDs() {
+		if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+			flow := flowHandle.GetReadOnly()
+			if flow != nil && (fu.GetInPort(flow) == uniPort || fu.GetOutPort(flow) == uniPort || fu.GetTunnelId(flow) == uint64(uniPort)) {
+				flowsToDelete = append(flowsToDelete, flow)
+			}
+			flowHandle.Unlock()
+		}
+	}
+
+	logger.Debugw("flows-to-delete", log.Fields{"device-id": agent.deviceID, "uni-port": uniPort, "flows": flowsToDelete})
+	if len(flowsToDelete) == 0 {
+		return nil
+	}
+
+	response, err := agent.deleteFlowsFromAdapter(ctx, flowsToDelete, flowMetadata)
+	if err != nil {
+		return err
+	}
+	if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
+		return status.Errorf(codes.Aborted, "errors-%s", res)
+	}
+	return nil
+}
+
+//deleteAllFlows deletes all flows in the device table
+func (agent *Agent) deleteAllFlows(ctx context.Context) error {
+	logger.Debugw("deleteAllFlows", log.Fields{"deviceId": agent.deviceID})
+
+	for flowID := range agent.flowLoader.ListIDs() {
+		if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+			// Update the store and cache
+			if err := flowHandle.Delete(ctx); err != nil {
+				flowHandle.Unlock()
+				logger.Errorw("unable-to-delete-flow", log.Fields{"device-id": agent.deviceID, "flowID": flowID})
+				continue
+			}
+			flowHandle.Unlock()
+		}
+	}
+	return nil
+}