blob: 89b3cbe9306d821a12477a5ef98eba7bdb728057 [file] [log] [blame]
khenaidoo2bc48282019-07-16 18:13:46 -04001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
Kent Hagerman2b216042020-04-03 18:28:56 -040016package device
khenaidoo2bc48282019-07-16 18:13:46 -040017
18import (
khenaidoo6e55d9e2019-12-12 18:26:26 -050019 "context"
David Bainbridged1afd662020-03-26 18:27:41 -070020 "math/rand"
Neha Sharmad1387da2020-05-07 20:07:28 +000021 "strconv"
David Bainbridged1afd662020-03-26 18:27:41 -070022 "sync"
23 "testing"
24 "time"
25
Kent Hagermanf5a67352020-04-30 15:15:26 -040026 "github.com/gogo/protobuf/proto"
Mahir Gunyeladdb66a2020-04-29 18:08:50 -070027 "github.com/opencord/voltha-go/db/model"
Kent Hagermanf5a67352020-04-30 15:15:26 -040028 "github.com/opencord/voltha-go/rw_core/config"
Mahir Gunyeladdb66a2020-04-29 18:08:50 -070029 "github.com/opencord/voltha-go/rw_core/core/adapter"
Mahir Gunyel03de0d32020-06-03 01:36:59 -070030 tst "github.com/opencord/voltha-go/rw_core/test"
Kent Hagermanf5a67352020-04-30 15:15:26 -040031 com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
Mahir Gunyeladdb66a2020-04-29 18:08:50 -070032 "github.com/opencord/voltha-lib-go/v3/pkg/db"
33 fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080034 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
Matteo Scandolod525ae32020-04-02 17:27:29 -070035 mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
36 mock_kafka "github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080037 ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
38 "github.com/opencord/voltha-protos/v3/go/voltha"
khenaidoo6e55d9e2019-12-12 18:26:26 -050039 "github.com/phayes/freeport"
khenaidoo2bc48282019-07-16 18:13:46 -040040 "github.com/stretchr/testify/assert"
khenaidoo2bc48282019-07-16 18:13:46 -040041)
42
khenaidoo6e55d9e2019-12-12 18:26:26 -050043type LDATest struct {
Kent Hagerman2b216042020-04-03 18:28:56 -040044 etcdServer *mock_etcd.EtcdServer
45 deviceMgr *Manager
46 kmp kafka.InterContainerProxy
47 logicalDeviceMgr *LogicalManager
48 kClient kafka.Client
49 kvClientPort int
50 oltAdapterName string
51 onuAdapterName string
52 coreInstanceID string
53 defaultTimeout time.Duration
54 maxTimeout time.Duration
55 logicalDevice *voltha.LogicalDevice
Kent Hagerman2a07b862020-06-19 15:23:07 -040056 logicalPorts map[uint32]*voltha.LogicalPort
Kent Hagerman2b216042020-04-03 18:28:56 -040057 deviceIds []string
58 done chan int
khenaidoo6e55d9e2019-12-12 18:26:26 -050059}
60
Rohan Agrawal31f21802020-06-12 05:38:46 +000061func newLDATest(ctx context.Context) *LDATest {
khenaidoo6e55d9e2019-12-12 18:26:26 -050062 test := &LDATest{}
63 // Start the embedded etcd server
64 var err error
Rohan Agrawal31f21802020-06-12 05:38:46 +000065 test.etcdServer, test.kvClientPort, err = tst.StartEmbeddedEtcdServer(ctx, "voltha.rwcore.lda.test", "voltha.rwcore.lda.etcd", "error")
khenaidoo6e55d9e2019-12-12 18:26:26 -050066 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +000067 logger.Fatal(ctx, err)
khenaidoo6e55d9e2019-12-12 18:26:26 -050068 }
69 // Create the kafka client
Matteo Scandolod525ae32020-04-02 17:27:29 -070070 test.kClient = mock_kafka.NewKafkaClient()
khenaidoo6e55d9e2019-12-12 18:26:26 -050071 test.oltAdapterName = "olt_adapter_mock"
72 test.onuAdapterName = "onu_adapter_mock"
73 test.coreInstanceID = "rw-da-test"
74 test.defaultTimeout = 5 * time.Second
75 test.maxTimeout = 20 * time.Second
76 test.done = make(chan int)
77 test.deviceIds = []string{com.GetRandomString(10), com.GetRandomString(10), com.GetRandomString(10)}
78 test.logicalDevice = &voltha.LogicalDevice{
79 Desc: &ofp.OfpDesc{
80 HwDesc: "olt_adapter_mock",
81 SwDesc: "olt_adapter_mock",
82 SerialNum: com.GetRandomSerialNumber(),
83 },
84 SwitchFeatures: &ofp.OfpSwitchFeatures{
85 NBuffers: 256,
86 NTables: 2,
87 Capabilities: uint32(ofp.OfpCapabilities_OFPC_FLOW_STATS |
88 ofp.OfpCapabilities_OFPC_TABLE_STATS |
89 ofp.OfpCapabilities_OFPC_PORT_STATS |
90 ofp.OfpCapabilities_OFPC_GROUP_STATS),
91 },
92 RootDeviceId: test.deviceIds[0],
Kent Hagerman2a07b862020-06-19 15:23:07 -040093 }
94 test.logicalPorts = map[uint32]*voltha.LogicalPort{
95 1: {
96 Id: "1001",
97 DeviceId: test.deviceIds[0],
98 DevicePortNo: 1,
99 RootPort: true,
100 OfpPort: &ofp.OfpPort{
101 PortNo: 1,
102 Name: "port1",
103 Config: 4,
104 State: 4,
khenaidoo6e55d9e2019-12-12 18:26:26 -0500105 },
Kent Hagerman2a07b862020-06-19 15:23:07 -0400106 },
107 2: {
108 Id: "1002",
109 DeviceId: test.deviceIds[1],
110 DevicePortNo: 2,
111 RootPort: false,
112 OfpPort: &ofp.OfpPort{
113 PortNo: 2,
114 Name: "port2",
115 Config: 4,
116 State: 4,
khenaidoo6e55d9e2019-12-12 18:26:26 -0500117 },
Kent Hagerman2a07b862020-06-19 15:23:07 -0400118 },
119 3: {
120 Id: "1003",
121 DeviceId: test.deviceIds[2],
122 DevicePortNo: 3,
123 RootPort: false,
124 OfpPort: &ofp.OfpPort{
125 PortNo: 3,
126 Name: "port3",
127 Config: 4,
128 State: 4,
khenaidoo6e55d9e2019-12-12 18:26:26 -0500129 },
130 },
131 }
132 return test
133}
134
Rohan Agrawal31f21802020-06-12 05:38:46 +0000135func (lda *LDATest) startCore(ctx context.Context, inCompeteMode bool) {
khenaidoo6e55d9e2019-12-12 18:26:26 -0500136 cfg := config.NewRWCoreFlags()
serkant.uluderya8ff291d2020-05-20 00:58:00 -0700137 cfg.CoreTopic = "rw_core"
khenaidoo442e7c72020-03-10 16:13:48 -0400138 cfg.DefaultRequestTimeout = lda.defaultTimeout
Neha Sharmad1387da2020-05-07 20:07:28 +0000139 cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(lda.kvClientPort)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500140 grpcPort, err := freeport.GetFreePort()
141 if err != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000142 logger.Fatal(ctx, "Cannot get a freeport for grpc")
khenaidoo6e55d9e2019-12-12 18:26:26 -0500143 }
Neha Sharmad1387da2020-05-07 20:07:28 +0000144 cfg.GrpcAddress = "127.0.0.1" + ":" + strconv.Itoa(grpcPort)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000145 client := tst.SetupKVClient(ctx, cfg, lda.coreInstanceID)
Kent Hagerman2b216042020-04-03 18:28:56 -0400146 backend := &db.Backend{
147 Client: client,
148 StoreType: cfg.KVStoreType,
Neha Sharmad1387da2020-05-07 20:07:28 +0000149 Address: cfg.KVStoreAddress,
Kent Hagerman2b216042020-04-03 18:28:56 -0400150 Timeout: cfg.KVStoreTimeout,
serkant.uluderya8ff291d2020-05-20 00:58:00 -0700151 LivenessChannelInterval: cfg.LiveProbeInterval / 2}
Kent Hagerman2b216042020-04-03 18:28:56 -0400152 lda.kmp = kafka.NewInterContainerProxy(
Neha Sharmad1387da2020-05-07 20:07:28 +0000153 kafka.InterContainerAddress(cfg.KafkaAdapterAddress),
Kent Hagerman2b216042020-04-03 18:28:56 -0400154 kafka.MsgClient(lda.kClient),
David Bainbridge9ae13132020-06-22 17:28:01 -0700155 kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}))
Kent Hagerman2b216042020-04-03 18:28:56 -0400156
157 endpointMgr := kafka.NewEndpointManager(backend)
Kent Hagermanf5a67352020-04-30 15:15:26 -0400158 proxy := model.NewDBPath(backend)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000159 adapterMgr := adapter.NewAdapterManager(ctx, proxy, lda.coreInstanceID, lda.kClient)
Kent Hagerman2b216042020-04-03 18:28:56 -0400160
serkant.uluderya8ff291d2020-05-20 00:58:00 -0700161 lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg.CoreTopic, lda.coreInstanceID, cfg.DefaultCoreTimeout)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000162 if err = lda.kmp.Start(ctx); err != nil {
163 logger.Fatal(ctx, "Cannot start InterContainerProxy")
Thomas Lee Se5a44012019-11-07 20:32:24 +0530164 }
Kent Hagerman2f0d0552020-04-23 17:28:52 -0400165 adapterMgr.Start(context.Background())
khenaidoo6e55d9e2019-12-12 18:26:26 -0500166}
167
Rohan Agrawal31f21802020-06-12 05:38:46 +0000168func (lda *LDATest) stopAll(ctx context.Context) {
khenaidoo6e55d9e2019-12-12 18:26:26 -0500169 if lda.kClient != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000170 lda.kClient.Stop(ctx)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500171 }
Kent Hagerman2b216042020-04-03 18:28:56 -0400172 if lda.kmp != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000173 lda.kmp.Stop(ctx)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500174 }
175 if lda.etcdServer != nil {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000176 tst.StopEmbeddedEtcdServer(ctx, lda.etcdServer)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500177 }
178}
179
Kent Hagerman2b216042020-04-03 18:28:56 -0400180func (lda *LDATest) createLogicalDeviceAgent(t *testing.T) *LogicalAgent {
181 lDeviceMgr := lda.logicalDeviceMgr
182 deviceMgr := lda.deviceMgr
khenaidoo6e55d9e2019-12-12 18:26:26 -0500183 clonedLD := proto.Clone(lda.logicalDevice).(*voltha.LogicalDevice)
184 clonedLD.Id = com.GetRandomString(10)
185 clonedLD.DatapathId = rand.Uint64()
Rohan Agrawal31f21802020-06-12 05:38:46 +0000186 lDeviceAgent := newLogicalAgent(context.Background(), clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.dbPath, lDeviceMgr.ldProxy, lDeviceMgr.defaultTimeout)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500187 lDeviceAgent.logicalDevice = clonedLD
Kent Hagerman2a07b862020-06-19 15:23:07 -0400188 for _, port := range lda.logicalPorts {
189 clonedPort := proto.Clone(port).(*voltha.LogicalPort)
190 handle, created, err := lDeviceAgent.portLoader.LockOrCreate(context.Background(), clonedPort)
Kent Hagermanfa9d6d42020-05-25 11:49:40 -0400191 if err != nil {
192 panic(err)
193 }
194 handle.Unlock()
195 if !created {
Kent Hagerman2a07b862020-06-19 15:23:07 -0400196 t.Errorf("port %d already exists", clonedPort.OfpPort.PortNo)
Kent Hagermanfa9d6d42020-05-25 11:49:40 -0400197 }
198 }
Kent Hagermanf5a67352020-04-30 15:15:26 -0400199 err := lDeviceAgent.ldProxy.Set(context.Background(), clonedLD.Id, clonedLD)
Thomas Lee Se5a44012019-11-07 20:32:24 +0530200 assert.Nil(t, err)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500201 lDeviceMgr.addLogicalDeviceAgentToMap(lDeviceAgent)
202 return lDeviceAgent
203}
204
Kent Hagerman2b216042020-04-03 18:28:56 -0400205func (lda *LDATest) updateLogicalDeviceConcurrently(t *testing.T, ldAgent *LogicalAgent, globalWG *sync.WaitGroup) {
Kent Hagerman2a07b862020-06-19 15:23:07 -0400206 originalLogicalPorts := ldAgent.listLogicalDevicePorts(context.Background())
207 assert.NotNil(t, originalLogicalPorts)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500208 var localWG sync.WaitGroup
209
210 // Change the state of the first port to FAILED
211 localWG.Add(1)
212 go func() {
Kent Hagerman2a07b862020-06-19 15:23:07 -0400213 err := ldAgent.updatePortState(context.Background(), 1, voltha.OperStatus_FAILED)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500214 assert.Nil(t, err)
215 localWG.Done()
216 }()
217
218 // Change the state of the second port to TESTING
219 localWG.Add(1)
220 go func() {
Kent Hagerman2a07b862020-06-19 15:23:07 -0400221 err := ldAgent.updatePortState(context.Background(), 2, voltha.OperStatus_TESTING)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500222 assert.Nil(t, err)
223 localWG.Done()
224 }()
225
226 // Change the state of the third port to UNKNOWN and then back to ACTIVE
227 localWG.Add(1)
228 go func() {
Kent Hagerman2a07b862020-06-19 15:23:07 -0400229 err := ldAgent.updatePortState(context.Background(), 3, voltha.OperStatus_UNKNOWN)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500230 assert.Nil(t, err)
Kent Hagerman2a07b862020-06-19 15:23:07 -0400231 err = ldAgent.updatePortState(context.Background(), 3, voltha.OperStatus_ACTIVE)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500232 assert.Nil(t, err)
233 localWG.Done()
234 }()
235
236 // Add a meter to the logical device
237 meterMod := &ofp.OfpMeterMod{
238 Command: ofp.OfpMeterModCommand_OFPMC_ADD,
239 Flags: rand.Uint32(),
240 MeterId: rand.Uint32(),
241 Bands: []*ofp.OfpMeterBandHeader{
242 {Type: ofp.OfpMeterBandType_OFPMBT_EXPERIMENTER,
243 Rate: rand.Uint32(),
244 BurstSize: rand.Uint32(),
245 Data: nil,
246 },
247 },
248 }
249 localWG.Add(1)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000250 ctx := context.Background()
khenaidoo6e55d9e2019-12-12 18:26:26 -0500251 go func() {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000252 err := ldAgent.meterAdd(ctx, meterMod)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500253 assert.Nil(t, err)
254 localWG.Done()
255 }()
khenaidoo6e55d9e2019-12-12 18:26:26 -0500256 // wait for go routines to be done
257 localWG.Wait()
Rohan Agrawal31f21802020-06-12 05:38:46 +0000258 meterEntry := fu.MeterEntryFromMeterMod(ctx, meterMod)
Mahir Gunyeladdb66a2020-04-29 18:08:50 -0700259
Kent Hagerman433a31a2020-05-20 19:04:48 -0400260 meterHandle, have := ldAgent.meterLoader.Lock(meterMod.MeterId)
261 assert.Equal(t, have, true)
262 if have {
263 assert.True(t, proto.Equal(meterEntry, meterHandle.GetReadOnly()))
264 meterHandle.Unlock()
265 }
khenaidoo6e55d9e2019-12-12 18:26:26 -0500266
Kent Hagerman2a07b862020-06-19 15:23:07 -0400267 expectedLogicalPorts := make(map[uint32]*voltha.LogicalPort)
268 for _, port := range originalLogicalPorts {
269 clonedPort := proto.Clone(port).(*voltha.LogicalPort)
270 switch clonedPort.OfpPort.PortNo {
271 case 1:
272 clonedPort.OfpPort.Config = originalLogicalPorts[1].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
273 clonedPort.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
274 case 2:
275 clonedPort.OfpPort.Config = originalLogicalPorts[1].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
276 clonedPort.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
277 case 3:
278 clonedPort.OfpPort.Config = originalLogicalPorts[1].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
279 clonedPort.OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
280 }
281 expectedLogicalPorts[clonedPort.OfpPort.PortNo] = clonedPort
282 }
Kent Hagermanfa9d6d42020-05-25 11:49:40 -0400283
Kent Hagerman2a07b862020-06-19 15:23:07 -0400284 updatedLogicalDevicePorts := ldAgent.listLogicalDevicePorts(ctx)
285 assert.Equal(t, len(expectedLogicalPorts), len(updatedLogicalDevicePorts))
286 for _, p := range updatedLogicalDevicePorts {
287 assert.True(t, proto.Equal(p, expectedLogicalPorts[p.OfpPort.PortNo]))
Kent Hagermanfa9d6d42020-05-25 11:49:40 -0400288 }
khenaidoo6e55d9e2019-12-12 18:26:26 -0500289 globalWG.Done()
290}
291
292func TestConcurrentLogicalDeviceUpdate(t *testing.T) {
Rohan Agrawal31f21802020-06-12 05:38:46 +0000293 ctx := context.Background()
294 lda := newLDATest(ctx)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500295 assert.NotNil(t, lda)
Rohan Agrawal31f21802020-06-12 05:38:46 +0000296 defer lda.stopAll(ctx)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500297
298 // Start the Core
Rohan Agrawal31f21802020-06-12 05:38:46 +0000299 lda.startCore(ctx, false)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500300
301 var wg sync.WaitGroup
khenaidoo442e7c72020-03-10 16:13:48 -0400302 numConCurrentLogicalDeviceAgents := 3
khenaidoo6e55d9e2019-12-12 18:26:26 -0500303 for i := 0; i < numConCurrentLogicalDeviceAgents; i++ {
304 wg.Add(1)
305 a := lda.createLogicalDeviceAgent(t)
306 go lda.updateLogicalDeviceConcurrently(t, a, &wg)
307 }
308
309 wg.Wait()
310}