[VOL-1346]  This commit addresses device discovery notifications
which will be principally used by the affinity router.  In doing so
this commit also rename the core_adapter.proto to inter_container.proto.

Change-Id: Ib2a7b84efa50367d0ffbc482fba6096a225f3150
diff --git a/kafka/client.go b/kafka/client.go
index b93ad86..1df700e 100644
--- a/kafka/client.go
+++ b/kafka/client.go
@@ -16,7 +16,7 @@
 package kafka
 
 import (
-	ca "github.com/opencord/voltha-go/protos/core_adapter"
+	ca "github.com/opencord/voltha-go/protos/inter_container"
 	"time"
 )
 
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index ff3584f..4a3ec92 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -24,7 +24,7 @@
 	"github.com/golang/protobuf/ptypes/any"
 	"github.com/google/uuid"
 	"github.com/opencord/voltha-go/common/log"
-	ca "github.com/opencord/voltha-go/protos/core_adapter"
+	ic "github.com/opencord/voltha-go/protos/inter_container"
 	"reflect"
 	"sync"
 	"time"
@@ -45,14 +45,14 @@
 // async requests into the Core via the kafka messaging bus
 type requestHandlerChannel struct {
 	requesthandlerInterface interface{}
-	ch                      <-chan *ca.InterContainerMessage
+	ch                      <-chan *ic.InterContainerMessage
 }
 
 // transactionChannel represents a combination of a topic and a channel onto which a response received
 // on the kafka bus will be sent to
 type transactionChannel struct {
 	topic *Topic
-	ch    chan *ca.InterContainerMessage
+	ch    chan *ic.InterContainerMessage
 }
 
 // InterContainerProxy represents the messaging proxy
@@ -61,6 +61,7 @@
 	kafkaPort                      int
 	DefaultTopic                   *Topic
 	defaultRequestHandlerInterface interface{}
+	deviceDiscoveryTopic           *Topic
 	kafkaClient                    Client
 	doneCh                         chan int
 
@@ -72,7 +73,7 @@
 	// This map is used to map a channel to a response topic.   This channel handles all responses on that
 	// channel for that topic and forward them to the appropriate consumers channel, using the
 	// transactionIdToChannelMap.
-	topicToResponseChannelMap   map[string]<-chan *ca.InterContainerMessage
+	topicToResponseChannelMap   map[string]<-chan *ic.InterContainerMessage
 	lockTopicResponseChannelMap sync.RWMutex
 
 	// This map is used to map a transaction to a consumers channel.  This is used whenever a request has been
@@ -101,6 +102,12 @@
 	}
 }
 
+func DeviceDiscoveryTopic(topic *Topic) InterContainerProxyOption {
+	return func(args *InterContainerProxy) {
+		args.deviceDiscoveryTopic = topic
+	}
+}
+
 func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
 	return func(args *InterContainerProxy) {
 		args.defaultRequestHandlerInterface = handler
@@ -149,7 +156,7 @@
 	}
 
 	// Create the topic to response channel map
-	kp.topicToResponseChannelMap = make(map[string]<-chan *ca.InterContainerMessage)
+	kp.topicToResponseChannelMap = make(map[string]<-chan *ic.InterContainerMessage)
 	//
 	// Create the transactionId to Channel Map
 	kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
@@ -170,6 +177,47 @@
 	//kp.deleteAllTransactionIdToChannelMap()
 }
 
+// DeviceDiscovered publish the discovered device onto the kafka messaging bus
+func (kp *InterContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string) error {
+	log.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
+	//	Simple validation
+	if deviceId == "" || deviceType == "" {
+		log.Errorw("invalid-parameters", log.Fields{"id": deviceId, "type": deviceType})
+		return errors.New("invalid-parameters")
+	}
+	//	Create the device discovery message
+	header := &ic.Header{
+		Id:        uuid.New().String(),
+		Type:      ic.MessageType_DEVICE_DISCOVERED,
+		FromTopic: kp.DefaultTopic.Name,
+		ToTopic:   kp.deviceDiscoveryTopic.Name,
+		Timestamp: time.Now().UnixNano(),
+	}
+	body := &ic.DeviceDiscovered{
+		Id:         deviceId,
+		DeviceType: deviceType,
+		ParentId:   parentId,
+	}
+
+	var marshalledData *any.Any
+	var err error
+	if marshalledData, err = ptypes.MarshalAny(body); err != nil {
+		log.Errorw("cannot-marshal-request", log.Fields{"error": err})
+		return err
+	}
+	msg := &ic.InterContainerMessage{
+		Header: header,
+		Body:   marshalledData,
+	}
+
+	// Send the message
+	if err := kp.kafkaClient.Send(msg, kp.deviceDiscoveryTopic); err != nil {
+		log.Errorw("cannot-send-device-discovery-message", log.Fields{"error": err})
+		return err
+	}
+	return nil
+}
+
 // InvokeRPC is used to send a request to a given topic
 func (kp *InterContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
 	waitForResponse bool, kvArgs ...*KVArg) (bool, *any.Any) {
@@ -189,7 +237,7 @@
 	}
 
 	// Subscribe for response, if needed, before sending request
-	var ch <-chan *ca.InterContainerMessage
+	var ch <-chan *ic.InterContainerMessage
 	if waitForResponse {
 		var err error
 		if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
@@ -221,7 +269,7 @@
 		select {
 		case msg := <-ch:
 			log.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
-			var responseBody *ca.InterContainerResponseBody
+			var responseBody *ic.InterContainerResponseBody
 			var err error
 			if responseBody, err = decodeResponse(msg); err != nil {
 				log.Errorw("decode-response-error", log.Fields{"error": err})
@@ -230,7 +278,7 @@
 		case <-ctx.Done():
 			log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": ctx.Err()})
 			//	 pack the error as proto any type
-			protoError := &ca.Error{Reason: ctx.Err().Error()}
+			protoError := &ic.Error{Reason: ctx.Err().Error()}
 			var marshalledArg *any.Any
 			if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
 				return false, nil // Should never happen
@@ -239,7 +287,7 @@
 		case <-childCtx.Done():
 			log.Debugw("context-cancelled", log.Fields{"rpc": rpc, "ctx": childCtx.Err()})
 			//	 pack the error as proto any type
-			protoError := &ca.Error{Reason: childCtx.Err().Error()}
+			protoError := &ic.Error{Reason: childCtx.Err().Error()}
 			var marshalledArg *any.Any
 			if marshalledArg, err = ptypes.MarshalAny(protoError); err != nil {
 				return false, nil // Should never happen
@@ -258,7 +306,7 @@
 func (kp *InterContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
 
 	// Subscribe to receive messages for that topic
-	var ch <-chan *ca.InterContainerMessage
+	var ch <-chan *ic.InterContainerMessage
 	var err error
 	if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
 		//if ch, err = kp.Subscribe(topic); err != nil {
@@ -277,7 +325,7 @@
 // when a message is received on a given topic.  So far there is only 1 target registered per microservice
 func (kp *InterContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic) error {
 	// Subscribe to receive messages for that topic
-	var ch <-chan *ca.InterContainerMessage
+	var ch <-chan *ic.InterContainerMessage
 	var err error
 	if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
 		log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
@@ -296,7 +344,7 @@
 
 // setupTopicResponseChannelMap sets up single consumers channel that will act as a broadcast channel for all
 // responses from that topic.
-func (kp *InterContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ca.InterContainerMessage) {
+func (kp *InterContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ic.InterContainerMessage) {
 	kp.lockTopicResponseChannelMap.Lock()
 	defer kp.lockTopicResponseChannelMap.Unlock()
 	if _, exist := kp.topicToResponseChannelMap[topic]; !exist {
@@ -376,7 +424,7 @@
 	return err
 }
 
-func (kp *InterContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ca.InterContainerMessage) {
+func (kp *InterContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ic.InterContainerMessage) {
 	kp.lockTransactionIdToChannelMap.Lock()
 	defer kp.lockTransactionIdToChannelMap.Unlock()
 	if _, exist := kp.transactionIdToChannelMap[id]; !exist {
@@ -444,15 +492,15 @@
 	return marshalledReturnedVal, nil
 }
 
-func encodeDefaultFailedResponse(request *ca.InterContainerMessage) *ca.InterContainerMessage {
-	responseHeader := &ca.Header{
+func encodeDefaultFailedResponse(request *ic.InterContainerMessage) *ic.InterContainerMessage {
+	responseHeader := &ic.Header{
 		Id:        request.Header.Id,
-		Type:      ca.MessageType_RESPONSE,
+		Type:      ic.MessageType_RESPONSE,
 		FromTopic: request.Header.ToTopic,
 		ToTopic:   request.Header.FromTopic,
 		Timestamp: time.Now().Unix(),
 	}
-	responseBody := &ca.InterContainerResponseBody{
+	responseBody := &ic.InterContainerResponseBody{
 		Success: false,
 		Result:  nil,
 	}
@@ -463,7 +511,7 @@
 		log.Warnw("cannot-marshal-failed-response-body", log.Fields{"error": err})
 	}
 
-	return &ca.InterContainerMessage{
+	return &ic.InterContainerMessage{
 		Header: responseHeader,
 		Body:   marshalledResponseBody,
 	}
@@ -472,11 +520,11 @@
 
 //formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
 //or an error on failure
-func encodeResponse(request *ca.InterContainerMessage, success bool, returnedValues ...interface{}) (*ca.InterContainerMessage, error) {
+func encodeResponse(request *ic.InterContainerMessage, success bool, returnedValues ...interface{}) (*ic.InterContainerMessage, error) {
 	//log.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
-	responseHeader := &ca.Header{
+	responseHeader := &ic.Header{
 		Id:        request.Header.Id,
-		Type:      ca.MessageType_RESPONSE,
+		Type:      ic.MessageType_RESPONSE,
 		FromTopic: request.Header.ToTopic,
 		ToTopic:   request.Header.FromTopic,
 		Timestamp: time.Now().Unix(),
@@ -492,7 +540,7 @@
 		break // for now we support only 1 returned value - (excluding the error)
 	}
 
-	responseBody := &ca.InterContainerResponseBody{
+	responseBody := &ic.InterContainerResponseBody{
 		Success: success,
 		Result:  marshalledReturnedVal,
 	}
@@ -504,7 +552,7 @@
 		return nil, err
 	}
 
-	return &ca.InterContainerMessage{
+	return &ic.InterContainerMessage{
 		Header: responseHeader,
 		Body:   marshalledResponseBody,
 	}, nil
@@ -524,16 +572,16 @@
 	return
 }
 
-func (kp *InterContainerProxy) handleRequest(msg *ca.InterContainerMessage, targetInterface interface{}) {
+func (kp *InterContainerProxy) handleRequest(msg *ic.InterContainerMessage, targetInterface interface{}) {
 
 	// First extract the header to know whether this is a request - responses are handled by a different handler
-	if msg.Header.Type == ca.MessageType_REQUEST {
+	if msg.Header.Type == ic.MessageType_REQUEST {
 
 		var out []reflect.Value
 		var err error
 
 		// Get the request body
-		requestBody := &ca.InterContainerRequestBody{}
+		requestBody := &ic.InterContainerRequestBody{}
 		if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
 			log.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
 		} else {
@@ -547,11 +595,11 @@
 		// Response required?
 		if requestBody.ResponseRequired {
 			// If we already have an error before then just return that
-			var returnError *ca.Error
+			var returnError *ic.Error
 			var returnedValues []interface{}
 			var success bool
 			if err != nil {
-				returnError = &ca.Error{Reason: err.Error()}
+				returnError = &ic.Error{Reason: err.Error()}
 				returnedValues = make([]interface{}, 1)
 				returnedValues[0] = returnError
 			} else {
@@ -561,10 +609,10 @@
 				lastIndex := len(out) - 1
 				if out[lastIndex].Interface() != nil { // Error
 					if goError, ok := out[lastIndex].Interface().(error); ok {
-						returnError = &ca.Error{Reason: goError.Error()}
+						returnError = &ic.Error{Reason: goError.Error()}
 						returnedValues = append(returnedValues, returnError)
 					} else { // Should never happen
-						returnError = &ca.Error{Reason: "incorrect-error-returns"}
+						returnError = &ic.Error{Reason: "incorrect-error-returns"}
 						returnedValues = append(returnedValues, returnError)
 					}
 				} else { // Non-error case
@@ -578,7 +626,7 @@
 				}
 			}
 
-			var icm *ca.InterContainerMessage
+			var icm *ic.InterContainerMessage
 			if icm, err = encodeResponse(msg, success, returnedValues...); err != nil {
 				log.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
 				icm = encodeDefaultFailedResponse(msg)
@@ -597,7 +645,7 @@
 	}
 }
 
-func (kp *InterContainerProxy) waitForRequest(ch <-chan *ca.InterContainerMessage, topic Topic, targetInterface interface{}) {
+func (kp *InterContainerProxy) waitForRequest(ch <-chan *ic.InterContainerMessage, topic Topic, targetInterface interface{}) {
 	//	Wait for messages
 	for msg := range ch {
 		//log.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
@@ -605,7 +653,7 @@
 	}
 }
 
-func (kp *InterContainerProxy) dispatchResponse(msg *ca.InterContainerMessage) {
+func (kp *InterContainerProxy) dispatchResponse(msg *ic.InterContainerMessage) {
 	kp.lockTransactionIdToChannelMap.Lock()
 	defer kp.lockTransactionIdToChannelMap.Unlock()
 	if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
@@ -617,14 +665,14 @@
 
 // waitForResponse listens for messages on the subscribedCh, ensure we get a response with the transaction ID,
 // and then dispatches to the consumers
-func (kp *InterContainerProxy) waitForResponseLoop(subscribedCh <-chan *ca.InterContainerMessage, topic *Topic) {
+func (kp *InterContainerProxy) waitForResponseLoop(subscribedCh <-chan *ic.InterContainerMessage, topic *Topic) {
 	log.Debugw("starting-response-loop-for-topic", log.Fields{"topic": topic.Name})
 startloop:
 	for {
 		select {
 		case msg := <-subscribedCh:
 			//log.Debugw("message-received", log.Fields{"msg": msg, "fromTopic": msg.Header.FromTopic})
-			if msg.Header.Type == ca.MessageType_RESPONSE {
+			if msg.Header.Type == ic.MessageType_RESPONSE {
 				go kp.dispatchResponse(msg)
 			}
 		case <-kp.doneCh:
@@ -641,13 +689,13 @@
 // This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
 // API. There is one response channel waiting for kafka messages before dispatching the message to the
 // corresponding waiting channel
-func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ca.InterContainerMessage, error) {
+func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ic.InterContainerMessage, error) {
 	log.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
 
 	// First check whether we already have a channel listening for response on that topic.  If there is
 	// already one then it will be reused.  If not, it will be created.
 	if !kp.isTopicSubscribedForResponse(topic.Name) {
-		var subscribedCh <-chan *ca.InterContainerMessage
+		var subscribedCh <-chan *ic.InterContainerMessage
 		var err error
 		if subscribedCh, err = kp.kafkaClient.Subscribe(&topic); err != nil {
 			log.Debugw("subscribe-failure", log.Fields{"topic": topic.Name})
@@ -659,7 +707,7 @@
 
 	// Create a specific channel for this consumers.  We cannot use the channel from the kafkaclient as it will
 	// broadcast any message for this topic to all channels waiting on it.
-	ch := make(chan *ca.InterContainerMessage)
+	ch := make(chan *ic.InterContainerMessage)
 	kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
 
 	return ch, nil
@@ -676,15 +724,15 @@
 
 //formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
 //or an error on failure
-func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, kvArgs ...*KVArg) (*ca.InterContainerMessage, error) {
-	requestHeader := &ca.Header{
+func encodeRequest(rpc string, toTopic *Topic, replyTopic *Topic, kvArgs ...*KVArg) (*ic.InterContainerMessage, error) {
+	requestHeader := &ic.Header{
 		Id:        uuid.New().String(),
-		Type:      ca.MessageType_REQUEST,
+		Type:      ic.MessageType_REQUEST,
 		FromTopic: replyTopic.Name,
 		ToTopic:   toTopic.Name,
 		Timestamp: time.Now().Unix(),
 	}
-	requestBody := &ca.InterContainerRequestBody{
+	requestBody := &ic.InterContainerRequestBody{
 		Rpc:              rpc,
 		ResponseRequired: true,
 		ReplyToTopic:     replyTopic.Name,
@@ -708,7 +756,7 @@
 			log.Warnw("cannot-marshal-request", log.Fields{"error": err})
 			return nil, err
 		}
-		protoArg := &ca.Argument{
+		protoArg := &ic.Argument{
 			Key:   arg.Key,
 			Value: marshalledArg,
 		}
@@ -721,16 +769,16 @@
 		log.Warnw("cannot-marshal-request", log.Fields{"error": err})
 		return nil, err
 	}
-	request := &ca.InterContainerMessage{
+	request := &ic.InterContainerMessage{
 		Header: requestHeader,
 		Body:   marshalledData,
 	}
 	return request, nil
 }
 
-func decodeResponse(response *ca.InterContainerMessage) (*ca.InterContainerResponseBody, error) {
+func decodeResponse(response *ic.InterContainerMessage) (*ic.InterContainerResponseBody, error) {
 	//	Extract the message body
-	responseBody := ca.InterContainerResponseBody{}
+	responseBody := ic.InterContainerResponseBody{}
 	if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
 		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
 		return nil, err
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index 8468e42..e330b85 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -22,7 +22,7 @@
 	"github.com/golang/protobuf/proto"
 	"github.com/google/uuid"
 	"github.com/opencord/voltha-go/common/log"
-	ca "github.com/opencord/voltha-go/protos/core_adapter"
+	ic "github.com/opencord/voltha-go/protos/inter_container"
 	"gopkg.in/Shopify/sarama.v1"
 	"strings"
 	"sync"
@@ -40,7 +40,7 @@
 //consumer or a group consumer
 type consumerChannels struct {
 	consumers []interface{}
-	channels  []chan *ca.InterContainerMessage
+	channels  []chan *ic.InterContainerMessage
 }
 
 // SaramaClient represents the messaging proxy
@@ -307,20 +307,20 @@
 
 // Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
 // messages from that topic
-func (sc *SaramaClient) Subscribe(topic *Topic) (<-chan *ca.InterContainerMessage, error) {
+func (sc *SaramaClient) Subscribe(topic *Topic) (<-chan *ic.InterContainerMessage, error) {
 	log.Debugw("subscribe", log.Fields{"topic": topic.Name})
 
 	// If a consumers already exist for that topic then resuse it
 	if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
 		log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
 		// Create a channel specific for that consumers and add it to the consumers channel map
-		ch := make(chan *ca.InterContainerMessage)
+		ch := make(chan *ic.InterContainerMessage)
 		sc.addChannelToConsumerChannelMap(topic, ch)
 		return ch, nil
 	}
 
 	// Register for the topic and set it up
-	var consumerListeningChannel chan *ca.InterContainerMessage
+	var consumerListeningChannel chan *ic.InterContainerMessage
 	var err error
 
 	// Use the consumerType option to figure out the type of consumer to launch
@@ -351,7 +351,7 @@
 }
 
 //UnSubscribe unsubscribe a consumer from a given topic
-func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ca.InterContainerMessage) error {
+func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ic.InterContainerMessage) error {
 	log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
 	err := sc.removeChannelFromConsumerChannelMap(*topic, ch)
 	return err
@@ -393,9 +393,9 @@
 	// TODO: Use a lock or a different mechanism to ensure the response received corresponds to the message sent.
 	select {
 	case ok := <-sc.producer.Successes():
-		log.Debugw("message-sent", log.Fields{"status":ok})
+		log.Debugw("message-sent", log.Fields{"status": ok})
 	case notOk := <-sc.producer.Errors():
-		log.Debugw("error-sending", log.Fields{"status":notOk})
+		log.Debugw("error-sending", log.Fields{"status": notOk})
 		return notOk
 	}
 	return nil
@@ -443,7 +443,7 @@
 	return nil
 }
 
-func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ca.InterContainerMessage) {
+func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ic.InterContainerMessage) {
 	sc.lockTopicToConsumerChannelMap.Lock()
 	defer sc.lockTopicToConsumerChannelMap.Unlock()
 	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
@@ -482,7 +482,7 @@
 	return err
 }
 
-func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ca.InterContainerMessage) error {
+func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ic.InterContainerMessage) error {
 	sc.lockTopicToConsumerChannelMap.Lock()
 	defer sc.lockTopicToConsumerChannelMap.Unlock()
 	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
@@ -620,12 +620,12 @@
 
 // dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
 // topic via the unique channel each subsciber received during subscription
-func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ca.InterContainerMessage) {
+func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ic.InterContainerMessage) {
 	// Need to go over all channels and publish messages to them - do we need to copy msg?
 	sc.lockTopicToConsumerChannelMap.Lock()
 	defer sc.lockTopicToConsumerChannelMap.Unlock()
 	for _, ch := range consumerCh.channels {
-		go func(c chan *ca.InterContainerMessage) {
+		go func(c chan *ic.InterContainerMessage) {
 			c <- protoMessage
 		}(ch)
 	}
@@ -652,7 +652,7 @@
 				break startloop
 			}
 			msgBody := msg.Value
-			icm := &ca.InterContainerMessage{}
+			icm := &ic.InterContainerMessage{}
 			if err := proto.Unmarshal(msgBody, icm); err != nil {
 				log.Warnw("partition-invalid-message", log.Fields{"error": err})
 				continue
@@ -688,7 +688,7 @@
 				break startloop
 			}
 			msgBody := msg.Value
-			icm := &ca.InterContainerMessage{}
+			icm := &ic.InterContainerMessage{}
 			if err := proto.Unmarshal(msgBody, icm); err != nil {
 				log.Warnw("invalid-message", log.Fields{"error": err})
 				continue
@@ -728,7 +728,7 @@
 
 //// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
 //// for that topic.  It also starts the routine that listens for messages on that topic.
-func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ca.InterContainerMessage, error) {
+func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ic.InterContainerMessage, error) {
 	var pConsumers []sarama.PartitionConsumer
 	var err error
 
@@ -744,10 +744,10 @@
 
 	// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
 	// unbuffered to verify race conditions.
-	consumerListeningChannel := make(chan *ca.InterContainerMessage)
+	consumerListeningChannel := make(chan *ic.InterContainerMessage)
 	cc := &consumerChannels{
 		consumers: consumersIf,
-		channels:  []chan *ca.InterContainerMessage{consumerListeningChannel},
+		channels:  []chan *ic.InterContainerMessage{consumerListeningChannel},
 	}
 
 	// Add the consumers channel to the map
@@ -761,7 +761,7 @@
 
 // setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
 // for that topic.  It also starts the routine that listens for messages on that topic.
-func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string) (chan *ca.InterContainerMessage, error) {
+func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string) (chan *ic.InterContainerMessage, error) {
 	// TODO:  Replace this development partition consumers with a group consumers
 	var pConsumer *scc.Consumer
 	var err error
@@ -771,10 +771,10 @@
 	}
 	// Create the consumers/channel structure and set the consumers and create a channel on that topic - for now
 	// unbuffered to verify race conditions.
-	consumerListeningChannel := make(chan *ca.InterContainerMessage)
+	consumerListeningChannel := make(chan *ic.InterContainerMessage)
 	cc := &consumerChannels{
 		consumers: []interface{}{pConsumer},
-		channels:  []chan *ca.InterContainerMessage{consumerListeningChannel},
+		channels:  []chan *ic.InterContainerMessage{consumerListeningChannel},
 	}
 
 	// Add the consumers channel to the map
@@ -806,9 +806,9 @@
 	return pConsumers, nil
 }
 
-func removeChannel(channels []chan *ca.InterContainerMessage, ch <-chan *ca.InterContainerMessage) []chan *ca.InterContainerMessage {
+func removeChannel(channels []chan *ic.InterContainerMessage, ch <-chan *ic.InterContainerMessage) []chan *ic.InterContainerMessage {
 	var i int
-	var channel chan *ca.InterContainerMessage
+	var channel chan *ic.InterContainerMessage
 	for i, channel = range channels {
 		if channel == ch {
 			channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]