[VOL-3005] Separate Flows from Device
Also some unit test functions moved to a test util class.
New loaders and Proxy implementation are applied.
Change-Id: Icf5a6f0a42a2dbaeff768fdb108f5e9b46644977
diff --git a/rw_core/core/api/common_test.go b/rw_core/core/api/common_test.go
index f9bb047..097b1c1 100644
--- a/rw_core/core/api/common_test.go
+++ b/rw_core/core/api/common_test.go
@@ -18,24 +18,12 @@
import (
"context"
"fmt"
- "strconv"
"time"
"github.com/golang/protobuf/ptypes/empty"
"github.com/google/uuid"
- "github.com/opencord/voltha-go/rw_core/config"
- cm "github.com/opencord/voltha-go/rw_core/mocks"
- "github.com/opencord/voltha-lib-go/v3/pkg/adapters"
- com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
- "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
- "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v3/pkg/log"
- mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
"github.com/opencord/voltha-protos/v3/go/voltha"
- "github.com/phayes/freeport"
- "google.golang.org/grpc/codes"
"google.golang.org/grpc/metadata"
- "google.golang.org/grpc/status"
)
const (
@@ -43,11 +31,6 @@
retryInterval = 50 * time.Millisecond
)
-const (
- OltAdapter = iota
- OnuAdapter
-)
-
var (
coreInCompeteMode bool
)
@@ -74,67 +57,6 @@
return context.Background()
}
-//startEmbeddedEtcdServer creates and starts an Embedded etcd server locally.
-func startEmbeddedEtcdServer(configName, storageDir, logLevel string) (*mock_etcd.EtcdServer, int, error) {
- kvClientPort, err := freeport.GetFreePort()
- if err != nil {
- return nil, 0, err
- }
- peerPort, err := freeport.GetFreePort()
- if err != nil {
- return nil, 0, err
- }
- etcdServer := mock_etcd.StartEtcdServer(mock_etcd.MKConfig(configName, kvClientPort, peerPort, storageDir, logLevel))
- if etcdServer == nil {
- return nil, 0, status.Error(codes.Internal, "Embedded server failed to start")
- }
- return etcdServer, kvClientPort, nil
-}
-
-func stopEmbeddedEtcdServer(server *mock_etcd.EtcdServer) {
- if server != nil {
- server.Stop()
- }
-}
-
-func setupKVClient(cf *config.RWCoreFlags, coreInstanceID string) kvstore.Client {
- addr := cf.KVStoreHost + ":" + strconv.Itoa(cf.KVStorePort)
- client, err := kvstore.NewEtcdClient(addr, cf.KVStoreTimeout, log.FatalLevel)
- if err != nil {
- panic("no kv client")
- }
- return client
-}
-
-func createMockAdapter(adapterType int, kafkaClient kafka.Client, coreInstanceID string, coreName string, adapterName string) (adapters.IAdapter, error) {
- var err error
- var adapter adapters.IAdapter
- adapterKafkaICProxy := kafka.NewInterContainerProxy(
- kafka.MsgClient(kafkaClient),
- kafka.DefaultTopic(&kafka.Topic{Name: adapterName}))
- adapterCoreProxy := com.NewCoreProxy(adapterKafkaICProxy, adapterName, coreName)
- var adapterReqHandler *com.RequestHandlerProxy
- switch adapterType {
- case OltAdapter:
- adapter = cm.NewOLTAdapter(adapterCoreProxy)
- case OnuAdapter:
- adapter = cm.NewONUAdapter(adapterCoreProxy)
- default:
- logger.Fatalf("invalid-adapter-type-%d", adapterType)
- }
- adapterReqHandler = com.NewRequestHandlerProxy(coreInstanceID, adapter, adapterCoreProxy)
-
- if err = adapterKafkaICProxy.Start(); err != nil {
- logger.Errorw("Failure-starting-adapter-intercontainerProxy", log.Fields{"error": err})
- return nil, err
- }
- if err = adapterKafkaICProxy.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: adapterName}, adapterReqHandler); err != nil {
- logger.Errorw("Failure-to-subscribe-onu-request-handler", log.Fields{"error": err})
- return nil, err
- }
- return adapter, nil
-}
-
func waitUntilDeviceReadiness(deviceID string,
timeout time.Duration,
verificationFunction isDeviceConditionSatisfied,
diff --git a/rw_core/core/api/grpc_nbi_handler_test.go b/rw_core/core/api/grpc_nbi_handler_test.go
index 634a286..543a19d 100755
--- a/rw_core/core/api/grpc_nbi_handler_test.go
+++ b/rw_core/core/api/grpc_nbi_handler_test.go
@@ -35,13 +35,12 @@
"github.com/opencord/voltha-go/rw_core/core/adapter"
"github.com/opencord/voltha-go/rw_core/core/device"
cm "github.com/opencord/voltha-go/rw_core/mocks"
+ tst "github.com/opencord/voltha-go/rw_core/test"
"github.com/opencord/voltha-lib-go/v3/pkg/db"
"github.com/opencord/voltha-lib-go/v3/pkg/flows"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v3/pkg/log"
mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
mock_kafka "github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka"
- "github.com/opencord/voltha-lib-go/v3/pkg/version"
ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
"github.com/opencord/voltha-protos/v3/go/voltha"
"github.com/phayes/freeport"
@@ -50,10 +49,6 @@
"google.golang.org/grpc/status"
)
-const (
- coreName = "rw_core"
-)
-
type NBTest struct {
etcdServer *mock_etcd.EtcdServer
deviceMgr *device.Manager
@@ -77,7 +72,7 @@
test := &NBTest{}
// Start the embedded etcd server
var err error
- test.etcdServer, test.kvClientPort, err = startEmbeddedEtcdServer("voltha.rwcore.nb.test", "voltha.rwcore.nb.etcd", "error")
+ test.etcdServer, test.kvClientPort, err = tst.StartEmbeddedEtcdServer("voltha.rwcore.nb.test", "voltha.rwcore.nb.etcd", "error")
if err != nil {
logger.Fatal(err)
}
@@ -107,7 +102,7 @@
cfg.GrpcPort = grpcPort
cfg.GrpcHost = "127.0.0.1"
setCoreCompeteMode(inCompeteMode)
- client := setupKVClient(cfg, nb.coreInstanceID)
+ client := tst.SetupKVClient(cfg, nb.coreInstanceID)
backend := &db.Backend{
Client: client,
StoreType: cfg.KVStoreType,
@@ -141,58 +136,6 @@
}
}
-func (nb *NBTest) createAndregisterAdapters(t *testing.T) {
- // Setup the mock OLT adapter
- oltAdapter, err := createMockAdapter(OltAdapter, nb.kClient, nb.coreInstanceID, coreName, nb.oltAdapterName)
- if err != nil {
- logger.Fatalw("setting-mock-olt-adapter-failed", log.Fields{"error": err})
- }
- nb.oltAdapter = (oltAdapter).(*cm.OLTAdapter)
- nb.numONUPerOLT = nb.oltAdapter.GetNumONUPerOLT()
- nb.startingUNIPortNo = nb.oltAdapter.GetStartingUNIPortNo()
-
- // Register the adapter
- registrationData := &voltha.Adapter{
- Id: nb.oltAdapterName,
- Vendor: "Voltha-olt",
- Version: version.VersionInfo.Version,
- Type: nb.oltAdapterName,
- CurrentReplica: 1,
- TotalReplicas: 1,
- Endpoint: nb.oltAdapterName,
- }
- types := []*voltha.DeviceType{{Id: nb.oltAdapterName, Adapter: nb.oltAdapterName, AcceptsAddRemoveFlowUpdates: true}}
- deviceTypes := &voltha.DeviceTypes{Items: types}
- if _, err := nb.adapterMgr.RegisterAdapter(registrationData, deviceTypes); err != nil {
- logger.Errorw("failed-to-register-adapter", log.Fields{"error": err})
- assert.NotNil(t, err)
- }
-
- // Setup the mock ONU adapter
- onuAdapter, err := createMockAdapter(OnuAdapter, nb.kClient, nb.coreInstanceID, coreName, nb.onuAdapterName)
- if err != nil {
- logger.Fatalw("setting-mock-onu-adapter-failed", log.Fields{"error": err})
- }
- nb.onuAdapter = (onuAdapter).(*cm.ONUAdapter)
-
- // Register the adapter
- registrationData = &voltha.Adapter{
- Id: nb.onuAdapterName,
- Vendor: "Voltha-onu",
- Version: version.VersionInfo.Version,
- Type: nb.onuAdapterName,
- CurrentReplica: 1,
- TotalReplicas: 1,
- Endpoint: nb.onuAdapterName,
- }
- types = []*voltha.DeviceType{{Id: nb.onuAdapterName, Adapter: nb.onuAdapterName, AcceptsAddRemoveFlowUpdates: true}}
- deviceTypes = &voltha.DeviceTypes{Items: types}
- if _, err := nb.adapterMgr.RegisterAdapter(registrationData, deviceTypes); err != nil {
- logger.Errorw("failed-to-register-adapter", log.Fields{"error": err})
- assert.NotNil(t, err)
- }
-}
-
func (nb *NBTest) stopAll() {
if nb.kClient != nil {
nb.kClient.Stop()
@@ -201,7 +144,7 @@
nb.kmp.Stop()
}
if nb.etcdServer != nil {
- stopEmbeddedEtcdServer(nb.etcdServer)
+ tst.StopEmbeddedEtcdServer(nb.etcdServer)
}
}
@@ -1287,7 +1230,9 @@
nb.testCoreWithoutData(t, nbi)
// Create/register the adapters
- nb.createAndregisterAdapters(t)
+ nb.oltAdapter, nb.onuAdapter = tst.CreateAndregisterAdapters(t, nb.kClient, nb.coreInstanceID, nb.oltAdapterName, nb.onuAdapterName, nb.adapterMgr)
+ nb.numONUPerOLT = nb.oltAdapter.GetNumONUPerOLT()
+ nb.startingUNIPortNo = nb.oltAdapter.GetStartingUNIPortNo()
// 2. Test adapter registration
nb.testAdapterRegistration(t, nbi)
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 918432f..fe379d6 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -22,18 +22,21 @@
"errors"
"fmt"
"reflect"
+ "strconv"
"sync"
"time"
"github.com/golang/protobuf/ptypes"
"github.com/opencord/voltha-go/rw_core/core/adapter"
+ "github.com/opencord/voltha-go/rw_core/core/device/flow"
+ "github.com/opencord/voltha-go/rw_core/core/device/group"
"github.com/opencord/voltha-go/rw_core/core/device/remote"
+ fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/db/model"
coreutils "github.com/opencord/voltha-go/rw_core/utils"
- fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
ic "github.com/opencord/voltha-protos/v3/go/inter_container"
ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
@@ -59,10 +62,13 @@
startOnce sync.Once
stopOnce sync.Once
stopped bool
+
+ flowLoader *flow.Loader
+ groupLoader *group.Loader
}
//newAgent creates a new device agent. The device will be initialized when start() is called.
-func newAgent(ap *remote.AdapterProxy, device *voltha.Device, deviceMgr *Manager, deviceProxy *model.Proxy, timeout time.Duration) *Agent {
+func newAgent(ap *remote.AdapterProxy, device *voltha.Device, deviceMgr *Manager, dbProxy *model.Path, deviceProxy *model.Proxy, timeout time.Duration) *Agent {
var agent Agent
agent.adapterProxy = ap
if device.Id == "" {
@@ -81,6 +87,9 @@
agent.defaultTimeout = timeout
agent.device = proto.Clone(device).(*voltha.Device)
agent.requestQueue = coreutils.NewRequestQueue()
+ agent.flowLoader = flow.NewLoader(dbProxy.SubPath("flows").Proxy(device.Id))
+ agent.groupLoader = group.NewLoader(dbProxy.SubPath("groups").Proxy(device.Id))
+
return &agent
}
@@ -114,6 +123,9 @@
agent.deviceType = device.Adapter
agent.device = proto.Clone(device).(*voltha.Device)
+ // load the flows and groups from KV to cache
+ agent.flowLoader.Load(ctx)
+ agent.groupLoader.Load(ctx)
logger.Infow("device-loaded-from-dB", log.Fields{"device-id": agent.deviceID})
} else {
@@ -138,7 +150,6 @@
}
agent.device = device
}
-
startSucceeded = true
logger.Debugw("device-agent-started", log.Fields{"device-id": agent.deviceID})
@@ -191,6 +202,8 @@
agent.deviceType = device.Adapter
agent.device = device
+ agent.flowLoader.Load(ctx)
+ agent.groupLoader.Load(ctx)
logger.Debugw("reconciled-device-agent-devicetype", log.Fields{"device-id": agent.deviceID, "type": agent.deviceType})
}
@@ -317,6 +330,15 @@
}
}
+//replaceFlowInList removes the old flow from list and adds the new one.
+func replaceFlowInList(flowList []*ofp.OfpFlowStats, oldFlow *ofp.OfpFlowStats, newFlow *ofp.OfpFlowStats) []*ofp.OfpFlowStats {
+ if idx := fu.FindFlows(flowList, oldFlow); idx != -1 {
+ flowList = deleteFlowWithoutPreservingOrder(flowList, idx)
+ }
+ flowList = append(flowList, newFlow)
+ return flowList
+}
+
//deleteFlowWithoutPreservingOrder removes a flow specified by index from the flows slice. This function will
//panic if the index is out of range.
func deleteFlowWithoutPreservingOrder(flows []*ofp.OfpFlowStats, index int) []*ofp.OfpFlowStats {
@@ -325,6 +347,15 @@
return flows[:len(flows)-1]
}
+//replaceGroupInList removes the old group from list and adds the new one.
+func replaceGroupInList(groupList []*ofp.OfpGroupEntry, oldGroup *ofp.OfpGroupEntry, newGroup *ofp.OfpGroupEntry) []*ofp.OfpGroupEntry {
+ if idx := fu.FindGroup(groupList, oldGroup.Desc.GroupId); idx != -1 {
+ groupList = deleteGroupWithoutPreservingOrder(groupList, idx)
+ }
+ groupList = append(groupList, newGroup)
+ return groupList
+}
+
//deleteGroupWithoutPreservingOrder removes a group specified by index from the groups slice. This function will
//panic if the index is out of range.
func deleteGroupWithoutPreservingOrder(groups []*ofp.OfpGroupEntry, index int) []*ofp.OfpGroupEntry {
@@ -333,100 +364,70 @@
return groups[:len(groups)-1]
}
-func flowsToUpdateToDelete(newFlows, existingFlows []*ofp.OfpFlowStats) (updatedNewFlows, flowsToDelete, updatedAllFlows []*ofp.OfpFlowStats) {
- // Process flows
- for _, flow := range existingFlows {
- if idx := fu.FindFlows(newFlows, flow); idx == -1 {
- updatedAllFlows = append(updatedAllFlows, flow)
- } else {
- // We have a matching flow (i.e. the following field matches: "TableId", "Priority", "Flags", "Cookie",
- // "Match". If this is an exact match (i.e. all other fields matches as well) then this flow will be
- // ignored. Otherwise, the previous flow will be deleted and the new one added
- if proto.Equal(newFlows[idx], flow) {
- // Flow already exist, remove it from the new flows but keep it in the updated flows slice
- newFlows = deleteFlowWithoutPreservingOrder(newFlows, idx)
- updatedAllFlows = append(updatedAllFlows, flow)
- } else {
- // Minor change to flow, delete old and add new one
- flowsToDelete = append(flowsToDelete, flow)
- }
- }
- }
- updatedAllFlows = append(updatedAllFlows, newFlows...)
- return newFlows, flowsToDelete, updatedAllFlows
-}
+func (agent *Agent) addFlowsToAdapter(ctx context.Context, newFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+ logger.Debugw("add-flows-to-adapters", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
-func groupsToUpdateToDelete(newGroups, existingGroups []*ofp.OfpGroupEntry) (updatedNewGroups, groupsToDelete, updatedAllGroups []*ofp.OfpGroupEntry) {
- for _, group := range existingGroups {
- if idx := fu.FindGroup(newGroups, group.Desc.GroupId); idx == -1 { // does not exist now
- updatedAllGroups = append(updatedAllGroups, group)
- } else {
- // Follow same logic as flows
- if proto.Equal(newGroups[idx], group) {
- // Group already exist, remove it from the new groups
- newGroups = deleteGroupWithoutPreservingOrder(newGroups, idx)
- updatedAllGroups = append(updatedAllGroups, group)
- } else {
- // Minor change to group, delete old and add new one
- groupsToDelete = append(groupsToDelete, group)
- }
- }
- }
- updatedAllGroups = append(updatedAllGroups, newGroups...)
- return newGroups, groupsToDelete, updatedAllGroups
-}
-
-func (agent *Agent) addFlowsAndGroupsToAdapter(ctx context.Context, newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
- logger.Debugw("add-flows-groups-to-adapters", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "groups": newGroups, "flow-metadata": flowMetadata})
-
- if (len(newFlows) | len(newGroups)) == 0 {
- logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "groups": newGroups})
+ if (len(newFlows)) == 0 {
+ logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
return coreutils.DoneResponse(), nil
}
-
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return coreutils.DoneResponse(), err
- }
- defer agent.requestQueue.RequestComplete()
-
device := agent.getDeviceWithoutLock()
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
}
+ updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
+ if !dType.AcceptsAddRemoveFlowUpdates {
+ flowIDs := agent.flowLoader.List()
+ for flowID := range flowIDs {
+ if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+ updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
+ flowHandle.Unlock()
+ }
+ }
+ }
+ flowsToAdd := make([]*ofp.OfpFlowStats, 0)
+ flowsToDelete := make([]*ofp.OfpFlowStats, 0)
+ for _, flow := range newFlows {
+ flowHandle, created, err := agent.flowLoader.LockOrCreate(ctx, flow)
+ if err != nil {
+ return coreutils.DoneResponse(), err
+ }
- existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
- existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
+ if created {
+ flowsToAdd = append(flowsToAdd, flow)
+ updatedAllFlows = append(updatedAllFlows, flow)
+ } else {
+ flowToReplace := flowHandle.GetReadOnly()
+ if !proto.Equal(flowToReplace, flow) {
+ //Flow needs to be updated.
+ if err := flowHandle.Update(ctx, flow); err != nil {
+ flowHandle.Unlock()
+ return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-flow-%d-to-device-%s", flow.Id, agent.deviceID)
+ }
+ flowsToDelete = append(flowsToDelete, flowToReplace)
+ flowsToAdd = append(flowsToAdd, flow)
+ updatedAllFlows = replaceFlowInList(updatedAllFlows, flowToReplace, flow)
+ } else {
+ //No need to change the flow. It is already exist.
+ logger.Debugw("No-need-to-change-already-existing-flow", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "flow-metadata": flowMetadata})
+ }
+ }
- // Process flows
- newFlows, flowsToDelete, updatedAllFlows := flowsToUpdateToDelete(newFlows, existingFlows.Items)
-
- // Process groups
- newGroups, groupsToDelete, updatedAllGroups := groupsToUpdateToDelete(newGroups, existingGroups.Items)
-
- // Sanity check
- if (len(updatedAllFlows) | len(flowsToDelete) | len(updatedAllGroups) | len(groupsToDelete)) == 0 {
- logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "groups": newGroups})
- return coreutils.DoneResponse(), nil
+ flowHandle.Unlock()
}
- // store the changed data
- device.Flows = &voltha.Flows{Items: updatedAllFlows}
- device.FlowGroups = &voltha.FlowGroups{Items: updatedAllGroups}
- if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
- return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-device-%s", agent.deviceID)
+ // Sanity check
+ if (len(flowsToAdd)) == 0 {
+ logger.Debugw("no-flows-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
+ return coreutils.DoneResponse(), nil
}
// Send update to adapters
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
response := coreutils.NewResponse()
if !dType.AcceptsAddRemoveFlowUpdates {
- if len(updatedAllGroups) != 0 && reflect.DeepEqual(existingGroups.Items, updatedAllGroups) && len(updatedAllFlows) != 0 && reflect.DeepEqual(existingFlows.Items, updatedAllFlows) {
- logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows, "groups": newGroups})
- cancel()
- return coreutils.DoneResponse(), nil
- }
- rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
+ rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &ofp.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}}, flowMetadata)
if err != nil {
cancel()
return coreutils.DoneResponse(), err
@@ -434,11 +435,102 @@
go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
} else {
flowChanges := &ofp.FlowChanges{
- ToAdd: &voltha.Flows{Items: newFlows},
+ ToAdd: &voltha.Flows{Items: flowsToAdd},
ToRemove: &voltha.Flows{Items: flowsToDelete},
}
groupChanges := &ofp.FlowGroupChanges{
- ToAdd: &voltha.FlowGroups{Items: newGroups},
+ ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+ ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+ ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+ }
+ rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+ if err != nil {
+ cancel()
+ return coreutils.DoneResponse(), err
+ }
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ }
+ return response, nil
+}
+
+func (agent *Agent) addGroupsToAdapter(ctx context.Context, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+ logger.Debugw("add-groups-to-adapters", log.Fields{"device-id": agent.deviceID, "groups": newGroups, "flow-metadata": flowMetadata})
+
+ if (len(newGroups)) == 0 {
+ logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": newGroups})
+ return coreutils.DoneResponse(), nil
+ }
+
+ device := agent.getDeviceWithoutLock()
+ dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+ if err != nil {
+ return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+ }
+ updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
+ if !dType.AcceptsAddRemoveFlowUpdates {
+ groupIDs := agent.groupLoader.List()
+ for groupID := range groupIDs {
+ if grpHandle, have := agent.groupLoader.Lock(groupID); have {
+ updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
+ grpHandle.Unlock()
+ }
+ }
+ }
+
+ groupsToAdd := make([]*ofp.OfpGroupEntry, 0)
+ groupsToDelete := make([]*ofp.OfpGroupEntry, 0)
+ for _, group := range newGroups {
+
+ groupHandle, created, err := agent.groupLoader.LockOrCreate(ctx, group)
+ if err != nil {
+ return coreutils.DoneResponse(), err
+ }
+
+ if created {
+ groupsToAdd = append(groupsToAdd, group)
+ updatedAllGroups = append(updatedAllGroups, group)
+ } else {
+ groupToChange := groupHandle.GetReadOnly()
+ if !proto.Equal(groupToChange, group) {
+ //Group needs to be updated.
+ if err := groupHandle.Update(ctx, group); err != nil {
+ groupHandle.Unlock()
+ return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-group-%s-to-device-%s", strconv.Itoa(int(group.Desc.GroupId)), agent.deviceID)
+ }
+ groupsToDelete = append(groupsToDelete, groupToChange)
+ groupsToAdd = append(groupsToAdd, group)
+ updatedAllGroups = replaceGroupInList(updatedAllGroups, groupToChange, group)
+ } else {
+ //No need to change the group. It is already exist.
+ logger.Debugw("No-need-to-change-already-existing-group", log.Fields{"device-id": agent.deviceID, "group": newGroups, "flow-metadata": flowMetadata})
+ }
+ }
+
+ groupHandle.Unlock()
+ }
+ // Sanity check
+ if (len(groupsToAdd)) == 0 {
+ logger.Debugw("no-groups-to-update", log.Fields{"device-id": agent.deviceID, "groups": newGroups})
+ return coreutils.DoneResponse(), nil
+ }
+
+ // Send update to adapters
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ response := coreutils.NewResponse()
+ if !dType.AcceptsAddRemoveFlowUpdates {
+ rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
+ if err != nil {
+ cancel()
+ return coreutils.DoneResponse(), err
+ }
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ } else {
+ flowChanges := &ofp.FlowChanges{
+ ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+ ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+ }
+ groupChanges := &ofp.FlowGroupChanges{
+ ToAdd: &voltha.FlowGroups{Items: groupsToAdd},
ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
}
@@ -455,88 +547,66 @@
//addFlowsAndGroups adds the "newFlows" and "newGroups" from the existing flows/groups and sends the update to the
//adapters
func (agent *Agent) addFlowsAndGroups(ctx context.Context, newFlows []*ofp.OfpFlowStats, newGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
- response, err := agent.addFlowsAndGroupsToAdapter(ctx, newFlows, newGroups, flowMetadata)
- if err != nil {
+ var flwResponse, grpResponse coreutils.Response
+ var err error
+ //if new flow list is empty then the called function returns quickly
+ if flwResponse, err = agent.addFlowsToAdapter(ctx, newFlows, flowMetadata); err != nil {
return err
}
- if errs := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); errs != nil {
+ //if new group list is empty then the called function returns quickly
+ if grpResponse, err = agent.addGroupsToAdapter(ctx, newGroups, flowMetadata); err != nil {
+ return err
+ }
+ if errs := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, flwResponse, grpResponse); errs != nil {
logger.Warnw("no-adapter-response", log.Fields{"device-id": agent.deviceID, "result": errs})
return status.Errorf(codes.Aborted, "flow-failure-device-%s", agent.deviceID)
}
return nil
}
-func (agent *Agent) deleteFlowsAndGroupsFromAdapter(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
- logger.Debugw("delete-flows-groups-from-adapter", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel, "groups": groupsToDel})
+func (agent *Agent) deleteFlowsFromAdapter(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+ logger.Debugw("delete-flows-from-adapter", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
- if (len(flowsToDel) | len(groupsToDel)) == 0 {
- logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel, "groups": groupsToDel})
+ if (len(flowsToDel)) == 0 {
+ logger.Debugw("nothing-to-delete", log.Fields{"device-id": agent.deviceID, "flows": flowsToDel})
return coreutils.DoneResponse(), nil
}
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return coreutils.DoneResponse(), err
- }
- defer agent.requestQueue.RequestComplete()
-
device := agent.getDeviceWithoutLock()
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
}
-
- existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
- existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
-
- var flowsToKeep []*ofp.OfpFlowStats
- var groupsToKeep []*ofp.OfpGroupEntry
-
- // Process flows
- for _, flow := range existingFlows.Items {
- if idx := fu.FindFlows(flowsToDel, flow); idx == -1 {
- flowsToKeep = append(flowsToKeep, flow)
+ updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
+ if !dType.AcceptsAddRemoveFlowUpdates {
+ flowIDs := agent.flowLoader.List()
+ for flowID := range flowIDs {
+ if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+ updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
+ flowHandle.Unlock()
+ }
}
}
-
- // Process groups
- for _, group := range existingGroups.Items {
- if fu.FindGroup(groupsToDel, group.Desc.GroupId) == -1 { // does not exist now
- groupsToKeep = append(groupsToKeep, group)
+ for _, flow := range flowsToDel {
+ if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
+ // Update the store and cache
+ flowToDelete := flowHandle.GetReadOnly()
+ if err := flowHandle.Delete(ctx); err != nil {
+ flowHandle.Unlock()
+ return coreutils.DoneResponse(), err
+ }
+ if idx := fu.FindFlows(updatedAllFlows, flowToDelete); idx != -1 {
+ updatedAllFlows = deleteFlowWithoutPreservingOrder(updatedAllFlows, idx)
+ }
+ flowHandle.Unlock()
}
}
- logger.Debugw("deleteFlowsAndGroups",
- log.Fields{
- "device-id": agent.deviceID,
- "flows-to-del": len(flowsToDel),
- "flows-to-keep": len(flowsToKeep),
- "groups-to-del": len(groupsToDel),
- "groups-to-keep": len(groupsToKeep),
- })
-
- // Sanity check
- if (len(flowsToKeep) | len(flowsToDel) | len(groupsToKeep) | len(groupsToDel)) == 0 {
- logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows-to-del": flowsToDel, "groups-to-del": groupsToDel})
- return coreutils.DoneResponse(), nil
- }
-
- // store the changed data
- device.Flows = &voltha.Flows{Items: flowsToKeep}
- device.FlowGroups = &voltha.FlowGroups{Items: groupsToKeep}
- if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
- return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
- }
-
// Send update to adapters
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
response := coreutils.NewResponse()
if !dType.AcceptsAddRemoveFlowUpdates {
- if len(groupsToKeep) != 0 && reflect.DeepEqual(existingGroups.Items, groupsToKeep) && len(flowsToKeep) != 0 && reflect.DeepEqual(existingFlows.Items, flowsToKeep) {
- logger.Debugw("nothing-to-update", log.Fields{"deviceId": agent.deviceID, "flowsToDel": flowsToDel, "groupsToDel": groupsToDel})
- cancel()
- return coreutils.DoneResponse(), nil
- }
- rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: flowsToKeep}, &voltha.FlowGroups{Items: groupsToKeep}, flowMetadata)
+ rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}}, flowMetadata)
if err != nil {
cancel()
return coreutils.DoneResponse(), err
@@ -549,6 +619,73 @@
}
groupChanges := &ofp.FlowGroupChanges{
ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+ ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+ ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+ }
+ rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+ if err != nil {
+ cancel()
+ return coreutils.DoneResponse(), err
+ }
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ }
+ return response, nil
+}
+
+func (agent *Agent) deleteGroupsFromAdapter(ctx context.Context, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+ logger.Debugw("delete-groups-from-adapter", log.Fields{"device-id": agent.deviceID, "groups": groupsToDel})
+
+ if (len(groupsToDel)) == 0 {
+ logger.Debugw("nothing-to-delete", log.Fields{"device-id": agent.deviceID})
+ return coreutils.DoneResponse(), nil
+ }
+ device := agent.getDeviceWithoutLock()
+ dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+ if err != nil {
+ return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+ }
+ updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
+ if !dType.AcceptsAddRemoveFlowUpdates {
+ groupIDs := agent.groupLoader.List()
+ for groupID := range groupIDs {
+ if grpHandle, have := agent.groupLoader.Lock(groupID); have {
+ updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
+ grpHandle.Unlock()
+ }
+ }
+ }
+
+ for _, group := range groupsToDel {
+ if groupHandle, have := agent.groupLoader.Lock(group.Desc.GroupId); have {
+ // Update the store and cache
+ if err := groupHandle.Delete(ctx); err != nil {
+ groupHandle.Unlock()
+ return coreutils.DoneResponse(), err
+ }
+ if idx := fu.FindGroup(updatedAllGroups, group.Desc.GroupId); idx != -1 {
+ updatedAllGroups = deleteGroupWithoutPreservingOrder(updatedAllGroups, idx)
+ }
+ groupHandle.Unlock()
+ }
+ }
+
+ // Send update to adapters
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ response := coreutils.NewResponse()
+ if !dType.AcceptsAddRemoveFlowUpdates {
+ rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, flowMetadata)
+ if err != nil {
+ cancel()
+ return coreutils.DoneResponse(), err
+ }
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ } else {
+ flowChanges := &ofp.FlowChanges{
+ ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+ ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+ }
+ groupChanges := &ofp.FlowGroupChanges{
+ ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
ToRemove: &voltha.FlowGroups{Items: groupsToDel},
ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
}
@@ -565,11 +702,16 @@
//deleteFlowsAndGroups removes the "flowsToDel" and "groupsToDel" from the existing flows/groups and sends the update to the
//adapters
func (agent *Agent) deleteFlowsAndGroups(ctx context.Context, flowsToDel []*ofp.OfpFlowStats, groupsToDel []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
- response, err := agent.deleteFlowsAndGroupsFromAdapter(ctx, flowsToDel, groupsToDel, flowMetadata)
- if err != nil {
+ var flwResponse, grpResponse coreutils.Response
+ var err error
+ if flwResponse, err = agent.deleteFlowsFromAdapter(ctx, flowsToDel, flowMetadata); err != nil {
return err
}
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
+ if grpResponse, err = agent.deleteGroupsFromAdapter(ctx, groupsToDel, flowMetadata); err != nil {
+ return err
+ }
+
+ if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, flwResponse, grpResponse); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
return nil
@@ -577,25 +719,24 @@
//filterOutFlows removes flows from a device using the uni-port as filter
func (agent *Agent) filterOutFlows(ctx context.Context, uniPort uint32, flowMetadata *voltha.FlowMetadata) error {
- device, err := agent.getDevice(ctx)
- if err != nil {
- return err
- }
- existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
var flowsToDelete []*ofp.OfpFlowStats
-
// If an existing flow has the uniPort as an InPort or OutPort or as a Tunnel ID then it needs to be removed
- for _, flow := range existingFlows.Items {
- if fu.GetInPort(flow) == uniPort || fu.GetOutPort(flow) == uniPort || fu.GetTunnelId(flow) == uint64(uniPort) {
- flowsToDelete = append(flowsToDelete, flow)
+ for flowID := range agent.flowLoader.List() {
+ if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+ flow := flowHandle.GetReadOnly()
+ if flow != nil && (fu.GetInPort(flow) == uniPort || fu.GetOutPort(flow) == uniPort || fu.GetTunnelId(flow) == uint64(uniPort)) {
+ flowsToDelete = append(flowsToDelete, flow)
+ }
+ flowHandle.Unlock()
}
}
+
logger.Debugw("flows-to-delete", log.Fields{"device-id": agent.deviceID, "uni-port": uniPort, "flows": flowsToDelete})
if len(flowsToDelete) == 0 {
return nil
}
- response, err := agent.deleteFlowsAndGroupsFromAdapter(ctx, flowsToDelete, []*ofp.OfpGroupEntry{}, flowMetadata)
+ response, err := agent.deleteFlowsFromAdapter(ctx, flowsToDelete, flowMetadata)
if err != nil {
return err
}
@@ -605,19 +746,14 @@
return nil
}
-func (agent *Agent) updateFlowsAndGroupsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
- logger.Debugw("updateFlowsAndGroups", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
+func (agent *Agent) updateFlowsToAdapter(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+ logger.Debugw("updateFlowsToAdapter", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
- if (len(updatedFlows) | len(updatedGroups)) == 0 {
- logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
+ if (len(updatedFlows)) == 0 {
+ logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
return coreutils.DoneResponse(), nil
}
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return coreutils.DoneResponse(), err
- }
- defer agent.requestQueue.RequestComplete()
-
device := agent.getDeviceWithoutLock()
if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states")
@@ -626,81 +762,55 @@
if err != nil {
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
}
-
- existingFlows := proto.Clone(device.Flows).(*voltha.Flows)
- existingGroups := proto.Clone(device.FlowGroups).(*ofp.FlowGroups)
-
- if len(updatedGroups) != 0 && reflect.DeepEqual(existingGroups.Items, updatedGroups) && len(updatedFlows) != 0 && reflect.DeepEqual(existingFlows.Items, updatedFlows) {
- logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
- return coreutils.DoneResponse(), nil
+ updatedAllFlows := make([]*ofp.OfpFlowStats, 0)
+ if !dType.AcceptsAddRemoveFlowUpdates {
+ flowIDs := agent.flowLoader.List()
+ for flowID := range flowIDs {
+ if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+ updatedAllFlows = append(updatedAllFlows, flowHandle.GetReadOnly())
+ flowHandle.Unlock()
+ }
+ }
}
+ flowsToAdd := make([]*ofp.OfpFlowStats, 0)
+ flowsToDelete := make([]*ofp.OfpFlowStats, 0)
- logger.Debugw("updating-flows-and-groups",
- log.Fields{
- "device-id": agent.deviceID,
- "updated-flows": updatedFlows,
- "updated-groups": updatedGroups,
- })
+ for _, flow := range updatedFlows {
+ if flowHandle, have := agent.flowLoader.Lock(flow.Id); have {
+ flowToDelete := flowHandle.GetReadOnly()
+ // Update the store and cache
+ if err := flowHandle.Update(ctx, flow); err != nil {
+ flowHandle.Unlock()
+ return coreutils.DoneResponse(), err
+ }
- // store the updated data
- device.Flows = &voltha.Flows{Items: updatedFlows}
- device.FlowGroups = &voltha.FlowGroups{Items: updatedGroups}
- if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
- return coreutils.DoneResponse(), status.Errorf(codes.Internal, "failure-updating-%s", agent.deviceID)
+ flowsToDelete = append(flowsToDelete, flowToDelete)
+ flowsToAdd = append(flowsToAdd, flow)
+ updatedAllFlows = replaceFlowInList(updatedAllFlows, flowToDelete, flow)
+ flowHandle.Unlock()
+ }
}
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
response := coreutils.NewResponse()
// Process bulk flow update differently than incremental update
if !dType.AcceptsAddRemoveFlowUpdates {
- rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedFlows}, &voltha.FlowGroups{Items: updatedGroups}, nil)
+ rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: updatedAllFlows}, &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}}, nil)
if err != nil {
cancel()
return coreutils.DoneResponse(), err
}
go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
} else {
- var flowsToAdd []*ofp.OfpFlowStats
- var flowsToDelete []*ofp.OfpFlowStats
- var groupsToAdd []*ofp.OfpGroupEntry
- var groupsToDelete []*ofp.OfpGroupEntry
-
- // Process flows
- for _, flow := range updatedFlows {
- if idx := fu.FindFlows(existingFlows.Items, flow); idx == -1 {
- flowsToAdd = append(flowsToAdd, flow)
- }
- }
- for _, flow := range existingFlows.Items {
- if idx := fu.FindFlows(updatedFlows, flow); idx != -1 {
- flowsToDelete = append(flowsToDelete, flow)
- }
- }
-
- // Process groups
- for _, g := range updatedGroups {
- if fu.FindGroup(existingGroups.Items, g.Desc.GroupId) == -1 { // does not exist now
- groupsToAdd = append(groupsToAdd, g)
- }
- }
- for _, group := range existingGroups.Items {
- if fu.FindGroup(updatedGroups, group.Desc.GroupId) != -1 { // does not exist now
- groupsToDelete = append(groupsToDelete, group)
- }
- }
-
logger.Debugw("updating-flows-and-groups",
log.Fields{
- "device-id": agent.deviceID,
- "flows-to-add": flowsToAdd,
- "flows-to-delete": flowsToDelete,
- "groups-to-add": groupsToAdd,
- "groups-to-delete": groupsToDelete,
+ "device-id": agent.deviceID,
+ "flows-to-add": flowsToAdd,
+ "flows-to-delete": flowsToDelete,
})
-
// Sanity check
- if (len(flowsToAdd) | len(flowsToDelete) | len(groupsToAdd) | len(groupsToDelete) | len(updatedGroups)) == 0 {
- logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows, "groups": updatedGroups})
+ if (len(flowsToAdd) | len(flowsToDelete)) == 0 {
+ logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": updatedFlows})
cancel()
return coreutils.DoneResponse(), nil
}
@@ -710,9 +820,94 @@
ToRemove: &voltha.Flows{Items: flowsToDelete},
}
groupChanges := &ofp.FlowGroupChanges{
- ToAdd: &voltha.FlowGroups{Items: groupsToAdd},
- ToRemove: &voltha.FlowGroups{Items: groupsToDelete},
- ToUpdate: &voltha.FlowGroups{Items: updatedGroups},
+ ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+ ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+ ToUpdate: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+ }
+ rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
+ if err != nil {
+ cancel()
+ return coreutils.DoneResponse(), err
+ }
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ }
+
+ return response, nil
+}
+
+func (agent *Agent) updateGroupsToAdapter(ctx context.Context, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) (coreutils.Response, error) {
+ logger.Debugw("updateGroupsToAdapter", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
+
+ if (len(updatedGroups)) == 0 {
+ logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": updatedGroups})
+ return coreutils.DoneResponse(), nil
+ }
+
+ device := agent.getDeviceWithoutLock()
+ if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
+ return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states-oper-%s-connect-%s-admin-%s", device.OperStatus, device.ConnectStatus, device.AdminState)
+ }
+ dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
+ if err != nil {
+ return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
+ }
+ updatedAllGroups := make([]*ofp.OfpGroupEntry, 0)
+ if !dType.AcceptsAddRemoveFlowUpdates {
+ groupIDs := agent.groupLoader.List()
+ for groupID := range groupIDs {
+ if grpHandle, have := agent.groupLoader.Lock(groupID); have {
+ updatedAllGroups = append(updatedAllGroups, grpHandle.GetReadOnly())
+ grpHandle.Unlock()
+ }
+ }
+ }
+ groupsToUpdate := make([]*ofp.OfpGroupEntry, 0)
+
+ for _, group := range updatedGroups {
+ if groupHandle, have := agent.groupLoader.Lock(group.Desc.GroupId); have {
+ // Update the store and cache
+ if err := groupHandle.Update(ctx, group); err != nil {
+ groupHandle.Unlock()
+ return coreutils.DoneResponse(), err
+ }
+ groupsToUpdate = append(groupsToUpdate, group)
+ updatedAllGroups = replaceGroupInList(updatedAllGroups, groupHandle.GetReadOnly(), group)
+ groupHandle.Unlock()
+ }
+ }
+
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ response := coreutils.NewResponse()
+ // Process bulk flow update differently than incremental update
+ if !dType.AcceptsAddRemoveFlowUpdates {
+ rpcResponse, err := agent.adapterProxy.UpdateFlowsBulk(subCtx, device, &voltha.Flows{Items: []*ofp.OfpFlowStats{}}, &voltha.FlowGroups{Items: updatedAllGroups}, nil)
+ if err != nil {
+ cancel()
+ return coreutils.DoneResponse(), err
+ }
+ go agent.waitForAdapterFlowResponse(subCtx, cancel, rpcResponse, response)
+ } else {
+ logger.Debugw("updating-groups",
+ log.Fields{
+ "device-id": agent.deviceID,
+ "groups-to-update": groupsToUpdate,
+ })
+
+ // Sanity check
+ if (len(groupsToUpdate)) == 0 {
+ logger.Debugw("nothing-to-update", log.Fields{"device-id": agent.deviceID, "groups": groupsToUpdate})
+ cancel()
+ return coreutils.DoneResponse(), nil
+ }
+
+ flowChanges := &ofp.FlowChanges{
+ ToAdd: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+ ToRemove: &voltha.Flows{Items: []*ofp.OfpFlowStats{}},
+ }
+ groupChanges := &ofp.FlowGroupChanges{
+ ToAdd: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+ ToRemove: &voltha.FlowGroups{Items: []*ofp.OfpGroupEntry{}},
+ ToUpdate: &voltha.FlowGroups{Items: groupsToUpdate},
}
rpcResponse, err := agent.adapterProxy.UpdateFlowsIncremental(subCtx, device, flowChanges, groupChanges, flowMetadata)
if err != nil {
@@ -728,11 +923,16 @@
//updateFlowsAndGroups replaces the existing flows and groups with "updatedFlows" and "updatedGroups" respectively. It
//also sends the updates to the adapters
func (agent *Agent) updateFlowsAndGroups(ctx context.Context, updatedFlows []*ofp.OfpFlowStats, updatedGroups []*ofp.OfpGroupEntry, flowMetadata *voltha.FlowMetadata) error {
- response, err := agent.updateFlowsAndGroupsToAdapter(ctx, updatedFlows, updatedGroups, flowMetadata)
- if err != nil {
+ var flwResponse, grpResponse coreutils.Response
+ var err error
+ if flwResponse, err = agent.updateFlowsToAdapter(ctx, updatedFlows, flowMetadata); err != nil {
return err
}
- if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, response); res != nil {
+ if grpResponse, err = agent.updateGroupsToAdapter(ctx, updatedGroups, flowMetadata); err != nil {
+ return err
+ }
+
+ if res := coreutils.WaitForNilOrErrorResponses(agent.defaultTimeout, flwResponse, grpResponse); res != nil {
return status.Errorf(codes.Aborted, "errors-%s", res)
}
return nil
@@ -741,17 +941,17 @@
//deleteAllFlows deletes all flows in the device table
func (agent *Agent) deleteAllFlows(ctx context.Context) error {
logger.Debugw("deleteAllFlows", log.Fields{"deviceId": agent.deviceID})
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
- device := agent.getDeviceWithoutLock()
- // purge all flows on the device by setting it to nil
- device.Flows = &ofp.Flows{Items: nil}
- if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
- // The caller logs the error
- return err
+ for flowID := range agent.flowLoader.List() {
+ if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+ // Update the store and cache
+ if err := flowHandle.Delete(ctx); err != nil {
+ flowHandle.Unlock()
+ logger.Errorw("unable-to-delete-flow", log.Fields{"device-id": agent.deviceID, "flowID": flowID})
+ continue
+ }
+ flowHandle.Unlock()
+ }
}
return nil
}
@@ -1296,13 +1496,6 @@
return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}
-func (agent *Agent) updateDeviceWithoutLock(ctx context.Context, device *voltha.Device) error {
- logger.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
- //cloned := proto.Clone(device).(*voltha.Device)
- cloned := device
- return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
-}
-
func (agent *Agent) updateDeviceStatus(ctx context.Context, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
diff --git a/rw_core/core/device/agent_flow.go b/rw_core/core/device/agent_flow.go
new file mode 100644
index 0000000..bb8fe08
--- /dev/null
+++ b/rw_core/core/device/agent_flow.go
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package device
+
+import (
+ ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+)
+
+// listDeviceFlows returns device flows
+func (agent *Agent) listDeviceFlows() map[uint64]*ofp.OfpFlowStats {
+ flowIDs := agent.flowLoader.List()
+ flows := make(map[uint64]*ofp.OfpFlowStats, len(flowIDs))
+ for flowID := range flowIDs {
+ if flowHandle, have := agent.flowLoader.Lock(flowID); have {
+ flows[flowID] = flowHandle.GetReadOnly()
+ flowHandle.Unlock()
+ }
+ }
+ return flows
+}
diff --git a/rw_core/core/device/agent_group.go b/rw_core/core/device/agent_group.go
new file mode 100644
index 0000000..18ac83a
--- /dev/null
+++ b/rw_core/core/device/agent_group.go
@@ -0,0 +1,34 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package device
+
+import (
+ ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+)
+
+// listDeviceGroups returns logical device flow groups
+func (agent *Agent) listDeviceGroups() map[uint32]*ofp.OfpGroupEntry {
+ groupIDs := agent.groupLoader.List()
+ groups := make(map[uint32]*ofp.OfpGroupEntry, len(groupIDs))
+ for groupID := range groupIDs {
+ if groupHandle, have := agent.groupLoader.Lock(groupID); have {
+ groups[groupID] = groupHandle.GetReadOnly()
+ groupHandle.Unlock()
+ }
+ }
+ return groups
+}
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
index ffc2a3b..29d8062 100755
--- a/rw_core/core/device/agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -18,13 +18,21 @@
import (
"context"
+ "math/rand"
+ "sort"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+
"github.com/gogo/protobuf/proto"
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/rw_core/config"
"github.com/opencord/voltha-go/rw_core/core/adapter"
+ cm "github.com/opencord/voltha-go/rw_core/mocks"
+ tst "github.com/opencord/voltha-go/rw_core/test"
com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
"github.com/opencord/voltha-lib-go/v3/pkg/db"
- "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
@@ -33,24 +41,18 @@
"github.com/opencord/voltha-protos/v3/go/voltha"
"github.com/phayes/freeport"
"github.com/stretchr/testify/assert"
- "google.golang.org/grpc/codes"
- "google.golang.org/grpc/status"
- "math/rand"
- "sort"
- "strconv"
- "strings"
- "sync"
- "testing"
- "time"
)
type DATest struct {
etcdServer *mock_etcd.EtcdServer
deviceMgr *Manager
logicalDeviceMgr *LogicalManager
+ adapterMgr *adapter.Manager
kmp kafka.InterContainerProxy
kClient kafka.Client
kvClientPort int
+ oltAdapter *cm.OLTAdapter
+ onuAdapter *cm.ONUAdapter
oltAdapterName string
onuAdapterName string
coreInstanceID string
@@ -64,7 +66,7 @@
test := &DATest{}
// Start the embedded etcd server
var err error
- test.etcdServer, test.kvClientPort, err = startEmbeddedEtcdServer("voltha.rwcore.da.test", "voltha.rwcore.da.etcd", "error")
+ test.etcdServer, test.kvClientPort, err = tst.StartEmbeddedEtcdServer("voltha.rwcore.da.test", "voltha.rwcore.da.etcd", "error")
if err != nil {
logger.Fatal(err)
}
@@ -120,7 +122,7 @@
}
cfg.GrpcPort = grpcPort
cfg.GrpcHost = "127.0.0.1"
- client := setupKVClient(cfg, dat.coreInstanceID)
+ client := tst.SetupKVClient(cfg, dat.coreInstanceID)
backend := &db.Backend{
Client: client,
StoreType: cfg.KVStoreType,
@@ -138,13 +140,18 @@
endpointMgr := kafka.NewEndpointManager(backend)
proxy := model.NewDBPath(backend)
- adapterMgr := adapter.NewAdapterManager(proxy, dat.coreInstanceID, dat.kClient)
+ dat.adapterMgr = adapter.NewAdapterManager(proxy, dat.coreInstanceID, dat.kClient)
- dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, adapterMgr, dat.kmp, endpointMgr, cfg.CorePairTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout)
+ dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, dat.adapterMgr, dat.kmp, endpointMgr, cfg.CorePairTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout)
+ dat.adapterMgr.Start(context.Background())
if err = dat.kmp.Start(); err != nil {
logger.Fatal("Cannot start InterContainerProxy")
}
- adapterMgr.Start(context.Background())
+
+ if err := dat.kmp.SubscribeWithDefaultRequestHandler(kafka.Topic{Name: cfg.CorePairTopic}, kafka.OffsetNewest); err != nil {
+ logger.Fatalf("Cannot add default request handler: %s", err)
+ }
+
}
func (dat *DATest) stopAll() {
@@ -155,46 +162,14 @@
dat.kmp.Stop()
}
if dat.etcdServer != nil {
- stopEmbeddedEtcdServer(dat.etcdServer)
+ tst.StopEmbeddedEtcdServer(dat.etcdServer)
}
}
-//startEmbeddedEtcdServer creates and starts an Embedded etcd server locally.
-func startEmbeddedEtcdServer(configName, storageDir, logLevel string) (*mock_etcd.EtcdServer, int, error) {
- kvClientPort, err := freeport.GetFreePort()
- if err != nil {
- return nil, 0, err
- }
- peerPort, err := freeport.GetFreePort()
- if err != nil {
- return nil, 0, err
- }
- etcdServer := mock_etcd.StartEtcdServer(mock_etcd.MKConfig(configName, kvClientPort, peerPort, storageDir, logLevel))
- if etcdServer == nil {
- return nil, 0, status.Error(codes.Internal, "Embedded server failed to start")
- }
- return etcdServer, kvClientPort, nil
-}
-
-func stopEmbeddedEtcdServer(server *mock_etcd.EtcdServer) {
- if server != nil {
- server.Stop()
- }
-}
-
-func setupKVClient(cf *config.RWCoreFlags, coreInstanceID string) kvstore.Client {
- addr := cf.KVStoreHost + ":" + strconv.Itoa(cf.KVStorePort)
- client, err := kvstore.NewEtcdClient(addr, cf.KVStoreTimeout, log.FatalLevel)
- if err != nil {
- panic("no kv client")
- }
- return client
-}
-
func (dat *DATest) createDeviceAgent(t *testing.T) *Agent {
deviceMgr := dat.deviceMgr
clonedDevice := proto.Clone(dat.device).(*voltha.Device)
- deviceAgent := newAgent(deviceMgr.adapterProxy, clonedDevice, deviceMgr, deviceMgr.dProxy, deviceMgr.defaultTimeout)
+ deviceAgent := newAgent(deviceMgr.adapterProxy, clonedDevice, deviceMgr, deviceMgr.dbPath, deviceMgr.dProxy, deviceMgr.defaultTimeout)
d, err := deviceAgent.start(context.TODO(), clonedDevice)
assert.Nil(t, err)
assert.NotNil(t, d)
@@ -278,7 +253,7 @@
da := newDATest()
assert.NotNil(t, da)
defer da.stopAll()
-
+ log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
// Start the Core
da.startCore(false)
@@ -293,6 +268,37 @@
wg.Wait()
}
}
+func TestFlowUpdates(t *testing.T) {
+ da := newDATest()
+ assert.NotNil(t, da)
+ defer da.stopAll()
+
+ log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
+ // Start the Core
+ da.startCore(false)
+ da.oltAdapter, da.onuAdapter = tst.CreateAndregisterAdapters(t, da.kClient, da.coreInstanceID, da.oltAdapterName, da.onuAdapterName, da.adapterMgr)
+
+ a := da.createDeviceAgent(t)
+ cloned := a.getDeviceWithoutLock()
+ err := a.updateDeviceStateInStoreWithoutLock(context.Background(), cloned, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
+ assert.Nil(t, err)
+ da.testFlowAddDeletes(t, a)
+}
+
+func TestGroupUpdates(t *testing.T) {
+ da := newDATest()
+ assert.NotNil(t, da)
+ defer da.stopAll()
+
+ // Start the Core
+ da.startCore(false)
+ da.oltAdapter, da.onuAdapter = tst.CreateAndregisterAdapters(t, da.kClient, da.coreInstanceID, da.oltAdapterName, da.onuAdapterName, da.adapterMgr)
+ a := da.createDeviceAgent(t)
+ cloned := a.getDeviceWithoutLock()
+ err := a.updateDeviceStateInStoreWithoutLock(context.Background(), cloned, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
+ assert.Nil(t, err)
+ da.testGroupAddDeletes(t, a)
+}
func isFlowSliceEqual(a, b []*ofp.OfpFlowStats) bool {
if len(a) != len(b) {
@@ -329,196 +335,214 @@
}
return true
}
-
-func TestFlowsToUpdateToDelete_EmptySlices(t *testing.T) {
- newFlows := []*ofp.OfpFlowStats{}
- existingFlows := []*ofp.OfpFlowStats{}
- expectedNewFlows := []*ofp.OfpFlowStats{}
- expectedFlowsToDelete := []*ofp.OfpFlowStats{}
- expectedUpdatedAllFlows := []*ofp.OfpFlowStats{}
- uNF, fD, uAF := flowsToUpdateToDelete(newFlows, existingFlows)
- assert.True(t, isFlowSliceEqual(uNF, expectedNewFlows))
- assert.True(t, isFlowSliceEqual(fD, expectedFlowsToDelete))
- assert.True(t, isFlowSliceEqual(uAF, expectedUpdatedAllFlows))
+func changeToFlowList(flowList map[uint64]*ofp.OfpFlowStats) []*ofp.OfpFlowStats {
+ flows := make([]*ofp.OfpFlowStats, 0)
+ for _, flow := range flowList {
+ flows = append(flows, flow)
+ }
+ return flows
}
-
-func TestFlowsToUpdateToDelete_NoExistingFlows(t *testing.T) {
+func changeToGroupList(groupList map[uint32]*ofp.OfpGroupEntry) []*ofp.OfpGroupEntry {
+ groups := make([]*ofp.OfpGroupEntry, 0)
+ for _, group := range groupList {
+ groups = append(groups, group)
+ }
+ return groups
+}
+func (dat *DATest) testFlowAddDeletes(t *testing.T, da *Agent) {
+ //Add new Flows on empty list
newFlows := []*ofp.OfpFlowStats{
{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
}
- existingFlows := []*ofp.OfpFlowStats{}
- expectedNewFlows := []*ofp.OfpFlowStats{
- {Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
- {Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
- {Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
- }
- expectedFlowsToDelete := []*ofp.OfpFlowStats{}
- expectedUpdatedAllFlows := []*ofp.OfpFlowStats{
- {Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
- {Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
- {Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
- }
- uNF, fD, uAF := flowsToUpdateToDelete(newFlows, existingFlows)
- assert.True(t, isFlowSliceEqual(uNF, expectedNewFlows))
- assert.True(t, isFlowSliceEqual(fD, expectedFlowsToDelete))
- assert.True(t, isFlowSliceEqual(uAF, expectedUpdatedAllFlows))
-}
+ err := da.addFlowsAndGroups(context.Background(), newFlows, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
+ assert.Nil(t, err)
+ daFlows := changeToFlowList(da.listDeviceFlows())
+ assert.True(t, isFlowSliceEqual(newFlows, daFlows))
-func TestFlowsToUpdateToDelete_UpdateNoDelete(t *testing.T) {
- newFlows := []*ofp.OfpFlowStats{
- {Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
- {Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
- {Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
- }
- existingFlows := []*ofp.OfpFlowStats{
- {Id: 121, TableId: 1210, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1210000, PacketCount: 0},
- {Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
- {Id: 122, TableId: 1220, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1220000, PacketCount: 0},
- }
- expectedNewFlows := []*ofp.OfpFlowStats{
- {Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
- {Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
- }
- expectedFlowsToDelete := []*ofp.OfpFlowStats{}
- expectedUpdatedAllFlows := []*ofp.OfpFlowStats{
- {Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
- {Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
- {Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
- {Id: 121, TableId: 1210, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1210000, PacketCount: 0},
- {Id: 122, TableId: 1220, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1220000, PacketCount: 0},
- }
- uNF, fD, uAF := flowsToUpdateToDelete(newFlows, existingFlows)
- assert.True(t, isFlowSliceEqual(uNF, expectedNewFlows))
- assert.True(t, isFlowSliceEqual(fD, expectedFlowsToDelete))
- assert.True(t, isFlowSliceEqual(uAF, expectedUpdatedAllFlows))
-}
-
-func TestFlowsToUpdateToDelete_UpdateAndDelete(t *testing.T) {
- newFlows := []*ofp.OfpFlowStats{
- {Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 20},
- {Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
- {Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 10, Flags: 0, Cookie: 1250000, PacketCount: 0},
- {Id: 126, TableId: 1260, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
+ //Add new Flows on existing ones
+ newFlows = []*ofp.OfpFlowStats{
+ {Id: 126, TableId: 1260, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
{Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270000, PacketCount: 0},
}
- existingFlows := []*ofp.OfpFlowStats{
- {Id: 121, TableId: 1210, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1210000, PacketCount: 0},
- {Id: 122, TableId: 1220, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1220000, PacketCount: 0},
+
+ expectedFlows := []*ofp.OfpFlowStats{
+ {Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
+ {Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
+ {Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
+ {Id: 126, TableId: 1260, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
+ {Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270000, PacketCount: 0},
+ }
+
+ err = da.addFlowsAndGroups(context.Background(), newFlows, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
+ assert.Nil(t, err)
+ daFlows = changeToFlowList(da.listDeviceFlows())
+ assert.True(t, isFlowSliceEqual(expectedFlows, daFlows))
+
+ //Add existing Flows again with a new flow
+ newFlows = []*ofp.OfpFlowStats{
+ {Id: 126, TableId: 1260, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
+ {Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270001, PacketCount: 0},
+ {Id: 128, TableId: 1280, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1280000, PacketCount: 0},
+ }
+
+ expectedFlows = []*ofp.OfpFlowStats{
+ {Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
+ {Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
+ {Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
+ {Id: 126, TableId: 1260, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
+ {Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270001, PacketCount: 0},
+ {Id: 128, TableId: 1280, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1280000, PacketCount: 0},
+ }
+
+ err = da.addFlowsAndGroups(context.Background(), newFlows, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
+ assert.Nil(t, err)
+ daFlows = changeToFlowList(da.listDeviceFlows())
+ assert.True(t, isFlowSliceEqual(expectedFlows, daFlows))
+
+ //Add already existing flows again
+ newFlows = []*ofp.OfpFlowStats{
+ {Id: 126, TableId: 1260, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
+ {Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270001, PacketCount: 0},
+ {Id: 128, TableId: 1280, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1280000, PacketCount: 0},
+ }
+
+ expectedFlows = []*ofp.OfpFlowStats{
+ {Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
+ {Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
+ {Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
+ {Id: 126, TableId: 1260, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
+ {Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270001, PacketCount: 0},
+ {Id: 128, TableId: 1280, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1280000, PacketCount: 0},
+ }
+
+ err = da.addFlowsAndGroups(context.Background(), newFlows, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
+ assert.Nil(t, err)
+ daFlows = changeToFlowList(da.listDeviceFlows())
+ assert.True(t, isFlowSliceEqual(expectedFlows, daFlows))
+
+ //Delete flows
+ flowsToDelete := []*ofp.OfpFlowStats{
+ {Id: 126, TableId: 1260, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
+ {Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270001, PacketCount: 0},
+ {Id: 128, TableId: 1280, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1280000, PacketCount: 0},
+ }
+
+ expectedFlows = []*ofp.OfpFlowStats{
{Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
}
- expectedNewFlows := []*ofp.OfpFlowStats{
- {Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 20},
- {Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 10, Flags: 0, Cookie: 1250000, PacketCount: 0},
- {Id: 126, TableId: 1260, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
- {Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270000, PacketCount: 0},
- }
- expectedFlowsToDelete := []*ofp.OfpFlowStats{
- {Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
+
+ err = da.deleteFlowsAndGroups(context.Background(), flowsToDelete, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
+ assert.Nil(t, err)
+ daFlows = changeToFlowList(da.listDeviceFlows())
+ assert.True(t, isFlowSliceEqual(expectedFlows, daFlows))
+ //Delete flows with an unexisting one
+ flowsToDelete = []*ofp.OfpFlowStats{
{Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1250000, PacketCount: 0},
+ {Id: 129, TableId: 1290, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1290000, PacketCount: 0},
}
- expectedUpdatedAllFlows := []*ofp.OfpFlowStats{
- {Id: 121, TableId: 1210, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1210000, PacketCount: 0},
- {Id: 122, TableId: 1220, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1220000, PacketCount: 0},
- {Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 20},
+
+ expectedFlows = []*ofp.OfpFlowStats{
+ {Id: 123, TableId: 1230, Priority: 100, IdleTimeout: 0, Flags: 0, Cookie: 1230000, PacketCount: 0},
{Id: 124, TableId: 1240, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1240000, PacketCount: 0},
- {Id: 125, TableId: 1250, Priority: 1000, IdleTimeout: 10, Flags: 0, Cookie: 1250000, PacketCount: 0},
- {Id: 126, TableId: 1260, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1260000, PacketCount: 0},
- {Id: 127, TableId: 1270, Priority: 1000, IdleTimeout: 0, Flags: 0, Cookie: 1270000, PacketCount: 0},
}
- uNF, fD, uAF := flowsToUpdateToDelete(newFlows, existingFlows)
- assert.True(t, isFlowSliceEqual(uNF, expectedNewFlows))
- assert.True(t, isFlowSliceEqual(fD, expectedFlowsToDelete))
- assert.True(t, isFlowSliceEqual(uAF, expectedUpdatedAllFlows))
+
+ err = da.deleteFlowsAndGroups(context.Background(), flowsToDelete, []*ofp.OfpGroupEntry{}, &voltha.FlowMetadata{})
+ assert.Nil(t, err)
+ daFlows = changeToFlowList(da.listDeviceFlows())
+ assert.True(t, isFlowSliceEqual(expectedFlows, daFlows))
}
-func TestGroupsToUpdateToDelete_EmptySlices(t *testing.T) {
- newGroups := []*ofp.OfpGroupEntry{}
- existingGroups := []*ofp.OfpGroupEntry{}
- expectedNewGroups := []*ofp.OfpGroupEntry{}
- expectedGroupsToDelete := []*ofp.OfpGroupEntry{}
- expectedUpdatedAllGroups := []*ofp.OfpGroupEntry{}
- uNG, gD, uAG := groupsToUpdateToDelete(newGroups, existingGroups)
- assert.True(t, isGroupSliceEqual(uNG, expectedNewGroups))
- assert.True(t, isGroupSliceEqual(gD, expectedGroupsToDelete))
- assert.True(t, isGroupSliceEqual(uAG, expectedUpdatedAllGroups))
-}
-
-func TestGroupsToUpdateToDelete_NoExistingGroups(t *testing.T) {
+func (dat *DATest) testGroupAddDeletes(t *testing.T, da *Agent) {
+ //Add new Groups on empty list
newGroups := []*ofp.OfpGroupEntry{
{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
}
- existingGroups := []*ofp.OfpGroupEntry{}
- expectedNewGroups := []*ofp.OfpGroupEntry{
- {Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
- {Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
- }
- expectedGroupsToDelete := []*ofp.OfpGroupEntry{}
- expectedUpdatedAllGroups := []*ofp.OfpGroupEntry{
- {Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
- {Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
- }
- uNG, gD, uAG := groupsToUpdateToDelete(newGroups, existingGroups)
- assert.True(t, isGroupSliceEqual(uNG, expectedNewGroups))
- assert.True(t, isGroupSliceEqual(gD, expectedGroupsToDelete))
- assert.True(t, isGroupSliceEqual(uAG, expectedUpdatedAllGroups))
-}
+ err := da.addFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, newGroups, &voltha.FlowMetadata{})
+ assert.Nil(t, err)
+ daGroups := changeToGroupList(da.listDeviceGroups())
+ assert.True(t, isGroupSliceEqual(newGroups, daGroups))
-func TestGroupsToUpdateToDelete_UpdateNoDelete(t *testing.T) {
- newGroups := []*ofp.OfpGroupEntry{
- {Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
- {Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
- }
- existingGroups := []*ofp.OfpGroupEntry{
- {Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
+ //Add new Groups on existing ones
+ newGroups = []*ofp.OfpGroupEntry{
{Desc: &ofp.OfpGroupDesc{Type: 3, GroupId: 30, Buckets: nil}},
{Desc: &ofp.OfpGroupDesc{Type: 4, GroupId: 40, Buckets: nil}},
}
- expectedNewGroups := []*ofp.OfpGroupEntry{
- {Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
- }
- expectedGroupsToDelete := []*ofp.OfpGroupEntry{}
- expectedUpdatedAllGroups := []*ofp.OfpGroupEntry{
+ expectedGroups := []*ofp.OfpGroupEntry{
{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
{Desc: &ofp.OfpGroupDesc{Type: 3, GroupId: 30, Buckets: nil}},
{Desc: &ofp.OfpGroupDesc{Type: 4, GroupId: 40, Buckets: nil}},
}
- uNG, gD, uAG := groupsToUpdateToDelete(newGroups, existingGroups)
- assert.True(t, isGroupSliceEqual(uNG, expectedNewGroups))
- assert.True(t, isGroupSliceEqual(gD, expectedGroupsToDelete))
- assert.True(t, isGroupSliceEqual(uAG, expectedUpdatedAllGroups))
-}
+ err = da.addFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, newGroups, &voltha.FlowMetadata{})
+ assert.Nil(t, err)
+ daGroups = changeToGroupList(da.listDeviceGroups())
+ assert.True(t, isGroupSliceEqual(expectedGroups, daGroups))
-func TestGroupsToUpdateToDelete_UpdateWithDelete(t *testing.T) {
- newGroups := []*ofp.OfpGroupEntry{
- {Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
- {Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: []*ofp.OfpBucket{{WatchPort: 10}}}},
+ //Add new Groups on existing ones
+ newGroups = []*ofp.OfpGroupEntry{
+ {Desc: &ofp.OfpGroupDesc{Type: 3, GroupId: 30, Buckets: nil}},
+ {Desc: &ofp.OfpGroupDesc{Type: 44, GroupId: 40, Buckets: nil}},
+ {Desc: &ofp.OfpGroupDesc{Type: 5, GroupId: 50, Buckets: nil}},
}
- existingGroups := []*ofp.OfpGroupEntry{
+ expectedGroups = []*ofp.OfpGroupEntry{
+ {Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
{Desc: &ofp.OfpGroupDesc{Type: 3, GroupId: 30, Buckets: nil}},
- {Desc: &ofp.OfpGroupDesc{Type: 4, GroupId: 40, Buckets: nil}},
+ {Desc: &ofp.OfpGroupDesc{Type: 44, GroupId: 40, Buckets: nil}},
+ {Desc: &ofp.OfpGroupDesc{Type: 5, GroupId: 50, Buckets: nil}},
}
- expectedNewGroups := []*ofp.OfpGroupEntry{
+ err = da.addFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, newGroups, &voltha.FlowMetadata{})
+ assert.Nil(t, err)
+ daGroups = changeToGroupList(da.listDeviceGroups())
+ assert.True(t, isGroupSliceEqual(expectedGroups, daGroups))
+
+ //Modify Group
+ updtGroups := []*ofp.OfpGroupEntry{
+ {Desc: &ofp.OfpGroupDesc{Type: 33, GroupId: 30, Buckets: nil}},
+ }
+ expectedGroups = []*ofp.OfpGroupEntry{
{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
- {Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: []*ofp.OfpBucket{{WatchPort: 10}}}},
- }
- expectedGroupsToDelete := []*ofp.OfpGroupEntry{
{Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
+ {Desc: &ofp.OfpGroupDesc{Type: 33, GroupId: 30, Buckets: nil}},
+ {Desc: &ofp.OfpGroupDesc{Type: 44, GroupId: 40, Buckets: nil}},
+ {Desc: &ofp.OfpGroupDesc{Type: 5, GroupId: 50, Buckets: nil}},
}
- expectedUpdatedAllGroups := []*ofp.OfpGroupEntry{
+ err = da.updateFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, updtGroups, &voltha.FlowMetadata{})
+ assert.Nil(t, err)
+ daGroups = changeToGroupList(da.listDeviceGroups())
+ assert.True(t, isGroupSliceEqual(expectedGroups, daGroups))
+
+ //Delete Group
+ delGroups := []*ofp.OfpGroupEntry{
+ {Desc: &ofp.OfpGroupDesc{Type: 33, GroupId: 30, Buckets: nil}},
+ }
+ expectedGroups = []*ofp.OfpGroupEntry{
{Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
- {Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: []*ofp.OfpBucket{{WatchPort: 10}}}},
- {Desc: &ofp.OfpGroupDesc{Type: 3, GroupId: 30, Buckets: nil}},
+ {Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
+ {Desc: &ofp.OfpGroupDesc{Type: 44, GroupId: 40, Buckets: nil}},
+ {Desc: &ofp.OfpGroupDesc{Type: 5, GroupId: 50, Buckets: nil}},
+ }
+ err = da.deleteFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, delGroups, &voltha.FlowMetadata{})
+ assert.Nil(t, err)
+ daGroups = changeToGroupList(da.listDeviceGroups())
+ assert.True(t, isGroupSliceEqual(expectedGroups, daGroups))
+
+ //Delete Group
+ delGroups = []*ofp.OfpGroupEntry{
{Desc: &ofp.OfpGroupDesc{Type: 4, GroupId: 40, Buckets: nil}},
}
- uNG, gD, uAG := groupsToUpdateToDelete(newGroups, existingGroups)
- assert.True(t, isGroupSliceEqual(uNG, expectedNewGroups))
- assert.True(t, isGroupSliceEqual(gD, expectedGroupsToDelete))
- assert.True(t, isGroupSliceEqual(uAG, expectedUpdatedAllGroups))
+ expectedGroups = []*ofp.OfpGroupEntry{
+ {Desc: &ofp.OfpGroupDesc{Type: 1, GroupId: 10, Buckets: nil}},
+ {Desc: &ofp.OfpGroupDesc{Type: 2, GroupId: 20, Buckets: nil}},
+ {Desc: &ofp.OfpGroupDesc{Type: 5, GroupId: 50, Buckets: nil}},
+ }
+ err = da.deleteFlowsAndGroups(context.Background(), []*ofp.OfpFlowStats{}, delGroups, &voltha.FlowMetadata{})
+ assert.Nil(t, err)
+ daGroups = changeToGroupList(da.listDeviceGroups())
+ assert.True(t, isGroupSliceEqual(expectedGroups, daGroups))
}
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
index eb65673..77c4b7c 100644
--- a/rw_core/core/device/logical_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -26,6 +26,7 @@
"github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/rw_core/config"
"github.com/opencord/voltha-go/rw_core/core/adapter"
+ tst "github.com/opencord/voltha-go/rw_core/test"
com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
"github.com/opencord/voltha-lib-go/v3/pkg/db"
fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
@@ -381,7 +382,7 @@
test := &LDATest{}
// Start the embedded etcd server
var err error
- test.etcdServer, test.kvClientPort, err = startEmbeddedEtcdServer("voltha.rwcore.lda.test", "voltha.rwcore.lda.etcd", "error")
+ test.etcdServer, test.kvClientPort, err = tst.StartEmbeddedEtcdServer("voltha.rwcore.lda.test", "voltha.rwcore.lda.etcd", "error")
if err != nil {
logger.Fatal(err)
}
@@ -463,7 +464,7 @@
}
cfg.GrpcPort = grpcPort
cfg.GrpcHost = "127.0.0.1"
- client := setupKVClient(cfg, lda.coreInstanceID)
+ client := tst.SetupKVClient(cfg, lda.coreInstanceID)
backend := &db.Backend{
Client: client,
StoreType: cfg.KVStoreType,
@@ -498,7 +499,7 @@
lda.kmp.Stop()
}
if lda.etcdServer != nil {
- stopEmbeddedEtcdServer(lda.etcdServer)
+ tst.StopEmbeddedEtcdServer(lda.etcdServer)
}
}
@@ -508,7 +509,7 @@
clonedLD := proto.Clone(lda.logicalDevice).(*voltha.LogicalDevice)
clonedLD.Id = com.GetRandomString(10)
clonedLD.DatapathId = rand.Uint64()
- lDeviceAgent := newLogicalAgent(clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.dbProxy, lDeviceMgr.ldProxy, lDeviceMgr.defaultTimeout)
+ lDeviceAgent := newLogicalAgent(clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.dbPath, lDeviceMgr.ldProxy, lDeviceMgr.defaultTimeout)
lDeviceAgent.logicalDevice = clonedLD
err := lDeviceAgent.ldProxy.Set(context.Background(), clonedLD.Id, clonedLD)
assert.Nil(t, err)
diff --git a/rw_core/core/device/logical_manager.go b/rw_core/core/device/logical_manager.go
index f9bff21..d6ca4ce 100644
--- a/rw_core/core/device/logical_manager.go
+++ b/rw_core/core/device/logical_manager.go
@@ -42,7 +42,7 @@
logicalDeviceAgents sync.Map
deviceMgr *Manager
kafkaICProxy kafka.InterContainerProxy
- dbProxy *model.Path
+ dbPath *model.Path
ldProxy *model.Proxy
defaultTimeout time.Duration
logicalDevicesLoadingLock sync.RWMutex
@@ -126,7 +126,7 @@
logger.Debugw("logical-device-id", log.Fields{"logicaldeviceId": id})
- agent := newLogicalAgent(id, sn, device.Id, ldMgr, ldMgr.deviceMgr, ldMgr.dbProxy, ldMgr.ldProxy, ldMgr.defaultTimeout)
+ agent := newLogicalAgent(id, sn, device.Id, ldMgr, ldMgr.deviceMgr, ldMgr.dbPath, ldMgr.ldProxy, ldMgr.defaultTimeout)
ldMgr.addLogicalDeviceAgentToMap(agent)
// Update the root device with the logical device Id reference
@@ -198,7 +198,7 @@
ldMgr.logicalDevicesLoadingLock.Unlock()
if _, err := ldMgr.getLogicalDeviceFromModel(ctx, lDeviceID); err == nil {
logger.Debugw("loading-logical-device", log.Fields{"lDeviceId": lDeviceID})
- agent := newLogicalAgent(lDeviceID, "", "", ldMgr, ldMgr.deviceMgr, ldMgr.dbProxy, ldMgr.ldProxy, ldMgr.defaultTimeout)
+ agent := newLogicalAgent(lDeviceID, "", "", ldMgr, ldMgr.deviceMgr, ldMgr.dbPath, ldMgr.ldProxy, ldMgr.defaultTimeout)
if err := agent.start(ctx, true); err != nil {
return err
}
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index 357c49a..17ac266 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -34,6 +34,7 @@
"github.com/opencord/voltha-lib-go/v3/pkg/log"
"github.com/opencord/voltha-protos/v3/go/common"
ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+ "github.com/opencord/voltha-protos/v3/go/openflow_13"
ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
"github.com/opencord/voltha-protos/v3/go/voltha"
"google.golang.org/grpc/codes"
@@ -50,6 +51,7 @@
logicalDeviceMgr *LogicalManager
kafkaICProxy kafka.InterContainerProxy
stateTransitions *TransitionMap
+ dbPath *model.Path
dProxy *model.Proxy
coreInstanceID string
defaultTimeout time.Duration
@@ -57,13 +59,15 @@
deviceLoadingInProgress map[string][]chan int
}
-func NewManagers(dbProxy *model.Path, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, corePairTopic, coreInstanceID string, defaultCoreTimeout time.Duration) (*Manager, *LogicalManager) {
+//NewManagers creates the Manager and the Logical Manager.
+func NewManagers(dbPath *model.Path, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, corePairTopic, coreInstanceID string, defaultCoreTimeout time.Duration) (*Manager, *LogicalManager) {
deviceMgr := &Manager{
rootDevices: make(map[string]bool),
kafkaICProxy: kmp,
adapterProxy: remote.NewAdapterProxy(kmp, corePairTopic, endpointMgr),
coreInstanceID: coreInstanceID,
- dProxy: dbProxy.Proxy("devices"),
+ dbPath: dbPath,
+ dProxy: dbPath.Proxy("devices"),
adapterMgr: adapterMgr,
defaultTimeout: defaultCoreTimeout * time.Millisecond,
deviceLoadingInProgress: make(map[string][]chan int),
@@ -74,8 +78,8 @@
Manager: event.NewManager(),
deviceMgr: deviceMgr,
kafkaICProxy: kmp,
- dbProxy: dbProxy,
- ldProxy: dbProxy.Proxy("logical_devices"),
+ dbPath: dbPath,
+ ldProxy: dbPath.Proxy("logical_devices"),
defaultTimeout: defaultCoreTimeout,
logicalDeviceLoadingInProgress: make(map[string][]chan int),
}
@@ -157,7 +161,7 @@
// Ensure this device is set as root
device.Root = true
// Create and start a device agent for that device
- agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dProxy, dMgr.defaultTimeout)
+ agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.defaultTimeout)
device, err = agent.start(ctx, device)
if err != nil {
logger.Errorw("Fail-to-start-device", log.Fields{"device-id": agent.deviceID, "error": err})
@@ -220,22 +224,34 @@
// ListDeviceFlows returns the flow details for a specific device entry
func (dMgr *Manager) ListDeviceFlows(ctx context.Context, id *voltha.ID) (*ofp.Flows, error) {
logger.Debugw("ListDeviceFlows", log.Fields{"device-id": id.Id})
- device, err := dMgr.getDevice(ctx, id.Id)
- if err != nil {
- return &ofp.Flows{}, err
+ agent := dMgr.getDeviceAgent(ctx, id.Id)
+ if agent == nil {
+ return &ofp.Flows{}, status.Errorf(codes.NotFound, "device-%s", id.Id)
}
- return device.Flows, nil
+
+ flows := agent.listDeviceFlows()
+ ctr, ret := 0, make([]*ofp.OfpFlowStats, len(flows))
+ for _, flow := range flows {
+ ret[ctr] = flow
+ ctr++
+ }
+ return &openflow_13.Flows{Items: ret}, nil
}
// ListDeviceFlowGroups returns the flow group details for a specific device entry
func (dMgr *Manager) ListDeviceFlowGroups(ctx context.Context, id *voltha.ID) (*voltha.FlowGroups, error) {
logger.Debugw("ListDeviceFlowGroups", log.Fields{"device-id": id.Id})
-
- device, err := dMgr.getDevice(ctx, id.Id)
- if err != nil {
+ agent := dMgr.getDeviceAgent(ctx, id.Id)
+ if agent == nil {
return nil, status.Errorf(codes.NotFound, "device-%s", id.Id)
}
- return device.GetFlowGroups(), nil
+ groups := agent.listDeviceGroups()
+ ctr, ret := 0, make([]*openflow_13.OfpGroupEntry, len(groups))
+ for _, group := range groups {
+ ret[ctr] = group
+ ctr++
+ }
+ return &voltha.FlowGroups{Items: ret}, nil
}
// stopManagingDevice stops the management of the device as well as any of its reference device and logical device.
@@ -408,7 +424,7 @@
// If device is not in memory then set it up
if !dMgr.IsDeviceInCache(device.Id) {
logger.Debugw("loading-device-from-Model", log.Fields{"id": device.Id})
- agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dProxy, dMgr.defaultTimeout)
+ agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.defaultTimeout)
if _, err := agent.start(ctx, nil); err != nil {
logger.Warnw("failure-starting-agent", log.Fields{"deviceId": device.Id})
} else {
@@ -471,7 +487,7 @@
// Proceed with the loading only if the device exist in the Model (could have been deleted)
if device, err = dMgr.getDeviceFromModel(ctx, deviceID); err == nil {
logger.Debugw("loading-device", log.Fields{"deviceId": deviceID})
- agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dProxy, dMgr.defaultTimeout)
+ agent := newAgent(dMgr.adapterProxy, device, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.defaultTimeout)
if _, err = agent.start(ctx, nil); err != nil {
logger.Warnw("Failure loading device", log.Fields{"deviceId": deviceID, "error": err})
} else {
@@ -1029,7 +1045,7 @@
childDevice.ProxyAddress = &voltha.Device_ProxyAddress{DeviceId: parentDeviceID, DeviceType: pAgent.deviceType, ChannelId: uint32(channelID), OnuId: uint32(onuID)}
// Create and start a device agent for that device
- agent := newAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.dProxy, dMgr.defaultTimeout)
+ agent := newAgent(dMgr.adapterProxy, childDevice, dMgr, dMgr.dbPath, dMgr.dProxy, dMgr.defaultTimeout)
childDevice, err := agent.start(ctx, childDevice)
if err != nil {
logger.Errorw("error-starting-child-device", log.Fields{"parent-device-id": childDevice.ParentId, "child-device-id": agent.deviceID, "error": err})
diff --git a/rw_core/test/common.go b/rw_core/test/common.go
new file mode 100644
index 0000000..a20f029
--- /dev/null
+++ b/rw_core/test/common.go
@@ -0,0 +1,33 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package core Common Logger initialization
+package test
+
+import (
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+)
+
+var logger log.Logger
+
+func init() {
+ // Setup this package so that it's log level can be modified at run time
+ var err error
+ logger, err = log.AddPackage(log.JSON, log.ErrorLevel, log.Fields{"pkg": "test"})
+ if err != nil {
+ panic(err)
+ }
+}
diff --git a/rw_core/test/utils.go b/rw_core/test/utils.go
new file mode 100644
index 0000000..7d2dda0
--- /dev/null
+++ b/rw_core/test/utils.go
@@ -0,0 +1,156 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+// Package core Common Logger initialization
+package test
+
+import (
+ "strconv"
+ "testing"
+
+ "github.com/opencord/voltha-go/rw_core/config"
+ "github.com/opencord/voltha-go/rw_core/core/adapter"
+ cm "github.com/opencord/voltha-go/rw_core/mocks"
+ "github.com/opencord/voltha-lib-go/v3/pkg/adapters"
+ com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
+ "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
+ "github.com/opencord/voltha-lib-go/v3/pkg/version"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+ "github.com/phayes/freeport"
+ "github.com/stretchr/testify/assert"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+const (
+ OltAdapter = iota
+ OnuAdapter
+)
+
+//CreateMockAdapter creates mock OLT and ONU adapters
+func CreateMockAdapter(adapterType int, kafkaClient kafka.Client, coreInstanceID string, coreName string, adapterName string) (adapters.IAdapter, error) {
+ var err error
+ var adapter adapters.IAdapter
+ adapterKafkaICProxy := kafka.NewInterContainerProxy(
+ kafka.MsgClient(kafkaClient),
+ kafka.DefaultTopic(&kafka.Topic{Name: adapterName}))
+ adapterCoreProxy := com.NewCoreProxy(adapterKafkaICProxy, adapterName, coreName)
+ var adapterReqHandler *com.RequestHandlerProxy
+ switch adapterType {
+ case OltAdapter:
+ adapter = cm.NewOLTAdapter(adapterCoreProxy)
+ case OnuAdapter:
+ adapter = cm.NewONUAdapter(adapterCoreProxy)
+ default:
+ logger.Fatalf("invalid-adapter-type-%d", adapterType)
+ }
+ adapterReqHandler = com.NewRequestHandlerProxy(coreInstanceID, adapter, adapterCoreProxy)
+
+ if err = adapterKafkaICProxy.Start(); err != nil {
+ logger.Errorw("Failure-starting-adapter-intercontainerProxy", log.Fields{"error": err})
+ return nil, err
+ }
+ if err = adapterKafkaICProxy.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: adapterName}, adapterReqHandler); err != nil {
+ logger.Errorw("Failure-to-subscribe-onu-request-handler", log.Fields{"error": err})
+ return nil, err
+ }
+ return adapter, nil
+}
+
+//CreateAndregisterAdapters creates mock ONU and OLT adapters and egisters them to rw-core
+func CreateAndregisterAdapters(t *testing.T, kClient kafka.Client, coreInstanceID string, oltAdapterName string, onuAdapterName string, adapterMgr *adapter.Manager) (*cm.OLTAdapter, *cm.ONUAdapter) {
+ // Setup the mock OLT adapter
+ oltAdapter, err := CreateMockAdapter(OltAdapter, kClient, coreInstanceID, "rw_core", oltAdapterName)
+ assert.Nil(t, err)
+ assert.NotNil(t, oltAdapter)
+
+ // Register the adapter
+ registrationData := &voltha.Adapter{
+ Id: oltAdapterName,
+ Vendor: "Voltha-olt",
+ Version: version.VersionInfo.Version,
+ Type: oltAdapterName,
+ CurrentReplica: 1,
+ TotalReplicas: 1,
+ Endpoint: oltAdapterName,
+ }
+ types := []*voltha.DeviceType{{Id: oltAdapterName, Adapter: oltAdapterName, AcceptsAddRemoveFlowUpdates: true}}
+ deviceTypes := &voltha.DeviceTypes{Items: types}
+ if _, err := adapterMgr.RegisterAdapter(registrationData, deviceTypes); err != nil {
+ logger.Errorw("failed-to-register-adapter", log.Fields{"error": err})
+ assert.NotNil(t, err)
+ }
+
+ // Setup the mock ONU adapter
+ onuAdapter, err := CreateMockAdapter(OnuAdapter, kClient, coreInstanceID, "rw_core", onuAdapterName)
+
+ assert.Nil(t, err)
+ assert.NotNil(t, onuAdapter)
+ // Register the adapter
+ registrationData = &voltha.Adapter{
+ Id: onuAdapterName,
+ Vendor: "Voltha-onu",
+ Version: version.VersionInfo.Version,
+ Type: onuAdapterName,
+ CurrentReplica: 1,
+ TotalReplicas: 1,
+ Endpoint: onuAdapterName,
+ }
+ types = []*voltha.DeviceType{{Id: onuAdapterName, Adapter: onuAdapterName, AcceptsAddRemoveFlowUpdates: true}}
+ deviceTypes = &voltha.DeviceTypes{Items: types}
+ if _, err := adapterMgr.RegisterAdapter(registrationData, deviceTypes); err != nil {
+ logger.Errorw("failed-to-register-adapter", log.Fields{"error": err})
+ assert.NotNil(t, err)
+ }
+ return oltAdapter.(*cm.OLTAdapter), onuAdapter.(*cm.ONUAdapter)
+}
+
+//StartEmbeddedEtcdServer creates and starts an Embedded etcd server locally.
+func StartEmbeddedEtcdServer(configName, storageDir, logLevel string) (*mock_etcd.EtcdServer, int, error) {
+ kvClientPort, err := freeport.GetFreePort()
+ if err != nil {
+ return nil, 0, err
+ }
+ peerPort, err := freeport.GetFreePort()
+ if err != nil {
+ return nil, 0, err
+ }
+ etcdServer := mock_etcd.StartEtcdServer(mock_etcd.MKConfig(configName, kvClientPort, peerPort, storageDir, logLevel))
+ if etcdServer == nil {
+ return nil, 0, status.Error(codes.Internal, "Embedded server failed to start")
+ }
+ return etcdServer, kvClientPort, nil
+}
+
+//StopEmbeddedEtcdServer stops the embedded etcd server
+func StopEmbeddedEtcdServer(server *mock_etcd.EtcdServer) {
+ if server != nil {
+ server.Stop()
+ }
+}
+
+//SetupKVClient creates a new etcd client
+func SetupKVClient(cf *config.RWCoreFlags, coreInstanceID string) kvstore.Client {
+ addr := cf.KVStoreHost + ":" + strconv.Itoa(cf.KVStorePort)
+ client, err := kvstore.NewEtcdClient(addr, cf.KVStoreTimeout, log.FatalLevel)
+ if err != nil {
+ panic("no kv client")
+ }
+ return client
+}