[VOL-1564] Refactor flow deletion

This update consists of the following:
1)  Refactor the flow management around flow deletion and
addition.
2) Update the simulated adapters to receive and do initial
processing of flow updates (bulk and incremental)
3) Add more tests to the flow utils test suite
4) Add a new flow management test for integration test in a
development environment (work in progress)

Change-Id: I9dbb2adf9e600af52ce267b727617be181c8f1ab
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 489c79f..cae40e1 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -533,7 +533,6 @@
 	}
 
 	updatedFlows := make([]*ofp.OfpFlowStats, 0)
-	//oldData := proto.Clone(lDevice.Flows).(*voltha.Flows)
 	changed := false
 	checkOverlap := (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_CHECK_OVERLAP)) != 0
 	if checkOverlap {
@@ -568,18 +567,16 @@
 		}
 	}
 	if changed {
-		// Launch a routine to decompose the flows
-		if err := agent.decomposeAndSendFlows(&ofp.Flows{Items: updatedFlows}, lDevice.FlowGroups); err != nil {
-			log.Errorw("decomposing-and-sending-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+		deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: updatedFlows}, *lDevice.FlowGroups)
+		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+
+		if err := agent.addDeviceFlowsAndGroups(deviceRules); err != nil {
+			log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
 			return err
 		}
 
 		//	Update model
-		flowsToUpdate := &ofp.Flows{}
-		if lDevice.Flows != nil {
-			flowsToUpdate = &ofp.Flows{Items: flows}
-		}
-		if err := agent.updateLogicalDeviceFlowsWithoutLock(flowsToUpdate); err != nil {
+		if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: flows}); err != nil {
 			log.Errorw("db-flow-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 			return err
 		}
@@ -587,31 +584,6 @@
 	return nil
 }
 
-func (agent *LogicalDeviceAgent) decomposeAndSendFlows(flows *ofp.Flows, groups *ofp.FlowGroups) error {
-	log.Debugw("decomposeAndSendFlows", log.Fields{"logicalDeviceID": agent.logicalDeviceId})
-
-	deviceRules := agent.flowDecomposer.DecomposeRules(agent, *flows, *groups)
-	log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
-
-	chnlsList := make([]chan interface{}, 0)
-	for deviceId, value := range deviceRules.GetRules() {
-		ch := make(chan interface{})
-		chnlsList = append(chnlsList, ch)
-		go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
-			if err := agent.deviceMgr.addFlowsAndGroups(deviceId, flows, groups); err != nil {
-				log.Error("flow-update-failed", log.Fields{"deviceID": deviceId})
-				ch <- status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId)
-			}
-			ch <- nil
-		}(deviceId, value.ListFlows(), value.ListGroups())
-	}
-	// Wait for completion
-	if res := fu.WaitForNilOrErrorResponses(agent.defaultTimeout, chnlsList...); res != nil {
-		return status.Errorf(codes.Aborted, "errors-%s", res)
-	}
-	return nil
-}
-
 //flowDelete deletes a flow from the flow table of that logical device
 func (agent *LogicalDeviceAgent) flowDelete(mod *ofp.OfpFlowMod) error {
 	log.Debug("flowDelete")
@@ -631,14 +603,33 @@
 
 	//build a list of what to keep vs what to delete
 	toKeep := make([]*ofp.OfpFlowStats, 0)
+	toDelete := make([]*ofp.OfpFlowStats, 0)
 	for _, f := range flows {
+		// Check whether the flow and the flowmod matches
+		if fu.FlowMatch(f, fu.FlowStatsEntryFromFlowModMessage(mod)) {
+			toDelete = append(toDelete, f)
+			continue
+		}
+		// Check wild card match
 		if !fu.FlowMatchesMod(f, mod) {
 			toKeep = append(toKeep, f)
+		} else {
+			toDelete = append(toDelete, f)
 		}
 	}
 
+	log.Debugw("flowDelete", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "toKeep": len(toKeep), "toDelete": toDelete})
+
 	//Update flows
-	if len(toKeep) < len(flows) {
+	if len(toDelete) > 0 {
+		deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: toDelete}, ofp.FlowGroups{})
+		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+
+		if err := agent.deleteDeviceFlowsAndGroups(deviceRules); err != nil {
+			log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
+			return err
+		}
+
 		if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: toKeep}); err != nil {
 			log.Errorw("Cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 			return err
@@ -649,37 +640,68 @@
 	return nil
 }
 
-//flowStatsDelete deletes a flow from the flow table of that logical device
-func (agent *LogicalDeviceAgent) flowStatsDelete(flow *ofp.OfpFlowStats) error {
-	log.Debug("flowStatsDelete")
-	if flow == nil {
-		return nil
-	}
-	agent.lockLogicalDevice.Lock()
-	defer agent.lockLogicalDevice.Unlock()
+func (agent *LogicalDeviceAgent) addDeviceFlowsAndGroups(deviceRules *fu.DeviceRules) error {
+	log.Debugw("addDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceId})
 
-	var lDevice *voltha.LogicalDevice
-	var err error
-	if lDevice, err = agent.getLogicalDeviceWithoutLock(); err != nil {
-		log.Errorw("no-logical-device-present", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
-		return errors.New(fmt.Sprintf("no-logical-device-present:%s", agent.logicalDeviceId))
+	chnlsList := make([]chan interface{}, 0)
+	for deviceId, value := range deviceRules.GetRules() {
+		ch := make(chan interface{})
+		chnlsList = append(chnlsList, ch)
+		go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
+			if err := agent.deviceMgr.addFlowsAndGroups(deviceId, flows, groups); err != nil {
+				log.Errorw("flow-add-failed", log.Fields{"deviceID": deviceId, "error": err})
+				ch <- status.Errorf(codes.Internal, "flow-add-failed: %s", deviceId)
+			}
+			ch <- nil
+		}(deviceId, value.ListFlows(), value.ListGroups())
 	}
-	flows := lDevice.Flows.Items
-
-	//build a list of what to keep vs what to delete
-	toKeep := make([]*ofp.OfpFlowStats, 0)
-	for _, f := range flows {
-		if !fu.FlowMatch(f, flow) {
-			toKeep = append(toKeep, f)
-		}
+	// Wait for completion
+	if res := fu.WaitForNilOrErrorResponses(agent.defaultTimeout, chnlsList...); res != nil {
+		return status.Errorf(codes.Aborted, "errors-%s", res)
 	}
+	return nil
+}
 
-	//Update flows
-	if len(toKeep) < len(flows) {
-		if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: toKeep}); err != nil {
-			log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
-			return err
-		}
+func (agent *LogicalDeviceAgent) deleteDeviceFlowsAndGroups(deviceRules *fu.DeviceRules) error {
+	log.Debugw("deleteDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceId})
+
+	chnlsList := make([]chan interface{}, 0)
+	for deviceId, value := range deviceRules.GetRules() {
+		ch := make(chan interface{})
+		chnlsList = append(chnlsList, ch)
+		go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
+			if err := agent.deviceMgr.deleteFlowsAndGroups(deviceId, flows, groups); err != nil {
+				log.Error("flow-delete-failed", log.Fields{"deviceID": deviceId, "error": err})
+				ch <- status.Errorf(codes.Internal, "flow-delete-failed: %s", deviceId)
+			}
+			ch <- nil
+		}(deviceId, value.ListFlows(), value.ListGroups())
+	}
+	// Wait for completion
+	if res := fu.WaitForNilOrErrorResponses(agent.defaultTimeout, chnlsList...); res != nil {
+		return status.Errorf(codes.Aborted, "errors-%s", res)
+	}
+	return nil
+}
+
+func (agent *LogicalDeviceAgent) updateDeviceFlowsAndGroups(deviceRules *fu.DeviceRules) error {
+	log.Debugw("updateDeviceFlowsAndGroups", log.Fields{"logicalDeviceID": agent.logicalDeviceId})
+
+	chnlsList := make([]chan interface{}, 0)
+	for deviceId, value := range deviceRules.GetRules() {
+		ch := make(chan interface{})
+		chnlsList = append(chnlsList, ch)
+		go func(deviceId string, flows []*ofp.OfpFlowStats, groups []*ofp.OfpGroupEntry) {
+			if err := agent.deviceMgr.updateFlowsAndGroups(deviceId, flows, groups); err != nil {
+				log.Error("flow-update-failed", log.Fields{"deviceID": deviceId, "error": err})
+				ch <- status.Errorf(codes.Internal, "flow-update-failed: %s", deviceId)
+			}
+			ch <- nil
+		}(deviceId, value.ListFlows(), value.ListGroups())
+	}
+	// Wait for completion
+	if res := fu.WaitForNilOrErrorResponses(agent.defaultTimeout, chnlsList...); res != nil {
+		return status.Errorf(codes.Aborted, "errors-%s", res)
 	}
 	return nil
 }
@@ -711,12 +733,19 @@
 	}
 
 	if changed {
+		deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: []*ofp.OfpFlowStats{flow}}, ofp.FlowGroups{})
+		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+
+		if err := agent.deleteDeviceFlowsAndGroups(deviceRules); err != nil {
+			log.Errorw("failure-deleting-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
+			return err
+		}
+
 		if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: flows}); err != nil {
-			log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+			log.Errorw("Cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 			return err
 		}
 	}
-
 	return nil
 }
 
@@ -747,6 +776,15 @@
 	groups := lDevice.FlowGroups.Items
 	if fu.FindGroup(groups, groupMod.GroupId) == -1 {
 		groups = append(groups, fu.GroupEntryFromGroupMod(groupMod))
+
+		deviceRules := agent.flowDecomposer.DecomposeRules(agent, *lDevice.Flows, ofp.FlowGroups{Items: groups})
+		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+
+		if err := agent.addDeviceFlowsAndGroups(deviceRules); err != nil {
+			log.Errorw("failure-updating-device-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
+			return err
+		}
+
 		if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(&ofp.FlowGroups{Items: groups}); err != nil {
 			log.Errorw("Cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 			return err
@@ -790,6 +828,16 @@
 			groupsChanged = true
 		}
 	}
+	if flowsChanged || groupsChanged {
+		deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: flows}, ofp.FlowGroups{Items: groups})
+		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+
+		if err := agent.updateDeviceFlowsAndGroups(deviceRules); err != nil {
+			log.Errorw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
+			return err
+		}
+	}
+
 	if groupsChanged {
 		if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(&ofp.FlowGroups{Items: groups}); err != nil {
 			log.Errorw("Cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
@@ -802,7 +850,6 @@
 			return err
 		}
 	}
-
 	return nil
 }
 
@@ -832,6 +879,14 @@
 		groupsChanged = true
 	}
 	if groupsChanged {
+		deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: lDevice.Flows.Items}, ofp.FlowGroups{Items: groups})
+		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+
+		if err := agent.updateDeviceFlowsAndGroups(deviceRules); err != nil {
+			log.Errorw("failure-updating-device-flows-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "error": err})
+			return err
+		}
+
 		//lDevice.FlowGroups.Items = groups
 		if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(&ofp.FlowGroups{Items: groups}); err != nil {
 			log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})