VOL-2909 - Disaggregating rw_core/core/.
This breaks the core package into logical components. (adapter manager, adapter proxy, devices, nbi/api), as well as the "core" which aggregates all these.
Change-Id: I257ac64024a1cf3efe3f5d89d508e60e6e681fb1
diff --git a/rw_core/core/api/grpc_nbi_handler_test.go b/rw_core/core/api/grpc_nbi_handler_test.go
new file mode 100755
index 0000000..0579f94
--- /dev/null
+++ b/rw_core/core/api/grpc_nbi_handler_test.go
@@ -0,0 +1,1227 @@
+/*
+* Copyright 2019-present Open Networking Foundation
+*
+* Licensed under the Apache License, Version 2.0 (the "License");
+* you may not use this file except in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing, software
+* distributed under the License is distributed on an "AS IS" BASIS,
+* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+* See the License for the specific language governing permissions and
+* limitations under the License.
+ */
+package api
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "math/rand"
+ "os"
+ "runtime"
+ "runtime/pprof"
+ "strings"
+ "sync"
+ "testing"
+ "time"
+
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/opencord/voltha-go/db/model"
+ "github.com/opencord/voltha-go/rw_core/config"
+ "github.com/opencord/voltha-go/rw_core/core/adapter"
+ "github.com/opencord/voltha-go/rw_core/core/device"
+ cm "github.com/opencord/voltha-go/rw_core/mocks"
+ "github.com/opencord/voltha-lib-go/v3/pkg/db"
+ "github.com/opencord/voltha-lib-go/v3/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+ mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
+ mock_kafka "github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka"
+ "github.com/opencord/voltha-lib-go/v3/pkg/version"
+ ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+ "github.com/phayes/freeport"
+ "github.com/stretchr/testify/assert"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+const (
+ coreName = "rw_core"
+)
+
+type NBTest struct {
+ etcdServer *mock_etcd.EtcdServer
+ deviceMgr *device.Manager
+ logicalDeviceMgr *device.LogicalManager
+ adapterMgr *adapter.Manager
+ kmp kafka.InterContainerProxy
+ kClient kafka.Client
+ kvClientPort int
+ numONUPerOLT int
+ startingUNIPortNo int
+ oltAdapter *cm.OLTAdapter
+ onuAdapter *cm.ONUAdapter
+ oltAdapterName string
+ onuAdapterName string
+ coreInstanceID string
+ defaultTimeout time.Duration
+ maxTimeout time.Duration
+}
+
+func newNBTest() *NBTest {
+ test := &NBTest{}
+ // Start the embedded etcd server
+ var err error
+ test.etcdServer, test.kvClientPort, err = startEmbeddedEtcdServer("voltha.rwcore.nb.test", "voltha.rwcore.nb.etcd", "error")
+ if err != nil {
+ logger.Fatal(err)
+ }
+ // Create the kafka client
+ test.kClient = mock_kafka.NewKafkaClient()
+ test.oltAdapterName = "olt_adapter_mock"
+ test.onuAdapterName = "onu_adapter_mock"
+ test.coreInstanceID = "rw-nbi-test"
+ test.defaultTimeout = 10 * time.Second
+ test.maxTimeout = 20 * time.Second
+ return test
+}
+
+func (nb *NBTest) startCore(inCompeteMode bool) {
+ ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
+ defer cancel()
+ cfg := config.NewRWCoreFlags()
+ cfg.CorePairTopic = "rw_core"
+ cfg.DefaultRequestTimeout = nb.defaultTimeout
+ cfg.DefaultCoreTimeout = nb.defaultTimeout
+ cfg.KVStorePort = nb.kvClientPort
+ cfg.InCompetingMode = inCompeteMode
+ grpcPort, err := freeport.GetFreePort()
+ if err != nil {
+ logger.Fatal("Cannot get a freeport for grpc")
+ }
+ cfg.GrpcPort = grpcPort
+ cfg.GrpcHost = "127.0.0.1"
+ setCoreCompeteMode(inCompeteMode)
+ client := setupKVClient(cfg, nb.coreInstanceID)
+ backend := &db.Backend{
+ Client: client,
+ StoreType: cfg.KVStoreType,
+ Host: cfg.KVStoreHost,
+ Port: cfg.KVStorePort,
+ Timeout: cfg.KVStoreTimeout,
+ LivenessChannelInterval: cfg.LiveProbeInterval / 2,
+ PathPrefix: cfg.KVStoreDataPrefix}
+ nb.kmp = kafka.NewInterContainerProxy(
+ kafka.InterContainerHost(cfg.KafkaAdapterHost),
+ kafka.InterContainerPort(cfg.KafkaAdapterPort),
+ kafka.MsgClient(nb.kClient),
+ kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}),
+ kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: cfg.AffinityRouterTopic}))
+
+ endpointMgr := kafka.NewEndpointManager(backend)
+ proxy := model.NewProxy(backend, "/")
+ nb.adapterMgr = adapter.NewAdapterManager(proxy, nb.coreInstanceID, nb.kClient)
+ nb.deviceMgr, nb.logicalDeviceMgr = device.NewDeviceManagers(proxy, nb.adapterMgr, nb.kmp, endpointMgr, cfg.CorePairTopic, nb.coreInstanceID, cfg.DefaultCoreTimeout)
+ if err = nb.adapterMgr.Start(ctx); err != nil {
+ logger.Fatalf("Cannot start adapterMgr: %s", err)
+ }
+ nb.deviceMgr.Start(ctx)
+ nb.logicalDeviceMgr.Start(ctx)
+
+ if err = nb.kmp.Start(); err != nil {
+ logger.Fatalf("Cannot start InterContainerProxy: %s", err)
+ }
+ requestProxy := NewAdapterRequestHandlerProxy(nb.coreInstanceID, nb.deviceMgr, nb.adapterMgr, proxy, proxy, cfg.LongRunningRequestTimeout, cfg.DefaultRequestTimeout)
+ if err := nb.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: cfg.CoreTopic}, requestProxy); err != nil {
+ logger.Fatalf("Cannot add request handler: %s", err)
+ }
+ if err := nb.kmp.SubscribeWithDefaultRequestHandler(kafka.Topic{Name: cfg.CorePairTopic}, kafka.OffsetNewest); err != nil {
+ logger.Fatalf("Cannot add default request handler: %s", err)
+ }
+}
+
+func (nb *NBTest) createAndregisterAdapters(t *testing.T) {
+ // Setup the mock OLT adapter
+ oltAdapter, err := createMockAdapter(OltAdapter, nb.kClient, nb.coreInstanceID, coreName, nb.oltAdapterName)
+ if err != nil {
+ logger.Fatalw("setting-mock-olt-adapter-failed", log.Fields{"error": err})
+ }
+ nb.oltAdapter = (oltAdapter).(*cm.OLTAdapter)
+ nb.numONUPerOLT = nb.oltAdapter.GetNumONUPerOLT()
+ nb.startingUNIPortNo = nb.oltAdapter.GetStartingUNIPortNo()
+
+ // Register the adapter
+ registrationData := &voltha.Adapter{
+ Id: nb.oltAdapterName,
+ Vendor: "Voltha-olt",
+ Version: version.VersionInfo.Version,
+ Type: nb.oltAdapterName,
+ CurrentReplica: 1,
+ TotalReplicas: 1,
+ Endpoint: nb.oltAdapterName,
+ }
+ types := []*voltha.DeviceType{{Id: nb.oltAdapterName, Adapter: nb.oltAdapterName, AcceptsAddRemoveFlowUpdates: true}}
+ deviceTypes := &voltha.DeviceTypes{Items: types}
+ if _, err := nb.adapterMgr.RegisterAdapter(registrationData, deviceTypes); err != nil {
+ logger.Errorw("failed-to-register-adapter", log.Fields{"error": err})
+ assert.NotNil(t, err)
+ }
+
+ // Setup the mock ONU adapter
+ onuAdapter, err := createMockAdapter(OnuAdapter, nb.kClient, nb.coreInstanceID, coreName, nb.onuAdapterName)
+ if err != nil {
+ logger.Fatalw("setting-mock-onu-adapter-failed", log.Fields{"error": err})
+ }
+ nb.onuAdapter = (onuAdapter).(*cm.ONUAdapter)
+
+ // Register the adapter
+ registrationData = &voltha.Adapter{
+ Id: nb.onuAdapterName,
+ Vendor: "Voltha-onu",
+ Version: version.VersionInfo.Version,
+ Type: nb.onuAdapterName,
+ CurrentReplica: 1,
+ TotalReplicas: 1,
+ Endpoint: nb.onuAdapterName,
+ }
+ types = []*voltha.DeviceType{{Id: nb.onuAdapterName, Adapter: nb.onuAdapterName, AcceptsAddRemoveFlowUpdates: true}}
+ deviceTypes = &voltha.DeviceTypes{Items: types}
+ if _, err := nb.adapterMgr.RegisterAdapter(registrationData, deviceTypes); err != nil {
+ logger.Errorw("failed-to-register-adapter", log.Fields{"error": err})
+ assert.NotNil(t, err)
+ }
+}
+
+func (nb *NBTest) stopAll() {
+ if nb.kClient != nil {
+ nb.kClient.Stop()
+ }
+ if nb.logicalDeviceMgr != nil {
+ nb.logicalDeviceMgr.Stop(context.Background())
+ }
+ if nb.deviceMgr != nil {
+ nb.deviceMgr.Stop(context.Background())
+ }
+ if nb.kmp != nil {
+ nb.kmp.Stop()
+ }
+ if nb.etcdServer != nil {
+ stopEmbeddedEtcdServer(nb.etcdServer)
+ }
+}
+
+func (nb *NBTest) verifyLogicalDevices(t *testing.T, oltDevice *voltha.Device, nbi *NBIHandler) {
+ // Get the latest set of logical devices
+ logicalDevices, err := nbi.ListLogicalDevices(getContext(), &empty.Empty{})
+ assert.Nil(t, err)
+ assert.NotNil(t, logicalDevices)
+ assert.Equal(t, 1, len(logicalDevices.Items))
+
+ ld := logicalDevices.Items[0]
+ assert.NotEqual(t, "", ld.Id)
+ assert.NotEqual(t, uint64(0), ld.DatapathId)
+ assert.Equal(t, "olt_adapter_mock", ld.Desc.HwDesc)
+ assert.Equal(t, "olt_adapter_mock", ld.Desc.SwDesc)
+ assert.NotEqual(t, "", ld.RootDeviceId)
+ assert.NotEqual(t, "", ld.Desc.SerialNum)
+ assert.Equal(t, uint32(256), ld.SwitchFeatures.NBuffers)
+ assert.Equal(t, uint32(2), ld.SwitchFeatures.NTables)
+ assert.Equal(t, uint32(15), ld.SwitchFeatures.Capabilities)
+ assert.Equal(t, 1+nb.numONUPerOLT, len(ld.Ports))
+ assert.Equal(t, oltDevice.ParentId, ld.Id)
+ //Expected port no
+ expectedPortNo := make(map[uint32]bool)
+ expectedPortNo[uint32(2)] = false
+ for i := 0; i < nb.numONUPerOLT; i++ {
+ expectedPortNo[uint32(i+100)] = false
+ }
+ for _, p := range ld.Ports {
+ assert.Equal(t, p.OfpPort.PortNo, p.DevicePortNo)
+ assert.Equal(t, uint32(4), p.OfpPort.State)
+ expectedPortNo[p.OfpPort.PortNo] = true
+ if strings.HasPrefix(p.Id, "nni") {
+ assert.Equal(t, true, p.RootPort)
+ //assert.Equal(t, uint32(2), p.OfpPort.PortNo)
+ assert.Equal(t, p.Id, fmt.Sprintf("nni-%d", p.DevicePortNo))
+ } else {
+ assert.Equal(t, p.Id, fmt.Sprintf("uni-%d", p.DevicePortNo))
+ assert.Equal(t, false, p.RootPort)
+ }
+ }
+}
+
+func (nb *NBTest) verifyDevices(t *testing.T, nbi *NBIHandler) {
+ // Get the latest set of devices
+ devices, err := nbi.ListDevices(getContext(), &empty.Empty{})
+ assert.Nil(t, err)
+ assert.NotNil(t, devices)
+
+ // A device is ready to be examined when its ADMIN state is ENABLED and OPERATIONAL state is ACTIVE
+ var vFunction isDeviceConditionSatisfied = func(device *voltha.Device) bool {
+ return device.AdminState == voltha.AdminState_ENABLED && device.OperStatus == voltha.OperStatus_ACTIVE
+ }
+
+ var wg sync.WaitGroup
+ for _, device := range devices.Items {
+ wg.Add(1)
+ go func(wg *sync.WaitGroup, device *voltha.Device) {
+ // Wait until the device is in the right state
+ err := waitUntilDeviceReadiness(device.Id, nb.maxTimeout, vFunction, nbi)
+ assert.Nil(t, err)
+
+ // Now, verify the details of the device. First get the latest update
+ d, err := nbi.GetDevice(getContext(), &voltha.ID{Id: device.Id})
+ assert.Nil(t, err)
+ assert.Equal(t, voltha.AdminState_ENABLED, d.AdminState)
+ assert.Equal(t, voltha.ConnectStatus_REACHABLE, d.ConnectStatus)
+ assert.Equal(t, voltha.OperStatus_ACTIVE, d.OperStatus)
+ assert.Equal(t, d.Type, d.Adapter)
+ assert.NotEqual(t, "", d.MacAddress)
+ assert.NotEqual(t, "", d.SerialNumber)
+
+ if d.Type == "olt_adapter_mock" {
+ assert.Equal(t, true, d.Root)
+ assert.NotEqual(t, "", d.Id)
+ assert.NotEqual(t, "", d.ParentId)
+ assert.Nil(t, d.ProxyAddress)
+ } else if d.Type == "onu_adapter_mock" {
+ assert.Equal(t, false, d.Root)
+ assert.NotEqual(t, uint32(0), d.Vlan)
+ assert.NotEqual(t, "", d.Id)
+ assert.NotEqual(t, "", d.ParentId)
+ assert.NotEqual(t, "", d.ProxyAddress.DeviceId)
+ assert.Equal(t, "olt_adapter_mock", d.ProxyAddress.DeviceType)
+ } else {
+ assert.Error(t, errors.New("invalid-device-type"))
+ }
+ assert.Equal(t, 2, len(d.Ports))
+ for _, p := range d.Ports {
+ assert.Equal(t, voltha.AdminState_ENABLED, p.AdminState)
+ assert.Equal(t, voltha.OperStatus_ACTIVE, p.OperStatus)
+ if p.Type == voltha.Port_ETHERNET_NNI || p.Type == voltha.Port_ETHERNET_UNI {
+ assert.Equal(t, 0, len(p.Peers))
+ } else if p.Type == voltha.Port_PON_OLT {
+ assert.Equal(t, nb.numONUPerOLT, len(p.Peers))
+ assert.Equal(t, uint32(1), p.PortNo)
+ } else if p.Type == voltha.Port_PON_ONU {
+ assert.Equal(t, 1, len(p.Peers))
+ assert.Equal(t, uint32(1), p.PortNo)
+ } else {
+ assert.Error(t, errors.New("invalid-port"))
+ }
+ }
+ wg.Done()
+ }(&wg, device)
+ }
+ wg.Wait()
+}
+
+func (nb *NBTest) getADevice(rootDevice bool, nbi *NBIHandler) (*voltha.Device, error) {
+ devices, err := nbi.ListDevices(getContext(), &empty.Empty{})
+ if err != nil {
+ return nil, err
+ }
+ for _, d := range devices.Items {
+ if d.Root == rootDevice {
+ return d, nil
+ }
+ }
+ return nil, status.Errorf(codes.NotFound, "%v device not found", rootDevice)
+}
+
+func (nb *NBTest) testCoreWithoutData(t *testing.T, nbi *NBIHandler) {
+ lds, err := nbi.ListLogicalDevices(getContext(), &empty.Empty{})
+ assert.Nil(t, err)
+ assert.NotNil(t, lds)
+ assert.Equal(t, 0, len(lds.Items))
+ devices, err := nbi.ListDevices(getContext(), &empty.Empty{})
+ assert.Nil(t, err)
+ assert.NotNil(t, devices)
+ assert.Equal(t, 0, len(devices.Items))
+ adapters, err := nbi.ListAdapters(getContext(), &empty.Empty{})
+ assert.Equal(t, 0, len(adapters.Items))
+ assert.Nil(t, err)
+ assert.NotNil(t, adapters)
+}
+
+func (nb *NBTest) testAdapterRegistration(t *testing.T, nbi *NBIHandler) {
+ adapters, err := nbi.ListAdapters(getContext(), &empty.Empty{})
+ assert.Nil(t, err)
+ assert.NotNil(t, adapters)
+ assert.Equal(t, 2, len(adapters.Items))
+ for _, a := range adapters.Items {
+ switch a.Id {
+ case nb.oltAdapterName:
+ assert.Equal(t, "Voltha-olt", a.Vendor)
+ case nb.onuAdapterName:
+ assert.Equal(t, "Voltha-onu", a.Vendor)
+ default:
+ logger.Fatal("unregistered-adapter", a.Id)
+ }
+ }
+ deviceTypes, err := nbi.ListDeviceTypes(getContext(), &empty.Empty{})
+ assert.Nil(t, err)
+ assert.NotNil(t, deviceTypes)
+ assert.Equal(t, 2, len(deviceTypes.Items))
+ for _, dt := range deviceTypes.Items {
+ switch dt.Id {
+ case nb.oltAdapterName:
+ assert.Equal(t, nb.oltAdapterName, dt.Adapter)
+ assert.Equal(t, false, dt.AcceptsBulkFlowUpdate)
+ assert.Equal(t, true, dt.AcceptsAddRemoveFlowUpdates)
+ case nb.onuAdapterName:
+ assert.Equal(t, nb.onuAdapterName, dt.Adapter)
+ assert.Equal(t, false, dt.AcceptsBulkFlowUpdate)
+ assert.Equal(t, true, dt.AcceptsAddRemoveFlowUpdates)
+ default:
+ logger.Fatal("invalid-device-type", dt.Id)
+ }
+ }
+}
+
+func (nb *NBTest) testCreateDevice(t *testing.T, nbi *NBIHandler) {
+ // Create a valid device
+ oltDevice, err := nbi.CreateDevice(getContext(), &voltha.Device{Type: nb.oltAdapterName, MacAddress: "aa:bb:cc:cc:ee:ee"})
+ assert.Nil(t, err)
+ assert.NotNil(t, oltDevice)
+ device, err := nbi.GetDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
+ assert.NotNil(t, device)
+ assert.Equal(t, oltDevice.String(), device.String())
+
+ // Try to create the same device
+ _, err = nbi.CreateDevice(getContext(), &voltha.Device{Type: nb.oltAdapterName, MacAddress: "aa:bb:cc:cc:ee:ee"})
+ assert.NotNil(t, err)
+ assert.Equal(t, "Device is already pre-provisioned", err.Error())
+
+ // Try to create a device with invalid data
+ _, err = nbi.CreateDevice(getContext(), &voltha.Device{Type: nb.oltAdapterName})
+ assert.NotNil(t, err)
+ assert.Equal(t, "no-device-info-present; MAC or HOSTIP&PORT", err.Error())
+
+ // Ensure we only have 1 device in the Core
+ devices, err := nbi.ListDevices(getContext(), &empty.Empty{})
+ assert.Nil(t, err)
+ assert.NotNil(t, devices)
+ assert.Equal(t, 1, len(devices.Items))
+ assert.Equal(t, oltDevice.String(), devices.Items[0].String())
+
+ //Remove the device
+ _, err = nbi.DeleteDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
+
+ //Ensure there are no devices in the Core now - wait until condition satisfied or timeout
+ var vFunction isDevicesConditionSatisfied = func(devices *voltha.Devices) bool {
+ return devices != nil && len(devices.Items) == 0
+ }
+ err = waitUntilConditionForDevices(nb.maxTimeout, nbi, vFunction)
+ assert.Nil(t, err)
+}
+
+func (nb *NBTest) testEnableDevice(t *testing.T, nbi *NBIHandler) {
+ // Create a device that has no adapter registered
+ oltDeviceNoAdapter, err := nbi.CreateDevice(getContext(), &voltha.Device{Type: "noAdapterRegistered", MacAddress: "aa:bb:cc:cc:ee:ff"})
+ assert.Nil(t, err)
+ assert.NotNil(t, oltDeviceNoAdapter)
+
+ // Try to enable the oltDevice and check the error message
+ _, err = nbi.EnableDevice(getContext(), &voltha.ID{Id: oltDeviceNoAdapter.Id})
+ assert.NotNil(t, err)
+ assert.Equal(t, "Adapter-not-registered-for-device-type noAdapterRegistered", err.Error())
+
+ //Remove the device
+ _, err = nbi.DeleteDevice(getContext(), &voltha.ID{Id: oltDeviceNoAdapter.Id})
+ assert.Nil(t, err)
+
+ //Ensure there are no devices in the Core now - wait until condition satisfied or timeout
+ var vdFunction isDevicesConditionSatisfied = func(devices *voltha.Devices) bool {
+ return devices != nil && len(devices.Items) == 0
+ }
+ err = waitUntilConditionForDevices(nb.maxTimeout, nbi, vdFunction)
+ assert.Nil(t, err)
+
+ // Create a logical device monitor will automatically send trap and eapol flows to the devices being enables
+ var wg sync.WaitGroup
+ wg.Add(1)
+ go nb.monitorLogicalDevice(t, nbi, 1, nb.numONUPerOLT, &wg)
+
+ // Create the device with valid data
+ oltDevice, err := nbi.CreateDevice(getContext(), &voltha.Device{Type: nb.oltAdapterName, MacAddress: "aa:bb:cc:cc:ee:ee"})
+ assert.Nil(t, err)
+ assert.NotNil(t, oltDevice)
+
+ // Verify oltDevice exist in the core
+ devices, err := nbi.ListDevices(getContext(), &empty.Empty{})
+ assert.Nil(t, err)
+ assert.Equal(t, 1, len(devices.Items))
+ assert.Equal(t, oltDevice.Id, devices.Items[0].Id)
+
+ // Enable the oltDevice
+ _, err = nbi.EnableDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
+
+ // Wait for the logical device to be in the ready state
+ var vldFunction isLogicalDeviceConditionSatisfied = func(ld *voltha.LogicalDevice) bool {
+ return ld != nil && len(ld.Ports) == nb.numONUPerOLT+1
+ }
+ err = waitUntilLogicalDeviceReadiness(oltDevice.Id, nb.maxTimeout, nbi, vldFunction)
+ assert.Nil(t, err)
+
+ // Verify that the devices have been setup correctly
+ nb.verifyDevices(t, nbi)
+
+ // Get latest oltDevice data
+ oltDevice, err = nbi.GetDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
+
+ // Verify that the logical device has been setup correctly
+ nb.verifyLogicalDevices(t, oltDevice, nbi)
+
+ // Wait until all flows has been sent to the devices successfully
+ wg.Wait()
+}
+
+func (nb *NBTest) testDisableAndReEnableRootDevice(t *testing.T, nbi *NBIHandler) {
+ //Get an OLT device
+ oltDevice, err := nb.getADevice(true, nbi)
+ assert.Nil(t, err)
+ assert.NotNil(t, oltDevice)
+
+ // Disable the oltDevice
+ _, err = nbi.DisableDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
+
+ // Wait for the old device to be disabled
+ var vdFunction isDeviceConditionSatisfied = func(device *voltha.Device) bool {
+ return device.AdminState == voltha.AdminState_DISABLED && device.OperStatus == voltha.OperStatus_UNKNOWN
+ }
+ err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vdFunction, nbi)
+ assert.Nil(t, err)
+
+ // Verify that all onu devices are disabled as well
+ onuDevices, err := nb.deviceMgr.GetAllChildDevices(getContext(), oltDevice.Id)
+ assert.Nil(t, err)
+ for _, onu := range onuDevices.Items {
+ err = waitUntilDeviceReadiness(onu.Id, nb.maxTimeout, vdFunction, nbi)
+ assert.Nil(t, err)
+ }
+
+ // Wait for the logical device to satisfy the expected condition
+ var vlFunction isLogicalDeviceConditionSatisfied = func(ld *voltha.LogicalDevice) bool {
+ if ld == nil {
+ return false
+ }
+ for _, lp := range ld.Ports {
+ if (lp.OfpPort.Config&uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) != lp.OfpPort.Config) ||
+ lp.OfpPort.State != uint32(ofp.OfpPortState_OFPPS_LINK_DOWN) {
+ return false
+ }
+ }
+ return true
+ }
+ err = waitUntilLogicalDeviceReadiness(oltDevice.Id, nb.maxTimeout, nbi, vlFunction)
+ assert.Nil(t, err)
+
+ // Reenable the oltDevice
+ _, err = nbi.EnableDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
+
+ // Wait for the old device to be enabled
+ vdFunction = func(device *voltha.Device) bool {
+ return device.AdminState == voltha.AdminState_ENABLED && device.OperStatus == voltha.OperStatus_ACTIVE
+ }
+ err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vdFunction, nbi)
+ assert.Nil(t, err)
+
+ // Verify that all onu devices are enabled as well
+ onuDevices, err = nb.deviceMgr.GetAllChildDevices(getContext(), oltDevice.Id)
+ assert.Nil(t, err)
+ for _, onu := range onuDevices.Items {
+ err = waitUntilDeviceReadiness(onu.Id, nb.maxTimeout, vdFunction, nbi)
+ assert.Nil(t, err)
+ }
+
+ // Wait for the logical device to satisfy the expected condition
+ vlFunction = func(ld *voltha.LogicalDevice) bool {
+ if ld == nil {
+ return false
+ }
+ for _, lp := range ld.Ports {
+ if (lp.OfpPort.Config&^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) != lp.OfpPort.Config) ||
+ lp.OfpPort.State != uint32(ofp.OfpPortState_OFPPS_LIVE) {
+ return false
+ }
+ }
+ return true
+ }
+ err = waitUntilLogicalDeviceReadiness(oltDevice.Id, nb.maxTimeout, nbi, vlFunction)
+ assert.Nil(t, err)
+}
+
+func (nb *NBTest) testDisableAndDeleteAllDevice(t *testing.T, nbi *NBIHandler) {
+ //Get an OLT device
+ oltDevice, err := nb.getADevice(true, nbi)
+ assert.Nil(t, err)
+ assert.NotNil(t, oltDevice)
+
+ // Disable the oltDevice
+ _, err = nbi.DisableDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
+
+ // Wait for the olt device to be disabled
+ var vdFunction isDeviceConditionSatisfied = func(device *voltha.Device) bool {
+ return device.AdminState == voltha.AdminState_DISABLED && device.OperStatus == voltha.OperStatus_UNKNOWN
+ }
+ err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vdFunction, nbi)
+ assert.Nil(t, err)
+
+ // Verify that all onu devices are disabled as well
+ onuDevices, err := nb.deviceMgr.GetAllChildDevices(getContext(), oltDevice.Id)
+ assert.Nil(t, err)
+ for _, onu := range onuDevices.Items {
+ err = waitUntilDeviceReadiness(onu.Id, nb.maxTimeout, vdFunction, nbi)
+ assert.Nil(t, err)
+ }
+
+ // Delete the oltDevice
+ _, err = nbi.DeleteDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
+
+ var vFunction isDevicesConditionSatisfied = func(devices *voltha.Devices) bool {
+ return devices != nil && len(devices.Items) == 0
+ }
+ err = waitUntilConditionForDevices(nb.maxTimeout, nbi, vFunction)
+ assert.Nil(t, err)
+
+ // Wait for absence of logical device
+ var vlFunction isLogicalDevicesConditionSatisfied = func(lds *voltha.LogicalDevices) bool {
+ return lds != nil && len(lds.Items) == 0
+ }
+
+ err = waitUntilConditionForLogicalDevices(nb.maxTimeout, nbi, vlFunction)
+ assert.Nil(t, err)
+}
+func (nb *NBTest) testEnableAndDeleteAllDevice(t *testing.T, nbi *NBIHandler) {
+ //Create the device with valid data
+ oltDevice, err := nbi.CreateDevice(getContext(), &voltha.Device{Type: nb.oltAdapterName, MacAddress: "aa:bb:cc:cc:ee:ee"})
+ assert.Nil(t, err)
+ assert.NotNil(t, oltDevice)
+
+ //Get an OLT device
+ oltDevice, err = nb.getADevice(true, nbi)
+ assert.Nil(t, err)
+ assert.NotNil(t, oltDevice)
+
+ // Enable the oltDevice
+ _, err = nbi.EnableDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
+
+ // Wait for the logical device to be in the ready state
+ var vldFunction isLogicalDeviceConditionSatisfied = func(ld *voltha.LogicalDevice) bool {
+ return ld != nil && len(ld.Ports) == nb.numONUPerOLT+1
+ }
+ err = waitUntilLogicalDeviceReadiness(oltDevice.Id, nb.maxTimeout, nbi, vldFunction)
+ assert.Nil(t, err)
+
+ //Get all child devices
+ onuDevices, err := nb.deviceMgr.GetAllChildDevices(getContext(), oltDevice.Id)
+ assert.Nil(t, err)
+
+ // Wait for the all onu devices to be enabled
+ var vdFunction isDeviceConditionSatisfied = func(device *voltha.Device) bool {
+ return device.AdminState == voltha.AdminState_ENABLED
+ }
+ for _, onu := range onuDevices.Items {
+ err = waitUntilDeviceReadiness(onu.Id, nb.maxTimeout, vdFunction, nbi)
+ assert.Nil(t, err)
+ }
+ // Wait for each onu device to get deleted
+ var vdFunc isDeviceConditionSatisfied = func(device *voltha.Device) bool {
+ return device == nil
+ }
+
+ // Delete the onuDevice
+ for _, onu := range onuDevices.Items {
+ _, err = nbi.DeleteDevice(getContext(), &voltha.ID{Id: onu.Id})
+ assert.Nil(t, err)
+ err = waitUntilDeviceReadiness(onu.Id, nb.maxTimeout, vdFunc, nbi)
+ assert.Nil(t, err)
+ }
+
+ // Disable the oltDevice
+ _, err = nbi.DisableDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
+
+ // Wait for the olt device to be disabled
+ var vFunction isDeviceConditionSatisfied = func(device *voltha.Device) bool {
+ return device.AdminState == voltha.AdminState_DISABLED && device.OperStatus == voltha.OperStatus_UNKNOWN
+ }
+ err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vFunction, nbi)
+ assert.Nil(t, err)
+
+ // Delete the oltDevice
+ _, err = nbi.DeleteDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
+
+ var vFunc isDevicesConditionSatisfied = func(devices *voltha.Devices) bool {
+ return devices != nil && len(devices.Items) == 0
+ }
+ err = waitUntilConditionForDevices(nb.maxTimeout, nbi, vFunc)
+ assert.Nil(t, err)
+}
+func (nb *NBTest) testDisableAndEnablePort(t *testing.T, nbi *NBIHandler) {
+ //Get an OLT device
+ var cp *voltha.Port
+ oltDevice, err := nb.getADevice(true, nbi)
+ assert.Nil(t, err)
+ assert.NotNil(t, oltDevice)
+
+ for _, cp = range oltDevice.Ports {
+ if cp.Type == voltha.Port_PON_OLT {
+ break
+ }
+
+ }
+ assert.NotNil(t, cp)
+ cp.DeviceId = oltDevice.Id
+
+ // Disable the NW Port of oltDevice
+ _, err = nbi.DisablePort(getContext(), cp)
+ assert.Nil(t, err)
+ // Wait for the olt device Port to be disabled
+ var vdFunction isDeviceConditionSatisfied = func(device *voltha.Device) bool {
+ for _, port := range device.Ports {
+ if port.PortNo == cp.PortNo {
+ return port.AdminState == voltha.AdminState_DISABLED
+ }
+ }
+ return false
+ }
+ err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vdFunction, nbi)
+ assert.Nil(t, err)
+ // Wait for the logical device to satisfy the expected condition
+ var vlFunction = func(ld *voltha.LogicalDevice) bool {
+ if ld == nil {
+ return false
+ }
+ for _, lp := range ld.Ports {
+ if (lp.OfpPort.Config&^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) != lp.OfpPort.Config) ||
+ lp.OfpPort.State != uint32(ofp.OfpPortState_OFPPS_LIVE) {
+ return false
+ }
+ }
+ return true
+ }
+ err = waitUntilLogicalDeviceReadiness(oltDevice.Id, nb.maxTimeout, nbi, vlFunction)
+ assert.Nil(t, err)
+
+ // Enable the NW Port of oltDevice
+ _, err = nbi.EnablePort(getContext(), cp)
+ assert.Nil(t, err)
+
+ // Wait for the olt device Port to be enabled
+ vdFunction = func(device *voltha.Device) bool {
+ for _, port := range device.Ports {
+ if port.PortNo == cp.PortNo {
+ return port.AdminState == voltha.AdminState_ENABLED
+ }
+ }
+ return false
+ }
+ err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vdFunction, nbi)
+ assert.Nil(t, err)
+ // Wait for the logical device to satisfy the expected condition
+ vlFunction = func(ld *voltha.LogicalDevice) bool {
+ if ld == nil {
+ return false
+ }
+ for _, lp := range ld.Ports {
+ if (lp.OfpPort.Config&^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) != lp.OfpPort.Config) ||
+ lp.OfpPort.State != uint32(ofp.OfpPortState_OFPPS_LIVE) {
+ return false
+ }
+ }
+ return true
+ }
+ err = waitUntilLogicalDeviceReadiness(oltDevice.Id, nb.maxTimeout, nbi, vlFunction)
+ assert.Nil(t, err)
+
+ // Disable a non-PON port
+ for _, cp = range oltDevice.Ports {
+ if cp.Type != voltha.Port_PON_OLT {
+ break
+ }
+
+ }
+ assert.NotNil(t, cp)
+ cp.DeviceId = oltDevice.Id
+
+ // Disable the NW Port of oltDevice
+ _, err = nbi.DisablePort(getContext(), cp)
+ assert.NotNil(t, err)
+
+}
+
+func (nb *NBTest) testDeviceRebootWhenOltIsEnabled(t *testing.T, nbi *NBIHandler) {
+ //Get an OLT device
+ oltDevice, err := nb.getADevice(true, nbi)
+ assert.Nil(t, err)
+ assert.NotNil(t, oltDevice)
+ assert.Equal(t, oltDevice.ConnectStatus, voltha.ConnectStatus_REACHABLE)
+ assert.Equal(t, oltDevice.AdminState, voltha.AdminState_ENABLED)
+
+ // Verify that we have one or more ONUs to start with
+ onuDevices, err := nb.deviceMgr.GetAllChildDevices(getContext(), oltDevice.Id)
+ assert.Nil(t, err)
+ assert.NotNil(t, onuDevices)
+ assert.Greater(t, len(onuDevices.Items), 0)
+
+ // Reboot the OLT and very that Connection Status goes to UNREACHABLE and operation status to UNKNOWN
+ _, err = nbi.RebootDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
+
+ var vlFunction0 = func(d *voltha.Device) bool {
+ return d.ConnectStatus == voltha.ConnectStatus_UNREACHABLE && d.OperStatus == voltha.OperStatus_UNKNOWN
+ }
+
+ err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vlFunction0, nbi)
+ assert.Nil(t, err)
+
+ // Wait for the logical device to satisfy the expected condition
+ var vlFunction1 = func(ld *voltha.LogicalDevice) bool {
+ return ld == nil
+ }
+
+ err = waitUntilLogicalDeviceReadiness(oltDevice.Id, nb.maxTimeout, nbi, vlFunction1)
+ assert.Nil(t, err)
+
+ // Wait for the device to satisfy the expected condition (device does not have flows)
+ var vlFunction2 = func(d *voltha.Device) bool {
+ var deviceFlows *ofp.Flows
+ var err error
+ if deviceFlows, err = nbi.ListDeviceFlows(getContext(), &voltha.ID{Id: d.Id}); err != nil {
+ return false
+ }
+ return len(deviceFlows.Items) == 0
+ }
+
+ err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vlFunction2, nbi)
+ assert.Nil(t, err)
+
+ // Wait for the device to satisfy the expected condition (there are no child devices)
+ var vlFunction3 = func(d *voltha.Device) bool {
+ var devices *voltha.Devices
+ var err error
+ if devices, err = nbi.ListDevices(getContext(), nil); err != nil {
+ return false
+ }
+ for _, device := range devices.Items {
+ if device.ParentId == d.Id {
+ // We have a child device still left
+ return false
+ }
+ }
+ return true
+ }
+
+ err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vlFunction3, nbi)
+ assert.Nil(t, err)
+
+ // Update the OLT Connection Status to REACHABLE and operation status to ACTIVE
+ // Normally, in a real adapter this happens after connection regain via a heartbeat mechanism with real hardware
+ err = nbi.deviceMgr.UpdateDeviceStatus(getContext(), oltDevice.Id, voltha.OperStatus_ACTIVE, voltha.ConnectStatus_REACHABLE)
+ assert.Nil(t, err)
+
+ // Verify the device connection and operation states
+ oltDevice, err = nb.getADevice(true, nbi)
+ assert.Nil(t, err)
+ assert.NotNil(t, oltDevice)
+ assert.Equal(t, oltDevice.ConnectStatus, voltha.ConnectStatus_REACHABLE)
+ assert.Equal(t, oltDevice.AdminState, voltha.AdminState_ENABLED)
+
+ // Wait for the logical device to satisfy the expected condition
+ var vlFunction4 = func(ld *voltha.LogicalDevice) bool {
+ return ld != nil
+ }
+ err = waitUntilLogicalDeviceReadiness(oltDevice.Id, nb.maxTimeout, nbi, vlFunction4)
+ assert.Nil(t, err)
+
+ // Verify that logical device is created again
+ logicalDevices, err := nbi.ListLogicalDevices(getContext(), &empty.Empty{})
+ assert.Nil(t, err)
+ assert.NotNil(t, logicalDevices)
+ assert.Equal(t, 1, len(logicalDevices.Items))
+
+ // Verify that we have no ONUs left
+ onuDevices, err = nb.deviceMgr.GetAllChildDevices(getContext(), oltDevice.Id)
+ assert.Nil(t, err)
+ assert.NotNil(t, onuDevices)
+ assert.Equal(t, 0, len(onuDevices.Items))
+}
+
+func (nb *NBTest) testStartOmciTestAction(t *testing.T, nbi *NBIHandler) {
+ // -----------------------------------------------------------------------
+ // SubTest 1: Omci test action should fail due to nonexistent device id
+
+ request := &voltha.OmciTestRequest{Id: "123", Uuid: "456"}
+ _, err := nbi.StartOmciTestAction(getContext(), request)
+ assert.NotNil(t, err)
+ assert.Equal(t, "rpc error: code = NotFound desc = 123", err.Error())
+
+ // -----------------------------------------------------------------------
+ // SubTest 2: Error should be returned for device with no adapter registered
+
+ // Create a device that has no adapter registered
+ deviceNoAdapter, err := nbi.CreateDevice(getContext(), &voltha.Device{Type: "noAdapterRegisteredOmciTest", MacAddress: "aa:bb:cc:cc:ee:01"})
+ assert.Nil(t, err)
+ assert.NotNil(t, deviceNoAdapter)
+
+ // Omci test action should fail due to nonexistent adapter
+ request = &voltha.OmciTestRequest{Id: deviceNoAdapter.Id, Uuid: "456"}
+ _, err = nbi.StartOmciTestAction(getContext(), request)
+ assert.NotNil(t, err)
+ assert.Equal(t, "Adapter-not-registered-for-device-type noAdapterRegisteredOmciTest", err.Error())
+
+ //Remove the device
+ _, err = nbi.DeleteDevice(getContext(), &voltha.ID{Id: deviceNoAdapter.Id})
+ assert.Nil(t, err)
+
+ //Ensure there are no devices in the Core now - wait until condition satisfied or timeout
+ var vFunction isDevicesConditionSatisfied = func(devices *voltha.Devices) bool {
+ return devices != nil && len(devices.Items) == 0
+ }
+ err = waitUntilConditionForDevices(nb.maxTimeout, nbi, vFunction)
+ assert.Nil(t, err)
+
+ // -----------------------------------------------------------------------
+ // SubTest 3: Omci test action should succeed on valid ONU
+
+ // Create the device with valid data
+ oltDevice, err := nbi.CreateDevice(getContext(), &voltha.Device{Type: nb.oltAdapterName, MacAddress: "aa:bb:cc:cc:ee:ee"})
+ assert.Nil(t, err)
+ assert.NotNil(t, oltDevice)
+
+ // Verify oltDevice exist in the core
+ devices, err := nbi.ListDevices(getContext(), &empty.Empty{})
+ assert.Nil(t, err)
+ assert.Equal(t, 1, len(devices.Items))
+ assert.Equal(t, oltDevice.Id, devices.Items[0].Id)
+
+ // Enable the oltDevice
+ _, err = nbi.EnableDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
+
+ // Wait for the logical device to be in the ready state
+ var vldFunction isLogicalDeviceConditionSatisfied = func(ld *voltha.LogicalDevice) bool {
+ return ld != nil && len(ld.Ports) == nb.numONUPerOLT+1
+ }
+ err = waitUntilLogicalDeviceReadiness(oltDevice.Id, nb.maxTimeout, nbi, vldFunction)
+ assert.Nil(t, err)
+
+ // Wait for the olt device to be enabled
+ vdFunction := func(device *voltha.Device) bool {
+ return device.AdminState == voltha.AdminState_ENABLED && device.OperStatus == voltha.OperStatus_ACTIVE
+ }
+ err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vdFunction, nbi)
+ assert.Nil(t, err)
+
+ onuDevices, err := nb.deviceMgr.GetAllChildDevices(getContext(), oltDevice.Id)
+ assert.Nil(t, err)
+ assert.Greater(t, len(onuDevices.Items), 0)
+
+ onuDevice := onuDevices.Items[0]
+
+ // Omci test action should succeed
+ request = &voltha.OmciTestRequest{Id: onuDevice.Id, Uuid: "456"}
+ resp, err := nbi.StartOmciTestAction(getContext(), request)
+ assert.Nil(t, err)
+ assert.Equal(t, resp.Result, voltha.TestResponse_SUCCESS)
+
+ //Remove the device
+ _, err = nbi.DeleteDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
+ //Ensure there are no devices in the Core now - wait until condition satisfied or timeout
+ err = waitUntilConditionForDevices(nb.maxTimeout, nbi, vFunction)
+ assert.Nil(t, err)
+}
+
+func makeSimpleFlowMod(fa *flows.FlowArgs) *ofp.OfpFlowMod {
+ matchFields := make([]*ofp.OfpOxmField, 0)
+ for _, val := range fa.MatchFields {
+ matchFields = append(matchFields, &ofp.OfpOxmField{Field: &ofp.OfpOxmField_OfbField{OfbField: val}})
+ }
+ return flows.MkSimpleFlowMod(matchFields, fa.Actions, fa.Command, fa.KV)
+}
+
+func createMetadata(cTag int, techProfile int, port int) uint64 {
+ md := 0
+ md = (md | (cTag & 0xFFFF)) << 16
+ md = (md | (techProfile & 0xFFFF)) << 32
+ return uint64(md | (port & 0xFFFFFFFF))
+}
+
+func (nb *NBTest) verifyLogicalDeviceFlowCount(t *testing.T, nbi *NBIHandler, numNNIPorts int, numUNIPorts int) {
+ expectedNumFlows := numNNIPorts*3 + numNNIPorts*numUNIPorts
+ // Wait for logical device to have all the flows
+ var vlFunction isLogicalDevicesConditionSatisfied = func(lds *voltha.LogicalDevices) bool {
+ return lds != nil && len(lds.Items) == 1 && len(lds.Items[0].Flows.Items) == expectedNumFlows
+ }
+ // No timeout implies a success
+ err := waitUntilConditionForLogicalDevices(nb.maxTimeout, nbi, vlFunction)
+ assert.Nil(t, err)
+}
+
+func (nb *NBTest) sendTrapFlows(t *testing.T, nbi *NBIHandler, logicalDevice *voltha.LogicalDevice, meterID uint64, startingVlan int) (numNNIPorts, numUNIPorts int) {
+ // Send flows for the parent device
+ var nniPorts []*voltha.LogicalPort
+ var uniPorts []*voltha.LogicalPort
+ for _, p := range logicalDevice.Ports {
+ if p.RootPort {
+ nniPorts = append(nniPorts, p)
+ } else {
+ uniPorts = append(uniPorts, p)
+ }
+ }
+ assert.Equal(t, 1, len(nniPorts))
+ //assert.Greater(t, len(uniPorts), 1 )
+ nniPort := nniPorts[0].OfpPort.PortNo
+ maxInt32 := uint64(0xFFFFFFFF)
+ controllerPortMask := uint32(4294967293) // will result in 4294967293&0x7fffffff => 2147483645 which is the actual controller port
+ var fa *flows.FlowArgs
+ fa = &flows.FlowArgs{
+ KV: flows.OfpFlowModArgs{"priority": 10000, "buffer_id": maxInt32, "out_port": maxInt32, "out_group": maxInt32, "flags": 1},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ flows.InPort(nniPort),
+ flows.EthType(35020),
+ },
+ Actions: []*ofp.OfpAction{
+ flows.Output(controllerPortMask),
+ },
+ }
+ flowLLDP := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDevice.Id}
+ _, err := nbi.UpdateLogicalDeviceFlowTable(getContext(), &flowLLDP)
+ assert.Nil(t, err)
+
+ fa = &flows.FlowArgs{
+ KV: flows.OfpFlowModArgs{"priority": 10000, "buffer_id": maxInt32, "out_port": maxInt32, "out_group": maxInt32, "flags": 1},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ flows.InPort(nniPort),
+ flows.EthType(2048),
+ flows.IpProto(17),
+ flows.UdpSrc(67),
+ flows.UdpDst(68),
+ },
+ Actions: []*ofp.OfpAction{
+ flows.Output(controllerPortMask),
+ },
+ }
+ flowIPV4 := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDevice.Id}
+ _, err = nbi.UpdateLogicalDeviceFlowTable(getContext(), &flowIPV4)
+ assert.Nil(t, err)
+
+ fa = &flows.FlowArgs{
+ KV: flows.OfpFlowModArgs{"priority": 10000, "buffer_id": maxInt32, "out_port": maxInt32, "out_group": maxInt32, "flags": 1},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ flows.InPort(nniPort),
+ flows.EthType(34525),
+ flows.IpProto(17),
+ flows.UdpSrc(546),
+ flows.UdpDst(547),
+ },
+ Actions: []*ofp.OfpAction{
+ flows.Output(controllerPortMask),
+ },
+ }
+ flowIPV6 := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDevice.Id}
+ _, err = nbi.UpdateLogicalDeviceFlowTable(getContext(), &flowIPV6)
+ assert.Nil(t, err)
+
+ return len(nniPorts), len(uniPorts)
+}
+
+func (nb *NBTest) sendEAPFlows(t *testing.T, nbi *NBIHandler, logicalDeviceID string, port *ofp.OfpPort, vlan int, meterID uint64) {
+ maxInt32 := uint64(0xFFFFFFFF)
+ controllerPortMask := uint32(4294967293) // will result in 4294967293&0x7fffffff => 2147483645 which is the actual controller port
+ fa := &flows.FlowArgs{
+ KV: flows.OfpFlowModArgs{"priority": 10000, "buffer_id": maxInt32, "out_port": maxInt32, "out_group": maxInt32, "flags": 1, "write_metadata": createMetadata(vlan, 64, 0), "meter_id": meterID},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ flows.InPort(port.PortNo),
+ flows.EthType(34958),
+ flows.VlanVid(8187),
+ },
+ Actions: []*ofp.OfpAction{
+ flows.Output(controllerPortMask),
+ },
+ }
+ flowEAP := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDeviceID}
+ _, err := nbi.UpdateLogicalDeviceFlowTable(getContext(), &flowEAP)
+ assert.Nil(t, err)
+}
+
+func (nb *NBTest) monitorLogicalDevice(t *testing.T, nbi *NBIHandler, numNNIPorts int, numUNIPorts int, wg *sync.WaitGroup) {
+ defer wg.Done()
+ nb.logicalDeviceMgr.SetEventCallbacks(nbi)
+
+ // Clear any existing flows on the adapters
+ nb.oltAdapter.ClearFlows()
+ nb.onuAdapter.ClearFlows()
+
+ // Wait until a logical device is ready
+ var vlFunction isLogicalDevicesConditionSatisfied = func(lds *voltha.LogicalDevices) bool {
+ if lds == nil || len(lds.Items) != 1 {
+ return false
+ }
+ // Ensure there are both NNI ports and at least one UNI port on the logical device
+ ld := lds.Items[0]
+ nniPort := false
+ uniPort := false
+ for _, p := range ld.Ports {
+ nniPort = nniPort || p.RootPort == true
+ uniPort = uniPort || p.RootPort == false
+ if nniPort && uniPort {
+ return true
+ }
+ }
+ return false
+ }
+ err := waitUntilConditionForLogicalDevices(nb.maxTimeout, nbi, vlFunction)
+ assert.Nil(t, err)
+
+ logicalDevices, err := nbi.ListLogicalDevices(getContext(), &empty.Empty{})
+ assert.Nil(t, err)
+ assert.NotNil(t, logicalDevices)
+ assert.Equal(t, 1, len(logicalDevices.Items))
+
+ logicalDevice := logicalDevices.Items[0]
+ meterID := rand.Uint32()
+
+ // Add a meter to the logical device
+ meterMod := &ofp.OfpMeterMod{
+ Command: ofp.OfpMeterModCommand_OFPMC_ADD,
+ Flags: rand.Uint32(),
+ MeterId: meterID,
+ Bands: []*ofp.OfpMeterBandHeader{
+ {Type: ofp.OfpMeterBandType_OFPMBT_EXPERIMENTER,
+ Rate: rand.Uint32(),
+ BurstSize: rand.Uint32(),
+ Data: nil,
+ },
+ },
+ }
+ _, err = nbi.UpdateLogicalDeviceMeterTable(getContext(), &ofp.MeterModUpdate{Id: logicalDevice.Id, MeterMod: meterMod})
+ assert.Nil(t, err)
+
+ // Send initial set of Trap flows
+ startingVlan := 4091
+ nb.sendTrapFlows(t, nbi, logicalDevice, uint64(meterID), startingVlan)
+
+ // Listen for port events
+ start := time.Now()
+ processedNniLogicalPorts := 0
+ processedUniLogicalPorts := 0
+
+ for event := range nbi.changeEventQueue {
+ startingVlan++
+ if portStatus, ok := (event.Event).(*ofp.ChangeEvent_PortStatus); ok {
+ ps := portStatus.PortStatus
+ if ps.Reason == ofp.OfpPortReason_OFPPR_ADD {
+ if ps.Desc.PortNo >= uint32(nb.startingUNIPortNo) {
+ processedUniLogicalPorts++
+ nb.sendEAPFlows(t, nbi, logicalDevice.Id, ps.Desc, startingVlan, uint64(meterID))
+ } else {
+ processedNniLogicalPorts++
+ }
+ }
+ }
+
+ if processedNniLogicalPorts >= numNNIPorts && processedUniLogicalPorts >= numUNIPorts {
+ fmt.Println("Total time to send all flows:", time.Since(start))
+ break
+ }
+ }
+ //Verify the flow count on the logical device
+ nb.verifyLogicalDeviceFlowCount(t, nbi, numNNIPorts, numUNIPorts)
+
+ // Wait until all flows have been sent to the OLT adapters
+ var oltVFunc isConditionSatisfied = func() bool {
+ return nb.oltAdapter.GetFlowCount() >= (numNNIPorts*3)+numNNIPorts*numUNIPorts
+ }
+ err = waitUntilCondition(nb.maxTimeout, nbi, oltVFunc)
+ assert.Nil(t, err)
+
+ // Wait until all flows have been sent to the ONU adapters
+ var onuVFunc isConditionSatisfied = func() bool {
+ return nb.onuAdapter.GetFlowCount() == numUNIPorts
+ }
+ err = waitUntilCondition(nb.maxTimeout, nbi, onuVFunc)
+ assert.Nil(t, err)
+}
+
+func TestSuiteNbiApiHandler(t *testing.T) {
+ f, err := os.Create("../../../tests/results/profile.cpu")
+ if err != nil {
+ logger.Fatalf("could not create CPU profile: %v\n ", err)
+ }
+ defer f.Close()
+ runtime.SetBlockProfileRate(1)
+ runtime.SetMutexProfileFraction(-1)
+ if err := pprof.StartCPUProfile(f); err != nil {
+ logger.Fatalf("could not start CPU profile: %v\n", err)
+ }
+ defer pprof.StopCPUProfile()
+
+ //log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
+
+ nb := newNBTest()
+ assert.NotNil(t, nb)
+
+ defer nb.stopAll()
+
+ // Start the Core
+ nb.startCore(false)
+
+ // Set the grpc API interface - no grpc server is running in unit test
+ nbi := NewAPIHandler(nb.deviceMgr, nb.logicalDeviceMgr, nb.adapterMgr)
+
+ // 1. Basic test with no data in Core
+ nb.testCoreWithoutData(t, nbi)
+
+ // Create/register the adapters
+ nb.createAndregisterAdapters(t)
+
+ // 2. Test adapter registration
+ nb.testAdapterRegistration(t, nbi)
+
+ numberOfDeviceTestRuns := 2
+ for i := 1; i <= numberOfDeviceTestRuns; i++ {
+ //3. Test create device
+ nb.testCreateDevice(t, nbi)
+
+ // 4. Test Enable a device
+ nb.testEnableDevice(t, nbi)
+
+ // 5. Test disable and ReEnable a root device
+ nb.testDisableAndReEnableRootDevice(t, nbi)
+
+ // 6. Test disable and Enable pon port of OLT device
+ nb.testDisableAndEnablePort(t, nbi)
+
+ // 7.Test Device unreachable when OLT is enabled
+ nb.testDeviceRebootWhenOltIsEnabled(t, nbi)
+
+ // 8. Test disable and delete all devices
+ nb.testDisableAndDeleteAllDevice(t, nbi)
+
+ // 9. Test enable and delete all devices
+ nb.testEnableAndDeleteAllDevice(t, nbi)
+
+ // 10. Test omci test
+ nb.testStartOmciTestAction(t, nbi)
+ }
+
+ //x. TODO - More tests to come
+}