[VOL-4291] Rw-core updates for gRPC migration
Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/rw_core/test/common_test.go b/rw_core/test/common_test.go
new file mode 100644
index 0000000..d6cabda
--- /dev/null
+++ b/rw_core/test/common_test.go
@@ -0,0 +1,510 @@
+/*
+ * Copyright 2019-present Open Networking Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package test
+
+import (
+ "context"
+ "fmt"
+ "strings"
+ "sync"
+ "time"
+
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-protos/v5/go/common"
+ ofp "github.com/opencord/voltha-protos/v5/go/openflow_13"
+
+ "github.com/golang/protobuf/ptypes/empty"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
+)
+
+var retryInterval = 50 * time.Millisecond
+
+type isLogicalDeviceConditionSatisfied func(ld *voltha.LogicalDevice) bool
+type isLogicalDevicePortsConditionSatisfied func(ports []*voltha.LogicalPort) bool
+type isDeviceConditionSatisfied func(ld *voltha.Device) bool
+type isDevicePortsConditionSatisfied func(ports *voltha.Ports) bool
+type isDevicesConditionSatisfied func(ds *voltha.Devices) bool
+type isLogicalDevicesConditionSatisfied func(lds *voltha.LogicalDevices) bool
+type isConditionSatisfied func() bool
+
+func getContext() context.Context {
+ return context.Background()
+}
+
+func setRetryInterval(interval time.Duration) {
+ retryInterval = interval
+}
+
+func waitUntilDeviceReadiness(deviceID string,
+ timeout time.Duration,
+ verificationFunction isDeviceConditionSatisfied,
+ nbi voltha.VolthaServiceClient) error {
+ ch := make(chan int, 1)
+ done := false
+ go func() {
+ for {
+ device, _ := nbi.GetDevice(getContext(), &common.ID{Id: deviceID})
+ if verificationFunction(device) {
+ ch <- 1
+ break
+ }
+ if done {
+ break
+ }
+ time.Sleep(retryInterval)
+ }
+ }()
+ timer := time.NewTimer(timeout)
+ defer timer.Stop()
+ select {
+ case <-ch:
+ return nil
+ case <-timer.C:
+ done = true
+ return fmt.Errorf("expected-states-not-reached-for-device%s", deviceID)
+ }
+}
+
+func waitUntilDevicePortsReadiness(deviceID string,
+ timeout time.Duration,
+ verificationFunction isDevicePortsConditionSatisfied,
+ nbi voltha.VolthaServiceClient) error {
+ ch := make(chan int, 1)
+ done := false
+ go func() {
+ for {
+ ports, _ := nbi.ListDevicePorts(getContext(), &common.ID{Id: deviceID})
+ if verificationFunction(ports) {
+ ch <- 1
+ break
+ }
+ if done {
+ break
+ }
+ time.Sleep(retryInterval)
+ }
+ }()
+ timer := time.NewTimer(timeout)
+ defer timer.Stop()
+ select {
+ case <-ch:
+ return nil
+ case <-timer.C:
+ done = true
+ return fmt.Errorf("expected-states-not-reached-for-device%s", deviceID)
+ }
+}
+
+func waitUntilLogicalDeviceReadiness(oltDeviceID string,
+ timeout time.Duration,
+ nbi voltha.VolthaServiceClient,
+ verificationFunction isLogicalDeviceConditionSatisfied,
+) error {
+ ch := make(chan int, 1)
+ done := false
+ go func() {
+ for {
+ // Get the logical device from the olt device
+ d, _ := nbi.GetDevice(getContext(), &common.ID{Id: oltDeviceID})
+ if d != nil && d.ParentId != "" {
+ ld, _ := nbi.GetLogicalDevice(getContext(), &common.ID{Id: d.ParentId})
+ if verificationFunction(ld) {
+ ch <- 1
+ break
+ }
+ if done {
+ break
+ }
+ } else if d != nil && d.ParentId == "" { // case where logical device deleted
+ if verificationFunction(nil) {
+ ch <- 1
+ break
+ }
+ if done {
+ break
+ }
+ }
+ time.Sleep(retryInterval)
+ }
+ }()
+ timer := time.NewTimer(timeout)
+ defer timer.Stop()
+ select {
+ case <-ch:
+ return nil
+ case <-timer.C:
+ done = true
+ return fmt.Errorf("timeout-waiting-for-logical-device-readiness%s", oltDeviceID)
+ }
+}
+
+func waitUntilLogicalDevicePortsReadiness(oltDeviceID string,
+ timeout time.Duration,
+ nbi voltha.VolthaServiceClient,
+ verificationFunction isLogicalDevicePortsConditionSatisfied,
+) error {
+ ch := make(chan int, 1)
+ done := false
+ go func() {
+ for {
+ // Get the logical device from the olt device
+ d, _ := nbi.GetDevice(getContext(), &common.ID{Id: oltDeviceID})
+ if d != nil && d.ParentId != "" {
+ ports, err := nbi.ListLogicalDevicePorts(getContext(), &common.ID{Id: d.ParentId})
+ if err == nil && verificationFunction(ports.Items) {
+ ch <- 1
+ break
+ }
+ if done {
+ break
+ }
+ }
+ time.Sleep(retryInterval)
+ }
+ }()
+ timer := time.NewTimer(timeout)
+ defer timer.Stop()
+ select {
+ case <-ch:
+ return nil
+ case <-timer.C:
+ done = true
+ return fmt.Errorf("timeout-waiting-for-logical-device-readiness%s", oltDeviceID)
+ }
+}
+
+func waitUntilConditionForDevices(timeout time.Duration, nbi voltha.VolthaServiceClient, verificationFunction isDevicesConditionSatisfied) error {
+ ch := make(chan int, 1)
+ done := false
+ go func() {
+ for {
+ devices, _ := nbi.ListDevices(getContext(), &empty.Empty{})
+ if verificationFunction(devices) {
+ ch <- 1
+ break
+ }
+ if done {
+ break
+ }
+
+ time.Sleep(retryInterval)
+ }
+ }()
+ timer := time.NewTimer(timeout)
+ defer timer.Stop()
+ select {
+ case <-ch:
+ return nil
+ case <-timer.C:
+ done = true
+ return fmt.Errorf("timeout-waiting-devices")
+ }
+}
+
+func waitUntilConditionForLogicalDevices(timeout time.Duration, nbi voltha.VolthaServiceClient, verificationFunction isLogicalDevicesConditionSatisfied) error {
+ ch := make(chan int, 1)
+ done := false
+ go func() {
+ for {
+ lDevices, _ := nbi.ListLogicalDevices(getContext(), &empty.Empty{})
+ if verificationFunction(lDevices) {
+ ch <- 1
+ break
+ }
+ if done {
+ break
+ }
+
+ time.Sleep(retryInterval)
+ }
+ }()
+ timer := time.NewTimer(timeout)
+ defer timer.Stop()
+ select {
+ case <-ch:
+ return nil
+ case <-timer.C:
+ done = true
+ return fmt.Errorf("timeout-waiting-logical-devices")
+ }
+}
+
+func waitUntilCondition(timeout time.Duration, verificationFunction isConditionSatisfied) error {
+ ch := make(chan int, 1)
+ done := false
+ go func() {
+ for {
+ if verificationFunction() {
+ ch <- 1
+ break
+ }
+ if done {
+ break
+ }
+ time.Sleep(retryInterval)
+ }
+ }()
+ timer := time.NewTimer(timeout)
+ defer timer.Stop()
+ select {
+ case <-ch:
+ return nil
+ case <-timer.C:
+ done = true
+ return fmt.Errorf("timeout-waiting-for-condition")
+ }
+}
+
+func waitUntilDeviceIsRemoved(timeout time.Duration, nbi voltha.VolthaServiceClient, deviceID string) error {
+ ch := make(chan int, 1)
+ done := false
+ go func() {
+ for {
+ _, err := nbi.GetDevice(getContext(), &common.ID{Id: deviceID})
+ if err != nil && strings.Contains(err.Error(), "NotFound") {
+ ch <- 1
+ break
+ }
+ if done {
+ break
+ }
+ time.Sleep(retryInterval)
+ }
+ }()
+ timer := time.NewTimer(timeout)
+ defer timer.Stop()
+ select {
+ case <-ch:
+ return nil
+ case <-timer.C:
+ done = true
+ return fmt.Errorf("timeout-waiting-for-condition")
+ }
+}
+
+func cleanUpCreatedDevice(timeout time.Duration, nbi voltha.VolthaServiceClient, deviceID string) error {
+ logger.Warnw(context.Background(), "cleanUpCreatedDevice", log.Fields{"device-id": deviceID})
+ ch := make(chan int, 1)
+ done := false
+ go func() {
+ //Force Remove the device - use a loop in case the initial delete fails
+ for {
+ logger.Debugw(context.Background(), "sending delete force ", log.Fields{"device-id": deviceID})
+ var err error
+ if _, err = nbi.ForceDeleteDevice(getContext(), &common.ID{Id: deviceID}); err != nil {
+ logger.Debugw(context.Background(), "delete failed", log.Fields{"device-id": deviceID, "error": err})
+ if strings.Contains(err.Error(), "NotFound") {
+ logger.Debugw(context.Background(), "delete not found", log.Fields{"device-id": deviceID, "error": err})
+ //ch <- 1
+ break
+ }
+ time.Sleep(retryInterval)
+ continue
+ }
+ logger.Debugw(context.Background(), "delete force no error", log.Fields{"device-id": deviceID, "error": err})
+ break
+ }
+ logger.Debugw(context.Background(), "delete sent", log.Fields{"device-id": deviceID})
+ for {
+ _, err := nbi.GetDevice(getContext(), &common.ID{Id: deviceID})
+ if err != nil && strings.Contains(err.Error(), "NotFound") {
+ ch <- 1
+ break
+ }
+ if done {
+ break
+ }
+ time.Sleep(retryInterval)
+ }
+ }()
+ timer := time.NewTimer(timeout)
+ defer timer.Stop()
+ select {
+ case <-ch:
+ return nil
+ case <-timer.C:
+ done = true
+ return fmt.Errorf("timeout-waiting-devices-cleanup")
+ }
+}
+
+func cleanUpCreatedDevices(timeout time.Duration, nbi voltha.VolthaServiceClient, parentDeviceID string) error {
+ ch := make(chan int, 1)
+ done := false
+ go func() {
+ //Force Remove the device - use a loop in case the initial delete fails
+ for {
+ if _, err := nbi.ForceDeleteDevice(getContext(), &common.ID{Id: parentDeviceID}); err != nil {
+ if strings.Contains(err.Error(), "NotFound") {
+ ch <- 1
+ break
+ }
+ time.Sleep(retryInterval)
+ continue
+ }
+ break
+ }
+ for {
+ devices, _ := nbi.ListDevices(getContext(), &empty.Empty{})
+ removed := devices == nil || len(devices.Items) == 0
+ if !removed {
+ removed = true
+ for _, d := range devices.Items {
+ if (d.Root && d.Id == parentDeviceID) || (!d.Root && d.ParentId == parentDeviceID) {
+ removed = false
+ break
+ }
+ }
+ }
+ if removed {
+ ch <- 1
+ break
+ }
+ if done {
+ break
+ }
+ time.Sleep(retryInterval)
+ }
+ }()
+ timer := time.NewTimer(timeout)
+ defer timer.Stop()
+ select {
+ case <-ch:
+ return nil
+ case <-timer.C:
+ done = true
+ return fmt.Errorf("timeout-waiting-devices-cleanup")
+ }
+}
+
+func cleanUpDevices(timeout time.Duration, nbi voltha.VolthaServiceClient, parentDeviceID string, verifyParentDeletionOnly bool) error {
+ ch := make(chan int, 1)
+ done := false
+ go func() {
+ // Send a force delete to the parent device
+ for {
+ _, err := nbi.ForceDeleteDevice(getContext(), &common.ID{Id: parentDeviceID})
+ if err == nil || strings.Contains(err.Error(), "NotFound") {
+ break
+ }
+ time.Sleep(retryInterval)
+ if done {
+ return
+ }
+ }
+ var err error
+ for {
+ if verifyParentDeletionOnly {
+ _, err = nbi.GetDevice(getContext(), &common.ID{Id: parentDeviceID})
+ if err != nil && strings.Contains(err.Error(), "NotFound") {
+ ch <- 1
+ break
+ }
+ time.Sleep(retryInterval)
+ if done {
+ return
+ }
+ continue
+ }
+ // verifyParentDeletionOnly is False => check children as well
+ devices, _ := nbi.ListDevices(getContext(), &empty.Empty{})
+ removed := devices == nil || len(devices.Items) == 0
+ if !removed {
+ removed = true
+ for _, d := range devices.Items {
+ if (d.Root && d.Id == parentDeviceID) || (!d.Root && d.ParentId == parentDeviceID) {
+ removed = false
+ break
+ }
+ }
+ }
+ if removed {
+ ch <- 1
+ break
+ }
+ time.Sleep(retryInterval)
+ if done {
+ break
+ }
+ }
+ }()
+ timer := time.NewTimer(timeout)
+ defer timer.Stop()
+ select {
+ case <-ch:
+ return nil
+ case <-timer.C:
+ done = true
+ return fmt.Errorf("timeout-waiting-devices-cleanup")
+ }
+}
+
+type ChangedEventListener struct {
+ eventSubscriber chan chan *ofp.ChangeEvent
+ eventUnSubscriber chan chan *ofp.ChangeEvent
+}
+
+func NewChangedEventListener(bufferSize int) *ChangedEventListener {
+ return &ChangedEventListener{
+ eventSubscriber: make(chan chan *ofp.ChangeEvent, bufferSize),
+ eventUnSubscriber: make(chan chan *ofp.ChangeEvent, bufferSize),
+ }
+}
+
+func (cel *ChangedEventListener) Start(ctx context.Context, coreEventsCh chan *ofp.ChangeEvent) {
+ subs := map[chan *ofp.ChangeEvent]struct{}{}
+ var subsLock sync.RWMutex
+ for {
+ select {
+ case <-ctx.Done():
+ logger.Debug(ctx, "closing-change-event-listener")
+ subsLock.RLock()
+ for msgCh := range subs {
+ close(msgCh)
+ }
+ subsLock.RUnlock()
+ return
+ case eventCh := <-cel.eventSubscriber:
+ subsLock.Lock()
+ subs[eventCh] = struct{}{}
+ subsLock.Unlock()
+ case eventCh := <-cel.eventUnSubscriber:
+ subsLock.Lock()
+ close(eventCh)
+ delete(subs, eventCh)
+ subsLock.Unlock()
+ case event := <-coreEventsCh:
+ subsLock.RLock()
+ for subscriber := range subs {
+ select {
+ case subscriber <- event:
+ default:
+ }
+ }
+ subsLock.RUnlock()
+ }
+ }
+}
+
+func (cel *ChangedEventListener) Subscribe(bufferSize int) chan *ofp.ChangeEvent {
+ eventCh := make(chan *ofp.ChangeEvent, bufferSize)
+ cel.eventSubscriber <- eventCh
+ return eventCh
+}
+
+func (cel *ChangedEventListener) Unsubscribe(eventCh chan *ofp.ChangeEvent) {
+ cel.eventUnSubscriber <- eventCh
+}