blob: 70d809a5c9c978ad40384d07cb2640c09e74bb79 [file] [log] [blame]
khenaidoo2bc48282019-07-16 18:13:46 -04001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package core
17
18import (
khenaidoo6e55d9e2019-12-12 18:26:26 -050019 "context"
David Bainbridged1afd662020-03-26 18:27:41 -070020 "math/rand"
21 "sync"
22 "testing"
23 "time"
24
khenaidoo6e55d9e2019-12-12 18:26:26 -050025 "github.com/gogo/protobuf/proto"
26 "github.com/opencord/voltha-go/rw_core/config"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080027 com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
28 fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
29 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
Matteo Scandolod525ae32020-04-02 17:27:29 -070030 mock_etcd "github.com/opencord/voltha-lib-go/v3/pkg/mocks/etcd"
31 mock_kafka "github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080032 ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
33 "github.com/opencord/voltha-protos/v3/go/voltha"
khenaidoo6e55d9e2019-12-12 18:26:26 -050034 "github.com/phayes/freeport"
khenaidoo2bc48282019-07-16 18:13:46 -040035 "github.com/stretchr/testify/assert"
khenaidoo2bc48282019-07-16 18:13:46 -040036)
37
38func TestLogicalDeviceAgent_diff_nochange_1(t *testing.T) {
39 currentLogicalPorts := []*voltha.LogicalPort{}
40 updatedLogicalPorts := []*voltha.LogicalPort{}
41 newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
42 assert.Equal(t, 0, len(newPorts))
43 assert.Equal(t, 0, len(changedPorts))
44 assert.Equal(t, 0, len(deletedPorts))
45}
46
47func TestLogicalDeviceAgent_diff_nochange_2(t *testing.T) {
48 currentLogicalPorts := []*voltha.LogicalPort{
49 {
50 Id: "1231",
51 DeviceId: "d1234",
52 DevicePortNo: 1,
53 RootPort: true,
54 OfpPort: &ofp.OfpPort{
55 PortNo: 1,
56 Name: "port1",
57 Config: 1,
58 State: 1,
59 },
60 },
61 {
62 Id: "1232",
63 DeviceId: "d1234",
64 DevicePortNo: 2,
65 RootPort: false,
66 OfpPort: &ofp.OfpPort{
67 PortNo: 2,
68 Name: "port2",
69 Config: 1,
70 State: 1,
71 },
72 },
73 {
74 Id: "1233",
75 DeviceId: "d1234",
76 DevicePortNo: 3,
77 RootPort: false,
78 OfpPort: &ofp.OfpPort{
79 PortNo: 3,
80 Name: "port3",
81 Config: 1,
82 State: 1,
83 },
84 },
85 }
86 updatedLogicalPorts := []*voltha.LogicalPort{
87 {
88 Id: "1231",
89 DeviceId: "d1234",
90 DevicePortNo: 1,
91 RootPort: true,
92 OfpPort: &ofp.OfpPort{
93 PortNo: 1,
94 Name: "port1",
95 Config: 1,
96 State: 1,
97 },
98 },
99 {
100 Id: "1232",
101 DeviceId: "d1234",
102 DevicePortNo: 2,
103 RootPort: false,
104 OfpPort: &ofp.OfpPort{
105 PortNo: 2,
106 Name: "port2",
107 Config: 1,
108 State: 1,
109 },
110 },
111 {
112 Id: "1233",
113 DeviceId: "d1234",
114 DevicePortNo: 3,
115 RootPort: false,
116 OfpPort: &ofp.OfpPort{
117 PortNo: 3,
118 Name: "port3",
119 Config: 1,
120 State: 1,
121 },
122 },
123 }
124 newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
125 assert.Equal(t, 0, len(newPorts))
126 assert.Equal(t, 0, len(changedPorts))
127 assert.Equal(t, 0, len(deletedPorts))
128}
129
130func TestLogicalDeviceAgent_diff_add(t *testing.T) {
131 currentLogicalPorts := []*voltha.LogicalPort{}
132 updatedLogicalPorts := []*voltha.LogicalPort{
133 {
134 Id: "1231",
135 DeviceId: "d1234",
136 DevicePortNo: 1,
137 RootPort: true,
138 OfpPort: &ofp.OfpPort{
139 PortNo: 1,
140 Name: "port1",
141 Config: 1,
142 State: 1,
143 },
144 },
145 {
146 Id: "1232",
147 DeviceId: "d1234",
148 DevicePortNo: 2,
149 RootPort: true,
150 OfpPort: &ofp.OfpPort{
151 PortNo: 2,
152 Name: "port2",
153 Config: 1,
154 State: 1,
155 },
156 },
157 }
158 newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
159 assert.Equal(t, 2, len(newPorts))
160 assert.Equal(t, 0, len(changedPorts))
161 assert.Equal(t, 0, len(deletedPorts))
162 assert.Equal(t, updatedLogicalPorts[0], newPorts[0])
163 assert.Equal(t, updatedLogicalPorts[1], newPorts[1])
164}
165
166func TestLogicalDeviceAgent_diff_delete(t *testing.T) {
167 currentLogicalPorts := []*voltha.LogicalPort{
168 {
169 Id: "1231",
170 DeviceId: "d1234",
171 DevicePortNo: 1,
172 RootPort: true,
173 OfpPort: &ofp.OfpPort{
174 PortNo: 1,
175 Name: "port1",
176 Config: 1,
177 State: 1,
178 },
179 },
180 }
181 updatedLogicalPorts := []*voltha.LogicalPort{}
182 newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
183 assert.Equal(t, 0, len(newPorts))
184 assert.Equal(t, 0, len(changedPorts))
185 assert.Equal(t, 1, len(deletedPorts))
186 assert.Equal(t, currentLogicalPorts[0], deletedPorts[0])
187}
188
189func TestLogicalDeviceAgent_diff_changed(t *testing.T) {
190 currentLogicalPorts := []*voltha.LogicalPort{
191 {
192 Id: "1231",
193 DeviceId: "d1234",
194 DevicePortNo: 1,
195 RootPort: true,
196 OfpPort: &ofp.OfpPort{
197 PortNo: 1,
198 Name: "port1",
199 Config: 1,
200 State: 1,
201 },
202 },
203 {
204 Id: "1232",
205 DeviceId: "d1234",
206 DevicePortNo: 2,
207 RootPort: false,
208 OfpPort: &ofp.OfpPort{
209 PortNo: 2,
210 Name: "port2",
211 Config: 1,
212 State: 1,
213 },
214 },
215 {
216 Id: "1233",
217 DeviceId: "d1234",
218 DevicePortNo: 3,
219 RootPort: false,
220 OfpPort: &ofp.OfpPort{
221 PortNo: 3,
222 Name: "port3",
223 Config: 1,
224 State: 1,
225 },
226 },
227 }
228 updatedLogicalPorts := []*voltha.LogicalPort{
229 {
230 Id: "1231",
231 DeviceId: "d1234",
232 DevicePortNo: 1,
233 RootPort: true,
234 OfpPort: &ofp.OfpPort{
235 PortNo: 1,
236 Name: "port1",
237 Config: 4,
238 State: 4,
239 },
240 },
241 {
242 Id: "1232",
243 DeviceId: "d1234",
244 DevicePortNo: 2,
245 RootPort: false,
246 OfpPort: &ofp.OfpPort{
247 PortNo: 2,
248 Name: "port2",
249 Config: 4,
250 State: 4,
251 },
252 },
253 {
254 Id: "1233",
255 DeviceId: "d1234",
256 DevicePortNo: 3,
257 RootPort: false,
258 OfpPort: &ofp.OfpPort{
259 PortNo: 3,
260 Name: "port3",
261 Config: 1,
262 State: 1,
263 },
264 },
265 }
266 newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
267 assert.Equal(t, 0, len(newPorts))
268 assert.Equal(t, 2, len(changedPorts))
269 assert.Equal(t, 0, len(deletedPorts))
270 assert.Equal(t, updatedLogicalPorts[0], changedPorts[0])
271 assert.Equal(t, updatedLogicalPorts[1], changedPorts[1])
272}
273
274func TestLogicalDeviceAgent_diff_mix(t *testing.T) {
275 currentLogicalPorts := []*voltha.LogicalPort{
276 {
277 Id: "1231",
278 DeviceId: "d1234",
279 DevicePortNo: 1,
280 RootPort: true,
281 OfpPort: &ofp.OfpPort{
282 PortNo: 1,
283 Name: "port1",
284 Config: 1,
285 State: 1,
286 },
287 },
288 {
289 Id: "1232",
290 DeviceId: "d1234",
291 DevicePortNo: 2,
292 RootPort: false,
293 OfpPort: &ofp.OfpPort{
294 PortNo: 2,
295 Name: "port2",
296 Config: 1,
297 State: 1,
298 },
299 },
300 {
301 Id: "1233",
302 DeviceId: "d1234",
303 DevicePortNo: 3,
304 RootPort: false,
305 OfpPort: &ofp.OfpPort{
306 PortNo: 3,
307 Name: "port3",
308 Config: 1,
309 State: 1,
310 },
311 },
312 }
313 updatedLogicalPorts := []*voltha.LogicalPort{
314 {
315 Id: "1231",
316 DeviceId: "d1234",
317 DevicePortNo: 1,
318 RootPort: true,
319 OfpPort: &ofp.OfpPort{
320 PortNo: 1,
321 Name: "port1",
322 Config: 4,
323 State: 4,
324 },
325 },
326 {
327 Id: "1232",
328 DeviceId: "d1234",
329 DevicePortNo: 2,
330 RootPort: false,
331 OfpPort: &ofp.OfpPort{
332 PortNo: 2,
333 Name: "port2",
334 Config: 4,
335 State: 4,
336 },
337 },
338 {
339 Id: "1234",
340 DeviceId: "d1234",
341 DevicePortNo: 4,
342 RootPort: false,
343 OfpPort: &ofp.OfpPort{
344 PortNo: 4,
345 Name: "port4",
346 Config: 4,
347 State: 4,
348 },
349 },
350 }
351 newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
352 assert.Equal(t, 1, len(newPorts))
353 assert.Equal(t, 2, len(changedPorts))
354 assert.Equal(t, 1, len(deletedPorts))
355 assert.Equal(t, updatedLogicalPorts[0], changedPorts[0])
356 assert.Equal(t, updatedLogicalPorts[1], changedPorts[1])
357 assert.Equal(t, currentLogicalPorts[2], deletedPorts[0])
358}
khenaidoo6e55d9e2019-12-12 18:26:26 -0500359
360type LDATest struct {
Matteo Scandolod525ae32020-04-02 17:27:29 -0700361 etcdServer *mock_etcd.EtcdServer
khenaidoo6e55d9e2019-12-12 18:26:26 -0500362 core *Core
363 kClient kafka.Client
364 kvClientPort int
365 oltAdapterName string
366 onuAdapterName string
367 coreInstanceID string
368 defaultTimeout time.Duration
369 maxTimeout time.Duration
370 logicalDevice *voltha.LogicalDevice
371 deviceIds []string
372 done chan int
373}
374
375func newLDATest() *LDATest {
376 test := &LDATest{}
377 // Start the embedded etcd server
378 var err error
379 test.etcdServer, test.kvClientPort, err = startEmbeddedEtcdServer("voltha.rwcore.lda.test", "voltha.rwcore.lda.etcd", "error")
380 if err != nil {
Girish Kumarf56a4682020-03-20 20:07:46 +0000381 logger.Fatal(err)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500382 }
383 // Create the kafka client
Matteo Scandolod525ae32020-04-02 17:27:29 -0700384 test.kClient = mock_kafka.NewKafkaClient()
khenaidoo6e55d9e2019-12-12 18:26:26 -0500385 test.oltAdapterName = "olt_adapter_mock"
386 test.onuAdapterName = "onu_adapter_mock"
387 test.coreInstanceID = "rw-da-test"
388 test.defaultTimeout = 5 * time.Second
389 test.maxTimeout = 20 * time.Second
390 test.done = make(chan int)
391 test.deviceIds = []string{com.GetRandomString(10), com.GetRandomString(10), com.GetRandomString(10)}
392 test.logicalDevice = &voltha.LogicalDevice{
393 Desc: &ofp.OfpDesc{
394 HwDesc: "olt_adapter_mock",
395 SwDesc: "olt_adapter_mock",
396 SerialNum: com.GetRandomSerialNumber(),
397 },
398 SwitchFeatures: &ofp.OfpSwitchFeatures{
399 NBuffers: 256,
400 NTables: 2,
401 Capabilities: uint32(ofp.OfpCapabilities_OFPC_FLOW_STATS |
402 ofp.OfpCapabilities_OFPC_TABLE_STATS |
403 ofp.OfpCapabilities_OFPC_PORT_STATS |
404 ofp.OfpCapabilities_OFPC_GROUP_STATS),
405 },
406 RootDeviceId: test.deviceIds[0],
407 Ports: []*voltha.LogicalPort{
408 {
409 Id: "1001",
410 DeviceId: test.deviceIds[0],
411 DevicePortNo: 1,
412 RootPort: true,
413 OfpPort: &ofp.OfpPort{
414 PortNo: 1,
415 Name: "port1",
416 Config: 4,
417 State: 4,
418 },
419 },
420 {
421 Id: "1002",
422 DeviceId: test.deviceIds[1],
423 DevicePortNo: 2,
424 RootPort: false,
425 OfpPort: &ofp.OfpPort{
426 PortNo: 2,
427 Name: "port2",
428 Config: 4,
429 State: 4,
430 },
431 },
432 {
433 Id: "1003",
434 DeviceId: test.deviceIds[2],
435 DevicePortNo: 3,
436 RootPort: false,
437 OfpPort: &ofp.OfpPort{
438 PortNo: 4,
439 Name: "port3",
440 Config: 4,
441 State: 4,
442 },
443 },
444 },
445 }
446 return test
447}
448
449func (lda *LDATest) startCore(inCompeteMode bool) {
Thomas Lee Se5a44012019-11-07 20:32:24 +0530450 ctx := context.Background()
khenaidoo6e55d9e2019-12-12 18:26:26 -0500451 cfg := config.NewRWCoreFlags()
452 cfg.CorePairTopic = "rw_core"
khenaidoo442e7c72020-03-10 16:13:48 -0400453 cfg.DefaultRequestTimeout = lda.defaultTimeout
khenaidoo6e55d9e2019-12-12 18:26:26 -0500454 cfg.KVStorePort = lda.kvClientPort
455 cfg.InCompetingMode = inCompeteMode
456 grpcPort, err := freeport.GetFreePort()
457 if err != nil {
Girish Kumarf56a4682020-03-20 20:07:46 +0000458 logger.Fatal("Cannot get a freeport for grpc")
khenaidoo6e55d9e2019-12-12 18:26:26 -0500459 }
460 cfg.GrpcPort = grpcPort
461 cfg.GrpcHost = "127.0.0.1"
462 setCoreCompeteMode(inCompeteMode)
463 client := setupKVClient(cfg, lda.coreInstanceID)
Thomas Lee Se5a44012019-11-07 20:32:24 +0530464 lda.core = NewCore(ctx, lda.coreInstanceID, cfg, client, lda.kClient)
465 err = lda.core.Start(ctx)
466 if err != nil {
Girish Kumarf56a4682020-03-20 20:07:46 +0000467 logger.Fatal("Cannot start core")
Thomas Lee Se5a44012019-11-07 20:32:24 +0530468 }
khenaidoo6e55d9e2019-12-12 18:26:26 -0500469}
470
471func (lda *LDATest) stopAll() {
472 if lda.kClient != nil {
473 lda.kClient.Stop()
474 }
475 if lda.core != nil {
476 lda.core.Stop(context.Background())
477 }
478 if lda.etcdServer != nil {
479 stopEmbeddedEtcdServer(lda.etcdServer)
480 }
481}
482
483func (lda *LDATest) createLogicalDeviceAgent(t *testing.T) *LogicalDeviceAgent {
484 lDeviceMgr := lda.core.logicalDeviceMgr
485 deviceMgr := lda.core.deviceMgr
486 clonedLD := proto.Clone(lda.logicalDevice).(*voltha.LogicalDevice)
487 clonedLD.Id = com.GetRandomString(10)
488 clonedLD.DatapathId = rand.Uint64()
David Bainbridged1afd662020-03-26 18:27:41 -0700489 lDeviceAgent := newLogicalDeviceAgent(clonedLD.Id, clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.clusterDataProxy, lDeviceMgr.defaultTimeout)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500490 lDeviceAgent.logicalDevice = clonedLD
Kent Hagerman4f355f52020-03-30 16:01:33 -0400491 err := lDeviceAgent.clusterDataProxy.AddWithID(context.Background(), "logical_devices", clonedLD.Id, clonedLD)
Thomas Lee Se5a44012019-11-07 20:32:24 +0530492 assert.Nil(t, err)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500493 lDeviceMgr.addLogicalDeviceAgentToMap(lDeviceAgent)
494 return lDeviceAgent
495}
496
497func (lda *LDATest) updateLogicalDeviceConcurrently(t *testing.T, ldAgent *LogicalDeviceAgent, globalWG *sync.WaitGroup) {
khenaidoo442e7c72020-03-10 16:13:48 -0400498 originalLogicalDevice, _ := ldAgent.GetLogicalDevice(context.Background())
khenaidoo6e55d9e2019-12-12 18:26:26 -0500499 assert.NotNil(t, originalLogicalDevice)
500 var localWG sync.WaitGroup
501
502 // Change the state of the first port to FAILED
503 localWG.Add(1)
504 go func() {
npujar467fe752020-01-16 20:17:45 +0530505 err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[0].DeviceId, lda.logicalDevice.Ports[0].DevicePortNo, voltha.OperStatus_FAILED)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500506 assert.Nil(t, err)
507 localWG.Done()
508 }()
509
510 // Change the state of the second port to TESTING
511 localWG.Add(1)
512 go func() {
npujar467fe752020-01-16 20:17:45 +0530513 err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[1].DeviceId, lda.logicalDevice.Ports[1].DevicePortNo, voltha.OperStatus_TESTING)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500514 assert.Nil(t, err)
515 localWG.Done()
516 }()
517
518 // Change the state of the third port to UNKNOWN and then back to ACTIVE
519 localWG.Add(1)
520 go func() {
npujar467fe752020-01-16 20:17:45 +0530521 err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[2].DeviceId, lda.logicalDevice.Ports[2].DevicePortNo, voltha.OperStatus_UNKNOWN)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500522 assert.Nil(t, err)
npujar467fe752020-01-16 20:17:45 +0530523 err = ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[2].DeviceId, lda.logicalDevice.Ports[2].DevicePortNo, voltha.OperStatus_ACTIVE)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500524 assert.Nil(t, err)
525 localWG.Done()
526 }()
527
528 // Add a meter to the logical device
529 meterMod := &ofp.OfpMeterMod{
530 Command: ofp.OfpMeterModCommand_OFPMC_ADD,
531 Flags: rand.Uint32(),
532 MeterId: rand.Uint32(),
533 Bands: []*ofp.OfpMeterBandHeader{
534 {Type: ofp.OfpMeterBandType_OFPMBT_EXPERIMENTER,
535 Rate: rand.Uint32(),
536 BurstSize: rand.Uint32(),
537 Data: nil,
538 },
539 },
540 }
541 localWG.Add(1)
542 go func() {
npujar467fe752020-01-16 20:17:45 +0530543 err := ldAgent.meterAdd(context.Background(), meterMod)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500544 assert.Nil(t, err)
545 localWG.Done()
546 }()
547
548 // wait for go routines to be done
549 localWG.Wait()
550
551 expectedChange := proto.Clone(originalLogicalDevice).(*voltha.LogicalDevice)
552 expectedChange.Ports[0].OfpPort.Config = originalLogicalDevice.Ports[0].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
553 expectedChange.Ports[0].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
554 expectedChange.Ports[1].OfpPort.Config = originalLogicalDevice.Ports[0].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
555 expectedChange.Ports[1].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
556 expectedChange.Ports[2].OfpPort.Config = originalLogicalDevice.Ports[0].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
557 expectedChange.Ports[2].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
558 expectedChange.Meters = &voltha.Meters{Items: nil}
559 expectedChange.Meters.Items = append(expectedChange.Meters.Items, fu.MeterEntryFromMeterMod(meterMod))
khenaidoo442e7c72020-03-10 16:13:48 -0400560 updatedLogicalDevice, _ := ldAgent.GetLogicalDevice(context.Background())
khenaidoo6e55d9e2019-12-12 18:26:26 -0500561 assert.NotNil(t, updatedLogicalDevice)
562 assert.True(t, proto.Equal(expectedChange, updatedLogicalDevice))
563 globalWG.Done()
564}
565
566func TestConcurrentLogicalDeviceUpdate(t *testing.T) {
567 lda := newLDATest()
568 assert.NotNil(t, lda)
569 defer lda.stopAll()
570
571 // Start the Core
572 lda.startCore(false)
573
574 var wg sync.WaitGroup
khenaidoo442e7c72020-03-10 16:13:48 -0400575 numConCurrentLogicalDeviceAgents := 3
khenaidoo6e55d9e2019-12-12 18:26:26 -0500576 for i := 0; i < numConCurrentLogicalDeviceAgents; i++ {
577 wg.Add(1)
578 a := lda.createLogicalDeviceAgent(t)
579 go lda.updateLogicalDeviceConcurrently(t, a, &wg)
580 }
581
582 wg.Wait()
583}