[VOL-2364] InvokeRPC returns an error code in case of a timeout
Change-Id: Ia3725bb4778e1935cf62e5348bfcd0bd15cb9466
diff --git a/pkg/adapters/common/core_proxy.go b/pkg/adapters/common/core_proxy.go
index cf80858..c5e1c14 100644
--- a/pkg/adapters/common/core_proxy.go
+++ b/pkg/adapters/common/core_proxy.go
@@ -431,8 +431,14 @@
logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
}
logger.Debugw("GetChildDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
- // TODO: Need to get the real error code
- return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
+
+ code := codes.Internal
+
+ if unpackResult.Code == ic.ErrorCode_DEADLINE_EXCEEDED {
+ code = codes.DeadlineExceeded
+ }
+
+ return nil, status.Errorf(code, "%s", unpackResult.Reason)
}
}
@@ -467,8 +473,14 @@
logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
}
logger.Debugw("GetChildDevices-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
- // TODO: Need to get the real error code
- return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
+
+ code := codes.Internal
+
+ if unpackResult.Code == ic.ErrorCode_DEADLINE_EXCEEDED {
+ code = codes.DeadlineExceeded
+ }
+
+ return nil, status.Errorf(code, "%s", unpackResult.Reason)
}
}
diff --git a/pkg/adapters/common/core_proxy_test.go b/pkg/adapters/common/core_proxy_test.go
index 6d2f78c..149ab2e 100644
--- a/pkg/adapters/common/core_proxy_test.go
+++ b/pkg/adapters/common/core_proxy_test.go
@@ -41,7 +41,8 @@
var mockKafkaIcProxy = mocks.MockKafkaICProxy{
InvokeRpcSpy: mocks.InvokeRpcSpy{
- Calls: make(map[int]mocks.InvokeRpcArgs),
+ Calls: make(map[int]mocks.InvokeRpcArgs),
+ Response: &voltha.Device{Id: "testDevice"},
},
}
@@ -71,7 +72,8 @@
var mockKafkaIcProxy = mocks.MockKafkaICProxy{
InvokeRpcSpy: mocks.InvokeRpcSpy{
- Calls: make(map[int]mocks.InvokeRpcArgs),
+ Calls: make(map[int]mocks.InvokeRpcArgs),
+ Response: &voltha.Device{Id: "testDevice"},
},
}
@@ -101,8 +103,8 @@
var mockKafkaIcProxy = mocks.MockKafkaICProxy{
InvokeRpcSpy: mocks.InvokeRpcSpy{
- Calls: make(map[int]mocks.InvokeRpcArgs),
- Fail: mocks.Timeout,
+ Calls: make(map[int]mocks.InvokeRpcArgs),
+ Timeout: true,
},
}
@@ -117,16 +119,15 @@
assert.Nil(t, device)
parsedErr, _ := status.FromError(error)
- // TODO assert that the Code is not Internal but DeadlineExceeded
- assert.Equal(t, parsedErr.Code(), codes.Internal)
+ assert.Equal(t, parsedErr.Code(), codes.DeadlineExceeded)
}
func TestCoreProxy_GetChildDevice_fail_unmarhsal(t *testing.T) {
var mockKafkaIcProxy = mocks.MockKafkaICProxy{
InvokeRpcSpy: mocks.InvokeRpcSpy{
- Calls: make(map[int]mocks.InvokeRpcArgs),
- Fail: mocks.UnmarshalError,
+ Calls: make(map[int]mocks.InvokeRpcArgs),
+ Response: &voltha.LogicalDevice{Id: "testDevice"},
},
}
@@ -143,3 +144,75 @@
parsedErr, _ := status.FromError(error)
assert.Equal(t, parsedErr.Code(), codes.InvalidArgument)
}
+
+func TestCoreProxy_GetChildDevices_success(t *testing.T) {
+
+ devicesResponse := &voltha.Devices{}
+
+ devicesResponse.Items = append(devicesResponse.Items, &voltha.Device{Id: "testDevice1"})
+ devicesResponse.Items = append(devicesResponse.Items, &voltha.Device{Id: "testDevice2"})
+
+ var mockKafkaIcProxy = mocks.MockKafkaICProxy{
+ InvokeRpcSpy: mocks.InvokeRpcSpy{
+ Calls: make(map[int]mocks.InvokeRpcArgs),
+ Response: devicesResponse,
+ },
+ }
+
+ proxy := NewCoreProxy(&mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
+
+ parentDeviceId := "aabbcc"
+ devices, error := proxy.GetChildDevices(context.TODO(), parentDeviceId)
+
+ assert.Equal(t, mockKafkaIcProxy.InvokeRpcSpy.CallCount, 1)
+ call := mockKafkaIcProxy.InvokeRpcSpy.Calls[1]
+ assert.Equal(t, call.Rpc, "GetChildDevices")
+ assert.Equal(t, call.ToTopic, &kafka.Topic{Name: "testCoreTopic"})
+ assert.Equal(t, call.ReplyToTopic, &kafka.Topic{Name: "testAdapterTopic"})
+ assert.Equal(t, call.WaitForResponse, true)
+ assert.Equal(t, call.Key, parentDeviceId)
+ assert.Equal(t, call.KvArgs[0], &kafka.KVArg{Key: "device_id", Value: &voltha.ID{Id: parentDeviceId}})
+
+ assert.Equal(t, nil, error)
+ assert.Equal(t, 2, len(devices.Items))
+}
+
+func TestCoreProxy_GetChildDevices_fail_unmarhsal(t *testing.T) {
+
+ var mockKafkaIcProxy = mocks.MockKafkaICProxy{
+ InvokeRpcSpy: mocks.InvokeRpcSpy{
+ Calls: make(map[int]mocks.InvokeRpcArgs),
+ Response: &voltha.LogicalDevice{Id: "testDevice"},
+ },
+ }
+
+ proxy := NewCoreProxy(&mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
+
+ parentDeviceId := "aabbcc"
+ devices, error := proxy.GetChildDevices(context.TODO(), parentDeviceId)
+
+ assert.Nil(t, devices)
+
+ parsedErr, _ := status.FromError(error)
+ assert.Equal(t, parsedErr.Code(), codes.InvalidArgument)
+}
+
+func TestCoreProxy_GetChildDevices_fail_timeout(t *testing.T) {
+
+ var mockKafkaIcProxy = mocks.MockKafkaICProxy{
+ InvokeRpcSpy: mocks.InvokeRpcSpy{
+ Calls: make(map[int]mocks.InvokeRpcArgs),
+ Timeout: true,
+ },
+ }
+
+ proxy := NewCoreProxy(&mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
+
+ parentDeviceId := "aabbcc"
+ devices, error := proxy.GetChildDevices(context.TODO(), parentDeviceId)
+
+ assert.Nil(t, devices)
+
+ parsedErr, _ := status.FromError(error)
+ assert.Equal(t, parsedErr.Code(), codes.DeadlineExceeded)
+}
diff --git a/pkg/kafka/kafka_inter_container_library.go b/pkg/kafka/kafka_inter_container_library.go
index a75c1b6..de22dda 100644
--- a/pkg/kafka/kafka_inter_container_library.go
+++ b/pkg/kafka/kafka_inter_container_library.go
@@ -309,10 +309,8 @@
case <-ctx.Done():
logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
// pack the error as proto any type
- protoError := &ic.Error{Reason: ctx.Err().Error()}
+ protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
- // FIXME we need to return a Code together with the reason
- //protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: codes.DeadlineExceeded}
var marshalledArg *any.Any
if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
return false, nil // Should never happen
@@ -321,10 +319,8 @@
case <-childCtx.Done():
logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
// pack the error as proto any type
- protoError := &ic.Error{Reason: childCtx.Err().Error()}
+ protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
- // FIXME we need to return a Code together with the reason
- //protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: codes.DeadlineExceeded}
var marshalledArg *any.Any
if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
return false, nil // Should never happen
diff --git a/pkg/mocks/kafka_inter_container_proxy.go b/pkg/mocks/kafka_inter_container_proxy.go
index 3af728a..c53abb4 100644
--- a/pkg/mocks/kafka_inter_container_proxy.go
+++ b/pkg/mocks/kafka_inter_container_proxy.go
@@ -18,11 +18,11 @@
import (
"context"
+ "github.com/gogo/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
ic "github.com/opencord/voltha-protos/v3/go/inter_container"
- "github.com/opencord/voltha-protos/v3/go/voltha"
)
type InvokeRpcArgs struct {
@@ -35,21 +35,11 @@
KvArgs map[int]interface{}
}
-type FailReason int
-
-const (
- Timeout FailReason = iota + 1
- UnmarshalError
-)
-
-func (r FailReason) String() string {
- return [...]string{"Timeout", "UnmarshalError"}[r]
-}
-
type InvokeRpcSpy struct {
CallCount int
Calls map[int]InvokeRpcArgs
- Fail FailReason // timeout, error
+ Timeout bool
+ Response proto.Message
}
type MockKafkaICProxy struct {
@@ -81,26 +71,20 @@
KvArgs: args,
}
- device := &voltha.Device{
- Id: "testDevice",
- }
- response, _ := ptypes.MarshalAny(device)
-
- if s.InvokeRpcSpy.Fail == Timeout {
+ var response any.Any
+ if s.InvokeRpcSpy.Timeout {
success = false
- // TODO once InvokeRPC is fixed to return an error code, add it here
- err := &ic.Error{Reason: "context deadline exceeded"}
- response, _ = ptypes.MarshalAny(err)
- } else if s.InvokeRpcSpy.Fail == UnmarshalError {
- res := &voltha.LogicalDevice{
- Id: "testLogicalDevice",
- }
- response, _ = ptypes.MarshalAny(res)
+ err := &ic.Error{Reason: "context deadline exceeded", Code: ic.ErrorCode_DEADLINE_EXCEEDED}
+ res, _ := ptypes.MarshalAny(err)
+ response = *res
+ } else {
+ res, _ := ptypes.MarshalAny(s.InvokeRpcSpy.Response)
+ response = *res
}
- return success, response
+ return success, &response
}
func (s *MockKafkaICProxy) SubscribeWithRequestHandlerInterface(topic kafka.Topic, handler interface{}) error {
return nil