| /* |
| * Copyright 2019-present Open Networking Foundation |
| * |
| * Licensed under the Apache License, Version 2.0 (the "License"); |
| * you may not use this file except in compliance with the License. |
| * You may obtain a copy of the License at |
| * |
| * http://www.apache.org/licenses/LICENSE-2.0 |
| * |
| * Unless required by applicable law or agreed to in writing, software |
| * distributed under the License is distributed on an "AS IS" BASIS, |
| * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| * See the License for the specific language governing permissions and |
| * limitations under the License. |
| */ |
| package core |
| |
| import ( |
| "context" |
| "github.com/gogo/protobuf/proto" |
| "github.com/opencord/voltha-go/rw_core/config" |
| com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common" |
| fu "github.com/opencord/voltha-lib-go/v3/pkg/flows" |
| "github.com/opencord/voltha-lib-go/v3/pkg/kafka" |
| "github.com/opencord/voltha-lib-go/v3/pkg/log" |
| lm "github.com/opencord/voltha-lib-go/v3/pkg/mocks" |
| ofp "github.com/opencord/voltha-protos/v3/go/openflow_13" |
| "github.com/opencord/voltha-protos/v3/go/voltha" |
| "github.com/phayes/freeport" |
| "github.com/stretchr/testify/assert" |
| "math/rand" |
| "sync" |
| "testing" |
| "time" |
| ) |
| |
| func TestLogicalDeviceAgent_diff_nochange_1(t *testing.T) { |
| currentLogicalPorts := []*voltha.LogicalPort{} |
| updatedLogicalPorts := []*voltha.LogicalPort{} |
| newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts) |
| assert.Equal(t, 0, len(newPorts)) |
| assert.Equal(t, 0, len(changedPorts)) |
| assert.Equal(t, 0, len(deletedPorts)) |
| } |
| |
| func TestLogicalDeviceAgent_diff_nochange_2(t *testing.T) { |
| currentLogicalPorts := []*voltha.LogicalPort{ |
| { |
| Id: "1231", |
| DeviceId: "d1234", |
| DevicePortNo: 1, |
| RootPort: true, |
| OfpPort: &ofp.OfpPort{ |
| PortNo: 1, |
| Name: "port1", |
| Config: 1, |
| State: 1, |
| }, |
| }, |
| { |
| Id: "1232", |
| DeviceId: "d1234", |
| DevicePortNo: 2, |
| RootPort: false, |
| OfpPort: &ofp.OfpPort{ |
| PortNo: 2, |
| Name: "port2", |
| Config: 1, |
| State: 1, |
| }, |
| }, |
| { |
| Id: "1233", |
| DeviceId: "d1234", |
| DevicePortNo: 3, |
| RootPort: false, |
| OfpPort: &ofp.OfpPort{ |
| PortNo: 3, |
| Name: "port3", |
| Config: 1, |
| State: 1, |
| }, |
| }, |
| } |
| updatedLogicalPorts := []*voltha.LogicalPort{ |
| { |
| Id: "1231", |
| DeviceId: "d1234", |
| DevicePortNo: 1, |
| RootPort: true, |
| OfpPort: &ofp.OfpPort{ |
| PortNo: 1, |
| Name: "port1", |
| Config: 1, |
| State: 1, |
| }, |
| }, |
| { |
| Id: "1232", |
| DeviceId: "d1234", |
| DevicePortNo: 2, |
| RootPort: false, |
| OfpPort: &ofp.OfpPort{ |
| PortNo: 2, |
| Name: "port2", |
| Config: 1, |
| State: 1, |
| }, |
| }, |
| { |
| Id: "1233", |
| DeviceId: "d1234", |
| DevicePortNo: 3, |
| RootPort: false, |
| OfpPort: &ofp.OfpPort{ |
| PortNo: 3, |
| Name: "port3", |
| Config: 1, |
| State: 1, |
| }, |
| }, |
| } |
| newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts) |
| assert.Equal(t, 0, len(newPorts)) |
| assert.Equal(t, 0, len(changedPorts)) |
| assert.Equal(t, 0, len(deletedPorts)) |
| } |
| |
| func TestLogicalDeviceAgent_diff_add(t *testing.T) { |
| currentLogicalPorts := []*voltha.LogicalPort{} |
| updatedLogicalPorts := []*voltha.LogicalPort{ |
| { |
| Id: "1231", |
| DeviceId: "d1234", |
| DevicePortNo: 1, |
| RootPort: true, |
| OfpPort: &ofp.OfpPort{ |
| PortNo: 1, |
| Name: "port1", |
| Config: 1, |
| State: 1, |
| }, |
| }, |
| { |
| Id: "1232", |
| DeviceId: "d1234", |
| DevicePortNo: 2, |
| RootPort: true, |
| OfpPort: &ofp.OfpPort{ |
| PortNo: 2, |
| Name: "port2", |
| Config: 1, |
| State: 1, |
| }, |
| }, |
| } |
| newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts) |
| assert.Equal(t, 2, len(newPorts)) |
| assert.Equal(t, 0, len(changedPorts)) |
| assert.Equal(t, 0, len(deletedPorts)) |
| assert.Equal(t, updatedLogicalPorts[0], newPorts[0]) |
| assert.Equal(t, updatedLogicalPorts[1], newPorts[1]) |
| } |
| |
| func TestLogicalDeviceAgent_diff_delete(t *testing.T) { |
| currentLogicalPorts := []*voltha.LogicalPort{ |
| { |
| Id: "1231", |
| DeviceId: "d1234", |
| DevicePortNo: 1, |
| RootPort: true, |
| OfpPort: &ofp.OfpPort{ |
| PortNo: 1, |
| Name: "port1", |
| Config: 1, |
| State: 1, |
| }, |
| }, |
| } |
| updatedLogicalPorts := []*voltha.LogicalPort{} |
| newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts) |
| assert.Equal(t, 0, len(newPorts)) |
| assert.Equal(t, 0, len(changedPorts)) |
| assert.Equal(t, 1, len(deletedPorts)) |
| assert.Equal(t, currentLogicalPorts[0], deletedPorts[0]) |
| } |
| |
| func TestLogicalDeviceAgent_diff_changed(t *testing.T) { |
| currentLogicalPorts := []*voltha.LogicalPort{ |
| { |
| Id: "1231", |
| DeviceId: "d1234", |
| DevicePortNo: 1, |
| RootPort: true, |
| OfpPort: &ofp.OfpPort{ |
| PortNo: 1, |
| Name: "port1", |
| Config: 1, |
| State: 1, |
| }, |
| }, |
| { |
| Id: "1232", |
| DeviceId: "d1234", |
| DevicePortNo: 2, |
| RootPort: false, |
| OfpPort: &ofp.OfpPort{ |
| PortNo: 2, |
| Name: "port2", |
| Config: 1, |
| State: 1, |
| }, |
| }, |
| { |
| Id: "1233", |
| DeviceId: "d1234", |
| DevicePortNo: 3, |
| RootPort: false, |
| OfpPort: &ofp.OfpPort{ |
| PortNo: 3, |
| Name: "port3", |
| Config: 1, |
| State: 1, |
| }, |
| }, |
| } |
| updatedLogicalPorts := []*voltha.LogicalPort{ |
| { |
| Id: "1231", |
| DeviceId: "d1234", |
| DevicePortNo: 1, |
| RootPort: true, |
| OfpPort: &ofp.OfpPort{ |
| PortNo: 1, |
| Name: "port1", |
| Config: 4, |
| State: 4, |
| }, |
| }, |
| { |
| Id: "1232", |
| DeviceId: "d1234", |
| DevicePortNo: 2, |
| RootPort: false, |
| OfpPort: &ofp.OfpPort{ |
| PortNo: 2, |
| Name: "port2", |
| Config: 4, |
| State: 4, |
| }, |
| }, |
| { |
| Id: "1233", |
| DeviceId: "d1234", |
| DevicePortNo: 3, |
| RootPort: false, |
| OfpPort: &ofp.OfpPort{ |
| PortNo: 3, |
| Name: "port3", |
| Config: 1, |
| State: 1, |
| }, |
| }, |
| } |
| newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts) |
| assert.Equal(t, 0, len(newPorts)) |
| assert.Equal(t, 2, len(changedPorts)) |
| assert.Equal(t, 0, len(deletedPorts)) |
| assert.Equal(t, updatedLogicalPorts[0], changedPorts[0]) |
| assert.Equal(t, updatedLogicalPorts[1], changedPorts[1]) |
| } |
| |
| func TestLogicalDeviceAgent_diff_mix(t *testing.T) { |
| currentLogicalPorts := []*voltha.LogicalPort{ |
| { |
| Id: "1231", |
| DeviceId: "d1234", |
| DevicePortNo: 1, |
| RootPort: true, |
| OfpPort: &ofp.OfpPort{ |
| PortNo: 1, |
| Name: "port1", |
| Config: 1, |
| State: 1, |
| }, |
| }, |
| { |
| Id: "1232", |
| DeviceId: "d1234", |
| DevicePortNo: 2, |
| RootPort: false, |
| OfpPort: &ofp.OfpPort{ |
| PortNo: 2, |
| Name: "port2", |
| Config: 1, |
| State: 1, |
| }, |
| }, |
| { |
| Id: "1233", |
| DeviceId: "d1234", |
| DevicePortNo: 3, |
| RootPort: false, |
| OfpPort: &ofp.OfpPort{ |
| PortNo: 3, |
| Name: "port3", |
| Config: 1, |
| State: 1, |
| }, |
| }, |
| } |
| updatedLogicalPorts := []*voltha.LogicalPort{ |
| { |
| Id: "1231", |
| DeviceId: "d1234", |
| DevicePortNo: 1, |
| RootPort: true, |
| OfpPort: &ofp.OfpPort{ |
| PortNo: 1, |
| Name: "port1", |
| Config: 4, |
| State: 4, |
| }, |
| }, |
| { |
| Id: "1232", |
| DeviceId: "d1234", |
| DevicePortNo: 2, |
| RootPort: false, |
| OfpPort: &ofp.OfpPort{ |
| PortNo: 2, |
| Name: "port2", |
| Config: 4, |
| State: 4, |
| }, |
| }, |
| { |
| Id: "1234", |
| DeviceId: "d1234", |
| DevicePortNo: 4, |
| RootPort: false, |
| OfpPort: &ofp.OfpPort{ |
| PortNo: 4, |
| Name: "port4", |
| Config: 4, |
| State: 4, |
| }, |
| }, |
| } |
| newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts) |
| assert.Equal(t, 1, len(newPorts)) |
| assert.Equal(t, 2, len(changedPorts)) |
| assert.Equal(t, 1, len(deletedPorts)) |
| assert.Equal(t, updatedLogicalPorts[0], changedPorts[0]) |
| assert.Equal(t, updatedLogicalPorts[1], changedPorts[1]) |
| assert.Equal(t, currentLogicalPorts[2], deletedPorts[0]) |
| } |
| |
| type LDATest struct { |
| etcdServer *lm.EtcdServer |
| core *Core |
| kClient kafka.Client |
| kvClientPort int |
| oltAdapterName string |
| onuAdapterName string |
| coreInstanceID string |
| defaultTimeout time.Duration |
| maxTimeout time.Duration |
| logicalDevice *voltha.LogicalDevice |
| deviceIds []string |
| done chan int |
| } |
| |
| func newLDATest() *LDATest { |
| test := &LDATest{} |
| // Start the embedded etcd server |
| var err error |
| test.etcdServer, test.kvClientPort, err = startEmbeddedEtcdServer("voltha.rwcore.lda.test", "voltha.rwcore.lda.etcd", "error") |
| if err != nil { |
| log.Fatal(err) |
| } |
| // Create the kafka client |
| test.kClient = lm.NewKafkaClient() |
| test.oltAdapterName = "olt_adapter_mock" |
| test.onuAdapterName = "onu_adapter_mock" |
| test.coreInstanceID = "rw-da-test" |
| test.defaultTimeout = 5 * time.Second |
| test.maxTimeout = 20 * time.Second |
| test.done = make(chan int) |
| test.deviceIds = []string{com.GetRandomString(10), com.GetRandomString(10), com.GetRandomString(10)} |
| test.logicalDevice = &voltha.LogicalDevice{ |
| Desc: &ofp.OfpDesc{ |
| HwDesc: "olt_adapter_mock", |
| SwDesc: "olt_adapter_mock", |
| SerialNum: com.GetRandomSerialNumber(), |
| }, |
| SwitchFeatures: &ofp.OfpSwitchFeatures{ |
| NBuffers: 256, |
| NTables: 2, |
| Capabilities: uint32(ofp.OfpCapabilities_OFPC_FLOW_STATS | |
| ofp.OfpCapabilities_OFPC_TABLE_STATS | |
| ofp.OfpCapabilities_OFPC_PORT_STATS | |
| ofp.OfpCapabilities_OFPC_GROUP_STATS), |
| }, |
| RootDeviceId: test.deviceIds[0], |
| Ports: []*voltha.LogicalPort{ |
| { |
| Id: "1001", |
| DeviceId: test.deviceIds[0], |
| DevicePortNo: 1, |
| RootPort: true, |
| OfpPort: &ofp.OfpPort{ |
| PortNo: 1, |
| Name: "port1", |
| Config: 4, |
| State: 4, |
| }, |
| }, |
| { |
| Id: "1002", |
| DeviceId: test.deviceIds[1], |
| DevicePortNo: 2, |
| RootPort: false, |
| OfpPort: &ofp.OfpPort{ |
| PortNo: 2, |
| Name: "port2", |
| Config: 4, |
| State: 4, |
| }, |
| }, |
| { |
| Id: "1003", |
| DeviceId: test.deviceIds[2], |
| DevicePortNo: 3, |
| RootPort: false, |
| OfpPort: &ofp.OfpPort{ |
| PortNo: 4, |
| Name: "port3", |
| Config: 4, |
| State: 4, |
| }, |
| }, |
| }, |
| } |
| return test |
| } |
| |
| func (lda *LDATest) startCore(inCompeteMode bool) { |
| ctx := context.Background() |
| cfg := config.NewRWCoreFlags() |
| cfg.CorePairTopic = "rw_core" |
| cfg.DefaultRequestTimeout = lda.defaultTimeout.Nanoseconds() / 1000000 //TODO: change when Core changes to Duration |
| cfg.KVStorePort = lda.kvClientPort |
| cfg.InCompetingMode = inCompeteMode |
| grpcPort, err := freeport.GetFreePort() |
| if err != nil { |
| log.Fatal("Cannot get a freeport for grpc") |
| } |
| cfg.GrpcPort = grpcPort |
| cfg.GrpcHost = "127.0.0.1" |
| setCoreCompeteMode(inCompeteMode) |
| client := setupKVClient(cfg, lda.coreInstanceID) |
| lda.core = NewCore(ctx, lda.coreInstanceID, cfg, client, lda.kClient) |
| err = lda.core.Start(ctx) |
| if err != nil { |
| log.Fatal("Cannot start core") |
| } |
| } |
| |
| func (lda *LDATest) stopAll() { |
| if lda.kClient != nil { |
| lda.kClient.Stop() |
| } |
| if lda.core != nil { |
| lda.core.Stop(context.Background()) |
| } |
| if lda.etcdServer != nil { |
| stopEmbeddedEtcdServer(lda.etcdServer) |
| } |
| } |
| |
| func (lda *LDATest) createLogicalDeviceAgent(t *testing.T) *LogicalDeviceAgent { |
| lDeviceMgr := lda.core.logicalDeviceMgr |
| deviceMgr := lda.core.deviceMgr |
| clonedLD := proto.Clone(lda.logicalDevice).(*voltha.LogicalDevice) |
| clonedLD.Id = com.GetRandomString(10) |
| clonedLD.DatapathId = rand.Uint64() |
| lDeviceAgent := newLogicalDeviceAgent(clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.clusterDataProxy, lDeviceMgr.defaultTimeout) |
| lDeviceAgent.logicalDevice = clonedLD |
| added, err := lDeviceAgent.clusterDataProxy.AddWithID(context.Background(), "/logical_devices", clonedLD.Id, clonedLD, "") |
| assert.Nil(t, err) |
| assert.NotNil(t, added) |
| lDeviceMgr.addLogicalDeviceAgentToMap(lDeviceAgent) |
| return lDeviceAgent |
| } |
| |
| func (lda *LDATest) updateLogicalDeviceConcurrently(t *testing.T, ldAgent *LogicalDeviceAgent, globalWG *sync.WaitGroup) { |
| originalLogicalDevice := ldAgent.GetLogicalDevice() |
| assert.NotNil(t, originalLogicalDevice) |
| var localWG sync.WaitGroup |
| |
| // Change the state of the first port to FAILED |
| localWG.Add(1) |
| go func() { |
| err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[0].DeviceId, lda.logicalDevice.Ports[0].DevicePortNo, voltha.OperStatus_FAILED) |
| assert.Nil(t, err) |
| localWG.Done() |
| }() |
| |
| // Change the state of the second port to TESTING |
| localWG.Add(1) |
| go func() { |
| err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[1].DeviceId, lda.logicalDevice.Ports[1].DevicePortNo, voltha.OperStatus_TESTING) |
| assert.Nil(t, err) |
| localWG.Done() |
| }() |
| |
| // Change the state of the third port to UNKNOWN and then back to ACTIVE |
| localWG.Add(1) |
| go func() { |
| err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[2].DeviceId, lda.logicalDevice.Ports[2].DevicePortNo, voltha.OperStatus_UNKNOWN) |
| assert.Nil(t, err) |
| err = ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[2].DeviceId, lda.logicalDevice.Ports[2].DevicePortNo, voltha.OperStatus_ACTIVE) |
| assert.Nil(t, err) |
| localWG.Done() |
| }() |
| |
| // Add a meter to the logical device |
| meterMod := &ofp.OfpMeterMod{ |
| Command: ofp.OfpMeterModCommand_OFPMC_ADD, |
| Flags: rand.Uint32(), |
| MeterId: rand.Uint32(), |
| Bands: []*ofp.OfpMeterBandHeader{ |
| {Type: ofp.OfpMeterBandType_OFPMBT_EXPERIMENTER, |
| Rate: rand.Uint32(), |
| BurstSize: rand.Uint32(), |
| Data: nil, |
| }, |
| }, |
| } |
| localWG.Add(1) |
| go func() { |
| err := ldAgent.meterAdd(context.Background(), meterMod) |
| assert.Nil(t, err) |
| localWG.Done() |
| }() |
| |
| // wait for go routines to be done |
| localWG.Wait() |
| |
| expectedChange := proto.Clone(originalLogicalDevice).(*voltha.LogicalDevice) |
| expectedChange.Ports[0].OfpPort.Config = originalLogicalDevice.Ports[0].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) |
| expectedChange.Ports[0].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN) |
| expectedChange.Ports[1].OfpPort.Config = originalLogicalDevice.Ports[0].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) |
| expectedChange.Ports[1].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN) |
| expectedChange.Ports[2].OfpPort.Config = originalLogicalDevice.Ports[0].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN) |
| expectedChange.Ports[2].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE) |
| expectedChange.Meters = &voltha.Meters{Items: nil} |
| expectedChange.Meters.Items = append(expectedChange.Meters.Items, fu.MeterEntryFromMeterMod(meterMod)) |
| updatedLogicalDevice := ldAgent.GetLogicalDevice() |
| assert.NotNil(t, updatedLogicalDevice) |
| assert.True(t, proto.Equal(expectedChange, updatedLogicalDevice)) |
| globalWG.Done() |
| } |
| |
| func TestConcurrentLogicalDeviceUpdate(t *testing.T) { |
| lda := newLDATest() |
| assert.NotNil(t, lda) |
| defer lda.stopAll() |
| |
| // Start the Core |
| lda.startCore(false) |
| |
| var wg sync.WaitGroup |
| numConCurrentLogicalDeviceAgents := 20 |
| for i := 0; i < numConCurrentLogicalDeviceAgents; i++ { |
| wg.Add(1) |
| a := lda.createLogicalDeviceAgent(t) |
| go lda.updateLogicalDeviceConcurrently(t, a, &wg) |
| } |
| |
| wg.Wait() |
| } |