[VOL-4291] Rw-core updates for gRPC migration
Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/rw_core/test/core_nbi_handler_multi_test.go b/rw_core/test/core_nbi_handler_multi_test.go
new file mode 100755
index 0000000..d65eb39
--- /dev/null
+++ b/rw_core/test/core_nbi_handler_multi_test.go
@@ -0,0 +1,2167 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package test
+
+import (
+ "context"
+ "errors"
+ "fmt"
+ "io"
+ "math/rand"
+ "strings"
+ "sync"
+
+ "github.com/Shopify/sarama"
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/opencord/voltha-lib-go/v7/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v7/pkg/kafka"
+ mock_kafka "github.com/opencord/voltha-lib-go/v7/pkg/mocks/kafka"
+ "github.com/opencord/voltha-lib-go/v7/pkg/probe"
+ ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
+ "google.golang.org/grpc"
+
+ "os"
+ "runtime"
+ "runtime/pprof"
+ "strconv"
+ "testing"
+ "time"
+
+ "github.com/golang/protobuf/jsonpb"
+ "github.com/opencord/voltha-go/rw_core/config"
+ c "github.com/opencord/voltha-go/rw_core/core"
+ cm "github.com/opencord/voltha-go/rw_core/mocks"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ mock_etcd "github.com/opencord/voltha-lib-go/v7/pkg/mocks/etcd"
+ "github.com/phayes/freeport"
+ "github.com/stretchr/testify/assert"
+)
+
+var oltAdapters = map[string]*AdapterInfo{ // mock OLT adapter descriptors (keys presumably adapter type names - confirm)
+ "olt_adapter_type1": {
+ TotalReplica: 1, // summed by getNumAdapters
+ DeviceType: "olt-device-type1",
+ Vendor: "olt-mock-vendor1",
+ ChildDeviceType: "onu-device-type1", // same string as onuAdapters["onu_adapter_type1"].DeviceType
+ ChildVendor: "onu-mock-vendor1", // same string as onuAdapters["onu_adapter_type1"].Vendor
+ },
+ "olt_adapter_type2": {
+ TotalReplica: 1, // summed by getNumAdapters
+ DeviceType: "olt-device-type2",
+ Vendor: "olt-mock-vendor2",
+ ChildDeviceType: "onu-device-type2", // same string as onuAdapters["onu_adapter_type2"].DeviceType
+ ChildVendor: "onu-mock-vendor2", // same string as onuAdapters["onu_adapter_type2"].Vendor
+ },
+}
+
+var onuAdapters = map[string]*AdapterInfo{ // mock ONU adapter descriptors (keys presumably adapter type names - confirm)
+ "onu_adapter_type1": {
+ TotalReplica: 1, // summed by getNumAdapters
+ DeviceType: "onu-device-type1", // same string as oltAdapters["olt_adapter_type1"].ChildDeviceType
+ Vendor: "onu-mock-vendor1", // same string as oltAdapters["olt_adapter_type1"].ChildVendor
+ },
+ "onu_adapter_type2": {
+ TotalReplica: 1, // summed by getNumAdapters
+ DeviceType: "onu-device-type2", // same string as oltAdapters["olt_adapter_type2"].ChildDeviceType
+ Vendor: "onu-mock-vendor2", // same string as oltAdapters["olt_adapter_type2"].ChildVendor
+ },
+}
+
+type NBTest struct { // state shared by the NBI handler tests in this file
+ etcdServer *mock_etcd.EtcdServer // embedded etcd started by newNBTest and stopped by stopAll
+ config *config.RWCoreFlags // core flags built in startGRPCCore
+ kvClientPort int // etcd client port returned by StartEmbeddedEtcdServer
+ kEventClient kafka.Client // mock kafka client created in startGRPCCore and handed to the core
+ kafkaBroker *sarama.MockBroker // closed by stopAll when set
+ numONUPerOLT int // number of ONUs the verifications expect per OLT
+ startingUNIPortNo int
+ oltAdapters map[string][]*cm.OLTAdapter // map<adapter type>[adapter instances]
+ onuAdapters map[string][]*cm.ONUAdapter // looked up by adapter type, like oltAdapters
+ coreInstanceID string // set to "rw-nbi-test" by newNBTest
+ internalTimeout time.Duration // copied into cfg.InternalTimeout by startGRPCCore
+ maxTimeout time.Duration // upper bound handed to the waitUntil* helpers
+ coreRPCTimeout time.Duration // copied into cfg.RPCTimeout by startGRPCCore
+ core *c.Core // stopped by stopAll when non-nil
+ probe *probe.Probe // probe served on cfg.ProbeAddress by startGRPCCore
+ oltAdaptersLock sync.RWMutex // held while oltAdapters is read or iterated
+ onuAdaptersLock sync.RWMutex // held while onuAdapters is read or iterated
+ changeEventLister *ChangedEventListener // provides the event channels used by enableDevice and testEnableDevice
+}
+
+var testLogger log.CLogger
+
+// init registers testLogger and sets the log level to InfoLevel; it panics if either step fails.
+func init() {
+	registered, regErr := log.RegisterPackage(log.JSON, log.InfoLevel, log.Fields{"nbi-handler-test": true})
+	testLogger = registered
+	if regErr != nil {
+		panic(regErr)
+	}
+	if lvlErr := log.SetLogLevel(log.InfoLevel); lvlErr != nil {
+		panic(lvlErr)
+	}
+}
+
+func newNBTest(ctx context.Context, loadTest bool) *NBTest {
+ test := &NBTest{}
+ // Start the embedded etcd server
+ var err error
+ test.etcdServer, test.kvClientPort, err = StartEmbeddedEtcdServer(ctx, "voltha.rwcore.nb.test", "voltha.rwcore.nb.etcd", "error")
+ if err != nil {
+ logger.Fatal(ctx, err)
+ }
+ test.coreInstanceID = "rw-nbi-test"
+ test.internalTimeout = 20 * time.Second
+ test.maxTimeout = 20 * time.Second
+ test.coreRPCTimeout = 20 * time.Second
+ if loadTest {
+ test.internalTimeout = 100 * time.Second
+ test.maxTimeout = 300 * time.Second
+ test.coreRPCTimeout = 100 * time.Second
+ setRetryInterval(5 * time.Second)
+ }
+ return test
+}
+
+// startGRPCCore builds the rw-core flags (etcd address plus SBI/NBI gRPC, probe and kafka addresses on 127.0.0.1 with ports from
+// freeport), starts the probe and the core in goroutines without waiting for them, and returns the SBI and NBI addresses. t is unused.
+func (nb *NBTest) startGRPCCore(ctx context.Context, t *testing.T) (coreEndpoint, nbiEndpoint string) {
+ // Setup the configs
+ cfg := &config.RWCoreFlags{}
+ cfg.ParseCommandArguments([]string{})
+ cfg.InternalTimeout = nb.internalTimeout
+ cfg.RPCTimeout = nb.coreRPCTimeout
+ cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(nb.kvClientPort)
+ cfg.LogLevel = "DEBUG"
+
+ // Get a free port for the Core gRPC server
+ grpcPort, err := freeport.GetFreePort()
+ if err != nil {
+ logger.Fatal(ctx, "Cannot get a freeport for grpc core")
+ }
+ cfg.GrpcSBIAddress = "127.0.0.1:" + strconv.Itoa(grpcPort)
+ coreEndpoint = cfg.GrpcSBIAddress
+
+ // Get a free port for the NBI gRPC server
+ grpcPort, err = freeport.GetFreePort()
+ if err != nil {
+ logger.Fatal(ctx, "Cannot get a freeport for grpc NBI")
+ }
+ cfg.GrpcNBIAddress = "127.0.0.1:" + strconv.Itoa(grpcPort)
+ nbiEndpoint = cfg.GrpcNBIAddress
+
+ // Set up the probe service
+ nb.probe = &probe.Probe{}
+ probePort, err := freeport.GetFreePort()
+ if err != nil {
+ logger.Fatal(ctx, "Cannot get a freeport for probe port")
+ }
+ cfg.ProbeAddress = "127.0.0.1:" + strconv.Itoa(probePort)
+ go nb.probe.ListenAndServe(ctx, cfg.ProbeAddress)
+
+ //Add the probe to the context to pass to all the services started
+ probeCtx := context.WithValue(ctx, probe.ProbeContextKey, nb.probe)
+
+ // Reserve a port for the kafka cluster address; no broker is started here, the core gets a mock kafka client below
+ kafkaPort, err := freeport.GetFreePort()
+ if err != nil {
+ logger.Fatalw(probeCtx, "Cannot get a freeport for kafka port", log.Fields{"error": err})
+ }
+ cfg.KafkaClusterAddress = "127.0.0.1:" + strconv.Itoa(kafkaPort)
+
+ // Register probe services
+ nb.probe.RegisterService(
+ ctx,
+ "cluster-message-service",
+ "grpc-sbi-service",
+ "adapter-service",
+ "kv-service",
+ "device-service",
+ "logical-device-service",
+ )
+ // Mock kafka client that is handed to the core as its KafkaClient
+ nb.kEventClient = mock_kafka.NewKafkaClient()
+ nb.config = cfg
+ shutdownCtx, cancelCtx := context.WithCancel(probeCtx)
+ // NOTE(review): rwCore is not stored in nb.core here, so stopAll only stops the core if nb.core is set elsewhere - confirm
+ rwCore := &c.Core{Shutdown: cancelCtx, Stopped: make(chan struct{}), KafkaClient: nb.kEventClient}
+ go rwCore.Start(shutdownCtx, "core-test", cfg)
+ return
+}
+
+// stopAll tears the fixture down. Order: embedded etcd, the kafka event
+// client, the kafka broker, every adapter's gRPC client, every adapter
+// itself, and finally the core. Each optional component is skipped when unset.
+func (nb *NBTest) stopAll(ctx context.Context) {
+	if srv := nb.etcdServer; srv != nil {
+		StopEmbeddedEtcdServer(ctx, srv)
+	}
+
+	if client := nb.kEventClient; client != nil {
+		client.Stop(ctx)
+	}
+
+	if broker := nb.kafkaBroker; broker != nil {
+		broker.Close()
+	}
+
+	// Phase 1: stop the gRPC clients of the OLT adapters, then of the ONU
+	// adapters. Ranging over a nil map is a no-op, so no nil check is needed.
+	nb.oltAdaptersLock.Lock()
+	for _, replicas := range nb.oltAdapters {
+		for _, olt := range replicas {
+			olt.StopGrpcClient()
+		}
+	}
+	nb.oltAdaptersLock.Unlock()
+
+	nb.onuAdaptersLock.Lock()
+	for _, replicas := range nb.onuAdapters {
+		for _, onu := range replicas {
+			onu.StopGrpcClient()
+		}
+	}
+	nb.onuAdaptersLock.Unlock()
+
+	// Phase 2: stop the adapters themselves. Both locks are re-acquired and
+	// stay held until stopAll returns.
+	nb.oltAdaptersLock.Lock()
+	defer nb.oltAdaptersLock.Unlock()
+	for _, replicas := range nb.oltAdapters {
+		for _, olt := range replicas {
+			olt.Stop()
+		}
+	}
+
+	nb.onuAdaptersLock.Lock()
+	defer nb.onuAdaptersLock.Unlock()
+	for _, replicas := range nb.onuAdapters {
+		for _, onu := range replicas {
+			onu.Stop()
+		}
+	}
+
+	// The core, when set, is stopped last.
+	if nb.core != nil {
+		nb.core.Stop()
+	}
+}
+
+// verifyLogicalDevices checks the logical device rooted at oltDevice: its
+// descriptor, switch features, port count and each port's id, state and root
+// flag. It stops early, instead of panicking, when a lookup yields nothing.
+func (nb *NBTest) verifyLogicalDevices(t *testing.T, oltDevice *voltha.Device, nbi voltha.VolthaServiceClient) {
+	// Get the latest logical device
+	logicalDevices, err := nbi.ListLogicalDevices(getContext(), &empty.Empty{})
+	if !assert.Nil(t, err) || !assert.NotNil(t, logicalDevices) {
+		return
+	}
+	var ld *voltha.LogicalDevice
+	for _, logicalDevice := range logicalDevices.Items {
+		if logicalDevice.RootDeviceId == oltDevice.Id {
+			ld = logicalDevice
+			break
+		}
+	}
+	if !assert.NotNil(t, ld) {
+		return
+	}
+	ports, err := nbi.ListLogicalDevicePorts(getContext(), &voltha.ID{Id: ld.Id})
+	if !assert.Nil(t, err) || !assert.NotNil(t, ports) {
+		return
+	}
+	assert.NotEqual(t, "", ld.Id)
+	assert.NotEqual(t, uint64(0), ld.DatapathId)
+	assert.Equal(t, "olt_adapter_mock", ld.Desc.HwDesc)
+	assert.Equal(t, "olt_adapter_mock", ld.Desc.SwDesc)
+	assert.NotEqual(t, "", ld.RootDeviceId)
+	assert.NotEqual(t, "", ld.Desc.SerialNum)
+	assert.Equal(t, uint32(256), ld.SwitchFeatures.NBuffers)
+	assert.Equal(t, uint32(2), ld.SwitchFeatures.NTables)
+	assert.Equal(t, uint32(15), ld.SwitchFeatures.Capabilities)
+	assert.Equal(t, 1+nb.numONUPerOLT, len(ports.Items))
+	assert.Equal(t, oltDevice.ParentId, ld.Id)
+	for _, p := range ports.Items {
+		assert.Equal(t, p.OfpPort.PortNo, p.DevicePortNo)
+		assert.Equal(t, uint32(4), p.OfpPort.State)
+		if strings.HasPrefix(p.Id, "nni") {
+			assert.Equal(t, true, p.RootPort)
+			//assert.Equal(t, uint32(2), p.OfpPort.PortNo)
+			assert.Equal(t, p.Id, fmt.Sprintf("nni-%d", p.DevicePortNo))
+		} else {
+			assert.Equal(t, p.Id, fmt.Sprintf("uni-%d", p.DevicePortNo))
+			assert.Equal(t, false, p.RootPort)
+		}
+	}
+}
+
+// verifyDevices waits until the OLT oltDeviceID and each of its child devices is ENABLED and ACTIVE, then checks
+// each device's state and ports. Devices are checked concurrently; the call returns when all checks are done.
+func (nb *NBTest) verifyDevices(t *testing.T, nbi voltha.VolthaServiceClient, oltDeviceID string) {
+	devices, err := nbi.ListDevices(getContext(), &empty.Empty{})
+	if !assert.Nil(t, err) || !assert.NotNil(t, devices) {
+		return
+	}
+	// A device is ready to be examined when its ADMIN state is ENABLED and OPERATIONAL state is ACTIVE
+	var vFunction isDeviceConditionSatisfied = func(device *voltha.Device) bool {
+		return device.AdminState == voltha.AdminState_ENABLED && device.OperStatus == voltha.OperStatus_ACTIVE
+	}
+	var wg sync.WaitGroup
+	for _, device := range devices.Items {
+		if (device.Root && device.Id != oltDeviceID) || (!device.Root && device.ParentId != oltDeviceID) {
+			continue
+		}
+		wg.Add(1)
+		go func(device *voltha.Device) {
+			defer wg.Done() // deferred so the early return below cannot leave wg.Wait blocked
+			err := waitUntilDeviceReadiness(device.Id, nb.maxTimeout, vFunction, nbi)
+			assert.Nil(t, err)
+			d, err := nbi.GetDevice(getContext(), &voltha.ID{Id: device.Id})
+			haveDevice := assert.Nil(t, err) && assert.NotNil(t, d) // without the latest device there is nothing to verify
+			dPorts, err := nbi.ListDevicePorts(getContext(), &voltha.ID{Id: device.Id})
+			if !haveDevice || !assert.Nil(t, err) || !assert.NotNil(t, dPorts) {
+				return
+			}
+			assert.Equal(t, voltha.AdminState_ENABLED, d.AdminState)
+			assert.Equal(t, voltha.ConnectStatus_REACHABLE, d.ConnectStatus)
+			assert.Equal(t, voltha.OperStatus_ACTIVE, d.OperStatus)
+			assert.NotEqual(t, "", d.MacAddress)
+			assert.NotEqual(t, "", d.SerialNumber)
+			switch d.Type {
+			case "olt_adapter_mock":
+				assert.Equal(t, true, d.Root)
+				assert.NotEqual(t, "", d.Id)
+				assert.NotEqual(t, "", d.ParentId)
+				assert.Nil(t, d.ProxyAddress)
+			case "onu_adapter_mock":
+				assert.Equal(t, false, d.Root)
+				assert.NotEqual(t, uint32(0), d.Vlan)
+				assert.NotEqual(t, "", d.Id)
+				assert.NotEqual(t, "", d.ParentId)
+				assert.NotEqual(t, "", d.ProxyAddress.DeviceId)
+				assert.Equal(t, "olt_adapter_mock", d.ProxyAddress.DeviceType)
+			default:
+				assert.Error(t, errors.New("invalid-device-type")) // NOTE(review): assert.Error on a non-nil error always passes
+			}
+			assert.Equal(t, 2, len(dPorts.Items))
+			for _, p := range dPorts.Items {
+				assert.Equal(t, voltha.AdminState_ENABLED, p.AdminState)
+				assert.Equal(t, voltha.OperStatus_ACTIVE, p.OperStatus)
+				switch p.Type {
+				case voltha.Port_ETHERNET_NNI, voltha.Port_ETHERNET_UNI:
+					assert.Equal(t, 0, len(p.Peers))
+				case voltha.Port_PON_OLT:
+					assert.Equal(t, nb.numONUPerOLT, len(p.Peers))
+					assert.Equal(t, uint32(1), p.PortNo)
+				case voltha.Port_PON_ONU:
+					assert.Equal(t, 1, len(p.Peers))
+					assert.Equal(t, uint32(1), p.PortNo)
+				default:
+					assert.Error(t, errors.New("invalid-port")) // NOTE(review): always passes, see above
+				}
+			}
+		}(device)
+	}
+	wg.Wait()
+}
+
+func (nb *NBTest) getChildDevices(parentID string, nbi voltha.VolthaServiceClient) (*voltha.Devices, error) {
+ devices, err := nbi.ListDevices(getContext(), &empty.Empty{})
+ if err != nil {
+ return nil, err
+ }
+ var childDevice []*voltha.Device
+ for _, d := range devices.Items {
+ if d.Root != true && d.ParentId == parentID {
+ childDevice = append(childDevice, d)
+ }
+ }
+ return &voltha.Devices{Items: childDevice}, nil
+}
+
+func (nb *NBTest) testCoreWithoutData(t *testing.T, nbi voltha.VolthaServiceClient) { // asserts that the NBI lists zero logical devices, devices and adapters
+	lds, err := nbi.ListLogicalDevices(getContext(), &empty.Empty{})
+	assert.Nil(t, err)
+	assert.NotNil(t, lds)
+	assert.Equal(t, 0, len(lds.Items))
+	devices, err := nbi.ListDevices(getContext(), &empty.Empty{})
+	assert.Nil(t, err)
+	assert.NotNil(t, devices)
+	assert.Equal(t, 0, len(devices.Items))
+	adapters, err := nbi.ListAdapters(getContext(), &empty.Empty{})
+	assert.Nil(t, err) // report the RPC error before adapters is dereferenced, as for the two lists above
+	assert.NotNil(t, adapters)
+	assert.Equal(t, 0, len(adapters.Items))
+}
+func (nb *NBTest) getNumAdapters() int {
+ totalAdapters := int32(0)
+ for _, aInfo := range onuAdapters {
+ totalAdapters = totalAdapters + aInfo.TotalReplica
+ }
+ for _, aInfo := range oltAdapters {
+ totalAdapters = totalAdapters + aInfo.TotalReplica
+ }
+ return int(totalAdapters)
+}
+
+// testAdapterRegistration checks that every adapter and device type listed by the NBI maps to an adapter type held in nb.oltAdapters or nb.onuAdapters.
+func (nb *NBTest) testAdapterRegistration(t *testing.T, nbi voltha.VolthaServiceClient) {
+	registered, err := nbi.ListAdapters(getContext(), &empty.Empty{})
+	assert.Nil(t, err)
+	assert.NotNil(t, registered)
+	assert.Equal(t, nb.getNumAdapters(), len(registered.Items))
+	nb.oltAdaptersLock.RLock()
+	defer nb.oltAdaptersLock.RUnlock()
+	nb.onuAdaptersLock.RLock()
+	defer nb.onuAdaptersLock.RUnlock()
+	for _, adapter := range registered.Items {
+		if strings.Contains(adapter.Type, "olt") {
+			_, known := nb.oltAdapters[adapter.Type]
+			assert.True(t, known)
+			assert.True(t, strings.Contains(adapter.Vendor, "olt-mock-vendor"))
+		} else if strings.Contains(adapter.Type, "onu") {
+			_, known := nb.onuAdapters[adapter.Type]
+			assert.True(t, known)
+			assert.True(t, strings.Contains(adapter.Vendor, "onu-mock-vendor"))
+		} else {
+			logger.Fatal(context.Background(), "unregistered-adapter", adapter.Id)
+		}
+	}
+	supported, err := nbi.ListDeviceTypes(getContext(), &empty.Empty{})
+	assert.Nil(t, err)
+	assert.NotNil(t, supported)
+	assert.Equal(t, len(nb.oltAdapters)+len(nb.onuAdapters), len(supported.Items))
+	for _, devType := range supported.Items {
+		if strings.Contains(devType.AdapterType, "olt") {
+			_, known := nb.oltAdapters[devType.AdapterType]
+			assert.True(t, known)
+			assert.Equal(t, false, devType.AcceptsBulkFlowUpdate)
+			assert.Equal(t, true, devType.AcceptsAddRemoveFlowUpdates)
+		} else if strings.Contains(devType.AdapterType, "onu") {
+			_, known := nb.onuAdapters[devType.AdapterType]
+			assert.True(t, known)
+			assert.Equal(t, false, devType.AcceptsBulkFlowUpdate)
+			assert.Equal(t, true, devType.AcceptsAddRemoveFlowUpdates)
+		} else {
+			logger.Fatal(context.Background(), "invalid-device-type", devType.Id)
+		}
+	}
+}
+
+// testCreateDevice pre-provisions an OLT of oltDeviceType, checks that a duplicate
+// create and a create without MAC are rejected, and finally removes the device.
+func (nb *NBTest) testCreateDevice(t *testing.T, nbi voltha.VolthaServiceClient, oltDeviceType string) {
+	mac := getRandomMacAddress()
+	created, err := nbi.CreateDevice(getContext(), &voltha.Device{Type: oltDeviceType, MacAddress: mac})
+	assert.Nil(t, err)
+	assert.NotNil(t, created)
+	fetched, err := nbi.GetDevice(getContext(), &voltha.ID{Id: created.Id})
+	assert.Nil(t, err)
+	assert.NotNil(t, fetched)
+	assert.Equal(t, created.String(), fetched.String())
+
+	// The same type/MAC pair cannot be created a second time.
+	_, err = nbi.CreateDevice(getContext(), &voltha.Device{Type: oltDeviceType, MacAddress: mac})
+	assert.NotNil(t, err)
+	assert.True(t, strings.Contains(err.Error(), "device is already pre-provisioned"))
+
+	// A request that carries only a device type is rejected.
+	_, err = nbi.CreateDevice(getContext(), &voltha.Device{Type: oltDeviceType})
+	assert.NotNil(t, err)
+	assert.True(t, strings.Contains(err.Error(), "no-device-info-present; MAC or HOSTIP&PORT"))
+
+	// The rejected requests must not have removed the original device.
+	stillThere, err := nbi.GetDevice(getContext(), &voltha.ID{Id: created.Id})
+	assert.Nil(t, err)
+	assert.NotNil(t, stillThere)
+
+	// Remove the device
+	err = cleanUpCreatedDevices(nb.maxTimeout, nbi, created.Id)
+	assert.Nil(t, err)
+}
+
+// enableDevice enables oltDevice through nbi, starts monitorLogicalDevices on a freshly subscribed change-event channel, waits for the
+// logical device to expose numONUPerOLT+1 ports, verifies the devices and the logical device, then blocks on the WaitGroup handed to the monitor.
+func (nb *NBTest) enableDevice(t *testing.T, nbi voltha.VolthaServiceClient, oltDevice *voltha.Device) {
+ // Subscribe to the event listener
+ eventCh := nb.changeEventLister.Subscribe((nb.numONUPerOLT + 1) * nb.getNumAdapters())
+ defer nb.changeEventLister.Unsubscribe(eventCh)
+ // Enable the oltDevice
+ _, err := nbi.EnableDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
+
+ // Start a logical device monitor, which automatically sends trap and eapol flows to the devices being enabled
+ var wg sync.WaitGroup
+ wg.Add(1)
+ subCtx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ go nb.monitorLogicalDevices(subCtx, t, nbi, 1, nb.numONUPerOLT, &wg, false, false, oltDevice.Id, eventCh)
+
+ // Wait for the logical device to be in the ready state
+ var vldFunction = func(ports []*voltha.LogicalPort) bool {
+ return len(ports) == nb.numONUPerOLT+1
+ }
+ err = waitUntilLogicalDevicePortsReadiness(oltDevice.Id, nb.maxTimeout, nbi, vldFunction)
+ assert.Nil(t, err)
+
+ // Verify that the devices have been setup correctly
+ nb.verifyDevices(t, nbi, oltDevice.Id)
+ // Get latest oltDevice data
+ oltDevice, err = nbi.GetDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
+
+ // Verify that the logical device has been setup correctly
+ nb.verifyLogicalDevices(t, oltDevice, nbi)
+
+ // Wait until all flows have been sent to the devices successfully
+ wg.Wait()
+}
+
+// testForceDeletePreProvDevice force-deletes a device that was created but never enabled.
+func (nb *NBTest) testForceDeletePreProvDevice(t *testing.T, nbi voltha.VolthaServiceClient, oltDeviceType string) {
+	preProv, err := nbi.CreateDevice(getContext(), &voltha.Device{Type: oltDeviceType, MacAddress: getRandomMacAddress()})
+	assert.Nil(t, err)
+	assert.NotNil(t, preProv)
+
+	// The core must return the device before it is deleted.
+	fetched, err := nbi.GetDevice(getContext(), &voltha.ID{Id: preProv.Id})
+	assert.Nil(t, err)
+	assert.NotNil(t, fetched)
+	assert.Equal(t, preProv.String(), fetched.String())
+
+	// Force the removal ...
+	_, err = nbi.ForceDeleteDevice(getContext(), &voltha.ID{Id: preProv.Id})
+	assert.Nil(t, err)
+	// ... and wait, bounded by maxTimeout, for it to take effect.
+	err = waitUntilDeviceIsRemoved(nb.maxTimeout, nbi, preProv.Id)
+	assert.Nil(t, err)
+}
+
+// testForceDeleteEnabledDevice force-deletes an OLT after it has been enabled through enableDevice.
+func (nb *NBTest) testForceDeleteEnabledDevice(t *testing.T, nbi voltha.VolthaServiceClient, oltDeviceType string) {
+	enabledOLT, err := nbi.CreateDevice(getContext(), &voltha.Device{Type: oltDeviceType, MacAddress: getRandomMacAddress()})
+	assert.Nil(t, err)
+	assert.NotNil(t, enabledOLT)
+
+	// Enable the device and run the enableDevice verifications first.
+	nb.enableDevice(t, nbi, enabledOLT)
+
+	// Force the removal of the enabled device.
+	_, err = nbi.ForceDeleteDevice(getContext(), &voltha.ID{Id: enabledOLT.Id})
+	assert.Nil(t, err)
+
+	err = waitUntilDeviceIsRemoved(nb.maxTimeout, nbi, enabledOLT.Id)
+	assert.Nil(t, err)
+}
+
+// testDeletePreProvDevice creates a device, confirms the core returns it, and hands it to cleanUpCreatedDevice.
+func (nb *NBTest) testDeletePreProvDevice(t *testing.T, nbi voltha.VolthaServiceClient, oltDeviceType string) {
+	preProv, err := nbi.CreateDevice(getContext(), &voltha.Device{Type: oltDeviceType, MacAddress: getRandomMacAddress()})
+	assert.Nil(t, err)
+	assert.NotNil(t, preProv)
+
+	// The core must return the very same device.
+	fetched, err := nbi.GetDevice(getContext(), &voltha.ID{Id: preProv.Id})
+	assert.Nil(t, err)
+	assert.NotNil(t, fetched)
+	assert.Equal(t, preProv.String(), fetched.String())
+	// Clean up, bounded by maxTimeout.
+	err = cleanUpCreatedDevice(nb.maxTimeout, nbi, preProv.Id)
+	assert.Nil(t, err)
+}
+
+// testDeleteEnabledDevice creates and enables an OLT, deletes it with DeleteDevice, and waits
+// until GetDevice reports the device as not found.
+func (nb *NBTest) testDeleteEnabledDevice(t *testing.T, nbi voltha.VolthaServiceClient, oltDeviceType string) {
+	enabledOLT, err := nbi.CreateDevice(getContext(), &voltha.Device{Type: oltDeviceType, MacAddress: getRandomMacAddress()})
+	assert.Nil(t, err)
+	assert.NotNil(t, enabledOLT)
+
+	// Enable the device before deleting it.
+	nb.enableDevice(t, nbi, enabledOLT)
+
+	// Request a regular (non-forced) delete.
+	_, err = nbi.DeleteDevice(getContext(), &voltha.ID{Id: enabledOLT.Id})
+	assert.Nil(t, err)
+
+	// The device counts as gone once GetDevice fails with an error whose text
+	// contains "NotFound"; a nil error or any other error means "not yet".
+	var gone isConditionSatisfied = func() bool {
+		_, getErr := nbi.GetDevice(getContext(), &voltha.ID{Id: enabledOLT.Id})
+		return getErr != nil && strings.Contains(getErr.Error(), "NotFound")
+	}
+
+	err = waitUntilCondition(nb.maxTimeout, gone)
+	assert.Nil(t, err)
+}
+
+// testForceDeleteDeviceFailure enables an OLT, calls SetDeleteAction(id, true) on its adapter,
+// and checks that ForceDeleteDevice still removes the device.
+func (nb *NBTest) testForceDeleteDeviceFailure(t *testing.T, nbi voltha.VolthaServiceClient, oltDeviceType string) {
+	target, err := nbi.CreateDevice(getContext(), &voltha.Device{Type: oltDeviceType, MacAddress: getRandomMacAddress()})
+	assert.Nil(t, err)
+	assert.NotNil(t, target)
+
+	// Enable the device
+	nb.enableDevice(t, nbi, target)
+
+	// Set the delete action for this device on the adapter instance that serves it.
+	adapter, err := nb.getOLTAdapterInstance(t, nbi, target.Id)
+	assert.Nil(t, err)
+	adapter.SetDeleteAction(target.Id, true)
+
+	// A forced delete is expected to succeed regardless.
+	_, err = nbi.ForceDeleteDevice(getContext(), &voltha.ID{Id: target.Id})
+	assert.Nil(t, err)
+
+	err = waitUntilDeviceIsRemoved(nb.maxTimeout, nbi, target.Id)
+	assert.Nil(t, err)
+}
+
+// testDeleteDeviceFailure makes the OLT adapter fail a DeleteDevice request, waits for the matching
+// RPC event on the event topic, then lets deletion pass again and force-deletes the device, waiting
+// until GetDevice reports it as not found.
+func (nb *NBTest) testDeleteDeviceFailure(t *testing.T, nbi voltha.VolthaServiceClient, oltDeviceType string) {
+	// Create and enable a OLT device for that device type
+	oltDevice, err := nb.createAndEnableOLTDevice(t, nbi, oltDeviceType)
+	assert.Nil(t, err)
+	assert.NotNil(t, oltDevice)
+
+	// Set the delete action to fail device deletion
+	oltAdapter, err := nb.getOLTAdapterInstance(t, nbi, oltDevice.Id)
+	assert.Nil(t, err)
+	oltAdapter.SetDeleteAction(oltDevice.Id, true)
+
+	// Subscribe and wait asynchronously on the kafka message bus for a delete failure event
+	ch := make(chan int, 1)
+	eventTopic := &kafka.Topic{Name: nb.config.EventTopic}
+	eventChnl, err := nb.kEventClient.Subscribe(getContext(), eventTopic)
+	assert.Nil(t, err)
+	defer func() {
+		if eventChnl != nil {
+			unsubErr := nb.kEventClient.UnSubscribe(getContext(), eventTopic, eventChnl) // local, so err is not overwritten
+			assert.Nil(t, unsubErr)
+		}
+	}()
+	go func() {
+		timer := time.NewTimer(nb.internalTimeout)
+		defer timer.Stop()
+	loop:
+		for {
+			select {
+			case event := <-eventChnl:
+				if evnt, ok := event.(*voltha.Event); ok {
+					rpcEvent := evnt.GetRpcEvent()
+					if rpcEvent != nil && rpcEvent.ResourceId == oltDevice.Id && rpcEvent.Rpc == "DeleteDevice" {
+						ch <- 1 // the expected DeleteDevice RPC event arrived
+						break loop
+					}
+				}
+			case <-timer.C:
+				ch <- 0 // internalTimeout elapsed without the event
+				break loop
+			}
+		}
+	}()
+
+	//Now remove the device
+	_, err = nbi.DeleteDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+	assert.Nil(t, err)
+
+	// Wait for the delete event
+	event := <-ch
+	assert.Equal(t, 1, event)
+
+	// Set the delete action to pass device deletion
+	oltAdapter.SetDeleteAction(oltDevice.Id, false)
+
+	// Now Force Delete this device - needs to be done in a verification function because if the
+	// previous failed delete action was not complete then a force delete will return an error
+	// The condition is therefore met only once a force delete is accepted (nil error).
+	var forceDeleteComplete isConditionSatisfied = func() bool {
+		_, err := nbi.ForceDeleteDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+		return err == nil
+	}
+	err = waitUntilCondition(nb.maxTimeout, forceDeleteComplete)
+	assert.Nil(t, err)
+
+	// Wait until device is gone
+	var deviceDeleteComplete isConditionSatisfied = func() bool {
+		_, getErr := nbi.GetDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+		return getErr != nil && strings.Contains(getErr.Error(), "NotFound")
+	}
+
+	err = waitUntilCondition(nb.maxTimeout, deviceDeleteComplete)
+	assert.Nil(t, err)
+}
+
+// createAndEnableOLTDevice creates and enables an OLT device and waits until it and its ONUs are ENABLED, ACTIVE and
+// REACHABLE. If enabling fails (e.g. core communication is not fully ready or the relevant adapter has not been
+// registered yet) the device is force-deleted and the whole attempt is retried until maxTimeout expires.
+func (nb *NBTest) createAndEnableOLTDevice(t *testing.T, nbi voltha.VolthaServiceClient, oltDeviceType string) (*voltha.Device, error) {
+	var oltDevice *voltha.Device
+	var err error
+	var enableDeviceWithRetry isConditionSatisfied = func() bool {
+		// Create device
+		oltDevice, err = nbi.CreateDevice(getContext(), &voltha.Device{Type: oltDeviceType, MacAddress: getRandomMacAddress()})
+		assert.Nil(t, err)
+		if !assert.NotNil(t, oltDevice) {
+			return false // nothing to enable or clean up; avoids dereferencing nil below
+		}
+
+		// Verify oltDevice exist in the core
+		devices, err := nbi.ListDevices(getContext(), &empty.Empty{})
+		assert.Nil(t, err)
+		exist := false
+		for _, d := range devices.Items {
+			if d.Id == oltDevice.Id {
+				exist = true
+				break
+			}
+		}
+		assert.True(t, exist)
+		_, err = nbi.EnableDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+		if err == nil {
+			return true
+		}
+		// Enabling failed: remove the device best-effort (its error is deliberately ignored) and
+		// report "not yet" so that the next attempt starts from a fresh device.
+		_, _ = nbi.ForceDeleteDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+		return false
+	}
+	err = waitUntilCondition(nb.maxTimeout, enableDeviceWithRetry)
+	if !assert.Nil(t, err) {
+		return oltDevice, err // no enabled device to wait for
+	}
+
+	// Wait for device to be fully enabled
+	var vdFunction isDeviceConditionSatisfied = func(device *voltha.Device) bool {
+		return device.AdminState == voltha.AdminState_ENABLED &&
+			device.OperStatus == voltha.OperStatus_ACTIVE &&
+			device.ConnectStatus == voltha.ConnectStatus_REACHABLE
+	}
+	err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vdFunction, nbi)
+	assert.Nil(t, err)
+
+	// Wait until all relevant ONUs are enabled and ready
+	numOnuPerOlt := cm.GetNumONUPerOLT()
+	var onusReady isDevicesConditionSatisfied = func(devices *voltha.Devices) bool {
+		if devices == nil || len(devices.Items) < numOnuPerOlt+1 {
+			return false
+		}
+		count := 0
+		for _, d := range devices.Items {
+			if !d.Root && d.ParentId == oltDevice.Id && vdFunction(d) {
+				count++
+			}
+		}
+		return count >= numOnuPerOlt
+	}
+	err = waitUntilConditionForDevices(nb.maxTimeout, nbi, onusReady)
+	assert.Nil(t, err)
+	return oltDevice, err
+}
+
+// testEnableDeviceFailed checks that enabling a device whose type has no registered adapter is
+// rejected, then deletes that device. oltDeviceType is not used.
+func (nb *NBTest) testEnableDeviceFailed(t *testing.T, nbi voltha.VolthaServiceClient, oltDeviceType string) {
+	orphan, err := nbi.CreateDevice(getContext(), &voltha.Device{Type: "noAdapterRegistered", MacAddress: getRandomMacAddress()})
+	assert.Nil(t, err)
+	assert.NotNil(t, orphan)
+
+	// Enabling must fail and name the missing adapter.
+	_, err = nbi.EnableDevice(getContext(), &voltha.ID{Id: orphan.Id})
+	assert.NotNil(t, err)
+	assert.True(t, strings.Contains(err.Error(), "adapter-not-registered-for-device-type noAdapterRegistered"))
+
+	// Remove the device
+	_, err = nbi.DeleteDevice(getContext(), &voltha.ID{Id: orphan.Id})
+	assert.Nil(t, err)
+
+	// Wait until device is removed
+	err = waitUntilDeviceIsRemoved(nb.maxTimeout, nbi, orphan.Id)
+	assert.Nil(t, err)
+}
+
+// testEnableDevice creates and enables an OLT of oltDeviceType, runs monitorLogicalDevices on a subscribed change-event channel, verifies
+// the devices and the logical device once numONUPerOLT+1 logical ports exist, waits for the monitor, and then calls cleanUpDevices for the OLT.
+func (nb *NBTest) testEnableDevice(t *testing.T, nbi voltha.VolthaServiceClient, oltDeviceType string) {
+ // Subscribe to the event listener
+ eventCh := nb.changeEventLister.Subscribe((nb.numONUPerOLT + 1) * nb.getNumAdapters())
+ defer nb.changeEventLister.Unsubscribe(eventCh)
+
+ // Create and enable a OLT device for that device type
+ oltDevice, err := nb.createAndEnableOLTDevice(t, nbi, oltDeviceType)
+ assert.Nil(t, err)
+ assert.NotNil(t, oltDevice)
+
+ //Start a logical device monitor, which automatically sends trap and eapol flows to the devices being enabled
+ var wg sync.WaitGroup
+ wg.Add(1)
+ subCtx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ go nb.monitorLogicalDevices(subCtx, t, nbi, 1, nb.numONUPerOLT, &wg, false, false, oltDevice.Id, eventCh)
+
+ // Wait for the logical device to be in the ready state
+ var vldFunction = func(ports []*voltha.LogicalPort) bool {
+ return len(ports) == nb.numONUPerOLT+1
+ }
+ err = waitUntilLogicalDevicePortsReadiness(oltDevice.Id, nb.maxTimeout, nbi, vldFunction)
+ assert.Nil(t, err)
+
+ // Verify that the devices have been setup correctly
+ nb.verifyDevices(t, nbi, oltDevice.Id)
+
+ // Get latest oltDevice data
+ oltDevice, err = nbi.GetDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
+
+ // Verify that the logical device has been setup correctly
+ nb.verifyLogicalDevices(t, oltDevice, nbi)
+
+ //Wait until all flows have been sent to the devices successfully
+ wg.Wait()
+ // log.SetAllLogLevel(log.DebugLevel)
+ //Remove the device
+ err = cleanUpDevices(nb.maxTimeout, nbi, oltDevice.Id, false)
+ assert.Nil(t, err)
+}
+
+// testDisableAndReEnableRootDevice disables an enabled OLT and waits for it, its ONUs and the logical ports to go down, then re-enables
+// it and waits for all of them to come back up, before calling cleanUpDevices for the OLT.
+func (nb *NBTest) testDisableAndReEnableRootDevice(t *testing.T, nbi voltha.VolthaServiceClient, oltDeviceType string) {
+ // Create and enable an OLT device
+ oltDevice, err := nb.createAndEnableOLTDevice(t, nbi, oltDeviceType)
+ assert.Nil(t, err)
+ assert.NotNil(t, oltDevice)
+
+ // No extra wait for the ONU devices is needed here: createAndEnableOLTDevice already waits until they are enabled and ready
+ // Disable the oltDevice
+ _, err = nbi.DisableDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
+
+ // Wait for the olt device to be disabled
+ var vdFunction isDeviceConditionSatisfied = func(device *voltha.Device) bool {
+ return device.AdminState == voltha.AdminState_DISABLED && device.OperStatus == voltha.OperStatus_UNKNOWN
+ }
+ err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vdFunction, nbi)
+ assert.Nil(t, err)
+
+ // Verify that all onu devices are disabled as well
+ onuDevices, err := nb.getChildDevices(oltDevice.Id, nbi)
+ assert.Nil(t, err)
+ assert.NotNil(t, onuDevices)
+ assert.Greater(t, len(onuDevices.Items), 0)
+ for _, onu := range onuDevices.Items {
+ err = waitUntilDeviceReadiness(onu.Id, nb.maxTimeout, vdFunction, nbi)
+ assert.Nil(t, err)
+ }
+
+ // Wait until every logical port has no config bit set other than PORT_DOWN and reports the LINK_DOWN state
+ var vlFunction = func(ports []*voltha.LogicalPort) bool {
+ for _, lp := range ports {
+ if (lp.OfpPort.Config&uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) != lp.OfpPort.Config) ||
+ lp.OfpPort.State != uint32(ofp.OfpPortState_OFPPS_LINK_DOWN) {
+ return false
+ }
+ }
+ return true
+ }
+ err = waitUntilLogicalDevicePortsReadiness(oltDevice.Id, nb.maxTimeout, nbi, vlFunction)
+ assert.Nil(t, err)
+
+ // Reenable the oltDevice
+ _, err = nbi.EnableDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
+
+ // Wait for the olt device to be enabled
+ vdFunction = func(device *voltha.Device) bool {
+ return device.AdminState == voltha.AdminState_ENABLED && device.OperStatus == voltha.OperStatus_ACTIVE
+ }
+ err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vdFunction, nbi)
+ assert.Nil(t, err)
+
+ // Verify that all onu devices are enabled as well
+ onuDevices, err = nb.getChildDevices(oltDevice.Id, nbi)
+ assert.Nil(t, err)
+ assert.NotNil(t, onuDevices)
+ assert.Greater(t, len(onuDevices.Items), 0)
+ for _, onu := range onuDevices.Items {
+ err = waitUntilDeviceReadiness(onu.Id, nb.maxTimeout, vdFunction, nbi)
+ assert.Nil(t, err)
+ }
+
+ // Wait until every logical port has the PORT_DOWN config bit cleared and reports the LIVE state
+ vlFunction = func(ports []*voltha.LogicalPort) bool {
+ for _, lp := range ports {
+ if (lp.OfpPort.Config&^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) != lp.OfpPort.Config) ||
+ lp.OfpPort.State != uint32(ofp.OfpPortState_OFPPS_LIVE) {
+ return false
+ }
+ }
+ return true
+ }
+ err = waitUntilLogicalDevicePortsReadiness(oltDevice.Id, nb.maxTimeout, nbi, vlFunction)
+ assert.Nil(t, err)
+ //Remove the device
+ err = cleanUpDevices(nb.maxTimeout, nbi, oltDevice.Id, true)
+ assert.Nil(t, err)
+}
+
+// testDisableAndDeleteAllDevice disables an enabled OLT, waits for it and its ONUs to be DISABLED/UNKNOWN, deletes the OLT, and waits until
+// no device and no logical device related to that OLT is listed any more; it ends with a cleanUpDevices call for the OLT.
+func (nb *NBTest) testDisableAndDeleteAllDevice(t *testing.T, nbi voltha.VolthaServiceClient, oltDeviceType string) {
+ //Get an OLT device
+ oltDevice, err := nb.createAndEnableOLTDevice(t, nbi, oltDeviceType)
+ assert.Nil(t, err)
+ assert.NotNil(t, oltDevice)
+ // Disable the oltDevice
+ _, err = nbi.DisableDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
+
+ // Wait for the olt device to be disabled
+ var vdFunction isDeviceConditionSatisfied = func(device *voltha.Device) bool {
+ return device.AdminState == voltha.AdminState_DISABLED && device.OperStatus == voltha.OperStatus_UNKNOWN
+ }
+ err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vdFunction, nbi)
+ assert.Nil(t, err)
+
+ // Verify that all onu devices are disabled as well (previous test may have removed all ONUs)
+ onuDevices, err := nb.getChildDevices(oltDevice.Id, nbi)
+ assert.Nil(t, err)
+ assert.NotNil(t, onuDevices)
+ assert.GreaterOrEqual(t, len(onuDevices.Items), 0) // a length is never negative, so this assertion cannot fail
+ for _, onu := range onuDevices.Items {
+ err = waitUntilDeviceReadiness(onu.Id, nb.maxTimeout, vdFunction, nbi)
+ assert.Nil(t, err)
+ }
+
+ // Delete the oltDevice
+ _, err = nbi.DeleteDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
+
+ // Verify all devices relevant to the OLT device are gone
+ var vFunction isDevicesConditionSatisfied = func(devices *voltha.Devices) bool {
+ if (devices == nil) || len(devices.Items) == 0 {
+ return true
+ }
+ for _, d := range devices.Items {
+ if (d.Root && d.Id == oltDevice.Id) || (!d.Root && d.ParentId == oltDevice.Id) {
+ return false
+ }
+ }
+ return true
+ }
+ err = waitUntilConditionForDevices(nb.maxTimeout, nbi, vFunction)
+ assert.Nil(t, err)
+
+ // Wait for absence of logical device
+ var vlFunction isLogicalDevicesConditionSatisfied = func(lds *voltha.LogicalDevices) bool {
+ if (lds == nil) || (len(lds.Items) == 0) {
+ return true
+ }
+ for _, ld := range lds.Items {
+ if ld.RootDeviceId == oltDevice.Id {
+ return false
+ }
+ }
+ return true
+ }
+ err = waitUntilConditionForLogicalDevices(nb.maxTimeout, nbi, vlFunction)
+ assert.Nil(t, err)
+ //Remove the device
+ err = cleanUpDevices(nb.maxTimeout, nbi, oltDevice.Id, true)
+ assert.Nil(t, err)
+}
+
+// testEnableAndDeleteAllDevice creates and enables an OLT of oltDeviceType,
+// waits until its logical device exposes numONUPerOLT+1 ports, deletes every
+// child ONU one at a time (waiting for each to disappear), then disables and
+// deletes the OLT itself.
+func (nb *NBTest) testEnableAndDeleteAllDevice(t *testing.T, nbi voltha.VolthaServiceClient, oltDeviceType string) {
+ //Create/Enable an OLT device
+ oltDevice, err := nb.createAndEnableOLTDevice(t, nbi, oltDeviceType)
+ assert.Nil(t, err)
+ assert.NotNil(t, oltDevice)
+
+ // Wait for the logical device to be in the ready state
+ // (one logical port per ONU plus one more)
+ var vldFunction = func(ports []*voltha.LogicalPort) bool {
+ return len(ports) == nb.numONUPerOLT+1
+ }
+ err = waitUntilLogicalDevicePortsReadiness(oltDevice.Id, nb.maxTimeout, nbi, vldFunction)
+ assert.Nil(t, err)
+
+ //Get all child devices
+ onuDevices, err := nb.getChildDevices(oltDevice.Id, nbi)
+ assert.Nil(t, err)
+ assert.NotNil(t, onuDevices)
+ assert.Greater(t, len(onuDevices.Items), 0)
+
+ // Wait for each onu device to get deleted
+ // (the condition is met once the device passed to it is nil)
+ var vdFunc isDeviceConditionSatisfied = func(device *voltha.Device) bool {
+ return device == nil
+ }
+
+ // Delete the onuDevice
+ for _, onu := range onuDevices.Items {
+ _, err = nbi.DeleteDevice(getContext(), &voltha.ID{Id: onu.Id})
+ assert.Nil(t, err)
+ err = waitUntilDeviceReadiness(onu.Id, nb.maxTimeout, vdFunc, nbi)
+ assert.Nil(t, err)
+ }
+
+ // Disable the oltDevice
+ _, err = nbi.DisableDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
+
+ // Wait for the olt device to be disabled
+ // NOTE(review): unlike vdFunc above, this predicate dereferences device
+ // without a nil check - it relies on the OLT still existing at this point.
+ var vFunction isDeviceConditionSatisfied = func(device *voltha.Device) bool {
+ return device.AdminState == voltha.AdminState_DISABLED && device.OperStatus == voltha.OperStatus_UNKNOWN
+ }
+ err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vFunction, nbi)
+ assert.Nil(t, err)
+
+ // Delete the oltDevice
+ _, err = nbi.DeleteDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
+
+ // Cleanup
+ err = cleanUpDevices(nb.maxTimeout, nbi, oltDevice.Id, true)
+ assert.Nil(t, err)
+}
+
+// testDisableAndEnablePort creates and enables an OLT of oltDeviceType, then
+// disables and re-enables its first PON port. After each step it waits for the
+// device port to reach the requested admin state and for every logical port to
+// have the PORT_DOWN config bit cleared and state LIVE. Finally it checks that
+// disabling a non-PON port is rejected, and removes the device.
+//
+// The ports under test are looked up explicitly: when the OLT has no PON port
+// (or no non-PON port) the test records a failure and cleans up, instead of
+// silently operating on whichever port happened to be listed last.
+func (nb *NBTest) testDisableAndEnablePort(t *testing.T, nbi voltha.VolthaServiceClient, oltDeviceType string) {
+	//Create an OLT device
+	oltDevice, err := nb.createAndEnableOLTDevice(t, nbi, oltDeviceType)
+	assert.Nil(t, err)
+	assert.NotNil(t, oltDevice)
+
+	// removeDevice is shared by the normal exit and the early-failure exits.
+	removeDevice := func() {
+		cleanErr := cleanUpDevices(nb.maxTimeout, nbi, oltDevice.Id, true)
+		assert.Nil(t, cleanErr)
+	}
+
+	oltPorts, err := nbi.ListDevicePorts(getContext(), &voltha.ID{Id: oltDevice.Id})
+	assert.Nil(t, err)
+
+	// findPort returns the first listed port accepted by accept, or nil when
+	// there is none. GetItems is used so a nil port list yields nil, not a panic.
+	findPort := func(accept func(*voltha.Port) bool) *voltha.Port {
+		for _, p := range oltPorts.GetItems() {
+			if accept(p) {
+				return p
+			}
+		}
+		return nil
+	}
+
+	// Every logical port must have PORT_DOWN cleared and be LIVE.
+	var allLogicalPortsLive = func(ports []*voltha.LogicalPort) bool {
+		for _, lp := range ports {
+			if (lp.OfpPort.Config&^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) != lp.OfpPort.Config) ||
+				lp.OfpPort.State != uint32(ofp.OfpPortState_OFPPS_LIVE) {
+				return false
+			}
+		}
+		return true
+	}
+
+	cp := findPort(func(p *voltha.Port) bool { return p.Type == voltha.Port_PON_OLT })
+	if !assert.NotNil(t, cp, "OLT device has no PON port") {
+		removeDevice()
+		return
+	}
+	cp.DeviceId = oltDevice.Id
+
+	// Disable the PON port of oltDevice
+	_, err = nbi.DisablePort(getContext(), cp)
+	assert.Nil(t, err)
+	// Wait for the olt device Port to be disabled
+	var vdFunction isDevicePortsConditionSatisfied = func(ports *voltha.Ports) bool {
+		for _, port := range ports.Items {
+			if port.PortNo == cp.PortNo {
+				return port.AdminState == voltha.AdminState_DISABLED
+			}
+		}
+		return false
+	}
+	err = waitUntilDevicePortsReadiness(oltDevice.Id, nb.maxTimeout, vdFunction, nbi)
+	assert.Nil(t, err)
+	// Wait for the logical device to satisfy the expected condition
+	err = waitUntilLogicalDevicePortsReadiness(oltDevice.Id, nb.maxTimeout, nbi, allLogicalPortsLive)
+	assert.Nil(t, err)
+
+	// Enable the PON port of oltDevice
+	_, err = nbi.EnablePort(getContext(), cp)
+	assert.Nil(t, err)
+
+	// Wait for the olt device Port to be enabled
+	vdFunction = func(ports *voltha.Ports) bool {
+		for _, port := range ports.Items {
+			if port.PortNo == cp.PortNo {
+				return port.AdminState == voltha.AdminState_ENABLED
+			}
+		}
+		return false
+	}
+	err = waitUntilDevicePortsReadiness(oltDevice.Id, nb.maxTimeout, vdFunction, nbi)
+	assert.Nil(t, err)
+	// Wait for the logical device to satisfy the expected condition
+	err = waitUntilLogicalDevicePortsReadiness(oltDevice.Id, nb.maxTimeout, nbi, allLogicalPortsLive)
+	assert.Nil(t, err)
+
+	// Disabling a non-PON port is expected to be rejected
+	nonPonPort := findPort(func(p *voltha.Port) bool { return p.Type != voltha.Port_PON_OLT })
+	if !assert.NotNil(t, nonPonPort, "OLT device has no non-PON port") {
+		removeDevice()
+		return
+	}
+	nonPonPort.DeviceId = oltDevice.Id
+
+	_, err = nbi.DisablePort(getContext(), nonPonPort)
+	assert.NotNil(t, err)
+
+	//Remove the device
+	removeDevice()
+}
+
+// testDeviceRebootWhenOltIsEnabled creates and enables an OLT of oltDeviceType,
+// reboots it, and waits for the expected post-reboot state: the OLT is
+// UNREACHABLE/UNKNOWN, its logical device is gone, it has no flows and no
+// listed child devices. It then marks the device active through the mock OLT
+// adapter, checks that the OLT is REACHABLE/ENABLED again, that a logical
+// device is present, and that no ONUs are attached.
+func (nb *NBTest) testDeviceRebootWhenOltIsEnabled(t *testing.T, nbi voltha.VolthaServiceClient, oltDeviceType string) {
+ //Create an OLT device
+ oltDevice, err := nb.createAndEnableOLTDevice(t, nbi, oltDeviceType)
+ assert.Nil(t, err)
+ assert.NotNil(t, oltDevice)
+
+ // Reboot the OLT and verify that Connection Status goes to UNREACHABLE and operation status to UNKNOWN
+ _, err = nbi.RebootDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
+
+ var vlFunction0 = func(d *voltha.Device) bool {
+ return d.ConnectStatus == voltha.ConnectStatus_UNREACHABLE && d.OperStatus == voltha.OperStatus_UNKNOWN
+ }
+
+ err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vlFunction0, nbi)
+ assert.Nil(t, err)
+
+ // Wait for the logical device to satisfy the expected condition
+ // (the condition is met once the logical device passed to it is nil)
+ var vlFunction1 = func(ld *voltha.LogicalDevice) bool {
+ return ld == nil
+ }
+
+ err = waitUntilLogicalDeviceReadiness(oltDevice.Id, nb.maxTimeout, nbi, vlFunction1)
+ assert.Nil(t, err)
+
+ // Wait for the device to satisfy the expected condition (device does not have flows)
+ // A failing ListDeviceFlows call counts as "not yet satisfied".
+ var vlFunction2 = func(d *voltha.Device) bool {
+ var deviceFlows *ofp.Flows
+ var err error
+ if deviceFlows, err = nbi.ListDeviceFlows(getContext(), &voltha.ID{Id: d.Id}); err != nil {
+ return false
+ }
+ return len(deviceFlows.Items) == 0
+ }
+
+ err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vlFunction2, nbi)
+ assert.Nil(t, err)
+
+ // Wait until the device list is non-empty and contains no child of this OLT.
+ // Note that an empty (or nil) list does NOT satisfy the condition.
+ var vlFunction3 isDevicesConditionSatisfied = func(devices *voltha.Devices) bool {
+ if (devices == nil) || (len(devices.Items) == 0) {
+ return false
+ }
+ for _, d := range devices.Items {
+ if !d.Root && d.ParentId == oltDevice.Id {
+ return false
+ }
+ }
+ return true
+ }
+ err = waitUntilConditionForDevices(nb.maxTimeout, nbi, vlFunction3)
+ assert.Nil(t, err)
+
+ // Update the OLT Connection Status to REACHABLE and operation status to ACTIVE
+ // Normally, in a real adapter this happens after connection regain via a heartbeat mechanism with real hardware
+ oltAdapter, err := nb.getOLTAdapterInstance(t, nbi, oltDevice.Id)
+ assert.Nil(t, err)
+ oltAdapter.SetDeviceActive(oltDevice.Id)
+
+ // Verify the device connection and operation states
+ oltDevice, err = nbi.GetDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
+ assert.NotNil(t, oltDevice)
+ assert.Equal(t, oltDevice.ConnectStatus, voltha.ConnectStatus_REACHABLE)
+ assert.Equal(t, oltDevice.AdminState, voltha.AdminState_ENABLED)
+
+ // Wait for the logical device to satisfy the expected condition
+ // (a non-nil logical device is reported again)
+ var vlFunction4 = func(ld *voltha.LogicalDevice) bool {
+ return ld != nil
+ }
+ err = waitUntilLogicalDeviceReadiness(oltDevice.Id, nb.maxTimeout, nbi, vlFunction4)
+ assert.Nil(t, err)
+
+ // Verify that we have no ONUs
+ onuDevices, err := nb.getChildDevices(oltDevice.Id, nbi)
+ assert.Nil(t, err)
+ assert.NotNil(t, onuDevices)
+ assert.Equal(t, 0, len(onuDevices.Items))
+
+ //Remove the device
+ err = cleanUpDevices(nb.maxTimeout, nbi, oltDevice.Id, true)
+ assert.Nil(t, err)
+}
+
+// testStartOmciTestAction exercises StartOmciTestAction in three sub-tests:
+// an unknown device id (expects a NotFound error), a device whose type has no
+// registered adapter (expects an error mentioning that type), and a valid ONU
+// under a freshly created OLT of oltDeviceType (expects TestResponse_SUCCESS).
+func (nb *NBTest) testStartOmciTestAction(t *testing.T, nbi voltha.VolthaServiceClient, oltDeviceType string) {
+ // -----------------------------------------------------------------------
+ // SubTest 1: Omci test action should fail due to nonexistent device id
+
+ request := &voltha.OmciTestRequest{Id: "123", Uuid: "456"}
+ _, err := nbi.StartOmciTestAction(getContext(), request)
+ assert.NotNil(t, err)
+ assert.Equal(t, "rpc error: code = NotFound desc = 123", err.Error())
+
+ // -----------------------------------------------------------------------
+ // SubTest 2: Error should be returned for device with no adapter registered
+
+ // Create a device that has no adapter registered
+ deviceNoAdapter, err := nbi.CreateDevice(getContext(), &voltha.Device{Type: "noAdapterRegisteredOmciTest", MacAddress: getRandomMacAddress()})
+ assert.Nil(t, err)
+ assert.NotNil(t, deviceNoAdapter)
+
+ // Omci test action should fail due to nonexistent adapter
+ // (the error text is expected to contain the unregistered device type)
+ request = &voltha.OmciTestRequest{Id: deviceNoAdapter.Id, Uuid: "456"}
+ _, err = nbi.StartOmciTestAction(getContext(), request)
+ assert.NotNil(t, err)
+ assert.True(t, strings.Contains(err.Error(), "noAdapterRegisteredOmciTest"))
+
+ //Remove the device
+ _, err = nbi.DeleteDevice(getContext(), &voltha.ID{Id: deviceNoAdapter.Id})
+ assert.Nil(t, err)
+
+ // Ensure this device (and anything parented by it) is gone from the Core -
+ // wait until condition satisfied or timeout. Other devices may still exist.
+ var vFunction isDevicesConditionSatisfied = func(devices *voltha.Devices) bool {
+ if (devices == nil) || (len(devices.Items) == 0) {
+ return true
+ }
+ for _, d := range devices.Items {
+ if (d.Root && d.Id == deviceNoAdapter.Id) || (!d.Root && d.ParentId == deviceNoAdapter.Id) {
+ return false
+ }
+ }
+ return true
+ }
+ err = waitUntilConditionForDevices(nb.maxTimeout, nbi, vFunction)
+ assert.Nil(t, err)
+
+ // -----------------------------------------------------------------------
+ // SubTest 3: Omci test action should succeed on valid ONU
+
+ // Create and enable device with valid data
+ oltDevice, err := nb.createAndEnableOLTDevice(t, nbi, oltDeviceType)
+ assert.Nil(t, err)
+ assert.NotNil(t, oltDevice)
+
+ // Wait for the logical device to be in the ready state
+ var vldFunction = func(ports []*voltha.LogicalPort) bool {
+ return len(ports) == nb.numONUPerOLT+1
+ }
+ err = waitUntilLogicalDevicePortsReadiness(oltDevice.Id, nb.maxTimeout, nbi, vldFunction)
+ assert.Nil(t, err)
+
+ onuDevices, err := nb.getChildDevices(oltDevice.Id, nbi)
+ assert.Nil(t, err)
+ assert.NotNil(t, onuDevices)
+ assert.Greater(t, len(onuDevices.Items), 0)
+
+ // The first listed child is used as the ONU under test.
+ onuDevice := onuDevices.Items[0]
+
+ // Omci test action should succeed
+ request = &voltha.OmciTestRequest{Id: onuDevice.Id, Uuid: "456"}
+ resp, err := nbi.StartOmciTestAction(getContext(), request)
+ assert.Nil(t, err)
+ assert.Equal(t, resp.Result, voltha.TestResponse_SUCCESS)
+
+ //Remove the device
+ err = cleanUpDevices(nb.maxTimeout, nbi, oltDevice.Id, false)
+ assert.Nil(t, err)
+}
+
+// makeSimpleFlowMod wraps every OFB match field of fa in an OfpOxmField and
+// passes them, together with fa's actions, command and key/value arguments,
+// to flows.MkSimpleFlowMod, returning whatever that helper builds.
+func makeSimpleFlowMod(fa *flows.FlowArgs) *ofp.OfpFlowMod {
+	oxmFields := make([]*ofp.OfpOxmField, len(fa.MatchFields))
+	for i, ofbField := range fa.MatchFields {
+		oxmFields[i] = &ofp.OfpOxmField{
+			Field: &ofp.OfpOxmField_OfbField{OfbField: ofbField},
+		}
+	}
+	return flows.MkSimpleFlowMod(oxmFields, fa.Actions, fa.Command, fa.KV)
+}
+
+// createMetadata packs three values into one 64-bit metadata word:
+//
+//	bits 63..48  low 16 bits of cTag
+//	bits 47..32  low 16 bits of techProfile
+//	bits 31..0   low 32 bits of port
+//
+// All arithmetic is done in uint64 so the layout does not depend on the width
+// of int: with int operands the 0xFFFFFFFF mask and the 32-bit shift are only
+// valid where int is 64 bits wide.
+func createMetadata(cTag int, techProfile int, port int) uint64 {
+	md := (uint64(cTag) & 0xFFFF) << 48
+	md |= (uint64(techProfile) & 0xFFFF) << 32
+	md |= uint64(port) & 0xFFFFFFFF
+	return md
+}
+
+// verifyLogicalDeviceFlowCount waits (up to nb.maxTimeout) until the logical
+// device of oltDeviceID reports exactly the expected number of flows, and
+// records a test failure if the wait returns an error.
+//
+// The expected count is numNNIPorts*3 + numNNIPorts*numUNIPorts, or 0 when
+// flowAddFail is set.
+func (nb *NBTest) verifyLogicalDeviceFlowCount(t *testing.T, nbi voltha.VolthaServiceClient, oltDeviceID string, numNNIPorts int, numUNIPorts int, flowAddFail bool) {
+	expectedNumFlows := numNNIPorts*3 + numNNIPorts*numUNIPorts
+	if flowAddFail {
+		expectedNumFlows = 0
+	}
+
+	// Wait for logical device to have the flows (or none)
+	var vlFunction isLogicalDeviceConditionSatisfied = func(ld *voltha.LogicalDevice) bool {
+		// The wait helper may hand over a nil logical device while it is not
+		// (yet) available; treat that as "not satisfied" instead of panicking.
+		if ld == nil {
+			return false
+		}
+		f, err := nbi.ListLogicalDeviceFlows(getContext(), &voltha.ID{Id: ld.Id})
+		return err == nil && f != nil && len(f.Items) == expectedNumFlows
+	}
+	// No timeout implies a success
+	err := waitUntilLogicalDeviceReadiness(oltDeviceID, nb.maxTimeout, nbi, vlFunction)
+	assert.Nil(t, err)
+}
+
+// sendTrapFlows installs three trap-to-controller flows on the logical device,
+// all matching on the first NNI (root) port: one for ethertype 35020 (LLDP),
+// one for IPv4 UDP 67->68 and one for IPv6 UDP 546->547. It returns the number
+// of NNI and UNI ports found in ports.
+//
+// Exactly one NNI port is expected (asserted). When there is none, no flow is
+// sent and the counts are returned as-is; indexing the empty slice would
+// otherwise panic. meterID and startingVlan are accepted for signature
+// compatibility but are not used.
+func (nb *NBTest) sendTrapFlows(t *testing.T, nbi voltha.VolthaServiceClient, logicalDeviceID string, ports []*voltha.LogicalPort, meterID uint64, startingVlan int) (numNNIPorts, numUNIPorts int) {
+	// Split the logical ports into NNI (root) and UNI ports
+	var nniPorts, uniPorts []*voltha.LogicalPort
+	for _, p := range ports {
+		if p.RootPort {
+			nniPorts = append(nniPorts, p)
+		} else {
+			uniPorts = append(uniPorts, p)
+		}
+	}
+	assert.Equal(t, 1, len(nniPorts))
+	if len(nniPorts) == 0 {
+		return 0, len(uniPorts)
+	}
+
+	nniPort := nniPorts[0].OfpPort.PortNo
+	maxUint32 := uint64(0xFFFFFFFF)
+	controllerPortMask := uint32(4294967293) // will result in 4294967293&0x7fffffff => 2147483645 which is the actual controller port
+
+	// Match fields of the three trap flows, sent in this order.
+	trapMatches := [][]*ofp.OfpOxmOfbField{
+		// LLDP
+		{
+			flows.InPort(nniPort),
+			flows.EthType(35020),
+		},
+		// IPv4, UDP 67 -> 68
+		{
+			flows.InPort(nniPort),
+			flows.EthType(2048),
+			flows.IpProto(17),
+			flows.UdpSrc(67),
+			flows.UdpDst(68),
+		},
+		// IPv6, UDP 546 -> 547
+		{
+			flows.InPort(nniPort),
+			flows.EthType(34525),
+			flows.IpProto(17),
+			flows.UdpSrc(546),
+			flows.UdpDst(547),
+		},
+	}
+
+	for _, matchFields := range trapMatches {
+		// Each flow gets its own KV map and action list, as before.
+		fa := &flows.FlowArgs{
+			KV:          flows.OfpFlowModArgs{"priority": 10000, "buffer_id": maxUint32, "out_port": maxUint32, "out_group": maxUint32, "flags": 1},
+			MatchFields: matchFields,
+			Actions: []*ofp.OfpAction{
+				flows.Output(controllerPortMask),
+			},
+		}
+		update := ofp.FlowTableUpdate{FlowMod: makeSimpleFlowMod(fa), Id: logicalDeviceID}
+		_, err := nbi.UpdateLogicalDeviceFlowTable(getContext(), &update)
+		assert.Nil(t, err)
+	}
+
+	return len(nniPorts), len(uniPorts)
+}
+
+// sendEAPFlows installs one trap-to-controller flow on the logical device for
+// the given port, matching ethertype 34958 and VLAN VID 8187, with metadata
+// built from vlan and the given meter id. The update is attempted up to three
+// times, logging and pausing 50ms after every failed attempt; the error of the
+// last attempt (nil on success) is asserted at the end.
+func (nb *NBTest) sendEAPFlows(t *testing.T, nbi voltha.VolthaServiceClient, logicalDeviceID string, port *ofp.OfpPort, vlan int, meterID uint64) {
+	const attempts = 3
+
+	unset := uint64(0xFFFFFFFF)
+	controllerPortMask := uint32(4294967293) // will result in 4294967293&0x7fffffff => 2147483645 which is the actual controller port
+
+	kv := flows.OfpFlowModArgs{"priority": 10000, "buffer_id": unset, "out_port": unset, "out_group": unset, "flags": 1, "write_metadata": createMetadata(vlan, 64, 0), "meter_id": meterID}
+	match := []*ofp.OfpOxmOfbField{
+		flows.InPort(port.PortNo),
+		flows.EthType(34958),
+		flows.VlanVid(8187),
+	}
+	actions := []*ofp.OfpAction{flows.Output(controllerPortMask)}
+
+	flowEAP := ofp.FlowTableUpdate{
+		FlowMod: makeSimpleFlowMod(&flows.FlowArgs{KV: kv, MatchFields: match, Actions: actions}),
+		Id:      logicalDeviceID,
+	}
+
+	var err error
+	for attempt := 1; attempt <= attempts; attempt++ {
+		_, err = nbi.UpdateLogicalDeviceFlowTable(getContext(), &flowEAP)
+		if err == nil {
+			// Only worth mentioning when an earlier attempt had failed.
+			if attempt > 1 {
+				t.Log("Re-sending EAPOL flow succeeded for port:", port)
+			}
+			break
+		}
+		t.Log("Sending EAPOL flows fail:", err)
+		time.Sleep(50 * time.Millisecond)
+	}
+	assert.Nil(t, err)
+}
+
+// receiveChangeEvents opens the ReceiveChangeEvents stream and forwards every
+// received event to ch until the stream ends. io.EOF, or an error whose text
+// contains "Unavailable", is logged at debug level as a normal close; any other
+// error is logged as an error. The stream context is built on
+// context.Background() (carrying only what log.WithSpanFromContext copies from
+// ctx) and is cancelled when this function returns.
+func (nb *NBTest) receiveChangeEvents(ctx context.Context, nbi voltha.VolthaServiceClient, ch chan *ofp.ChangeEvent) {
+	streamCtx, cancelStream := context.WithCancel(log.WithSpanFromContext(context.Background(), ctx))
+	defer cancelStream()
+
+	stream, err := nbi.ReceiveChangeEvents(streamCtx, &empty.Empty{}, grpc.EmptyCallOption{})
+	if err != nil {
+		logger.Errorw(ctx, "cannot-establish-receive-change-events", log.Fields{"error": err})
+		return
+	}
+
+	for {
+		event, recvErr := stream.Recv()
+		switch {
+		case recvErr == nil:
+			ch <- event
+		case recvErr == io.EOF || strings.Contains(recvErr.Error(), "Unavailable"):
+			logger.Debug(context.Background(), "receive-events-stream-closing")
+			return
+		default:
+			logger.Errorw(ctx, "error-receiving-change-event", log.Fields{"error": recvErr})
+			return
+		}
+	}
+}
+
+// getOLTAdapterInstance returns the registered mock OLT adapter whose endpoint
+// equals the AdapterEndpoint of the device with id oltDeviceID.
+//
+// A failing ListDevices call is recorded on t and returned as an error (the
+// device list is not usable in that case). If the device is not listed, or no
+// registered adapter has a matching endpoint, a "no-adapter-for-<id>" error is
+// returned. nb.oltAdapters is read under its read lock.
+func (nb *NBTest) getOLTAdapterInstance(t *testing.T, nbi voltha.VolthaServiceClient, oltDeviceID string) (*cm.OLTAdapter, error) {
+	devices, err := nbi.ListDevices(getContext(), &empty.Empty{})
+	assert.Nil(t, err)
+	if err != nil {
+		return nil, fmt.Errorf("listing devices for %s: %w", oltDeviceID, err)
+	}
+	nb.oltAdaptersLock.RLock()
+	defer nb.oltAdaptersLock.RUnlock()
+	for _, d := range devices.GetItems() {
+		if d.Id != oltDeviceID {
+			continue
+		}
+		// "adapters" rather than "oltAdapters": avoids shadowing the
+		// package-level oltAdapters variable.
+		for _, adapters := range nb.oltAdapters {
+			for _, oAdapter := range adapters {
+				if oAdapter.Adapter.GetEndPoint() == d.AdapterEndpoint {
+					return oAdapter, nil
+				}
+			}
+		}
+	}
+	return nil, fmt.Errorf("no-adapter-for-%s", oltDeviceID)
+}
+
+// getAdapterInstancesWithDeviceIds resolves the mock adapters serving an OLT
+// and its ONUs. It returns:
+//   - the OLT adapter whose endpoint equals the OLT device's AdapterEndpoint,
+//   - the ONU adapters, keyed by endpoint, that serve children of the OLT,
+//   - the ids of all non-root devices whose parent is oltDeviceID.
+//
+// A failing ListDevices call is recorded on t and returned as an error (the
+// device list is not usable in that case). Unless an OLT adapter, at least one
+// ONU adapter and at least one ONU device id were all found, every result is
+// nil and a "no-adapter-for-<id>" error is returned. Both adapter maps are
+// read under their read locks.
+func (nb *NBTest) getAdapterInstancesWithDeviceIds(t *testing.T, nbi voltha.VolthaServiceClient, oltDeviceID string) (*cm.OLTAdapter, map[string]*cm.ONUAdapter, []string, error) {
+	var oltAdapter *cm.OLTAdapter
+	onuAdapters := make(map[string]*cm.ONUAdapter)
+	devices, err := nbi.ListDevices(getContext(), &empty.Empty{})
+	onuDeviceIDs := make([]string, 0)
+	assert.Nil(t, err)
+	if err != nil {
+		return nil, nil, nil, fmt.Errorf("listing devices for %s: %w", oltDeviceID, err)
+	}
+	oltAdapterFound := false
+	nb.oltAdaptersLock.RLock()
+	defer nb.oltAdaptersLock.RUnlock()
+	nb.onuAdaptersLock.RLock()
+	defer nb.onuAdaptersLock.RUnlock()
+	for _, d := range devices.GetItems() {
+		if !oltAdapterFound && d.Id == oltDeviceID {
+			// "adapters" rather than "oltAdapters": avoids shadowing the
+			// package-level oltAdapters variable.
+			for _, adapters := range nb.oltAdapters {
+				for _, oAdapter := range adapters {
+					if oAdapter.Adapter.GetEndPoint() == d.AdapterEndpoint {
+						oltAdapter = oAdapter
+						oltAdapterFound = true
+					}
+				}
+			}
+		}
+		// We can have multiple ONU adapters managing the ONU devices off an OLT
+		if !d.Root && d.ParentId == oltDeviceID {
+			onuDeviceIDs = append(onuDeviceIDs, d.Id)
+			for _, adapters := range nb.onuAdapters {
+				for _, oAdapter := range adapters {
+					if oAdapter.Adapter.GetEndPoint() == d.AdapterEndpoint {
+						onuAdapters[d.AdapterEndpoint] = oAdapter
+					}
+				}
+			}
+		}
+	}
+	if len(onuAdapters) > 0 && oltAdapter != nil && len(onuDeviceIDs) > 0 {
+		return oltAdapter, onuAdapters, onuDeviceIDs, nil
+	}
+	return nil, nil, nil, fmt.Errorf("no-adapter-for-%s", oltDeviceID)
+}
+
+// monitorLogicalDevices drives flow provisioning for the logical device rooted
+// at oltID and verifies the outcome. It calls wg.Done when it returns, so it
+// can be run as a goroutine. Steps:
+//
+//  1. wait until the logical device exposes numNNIPorts+numUNIPorts ports;
+//  2. reset the mock OLT/ONU adapters for the devices involved and set their
+//     flow add/delete failure behaviour;
+//  3. add a meter and the initial trap flows;
+//  4. consume eventCh, sending an EAPOL flow for every added UNI port, until
+//     the expected number of NNI and UNI port-add events has been seen;
+//  5. wait for the expected flow counts on the logical device and on the
+//     OLT and ONU adapters (all zero when flowAddFail is set).
+//
+// When a prerequisite is missing (no logical device, no OLT adapter, no port
+// list) the failure is recorded on t and the function returns, rather than
+// dereferencing nil: a panic here would take down the whole test process.
+// ctx is accepted for signature compatibility; the event loop ends only when
+// enough events were seen or eventCh is closed.
+func (nb *NBTest) monitorLogicalDevices(
+	ctx context.Context,
+	t *testing.T,
+	nbi voltha.VolthaServiceClient,
+	numNNIPorts int,
+	numUNIPorts int,
+	wg *sync.WaitGroup,
+	flowAddFail bool,
+	flowDeleteFail bool,
+	oltID string,
+	eventCh chan *ofp.ChangeEvent) {
+
+	defer wg.Done()
+
+	// Wait until a logical device is ready
+	var vlFunction isLogicalDevicesConditionSatisfied = func(lds *voltha.LogicalDevices) bool {
+		if lds == nil || len(lds.Items) == 0 {
+			return false
+		}
+		// Ensure there are both NNI ports and at least one UNI port on the logical devices discovered
+		for _, ld := range lds.Items {
+			if ld.RootDeviceId != oltID {
+				continue
+			}
+			ports, err := nbi.ListLogicalDevicePorts(getContext(), &voltha.ID{Id: ld.Id})
+			if err != nil {
+				return false
+			}
+			return len(ports.Items) == numNNIPorts+numUNIPorts // wait until all logical ports are created
+		}
+		return false
+	}
+	err := waitUntilConditionForLogicalDevices(nb.maxTimeout, nbi, vlFunction)
+	assert.Nil(t, err)
+
+	logicalDevices, err := nbi.ListLogicalDevices(getContext(), &empty.Empty{})
+	assert.Nil(t, err)
+	assert.NotNil(t, logicalDevices)
+	var logicalDevice *voltha.LogicalDevice
+	// GetItems tolerates a nil list (failed RPC)
+	for _, ld := range logicalDevices.GetItems() {
+		if ld.RootDeviceId == oltID {
+			logicalDevice = ld
+			break
+		}
+	}
+	if !assert.NotNil(t, logicalDevice) {
+		return
+	}
+	logicalDeviceID := logicalDevice.Id
+
+	// Figure out the olt and onu adapters being used by that logical device
+	// Clear any existing flows on these adapters
+	oltAdapter, onuAdapters, onuDeviceIDs, err := nb.getAdapterInstancesWithDeviceIds(t, nbi, oltID)
+	assert.Nil(t, err)
+	assert.NotNil(t, oltAdapter)
+	assert.Greater(t, len(onuAdapters), 0)
+	if oltAdapter == nil {
+		return
+	}
+
+	// Clear flows for that olt device and set the flow action
+	oltAdapter.RemoveDevice(oltID)
+	oltAdapter.SetFlowAction(oltID, flowAddFail, flowDeleteFail)
+
+	// Clear flows for the onu devices and set the flow action
+	for _, a := range onuAdapters {
+		for _, id := range onuDeviceIDs {
+			a.RemoveDevice(id)
+			a.SetFlowAction(id, flowAddFail, flowDeleteFail)
+		}
+	}
+
+	meterID := rand.Uint32()
+
+	// Add a meter to the logical device
+	meterMod := &ofp.OfpMeterMod{
+		Command: ofp.OfpMeterModCommand_OFPMC_ADD,
+		Flags:   rand.Uint32(),
+		MeterId: meterID,
+		Bands: []*ofp.OfpMeterBandHeader{
+			{Type: ofp.OfpMeterBandType_OFPMBT_EXPERIMENTER,
+				Rate:      rand.Uint32(),
+				BurstSize: rand.Uint32(),
+				Data:      nil,
+			},
+		},
+	}
+	_, err = nbi.UpdateLogicalDeviceMeterTable(getContext(), &ofp.MeterModUpdate{Id: logicalDeviceID, MeterMod: meterMod})
+	assert.Nil(t, err)
+
+	ports, err := nbi.ListLogicalDevicePorts(getContext(), &voltha.ID{Id: logicalDeviceID})
+	assert.Nil(t, err)
+	if ports == nil {
+		return
+	}
+
+	// Send initial set of Trap flows
+	startingVlan := 4091
+	nb.sendTrapFlows(t, nbi, logicalDeviceID, ports.Items, uint64(meterID), startingVlan)
+
+	//Listen for port events
+	processedNniLogicalPorts := 0
+	processedUniLogicalPorts := 0
+
+	for event := range eventCh {
+		// Events of other logical devices are not ours to handle
+		if event.Id != logicalDeviceID {
+			continue
+		}
+		// The VLAN advances on every event for this device, whatever its kind
+		startingVlan++
+		if portStatus, ok := (event.Event).(*ofp.ChangeEvent_PortStatus); ok {
+			ps := portStatus.PortStatus
+			if ps.Reason == ofp.OfpPortReason_OFPPR_ADD {
+				// Port numbers from nb.startingUNIPortNo upwards are UNI ports
+				if ps.Desc.PortNo >= uint32(nb.startingUNIPortNo) {
+					processedUniLogicalPorts++
+					nb.sendEAPFlows(t, nbi, logicalDeviceID, ps.Desc, startingVlan, uint64(meterID))
+				} else {
+					processedNniLogicalPorts++
+				}
+			}
+		}
+
+		if processedNniLogicalPorts >= numNNIPorts && processedUniLogicalPorts >= numUNIPorts {
+			break
+		}
+	}
+
+	//Verify the flow count on the logical device
+	nb.verifyLogicalDeviceFlowCount(t, nbi, oltID, numNNIPorts, numUNIPorts, flowAddFail)
+
+	// Wait until all flows have been sent to the OLT adapters (or all failed)
+	expectedFlowCount := (numNNIPorts * 3) + numNNIPorts*numUNIPorts
+	if flowAddFail {
+		expectedFlowCount = 0
+	}
+	var oltVFunc isConditionSatisfied = func() bool {
+		return oltAdapter.GetFlowCount(oltID) >= expectedFlowCount
+	}
+	err = waitUntilCondition(nb.maxTimeout, oltVFunc)
+	assert.Nil(t, err)
+
+	// Wait until all flows have been sent to the ONU adapters (or all failed)
+	expectedFlowCount = numUNIPorts
+	if flowAddFail {
+		expectedFlowCount = 0
+	}
+	var onuVFunc isConditionSatisfied = func() bool {
+		count := 0
+		for _, a := range onuAdapters {
+			for _, id := range onuDeviceIDs {
+				count = count + a.GetFlowCount(id)
+			}
+		}
+		return count == expectedFlowCount
+	}
+	err = waitUntilCondition(nb.maxTimeout, onuVFunc)
+	assert.Nil(t, err)
+}
+
+// testFlowAddFailure creates and enables an OLT of oltDeviceType while a
+// monitorLogicalDevices goroutine, started with flowAddFail=true and
+// flowDeleteFail=false, sends trap and EAPOL flows to it. The test verifies
+// the devices and logical device, waits for the monitor to finish, and
+// removes the OLT.
+func (nb *NBTest) testFlowAddFailure(t *testing.T, nbi voltha.VolthaServiceClient, oltDeviceType string) {
+ // Subscribe to the event listener
+ // NOTE(review): the argument looks like a capacity sized for one event per
+ // logical port per adapter - confirm against changeEventLister.Subscribe.
+ eventCh := nb.changeEventLister.Subscribe((nb.numONUPerOLT + 1) * nb.getNumAdapters())
+
+ defer nb.changeEventLister.Unsubscribe(eventCh)
+
+ // Create and enable device with valid data
+ oltDevice, err := nb.createAndEnableOLTDevice(t, nbi, oltDeviceType)
+ assert.Nil(t, err)
+ assert.NotNil(t, oltDevice)
+
+ // Create a logical device monitor that will automatically send trap and eapol flows to the devices being enabled
+ var wg sync.WaitGroup
+ wg.Add(1)
+ subCtx, cancel := context.WithCancel(context.Background())
+ defer cancel()
+ // 1 NNI port, numONUPerOLT UNI ports; flow adds fail, flow deletes do not
+ go nb.monitorLogicalDevices(subCtx, t, nbi, 1, nb.numONUPerOLT, &wg, true, false, oltDevice.Id, eventCh)
+
+ // Wait for the logical device to be in the ready state
+ var vldFunction = func(ports []*voltha.LogicalPort) bool {
+ return len(ports) == nb.numONUPerOLT+1
+ }
+ err = waitUntilLogicalDevicePortsReadiness(oltDevice.Id, nb.maxTimeout, nbi, vldFunction)
+ assert.Nil(t, err)
+
+ // Verify that the devices have been setup correctly
+ nb.verifyDevices(t, nbi, oltDevice.Id)
+
+ // Get latest oltDevice data
+ oltDevice, err = nbi.GetDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
+
+ // Verify that the logical device has been setup correctly
+ nb.verifyLogicalDevices(t, oltDevice, nbi)
+
+ // Wait until the monitor goroutine has finished its flow checks
+ wg.Wait()
+
+ //Remove the device
+ err = cleanUpDevices(nb.maxTimeout, nbi, oltDevice.Id, true)
+ assert.Nil(t, err)
+}
+
+// testMPLSFlowsAddition creates and enables an OLT of oltDeviceType, adds a
+// meter (id 1) to the first listed logical device, logs its meters and ports,
+// and then installs six flow-mods on it in order: ONU upstream, OLT upstream,
+// OLT downstream MPLS single-tag, OLT downstream MPLS double-tag, OLT
+// downstream and ONU downstream. Each update is expected to succeed.
+func (nb *NBTest) testMPLSFlowsAddition(t *testing.T, nbi voltha.VolthaServiceClient, oltDeviceType string) {
+
+ // Create and enable device with valid data
+ oltDevice, err := nb.createAndEnableOLTDevice(t, nbi, oltDeviceType)
+ assert.Nil(t, err)
+ assert.NotNil(t, oltDevice)
+
+ // Wait for the logical device to be in the ready state
+ var vldFunction = func(ports []*voltha.LogicalPort) bool {
+ return len(ports) == nb.numONUPerOLT+1
+ }
+ err = waitUntilLogicalDevicePortsReadiness(oltDevice.Id, nb.maxTimeout, nbi, vldFunction)
+ assert.Nil(t, err)
+
+ // Get latest oltDevice data
+ oltDevice, err = nbi.GetDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
+ assert.NotNil(t, oltDevice)
+ testLogger.Infow(getContext(), "olt-device-created-and-verified", log.Fields{"device-id": oltDevice.GetId()})
+
+ // Verify that the logical device has been setup correctly
+ nb.verifyLogicalDevices(t, oltDevice, nbi)
+
+ logicalDevices, err := nbi.ListLogicalDevices(getContext(), &empty.Empty{})
+ assert.NoError(t, err)
+
+ // NOTE(review): everything below targets GetItems()[0], i.e. the first
+ // logical device listed, not necessarily the one rooted at oltDevice; it
+ // also panics if the list is empty. Confirm this is intended when several
+ // OLTs exist at the same time.
+ testLogger.Infow(getContext(), "list-logical-devices", log.Fields{"logical-device": logicalDevices.GetItems()[0]})
+ // Add a meter to the logical device, which the flow can refer to
+ meterMod := &ofp.OfpMeterMod{
+ Command: ofp.OfpMeterModCommand_OFPMC_ADD,
+ Flags: rand.Uint32(),
+ MeterId: 1,
+ Bands: []*ofp.OfpMeterBandHeader{
+ {Type: ofp.OfpMeterBandType_OFPMBT_EXPERIMENTER,
+ Rate: rand.Uint32(),
+ BurstSize: rand.Uint32(),
+ Data: nil,
+ },
+ },
+ }
+ _, err = nbi.UpdateLogicalDeviceMeterTable(getContext(), &ofp.MeterModUpdate{
+ Id: logicalDevices.GetItems()[0].GetId(),
+ MeterMod: meterMod,
+ })
+ assert.NoError(t, err)
+
+ meters, err := nbi.ListLogicalDeviceMeters(getContext(), &voltha.ID{Id: logicalDevices.GetItems()[0].GetId()})
+ assert.NoError(t, err)
+
+ // Log each meter's configuration (diagnostic output only)
+ for _, item := range meters.GetItems() {
+ testLogger.Infow(getContext(), "list-logical-device-meters", log.Fields{"meter-config": item.GetConfig()})
+ }
+
+ logicalPorts, err := nbi.ListLogicalDevicePorts(context.Background(), &voltha.ID{Id: logicalDevices.GetItems()[0].GetId()})
+ assert.NoError(t, err)
+ // Log the logical ports as JSON (diagnostic output only)
+ m := jsonpb.Marshaler{}
+ logicalPortsJson, err := m.MarshalToString(logicalPorts)
+ assert.NoError(t, err)
+
+ testLogger.Infow(getContext(), "list-logical-ports", log.Fields{"ports": logicalPortsJson})
+
+ // Flow-mod builders, applied in this order
+ callables := []func() *ofp.OfpFlowMod{getOnuUpstreamRules, getOltUpstreamRules, getOLTDownstreamMplsSingleTagRules,
+ getOLTDownstreamMplsDoubleTagRules, getOLTDownstreamRules, getOnuDownstreamRules}
+
+ for _, callable := range callables {
+ _, err = nbi.UpdateLogicalDeviceFlowTable(getContext(), &ofp.FlowTableUpdate{Id: logicalDevices.GetItems()[0].GetId(), FlowMod: callable()})
+ assert.NoError(t, err)
+ }
+
+ //Remove the device
+ err = cleanUpDevices(nb.maxTimeout, nbi, oltDevice.Id, true)
+ assert.Nil(t, err)
+}
+
+// getOnuUpstreamRules builds the ONU upstream flow-mod: it matches in-port 103
+// and VLAN VID 4096, carries no actions, and has its TableId set to 0 after
+// construction. The resulting flow-mod is logged as JSON before being returned.
+func getOnuUpstreamRules() (flowMod *ofp.OfpFlowMod) {
+	args := flows.OfpFlowModArgs{"priority": 1000, "table_id": 1, "meter_id": 1, "write_metadata": 4100100000}
+	match := []*ofp.OfpOxmOfbField{
+		flows.InPort(103),
+		flows.VlanVid(4096),
+	}
+
+	flowMod = makeSimpleFlowMod(&flows.FlowArgs{
+		KV:          args,
+		MatchFields: match,
+		Actions:     []*ofp.OfpAction{},
+	})
+	flowMod.TableId = 0
+
+	// A marshalling error is ignored: the JSON only feeds the log line.
+	marshaler := &jsonpb.Marshaler{}
+	encoded, _ := marshaler.MarshalToString(flowMod)
+	testLogger.Infow(getContext(), "onu-upstream-flow", log.Fields{"flow-mod": encoded})
+	return flowMod
+}
+
+// getOltUpstreamRules builds the OLT upstream flow-mod: it matches in-port 103
+// and VLAN VID 4096, and its action list pushes a 0x8100 tag (VID 2), rewrites
+// the Ethernet source/destination, pushes two 0x8847 headers with labels 100
+// (BoS 1) and 200, sets the MPLS TTL to 64 and outputs to port 2. TableId is
+// set to 1 after construction. The flow-mod is logged as JSON before return.
+func getOltUpstreamRules() (flowMod *ofp.OfpFlowMod) {
+	args := flows.OfpFlowModArgs{"priority": 1000, "table_id": 1, "meter_id": 1, "write_metadata": 4100000000}
+	match := []*ofp.OfpOxmOfbField{
+		flows.InPort(103),
+		flows.VlanVid(4096),
+	}
+	actions := []*ofp.OfpAction{
+		flows.PushVlan(0x8100),
+		flows.SetField(flows.VlanVid(2)),
+		flows.SetField(flows.EthSrc(1111)),
+		flows.SetField(flows.EthDst(2222)),
+		flows.PushVlan(0x8847),
+		flows.SetField(flows.MplsLabel(100)),
+		flows.SetField(flows.MplsBos(1)),
+		flows.PushVlan(0x8847),
+		flows.SetField(flows.MplsLabel(200)),
+		flows.MplsTtl(64),
+		flows.Output(2),
+	}
+
+	flowMod = makeSimpleFlowMod(&flows.FlowArgs{KV: args, MatchFields: match, Actions: actions})
+	flowMod.TableId = 1
+
+	// A marshalling error is ignored: the JSON only feeds the log line.
+	marshaler := &jsonpb.Marshaler{}
+	encoded, _ := marshaler.MarshalToString(flowMod)
+	testLogger.Infow(getContext(), "olt-upstream-flow", log.Fields{"flow-mod": encoded})
+	return flowMod
+}
+
+// getOLTDownstreamMplsSingleTagRules builds the OLT downstream flow-mod for
+// single-tagged MPLS traffic: it matches in-port 2, metadata (1000<<32)|1,
+// ethertype 0x8847, MPLS BoS 1 and Ethernet source 2222; its actions are one
+// DEC_MPLS_TTL action (carrying TTL 62) followed by one PopMpls(0x8847).
+// TableId is set to 0 after construction. The flow-mod is logged as JSON
+// before being returned.
+func getOLTDownstreamMplsSingleTagRules() (flowMod *ofp.OfpFlowMod) {
+	args := flows.OfpFlowModArgs{"priority": 1000, "table_id": 1}
+	match := []*ofp.OfpOxmOfbField{
+		flows.InPort(2),
+		flows.Metadata_ofp((1000 << 32) | 1),
+		flows.EthType(0x8847),
+		flows.MplsBos(1),
+		flows.EthSrc(2222),
+	}
+	actions := []*ofp.OfpAction{
+		{Type: ofp.OfpActionType_OFPAT_DEC_MPLS_TTL, Action: &ofp.OfpAction_MplsTtl{MplsTtl: &ofp.OfpActionMplsTtl{MplsTtl: 62}}},
+		flows.PopMpls(0x8847),
+	}
+
+	flowMod = makeSimpleFlowMod(&flows.FlowArgs{KV: args, MatchFields: match, Actions: actions})
+	flowMod.TableId = 0
+
+	// A marshalling error is ignored: the JSON only feeds the log line.
+	marshaler := &jsonpb.Marshaler{}
+	encoded, _ := marshaler.MarshalToString(flowMod)
+	testLogger.Infow(getContext(), "olt-mpls-downstream-single-tag-flow", log.Fields{"flow-mod": encoded})
+	return flowMod
+}
+
+// getOLTDownstreamMplsDoubleTagRules builds the OLT downstream flow-mod for
+// double-tagged MPLS traffic: it matches in-port 2, ethertype 0x8847 and
+// Ethernet source 2222; its actions are one DEC_MPLS_TTL action (carrying TTL
+// 62) followed by two PopMpls(0x8847). TableId is set to 0 after construction.
+// The flow-mod is logged as JSON before being returned.
+func getOLTDownstreamMplsDoubleTagRules() (flowMod *ofp.OfpFlowMod) {
+	args := flows.OfpFlowModArgs{"priority": 1000, "table_id": 1}
+	match := []*ofp.OfpOxmOfbField{
+		flows.InPort(2),
+		flows.EthType(0x8847),
+		flows.EthSrc(2222),
+	}
+	actions := []*ofp.OfpAction{
+		{Type: ofp.OfpActionType_OFPAT_DEC_MPLS_TTL, Action: &ofp.OfpAction_MplsTtl{MplsTtl: &ofp.OfpActionMplsTtl{MplsTtl: 62}}},
+		flows.PopMpls(0x8847),
+		flows.PopMpls(0x8847),
+	}
+
+	flowMod = makeSimpleFlowMod(&flows.FlowArgs{KV: args, MatchFields: match, Actions: actions})
+	flowMod.TableId = 0
+
+	// A marshalling error is ignored: the JSON only feeds the log line.
+	marshaler := &jsonpb.Marshaler{}
+	encoded, _ := marshaler.MarshalToString(flowMod)
+	testLogger.Infow(getContext(), "olt-mpls-downstream-double-tagged-flow", log.Fields{"flow-mod": encoded})
+	return flowMod
+}
+
+func getOLTDownstreamRules() (flowMod *ofp.OfpFlowMod) {
+ fa := &flows.FlowArgs{
+ KV: flows.OfpFlowModArgs{"priority": 1000, "table_id": 2, "meter_id": 1},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ flows.InPort(2),
+ flows.VlanVid(2),
+ },
+ Actions: []*ofp.OfpAction{
+ flows.PopVlan(),
+ },
+ }
+ flowMod = makeSimpleFlowMod(fa)
+ flowMod.TableId = 1
+ m := jsonpb.Marshaler{}
+ flowModJson, _ := m.MarshalToString(flowMod)
+ testLogger.Infow(getContext(), "olt-downstream-flow", log.Fields{"flow-mod": flowModJson})
+ return
+}
+
+func getOnuDownstreamRules() (flowMod *ofp.OfpFlowMod) {
+ fa := &flows.FlowArgs{
+ KV: flows.OfpFlowModArgs{"priority": 1000, "meter_id": 1},
+ MatchFields: []*ofp.OfpOxmOfbField{
+ flows.InPort(2),
+ flows.Metadata_ofp((1000 << 32) | 1),
+ flows.VlanVid(4096),
+ },
+ Actions: []*ofp.OfpAction{
+ flows.Output(103),
+ },
+ }
+ flowMod = makeSimpleFlowMod(fa)
+ flowMod.TableId = 2
+ m := jsonpb.Marshaler{}
+ flowModJson, _ := m.MarshalToString(flowMod)
+ testLogger.Infow(getContext(), "onu-downstream-flow", log.Fields{"flow-mod": flowModJson})
+ return
+}
+
+// runTestSuite runs the northbound test scenarios, in the order listed below,
+// against the given OLT device type using the supplied NBI client. TestSuite
+// launches it as a goroutine once per OLT adapter; testWg.Done is deferred so
+// the launcher's Wait returns when this run finishes.
+func (nb *NBTest) runTestSuite(t *testing.T, nbi voltha.VolthaServiceClient, oltDeviceType string, testWg *sync.WaitGroup) {
+ defer testWg.Done()
+
+ // Test create device
+ nb.testCreateDevice(t, nbi, oltDeviceType)
+
+ // Test delete device scenarios
+ nb.testForceDeletePreProvDevice(t, nbi, oltDeviceType)
+ nb.testDeletePreProvDevice(t, nbi, oltDeviceType)
+ nb.testForceDeleteEnabledDevice(t, nbi, oltDeviceType)
+ nb.testDeleteEnabledDevice(t, nbi, oltDeviceType)
+ nb.testForceDeleteDeviceFailure(t, nbi, oltDeviceType)
+ nb.testDeleteDeviceFailure(t, nbi, oltDeviceType)
+
+ // Test failed enable device
+ nb.testEnableDeviceFailed(t, nbi, oltDeviceType)
+
+ // Test enable a device
+ nb.testEnableDevice(t, nbi, oltDeviceType)
+
+ // Test disable and re-enable a root device
+ nb.testDisableAndReEnableRootDevice(t, nbi, oltDeviceType)
+
+ // Test disable and enable PON port of OLT device
+ nb.testDisableAndEnablePort(t, nbi, oltDeviceType)
+
+ // Test device reboot when OLT is enabled
+ nb.testDeviceRebootWhenOltIsEnabled(t, nbi, oltDeviceType)
+
+ // Test disable and delete all devices
+ nb.testDisableAndDeleteAllDevice(t, nbi, oltDeviceType)
+
+ // Test enable and delete all devices
+ nb.testEnableAndDeleteAllDevice(t, nbi, oltDeviceType)
+
+ // Test start OMCI test action
+ nb.testStartOmciTestAction(t, nbi, oltDeviceType)
+
+ // Test flow add failure
+ nb.testFlowAddFailure(t, nbi, oltDeviceType)
+
+ // Test MPLS flows addition, where the flows are expected to look as follows:
+ /*
+ Upstream
+ ONU
+ ADDED, bytes=0, packets=0, table=0, priority=1000, selector=[IN_PORT:32, VLAN_VID:ANY], treatment=[immediate=[],
+ transition=TABLE:1, meter=METER:1, metadata=METADATA:4100010000/0]
+ OLT
+ ADDED, bytes=0, packets=0, table=1, priority=1000, selector=[IN_PORT:32, VLAN_VID:ANY], treatment=[immediate=[VLAN_PUSH:vlan,
+ VLAN_ID:2, MPLS_PUSH:mpls_unicast, MPLS_LABEL:YYY,MPLS_BOS:true, MPLS_PUSH:mpls_unicast ,MPLS_LABEL:XXX, MPLS_BOS:false,
+ EXTENSION:of:0000000000000227/VolthaPushL2Header{}, ETH_SRC:OLT_MAC, ETH_DST:LEAF_MAC, TTL:64, OUTPUT:65536],
+ meter=METER:1, metadata=METADATA:4100000000/0]
+
+ Downstream
+ OLT
+ //Below flow rule to pop L2 Ethernet headers from packets which have a single MPLS label
+ ADDED, bytes=0, packets=0, table=0, priority=1000, selector=[IN_PORT:65536, ETH_TYPE:mpls_unicast, MPLS_BOS:true, ETH_SRC:LEAF_MAC],
+ treatment=[DefaultTrafficTreatment{immediate=[DEC_MPLS_TTL, TTL_IN, MPLS_POP:mpls_unicast, EXTENSION:of:0000000000000227/VolthaPopL2Header{},
+ transition=TABLE:1]
+
+ //Below flow rule to pop L2 Ethernet headers from packets which have two MPLS labels
+ ADDED, bytes=0, packets=0, table=0, priority=1000, selector=[IN_PORT:65536, ETH_TYPE:mpls_unicast, MPLS_BOS:false, ETH_SRC:LEAF_MAC],
+ treatment=[DefaultTrafficTreatment{immediate=[DEC_MPLS_TTL, TTL_IN, MPLS_POP:mpls_unicast, MPLS_POP:mpls_unicast ,
+ EXTENSION:of:0000000000000227/VolthaPopL2Header{}, transition=TABLE:1]
+
+ //Below flow rules are unchanged from the current implementations except for the table numbers
+ ADDED, bytes=0, packets=0, table=1, priority=1000, selector=[IN_PORT:65536, VLAN_VID:2], treatment=[immediate=[VLAN_POP], transition=TABLE:2,
+ meter=METER:2, metadata=METADATA:1000004100000020/0]
+ ONU
+ ADDED, bytes=0, packets=0, table=2, priority=1000, selector=[IN_PORT:65536, METADATA:20 VLAN_VID:ANY], treatment=[immediate=[OUTPUT:32],
+ meter=METER:2, metadata=METADATA:4100000000/0]
+ */
+ nb.testMPLSFlowsAddition(t, nbi, oltDeviceType)
+}
+
+// setUpCore starts the gRPC Core, waits (up to nb.internalTimeout) for the
+// readiness probe to report ready, and dials the northbound endpoint.
+//
+// It returns the northbound Voltha service client and the Core API endpoint
+// that adapters should connect to. The underlying gRPC connection is closed
+// when ctx is cancelled, so callers must keep ctx alive for as long as the
+// returned client is in use.
+func setUpCore(ctx context.Context, t *testing.T, nb *NBTest) (voltha.VolthaServiceClient, string) {
+	// Start the Core
+	coreAPIEndpoint, nbiEndpoint := nb.startGRPCCore(ctx, t)
+
+	// Wait until the core is ready
+	start := time.Now()
+	logger.Infow(ctx, "waiting-for-core-to-be-ready", log.Fields{"start": start, "api-endpoint": coreAPIEndpoint})
+
+	var vFunction isConditionSatisfied = func() bool {
+		return nb.probe.IsReady()
+	}
+	err := waitUntilCondition(nb.internalTimeout, vFunction)
+	assert.Nil(t, err)
+	logger.Infow(ctx, "core-is-ready", log.Fields{"time-taken": time.Since(start)})
+
+	// Create a grpc client to communicate with the Core
+	conn, err := grpc.Dial(nbiEndpoint, grpc.WithInsecure())
+	if err != nil {
+		logger.Fatalw(ctx, "cannot connect to core", log.Fields{"error": err})
+	}
+	// The connection is not handed back to the caller, so tie its lifetime to
+	// ctx: this goroutine ends, and the connection is released, on cancellation.
+	go func() {
+		<-ctx.Done()
+		if closeErr := conn.Close(); closeErr != nil {
+			logger.Warnw(ctx, "cannot-close-core-connection", log.Fields{"error": closeErr})
+		}
+	}()
+
+	nbi := voltha.NewVolthaServiceClient(conn)
+	if nbi == nil {
+		// err is necessarily nil on this path, so report the endpoint instead.
+		logger.Fatalw(ctx, "cannot create a service to core", log.Fields{"nbi-endpoint": nbiEndpoint})
+	}
+
+	// Basic test with no data in Core
+	nb.testCoreWithoutData(t, nbi)
+
+	logger.Infow(ctx, "core-setup-complete", log.Fields{"time": time.Since(start), "api-endpoint": coreAPIEndpoint})
+
+	return nbi, coreAPIEndpoint
+}
+
+// setupAdapters creates and registers the OLT and ONU adapters against the Core
+// at coreAPIEndpoint, stores them on nb, records the ONU-per-OLT count and the
+// starting UNI port number, waits (up to nb.internalTimeout) until every adapter
+// reports ready, and finally runs the adapter registration test through nbi.
+func setupAdapters(ctx context.Context, t *testing.T, nb *NBTest, coreAPIEndpoint string, nbi voltha.VolthaServiceClient) {
+	// Create/register the adapters
+	start := time.Now()
+	nb.oltAdaptersLock.Lock()
+	nb.onuAdaptersLock.Lock()
+	nb.oltAdapters, nb.onuAdapters = CreateAndRegisterAdapters(ctx, t, oltAdapters, onuAdapters, coreAPIEndpoint)
+	nb.onuAdaptersLock.Unlock()
+	nb.oltAdaptersLock.Unlock()
+
+	nb.numONUPerOLT = cm.GetNumONUPerOLT()
+	nb.startingUNIPortNo = cm.GetStartingUNIPortNo()
+
+	// Wait for adapters to be fully running
+	var areAdaptersRunning isConditionSatisfied = func() bool {
+		// Each map is read under its own lock. The locks are acquired in the
+		// same order as the writer above (OLT first, then ONU).
+		nb.oltAdaptersLock.RLock()
+		defer nb.oltAdaptersLock.RUnlock()
+		for _, adapters := range nb.oltAdapters {
+			for _, a := range adapters {
+				if !a.IsReady() {
+					return false
+				}
+			}
+		}
+
+		nb.onuAdaptersLock.RLock()
+		defer nb.onuAdaptersLock.RUnlock()
+		for _, adapters := range nb.onuAdapters {
+			for _, a := range adapters {
+				if !a.IsReady() {
+					return false
+				}
+			}
+		}
+		return true
+	}
+	err := waitUntilCondition(nb.internalTimeout, areAdaptersRunning)
+	assert.Nil(t, err)
+	logger.Infow(ctx, "adapters-are-ready", log.Fields{"time-taken": time.Since(start)})
+
+	// Test adapter registration
+	nb.testAdapterRegistration(t, nbi)
+}
+
+//TestLogDeviceUpdate is used to extract and format device updates. Not to be run on jenkins.
+func TestLogDeviceUpdate(t *testing.T) {
+ t.Skip()
+ var inputFile = os.Getenv("LGF")
+ var deviceID = os.Getenv("DID")
+
+ prettyPrintDeviceUpdateLog(inputFile, deviceID)
+}
+
+func TestOMCIData(t *testing.T) {
+ t.Skip()
+ var inputFile = os.Getenv("LGF")
+ var deviceID = os.Getenv("DID")
+ omciLog(inputFile, deviceID)
+}
+
+// TestRandomMacGenerator calls getRandomMacAddress one million times, each call
+// in its own goroutine, counts how often every value is returned and prints the
+// values that were returned more than once. It is skipped unconditionally.
+func TestRandomMacGenerator(t *testing.T) {
+	t.Skip()
+
+	const numSamples = 1000000
+	var (
+		pending    sync.WaitGroup
+		countsLock sync.Mutex
+	)
+	counts := make(map[string]int)
+
+	for i := 0; i < numSamples; i++ {
+		pending.Add(1)
+		go func() {
+			defer pending.Done()
+			mac := getRandomMacAddress()
+			// The counter map is shared by all goroutines.
+			countsLock.Lock()
+			defer countsLock.Unlock()
+			counts[mac]++
+		}()
+	}
+	pending.Wait()
+
+	// Look for duplicates
+	for mac, seen := range counts {
+		if seen == 1 {
+			continue
+		}
+		fmt.Println("duplicate", mac)
+	}
+}
+
+// TestSuite is the top-level test: it enables CPU profiling, starts the Core and
+// the adapters, starts the change-event listener and receiver, then runs
+// runTestSuite concurrently once per registered OLT adapter and waits for all of
+// them to finish before cancelling the shared context.
+func TestSuite(t *testing.T) {
+ // Only fatal-level log output is enabled for the duration of the suite.
+ log.SetAllLogLevel(log.FatalLevel)
+
+ // Create a context to be cancelled at the end of all tests. This will trigger closing of any resources used.
+ ctx, cancel := context.WithCancel(context.Background())
+
+ // Setup CPU profiling; the profile is written to grpc_profile.cpu in the current working directory.
+ f, err := os.Create("grpc_profile.cpu")
+ // f, err := os.Create("../../../tests/results/grpc_profile.cpu")
+ if err != nil {
+ logger.Fatalf(ctx, "could not create CPU profile: %v\n ", err)
+ }
+ defer f.Close()
+ runtime.SetBlockProfileRate(1)
+ // NOTE(review): per the Go runtime documentation a negative rate only reads the current
+ // mutex profile fraction without changing it - confirm whether a positive value was intended.
+ runtime.SetMutexProfileFraction(-1)
+ // NOTE(review): presumably set before StartCPUProfile to request a 200 Hz sampling rate - confirm.
+ runtime.SetCPUProfileRate(200)
+ if err := pprof.StartCPUProfile(f); err != nil {
+ logger.Fatalf(ctx, "could not start CPU profile: %v\n", err)
+ }
+ defer pprof.StopCPUProfile()
+
+ // Create test object
+ nb := newNBTest(ctx, false)
+ assert.NotNil(t, nb)
+ // Deferred calls run in reverse order: nb.stopAll first, then StopCPUProfile, then f.Close.
+ defer nb.stopAll(ctx)
+
+ // Setup the Core
+ nbi, coreAPIEndpoint := setUpCore(ctx, t, nb)
+
+ // Setup the adapters
+ setupAdapters(ctx, t, nb, coreAPIEndpoint, nbi)
+
+ // Start the change events listener and dispatcher to receive all change events from the Core.
+ // The channel is buffered with (numONUPerOLT+1) slots per OLT adapter type.
+ nb.changeEventLister = NewChangedEventListener(len(nb.oltAdapters))
+ ch := make(chan *ofp.ChangeEvent, (nb.numONUPerOLT+1)*len(nb.oltAdapters))
+ go nb.changeEventLister.Start(ctx, ch)
+ go nb.receiveChangeEvents(ctx, nbi, ch)
+
+ // Run the full set of tests in parallel for each olt device type
+ start := time.Now()
+ fmt.Println("starting test at:", start)
+ var wg sync.WaitGroup
+ // The read lock is held only while the goroutines are being launched, not while they run.
+ nb.oltAdaptersLock.RLock()
+ numTestCycles := 1
+ for i := 1; i <= numTestCycles; i++ {
+ for oltAdapterType, oltAdapters := range nb.oltAdapters {
+ for _, a := range oltAdapters {
+ wg.Add(1)
+ fmt.Printf("Launching test for OLT adapter type:%s supporting OLT device type:%s and ONU device type:%s\n", oltAdapterType, a.DeviceType, a.ChildDeviceType)
+ go nb.runTestSuite(t, nbi, a.DeviceType, &wg)
+ }
+ }
+ }
+ nb.oltAdaptersLock.RUnlock()
+
+ // Wait for all tests to complete
+ wg.Wait()
+ fmt.Println("Execution time:", time.Since(start))
+
+ // Cleanup before leaving: cancel the shared context before the deferred stopAll runs.
+ fmt.Println("Cleaning up ... grpc warnings can be safely ignored")
+ cancel()
+}