[VOL-2972] Revert added flows on failure
This commit consists of:
1) Reverts added flows on flow addition failure (e.g. one adapter
may return a failure in which case any corresponding flow that
has been successfully added to the other adapter will be removed).
The corresponding logical device flow will be removed as well.
2) Some minor refactoring in the mocks adapter
3) A minor logging change to reduce clutter when running
unit tests.
Change-Id: Ia63243e83516ef81152893563bef76c830bea022
diff --git a/rw_core/mocks/adapter.go b/rw_core/mocks/adapter.go
index e14457d..3c29a01 100644
--- a/rw_core/mocks/adapter.go
+++ b/rw_core/mocks/adapter.go
@@ -17,6 +17,7 @@
package mocks
import (
+ "fmt"
"strconv"
"strings"
"sync"
@@ -25,8 +26,6 @@
ic "github.com/opencord/voltha-protos/v3/go/inter_container"
of "github.com/opencord/voltha-protos/v3/go/openflow_13"
"github.com/opencord/voltha-protos/v3/go/voltha"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
)
func macAddressToUint32Array(mac string) []uint32 {
@@ -45,39 +44,40 @@
// Adapter represents adapter attributes
type Adapter struct {
- coreProxy adapterif.CoreProxy
- devices sync.Map
+ coreProxy adapterif.CoreProxy
+ flows map[uint64]*voltha.OfpFlowStats
+ flowLock sync.RWMutex
+ devices map[string]*voltha.Device
+ deviceLock sync.RWMutex
+ failFlowAdd bool
+ failFlowDelete bool
}
// NewAdapter creates adapter instance
func NewAdapter(cp adapterif.CoreProxy) *Adapter {
return &Adapter{
+ flows: map[uint64]*voltha.OfpFlowStats{},
+ devices: map[string]*voltha.Device{},
coreProxy: cp,
}
}
func (ta *Adapter) storeDevice(d *voltha.Device) {
+ ta.deviceLock.Lock()
+ defer ta.deviceLock.Unlock()
if d != nil {
- ta.devices.Store(d.Id, d)
+ ta.devices[d.Id] = d
}
}
func (ta *Adapter) getDevice(id string) *voltha.Device {
- if val, ok := ta.devices.Load(id); ok && val != nil {
- if device, ok := val.(*voltha.Device); ok {
- return device
- }
- }
- return nil
+ ta.deviceLock.RLock()
+ defer ta.deviceLock.RUnlock()
+ return ta.devices[id]
}
-func (ta *Adapter) updateDevice(d *voltha.Device) error {
- if d != nil {
- if _, ok := ta.devices.LoadOrStore(d.Id, d); !ok {
- return status.Errorf(codes.Internal, "error updating device %s", d.Id)
- }
- }
- return nil
+func (ta *Adapter) updateDevice(d *voltha.Device) {
+ ta.storeDevice(d)
}
// Adapter_descriptor -
@@ -145,8 +145,27 @@
return nil
}
-// Update_flows_incrementally -
+// Update_flows_incrementally mocks the incremental flow update
func (ta *Adapter) Update_flows_incrementally(device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error { // nolint
+ ta.flowLock.Lock()
+ defer ta.flowLock.Unlock()
+
+ if flows.ToAdd != nil {
+ if ta.failFlowAdd {
+ return fmt.Errorf("flow-add-error")
+ }
+ for _, f := range flows.ToAdd.Items {
+ ta.flows[f.Id] = f
+ }
+ }
+ if flows.ToRemove != nil {
+ if ta.failFlowDelete {
+ return fmt.Errorf("flow-delete-error")
+ }
+ for _, f := range flows.ToRemove.Items {
+ delete(ta.flows, f.Id)
+ }
+ }
return nil
}
@@ -263,3 +282,32 @@
func (ta *Adapter) Get_ext_value(deviceId string, device *voltha.Device, valueflag voltha.ValueType_Type) (*voltha.ReturnValues, error) { //nolint
return nil, nil
}
+
+// GetFlowCount returns the total number of flows presently under this adapter
+func (ta *Adapter) GetFlowCount() int {
+ ta.flowLock.RLock()
+ defer ta.flowLock.RUnlock()
+
+ return len(ta.flows)
+}
+
+// ClearFlows removes all flows in this adapter
+func (ta *Adapter) ClearFlows() {
+ ta.flowLock.Lock()
+ defer ta.flowLock.Unlock()
+
+ ta.flows = map[uint64]*voltha.OfpFlowStats{}
+}
+
+// SetFlowAction sets the adapter action on addition and deletion of flows.
+// When failFlowAdd (or failFlowDelete) is true, Update_flows_incrementally
+// returns an error for requests that add (or remove) flows.
+// The flags are written under flowLock because Update_flows_incrementally
+// reads them while holding the same lock.
+func (ta *Adapter) SetFlowAction(failFlowAdd, failFlowDelete bool) {
+ ta.flowLock.Lock()
+ defer ta.flowLock.Unlock()
+
+ ta.failFlowAdd = failFlowAdd
+ ta.failFlowDelete = failFlowDelete
+}
diff --git a/rw_core/mocks/adapter_olt.go b/rw_core/mocks/adapter_olt.go
index 3ccecc7..61f431a 100644
--- a/rw_core/mocks/adapter_olt.go
+++ b/rw_core/mocks/adapter_olt.go
@@ -20,9 +20,6 @@
"context"
"errors"
"fmt"
- "strings"
- "sync"
-
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
@@ -30,6 +27,7 @@
ic "github.com/opencord/voltha-protos/v3/go/inter_container"
of "github.com/opencord/voltha-protos/v3/go/openflow_13"
"github.com/opencord/voltha-protos/v3/go/voltha"
+ "strings"
)
const (
@@ -39,18 +37,13 @@
// OLTAdapter represent OLT adapter
type OLTAdapter struct {
- flows map[uint64]*voltha.OfpFlowStats
- lock sync.Mutex
- Adapter
+ *Adapter
}
// NewOLTAdapter - creates OLT adapter instance
func NewOLTAdapter(cp adapterif.CoreProxy) *OLTAdapter {
return &OLTAdapter{
- flows: map[uint64]*voltha.OfpFlowStats{},
- Adapter: Adapter{
- coreProxy: cp,
- },
+ Adapter: NewAdapter(cp),
}
}
@@ -100,9 +93,7 @@
logger.Fatalf("getting-device-failed-%s", err)
}
- if err = oltA.updateDevice(d); err != nil {
- logger.Fatalf("saving-device-failed-%s", err)
- }
+ oltA.updateDevice(d)
// Register Child devices
initialUniPortNo := startingUNIPortNo
@@ -204,9 +195,7 @@
return
}
- if err := oltA.updateDevice(cloned); err != nil {
- logger.Fatalf("saving-device-failed-%s", err)
- }
+ oltA.updateDevice(cloned)
// Tell the Core that all child devices have been disabled (by default it's an action already taken by the Core
if err := oltA.coreProxy.ChildDevicesLost(context.TODO(), cloned.Id); err != nil {
@@ -278,24 +267,6 @@
return nil
}
-// Update_flows_incrementally mocks the incremental flow update
-func (oltA *OLTAdapter) Update_flows_incrementally(device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error { // nolint
- oltA.lock.Lock()
- defer oltA.lock.Unlock()
-
- if flows.ToAdd != nil {
- for _, f := range flows.ToAdd.Items {
- oltA.flows[f.Id] = f
- }
- }
- if flows.ToRemove != nil {
- for _, f := range flows.ToRemove.Items {
- delete(oltA.flows, f.Id)
- }
- }
- return nil
-}
-
// Reboot_device -
func (oltA *OLTAdapter) Reboot_device(device *voltha.Device) error { // nolint
logger.Infow("reboot-device", log.Fields{"deviceId": device.Id})
@@ -305,7 +276,8 @@
logger.Fatalf("device-state-update-failed", log.Fields{"device-id": device.Id, "error": err})
}
if err := oltA.coreProxy.PortsStateUpdate(context.TODO(), device.Id, voltha.OperStatus_UNKNOWN); err != nil {
- logger.Fatalf("port-update-failed", log.Fields{"device-id": device.Id, "error": err})
+ // Not an error as the previous command will start the process of clearing the OLT
+ logger.Infow("port-update-failed", log.Fields{"device-id": device.Id, "error": err})
}
}()
return nil
@@ -317,22 +289,6 @@
return nil, errors.New("start-omci-test-not-implemented")
}
-// GetFlowCount returns the total number of flows presently under this adapter
-func (oltA *OLTAdapter) GetFlowCount() int {
- oltA.lock.Lock()
- defer oltA.lock.Unlock()
-
- return len(oltA.flows)
-}
-
-// ClearFlows removes all flows in this adapter
-func (oltA *OLTAdapter) ClearFlows() {
- oltA.lock.Lock()
- defer oltA.lock.Unlock()
-
- oltA.flows = map[uint64]*voltha.OfpFlowStats{}
-}
-
func (oltA *OLTAdapter) Get_ext_value(deviceId string, device *voltha.Device, valueflag voltha.ValueType_Type) (*voltha.ReturnValues, error) { // nolint
_ = deviceId
_ = device
diff --git a/rw_core/mocks/adapter_onu.go b/rw_core/mocks/adapter_onu.go
index 5f4a5e6..217f01d 100644
--- a/rw_core/mocks/adapter_onu.go
+++ b/rw_core/mocks/adapter_onu.go
@@ -20,9 +20,6 @@
"context"
"errors"
"fmt"
- "strings"
- "sync"
-
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
@@ -30,22 +27,18 @@
ic "github.com/opencord/voltha-protos/v3/go/inter_container"
of "github.com/opencord/voltha-protos/v3/go/openflow_13"
"github.com/opencord/voltha-protos/v3/go/voltha"
+ "strings"
)
// ONUAdapter represent ONU adapter attributes
type ONUAdapter struct {
- flows map[uint64]*voltha.OfpFlowStats
- lock sync.Mutex
- Adapter
+ *Adapter
}
// NewONUAdapter creates ONU adapter
func NewONUAdapter(cp adapterif.CoreProxy) *ONUAdapter {
return &ONUAdapter{
- flows: map[uint64]*voltha.OfpFlowStats{},
- Adapter: Adapter{
- coreProxy: cp,
- },
+ Adapter: NewAdapter(cp),
}
}
@@ -116,9 +109,7 @@
logger.Fatalf("getting-device-failed-%s", err)
}
- if err = onuA.updateDevice(d); err != nil {
- logger.Fatalf("saving-device-failed-%s", err)
- }
+ onuA.updateDevice(d)
}()
return nil
}
@@ -168,9 +159,7 @@
logger.Warnw("device-state-update-failed", log.Fields{"deviceId": device.Id, "error": err})
return
}
- if err := onuA.updateDevice(cloned); err != nil {
- logger.Fatalf("saving-device-failed-%s", err)
- }
+ onuA.updateDevice(cloned)
}()
return nil
}
@@ -195,53 +184,18 @@
if err := onuA.coreProxy.DeviceStateUpdate(context.TODO(), cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
logger.Fatalf("device-state-update-failed", log.Fields{"deviceId": device.Id, "error": err})
}
- if err := onuA.updateDevice(cloned); err != nil {
- logger.Fatalf("saving-device-failed-%s", err)
- }
+
+ onuA.updateDevice(cloned)
}()
return nil
}
-// Update_flows_incrementally mocks the incremental flow update
-func (onuA *ONUAdapter) Update_flows_incrementally(device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error { // nolint
- onuA.lock.Lock()
- defer onuA.lock.Unlock()
-
- if flows.ToAdd != nil {
- for _, f := range flows.ToAdd.Items {
- onuA.flows[f.Id] = f
- }
- }
- if flows.ToRemove != nil {
- for _, f := range flows.ToRemove.Items {
- delete(onuA.flows, f.Id)
- }
- }
- return nil
-}
-
// Start_omci_test begins an omci self-test
func (onuA *ONUAdapter) Start_omci_test(device *voltha.Device, request *voltha.OmciTestRequest) (*ic.TestResponse, error) { // nolint
_ = device
return &ic.TestResponse{Result: ic.TestResponse_SUCCESS}, nil
}
-// GetFlowCount returns the total number of flows presently under this adapter
-func (onuA *ONUAdapter) GetFlowCount() int {
- onuA.lock.Lock()
- defer onuA.lock.Unlock()
-
- return len(onuA.flows)
-}
-
-// ClearFlows removes all flows in this adapter
-func (onuA *ONUAdapter) ClearFlows() {
- onuA.lock.Lock()
- defer onuA.lock.Unlock()
-
- onuA.flows = map[uint64]*voltha.OfpFlowStats{}
-}
-
func (onuA *ONUAdapter) Get_ext_value(deviceId string, device *voltha.Device, valueflag voltha.ValueType_Type) (*voltha.ReturnValues, error) { // nolint
_ = deviceId
_ = device