[VOL-2972] Revert added flows on failure

This commit consists of:
1) Reverting added flows when a flow addition fails (e.g. if one
adapter returns a failure, any corresponding flow that was already
added successfully to another adapter is removed).  The corresponding
logical device flow is removed as well.

2) Some minor refactoring in the mocks adapter

3) Some minor logging changes to reduce the clutter when running
unit tests.

Change-Id: Ia63243e83516ef81152893563bef76c830bea022
diff --git a/rw_core/core/device/logical_agent.go b/rw_core/core/device/logical_agent.go
index 7bc8e4d..6a977b1 100644
--- a/rw_core/core/device/logical_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -518,6 +518,14 @@
 	return proto.Clone(&voltha.LogicalPorts{Items: ports}).(*voltha.LogicalPorts).Items
 }
 
+func cloneFlows(flows []*ofp.OfpFlowStats) []*ofp.OfpFlowStats {
+	return proto.Clone(&ofp.Flows{Items: flows}).(*ofp.Flows).Items // proto.Clone takes a message, so the slice is wrapped in ofp.Flows and the copy's Items returned
+}
+
+func cloneMeters(meters []*ofp.OfpMeterEntry) []*ofp.OfpMeterEntry {
+	return proto.Clone(&ofp.Meters{Items: meters}).(*ofp.Meters).Items // proto.Clone takes a message, so the slice is wrapped in ofp.Meters and the copy's Items returned
+}
+
 //updateLogicalDevicePortsWithoutLock updates the
 func (agent *LogicalAgent) updateLogicalDevicePortsWithoutLock(ctx context.Context, device *voltha.LogicalDevice, newPorts []*voltha.LogicalPort) error {
 	oldPorts := device.Ports
@@ -790,7 +798,7 @@
 	return changed, flows
 }
 
-func (agent *LogicalAgent) updateFlowCountOfMeterStats(modCommand *ofp.OfpFlowMod, meters []*ofp.OfpMeterEntry, flow *ofp.OfpFlowStats) bool {
+func (agent *LogicalAgent) updateFlowCountOfMeterStats(modCommand *ofp.OfpFlowMod, meters []*ofp.OfpMeterEntry, flow *ofp.OfpFlowStats, revertUpdate bool) bool {
 
 	flowCommand := modCommand.GetCommand()
 	meterID := fu.GetMeterIdFromFlow(flow)
@@ -807,10 +815,18 @@
 	for _, meter := range meters {
 		if meterID == meter.Config.MeterId { // Found meter in Logicaldevice
 			if flowCommand == ofp.OfpFlowModCommand_OFPFC_ADD {
-				meter.Stats.FlowCount++
+				if revertUpdate {
+					meter.Stats.FlowCount--
+				} else {
+					meter.Stats.FlowCount++
+				}
 				changedMeter = true
 			} else if flowCommand == ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT {
-				meter.Stats.FlowCount--
+				if revertUpdate {
+					meter.Stats.FlowCount++
+				} else {
+					meter.Stats.FlowCount--
+				}
 				changedMeter = true
 			}
 			logger.Debugw("Found meter, updated meter flow stats", log.Fields{" meterId": meterID})
@@ -835,6 +851,7 @@
 
 	var flows []*ofp.OfpFlowStats
 	var meters []*ofp.OfpMeterEntry
+	var flowToReplace *ofp.OfpFlowStats
 	var flow *ofp.OfpFlowStats
 	var err error
 
@@ -870,12 +887,12 @@
 		}
 		idx := fu.FindFlows(flows, flow)
 		if idx >= 0 {
-			oldFlow := flows[idx]
+			flowToReplace = flows[idx]
 			if (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_RESET_COUNTS)) != 0 {
-				flow.ByteCount = oldFlow.ByteCount
-				flow.PacketCount = oldFlow.PacketCount
+				flow.ByteCount = flowToReplace.ByteCount
+				flow.PacketCount = flowToReplace.PacketCount
 			}
-			if !proto.Equal(oldFlow, flow) {
+			if !proto.Equal(flowToReplace, flow) {
 				flows[idx] = flow
 				updatedFlows = append(updatedFlows, flow)
 				changed = true
@@ -906,8 +923,9 @@
 			logger.Errorw("db-flow-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 			return err
 		}
+
 		if !updated {
-			changedMeterStats := agent.updateFlowCountOfMeterStats(mod, meters, flow)
+			changedMeterStats := agent.updateFlowCountOfMeterStats(mod, meters, flow, false)
 			metersToUpdate := &ofp.Meters{}
 			if lDevice.Meters != nil {
 				metersToUpdate = &ofp.Meters{Items: meters}
@@ -915,7 +933,7 @@
 			if changedMeterStats {
 				//Update model
 				if err := agent.updateLogicalDeviceMetersWithoutLock(ctx, metersToUpdate); err != nil {
-					logger.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+					logger.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
 					return err
 				}
 				logger.Debugw("meter-stats-updated-in-DB-successfully", log.Fields{"updated_meters": meters})
@@ -929,14 +947,70 @@
 		go func() {
 			// Wait for completion
 			if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChannels...); res != nil {
-				logger.Warnw("failure-to-add-flows", log.Fields{"errors": res, "logical-device-id": agent.logicalDeviceID})
-				// TODO : revert added flow
+				logger.Infow("failed-to-add-flows-will-attempt-deletion", log.Fields{"errors": res, "logical-device-id": agent.logicalDeviceID})
+				// Revert added flows
+				if err := agent.revertAddedFlows(context.Background(), mod, flow, flowToReplace, deviceRules, &flowMetadata); err != nil {
+					logger.Errorw("failure-to-delete-flows-after-failed-addition", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
+				}
 			}
 		}()
 	}
 	return nil
 }
 
+// revertAddedFlows undoes a flow addition after the flowAdd request has failed on the devices.  Once WaitForGreenLight
+// succeeds on the agent's request queue it puts replacedFlow back in place of addedFlow in the logical device (or removes
+// addedFlow when replacedFlow is nil), reverses the meter flow count applied for mod, saves the logical device and calls
+// deleteFlowsAndGroupsFromDevices for deviceRules.  Device responses are awaited in a goroutine: failures are only logged.
+func (agent *LogicalAgent) revertAddedFlows(ctx context.Context, mod *ofp.OfpFlowMod, addedFlow *ofp.OfpFlowStats, replacedFlow *ofp.OfpFlowStats, deviceRules *fu.DeviceRules, metadata *voltha.FlowMetadata) error {
+	logger.Debugw("revertFlowAdd", log.Fields{"added-flow": addedFlow, "replaced-flow": replacedFlow, "device-rules": deviceRules, "metadata": metadata})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+
+	lDevice := agent.getLogicalDeviceWithoutLock()
+
+	// Revert flows.  The generated getter is nil-safe, so a logical device without flows does not panic here.
+	clonedFlows := cloneFlows(lDevice.Flows.GetItems())
+	idx := fu.FindFlows(clonedFlows, addedFlow)
+	if idx < 0 {
+		// Not found - do nothing
+		logger.Debugw("flow-not-found", log.Fields{"added-flow": addedFlow})
+		return nil
+	}
+	if replacedFlow != nil {
+		clonedFlows[idx] = replacedFlow
+	} else {
+		clonedFlows = deleteFlowWithoutPreservingOrder(clonedFlows, idx)
+	}
+	lDevice.Flows = &ofp.Flows{Items: clonedFlows}
+
+	// Revert meters.  Meters may be nil on the logical device, hence the nil-safe getter rather than a field access.
+	meters := cloneMeters(lDevice.Meters.GetItems())
+	if agent.updateFlowCountOfMeterStats(mod, meters, addedFlow, true) {
+		lDevice.Meters = &ofp.Meters{Items: meters}
+	}
+
+	// Update the model
+	if err := agent.updateLogicalDeviceWithoutLock(ctx, lDevice); err != nil {
+		logger.Errorw("db-flow-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
+		return err
+	}
+
+	// Update the devices
+	respChnls := agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, metadata)
+
+	// Wait for the responses
+	go func() {
+		// Since this action is taken following an add failure, we may also receive a failure for the revert
+		if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+			logger.Warnw("failure-reverting-added-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
+		}
+	}()
+	return nil
+}
+
 // GetMeterConfig returns meter config
 func (agent *LogicalAgent) GetMeterConfig(flows []*ofp.OfpFlowStats, meters []*ofp.OfpMeterEntry, metadata *voltha.FlowMetadata) error {
 	m := make(map[uint32]bool)
@@ -1205,7 +1279,7 @@
 	flowsToDelete := make([]*ofp.OfpFlowStats, 0)
 	idx := fu.FindFlows(flows, flow)
 	if idx >= 0 {
-		changedMeter = agent.updateFlowCountOfMeterStats(mod, meters, flows[idx])
+		changedMeter = agent.updateFlowCountOfMeterStats(mod, meters, flows[idx], false)
 		flowsToDelete = append(flowsToDelete, flows[idx])
 		flows = append(flows[:idx], flows[idx+1:]...)
 		changedFlow = true