[VOL-1346]  This commit addresses device discovery notifications,
which will be used principally by the affinity router.  In doing so,
this commit also renames core_adapter.proto to inter_container.proto.

Change-Id: Ib2a7b84efa50367d0ffbc482fba6096a225f3150
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index 8468e42..e330b85 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -22,7 +22,7 @@
 	"github.com/golang/protobuf/proto"
 	"github.com/google/uuid"
 	"github.com/opencord/voltha-go/common/log"
-	ca "github.com/opencord/voltha-go/protos/core_adapter"
+	ic "github.com/opencord/voltha-go/protos/inter_container"
 	"gopkg.in/Shopify/sarama.v1"
 	"strings"
 	"sync"
@@ -40,7 +40,7 @@
 //consumer or a group consumer
 type consumerChannels struct {
 	consumers []interface{}
-	channels  []chan *ca.InterContainerMessage
+	channels  []chan *ic.InterContainerMessage
 }
 
 // SaramaClient represents the messaging proxy
@@ -307,20 +307,20 @@
 
 // Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
 // messages from that topic
-func (sc *SaramaClient) Subscribe(topic *Topic) (<-chan *ca.InterContainerMessage, error) {
+func (sc *SaramaClient) Subscribe(topic *Topic) (<-chan *ic.InterContainerMessage, error) {
 	log.Debugw("subscribe", log.Fields{"topic": topic.Name})
 
 	// If a consumers already exist for that topic then resuse it
 	if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
 		log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
 		// Create a channel specific for that consumers and add it to the consumers channel map
-		ch := make(chan *ca.InterContainerMessage)
+		ch := make(chan *ic.InterContainerMessage)
 		sc.addChannelToConsumerChannelMap(topic, ch)
 		return ch, nil
 	}
 
 	// Register for the topic and set it up
-	var consumerListeningChannel chan *ca.InterContainerMessage
+	var consumerListeningChannel chan *ic.InterContainerMessage
 	var err error
 
 	// Use the consumerType option to figure out the type of consumer to launch
@@ -351,7 +351,7 @@
 }
 
 //UnSubscribe unsubscribe a consumer from a given topic
-func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ca.InterContainerMessage) error {
+func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
 	log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
 	err := sc.removeChannelFromConsumerChannelMap(*topic, ch)
 	return err
@@ -393,9 +393,9 @@
 	// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
 	select {
 	case ok := <-sc.producer.Successes():
-		log.Debugw("message-sent", log.Fields{"status":ok})
+		log.Debugw("message-sent", log.Fields{"status": ok})
 	case notOk := <-sc.producer.Errors():
-		log.Debugw("error-sending", log.Fields{"status":notOk})
+		log.Debugw("error-sending", log.Fields{"status": notOk})
 		return notOk
 	}
 	return nil
@@ -443,7 +443,7 @@
 	return nil
 }
 
-func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ca.InterContainerMessage) {
+func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
 	sc.lockTopicToConsumerChannelMap.Lock()
 	defer sc.lockTopicToConsumerChannelMap.Unlock()
 	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
@@ -482,7 +482,7 @@
 	return err
 }
 
-func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ca.InterContainerMessage) error {
+func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
 	sc.lockTopicToConsumerChannelMap.Lock()
 	defer sc.lockTopicToConsumerChannelMap.Unlock()
 	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
@@ -620,12 +620,12 @@
 
 // dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
 // topic via the unique channel each subsciber received during subscription
-func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ca.InterContainerMessage) {
+func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
 	// Need to go over all channels and publish messages to them - do we need to copy msg?
 	sc.lockTopicToConsumerChannelMap.Lock()
 	defer sc.lockTopicToConsumerChannelMap.Unlock()
 	for _, ch := range consumerCh.channels {
-		go func(c chan *ca.InterContainerMessage) {
+		go func(c chan *ic.InterContainerMessage) {
 			c <- protoMessage
 		}(ch)
 	}
@@ -652,7 +652,7 @@
 				break startloop
 			}
 			msgBody := msg.Value
-			icm := &ca.InterContainerMessage{}
+			icm := &ic.InterContainerMessage{}
 			if err := proto.Unmarshal(msgBody, icm); err != nil {
 				log.Warnw("partition-invalid-message", log.Fields{"error": err})
 				continue
@@ -688,7 +688,7 @@
 				break startloop
 			}
 			msgBody := msg.Value
-			icm := &ca.InterContainerMessage{}
+			icm := &ic.InterContainerMessage{}
 			if err := proto.Unmarshal(msgBody, icm); err != nil {
 				log.Warnw("invalid-message", log.Fields{"error": err})
 				continue
@@ -728,7 +728,7 @@
 
 //// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
 //// for that topic.  It also starts the routine that listens for messages on that topic.
-func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ca.InterContainerMessage, error) {
+func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
 	var pConsumers []sarama.PartitionConsumer
 	var err error
 
@@ -744,10 +744,10 @@
 
 	// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
 	// unbuffered to verify race conditions.
-	consumerListeningChannel := make(chan *ca.InterContainerMessage)
+	consumerListeningChannel := make(chan *ic.InterContainerMessage)
 	cc := &consumerChannels{
 		consumers: consumersIf,
-		channels:  []chan *ca.InterContainerMessage{consumerListeningChannel},
+		channels:  []chan *ic.InterContainerMessage{consumerListeningChannel},
 	}
 
 	// Add the consumers channel to the map
@@ -761,7 +761,7 @@
 
 // setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
 // for that topic.  It also starts the routine that listens for messages on that topic.
-func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string) (chan *ca.InterContainerMessage, error) {
+func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string) (chan *ic.InterContainerMessage, error) {
 	// TODO:  Replace this development partition consumers with a group consumers
 	var pConsumer *scc.Consumer
 	var err error
@@ -771,10 +771,10 @@
 	}
 	// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
 	// unbuffered to verify race conditions.
-	consumerListeningChannel := make(chan *ca.InterContainerMessage)
+	consumerListeningChannel := make(chan *ic.InterContainerMessage)
 	cc := &consumerChannels{
 		consumers: []interface{}{pConsumer},
-		channels:  []chan *ca.InterContainerMessage{consumerListeningChannel},
+		channels:  []chan *ic.InterContainerMessage{consumerListeningChannel},
 	}
 
 	// Add the consumers channel to the map
@@ -806,9 +806,9 @@
 	return pConsumers, nil
 }
 
-func removeChannel(channels []chan *ca.InterContainerMessage, ch <-chan *ca.InterContainerMessage) []chan *ca.InterContainerMessage {
+func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
 	var i int
-	var channel chan *ca.InterContainerMessage
+	var channel chan *ic.InterContainerMessage
 	for i, channel = range channels {
 		if channel == ch {
 			channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]