[VOL-1024] This update consists of:
1) Inter-container Kafka library in Go
2) Initial set of proto definitions
3) Test cases for the Kafka library
Change-Id: Ibe8b0f673a90bbe4cb92847ce40f31ec2d0b6244
diff --git a/tests/kafka/kafka_inter_container_messaging_test.go b/tests/kafka/kafka_inter_container_messaging_test.go
new file mode 100644
index 0000000..9a73b18
--- /dev/null
+++ b/tests/kafka/kafka_inter_container_messaging_test.go
@@ -0,0 +1,437 @@
+package kafka
+
+import (
+ "context"
+ "github.com/golang/protobuf/ptypes"
+ "github.com/opencord/voltha-go/common/log"
+ kk "github.com/opencord/voltha-go/kafka"
+ ca "github.com/opencord/voltha-go/protos/core_adapter"
+ "github.com/opencord/voltha-go/protos/voltha"
+ rhp "github.com/opencord/voltha-go/rw_core/core"
+ "github.com/stretchr/testify/assert"
+ "testing"
+ //"time"
+ "github.com/google/uuid"
+ "time"
+)
+
+// Messaging proxies shared by every test in this file. init() creates and
+// starts both: coreKafkaProxy with "Core" as its default topic and
+// adapterKafkaProxy with "Adapter" as its default topic.
+var (
+	coreKafkaProxy    *kk.KafkaMessagingProxy
+	adapterKafkaProxy *kk.KafkaMessagingProxy
+)
+
+// init configures JSON logging and brings up the two Kafka proxies shared by
+// all tests in this file: one whose default topic is "Core" and one whose
+// default topic is "Adapter". Both proxies are started and a test-mode request
+// handler is attached to the core proxy through subscribeTarget.
+//
+// A proxy construction error is reported through the logger's Fatal call (the
+// same handling used for a logging setup failure) instead of being discarded,
+// which previously left a nil proxy to be dereferenced by Start().
+func init() {
+	if _, err := log.SetLogger(log.JSON, 1, log.Fields{"instanceId": "testing"}); err != nil {
+		log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+	}
+
+	// Temporaries are used on purpose: `coreKafkaProxy, err := ...` would
+	// shadow the package-level variable instead of assigning to it.
+	core, err := kk.NewKafkaMessagingProxy(
+		kk.KafkaHost("10.100.198.220"),
+		kk.KafkaPort(9092),
+		kk.DefaultTopic(&kk.Topic{Name: "Core"}))
+	if err != nil {
+		log.With(log.Fields{"error": err}).Fatal("Cannot create core kafka proxy")
+	}
+	coreKafkaProxy = core
+
+	adapter, err := kk.NewKafkaMessagingProxy(
+		kk.KafkaHost("10.100.198.220"),
+		kk.KafkaPort(9092),
+		kk.DefaultTopic(&kk.Topic{Name: "Adapter"}))
+	if err != nil {
+		log.With(log.Fields{"error": err}).Fatal("Cannot create adapter kafka proxy")
+	}
+	adapterKafkaProxy = adapter
+
+	coreKafkaProxy.Start()
+	adapterKafkaProxy.Start()
+	subscribeTarget(coreKafkaProxy)
+}
+
+// subscribeTarget registers a RequestHandlerProxy created in test mode as the
+// target for the "Core" topic on the supplied proxy.
+func subscribeTarget(kmp *kk.KafkaMessagingProxy) {
+	handler := &rhp.RequestHandlerProxy{TestMode: true}
+	kmp.SubscribeWithTarget(kk.Topic{Name: "Core"}, handler)
+}
+
+// waitForRPCMessage receives a single message from ch, decodes its body as an
+// InterContainerRequestBody and reports the outcome on doneCh: the RPC name on
+// success, or the literal "Error" when the body cannot be unmarshalled.
+//
+// If ch is closed before any message arrives the function returns without
+// writing to doneCh. The topic parameter is not used.
+func waitForRPCMessage(topic kk.Topic, ch <-chan *ca.InterContainerMessage, doneCh chan string) {
+	msg, open := <-ch
+	if !open {
+		return
+	}
+	log.Debugw("Got-RPC-message", log.Fields{"msg": msg})
+
+	// Unpack the request carried in the message body.
+	body := &ca.InterContainerRequestBody{}
+	if err := ptypes.UnmarshalAny(msg.Body, body); err != nil {
+		doneCh <- "Error"
+		return
+	}
+	doneCh <- body.Rpc
+}
+
+// TestSubscribeUnsubscribe subscribes to the "Core" topic on the core proxy,
+// sends one RPC from the adapter proxy, checks that the subscriber observes
+// that RPC name, and finally unsubscribes.
+func TestSubscribeUnsubscribe(t *testing.T) {
+	// First subscribe to the specific topic.
+	topic := kk.Topic{Name: "Core"}
+	ch, err := coreKafkaProxy.Subscribe(topic)
+	assert.NotNil(t, ch)
+	assert.Nil(t, err)
+	if err != nil || ch == nil {
+		// Receiving from a nil channel blocks forever, so stop here instead
+		// of hanging on waitCh below.
+		t.FailNow()
+	}
+
+	// Channel on which the waiting goroutine reports what it received.
+	waitCh := make(chan string)
+	go waitForRPCMessage(topic, ch, waitCh)
+
+	// Send the message; the RPC response itself is not of interest here.
+	// NOTE(review): nil context kept from the original call; confirm how
+	// InvokeRPC treats nil before switching to context.Background().
+	rpc := "AnyRPCRequestForTest"
+	adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true)
+
+	// Wait for the result on our own channel.
+	result := <-waitCh
+	assert.Equal(t, rpc, result)
+	close(waitCh)
+
+	err = coreKafkaProxy.UnSubscribe(topic, ch)
+	assert.Nil(t, err)
+}
+
+// TestMultipleSubscribeUnsubscribe subscribes twice to the "Core" topic on the
+// core proxy, sends one RPC from the adapter proxy, and expects each of the
+// two subscribers to observe that RPC name before both are unsubscribed.
+func TestMultipleSubscribeUnsubscribe(t *testing.T) {
+	const expectedResponses = 2
+	topic := kk.Topic{Name: "Core"}
+
+	// First subscription.
+	ch1, err := coreKafkaProxy.Subscribe(topic)
+	assert.NotNil(t, ch1)
+	assert.Nil(t, err)
+	if err != nil || ch1 == nil {
+		// A nil channel would block its waiter forever.
+		t.FailNow()
+	}
+
+	// Channel on which both waiting goroutines report what they received.
+	waitCh := make(chan string)
+
+	// Second subscription to the same topic.
+	ch2, err := coreKafkaProxy.Subscribe(topic)
+	assert.NotNil(t, ch2)
+	assert.Nil(t, err)
+	if err != nil || ch2 == nil {
+		t.FailNow()
+	}
+
+	// One waiter per subscription.
+	go waitForRPCMessage(topic, ch2, waitCh)
+	go waitForRPCMessage(topic, ch1, waitCh)
+
+	// Send the message; the RPC response itself is not of interest here.
+	// NOTE(review): nil context kept from the original call; confirm how
+	// InvokeRPC treats nil before switching to context.Background().
+	rpc := "AnyRPCRequestForTest"
+	adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true)
+
+	// Collect one report from each waiter on our own channel.
+	responses := 0
+	for responses < expectedResponses {
+		msg := <-waitCh
+		assert.Equal(t, rpc, msg)
+		responses++
+	}
+	assert.Equal(t, expectedResponses, responses)
+	close(waitCh)
+
+	err = coreKafkaProxy.UnSubscribe(topic, ch1)
+	assert.Nil(t, err)
+	err = coreKafkaProxy.UnSubscribe(topic, ch2)
+	assert.Nil(t, err)
+}
+
+// TestIncorrectAPI invokes the RPC name "IncorrectAPI" on the "Core" topic and
+// expects the call to report failure with a result that decodes as ca.Error.
+func TestIncorrectAPI(t *testing.T) {
+	deviceID := uuid.New().String()
+	args := []*kk.KVArg{
+		{Key: "device", Value: &voltha.Device{Id: deviceID}},
+	}
+	rpc := "IncorrectAPI"
+	topic := kk.Topic{Name: "Core"}
+
+	// NOTE(review): nil context kept from the original call; confirm how
+	// InvokeRPC treats nil before switching to context.Background().
+	start := time.Now()
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	elapsed := time.Since(start)
+	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+	assert.False(t, status)
+
+	// Unpack the result into the actual proto object. The decode error is
+	// asserted because a freshly allocated *ca.Error is never nil, so a
+	// NotNil check on it cannot fail.
+	unpacked := &ca.Error{}
+	err := ptypes.UnmarshalAny(result, unpacked)
+	if err != nil {
+		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+	}
+	assert.Nil(t, err)
+}
+
+// TestIncorrectAPIParams invokes "GetDevice" on the "Core" topic with a single
+// "device" argument holding a voltha.Device and expects the call to report
+// failure with a result that decodes as ca.Error.
+func TestIncorrectAPIParams(t *testing.T) {
+	deviceID := uuid.New().String()
+	args := []*kk.KVArg{
+		{Key: "device", Value: &voltha.Device{Id: deviceID}},
+	}
+	rpc := "GetDevice"
+	topic := kk.Topic{Name: "Core"}
+
+	// NOTE(review): nil context kept from the original call; confirm how
+	// InvokeRPC treats nil before switching to context.Background().
+	start := time.Now()
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	elapsed := time.Since(start)
+	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+	assert.False(t, status)
+
+	// Unpack the result into the actual proto object. The decode error is
+	// asserted because a freshly allocated *ca.Error is never nil, so a
+	// NotNil check on it cannot fail.
+	unpacked := &ca.Error{}
+	err := ptypes.UnmarshalAny(result, unpacked)
+	if err != nil {
+		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+	}
+	assert.Nil(t, err)
+}
+
+// TestGetDevice invokes "GetDevice" on the "Core" topic with a "deviceID"
+// argument under a 50ms deadline and expects success with a voltha.Device
+// whose Id equals the ID that was sent.
+func TestGetDevice(t *testing.T) {
+	deviceID := uuid.New().String()
+	args := []*kk.KVArg{
+		{Key: "deviceID", Value: &ca.StrType{Val: deviceID}},
+	}
+	rpc := "GetDevice"
+	topic := kk.Topic{Name: "Core"}
+	expected := &voltha.Device{Id: deviceID}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 50*time.Millisecond)
+	defer cancel()
+
+	start := time.Now()
+	status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
+	elapsed := time.Since(start)
+	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+	assert.True(t, status)
+
+	// Unpack the result into the actual proto object.
+	unpacked := &voltha.Device{}
+	err := ptypes.UnmarshalAny(result, unpacked)
+	if err != nil {
+		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+	}
+	assert.Nil(t, err)
+	assert.Equal(t, expected, unpacked)
+}
+
+// TestGetDeviceTimeout invokes "GetDevice" on the "Core" topic under a 2ms
+// deadline and expects the call to report failure with a result that decodes
+// as ca.Error.
+func TestGetDeviceTimeout(t *testing.T) {
+	deviceID := uuid.New().String()
+	args := []*kk.KVArg{
+		{Key: "deviceID", Value: &ca.StrType{Val: deviceID}},
+	}
+	rpc := "GetDevice"
+	topic := kk.Topic{Name: "Core"}
+
+	ctx, cancel := context.WithTimeout(context.Background(), 2*time.Millisecond)
+	defer cancel()
+
+	start := time.Now()
+	status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
+	elapsed := time.Since(start)
+	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+	assert.False(t, status)
+
+	// Unpack the result into the actual proto object. The decode error is
+	// asserted because a freshly allocated *ca.Error is never nil, so a
+	// NotNil check on it cannot fail.
+	unpacked := &ca.Error{}
+	err := ptypes.UnmarshalAny(result, unpacked)
+	if err != nil {
+		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+	}
+	assert.Nil(t, err)
+}
+
+// TestGetChildDevice invokes "GetChildDevice" on the "Core" topic with a
+// "deviceID" argument and expects success with a voltha.Device whose Id equals
+// the ID that was sent.
+func TestGetChildDevice(t *testing.T) {
+	deviceID := uuid.New().String()
+	args := []*kk.KVArg{
+		{Key: "deviceID", Value: &ca.StrType{Val: deviceID}},
+	}
+	rpc := "GetChildDevice"
+	topic := kk.Topic{Name: "Core"}
+	expected := &voltha.Device{Id: deviceID}
+
+	// NOTE(review): nil context kept from the original call; confirm how
+	// InvokeRPC treats nil before switching to context.Background().
+	start := time.Now()
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	elapsed := time.Since(start)
+	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+	assert.True(t, status)
+
+	// Unpack the result into the actual proto object.
+	unpacked := &voltha.Device{}
+	err := ptypes.UnmarshalAny(result, unpacked)
+	if err != nil {
+		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+	}
+	assert.Nil(t, err)
+	assert.Equal(t, expected, unpacked)
+}
+
+// TestGetChildDevices invokes "GetChildDevices" on the "Core" topic with a
+// "deviceID" argument and expects success with a result that decodes as a
+// voltha.Device whose Id equals the ID that was sent.
+func TestGetChildDevices(t *testing.T) {
+	deviceID := uuid.New().String()
+	args := []*kk.KVArg{
+		{Key: "deviceID", Value: &ca.StrType{Val: deviceID}},
+	}
+	rpc := "GetChildDevices"
+	topic := kk.Topic{Name: "Core"}
+	expected := &voltha.Device{Id: deviceID}
+
+	// NOTE(review): nil context kept from the original call; confirm how
+	// InvokeRPC treats nil before switching to context.Background().
+	start := time.Now()
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	elapsed := time.Since(start)
+	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+	assert.True(t, status)
+
+	// Unpack the result into the actual proto object.
+	unpacked := &voltha.Device{}
+	err := ptypes.UnmarshalAny(result, unpacked)
+	if err != nil {
+		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+	}
+	assert.Nil(t, err)
+	assert.Equal(t, expected, unpacked)
+}
+
+// TestGetPorts invokes "GetPorts" on the "Core" topic with "deviceID" and
+// "portType" arguments and expects success with a voltha.Ports result holding
+// at least one item.
+func TestGetPorts(t *testing.T) {
+	deviceID := uuid.New().String()
+	args := []*kk.KVArg{
+		{Key: "deviceID", Value: &ca.StrType{Val: deviceID}},
+		{Key: "portType", Value: &ca.IntType{Val: 1}},
+	}
+	rpc := "GetPorts"
+	topic := kk.Topic{Name: "Core"}
+
+	// NOTE(review): nil context kept from the original call; confirm how
+	// InvokeRPC treats nil before switching to context.Background().
+	start := time.Now()
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	elapsed := time.Since(start)
+	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+	assert.True(t, status)
+
+	// Unpack the result into the actual proto object.
+	ports := &voltha.Ports{}
+	err := ptypes.UnmarshalAny(result, ports)
+	if err != nil {
+		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+	}
+	assert.Nil(t, err)
+	assert.True(t, len(ports.Items) >= 1)
+}
+
+// TestGetPortsMissingArgs invokes "GetPorts" on the "Core" topic with only a
+// "deviceID" argument (no "portType") and expects the call to report failure
+// with a result that decodes as ca.Error.
+func TestGetPortsMissingArgs(t *testing.T) {
+	deviceID := uuid.New().String()
+	args := []*kk.KVArg{
+		{Key: "deviceID", Value: &ca.StrType{Val: deviceID}},
+	}
+	rpc := "GetPorts"
+	topic := kk.Topic{Name: "Core"}
+
+	// NOTE(review): nil context kept from the original call; confirm how
+	// InvokeRPC treats nil before switching to context.Background().
+	start := time.Now()
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	elapsed := time.Since(start)
+	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+	assert.False(t, status)
+
+	// Unpack the result into the actual proto object. The decode error is
+	// asserted because a freshly allocated *ca.Error is never nil, so a
+	// NotNil check on it cannot fail.
+	unpacked := &ca.Error{}
+	err := ptypes.UnmarshalAny(result, unpacked)
+	if err != nil {
+		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+	}
+	assert.Nil(t, err)
+}
+
+// TestChildDeviceDetected invokes "ChildDeviceDetected" on the "Core" topic
+// with all five arguments (deviceID, parentPortNo, childDeviceType,
+// proxyAddress, portType), waits for the response, and expects success with a
+// nil result.
+func TestChildDeviceDetected(t *testing.T) {
+	deviceID := uuid.New().String()
+	args := []*kk.KVArg{
+		{Key: "deviceID", Value: &ca.StrType{Val: deviceID}},
+		{Key: "parentPortNo", Value: &ca.IntType{Val: 1}},
+		{Key: "childDeviceType", Value: &ca.StrType{Val: "great_onu"}},
+		{Key: "proxyAddress", Value: &voltha.Device_ProxyAddress{DeviceId: deviceID, ChannelId: 100}},
+		{Key: "portType", Value: &ca.IntType{Val: 1}},
+	}
+	rpc := "ChildDeviceDetected"
+	topic := kk.Topic{Name: "Core"}
+
+	// NOTE(review): nil context kept from the original call; confirm how
+	// InvokeRPC treats nil before switching to context.Background().
+	start := time.Now()
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	elapsed := time.Since(start)
+	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+
+	assert.True(t, status)
+	assert.Nil(t, result)
+}
+
+// TestChildDeviceDetectedNoWait invokes "ChildDeviceDetected" on the "Core"
+// topic with all five arguments but with the wait-for-response flag set to
+// false, and expects success with a nil result.
+func TestChildDeviceDetectedNoWait(t *testing.T) {
+	deviceID := uuid.New().String()
+	args := []*kk.KVArg{
+		{Key: "deviceID", Value: &ca.StrType{Val: deviceID}},
+		{Key: "parentPortNo", Value: &ca.IntType{Val: 1}},
+		{Key: "childDeviceType", Value: &ca.StrType{Val: "great_onu"}},
+		{Key: "proxyAddress", Value: &voltha.Device_ProxyAddress{DeviceId: deviceID, ChannelId: 100}},
+		{Key: "portType", Value: &ca.IntType{Val: 1}},
+	}
+	rpc := "ChildDeviceDetected"
+	topic := kk.Topic{Name: "Core"}
+
+	// NOTE(review): nil context kept from the original call; confirm how
+	// InvokeRPC treats nil before switching to context.Background().
+	start := time.Now()
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, false, args...)
+	elapsed := time.Since(start)
+	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+
+	assert.True(t, status)
+	assert.Nil(t, result)
+}
+
+// TestChildDeviceDetectedMissingArgs invokes "ChildDeviceDetected" on the
+// "Core" topic with only four arguments (no "portType") and expects the call
+// to report failure with a result that decodes as ca.Error.
+func TestChildDeviceDetectedMissingArgs(t *testing.T) {
+	deviceID := uuid.New().String()
+	args := []*kk.KVArg{
+		{Key: "deviceID", Value: &ca.StrType{Val: deviceID}},
+		{Key: "parentPortNo", Value: &ca.IntType{Val: 1}},
+		{Key: "childDeviceType", Value: &ca.StrType{Val: "great_onu"}},
+		{Key: "proxyAddress", Value: &voltha.Device_ProxyAddress{DeviceId: deviceID, ChannelId: 100}},
+	}
+	rpc := "ChildDeviceDetected"
+	topic := kk.Topic{Name: "Core"}
+
+	// NOTE(review): nil context kept from the original call; confirm how
+	// InvokeRPC treats nil before switching to context.Background().
+	start := time.Now()
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	elapsed := time.Since(start)
+	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+	assert.False(t, status)
+
+	// Unpack the result into the actual proto object. The decode error is
+	// asserted because a freshly allocated *ca.Error is never nil, so a
+	// NotNil check on it cannot fail.
+	unpacked := &ca.Error{}
+	err := ptypes.UnmarshalAny(result, unpacked)
+	if err != nil {
+		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+	}
+	assert.Nil(t, err)
+}
+
+// TestStopKafkaProxy stops the shared proxies, adapter first and core second.
+// It makes no assertions.
+// NOTE(review): any test that runs after this one would find both proxies
+// already stopped.
+func TestStopKafkaProxy(t *testing.T) {
+	for _, proxy := range []*kk.KafkaMessagingProxy{adapterKafkaProxy, coreKafkaProxy} {
+		proxy.Stop()
+	}
+}
+
+//func TestMain(m *testing.T) {
+// log.Info("Main")
+//}