[VOL-2972] Revert added flows on failure

This commit consists of:
1) Reverting added flows on flow addition failure (e.g. if one adapter
returns a failure, any corresponding flow that has already been
successfully added to the other adapter is removed).
The corresponding logical device flow is removed as well.

2) Some minor refactoring in the mocks adapter

3) Some minor logging changes to reduce clutter when running
unit tests.

Change-Id: Ia63243e83516ef81152893563bef76c830bea022
diff --git a/rw_core/mocks/adapter.go b/rw_core/mocks/adapter.go
index e14457d..3c29a01 100644
--- a/rw_core/mocks/adapter.go
+++ b/rw_core/mocks/adapter.go
@@ -17,6 +17,7 @@
 package mocks
 
 import (
+	"fmt"
 	"strconv"
 	"strings"
 	"sync"
@@ -25,8 +26,6 @@
 	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
 	of "github.com/opencord/voltha-protos/v3/go/openflow_13"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
-	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/status"
 )
 
 func macAddressToUint32Array(mac string) []uint32 {
@@ -45,39 +44,40 @@
 
 // Adapter represents adapter attributes
 type Adapter struct {
-	coreProxy adapterif.CoreProxy
-	devices   sync.Map
+	coreProxy      adapterif.CoreProxy
+	flows          map[uint64]*voltha.OfpFlowStats
+	flowLock       sync.RWMutex
+	devices        map[string]*voltha.Device
+	deviceLock     sync.RWMutex
+	failFlowAdd    bool
+	failFlowDelete bool
 }
 
 // NewAdapter creates adapter instance
 func NewAdapter(cp adapterif.CoreProxy) *Adapter {
 	return &Adapter{
+		flows:     map[uint64]*voltha.OfpFlowStats{},
+		devices:   map[string]*voltha.Device{},
 		coreProxy: cp,
 	}
 }
 
 func (ta *Adapter) storeDevice(d *voltha.Device) {
+	ta.deviceLock.Lock()
+	defer ta.deviceLock.Unlock()
 	if d != nil {
-		ta.devices.Store(d.Id, d)
+		ta.devices[d.Id] = d
 	}
 }
 
 func (ta *Adapter) getDevice(id string) *voltha.Device {
-	if val, ok := ta.devices.Load(id); ok && val != nil {
-		if device, ok := val.(*voltha.Device); ok {
-			return device
-		}
-	}
-	return nil
+	ta.deviceLock.RLock()
+	defer ta.deviceLock.RUnlock()
+	return ta.devices[id]
 }
 
-func (ta *Adapter) updateDevice(d *voltha.Device) error {
-	if d != nil {
-		if _, ok := ta.devices.LoadOrStore(d.Id, d); !ok {
-			return status.Errorf(codes.Internal, "error updating device %s", d.Id)
-		}
-	}
-	return nil
+func (ta *Adapter) updateDevice(d *voltha.Device) {
+	ta.storeDevice(d)
 }
 
 // Adapter_descriptor -
@@ -145,8 +145,27 @@
 	return nil
 }
 
-// Update_flows_incrementally -
+// Update_flows_incrementally mocks the incremental flow update
 func (ta *Adapter) Update_flows_incrementally(device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error { // nolint
+	ta.flowLock.Lock()
+	defer ta.flowLock.Unlock()
+
+	if flows.ToAdd != nil {
+		if ta.failFlowAdd {
+			return fmt.Errorf("flow-add-error")
+		}
+		for _, f := range flows.ToAdd.Items {
+			ta.flows[f.Id] = f
+		}
+	}
+	if flows.ToRemove != nil {
+		if ta.failFlowDelete {
+			return fmt.Errorf("flow-delete-error")
+		}
+		for _, f := range flows.ToRemove.Items {
+			delete(ta.flows, f.Id)
+		}
+	}
 	return nil
 }
 
@@ -263,3 +282,25 @@
 func (ta *Adapter) Get_ext_value(deviceId string, device *voltha.Device, valueflag voltha.ValueType_Type) (*voltha.ReturnValues, error) { //nolint
 	return nil, nil
 }
+
+// GetFlowCount returns the total number of flows presently under this adapter
+func (ta *Adapter) GetFlowCount() int {
+	ta.flowLock.RLock()
+	defer ta.flowLock.RUnlock()
+
+	return len(ta.flows)
+}
+
+// ClearFlows removes all flows in this adapter
+func (ta *Adapter) ClearFlows() {
+	ta.flowLock.Lock()
+	defer ta.flowLock.Unlock()
+
+	ta.flows = map[uint64]*voltha.OfpFlowStats{}
+}
+
+// SetFlowAction configures whether flow additions and/or flow deletions on this adapter return an error
+func (ta *Adapter) SetFlowAction(failFlowAdd, failFlowDelete bool) {
+	ta.failFlowAdd = failFlowAdd
+	ta.failFlowDelete = failFlowDelete
+}