This update addresses the following:
1.  Decouple the kafka messaging proxy from the kafka client.  This
will allow us to try out different kafka clients as well as test
the client separately.
2. Create unique device topics for the core, olt adapter and onu
adapters.  This will ensure only cores and adapters handling these
devices will listen to the device messages.
3. Update the core with the latest device model APIs and changes.
While most of the model issues have been fixed, there is still an
issue with updating a child branch.   This will be dealt with in a separate
update.

Change-Id: I622ef5c636d7466bb3adefaa4ac4c85d7c450bea
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 78a8a5a..25fc1b7 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -19,7 +19,6 @@
 	"context"
 	"errors"
 	"fmt"
-	"github.com/Shopify/sarama"
 	"github.com/golang/protobuf/proto"
 	"github.com/golang/protobuf/ptypes"
 	"github.com/golang/protobuf/ptypes/any"
@@ -37,73 +36,87 @@
 }
 
 const (
-	DefaultKafkaHost         = "10.100.198.240"
-	DefaultKafkaPort         = 9092
-	DefaultTopicName         = "Core"
-	DefaultSleepOnError      = 1
-	DefaultFlushFrequency    = 1
-	DefaultFlushMessages     = 1
-	DefaultFlushMaxmessages  = 1
-	DefaultMaxRetries        = 3
-	DefaultReturnSuccess     = false
-	DefaultReturnErrors      = true
-	DefaultConsumerMaxwait   = 50
-	DefaultMaxProcessingTime = 100
-	DefaultRequestTimeout    = 500 // 500 milliseconds - to handle a wider latency range
+	DefaultMaxRetries     = 3
+	DefaultRequestTimeout = 500 // 500 milliseconds - to handle a wider latency range
 )
 
-type consumerChannels struct {
-	consumer sarama.PartitionConsumer
-	channels []chan *ca.InterContainerMessage
+// requestHandlerChannel represents an interface associated with a channel.  Whenever an event is
+// obtained from that channel, this interface is invoked.   This is used to handle
+// async requests into the Core via the kafka messaging bus
+type requestHandlerChannel struct {
+	requesthandlerInterface interface{}
+	ch                      <-chan *ca.InterContainerMessage
 }
 
-// KafkaMessagingProxy represents the messaging proxy
-type KafkaMessagingProxy struct {
-	KafkaHost                     string
-	KafkaPort                     int
-	DefaultTopic                  *Topic
-	TargetInterface               interface{}
-	producer                      sarama.AsyncProducer
-	consumer                      sarama.Consumer
-	doneCh                        chan int
-	waitForResponseRoutineStarted bool
-	topicToConsumerChannelMap     map[string]*consumerChannels
-	transactionIdToChannelMap     map[string]chan *ca.InterContainerMessage
-	lockTopicToConsumerChannelMap sync.RWMutex
+// transactionChannel represents a combination of a topic and a channel onto which a response received
+// on the kafka bus will be sent
+type transactionChannel struct {
+	topic *Topic
+	ch    chan *ca.InterContainerMessage
+}
+
+// InterContainerProxy represents the messaging proxy
+type InterContainerProxy struct {
+	kafkaHost                      string
+	kafkaPort                      int
+	DefaultTopic                   *Topic
+	defaultRequestHandlerInterface interface{}
+	kafkaClient                    Client
+	doneCh                         chan int
+
+	// This map is used to map a topic to an interface and channel.   When a request is received
+	// on that channel (registered to the topic) then that interface is invoked.
+	topicToRequestHandlerChannelMap   map[string]*requestHandlerChannel
+	lockTopicRequestHandlerChannelMap sync.RWMutex
+
+	// This map is used to map a response topic to a channel.   This channel handles all responses on that
+	// topic and forwards them to the appropriate consumer channel, using the
+	// transactionIdToChannelMap.
+	topicToResponseChannelMap   map[string]<-chan *ca.InterContainerMessage
+	lockTopicResponseChannelMap sync.RWMutex
+
+	// This map is used to map a transaction to a consumer channel.  This is used whenever a request has been
+	// sent out and we are waiting for a response.
+	transactionIdToChannelMap     map[string]*transactionChannel
 	lockTransactionIdToChannelMap sync.RWMutex
 }
 
-type KafkaProxyOption func(*KafkaMessagingProxy)
+type InterContainerProxyOption func(*InterContainerProxy)
 
-func KafkaHost(host string) KafkaProxyOption {
-	return func(args *KafkaMessagingProxy) {
-		args.KafkaHost = host
+func InterContainerHost(host string) InterContainerProxyOption {
+	return func(args *InterContainerProxy) {
+		args.kafkaHost = host
 	}
 }
 
-func KafkaPort(port int) KafkaProxyOption {
-	return func(args *KafkaMessagingProxy) {
-		args.KafkaPort = port
+func InterContainerPort(port int) InterContainerProxyOption {
+	return func(args *InterContainerProxy) {
+		args.kafkaPort = port
 	}
 }
 
-func DefaultTopic(topic *Topic) KafkaProxyOption {
-	return func(args *KafkaMessagingProxy) {
+func DefaultTopic(topic *Topic) InterContainerProxyOption {
+	return func(args *InterContainerProxy) {
 		args.DefaultTopic = topic
 	}
 }
 
-func TargetInterface(target interface{}) KafkaProxyOption {
-	return func(args *KafkaMessagingProxy) {
-		args.TargetInterface = target
+func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
+	return func(args *InterContainerProxy) {
+		args.defaultRequestHandlerInterface = handler
 	}
 }
 
-func NewKafkaMessagingProxy(opts ...KafkaProxyOption) (*KafkaMessagingProxy, error) {
-	proxy := &KafkaMessagingProxy{
-		KafkaHost:    DefaultKafkaHost,
-		KafkaPort:    DefaultKafkaPort,
-		DefaultTopic: &Topic{Name: DefaultTopicName},
+func MsgClient(client Client) InterContainerProxyOption {
+	return func(args *InterContainerProxy) {
+		args.kafkaClient = client
+	}
+}
+
+func NewInterContainerProxy(opts ...InterContainerProxyOption) (*InterContainerProxy, error) {
+	proxy := &InterContainerProxy{
+		kafkaHost: DefaultKafkaHost,
+		kafkaPort: DefaultKafkaPort,
 	}
 
 	for _, option := range opts {
@@ -111,59 +124,65 @@
 	}
 
 	// Create the locks for all the maps
-	proxy.lockTopicToConsumerChannelMap = sync.RWMutex{}
+	proxy.lockTopicRequestHandlerChannelMap = sync.RWMutex{}
 	proxy.lockTransactionIdToChannelMap = sync.RWMutex{}
+	proxy.lockTopicResponseChannelMap = sync.RWMutex{}
 
 	return proxy, nil
 }
 
-func (kp *KafkaMessagingProxy) Start() error {
+func (kp *InterContainerProxy) Start() error {
 	log.Info("Starting-Proxy")
 
+	// Kafka MsgClient should already have been created.  If not, output fatal error
+	if kp.kafkaClient == nil {
+		log.Fatal("kafka-client-not-set")
+	}
+
 	// Create the Done channel
 	kp.doneCh = make(chan int, 1)
 
-	// Create the Publisher
-	if err := kp.createPublisher(DefaultMaxRetries); err != nil {
-		log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
+	// Start the kafka client
+	if err := kp.kafkaClient.Start(DefaultMaxRetries); err != nil {
+		log.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
 		return err
 	}
 
-	// Create the master consumer
-	if err := kp.createConsumer(DefaultMaxRetries); err != nil {
-		log.Errorw("Cannot-create-kafka-consumer", log.Fields{"error": err})
-		return err
-	}
-
-	// Create the topic to consumer/channel map
-	kp.topicToConsumerChannelMap = make(map[string]*consumerChannels)
-
+	// Create the topic to response channel map
+	kp.topicToResponseChannelMap = make(map[string]<-chan *ca.InterContainerMessage)
+	//
 	// Create the transactionId to Channel Map
-	kp.transactionIdToChannelMap = make(map[string]chan *ca.InterContainerMessage)
+	kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
+
+	// Create the topic to request channel map
+	kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
 
 	return nil
 }
 
-func (kp *KafkaMessagingProxy) Stop() {
-	log.Info("Stopping-Proxy")
-	if kp.producer != nil {
-		if err := kp.producer.Close(); err != nil {
-			panic(err)
-		}
-	}
-	if kp.consumer != nil {
-		if err := kp.consumer.Close(); err != nil {
-			panic(err)
-		}
-	}
-	//Close the done channel to close all long processing Go routines
-	close(kp.doneCh)
+func (kp *InterContainerProxy) Stop() {
+	log.Info("stopping-intercontainer-proxy")
+	kp.doneCh <- 1
+	// TODO : Perform cleanup
+	//kp.kafkaClient.Stop()
+	//kp.deleteAllTopicRequestHandlerChannelMap()
+	//kp.deleteAllTopicResponseChannelMap()
+	//kp.deleteAllTransactionIdToChannelMap()
 }
 
-func (kp *KafkaMessagingProxy) InvokeRPC(ctx context.Context, rpc string, topic *Topic, waitForResponse bool,
-	kvArgs ...*KVArg) (bool, *any.Any) {
+// InvokeRPC is used to send a request to a given topic
+func (kp *InterContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
+	waitForResponse bool, kvArgs ...*KVArg) (bool, *any.Any) {
+
+	// If a replyToTopic is provided then we use it, otherwise we fall back to the proxy's DefaultTopic.  The
+	// replyToTopic is typically the device ID.
+	responseTopic := replyToTopic
+	if responseTopic == nil {
+		responseTopic = kp.DefaultTopic
+	}
+
 	// Encode the request
-	protoRequest, err := encodeRequest(rpc, topic, kp.DefaultTopic, kvArgs...)
+	protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, kvArgs...)
 	if err != nil {
 		log.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
 		return false, nil
@@ -173,13 +192,17 @@
 	var ch <-chan *ca.InterContainerMessage
 	if waitForResponse {
 		var err error
-		if ch, err = kp.subscribeForResponse(*kp.DefaultTopic, protoRequest.Header.Id); err != nil {
-			log.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "topic": topic.Name})
+		if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
+			log.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
 		}
 	}
 
-	// Send request
-	go kp.sendToKafkaTopic(protoRequest, topic)
+	// Send request - if the topic is formatted with a device Id then we will send the request using a
+	// specific key, hence ensuring a single partition is used to publish the request.  This ensures that the
+	// subscriber on that topic will receive the request in the order it was sent.  The key used is the deviceId.
+	key := GetDeviceIdFromTopic(*toTopic)
+	log.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key})
+	go kp.kafkaClient.Send(protoRequest, toTopic, key)
 
 	if waitForResponse {
 		// Create a child context based on the parent context, if any
@@ -197,8 +220,7 @@
 		defer kp.unSubscribeForResponse(protoRequest.Header.Id)
 		select {
 		case msg := <-ch:
-			log.Debugw("received-response", log.Fields{"rpc": rpc, "msg": msg})
-
+			log.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
 			var responseBody *ca.InterContainerResponseBody
 			var err error
 			if responseBody, err = decodeResponse(msg); err != nil {
@@ -224,194 +246,183 @@
 			}
 			return false, marshalledArg
 		case <-kp.doneCh:
-			log.Infow("received-exit-signal", log.Fields{"topic": topic.Name, "rpc": rpc})
+			log.Infow("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
 			return true, nil
 		}
 	}
 	return true, nil
 }
 
-// Subscribe allows a caller to subscribe to a given topic.  A channel is returned to the
-// caller to receive messages from that topic.
-func (kp *KafkaMessagingProxy) Subscribe(topic Topic) (<-chan *ca.InterContainerMessage, error) {
-
-	log.Debugw("subscribe", log.Fields{"topic": topic.Name})
-
-	if consumerCh := kp.getConsumerChannel(topic); consumerCh != nil {
-		log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
-		// Create a channel specific for that consumer and add it to the consumer channel map
-		ch := make(chan *ca.InterContainerMessage)
-		kp.addChannelToConsumerChannelMap(topic, ch)
-		return ch, nil
-	}
-
-	// Register for the topic and set it up
-	var consumerListeningChannel chan *ca.InterContainerMessage
-	var err error
-	if consumerListeningChannel, err = kp.setupConsumerChannel(topic); err != nil {
-		log.Warnw("create-consumer-channel-failure", log.Fields{"error": err, "topic": topic.Name})
-		return nil, err
-	}
-
-	return consumerListeningChannel, nil
-}
-
-func (kp *KafkaMessagingProxy) UnSubscribe(topic Topic, ch <-chan *ca.InterContainerMessage) error {
-	log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
-	err := kp.removeChannelFromConsumerChannelMap(topic, ch)
-	return err
-}
-
-// SubscribeWithTarget allows a caller to assign a target object to be invoked automatically
+// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
 // when a message is received on a given topic
-func (kp *KafkaMessagingProxy) SubscribeWithTarget(topic Topic, targetInterface interface{}) error {
+func (kp *InterContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
 
 	// Subscribe to receive messages for that topic
 	var ch <-chan *ca.InterContainerMessage
 	var err error
-	if ch, err = kp.Subscribe(topic); err != nil {
+	if ch, err = kp.kafkaClient.Subscribe(&topic, 1); err != nil {
+		//if ch, err = kp.Subscribe(topic); err != nil {
 		log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
 	}
+
+	kp.defaultRequestHandlerInterface = handler
+	kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
 	// Launch a go routine to receive and process kafka messages
-	go kp.waitForRequest(ch, topic, targetInterface)
+	go kp.waitForRequest(ch, topic, handler)
 
 	return nil
 }
 
-func (kp *KafkaMessagingProxy) UnSubscribeTarget(ctx context.Context, topic Topic, targetInterface interface{}) error {
-	// TODO - mostly relevant with multiple interfaces
+// SubscribeWithDefaultRequestHandler allows a caller to add a topic to an existing target object to be invoked automatically
+// when a message is received on a given topic.  So far there is only 1 target registered per microservice
+func (kp *InterContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic) error {
+	// Subscribe to receive messages for that topic
+	var ch <-chan *ca.InterContainerMessage
+	var err error
+	if ch, err = kp.kafkaClient.Subscribe(&topic, 1); err != nil {
+		log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
+	}
+	kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
+
+	// Launch a go routine to receive and process kafka messages
+	go kp.waitForRequest(ch, topic, kp.defaultRequestHandlerInterface)
+
 	return nil
 }
 
-func (kp *KafkaMessagingProxy) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
-	kp.lockTopicToConsumerChannelMap.Lock()
-	defer kp.lockTopicToConsumerChannelMap.Unlock()
-	if _, exist := kp.topicToConsumerChannelMap[id]; !exist {
-		kp.topicToConsumerChannelMap[id] = arg
+func (kp *InterContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
+	return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
+}
+
+// setupTopicResponseChannelMap sets up a single consumer channel that will act as a broadcast channel for all
+// responses from that topic.
+func (kp *InterContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ca.InterContainerMessage) {
+	kp.lockTopicResponseChannelMap.Lock()
+	defer kp.lockTopicResponseChannelMap.Unlock()
+	if _, exist := kp.topicToResponseChannelMap[topic]; !exist {
+		kp.topicToResponseChannelMap[topic] = arg
 	}
 }
 
-func (kp *KafkaMessagingProxy) deleteFromTopicToConsumerChannelMap(id string) {
-	kp.lockTopicToConsumerChannelMap.Lock()
-	defer kp.lockTopicToConsumerChannelMap.Unlock()
-	if _, exist := kp.topicToConsumerChannelMap[id]; exist {
-		delete(kp.topicToConsumerChannelMap, id)
+func (kp *InterContainerProxy) isTopicSubscribedForResponse(topic string) bool {
+	kp.lockTopicResponseChannelMap.Lock()
+	defer kp.lockTopicResponseChannelMap.Unlock()
+	_, exist := kp.topicToResponseChannelMap[topic]
+	return exist
+}
+
+func (kp *InterContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
+	kp.lockTopicResponseChannelMap.Lock()
+	defer kp.lockTopicResponseChannelMap.Unlock()
+	if _, exist := kp.topicToResponseChannelMap[topic]; exist {
+		// Unsubscribe to this topic first - this will close the subscribed channel
+		var err error
+		if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+			log.Errorw("unsubscribing-error", log.Fields{"topic": topic})
+		}
+		delete(kp.topicToResponseChannelMap, topic)
+		return err
+	} else {
+		return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
 	}
 }
 
-func (kp *KafkaMessagingProxy) getConsumerChannel(topic Topic) *consumerChannels {
-	kp.lockTopicToConsumerChannelMap.Lock()
-	defer kp.lockTopicToConsumerChannelMap.Unlock()
-
-	if consumerCh, exist := kp.topicToConsumerChannelMap[topic.Name]; exist {
-		return consumerCh
+func (kp *InterContainerProxy) deleteAllTopicResponseChannelMap() error {
+	kp.lockTopicResponseChannelMap.Lock()
+	defer kp.lockTopicResponseChannelMap.Unlock()
+	var err error
+	for topic, _ := range kp.topicToResponseChannelMap {
+		// Unsubscribe to this topic first - this will close the subscribed channel
+		if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+			log.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+		}
+		delete(kp.topicToResponseChannelMap, topic)
 	}
-	return nil
+	return err
 }
 
-func (kp *KafkaMessagingProxy) addChannelToConsumerChannelMap(topic Topic, ch chan *ca.InterContainerMessage) {
-	kp.lockTopicToConsumerChannelMap.Lock()
-	defer kp.lockTopicToConsumerChannelMap.Unlock()
-	if consumerCh, exist := kp.topicToConsumerChannelMap[topic.Name]; exist {
-		consumerCh.channels = append(consumerCh.channels, ch)
-		return
+func (kp *InterContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
+	kp.lockTopicRequestHandlerChannelMap.Lock()
+	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
+	if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
+		kp.topicToRequestHandlerChannelMap[topic] = arg
 	}
-	log.Warnw("consumer-channel-not-exist", log.Fields{"topic": topic.Name})
 }
 
-func (kp *KafkaMessagingProxy) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ca.InterContainerMessage) error {
-	kp.lockTopicToConsumerChannelMap.Lock()
-	defer kp.lockTopicToConsumerChannelMap.Unlock()
-	if consumerCh, exist := kp.topicToConsumerChannelMap[topic.Name]; exist {
-		// Channel will be closed in the removeChannel method
-		consumerCh.channels = removeChannel(consumerCh.channels, ch)
+func (kp *InterContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
+	kp.lockTopicRequestHandlerChannelMap.Lock()
+	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
+	if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
+		// Close the kafka client first by unsubscribing to this topic
+		kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch)
+		delete(kp.topicToRequestHandlerChannelMap, topic)
 		return nil
+	} else {
+		return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
 	}
-	log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
-	return errors.New("topic-does-not-exist")
 }
 
-func (kp *KafkaMessagingProxy) addToTransactionIdToChannelMap(id string, arg chan *ca.InterContainerMessage) {
+func (kp *InterContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
+	kp.lockTopicRequestHandlerChannelMap.Lock()
+	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
+	var err error
+	for topic, _ := range kp.topicToRequestHandlerChannelMap {
+		// Close the kafka client first by unsubscribing to this topic
+		if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+			log.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+		}
+		delete(kp.topicToRequestHandlerChannelMap, topic)
+	}
+	return err
+}
+
+func (kp *InterContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ca.InterContainerMessage) {
 	kp.lockTransactionIdToChannelMap.Lock()
 	defer kp.lockTransactionIdToChannelMap.Unlock()
 	if _, exist := kp.transactionIdToChannelMap[id]; !exist {
-		kp.transactionIdToChannelMap[id] = arg
+		kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
 	}
 }
 
-func (kp *KafkaMessagingProxy) deleteFromTransactionIdToChannelMap(id string) {
+func (kp *InterContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
 	kp.lockTransactionIdToChannelMap.Lock()
 	defer kp.lockTransactionIdToChannelMap.Unlock()
-	if _, exist := kp.transactionIdToChannelMap[id]; exist {
+	if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
+		// Close the channel first
+		close(transChannel.ch)
 		delete(kp.transactionIdToChannelMap, id)
 	}
 }
 
-func (kp *KafkaMessagingProxy) createPublisher(retries int) error {
-	// This Creates the publisher
-	config := sarama.NewConfig()
-	config.Producer.Partitioner = sarama.NewRandomPartitioner
-	config.Producer.Flush.Frequency = time.Duration(DefaultFlushFrequency)
-	config.Producer.Flush.Messages = DefaultFlushMessages
-	config.Producer.Flush.MaxMessages = DefaultFlushMaxmessages
-	config.Producer.Return.Errors = DefaultReturnErrors
-	config.Producer.Return.Successes = DefaultReturnSuccess
-	config.Producer.RequiredAcks = sarama.WaitForAll
-	kafkaFullAddr := fmt.Sprintf("%s:%d", kp.KafkaHost, kp.KafkaPort)
-	brokers := []string{kafkaFullAddr}
-
-	for {
-		producer, err := sarama.NewAsyncProducer(brokers, config)
-		if err != nil {
-			if retries == 0 {
-				log.Errorw("error-starting-publisher", log.Fields{"error": err})
-				return err
-			} else {
-				// If retries is -ve then we will retry indefinitely
-				retries--
-			}
-			log.Info("retrying-after-a-second-delay")
-			time.Sleep(time.Duration(DefaultSleepOnError) * time.Second)
-		} else {
-			kp.producer = producer
-			break
+func (kp *InterContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
+	kp.lockTransactionIdToChannelMap.Lock()
+	defer kp.lockTransactionIdToChannelMap.Unlock()
+	for key, value := range kp.transactionIdToChannelMap {
+		if value.topic.Name == id {
+			close(value.ch)
+			delete(kp.transactionIdToChannelMap, key)
 		}
 	}
-	log.Info("Kafka-publisher-created")
-	return nil
 }
 
-func (kp *KafkaMessagingProxy) createConsumer(retries int) error {
-	config := sarama.NewConfig()
-	config.Consumer.Return.Errors = true
-	config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
-	config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
-	config.Consumer.Offsets.Initial = sarama.OffsetNewest
-	kafkaFullAddr := fmt.Sprintf("%s:%d", kp.KafkaHost, kp.KafkaPort)
-	brokers := []string{kafkaFullAddr}
-
-	for {
-		consumer, err := sarama.NewConsumer(brokers, config)
-		if err != nil {
-			if retries == 0 {
-				log.Errorw("error-starting-consumer", log.Fields{"error": err})
-				return err
-			} else {
-				// If retries is -ve then we will retry indefinitely
-				retries--
-			}
-			log.Info("retrying-after-a-second-delay")
-			time.Sleep(time.Duration(DefaultSleepOnError) * time.Second)
-		} else {
-			kp.consumer = consumer
-			break
-		}
+func (kp *InterContainerProxy) deleteAllTransactionIdToChannelMap() {
+	kp.lockTransactionIdToChannelMap.Lock()
+	defer kp.lockTransactionIdToChannelMap.Unlock()
+	for key, value := range kp.transactionIdToChannelMap {
+		close(value.ch)
+		delete(kp.transactionIdToChannelMap, key)
 	}
-	log.Info("Kafka-consumer-created")
-	return nil
 }
 
-func encodeReturnedValue(request *ca.InterContainerMessage, returnedVal interface{}) (*any.Any, error) {
+func (kp *InterContainerProxy) DeleteTopic(topic Topic) error {
+	// If we have any consumers on that topic we need to close them
+	kp.deleteFromTopicResponseChannelMap(topic.Name)
+	kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
+	kp.deleteTopicTransactionIdToChannelMap(topic.Name)
+	return kp.kafkaClient.DeleteTopic(&topic)
+}
+
+func encodeReturnedValue(returnedVal interface{}) (*any.Any, error) {
 	// Encode the response argument - needs to be a proto message
 	if returnedVal == nil {
 		return nil, nil
@@ -462,8 +473,7 @@
 //formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
 //or an error on failure
 func encodeResponse(request *ca.InterContainerMessage, success bool, returnedValues ...interface{}) (*ca.InterContainerMessage, error) {
-
-	log.Infow("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
+	//log.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
 	responseHeader := &ca.Header{
 		Id:        request.Header.Id,
 		Type:      ca.MessageType_RESPONSE,
@@ -476,7 +486,7 @@
 	var marshalledReturnedVal *any.Any
 	var err error
 	for _, returnVal := range returnedValues {
-		if marshalledReturnedVal, err = encodeReturnedValue(request, returnVal); err != nil {
+		if marshalledReturnedVal, err = encodeReturnedValue(returnVal); err != nil {
 			log.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
 		}
 		break // for now we support only 1 returned value - (excluding the error)
@@ -504,7 +514,7 @@
 	myClassValue := reflect.ValueOf(myClass)
 	m := myClassValue.MethodByName(funcName)
 	if !m.IsValid() {
-		return make([]reflect.Value, 0), fmt.Errorf("Method not found \"%s\"", funcName)
+		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
 	}
 	in := make([]reflect.Value, len(params))
 	for i, param := range params {
@@ -514,11 +524,10 @@
 	return
 }
 
-func (kp *KafkaMessagingProxy) handleRequest(msg *ca.InterContainerMessage, targetInterface interface{}) {
+func (kp *InterContainerProxy) handleRequest(msg *ca.InterContainerMessage, targetInterface interface{}) {
 
-	// First extract the header to know whether this is a request of a response
+	// First extract the header to know whether this is a request - responses are handled by a different handler
 	if msg.Header.Type == ca.MessageType_REQUEST {
-		log.Debugw("received-request", log.Fields{"header": msg.Header})
 
 		var out []reflect.Value
 		var err error
@@ -528,6 +537,7 @@
 		if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
 			log.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
 		} else {
+			log.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
 			// let the callee unpack the arguments as its the only one that knows the real proto type
 			out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
 			if err != nil {
@@ -545,7 +555,7 @@
 				returnedValues = make([]interface{}, 1)
 				returnedValues[0] = returnError
 			} else {
-				log.Debugw("returned-api-response", log.Fields{"len": len(out), "err": err})
+				//log.Debugw("returned-api-response", log.Fields{"len": len(out), "err": err})
 				returnedValues = make([]interface{}, 0)
 				// Check for errors first
 				lastIndex := len(out) - 1
@@ -560,7 +570,7 @@
 				} else { // Non-error case
 					success = true
 					for idx, val := range out {
-						log.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
+						//log.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
 						if idx != lastIndex {
 							returnedValues = append(returnedValues, val.Interface())
 						}
@@ -573,185 +583,94 @@
 				log.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
 				icm = encodeDefaultFailedResponse(msg)
 			}
-			log.Debugw("sending-to-kafka", log.Fields{"msg": icm, "send-to-topic": msg.Header.FromTopic})
-			kp.sendToKafkaTopic(icm, &Topic{Name: msg.Header.FromTopic})
+			// To preserve ordering of messages, all messages for a given device are sent to the same
+			// partition by providing a message key.  The key is the device ID encoded in the topic name.
+			// If the topic name carries no device ID then the key is empty, in which case messages on
+			// that topic may be spread across its partitions and their relative order is not guaranteed.
+			replyTopic := &Topic{Name: msg.Header.FromTopic}
+			key := GetDeviceIdFromTopic(*replyTopic)
+			log.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header, "key": key})
+			// TODO: handle error response.
+			kp.kafkaClient.Send(icm, replyTopic, key)
 		}
 
-	} else if msg.Header.Type == ca.MessageType_RESPONSE {
-		log.Warnw("received-response-on-request-handler", log.Fields{"header": msg.Header})
-	} else {
-		log.Errorw("invalid-message", log.Fields{"header": msg.Header})
 	}
 }
 
-func (kp *KafkaMessagingProxy) waitForRequest(ch <-chan *ca.InterContainerMessage, topic Topic, targetInterface interface{}) {
+func (kp *InterContainerProxy) waitForRequest(ch <-chan *ca.InterContainerMessage, topic Topic, targetInterface interface{}) {
 	//	Wait for messages
 	for msg := range ch {
-		log.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
+		//log.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
 		go kp.handleRequest(msg, targetInterface)
 	}
 }
 
-// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
-// topic via the unique channel each subsciber received during subscription
-func (kp *KafkaMessagingProxy) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ca.InterContainerMessage) {
-	// Need to go over all channels and publish messages to them - do we need to copy msg?
-	kp.lockTopicToConsumerChannelMap.Lock()
-	defer kp.lockTopicToConsumerChannelMap.Unlock()
-	for _, ch := range consumerCh.channels {
-		go func(c chan *ca.InterContainerMessage) {
-			c <- protoMessage
-		}(ch)
-	}
-}
-
-func (kp *KafkaMessagingProxy) consumeMessagesLoop(topic Topic) {
-	log.Debugw("starting-consuming-messages", log.Fields{"topic": topic.Name})
-	var consumerCh *consumerChannels
-	if consumerCh = kp.getConsumerChannel(topic); consumerCh == nil {
-		log.Errorw("consumer-not-exist", log.Fields{"topic": topic.Name})
-		return
-	}
-startloop:
-	for {
-		select {
-		case err := <-consumerCh.consumer.Errors():
-			log.Warnw("consumer-error", log.Fields{"error": err})
-		case msg := <-consumerCh.consumer.Messages():
-			//log.Debugw("message-received", log.Fields{"msg": msg})
-			// Since the only expected message is a proto intercontainermessage then extract it right away
-			// instead of dispatching it to the consumers
-			msgBody := msg.Value
-			icm := &ca.InterContainerMessage{}
-			if err := proto.Unmarshal(msgBody, icm); err != nil {
-				log.Warnw("invalid-message", log.Fields{"error": err})
-				continue
-			}
-			if icm.Header.Type == ca.MessageType_REQUEST {
-				log.Debugw("request-received", log.Fields{"msg": *icm, "len": len(consumerCh.channels)})
-				go kp.dispatchToConsumers(consumerCh, icm)
-			} else if icm.Header.Type == ca.MessageType_RESPONSE {
-				log.Debugw("response-received", log.Fields{"msg": *icm, "len": len(consumerCh.channels)})
-				go kp.dispatchResponse(icm)
-			} else {
-				log.Debugw("unsupported-msg-received", log.Fields{"msg": *icm})
-			}
-			//// TODO:  Dispatch requests and responses separately
-			//go kp.dispatchToConsumers(consumerCh, icm)
-		case <-kp.doneCh:
-			log.Infow("received-exit-signal", log.Fields{"topic": topic.Name})
-			break startloop
-		}
-	}
-}
-
-func (kp *KafkaMessagingProxy) dispatchResponse(msg *ca.InterContainerMessage) {
+func (kp *InterContainerProxy) dispatchResponse(msg *ca.InterContainerMessage) {
 	kp.lockTransactionIdToChannelMap.Lock()
 	defer kp.lockTransactionIdToChannelMap.Unlock()
 	if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
 		log.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
 		return
 	}
-	kp.transactionIdToChannelMap[msg.Header.Id] <- msg
+	kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
 }
 
-func (kp *KafkaMessagingProxy) waitForResponse(ch chan *ca.InterContainerMessage, topic Topic) {
-	log.Debugw("starting-consuming-responses-loop", log.Fields{"topic": topic.Name})
-	kp.waitForResponseRoutineStarted = true
+// waitForResponseLoop listens for messages on subscribedCh, keeps only those of type RESPONSE, and
+// dispatches each one to the channel waiting on its transaction ID.  It returns once kp.doneCh is signalled.
+func (kp *InterContainerProxy) waitForResponseLoop(subscribedCh <-chan *ca.InterContainerMessage, topic *Topic) {
+	log.Debugw("starting-response-loop-for-topic", log.Fields{"topic": topic.Name})
 startloop:
 	for {
 		select {
-		case msg := <-ch:
-			log.Debugw("message-received", log.Fields{"topic": topic.Name, "msg": msg})
-			go kp.dispatchResponse(msg)
-			//	Need to handle program exit - TODO
+		case msg := <-subscribedCh:
+			//log.Debugw("message-received", log.Fields{"msg": msg, "fromTopic": msg.Header.FromTopic})
+			if msg.Header.Type == ca.MessageType_RESPONSE {
+				go kp.dispatchResponse(msg)
+			}
 		case <-kp.doneCh:
 			log.Infow("received-exit-signal", log.Fields{"topic": topic.Name})
 			break startloop
 		}
 	}
-}
-
-// createConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
-// for that topic.  It also starts the routine that listens for messages on that topic.
-func (kp *KafkaMessagingProxy) setupConsumerChannel(topic Topic) (chan *ca.InterContainerMessage, error) {
-
-	if consumerCh := kp.getConsumerChannel(topic); consumerCh != nil {
-		return nil, nil // Already created, so just ignore
-	}
-
-	partitionList, err := kp.consumer.Partitions(topic.Name)
-	if err != nil {
-		log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
-		return nil, err
-	}
-
-	log.Debugw("partitions", log.Fields{"topic": topic.Name, "partitionList": partitionList, "first": partitionList[0]})
-	// Create a partition consumer for that topic - for now just use one partition
-	var pConsumer sarama.PartitionConsumer
-	if pConsumer, err = kp.consumer.ConsumePartition(topic.Name, partitionList[0], sarama.OffsetNewest); err != nil {
-		log.Warnw("consumer-partition-failure", log.Fields{"error": err, "topic": topic.Name})
-		return nil, err
-	}
-
-	// Create the consumer/channel structure and set the consumer and create a channel on that topic - for now
-	// unbuffered to verify race conditions.
-	consumerListeningChannel := make(chan *ca.InterContainerMessage)
-	cc := &consumerChannels{
-		consumer: pConsumer,
-		channels: []chan *ca.InterContainerMessage{consumerListeningChannel},
-	}
-
-	// Add the consumer channel to the map
-	kp.addTopicToConsumerChannelMap(topic.Name, cc)
-
-	//Start a consumer to listen on that specific topic
-	go kp.consumeMessagesLoop(topic)
-
-	return consumerListeningChannel, nil
+	//log.Infow("received-exit-signal-out-of-for-loop", log.Fields{"topic": topic.Name})
+	//	We got an exit signal.  Unsubscribe the channel from the topic
+	//kp.kafkaClient.UnSubscribe(topic, subscribedCh)
 }
 
 // subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
 // This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
 // API. There is one response channel waiting for kafka messages before dispatching the message to the
 // corresponding waiting channel
-func (kp *KafkaMessagingProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ca.InterContainerMessage, error) {
+func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ca.InterContainerMessage, error) {
 	log.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
 
-	if consumerCh := kp.getConsumerChannel(topic); consumerCh == nil {
-		log.Debugw("topic-not-subscribed", log.Fields{"topic": topic.Name})
+	// First check whether we already have a channel listening for response on that topic.  If there is
+	// already one then it will be reused.  If not, it will be created.
+	if !kp.isTopicSubscribedForResponse(topic.Name) {
+		var subscribedCh <-chan *ca.InterContainerMessage
 		var err error
-
-		if _, err = kp.setupConsumerChannel(topic); err != nil {
-			log.Warnw("create-consumer-channel-failure", log.Fields{"error": err, "topic": topic.Name})
+		if subscribedCh, err = kp.kafkaClient.Subscribe(&topic, 1); err != nil {
+			log.Debugw("subscribe-failure", log.Fields{"topic": topic.Name})
 			return nil, err
 		}
+		kp.setupTopicResponseChannelMap(topic.Name, subscribedCh)
+		go kp.waitForResponseLoop(subscribedCh, &topic)
 	}
 
+	// Create a specific channel for this consumer.  We cannot use the channel from the kafkaclient as it will
+	// broadcast any message for this topic to all channels waiting on it.
 	ch := make(chan *ca.InterContainerMessage)
-	kp.addToTransactionIdToChannelMap(trnsId, ch)
+	kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
 
 	return ch, nil
 }
 
-func removeChannel(channels []chan *ca.InterContainerMessage, ch <-chan *ca.InterContainerMessage) []chan *ca.InterContainerMessage {
-	var i int
-	var channel chan *ca.InterContainerMessage
-	for i, channel = range channels {
-		if channel == ch {
-			channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
-			close(channel)
-			return channels[:len(channels)-1]
-		}
-	}
-	return channels
-}
-
-func (kp *KafkaMessagingProxy) unSubscribeForResponse(trnsId string) error {
+func (kp *InterContainerProxy) unSubscribeForResponse(trnsId string) error {
 	log.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
-	// Close the channel first
-	close(kp.transactionIdToChannelMap[trnsId])
-	kp.deleteFromTransactionIdToChannelMap(trnsId)
+	if _, exist := kp.transactionIdToChannelMap[trnsId]; exist {
+		// The delete operation will close the channel
+		kp.deleteFromTransactionIdToChannelMap(trnsId)
+	}
 	return nil
 }
 
@@ -809,34 +728,14 @@
 	return request, nil
 }
 
-// sendRequest formats and sends the request onto the kafka messaging bus.  It waits for the
-// response if needed.  This function must, therefore, be run in its own routine.
-func (kp *KafkaMessagingProxy) sendToKafkaTopic(msg *ca.InterContainerMessage, topic *Topic) {
-
-	//	Create the Sarama producer message
-	time := time.Now().Unix()
-	marshalled, _ := proto.Marshal(msg)
-	kafkaMsg := &sarama.ProducerMessage{
-		Topic: topic.Name,
-		Key:   sarama.StringEncoder(time),
-		Value: sarama.ByteEncoder(marshalled),
-	}
-
-	// Send message to kafka
-	kp.producer.Input() <- kafkaMsg
-
-}
-
 func decodeResponse(response *ca.InterContainerMessage) (*ca.InterContainerResponseBody, error) {
 	//	Extract the message body
 	responseBody := ca.InterContainerResponseBody{}
-
-	log.Debugw("decodeResponse", log.Fields{"icr": &response})
 	if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
 		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
 		return nil, err
 	}
-	log.Debugw("decodeResponse", log.Fields{"icrbody": &responseBody})
+	//log.Debugw("response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
 
 	return &responseBody, nil