[VOL-2318] - Fix for jenkins NBI Failure
This commit adds the latest devices and logical devices to the
device agents and logical device agents respectively. Any GET
is returned directly from these agents instead of from the model.
And any create/update results in the data being sent to the KV
store via the model and also results in the latest data in the
agent being updated. If the Core dies and restarts then the
latest data will be pulled from KV. These changes assume
that a device or a logical device is always owned by one Core
only, which is the case.
Change-Id: Ie671cd70b38a58a3b32fa476eced5f218aeadad9
diff --git a/rw_core/core/logical_device_agent_test.go b/rw_core/core/logical_device_agent_test.go
index 381e2a5..b5789cd 100644
--- a/rw_core/core/logical_device_agent_test.go
+++ b/rw_core/core/logical_device_agent_test.go
@@ -16,10 +16,22 @@
package core
import (
+ "context"
+ "github.com/gogo/protobuf/proto"
+ "github.com/opencord/voltha-go/rw_core/config"
+ com "github.com/opencord/voltha-lib-go/v2/pkg/adapters/common"
+ fu "github.com/opencord/voltha-lib-go/v2/pkg/flows"
+ "github.com/opencord/voltha-lib-go/v2/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v2/pkg/log"
+ lm "github.com/opencord/voltha-lib-go/v2/pkg/mocks"
ofp "github.com/opencord/voltha-protos/v2/go/openflow_13"
"github.com/opencord/voltha-protos/v2/go/voltha"
+ "github.com/phayes/freeport"
"github.com/stretchr/testify/assert"
+ "math/rand"
+ "sync"
"testing"
+ "time"
)
func TestLogicalDeviceAgent_diff_nochange_1(t *testing.T) {
@@ -343,3 +355,224 @@
assert.Equal(t, updatedLogicalPorts[1], changedPorts[1])
assert.Equal(t, currentLogicalPorts[2], deletedPorts[0])
}
+
+// LDATest is the fixture shared by the logical device agent tests in this
+// file. It bundles the embedded etcd server, the kafka client, the Core under
+// test and the template logical device that each test agent is cloned from.
+type LDATest struct {
+ // etcdServer is the embedded etcd server returned by startEmbeddedEtcdServer.
+ etcdServer *lm.EtcdServer
+ // core is the Core instance created and started by startCore.
+ core *Core
+ // kClient is the kafka client handed to the Core; newLDATest obtains it from the mocks package.
+ kClient kafka.Client
+ // kvClientPort is the port returned by startEmbeddedEtcdServer; startCore uses it as the KV store port.
+ kvClientPort int
+ // oltAdapterName and onuAdapterName are set by newLDATest; they are not read by the code in this file section.
+ oltAdapterName string
+ onuAdapterName string
+ // coreInstanceID is passed to setupKVClient and NewCore when the Core is created.
+ coreInstanceID string
+ // defaultTimeout is converted to the Core's default request timeout in startCore.
+ defaultTimeout time.Duration
+ // maxTimeout is set by newLDATest; it is not read by the code in this file section.
+ maxTimeout time.Duration
+ // logicalDevice is the template that createLogicalDeviceAgent clones for every agent.
+ logicalDevice *voltha.LogicalDevice
+ // deviceIds holds the three random device IDs referenced by the template's root device and ports.
+ deviceIds []string
+ // done is created by newLDATest; it is not read by the code in this file section.
+ done chan int
+}
+
+// newLDATest starts an embedded etcd server and returns a fixture populated
+// with a kafka client from the mocks package, fixed adapter names and
+// timeouts, three random device IDs and a template logical device with three
+// ports (one root port on the first device ID and one port on each of the
+// other two). log.Fatal is called if the etcd server cannot be started.
+func newLDATest() *LDATest {
+	const mockAdapter = "olt_adapter_mock"
+
+	// The embedded etcd server comes up before anything else is created.
+	etcd, kvPort, err := startEmbeddedEtcdServer("voltha.rwcore.lda.test", "voltha.rwcore.lda.etcd", "error")
+	if err != nil {
+		log.Fatal(err)
+	}
+
+	// Field order below keeps the kafka client, the channel and the random
+	// IDs being created in that sequence.
+	fixture := &LDATest{
+		etcdServer:     etcd,
+		kvClientPort:   kvPort,
+		kClient:        lm.NewKafkaClient(),
+		oltAdapterName: mockAdapter,
+		onuAdapterName: "onu_adapter_mock",
+		coreInstanceID: "rw-da-test",
+		defaultTimeout: 5 * time.Second,
+		maxTimeout:     20 * time.Second,
+		done:           make(chan int),
+		deviceIds:      []string{com.GetRandomString(10), com.GetRandomString(10), com.GetRandomString(10)},
+	}
+	ids := fixture.deviceIds
+
+	statsCapabilities := ofp.OfpCapabilities_OFPC_FLOW_STATS |
+		ofp.OfpCapabilities_OFPC_TABLE_STATS |
+		ofp.OfpCapabilities_OFPC_PORT_STATS |
+		ofp.OfpCapabilities_OFPC_GROUP_STATS
+
+	// Template logical device: the first device ID is the root device and
+	// owns the root port; the remaining IDs own one non-root port each.
+	template := &voltha.LogicalDevice{
+		Desc: &ofp.OfpDesc{
+			HwDesc:    mockAdapter,
+			SwDesc:    mockAdapter,
+			SerialNum: com.GetRandomSerialNumber(),
+		},
+		SwitchFeatures: &ofp.OfpSwitchFeatures{
+			NBuffers:     256,
+			NTables:      2,
+			Capabilities: uint32(statsCapabilities),
+		},
+		RootDeviceId: ids[0],
+	}
+	template.Ports = []*voltha.LogicalPort{
+		{
+			Id:           "1001",
+			DeviceId:     ids[0],
+			DevicePortNo: 1,
+			RootPort:     true,
+			OfpPort:      &ofp.OfpPort{PortNo: 1, Name: "port1", Config: 4, State: 4},
+		},
+		{
+			Id:           "1002",
+			DeviceId:     ids[1],
+			DevicePortNo: 2,
+			RootPort:     false,
+			OfpPort:      &ofp.OfpPort{PortNo: 2, Name: "port2", Config: 4, State: 4},
+		},
+		{
+			Id:           "1003",
+			DeviceId:     ids[2],
+			DevicePortNo: 3,
+			RootPort:     false,
+			OfpPort:      &ofp.OfpPort{PortNo: 4, Name: "port3", Config: 4, State: 4},
+		},
+	}
+	fixture.logicalDevice = template
+
+	return fixture
+}
+
+// startCore builds a Core configuration that uses the fixture's KV store
+// port and default timeout, creates the Core with the fixture's kafka client
+// and starts it.
+//
+// inCompeteMode is copied into the configuration and also passed to
+// setCoreCompeteMode. log.Fatal is called, with the underlying error, when no
+// free port can be obtained for the gRPC endpoint.
+func (lda *LDATest) startCore(inCompeteMode bool) {
+	cfg := config.NewRWCoreFlags()
+	cfg.CorePairTopic = "rw_core"
+	// The Duration is converted to a millisecond count for the configuration.
+	cfg.DefaultRequestTimeout = lda.defaultTimeout.Nanoseconds() / 1000000 //TODO: change when Core changes to Duration
+	cfg.KVStorePort = lda.kvClientPort
+	cfg.InCompetingMode = inCompeteMode
+	grpcPort, err := freeport.GetFreePort()
+	if err != nil {
+		// Include the cause; the bare message does not say why no port was available.
+		log.Fatal("Cannot get a freeport for grpc: ", err)
+	}
+	cfg.GrpcPort = grpcPort
+	cfg.GrpcHost = "127.0.0.1"
+	setCoreCompeteMode(inCompeteMode)
+	client := setupKVClient(cfg, lda.coreInstanceID)
+	lda.core = NewCore(lda.coreInstanceID, cfg, client, lda.kClient)
+	lda.core.Start(context.Background())
+}
+
+// stopAll shuts down whatever parts of the fixture were created: the kafka
+// client first, then the Core, then the embedded etcd server. Parts that are
+// still nil are skipped.
+func (lda *LDATest) stopAll() {
+	if client := lda.kClient; client != nil {
+		client.Stop()
+	}
+	if core := lda.core; core != nil {
+		core.Stop(context.Background())
+	}
+	if etcd := lda.etcdServer; etcd != nil {
+		stopEmbeddedEtcdServer(etcd)
+	}
+}
+
+// createLogicalDeviceAgent clones the fixture's template logical device,
+// gives the clone a random ID and datapath ID, and wraps it in a new logical
+// device agent. The clone is added under "/logical_devices" through the
+// agent's cluster data proxy (asserting a non-nil result) and the agent is
+// registered with the Core's logical device manager before being returned.
+func (lda *LDATest) createLogicalDeviceAgent(t *testing.T) *LogicalDeviceAgent {
+	ldMgr := lda.core.logicalDeviceMgr
+
+	// Every agent works on its own copy so the shared template stays untouched.
+	ld := proto.Clone(lda.logicalDevice).(*voltha.LogicalDevice)
+	ld.Id = com.GetRandomString(10)
+	ld.DatapathId = rand.Uint64()
+
+	agent := newLogicalDeviceAgent(ld.Id, ld.RootDeviceId, ldMgr, lda.core.deviceMgr, ldMgr.clusterDataProxy, ldMgr.defaultTimeout)
+	agent.logicalDevice = ld
+
+	stored := agent.clusterDataProxy.AddWithID(context.Background(), "/logical_devices", ld.Id, ld, "")
+	assert.NotNil(t, stored)
+
+	ldMgr.addLogicalDeviceAgentToMap(agent)
+	return agent
+}
+
+// updateLogicalDeviceConcurrently applies several updates to ldAgent from
+// separate goroutines and then asserts that the logical device returned by the
+// agent contains all of them.
+//
+// Three goroutines change the state of the three logical ports and a fourth
+// adds a meter. Once they have all finished, the device returned by
+// ldAgent.GetLogicalDevice is compared against a copy of the original device
+// with the expected port and meter changes applied.
+//
+// globalWG.Done is called exactly once, when the function returns, so the
+// caller can wait for every agent under test.
+func (lda *LDATest) updateLogicalDeviceConcurrently(t *testing.T, ldAgent *LogicalDeviceAgent, globalWG *sync.WaitGroup) {
+	defer globalWG.Done()
+
+	originalLogicalDevice := ldAgent.GetLogicalDevice()
+	assert.NotNil(t, originalLogicalDevice)
+	var localWG sync.WaitGroup
+
+	// Change the state of the first port to FAILED
+	localWG.Add(1)
+	go func() {
+		defer localWG.Done()
+		err := ldAgent.updatePortState(lda.logicalDevice.Ports[0].DeviceId, lda.logicalDevice.Ports[0].DevicePortNo, voltha.OperStatus_FAILED)
+		assert.Nil(t, err)
+	}()
+
+	// Change the state of the second port to TESTING
+	localWG.Add(1)
+	go func() {
+		defer localWG.Done()
+		err := ldAgent.updatePortState(lda.logicalDevice.Ports[1].DeviceId, lda.logicalDevice.Ports[1].DevicePortNo, voltha.OperStatus_TESTING)
+		assert.Nil(t, err)
+	}()
+
+	// Change the state of the third port to UNKNOWN and then back to ACTIVE
+	localWG.Add(1)
+	go func() {
+		defer localWG.Done()
+		err := ldAgent.updatePortState(lda.logicalDevice.Ports[2].DeviceId, lda.logicalDevice.Ports[2].DevicePortNo, voltha.OperStatus_UNKNOWN)
+		assert.Nil(t, err)
+		err = ldAgent.updatePortState(lda.logicalDevice.Ports[2].DeviceId, lda.logicalDevice.Ports[2].DevicePortNo, voltha.OperStatus_ACTIVE)
+		assert.Nil(t, err)
+	}()
+
+	// Add a meter to the logical device
+	meterMod := &ofp.OfpMeterMod{
+		Command: ofp.OfpMeterModCommand_OFPMC_ADD,
+		Flags:   rand.Uint32(),
+		MeterId: rand.Uint32(),
+		Bands: []*ofp.OfpMeterBandHeader{
+			{Type: ofp.OfpMeterBandType_OFPMBT_EXPERIMENTER,
+				Rate:      rand.Uint32(),
+				BurstSize: rand.Uint32(),
+				Data:      nil,
+			},
+		},
+	}
+	localWG.Add(1)
+	go func() {
+		defer localWG.Done()
+		err := ldAgent.meterAdd(meterMod)
+		assert.Nil(t, err)
+	}()
+
+	// wait for go routines to be done
+	localWG.Wait()
+
+	// Build the expected device. Each port's expected config is derived from
+	// that same port's original config: the first two ports are expected to be
+	// marked down, the third to be live with the port-down bit cleared.
+	expectedChange := proto.Clone(originalLogicalDevice).(*voltha.LogicalDevice)
+	expectedChange.Ports[0].OfpPort.Config = originalLogicalDevice.Ports[0].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+	expectedChange.Ports[0].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
+	expectedChange.Ports[1].OfpPort.Config = originalLogicalDevice.Ports[1].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+	expectedChange.Ports[1].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
+	expectedChange.Ports[2].OfpPort.Config = originalLogicalDevice.Ports[2].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
+	expectedChange.Ports[2].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
+	expectedChange.Meters = &voltha.Meters{Items: nil}
+	expectedChange.Meters.Items = append(expectedChange.Meters.Items, fu.MeterEntryFromMeterMod(meterMod))
+	updatedLogicalDevice := ldAgent.GetLogicalDevice()
+	assert.NotNil(t, updatedLogicalDevice)
+	assert.True(t, proto.Equal(expectedChange, updatedLogicalDevice))
+}
+
+// TestConcurrentLogicalDeviceUpdate starts a Core backed by the embedded etcd
+// server, creates 20 logical device agents and runs
+// updateLogicalDeviceConcurrently for each of them in its own goroutine,
+// waiting until all have finished. The fixture is torn down on return.
+func TestConcurrentLogicalDeviceUpdate(t *testing.T) {
+	const agentCount = 20
+
+	fixture := newLDATest()
+	assert.NotNil(t, fixture)
+	defer fixture.stopAll()
+
+	// Start the Core
+	fixture.startCore(false)
+
+	var pending sync.WaitGroup
+	for launched := 0; launched < agentCount; launched++ {
+		pending.Add(1)
+		agent := fixture.createLogicalDeviceAgent(t)
+		go fixture.updateLogicalDeviceConcurrently(t, agent, &pending)
+	}
+
+	pending.Wait()
+}