[VOL-1346]  This commit addresses device discovery notifications
which will be principally used by the affinity router.  In doing so
this commit also rename the core_adapter.proto to inter_container.proto.

Change-Id: Ib2a7b84efa50367d0ffbc482fba6096a225f3150
diff --git a/kafka/client.go b/kafka/client.go
index b93ad86..1df700e 100644
--- a/kafka/client.go
+++ b/kafka/client.go
@@ -16,7 +16,7 @@
 package kafka
 
 import (
-	ca "github.com/opencord/voltha-go/protos/core_adapter"
+	ca "github.com/opencord/voltha-go/protos/inter_container"
 	"time"
 )
 
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index ff3584f..4a3ec92 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -24,7 +24,7 @@
 	"github.com/golang/protobuf/ptypes/any"
 	"github.com/google/uuid"
 	"github.com/opencord/voltha-go/common/log"
-	ca "github.com/opencord/voltha-go/protos/core_adapter"
+	ic "github.com/opencord/voltha-go/protos/inter_container"
 	"reflect"
 	"sync"
 	"time"
@@ -45,14 +45,14 @@
 // async requests into the Core via the kafka messaging bus
 type requestHandlerChannel struct {
 	requesthandlerInterface interface{}
-	ch                      <-chan *ca.InterContainerMessage
+	ch                      <-chan *ic.InterContainerMessage
 }
 
 // transactionChannel represents a combination of a topic and a channel onto which a response received
 // on the kafka bus will be sent to
 type transactionChannel struct {
 	topic *Topic
-	ch    chan *ca.InterContainerMessage
+	ch    chan *ic.InterContainerMessage
 }
 
 // InterContainerProxy represents the messaging proxy
@@ -61,6 +61,7 @@
 	kafkaPort                      int
 	DefaultTopic                   *Topic
 	defaultRequestHandlerInterface interface{}
+	deviceDiscoveryTopic           *Topic
 	kafkaClient                    Client
 	doneCh                         chan int
 
@@ -72,7 +73,7 @@
 	// This map is used to map a channel to a response topic.   This channel handles all responses on that
 	// channel for that topic and forward them to the appropriate consumers channel, using the
 	// transactionIdToChannelMap.
-	topicToResponseChannelMap   map[string]<-chan *ca.InterContainerMessage
+	topicToResponseChannelMap   map[string]<-chan *ic.InterContainerMessage
 	lockTopicResponseChannelMap sync.RWMutex
 
 	// This map is used to map a transaction to a consumers channel.  This is used whenever a request has been
@@ -101,6 +102,12 @@
 	}
 }
 
+func DeviceDiscoveryTopic(topic *Topic) InterContainerProxyOption {
+	return func(args *InterContainerProxy) {
+		args.deviceDiscoveryTopic = topic
+	}
+}
+
 func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
 	return func(args *InterContainerProxy) {
 		args.defaultRequestHandlerInterface = handler
@@ -149,7 +156,7 @@
 	}
 
 	// Create the topic to response channel map
-	kp.topicToResponseChannelMap = make(map[string]<-chan *ca.InterContainerMessage)
+	kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
 	//
 	// Create the transactionId to Channel Map
 	kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
@@ -170,6 +177,47 @@
 	//kp.deleteAllTransactionIdToChannelMap()
 }
 
+// DeviceDiscovered publish the discovered device onto the kafka messaging bus
+func (kp *InterContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string) error {
+	log.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
+	//	Simple validation
+	if deviceId == "" || deviceType == "" {
+		log.Errorw("invalid-parameters", log.Fields{"id": deviceId, "type": deviceType})
+		return errors.New("invalid-parameters")
+	}
+	//	Create the device discovery message
+	header := &ic.Header{
+		Id:        uuid.New().String(),
+		Type:      ic.MessageType_DEVICE_DISCOVERED,
+		FromTopic: kp.DefaultTopic.Name,
+		ToTopic:   kp.deviceDiscoveryTopic.Name,
+		Timestamp: time.Now().UnixNano(),
+	}
+	body := &ic.DeviceDiscovered{
+		Id:         deviceId,
+		DeviceType: deviceType,
+		ParentId:   parentId,
+	}
+
+	var marshalledData *any.Any
+	var err error
+	if marshalledData, err = ptypes.MarshalAny(body); err != nil {
+		log.Errorw("cannot-marshal-request", log.Fields{"error": err})
+		return err
+	}
+	msg := &ic.InterContainerMessage{
+		Header: header,
+		Body:   marshalledData,
+	}
+
+	// Send the message
+	if err := kp.kafkaClient.Send(msg, kp.deviceDiscoveryTopic); err != nil {
+		log.Errorw("cannot-send-device-discovery-message", log.Fields{"error": err})
+		return err
+	}
+	return nil
+}
+
 // InvokeRPC is used to send a request to a given topic
 func (kp *InterContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
 	waitForResponse bool, kvArgs ...*KVArg) (bool, *any.Any) {
@@ -189,7 +237,7 @@
 	}
 
 	// Subscribe for response, if needed, before sending request
-	var ch <-chan *ca.InterContainerMessage
+	var ch <-chan *ic.InterContainerMessage
 	if waitForResponse {
 		var err error
 		if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
@@ -221,7 +269,7 @@
 		select {
 		case msg := <-ch:
 			log.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
-			var responseBody *ca.InterContainerResponseBody
+			var responseBody *ic.InterContainerResponseBody
 			var err error
 			if responseBody, err = decodeResponse(msg); err != nil {
 				log.Errorw("decode-response-error", log.Fields{"error": err})
@@ -230,7 +278,7 @@
 		case <-ctx.Done():
 			log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
 			//	 pack the error as proto any type
-			protoError := &ca.Error{Reason: ctx.Err().Error()}
+			protoError := &ic.Error{Reason: ctx.Err().Error()}
 			var marshalledArg *any.Any
 			if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
 				return false, nil // Should never happen
@@ -239,7 +287,7 @@
 		case <-childCtx.Done():
 			log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
 			//	 pack the error as proto any type
-			protoError := &ca.Error{Reason: childCtx.Err().Error()}
+			protoError := &ic.Error{Reason: childCtx.Err().Error()}
 			var marshalledArg *any.Any
 			if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
 				return false, nil // Should never happen
@@ -258,7 +306,7 @@
 func (kp *InterContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
 
 	// Subscribe to receive messages for that topic
-	var ch <-chan *ca.InterContainerMessage
+	var ch <-chan *ic.InterContainerMessage
 	var err error
 	if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
 		//if ch, err = kp.Subscribe(topic); err != nil {
@@ -277,7 +325,7 @@
 // when a message is received on a given topic.  So far there is only 1 target registered per microservice
 func (kp *InterContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic) error {
 	// Subscribe to receive messages for that topic
-	var ch <-chan *ca.InterContainerMessage
+	var ch <-chan *ic.InterContainerMessage
 	var err error
 	if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
 		log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
@@ -296,7 +344,7 @@
 
 // setupTopicResponseChannelMap sets up single consumers channel that will act as a broadcast channel for all
 // responses from that topic.
-func (kp *InterContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ca.InterContainerMessage) {
+func (kp *InterContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ic.InterContainerMessage) {
 	kp.lockTopicResponseChannelMap.Lock()
 	defer kp.lockTopicResponseChannelMap.Unlock()
 	if _, exist := kp.topicToResponseChannelMap[topic]; !exist {
@@ -376,7 +424,7 @@
 	return err
 }
 
-func (kp *InterContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ca.InterContainerMessage) {
+func (kp *InterContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
 	kp.lockTransactionIdToChannelMap.Lock()
 	defer kp.lockTransactionIdToChannelMap.Unlock()
 	if _, exist := kp.transactionIdToChannelMap[id]; !exist {
@@ -444,15 +492,15 @@
 	return marshalledReturnedVal, nil
 }
 
-func encodeDefaultFailedResponse(request *ca.InterContainerMessage) *ca.InterContainerMessage {
-	responseHeader := &ca.Header{
+func encodeDefaultFailedResponse(request *ic.InterContainerMessage) *ic.InterContainerMessage {
+	responseHeader := &ic.Header{
 		Id:        request.Header.Id,
-		Type:      ca.MessageType_RESPONSE,
+		Type:      ic.MessageType_RESPONSE,
 		FromTopic: request.Header.ToTopic,
 		ToTopic:   request.Header.FromTopic,
 		Timestamp: time.Now().Unix(),
 	}
-	responseBody := &ca.InterContainerResponseBody{
+	responseBody := &ic.InterContainerResponseBody{
 		Success: false,
 		Result:  nil,
 	}
@@ -463,7 +511,7 @@
 		log.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
 	}
 
-	return &ca.InterContainerMessage{
+	return &ic.InterContainerMessage{
 		Header: responseHeader,
 		Body:   marshalledResponseBody,
 	}
@@ -472,11 +520,11 @@
 
 //formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
 //or an error on failure
-func encodeResponse(request *ca.InterContainerMessage, success bool, returnedValues ...interface{}) (*ca.InterContainerMessage, error) {
+func encodeResponse(request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
 	//log.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
-	responseHeader := &ca.Header{
+	responseHeader := &ic.Header{
 		Id:        request.Header.Id,
-		Type:      ca.MessageType_RESPONSE,
+		Type:      ic.MessageType_RESPONSE,
 		FromTopic: request.Header.ToTopic,
 		ToTopic:   request.Header.FromTopic,
 		Timestamp: time.Now().Unix(),
@@ -492,7 +540,7 @@
 		break // for now we support only 1 returned value - (excluding the error)
 	}
 
-	responseBody := &ca.InterContainerResponseBody{
+	responseBody := &ic.InterContainerResponseBody{
 		Success: success,
 		Result:  marshalledReturnedVal,
 	}
@@ -504,7 +552,7 @@
 		return nil, err
 	}
 
-	return &ca.InterContainerMessage{
+	return &ic.InterContainerMessage{
 		Header: responseHeader,
 		Body:   marshalledResponseBody,
 	}, nil
@@ -524,16 +572,16 @@
 	return
 }
 
-func (kp *InterContainerProxy) handleRequest(msg *ca.InterContainerMessage, targetInterface interface{}) {
+func (kp *InterContainerProxy) handleRequest(msg *ic.InterContainerMessage, targetInterface interface{}) {
 
 	// First extract the header to know whether this is a request - responses are handled by a different handler
-	if msg.Header.Type == ca.MessageType_REQUEST {
+	if msg.Header.Type == ic.MessageType_REQUEST {
 
 		var out []reflect.Value
 		var err error
 
 		// Get the request body
-		requestBody := &ca.InterContainerRequestBody{}
+		requestBody := &ic.InterContainerRequestBody{}
 		if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
 			log.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
 		} else {
@@ -547,11 +595,11 @@
 		// Response required?
 		if requestBody.ResponseRequired {
 			// If we already have an error before then just return that
-			var returnError *ca.Error
+			var returnError *ic.Error
 			var returnedValues []interface{}
 			var success bool
 			if err != nil {
-				returnError = &ca.Error{Reason: err.Error()}
+				returnError = &ic.Error{Reason: err.Error()}
 				returnedValues = make([]interface{}, 1)
 				returnedValues[0] = returnError
 			} else {
@@ -561,10 +609,10 @@
 				lastIndex := len(out) - 1
 				if out[lastIndex].Interface() != nil { // Error
 					if goError, ok := out[lastIndex].Interface().(error); ok {
-						returnError = &ca.Error{Reason: goError.Error()}
+						returnError = &ic.Error{Reason: goError.Error()}
 						returnedValues = append(returnedValues, returnError)
 					} else { // Should never happen
-						returnError = &ca.Error{Reason: "incorrect-error-returns"}
+						returnError = &ic.Error{Reason: "incorrect-error-returns"}
 						returnedValues = append(returnedValues, returnError)
 					}
 				} else { // Non-error case
@@ -578,7 +626,7 @@
 				}
 			}
 
-			var icm *ca.InterContainerMessage
+			var icm *ic.InterContainerMessage
 			if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
 				log.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
 				icm = encodeDefaultFailedResponse(msg)
@@ -597,7 +645,7 @@
 	}
 }
 
-func (kp *InterContainerProxy) waitForRequest(ch <-chan *ca.InterContainerMessage, topic Topic, targetInterface interface{}) {
+func (kp *InterContainerProxy) waitForRequest(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
 	//	Wait for messages
 	for msg := range ch {
 		//log.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
@@ -605,7 +653,7 @@
 	}
 }
 
-func (kp *InterContainerProxy) dispatchResponse(msg *ca.InterContainerMessage) {
+func (kp *InterContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
 	kp.lockTransactionIdToChannelMap.Lock()
 	defer kp.lockTransactionIdToChannelMap.Unlock()
 	if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
@@ -617,14 +665,14 @@
 
 // waitForResponse listens for messages on the subscribedCh, ensure we get a response with the transaction ID,
 // and then dispatches to the consumers
-func (kp *InterContainerProxy) waitForResponseLoop(subscribedCh <-chan *ca.InterContainerMessage, topic *Topic) {
+func (kp *InterContainerProxy) waitForResponseLoop(subscribedCh <-chan *ic.InterContainerMessage, topic *Topic) {
 	log.Debugw("starting-response-loop-for-topic", log.Fields{"topic": topic.Name})
 startloop:
 	for {
 		select {
 		case msg := <-subscribedCh:
 			//log.Debugw("message-received", log.Fields{"msg": msg, "fromTopic": msg.Header.FromTopic})
-			if msg.Header.Type == ca.MessageType_RESPONSE {
+			if msg.Header.Type == ic.MessageType_RESPONSE {
 				go kp.dispatchResponse(msg)
 			}
 		case <-kp.doneCh:
@@ -641,13 +689,13 @@
 // This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
 // API. There is one response channel waiting for kafka messages before dispatching the message to the
 // corresponding waiting channel
-func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ca.InterContainerMessage, error) {
+func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
 	log.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
 
 	// First check whether we already have a channel listening for response on that topic.  If there is
 	// already one then it will be reused.  If not, it will be created.
 	if !kp.isTopicSubscribedForResponse(topic.Name) {
-		var subscribedCh <-chan *ca.InterContainerMessage
+		var subscribedCh <-chan *ic.InterContainerMessage
 		var err error
 		if subscribedCh, err = kp.kafkaClient.Subscribe(&topic); err != nil {
 			log.Debugw("subscribe-failure", log.Fields{"topic": topic.Name})
@@ -659,7 +707,7 @@
 
 	// Create a specific channel for this consumers.  We cannot use the channel from the kafkaclient as it will
 	// broadcast any message for this topic to all channels waiting on it.
-	ch := make(chan *ca.InterContainerMessage)
+	ch := make(chan *ic.InterContainerMessage)
 	kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
 
 	return ch, nil
@@ -676,15 +724,15 @@
 
 //formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
 //or an error on failure
-func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, kvArgs ...*KVArg) (*ca.InterContainerMessage, error) {
-	requestHeader := &ca.Header{
+func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
+	requestHeader := &ic.Header{
 		Id:        uuid.New().String(),
-		Type:      ca.MessageType_REQUEST,
+		Type:      ic.MessageType_REQUEST,
 		FromTopic: replyTopic.Name,
 		ToTopic:   toTopic.Name,
 		Timestamp: time.Now().Unix(),
 	}
-	requestBody := &ca.InterContainerRequestBody{
+	requestBody := &ic.InterContainerRequestBody{
 		Rpc:              rpc,
 		ResponseRequired: true,
 		ReplyToTopic:     replyTopic.Name,
@@ -708,7 +756,7 @@
 			log.Warnw("cannot-marshal-request", log.Fields{"error": err})
 			return nil, err
 		}
-		protoArg := &ca.Argument{
+		protoArg := &ic.Argument{
 			Key:   arg.Key,
 			Value: marshalledArg,
 		}
@@ -721,16 +769,16 @@
 		log.Warnw("cannot-marshal-request", log.Fields{"error": err})
 		return nil, err
 	}
-	request := &ca.InterContainerMessage{
+	request := &ic.InterContainerMessage{
 		Header: requestHeader,
 		Body:   marshalledData,
 	}
 	return request, nil
 }
 
-func decodeResponse(response *ca.InterContainerMessage) (*ca.InterContainerResponseBody, error) {
+func decodeResponse(response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
 	//	Extract the message body
-	responseBody := ca.InterContainerResponseBody{}
+	responseBody := ic.InterContainerResponseBody{}
 	if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
 		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
 		return nil, err
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index 8468e42..e330b85 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -22,7 +22,7 @@
 	"github.com/golang/protobuf/proto"
 	"github.com/google/uuid"
 	"github.com/opencord/voltha-go/common/log"
-	ca "github.com/opencord/voltha-go/protos/core_adapter"
+	ic "github.com/opencord/voltha-go/protos/inter_container"
 	"gopkg.in/Shopify/sarama.v1"
 	"strings"
 	"sync"
@@ -40,7 +40,7 @@
 //consumer or a group consumer
 type consumerChannels struct {
 	consumers []interface{}
-	channels  []chan *ca.InterContainerMessage
+	channels  []chan *ic.InterContainerMessage
 }
 
 // SaramaClient represents the messaging proxy
@@ -307,20 +307,20 @@
 
 // Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
 // messages from that topic
-func (sc *SaramaClient) Subscribe(topic *Topic) (<-chan *ca.InterContainerMessage, error) {
+func (sc *SaramaClient) Subscribe(topic *Topic) (<-chan *ic.InterContainerMessage, error) {
 	log.Debugw("subscribe", log.Fields{"topic": topic.Name})
 
 	// If a consumers already exist for that topic then resuse it
 	if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
 		log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
 		// Create a channel specific for that consumers and add it to the consumers channel map
-		ch := make(chan *ca.InterContainerMessage)
+		ch := make(chan *ic.InterContainerMessage)
 		sc.addChannelToConsumerChannelMap(topic, ch)
 		return ch, nil
 	}
 
 	// Register for the topic and set it up
-	var consumerListeningChannel chan *ca.InterContainerMessage
+	var consumerListeningChannel chan *ic.InterContainerMessage
 	var err error
 
 	// Use the consumerType option to figure out the type of consumer to launch
@@ -351,7 +351,7 @@
 }
 
 //UnSubscribe unsubscribe a consumer from a given topic
-func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ca.InterContainerMessage) error {
+func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
 	log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
 	err := sc.removeChannelFromConsumerChannelMap(*topic, ch)
 	return err
@@ -393,9 +393,9 @@
 	// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
 	select {
 	case ok := <-sc.producer.Successes():
-		log.Debugw("message-sent", log.Fields{"status":ok})
+		log.Debugw("message-sent", log.Fields{"status": ok})
 	case notOk := <-sc.producer.Errors():
-		log.Debugw("error-sending", log.Fields{"status":notOk})
+		log.Debugw("error-sending", log.Fields{"status": notOk})
 		return notOk
 	}
 	return nil
@@ -443,7 +443,7 @@
 	return nil
 }
 
-func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ca.InterContainerMessage) {
+func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
 	sc.lockTopicToConsumerChannelMap.Lock()
 	defer sc.lockTopicToConsumerChannelMap.Unlock()
 	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
@@ -482,7 +482,7 @@
 	return err
 }
 
-func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ca.InterContainerMessage) error {
+func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
 	sc.lockTopicToConsumerChannelMap.Lock()
 	defer sc.lockTopicToConsumerChannelMap.Unlock()
 	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
@@ -620,12 +620,12 @@
 
 // dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
 // topic via the unique channel each subsciber received during subscription
-func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ca.InterContainerMessage) {
+func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
 	// Need to go over all channels and publish messages to them - do we need to copy msg?
 	sc.lockTopicToConsumerChannelMap.Lock()
 	defer sc.lockTopicToConsumerChannelMap.Unlock()
 	for _, ch := range consumerCh.channels {
-		go func(c chan *ca.InterContainerMessage) {
+		go func(c chan *ic.InterContainerMessage) {
 			c <- protoMessage
 		}(ch)
 	}
@@ -652,7 +652,7 @@
 				break startloop
 			}
 			msgBody := msg.Value
-			icm := &ca.InterContainerMessage{}
+			icm := &ic.InterContainerMessage{}
 			if err := proto.Unmarshal(msgBody, icm); err != nil {
 				log.Warnw("partition-invalid-message", log.Fields{"error": err})
 				continue
@@ -688,7 +688,7 @@
 				break startloop
 			}
 			msgBody := msg.Value
-			icm := &ca.InterContainerMessage{}
+			icm := &ic.InterContainerMessage{}
 			if err := proto.Unmarshal(msgBody, icm); err != nil {
 				log.Warnw("invalid-message", log.Fields{"error": err})
 				continue
@@ -728,7 +728,7 @@
 
 //// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
 //// for that topic.  It also starts the routine that listens for messages on that topic.
-func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ca.InterContainerMessage, error) {
+func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
 	var pConsumers []sarama.PartitionConsumer
 	var err error
 
@@ -744,10 +744,10 @@
 
 	// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
 	// unbuffered to verify race conditions.
-	consumerListeningChannel := make(chan *ca.InterContainerMessage)
+	consumerListeningChannel := make(chan *ic.InterContainerMessage)
 	cc := &consumerChannels{
 		consumers: consumersIf,
-		channels:  []chan *ca.InterContainerMessage{consumerListeningChannel},
+		channels:  []chan *ic.InterContainerMessage{consumerListeningChannel},
 	}
 
 	// Add the consumers channel to the map
@@ -761,7 +761,7 @@
 
 // setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
 // for that topic.  It also starts the routine that listens for messages on that topic.
-func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string) (chan *ca.InterContainerMessage, error) {
+func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string) (chan *ic.InterContainerMessage, error) {
 	// TODO:  Replace this development partition consumers with a group consumers
 	var pConsumer *scc.Consumer
 	var err error
@@ -771,10 +771,10 @@
 	}
 	// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
 	// unbuffered to verify race conditions.
-	consumerListeningChannel := make(chan *ca.InterContainerMessage)
+	consumerListeningChannel := make(chan *ic.InterContainerMessage)
 	cc := &consumerChannels{
 		consumers: []interface{}{pConsumer},
-		channels:  []chan *ca.InterContainerMessage{consumerListeningChannel},
+		channels:  []chan *ic.InterContainerMessage{consumerListeningChannel},
 	}
 
 	// Add the consumers channel to the map
@@ -806,9 +806,9 @@
 	return pConsumers, nil
 }
 
-func removeChannel(channels []chan *ca.InterContainerMessage, ch <-chan *ca.InterContainerMessage) []chan *ca.InterContainerMessage {
+func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
 	var i int
-	var channel chan *ca.InterContainerMessage
+	var channel chan *ic.InterContainerMessage
 	for i, channel = range channels {
 		if channel == ch {
 			channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
diff --git a/protos/core_adapter.proto b/protos/inter_container.proto
similarity index 90%
rename from protos/core_adapter.proto
rename to protos/inter_container.proto
index 24390f5..931ddb9 100644
--- a/protos/core_adapter.proto
+++ b/protos/inter_container.proto
@@ -5,7 +5,7 @@
 import public "logical_device.proto";
 
 
-option go_package = "github.com/opencord/voltha-go/protos/core_adapter";
+option go_package = "github.com/opencord/voltha-go/protos/inter_container";
 
 package voltha;
 
@@ -40,6 +40,7 @@
 enum MessageType {
     REQUEST = 0;
     RESPONSE = 1;
+    DEVICE_DISCOVERED=2;
 }
 
 message Header {
@@ -81,6 +82,12 @@
     LogicalPort port = 1;
 }
 
+message DeviceDiscovered {
+    string id = 1;
+    string parent_id = 2;
+    string device_type = 3;
+}
+
 message InterAdapterMessageType {
     enum Types {
         FLOW_REQUEST = 0;
diff --git a/protos/scripts/build_protos.sh b/protos/scripts/build_protos.sh
index 0b2e17a..3602c23 100755
--- a/protos/scripts/build_protos.sh
+++ b/protos/scripts/build_protos.sh
@@ -37,7 +37,7 @@
     $SRC_DIR/meta.proto \
     $SRC_DIR/yang_options.proto"
 
-export CORE_ADAPTER_PB="$SRC_DIR/core_adapter.proto"
+export INTER_CONTAINER_PB="$SRC_DIR/inter_container.proto"
 export SCHEMA_PB="$SRC_DIR/schema.proto"
 export IETF_PB="$SRC_DIR/ietf_interfaces.proto"
 export OF_PB="$SRC_DIR/openflow_13.proto"
@@ -46,7 +46,7 @@
 export PB_VARS="\
     VOLTHA_PB \
     COMMON_PB \
-    CORE_ADAPTER_PB \
+    INTER_CONTAINER_PB \
     SCHEMA_PB \
     IETF_PB \
     OF_PB \
diff --git a/python/Makefile b/python/Makefile
index b19472f..c2fd010 100644
--- a/python/Makefile
+++ b/python/Makefile
@@ -189,7 +189,11 @@
 	make -C voltha/protos install-protoc
 
 clean:
-	find voltha -name '*.pyc' | xargs rm -f
+	find . -name '*.pyc' | xargs rm -f
+	rm -f ./protos/*_pb2.py
+	rm -f ./protos/*_pb2_grpc.py
+	rm -f ./protos/*.desc
+	rm -f ./protos/*.proto
 
 distclean: clean
 	rm -rf ${VENVDIR}
diff --git a/python/adapters/kafka/adapter_proxy.py b/python/adapters/kafka/adapter_proxy.py
index 769de80..657a681 100644
--- a/python/adapters/kafka/adapter_proxy.py
+++ b/python/adapters/kafka/adapter_proxy.py
@@ -23,7 +23,7 @@
 from twisted.internet.defer import inlineCallbacks, returnValue
 from container_proxy import ContainerProxy
 from python.protos import third_party
-from python.protos.core_adapter_pb2 import InterAdapterHeader, \
+from python.protos.inter_container_pb2 import InterAdapterHeader, \
     InterAdapterMessage
 import time
 
diff --git a/python/adapters/kafka/adapter_request_facade.py b/python/adapters/kafka/adapter_request_facade.py
index cf7afd8..0efb811 100644
--- a/python/adapters/kafka/adapter_request_facade.py
+++ b/python/adapters/kafka/adapter_request_facade.py
@@ -24,7 +24,7 @@
 from twisted.internet import reactor
 from afkak.consumer import OFFSET_LATEST, OFFSET_EARLIEST
 from python.adapters.interface import IAdapterInterface
-from python.protos.core_adapter_pb2 import IntType, InterAdapterMessage, StrType, Error, ErrorCode
+from python.protos.inter_container_pb2 import IntType, InterAdapterMessage, StrType, Error, ErrorCode
 from python.protos.device_pb2 import Device
 from python.protos.openflow_13_pb2 import FlowChanges, FlowGroups, Flows, \
     FlowGroupChanges, ofp_packet_out
diff --git a/python/adapters/kafka/core_proxy.py b/python/adapters/kafka/core_proxy.py
index fa9544b..b897188 100644
--- a/python/adapters/kafka/core_proxy.py
+++ b/python/adapters/kafka/core_proxy.py
@@ -23,7 +23,7 @@
 
 from container_proxy import ContainerProxy
 from python.protos.common_pb2 import ID, ConnectStatus, OperStatus
-from python.protos.core_adapter_pb2 import StrType, BoolType, IntType, Packet
+from python.protos.inter_container_pb2 import StrType, BoolType, IntType, Packet
 from python.protos.device_pb2 import Device, Ports
 from python.protos.voltha_pb2 import CoreInstance
 
diff --git a/python/adapters/kafka/kafka_inter_container_library.py b/python/adapters/kafka/kafka_inter_container_library.py
index a3de4f1..fbb0834 100644
--- a/python/adapters/kafka/kafka_inter_container_library.py
+++ b/python/adapters/kafka/kafka_inter_container_library.py
@@ -28,7 +28,7 @@
 from python.common.utils import asleep
 from python.common.utils.registry import IComponent
 from kafka_proxy import KafkaProxy, get_kafka_proxy
-from python.protos.core_adapter_pb2 import MessageType, Argument, \
+from python.protos.inter_container_pb2 import MessageType, Argument, \
     InterContainerRequestBody, InterContainerMessage, Header, \
     InterContainerResponseBody
 
diff --git a/python/adapters/ponsim_olt/ponsim_olt.py b/python/adapters/ponsim_olt/ponsim_olt.py
index 95d6590..e3157e7 100644
--- a/python/adapters/ponsim_olt/ponsim_olt.py
+++ b/python/adapters/ponsim_olt/ponsim_olt.py
@@ -41,7 +41,7 @@
 from python.protos import ponsim_pb2
 from python.protos import third_party
 from python.protos.common_pb2 import OperStatus, ConnectStatus
-from python.protos.core_adapter_pb2 import SwitchCapability, PortCapability, \
+from python.protos.inter_container_pb2 import SwitchCapability, PortCapability, \
     InterAdapterMessageType, InterAdapterResponseBody
 from python.protos.device_pb2 import Port, PmConfig, PmConfigs
 from python.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
diff --git a/python/adapters/ponsim_onu/ponsim_onu.py b/python/adapters/ponsim_onu/ponsim_onu.py
index eb4d716..9ad0799 100644
--- a/python/adapters/ponsim_onu/ponsim_onu.py
+++ b/python/adapters/ponsim_onu/ponsim_onu.py
@@ -34,7 +34,7 @@
 from python.adapters.kafka.kafka_proxy import get_kafka_proxy
 from python.protos import third_party
 from python.protos.common_pb2 import OperStatus, ConnectStatus, AdminState
-from python.protos.core_adapter_pb2 import PortCapability, \
+from python.protos.inter_container_pb2 import PortCapability, \
     InterAdapterMessageType, InterAdapterResponseBody
 from python.protos.device_pb2 import Port, PmConfig, PmConfigs
 from python.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
diff --git a/python/requirements.txt b/python/requirements.txt
index a0641b2..56cb356 100644
--- a/python/requirements.txt
+++ b/python/requirements.txt
@@ -3,6 +3,7 @@
 bitstring==3.1.5
 cmd2==0.7.0
 colorama==0.3.9
+confluent-kafka==0.11.5
 cython==0.24.1
 decorator==4.1.2
 docker-py==1.10.6
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index e0d0fe6..f7ac794 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -24,51 +24,53 @@
 
 // RW Core service default constants
 const (
-	ConsulStoreName          = "consul"
-	EtcdStoreName            = "etcd"
-	default_InstanceID       = "rwcore001"
-	default_GrpcPort         = 50057
-	default_GrpcHost         = ""
-	default_KafkaAdapterHost = "127.0.0.1"
-	default_KafkaAdapterPort = 9092
-	default_KafkaClusterHost = "127.0.0.1"
-	default_KafkaClusterPort = 9094
-	default_KVStoreType      = EtcdStoreName
-	default_KVStoreTimeout   = 5 //in seconds
-	default_KVStoreHost      = "127.0.0.1"
-	default_KVStorePort      = 2379 // Consul = 8500; Etcd = 2379
-	default_KVTxnKeyDelTime  = 60
-	default_LogLevel         = 0
-	default_Banner           = false
-	default_CoreTopic        = "rwcore"
-	default_RWCoreEndpoint   = "rwcore"
-	default_RWCoreKey        = "pki/voltha.key"
-	default_RWCoreCert       = "pki/voltha.crt"
-	default_RWCoreCA         = "pki/voltha-CA.pem"
+	ConsulStoreName               = "consul"
+	EtcdStoreName                 = "etcd"
+	default_InstanceID            = "rwcore001"
+	default_GrpcPort              = 50057
+	default_GrpcHost              = ""
+	default_KafkaAdapterHost      = "127.0.0.1"
+	default_KafkaAdapterPort      = 9092
+	default_KafkaClusterHost      = "127.0.0.1"
+	default_KafkaClusterPort      = 9094
+	default_KVStoreType           = EtcdStoreName
+	default_KVStoreTimeout        = 5 //in seconds
+	default_KVStoreHost           = "127.0.0.1"
+	default_KVStorePort           = 2379 // Consul = 8500; Etcd = 2379
+	default_KVTxnKeyDelTime       = 60
+	default_LogLevel              = 0
+	default_Banner                = false
+	default_CoreTopic             = "rwcore"
+	default_RWCoreEndpoint        = "rwcore"
+	default_RWCoreKey             = "pki/voltha.key"
+	default_RWCoreCert            = "pki/voltha.crt"
+	default_RWCoreCA              = "pki/voltha-CA.pem"
+	default_Affinity_Router_Topic = "affinityRouter"
 )
 
 // RWCoreFlags represents the set of configurations used by the read-write core service
 type RWCoreFlags struct {
 	// Command line parameters
-	InstanceID       string
-	RWCoreEndpoint   string
-	GrpcHost         string
-	GrpcPort         int
-	KafkaAdapterHost string
-	KafkaAdapterPort int
-	KafkaClusterHost string
-	KafkaClusterPort int
-	KVStoreType      string
-	KVStoreTimeout   int // in seconds
-	KVStoreHost      string
-	KVStorePort      int
-	KVTxnKeyDelTime  int
-	CoreTopic        string
-	LogLevel         int
-	Banner           bool
-	RWCoreKey        string
-	RWCoreCert       string
-	RWCoreCA         string
+	InstanceID          string
+	RWCoreEndpoint      string
+	GrpcHost            string
+	GrpcPort            int
+	KafkaAdapterHost    string
+	KafkaAdapterPort    int
+	KafkaClusterHost    string
+	KafkaClusterPort    int
+	KVStoreType         string
+	KVStoreTimeout      int // in seconds
+	KVStoreHost         string
+	KVStorePort         int
+	KVTxnKeyDelTime     int
+	CoreTopic           string
+	LogLevel            int
+	Banner              bool
+	RWCoreKey           string
+	RWCoreCert          string
+	RWCoreCA            string
+	AffinityRouterTopic string
 }
 
 func init() {
@@ -78,25 +80,26 @@
 // NewRWCoreFlags returns a new RWCore config
 func NewRWCoreFlags() *RWCoreFlags {
 	var rwCoreFlag = RWCoreFlags{ // Default values
-		InstanceID:       default_InstanceID,
-		RWCoreEndpoint:   default_RWCoreEndpoint,
-		GrpcHost:         default_GrpcHost,
-		GrpcPort:         default_GrpcPort,
-		KafkaAdapterHost: default_KafkaAdapterHost,
-		KafkaAdapterPort: default_KafkaAdapterPort,
-		KafkaClusterHost: default_KafkaClusterHost,
-		KafkaClusterPort: default_KafkaClusterPort,
-		KVStoreType:      default_KVStoreType,
-		KVStoreTimeout:   default_KVStoreTimeout,
-		KVStoreHost:      default_KVStoreHost,
-		KVStorePort:      default_KVStorePort,
-		KVTxnKeyDelTime:  default_KVTxnKeyDelTime,
-		CoreTopic:        default_CoreTopic,
-		LogLevel:         default_LogLevel,
-		Banner:           default_Banner,
-		RWCoreKey:        default_RWCoreKey,
-		RWCoreCert:       default_RWCoreCert,
-		RWCoreCA:         default_RWCoreCA,
+		InstanceID:          default_InstanceID,
+		RWCoreEndpoint:      default_RWCoreEndpoint,
+		GrpcHost:            default_GrpcHost,
+		GrpcPort:            default_GrpcPort,
+		KafkaAdapterHost:    default_KafkaAdapterHost,
+		KafkaAdapterPort:    default_KafkaAdapterPort,
+		KafkaClusterHost:    default_KafkaClusterHost,
+		KafkaClusterPort:    default_KafkaClusterPort,
+		KVStoreType:         default_KVStoreType,
+		KVStoreTimeout:      default_KVStoreTimeout,
+		KVStoreHost:         default_KVStoreHost,
+		KVStorePort:         default_KVStorePort,
+		KVTxnKeyDelTime:     default_KVTxnKeyDelTime,
+		CoreTopic:           default_CoreTopic,
+		LogLevel:            default_LogLevel,
+		Banner:              default_Banner,
+		RWCoreKey:           default_RWCoreKey,
+		RWCoreCert:          default_RWCoreCert,
+		RWCoreCA:            default_RWCoreCA,
+		AffinityRouterTopic: default_Affinity_Router_Topic,
 	}
 	return &rwCoreFlag
 }
@@ -130,6 +133,9 @@
 	help = fmt.Sprintf("RW Core topic")
 	flag.StringVar(&(cf.CoreTopic), "rw_core_topic", default_CoreTopic, help)
 
+	help = fmt.Sprintf("Affinity Router topic")
+	flag.StringVar(&(cf.AffinityRouterTopic), "affinity_router_topic", default_Affinity_Router_Topic, help)
+
 	help = fmt.Sprintf("KV store type")
 	flag.StringVar(&(cf.KVStoreType), "kv_store_type", default_KVStoreType, help)
 
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index 3ce59a7..c287c10 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -21,7 +21,7 @@
 	a "github.com/golang/protobuf/ptypes/any"
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/kafka"
-	ca "github.com/opencord/voltha-go/protos/core_adapter"
+	ic "github.com/opencord/voltha-go/protos/inter_container"
 	"github.com/opencord/voltha-go/protos/openflow_13"
 	"github.com/opencord/voltha-go/protos/voltha"
 	"google.golang.org/grpc/codes"
@@ -43,7 +43,7 @@
 	if success {
 		return nil
 	} else {
-		unpackResult := &ca.Error{}
+		unpackResult := &ic.Error{}
 		var err error
 		if err = ptypes.UnmarshalAny(response, unpackResult); err != nil {
 			log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
@@ -170,7 +170,7 @@
 	return unPackResponse(rpc, device.Id, success, result)
 }
 
-func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ca.SwitchCapability, error) {
+func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ic.SwitchCapability, error) {
 	log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
 	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
 	args := make([]*kafka.KVArg, 1)
@@ -183,14 +183,14 @@
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_device_info", &toTopic, &replyToTopic, true, args...)
 	log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
 	if success {
-		unpackResult := &ca.SwitchCapability{}
+		unpackResult := &ic.SwitchCapability{}
 		if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
 			log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
 			return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
 		}
 		return unpackResult, nil
 	} else {
-		unpackResult := &ca.Error{}
+		unpackResult := &ic.Error{}
 		var err error
 		if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
 			log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
@@ -201,7 +201,7 @@
 	}
 }
 
-func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ca.PortCapability, error) {
+func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ic.PortCapability, error) {
 	log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
 	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
 	args := make([]*kafka.KVArg, 2)
@@ -209,7 +209,7 @@
 		Key:   "device",
 		Value: device,
 	}
-	pNo := &ca.IntType{Val: int64(portNo)}
+	pNo := &ic.IntType{Val: int64(portNo)}
 	args[1] = &kafka.KVArg{
 		Key:   "port_no",
 		Value: pNo,
@@ -219,14 +219,14 @@
 	success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, args...)
 	log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
 	if success {
-		unpackResult := &ca.PortCapability{}
+		unpackResult := &ic.PortCapability{}
 		if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
 			log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
 			return nil, status.Errorf(codes.InvalidArgument, "%s", err.Error())
 		}
 		return unpackResult, nil
 	} else {
-		unpackResult := &ca.Error{}
+		unpackResult := &ic.Error{}
 		var err error
 		if err = ptypes.UnmarshalAny(result, unpackResult); err != nil {
 			log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
@@ -303,13 +303,13 @@
 	log.Debugw("packetOut", log.Fields{"deviceId": deviceId})
 	toTopic := kafka.CreateSubTopic(deviceType, deviceId)
 	rpc := "receive_packet_out"
-	dId := &ca.StrType{Val: deviceId}
+	dId := &ic.StrType{Val: deviceId}
 	args := make([]*kafka.KVArg, 3)
 	args[0] = &kafka.KVArg{
 		Key:   "deviceId",
 		Value: dId,
 	}
-	op := &ca.IntType{Val: int64(outPort)}
+	op := &ic.IntType{Val: int64(outPort)}
 	args[1] = &kafka.KVArg{
 		Key:   "outPort",
 		Value: op,
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 40563d4..d7e1b0a 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -22,7 +22,7 @@
 	"github.com/golang/protobuf/ptypes/empty"
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/model"
-	ca "github.com/opencord/voltha-go/protos/core_adapter"
+	ic "github.com/opencord/voltha-go/protos/inter_container"
 	"github.com/opencord/voltha-go/protos/voltha"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
@@ -47,7 +47,7 @@
 	return &proxy
 }
 
-func (rhp *AdapterRequestHandlerProxy) Register(args []*ca.Argument) (*voltha.CoreInstance, error) {
+func (rhp *AdapterRequestHandlerProxy) Register(args []*ic.Argument) (*voltha.CoreInstance, error) {
 	if len(args) != 2 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
@@ -78,7 +78,7 @@
 	return &voltha.CoreInstance{InstanceId: rhp.coreInstanceId}, nil
 }
 
-func (rhp *AdapterRequestHandlerProxy) GetDevice(args []*ca.Argument) (*voltha.Device, error) {
+func (rhp *AdapterRequestHandlerProxy) GetDevice(args []*ic.Argument) (*voltha.Device, error) {
 	if len(args) != 1 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
@@ -99,6 +99,7 @@
 	if device, err := rhp.deviceMgr.GetDevice(pID.Id); err != nil {
 		return nil, status.Errorf(codes.NotFound, "%s", err.Error())
 	} else {
+		log.Debugw("GetDevice-response", log.Fields{"deviceId": pID.Id})
 		return device, nil
 	}
 }
@@ -122,7 +123,7 @@
 	return cloned, nil
 }
 
-func (rhp *AdapterRequestHandlerProxy) DeviceUpdate(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) DeviceUpdate(args []*ic.Argument) (*empty.Empty, error) {
 	if len(args) != 1 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
@@ -151,7 +152,7 @@
 	return new(empty.Empty), nil
 }
 
-func (rhp *AdapterRequestHandlerProxy) GetChildDevice(args []*ca.Argument) (*voltha.Device, error) {
+func (rhp *AdapterRequestHandlerProxy) GetChildDevice(args []*ic.Argument) (*voltha.Device, error) {
 	if len(args) < 1 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
@@ -170,14 +171,14 @@
 	return nil, nil
 }
 
-func (rhp *AdapterRequestHandlerProxy) GetPorts(args []*ca.Argument) (*voltha.Ports, error) {
+func (rhp *AdapterRequestHandlerProxy) GetPorts(args []*ic.Argument) (*voltha.Ports, error) {
 	if len(args) != 2 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
 	}
 	deviceId := &voltha.ID{}
-	pt := &ca.IntType{}
+	pt := &ic.IntType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "device_id":
@@ -202,7 +203,7 @@
 	return rhp.deviceMgr.getPorts(nil, deviceId.Id, voltha.Port_PortType(pt.Val))
 }
 
-func (rhp *AdapterRequestHandlerProxy) GetChildDevices(args []*ca.Argument) (*voltha.Device, error) {
+func (rhp *AdapterRequestHandlerProxy) GetChildDevices(args []*ic.Argument) (*voltha.Device, error) {
 	if len(args) != 1 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
@@ -225,7 +226,7 @@
 // ChildDeviceDetected is invoked when a child device is detected.  The following
 // parameters are expected:
 // {parent_device_id, parent_port_no, child_device_type, proxy_address, admin_state, **kw)
-func (rhp *AdapterRequestHandlerProxy) ChildDeviceDetected(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) ChildDeviceDetected(args []*ic.Argument) (*empty.Empty, error) {
 	if len(args) < 4 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
@@ -233,9 +234,9 @@
 	}
 
 	pID := &voltha.ID{}
-	portNo := &ca.IntType{}
-	dt := &ca.StrType{}
-	chnlId := &ca.IntType{}
+	portNo := &ic.IntType{}
+	dt := &ic.StrType{}
+	chnlId := &ic.IntType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "parent_device_id":
@@ -272,15 +273,15 @@
 	return new(empty.Empty), nil
 }
 
-func (rhp *AdapterRequestHandlerProxy) DeviceStateUpdate(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) DeviceStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
 	if len(args) < 2 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
 	}
 	deviceId := &voltha.ID{}
-	operStatus := &ca.IntType{}
-	connStatus := &ca.IntType{}
+	operStatus := &ic.IntType{}
+	connStatus := &ic.IntType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "device_id":
@@ -309,15 +310,15 @@
 	return new(empty.Empty), nil
 }
 
-func (rhp *AdapterRequestHandlerProxy) ChildrenStateUpdate(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) ChildrenStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
 	if len(args) < 2 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
 	}
 	deviceId := &voltha.ID{}
-	operStatus := &ca.IntType{}
-	connStatus := &ca.IntType{}
+	operStatus := &ic.IntType{}
+	connStatus := &ic.IntType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "device_id":
@@ -347,16 +348,16 @@
 	return new(empty.Empty), nil
 }
 
-func (rhp *AdapterRequestHandlerProxy) PortStateUpdate(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) PortStateUpdate(args []*ic.Argument) (*empty.Empty, error) {
 	if len(args) < 2 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
 	}
 	deviceId := &voltha.ID{}
-	portType := &ca.IntType{}
-	portNo := &ca.IntType{}
-	operStatus := &ca.IntType{}
+	portType := &ic.IntType{}
+	portNo := &ic.IntType{}
+	operStatus := &ic.IntType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "device_id":
@@ -390,7 +391,7 @@
 	return new(empty.Empty), nil
 }
 
-func (rhp *AdapterRequestHandlerProxy) PortCreated(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) PortCreated(args []*ic.Argument) (*empty.Empty, error) {
 	if len(args) != 2 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
@@ -423,14 +424,14 @@
 	return new(empty.Empty), nil
 }
 
-func (rhp *AdapterRequestHandlerProxy) DevicePMConfigUpdate(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) DevicePMConfigUpdate(args []*ic.Argument) (*empty.Empty, error) {
 	if len(args) != 2 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
 	}
 	pmConfigs := &voltha.PmConfigs{}
-	init := &ca.BoolType{}
+	init := &ic.BoolType{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "device_pm_config":
@@ -458,15 +459,15 @@
 	return new(empty.Empty), nil
 }
 
-func (rhp *AdapterRequestHandlerProxy) PacketIn(args []*ca.Argument) (*empty.Empty, error) {
+func (rhp *AdapterRequestHandlerProxy) PacketIn(args []*ic.Argument) (*empty.Empty, error) {
 	if len(args) < 3 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
 		err := errors.New("invalid-number-of-args")
 		return nil, err
 	}
 	deviceId := &voltha.ID{}
-	portNo := &ca.IntType{}
-	packet := &ca.Packet{}
+	portNo := &ic.IntType{}
+	packet := &ic.Packet{}
 	for _, arg := range args {
 		switch arg.Key {
 		case "device_id":
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 7423563..0908146 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -131,7 +131,8 @@
 		kafka.InterContainerHost(core.config.KafkaAdapterHost),
 		kafka.InterContainerPort(core.config.KafkaAdapterPort),
 		kafka.MsgClient(core.kafkaClient),
-		kafka.DefaultTopic(&kafka.Topic{Name: core.config.CoreTopic})); err != nil {
+		kafka.DefaultTopic(&kafka.Topic{Name: core.config.CoreTopic}),
+		kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: core.config.AffinityRouterTopic})); err != nil {
 		log.Errorw("fail-to-create-kafka-proxy", log.Fields{"error": err})
 		return err
 	}
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 7e7f42a..784b506 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -21,7 +21,7 @@
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/model"
-	"github.com/opencord/voltha-go/protos/core_adapter"
+	ic "github.com/opencord/voltha-go/protos/inter_container"
 	ofp "github.com/opencord/voltha-go/protos/openflow_13"
 	"github.com/opencord/voltha-go/protos/voltha"
 	fu "github.com/opencord/voltha-go/rw_core/utils"
@@ -326,12 +326,12 @@
 
 // getSwitchCapability is a helper method that a logical device agent uses to retrieve the switch capability of a
 // parent device
-func (agent *DeviceAgent) getSwitchCapability(ctx context.Context) (*core_adapter.SwitchCapability, error) {
+func (agent *DeviceAgent) getSwitchCapability(ctx context.Context) (*ic.SwitchCapability, error) {
 	log.Debugw("getSwitchCapability", log.Fields{"deviceId": agent.deviceId})
 	if device, err := agent.deviceMgr.GetDevice(agent.deviceId); device == nil {
 		return nil, err
 	} else {
-		var switchCap *core_adapter.SwitchCapability
+		var switchCap *ic.SwitchCapability
 		var err error
 		if switchCap, err = agent.adapterProxy.GetOfpDeviceInfo(ctx, device); err != nil {
 			log.Debugw("getSwitchCapability-error", log.Fields{"id": device.Id, "error": err})
@@ -343,12 +343,12 @@
 
 // getPortCapability is a helper method that a logical device agent uses to retrieve the port capability of a
 // device
-func (agent *DeviceAgent) getPortCapability(ctx context.Context, portNo uint32) (*core_adapter.PortCapability, error) {
+func (agent *DeviceAgent) getPortCapability(ctx context.Context, portNo uint32) (*ic.PortCapability, error) {
 	log.Debugw("getPortCapability", log.Fields{"deviceId": agent.deviceId})
 	if device, err := agent.deviceMgr.GetDevice(agent.deviceId); device == nil {
 		return nil, err
 	} else {
-		var portCap *core_adapter.PortCapability
+		var portCap *ic.PortCapability
 		var err error
 		if portCap, err = agent.adapterProxy.GetOfpPortInfo(ctx, device, portNo); err != nil {
 			log.Debugw("getPortCapability-error", log.Fields{"id": device.Id, "error": err})
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 6f4a874..45584a1 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -22,7 +22,7 @@
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/model"
 	"github.com/opencord/voltha-go/kafka"
-	"github.com/opencord/voltha-go/protos/core_adapter"
+	ic "github.com/opencord/voltha-go/protos/inter_container"
 	ofp "github.com/opencord/voltha-go/protos/openflow_13"
 	"github.com/opencord/voltha-go/protos/voltha"
 	"google.golang.org/grpc/codes"
@@ -249,7 +249,7 @@
 	return status.Errorf(codes.NotFound, "%s", deviceId)
 }
 
-func (dMgr *DeviceManager) getSwitchCapability(ctx context.Context, deviceId string) (*core_adapter.SwitchCapability, error) {
+func (dMgr *DeviceManager) getSwitchCapability(ctx context.Context, deviceId string) (*ic.SwitchCapability, error) {
 	log.Debugw("getSwitchCapability", log.Fields{"deviceid": deviceId})
 	if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
 		return agent.getSwitchCapability(ctx)
@@ -266,7 +266,7 @@
 
 }
 
-func (dMgr *DeviceManager) getPortCapability(ctx context.Context, deviceId string, portNo uint32) (*core_adapter.PortCapability, error) {
+func (dMgr *DeviceManager) getPortCapability(ctx context.Context, deviceId string, portNo uint32) (*ic.PortCapability, error) {
 	log.Debugw("getPortCapability", log.Fields{"deviceid": deviceId})
 	if agent := dMgr.getDeviceAgent(deviceId); agent != nil {
 		return agent.getPortCapability(ctx, portNo)
@@ -340,9 +340,12 @@
 
 	// Activate the child device
 	if agent := dMgr.getDeviceAgent(agent.deviceId); agent != nil {
-		return agent.enableDevice(nil)
+		go agent.enableDevice(nil)
 	}
 
+	// Publish on the messaging bus that we have discovered new devices
+	go dMgr.kafkaICProxy.DeviceDiscovered(agent.deviceId, deviceType, parentDeviceId)
+
 	return nil
 }
 
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 60692e5..8a69967 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -22,7 +22,7 @@
 	"github.com/gogo/protobuf/proto"
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/model"
-	ca "github.com/opencord/voltha-go/protos/core_adapter"
+	ic "github.com/opencord/voltha-go/protos/inter_container"
 	ofp "github.com/opencord/voltha-go/protos/openflow_13"
 	"github.com/opencord/voltha-go/protos/voltha"
 	fd "github.com/opencord/voltha-go/rw_core/flow_decomposition"
@@ -68,7 +68,7 @@
 func (agent *LogicalDeviceAgent) start(ctx context.Context) error {
 	log.Infow("starting-logical_device-agent", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
 	//Build the logical device based on information retrieved from the device adapter
-	var switchCap *ca.SwitchCapability
+	var switchCap *ic.SwitchCapability
 	var err error
 	if switchCap, err = agent.deviceMgr.getSwitchCapability(ctx, agent.rootDeviceId); err != nil {
 		log.Errorw("error-creating-logical-device", log.Fields{"error": err})
@@ -88,7 +88,7 @@
 	if nniPorts, err = agent.deviceMgr.getPorts(ctx, agent.rootDeviceId, voltha.Port_ETHERNET_NNI); err != nil {
 		log.Errorw("error-creating-logical-port", log.Fields{"error": err})
 	}
-	var portCap *ca.PortCapability
+	var portCap *ic.PortCapability
 	for _, port := range nniPorts.Items {
 		log.Infow("!!!!!!!NNI PORTS", log.Fields{"NNI": port})
 		if portCap, err = agent.deviceMgr.getPortCapability(ctx, agent.rootDeviceId, port.PortNo); err != nil {
@@ -223,7 +223,6 @@
 	return nil
 }
 
-
 // getLogicalDeviceWithoutLock retrieves a logical device from the model without locking it.   This is used only by
 // functions that have already acquired the logical device lock to the model
 func (agent *LogicalDeviceAgent) getLogicalDeviceWithoutLock() (*voltha.LogicalDevice, error) {
@@ -240,7 +239,7 @@
 func (agent *LogicalDeviceAgent) addUNILogicalPort(ctx context.Context, childDevice *voltha.Device) error {
 	log.Infow("addUNILogicalPort-start", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 	// Build the logical device based on information retrieved from the device adapter
-	var portCap *ca.PortCapability
+	var portCap *ic.PortCapability
 	var err error
 
 	//Get UNI port number
@@ -327,7 +326,6 @@
 		"unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceId, groupMod.GetCommand())
 }
 
-
 //updateFlowGroupsWithoutLock updates the flows in the logical device without locking the logical device.  This function
 //must only be called by a function that is holding the lock on the logical device
 func (agent *LogicalDeviceAgent) updateFlowGroupsWithoutLock(groups []*ofp.OfpGroupEntry) error {
@@ -737,7 +735,7 @@
 func (agent *LogicalDeviceAgent) GetRoute(ingressPortNo uint32, egressPortNo uint32) []graph.RouteHop {
 	log.Debugw("getting-route", log.Fields{"ingress-port": ingressPortNo, "egress-port": egressPortNo})
 	// Get the updated logical device
-	var ld *ca.LogicalDevice
+	var ld *ic.LogicalDevice
 	routes := make([]graph.RouteHop, 0)
 	var err error
 	if ld, err = agent.getLogicalDeviceWithoutLock(); err != nil {
diff --git a/rw_core/main.go b/rw_core/main.go
index 472072a..dbb82b0 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -23,7 +23,7 @@
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/kvstore"
 	"github.com/opencord/voltha-go/kafka"
-	ca "github.com/opencord/voltha-go/protos/core_adapter"
+	ic "github.com/opencord/voltha-go/protos/inter_container"
 	"github.com/opencord/voltha-go/rw_core/config"
 	c "github.com/opencord/voltha-go/rw_core/core"
 	"os"
@@ -43,7 +43,7 @@
 	kafkaClient kafka.Client
 	core        *c.Core
 	//For test
-	receiverChannels []<-chan *ca.InterContainerMessage
+	receiverChannels []<-chan *ic.InterContainerMessage
 }
 
 func init() {
@@ -73,7 +73,7 @@
 			kafka.ProducerReturnOnErrors(true),
 			kafka.ProducerReturnOnSuccess(true),
 			kafka.ProducerMaxRetries(6),
-			kafka.ProducerRetryBackoff(time.Millisecond * 30)), nil
+			kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
 	}
 	return nil, errors.New("unsupported-client-type")
 }
@@ -83,7 +83,7 @@
 	rwCore.config = cf
 	rwCore.halted = false
 	rwCore.exitChannel = make(chan int, 1)
-	rwCore.receiverChannels = make([]<-chan *ca.InterContainerMessage, 0)
+	rwCore.receiverChannels = make([]<-chan *ic.InterContainerMessage, 0)
 	return &rwCore
 }
 
diff --git a/tests/kafka/kafka_client_test.go b/tests/kafka/kafka_client_test.go
index 76d63c6..12f0ae4 100644
--- a/tests/kafka/kafka_client_test.go
+++ b/tests/kafka/kafka_client_test.go
@@ -22,7 +22,7 @@
 	"github.com/google/uuid"
 	"github.com/opencord/voltha-go/common/log"
 	kk "github.com/opencord/voltha-go/kafka"
-	ca "github.com/opencord/voltha-go/protos/core_adapter"
+	ic "github.com/opencord/voltha-go/protos/inter_container"
 	"github.com/stretchr/testify/assert"
 	"os"
 	"testing"
@@ -63,7 +63,7 @@
 	numMessageToSend = 1
 }
 
-func waitForMessage(ch <-chan *ca.InterContainerMessage, doneCh chan string, maxMessages int) {
+func waitForMessage(ch <-chan *ic.InterContainerMessage, doneCh chan string, maxMessages int) {
 	totalTime = 0
 	totalMessageReceived = 0
 	mytime := time.Now()
@@ -92,17 +92,17 @@
 func sendMessages(topic *kk.Topic, numMessages int, fn sendToKafka) error {
 	// Loop for numMessages
 	for i := 0; i < numMessages; i++ {
-		msg := &ca.InterContainerMessage{}
-		msg.Header = &ca.Header{
+		msg := &ic.InterContainerMessage{}
+		msg.Header = &ic.Header{
 			Id:        uuid.New().String(),
-			Type:      ca.MessageType_REQUEST,
+			Type:      ic.MessageType_REQUEST,
 			FromTopic: topic.Name,
 			ToTopic:   topic.Name,
 			Timestamp: time.Now().UnixNano(),
 		}
 		var marshalledArg *any.Any
 		var err error
-		body := &ca.InterContainerRequestBody{Rpc: "testRPC", Args: []*ca.Argument{}}
+		body := &ic.InterContainerRequestBody{Rpc: "testRPC", Args: []*ic.Argument{}}
 		if marshalledArg, err = ptypes.MarshalAny(body); err != nil {
 			log.Warnw("cannot-marshal-request", log.Fields{"error": err})
 			return err
@@ -116,7 +116,7 @@
 }
 
 func runWithPartionConsumer(topic *kk.Topic, numMessages int, doneCh chan string) error {
-	var ch <-chan *ca.InterContainerMessage
+	var ch <-chan *ic.InterContainerMessage
 	var err error
 	if ch, err = partionClient.Subscribe(topic); err != nil {
 		return nil
@@ -130,7 +130,7 @@
 }
 
 func runWithGroupConsumer(topic *kk.Topic, numMessages int, doneCh chan string) error {
-	var ch <-chan *ca.InterContainerMessage
+	var ch <-chan *ic.InterContainerMessage
 	var err error
 	if ch, err = groupClient.Subscribe(topic); err != nil {
 		return nil
diff --git a/tests/kafka/kafka_inter_container_messaging_test.go b/tests/kafka/kafka_inter_container_messaging_test.go
index 0293d6d..57e7ab9 100644
--- a/tests/kafka/kafka_inter_container_messaging_test.go
+++ b/tests/kafka/kafka_inter_container_messaging_test.go
@@ -21,10 +21,11 @@
 	"github.com/google/uuid"
 	"github.com/opencord/voltha-go/common/log"
 	kk "github.com/opencord/voltha-go/kafka"
-	ca "github.com/opencord/voltha-go/protos/core_adapter"
+	ic "github.com/opencord/voltha-go/protos/inter_container"
 	"github.com/opencord/voltha-go/protos/voltha"
 	rhp "github.com/opencord/voltha-go/rw_core/core"
 	"github.com/stretchr/testify/assert"
+	"os"
 	"testing"
 	"time"
 )
@@ -35,27 +36,42 @@
 
 var coreKafkaProxy *kk.InterContainerProxy
 var adapterKafkaProxy *kk.InterContainerProxy
+var kafkaPartitionClient kk.Client
+var affinityRouterTopic string
+var hostIP string
+var kafkaClient kk.Client
 
 func init() {
 	log.AddPackage(log.JSON, log.ErrorLevel, nil)
 	log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
 	log.SetAllLogLevel(log.ErrorLevel)
-	kafkaClient := kk.NewSaramaClient(
-		kk.Host("10.176.212.108"),
+	affinityRouterTopic = "AffinityRouter"
+	hostIP = os.Getenv("DOCKER_HOST_IP")
+	kafkaClient = kk.NewSaramaClient(
+		kk.Host(hostIP),
 		kk.Port(9092))
 
 	coreKafkaProxy, _ = kk.NewInterContainerProxy(
-		kk.InterContainerHost("10.176.212.108"),
+		kk.InterContainerHost(hostIP),
 		kk.InterContainerPort(9092),
 		kk.DefaultTopic(&kk.Topic{Name: "Core"}),
-		kk.MsgClient(kafkaClient))
+		kk.MsgClient(kafkaClient),
+		kk.DeviceDiscoveryTopic(&kk.Topic{Name: affinityRouterTopic}))
 
 	adapterKafkaProxy, _ = kk.NewInterContainerProxy(
-		kk.InterContainerHost("10.176.212.108"),
+		kk.InterContainerHost(hostIP),
 		kk.InterContainerPort(9092),
 		kk.DefaultTopic(&kk.Topic{Name: "Adapter"}),
 		kk.MsgClient(kafkaClient))
 
+	kafkaPartitionClient = kk.NewSaramaClient(
+		kk.ConsumerType(kk.PartitionConsumer),
+		kk.Host(hostIP),
+		kk.Port(9092),
+		kk.AutoCreateTopic(true),
+		kk.ProducerFlushFrequency(5))
+	kafkaPartitionClient.Start()
+
 	coreKafkaProxy.Start()
 	adapterKafkaProxy.Start()
 	subscribeTarget(coreKafkaProxy)
@@ -67,11 +83,11 @@
 	kmp.SubscribeWithRequestHandlerInterface(topic, requestProxy)
 }
 
-func waitForRPCMessage(topic kk.Topic, ch <-chan *ca.InterContainerMessage, doneCh chan string) {
+func waitForRPCMessage(topic kk.Topic, ch <-chan *ic.InterContainerMessage, doneCh chan string) {
 	for msg := range ch {
 		log.Debugw("Got-RPC-message", log.Fields{"msg": msg})
 		//	Unpack message
-		requestBody := &ca.InterContainerRequestBody{}
+		requestBody := &ic.InterContainerRequestBody{}
 		if err := ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
 			doneCh <- "Error"
 		} else {
@@ -106,8 +122,8 @@
 //	// First subscribe to the specific topic
 //	//log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.DebugLevel)
 //	var err error
-//	var ch1 <-chan *ca.InterContainerMessage
-//	var ch2 <-chan *ca.InterContainerMessage
+//	var ch1 <-chan *ic.InterContainerMessage
+//	var ch2 <-chan *ic.InterContainerMessage
 //	topic := kk.Topic{Name: "Core"}
 //	ch1, err = coreKafkaProxy.Subscribe(topic)
 //	assert.NotNil(t, ch1)
@@ -159,7 +175,7 @@
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, false)
 	//Unpack the result into the actual proto object
-	unpackResult := &ca.Error{}
+	unpackResult := &ic.Error{}
 	if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
 		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
 	}
@@ -177,12 +193,12 @@
 	rpc := "GetDevice"
 	topic := kk.Topic{Name: "Core"}
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, false)
 	//Unpack the result into the actual proto object
-	unpackResult := &ca.Error{}
+	unpackResult := &ic.Error{}
 	if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
 		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
 	}
@@ -204,7 +220,7 @@
 	ctx, cancel := context.WithTimeout(context.Background(), timeout)
 	defer cancel()
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, &topic,true, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, &topic, true, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, true)
@@ -229,11 +245,11 @@
 	ctx, cancel := context.WithTimeout(context.Background(), timeout)
 	defer cancel()
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, &topic,true, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, &topic, true, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, false)
-	unpackResult := &ca.Error{}
+	unpackResult := &ic.Error{}
 	if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
 		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
 	}
@@ -252,7 +268,7 @@
 	topic := kk.Topic{Name: "Core"}
 	expectedResponse := &voltha.Device{Id: trnsId}
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, true)
@@ -275,7 +291,7 @@
 	topic := kk.Topic{Name: "Core"}
 	expectedResponse := &voltha.Device{Id: trnsId}
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, true)
@@ -294,7 +310,7 @@
 		Key:   "deviceID",
 		Value: protoArg1,
 	}
-	protoArg2 := &ca.IntType{Val: 1}
+	protoArg2 := &ic.IntType{Val: 1}
 	args[1] = &kk.KVArg{
 		Key:   "portType",
 		Value: protoArg2,
@@ -302,7 +318,7 @@
 	rpc := "GetPorts"
 	topic := kk.Topic{Name: "Core"}
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, true)
@@ -325,12 +341,12 @@
 	rpc := "GetPorts"
 	topic := kk.Topic{Name: "Core"}
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, false)
 	//Unpack the result into the actual proto object
-	unpackResult := &ca.Error{}
+	unpackResult := &ic.Error{}
 	if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
 		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
 	}
@@ -339,18 +355,18 @@
 
 func TestChildDeviceDetected(t *testing.T) {
 	trnsId := uuid.New().String()
-	protoArg1 := &ca.StrType{Val: trnsId}
+	protoArg1 := &ic.StrType{Val: trnsId}
 	args := make([]*kk.KVArg, 5)
 	args[0] = &kk.KVArg{
 		Key:   "deviceID",
 		Value: protoArg1,
 	}
-	protoArg2 := &ca.IntType{Val: 1}
+	protoArg2 := &ic.IntType{Val: 1}
 	args[1] = &kk.KVArg{
 		Key:   "parentPortNo",
 		Value: protoArg2,
 	}
-	protoArg3 := &ca.StrType{Val: "great_onu"}
+	protoArg3 := &ic.StrType{Val: "great_onu"}
 	args[2] = &kk.KVArg{
 		Key:   "childDeviceType",
 		Value: protoArg3,
@@ -360,7 +376,7 @@
 		Key:   "proxyAddress",
 		Value: protoArg4,
 	}
-	protoArg5 := &ca.IntType{Val: 1}
+	protoArg5 := &ic.IntType{Val: 1}
 	args[4] = &kk.KVArg{
 		Key:   "portType",
 		Value: protoArg5,
@@ -369,7 +385,7 @@
 	rpc := "ChildDeviceDetected"
 	topic := kk.Topic{Name: "Core"}
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, true)
@@ -378,18 +394,18 @@
 
 func TestChildDeviceDetectedNoWait(t *testing.T) {
 	trnsId := uuid.New().String()
-	protoArg1 := &ca.StrType{Val: trnsId}
+	protoArg1 := &ic.StrType{Val: trnsId}
 	args := make([]*kk.KVArg, 5)
 	args[0] = &kk.KVArg{
 		Key:   "deviceID",
 		Value: protoArg1,
 	}
-	protoArg2 := &ca.IntType{Val: 1}
+	protoArg2 := &ic.IntType{Val: 1}
 	args[1] = &kk.KVArg{
 		Key:   "parentPortNo",
 		Value: protoArg2,
 	}
-	protoArg3 := &ca.StrType{Val: "great_onu"}
+	protoArg3 := &ic.StrType{Val: "great_onu"}
 	args[2] = &kk.KVArg{
 		Key:   "childDeviceType",
 		Value: protoArg3,
@@ -399,7 +415,7 @@
 		Key:   "proxyAddress",
 		Value: protoArg4,
 	}
-	protoArg5 := &ca.IntType{Val: 1}
+	protoArg5 := &ic.IntType{Val: 1}
 	args[4] = &kk.KVArg{
 		Key:   "portType",
 		Value: protoArg5,
@@ -408,7 +424,7 @@
 	rpc := "ChildDeviceDetected"
 	topic := kk.Topic{Name: "Core"}
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,false, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, false, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, true)
@@ -417,18 +433,18 @@
 
 func TestChildDeviceDetectedMissingArgs(t *testing.T) {
 	trnsId := uuid.New().String()
-	protoArg1 := &ca.StrType{Val: trnsId}
+	protoArg1 := &ic.StrType{Val: trnsId}
 	args := make([]*kk.KVArg, 4)
 	args[0] = &kk.KVArg{
 		Key:   "deviceID",
 		Value: protoArg1,
 	}
-	protoArg2 := &ca.IntType{Val: 1}
+	protoArg2 := &ic.IntType{Val: 1}
 	args[1] = &kk.KVArg{
 		Key:   "parentPortNo",
 		Value: protoArg2,
 	}
-	protoArg3 := &ca.StrType{Val: "great_onu"}
+	protoArg3 := &ic.StrType{Val: "great_onu"}
 	args[2] = &kk.KVArg{
 		Key:   "childDeviceType",
 		Value: protoArg3,
@@ -437,11 +453,11 @@
 	rpc := "ChildDeviceDetected"
 	topic := kk.Topic{Name: "Core"}
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, false)
-	unpackResult := &ca.Error{}
+	unpackResult := &ic.Error{}
 	if err := ptypes.UnmarshalAny(result, unpackResult); err != nil {
 		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
 	}
@@ -457,12 +473,12 @@
 		Key:   "device_id",
 		Value: protoArg1,
 	}
-	protoArg2 := &ca.IntType{Val: 1}
+	protoArg2 := &ic.IntType{Val: 1}
 	args[1] = &kk.KVArg{
 		Key:   "oper_status",
 		Value: protoArg2,
 	}
-	protoArg3 := &ca.IntType{Val: 1}
+	protoArg3 := &ic.IntType{Val: 1}
 	args[2] = &kk.KVArg{
 		Key:   "connect_status",
 		Value: protoArg3,
@@ -471,13 +487,58 @@
 	rpc := "DeviceStateUpdate"
 	topic := kk.Topic{Name: "Core"}
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, true)
 	assert.Nil(t, result)
 }
 
+func subscribeToTopic(topic *kk.Topic, waitingChannel chan *ic.InterContainerMessage) error {
+	var ch <-chan *ic.InterContainerMessage
+	var err error
+	if ch, err = kafkaPartitionClient.Subscribe(topic); err != nil {
+		return nil
+	}
+	msg := <-ch
+
+	log.Debugw("msg-received", log.Fields{"msg": msg})
+	waitingChannel <- msg
+	return nil
+}
+
+func TestDeviceDiscovery(t *testing.T) {
+	// Create an intercontainer proxy - similar to the Core
+	testProxy, _ := kk.NewInterContainerProxy(
+		kk.InterContainerHost(hostIP),
+		kk.InterContainerPort(9092),
+		kk.DefaultTopic(&kk.Topic{Name: "Test"}),
+		kk.MsgClient(kafkaClient),
+		kk.DeviceDiscoveryTopic(&kk.Topic{Name: affinityRouterTopic}))
+
+	//	First start to wait for the message
+	waitingChannel := make(chan *ic.InterContainerMessage)
+	go subscribeToTopic(&kk.Topic{Name: affinityRouterTopic}, waitingChannel)
+
+	// Sleep to make sure the consumer is ready
+	time.Sleep(time.Millisecond * 100)
+
+	// Send the message
+	go testProxy.DeviceDiscovered("TestDeviceId", "TestDevicetype", "TestParentId")
+
+	msg := <-waitingChannel
+	totalTime := (time.Now().UnixNano() - msg.Header.Timestamp) / int64(time.Millisecond)
+	assert.Equal(t, msg.Header.Type, ic.MessageType_DEVICE_DISCOVERED)
+	//	Unpack message
+	dd := &ic.DeviceDiscovered{}
+	err := ptypes.UnmarshalAny(msg.Body, dd)
+	assert.Nil(t, err)
+	assert.Equal(t, dd.Id, "TestDeviceId")
+	assert.Equal(t, dd.DeviceType, "TestDevicetype")
+	assert.Equal(t, dd.ParentId, "TestParentId")
+	log.Debugw("TotalTime", log.Fields{"time": totalTime})
+}
+
 func TestStopKafkaProxy(t *testing.T) {
 	adapterKafkaProxy.Stop()
 	coreKafkaProxy.Stop()