blob: 99ba2c48068d5f518204a3aa3f326ffb7e68a1c7 [file] [log] [blame]
/*
* Copyright 2018-present Open Networking Foundation
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package kafka
import (
"context"
"github.com/golang/protobuf/ptypes"
"github.com/google/uuid"
"github.com/opencord/voltha-go/common/log"
kk "github.com/opencord/voltha-go/kafka"
ca "github.com/opencord/voltha-go/protos/core_adapter"
"github.com/opencord/voltha-go/protos/voltha"
rhp "github.com/opencord/voltha-go/rw_core/core"
"github.com/stretchr/testify/assert"
"testing"
"time"
)
/*
Prerequite: Start the kafka/zookeeper containers.
*/
var coreKafkaProxy *kk.KafkaMessagingProxy
var adapterKafkaProxy *kk.KafkaMessagingProxy
func init() {
log.AddPackage(log.JSON, log.ErrorLevel, nil)
log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
log.SetAllLogLevel(log.ErrorLevel)
coreKafkaProxy, _ = kk.NewKafkaMessagingProxy(
kk.KafkaHost("192.168.0.20"),
kk.KafkaPort(9092),
kk.DefaultTopic(&kk.Topic{Name: "Core"}))
adapterKafkaProxy, _ = kk.NewKafkaMessagingProxy(
kk.KafkaHost("192.168.0.20"),
kk.KafkaPort(9092),
kk.DefaultTopic(&kk.Topic{Name: "Adapter"}))
coreKafkaProxy.Start()
adapterKafkaProxy.Start()
subscribeTarget(coreKafkaProxy)
}
func subscribeTarget(kmp *kk.KafkaMessagingProxy) {
topic := kk.Topic{Name: "Core"}
requestProxy := &rhp.AdapterRequestHandlerProxy{TestMode: true}
kmp.SubscribeWithTarget(topic, requestProxy)
}
func waitForRPCMessage(topic kk.Topic, ch <-chan *ca.InterContainerMessage, doneCh chan string) {
for msg := range ch {
log.Debugw("Got-RPC-message", log.Fields{"msg": msg})
// Unpack message
requestBody := &ca.InterContainerRequestBody{}
if err := ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
doneCh <- "Error"
} else {
doneCh <- requestBody.Rpc
}
break
}
}
func TestSubscribeUnsubscribe(t *testing.T) {
// First subscribe to the specific topic
topic := kk.Topic{Name: "Core"}
ch, err := coreKafkaProxy.Subscribe(topic)
assert.NotNil(t, ch)
assert.Nil(t, err)
// Create a channel to receive a response
waitCh := make(chan string)
// Wait for a message
go waitForRPCMessage(topic, ch, waitCh)
// Send the message - don't care of the response
rpc := "AnyRPCRequestForTest"
adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true)
// Wait for the result on ouw own channel
result := <-waitCh
assert.Equal(t, result, rpc)
close(waitCh)
err = coreKafkaProxy.UnSubscribe(topic, ch)
assert.Nil(t, err)
}
func TestMultipleSubscribeUnsubscribe(t *testing.T) {
// First subscribe to the specific topic
//log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.DebugLevel)
var err error
var ch1 <-chan *ca.InterContainerMessage
var ch2 <-chan *ca.InterContainerMessage
topic := kk.Topic{Name: "Core"}
ch1, err = coreKafkaProxy.Subscribe(topic)
assert.NotNil(t, ch1)
assert.Nil(t, err)
// Create a channel to receive responses
waitCh := make(chan string)
ch2, err = coreKafkaProxy.Subscribe(topic)
assert.NotNil(t, ch2)
assert.Nil(t, err)
// Wait for a message
go waitForRPCMessage(topic, ch2, waitCh)
go waitForRPCMessage(topic, ch1, waitCh)
// Send the message - don't care of the response
rpc := "AnyRPCRequestForTest"
adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true)
// Wait for the result on ouw own channel
responses := 0
for msg := range waitCh {
assert.Equal(t, msg, rpc)
responses = responses + 1
if responses > 1 {
break
}
}
assert.Equal(t, responses, 2)
close(waitCh)
err = coreKafkaProxy.UnSubscribe(topic, ch1)
assert.Nil(t, err)
err = coreKafkaProxy.UnSubscribe(topic, ch2)
assert.Nil(t, err)
}
func TestIncorrectAPI(t *testing.T) {
log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.ErrorLevel)
trnsId := uuid.New().String()
protoMsg := &voltha.Device{Id: trnsId}
args := make([]*kk.KVArg, 1)
args[0] = &kk.KVArg{
Key: "device",
Value: protoMsg,
}
rpc := "IncorrectAPI"
topic := kk.Topic{Name: "Core"}
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
elapsed := time.Since(start)
log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, false)
//Unpack the result into the actual proto object
unpackResult := &ca.Error{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
}
assert.NotNil(t, unpackResult)
}
func TestIncorrectAPIParams(t *testing.T) {
trnsId := uuid.New().String()
protoMsg := &voltha.Device{Id: trnsId}
args := make([]*kk.KVArg, 1)
args[0] = &kk.KVArg{
Key: "device",
Value: protoMsg,
}
rpc := "GetDevice"
topic := kk.Topic{Name: "Core"}
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
elapsed := time.Since(start)
log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, false)
//Unpack the result into the actual proto object
unpackResult := &ca.Error{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
}
assert.NotNil(t, unpackResult)
}
func TestGetDevice(t *testing.T) {
trnsId := uuid.New().String()
protoMsg := &voltha.ID{Id:trnsId}
args := make([]*kk.KVArg, 1)
args[0] = &kk.KVArg{
Key: "deviceID",
Value: protoMsg,
}
rpc := "GetDevice"
topic := kk.Topic{Name: "Core"}
expectedResponse := &voltha.Device{Id: trnsId}
timeout := time.Duration(50) * time.Millisecond
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
elapsed := time.Since(start)
log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, true)
unpackResult := &voltha.Device{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
}
assert.Equal(t, unpackResult, expectedResponse)
}
func TestGetDeviceTimeout(t *testing.T) {
trnsId := uuid.New().String()
protoMsg := &voltha.ID{Id:trnsId}
args := make([]*kk.KVArg, 1)
args[0] = &kk.KVArg{
Key: "deviceID",
Value: protoMsg,
}
rpc := "GetDevice"
topic := kk.Topic{Name: "Core"}
timeout := time.Duration(2) * time.Millisecond
ctx, cancel := context.WithTimeout(context.Background(), timeout)
defer cancel()
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
elapsed := time.Since(start)
log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, false)
unpackResult := &ca.Error{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
}
assert.NotNil(t, unpackResult)
}
func TestGetChildDevice(t *testing.T) {
trnsId := uuid.New().String()
protoMsg := &voltha.ID{Id:trnsId}
args := make([]*kk.KVArg, 1)
args[0] = &kk.KVArg{
Key: "deviceID",
Value: protoMsg,
}
rpc := "GetChildDevice"
topic := kk.Topic{Name: "Core"}
expectedResponse := &voltha.Device{Id: trnsId}
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
elapsed := time.Since(start)
log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, true)
unpackResult := &voltha.Device{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
}
assert.Equal(t, unpackResult, expectedResponse)
}
func TestGetChildDevices(t *testing.T) {
trnsId := uuid.New().String()
protoMsg := &voltha.ID{Id:trnsId}
args := make([]*kk.KVArg, 1)
args[0] = &kk.KVArg{
Key: "deviceID",
Value: protoMsg,
}
rpc := "GetChildDevices"
topic := kk.Topic{Name: "Core"}
expectedResponse := &voltha.Device{Id: trnsId}
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
elapsed := time.Since(start)
log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, true)
unpackResult := &voltha.Device{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
}
assert.Equal(t, unpackResult, expectedResponse)
}
func TestGetPorts(t *testing.T) {
trnsId := uuid.New().String()
protoArg1 := &voltha.ID{Id:trnsId}
args := make([]*kk.KVArg, 2)
args[0] = &kk.KVArg{
Key: "deviceID",
Value: protoArg1,
}
protoArg2 := &ca.IntType{Val: 1}
args[1] = &kk.KVArg{
Key: "portType",
Value: protoArg2,
}
rpc := "GetPorts"
topic := kk.Topic{Name: "Core"}
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
elapsed := time.Since(start)
log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, true)
unpackResult := &voltha.Ports{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
}
expectedLen := len(unpackResult.Items) >= 1
assert.Equal(t, true, expectedLen)
}
func TestGetPortsMissingArgs(t *testing.T) {
trnsId := uuid.New().String()
protoArg1 := &voltha.ID{Id:trnsId}
args := make([]*kk.KVArg, 1)
args[0] = &kk.KVArg{
Key: "deviceID",
Value: protoArg1,
}
rpc := "GetPorts"
topic := kk.Topic{Name: "Core"}
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
elapsed := time.Since(start)
log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, false)
//Unpack the result into the actual proto object
unpackResult := &ca.Error{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
}
assert.NotNil(t, unpackResult)
}
func TestChildDeviceDetected(t *testing.T) {
trnsId := uuid.New().String()
protoArg1 := &ca.StrType{Val: trnsId}
args := make([]*kk.KVArg, 5)
args[0] = &kk.KVArg{
Key: "deviceID",
Value: protoArg1,
}
protoArg2 := &ca.IntType{Val: 1}
args[1] = &kk.KVArg{
Key: "parentPortNo",
Value: protoArg2,
}
protoArg3 := &ca.StrType{Val: "great_onu"}
args[2] = &kk.KVArg{
Key: "childDeviceType",
Value: protoArg3,
}
protoArg4 := &voltha.Device_ProxyAddress{DeviceId: trnsId, ChannelId: 100}
args[3] = &kk.KVArg{
Key: "proxyAddress",
Value: protoArg4,
}
protoArg5 := &ca.IntType{Val: 1}
args[4] = &kk.KVArg{
Key: "portType",
Value: protoArg5,
}
rpc := "ChildDeviceDetected"
topic := kk.Topic{Name: "Core"}
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
elapsed := time.Since(start)
log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, true)
assert.Nil(t, result)
}
func TestChildDeviceDetectedNoWait(t *testing.T) {
trnsId := uuid.New().String()
protoArg1 := &ca.StrType{Val: trnsId}
args := make([]*kk.KVArg, 5)
args[0] = &kk.KVArg{
Key: "deviceID",
Value: protoArg1,
}
protoArg2 := &ca.IntType{Val: 1}
args[1] = &kk.KVArg{
Key: "parentPortNo",
Value: protoArg2,
}
protoArg3 := &ca.StrType{Val: "great_onu"}
args[2] = &kk.KVArg{
Key: "childDeviceType",
Value: protoArg3,
}
protoArg4 := &voltha.Device_ProxyAddress{DeviceId: trnsId, ChannelId: 100}
args[3] = &kk.KVArg{
Key: "proxyAddress",
Value: protoArg4,
}
protoArg5 := &ca.IntType{Val: 1}
args[4] = &kk.KVArg{
Key: "portType",
Value: protoArg5,
}
rpc := "ChildDeviceDetected"
topic := kk.Topic{Name: "Core"}
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, false, args...)
elapsed := time.Since(start)
log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, true)
assert.Nil(t, result)
}
func TestChildDeviceDetectedMissingArgs(t *testing.T) {
trnsId := uuid.New().String()
protoArg1 := &ca.StrType{Val: trnsId}
args := make([]*kk.KVArg, 4)
args[0] = &kk.KVArg{
Key: "deviceID",
Value: protoArg1,
}
protoArg2 := &ca.IntType{Val: 1}
args[1] = &kk.KVArg{
Key: "parentPortNo",
Value: protoArg2,
}
protoArg3 := &ca.StrType{Val: "great_onu"}
args[2] = &kk.KVArg{
Key: "childDeviceType",
Value: protoArg3,
}
rpc := "ChildDeviceDetected"
topic := kk.Topic{Name: "Core"}
start := time.Now()
status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
elapsed := time.Since(start)
log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
assert.Equal(t, status, false)
unpackResult := &ca.Error{}
if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
}
assert.NotNil(t, unpackResult)
}
func TestStopKafkaProxy(t *testing.T) {
adapterKafkaProxy.Stop()
coreKafkaProxy.Stop()
}
//func TestMain(m *testing.T) {
// log.Info("Main")
//}