[VOL-1346]  This commit addresses device discovery notifications
which will be principally used by the affinity router.  In doing so
this commit also rename the core_adapter.proto to inter_container.proto.

Change-Id: Ib2a7b84efa50367d0ffbc482fba6096a225f3150
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index ff3584f..4a3ec92 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -24,7 +24,7 @@
 	"github.com/golang/protobuf/ptypes/any"
 	"github.com/google/uuid"
 	"github.com/opencord/voltha-go/common/log"
-	ca "github.com/opencord/voltha-go/protos/core_adapter"
+	ic "github.com/opencord/voltha-go/protos/inter_container"
 	"reflect"
 	"sync"
 	"time"
@@ -45,14 +45,14 @@
 // async requests into the Core via the kafka messaging bus
 type requestHandlerChannel struct {
 	requesthandlerInterface interface{}
-	ch                      <-chan *ca.InterContainerMessage
+	ch                      <-chan *ic.InterContainerMessage
 }
 
 // transactionChannel represents a combination of a topic and a channel onto which a response received
 // on the kafka bus will be sent to
 type transactionChannel struct {
 	topic *Topic
-	ch    chan *ca.InterContainerMessage
+	ch    chan *ic.InterContainerMessage
 }
 
 // InterContainerProxy represents the messaging proxy
@@ -61,6 +61,7 @@
 	kafkaPort                      int
 	DefaultTopic                   *Topic
 	defaultRequestHandlerInterface interface{}
+	deviceDiscoveryTopic           *Topic
 	kafkaClient                    Client
 	doneCh                         chan int
 
@@ -72,7 +73,7 @@
 	// This map is used to map a channel to a response topic.   This channel handles all responses on that
 	// channel for that topic and forward them to the appropriate consumers channel, using the
 	// transactionIdToChannelMap.
-	topicToResponseChannelMap   map[string]<-chan *ca.InterContainerMessage
+	topicToResponseChannelMap   map[string]<-chan *ic.InterContainerMessage
 	lockTopicResponseChannelMap sync.RWMutex
 
 	// This map is used to map a transaction to a consumers channel.  This is used whenever a request has been
@@ -101,6 +102,12 @@
 	}
 }
 
+func DeviceDiscoveryTopic(topic *Topic) InterContainerProxyOption {
+	return func(args *InterContainerProxy) {
+		args.deviceDiscoveryTopic = topic
+	}
+}
+
 func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
 	return func(args *InterContainerProxy) {
 		args.defaultRequestHandlerInterface = handler
@@ -149,7 +156,7 @@
 	}
 
 	// Create the topic to response channel map
-	kp.topicToResponseChannelMap = make(map[string]<-chan *ca.InterContainerMessage)
+	kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
 	//
 	// Create the transactionId to Channel Map
 	kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
@@ -170,6 +177,47 @@
 	//kp.deleteAllTransactionIdToChannelMap()
 }
 
+// DeviceDiscovered publish the discovered device onto the kafka messaging bus
+func (kp *InterContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string) error {
+	log.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
+	//	Simple validation
+	if deviceId == "" || deviceType == "" {
+		log.Errorw("invalid-parameters", log.Fields{"id": deviceId, "type": deviceType})
+		return errors.New("invalid-parameters")
+	}
+	//	Create the device discovery message
+	header := &ic.Header{
+		Id:        uuid.New().String(),
+		Type:      ic.MessageType_DEVICE_DISCOVERED,
+		FromTopic: kp.DefaultTopic.Name,
+		ToTopic:   kp.deviceDiscoveryTopic.Name,
+		Timestamp: time.Now().UnixNano(),
+	}
+	body := &ic.DeviceDiscovered{
+		Id:         deviceId,
+		DeviceType: deviceType,
+		ParentId:   parentId,
+	}
+
+	var marshalledData *any.Any
+	var err error
+	if marshalledData, err = ptypes.MarshalAny(body); err != nil {
+		log.Errorw("cannot-marshal-request", log.Fields{"error": err})
+		return err
+	}
+	msg := &ic.InterContainerMessage{
+		Header: header,
+		Body:   marshalledData,
+	}
+
+	// Send the message
+	if err := kp.kafkaClient.Send(msg, kp.deviceDiscoveryTopic); err != nil {
+		log.Errorw("cannot-send-device-discovery-message", log.Fields{"error": err})
+		return err
+	}
+	return nil
+}
+
 // InvokeRPC is used to send a request to a given topic
 func (kp *InterContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
 	waitForResponse bool, kvArgs ...*KVArg) (bool, *any.Any) {
@@ -189,7 +237,7 @@
 	}
 
 	// Subscribe for response, if needed, before sending request
-	var ch <-chan *ca.InterContainerMessage
+	var ch <-chan *ic.InterContainerMessage
 	if waitForResponse {
 		var err error
 		if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
@@ -221,7 +269,7 @@
 		select {
 		case msg := <-ch:
 			log.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
-			var responseBody *ca.InterContainerResponseBody
+			var responseBody *ic.InterContainerResponseBody
 			var err error
 			if responseBody, err = decodeResponse(msg); err != nil {
 				log.Errorw("decode-response-error", log.Fields{"error": err})
@@ -230,7 +278,7 @@
 		case <-ctx.Done():
 			log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
 			//	 pack the error as proto any type
-			protoError := &ca.Error{Reason: ctx.Err().Error()}
+			protoError := &ic.Error{Reason: ctx.Err().Error()}
 			var marshalledArg *any.Any
 			if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
 				return false, nil // Should never happen
@@ -239,7 +287,7 @@
 		case <-childCtx.Done():
 			log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
 			//	 pack the error as proto any type
-			protoError := &ca.Error{Reason: childCtx.Err().Error()}
+			protoError := &ic.Error{Reason: childCtx.Err().Error()}
 			var marshalledArg *any.Any
 			if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
 				return false, nil // Should never happen
@@ -258,7 +306,7 @@
 func (kp *InterContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
 
 	// Subscribe to receive messages for that topic
-	var ch <-chan *ca.InterContainerMessage
+	var ch <-chan *ic.InterContainerMessage
 	var err error
 	if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
 		//if ch, err = kp.Subscribe(topic); err != nil {
@@ -277,7 +325,7 @@
 // when a message is received on a given topic.  So far there is only 1 target registered per microservice
 func (kp *InterContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic) error {
 	// Subscribe to receive messages for that topic
-	var ch <-chan *ca.InterContainerMessage
+	var ch <-chan *ic.InterContainerMessage
 	var err error
 	if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
 		log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
@@ -296,7 +344,7 @@
 
 // setupTopicResponseChannelMap sets up single consumers channel that will act as a broadcast channel for all
 // responses from that topic.
-func (kp *InterContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ca.InterContainerMessage) {
+func (kp *InterContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ic.InterContainerMessage) {
 	kp.lockTopicResponseChannelMap.Lock()
 	defer kp.lockTopicResponseChannelMap.Unlock()
 	if _, exist := kp.topicToResponseChannelMap[topic]; !exist {
@@ -376,7 +424,7 @@
 	return err
 }
 
-func (kp *InterContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ca.InterContainerMessage) {
+func (kp *InterContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
 	kp.lockTransactionIdToChannelMap.Lock()
 	defer kp.lockTransactionIdToChannelMap.Unlock()
 	if _, exist := kp.transactionIdToChannelMap[id]; !exist {
@@ -444,15 +492,15 @@
 	return marshalledReturnedVal, nil
 }
 
-func encodeDefaultFailedResponse(request *ca.InterContainerMessage) *ca.InterContainerMessage {
-	responseHeader := &ca.Header{
+func encodeDefaultFailedResponse(request *ic.InterContainerMessage) *ic.InterContainerMessage {
+	responseHeader := &ic.Header{
 		Id:        request.Header.Id,
-		Type:      ca.MessageType_RESPONSE,
+		Type:      ic.MessageType_RESPONSE,
 		FromTopic: request.Header.ToTopic,
 		ToTopic:   request.Header.FromTopic,
 		Timestamp: time.Now().Unix(),
 	}
-	responseBody := &ca.InterContainerResponseBody{
+	responseBody := &ic.InterContainerResponseBody{
 		Success: false,
 		Result:  nil,
 	}
@@ -463,7 +511,7 @@
 		log.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
 	}
 
-	return &ca.InterContainerMessage{
+	return &ic.InterContainerMessage{
 		Header: responseHeader,
 		Body:   marshalledResponseBody,
 	}
@@ -472,11 +520,11 @@
 
 //formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
 //or an error on failure
-func encodeResponse(request *ca.InterContainerMessage, success bool, returnedValues ...interface{}) (*ca.InterContainerMessage, error) {
+func encodeResponse(request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
 	//log.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
-	responseHeader := &ca.Header{
+	responseHeader := &ic.Header{
 		Id:        request.Header.Id,
-		Type:      ca.MessageType_RESPONSE,
+		Type:      ic.MessageType_RESPONSE,
 		FromTopic: request.Header.ToTopic,
 		ToTopic:   request.Header.FromTopic,
 		Timestamp: time.Now().Unix(),
@@ -492,7 +540,7 @@
 		break // for now we support only 1 returned value - (excluding the error)
 	}
 
-	responseBody := &ca.InterContainerResponseBody{
+	responseBody := &ic.InterContainerResponseBody{
 		Success: success,
 		Result:  marshalledReturnedVal,
 	}
@@ -504,7 +552,7 @@
 		return nil, err
 	}
 
-	return &ca.InterContainerMessage{
+	return &ic.InterContainerMessage{
 		Header: responseHeader,
 		Body:   marshalledResponseBody,
 	}, nil
@@ -524,16 +572,16 @@
 	return
 }
 
-func (kp *InterContainerProxy) handleRequest(msg *ca.InterContainerMessage, targetInterface interface{}) {
+func (kp *InterContainerProxy) handleRequest(msg *ic.InterContainerMessage, targetInterface interface{}) {
 
 	// First extract the header to know whether this is a request - responses are handled by a different handler
-	if msg.Header.Type == ca.MessageType_REQUEST {
+	if msg.Header.Type == ic.MessageType_REQUEST {
 
 		var out []reflect.Value
 		var err error
 
 		// Get the request body
-		requestBody := &ca.InterContainerRequestBody{}
+		requestBody := &ic.InterContainerRequestBody{}
 		if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
 			log.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
 		} else {
@@ -547,11 +595,11 @@
 		// Response required?
 		if requestBody.ResponseRequired {
 			// If we already have an error before then just return that
-			var returnError *ca.Error
+			var returnError *ic.Error
 			var returnedValues []interface{}
 			var success bool
 			if err != nil {
-				returnError = &ca.Error{Reason: err.Error()}
+				returnError = &ic.Error{Reason: err.Error()}
 				returnedValues = make([]interface{}, 1)
 				returnedValues[0] = returnError
 			} else {
@@ -561,10 +609,10 @@
 				lastIndex := len(out) - 1
 				if out[lastIndex].Interface() != nil { // Error
 					if goError, ok := out[lastIndex].Interface().(error); ok {
-						returnError = &ca.Error{Reason: goError.Error()}
+						returnError = &ic.Error{Reason: goError.Error()}
 						returnedValues = append(returnedValues, returnError)
 					} else { // Should never happen
-						returnError = &ca.Error{Reason: "incorrect-error-returns"}
+						returnError = &ic.Error{Reason: "incorrect-error-returns"}
 						returnedValues = append(returnedValues, returnError)
 					}
 				} else { // Non-error case
@@ -578,7 +626,7 @@
 				}
 			}
 
-			var icm *ca.InterContainerMessage
+			var icm *ic.InterContainerMessage
 			if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
 				log.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
 				icm = encodeDefaultFailedResponse(msg)
@@ -597,7 +645,7 @@
 	}
 }
 
-func (kp *InterContainerProxy) waitForRequest(ch <-chan *ca.InterContainerMessage, topic Topic, targetInterface interface{}) {
+func (kp *InterContainerProxy) waitForRequest(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
 	//	Wait for messages
 	for msg := range ch {
 		//log.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
@@ -605,7 +653,7 @@
 	}
 }
 
-func (kp *InterContainerProxy) dispatchResponse(msg *ca.InterContainerMessage) {
+func (kp *InterContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
 	kp.lockTransactionIdToChannelMap.Lock()
 	defer kp.lockTransactionIdToChannelMap.Unlock()
 	if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
@@ -617,14 +665,14 @@
 
 // waitForResponse listens for messages on the subscribedCh, ensure we get a response with the transaction ID,
 // and then dispatches to the consumers
-func (kp *InterContainerProxy) waitForResponseLoop(subscribedCh <-chan *ca.InterContainerMessage, topic *Topic) {
+func (kp *InterContainerProxy) waitForResponseLoop(subscribedCh <-chan *ic.InterContainerMessage, topic *Topic) {
 	log.Debugw("starting-response-loop-for-topic", log.Fields{"topic": topic.Name})
 startloop:
 	for {
 		select {
 		case msg := <-subscribedCh:
 			//log.Debugw("message-received", log.Fields{"msg": msg, "fromTopic": msg.Header.FromTopic})
-			if msg.Header.Type == ca.MessageType_RESPONSE {
+			if msg.Header.Type == ic.MessageType_RESPONSE {
 				go kp.dispatchResponse(msg)
 			}
 		case <-kp.doneCh:
@@ -641,13 +689,13 @@
 // This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
 // API. There is one response channel waiting for kafka messages before dispatching the message to the
 // corresponding waiting channel
-func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ca.InterContainerMessage, error) {
+func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
 	log.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
 
 	// First check whether we already have a channel listening for response on that topic.  If there is
 	// already one then it will be reused.  If not, it will be created.
 	if !kp.isTopicSubscribedForResponse(topic.Name) {
-		var subscribedCh <-chan *ca.InterContainerMessage
+		var subscribedCh <-chan *ic.InterContainerMessage
 		var err error
 		if subscribedCh, err = kp.kafkaClient.Subscribe(&topic); err != nil {
 			log.Debugw("subscribe-failure", log.Fields{"topic": topic.Name})
@@ -659,7 +707,7 @@
 
 	// Create a specific channel for this consumers.  We cannot use the channel from the kafkaclient as it will
 	// broadcast any message for this topic to all channels waiting on it.
-	ch := make(chan *ca.InterContainerMessage)
+	ch := make(chan *ic.InterContainerMessage)
 	kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
 
 	return ch, nil
@@ -676,15 +724,15 @@
 
 //formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
 //or an error on failure
-func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, kvArgs ...*KVArg) (*ca.InterContainerMessage, error) {
-	requestHeader := &ca.Header{
+func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
+	requestHeader := &ic.Header{
 		Id:        uuid.New().String(),
-		Type:      ca.MessageType_REQUEST,
+		Type:      ic.MessageType_REQUEST,
 		FromTopic: replyTopic.Name,
 		ToTopic:   toTopic.Name,
 		Timestamp: time.Now().Unix(),
 	}
-	requestBody := &ca.InterContainerRequestBody{
+	requestBody := &ic.InterContainerRequestBody{
 		Rpc:              rpc,
 		ResponseRequired: true,
 		ReplyToTopic:     replyTopic.Name,
@@ -708,7 +756,7 @@
 			log.Warnw("cannot-marshal-request", log.Fields{"error": err})
 			return nil, err
 		}
-		protoArg := &ca.Argument{
+		protoArg := &ic.Argument{
 			Key:   arg.Key,
 			Value: marshalledArg,
 		}
@@ -721,16 +769,16 @@
 		log.Warnw("cannot-marshal-request", log.Fields{"error": err})
 		return nil, err
 	}
-	request := &ca.InterContainerMessage{
+	request := &ic.InterContainerMessage{
 		Header: requestHeader,
 		Body:   marshalledData,
 	}
 	return request, nil
 }
 
-func decodeResponse(response *ca.InterContainerMessage) (*ca.InterContainerResponseBody, error) {
+func decodeResponse(response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
 	//	Extract the message body
-	responseBody := ca.InterContainerResponseBody{}
+	responseBody := ic.InterContainerResponseBody{}
 	if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
 		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
 		return nil, err