VOL-3121 - Separated ports from devices.
This mirrors the earlier separation of flows, groups, meters, and logical ports.
Also added ListDevicePorts and GetDevicePort to the adapter API.
Also removed unused `// +build integration` tests.
Change-Id: I586adb9f46a249c9430d4205ef5db2d105dbbe06
diff --git a/rw_core/core/api/adapter_request_handler.go b/rw_core/core/api/adapter_request_handler.go
index d1e655e..79093a7 100644
--- a/rw_core/core/api/adapter_request_handler.go
+++ b/rw_core/core/api/adapter_request_handler.go
@@ -19,6 +19,7 @@
import (
"context"
"errors"
+
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-go/rw_core/core/adapter"
@@ -455,6 +456,7 @@
return nil, err
}
deviceID := &voltha.ID{}
+ portTypeFilter := &ic.IntType{}
operStatus := &ic.IntType{}
transactionID := &ic.StrType{}
for _, arg := range args {
@@ -464,6 +466,11 @@
logger.Warnw(ctx, "cannot-unmarshal-device-id", log.Fields{"error": err})
return nil, err
}
+		case "port_type_filter": // decoded value is later forwarded to UpdatePortsState as a uint32
+			if err := ptypes.UnmarshalAny(arg.Value, portTypeFilter); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-port-type-filter", log.Fields{"error": err})
+				return nil, err
+			}
case "oper_status":
if err := ptypes.UnmarshalAny(arg.Value, operStatus); err != nil {
logger.Warnw(ctx, "cannot-unmarshal-operStatus", log.Fields{"error": err})
@@ -478,7 +485,7 @@
}
logger.Debugw(ctx, "PortsStateUpdate", log.Fields{"deviceID": deviceID.Id, "operStatus": operStatus, "transactionID": transactionID.Val})
- if err := rhp.deviceMgr.UpdatePortsState(context.TODO(), deviceID.Id, voltha.OperStatus_Types(operStatus.Val)); err != nil {
+ if err := rhp.deviceMgr.UpdatePortsState(context.TODO(), deviceID.Id, uint32(portTypeFilter.Val), voltha.OperStatus_Types(operStatus.Val)); err != nil {
logger.Debugw(ctx, "unable-to-update-ports-state", log.Fields{"error": err})
return nil, err
}
@@ -572,6 +579,68 @@
return &empty.Empty{}, nil
}
+// GetDevicePort decodes the "device_id" and "port_no" arguments (other keys are ignored) and returns the
+// result of deviceMgr.GetDevicePort. It fails when fewer than three args are given or one cannot be unmarshalled.
+func (rhp *AdapterRequestHandlerProxy) GetDevicePort(ctx context.Context, args []*ic.Argument) (*voltha.Port, error) {
+	if len(args) < 3 {
+		logger.Warnw(ctx, "invalid-number-of-args", log.Fields{"args": args}) // Warnw, so the fields stay structured
+		return nil, errors.New("invalid-number-of-args")
+	}
+	deviceID := &voltha.ID{}
+	portNo := &ic.IntType{}
+	transactionID := &ic.StrType{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "device_id":
+			if err := ptypes.UnmarshalAny(arg.Value, deviceID); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-device-id", log.Fields{"error": err})
+				return nil, err
+			}
+		case "port_no":
+			if err := ptypes.UnmarshalAny(arg.Value, portNo); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-port-no", log.Fields{"error": err})
+				return nil, err
+			}
+		case kafka.TransactionKey: // only used for the debug log below
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		}
+	}
+	logger.Debugw(ctx, "GetDevicePort", log.Fields{"deviceID": deviceID.Id, "portNo": portNo.Val, "transactionID": transactionID.Val})
+
+	return rhp.deviceMgr.GetDevicePort(context.TODO(), deviceID.Id, uint32(portNo.Val))
+}
+
+// ListDevicePorts decodes the "device_id" argument (other keys are ignored) and returns the result of
+// deviceMgr.ListDevicePorts. It fails when fewer than two args are given or one cannot be unmarshalled.
+func (rhp *AdapterRequestHandlerProxy) ListDevicePorts(ctx context.Context, args []*ic.Argument) (*voltha.Ports, error) {
+	if len(args) < 2 {
+		logger.Warnw(ctx, "invalid-number-of-args", log.Fields{"args": args}) // Warnw, so the fields stay structured
+		return nil, errors.New("invalid-number-of-args")
+	}
+	deviceID := &voltha.ID{}
+	transactionID := &ic.StrType{}
+	for _, arg := range args {
+		switch arg.Key {
+		case "device_id":
+			if err := ptypes.UnmarshalAny(arg.Value, deviceID); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-device-id", log.Fields{"error": err})
+				return nil, err
+			}
+		case kafka.TransactionKey: // only used for the debug log below
+			if err := ptypes.UnmarshalAny(arg.Value, transactionID); err != nil {
+				logger.Warnw(ctx, "cannot-unmarshal-transaction-ID", log.Fields{"error": err})
+				return nil, err
+			}
+		}
+	}
+	logger.Debugw(ctx, "ListDevicePorts", log.Fields{"deviceID": deviceID.Id, "transactionID": transactionID.Val})
+
+	return rhp.deviceMgr.ListDevicePorts(context.TODO(), deviceID)
+}
+
// ChildDevicesLost indicates that a parent device is in a state (Disabled) where it cannot manage the child devices.
// This will trigger the Core to disable all the child devices.
func (rhp *AdapterRequestHandlerProxy) ChildDevicesLost(ctx context.Context, args []*ic.Argument) (*empty.Empty, error) {
diff --git a/rw_core/core/api/common_test.go b/rw_core/core/api/common_test.go
index 6fcd511..92ba701 100644
--- a/rw_core/core/api/common_test.go
+++ b/rw_core/core/api/common_test.go
@@ -38,6 +38,7 @@
type isLogicalDeviceConditionSatisfied func(ld *voltha.LogicalDevice) bool
type isLogicalDevicePortsConditionSatisfied func(ports []*voltha.LogicalPort) bool
type isDeviceConditionSatisfied func(ld *voltha.Device) bool
+type isDevicePortsConditionSatisfied func(ports *voltha.Ports) bool
type isDevicesConditionSatisfied func(ds *voltha.Devices) bool
type isLogicalDevicesConditionSatisfied func(lds *voltha.LogicalDevices) bool
type isConditionSatisfied func() bool
@@ -88,6 +89,36 @@
}
}
+// waitUntilDevicePortsReadiness polls the device's ports every retryInterval until verificationFunction accepts them or timeout elapses.
+func waitUntilDevicePortsReadiness(deviceID string,
+	timeout time.Duration,
+	verificationFunction isDevicePortsConditionSatisfied,
+	nbi *NBIHandler) error {
+	ch := make(chan int, 1) // buffered so a late success never blocks the poller
+	done := make(chan struct{})
+	defer close(done) // stops the poller on return; a channel avoids the data race of a shared bool flag
+	go func() {
+		for {
+			ports, err := nbi.ListDevicePorts(getContext(), &voltha.ID{Id: deviceID})
+			if err == nil && ports != nil && verificationFunction(ports) { // never hand nil ports to the callback
+				ch <- 1
+				return
+			}
+			select {
+			case <-done:
+				return
+			case <-time.After(retryInterval):
+			}
+		}
+	}()
+	select {
+	case <-ch:
+		return nil
+	case <-time.After(timeout):
+		return fmt.Errorf("expected-states-not-reached-for-device%s", deviceID)
+	}
+}
+
func waitUntilLogicalDeviceReadiness(oltDeviceID string,
timeout time.Duration,
nbi *NBIHandler,
diff --git a/rw_core/core/api/grpc_nbi_handler_test.go b/rw_core/core/api/grpc_nbi_handler_test.go
index 434df90..6b37a14 100755
--- a/rw_core/core/api/grpc_nbi_handler_test.go
+++ b/rw_core/core/api/grpc_nbi_handler_test.go
@@ -205,6 +205,8 @@
// Now, verify the details of the device. First get the latest update
d, err := nbi.GetDevice(getContext(), &voltha.ID{Id: device.Id})
assert.Nil(t, err)
+ dPorts, err := nbi.ListDevicePorts(getContext(), &voltha.ID{Id: device.Id})
+ assert.Nil(t, err)
assert.Equal(t, voltha.AdminState_ENABLED, d.AdminState)
assert.Equal(t, voltha.ConnectStatus_REACHABLE, d.ConnectStatus)
assert.Equal(t, voltha.OperStatus_ACTIVE, d.OperStatus)
@@ -227,8 +229,8 @@
} else {
assert.Error(t, errors.New("invalid-device-type"))
}
- assert.Equal(t, 2, len(d.Ports))
- for _, p := range d.Ports {
+ assert.Equal(t, 2, len(dPorts.Items))
+ for _, p := range dPorts.Items {
assert.Equal(t, voltha.AdminState_ENABLED, p.AdminState)
assert.Equal(t, voltha.OperStatus_ACTIVE, p.OperStatus)
if p.Type == voltha.Port_ETHERNET_NNI || p.Type == voltha.Port_ETHERNET_UNI {
@@ -635,8 +637,10 @@
oltDevice, err := nb.getADevice(true, nbi)
assert.Nil(t, err)
assert.NotNil(t, oltDevice)
+ oltPorts, err := nbi.ListDevicePorts(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
- for _, cp = range oltDevice.Ports {
+ for _, cp = range oltPorts.Items {
if cp.Type == voltha.Port_PON_OLT {
break
}
@@ -649,15 +653,15 @@
_, err = nbi.DisablePort(getContext(), cp)
assert.Nil(t, err)
// Wait for the olt device Port to be disabled
- var vdFunction isDeviceConditionSatisfied = func(device *voltha.Device) bool {
- for _, port := range device.Ports {
+ var vdFunction isDevicePortsConditionSatisfied = func(ports *voltha.Ports) bool {
+ for _, port := range ports.Items {
if port.PortNo == cp.PortNo {
return port.AdminState == voltha.AdminState_DISABLED
}
}
return false
}
- err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vdFunction, nbi)
+ err = waitUntilDevicePortsReadiness(oltDevice.Id, nb.maxTimeout, vdFunction, nbi)
assert.Nil(t, err)
// Wait for the logical device to satisfy the expected condition
var vlFunction = func(ports []*voltha.LogicalPort) bool {
@@ -677,15 +681,15 @@
assert.Nil(t, err)
// Wait for the olt device Port to be enabled
- vdFunction = func(device *voltha.Device) bool {
- for _, port := range device.Ports {
+ vdFunction = func(ports *voltha.Ports) bool {
+ for _, port := range ports.Items {
if port.PortNo == cp.PortNo {
return port.AdminState == voltha.AdminState_ENABLED
}
}
return false
}
- err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vdFunction, nbi)
+ err = waitUntilDevicePortsReadiness(oltDevice.Id, nb.maxTimeout, vdFunction, nbi)
assert.Nil(t, err)
// Wait for the logical device to satisfy the expected condition
vlFunction = func(ports []*voltha.LogicalPort) bool {
@@ -701,7 +705,7 @@
assert.Nil(t, err)
// Disable a non-PON port
- for _, cp = range oltDevice.Ports {
+ for _, cp = range oltPorts.Items {
if cp.Type != voltha.Port_PON_OLT {
break
}