[VOL-2364] InvokeRPC returns an error code in case of a timeout

Change-Id: Ia3725bb4778e1935cf62e5348bfcd0bd15cb9466
diff --git a/pkg/adapters/common/core_proxy.go b/pkg/adapters/common/core_proxy.go
index cf80858..c5e1c14 100644
--- a/pkg/adapters/common/core_proxy.go
+++ b/pkg/adapters/common/core_proxy.go
@@ -431,8 +431,14 @@
 			logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
 		}
 		logger.Debugw("GetChildDevice-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
-		// TODO:  Need to get the real error code
-		return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
+
+		code := codes.Internal
+
+		if unpackResult.Code == ic.ErrorCode_DEADLINE_EXCEEDED {
+			code = codes.DeadlineExceeded
+		}
+
+		return nil, status.Errorf(code, "%s", unpackResult.Reason)
 	}
 }
 
@@ -467,8 +473,14 @@
 			logger.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
 		}
 		logger.Debugw("GetChildDevices-return", log.Fields{"deviceid": parentDeviceId, "success": success, "error": err})
-		// TODO:  Need to get the real error code
-		return nil, status.Errorf(codes.Internal, "%s", unpackResult.Reason)
+
+		code := codes.Internal
+
+		if unpackResult.Code == ic.ErrorCode_DEADLINE_EXCEEDED {
+			code = codes.DeadlineExceeded
+		}
+
+		return nil, status.Errorf(code, "%s", unpackResult.Reason)
 	}
 }
 
diff --git a/pkg/adapters/common/core_proxy_test.go b/pkg/adapters/common/core_proxy_test.go
index 6d2f78c..149ab2e 100644
--- a/pkg/adapters/common/core_proxy_test.go
+++ b/pkg/adapters/common/core_proxy_test.go
@@ -41,7 +41,8 @@
 
 	var mockKafkaIcProxy = mocks.MockKafkaICProxy{
 		InvokeRpcSpy: mocks.InvokeRpcSpy{
-			Calls: make(map[int]mocks.InvokeRpcArgs),
+			Calls:    make(map[int]mocks.InvokeRpcArgs),
+			Response: &voltha.Device{Id: "testDevice"},
 		},
 	}
 
@@ -71,7 +72,8 @@
 
 	var mockKafkaIcProxy = mocks.MockKafkaICProxy{
 		InvokeRpcSpy: mocks.InvokeRpcSpy{
-			Calls: make(map[int]mocks.InvokeRpcArgs),
+			Calls:    make(map[int]mocks.InvokeRpcArgs),
+			Response: &voltha.Device{Id: "testDevice"},
 		},
 	}
 
@@ -101,8 +103,8 @@
 
 	var mockKafkaIcProxy = mocks.MockKafkaICProxy{
 		InvokeRpcSpy: mocks.InvokeRpcSpy{
-			Calls: make(map[int]mocks.InvokeRpcArgs),
-			Fail:  mocks.Timeout,
+			Calls:   make(map[int]mocks.InvokeRpcArgs),
+			Timeout: true,
 		},
 	}
 
@@ -117,16 +119,15 @@
 	assert.Nil(t, device)
 	parsedErr, _ := status.FromError(error)
 
-	// TODO assert that the Code is not Internal but DeadlineExceeded
-	assert.Equal(t, parsedErr.Code(), codes.Internal)
+	assert.Equal(t, parsedErr.Code(), codes.DeadlineExceeded)
 }
 
 func TestCoreProxy_GetChildDevice_fail_unmarhsal(t *testing.T) {
 
 	var mockKafkaIcProxy = mocks.MockKafkaICProxy{
 		InvokeRpcSpy: mocks.InvokeRpcSpy{
-			Calls: make(map[int]mocks.InvokeRpcArgs),
-			Fail:  mocks.UnmarshalError,
+			Calls:    make(map[int]mocks.InvokeRpcArgs),
+			Response: &voltha.LogicalDevice{Id: "testDevice"},
 		},
 	}
 
@@ -143,3 +144,75 @@
 	parsedErr, _ := status.FromError(error)
 	assert.Equal(t, parsedErr.Code(), codes.InvalidArgument)
 }
+
+func TestCoreProxy_GetChildDevices_success(t *testing.T) {
+
+	devicesResponse := &voltha.Devices{}
+
+	devicesResponse.Items = append(devicesResponse.Items, &voltha.Device{Id: "testDevice1"})
+	devicesResponse.Items = append(devicesResponse.Items, &voltha.Device{Id: "testDevice2"})
+
+	var mockKafkaIcProxy = mocks.MockKafkaICProxy{
+		InvokeRpcSpy: mocks.InvokeRpcSpy{
+			Calls:    make(map[int]mocks.InvokeRpcArgs),
+			Response: devicesResponse,
+		},
+	}
+
+	proxy := NewCoreProxy(&mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
+
+	parentDeviceId := "aabbcc"
+	devices, error := proxy.GetChildDevices(context.TODO(), parentDeviceId)
+
+	assert.Equal(t, mockKafkaIcProxy.InvokeRpcSpy.CallCount, 1)
+	call := mockKafkaIcProxy.InvokeRpcSpy.Calls[1]
+	assert.Equal(t, call.Rpc, "GetChildDevices")
+	assert.Equal(t, call.ToTopic, &kafka.Topic{Name: "testCoreTopic"})
+	assert.Equal(t, call.ReplyToTopic, &kafka.Topic{Name: "testAdapterTopic"})
+	assert.Equal(t, call.WaitForResponse, true)
+	assert.Equal(t, call.Key, parentDeviceId)
+	assert.Equal(t, call.KvArgs[0], &kafka.KVArg{Key: "device_id", Value: &voltha.ID{Id: parentDeviceId}})
+
+	assert.Equal(t, nil, error)
+	assert.Equal(t, 2, len(devices.Items))
+}
+
+func TestCoreProxy_GetChildDevices_fail_unmarhsal(t *testing.T) {
+
+	var mockKafkaIcProxy = mocks.MockKafkaICProxy{
+		InvokeRpcSpy: mocks.InvokeRpcSpy{
+			Calls:    make(map[int]mocks.InvokeRpcArgs),
+			Response: &voltha.LogicalDevice{Id: "testDevice"},
+		},
+	}
+
+	proxy := NewCoreProxy(&mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
+
+	parentDeviceId := "aabbcc"
+	devices, error := proxy.GetChildDevices(context.TODO(), parentDeviceId)
+
+	assert.Nil(t, devices)
+
+	parsedErr, _ := status.FromError(error)
+	assert.Equal(t, parsedErr.Code(), codes.InvalidArgument)
+}
+
+func TestCoreProxy_GetChildDevices_fail_timeout(t *testing.T) {
+
+	var mockKafkaIcProxy = mocks.MockKafkaICProxy{
+		InvokeRpcSpy: mocks.InvokeRpcSpy{
+			Calls:   make(map[int]mocks.InvokeRpcArgs),
+			Timeout: true,
+		},
+	}
+
+	proxy := NewCoreProxy(&mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
+
+	parentDeviceId := "aabbcc"
+	devices, error := proxy.GetChildDevices(context.TODO(), parentDeviceId)
+
+	assert.Nil(t, devices)
+
+	parsedErr, _ := status.FromError(error)
+	assert.Equal(t, parsedErr.Code(), codes.DeadlineExceeded)
+}
diff --git a/pkg/kafka/kafka_inter_container_library.go b/pkg/kafka/kafka_inter_container_library.go
index a75c1b6..de22dda 100644
--- a/pkg/kafka/kafka_inter_container_library.go
+++ b/pkg/kafka/kafka_inter_container_library.go
@@ -309,10 +309,8 @@
 		case <-ctx.Done():
 			logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
 			//	 pack the error as proto any type
-			protoError := &ic.Error{Reason: ctx.Err().Error()}
+			protoError := &ic.Error{Reason: ctx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
 
-			// FIXME we need to return a Code together with the reason
-			//protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: codes.DeadlineExceeded}
 			var marshalledArg *any.Any
 			if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
 				return false, nil // Should never happen
@@ -321,10 +319,8 @@
 		case <-childCtx.Done():
 			logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
 			//	 pack the error as proto any type
-			protoError := &ic.Error{Reason: childCtx.Err().Error()}
+			protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: ic.ErrorCode_DEADLINE_EXCEEDED}
 
-			// FIXME we need to return a Code together with the reason
-			//protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: codes.DeadlineExceeded}
 			var marshalledArg *any.Any
 			if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
 				return false, nil // Should never happen
diff --git a/pkg/mocks/kafka_inter_container_proxy.go b/pkg/mocks/kafka_inter_container_proxy.go
index 3af728a..c53abb4 100644
--- a/pkg/mocks/kafka_inter_container_proxy.go
+++ b/pkg/mocks/kafka_inter_container_proxy.go
@@ -18,11 +18,11 @@
 
 import (
 	"context"
+	"github.com/gogo/protobuf/proto"
 	"github.com/golang/protobuf/ptypes"
 	"github.com/golang/protobuf/ptypes/any"
 	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
 	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
-	"github.com/opencord/voltha-protos/v3/go/voltha"
 )
 
 type InvokeRpcArgs struct {
@@ -35,21 +35,11 @@
 	KvArgs          map[int]interface{}
 }
 
-type FailReason int
-
-const (
-	Timeout FailReason = iota + 1
-	UnmarshalError
-)
-
-func (r FailReason) String() string {
-	return [...]string{"Timeout", "UnmarshalError"}[r]
-}
-
 type InvokeRpcSpy struct {
 	CallCount int
 	Calls     map[int]InvokeRpcArgs
-	Fail      FailReason // timeout, error
+	Timeout   bool
+	Response  proto.Message
 }
 
 type MockKafkaICProxy struct {
@@ -81,26 +71,20 @@
 		KvArgs:          args,
 	}
 
-	device := &voltha.Device{
-		Id: "testDevice",
-	}
-	response, _ := ptypes.MarshalAny(device)
-
-	if s.InvokeRpcSpy.Fail == Timeout {
+	var response any.Any
+	if s.InvokeRpcSpy.Timeout {
 
 		success = false
 
-		// TODO once InvokeRPC is fixed to return an error code, add it here
-		err := &ic.Error{Reason: "context deadline exceeded"}
-		response, _ = ptypes.MarshalAny(err)
-	} else if s.InvokeRpcSpy.Fail == UnmarshalError {
-		res := &voltha.LogicalDevice{
-			Id: "testLogicalDevice",
-		}
-		response, _ = ptypes.MarshalAny(res)
+		err := &ic.Error{Reason: "context deadline exceeded", Code: ic.ErrorCode_DEADLINE_EXCEEDED}
+		res, _ := ptypes.MarshalAny(err)
+		response = *res
+	} else {
+		res, _ := ptypes.MarshalAny(s.InvokeRpcSpy.Response)
+		response = *res
 	}
 
-	return success, response
+	return success, &response
 }
 func (s *MockKafkaICProxy) SubscribeWithRequestHandlerInterface(topic kafka.Topic, handler interface{}) error {
 	return nil