[VOL-1564] Refactor flow deletion

This update consists of the following:
1)  Refactor the flow management around flow deletion and
addition.
2) Update the simulated adapters to receive and do initial
processing of flow updates (bulk and incremental)
3) Add more tests to the flow utils test suite
4) Add a new flow management test for integration test in a
development environment (work in progress)

Change-Id: I9dbb2adf9e600af52ce267b727617be181c8f1ab
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 4a88779..5718614 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -237,7 +237,11 @@
 	ch <- nil
 }
 
+//addFlowsAndGroups adds the "newFlows" and "newGroups" from the existing flows/groups and sends the update to the
+//adapters
 func (agent *DeviceAgent) addFlowsAndGroups(newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry) error {
+	log.Debugw("addFlowsAndGroups", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
+
 	if (len(newFlows) | len(newGroups)) == 0 {
 		log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
 		return nil
@@ -245,93 +249,287 @@
 
 	agent.lockDevice.Lock()
 	defer agent.lockDevice.Unlock()
-	log.Debugw("addFlowsAndGroups", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
-	var existingFlows *voltha.Flows
-	if device, err := agent.getDeviceWithoutLock(); err != nil {
-		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
-	} else {
-		existingFlows = proto.Clone(device.Flows).(*voltha.Flows)
-		existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
-		log.Debugw("addFlows", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "existingFlows": existingFlows, "groups": newGroups, "existingGroups": existingGroups})
 
-		var updatedFlows []*ofp.OfpFlowStats
+	var device *voltha.Device
+	var err error
+	if device, err = agent.getDeviceWithoutLock(); err != nil {
+		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+	}
+
+	existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
+	existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
+
+	var updatedFlows []*ofp.OfpFlowStats
+	var flowsToDelete []*ofp.OfpFlowStats
+	var groupsToDelete []*ofp.OfpGroupEntry
+	var updatedGroups []*ofp.OfpGroupEntry
+
+	// Process flows
+	for _, flow := range newFlows {
+		updatedFlows = append(updatedFlows, flow)
+	}
+	for _, flow := range existingFlows.Items {
+		if idx := fu.FindFlows(newFlows, flow); idx == -1 {
+			updatedFlows = append(updatedFlows, flow)
+		} else {
+			flowsToDelete = append(flowsToDelete, flow)
+		}
+	}
+
+	// Process groups
+	for _, g := range newGroups {
+		updatedGroups = append(updatedGroups, g)
+	}
+	for _, group := range existingGroups.Items {
+		if fu.FindGroup(newGroups, group.Desc.GroupId) == -1 { // does not exist now
+			updatedGroups = append(updatedGroups, group)
+		} else {
+			groupsToDelete = append(groupsToDelete, group)
+		}
+	}
+
+	// Sanity check
+	if (len(updatedFlows) | len(flowsToDelete) | len(updatedGroups) | len(groupsToDelete)) == 0 {
+		log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
+		return nil
+	}
+
+	// Send update to adapters
+	// Create two channels to receive responses from the dB and from the adapters.
+	// Do not close these channels as this function may exit on timeout before the dB or adapters get a chance
+	// to send their responses.  These channels will be garbage collected once all the responses are
+	// received
+	chAdapters := make(chan interface{})
+	chdB := make(chan interface{})
+	dType := agent.adapterMgr.getDeviceType(device.Type)
+	if !dType.AcceptsAddRemoveFlowUpdates {
+
+		if len(updatedGroups) != 0 && reflect.DeepEqual(existingGroups.Items, updatedGroups) && len(updatedFlows) != 0 && reflect.DeepEqual(existingFlows.Items, updatedFlows) {
+			log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
+			return nil
+		}
+		go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, chAdapters)
+
+	} else {
+		flowChanges := &ofp.FlowChanges{
+			ToAdd:    &voltha.Flows{Items: newFlows},
+			ToRemove: &voltha.Flows{Items: flowsToDelete},
+		}
+		groupChanges := &ofp.FlowGroupChanges{
+			ToAdd:    &voltha.FlowGroups{Items: newGroups},
+			ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
+			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+		}
+		go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, chAdapters)
+	}
+
+	// store the changed data
+	device.Flows = &voltha.Flows{Items: updatedFlows}
+	device.FlowGroups = &voltha.FlowGroups{Items: updatedGroups}
+	go agent.updateDeviceWithoutLockAsync(device, chdB)
+
+	if res := fu.WaitForNilOrErrorResponses(agent.defaultTimeout, chAdapters, chdB); res != nil {
+		return status.Errorf(codes.Aborted, "errors-%s", res)
+	}
+
+	return nil
+}
+
+//deleteFlowsAndGroups removes the "flowsToDel" and "groupsToDel" from the existing flows/groups and sends the update to the
+//adapters
+func (agent *DeviceAgent) deleteFlowsAndGroups(flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry) error {
+	log.Debugw("deleteFlowsAndGroups", log.Fields{"deviceId": agent.deviceId, "flows": flowsToDel, "groups": groupsToDel})
+
+	if (len(flowsToDel) | len(groupsToDel)) == 0 {
+		log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": flowsToDel, "groups": groupsToDel})
+		return nil
+	}
+
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
+
+	var device *voltha.Device
+	var err error
+
+	if device, err = agent.getDeviceWithoutLock(); err != nil {
+		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+	}
+
+	existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
+	existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
+
+	var flowsToKeep []*ofp.OfpFlowStats
+	var groupsToKeep []*ofp.OfpGroupEntry
+
+	// Process flows
+	for _, flow := range existingFlows.Items {
+		if idx := fu.FindFlows(flowsToDel, flow); idx == -1 {
+			flowsToKeep = append(flowsToKeep, flow)
+		}
+	}
+
+	// Process groups
+	for _, group := range existingGroups.Items {
+		if fu.FindGroup(groupsToDel, group.Desc.GroupId) == -1 { // does not exist now
+			groupsToKeep = append(groupsToKeep, group)
+		}
+	}
+
+	log.Debugw("deleteFlowsAndGroups",
+		log.Fields{
+			"deviceId":     agent.deviceId,
+			"flowsToDel":   len(flowsToDel),
+			"flowsToKeep":  len(flowsToKeep),
+			"groupsToDel":  len(groupsToDel),
+			"groupsToKeep": len(groupsToKeep),
+		})
+
+	// Sanity check
+	if (len(flowsToKeep) | len(flowsToDel) | len(groupsToKeep) | len(groupsToDel)) == 0 {
+		log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flowsToDel": flowsToDel, "groupsToDel": groupsToDel})
+		return nil
+	}
+
+	// Send update to adapters
+	chAdapters := make(chan interface{})
+	chdB := make(chan interface{})
+	dType := agent.adapterMgr.getDeviceType(device.Type)
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		if len(groupsToKeep) != 0 && reflect.DeepEqual(existingGroups.Items, groupsToKeep) && len(flowsToKeep) != 0 && reflect.DeepEqual(existingFlows.Items, flowsToKeep) {
+			log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flowsToDel": flowsToDel, "groupsToDel": groupsToDel})
+			return nil
+		}
+		go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, chAdapters)
+	} else {
+		flowChanges := &ofp.FlowChanges{
+			ToAdd:    &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+			ToRemove: &voltha.Flows{Items: flowsToDel},
+		}
+		groupChanges := &ofp.FlowGroupChanges{
+			ToAdd:    &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+			ToRemove: &voltha.FlowGroups{Items: groupsToDel},
+			ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+		}
+		go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, chAdapters)
+	}
+
+	// store the changed data
+	device.Flows = &voltha.Flows{Items: flowsToKeep}
+	device.FlowGroups = &voltha.FlowGroups{Items: groupsToKeep}
+	go agent.updateDeviceWithoutLockAsync(device, chdB)
+
+	if res := fu.WaitForNilOrErrorResponses(agent.defaultTimeout, chAdapters, chdB); res != nil {
+		return status.Errorf(codes.Aborted, "errors-%s", res)
+	}
+	return nil
+
+}
+
+//updateFlowsAndGroups replaces the existing flows and groups with "updatedFlows" and "updatedGroups" respectively. It
+//also sends the updates to the adapters
+func (agent *DeviceAgent) updateFlowsAndGroups(updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry) error {
+	log.Debugw("updateFlowsAndGroups", log.Fields{"deviceId": agent.deviceId, "flows": updatedFlows, "groups": updatedGroups})
+
+	if (len(updatedFlows) | len(updatedGroups)) == 0 {
+		log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": updatedFlows, "groups": updatedGroups})
+		return nil
+	}
+
+	agent.lockDevice.Lock()
+	defer agent.lockDevice.Unlock()
+	var device *voltha.Device
+	var err error
+	if device, err = agent.getDeviceWithoutLock(); err != nil {
+		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+	}
+	existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
+	existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
+
+	if len(updatedGroups) != 0 && reflect.DeepEqual(existingGroups.Items, updatedGroups) && len(updatedFlows) != 0 && reflect.DeepEqual(existingFlows.Items, updatedFlows) {
+		log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": updatedFlows, "groups": updatedGroups})
+		return nil
+	}
+
+	log.Debugw("updating-flows-and-groups",
+		log.Fields{
+			"deviceId":      agent.deviceId,
+			"updatedFlows":  updatedFlows,
+			"updatedGroups": updatedGroups,
+		})
+
+	chAdapters := make(chan interface{})
+	chdB := make(chan interface{})
+	dType := agent.adapterMgr.getDeviceType(device.Type)
+
+	// Process bulk flow update differently than incremental update
+	if !dType.AcceptsAddRemoveFlowUpdates {
+		go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, chAdapters)
+	} else {
+		var flowsToAdd []*ofp.OfpFlowStats
 		var flowsToDelete []*ofp.OfpFlowStats
+		var groupsToAdd []*ofp.OfpGroupEntry
 		var groupsToDelete []*ofp.OfpGroupEntry
-		var updatedGroups []*ofp.OfpGroupEntry
 
 		// Process flows
-		for _, flow := range newFlows {
-			updatedFlows = append(updatedFlows, flow)
+		for _, flow := range updatedFlows {
+			if idx := fu.FindFlows(existingFlows.Items, flow); idx == -1 {
+				flowsToAdd = append(flowsToAdd, flow)
+			}
 		}
-
 		for _, flow := range existingFlows.Items {
-			if idx := fu.FindFlows(newFlows, flow); idx == -1 {
-				updatedFlows = append(updatedFlows, flow)
-			} else {
+			if idx := fu.FindFlows(updatedFlows, flow); idx != -1 {
 				flowsToDelete = append(flowsToDelete, flow)
 			}
 		}
 
 		// Process groups
-		for _, g := range newGroups {
-			updatedGroups = append(updatedGroups, g)
+		for _, g := range updatedGroups {
+			if fu.FindGroup(existingGroups.Items, g.Desc.GroupId) == -1 { // does not exist now
+				groupsToAdd = append(groupsToAdd, g)
+			}
 		}
-
 		for _, group := range existingGroups.Items {
-			if fu.FindGroup(newGroups, group.Desc.GroupId) == -1 { // does not exist now
-				updatedGroups = append(updatedGroups, group)
-			} else {
+			if fu.FindGroup(updatedGroups, group.Desc.GroupId) != -1 { // does not exist now
 				groupsToDelete = append(groupsToDelete, group)
 			}
 		}
 
+		log.Debugw("updating-flows-and-groups",
+			log.Fields{
+				"deviceId":       agent.deviceId,
+				"flowsToAdd":     flowsToAdd,
+				"flowsToDelete":  flowsToDelete,
+				"groupsToAdd":    groupsToAdd,
+				"groupsToDelete": groupsToDelete,
+			})
+
 		// Sanity check
-		if (len(updatedFlows) | len(flowsToDelete) | len(updatedGroups) | len(groupsToDelete)) == 0 {
-			log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
+		if (len(flowsToAdd) | len(flowsToDelete) | len(groupsToAdd) | len(groupsToDelete) | len(updatedGroups)) == 0 {
+			log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": updatedFlows, "groups": updatedGroups})
 			return nil
-
-		}
-		// Send update to adapters
-
-		// Create two channels to receive responses from the dB and from the adapters.
-		// Do not close these channels as this function may exit on timeout before the dB or adapters get a chance
-		// to send their responses.  These channels will be garbage collected once all the responses are
-		// received
-		chAdapters := make(chan interface{})
-		chdB := make(chan interface{})
-		dType := agent.adapterMgr.getDeviceType(device.Type)
-		if !dType.AcceptsAddRemoveFlowUpdates {
-
-			if len(updatedGroups) != 0 && reflect.DeepEqual(existingGroups.Items, updatedGroups) && len(updatedFlows) != 0 && reflect.DeepEqual(existingFlows.Items, updatedFlows) {
-				log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceId, "flows": newFlows, "groups": newGroups})
-				return nil
-			}
-			go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, chAdapters)
-
-		} else {
-			flowChanges := &ofp.FlowChanges{
-				ToAdd:    &voltha.Flows{Items: newFlows},
-				ToRemove: &voltha.Flows{Items: flowsToDelete},
-			}
-			groupChanges := &ofp.FlowGroupChanges{
-				ToAdd:    &voltha.FlowGroups{Items: newGroups},
-				ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
-				ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
-			}
-			go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, chAdapters)
 		}
 
-		// store the changed data
-		device.Flows = &voltha.Flows{Items: updatedFlows}
-		device.FlowGroups = &voltha.FlowGroups{Items: updatedGroups}
-		go agent.updateDeviceWithoutLockAsync(device, chdB)
-
-		if res := fu.WaitForNilOrErrorResponses(agent.defaultTimeout, chAdapters, chdB); res != nil {
-			return status.Errorf(codes.Aborted, "errors-%s", res)
+		flowChanges := &ofp.FlowChanges{
+			ToAdd:    &voltha.Flows{Items: flowsToAdd},
+			ToRemove: &voltha.Flows{Items: flowsToDelete},
 		}
-
-		return nil
+		groupChanges := &ofp.FlowGroupChanges{
+			ToAdd:    &voltha.FlowGroups{Items: groupsToAdd},
+			ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
+			ToUpdate: &voltha.FlowGroups{Items: updatedGroups},
+		}
+		go agent.sendIncrementalFlowsToAdapters(device, flowChanges, groupChanges, chAdapters)
 	}
+
+	// store the updated data
+	device.Flows = &voltha.Flows{Items: updatedFlows}
+	device.FlowGroups = &voltha.FlowGroups{Items: updatedGroups}
+	go agent.updateDeviceWithoutLockAsync(device, chdB)
+
+	if res := fu.WaitForNilOrErrorResponses(agent.defaultTimeout, chAdapters, chdB); res != nil {
+		return status.Errorf(codes.Aborted, "errors-%s", res)
+	}
+	return nil
 }
 
 //disableDevice disable a device