blob: d6cabdaf01a86705249a14c7d3ddae66f47072ab [file] [log] [blame]
/*
 * Copyright 2019-present Open Networking Foundation
 *
 * Licensed under the Apache License, Version 2.0 (the "License");
 * you may not use this file except in compliance with the License.
 * You may obtain a copy of the License at
 *
 * http://www.apache.org/licenses/LICENSE-2.0
 *
 * Unless required by applicable law or agreed to in writing, software
 * distributed under the License is distributed on an "AS IS" BASIS,
 * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
 * See the License for the specific language governing permissions and
 * limitations under the License.
 */
16package test
17
18import (
19 "context"
20 "fmt"
21 "strings"
22 "sync"
23 "time"
24
25 "github.com/opencord/voltha-lib-go/v7/pkg/log"
26 "github.com/opencord/voltha-protos/v5/go/common"
27 ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
28
29 "github.com/golang/protobuf/ptypes/empty"
30 "github.com/opencord/voltha-protos/v5/go/voltha"
31)
32
33var retryInterval = 50 * time.Millisecond
34
35type isLogicalDeviceConditionSatisfied func(ld *voltha.LogicalDevice) bool
36type isLogicalDevicePortsConditionSatisfied func(ports []*voltha.LogicalPort) bool
37type isDeviceConditionSatisfied func(ld *voltha.Device) bool
38type isDevicePortsConditionSatisfied func(ports *voltha.Ports) bool
39type isDevicesConditionSatisfied func(ds *voltha.Devices) bool
40type isLogicalDevicesConditionSatisfied func(lds *voltha.LogicalDevices) bool
41type isConditionSatisfied func() bool
42
// getContext returns the context passed to the northbound API calls made by
// the helpers in this file. It is a plain background context, so it carries
// no deadline and is never cancelled.
func getContext() context.Context {
	ctx := context.Background()
	return ctx
}
46
47func setRetryInterval(interval time.Duration) {
48 retryInterval = interval
49}
50
51func waitUntilDeviceReadiness(deviceID string,
52 timeout time.Duration,
53 verificationFunction isDeviceConditionSatisfied,
54 nbi voltha.VolthaServiceClient) error {
55 ch := make(chan int, 1)
56 done := false
57 go func() {
58 for {
59 device, _ := nbi.GetDevice(getContext(), &common.ID{Id: deviceID})
60 if verificationFunction(device) {
61 ch <- 1
62 break
63 }
64 if done {
65 break
66 }
67 time.Sleep(retryInterval)
68 }
69 }()
70 timer := time.NewTimer(timeout)
71 defer timer.Stop()
72 select {
73 case <-ch:
74 return nil
75 case <-timer.C:
76 done = true
77 return fmt.Errorf("expected-states-not-reached-for-device%s", deviceID)
78 }
79}
80
81func waitUntilDevicePortsReadiness(deviceID string,
82 timeout time.Duration,
83 verificationFunction isDevicePortsConditionSatisfied,
84 nbi voltha.VolthaServiceClient) error {
85 ch := make(chan int, 1)
86 done := false
87 go func() {
88 for {
89 ports, _ := nbi.ListDevicePorts(getContext(), &common.ID{Id: deviceID})
90 if verificationFunction(ports) {
91 ch <- 1
92 break
93 }
94 if done {
95 break
96 }
97 time.Sleep(retryInterval)
98 }
99 }()
100 timer := time.NewTimer(timeout)
101 defer timer.Stop()
102 select {
103 case <-ch:
104 return nil
105 case <-timer.C:
106 done = true
107 return fmt.Errorf("expected-states-not-reached-for-device%s", deviceID)
108 }
109}
110
111func waitUntilLogicalDeviceReadiness(oltDeviceID string,
112 timeout time.Duration,
113 nbi voltha.VolthaServiceClient,
114 verificationFunction isLogicalDeviceConditionSatisfied,
115) error {
116 ch := make(chan int, 1)
117 done := false
118 go func() {
119 for {
120 // Get the logical device from the olt device
121 d, _ := nbi.GetDevice(getContext(), &common.ID{Id: oltDeviceID})
122 if d != nil && d.ParentId != "" {
123 ld, _ := nbi.GetLogicalDevice(getContext(), &common.ID{Id: d.ParentId})
124 if verificationFunction(ld) {
125 ch <- 1
126 break
127 }
128 if done {
129 break
130 }
131 } else if d != nil && d.ParentId == "" { // case where logical device deleted
132 if verificationFunction(nil) {
133 ch <- 1
134 break
135 }
136 if done {
137 break
138 }
139 }
140 time.Sleep(retryInterval)
141 }
142 }()
143 timer := time.NewTimer(timeout)
144 defer timer.Stop()
145 select {
146 case <-ch:
147 return nil
148 case <-timer.C:
149 done = true
150 return fmt.Errorf("timeout-waiting-for-logical-device-readiness%s", oltDeviceID)
151 }
152}
153
154func waitUntilLogicalDevicePortsReadiness(oltDeviceID string,
155 timeout time.Duration,
156 nbi voltha.VolthaServiceClient,
157 verificationFunction isLogicalDevicePortsConditionSatisfied,
158) error {
159 ch := make(chan int, 1)
160 done := false
161 go func() {
162 for {
163 // Get the logical device from the olt device
164 d, _ := nbi.GetDevice(getContext(), &common.ID{Id: oltDeviceID})
165 if d != nil && d.ParentId != "" {
166 ports, err := nbi.ListLogicalDevicePorts(getContext(), &common.ID{Id: d.ParentId})
167 if err == nil && verificationFunction(ports.Items) {
168 ch <- 1
169 break
170 }
171 if done {
172 break
173 }
174 }
175 time.Sleep(retryInterval)
176 }
177 }()
178 timer := time.NewTimer(timeout)
179 defer timer.Stop()
180 select {
181 case <-ch:
182 return nil
183 case <-timer.C:
184 done = true
185 return fmt.Errorf("timeout-waiting-for-logical-device-readiness%s", oltDeviceID)
186 }
187}
188
189func waitUntilConditionForDevices(timeout time.Duration, nbi voltha.VolthaServiceClient, verificationFunction isDevicesConditionSatisfied) error {
190 ch := make(chan int, 1)
191 done := false
192 go func() {
193 for {
194 devices, _ := nbi.ListDevices(getContext(), &empty.Empty{})
195 if verificationFunction(devices) {
196 ch <- 1
197 break
198 }
199 if done {
200 break
201 }
202
203 time.Sleep(retryInterval)
204 }
205 }()
206 timer := time.NewTimer(timeout)
207 defer timer.Stop()
208 select {
209 case <-ch:
210 return nil
211 case <-timer.C:
212 done = true
213 return fmt.Errorf("timeout-waiting-devices")
214 }
215}
216
217func waitUntilConditionForLogicalDevices(timeout time.Duration, nbi voltha.VolthaServiceClient, verificationFunction isLogicalDevicesConditionSatisfied) error {
218 ch := make(chan int, 1)
219 done := false
220 go func() {
221 for {
222 lDevices, _ := nbi.ListLogicalDevices(getContext(), &empty.Empty{})
223 if verificationFunction(lDevices) {
224 ch <- 1
225 break
226 }
227 if done {
228 break
229 }
230
231 time.Sleep(retryInterval)
232 }
233 }()
234 timer := time.NewTimer(timeout)
235 defer timer.Stop()
236 select {
237 case <-ch:
238 return nil
239 case <-timer.C:
240 done = true
241 return fmt.Errorf("timeout-waiting-logical-devices")
242 }
243}
244
245func waitUntilCondition(timeout time.Duration, verificationFunction isConditionSatisfied) error {
246 ch := make(chan int, 1)
247 done := false
248 go func() {
249 for {
250 if verificationFunction() {
251 ch <- 1
252 break
253 }
254 if done {
255 break
256 }
257 time.Sleep(retryInterval)
258 }
259 }()
260 timer := time.NewTimer(timeout)
261 defer timer.Stop()
262 select {
263 case <-ch:
264 return nil
265 case <-timer.C:
266 done = true
267 return fmt.Errorf("timeout-waiting-for-condition")
268 }
269}
270
271func waitUntilDeviceIsRemoved(timeout time.Duration, nbi voltha.VolthaServiceClient, deviceID string) error {
272 ch := make(chan int, 1)
273 done := false
274 go func() {
275 for {
276 _, err := nbi.GetDevice(getContext(), &common.ID{Id: deviceID})
277 if err != nil && strings.Contains(err.Error(), "NotFound") {
278 ch <- 1
279 break
280 }
281 if done {
282 break
283 }
284 time.Sleep(retryInterval)
285 }
286 }()
287 timer := time.NewTimer(timeout)
288 defer timer.Stop()
289 select {
290 case <-ch:
291 return nil
292 case <-timer.C:
293 done = true
294 return fmt.Errorf("timeout-waiting-for-condition")
295 }
296}
297
298func cleanUpCreatedDevice(timeout time.Duration, nbi voltha.VolthaServiceClient, deviceID string) error {
299 logger.Warnw(context.Background(), "cleanUpCreatedDevice", log.Fields{"device-id": deviceID})
300 ch := make(chan int, 1)
301 done := false
302 go func() {
303 //Force Remove the device - use a loop in case the initial delete fails
304 for {
305 logger.Debugw(context.Background(), "sending delete force ", log.Fields{"device-id": deviceID})
306 var err error
307 if _, err = nbi.ForceDeleteDevice(getContext(), &common.ID{Id: deviceID}); err != nil {
308 logger.Debugw(context.Background(), "delete failed", log.Fields{"device-id": deviceID, "error": err})
309 if strings.Contains(err.Error(), "NotFound") {
310 logger.Debugw(context.Background(), "delete not found", log.Fields{"device-id": deviceID, "error": err})
311 //ch <- 1
312 break
313 }
314 time.Sleep(retryInterval)
315 continue
316 }
317 logger.Debugw(context.Background(), "delete force no error", log.Fields{"device-id": deviceID, "error": err})
318 break
319 }
320 logger.Debugw(context.Background(), "delete sent", log.Fields{"device-id": deviceID})
321 for {
322 _, err := nbi.GetDevice(getContext(), &common.ID{Id: deviceID})
323 if err != nil && strings.Contains(err.Error(), "NotFound") {
324 ch <- 1
325 break
326 }
327 if done {
328 break
329 }
330 time.Sleep(retryInterval)
331 }
332 }()
333 timer := time.NewTimer(timeout)
334 defer timer.Stop()
335 select {
336 case <-ch:
337 return nil
338 case <-timer.C:
339 done = true
340 return fmt.Errorf("timeout-waiting-devices-cleanup")
341 }
342}
343
344func cleanUpCreatedDevices(timeout time.Duration, nbi voltha.VolthaServiceClient, parentDeviceID string) error {
345 ch := make(chan int, 1)
346 done := false
347 go func() {
348 //Force Remove the device - use a loop in case the initial delete fails
349 for {
350 if _, err := nbi.ForceDeleteDevice(getContext(), &common.ID{Id: parentDeviceID}); err != nil {
351 if strings.Contains(err.Error(), "NotFound") {
352 ch <- 1
353 break
354 }
355 time.Sleep(retryInterval)
356 continue
357 }
358 break
359 }
360 for {
361 devices, _ := nbi.ListDevices(getContext(), &empty.Empty{})
362 removed := devices == nil || len(devices.Items) == 0
363 if !removed {
364 removed = true
365 for _, d := range devices.Items {
366 if (d.Root && d.Id == parentDeviceID) || (!d.Root && d.ParentId == parentDeviceID) {
367 removed = false
368 break
369 }
370 }
371 }
372 if removed {
373 ch <- 1
374 break
375 }
376 if done {
377 break
378 }
379 time.Sleep(retryInterval)
380 }
381 }()
382 timer := time.NewTimer(timeout)
383 defer timer.Stop()
384 select {
385 case <-ch:
386 return nil
387 case <-timer.C:
388 done = true
389 return fmt.Errorf("timeout-waiting-devices-cleanup")
390 }
391}
392
393func cleanUpDevices(timeout time.Duration, nbi voltha.VolthaServiceClient, parentDeviceID string, verifyParentDeletionOnly bool) error {
394 ch := make(chan int, 1)
395 done := false
396 go func() {
397 // Send a force delete to the parent device
398 for {
399 _, err := nbi.ForceDeleteDevice(getContext(), &common.ID{Id: parentDeviceID})
400 if err == nil || strings.Contains(err.Error(), "NotFound") {
401 break
402 }
403 time.Sleep(retryInterval)
404 if done {
405 return
406 }
407 }
408 var err error
409 for {
410 if verifyParentDeletionOnly {
411 _, err = nbi.GetDevice(getContext(), &common.ID{Id: parentDeviceID})
412 if err != nil && strings.Contains(err.Error(), "NotFound") {
413 ch <- 1
414 break
415 }
416 time.Sleep(retryInterval)
417 if done {
418 return
419 }
420 continue
421 }
422 // verifyParentDeletionOnly is False => check children as well
423 devices, _ := nbi.ListDevices(getContext(), &empty.Empty{})
424 removed := devices == nil || len(devices.Items) == 0
425 if !removed {
426 removed = true
427 for _, d := range devices.Items {
428 if (d.Root && d.Id == parentDeviceID) || (!d.Root && d.ParentId == parentDeviceID) {
429 removed = false
430 break
431 }
432 }
433 }
434 if removed {
435 ch <- 1
436 break
437 }
438 time.Sleep(retryInterval)
439 if done {
440 break
441 }
442 }
443 }()
444 timer := time.NewTimer(timeout)
445 defer timer.Stop()
446 select {
447 case <-ch:
448 return nil
449 case <-timer.C:
450 done = true
451 return fmt.Errorf("timeout-waiting-devices-cleanup")
452 }
453}
454
455type ChangedEventListener struct {
456 eventSubscriber chan chan *ofp.ChangeEvent
457 eventUnSubscriber chan chan *ofp.ChangeEvent
458}
459
460func NewChangedEventListener(bufferSize int) *ChangedEventListener {
461 return &ChangedEventListener{
462 eventSubscriber: make(chan chan *ofp.ChangeEvent, bufferSize),
463 eventUnSubscriber: make(chan chan *ofp.ChangeEvent, bufferSize),
464 }
465}
466
467func (cel *ChangedEventListener) Start(ctx context.Context, coreEventsCh chan *ofp.ChangeEvent) {
468 subs := map[chan *ofp.ChangeEvent]struct{}{}
469 var subsLock sync.RWMutex
470 for {
471 select {
472 case <-ctx.Done():
473 logger.Debug(ctx, "closing-change-event-listener")
474 subsLock.RLock()
475 for msgCh := range subs {
476 close(msgCh)
477 }
478 subsLock.RUnlock()
479 return
480 case eventCh := <-cel.eventSubscriber:
481 subsLock.Lock()
482 subs[eventCh] = struct{}{}
483 subsLock.Unlock()
484 case eventCh := <-cel.eventUnSubscriber:
485 subsLock.Lock()
486 close(eventCh)
487 delete(subs, eventCh)
488 subsLock.Unlock()
489 case event := <-coreEventsCh:
490 subsLock.RLock()
491 for subscriber := range subs {
492 select {
493 case subscriber <- event:
494 default:
495 }
496 }
497 subsLock.RUnlock()
498 }
499 }
500}
501
502func (cel *ChangedEventListener) Subscribe(bufferSize int) chan *ofp.ChangeEvent {
503 eventCh := make(chan *ofp.ChangeEvent, bufferSize)
504 cel.eventSubscriber <- eventCh
505 return eventCh
506}
507
508func (cel *ChangedEventListener) Unsubscribe(eventCh chan *ofp.ChangeEvent) {
509 cel.eventUnSubscriber <- eventCh
510}