[VOL-2370] - Fix the add-delete-add flow sequence
This commit fixes the handling of a flow that is first added,
then deleted, and then added again.
Change-Id: I1ec1d931037c3086c65299195c87875f6cb96717
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 06ba307..ca3ffbb 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -251,6 +251,65 @@
response.Done()
}
+// deleteFlowWithoutPreservingOrder drops the flow at position index from flows. The last element
+// is moved into that position, the vacated last slot is set to nil and the slice is returned one
+// element shorter, so the relative order of the remaining flows is not kept. The backing array of
+// flows is modified in place. An out-of-range index (including any index on an empty slice)
+// causes a panic.
+func deleteFlowWithoutPreservingOrder(flows []*ofp.OfpFlowStats, index int) []*ofp.OfpFlowStats {
+	last := len(flows) - 1
+	// The right-hand side is evaluated before either slot is written, so index == last is safe.
+	flows[index], flows[last] = flows[last], nil
+	return flows[:last]
+}
+
+// deleteGroupWithoutPreservingOrder drops the group at position index from groups. The last
+// element is moved into that position, the vacated last slot is set to nil and the slice is
+// returned one element shorter, so the relative order of the remaining groups is not kept. The
+// backing array of groups is modified in place. An out-of-range index (including any index on an
+// empty slice) causes a panic.
+func deleteGroupWithoutPreservingOrder(groups []*ofp.OfpGroupEntry, index int) []*ofp.OfpGroupEntry {
+	last := len(groups) - 1
+	// The right-hand side is evaluated before either slot is written, so index == last is safe.
+	groups[index], groups[last] = groups[last], nil
+	return groups[:last]
+}
+
+// flowsToUpdateToDelete reconciles newFlows against existingFlows.
+//
+// Each existing flow is looked up in the new flows with fu.FindFlows:
+//   - no counterpart: the existing flow is kept in updatedAllFlows;
+//   - a counterpart that is proto.Equal to it: the flow already exists, so it is removed from the
+//     flows to add and the existing flow is kept in updatedAllFlows;
+//   - a counterpart that differs: the existing flow is added to flowsToDelete and the new flow
+//     stays in the flows to add.
+//
+// It returns the new flows that still have to be added (updatedNewFlows), the existing flows that
+// have to be deleted (flowsToDelete) and the resulting complete set (updatedAllFlows), which is
+// the kept existing flows followed by updatedNewFlows. The order of updatedNewFlows may differ
+// from the order of newFlows.
+//
+// The newFlows argument itself is never modified.
+func flowsToUpdateToDelete(newFlows, existingFlows []*ofp.OfpFlowStats) (updatedNewFlows, flowsToDelete, updatedAllFlows []*ofp.OfpFlowStats) {
+	// Prune a private copy: deleteFlowWithoutPreservingOrder reorders the slice it is given and
+	// sets its last slot to nil, which would otherwise corrupt the slice still held by the caller.
+	updatedNewFlows = append([]*ofp.OfpFlowStats(nil), newFlows...)
+
+	for _, flow := range existingFlows {
+		idx := fu.FindFlows(updatedNewFlows, flow)
+		switch {
+		case idx == -1:
+			// Not touched by this request, keep it as is.
+			updatedAllFlows = append(updatedAllFlows, flow)
+		case proto.Equal(updatedNewFlows[idx], flow):
+			// We have a matching flow (i.e. the following fields match: "TableId", "Priority",
+			// "Flags", "Cookie", "Match") and all other fields match as well: the flow already
+			// exists, so remove it from the new flows but keep it in the updated flows slice.
+			updatedNewFlows = deleteFlowWithoutPreservingOrder(updatedNewFlows, idx)
+			updatedAllFlows = append(updatedAllFlows, flow)
+		default:
+			// Minor change to the flow: delete the old one, the new one remains to be added.
+			flowsToDelete = append(flowsToDelete, flow)
+		}
+	}
+
+	updatedAllFlows = append(updatedAllFlows, updatedNewFlows...)
+	return updatedNewFlows, flowsToDelete, updatedAllFlows
+}
+
+// groupsToUpdateToDelete reconciles newGroups against existingGroups.
+//
+// Each existing group is looked up in the new groups by its Desc.GroupId with fu.FindGroup:
+//   - no counterpart: the existing group is kept in updatedAllGroups;
+//   - a counterpart that is proto.Equal to it: the group already exists, so it is removed from
+//     the groups to add and the existing group is kept in updatedAllGroups;
+//   - a counterpart that differs: the existing group is added to groupsToDelete and the new group
+//     stays in the groups to add.
+//
+// It returns the new groups that still have to be added (updatedNewGroups), the existing groups
+// that have to be deleted (groupsToDelete) and the resulting complete set (updatedAllGroups),
+// which is the kept existing groups followed by updatedNewGroups. The order of updatedNewGroups
+// may differ from the order of newGroups.
+//
+// The newGroups argument itself is never modified.
+func groupsToUpdateToDelete(newGroups, existingGroups []*ofp.OfpGroupEntry) (updatedNewGroups, groupsToDelete, updatedAllGroups []*ofp.OfpGroupEntry) {
+	// Prune a private copy: deleteGroupWithoutPreservingOrder reorders the slice it is given and
+	// sets its last slot to nil, which would otherwise corrupt the slice still held by the caller.
+	updatedNewGroups = append([]*ofp.OfpGroupEntry(nil), newGroups...)
+
+	for _, group := range existingGroups {
+		idx := fu.FindGroup(updatedNewGroups, group.Desc.GroupId)
+		switch {
+		case idx == -1:
+			// Does not exist in the new groups, keep it as is.
+			updatedAllGroups = append(updatedAllGroups, group)
+		case proto.Equal(updatedNewGroups[idx], group):
+			// Group already exists: remove it from the new groups but keep it in the updated set.
+			updatedNewGroups = deleteGroupWithoutPreservingOrder(updatedNewGroups, idx)
+			updatedAllGroups = append(updatedAllGroups, group)
+		default:
+			// Minor change to the group: delete the old one, the new one remains to be added.
+			groupsToDelete = append(groupsToDelete, group)
+		}
+	}
+
+	updatedAllGroups = append(updatedAllGroups, updatedNewGroups...)
+	return updatedNewGroups, groupsToDelete, updatedAllGroups
+}
+
func (agent *DeviceAgent) addFlowsAndGroupsToAdapter(newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
log.Debugw("addFlowsAndGroups", log.Fields{"deviceId": agent.deviceID, "flows": newFlows, "groups": newGroups, "flowMetadata": flowMetadata})
@@ -266,33 +325,14 @@
existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
- var updatedFlows []*ofp.OfpFlowStats
- var flowsToDelete []*ofp.OfpFlowStats
- var groupsToDelete []*ofp.OfpGroupEntry
- var updatedGroups []*ofp.OfpGroupEntry
-
// Process flows
- updatedFlows = append(updatedFlows, newFlows...)
- for _, flow := range existingFlows.Items {
- if idx := fu.FindFlows(newFlows, flow); idx == -1 {
- updatedFlows = append(updatedFlows, flow)
- } else {
- flowsToDelete = append(flowsToDelete, flow)
- }
- }
+ newFlows, flowsToDelete, updatedAllFlows := flowsToUpdateToDelete(newFlows, existingFlows.Items)
// Process groups
- updatedGroups = append(updatedGroups, newGroups...)
- for _, group := range existingGroups.Items {
- if fu.FindGroup(newGroups, group.Desc.GroupId) == -1 { // does not exist now
- updatedGroups = append(updatedGroups, group)
- } else {
- groupsToDelete = append(groupsToDelete, group)
- }
- }
+ newGroups, groupsToDelete, updatedAllGroups := groupsToUpdateToDelete(newGroups, existingGroups.Items)
// Sanity check
- if (len(updatedFlows) | len(flowsToDelete) | len(updatedGroups) | len(groupsToDelete)) == 0 {
+ if (len(updatedAllFlows) | len(flowsToDelete) | len(updatedAllGroups) | len(groupsToDelete)) == 0 {
log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": newFlows, "groups": newGroups})
return coreutils.DoneResponse(), nil
}
@@ -306,11 +346,11 @@
dType := agent.adapterMgr.getDeviceType(device.Type)
if !dType.AcceptsAddRemoveFlowUpdates {
- if len(updatedGroups) != 0 && reflect.DeepEqual(existingGroups.Items, updatedGroups) && len(updatedFlows) != 0 && reflect.DeepEqual(existingFlows.Items, updatedFlows) {
+ if len(updatedAllGroups) != 0 && reflect.DeepEqual(existingGroups.Items, updatedAllGroups) && len(updatedAllFlows) != 0 && reflect.DeepEqual(existingFlows.Items, updatedAllFlows) {
log.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flows": newFlows, "groups": newGroups})
return coreutils.DoneResponse(), nil
}
- go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, flowMetadata, response)
+ go agent.sendBulkFlowsToAdapters(device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata, response)
} else {
flowChanges := &ofp.FlowChanges{
@@ -326,8 +366,8 @@
}
// store the changed data
- device.Flows = &voltha.Flows{Items: updatedFlows}
- device.FlowGroups = &voltha.FlowGroups{Items: updatedGroups}
+ device.Flows = &voltha.Flows{Items: updatedAllFlows}
+ device.FlowGroups = &voltha.FlowGroups{Items: updatedAllGroups}
if err := agent.updateDeviceWithoutLock(device); err != nil {
return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
}