blob: ece7c7b2f9a2fe6cf5505c74575ac8d7af00ebdd [file] [log] [blame]
khenaidoo2bc48282019-07-16 18:13:46 -04001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package core
17
18import (
khenaidoo6e55d9e2019-12-12 18:26:26 -050019 "context"
20 "github.com/gogo/protobuf/proto"
21 "github.com/opencord/voltha-go/rw_core/config"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080022 com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
23 fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
24 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
25 "github.com/opencord/voltha-lib-go/v3/pkg/log"
26 lm "github.com/opencord/voltha-lib-go/v3/pkg/mocks"
27 ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
28 "github.com/opencord/voltha-protos/v3/go/voltha"
khenaidoo6e55d9e2019-12-12 18:26:26 -050029 "github.com/phayes/freeport"
khenaidoo2bc48282019-07-16 18:13:46 -040030 "github.com/stretchr/testify/assert"
khenaidoo6e55d9e2019-12-12 18:26:26 -050031 "math/rand"
32 "sync"
khenaidoo2bc48282019-07-16 18:13:46 -040033 "testing"
khenaidoo6e55d9e2019-12-12 18:26:26 -050034 "time"
khenaidoo2bc48282019-07-16 18:13:46 -040035)
36
37func TestLogicalDeviceAgent_diff_nochange_1(t *testing.T) {
38 currentLogicalPorts := []*voltha.LogicalPort{}
39 updatedLogicalPorts := []*voltha.LogicalPort{}
40 newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
41 assert.Equal(t, 0, len(newPorts))
42 assert.Equal(t, 0, len(changedPorts))
43 assert.Equal(t, 0, len(deletedPorts))
44}
45
46func TestLogicalDeviceAgent_diff_nochange_2(t *testing.T) {
47 currentLogicalPorts := []*voltha.LogicalPort{
48 {
49 Id: "1231",
50 DeviceId: "d1234",
51 DevicePortNo: 1,
52 RootPort: true,
53 OfpPort: &ofp.OfpPort{
54 PortNo: 1,
55 Name: "port1",
56 Config: 1,
57 State: 1,
58 },
59 },
60 {
61 Id: "1232",
62 DeviceId: "d1234",
63 DevicePortNo: 2,
64 RootPort: false,
65 OfpPort: &ofp.OfpPort{
66 PortNo: 2,
67 Name: "port2",
68 Config: 1,
69 State: 1,
70 },
71 },
72 {
73 Id: "1233",
74 DeviceId: "d1234",
75 DevicePortNo: 3,
76 RootPort: false,
77 OfpPort: &ofp.OfpPort{
78 PortNo: 3,
79 Name: "port3",
80 Config: 1,
81 State: 1,
82 },
83 },
84 }
85 updatedLogicalPorts := []*voltha.LogicalPort{
86 {
87 Id: "1231",
88 DeviceId: "d1234",
89 DevicePortNo: 1,
90 RootPort: true,
91 OfpPort: &ofp.OfpPort{
92 PortNo: 1,
93 Name: "port1",
94 Config: 1,
95 State: 1,
96 },
97 },
98 {
99 Id: "1232",
100 DeviceId: "d1234",
101 DevicePortNo: 2,
102 RootPort: false,
103 OfpPort: &ofp.OfpPort{
104 PortNo: 2,
105 Name: "port2",
106 Config: 1,
107 State: 1,
108 },
109 },
110 {
111 Id: "1233",
112 DeviceId: "d1234",
113 DevicePortNo: 3,
114 RootPort: false,
115 OfpPort: &ofp.OfpPort{
116 PortNo: 3,
117 Name: "port3",
118 Config: 1,
119 State: 1,
120 },
121 },
122 }
123 newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
124 assert.Equal(t, 0, len(newPorts))
125 assert.Equal(t, 0, len(changedPorts))
126 assert.Equal(t, 0, len(deletedPorts))
127}
128
129func TestLogicalDeviceAgent_diff_add(t *testing.T) {
130 currentLogicalPorts := []*voltha.LogicalPort{}
131 updatedLogicalPorts := []*voltha.LogicalPort{
132 {
133 Id: "1231",
134 DeviceId: "d1234",
135 DevicePortNo: 1,
136 RootPort: true,
137 OfpPort: &ofp.OfpPort{
138 PortNo: 1,
139 Name: "port1",
140 Config: 1,
141 State: 1,
142 },
143 },
144 {
145 Id: "1232",
146 DeviceId: "d1234",
147 DevicePortNo: 2,
148 RootPort: true,
149 OfpPort: &ofp.OfpPort{
150 PortNo: 2,
151 Name: "port2",
152 Config: 1,
153 State: 1,
154 },
155 },
156 }
157 newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
158 assert.Equal(t, 2, len(newPorts))
159 assert.Equal(t, 0, len(changedPorts))
160 assert.Equal(t, 0, len(deletedPorts))
161 assert.Equal(t, updatedLogicalPorts[0], newPorts[0])
162 assert.Equal(t, updatedLogicalPorts[1], newPorts[1])
163}
164
165func TestLogicalDeviceAgent_diff_delete(t *testing.T) {
166 currentLogicalPorts := []*voltha.LogicalPort{
167 {
168 Id: "1231",
169 DeviceId: "d1234",
170 DevicePortNo: 1,
171 RootPort: true,
172 OfpPort: &ofp.OfpPort{
173 PortNo: 1,
174 Name: "port1",
175 Config: 1,
176 State: 1,
177 },
178 },
179 }
180 updatedLogicalPorts := []*voltha.LogicalPort{}
181 newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
182 assert.Equal(t, 0, len(newPorts))
183 assert.Equal(t, 0, len(changedPorts))
184 assert.Equal(t, 1, len(deletedPorts))
185 assert.Equal(t, currentLogicalPorts[0], deletedPorts[0])
186}
187
188func TestLogicalDeviceAgent_diff_changed(t *testing.T) {
189 currentLogicalPorts := []*voltha.LogicalPort{
190 {
191 Id: "1231",
192 DeviceId: "d1234",
193 DevicePortNo: 1,
194 RootPort: true,
195 OfpPort: &ofp.OfpPort{
196 PortNo: 1,
197 Name: "port1",
198 Config: 1,
199 State: 1,
200 },
201 },
202 {
203 Id: "1232",
204 DeviceId: "d1234",
205 DevicePortNo: 2,
206 RootPort: false,
207 OfpPort: &ofp.OfpPort{
208 PortNo: 2,
209 Name: "port2",
210 Config: 1,
211 State: 1,
212 },
213 },
214 {
215 Id: "1233",
216 DeviceId: "d1234",
217 DevicePortNo: 3,
218 RootPort: false,
219 OfpPort: &ofp.OfpPort{
220 PortNo: 3,
221 Name: "port3",
222 Config: 1,
223 State: 1,
224 },
225 },
226 }
227 updatedLogicalPorts := []*voltha.LogicalPort{
228 {
229 Id: "1231",
230 DeviceId: "d1234",
231 DevicePortNo: 1,
232 RootPort: true,
233 OfpPort: &ofp.OfpPort{
234 PortNo: 1,
235 Name: "port1",
236 Config: 4,
237 State: 4,
238 },
239 },
240 {
241 Id: "1232",
242 DeviceId: "d1234",
243 DevicePortNo: 2,
244 RootPort: false,
245 OfpPort: &ofp.OfpPort{
246 PortNo: 2,
247 Name: "port2",
248 Config: 4,
249 State: 4,
250 },
251 },
252 {
253 Id: "1233",
254 DeviceId: "d1234",
255 DevicePortNo: 3,
256 RootPort: false,
257 OfpPort: &ofp.OfpPort{
258 PortNo: 3,
259 Name: "port3",
260 Config: 1,
261 State: 1,
262 },
263 },
264 }
265 newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
266 assert.Equal(t, 0, len(newPorts))
267 assert.Equal(t, 2, len(changedPorts))
268 assert.Equal(t, 0, len(deletedPorts))
269 assert.Equal(t, updatedLogicalPorts[0], changedPorts[0])
270 assert.Equal(t, updatedLogicalPorts[1], changedPorts[1])
271}
272
273func TestLogicalDeviceAgent_diff_mix(t *testing.T) {
274 currentLogicalPorts := []*voltha.LogicalPort{
275 {
276 Id: "1231",
277 DeviceId: "d1234",
278 DevicePortNo: 1,
279 RootPort: true,
280 OfpPort: &ofp.OfpPort{
281 PortNo: 1,
282 Name: "port1",
283 Config: 1,
284 State: 1,
285 },
286 },
287 {
288 Id: "1232",
289 DeviceId: "d1234",
290 DevicePortNo: 2,
291 RootPort: false,
292 OfpPort: &ofp.OfpPort{
293 PortNo: 2,
294 Name: "port2",
295 Config: 1,
296 State: 1,
297 },
298 },
299 {
300 Id: "1233",
301 DeviceId: "d1234",
302 DevicePortNo: 3,
303 RootPort: false,
304 OfpPort: &ofp.OfpPort{
305 PortNo: 3,
306 Name: "port3",
307 Config: 1,
308 State: 1,
309 },
310 },
311 }
312 updatedLogicalPorts := []*voltha.LogicalPort{
313 {
314 Id: "1231",
315 DeviceId: "d1234",
316 DevicePortNo: 1,
317 RootPort: true,
318 OfpPort: &ofp.OfpPort{
319 PortNo: 1,
320 Name: "port1",
321 Config: 4,
322 State: 4,
323 },
324 },
325 {
326 Id: "1232",
327 DeviceId: "d1234",
328 DevicePortNo: 2,
329 RootPort: false,
330 OfpPort: &ofp.OfpPort{
331 PortNo: 2,
332 Name: "port2",
333 Config: 4,
334 State: 4,
335 },
336 },
337 {
338 Id: "1234",
339 DeviceId: "d1234",
340 DevicePortNo: 4,
341 RootPort: false,
342 OfpPort: &ofp.OfpPort{
343 PortNo: 4,
344 Name: "port4",
345 Config: 4,
346 State: 4,
347 },
348 },
349 }
350 newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
351 assert.Equal(t, 1, len(newPorts))
352 assert.Equal(t, 2, len(changedPorts))
353 assert.Equal(t, 1, len(deletedPorts))
354 assert.Equal(t, updatedLogicalPorts[0], changedPorts[0])
355 assert.Equal(t, updatedLogicalPorts[1], changedPorts[1])
356 assert.Equal(t, currentLogicalPorts[2], deletedPorts[0])
357}
khenaidoo6e55d9e2019-12-12 18:26:26 -0500358
359type LDATest struct {
360 etcdServer *lm.EtcdServer
361 core *Core
362 kClient kafka.Client
363 kvClientPort int
364 oltAdapterName string
365 onuAdapterName string
366 coreInstanceID string
367 defaultTimeout time.Duration
368 maxTimeout time.Duration
369 logicalDevice *voltha.LogicalDevice
370 deviceIds []string
371 done chan int
372}
373
374func newLDATest() *LDATest {
375 test := &LDATest{}
376 // Start the embedded etcd server
377 var err error
378 test.etcdServer, test.kvClientPort, err = startEmbeddedEtcdServer("voltha.rwcore.lda.test", "voltha.rwcore.lda.etcd", "error")
379 if err != nil {
380 log.Fatal(err)
381 }
382 // Create the kafka client
383 test.kClient = lm.NewKafkaClient()
384 test.oltAdapterName = "olt_adapter_mock"
385 test.onuAdapterName = "onu_adapter_mock"
386 test.coreInstanceID = "rw-da-test"
387 test.defaultTimeout = 5 * time.Second
388 test.maxTimeout = 20 * time.Second
389 test.done = make(chan int)
390 test.deviceIds = []string{com.GetRandomString(10), com.GetRandomString(10), com.GetRandomString(10)}
391 test.logicalDevice = &voltha.LogicalDevice{
392 Desc: &ofp.OfpDesc{
393 HwDesc: "olt_adapter_mock",
394 SwDesc: "olt_adapter_mock",
395 SerialNum: com.GetRandomSerialNumber(),
396 },
397 SwitchFeatures: &ofp.OfpSwitchFeatures{
398 NBuffers: 256,
399 NTables: 2,
400 Capabilities: uint32(ofp.OfpCapabilities_OFPC_FLOW_STATS |
401 ofp.OfpCapabilities_OFPC_TABLE_STATS |
402 ofp.OfpCapabilities_OFPC_PORT_STATS |
403 ofp.OfpCapabilities_OFPC_GROUP_STATS),
404 },
405 RootDeviceId: test.deviceIds[0],
406 Ports: []*voltha.LogicalPort{
407 {
408 Id: "1001",
409 DeviceId: test.deviceIds[0],
410 DevicePortNo: 1,
411 RootPort: true,
412 OfpPort: &ofp.OfpPort{
413 PortNo: 1,
414 Name: "port1",
415 Config: 4,
416 State: 4,
417 },
418 },
419 {
420 Id: "1002",
421 DeviceId: test.deviceIds[1],
422 DevicePortNo: 2,
423 RootPort: false,
424 OfpPort: &ofp.OfpPort{
425 PortNo: 2,
426 Name: "port2",
427 Config: 4,
428 State: 4,
429 },
430 },
431 {
432 Id: "1003",
433 DeviceId: test.deviceIds[2],
434 DevicePortNo: 3,
435 RootPort: false,
436 OfpPort: &ofp.OfpPort{
437 PortNo: 4,
438 Name: "port3",
439 Config: 4,
440 State: 4,
441 },
442 },
443 },
444 }
445 return test
446}
447
448func (lda *LDATest) startCore(inCompeteMode bool) {
Thomas Lee Se5a44012019-11-07 20:32:24 +0530449 ctx := context.Background()
khenaidoo6e55d9e2019-12-12 18:26:26 -0500450 cfg := config.NewRWCoreFlags()
451 cfg.CorePairTopic = "rw_core"
452 cfg.DefaultRequestTimeout = lda.defaultTimeout.Nanoseconds() / 1000000 //TODO: change when Core changes to Duration
453 cfg.KVStorePort = lda.kvClientPort
454 cfg.InCompetingMode = inCompeteMode
455 grpcPort, err := freeport.GetFreePort()
456 if err != nil {
457 log.Fatal("Cannot get a freeport for grpc")
458 }
459 cfg.GrpcPort = grpcPort
460 cfg.GrpcHost = "127.0.0.1"
461 setCoreCompeteMode(inCompeteMode)
462 client := setupKVClient(cfg, lda.coreInstanceID)
Thomas Lee Se5a44012019-11-07 20:32:24 +0530463 lda.core = NewCore(ctx, lda.coreInstanceID, cfg, client, lda.kClient)
464 err = lda.core.Start(ctx)
465 if err != nil {
466 log.Fatal("Cannot start core")
467 }
khenaidoo6e55d9e2019-12-12 18:26:26 -0500468}
469
470func (lda *LDATest) stopAll() {
471 if lda.kClient != nil {
472 lda.kClient.Stop()
473 }
474 if lda.core != nil {
475 lda.core.Stop(context.Background())
476 }
477 if lda.etcdServer != nil {
478 stopEmbeddedEtcdServer(lda.etcdServer)
479 }
480}
481
482func (lda *LDATest) createLogicalDeviceAgent(t *testing.T) *LogicalDeviceAgent {
483 lDeviceMgr := lda.core.logicalDeviceMgr
484 deviceMgr := lda.core.deviceMgr
485 clonedLD := proto.Clone(lda.logicalDevice).(*voltha.LogicalDevice)
486 clonedLD.Id = com.GetRandomString(10)
487 clonedLD.DatapathId = rand.Uint64()
488 lDeviceAgent := newLogicalDeviceAgent(clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.clusterDataProxy, lDeviceMgr.defaultTimeout)
489 lDeviceAgent.logicalDevice = clonedLD
Thomas Lee Se5a44012019-11-07 20:32:24 +0530490 added, err := lDeviceAgent.clusterDataProxy.AddWithID(context.Background(), "/logical_devices", clonedLD.Id, clonedLD, "")
491 assert.Nil(t, err)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500492 assert.NotNil(t, added)
493 lDeviceMgr.addLogicalDeviceAgentToMap(lDeviceAgent)
494 return lDeviceAgent
495}
496
497func (lda *LDATest) updateLogicalDeviceConcurrently(t *testing.T, ldAgent *LogicalDeviceAgent, globalWG *sync.WaitGroup) {
498 originalLogicalDevice := ldAgent.GetLogicalDevice()
499 assert.NotNil(t, originalLogicalDevice)
500 var localWG sync.WaitGroup
501
502 // Change the state of the first port to FAILED
503 localWG.Add(1)
504 go func() {
npujar467fe752020-01-16 20:17:45 +0530505 err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[0].DeviceId, lda.logicalDevice.Ports[0].DevicePortNo, voltha.OperStatus_FAILED)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500506 assert.Nil(t, err)
507 localWG.Done()
508 }()
509
510 // Change the state of the second port to TESTING
511 localWG.Add(1)
512 go func() {
npujar467fe752020-01-16 20:17:45 +0530513 err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[1].DeviceId, lda.logicalDevice.Ports[1].DevicePortNo, voltha.OperStatus_TESTING)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500514 assert.Nil(t, err)
515 localWG.Done()
516 }()
517
518 // Change the state of the third port to UNKNOWN and then back to ACTIVE
519 localWG.Add(1)
520 go func() {
npujar467fe752020-01-16 20:17:45 +0530521 err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[2].DeviceId, lda.logicalDevice.Ports[2].DevicePortNo, voltha.OperStatus_UNKNOWN)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500522 assert.Nil(t, err)
npujar467fe752020-01-16 20:17:45 +0530523 err = ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[2].DeviceId, lda.logicalDevice.Ports[2].DevicePortNo, voltha.OperStatus_ACTIVE)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500524 assert.Nil(t, err)
525 localWG.Done()
526 }()
527
528 // Add a meter to the logical device
529 meterMod := &ofp.OfpMeterMod{
530 Command: ofp.OfpMeterModCommand_OFPMC_ADD,
531 Flags: rand.Uint32(),
532 MeterId: rand.Uint32(),
533 Bands: []*ofp.OfpMeterBandHeader{
534 {Type: ofp.OfpMeterBandType_OFPMBT_EXPERIMENTER,
535 Rate: rand.Uint32(),
536 BurstSize: rand.Uint32(),
537 Data: nil,
538 },
539 },
540 }
541 localWG.Add(1)
542 go func() {
npujar467fe752020-01-16 20:17:45 +0530543 err := ldAgent.meterAdd(context.Background(), meterMod)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500544 assert.Nil(t, err)
545 localWG.Done()
546 }()
547
548 // wait for go routines to be done
549 localWG.Wait()
550
551 expectedChange := proto.Clone(originalLogicalDevice).(*voltha.LogicalDevice)
552 expectedChange.Ports[0].OfpPort.Config = originalLogicalDevice.Ports[0].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
553 expectedChange.Ports[0].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
554 expectedChange.Ports[1].OfpPort.Config = originalLogicalDevice.Ports[0].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
555 expectedChange.Ports[1].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
556 expectedChange.Ports[2].OfpPort.Config = originalLogicalDevice.Ports[0].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
557 expectedChange.Ports[2].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
558 expectedChange.Meters = &voltha.Meters{Items: nil}
559 expectedChange.Meters.Items = append(expectedChange.Meters.Items, fu.MeterEntryFromMeterMod(meterMod))
560 updatedLogicalDevice := ldAgent.GetLogicalDevice()
561 assert.NotNil(t, updatedLogicalDevice)
562 assert.True(t, proto.Equal(expectedChange, updatedLogicalDevice))
563 globalWG.Done()
564}
565
566func TestConcurrentLogicalDeviceUpdate(t *testing.T) {
567 lda := newLDATest()
568 assert.NotNil(t, lda)
569 defer lda.stopAll()
570
571 // Start the Core
572 lda.startCore(false)
573
574 var wg sync.WaitGroup
575 numConCurrentLogicalDeviceAgents := 20
576 for i := 0; i < numConCurrentLogicalDeviceAgents; i++ {
577 wg.Add(1)
578 a := lda.createLogicalDeviceAgent(t)
579 go lda.updateLogicalDeviceConcurrently(t, a, &wg)
580 }
581
582 wg.Wait()
583}