VOL-3121 - Added GetDevicePort and ListDevicePorts to the CoreProxy; PortsStateUpdate now takes a portTypeFilter argument.
Change-Id: I6d60e50661b3b6fe8e673f6effd5b4fbbf010701
diff --git a/pkg/adapters/adapterif/core_proxy_if.go b/pkg/adapters/adapterif/core_proxy_if.go
index 9636a7d..a7ab6dc 100644
--- a/pkg/adapters/adapterif/core_proxy_if.go
+++ b/pkg/adapters/adapterif/core_proxy_if.go
@@ -18,6 +18,7 @@
import (
"context"
+
"github.com/opencord/voltha-protos/v3/go/voltha"
)
@@ -25,14 +26,13 @@
type CoreProxy interface {
UpdateCoreReference(deviceID string, coreReference string)
DeleteCoreReference(deviceID string)
- // getCoreTopic(deviceID string) kafka.Topic
- //GetAdapterTopic(args ...string) kafka.Topic
- // getAdapterTopic(args ...string) kafka.Topic
RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error
DeviceUpdate(ctx context.Context, device *voltha.Device) error
PortCreated(ctx context.Context, deviceID string, port *voltha.Port) error
- PortsStateUpdate(ctx context.Context, deviceID string, operStatus voltha.OperStatus_Types) error
+ PortsStateUpdate(ctx context.Context, deviceID string, portTypeFilter uint32, operStatus voltha.OperStatus_Types) error
DeleteAllPorts(ctx context.Context, deviceID string) error
+ GetDevicePort(ctx context.Context, deviceID string, portNo uint32) (*voltha.Port, error)
+ ListDevicePorts(ctx context.Context, deviceID string) ([]*voltha.Port, error)
DeviceStateUpdate(ctx context.Context, deviceID string,
connStatus voltha.ConnectStatus_Types, operStatus voltha.OperStatus_Types) error
diff --git a/pkg/adapters/common/core_proxy.go b/pkg/adapters/common/core_proxy.go
index 28b532f..505dc79 100644
--- a/pkg/adapters/common/core_proxy.go
+++ b/pkg/adapters/common/core_proxy.go
@@ -175,24 +175,22 @@
return unPackResponse(ctx, rpc, deviceId, success, result)
}
-func (ap *CoreProxy) PortsStateUpdate(ctx context.Context, deviceId string, operStatus voltha.OperStatus_Types) error {
+func (ap *CoreProxy) PortsStateUpdate(ctx context.Context, deviceId string, portTypeFilter uint32, operStatus voltha.OperStatus_Types) error {
logger.Debugw(ctx, "PortsStateUpdate", log.Fields{"deviceId": deviceId})
rpc := "PortsStateUpdate"
// Use a device specific topic to send the request. The adapter handling the device creates a device
// specific topic
toTopic := ap.getCoreTopic(deviceId)
- args := make([]*kafka.KVArg, 2)
- id := &voltha.ID{Id: deviceId}
- oStatus := &ic.IntType{Val: int64(operStatus)}
-
- args[0] = &kafka.KVArg{
+ args := []*kafka.KVArg{{
Key: "device_id",
- Value: id,
- }
- args[1] = &kafka.KVArg{
+ Value: &voltha.ID{Id: deviceId},
+ }, {
+ Key: "port_type_filter",
+ Value: &ic.IntType{Val: int64(portTypeFilter)},
+ }, {
Key: "oper_status",
- Value: oStatus,
- }
+ Value: &ic.IntType{Val: int64(operStatus)},
+ }}
// Use a device specific topic as we are the only adaptercore handling requests for this device
replyToTopic := ap.getAdapterTopic()
@@ -222,6 +220,77 @@
return unPackResponse(ctx, rpc, deviceId, success, result)
}
+func (ap *CoreProxy) GetDevicePort(ctx context.Context, deviceID string, portNo uint32) (*voltha.Port, error) {
+ logger.Debugw(ctx, "GetDevicePort", log.Fields{"device-id": deviceID})
+ rpc := "GetDevicePort"
+
+ toTopic := ap.getCoreTopic(deviceID)
+ replyToTopic := ap.getAdapterTopic()
+
+ args := []*kafka.KVArg{{
+ Key: "device_id",
+ Value: &voltha.ID{Id: deviceID},
+ }, {
+ Key: "port_no",
+ Value: &ic.IntType{Val: int64(portNo)},
+ }}
+
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceID, args...)
+ logger.Debugw(ctx, "GetDevicePort-response", log.Fields{"device-id": deviceID, "success": success})
+
+ if success {
+ port := &voltha.Port{}
+ if err := ptypes.UnmarshalAny(result, port); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+ return nil, status.Error(codes.InvalidArgument, err.Error())
+ }
+ return port, nil
+ } else {
+ unpackResult := &ic.Error{}
+ var err error
+ if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+ }
+ logger.Debugw(ctx, "GetDevicePort-return", log.Fields{"device-id": deviceID, "success": success, "error": err})
+ // TODO: Need to get the real error code
+ return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
+ }
+}
+
+func (ap *CoreProxy) ListDevicePorts(ctx context.Context, deviceID string) ([]*voltha.Port, error) {
+ logger.Debugw(ctx, "ListDevicePorts", log.Fields{"device-id": deviceID})
+ rpc := "ListDevicePorts"
+
+ toTopic := ap.getCoreTopic(deviceID)
+ replyToTopic := ap.getAdapterTopic()
+
+ args := []*kafka.KVArg{{
+ Key: "device_id",
+ Value: &voltha.ID{Id: deviceID},
+ }}
+
+ success, result := ap.kafkaICProxy.InvokeRPC(context.Background(), rpc, &toTopic, &replyToTopic, true, deviceID, args...)
+ logger.Debugw(ctx, "ListDevicePorts-response", log.Fields{"device-id": deviceID, "success": success})
+
+ if success {
+ ports := &voltha.Ports{}
+ if err := ptypes.UnmarshalAny(result, ports); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+ return nil, status.Error(codes.InvalidArgument, err.Error())
+ }
+ return ports.Items, nil
+ } else {
+ unpackResult := &ic.Error{}
+ var err error
+ if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
+ logger.Warnw(ctx, "cannot-unmarshal-response", log.Fields{"error": err})
+ }
+ logger.Debugw(ctx, "ListDevicePorts-return", log.Fields{"device-id": deviceID, "success": success, "error": err})
+ // TODO: Need to get the real error code
+ return nil, status.Error(ICProxyErrorCodeToGrpcErrorCode(ctx, unpackResult.Code), unpackResult.Reason)
+ }
+}
+
func (ap *CoreProxy) DeviceStateUpdate(ctx context.Context, deviceId string,
connStatus voltha.ConnectStatus_Types, operStatus voltha.OperStatus_Types) error {
logger.Debugw(ctx, "DeviceStateUpdate", log.Fields{"deviceId": deviceId})
diff --git a/pkg/adapters/common/core_proxy_test.go b/pkg/adapters/common/core_proxy_test.go
index 2fb4df7..007fff4 100644
--- a/pkg/adapters/common/core_proxy_test.go
+++ b/pkg/adapters/common/core_proxy_test.go
@@ -17,6 +17,8 @@
import (
"context"
+ "testing"
+
adapterIf "github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
mocks "github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka"
@@ -25,7 +27,6 @@
"github.com/stretchr/testify/assert"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
- "testing"
)
func TestCoreProxyImplementsAdapterIfCoreProxy(t *testing.T) {
@@ -228,11 +229,10 @@
}
func TestCoreProxy_GetChildDevices_success(t *testing.T) {
-
- devicesResponse := &voltha.Devices{}
-
- devicesResponse.Items = append(devicesResponse.Items, &voltha.Device{Id: "testDevice1"})
- devicesResponse.Items = append(devicesResponse.Items, &voltha.Device{Id: "testDevice2"})
+ devicesResponse := &voltha.Devices{Items: []*voltha.Device{
+ {Id: "testDevice1"},
+ {Id: "testDevice2"},
+ }}
var mockKafkaIcProxy = mocks.MockKafkaICProxy{
InvokeRpcSpy: mocks.InvokeRpcSpy{