[VOL-1346]  This commit addresses device discovery notifications
which will be principally used by the affinity router.  In doing so
this commit also renames core_adapter.proto to inter_container.proto.

Change-Id: Ib2a7b84efa50367d0ffbc482fba6096a225f3150
diff --git a/tests/kafka/kafka_inter_container_messaging_test.go b/tests/kafka/kafka_inter_container_messaging_test.go
index 0293d6d..57e7ab9 100644
--- a/tests/kafka/kafka_inter_container_messaging_test.go
+++ b/tests/kafka/kafka_inter_container_messaging_test.go
@@ -21,10 +21,11 @@
 	"github.com/google/uuid"
 	"github.com/opencord/voltha-go/common/log"
 	kk "github.com/opencord/voltha-go/kafka"
-	ca "github.com/opencord/voltha-go/protos/core_adapter"
+	ic "github.com/opencord/voltha-go/protos/inter_container"
 	"github.com/opencord/voltha-go/protos/voltha"
 	rhp "github.com/opencord/voltha-go/rw_core/core"
 	"github.com/stretchr/testify/assert"
+	"os"
 	"testing"
 	"time"
 )
@@ -35,27 +36,42 @@
 
 var coreKafkaProxy *kk.InterContainerProxy
 var adapterKafkaProxy *kk.InterContainerProxy
+var kafkaPartitionClient kk.Client
+var affinityRouterTopic string
+var hostIP string
+var kafkaClient kk.Client
 
 func init() {
 	log.AddPackage(log.JSON, log.ErrorLevel, nil)
 	log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
 	log.SetAllLogLevel(log.ErrorLevel)
-	kafkaClient := kk.NewSaramaClient(
-		kk.Host("10.176.212.108"),
+	affinityRouterTopic = "AffinityRouter"
+	hostIP = os.Getenv("DOCKER_HOST_IP")
+	kafkaClient = kk.NewSaramaClient(
+		kk.Host(hostIP),
 		kk.Port(9092))
 
 	coreKafkaProxy, _ = kk.NewInterContainerProxy(
-		kk.InterContainerHost("10.176.212.108"),
+		kk.InterContainerHost(hostIP),
 		kk.InterContainerPort(9092),
 		kk.DefaultTopic(&kk.Topic{Name: "Core"}),
-		kk.MsgClient(kafkaClient))
+		kk.MsgClient(kafkaClient),
+		kk.DeviceDiscoveryTopic(&kk.Topic{Name: affinityRouterTopic}))
 
 	adapterKafkaProxy, _ = kk.NewInterContainerProxy(
-		kk.InterContainerHost("10.176.212.108"),
+		kk.InterContainerHost(hostIP),
 		kk.InterContainerPort(9092),
 		kk.DefaultTopic(&kk.Topic{Name: "Adapter"}),
 		kk.MsgClient(kafkaClient))
 
+	kafkaPartitionClient = kk.NewSaramaClient(
+		kk.ConsumerType(kk.PartitionConsumer),
+		kk.Host(hostIP),
+		kk.Port(9092),
+		kk.AutoCreateTopic(true),
+		kk.ProducerFlushFrequency(5))
+	kafkaPartitionClient.Start()
+
 	coreKafkaProxy.Start()
 	adapterKafkaProxy.Start()
 	subscribeTarget(coreKafkaProxy)
@@ -67,11 +83,11 @@
 	kmp.SubscribeWithRequestHandlerInterface(topic, requestProxy)
 }
 
-func waitForRPCMessage(topic kk.Topic, ch <-chan *ca.InterContainerMessage, doneCh chan string) {
+func waitForRPCMessage(topic kk.Topic, ch <-chan *ic.InterContainerMessage, doneCh chan string) {
 	for msg := range ch {
 		log.Debugw("Got-RPC-message", log.Fields{"msg": msg})
 		//	Unpack message
-		requestBody := &ca.InterContainerRequestBody{}
+		requestBody := &ic.InterContainerRequestBody{}
 		if err := ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
 			doneCh <- "Error"
 		} else {
@@ -106,8 +122,8 @@
 //	// First subscribe to the specific topic
 //	//log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.DebugLevel)
 //	var err error
-//	var ch1 <-chan *ca.InterContainerMessage
-//	var ch2 <-chan *ca.InterContainerMessage
+//	var ch1 <-chan *ic.InterContainerMessage
+//	var ch2 <-chan *ic.InterContainerMessage
 //	topic := kk.Topic{Name: "Core"}
 //	ch1, err = coreKafkaProxy.Subscribe(topic)
 //	assert.NotNil(t, ch1)
@@ -159,7 +175,7 @@
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, false)
 	//Unpack the result into the actual proto object
-	unpackResult := &ca.Error{}
+	unpackResult := &ic.Error{}
 	if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
 		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
 	}
@@ -177,12 +193,12 @@
 	rpc := "GetDevice"
 	topic := kk.Topic{Name: "Core"}
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, false)
 	//Unpack the result into the actual proto object
-	unpackResult := &ca.Error{}
+	unpackResult := &ic.Error{}
 	if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
 		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
 	}
@@ -204,7 +220,7 @@
 	ctx, cancel := context.WithTimeout(context.Background(), timeout)
 	defer cancel()
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, &topic,true, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, &topic, true, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, true)
@@ -229,11 +245,11 @@
 	ctx, cancel := context.WithTimeout(context.Background(), timeout)
 	defer cancel()
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, &topic,true, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, &topic, true, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, false)
-	unpackResult := &ca.Error{}
+	unpackResult := &ic.Error{}
 	if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
 		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
 	}
@@ -252,7 +268,7 @@
 	topic := kk.Topic{Name: "Core"}
 	expectedResponse := &voltha.Device{Id: trnsId}
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, true)
@@ -275,7 +291,7 @@
 	topic := kk.Topic{Name: "Core"}
 	expectedResponse := &voltha.Device{Id: trnsId}
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, true)
@@ -294,7 +310,7 @@
 		Key:   "deviceID",
 		Value: protoArg1,
 	}
-	protoArg2 := &ca.IntType{Val: 1}
+	protoArg2 := &ic.IntType{Val: 1}
 	args[1] = &kk.KVArg{
 		Key:   "portType",
 		Value: protoArg2,
@@ -302,7 +318,7 @@
 	rpc := "GetPorts"
 	topic := kk.Topic{Name: "Core"}
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, true)
@@ -325,12 +341,12 @@
 	rpc := "GetPorts"
 	topic := kk.Topic{Name: "Core"}
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, false)
 	//Unpack the result into the actual proto object
-	unpackResult := &ca.Error{}
+	unpackResult := &ic.Error{}
 	if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
 		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
 	}
@@ -339,18 +355,18 @@
 
 func TestChildDeviceDetected(t *testing.T) {
 	trnsId := uuid.New().String()
-	protoArg1 := &ca.StrType{Val: trnsId}
+	protoArg1 := &ic.StrType{Val: trnsId}
 	args := make([]*kk.KVArg, 5)
 	args[0] = &kk.KVArg{
 		Key:   "deviceID",
 		Value: protoArg1,
 	}
-	protoArg2 := &ca.IntType{Val: 1}
+	protoArg2 := &ic.IntType{Val: 1}
 	args[1] = &kk.KVArg{
 		Key:   "parentPortNo",
 		Value: protoArg2,
 	}
-	protoArg3 := &ca.StrType{Val: "great_onu"}
+	protoArg3 := &ic.StrType{Val: "great_onu"}
 	args[2] = &kk.KVArg{
 		Key:   "childDeviceType",
 		Value: protoArg3,
@@ -360,7 +376,7 @@
 		Key:   "proxyAddress",
 		Value: protoArg4,
 	}
-	protoArg5 := &ca.IntType{Val: 1}
+	protoArg5 := &ic.IntType{Val: 1}
 	args[4] = &kk.KVArg{
 		Key:   "portType",
 		Value: protoArg5,
@@ -369,7 +385,7 @@
 	rpc := "ChildDeviceDetected"
 	topic := kk.Topic{Name: "Core"}
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, true)
@@ -378,18 +394,18 @@
 
 func TestChildDeviceDetectedNoWait(t *testing.T) {
 	trnsId := uuid.New().String()
-	protoArg1 := &ca.StrType{Val: trnsId}
+	protoArg1 := &ic.StrType{Val: trnsId}
 	args := make([]*kk.KVArg, 5)
 	args[0] = &kk.KVArg{
 		Key:   "deviceID",
 		Value: protoArg1,
 	}
-	protoArg2 := &ca.IntType{Val: 1}
+	protoArg2 := &ic.IntType{Val: 1}
 	args[1] = &kk.KVArg{
 		Key:   "parentPortNo",
 		Value: protoArg2,
 	}
-	protoArg3 := &ca.StrType{Val: "great_onu"}
+	protoArg3 := &ic.StrType{Val: "great_onu"}
 	args[2] = &kk.KVArg{
 		Key:   "childDeviceType",
 		Value: protoArg3,
@@ -399,7 +415,7 @@
 		Key:   "proxyAddress",
 		Value: protoArg4,
 	}
-	protoArg5 := &ca.IntType{Val: 1}
+	protoArg5 := &ic.IntType{Val: 1}
 	args[4] = &kk.KVArg{
 		Key:   "portType",
 		Value: protoArg5,
@@ -408,7 +424,7 @@
 	rpc := "ChildDeviceDetected"
 	topic := kk.Topic{Name: "Core"}
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,false, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, false, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, true)
@@ -417,18 +433,18 @@
 
 func TestChildDeviceDetectedMissingArgs(t *testing.T) {
 	trnsId := uuid.New().String()
-	protoArg1 := &ca.StrType{Val: trnsId}
+	protoArg1 := &ic.StrType{Val: trnsId}
 	args := make([]*kk.KVArg, 4)
 	args[0] = &kk.KVArg{
 		Key:   "deviceID",
 		Value: protoArg1,
 	}
-	protoArg2 := &ca.IntType{Val: 1}
+	protoArg2 := &ic.IntType{Val: 1}
 	args[1] = &kk.KVArg{
 		Key:   "parentPortNo",
 		Value: protoArg2,
 	}
-	protoArg3 := &ca.StrType{Val: "great_onu"}
+	protoArg3 := &ic.StrType{Val: "great_onu"}
 	args[2] = &kk.KVArg{
 		Key:   "childDeviceType",
 		Value: protoArg3,
@@ -437,11 +453,11 @@
 	rpc := "ChildDeviceDetected"
 	topic := kk.Topic{Name: "Core"}
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, false)
-	unpackResult := &ca.Error{}
+	unpackResult := &ic.Error{}
 	if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
 		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
 	}
@@ -457,12 +473,12 @@
 		Key:   "device_id",
 		Value: protoArg1,
 	}
-	protoArg2 := &ca.IntType{Val: 1}
+	protoArg2 := &ic.IntType{Val: 1}
 	args[1] = &kk.KVArg{
 		Key:   "oper_status",
 		Value: protoArg2,
 	}
-	protoArg3 := &ca.IntType{Val: 1}
+	protoArg3 := &ic.IntType{Val: 1}
 	args[2] = &kk.KVArg{
 		Key:   "connect_status",
 		Value: protoArg3,
@@ -471,13 +487,58 @@
 	rpc := "DeviceStateUpdate"
 	topic := kk.Topic{Name: "Core"}
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, true)
 	assert.Nil(t, result)
 }
 
+func subscribeToTopic(topic *kk.Topic, waitingChannel chan *ic.InterContainerMessage) error {
+	var ch <-chan *ic.InterContainerMessage
+	var err error
+	if ch, err = kafkaPartitionClient.Subscribe(topic); err != nil {
+		return nil
+	}
+	msg := <-ch
+
+	log.Debugw("msg-received", log.Fields{"msg": msg})
+	waitingChannel <- msg
+	return nil
+}
+
+func TestDeviceDiscovery(t *testing.T) {
+	// Create an intercontainer proxy - similar to the Core
+	testProxy, _ := kk.NewInterContainerProxy(
+		kk.InterContainerHost(hostIP),
+		kk.InterContainerPort(9092),
+		kk.DefaultTopic(&kk.Topic{Name: "Test"}),
+		kk.MsgClient(kafkaClient),
+		kk.DeviceDiscoveryTopic(&kk.Topic{Name: affinityRouterTopic}))
+
+	//	First start to wait for the message
+	waitingChannel := make(chan *ic.InterContainerMessage)
+	go subscribeToTopic(&kk.Topic{Name: affinityRouterTopic}, waitingChannel)
+
+	// Sleep to make sure the consumer is ready
+	time.Sleep(time.Millisecond * 100)
+
+	// Send the message
+	go testProxy.DeviceDiscovered("TestDeviceId", "TestDevicetype", "TestParentId")
+
+	msg := <-waitingChannel
+	totalTime := (time.Now().UnixNano() - msg.Header.Timestamp) / int64(time.Millisecond)
+	assert.Equal(t, msg.Header.Type, ic.MessageType_DEVICE_DISCOVERED)
+	//	Unpack message
+	dd := &ic.DeviceDiscovered{}
+	err := ptypes.UnmarshalAny(msg.Body, dd)
+	assert.Nil(t, err)
+	assert.Equal(t, dd.Id, "TestDeviceId")
+	assert.Equal(t, dd.DeviceType, "TestDevicetype")
+	assert.Equal(t, dd.ParentId, "TestParentId")
+	log.Debugw("TotalTime", log.Fields{"time": totalTime})
+}
+
 func TestStopKafkaProxy(t *testing.T) {
 	adapterKafkaProxy.Stop()
 	coreKafkaProxy.Stop()