[VOL-3005] Separate Flows from Device
Also some unit test functions moved to a test util class.
New loaders and Proxy implementation are applied.
Change-Id: Icf5a6f0a42a2dbaeff768fdb108f5e9b46644977
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
index ffc2a3b..29d8062 100755
--- a/rw_core/core/device/agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -18,13 +18,21 @@
import (
"context"
+ "math/rand"
+ "sort"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/rw_core/config"
"github.com/opencord/voltha-go/rw_core/core/adapter"
+ cm "github.com/opencord/voltha-go/rw_core/mocks"
+ tst "github.com/opencord/voltha-go/rw_core/test"
com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
"github.com/opencord/voltha-lib-go/v3/pkg/db"
- "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
@@ -33,24 +41,18 @@
"github.com/opencord/voltha-protos/v3/go/voltha"
"github.com/phayes/freeport"
"github.com/stretchr/testify/assert"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
- "math/rand"
- "sort"
- "strconv"
- "strings"
- "sync"
- "testing"
- "time"
)
type DATest struct {
etcdServer *mock_etcd.EtcdServer
deviceMgr *Manager
logicalDeviceMgr *LogicalManager
+ adapterMgr *adapter.Manager
kmp kafka.InterContainerProxy
kClient kafka.Client
kvClientPort int
+ oltAdapter *cm.OLTAdapter
+ onuAdapter *cm.ONUAdapter
oltAdapterName string
onuAdapterName string
coreInstanceID string
@@ -64,7 +66,7 @@
test := &DATest{}
// Start the embedded etcd server
var err error
- test.etcdServer, test.kvClientPort, err = startEmbeddedEtcdServer("voltha.rwcore.da.test", "voltha.rwcore.da.etcd", "error")
+ test.etcdServer, test.kvClientPort, err = tst.StartEmbeddedEtcdServer("voltha.rwcore.da.test", "voltha.rwcore.da.etcd", "error")
if err != nil {
logger.Fatal(err)
}
@@ -120,7 +122,7 @@
}
cfg.GrpcPort = grpcPort
cfg.GrpcHost = "127.0.0.1"
- client := setupKVClient(cfg, dat.coreInstanceID)
+ client := tst.SetupKVClient(cfg, dat.coreInstanceID)
backend := &db.Backend{
Client: client,
StoreType: cfg.KVStoreType,
@@ -138,13 +140,18 @@
endpointMgr := kafka.NewEndpointManager(backend)
proxy := model.NewDBPath(backend)
- adapterMgr := adapter.NewAdapterManager(proxy, dat.coreInstanceID, dat.kClient)
+ dat.adapterMgr = adapter.NewAdapterManager(proxy, dat.coreInstanceID, dat.kClient)
- dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, adapterMgr, dat.kmp, endpointMgr, cfg.CorePairTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout)
+ dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, dat.adapterMgr, dat.kmp, endpointMgr, cfg.CorePairTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout)
+ dat.adapterMgr.Start(context.Background())
if err = dat.kmp.Start(); err != nil {
logger.Fatal("Cannot start InterContainerProxy")
}
- adapterMgr.Start(context.Background())
+
+ if err := dat.kmp.SubscribeWithDefaultRequestHandler(kafka.Topic{Name: cfg.CorePairTopic}, kafka.OffsetNewest); err != nil {
+ logger.Fatalf("Cannot add default request handler: %s", err)
+ }
+
}
func (dat *DATest) stopAll() {
@@ -155,46 +162,14 @@
dat.kmp.Stop()
}
if dat.etcdServer != nil {
- stopEmbeddedEtcdServer(dat.etcdServer)
+ tst.StopEmbeddedEtcdServer(dat.etcdServer)
}
}
-//startEmbeddedEtcdServer creates and starts an Embedded etcd server locally.
-func startEmbeddedEtcdServer(configName, storageDir, logLevel string) (*mock_etcd.EtcdServer, int, error) {
- kvClientPort, err := freeport.GetFreePort()
- if err != nil {
- return nil, 0, err
- }
- peerPort, err := freeport.GetFreePort()
- if err != nil {
- return nil, 0, err
- }
- etcdServer := mock_etcd.StartEtcdServer(mock_etcd.MKConfig(configName, kvClientPort, peerPort, storageDir, logLevel))
- if etcdServer == nil {
- return nil, 0, status.Error(codes.Internal, "Embedded server failed to start")
- }
- return etcdServer, kvClientPort, nil
-}
-
-func stopEmbeddedEtcdServer(server *mock_etcd.EtcdServer) {
- if server != nil {
- server.Stop()
- }
-}
-
-func setupKVClient(cf *config.RWCoreFlags, coreInstanceID string) kvstore.Client {
- addr := cf.KVStoreHost + ":" + strconv.Itoa(cf.KVStorePort)
- client, err := kvstore.NewEtcdClient(addr, cf.KVStoreTimeout, log.FatalLevel)
- if err != nil {
- panic("no kv client")
- }
- return client
-}
-
func (dat *DATest) createDeviceAgent(t *testing.T) *Agent {
deviceMgr := dat.deviceMgr
clonedDevice := proto.Clone(dat.device).(*voltha.Device)
- deviceAgent := newAgent(deviceMgr.adapterProxy, clonedDevice, deviceMgr, deviceMgr.dProxy, deviceMgr.defaultTimeout)
+ deviceAgent := newAgent(deviceMgr.adapterProxy, clonedDevice, deviceMgr, deviceMgr.dbPath, deviceMgr.dProxy, deviceMgr.defaultTimeout)
d, err := deviceAgent.start(context.TODO(), clonedDevice)
assert.Nil(t, err)
assert.NotNil(t, d)
@@ -278,7 +253,7 @@
da := newDATest()
assert.NotNil(t, da)
defer da.stopAll()
-
+ log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
// Start the Core
da.startCore(false)
@@ -293,6 +268,37 @@
wg.Wait()
}
}
+func TestFlowUpdates(t *testing.T) {
+ da := newDATest()
+ assert.NotNil(t, da)
+ defer da.stopAll()
+
+ log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
+ // Start the Core
+ da.startCore(false)
+ da.oltAdapter, da.onuAdapter = tst.CreateAndregisterAdapters(t, da.kClient, da.coreInstanceID, da.oltAdapterName, da.onuAdapterName, da.adapterMgr)
+
+ a := da.createDeviceAgent(t)
+ cloned := a.getDeviceWithoutLock()
+ err := a.updateDeviceStateInStoreWithoutLock(context.Background(), cloned, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
+ assert.Nil(t, err)
+ da.testFlowAddDeletes(t, a)
+}
+
+func TestGroupUpdates(t *testing.T) {
+ da := newDATest()
+ assert.NotNil(t, da)
+ defer da.stopAll()
+
+ // Start the Core
+ da.startCore(false)
+ da.oltAdapter, da.onuAdapter = tst.CreateAndregisterAdapters(t, da.kClient, da.coreInstanceID, da.oltAdapterName, da.onuAdapterName, da.adapterMgr)
+ a := da.createDeviceAgent(t)
+ cloned := a.getDeviceWithoutLock()
+ err := a.updateDeviceStateInStoreWithoutLock(context.Background(), cloned, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
+ assert.Nil(t, err)
+ da.testGroupAddDeletes(t, a)
+}
func isFlowSliceEqual(a, b []*ofp.OfpFlowStats) bool {
if len(a) != len(b) {
@@ -329,196 +335,214 @@
}
return true
}
-
-func TestFlowsToUpdateToDelete_EmptySlices(t *testing.T) {
- newFlows := []*ofp.OfpFlowStats{}
- existingFlows := []*ofp.OfpFlowStats{}
- expectedNewFlows := []*ofp.OfpFlowStats{}
- expectedFlowsToDelete := []*ofp.OfpFlowStats{}
- expectedUpdatedAllFlows := []*ofp.OfpFlowStats{}
- uNF, fD, uAF := flowsToUpdateToDelete(newFlows, existingFlows)
- assert.True(t, isFlowSliceEqual(uNF, expectedNewFlows))
- assert.True(t, isFlowSliceEqual(fD, expectedFlowsToDelete))
- assert.True(t, isFlowSliceEqual(uAF, expectedUpdatedAllFlows))
+func changeToFlowList(flowList map[uint64]*ofp.OfpFlowStats) []*ofp.OfpFlowStats {
+ flows := make([]*ofp.OfpFlowStats, 0)
+ for _, flow := range flowList {
+ flows = append(flows, flow)
+ }
+ return flows
}
-
-func TestFlowsToUpdateToDelete_NoExistingFlows(t *testing.T) {
+func changeToGroupList(groupList map[uint32]*ofp.OfpGroupEntry) []*ofp.OfpGroupEntry {
+ groups := make([]*ofp.OfpGroupEntry, 0)
+ for _, group := range groupList {
+ groups = append(groups, group)
+ }
+ return groups
+}
+func (dat *DATest) testFlowAddDeletes(t *testing.T, da *Agent) {
+ //Add new Flows on empty list
newFlows := []*ofp.OfpFlowStats{
{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
}
- existingFlows := []*ofp.OfpFlowStats{}
- expectedNewFlows := []*ofp.OfpFlowStats{
- {Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
- {Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
- {Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
- }
- expectedFlowsToDelete := []*ofp.OfpFlowStats{}
- expectedUpdatedAllFlows := []*ofp.OfpFlowStats{
- {Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
- {Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
- {Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
- }
- uNF, fD, uAF := flowsToUpdateToDelete(newFlows, existingFlows)
- assert.True(t, isFlowSliceEqual(uNF, expectedNewFlows))
- assert.True(t, isFlowSliceEqual(fD, expectedFlowsToDelete))
- assert.True(t, isFlowSliceEqual(uAF, expectedUpdatedAllFlows))
-}
+ err := da.addFlowsAndGroups(context.Background(), newFlows, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
+ assert.Nil(t, err)
+ daFlows := changeToFlowList(da.listDeviceFlows())
+ assert.True(t, isFlowSliceEqual(newFlows, daFlows))
-func TestFlowsToUpdateToDelete_UpdateNoDelete(t *testing.T) {
- newFlows := []*ofp.OfpFlowStats{
- {Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
- {Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
- {Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
- }
- existingFlows := []*ofp.OfpFlowStats{
- {Id: 121, TableId: 1210, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1210000, PacketCount: 0},
- {Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
- {Id: 122, TableId: 1220, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1220000, PacketCount: 0},
- }
- expectedNewFlows := []*ofp.OfpFlowStats{
- {Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
- {Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
- }
- expectedFlowsToDelete := []*ofp.OfpFlowStats{}
- expectedUpdatedAllFlows := []*ofp.OfpFlowStats{
- {Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
- {Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
- {Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
- {Id: 121, TableId: 1210, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1210000, PacketCount: 0},
- {Id: 122, TableId: 1220, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1220000, PacketCount: 0},
- }
- uNF, fD, uAF := flowsToUpdateToDelete(newFlows, existingFlows)
- assert.True(t, isFlowSliceEqual(uNF, expectedNewFlows))
- assert.True(t, isFlowSliceEqual(fD, expectedFlowsToDelete))
- assert.True(t, isFlowSliceEqual(uAF, expectedUpdatedAllFlows))
-}
-
-func TestFlowsToUpdateToDelete_UpdateAndDelete(t *testing.T) {
- newFlows := []*ofp.OfpFlowStats{
- {Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 20},
- {Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
- {Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 10, Flags: 0, Cookie: 1250000, PacketCount: 0},
- {Id: 126, TableId: 1260, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
+ //Add new Flows on existing ones
+ newFlows = []*ofp.OfpFlowStats{
+ {Id: 126, TableId: 1260, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
{Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270000, PacketCount: 0},
}
- existingFlows := []*ofp.OfpFlowStats{
- {Id: 121, TableId: 1210, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1210000, PacketCount: 0},
- {Id: 122, TableId: 1220, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1220000, PacketCount: 0},
+
+ expectedFlows := []*ofp.OfpFlowStats{
+ {Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
+ {Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
+ {Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
+ {Id: 126, TableId: 1260, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
+ {Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270000, PacketCount: 0},
+ }
+
+ err = da.addFlowsAndGroups(context.Background(), newFlows, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
+ assert.Nil(t, err)
+ daFlows = changeToFlowList(da.listDeviceFlows())
+ assert.True(t, isFlowSliceEqual(expectedFlows, daFlows))
+
+ //Add existing Flows again with a new flow
+ newFlows = []*ofp.OfpFlowStats{
+ {Id: 126, TableId: 1260, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
+ {Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270001, PacketCount: 0},
+ {Id: 128, TableId: 1280, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1280000, PacketCount: 0},
+ }
+
+ expectedFlows = []*ofp.OfpFlowStats{
+ {Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
+ {Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
+ {Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
+ {Id: 126, TableId: 1260, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
+ {Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270001, PacketCount: 0},
+ {Id: 128, TableId: 1280, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1280000, PacketCount: 0},
+ }
+
+ err = da.addFlowsAndGroups(context.Background(), newFlows, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
+ assert.Nil(t, err)
+ daFlows = changeToFlowList(da.listDeviceFlows())
+ assert.True(t, isFlowSliceEqual(expectedFlows, daFlows))
+
+ //Add already existing flows again
+ newFlows = []*ofp.OfpFlowStats{
+ {Id: 126, TableId: 1260, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
+ {Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270001, PacketCount: 0},
+ {Id: 128, TableId: 1280, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1280000, PacketCount: 0},
+ }
+
+ expectedFlows = []*ofp.OfpFlowStats{
+ {Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
+ {Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
+ {Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
+ {Id: 126, TableId: 1260, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
+ {Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270001, PacketCount: 0},
+ {Id: 128, TableId: 1280, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1280000, PacketCount: 0},
+ }
+
+ err = da.addFlowsAndGroups(context.Background(), newFlows, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
+ assert.Nil(t, err)
+ daFlows = changeToFlowList(da.listDeviceFlows())
+ assert.True(t, isFlowSliceEqual(expectedFlows, daFlows))
+
+ //Delete flows
+ flowsToDelete := []*ofp.OfpFlowStats{
+ {Id: 126, TableId: 1260, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
+ {Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270001, PacketCount: 0},
+ {Id: 128, TableId: 1280, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1280000, PacketCount: 0},
+ }
+
+ expectedFlows = []*ofp.OfpFlowStats{
{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
}
- expectedNewFlows := []*ofp.OfpFlowStats{
- {Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 20},
- {Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 10, Flags: 0, Cookie: 1250000, PacketCount: 0},
- {Id: 126, TableId: 1260, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
- {Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270000, PacketCount: 0},
- }
- expectedFlowsToDelete := []*ofp.OfpFlowStats{
- {Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
+
+ err = da.deleteFlowsAndGroups(context.Background(), flowsToDelete, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
+ assert.Nil(t, err)
+ daFlows = changeToFlowList(da.listDeviceFlows())
+ assert.True(t, isFlowSliceEqual(expectedFlows, daFlows))
+ //Delete flows with an unexisting one
+ flowsToDelete = []*ofp.OfpFlowStats{
{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
+ {Id: 129, TableId: 1290, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1290000, PacketCount: 0},
}
- expectedUpdatedAllFlows := []*ofp.OfpFlowStats{
- {Id: 121, TableId: 1210, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1210000, PacketCount: 0},
- {Id: 122, TableId: 1220, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1220000, PacketCount: 0},
- {Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 20},
+
+ expectedFlows = []*ofp.OfpFlowStats{
+ {Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
- {Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 10, Flags: 0, Cookie: 1250000, PacketCount: 0},
- {Id: 126, TableId: 1260, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
- {Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270000, PacketCount: 0},
}
- uNF, fD, uAF := flowsToUpdateToDelete(newFlows, existingFlows)
- assert.True(t, isFlowSliceEqual(uNF, expectedNewFlows))
- assert.True(t, isFlowSliceEqual(fD, expectedFlowsToDelete))
- assert.True(t, isFlowSliceEqual(uAF, expectedUpdatedAllFlows))
+
+ err = da.deleteFlowsAndGroups(context.Background(), flowsToDelete, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
+ assert.Nil(t, err)
+ daFlows = changeToFlowList(da.listDeviceFlows())
+ assert.True(t, isFlowSliceEqual(expectedFlows, daFlows))
}
-func TestGroupsToUpdateToDelete_EmptySlices(t *testing.T) {
- newGroups := []*ofp.OfpGroupEntry{}
- existingGroups := []*ofp.OfpGroupEntry{}
- expectedNewGroups := []*ofp.OfpGroupEntry{}
- expectedGroupsToDelete := []*ofp.OfpGroupEntry{}
- expectedUpdatedAllGroups := []*ofp.OfpGroupEntry{}
- uNG, gD, uAG := groupsToUpdateToDelete(newGroups, existingGroups)
- assert.True(t, isGroupSliceEqual(uNG, expectedNewGroups))
- assert.True(t, isGroupSliceEqual(gD, expectedGroupsToDelete))
- assert.True(t, isGroupSliceEqual(uAG, expectedUpdatedAllGroups))
-}
-
-func TestGroupsToUpdateToDelete_NoExistingGroups(t *testing.T) {
+func (dat *DATest) testGroupAddDeletes(t *testing.T, da *Agent) {
+ //Add new Groups on empty list
newGroups := []*ofp.OfpGroupEntry{
{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
}
- existingGroups := []*ofp.OfpGroupEntry{}
- expectedNewGroups := []*ofp.OfpGroupEntry{
- {Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
- {Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
- }
- expectedGroupsToDelete := []*ofp.OfpGroupEntry{}
- expectedUpdatedAllGroups := []*ofp.OfpGroupEntry{
- {Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
- {Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
- }
- uNG, gD, uAG := groupsToUpdateToDelete(newGroups, existingGroups)
- assert.True(t, isGroupSliceEqual(uNG, expectedNewGroups))
- assert.True(t, isGroupSliceEqual(gD, expectedGroupsToDelete))
- assert.True(t, isGroupSliceEqual(uAG, expectedUpdatedAllGroups))
-}
+ err := da.addFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, newGroups, &voltha.FlowMetadata{})
+ assert.Nil(t, err)
+ daGroups := changeToGroupList(da.listDeviceGroups())
+ assert.True(t, isGroupSliceEqual(newGroups, daGroups))
-func TestGroupsToUpdateToDelete_UpdateNoDelete(t *testing.T) {
- newGroups := []*ofp.OfpGroupEntry{
- {Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
- {Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
- }
- existingGroups := []*ofp.OfpGroupEntry{
- {Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
+ //Add new Groups on existing ones
+ newGroups = []*ofp.OfpGroupEntry{
{Desc: &ofp.OfpGroupDesc{Type: 3, GroupId: 30, Buckets: nil}},
{Desc: &ofp.OfpGroupDesc{Type: 4, GroupId: 40, Buckets: nil}},
}
- expectedNewGroups := []*ofp.OfpGroupEntry{
- {Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
- }
- expectedGroupsToDelete := []*ofp.OfpGroupEntry{}
- expectedUpdatedAllGroups := []*ofp.OfpGroupEntry{
+ expectedGroups := []*ofp.OfpGroupEntry{
{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
{Desc: &ofp.OfpGroupDesc{Type: 3, GroupId: 30, Buckets: nil}},
{Desc: &ofp.OfpGroupDesc{Type: 4, GroupId: 40, Buckets: nil}},
}
- uNG, gD, uAG := groupsToUpdateToDelete(newGroups, existingGroups)
- assert.True(t, isGroupSliceEqual(uNG, expectedNewGroups))
- assert.True(t, isGroupSliceEqual(gD, expectedGroupsToDelete))
- assert.True(t, isGroupSliceEqual(uAG, expectedUpdatedAllGroups))
-}
+ err = da.addFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, newGroups, &voltha.FlowMetadata{})
+ assert.Nil(t, err)
+ daGroups = changeToGroupList(da.listDeviceGroups())
+ assert.True(t, isGroupSliceEqual(expectedGroups, daGroups))
-func TestGroupsToUpdateToDelete_UpdateWithDelete(t *testing.T) {
- newGroups := []*ofp.OfpGroupEntry{
- {Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
- {Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: []*ofp.OfpBucket{{WatchPort: 10}}}},
+ //Add new Groups on existing ones
+ newGroups = []*ofp.OfpGroupEntry{
+ {Desc: &ofp.OfpGroupDesc{Type: 3, GroupId: 30, Buckets: nil}},
+ {Desc: &ofp.OfpGroupDesc{Type: 44, GroupId: 40, Buckets: nil}},
+ {Desc: &ofp.OfpGroupDesc{Type: 5, GroupId: 50, Buckets: nil}},
}
- existingGroups := []*ofp.OfpGroupEntry{
+ expectedGroups = []*ofp.OfpGroupEntry{
+ {Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
{Desc: &ofp.OfpGroupDesc{Type: 3, GroupId: 30, Buckets: nil}},
- {Desc: &ofp.OfpGroupDesc{Type: 4, GroupId: 40, Buckets: nil}},
+ {Desc: &ofp.OfpGroupDesc{Type: 44, GroupId: 40, Buckets: nil}},
+ {Desc: &ofp.OfpGroupDesc{Type: 5, GroupId: 50, Buckets: nil}},
}
- expectedNewGroups := []*ofp.OfpGroupEntry{
+ err = da.addFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, newGroups, &voltha.FlowMetadata{})
+ assert.Nil(t, err)
+ daGroups = changeToGroupList(da.listDeviceGroups())
+ assert.True(t, isGroupSliceEqual(expectedGroups, daGroups))
+
+ //Modify Group
+ updtGroups := []*ofp.OfpGroupEntry{
+ {Desc: &ofp.OfpGroupDesc{Type: 33, GroupId: 30, Buckets: nil}},
+ }
+ expectedGroups = []*ofp.OfpGroupEntry{
{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
- {Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: []*ofp.OfpBucket{{WatchPort: 10}}}},
- }
- expectedGroupsToDelete := []*ofp.OfpGroupEntry{
{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
+ {Desc: &ofp.OfpGroupDesc{Type: 33, GroupId: 30, Buckets: nil}},
+ {Desc: &ofp.OfpGroupDesc{Type: 44, GroupId: 40, Buckets: nil}},
+ {Desc: &ofp.OfpGroupDesc{Type: 5, GroupId: 50, Buckets: nil}},
}
- expectedUpdatedAllGroups := []*ofp.OfpGroupEntry{
+ err = da.updateFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, updtGroups, &voltha.FlowMetadata{})
+ assert.Nil(t, err)
+ daGroups = changeToGroupList(da.listDeviceGroups())
+ assert.True(t, isGroupSliceEqual(expectedGroups, daGroups))
+
+ //Delete Group
+ delGroups := []*ofp.OfpGroupEntry{
+ {Desc: &ofp.OfpGroupDesc{Type: 33, GroupId: 30, Buckets: nil}},
+ }
+ expectedGroups = []*ofp.OfpGroupEntry{
{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
- {Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: []*ofp.OfpBucket{{WatchPort: 10}}}},
- {Desc: &ofp.OfpGroupDesc{Type: 3, GroupId: 30, Buckets: nil}},
+ {Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
+ {Desc: &ofp.OfpGroupDesc{Type: 44, GroupId: 40, Buckets: nil}},
+ {Desc: &ofp.OfpGroupDesc{Type: 5, GroupId: 50, Buckets: nil}},
+ }
+ err = da.deleteFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, delGroups, &voltha.FlowMetadata{})
+ assert.Nil(t, err)
+ daGroups = changeToGroupList(da.listDeviceGroups())
+ assert.True(t, isGroupSliceEqual(expectedGroups, daGroups))
+
+ //Delete Group
+ delGroups = []*ofp.OfpGroupEntry{
{Desc: &ofp.OfpGroupDesc{Type: 4, GroupId: 40, Buckets: nil}},
}
- uNG, gD, uAG := groupsToUpdateToDelete(newGroups, existingGroups)
- assert.True(t, isGroupSliceEqual(uNG, expectedNewGroups))
- assert.True(t, isGroupSliceEqual(gD, expectedGroupsToDelete))
- assert.True(t, isGroupSliceEqual(uAG, expectedUpdatedAllGroups))
+ expectedGroups = []*ofp.OfpGroupEntry{
+ {Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
+ {Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
+ {Desc: &ofp.OfpGroupDesc{Type: 5, GroupId: 50, Buckets: nil}},
+ }
+ err = da.deleteFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, delGroups, &voltha.FlowMetadata{})
+ assert.Nil(t, err)
+ daGroups = changeToGroupList(da.listDeviceGroups())
+ assert.True(t, isGroupSliceEqual(expectedGroups, daGroups))
}