[VOL-2972] Revert added flows on failure

This commit consists of:
1) Reverts added flows on flow addition failure (e.g. one adapter
may return a failure in which case any corresponding flow that
has been successfully added to the other adapter will be removed).
The corresponding logical device flow will be removed as well.

2) Some minor refactoring in the mocks adapter

3) Some minor logging change to decrease the clutter when running
unit tests.

Change-Id: Ia63243e83516ef81152893563bef76c830bea022
diff --git a/rw_core/core/api/grpc_nbi_handler_test.go b/rw_core/core/api/grpc_nbi_handler_test.go
index 592ccea..2a3e71e 100755
--- a/rw_core/core/api/grpc_nbi_handler_test.go
+++ b/rw_core/core/api/grpc_nbi_handler_test.go
@@ -438,7 +438,7 @@
 	// Create a logical device monitor will automatically send trap and eapol flows to the devices being enables
 	var wg sync.WaitGroup
 	wg.Add(1)
-	go nb.monitorLogicalDevice(t, nbi, 1, nb.numONUPerOLT, &wg)
+	go nb.monitorLogicalDevice(t, nbi, 1, nb.numONUPerOLT, &wg, false, false)
 
 	//	Create the device with valid data
 	oltDevice, err := nbi.CreateDevice(getContext(), &voltha.Device{Type: nb.oltAdapterName, MacAddress: "aa:bb:cc:cc:ee:ee"})
@@ -596,6 +596,33 @@
 	err = waitUntilConditionForLogicalDevices(nb.maxTimeout, nbi, vlFunction)
 	assert.Nil(t, err)
 }
+
+func (nb *NBTest) deleteAllDevices(t *testing.T, nbi *NBIHandler) {
+	//Get an OLT device
+	oltDevice, err := nb.getADevice(true, nbi)
+	assert.Nil(t, err)
+	assert.NotNil(t, oltDevice)
+
+	// Delete the oltDevice
+	_, err = nbi.DeleteDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+	assert.Nil(t, err)
+
+	// Wait for all devices to be deleted
+	vFunction := func(devices *voltha.Devices) bool {
+		return devices != nil && len(devices.Items) == 0
+	}
+	err = waitUntilConditionForDevices(nb.maxTimeout, nbi, vFunction)
+	assert.Nil(t, err)
+
+	// Wait for absence of logical device
+	vlFunction := func(lds *voltha.LogicalDevices) bool {
+		return lds != nil && len(lds.Items) == 0
+	}
+
+	err = waitUntilConditionForLogicalDevices(nb.maxTimeout, nbi, vlFunction)
+	assert.Nil(t, err)
+}
+
 func (nb *NBTest) testEnableAndDeleteAllDevice(t *testing.T, nbi *NBIHandler) {
 	//Create the device with valid data
 	oltDevice, err := nbi.CreateDevice(getContext(), &voltha.Device{Type: nb.oltAdapterName, MacAddress: "aa:bb:cc:cc:ee:ee"})
@@ -955,9 +982,12 @@
 	return uint64(md | (port & 0xFFFFFFFF))
 }
 
-func (nb *NBTest) verifyLogicalDeviceFlowCount(t *testing.T, nbi *NBIHandler, numNNIPorts int, numUNIPorts int) {
+func (nb *NBTest) verifyLogicalDeviceFlowCount(t *testing.T, nbi *NBIHandler, numNNIPorts int, numUNIPorts int, flowAddFail bool) {
 	expectedNumFlows := numNNIPorts*3 + numNNIPorts*numUNIPorts
-	// Wait for logical device to have all the flows
+	if flowAddFail {
+		expectedNumFlows = 0
+	}
+	// Wait for logical device to have the flows (or none
 	var vlFunction isLogicalDevicesConditionSatisfied = func(lds *voltha.LogicalDevices) bool {
 		return lds != nil && len(lds.Items) == 1 && len(lds.Items[0].Flows.Items) == expectedNumFlows
 	}
@@ -1053,13 +1083,17 @@
 	assert.Nil(t, err)
 }
 
-func (nb *NBTest) monitorLogicalDevice(t *testing.T, nbi *NBIHandler, numNNIPorts int, numUNIPorts int, wg *sync.WaitGroup) {
+func (nb *NBTest) monitorLogicalDevice(t *testing.T, nbi *NBIHandler, numNNIPorts int, numUNIPorts int, wg *sync.WaitGroup, flowAddFail bool, flowDelete bool) {
 	defer wg.Done()
 
 	// Clear any existing flows on the adapters
 	nb.oltAdapter.ClearFlows()
 	nb.onuAdapter.ClearFlows()
 
+	// Set the adapter actions on flow addition/deletion
+	nb.oltAdapter.SetFlowAction(flowAddFail, flowDelete)
+	nb.onuAdapter.SetFlowAction(flowAddFail, flowDelete)
+
 	// Wait until a logical device is ready
 	var vlFunction isLogicalDevicesConditionSatisfied = func(lds *voltha.LogicalDevices) bool {
 		if lds == nil || len(lds.Items) != 1 {
@@ -1134,23 +1168,74 @@
 		}
 	}
 	//Verify the flow count on the logical device
-	nb.verifyLogicalDeviceFlowCount(t, nbi, numNNIPorts, numUNIPorts)
+	nb.verifyLogicalDeviceFlowCount(t, nbi, numNNIPorts, numUNIPorts, flowAddFail)
 
-	// Wait until all flows have been sent to the OLT adapters
+	// Wait until all flows have been sent to the OLT adapters (or all failed)
+	expectedFlowCount := (numNNIPorts * 3) + numNNIPorts*numUNIPorts
+	if flowAddFail {
+		expectedFlowCount = 0
+	}
 	var oltVFunc isConditionSatisfied = func() bool {
-		return nb.oltAdapter.GetFlowCount() >= (numNNIPorts*3)+numNNIPorts*numUNIPorts
+		return nb.oltAdapter.GetFlowCount() >= expectedFlowCount
 	}
 	err = waitUntilCondition(nb.maxTimeout, nbi, oltVFunc)
 	assert.Nil(t, err)
 
-	// Wait until all flows have been sent to the ONU adapters
+	// Wait until all flows have been sent to the ONU adapters (or all failed)
+	expectedFlowCount = numUNIPorts
+	if flowAddFail {
+		expectedFlowCount = 0
+	}
 	var onuVFunc isConditionSatisfied = func() bool {
-		return nb.onuAdapter.GetFlowCount() == numUNIPorts
+		return nb.onuAdapter.GetFlowCount() == expectedFlowCount
 	}
 	err = waitUntilCondition(nb.maxTimeout, nbi, onuVFunc)
 	assert.Nil(t, err)
 }
 
+func (nb *NBTest) testFlowAddFailure(t *testing.T, nbi *NBIHandler) {
+
+	// Create a logical device monitor will automatically send trap and eapol flows to the devices being enables
+	var wg sync.WaitGroup
+	wg.Add(1)
+	go nb.monitorLogicalDevice(t, nbi, 1, nb.numONUPerOLT, &wg, true, true)
+
+	//	Create the device with valid data
+	oltDevice, err := nbi.CreateDevice(getContext(), &voltha.Device{Type: nb.oltAdapterName, MacAddress: "aa:bb:cc:cc:ee:ee"})
+	assert.Nil(t, err)
+	assert.NotNil(t, oltDevice)
+
+	// Verify oltDevice exist in the core
+	devices, err := nbi.ListDevices(getContext(), &empty.Empty{})
+	assert.Nil(t, err)
+	assert.Equal(t, 1, len(devices.Items))
+	assert.Equal(t, oltDevice.Id, devices.Items[0].Id)
+
+	// Enable the oltDevice
+	_, err = nbi.EnableDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+	assert.Nil(t, err)
+
+	// Wait for the logical device to be in the ready state
+	var vldFunction isLogicalDeviceConditionSatisfied = func(ld *voltha.LogicalDevice) bool {
+		return ld != nil && len(ld.Ports) == nb.numONUPerOLT+1
+	}
+	err = waitUntilLogicalDeviceReadiness(oltDevice.Id, nb.maxTimeout, nbi, vldFunction)
+	assert.Nil(t, err)
+
+	// Verify that the devices have been setup correctly
+	nb.verifyDevices(t, nbi)
+
+	// Get latest oltDevice data
+	oltDevice, err = nbi.GetDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+	assert.Nil(t, err)
+
+	// Verify that the logical device has been setup correctly
+	nb.verifyLogicalDevices(t, oltDevice, nbi)
+
+	// Wait until all flows has been sent to the devices successfully
+	wg.Wait()
+}
+
 func TestSuiteNbiApiHandler(t *testing.T) {
 	f, err := os.Create("../../../tests/results/profile.cpu")
 	if err != nil {
@@ -1186,8 +1271,8 @@
 	// 2. Test adapter registration
 	nb.testAdapterRegistration(t, nbi)
 
-	numberOfDeviceTestRuns := 2
-	for i := 1; i <= numberOfDeviceTestRuns; i++ {
+	numberOfTestRuns := 2
+	for i := 1; i <= numberOfTestRuns; i++ {
 		//3. Test create device
 		nb.testCreateDevice(t, nbi)
 
@@ -1211,7 +1296,11 @@
 
 		// 10. Test omci test
 		nb.testStartOmciTestAction(t, nbi)
-	}
 
-	//x. TODO - More tests to come
+		// 11. Test flow add failure
+		nb.testFlowAddFailure(t, nbi)
+
+		// 12.  Clean up
+		nb.deleteAllDevices(t, nbi)
+	}
 }
diff --git a/rw_core/core/device/logical_agent.go b/rw_core/core/device/logical_agent.go
index 7bc8e4d..6a977b1 100644
--- a/rw_core/core/device/logical_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -518,6 +518,14 @@
 	return proto.Clone(&voltha.LogicalPorts{Items: ports}).(*voltha.LogicalPorts).Items
 }
 
+func cloneFlows(flows []*ofp.OfpFlowStats) []*ofp.OfpFlowStats {
+	return proto.Clone(&ofp.Flows{Items: flows}).(*ofp.Flows).Items
+}
+
+func cloneMeters(meters []*ofp.OfpMeterEntry) []*ofp.OfpMeterEntry {
+	return proto.Clone(&ofp.Meters{Items: meters}).(*ofp.Meters).Items
+}
+
 //updateLogicalDevicePortsWithoutLock updates the
 func (agent *LogicalAgent) updateLogicalDevicePortsWithoutLock(ctx context.Context, device *voltha.LogicalDevice, newPorts []*voltha.LogicalPort) error {
 	oldPorts := device.Ports
@@ -790,7 +798,7 @@
 	return changed, flows
 }
 
-func (agent *LogicalAgent) updateFlowCountOfMeterStats(modCommand *ofp.OfpFlowMod, meters []*ofp.OfpMeterEntry, flow *ofp.OfpFlowStats) bool {
+func (agent *LogicalAgent) updateFlowCountOfMeterStats(modCommand *ofp.OfpFlowMod, meters []*ofp.OfpMeterEntry, flow *ofp.OfpFlowStats, revertUpdate bool) bool {
 
 	flowCommand := modCommand.GetCommand()
 	meterID := fu.GetMeterIdFromFlow(flow)
@@ -807,10 +815,18 @@
 	for _, meter := range meters {
 		if meterID == meter.Config.MeterId { // Found meter in Logicaldevice
 			if flowCommand == ofp.OfpFlowModCommand_OFPFC_ADD {
-				meter.Stats.FlowCount++
+				if revertUpdate {
+					meter.Stats.FlowCount--
+				} else {
+					meter.Stats.FlowCount++
+				}
 				changedMeter = true
 			} else if flowCommand == ofp.OfpFlowModCommand_OFPFC_DELETE_STRICT {
-				meter.Stats.FlowCount--
+				if revertUpdate {
+					meter.Stats.FlowCount++
+				} else {
+					meter.Stats.FlowCount--
+				}
 				changedMeter = true
 			}
 			logger.Debugw("Found meter, updated meter flow stats", log.Fields{" meterId": meterID})
@@ -835,6 +851,7 @@
 
 	var flows []*ofp.OfpFlowStats
 	var meters []*ofp.OfpMeterEntry
+	var flowToReplace *ofp.OfpFlowStats
 	var flow *ofp.OfpFlowStats
 	var err error
 
@@ -870,12 +887,12 @@
 		}
 		idx := fu.FindFlows(flows, flow)
 		if idx >= 0 {
-			oldFlow := flows[idx]
+			flowToReplace = flows[idx]
 			if (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_RESET_COUNTS)) != 0 {
-				flow.ByteCount = oldFlow.ByteCount
-				flow.PacketCount = oldFlow.PacketCount
+				flow.ByteCount = flowToReplace.ByteCount
+				flow.PacketCount = flowToReplace.PacketCount
 			}
-			if !proto.Equal(oldFlow, flow) {
+			if !proto.Equal(flowToReplace, flow) {
 				flows[idx] = flow
 				updatedFlows = append(updatedFlows, flow)
 				changed = true
@@ -906,8 +923,9 @@
 			logger.Errorw("db-flow-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 			return err
 		}
+
 		if !updated {
-			changedMeterStats := agent.updateFlowCountOfMeterStats(mod, meters, flow)
+			changedMeterStats := agent.updateFlowCountOfMeterStats(mod, meters, flow, false)
 			metersToUpdate := &ofp.Meters{}
 			if lDevice.Meters != nil {
 				metersToUpdate = &ofp.Meters{Items: meters}
@@ -915,7 +933,7 @@
 			if changedMeterStats {
 				//Update model
 				if err := agent.updateLogicalDeviceMetersWithoutLock(ctx, metersToUpdate); err != nil {
-					logger.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+					logger.Errorw("db-meter-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
 					return err
 				}
 				logger.Debugw("meter-stats-updated-in-DB-successfully", log.Fields{"updated_meters": meters})
@@ -929,14 +947,70 @@
 		go func() {
 			// Wait for completion
 			if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChannels...); res != nil {
-				logger.Warnw("failure-to-add-flows", log.Fields{"errors": res, "logical-device-id": agent.logicalDeviceID})
-				// TODO : revert added flow
+				logger.Infow("failed-to-add-flows-will-attempt-deletion", log.Fields{"errors": res, "logical-device-id": agent.logicalDeviceID})
+				// Revert added flows
+				if err := agent.revertAddedFlows(context.Background(), mod, flow, flowToReplace, deviceRules, &flowMetadata); err != nil {
+					logger.Errorw("failure-to-delete-flows-after-failed-addition", log.Fields{"logical-device-id": agent.logicalDeviceID, "error": err})
+				}
 			}
 		}()
 	}
 	return nil
 }
 
+// revertAddedFlows reverts flows after the flowAdd request has failed.  All flows corresponding to that flowAdd request
+// will be reverted, both from the logical devices and the devices.
+func (agent *LogicalAgent) revertAddedFlows(ctx context.Context, mod *ofp.OfpFlowMod, addedFlow *ofp.OfpFlowStats, replacedFlow *ofp.OfpFlowStats, deviceRules *fu.DeviceRules, metadata *voltha.FlowMetadata) error {
+	logger.Debugw("revertFlowAdd", log.Fields{"added-flow": addedFlow, "replaced-flow": replacedFlow, "device-rules": deviceRules, "metadata": metadata})
+	if err := agent.requestQueue.WaitForGreenLight(context.Background()); err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+
+	lDevice := agent.getLogicalDeviceWithoutLock()
+
+	// Revert flows
+	clonedFlows := cloneFlows(lDevice.Flows.Items)
+	idx := fu.FindFlows(clonedFlows, addedFlow)
+	if idx < 0 {
+		// Not found - do nothing
+		log.Debugw("flow-not-found", log.Fields{"added-flow": addedFlow})
+		return nil
+	}
+	if replacedFlow != nil {
+		clonedFlows[idx] = replacedFlow
+	} else {
+		clonedFlows = deleteFlowWithoutPreservingOrder(clonedFlows, idx)
+	}
+	lDevice.Flows = &ofp.Flows{Items: clonedFlows}
+
+	// Revert meters
+	meters := cloneMeters(lDevice.Meters.Items)
+	changedMeterStats := agent.updateFlowCountOfMeterStats(mod, meters, addedFlow, true)
+	if changedMeterStats {
+		lDevice.Meters = &ofp.Meters{Items: meters}
+	}
+
+	// Update the model
+	if err := agent.updateLogicalDeviceWithoutLock(ctx, lDevice); err != nil {
+		logger.Errorw("db-flow-update-failed", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
+		return err
+	}
+
+	// Update the devices
+	respChnls := agent.deleteFlowsAndGroupsFromDevices(ctx, deviceRules, metadata)
+
+	// Wait for the responses
+	go func() {
+		// Since this action is taken following an add failure, we may also receive a failure for the revert
+		if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, respChnls...); res != nil {
+			logger.Warnw("failure-reverting-added-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "errors": res})
+		}
+	}()
+
+	return nil
+}
+
 // GetMeterConfig returns meter config
 func (agent *LogicalAgent) GetMeterConfig(flows []*ofp.OfpFlowStats, meters []*ofp.OfpMeterEntry, metadata *voltha.FlowMetadata) error {
 	m := make(map[uint32]bool)
@@ -1205,7 +1279,7 @@
 	flowsToDelete := make([]*ofp.OfpFlowStats, 0)
 	idx := fu.FindFlows(flows, flow)
 	if idx >= 0 {
-		changedMeter = agent.updateFlowCountOfMeterStats(mod, meters, flows[idx])
+		changedMeter = agent.updateFlowCountOfMeterStats(mod, meters, flows[idx], false)
 		flowsToDelete = append(flowsToDelete, flows[idx])
 		flows = append(flows[:idx], flows[idx+1:]...)
 		changedFlow = true
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index 9990104..73ab456 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -334,7 +334,7 @@
 		return foundChildDevice, nil
 	}
 
-	logger.Warnw("child-device-not-found", log.Fields{"parentDeviceId": parentDevice.Id,
+	logger.Debugw("child-device-not-found", log.Fields{"parentDeviceId": parentDevice.Id,
 		"serialNumber": serialNumber, "onuId": onuID, "parentPortNo": parentPortNo})
 	return nil, status.Errorf(codes.NotFound, "%s", parentDeviceID)
 }
diff --git a/rw_core/flowdecomposition/flow_decomposer_test.go b/rw_core/flowdecomposition/flow_decomposer_test.go
index 9650cd4..eb7ad75 100644
--- a/rw_core/flowdecomposition/flow_decomposer_test.go
+++ b/rw_core/flowdecomposition/flow_decomposer_test.go
@@ -21,7 +21,6 @@
 	"github.com/opencord/voltha-go/rw_core/mocks"
 	"github.com/opencord/voltha-go/rw_core/route"
 	fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
-	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 	"github.com/stretchr/testify/assert"
@@ -31,21 +30,6 @@
 	"testing"
 )
 
-func init() {
-	// Setup default logger - applies for packages that do not have specific logger set
-	if _, err := log.SetDefaultLogger(log.JSON, 0, log.Fields{"instanceId": 1}); err != nil {
-		log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
-	}
-
-	// Update all loggers (provisioned via init) with a common field
-	if err := log.UpdateAllLoggers(log.Fields{"instanceId": 1}); err != nil {
-		log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
-	}
-
-	// Update all loggers to log level specified as input parameter
-	log.SetAllLogLevel(0)
-}
-
 type testDeviceManager struct {
 	mocks.DeviceManager
 	devices map[string]*voltha.Device
diff --git a/rw_core/mocks/adapter.go b/rw_core/mocks/adapter.go
index e14457d..3c29a01 100644
--- a/rw_core/mocks/adapter.go
+++ b/rw_core/mocks/adapter.go
@@ -17,6 +17,7 @@
 package mocks
 
 import (
+	"fmt"
 	"strconv"
 	"strings"
 	"sync"
@@ -25,8 +26,6 @@
 	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
 	of "github.com/opencord/voltha-protos/v3/go/openflow_13"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
-	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/status"
 )
 
 func macAddressToUint32Array(mac string) []uint32 {
@@ -45,39 +44,40 @@
 
 // Adapter represents adapter attributes
 type Adapter struct {
-	coreProxy adapterif.CoreProxy
-	devices   sync.Map
+	coreProxy      adapterif.CoreProxy
+	flows          map[uint64]*voltha.OfpFlowStats
+	flowLock       sync.RWMutex
+	devices        map[string]*voltha.Device
+	deviceLock     sync.RWMutex
+	failFlowAdd    bool
+	failFlowDelete bool
 }
 
 // NewAdapter creates adapter instance
 func NewAdapter(cp adapterif.CoreProxy) *Adapter {
 	return &Adapter{
+		flows:     map[uint64]*voltha.OfpFlowStats{},
+		devices:   map[string]*voltha.Device{},
 		coreProxy: cp,
 	}
 }
 
 func (ta *Adapter) storeDevice(d *voltha.Device) {
+	ta.deviceLock.Lock()
+	defer ta.deviceLock.Unlock()
 	if d != nil {
-		ta.devices.Store(d.Id, d)
+		ta.devices[d.Id] = d
 	}
 }
 
 func (ta *Adapter) getDevice(id string) *voltha.Device {
-	if val, ok := ta.devices.Load(id); ok && val != nil {
-		if device, ok := val.(*voltha.Device); ok {
-			return device
-		}
-	}
-	return nil
+	ta.deviceLock.RLock()
+	defer ta.deviceLock.RUnlock()
+	return ta.devices[id]
 }
 
-func (ta *Adapter) updateDevice(d *voltha.Device) error {
-	if d != nil {
-		if _, ok := ta.devices.LoadOrStore(d.Id, d); !ok {
-			return status.Errorf(codes.Internal, "error updating device %s", d.Id)
-		}
-	}
-	return nil
+func (ta *Adapter) updateDevice(d *voltha.Device) {
+	ta.storeDevice(d)
 }
 
 // Adapter_descriptor -
@@ -145,8 +145,27 @@
 	return nil
 }
 
-// Update_flows_incrementally -
+// Update_flows_incrementally mocks the incremental flow update
 func (ta *Adapter) Update_flows_incrementally(device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error { // nolint
+	ta.flowLock.Lock()
+	defer ta.flowLock.Unlock()
+
+	if flows.ToAdd != nil {
+		if ta.failFlowAdd {
+			return fmt.Errorf("flow-add-error")
+		}
+		for _, f := range flows.ToAdd.Items {
+			ta.flows[f.Id] = f
+		}
+	}
+	if flows.ToRemove != nil {
+		if ta.failFlowDelete {
+			return fmt.Errorf("flow-delete-error")
+		}
+		for _, f := range flows.ToRemove.Items {
+			delete(ta.flows, f.Id)
+		}
+	}
 	return nil
 }
 
@@ -263,3 +282,25 @@
 func (ta *Adapter) Get_ext_value(deviceId string, device *voltha.Device, valueflag voltha.ValueType_Type) (*voltha.ReturnValues, error) { //nolint
 	return nil, nil
 }
+
+// GetFlowCount returns the total number of flows presently under this adapter
+func (ta *Adapter) GetFlowCount() int {
+	ta.flowLock.RLock()
+	defer ta.flowLock.RUnlock()
+
+	return len(ta.flows)
+}
+
+// ClearFlows removes all flows in this adapter
+func (ta *Adapter) ClearFlows() {
+	ta.flowLock.Lock()
+	defer ta.flowLock.Unlock()
+
+	ta.flows = map[uint64]*voltha.OfpFlowStats{}
+}
+
+// SetFlowAction sets the adapter action on addition and deletion of flows
+func (ta *Adapter) SetFlowAction(failFlowAdd, failFlowDelete bool) {
+	ta.failFlowAdd = failFlowAdd
+	ta.failFlowDelete = failFlowDelete
+}
diff --git a/rw_core/mocks/adapter_olt.go b/rw_core/mocks/adapter_olt.go
index 3ccecc7..61f431a 100644
--- a/rw_core/mocks/adapter_olt.go
+++ b/rw_core/mocks/adapter_olt.go
@@ -20,9 +20,6 @@
 	"context"
 	"errors"
 	"fmt"
-	"strings"
-	"sync"
-
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
 	com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
@@ -30,6 +27,7 @@
 	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
 	of "github.com/opencord/voltha-protos/v3/go/openflow_13"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
+	"strings"
 )
 
 const (
@@ -39,18 +37,13 @@
 
 // OLTAdapter represent OLT adapter
 type OLTAdapter struct {
-	flows map[uint64]*voltha.OfpFlowStats
-	lock  sync.Mutex
-	Adapter
+	*Adapter
 }
 
 // NewOLTAdapter - creates OLT adapter instance
 func NewOLTAdapter(cp adapterif.CoreProxy) *OLTAdapter {
 	return &OLTAdapter{
-		flows: map[uint64]*voltha.OfpFlowStats{},
-		Adapter: Adapter{
-			coreProxy: cp,
-		},
+		Adapter: NewAdapter(cp),
 	}
 }
 
@@ -100,9 +93,7 @@
 			logger.Fatalf("getting-device-failed-%s", err)
 		}
 
-		if err = oltA.updateDevice(d); err != nil {
-			logger.Fatalf("saving-device-failed-%s", err)
-		}
+		oltA.updateDevice(d)
 
 		// Register Child devices
 		initialUniPortNo := startingUNIPortNo
@@ -204,9 +195,7 @@
 			return
 		}
 
-		if err := oltA.updateDevice(cloned); err != nil {
-			logger.Fatalf("saving-device-failed-%s", err)
-		}
+		oltA.updateDevice(cloned)
 
 		// Tell the Core that all child devices have been disabled (by default it's an action already taken by the Core
 		if err := oltA.coreProxy.ChildDevicesLost(context.TODO(), cloned.Id); err != nil {
@@ -278,24 +267,6 @@
 	return nil
 }
 
-// Update_flows_incrementally mocks the incremental flow update
-func (oltA *OLTAdapter) Update_flows_incrementally(device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error { // nolint
-	oltA.lock.Lock()
-	defer oltA.lock.Unlock()
-
-	if flows.ToAdd != nil {
-		for _, f := range flows.ToAdd.Items {
-			oltA.flows[f.Id] = f
-		}
-	}
-	if flows.ToRemove != nil {
-		for _, f := range flows.ToRemove.Items {
-			delete(oltA.flows, f.Id)
-		}
-	}
-	return nil
-}
-
 // Reboot_device -
 func (oltA *OLTAdapter) Reboot_device(device *voltha.Device) error { // nolint
 	logger.Infow("reboot-device", log.Fields{"deviceId": device.Id})
@@ -305,7 +276,8 @@
 			logger.Fatalf("device-state-update-failed", log.Fields{"device-id": device.Id, "error": err})
 		}
 		if err := oltA.coreProxy.PortsStateUpdate(context.TODO(), device.Id, voltha.OperStatus_UNKNOWN); err != nil {
-			logger.Fatalf("port-update-failed", log.Fields{"device-id": device.Id, "error": err})
+			// Not an error as the previous command will start the process of clearing the OLT
+			logger.Infow("port-update-failed", log.Fields{"device-id": device.Id, "error": err})
 		}
 	}()
 	return nil
@@ -317,22 +289,6 @@
 	return nil, errors.New("start-omci-test-not-implemented")
 }
 
-// GetFlowCount returns the total number of flows presently under this adapter
-func (oltA *OLTAdapter) GetFlowCount() int {
-	oltA.lock.Lock()
-	defer oltA.lock.Unlock()
-
-	return len(oltA.flows)
-}
-
-// ClearFlows removes all flows in this adapter
-func (oltA *OLTAdapter) ClearFlows() {
-	oltA.lock.Lock()
-	defer oltA.lock.Unlock()
-
-	oltA.flows = map[uint64]*voltha.OfpFlowStats{}
-}
-
 func (oltA *OLTAdapter) Get_ext_value(deviceId string, device *voltha.Device, valueflag voltha.ValueType_Type) (*voltha.ReturnValues, error) { // nolint
 	_ = deviceId
 	_ = device
diff --git a/rw_core/mocks/adapter_onu.go b/rw_core/mocks/adapter_onu.go
index 5f4a5e6..217f01d 100644
--- a/rw_core/mocks/adapter_onu.go
+++ b/rw_core/mocks/adapter_onu.go
@@ -20,9 +20,6 @@
 	"context"
 	"errors"
 	"fmt"
-	"strings"
-	"sync"
-
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
 	com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
@@ -30,22 +27,18 @@
 	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
 	of "github.com/opencord/voltha-protos/v3/go/openflow_13"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
+	"strings"
 )
 
 // ONUAdapter represent ONU adapter attributes
 type ONUAdapter struct {
-	flows map[uint64]*voltha.OfpFlowStats
-	lock  sync.Mutex
-	Adapter
+	*Adapter
 }
 
 // NewONUAdapter creates ONU adapter
 func NewONUAdapter(cp adapterif.CoreProxy) *ONUAdapter {
 	return &ONUAdapter{
-		flows: map[uint64]*voltha.OfpFlowStats{},
-		Adapter: Adapter{
-			coreProxy: cp,
-		},
+		Adapter: NewAdapter(cp),
 	}
 }
 
@@ -116,9 +109,7 @@
 			logger.Fatalf("getting-device-failed-%s", err)
 		}
 
-		if err = onuA.updateDevice(d); err != nil {
-			logger.Fatalf("saving-device-failed-%s", err)
-		}
+		onuA.updateDevice(d)
 	}()
 	return nil
 }
@@ -168,9 +159,7 @@
 			logger.Warnw("device-state-update-failed", log.Fields{"deviceId": device.Id, "error": err})
 			return
 		}
-		if err := onuA.updateDevice(cloned); err != nil {
-			logger.Fatalf("saving-device-failed-%s", err)
-		}
+		onuA.updateDevice(cloned)
 	}()
 	return nil
 }
@@ -195,53 +184,18 @@
 		if err := onuA.coreProxy.DeviceStateUpdate(context.TODO(), cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
 			logger.Fatalf("device-state-update-failed", log.Fields{"deviceId": device.Id, "error": err})
 		}
-		if err := onuA.updateDevice(cloned); err != nil {
-			logger.Fatalf("saving-device-failed-%s", err)
-		}
+
+		onuA.updateDevice(cloned)
 	}()
 	return nil
 }
 
-// Update_flows_incrementally mocks the incremental flow update
-func (onuA *ONUAdapter) Update_flows_incrementally(device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error { // nolint
-	onuA.lock.Lock()
-	defer onuA.lock.Unlock()
-
-	if flows.ToAdd != nil {
-		for _, f := range flows.ToAdd.Items {
-			onuA.flows[f.Id] = f
-		}
-	}
-	if flows.ToRemove != nil {
-		for _, f := range flows.ToRemove.Items {
-			delete(onuA.flows, f.Id)
-		}
-	}
-	return nil
-}
-
 // Start_omci_test begins an omci self-test
 func (onuA *ONUAdapter) Start_omci_test(device *voltha.Device, request *voltha.OmciTestRequest) (*ic.TestResponse, error) { // nolint
 	_ = device
 	return &ic.TestResponse{Result: ic.TestResponse_SUCCESS}, nil
 }
 
-// GetFlowCount returns the total number of flows presently under this adapter
-func (onuA *ONUAdapter) GetFlowCount() int {
-	onuA.lock.Lock()
-	defer onuA.lock.Unlock()
-
-	return len(onuA.flows)
-}
-
-// ClearFlows removes all flows in this adapter
-func (onuA *ONUAdapter) ClearFlows() {
-	onuA.lock.Lock()
-	defer onuA.lock.Unlock()
-
-	onuA.flows = map[uint64]*voltha.OfpFlowStats{}
-}
-
 func (onuA *ONUAdapter) Get_ext_value(deviceId string, device *voltha.Device, valueflag voltha.ValueType_Type) (*voltha.ReturnValues, error) { // nolint
 	_ = deviceId
 	_ = device