[VOL-2364] Add unit tests for the core_proxy package

To make CoreProxy testable, kafka.InterContainerProxy is converted from
a struct into an interface backed by the unexported interContainerProxy
implementation, and AdapterProxy/CoreProxy now accept that interface. A
MockKafkaICProxy with an InvokeRPC spy is added under pkg/mocks and used
to cover GetChildDevice lookups by serial number and onu_id as well as
the timeout and unmarshal failure paths. The version is bumped to
3.0.2-dev.
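
For reviewers, a minimal sketch of how an adapter-side test can inject
the new mock (the package name, test name, topic strings and device id
below are illustrative; the mock, constructor and the "testDevice"
response mirror the tests added in this change):

    package mytest

    import (
        "context"
        "testing"

        "github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
        "github.com/opencord/voltha-lib-go/v3/pkg/mocks"
        "github.com/stretchr/testify/assert"
    )

    func TestGetChildDeviceWithMockProxy(t *testing.T) {
        // The spy records every InvokeRPC call so the test can assert on topics and args.
        mockKafkaIcProxy := mocks.MockKafkaICProxy{
            InvokeRpcSpy: mocks.InvokeRpcSpy{Calls: make(map[int]mocks.InvokeRpcArgs)},
        }

        // NewCoreProxy now takes the kafka.InterContainerProxy interface, so the mock drops in directly.
        proxy := common.NewCoreProxy(&mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")

        device, err := proxy.GetChildDevice(context.TODO(), "parent-0001",
            map[string]interface{}{"serial_number": "TEST00000000001"})

        assert.Nil(t, err)
        assert.Equal(t, "testDevice", device.Id)
        assert.Equal(t, 1, mockKafkaIcProxy.InvokeRpcSpy.CallCount)
    }
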
Change-Id: Ifcaa986ae27280de9f16f3a9cabf45bb94c0d5d8
diff --git a/VERSION b/VERSION
index cb2b00e..7364556 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.0.1
+3.0.2-dev
diff --git a/pkg/adapters/common/adapter_proxy.go b/pkg/adapters/common/adapter_proxy.go
index b302214..02fa3de 100644
--- a/pkg/adapters/common/adapter_proxy.go
+++ b/pkg/adapters/common/adapter_proxy.go
@@ -29,12 +29,12 @@
)
type AdapterProxy struct {
- kafkaICProxy *kafka.InterContainerProxy
+ kafkaICProxy kafka.InterContainerProxy
adapterTopic string
coreTopic string
}
-func NewAdapterProxy(kafkaProxy *kafka.InterContainerProxy, adapterTopic string, coreTopic string) *AdapterProxy {
+func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string) *AdapterProxy {
var proxy AdapterProxy
proxy.kafkaICProxy = kafkaProxy
proxy.adapterTopic = adapterTopic
diff --git a/pkg/adapters/common/common_test.go b/pkg/adapters/common/common_test.go
new file mode 100644
index 0000000..d2d9f0e
--- /dev/null
+++ b/pkg/adapters/common/common_test.go
@@ -0,0 +1,54 @@
+/*
+ * Portions copyright 2019-present Open Networking Foundation
+ * Original copyright 2019-present Ciena Corporation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package common
+
+/*
+ * This file contains code shared by all test cases in this package; as a
+ * _test.go file it is not built into production binaries.
+ */
+
+import (
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
+)
+
+const (
+ /*
+	 * This sets the LogLevel of the Voltha logger. It is pinned to FatalLevel here because
+	 * we generally do not want logger output, even when running go test in verbose mode.
+	 * Some unit tests intentionally produce "Error" level messages, so those are suppressed too.
+ *
+ * If you are developing a unit test, and experiencing problems or wish additional
+ * debugging from Voltha, then changing this constant to log.DebugLevel may be
+ * useful.
+ */
+
+ VOLTHA_LOGLEVEL = log.FatalLevel
+)
+
+// Unit test initialization. This init() function will be run once for all unit tests in this package
+func init() {
+	// The logger must be configured before any test code runs
+ _, err := log.SetDefaultLogger(log.JSON, VOLTHA_LOGLEVEL, log.Fields{"instanceId": 1})
+ if err != nil {
+ panic(err)
+ }
+
+ _, err = log.AddPackage(log.JSON, VOLTHA_LOGLEVEL, nil)
+ if err != nil {
+ panic(err)
+ }
+}
diff --git a/pkg/adapters/common/core_proxy.go b/pkg/adapters/common/core_proxy.go
index 9b46c28..cf80858 100644
--- a/pkg/adapters/common/core_proxy.go
+++ b/pkg/adapters/common/core_proxy.go
@@ -30,14 +30,14 @@
)
type CoreProxy struct {
- kafkaICProxy *kafka.InterContainerProxy
+ kafkaICProxy kafka.InterContainerProxy
adapterTopic string
coreTopic string
deviceIdCoreMap map[string]string
lockDeviceIdCoreMap sync.RWMutex
}
-func NewCoreProxy(kafkaProxy *kafka.InterContainerProxy, adapterTopic string, coreTopic string) *CoreProxy {
+func NewCoreProxy(kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string) *CoreProxy {
var proxy CoreProxy
proxy.kafkaICProxy = kafkaProxy
proxy.adapterTopic = adapterTopic
diff --git a/pkg/adapters/common/core_proxy_test.go b/pkg/adapters/common/core_proxy_test.go
index b47f43e..6d2f78c 100644
--- a/pkg/adapters/common/core_proxy_test.go
+++ b/pkg/adapters/common/core_proxy_test.go
@@ -16,9 +16,16 @@
package common
import (
- "testing"
-
+ "context"
adapterIf "github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif"
+ "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
+ "github.com/opencord/voltha-lib-go/v3/pkg/mocks"
+ ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+ "github.com/stretchr/testify/assert"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+ "testing"
)
func TestCoreProxyImplementsAdapterIfCoreProxy(t *testing.T) {
@@ -29,3 +36,110 @@
}
}
+
+func TestCoreProxy_GetChildDevice_sn(t *testing.T) {
+
+ var mockKafkaIcProxy = mocks.MockKafkaICProxy{
+ InvokeRpcSpy: mocks.InvokeRpcSpy{
+ Calls: make(map[int]mocks.InvokeRpcArgs),
+ },
+ }
+
+ proxy := NewCoreProxy(&mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
+
+ kwargs := make(map[string]interface{})
+ kwargs["serial_number"] = "TEST00000000001"
+
+ parentDeviceId := "aabbcc"
+	device, err := proxy.GetChildDevice(context.TODO(), parentDeviceId, kwargs)
+
+ assert.Equal(t, mockKafkaIcProxy.InvokeRpcSpy.CallCount, 1)
+ call := mockKafkaIcProxy.InvokeRpcSpy.Calls[1]
+ assert.Equal(t, call.Rpc, "GetChildDevice")
+ assert.Equal(t, call.ToTopic, &kafka.Topic{Name: "testCoreTopic"})
+ assert.Equal(t, call.ReplyToTopic, &kafka.Topic{Name: "testAdapterTopic"})
+ assert.Equal(t, call.WaitForResponse, true)
+ assert.Equal(t, call.Key, parentDeviceId)
+ assert.Equal(t, call.KvArgs[0], &kafka.KVArg{Key: "device_id", Value: &voltha.ID{Id: parentDeviceId}})
+ assert.Equal(t, call.KvArgs[1], &kafka.KVArg{Key: "serial_number", Value: &ic.StrType{Val: kwargs["serial_number"].(string)}})
+
+ assert.Equal(t, "testDevice", device.Id)
+	assert.Nil(t, err)
+}
+
+func TestCoreProxy_GetChildDevice_id(t *testing.T) {
+
+ var mockKafkaIcProxy = mocks.MockKafkaICProxy{
+ InvokeRpcSpy: mocks.InvokeRpcSpy{
+ Calls: make(map[int]mocks.InvokeRpcArgs),
+ },
+ }
+
+ proxy := NewCoreProxy(&mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
+
+ kwargs := make(map[string]interface{})
+ kwargs["onu_id"] = uint32(1234)
+
+ parentDeviceId := "aabbcc"
+	device, err := proxy.GetChildDevice(context.TODO(), parentDeviceId, kwargs)
+
+ assert.Equal(t, mockKafkaIcProxy.InvokeRpcSpy.CallCount, 1)
+ call := mockKafkaIcProxy.InvokeRpcSpy.Calls[1]
+ assert.Equal(t, call.Rpc, "GetChildDevice")
+ assert.Equal(t, call.ToTopic, &kafka.Topic{Name: "testCoreTopic"})
+ assert.Equal(t, call.ReplyToTopic, &kafka.Topic{Name: "testAdapterTopic"})
+ assert.Equal(t, call.WaitForResponse, true)
+ assert.Equal(t, call.Key, parentDeviceId)
+ assert.Equal(t, call.KvArgs[0], &kafka.KVArg{Key: "device_id", Value: &voltha.ID{Id: parentDeviceId}})
+ assert.Equal(t, call.KvArgs[1], &kafka.KVArg{Key: "onu_id", Value: &ic.IntType{Val: int64(kwargs["onu_id"].(uint32))}})
+
+ assert.Equal(t, "testDevice", device.Id)
+	assert.Nil(t, err)
+}
+
+func TestCoreProxy_GetChildDevice_fail_timeout(t *testing.T) {
+
+ var mockKafkaIcProxy = mocks.MockKafkaICProxy{
+ InvokeRpcSpy: mocks.InvokeRpcSpy{
+ Calls: make(map[int]mocks.InvokeRpcArgs),
+ Fail: mocks.Timeout,
+ },
+ }
+
+ proxy := NewCoreProxy(&mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
+
+ kwargs := make(map[string]interface{})
+ kwargs["onu_id"] = uint32(1234)
+
+ parentDeviceId := "aabbcc"
+	device, err := proxy.GetChildDevice(context.TODO(), parentDeviceId, kwargs)
+
+ assert.Nil(t, device)
+	parsedErr, _ := status.FromError(err)
+
+ // TODO assert that the Code is not Internal but DeadlineExceeded
+ assert.Equal(t, parsedErr.Code(), codes.Internal)
+}
+
+func TestCoreProxy_GetChildDevice_fail_unmarshal(t *testing.T) {
+
+ var mockKafkaIcProxy = mocks.MockKafkaICProxy{
+ InvokeRpcSpy: mocks.InvokeRpcSpy{
+ Calls: make(map[int]mocks.InvokeRpcArgs),
+ Fail: mocks.UnmarshalError,
+ },
+ }
+
+ proxy := NewCoreProxy(&mockKafkaIcProxy, "testAdapterTopic", "testCoreTopic")
+
+ kwargs := make(map[string]interface{})
+ kwargs["onu_id"] = uint32(1234)
+
+ parentDeviceId := "aabbcc"
+	device, err := proxy.GetChildDevice(context.TODO(), parentDeviceId, kwargs)
+
+ assert.Nil(t, device)
+
+	parsedErr, _ := status.FromError(err)
+ assert.Equal(t, parsedErr.Code(), codes.InvalidArgument)
+}
diff --git a/pkg/kafka/kafka_inter_container_library.go b/pkg/kafka/kafka_inter_container_library.go
index 8285876..a75c1b6 100644
--- a/pkg/kafka/kafka_inter_container_library.go
+++ b/pkg/kafka/kafka_inter_container_library.go
@@ -60,8 +60,19 @@
ch chan *ic.InterContainerMessage
}
-// InterContainerProxy represents the messaging proxy
-type InterContainerProxy struct {
+type InterContainerProxy interface {
+ Start() error
+ Stop()
+ DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error
+ InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
+ SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error
+ SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error
+ UnSubscribeFromRequestHandler(topic Topic) error
+ DeleteTopic(topic Topic) error
+}
+
+// interContainerProxy represents the messaging proxy
+type interContainerProxy struct {
kafkaHost string
kafkaPort int
DefaultTopic *Topic
@@ -87,46 +98,46 @@
lockTransactionIdToChannelMap sync.RWMutex
}
-type InterContainerProxyOption func(*InterContainerProxy)
+type InterContainerProxyOption func(*interContainerProxy)
func InterContainerHost(host string) InterContainerProxyOption {
- return func(args *InterContainerProxy) {
+ return func(args *interContainerProxy) {
args.kafkaHost = host
}
}
func InterContainerPort(port int) InterContainerProxyOption {
- return func(args *InterContainerProxy) {
+ return func(args *interContainerProxy) {
args.kafkaPort = port
}
}
func DefaultTopic(topic *Topic) InterContainerProxyOption {
- return func(args *InterContainerProxy) {
+ return func(args *interContainerProxy) {
args.DefaultTopic = topic
}
}
func DeviceDiscoveryTopic(topic *Topic) InterContainerProxyOption {
- return func(args *InterContainerProxy) {
+ return func(args *interContainerProxy) {
args.deviceDiscoveryTopic = topic
}
}
func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
- return func(args *InterContainerProxy) {
+ return func(args *interContainerProxy) {
args.defaultRequestHandlerInterface = handler
}
}
func MsgClient(client Client) InterContainerProxyOption {
- return func(args *InterContainerProxy) {
+ return func(args *interContainerProxy) {
args.kafkaClient = client
}
}
-func NewInterContainerProxy(opts ...InterContainerProxyOption) (*InterContainerProxy, error) {
- proxy := &InterContainerProxy{
+func newInterContainerProxy(opts ...InterContainerProxyOption) (*interContainerProxy, error) {
+ proxy := &interContainerProxy{
kafkaHost: DefaultKafkaHost,
kafkaPort: DefaultKafkaPort,
}
@@ -143,7 +154,11 @@
return proxy, nil
}
-func (kp *InterContainerProxy) Start() error {
+func NewInterContainerProxy(opts ...InterContainerProxyOption) (InterContainerProxy, error) {
+ return newInterContainerProxy(opts...)
+}
+
+func (kp *interContainerProxy) Start() error {
logger.Info("Starting-Proxy")
// Kafka MsgClient should already have been created. If not, output fatal error
@@ -172,7 +187,7 @@
return nil
}
-func (kp *InterContainerProxy) Stop() {
+func (kp *interContainerProxy) Stop() {
logger.Info("stopping-intercontainer-proxy")
kp.doneCh <- 1
// TODO : Perform cleanup
@@ -183,7 +198,7 @@
}
// DeviceDiscovered publish the discovered device onto the kafka messaging bus
-func (kp *InterContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
+func (kp *interContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
logger.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
// Simple validation
if deviceId == "" || deviceType == "" {
@@ -225,7 +240,7 @@
}
// InvokeRPC is used to send a request to a given topic
-func (kp *InterContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
+func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
// If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
@@ -288,12 +303,16 @@
var err error
if responseBody, err = decodeResponse(msg); err != nil {
logger.Errorw("decode-response-error", log.Fields{"error": err})
+ // FIXME we should return something
}
return responseBody.Success, responseBody.Result
case <-ctx.Done():
logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
// pack the error as proto any type
protoError := &ic.Error{Reason: ctx.Err().Error()}
+
+ // FIXME we need to return a Code together with the reason
+ //protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: codes.DeadlineExceeded}
var marshalledArg *any.Any
if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
return false, nil // Should never happen
@@ -303,6 +322,9 @@
logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
// pack the error as proto any type
protoError := &ic.Error{Reason: childCtx.Err().Error()}
+
+ // FIXME we need to return a Code together with the reason
+ //protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: codes.DeadlineExceeded}
var marshalledArg *any.Any
if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
return false, nil // Should never happen
@@ -318,7 +340,7 @@
// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
// when a message is received on a given topic
-func (kp *InterContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
+func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
// Subscribe to receive messages for that topic
var ch <-chan *ic.InterContainerMessage
@@ -339,7 +361,7 @@
// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
// when a message is received on a given topic. So far there is only 1 target registered per microservice
-func (kp *InterContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error {
+func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error {
// Subscribe to receive messages for that topic
var ch <-chan *ic.InterContainerMessage
var err error
@@ -355,13 +377,13 @@
return nil
}
-func (kp *InterContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
+func (kp *interContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
}
// setupTopicResponseChannelMap sets up single consumers channel that will act as a broadcast channel for all
// responses from that topic.
-func (kp *InterContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ic.InterContainerMessage) {
+func (kp *interContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ic.InterContainerMessage) {
kp.lockTopicResponseChannelMap.Lock()
defer kp.lockTopicResponseChannelMap.Unlock()
if _, exist := kp.topicToResponseChannelMap[topic]; !exist {
@@ -369,14 +391,14 @@
}
}
-func (kp *InterContainerProxy) isTopicSubscribedForResponse(topic string) bool {
+func (kp *interContainerProxy) isTopicSubscribedForResponse(topic string) bool {
kp.lockTopicResponseChannelMap.RLock()
defer kp.lockTopicResponseChannelMap.RUnlock()
_, exist := kp.topicToResponseChannelMap[topic]
return exist
}
-func (kp *InterContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
+func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
kp.lockTopicResponseChannelMap.Lock()
defer kp.lockTopicResponseChannelMap.Unlock()
if _, exist := kp.topicToResponseChannelMap[topic]; exist {
@@ -392,7 +414,7 @@
}
}
-func (kp *InterContainerProxy) deleteAllTopicResponseChannelMap() error {
+func (kp *interContainerProxy) deleteAllTopicResponseChannelMap() error {
kp.lockTopicResponseChannelMap.Lock()
defer kp.lockTopicResponseChannelMap.Unlock()
var err error
@@ -406,7 +428,7 @@
return err
}
-func (kp *InterContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
+func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
kp.lockTopicRequestHandlerChannelMap.Lock()
defer kp.lockTopicRequestHandlerChannelMap.Unlock()
if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
@@ -414,7 +436,7 @@
}
}
-func (kp *InterContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
+func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
kp.lockTopicRequestHandlerChannelMap.Lock()
defer kp.lockTopicRequestHandlerChannelMap.Unlock()
if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
@@ -427,7 +449,7 @@
}
}
-func (kp *InterContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
+func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
kp.lockTopicRequestHandlerChannelMap.Lock()
defer kp.lockTopicRequestHandlerChannelMap.Unlock()
var err error
@@ -441,7 +463,7 @@
return err
}
-func (kp *InterContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
+func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
kp.lockTransactionIdToChannelMap.Lock()
defer kp.lockTransactionIdToChannelMap.Unlock()
if _, exist := kp.transactionIdToChannelMap[id]; !exist {
@@ -449,7 +471,7 @@
}
}
-func (kp *InterContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
+func (kp *interContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
kp.lockTransactionIdToChannelMap.Lock()
defer kp.lockTransactionIdToChannelMap.Unlock()
if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
@@ -459,7 +481,7 @@
}
}
-func (kp *InterContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
+func (kp *interContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
kp.lockTransactionIdToChannelMap.Lock()
defer kp.lockTransactionIdToChannelMap.Unlock()
for key, value := range kp.transactionIdToChannelMap {
@@ -470,7 +492,7 @@
}
}
-func (kp *InterContainerProxy) deleteAllTransactionIdToChannelMap() {
+func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap() {
kp.lockTransactionIdToChannelMap.Lock()
defer kp.lockTransactionIdToChannelMap.Unlock()
for key, value := range kp.transactionIdToChannelMap {
@@ -479,7 +501,7 @@
}
}
-func (kp *InterContainerProxy) DeleteTopic(topic Topic) error {
+func (kp *interContainerProxy) DeleteTopic(topic Topic) error {
// If we have any consumers on that topic we need to close them
if err := kp.deleteFromTopicResponseChannelMap(topic.Name); err != nil {
logger.Errorw("delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
@@ -598,7 +620,7 @@
return
}
-func (kp *InterContainerProxy) addTransactionId(transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
+func (kp *interContainerProxy) addTransactionId(transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
arg := &KVArg{
Key: TransactionKey,
Value: &ic.StrType{Val: transactionId},
@@ -617,7 +639,7 @@
return append(currentArgs, protoArg)
}
-func (kp *InterContainerProxy) addFromTopic(fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
+func (kp *interContainerProxy) addFromTopic(fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
var marshalledArg *any.Any
var err error
if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
@@ -631,7 +653,7 @@
return append(currentArgs, protoArg)
}
-func (kp *InterContainerProxy) handleMessage(msg *ic.InterContainerMessage, targetInterface interface{}) {
+func (kp *interContainerProxy) handleMessage(msg *ic.InterContainerMessage, targetInterface interface{}) {
// First extract the header to know whether this is a request - responses are handled by a different handler
if msg.Header.Type == ic.MessageType_REQUEST {
@@ -721,7 +743,7 @@
}
}
-func (kp *InterContainerProxy) waitForMessages(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
+func (kp *interContainerProxy) waitForMessages(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
// Wait for messages
for msg := range ch {
//logger.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
@@ -729,7 +751,7 @@
}
}
-func (kp *InterContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
+func (kp *interContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
kp.lockTransactionIdToChannelMap.RLock()
defer kp.lockTransactionIdToChannelMap.RUnlock()
if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
@@ -743,7 +765,7 @@
// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
// API. There is one response channel waiting for kafka messages before dispatching the message to the
// corresponding waiting channel
-func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
+func (kp *interContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
logger.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
// Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
@@ -754,21 +776,21 @@
return ch, nil
}
-func (kp *InterContainerProxy) unSubscribeForResponse(trnsId string) error {
+func (kp *interContainerProxy) unSubscribeForResponse(trnsId string) error {
logger.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
kp.deleteFromTransactionIdToChannelMap(trnsId)
return nil
}
-func (kp *InterContainerProxy) EnableLivenessChannel(enable bool) chan bool {
+func (kp *interContainerProxy) EnableLivenessChannel(enable bool) chan bool {
return kp.kafkaClient.EnableLivenessChannel(enable)
}
-func (kp *InterContainerProxy) EnableHealthinessChannel(enable bool) chan bool {
+func (kp *interContainerProxy) EnableHealthinessChannel(enable bool) chan bool {
return kp.kafkaClient.EnableHealthinessChannel(enable)
}
-func (kp *InterContainerProxy) SendLiveness() error {
+func (kp *interContainerProxy) SendLiveness() error {
return kp.kafkaClient.SendLiveness()
}
diff --git a/pkg/kafka/kafka_inter_container_library_test.go b/pkg/kafka/kafka_inter_container_library_test.go
index c3eace7..f9888c0 100644
--- a/pkg/kafka/kafka_inter_container_library_test.go
+++ b/pkg/kafka/kafka_inter_container_library_test.go
@@ -21,7 +21,7 @@
)
func TestDefaultKafkaProxy(t *testing.T) {
- actualResult, error := NewInterContainerProxy()
+ actualResult, error := newInterContainerProxy()
assert.Equal(t, error, nil)
assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
@@ -29,7 +29,7 @@
}
func TestKafkaProxyOptionHost(t *testing.T) {
- actualResult, error := NewInterContainerProxy(InterContainerHost("10.20.30.40"))
+ actualResult, error := newInterContainerProxy(InterContainerHost("10.20.30.40"))
assert.Equal(t, error, nil)
assert.Equal(t, actualResult.kafkaHost, "10.20.30.40")
assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
@@ -37,7 +37,7 @@
}
func TestKafkaProxyOptionPort(t *testing.T) {
- actualResult, error := NewInterContainerProxy(InterContainerPort(1020))
+ actualResult, error := newInterContainerProxy(InterContainerPort(1020))
assert.Equal(t, error, nil)
assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
assert.Equal(t, actualResult.kafkaPort, 1020)
@@ -45,7 +45,7 @@
}
func TestKafkaProxyOptionTopic(t *testing.T) {
- actualResult, error := NewInterContainerProxy(DefaultTopic(&Topic{Name: "Adapter"}))
+ actualResult, error := newInterContainerProxy(DefaultTopic(&Topic{Name: "Adapter"}))
assert.Equal(t, error, nil)
assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
@@ -61,7 +61,7 @@
func TestKafkaProxyOptionTargetInterface(t *testing.T) {
var m *myInterface
- actualResult, error := NewInterContainerProxy(RequestHandlerInterface(m))
+ actualResult, error := newInterContainerProxy(RequestHandlerInterface(m))
assert.Equal(t, error, nil)
assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
@@ -70,7 +70,7 @@
func TestKafkaProxyChangeAllOptions(t *testing.T) {
var m *myInterface
- actualResult, error := NewInterContainerProxy(
+ actualResult, error := newInterContainerProxy(
InterContainerHost("10.20.30.40"),
InterContainerPort(1020),
DefaultTopic(&Topic{Name: "Adapter"}),
@@ -88,7 +88,7 @@
// Note: This doesn't actually start the client
client := NewSaramaClient()
- probe, err := NewInterContainerProxy(
+ probe, err := newInterContainerProxy(
InterContainerHost("10.20.30.40"),
InterContainerPort(1020),
DefaultTopic(&Topic{Name: "Adapter"}),
diff --git a/pkg/mocks/kafka_inter_container_proxy.go b/pkg/mocks/kafka_inter_container_proxy.go
new file mode 100644
index 0000000..3af728a
--- /dev/null
+++ b/pkg/mocks/kafka_inter_container_proxy.go
@@ -0,0 +1,111 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package mocks
+
+import (
+ "context"
+ "github.com/golang/protobuf/ptypes"
+ "github.com/golang/protobuf/ptypes/any"
+ "github.com/opencord/voltha-lib-go/v3/pkg/kafka"
+ ic "github.com/opencord/voltha-protos/v3/go/inter_container"
+ "github.com/opencord/voltha-protos/v3/go/voltha"
+)
+
+type InvokeRpcArgs struct {
+ Rpc string
+ ToTopic *kafka.Topic
+ ReplyToTopic *kafka.Topic
+ WaitForResponse bool
+ Key string
+ ParentDeviceId string
+ KvArgs map[int]interface{}
+}
+
+type FailReason int
+
+const (
+ Timeout FailReason = iota + 1
+ UnmarshalError
+)
+
+func (r FailReason) String() string {
+	return [...]string{"Unknown", "Timeout", "UnmarshalError"}[r] // index 0 unused: FailReason values start at 1
+}
+
+type InvokeRpcSpy struct {
+ CallCount int
+ Calls map[int]InvokeRpcArgs
+	Fail      FailReason // Timeout or UnmarshalError; the zero value means the call succeeds
+}
+
+type MockKafkaICProxy struct {
+ InvokeRpcSpy InvokeRpcSpy
+}
+
+func (s *MockKafkaICProxy) Start() error { return nil }
+func (s *MockKafkaICProxy) DeleteTopic(topic kafka.Topic) error { return nil }
+func (s *MockKafkaICProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
+ return nil
+}
+func (s *MockKafkaICProxy) Stop() {}
+func (s *MockKafkaICProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic, waitForResponse bool, key string, kvArgs ...*kafka.KVArg) (bool, *any.Any) {
+ s.InvokeRpcSpy.CallCount++
+
+ success := true
+
+ args := make(map[int]interface{}, 4)
+ for k, v := range kvArgs {
+ args[k] = v
+ }
+
+ s.InvokeRpcSpy.Calls[s.InvokeRpcSpy.CallCount] = InvokeRpcArgs{
+ Rpc: rpc,
+ ToTopic: toTopic,
+ ReplyToTopic: replyToTopic,
+ WaitForResponse: waitForResponse,
+ Key: key,
+ KvArgs: args,
+ }
+
+ device := &voltha.Device{
+ Id: "testDevice",
+ }
+ response, _ := ptypes.MarshalAny(device)
+
+ if s.InvokeRpcSpy.Fail == Timeout {
+
+ success = false
+
+ // TODO once InvokeRPC is fixed to return an error code, add it here
+ err := &ic.Error{Reason: "context deadline exceeded"}
+ response, _ = ptypes.MarshalAny(err)
+ } else if s.InvokeRpcSpy.Fail == UnmarshalError {
+ res := &voltha.LogicalDevice{
+ Id: "testLogicalDevice",
+ }
+ response, _ = ptypes.MarshalAny(res)
+ }
+
+ return success, response
+}
+func (s *MockKafkaICProxy) SubscribeWithRequestHandlerInterface(topic kafka.Topic, handler interface{}) error {
+ return nil
+}
+func (s *MockKafkaICProxy) SubscribeWithDefaultRequestHandler(topic kafka.Topic, initialOffset int64) error {
+ return nil
+}
+func (s *MockKafkaICProxy) UnSubscribeFromRequestHandler(topic kafka.Topic) error { return nil }