blob: 53e355e517a1a9bd086c16ae20c3f2e53b398836 [file] [log] [blame]
khenaidoo2bc48282019-07-16 18:13:46 -04001/*
2 * Copyright 2019-present Open Networking Foundation
3 *
4 * Licensed under the Apache License, Version 2.0 (the "License");
5 * you may not use this file except in compliance with the License.
6 * You may obtain a copy of the License at
7 *
8 * http://www.apache.org/licenses/LICENSE-2.0
9 *
10 * Unless required by applicable law or agreed to in writing, software
11 * distributed under the License is distributed on an "AS IS" BASIS,
12 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
13 * See the License for the specific language governing permissions and
14 * limitations under the License.
15 */
16package core
17
18import (
khenaidoo6e55d9e2019-12-12 18:26:26 -050019 "context"
20 "github.com/gogo/protobuf/proto"
21 "github.com/opencord/voltha-go/rw_core/config"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080022 com "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
23 fu "github.com/opencord/voltha-lib-go/v3/pkg/flows"
24 "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
serkant.uluderya2ae470f2020-01-21 11:13:09 -080025 lm "github.com/opencord/voltha-lib-go/v3/pkg/mocks"
26 ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
27 "github.com/opencord/voltha-protos/v3/go/voltha"
khenaidoo6e55d9e2019-12-12 18:26:26 -050028 "github.com/phayes/freeport"
khenaidoo2bc48282019-07-16 18:13:46 -040029 "github.com/stretchr/testify/assert"
khenaidoo6e55d9e2019-12-12 18:26:26 -050030 "math/rand"
31 "sync"
khenaidoo2bc48282019-07-16 18:13:46 -040032 "testing"
khenaidoo6e55d9e2019-12-12 18:26:26 -050033 "time"
khenaidoo2bc48282019-07-16 18:13:46 -040034)
35
36func TestLogicalDeviceAgent_diff_nochange_1(t *testing.T) {
37 currentLogicalPorts := []*voltha.LogicalPort{}
38 updatedLogicalPorts := []*voltha.LogicalPort{}
39 newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
40 assert.Equal(t, 0, len(newPorts))
41 assert.Equal(t, 0, len(changedPorts))
42 assert.Equal(t, 0, len(deletedPorts))
43}
44
45func TestLogicalDeviceAgent_diff_nochange_2(t *testing.T) {
46 currentLogicalPorts := []*voltha.LogicalPort{
47 {
48 Id: "1231",
49 DeviceId: "d1234",
50 DevicePortNo: 1,
51 RootPort: true,
52 OfpPort: &ofp.OfpPort{
53 PortNo: 1,
54 Name: "port1",
55 Config: 1,
56 State: 1,
57 },
58 },
59 {
60 Id: "1232",
61 DeviceId: "d1234",
62 DevicePortNo: 2,
63 RootPort: false,
64 OfpPort: &ofp.OfpPort{
65 PortNo: 2,
66 Name: "port2",
67 Config: 1,
68 State: 1,
69 },
70 },
71 {
72 Id: "1233",
73 DeviceId: "d1234",
74 DevicePortNo: 3,
75 RootPort: false,
76 OfpPort: &ofp.OfpPort{
77 PortNo: 3,
78 Name: "port3",
79 Config: 1,
80 State: 1,
81 },
82 },
83 }
84 updatedLogicalPorts := []*voltha.LogicalPort{
85 {
86 Id: "1231",
87 DeviceId: "d1234",
88 DevicePortNo: 1,
89 RootPort: true,
90 OfpPort: &ofp.OfpPort{
91 PortNo: 1,
92 Name: "port1",
93 Config: 1,
94 State: 1,
95 },
96 },
97 {
98 Id: "1232",
99 DeviceId: "d1234",
100 DevicePortNo: 2,
101 RootPort: false,
102 OfpPort: &ofp.OfpPort{
103 PortNo: 2,
104 Name: "port2",
105 Config: 1,
106 State: 1,
107 },
108 },
109 {
110 Id: "1233",
111 DeviceId: "d1234",
112 DevicePortNo: 3,
113 RootPort: false,
114 OfpPort: &ofp.OfpPort{
115 PortNo: 3,
116 Name: "port3",
117 Config: 1,
118 State: 1,
119 },
120 },
121 }
122 newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
123 assert.Equal(t, 0, len(newPorts))
124 assert.Equal(t, 0, len(changedPorts))
125 assert.Equal(t, 0, len(deletedPorts))
126}
127
128func TestLogicalDeviceAgent_diff_add(t *testing.T) {
129 currentLogicalPorts := []*voltha.LogicalPort{}
130 updatedLogicalPorts := []*voltha.LogicalPort{
131 {
132 Id: "1231",
133 DeviceId: "d1234",
134 DevicePortNo: 1,
135 RootPort: true,
136 OfpPort: &ofp.OfpPort{
137 PortNo: 1,
138 Name: "port1",
139 Config: 1,
140 State: 1,
141 },
142 },
143 {
144 Id: "1232",
145 DeviceId: "d1234",
146 DevicePortNo: 2,
147 RootPort: true,
148 OfpPort: &ofp.OfpPort{
149 PortNo: 2,
150 Name: "port2",
151 Config: 1,
152 State: 1,
153 },
154 },
155 }
156 newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
157 assert.Equal(t, 2, len(newPorts))
158 assert.Equal(t, 0, len(changedPorts))
159 assert.Equal(t, 0, len(deletedPorts))
160 assert.Equal(t, updatedLogicalPorts[0], newPorts[0])
161 assert.Equal(t, updatedLogicalPorts[1], newPorts[1])
162}
163
164func TestLogicalDeviceAgent_diff_delete(t *testing.T) {
165 currentLogicalPorts := []*voltha.LogicalPort{
166 {
167 Id: "1231",
168 DeviceId: "d1234",
169 DevicePortNo: 1,
170 RootPort: true,
171 OfpPort: &ofp.OfpPort{
172 PortNo: 1,
173 Name: "port1",
174 Config: 1,
175 State: 1,
176 },
177 },
178 }
179 updatedLogicalPorts := []*voltha.LogicalPort{}
180 newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
181 assert.Equal(t, 0, len(newPorts))
182 assert.Equal(t, 0, len(changedPorts))
183 assert.Equal(t, 1, len(deletedPorts))
184 assert.Equal(t, currentLogicalPorts[0], deletedPorts[0])
185}
186
187func TestLogicalDeviceAgent_diff_changed(t *testing.T) {
188 currentLogicalPorts := []*voltha.LogicalPort{
189 {
190 Id: "1231",
191 DeviceId: "d1234",
192 DevicePortNo: 1,
193 RootPort: true,
194 OfpPort: &ofp.OfpPort{
195 PortNo: 1,
196 Name: "port1",
197 Config: 1,
198 State: 1,
199 },
200 },
201 {
202 Id: "1232",
203 DeviceId: "d1234",
204 DevicePortNo: 2,
205 RootPort: false,
206 OfpPort: &ofp.OfpPort{
207 PortNo: 2,
208 Name: "port2",
209 Config: 1,
210 State: 1,
211 },
212 },
213 {
214 Id: "1233",
215 DeviceId: "d1234",
216 DevicePortNo: 3,
217 RootPort: false,
218 OfpPort: &ofp.OfpPort{
219 PortNo: 3,
220 Name: "port3",
221 Config: 1,
222 State: 1,
223 },
224 },
225 }
226 updatedLogicalPorts := []*voltha.LogicalPort{
227 {
228 Id: "1231",
229 DeviceId: "d1234",
230 DevicePortNo: 1,
231 RootPort: true,
232 OfpPort: &ofp.OfpPort{
233 PortNo: 1,
234 Name: "port1",
235 Config: 4,
236 State: 4,
237 },
238 },
239 {
240 Id: "1232",
241 DeviceId: "d1234",
242 DevicePortNo: 2,
243 RootPort: false,
244 OfpPort: &ofp.OfpPort{
245 PortNo: 2,
246 Name: "port2",
247 Config: 4,
248 State: 4,
249 },
250 },
251 {
252 Id: "1233",
253 DeviceId: "d1234",
254 DevicePortNo: 3,
255 RootPort: false,
256 OfpPort: &ofp.OfpPort{
257 PortNo: 3,
258 Name: "port3",
259 Config: 1,
260 State: 1,
261 },
262 },
263 }
264 newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
265 assert.Equal(t, 0, len(newPorts))
266 assert.Equal(t, 2, len(changedPorts))
267 assert.Equal(t, 0, len(deletedPorts))
268 assert.Equal(t, updatedLogicalPorts[0], changedPorts[0])
269 assert.Equal(t, updatedLogicalPorts[1], changedPorts[1])
270}
271
272func TestLogicalDeviceAgent_diff_mix(t *testing.T) {
273 currentLogicalPorts := []*voltha.LogicalPort{
274 {
275 Id: "1231",
276 DeviceId: "d1234",
277 DevicePortNo: 1,
278 RootPort: true,
279 OfpPort: &ofp.OfpPort{
280 PortNo: 1,
281 Name: "port1",
282 Config: 1,
283 State: 1,
284 },
285 },
286 {
287 Id: "1232",
288 DeviceId: "d1234",
289 DevicePortNo: 2,
290 RootPort: false,
291 OfpPort: &ofp.OfpPort{
292 PortNo: 2,
293 Name: "port2",
294 Config: 1,
295 State: 1,
296 },
297 },
298 {
299 Id: "1233",
300 DeviceId: "d1234",
301 DevicePortNo: 3,
302 RootPort: false,
303 OfpPort: &ofp.OfpPort{
304 PortNo: 3,
305 Name: "port3",
306 Config: 1,
307 State: 1,
308 },
309 },
310 }
311 updatedLogicalPorts := []*voltha.LogicalPort{
312 {
313 Id: "1231",
314 DeviceId: "d1234",
315 DevicePortNo: 1,
316 RootPort: true,
317 OfpPort: &ofp.OfpPort{
318 PortNo: 1,
319 Name: "port1",
320 Config: 4,
321 State: 4,
322 },
323 },
324 {
325 Id: "1232",
326 DeviceId: "d1234",
327 DevicePortNo: 2,
328 RootPort: false,
329 OfpPort: &ofp.OfpPort{
330 PortNo: 2,
331 Name: "port2",
332 Config: 4,
333 State: 4,
334 },
335 },
336 {
337 Id: "1234",
338 DeviceId: "d1234",
339 DevicePortNo: 4,
340 RootPort: false,
341 OfpPort: &ofp.OfpPort{
342 PortNo: 4,
343 Name: "port4",
344 Config: 4,
345 State: 4,
346 },
347 },
348 }
349 newPorts, changedPorts, deletedPorts := diff(currentLogicalPorts, updatedLogicalPorts)
350 assert.Equal(t, 1, len(newPorts))
351 assert.Equal(t, 2, len(changedPorts))
352 assert.Equal(t, 1, len(deletedPorts))
353 assert.Equal(t, updatedLogicalPorts[0], changedPorts[0])
354 assert.Equal(t, updatedLogicalPorts[1], changedPorts[1])
355 assert.Equal(t, currentLogicalPorts[2], deletedPorts[0])
356}
khenaidoo6e55d9e2019-12-12 18:26:26 -0500357
358type LDATest struct {
359 etcdServer *lm.EtcdServer
360 core *Core
361 kClient kafka.Client
362 kvClientPort int
363 oltAdapterName string
364 onuAdapterName string
365 coreInstanceID string
366 defaultTimeout time.Duration
367 maxTimeout time.Duration
368 logicalDevice *voltha.LogicalDevice
369 deviceIds []string
370 done chan int
371}
372
373func newLDATest() *LDATest {
374 test := &LDATest{}
375 // Start the embedded etcd server
376 var err error
377 test.etcdServer, test.kvClientPort, err = startEmbeddedEtcdServer("voltha.rwcore.lda.test", "voltha.rwcore.lda.etcd", "error")
378 if err != nil {
Girish Kumarf56a4682020-03-20 20:07:46 +0000379 logger.Fatal(err)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500380 }
381 // Create the kafka client
382 test.kClient = lm.NewKafkaClient()
383 test.oltAdapterName = "olt_adapter_mock"
384 test.onuAdapterName = "onu_adapter_mock"
385 test.coreInstanceID = "rw-da-test"
386 test.defaultTimeout = 5 * time.Second
387 test.maxTimeout = 20 * time.Second
388 test.done = make(chan int)
389 test.deviceIds = []string{com.GetRandomString(10), com.GetRandomString(10), com.GetRandomString(10)}
390 test.logicalDevice = &voltha.LogicalDevice{
391 Desc: &ofp.OfpDesc{
392 HwDesc: "olt_adapter_mock",
393 SwDesc: "olt_adapter_mock",
394 SerialNum: com.GetRandomSerialNumber(),
395 },
396 SwitchFeatures: &ofp.OfpSwitchFeatures{
397 NBuffers: 256,
398 NTables: 2,
399 Capabilities: uint32(ofp.OfpCapabilities_OFPC_FLOW_STATS |
400 ofp.OfpCapabilities_OFPC_TABLE_STATS |
401 ofp.OfpCapabilities_OFPC_PORT_STATS |
402 ofp.OfpCapabilities_OFPC_GROUP_STATS),
403 },
404 RootDeviceId: test.deviceIds[0],
405 Ports: []*voltha.LogicalPort{
406 {
407 Id: "1001",
408 DeviceId: test.deviceIds[0],
409 DevicePortNo: 1,
410 RootPort: true,
411 OfpPort: &ofp.OfpPort{
412 PortNo: 1,
413 Name: "port1",
414 Config: 4,
415 State: 4,
416 },
417 },
418 {
419 Id: "1002",
420 DeviceId: test.deviceIds[1],
421 DevicePortNo: 2,
422 RootPort: false,
423 OfpPort: &ofp.OfpPort{
424 PortNo: 2,
425 Name: "port2",
426 Config: 4,
427 State: 4,
428 },
429 },
430 {
431 Id: "1003",
432 DeviceId: test.deviceIds[2],
433 DevicePortNo: 3,
434 RootPort: false,
435 OfpPort: &ofp.OfpPort{
436 PortNo: 4,
437 Name: "port3",
438 Config: 4,
439 State: 4,
440 },
441 },
442 },
443 }
444 return test
445}
446
447func (lda *LDATest) startCore(inCompeteMode bool) {
Thomas Lee Se5a44012019-11-07 20:32:24 +0530448 ctx := context.Background()
khenaidoo6e55d9e2019-12-12 18:26:26 -0500449 cfg := config.NewRWCoreFlags()
450 cfg.CorePairTopic = "rw_core"
khenaidoo442e7c72020-03-10 16:13:48 -0400451 cfg.DefaultRequestTimeout = lda.defaultTimeout
khenaidoo6e55d9e2019-12-12 18:26:26 -0500452 cfg.KVStorePort = lda.kvClientPort
453 cfg.InCompetingMode = inCompeteMode
454 grpcPort, err := freeport.GetFreePort()
455 if err != nil {
Girish Kumarf56a4682020-03-20 20:07:46 +0000456 logger.Fatal("Cannot get a freeport for grpc")
khenaidoo6e55d9e2019-12-12 18:26:26 -0500457 }
458 cfg.GrpcPort = grpcPort
459 cfg.GrpcHost = "127.0.0.1"
460 setCoreCompeteMode(inCompeteMode)
461 client := setupKVClient(cfg, lda.coreInstanceID)
Thomas Lee Se5a44012019-11-07 20:32:24 +0530462 lda.core = NewCore(ctx, lda.coreInstanceID, cfg, client, lda.kClient)
463 err = lda.core.Start(ctx)
464 if err != nil {
Girish Kumarf56a4682020-03-20 20:07:46 +0000465 logger.Fatal("Cannot start core")
Thomas Lee Se5a44012019-11-07 20:32:24 +0530466 }
khenaidoo6e55d9e2019-12-12 18:26:26 -0500467}
468
469func (lda *LDATest) stopAll() {
470 if lda.kClient != nil {
471 lda.kClient.Stop()
472 }
473 if lda.core != nil {
474 lda.core.Stop(context.Background())
475 }
476 if lda.etcdServer != nil {
477 stopEmbeddedEtcdServer(lda.etcdServer)
478 }
479}
480
481func (lda *LDATest) createLogicalDeviceAgent(t *testing.T) *LogicalDeviceAgent {
482 lDeviceMgr := lda.core.logicalDeviceMgr
483 deviceMgr := lda.core.deviceMgr
484 clonedLD := proto.Clone(lda.logicalDevice).(*voltha.LogicalDevice)
485 clonedLD.Id = com.GetRandomString(10)
486 clonedLD.DatapathId = rand.Uint64()
487 lDeviceAgent := newLogicalDeviceAgent(clonedLD.Id, clonedLD.RootDeviceId, lDeviceMgr, deviceMgr, lDeviceMgr.clusterDataProxy, lDeviceMgr.defaultTimeout)
488 lDeviceAgent.logicalDevice = clonedLD
khenaidoo442e7c72020-03-10 16:13:48 -0400489 lDeviceAgent.requestQueue.Start()
Thomas Lee Se5a44012019-11-07 20:32:24 +0530490 added, err := lDeviceAgent.clusterDataProxy.AddWithID(context.Background(), "/logical_devices", clonedLD.Id, clonedLD, "")
491 assert.Nil(t, err)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500492 assert.NotNil(t, added)
493 lDeviceMgr.addLogicalDeviceAgentToMap(lDeviceAgent)
494 return lDeviceAgent
495}
496
497func (lda *LDATest) updateLogicalDeviceConcurrently(t *testing.T, ldAgent *LogicalDeviceAgent, globalWG *sync.WaitGroup) {
khenaidoo442e7c72020-03-10 16:13:48 -0400498 originalLogicalDevice, _ := ldAgent.GetLogicalDevice(context.Background())
khenaidoo6e55d9e2019-12-12 18:26:26 -0500499 assert.NotNil(t, originalLogicalDevice)
500 var localWG sync.WaitGroup
501
502 // Change the state of the first port to FAILED
503 localWG.Add(1)
504 go func() {
npujar467fe752020-01-16 20:17:45 +0530505 err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[0].DeviceId, lda.logicalDevice.Ports[0].DevicePortNo, voltha.OperStatus_FAILED)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500506 assert.Nil(t, err)
507 localWG.Done()
508 }()
509
510 // Change the state of the second port to TESTING
511 localWG.Add(1)
512 go func() {
npujar467fe752020-01-16 20:17:45 +0530513 err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[1].DeviceId, lda.logicalDevice.Ports[1].DevicePortNo, voltha.OperStatus_TESTING)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500514 assert.Nil(t, err)
515 localWG.Done()
516 }()
517
518 // Change the state of the third port to UNKNOWN and then back to ACTIVE
519 localWG.Add(1)
520 go func() {
npujar467fe752020-01-16 20:17:45 +0530521 err := ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[2].DeviceId, lda.logicalDevice.Ports[2].DevicePortNo, voltha.OperStatus_UNKNOWN)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500522 assert.Nil(t, err)
npujar467fe752020-01-16 20:17:45 +0530523 err = ldAgent.updatePortState(context.Background(), lda.logicalDevice.Ports[2].DeviceId, lda.logicalDevice.Ports[2].DevicePortNo, voltha.OperStatus_ACTIVE)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500524 assert.Nil(t, err)
525 localWG.Done()
526 }()
527
528 // Add a meter to the logical device
529 meterMod := &ofp.OfpMeterMod{
530 Command: ofp.OfpMeterModCommand_OFPMC_ADD,
531 Flags: rand.Uint32(),
532 MeterId: rand.Uint32(),
533 Bands: []*ofp.OfpMeterBandHeader{
534 {Type: ofp.OfpMeterBandType_OFPMBT_EXPERIMENTER,
535 Rate: rand.Uint32(),
536 BurstSize: rand.Uint32(),
537 Data: nil,
538 },
539 },
540 }
541 localWG.Add(1)
542 go func() {
npujar467fe752020-01-16 20:17:45 +0530543 err := ldAgent.meterAdd(context.Background(), meterMod)
khenaidoo6e55d9e2019-12-12 18:26:26 -0500544 assert.Nil(t, err)
545 localWG.Done()
546 }()
547
548 // wait for go routines to be done
549 localWG.Wait()
550
551 expectedChange := proto.Clone(originalLogicalDevice).(*voltha.LogicalDevice)
552 expectedChange.Ports[0].OfpPort.Config = originalLogicalDevice.Ports[0].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
553 expectedChange.Ports[0].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
554 expectedChange.Ports[1].OfpPort.Config = originalLogicalDevice.Ports[0].OfpPort.Config | uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
555 expectedChange.Ports[1].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LINK_DOWN)
556 expectedChange.Ports[2].OfpPort.Config = originalLogicalDevice.Ports[0].OfpPort.Config & ^uint32(ofp.OfpPortConfig_OFPPC_PORT_DOWN)
557 expectedChange.Ports[2].OfpPort.State = uint32(ofp.OfpPortState_OFPPS_LIVE)
558 expectedChange.Meters = &voltha.Meters{Items: nil}
559 expectedChange.Meters.Items = append(expectedChange.Meters.Items, fu.MeterEntryFromMeterMod(meterMod))
khenaidoo442e7c72020-03-10 16:13:48 -0400560 updatedLogicalDevice, _ := ldAgent.GetLogicalDevice(context.Background())
khenaidoo6e55d9e2019-12-12 18:26:26 -0500561 assert.NotNil(t, updatedLogicalDevice)
562 assert.True(t, proto.Equal(expectedChange, updatedLogicalDevice))
563 globalWG.Done()
564}
565
566func TestConcurrentLogicalDeviceUpdate(t *testing.T) {
567 lda := newLDATest()
568 assert.NotNil(t, lda)
569 defer lda.stopAll()
570
571 // Start the Core
572 lda.startCore(false)
573
574 var wg sync.WaitGroup
khenaidoo442e7c72020-03-10 16:13:48 -0400575 numConCurrentLogicalDeviceAgents := 3
khenaidoo6e55d9e2019-12-12 18:26:26 -0500576 for i := 0; i < numConCurrentLogicalDeviceAgents; i++ {
577 wg.Add(1)
578 a := lda.createLogicalDeviceAgent(t)
579 go lda.updateLogicalDeviceConcurrently(t, a, &wg)
580 }
581
582 wg.Wait()
583}