[VOL-2364] Adding unit test in the core_proxy package
Change-Id: Ifcaa986ae27280de9f16f3a9cabf45bb94c0d5d8
diff --git a/pkg/kafka/kafka_inter_container_library.go b/pkg/kafka/kafka_inter_container_library.go
index 8285876..a75c1b6 100644
--- a/pkg/kafka/kafka_inter_container_library.go
+++ b/pkg/kafka/kafka_inter_container_library.go
@@ -60,8 +60,19 @@
ch chan *ic.InterContainerMessage
}
-// InterContainerProxy represents the messaging proxy
-type InterContainerProxy struct {
+type InterContainerProxy interface {
+ Start() error
+ Stop()
+ DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error
+ InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
+ SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error
+ SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error
+ UnSubscribeFromRequestHandler(topic Topic) error
+ DeleteTopic(topic Topic) error
+}
+
+// interContainerProxy represents the messaging proxy
+type interContainerProxy struct {
kafkaHost string
kafkaPort int
DefaultTopic *Topic
@@ -87,46 +98,46 @@
lockTransactionIdToChannelMap sync.RWMutex
}
-type InterContainerProxyOption func(*InterContainerProxy)
+type InterContainerProxyOption func(*interContainerProxy)
func InterContainerHost(host string) InterContainerProxyOption {
- return func(args *InterContainerProxy) {
+ return func(args *interContainerProxy) {
args.kafkaHost = host
}
}
func InterContainerPort(port int) InterContainerProxyOption {
- return func(args *InterContainerProxy) {
+ return func(args *interContainerProxy) {
args.kafkaPort = port
}
}
func DefaultTopic(topic *Topic) InterContainerProxyOption {
- return func(args *InterContainerProxy) {
+ return func(args *interContainerProxy) {
args.DefaultTopic = topic
}
}
func DeviceDiscoveryTopic(topic *Topic) InterContainerProxyOption {
- return func(args *InterContainerProxy) {
+ return func(args *interContainerProxy) {
args.deviceDiscoveryTopic = topic
}
}
func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
- return func(args *InterContainerProxy) {
+ return func(args *interContainerProxy) {
args.defaultRequestHandlerInterface = handler
}
}
func MsgClient(client Client) InterContainerProxyOption {
- return func(args *InterContainerProxy) {
+ return func(args *interContainerProxy) {
args.kafkaClient = client
}
}
-func NewInterContainerProxy(opts ...InterContainerProxyOption) (*InterContainerProxy, error) {
- proxy := &InterContainerProxy{
+func newInterContainerProxy(opts ...InterContainerProxyOption) (*interContainerProxy, error) {
+ proxy := &interContainerProxy{
kafkaHost: DefaultKafkaHost,
kafkaPort: DefaultKafkaPort,
}
@@ -143,7 +154,11 @@
return proxy, nil
}
-func (kp *InterContainerProxy) Start() error {
+func NewInterContainerProxy(opts ...InterContainerProxyOption) (InterContainerProxy, error) {
+ return newInterContainerProxy(opts...)
+}
+
+func (kp *interContainerProxy) Start() error {
logger.Info("Starting-Proxy")
// Kafka MsgClient should already have been created. If not, output fatal error
@@ -172,7 +187,7 @@
return nil
}
-func (kp *InterContainerProxy) Stop() {
+func (kp *interContainerProxy) Stop() {
logger.Info("stopping-intercontainer-proxy")
kp.doneCh <- 1
// TODO : Perform cleanup
@@ -183,7 +198,7 @@
}
// DeviceDiscovered publish the discovered device onto the kafka messaging bus
-func (kp *InterContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
+func (kp *interContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
logger.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
// Simple validation
if deviceId == "" || deviceType == "" {
@@ -225,7 +240,7 @@
}
// InvokeRPC is used to send a request to a given topic
-func (kp *InterContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
+func (kp *interContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any) {
// If a replyToTopic is provided then we use it, otherwise just use the default toTopic. The replyToTopic is
@@ -288,12 +303,16 @@
var err error
if responseBody, err = decodeResponse(msg); err != nil {
logger.Errorw("decode-response-error", log.Fields{"error": err})
+ // FIXME we should return something
}
return responseBody.Success, responseBody.Result
case <-ctx.Done():
logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
// pack the error as proto any type
protoError := &ic.Error{Reason: ctx.Err().Error()}
+
+ // FIXME we need to return a Code together with the reason
+ //protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: codes.DeadlineExceeded}
var marshalledArg *any.Any
if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
return false, nil // Should never happen
@@ -303,6 +322,9 @@
logger.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
// pack the error as proto any type
protoError := &ic.Error{Reason: childCtx.Err().Error()}
+
+ // FIXME we need to return a Code together with the reason
+ //protoError := &ic.Error{Reason: childCtx.Err().Error(), Code: codes.DeadlineExceeded}
var marshalledArg *any.Any
if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
return false, nil // Should never happen
@@ -318,7 +340,7 @@
// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
// when a message is received on a given topic
-func (kp *InterContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
+func (kp *interContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
// Subscribe to receive messages for that topic
var ch <-chan *ic.InterContainerMessage
@@ -339,7 +361,7 @@
// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
// when a message is received on a given topic. So far there is only 1 target registered per microservice
-func (kp *InterContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error {
+func (kp *interContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic, initialOffset int64) error {
// Subscribe to receive messages for that topic
var ch <-chan *ic.InterContainerMessage
var err error
@@ -355,13 +377,13 @@
return nil
}
-func (kp *InterContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
+func (kp *interContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
}
// setupTopicResponseChannelMap sets up single consumers channel that will act as a broadcast channel for all
// responses from that topic.
-func (kp *InterContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ic.InterContainerMessage) {
+func (kp *interContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ic.InterContainerMessage) {
kp.lockTopicResponseChannelMap.Lock()
defer kp.lockTopicResponseChannelMap.Unlock()
if _, exist := kp.topicToResponseChannelMap[topic]; !exist {
@@ -369,14 +391,14 @@
}
}
-func (kp *InterContainerProxy) isTopicSubscribedForResponse(topic string) bool {
+func (kp *interContainerProxy) isTopicSubscribedForResponse(topic string) bool {
kp.lockTopicResponseChannelMap.RLock()
defer kp.lockTopicResponseChannelMap.RUnlock()
_, exist := kp.topicToResponseChannelMap[topic]
return exist
}
-func (kp *InterContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
+func (kp *interContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
kp.lockTopicResponseChannelMap.Lock()
defer kp.lockTopicResponseChannelMap.Unlock()
if _, exist := kp.topicToResponseChannelMap[topic]; exist {
@@ -392,7 +414,7 @@
}
}
-func (kp *InterContainerProxy) deleteAllTopicResponseChannelMap() error {
+func (kp *interContainerProxy) deleteAllTopicResponseChannelMap() error {
kp.lockTopicResponseChannelMap.Lock()
defer kp.lockTopicResponseChannelMap.Unlock()
var err error
@@ -406,7 +428,7 @@
return err
}
-func (kp *InterContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
+func (kp *interContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
kp.lockTopicRequestHandlerChannelMap.Lock()
defer kp.lockTopicRequestHandlerChannelMap.Unlock()
if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
@@ -414,7 +436,7 @@
}
}
-func (kp *InterContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
+func (kp *interContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
kp.lockTopicRequestHandlerChannelMap.Lock()
defer kp.lockTopicRequestHandlerChannelMap.Unlock()
if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
@@ -427,7 +449,7 @@
}
}
-func (kp *InterContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
+func (kp *interContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
kp.lockTopicRequestHandlerChannelMap.Lock()
defer kp.lockTopicRequestHandlerChannelMap.Unlock()
var err error
@@ -441,7 +463,7 @@
return err
}
-func (kp *InterContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
+func (kp *interContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
kp.lockTransactionIdToChannelMap.Lock()
defer kp.lockTransactionIdToChannelMap.Unlock()
if _, exist := kp.transactionIdToChannelMap[id]; !exist {
@@ -449,7 +471,7 @@
}
}
-func (kp *InterContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
+func (kp *interContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
kp.lockTransactionIdToChannelMap.Lock()
defer kp.lockTransactionIdToChannelMap.Unlock()
if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
@@ -459,7 +481,7 @@
}
}
-func (kp *InterContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
+func (kp *interContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
kp.lockTransactionIdToChannelMap.Lock()
defer kp.lockTransactionIdToChannelMap.Unlock()
for key, value := range kp.transactionIdToChannelMap {
@@ -470,7 +492,7 @@
}
}
-func (kp *InterContainerProxy) deleteAllTransactionIdToChannelMap() {
+func (kp *interContainerProxy) deleteAllTransactionIdToChannelMap() {
kp.lockTransactionIdToChannelMap.Lock()
defer kp.lockTransactionIdToChannelMap.Unlock()
for key, value := range kp.transactionIdToChannelMap {
@@ -479,7 +501,7 @@
}
}
-func (kp *InterContainerProxy) DeleteTopic(topic Topic) error {
+func (kp *interContainerProxy) DeleteTopic(topic Topic) error {
// If we have any consumers on that topic we need to close them
if err := kp.deleteFromTopicResponseChannelMap(topic.Name); err != nil {
logger.Errorw("delete-from-topic-responsechannelmap-failed", log.Fields{"error": err})
@@ -598,7 +620,7 @@
return
}
-func (kp *InterContainerProxy) addTransactionId(transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
+func (kp *interContainerProxy) addTransactionId(transactionId string, currentArgs []*ic.Argument) []*ic.Argument {
arg := &KVArg{
Key: TransactionKey,
Value: &ic.StrType{Val: transactionId},
@@ -617,7 +639,7 @@
return append(currentArgs, protoArg)
}
-func (kp *InterContainerProxy) addFromTopic(fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
+func (kp *interContainerProxy) addFromTopic(fromTopic string, currentArgs []*ic.Argument) []*ic.Argument {
var marshalledArg *any.Any
var err error
if marshalledArg, err = ptypes.MarshalAny(&ic.StrType{Val: fromTopic}); err != nil {
@@ -631,7 +653,7 @@
return append(currentArgs, protoArg)
}
-func (kp *InterContainerProxy) handleMessage(msg *ic.InterContainerMessage, targetInterface interface{}) {
+func (kp *interContainerProxy) handleMessage(msg *ic.InterContainerMessage, targetInterface interface{}) {
// First extract the header to know whether this is a request - responses are handled by a different handler
if msg.Header.Type == ic.MessageType_REQUEST {
@@ -721,7 +743,7 @@
}
}
-func (kp *InterContainerProxy) waitForMessages(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
+func (kp *interContainerProxy) waitForMessages(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
// Wait for messages
for msg := range ch {
//logger.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
@@ -729,7 +751,7 @@
}
}
-func (kp *InterContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
+func (kp *interContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
kp.lockTransactionIdToChannelMap.RLock()
defer kp.lockTransactionIdToChannelMap.RUnlock()
if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
@@ -743,7 +765,7 @@
// This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
// API. There is one response channel waiting for kafka messages before dispatching the message to the
// corresponding waiting channel
-func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
+func (kp *interContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
logger.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
// Create a specific channel for this consumers. We cannot use the channel from the kafkaclient as it will
@@ -754,21 +776,21 @@
return ch, nil
}
-func (kp *InterContainerProxy) unSubscribeForResponse(trnsId string) error {
+func (kp *interContainerProxy) unSubscribeForResponse(trnsId string) error {
logger.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
kp.deleteFromTransactionIdToChannelMap(trnsId)
return nil
}
-func (kp *InterContainerProxy) EnableLivenessChannel(enable bool) chan bool {
+func (kp *interContainerProxy) EnableLivenessChannel(enable bool) chan bool {
return kp.kafkaClient.EnableLivenessChannel(enable)
}
-func (kp *InterContainerProxy) EnableHealthinessChannel(enable bool) chan bool {
+func (kp *interContainerProxy) EnableHealthinessChannel(enable bool) chan bool {
return kp.kafkaClient.EnableHealthinessChannel(enable)
}
-func (kp *InterContainerProxy) SendLiveness() error {
+func (kp *interContainerProxy) SendLiveness() error {
return kp.kafkaClient.SendLiveness()
}
diff --git a/pkg/kafka/kafka_inter_container_library_test.go b/pkg/kafka/kafka_inter_container_library_test.go
index c3eace7..f9888c0 100644
--- a/pkg/kafka/kafka_inter_container_library_test.go
+++ b/pkg/kafka/kafka_inter_container_library_test.go
@@ -21,7 +21,7 @@
)
func TestDefaultKafkaProxy(t *testing.T) {
- actualResult, error := NewInterContainerProxy()
+ actualResult, error := newInterContainerProxy()
assert.Equal(t, error, nil)
assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
@@ -29,7 +29,7 @@
}
func TestKafkaProxyOptionHost(t *testing.T) {
- actualResult, error := NewInterContainerProxy(InterContainerHost("10.20.30.40"))
+ actualResult, error := newInterContainerProxy(InterContainerHost("10.20.30.40"))
assert.Equal(t, error, nil)
assert.Equal(t, actualResult.kafkaHost, "10.20.30.40")
assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
@@ -37,7 +37,7 @@
}
func TestKafkaProxyOptionPort(t *testing.T) {
- actualResult, error := NewInterContainerProxy(InterContainerPort(1020))
+ actualResult, error := newInterContainerProxy(InterContainerPort(1020))
assert.Equal(t, error, nil)
assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
assert.Equal(t, actualResult.kafkaPort, 1020)
@@ -45,7 +45,7 @@
}
func TestKafkaProxyOptionTopic(t *testing.T) {
- actualResult, error := NewInterContainerProxy(DefaultTopic(&Topic{Name: "Adapter"}))
+ actualResult, error := newInterContainerProxy(DefaultTopic(&Topic{Name: "Adapter"}))
assert.Equal(t, error, nil)
assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
@@ -61,7 +61,7 @@
func TestKafkaProxyOptionTargetInterface(t *testing.T) {
var m *myInterface
- actualResult, error := NewInterContainerProxy(RequestHandlerInterface(m))
+ actualResult, error := newInterContainerProxy(RequestHandlerInterface(m))
assert.Equal(t, error, nil)
assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
@@ -70,7 +70,7 @@
func TestKafkaProxyChangeAllOptions(t *testing.T) {
var m *myInterface
- actualResult, error := NewInterContainerProxy(
+ actualResult, error := newInterContainerProxy(
InterContainerHost("10.20.30.40"),
InterContainerPort(1020),
DefaultTopic(&Topic{Name: "Adapter"}),
@@ -88,7 +88,7 @@
// Note: This doesn't actually start the client
client := NewSaramaClient()
- probe, err := NewInterContainerProxy(
+ probe, err := newInterContainerProxy(
InterContainerHost("10.20.30.40"),
InterContainerPort(1020),
DefaultTopic(&Topic{Name: "Adapter"}),