[VOL-2364] InvokeRPC returns an error code in case of a timeout
Change-Id: Ia3725bb4778e1935cf62e5348bfcd0bd15cb9466
diff --git a/pkg/adapters/common/core_proxy.go b/pkg/adapters/common/core_proxy.go
index cf80858..c5e1c14 100644
--- a/pkg/adapters/common/core_proxy.go
+++ b/pkg/adapters/common/core_proxy.go
@@ -431,8 +431,14 @@
logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
}
logger.Debugw("GetChildDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
- // TODO: Need to get the real error code
- return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
+
+ code := codes.Internal
+
+ if unpackResult.Code == ic.ErrorCode_DEADLINE_EXCEEDED {
+ code = codes.DeadlineExceeded
+ }
+
+ return nil, status.Errorf(code, "%s", unpackResult.Reason)
}
}
@@ -467,8 +473,14 @@
logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
}
logger.Debugw("GetChildDevices-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
- // TODO: Need to get the real error code
- return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
+
+ code := codes.Internal
+
+ if unpackResult.Code == ic.ErrorCode_DEADLINE_EXCEEDED {
+ code = codes.DeadlineExceeded
+ }
+
+ return nil, status.Errorf(code, "%s", unpackResult.Reason)
}
}
diff --git a/pkg/adapters/common/core_proxy_test.go b/pkg/adapters/common/core_proxy_test.go
index 6d2f78c..149ab2e 100644
--- a/pkg/adapters/common/core_proxy_test.go
+++ b/pkg/adapters/common/core_proxy_test.go
@@ -41,7 +41,8 @@
var mockKafkaIcProxy = mocks.MockKafkaICProxy{
InvokeRpcSpy: mocks.InvokeRpcSpy{
- Calls: make(map[int]mocks.InvokeRpcArgs),
+ Calls: make(map[int]mocks.InvokeRpcArgs),
+ Response: &voltha.Device{Id: "testDevice"},
},
}
@@ -71,7 +72,8 @@
var mockKafkaIcProxy = mocks.MockKafkaICProxy{
InvokeRpcSpy: mocks.InvokeRpcSpy{
- Calls: make(map[int]mocks.InvokeRpcArgs),
+ Calls: make(map[int]mocks.InvokeRpcArgs),
+ Response: &voltha.Device{Id: "testDevice"},
},
}
@@ -101,8 +103,8 @@
var mockKafkaIcProxy = mocks.MockKafkaICProxy{
InvokeRpcSpy: mocks.InvokeRpcSpy{
- Calls: make(map[int]mocks.InvokeRpcArgs),
- Fail: mocks.Timeout,
+ Calls: make(map[int]mocks.InvokeRpcArgs),
+ Timeout: true,
},
}
@@ -117,16 +119,15 @@
assert.Nil(t, device)
parsedErr, _ := status.FromError(error)
- // TODO assert that the Code is not Internal but DeadlineExceeded
- assert.Equal(t, parsedErr.Code(), codes.Internal)
+ assert.Equal(t, parsedErr.Code(), codes.DeadlineExceeded)
}
func TestCoreProxy_GetChildDevice_fail_unmarhsal(t *testing.T) {
var mockKafkaIcProxy = mocks.MockKafkaICProxy{
InvokeRpcSpy: mocks.InvokeRpcSpy{
- Calls: make(map[int]mocks.InvokeRpcArgs),
- Fail: mocks.UnmarshalError,
+ Calls: make(map[int]mocks.InvokeRpcArgs),
+ Response: &voltha.LogicalDevice{Id: "testDevice"},
},
}
@@ -143,3 +144,75 @@
parsedErr, _ := status.FromError(error)
assert.Equal(t, parsedErr.Code(), codes.InvalidArgument)
}
+
+func TestCoreProxy_GetChildDevices_success(t *testing.T) {
+	// Checks that GetChildDevices records exactly one correctly addressed call on the InvokeRpc spy and yields both configured devices.
+	devicesResponse := &voltha.Devices{}
+	devicesResponse.Items = append(devicesResponse.Items, &voltha.Device{Id: "testDevice1"}, &voltha.Device{Id: "testDevice2"})
+
+	var mockKafkaIcProxy = mocks.MockKafkaICProxy{
+		InvokeRpcSpy: mocks.InvokeRpcSpy{
+			Calls:    make(map[int]mocks.InvokeRpcArgs),
+			Response: devicesResponse,
+		},
+	}
+	proxy := NewCoreProxy(&mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
+
+	parentDeviceId := "aabbcc"
+	devices, err := proxy.GetChildDevices(context.TODO(), parentDeviceId)
+
+	// assert.Equal takes (expected, actual); keeping that order makes failure output label the values correctly.
+	assert.Equal(t, 1, mockKafkaIcProxy.InvokeRpcSpy.CallCount)
+	call := mockKafkaIcProxy.InvokeRpcSpy.Calls[1]
+	assert.Equal(t, "GetChildDevices", call.Rpc)
+	assert.Equal(t, &kafka.Topic{Name: "testCoreTopic"}, call.ToTopic)
+	assert.Equal(t, &kafka.Topic{Name: "testAdapterTopic"}, call.ReplyToTopic)
+	assert.Equal(t, true, call.WaitForResponse)
+	assert.Equal(t, parentDeviceId, call.Key)
+	assert.Equal(t, &kafka.KVArg{Key: "device_id", Value: &voltha.ID{Id: parentDeviceId}}, call.KvArgs[0])
+
+	assert.NoError(t, err)
+	if assert.NotNil(t, devices) {
+		assert.Len(t, devices.Items, 2)
+	}
+}
+
+func TestCoreProxy_GetChildDevices_fail_unmarhsal(t *testing.T) {
+	// The mock is configured with a LogicalDevice instead of Devices; the call is expected to fail with InvalidArgument.
+	var mockKafkaIcProxy = mocks.MockKafkaICProxy{
+		InvokeRpcSpy: mocks.InvokeRpcSpy{
+			Calls:    make(map[int]mocks.InvokeRpcArgs),
+			Response: &voltha.LogicalDevice{Id: "testDevice"},
+		},
+	}
+	proxy := NewCoreProxy(&mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
+
+	parentDeviceId := "aabbcc"
+	devices, err := proxy.GetChildDevices(context.TODO(), parentDeviceId)
+
+	assert.Nil(t, devices)
+	assert.Error(t, err)
+	// ok is deliberately dropped: a nil or non-status error parses to OK/Unknown, which the Code assertion below rejects.
+	parsedErr, _ := status.FromError(err)
+	assert.Equal(t, codes.InvalidArgument, parsedErr.Code())
+}
+
+func TestCoreProxy_GetChildDevices_fail_timeout(t *testing.T) {
+	// With the spy's Timeout flag set, GetChildDevices is expected to return no devices and a DeadlineExceeded status.
+	var mockKafkaIcProxy = mocks.MockKafkaICProxy{
+		InvokeRpcSpy: mocks.InvokeRpcSpy{
+			Calls:   make(map[int]mocks.InvokeRpcArgs),
+			Timeout: true,
+		},
+	}
+	proxy := NewCoreProxy(&mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
+
+	parentDeviceId := "aabbcc"
+	devices, err := proxy.GetChildDevices(context.TODO(), parentDeviceId)
+
+	assert.Nil(t, devices)
+	assert.Error(t, err)
+	// ok is deliberately dropped: a nil or non-status error parses to OK/Unknown, which the Code assertion below rejects.
+	parsedErr, _ := status.FromError(err)
+	assert.Equal(t, codes.DeadlineExceeded, parsedErr.Code())
+}