[VOL-1024]  This update consists of:

1) Inter-container kafka library in Go
2) initial set of proto definitions
3) Test cases for the kafka library

Change-Id: Ibe8b0f673a90bbe4cb92847ce40f31ec2d0b6244
diff --git a/tests/kafka/kafka_inter_container_messaging_test.go b/tests/kafka/kafka_inter_container_messaging_test.go
new file mode 100644
index 0000000..9a73b18
--- /dev/null
+++ b/tests/kafka/kafka_inter_container_messaging_test.go
@@ -0,0 +1,437 @@
+package kafka
+
+import (
+	"context"
+	"github.com/golang/protobuf/ptypes"
+	"github.com/opencord/voltha-go/common/log"
+	kk "github.com/opencord/voltha-go/kafka"
+	ca "github.com/opencord/voltha-go/protos/core_adapter"
+	"github.com/opencord/voltha-go/protos/voltha"
+	rhp "github.com/opencord/voltha-go/rw_core/core"
+	"github.com/stretchr/testify/assert"
+	"testing"
+	//"time"
+	"github.com/google/uuid"
+	"time"
+)
+
+var coreKafkaProxy *kk.KafkaMessagingProxy
+var adapterKafkaProxy *kk.KafkaMessagingProxy
+
+func init() {
+	if _, err := log.SetLogger(log.JSON, 1, log.Fields{"instanceId": "testing"}); err != nil {
+		log.With(log.Fields{"error": err}).Fatal("Cannot setup logging")
+	}
+	coreKafkaProxy, _ = kk.NewKafkaMessagingProxy(
+		kk.KafkaHost("10.100.198.220"),
+		kk.KafkaPort(9092),
+		kk.DefaultTopic(&kk.Topic{Name: "Core"}))
+
+	adapterKafkaProxy, _ = kk.NewKafkaMessagingProxy(
+		kk.KafkaHost("10.100.198.220"),
+		kk.KafkaPort(9092),
+		kk.DefaultTopic(&kk.Topic{Name: "Adapter"}))
+
+	coreKafkaProxy.Start()
+	adapterKafkaProxy.Start()
+	subscribeTarget(coreKafkaProxy)
+}
+
+func subscribeTarget(kmp *kk.KafkaMessagingProxy) {
+	topic := kk.Topic{Name: "Core"}
+	requestProxy := &rhp.RequestHandlerProxy{TestMode: true}
+	kmp.SubscribeWithTarget(topic, requestProxy)
+}
+
+func waitForRPCMessage(topic kk.Topic, ch <-chan *ca.InterContainerMessage, doneCh chan string) {
+	for msg := range ch {
+		log.Debugw("Got-RPC-message", log.Fields{"msg": msg})
+		//	Unpack message
+		requestBody := &ca.InterContainerRequestBody{}
+		if err := ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
+			doneCh <- "Error"
+		} else {
+			doneCh <- requestBody.Rpc
+		}
+		break
+	}
+}
+
+func TestSubscribeUnsubscribe(t *testing.T) {
+	// First subscribe to the specific topic
+	topic := kk.Topic{Name: "Core"}
+	ch, err := coreKafkaProxy.Subscribe(topic)
+	assert.NotNil(t, ch)
+	assert.Nil(t, err)
+	// Create a channel to receive a response
+	waitCh := make(chan string)
+	// Wait for a message
+	go waitForRPCMessage(topic, ch, waitCh)
+	// Send the message - don't care of the response
+	rpc := "AnyRPCRequestForTest"
+	adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true)
+	// Wait for the result on ouw own channel
+	result := <-waitCh
+	assert.Equal(t, result, rpc)
+	close(waitCh)
+	err = coreKafkaProxy.UnSubscribe(topic, ch)
+	assert.Nil(t, err)
+}
+
+func TestMultipleSubscribeUnsubscribe(t *testing.T) {
+	// First subscribe to the specific topic
+	var err error
+	var ch1 <-chan *ca.InterContainerMessage
+	var ch2 <-chan *ca.InterContainerMessage
+	topic := kk.Topic{Name: "Core"}
+	ch1, err = coreKafkaProxy.Subscribe(topic)
+	assert.NotNil(t, ch1)
+	assert.Nil(t, err)
+	// Create a channel to receive responses
+	waitCh := make(chan string)
+	ch2, err = coreKafkaProxy.Subscribe(topic)
+	assert.NotNil(t, ch2)
+	assert.Nil(t, err)
+	// Wait for a message
+	go waitForRPCMessage(topic, ch2, waitCh)
+	go waitForRPCMessage(topic, ch1, waitCh)
+
+	// Send the message - don't care of the response
+	rpc := "AnyRPCRequestForTest"
+	adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true)
+	// Wait for the result on ouw own channel
+
+	responses := 0
+	for msg := range waitCh {
+		assert.Equal(t, msg, rpc)
+		responses = responses + 1
+		if responses > 1 {
+			break
+		}
+	}
+	assert.Equal(t, responses, 2)
+	close(waitCh)
+	err = coreKafkaProxy.UnSubscribe(topic, ch1)
+	assert.Nil(t, err)
+	err = coreKafkaProxy.UnSubscribe(topic, ch2)
+	assert.Nil(t, err)
+}
+
+func TestIncorrectAPI(t *testing.T) {
+	trnsId := uuid.New().String()
+	protoMsg := &voltha.Device{Id: trnsId}
+	args := make([]*kk.KVArg, 1)
+	args[0] = &kk.KVArg{
+		Key:   "device",
+		Value: protoMsg,
+	}
+	rpc := "IncorrectAPI"
+	topic := kk.Topic{Name: "Core"}
+	start := time.Now()
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	elapsed := time.Since(start)
+	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+	assert.Equal(t, status, false)
+	//Unpack the result into the actual proto object
+	unpackResult := &ca.Error{}
+	if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
+		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+	}
+	assert.NotNil(t, unpackResult)
+}
+
+func TestIncorrectAPIParams(t *testing.T) {
+	trnsId := uuid.New().String()
+	protoMsg := &voltha.Device{Id: trnsId}
+	args := make([]*kk.KVArg, 1)
+	args[0] = &kk.KVArg{
+		Key:   "device",
+		Value: protoMsg,
+	}
+	rpc := "GetDevice"
+	topic := kk.Topic{Name: "Core"}
+	start := time.Now()
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	elapsed := time.Since(start)
+	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+	assert.Equal(t, status, false)
+	//Unpack the result into the actual proto object
+	unpackResult := &ca.Error{}
+	if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
+		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+	}
+	assert.NotNil(t, unpackResult)
+}
+
+func TestGetDevice(t *testing.T) {
+	trnsId := uuid.New().String()
+	protoMsg := &ca.StrType{Val: trnsId}
+	args := make([]*kk.KVArg, 1)
+	args[0] = &kk.KVArg{
+		Key:   "deviceID",
+		Value: protoMsg,
+	}
+	rpc := "GetDevice"
+	topic := kk.Topic{Name: "Core"}
+	expectedResponse := &voltha.Device{Id: trnsId}
+	timeout := time.Duration(50) * time.Millisecond
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	defer cancel()
+	start := time.Now()
+	status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
+	elapsed := time.Since(start)
+	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+	assert.Equal(t, status, true)
+	unpackResult := &voltha.Device{}
+	if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
+		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+	}
+	assert.Equal(t, unpackResult, expectedResponse)
+}
+
+func TestGetDeviceTimeout(t *testing.T) {
+	trnsId := uuid.New().String()
+	protoMsg := &ca.StrType{Val: trnsId}
+	args := make([]*kk.KVArg, 1)
+	args[0] = &kk.KVArg{
+		Key:   "deviceID",
+		Value: protoMsg,
+	}
+	rpc := "GetDevice"
+	topic := kk.Topic{Name: "Core"}
+	timeout := time.Duration(2) * time.Millisecond
+	ctx, cancel := context.WithTimeout(context.Background(), timeout)
+	defer cancel()
+	start := time.Now()
+	status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
+	elapsed := time.Since(start)
+	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+	assert.Equal(t, status, false)
+	unpackResult := &ca.Error{}
+	if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
+		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+	}
+	assert.NotNil(t, unpackResult)
+}
+
+func TestGetChildDevice(t *testing.T) {
+	trnsId := uuid.New().String()
+	protoMsg := &ca.StrType{Val: trnsId}
+	args := make([]*kk.KVArg, 1)
+	args[0] = &kk.KVArg{
+		Key:   "deviceID",
+		Value: protoMsg,
+	}
+	rpc := "GetChildDevice"
+	topic := kk.Topic{Name: "Core"}
+	expectedResponse := &voltha.Device{Id: trnsId}
+	start := time.Now()
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	elapsed := time.Since(start)
+	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+	assert.Equal(t, status, true)
+	unpackResult := &voltha.Device{}
+	if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
+		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+	}
+	assert.Equal(t, unpackResult, expectedResponse)
+}
+
+func TestGetChildDevices(t *testing.T) {
+	trnsId := uuid.New().String()
+	protoMsg := &ca.StrType{Val: trnsId}
+	args := make([]*kk.KVArg, 1)
+	args[0] = &kk.KVArg{
+		Key:   "deviceID",
+		Value: protoMsg,
+	}
+	rpc := "GetChildDevices"
+	topic := kk.Topic{Name: "Core"}
+	expectedResponse := &voltha.Device{Id: trnsId}
+	start := time.Now()
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	elapsed := time.Since(start)
+	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+	assert.Equal(t, status, true)
+	unpackResult := &voltha.Device{}
+	if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
+		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+	}
+	assert.Equal(t, unpackResult, expectedResponse)
+}
+
+func TestGetPorts(t *testing.T) {
+	trnsId := uuid.New().String()
+	protoArg1 := &ca.StrType{Val: trnsId}
+	args := make([]*kk.KVArg, 2)
+	args[0] = &kk.KVArg{
+		Key:   "deviceID",
+		Value: protoArg1,
+	}
+	protoArg2 := &ca.IntType{Val: 1}
+	args[1] = &kk.KVArg{
+		Key:   "portType",
+		Value: protoArg2,
+	}
+	rpc := "GetPorts"
+	topic := kk.Topic{Name: "Core"}
+	start := time.Now()
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	elapsed := time.Since(start)
+	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+	assert.Equal(t, status, true)
+	unpackResult := &voltha.Ports{}
+	if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
+		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+	}
+	expectedLen := len(unpackResult.Items) >= 1
+	assert.Equal(t, true, expectedLen)
+}
+
+func TestGetPortsMissingArgs(t *testing.T) {
+	trnsId := uuid.New().String()
+	protoArg1 := &ca.StrType{Val: trnsId}
+	args := make([]*kk.KVArg, 1)
+	args[0] = &kk.KVArg{
+		Key:   "deviceID",
+		Value: protoArg1,
+	}
+	rpc := "GetPorts"
+	topic := kk.Topic{Name: "Core"}
+	start := time.Now()
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	elapsed := time.Since(start)
+	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+	assert.Equal(t, status, false)
+	//Unpack the result into the actual proto object
+	unpackResult := &ca.Error{}
+	if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
+		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+	}
+	assert.NotNil(t, unpackResult)
+}
+
+func TestChildDeviceDetected(t *testing.T) {
+	trnsId := uuid.New().String()
+	protoArg1 := &ca.StrType{Val: trnsId}
+	args := make([]*kk.KVArg, 5)
+	args[0] = &kk.KVArg{
+		Key:   "deviceID",
+		Value: protoArg1,
+	}
+	protoArg2 := &ca.IntType{Val: 1}
+	args[1] = &kk.KVArg{
+		Key:   "parentPortNo",
+		Value: protoArg2,
+	}
+	protoArg3 := &ca.StrType{Val: "great_onu"}
+	args[2] = &kk.KVArg{
+		Key:   "childDeviceType",
+		Value: protoArg3,
+	}
+	protoArg4 := &voltha.Device_ProxyAddress{DeviceId: trnsId, ChannelId: 100}
+	args[3] = &kk.KVArg{
+		Key:   "proxyAddress",
+		Value: protoArg4,
+	}
+	protoArg5 := &ca.IntType{Val: 1}
+	args[4] = &kk.KVArg{
+		Key:   "portType",
+		Value: protoArg5,
+	}
+
+	rpc := "ChildDeviceDetected"
+	topic := kk.Topic{Name: "Core"}
+	start := time.Now()
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	elapsed := time.Since(start)
+	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+	assert.Equal(t, status, true)
+	assert.Nil(t, result)
+}
+
+func TestChildDeviceDetectedNoWait(t *testing.T) {
+	trnsId := uuid.New().String()
+	protoArg1 := &ca.StrType{Val: trnsId}
+	args := make([]*kk.KVArg, 5)
+	args[0] = &kk.KVArg{
+		Key:   "deviceID",
+		Value: protoArg1,
+	}
+	protoArg2 := &ca.IntType{Val: 1}
+	args[1] = &kk.KVArg{
+		Key:   "parentPortNo",
+		Value: protoArg2,
+	}
+	protoArg3 := &ca.StrType{Val: "great_onu"}
+	args[2] = &kk.KVArg{
+		Key:   "childDeviceType",
+		Value: protoArg3,
+	}
+	protoArg4 := &voltha.Device_ProxyAddress{DeviceId: trnsId, ChannelId: 100}
+	args[3] = &kk.KVArg{
+		Key:   "proxyAddress",
+		Value: protoArg4,
+	}
+	protoArg5 := &ca.IntType{Val: 1}
+	args[4] = &kk.KVArg{
+		Key:   "portType",
+		Value: protoArg5,
+	}
+
+	rpc := "ChildDeviceDetected"
+	topic := kk.Topic{Name: "Core"}
+	start := time.Now()
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, false, args...)
+	elapsed := time.Since(start)
+	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+	assert.Equal(t, status, true)
+	assert.Nil(t, result)
+}
+
+func TestChildDeviceDetectedMissingArgs(t *testing.T) {
+	trnsId := uuid.New().String()
+	protoArg1 := &ca.StrType{Val: trnsId}
+	args := make([]*kk.KVArg, 4)
+	args[0] = &kk.KVArg{
+		Key:   "deviceID",
+		Value: protoArg1,
+	}
+	protoArg2 := &ca.IntType{Val: 1}
+	args[1] = &kk.KVArg{
+		Key:   "parentPortNo",
+		Value: protoArg2,
+	}
+	protoArg3 := &ca.StrType{Val: "great_onu"}
+	args[2] = &kk.KVArg{
+		Key:   "childDeviceType",
+		Value: protoArg3,
+	}
+	protoArg4 := &voltha.Device_ProxyAddress{DeviceId: trnsId, ChannelId: 100}
+	args[3] = &kk.KVArg{
+		Key:   "proxyAddress",
+		Value: protoArg4,
+	}
+
+	rpc := "ChildDeviceDetected"
+	topic := kk.Topic{Name: "Core"}
+	start := time.Now()
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	elapsed := time.Since(start)
+	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
+	assert.Equal(t, status, false)
+	unpackResult := &ca.Error{}
+	if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
+		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
+	}
+	assert.NotNil(t, unpackResult)
+}
+
+func TestStopKafkaProxy(t *testing.T) {
+	adapterKafkaProxy.Stop()
+	coreKafkaProxy.Stop()
+}
+
+//func TestMain(m *testing.T) {
+//	log.Info("Main")
+//}