[VOL-2972] Revert added flows on failure
This commit consists of:
1) Reverts added flows on flow addition failure (e.g. one adapter
may return a failure in which case any corresponding flow that
has been successfully added to the other adapter will be removed).
The corresponding logical device flow will be removed as well.
2) Some minor refactoring in the mocks adapter
3) Some minor logging change to decrease the clutter when running
unit tests.
Change-Id: Ia63243e83516ef81152893563bef76c830bea022
diff --git a/rw_core/mocks/adapter.go b/rw_core/mocks/adapter.go
index e14457d..3c29a01 100644
--- a/rw_core/mocks/adapter.go
+++ b/rw_core/mocks/adapter.go
@@ -17,6 +17,7 @@
package mocks
import (
+ "fmt"
"strconv"
"strings"
"sync"
@@ -25,8 +26,6 @@
ic "github.com/opencord/voltha-protos/v3/go/inter_container"
of "github.com/opencord/voltha-protos/v3/go/openflow_13"
"github.com/opencord/voltha-protos/v3/go/voltha"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
)
func macAddressToUint32Array(mac string) []uint32 {
@@ -45,39 +44,40 @@
// Adapter represents adapter attributes
type Adapter struct {
- coreProxy adapterif.CoreProxy
- devices sync.Map
+ coreProxy adapterif.CoreProxy
+ flows map[uint64]*voltha.OfpFlowStats
+ flowLock sync.RWMutex
+ devices map[string]*voltha.Device
+ deviceLock sync.RWMutex
+ failFlowAdd bool
+ failFlowDelete bool
}
// NewAdapter creates adapter instance
func NewAdapter(cp adapterif.CoreProxy) *Adapter {
return &Adapter{
+ flows: map[uint64]*voltha.OfpFlowStats{},
+ devices: map[string]*voltha.Device{},
coreProxy: cp,
}
}
func (ta *Adapter) storeDevice(d *voltha.Device) {
+ ta.deviceLock.Lock()
+ defer ta.deviceLock.Unlock()
if d != nil {
- ta.devices.Store(d.Id, d)
+ ta.devices[d.Id] = d
}
}
func (ta *Adapter) getDevice(id string) *voltha.Device {
- if val, ok := ta.devices.Load(id); ok && val != nil {
- if device, ok := val.(*voltha.Device); ok {
- return device
- }
- }
- return nil
+ ta.deviceLock.RLock()
+ defer ta.deviceLock.RUnlock()
+ return ta.devices[id]
}
-func (ta *Adapter) updateDevice(d *voltha.Device) error {
- if d != nil {
- if _, ok := ta.devices.LoadOrStore(d.Id, d); !ok {
- return status.Errorf(codes.Internal, "error updating device %s", d.Id)
- }
- }
- return nil
+func (ta *Adapter) updateDevice(d *voltha.Device) {
+ ta.storeDevice(d)
}
// Adapter_descriptor -
@@ -145,8 +145,27 @@
return nil
}
-// Update_flows_incrementally -
+// Update_flows_incrementally mocks the incremental flow update
func (ta *Adapter) Update_flows_incrementally(device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error { // nolint
+ ta.flowLock.Lock()
+ defer ta.flowLock.Unlock()
+
+ if flows.ToAdd != nil {
+ if ta.failFlowAdd {
+ return fmt.Errorf("flow-add-error")
+ }
+ for _, f := range flows.ToAdd.Items {
+ ta.flows[f.Id] = f
+ }
+ }
+ if flows.ToRemove != nil {
+ if ta.failFlowDelete {
+ return fmt.Errorf("flow-delete-error")
+ }
+ for _, f := range flows.ToRemove.Items {
+ delete(ta.flows, f.Id)
+ }
+ }
return nil
}
@@ -263,3 +282,25 @@
func (ta *Adapter) Get_ext_value(deviceId string, device *voltha.Device, valueflag voltha.ValueType_Type) (*voltha.ReturnValues, error) { //nolint
return nil, nil
}
+
+// GetFlowCount returns the total number of flows presently under this adapter
+func (ta *Adapter) GetFlowCount() int {
+ ta.flowLock.RLock()
+ defer ta.flowLock.RUnlock()
+
+ return len(ta.flows)
+}
+
+// ClearFlows removes all flows in this adapter
+func (ta *Adapter) ClearFlows() {
+ ta.flowLock.Lock()
+ defer ta.flowLock.Unlock()
+
+ ta.flows = map[uint64]*voltha.OfpFlowStats{}
+}
+
+// SetFlowAction sets the adapter action on addition and deletion of flows
+func (ta *Adapter) SetFlowAction(failFlowAdd, failFlowDelete bool) {
+ ta.failFlowAdd = failFlowAdd
+ ta.failFlowDelete = failFlowDelete
+}