[VOL-2972] Revert added flows on failure

This commit:
1) Reverts added flows when a flow addition fails (e.g. if one adapter
returns a failure, any corresponding flow that was successfully added
to the other adapter is removed). The corresponding logical device
flow is removed as well.

2) Some minor refactoring in the mocks adapter

3) Makes some minor logging changes to reduce clutter when running
unit tests.

Change-Id: Ia63243e83516ef81152893563bef76c830bea022
diff --git a/rw_core/mocks/adapter.go b/rw_core/mocks/adapter.go
index e14457d..3c29a01 100644
--- a/rw_core/mocks/adapter.go
+++ b/rw_core/mocks/adapter.go
@@ -17,6 +17,7 @@
 package mocks
 
 import (
+	"fmt"
 	"strconv"
 	"strings"
 	"sync"
@@ -25,8 +26,6 @@
 	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
 	of "github.com/opencord/voltha-protos/v3/go/openflow_13"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
-	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/status"
 )
 
 func macAddressToUint32Array(mac string) []uint32 {
@@ -45,39 +44,40 @@
 
 // Adapter represents adapter attributes
 type Adapter struct {
-	coreProxy adapterif.CoreProxy
-	devices   sync.Map
+	coreProxy      adapterif.CoreProxy
+	flows          map[uint64]*voltha.OfpFlowStats
+	flowLock       sync.RWMutex
+	devices        map[string]*voltha.Device
+	deviceLock     sync.RWMutex
+	failFlowAdd    bool
+	failFlowDelete bool
 }
 
 // NewAdapter creates adapter instance
 func NewAdapter(cp adapterif.CoreProxy) *Adapter {
 	return &Adapter{
+		flows:     map[uint64]*voltha.OfpFlowStats{},
+		devices:   map[string]*voltha.Device{},
 		coreProxy: cp,
 	}
 }
 
 func (ta *Adapter) storeDevice(d *voltha.Device) {
+	ta.deviceLock.Lock()
+	defer ta.deviceLock.Unlock()
 	if d != nil {
-		ta.devices.Store(d.Id, d)
+		ta.devices[d.Id] = d
 	}
 }
 
 func (ta *Adapter) getDevice(id string) *voltha.Device {
-	if val, ok := ta.devices.Load(id); ok && val != nil {
-		if device, ok := val.(*voltha.Device); ok {
-			return device
-		}
-	}
-	return nil
+	ta.deviceLock.RLock()
+	defer ta.deviceLock.RUnlock()
+	return ta.devices[id]
 }
 
-func (ta *Adapter) updateDevice(d *voltha.Device) error {
-	if d != nil {
-		if _, ok := ta.devices.LoadOrStore(d.Id, d); !ok {
-			return status.Errorf(codes.Internal, "error updating device %s", d.Id)
-		}
-	}
-	return nil
+func (ta *Adapter) updateDevice(d *voltha.Device) {
+	ta.storeDevice(d)
 }
 
 // Adapter_descriptor -
@@ -145,8 +145,27 @@
 	return nil
 }
 
-// Update_flows_incrementally -
+// Update_flows_incrementally mocks the incremental flow update
 func (ta *Adapter) Update_flows_incrementally(device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error { // nolint
+	ta.flowLock.Lock()
+	defer ta.flowLock.Unlock()
+
+	if flows.ToAdd != nil {
+		if ta.failFlowAdd {
+			return fmt.Errorf("flow-add-error")
+		}
+		for _, f := range flows.ToAdd.Items {
+			ta.flows[f.Id] = f
+		}
+	}
+	if flows.ToRemove != nil {
+		if ta.failFlowDelete {
+			return fmt.Errorf("flow-delete-error")
+		}
+		for _, f := range flows.ToRemove.Items {
+			delete(ta.flows, f.Id)
+		}
+	}
 	return nil
 }
 
@@ -263,3 +282,30 @@
 func (ta *Adapter) Get_ext_value(deviceId string, device *voltha.Device, valueflag voltha.ValueType_Type) (*voltha.ReturnValues, error) { //nolint
 	return nil, nil
 }
+
+// GetFlowCount returns the total number of flows presently under this adapter
+func (ta *Adapter) GetFlowCount() int {
+	ta.flowLock.RLock()
+	defer ta.flowLock.RUnlock()
+
+	return len(ta.flows)
+}
+
+// ClearFlows removes all flows in this adapter
+func (ta *Adapter) ClearFlows() {
+	ta.flowLock.Lock()
+	defer ta.flowLock.Unlock()
+
+	ta.flows = map[uint64]*voltha.OfpFlowStats{}
+}
+
+// SetFlowAction sets the adapter action on addition and deletion of flows.
+// The flags are read by Update_flows_incrementally while it holds flowLock, so
+// they are written under the same lock to avoid a data race.
+func (ta *Adapter) SetFlowAction(failFlowAdd, failFlowDelete bool) {
+	ta.flowLock.Lock()
+	defer ta.flowLock.Unlock()
+
+	ta.failFlowAdd = failFlowAdd
+	ta.failFlowDelete = failFlowDelete
+}
diff --git a/rw_core/mocks/adapter_olt.go b/rw_core/mocks/adapter_olt.go
index 3ccecc7..61f431a 100644
--- a/rw_core/mocks/adapter_olt.go
+++ b/rw_core/mocks/adapter_olt.go
@@ -20,9 +20,6 @@
 	"context"
 	"errors"
 	"fmt"
-	"strings"
-	"sync"
-
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
 	com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
@@ -30,6 +27,7 @@
 	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
 	of "github.com/opencord/voltha-protos/v3/go/openflow_13"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
+	"strings"
 )
 
 const (
@@ -39,18 +37,13 @@
 
 // OLTAdapter represent OLT adapter
 type OLTAdapter struct {
-	flows map[uint64]*voltha.OfpFlowStats
-	lock  sync.Mutex
-	Adapter
+	*Adapter
 }
 
 // NewOLTAdapter - creates OLT adapter instance
 func NewOLTAdapter(cp adapterif.CoreProxy) *OLTAdapter {
 	return &OLTAdapter{
-		flows: map[uint64]*voltha.OfpFlowStats{},
-		Adapter: Adapter{
-			coreProxy: cp,
-		},
+		Adapter: NewAdapter(cp),
 	}
 }
 
@@ -100,9 +93,7 @@
 			logger.Fatalf("getting-device-failed-%s", err)
 		}
 
-		if err = oltA.updateDevice(d); err != nil {
-			logger.Fatalf("saving-device-failed-%s", err)
-		}
+		oltA.updateDevice(d)
 
 		// Register Child devices
 		initialUniPortNo := startingUNIPortNo
@@ -204,9 +195,7 @@
 			return
 		}
 
-		if err := oltA.updateDevice(cloned); err != nil {
-			logger.Fatalf("saving-device-failed-%s", err)
-		}
+		oltA.updateDevice(cloned)
 
 		// Tell the Core that all child devices have been disabled (by default it's an action already taken by the Core
 		if err := oltA.coreProxy.ChildDevicesLost(context.TODO(), cloned.Id); err != nil {
@@ -278,24 +267,6 @@
 	return nil
 }
 
-// Update_flows_incrementally mocks the incremental flow update
-func (oltA *OLTAdapter) Update_flows_incrementally(device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error { // nolint
-	oltA.lock.Lock()
-	defer oltA.lock.Unlock()
-
-	if flows.ToAdd != nil {
-		for _, f := range flows.ToAdd.Items {
-			oltA.flows[f.Id] = f
-		}
-	}
-	if flows.ToRemove != nil {
-		for _, f := range flows.ToRemove.Items {
-			delete(oltA.flows, f.Id)
-		}
-	}
-	return nil
-}
-
 // Reboot_device -
 func (oltA *OLTAdapter) Reboot_device(device *voltha.Device) error { // nolint
 	logger.Infow("reboot-device", log.Fields{"deviceId": device.Id})
@@ -305,7 +276,8 @@
 			logger.Fatalf("device-state-update-failed", log.Fields{"device-id": device.Id, "error": err})
 		}
 		if err := oltA.coreProxy.PortsStateUpdate(context.TODO(), device.Id, voltha.OperStatus_UNKNOWN); err != nil {
-			logger.Fatalf("port-update-failed", log.Fields{"device-id": device.Id, "error": err})
+			// Not an error as the previous command will start the process of clearing the OLT
+			logger.Infow("port-update-failed", log.Fields{"device-id": device.Id, "error": err})
 		}
 	}()
 	return nil
@@ -317,22 +289,6 @@
 	return nil, errors.New("start-omci-test-not-implemented")
 }
 
-// GetFlowCount returns the total number of flows presently under this adapter
-func (oltA *OLTAdapter) GetFlowCount() int {
-	oltA.lock.Lock()
-	defer oltA.lock.Unlock()
-
-	return len(oltA.flows)
-}
-
-// ClearFlows removes all flows in this adapter
-func (oltA *OLTAdapter) ClearFlows() {
-	oltA.lock.Lock()
-	defer oltA.lock.Unlock()
-
-	oltA.flows = map[uint64]*voltha.OfpFlowStats{}
-}
-
 func (oltA *OLTAdapter) Get_ext_value(deviceId string, device *voltha.Device, valueflag voltha.ValueType_Type) (*voltha.ReturnValues, error) { // nolint
 	_ = deviceId
 	_ = device
diff --git a/rw_core/mocks/adapter_onu.go b/rw_core/mocks/adapter_onu.go
index 5f4a5e6..217f01d 100644
--- a/rw_core/mocks/adapter_onu.go
+++ b/rw_core/mocks/adapter_onu.go
@@ -20,9 +20,6 @@
 	"context"
 	"errors"
 	"fmt"
-	"strings"
-	"sync"
-
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
 	com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
@@ -30,22 +27,18 @@
 	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
 	of "github.com/opencord/voltha-protos/v3/go/openflow_13"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
+	"strings"
 )
 
 // ONUAdapter represent ONU adapter attributes
 type ONUAdapter struct {
-	flows map[uint64]*voltha.OfpFlowStats
-	lock  sync.Mutex
-	Adapter
+	*Adapter
 }
 
 // NewONUAdapter creates ONU adapter
 func NewONUAdapter(cp adapterif.CoreProxy) *ONUAdapter {
 	return &ONUAdapter{
-		flows: map[uint64]*voltha.OfpFlowStats{},
-		Adapter: Adapter{
-			coreProxy: cp,
-		},
+		Adapter: NewAdapter(cp),
 	}
 }
 
@@ -116,9 +109,7 @@
 			logger.Fatalf("getting-device-failed-%s", err)
 		}
 
-		if err = onuA.updateDevice(d); err != nil {
-			logger.Fatalf("saving-device-failed-%s", err)
-		}
+		onuA.updateDevice(d)
 	}()
 	return nil
 }
@@ -168,9 +159,7 @@
 			logger.Warnw("device-state-update-failed", log.Fields{"deviceId": device.Id, "error": err})
 			return
 		}
-		if err := onuA.updateDevice(cloned); err != nil {
-			logger.Fatalf("saving-device-failed-%s", err)
-		}
+		onuA.updateDevice(cloned)
 	}()
 	return nil
 }
@@ -195,53 +184,18 @@
 		if err := onuA.coreProxy.DeviceStateUpdate(context.TODO(), cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
 			logger.Fatalf("device-state-update-failed", log.Fields{"deviceId": device.Id, "error": err})
 		}
-		if err := onuA.updateDevice(cloned); err != nil {
-			logger.Fatalf("saving-device-failed-%s", err)
-		}
+
+		onuA.updateDevice(cloned)
 	}()
 	return nil
 }
 
-// Update_flows_incrementally mocks the incremental flow update
-func (onuA *ONUAdapter) Update_flows_incrementally(device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error { // nolint
-	onuA.lock.Lock()
-	defer onuA.lock.Unlock()
-
-	if flows.ToAdd != nil {
-		for _, f := range flows.ToAdd.Items {
-			onuA.flows[f.Id] = f
-		}
-	}
-	if flows.ToRemove != nil {
-		for _, f := range flows.ToRemove.Items {
-			delete(onuA.flows, f.Id)
-		}
-	}
-	return nil
-}
-
 // Start_omci_test begins an omci self-test
 func (onuA *ONUAdapter) Start_omci_test(device *voltha.Device, request *voltha.OmciTestRequest) (*ic.TestResponse, error) { // nolint
 	_ = device
 	return &ic.TestResponse{Result: ic.TestResponse_SUCCESS}, nil
 }
 
-// GetFlowCount returns the total number of flows presently under this adapter
-func (onuA *ONUAdapter) GetFlowCount() int {
-	onuA.lock.Lock()
-	defer onuA.lock.Unlock()
-
-	return len(onuA.flows)
-}
-
-// ClearFlows removes all flows in this adapter
-func (onuA *ONUAdapter) ClearFlows() {
-	onuA.lock.Lock()
-	defer onuA.lock.Unlock()
-
-	onuA.flows = map[uint64]*voltha.OfpFlowStats{}
-}
-
 func (onuA *ONUAdapter) Get_ext_value(deviceId string, device *voltha.Device, valueflag voltha.ValueType_Type) (*voltha.ReturnValues, error) { // nolint
 	_ = deviceId
 	_ = device