diff --git a/kafka/client.go b/kafka/client.go
new file mode 100644
index 0000000..cb33a35
--- /dev/null
+++ b/kafka/client.go
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+	ca "github.com/opencord/voltha-go/protos/core_adapter"
+)
+
+const (
+	DefaultKafkaHost         = "127.0.0.1"
+	DefaultKafkaPort         = 9092
+	DefaultGroupName         = "rw_core"
+	DefaultSleepOnError      = 1 // NOTE(review): unit not stated here; presumably seconds between retries - confirm in the client implementation
+	DefaultFlushFrequency    = 1
+	DefaultFlushMessages     = 1
+	DefaultFlushMaxmessages  = 1
+	DefaultReturnSuccess     = false
+	DefaultReturnErrors      = true
+	DefaultConsumerMaxwait   = 10  // NOTE(review): unit not stated here; presumably milliseconds - confirm in the client implementation
+	DefaultMaxProcessingTime = 100 // NOTE(review): unit not stated here; presumably milliseconds - confirm in the client implementation
+)
+
+// Client represents the set of APIs a Kafka messaging client must implement
+type Client interface {
+	Start(retries int) error
+	Stop()
+	CreateTopic(topic *Topic, numPartition int, repFactor int, retries int) error
+	DeleteTopic(topic *Topic) error
+	Subscribe(topic *Topic, retries int) (<-chan *ca.InterContainerMessage, error) // returns a receive-only channel of messages for the topic
+	UnSubscribe(topic *Topic, ch <-chan *ca.InterContainerMessage) error // ch is presumably the channel obtained from Subscribe - confirm
+	Send(msg interface{}, topic *Topic, keys ...string) // no error result: a send failure cannot be reported through this signature
+}
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 78a8a5a..25fc1b7 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -19,7 +19,6 @@
 	"context"
 	"errors"
 	"fmt"
-	"github.com/Shopify/sarama"
 	"github.com/golang/protobuf/proto"
 	"github.com/golang/protobuf/ptypes"
 	"github.com/golang/protobuf/ptypes/any"
@@ -37,73 +36,87 @@
 }
 
 const (
-	DefaultKafkaHost         = "10.100.198.240"
-	DefaultKafkaPort         = 9092
-	DefaultTopicName         = "Core"
-	DefaultSleepOnError      = 1
-	DefaultFlushFrequency    = 1
-	DefaultFlushMessages     = 1
-	DefaultFlushMaxmessages  = 1
-	DefaultMaxRetries        = 3
-	DefaultReturnSuccess     = false
-	DefaultReturnErrors      = true
-	DefaultConsumerMaxwait   = 50
-	DefaultMaxProcessingTime = 100
-	DefaultRequestTimeout    = 500 // 500 milliseconds - to handle a wider latency range
+	DefaultMaxRetries     = 3
+	DefaultRequestTimeout = 500 // 500 milliseconds - to handle a wider latency range
 )
 
-type consumerChannels struct {
-	consumer sarama.PartitionConsumer
-	channels []chan *ca.InterContainerMessage
+// requestHandlerChannel associates a request handler interface with a channel.  Whenever a message is
+// obtained from that channel, the handler interface is invoked.   This is used to handle
+// async requests into the Core via the kafka messaging bus
+type requestHandlerChannel struct {
+	requesthandlerInterface interface{}
+	ch                      <-chan *ca.InterContainerMessage
 }
 
-// KafkaMessagingProxy represents the messaging proxy
-type KafkaMessagingProxy struct {
-	KafkaHost                     string
-	KafkaPort                     int
-	DefaultTopic                  *Topic
-	TargetInterface               interface{}
-	producer                      sarama.AsyncProducer
-	consumer                      sarama.Consumer
-	doneCh                        chan int
-	waitForResponseRoutineStarted bool
-	topicToConsumerChannelMap     map[string]*consumerChannels
-	transactionIdToChannelMap     map[string]chan *ca.InterContainerMessage
-	lockTopicToConsumerChannelMap sync.RWMutex
+// transactionChannel pairs a topic with the channel onto which a response received
+// on the kafka bus will be sent
+type transactionChannel struct {
+	topic *Topic
+	ch    chan *ca.InterContainerMessage
+}
+
+// InterContainerProxy represents the messaging proxy
+type InterContainerProxy struct {
+	kafkaHost                      string
+	kafkaPort                      int
+	DefaultTopic                   *Topic // not given a default by NewInterContainerProxy; nil unless set by the caller
+	defaultRequestHandlerInterface interface{}
+	kafkaClient                    Client
+	doneCh                         chan int
+
+	// This map is used to map a topic to an interface and channel.   When a request is received
+	// on that channel (registered to the topic) then that interface is invoked.
+	topicToRequestHandlerChannelMap   map[string]*requestHandlerChannel
+	lockTopicRequestHandlerChannelMap sync.RWMutex
+
+	// This map is used to map a response topic name to its channel.   That channel carries all responses
+	// for that topic, which are meant to be forwarded to the appropriate consumer channel using the
+	// transactionIdToChannelMap.
+	topicToResponseChannelMap   map[string]<-chan *ca.InterContainerMessage
+	lockTopicResponseChannelMap sync.RWMutex
+
+	// This map is used to map a transaction id to a consumer channel.  This is used whenever a request has been
+	// sent out and we are waiting for a response.
+	transactionIdToChannelMap     map[string]*transactionChannel
 	lockTransactionIdToChannelMap sync.RWMutex
 }
 
-type KafkaProxyOption func(*KafkaMessagingProxy)
+type InterContainerProxyOption func(*InterContainerProxy)
 
-func KafkaHost(host string) KafkaProxyOption {
-	return func(args *KafkaMessagingProxy) {
-		args.KafkaHost = host
+func InterContainerHost(host string) InterContainerProxyOption {
+	return func(args *InterContainerProxy) {
+		args.kafkaHost = host
 	}
 }
 
-func KafkaPort(port int) KafkaProxyOption {
-	return func(args *KafkaMessagingProxy) {
-		args.KafkaPort = port
+func InterContainerPort(port int) InterContainerProxyOption {
+	return func(args *InterContainerProxy) {
+		args.kafkaPort = port
 	}
 }
 
-func DefaultTopic(topic *Topic) KafkaProxyOption {
-	return func(args *KafkaMessagingProxy) {
+func DefaultTopic(topic *Topic) InterContainerProxyOption {
+	return func(args *InterContainerProxy) {
 		args.DefaultTopic = topic
 	}
 }
 
-func TargetInterface(target interface{}) KafkaProxyOption {
-	return func(args *KafkaMessagingProxy) {
-		args.TargetInterface = target
+func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
+	return func(args *InterContainerProxy) {
+		args.defaultRequestHandlerInterface = handler
 	}
 }
 
-func NewKafkaMessagingProxy(opts ...KafkaProxyOption) (*KafkaMessagingProxy, error) {
-	proxy := &KafkaMessagingProxy{
-		KafkaHost:    DefaultKafkaHost,
-		KafkaPort:    DefaultKafkaPort,
-		DefaultTopic: &Topic{Name: DefaultTopicName},
+func MsgClient(client Client) InterContainerProxyOption {
+	return func(args *InterContainerProxy) {
+		args.kafkaClient = client
+	}
+}
+
+func NewInterContainerProxy(opts ...InterContainerProxyOption) (*InterContainerProxy, error) {
+	proxy := &InterContainerProxy{
+		kafkaHost: DefaultKafkaHost,
+		kafkaPort: DefaultKafkaPort,
 	}
 
 	for _, option := range opts {
@@ -111,59 +124,65 @@
 	}
 
 	// Create the locks for all the maps
-	proxy.lockTopicToConsumerChannelMap = sync.RWMutex{}
+	proxy.lockTopicRequestHandlerChannelMap = sync.RWMutex{}
 	proxy.lockTransactionIdToChannelMap = sync.RWMutex{}
+	proxy.lockTopicResponseChannelMap = sync.RWMutex{}
 
 	return proxy, nil
 }
 
-func (kp *KafkaMessagingProxy) Start() error {
+func (kp *InterContainerProxy) Start() error {
 	log.Info("Starting-Proxy")
 
+	// The kafka client must already have been supplied (see the MsgClient option).  If not, log a fatal error
+	if kp.kafkaClient == nil {
+		log.Fatal("kafka-client-not-set") // NOTE(review): presumably terminates the process rather than returning an error - confirm
+	}
+
 	// Create the Done channel
 	kp.doneCh = make(chan int, 1)
 
-	// Create the Publisher
-	if err := kp.createPublisher(DefaultMaxRetries); err != nil {
-		log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
+	// Start the kafka client, allowing up to DefaultMaxRetries retries
+	if err := kp.kafkaClient.Start(DefaultMaxRetries); err != nil {
+		log.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
 		return err
 	}
 
-	// Create the master consumer
-	if err := kp.createConsumer(DefaultMaxRetries); err != nil {
-		log.Errorw("Cannot-create-kafka-consumer", log.Fields{"error": err})
-		return err
-	}
-
-	// Create the topic to consumer/channel map
-	kp.topicToConsumerChannelMap = make(map[string]*consumerChannels)
-
+	// Create the topic to response channel map
+	kp.topicToResponseChannelMap = make(map[string]<-chan *ca.InterContainerMessage)
+	//
 	// Create the transactionId to Channel Map
-	kp.transactionIdToChannelMap = make(map[string]chan *ca.InterContainerMessage)
+	kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
+
+	// Create the topic to request handler channel map
+	kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
 
 	return nil
 }
 
-func (kp *KafkaMessagingProxy) Stop() {
-	log.Info("Stopping-Proxy")
-	if kp.producer != nil {
-		if err := kp.producer.Close(); err != nil {
-			panic(err)
-		}
-	}
-	if kp.consumer != nil {
-		if err := kp.consumer.Close(); err != nil {
-			panic(err)
-		}
-	}
-	//Close the done channel to close all long processing Go routines
-	close(kp.doneCh)
+func (kp *InterContainerProxy) Stop() {
+	log.Info("stopping-intercontainer-proxy")
+	kp.doneCh <- 1 // NOTE(review): a single value wakes at most one receiver; the replaced close(kp.doneCh) released every waiter
+	// TODO : Perform cleanup - the kafka client and the subscription/transaction maps are not released yet
+	//kp.kafkaClient.Stop()
+	//kp.deleteAllTopicRequestHandlerChannelMap()
+	//kp.deleteAllTopicResponseChannelMap()
+	//kp.deleteAllTransactionIdToChannelMap()
 }
 
-func (kp *KafkaMessagingProxy) InvokeRPC(ctx context.Context, rpc string, topic *Topic, waitForResponse bool,
-	kvArgs ...*KVArg) (bool, *any.Any) {
+// InvokeRPC is used to send a request to a given topic
+func (kp *InterContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
+	waitForResponse bool, kvArgs ...*KVArg) (bool, *any.Any) {
+
+	// If a replyToTopic is provided then we use it, otherwise we fall back to kp.DefaultTopic.  The replyToTopic is
+	// typically the device ID.  NOTE(review): kp.DefaultTopic is no longer defaulted by the constructor and may be nil.
+	responseTopic := replyToTopic
+	if responseTopic == nil {
+		responseTopic = kp.DefaultTopic
+	}
+
 	// Encode the request
-	protoRequest, err := encodeRequest(rpc, topic, kp.DefaultTopic, kvArgs...)
+	protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, kvArgs...)
 	if err != nil {
 		log.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
 		return false, nil
@@ -173,13 +192,17 @@
 	var ch <-chan *ca.InterContainerMessage
 	if waitForResponse {
 		var err error
-		if ch, err = kp.subscribeForResponse(*kp.DefaultTopic, protoRequest.Header.Id); err != nil {
-			log.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "topic": topic.Name})
+		if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
+			log.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
 		}
 	}
 
-	// Send request
-	go kp.sendToKafkaTopic(protoRequest, topic)
+	// Send request - if the topic is formatted with a device Id then we will send the request using a
+	// specific key, hence ensuring a single partition is used to publish the request.  This ensures that the
+	// subscriber on that topic will receive the request in the order it was sent.  The key used is the deviceId.
+	key := GetDeviceIdFromTopic(*toTopic)
+	log.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key})
+	go kp.kafkaClient.Send(protoRequest, toTopic, key)
 
 	if waitForResponse {
 		// Create a child context based on the parent context, if any
@@ -197,8 +220,7 @@
 		defer kp.unSubscribeForResponse(protoRequest.Header.Id)
 		select {
 		case msg := <-ch:
-			log.Debugw("received-response", log.Fields{"rpc": rpc, "msg": msg})
-
+			log.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
 			var responseBody *ca.InterContainerResponseBody
 			var err error
 			if responseBody, err = decodeResponse(msg); err != nil {
@@ -224,194 +246,183 @@
 			}
 			return false, marshalledArg
 		case <-kp.doneCh:
-			log.Infow("received-exit-signal", log.Fields{"topic": topic.Name, "rpc": rpc})
+			log.Infow("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
 			return true, nil
 		}
 	}
 	return true, nil
 }
 
-// Subscribe allows a caller to subscribe to a given topic.  A channel is returned to the
-// caller to receive messages from that topic.
-func (kp *KafkaMessagingProxy) Subscribe(topic Topic) (<-chan *ca.InterContainerMessage, error) {
-
-	log.Debugw("subscribe", log.Fields{"topic": topic.Name})
-
-	if consumerCh := kp.getConsumerChannel(topic); consumerCh != nil {
-		log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
-		// Create a channel specific for that consumer and add it to the consumer channel map
-		ch := make(chan *ca.InterContainerMessage)
-		kp.addChannelToConsumerChannelMap(topic, ch)
-		return ch, nil
-	}
-
-	// Register for the topic and set it up
-	var consumerListeningChannel chan *ca.InterContainerMessage
-	var err error
-	if consumerListeningChannel, err = kp.setupConsumerChannel(topic); err != nil {
-		log.Warnw("create-consumer-channel-failure", log.Fields{"error": err, "topic": topic.Name})
-		return nil, err
-	}
-
-	return consumerListeningChannel, nil
-}
-
-func (kp *KafkaMessagingProxy) UnSubscribe(topic Topic, ch <-chan *ca.InterContainerMessage) error {
-	log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
-	err := kp.removeChannelFromConsumerChannelMap(topic, ch)
-	return err
-}
-
-// SubscribeWithTarget allows a caller to assign a target object to be invoked automatically
+// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
 // when a message is received on a given topic
-func (kp *KafkaMessagingProxy) SubscribeWithTarget(topic Topic, targetInterface interface{}) error {
+func (kp *InterContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
 
 	// Subscribe to receive messages for that topic
 	var ch <-chan *ca.InterContainerMessage
 	var err error
-	if ch, err = kp.Subscribe(topic); err != nil {
+	if ch, err = kp.kafkaClient.Subscribe(&topic, 1); err != nil {
+		// NOTE(review): the subscribe error is only logged; execution continues with whatever ch was returned and nil is returned below
 		log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
 	}
+
+	kp.defaultRequestHandlerInterface = handler // also replaces the default handler used by SubscribeWithDefaultRequestHandler
+	kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch}) // no-op when the topic is already registered
 	// Launch a go routine to receive and process kafka messages
-	go kp.waitForRequest(ch, topic, targetInterface)
+	go kp.waitForRequest(ch, topic, handler)
 
 	return nil
 }
 
-func (kp *KafkaMessagingProxy) UnSubscribeTarget(ctx context.Context, topic Topic, targetInterface interface{}) error {
-	// TODO - mostly relevant with multiple interfaces
+// SubscribeWithDefaultRequestHandler subscribes to the given topic and hands the messages received on it to
+// the proxy's default request handler interface.  So far there is only 1 target registered per microservice
+func (kp *InterContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic) error {
+	// Subscribe to receive messages for that topic
+	var ch <-chan *ca.InterContainerMessage
+	var err error
+	if ch, err = kp.kafkaClient.Subscribe(&topic, 1); err != nil { // NOTE(review): the error is only logged; this function still returns nil
+		log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
+	}
+	kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
+
+	// Launch a go routine to receive and process kafka messages
+	go kp.waitForRequest(ch, topic, kp.defaultRequestHandlerInterface)
+
 	return nil
 }
 
-func (kp *KafkaMessagingProxy) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
-	kp.lockTopicToConsumerChannelMap.Lock()
-	defer kp.lockTopicToConsumerChannelMap.Unlock()
-	if _, exist := kp.topicToConsumerChannelMap[id]; !exist {
-		kp.topicToConsumerChannelMap[id] = arg
+func (kp *InterContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
+	return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
+}
+
+// setupTopicResponseChannelMap registers a single consumer channel that will act as a broadcast channel for all
+// responses from that topic.  An existing registration for the topic is left unchanged.
+func (kp *InterContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ca.InterContainerMessage) {
+	kp.lockTopicResponseChannelMap.Lock()
+	defer kp.lockTopicResponseChannelMap.Unlock()
+	if _, exist := kp.topicToResponseChannelMap[topic]; !exist {
+		kp.topicToResponseChannelMap[topic] = arg
 	}
 }
 
-func (kp *KafkaMessagingProxy) deleteFromTopicToConsumerChannelMap(id string) {
-	kp.lockTopicToConsumerChannelMap.Lock()
-	defer kp.lockTopicToConsumerChannelMap.Unlock()
-	if _, exist := kp.topicToConsumerChannelMap[id]; exist {
-		delete(kp.topicToConsumerChannelMap, id)
+func (kp *InterContainerProxy) isTopicSubscribedForResponse(topic string) bool {
+	kp.lockTopicResponseChannelMap.Lock()
+	defer kp.lockTopicResponseChannelMap.Unlock()
+	_, exist := kp.topicToResponseChannelMap[topic]
+	return exist
+}
+
+func (kp *InterContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
+	kp.lockTopicResponseChannelMap.Lock()
+	defer kp.lockTopicResponseChannelMap.Unlock()
+	if _, exist := kp.topicToResponseChannelMap[topic]; exist {
+		// Unsubscribe from this topic first - this is expected to close the subscribed channel
+		var err error
+		if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+			log.Errorw("unsubscribing-error", log.Fields{"topic": topic}) // NOTE(review): the err value is not included in the log fields
+		}
+		delete(kp.topicToResponseChannelMap, topic) // the entry is removed even when UnSubscribe failed
+		return err
+	} else {
+		return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
 	}
 }
 
-func (kp *KafkaMessagingProxy) getConsumerChannel(topic Topic) *consumerChannels {
-	kp.lockTopicToConsumerChannelMap.Lock()
-	defer kp.lockTopicToConsumerChannelMap.Unlock()
-
-	if consumerCh, exist := kp.topicToConsumerChannelMap[topic.Name]; exist {
-		return consumerCh
+func (kp *InterContainerProxy) deleteAllTopicResponseChannelMap() error {
+	kp.lockTopicResponseChannelMap.Lock()
+	defer kp.lockTopicResponseChannelMap.Unlock()
+	var err error
+	for topic, _ := range kp.topicToResponseChannelMap {
+		// Unsubscribe to this topic first - this will close the subscribed channel
+		if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+			log.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+		}
+		delete(kp.topicToResponseChannelMap, topic)
 	}
-	return nil
+	return err
 }
 
-func (kp *KafkaMessagingProxy) addChannelToConsumerChannelMap(topic Topic, ch chan *ca.InterContainerMessage) {
-	kp.lockTopicToConsumerChannelMap.Lock()
-	defer kp.lockTopicToConsumerChannelMap.Unlock()
-	if consumerCh, exist := kp.topicToConsumerChannelMap[topic.Name]; exist {
-		consumerCh.channels = append(consumerCh.channels, ch)
-		return
+func (kp *InterContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
+	kp.lockTopicRequestHandlerChannelMap.Lock()
+	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
+	if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
+		kp.topicToRequestHandlerChannelMap[topic] = arg
 	}
-	log.Warnw("consumer-channel-not-exist", log.Fields{"topic": topic.Name})
 }
 
-func (kp *KafkaMessagingProxy) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ca.InterContainerMessage) error {
-	kp.lockTopicToConsumerChannelMap.Lock()
-	defer kp.lockTopicToConsumerChannelMap.Unlock()
-	if consumerCh, exist := kp.topicToConsumerChannelMap[topic.Name]; exist {
-		// Channel will be closed in the removeChannel method
-		consumerCh.channels = removeChannel(consumerCh.channels, ch)
+func (kp *InterContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
+	kp.lockTopicRequestHandlerChannelMap.Lock()
+	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
+	if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
+		// Unsubscribe from this topic on the kafka client before dropping the map entry
+		kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch) // NOTE(review): the returned error is discarded
+		delete(kp.topicToRequestHandlerChannelMap, topic)
 		return nil
+	} else {
+		return errors.New(fmt.Sprintf("%s-Topic-not-found", topic))
 	}
-	log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
-	return errors.New("topic-does-not-exist")
 }
 
-func (kp *KafkaMessagingProxy) addToTransactionIdToChannelMap(id string, arg chan *ca.InterContainerMessage) {
+func (kp *InterContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
+	kp.lockTopicRequestHandlerChannelMap.Lock()
+	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
+	var err error
+	for topic, _ := range kp.topicToRequestHandlerChannelMap {
+		// Unsubscribe from this topic on the kafka client first
+		if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+			log.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+		}
+		delete(kp.topicToRequestHandlerChannelMap, topic)
+	}
+	return err
+}
+
+func (kp *InterContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ca.InterContainerMessage) {
 	kp.lockTransactionIdToChannelMap.Lock()
 	defer kp.lockTransactionIdToChannelMap.Unlock()
 	if _, exist := kp.transactionIdToChannelMap[id]; !exist {
-		kp.transactionIdToChannelMap[id] = arg
+		kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
 	}
 }
 
-func (kp *KafkaMessagingProxy) deleteFromTransactionIdToChannelMap(id string) {
+func (kp *InterContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
 	kp.lockTransactionIdToChannelMap.Lock()
 	defer kp.lockTransactionIdToChannelMap.Unlock()
-	if _, exist := kp.transactionIdToChannelMap[id]; exist {
+	if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
+		// Close the channel first
+		close(transChannel.ch)
 		delete(kp.transactionIdToChannelMap, id)
 	}
 }
 
-func (kp *KafkaMessagingProxy) createPublisher(retries int) error {
-	// This Creates the publisher
-	config := sarama.NewConfig()
-	config.Producer.Partitioner = sarama.NewRandomPartitioner
-	config.Producer.Flush.Frequency = time.Duration(DefaultFlushFrequency)
-	config.Producer.Flush.Messages = DefaultFlushMessages
-	config.Producer.Flush.MaxMessages = DefaultFlushMaxmessages
-	config.Producer.Return.Errors = DefaultReturnErrors
-	config.Producer.Return.Successes = DefaultReturnSuccess
-	config.Producer.RequiredAcks = sarama.WaitForAll
-	kafkaFullAddr := fmt.Sprintf("%s:%d", kp.KafkaHost, kp.KafkaPort)
-	brokers := []string{kafkaFullAddr}
-
-	for {
-		producer, err := sarama.NewAsyncProducer(brokers, config)
-		if err != nil {
-			if retries == 0 {
-				log.Errorw("error-starting-publisher", log.Fields{"error": err})
-				return err
-			} else {
-				// If retries is -ve then we will retry indefinitely
-				retries--
-			}
-			log.Info("retrying-after-a-second-delay")
-			time.Sleep(time.Duration(DefaultSleepOnError) * time.Second)
-		} else {
-			kp.producer = producer
-			break
+func (kp *InterContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
+	kp.lockTransactionIdToChannelMap.Lock()
+	defer kp.lockTransactionIdToChannelMap.Unlock()
+	for key, value := range kp.transactionIdToChannelMap {
+		if value.topic.Name == id { // id is matched against the topic name; NOTE(review): would panic on an entry stored with a nil topic
+			close(value.ch)
+			delete(kp.transactionIdToChannelMap, key)
 		}
 	}
-	log.Info("Kafka-publisher-created")
-	return nil
 }
 
-func (kp *KafkaMessagingProxy) createConsumer(retries int) error {
-	config := sarama.NewConfig()
-	config.Consumer.Return.Errors = true
-	config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
-	config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
-	config.Consumer.Offsets.Initial = sarama.OffsetNewest
-	kafkaFullAddr := fmt.Sprintf("%s:%d", kp.KafkaHost, kp.KafkaPort)
-	brokers := []string{kafkaFullAddr}
-
-	for {
-		consumer, err := sarama.NewConsumer(brokers, config)
-		if err != nil {
-			if retries == 0 {
-				log.Errorw("error-starting-consumer", log.Fields{"error": err})
-				return err
-			} else {
-				// If retries is -ve then we will retry indefinitely
-				retries--
-			}
-			log.Info("retrying-after-a-second-delay")
-			time.Sleep(time.Duration(DefaultSleepOnError) * time.Second)
-		} else {
-			kp.consumer = consumer
-			break
-		}
+func (kp *InterContainerProxy) deleteAllTransactionIdToChannelMap() {
+	kp.lockTransactionIdToChannelMap.Lock()
+	defer kp.lockTransactionIdToChannelMap.Unlock()
+	for key, value := range kp.transactionIdToChannelMap {
+		close(value.ch)
+		delete(kp.transactionIdToChannelMap, key)
 	}
-	log.Info("Kafka-consumer-created")
-	return nil
 }
 
-func encodeReturnedValue(request *ca.InterContainerMessage, returnedVal interface{}) (*any.Any, error) {
+func (kp *InterContainerProxy) DeleteTopic(topic Topic) error {
+	// If we have any consumers on that topic we need to close them before deleting the topic on the kafka client
+	kp.deleteFromTopicResponseChannelMap(topic.Name) // NOTE(review): the returned error (e.g. topic not found) is discarded
+	kp.deleteFromTopicRequestHandlerChannelMap(topic.Name) // NOTE(review): the returned error (e.g. topic not found) is discarded
+	kp.deleteTopicTransactionIdToChannelMap(topic.Name)
+	return kp.kafkaClient.DeleteTopic(&topic)
+}
+
+func encodeReturnedValue(returnedVal interface{}) (*any.Any, error) {
 	// Encode the response argument - needs to be a proto message
 	if returnedVal == nil {
 		return nil, nil
@@ -462,8 +473,7 @@
 //formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
 //or an error on failure
 func encodeResponse(request *ca.InterContainerMessage, success bool, returnedValues ...interface{}) (*ca.InterContainerMessage, error) {
-
-	log.Infow("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
+	//log.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
 	responseHeader := &ca.Header{
 		Id:        request.Header.Id,
 		Type:      ca.MessageType_RESPONSE,
@@ -476,7 +486,7 @@
 	var marshalledReturnedVal *any.Any
 	var err error
 	for _, returnVal := range returnedValues {
-		if marshalledReturnedVal, err = encodeReturnedValue(request, returnVal); err != nil {
+		if marshalledReturnedVal, err = encodeReturnedValue(returnVal); err != nil {
 			log.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
 		}
 		break // for now we support only 1 returned value - (excluding the error)
@@ -504,7 +514,7 @@
 	myClassValue := reflect.ValueOf(myClass)
 	m := myClassValue.MethodByName(funcName)
 	if !m.IsValid() {
-		return make([]reflect.Value, 0), fmt.Errorf("Method not found \"%s\"", funcName)
+		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
 	}
 	in := make([]reflect.Value, len(params))
 	for i, param := range params {
@@ -514,11 +524,10 @@
 	return
 }
 
-func (kp *KafkaMessagingProxy) handleRequest(msg *ca.InterContainerMessage, targetInterface interface{}) {
+func (kp *InterContainerProxy) handleRequest(msg *ca.InterContainerMessage, targetInterface interface{}) {
 
-	// First extract the header to know whether this is a request of a response
+	// First extract the header to know whether this is a request - responses are handled by a different handler
 	if msg.Header.Type == ca.MessageType_REQUEST {
-		log.Debugw("received-request", log.Fields{"header": msg.Header})
 
 		var out []reflect.Value
 		var err error
@@ -528,6 +537,7 @@
 		if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
 			log.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
 		} else {
+			log.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
 			// let the callee unpack the arguments as its the only one that knows the real proto type
 			out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
 			if err != nil {
@@ -545,7 +555,7 @@
 				returnedValues = make([]interface{}, 1)
 				returnedValues[0] = returnError
 			} else {
-				log.Debugw("returned-api-response", log.Fields{"len": len(out), "err": err})
+				//log.Debugw("returned-api-response", log.Fields{"len": len(out), "err": err})
 				returnedValues = make([]interface{}, 0)
 				// Check for errors first
 				lastIndex := len(out) - 1
@@ -560,7 +570,7 @@
 				} else { // Non-error case
 					success = true
 					for idx, val := range out {
-						log.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
+						//log.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
 						if idx != lastIndex {
 							returnedValues = append(returnedValues, val.Interface())
 						}
@@ -573,185 +583,94 @@
 				log.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
 				icm = encodeDefaultFailedResponse(msg)
 			}
-			log.Debugw("sending-to-kafka", log.Fields{"msg": icm, "send-to-topic": msg.Header.FromTopic})
-			kp.sendToKafkaTopic(icm, &Topic{Name: msg.Header.FromTopic})
+			// To preserve ordering of messages, all messages to a given topic are sent to the same partition
+			// by providing a message key.  The key is encoded in the topic name.  If the deviceId is not
+			// present then the key will be empty, hence messages for that topic may land on any of its
+			// partitions.  NOTE(review): the sarama producer in this change uses a random partitioner - confirm keys are honoured.
+			replyTopic := &Topic{Name: msg.Header.FromTopic}
+			key := GetDeviceIdFromTopic(*replyTopic)
+			log.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header, "key": key})
+			// TODO: handle error response.
+			kp.kafkaClient.Send(icm, replyTopic, key)
 		}
 
-	} else if msg.Header.Type == ca.MessageType_RESPONSE {
-		log.Warnw("received-response-on-request-handler", log.Fields{"header": msg.Header})
-	} else {
-		log.Errorw("invalid-message", log.Fields{"header": msg.Header})
 	}
 }
 
-func (kp *KafkaMessagingProxy) waitForRequest(ch <-chan *ca.InterContainerMessage, topic Topic, targetInterface interface{}) {
+func (kp *InterContainerProxy) waitForRequest(ch <-chan *ca.InterContainerMessage, topic Topic, targetInterface interface{}) {
 	//	Wait for messages
 	for msg := range ch {
-		log.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
+		//log.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
 		go kp.handleRequest(msg, targetInterface)
 	}
 }
 
-// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
-// topic via the unique channel each subsciber received during subscription
-func (kp *KafkaMessagingProxy) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ca.InterContainerMessage) {
-	// Need to go over all channels and publish messages to them - do we need to copy msg?
-	kp.lockTopicToConsumerChannelMap.Lock()
-	defer kp.lockTopicToConsumerChannelMap.Unlock()
-	for _, ch := range consumerCh.channels {
-		go func(c chan *ca.InterContainerMessage) {
-			c <- protoMessage
-		}(ch)
-	}
-}
-
-func (kp *KafkaMessagingProxy) consumeMessagesLoop(topic Topic) {
-	log.Debugw("starting-consuming-messages", log.Fields{"topic": topic.Name})
-	var consumerCh *consumerChannels
-	if consumerCh = kp.getConsumerChannel(topic); consumerCh == nil {
-		log.Errorw("consumer-not-exist", log.Fields{"topic": topic.Name})
-		return
-	}
-startloop:
-	for {
-		select {
-		case err := <-consumerCh.consumer.Errors():
-			log.Warnw("consumer-error", log.Fields{"error": err})
-		case msg := <-consumerCh.consumer.Messages():
-			//log.Debugw("message-received", log.Fields{"msg": msg})
-			// Since the only expected message is a proto intercontainermessage then extract it right away
-			// instead of dispatching it to the consumers
-			msgBody := msg.Value
-			icm := &ca.InterContainerMessage{}
-			if err := proto.Unmarshal(msgBody, icm); err != nil {
-				log.Warnw("invalid-message", log.Fields{"error": err})
-				continue
-			}
-			if icm.Header.Type == ca.MessageType_REQUEST {
-				log.Debugw("request-received", log.Fields{"msg": *icm, "len": len(consumerCh.channels)})
-				go kp.dispatchToConsumers(consumerCh, icm)
-			} else if icm.Header.Type == ca.MessageType_RESPONSE {
-				log.Debugw("response-received", log.Fields{"msg": *icm, "len": len(consumerCh.channels)})
-				go kp.dispatchResponse(icm)
-			} else {
-				log.Debugw("unsupported-msg-received", log.Fields{"msg": *icm})
-			}
-			//// TODO:  Dispatch requests and responses separately
-			//go kp.dispatchToConsumers(consumerCh, icm)
-		case <-kp.doneCh:
-			log.Infow("received-exit-signal", log.Fields{"topic": topic.Name})
-			break startloop
-		}
-	}
-}
-
-func (kp *KafkaMessagingProxy) dispatchResponse(msg *ca.InterContainerMessage) {
+func (kp *InterContainerProxy) dispatchResponse(msg *ca.InterContainerMessage) {
 	kp.lockTransactionIdToChannelMap.Lock()
 	defer kp.lockTransactionIdToChannelMap.Unlock()
 	if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
 		log.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
 		return
 	}
-	kp.transactionIdToChannelMap[msg.Header.Id] <- msg
+	kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
 }
 
-func (kp *KafkaMessagingProxy) waitForResponse(ch chan *ca.InterContainerMessage, topic Topic) {
-	log.Debugw("starting-consuming-responses-loop", log.Fields{"topic": topic.Name})
-	kp.waitForResponseRoutineStarted = true
+// waitForResponseLoop listens for messages on subscribedCh and, for each message whose header type is RESPONSE,
+// dispatches it to the channel waiting on that transaction ID.  The loop exits when doneCh is signalled.
+func (kp *InterContainerProxy) waitForResponseLoop(subscribedCh <-chan *ca.InterContainerMessage, topic *Topic) {
+	log.Debugw("starting-response-loop-for-topic", log.Fields{"topic": topic.Name})
 startloop:
 	for {
 		select {
-		case msg := <-ch:
-			log.Debugw("message-received", log.Fields{"topic": topic.Name, "msg": msg})
-			go kp.dispatchResponse(msg)
-			//	Need to handle program exit - TODO
+		case msg := <-subscribedCh:
+			//log.Debugw("message-received", log.Fields{"msg": msg, "fromTopic": msg.Header.FromTopic})
+			if msg.Header.Type == ca.MessageType_RESPONSE {
+				go kp.dispatchResponse(msg)
+			}
 		case <-kp.doneCh:
 			log.Infow("received-exit-signal", log.Fields{"topic": topic.Name})
 			break startloop
 		}
 	}
-}
-
-// createConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
-// for that topic.  It also starts the routine that listens for messages on that topic.
-func (kp *KafkaMessagingProxy) setupConsumerChannel(topic Topic) (chan *ca.InterContainerMessage, error) {
-
-	if consumerCh := kp.getConsumerChannel(topic); consumerCh != nil {
-		return nil, nil // Already created, so just ignore
-	}
-
-	partitionList, err := kp.consumer.Partitions(topic.Name)
-	if err != nil {
-		log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
-		return nil, err
-	}
-
-	log.Debugw("partitions", log.Fields{"topic": topic.Name, "partitionList": partitionList, "first": partitionList[0]})
-	// Create a partition consumer for that topic - for now just use one partition
-	var pConsumer sarama.PartitionConsumer
-	if pConsumer, err = kp.consumer.ConsumePartition(topic.Name, partitionList[0], sarama.OffsetNewest); err != nil {
-		log.Warnw("consumer-partition-failure", log.Fields{"error": err, "topic": topic.Name})
-		return nil, err
-	}
-
-	// Create the consumer/channel structure and set the consumer and create a channel on that topic - for now
-	// unbuffered to verify race conditions.
-	consumerListeningChannel := make(chan *ca.InterContainerMessage)
-	cc := &consumerChannels{
-		consumer: pConsumer,
-		channels: []chan *ca.InterContainerMessage{consumerListeningChannel},
-	}
-
-	// Add the consumer channel to the map
-	kp.addTopicToConsumerChannelMap(topic.Name, cc)
-
-	//Start a consumer to listen on that specific topic
-	go kp.consumeMessagesLoop(topic)
-
-	return consumerListeningChannel, nil
+	//log.Infow("received-exit-signal-out-of-for-loop", log.Fields{"topic": topic.Name})
+	//	We got an exit signal.  Unsubscribe to the channel
+	//kp.kafkaClient.UnSubscribe(topic, subscribedCh)
 }
 
 // subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
 // This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
 // API. There is one response channel waiting for kafka messages before dispatching the message to the
 // corresponding waiting channel
-func (kp *KafkaMessagingProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ca.InterContainerMessage, error) {
+func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ca.InterContainerMessage, error) {
 	log.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
 
-	if consumerCh := kp.getConsumerChannel(topic); consumerCh == nil {
-		log.Debugw("topic-not-subscribed", log.Fields{"topic": topic.Name})
+	// First check whether we already have a channel listening for response on that topic.  If there is
+	// already one then it will be reused.  If not, it will be created.
+	if !kp.isTopicSubscribedForResponse(topic.Name) {
+		var subscribedCh <-chan *ca.InterContainerMessage
 		var err error
-
-		if _, err = kp.setupConsumerChannel(topic); err != nil {
-			log.Warnw("create-consumer-channel-failure", log.Fields{"error": err, "topic": topic.Name})
+		if subscribedCh, err = kp.kafkaClient.Subscribe(&topic, 1); err != nil {
+			log.Debugw("subscribe-failure", log.Fields{"topic": topic.Name})
 			return nil, err
 		}
+		kp.setupTopicResponseChannelMap(topic.Name, subscribedCh)
+		go kp.waitForResponseLoop(subscribedCh, &topic)
 	}
 
+	// Create a specific channel for this consumer.  We cannot use the channel from the kafkaclient as it will
+	// broadcast any message for this topic to all channels waiting on it.
 	ch := make(chan *ca.InterContainerMessage)
-	kp.addToTransactionIdToChannelMap(trnsId, ch)
+	kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
 
 	return ch, nil
 }
 
-func removeChannel(channels []chan *ca.InterContainerMessage, ch <-chan *ca.InterContainerMessage) []chan *ca.InterContainerMessage {
-	var i int
-	var channel chan *ca.InterContainerMessage
-	for i, channel = range channels {
-		if channel == ch {
-			channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
-			close(channel)
-			return channels[:len(channels)-1]
-		}
-	}
-	return channels
-}
-
-func (kp *KafkaMessagingProxy) unSubscribeForResponse(trnsId string) error {
+func (kp *InterContainerProxy) unSubscribeForResponse(trnsId string) error {
 	log.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
-	// Close the channel first
-	close(kp.transactionIdToChannelMap[trnsId])
-	kp.deleteFromTransactionIdToChannelMap(trnsId)
+	if _, exist := kp.transactionIdToChannelMap[trnsId]; exist {
+		// The delete operation will close the channel
+		kp.deleteFromTransactionIdToChannelMap(trnsId)
+	}
 	return nil
 }
 
@@ -809,34 +728,14 @@
 	return request, nil
 }
 
-// sendRequest formats and sends the request onto the kafka messaging bus.  It waits for the
-// response if needed.  This function must, therefore, be run in its own routine.
-func (kp *KafkaMessagingProxy) sendToKafkaTopic(msg *ca.InterContainerMessage, topic *Topic) {
-
-	//	Create the Sarama producer message
-	time := time.Now().Unix()
-	marshalled, _ := proto.Marshal(msg)
-	kafkaMsg := &sarama.ProducerMessage{
-		Topic: topic.Name,
-		Key:   sarama.StringEncoder(time),
-		Value: sarama.ByteEncoder(marshalled),
-	}
-
-	// Send message to kafka
-	kp.producer.Input() <- kafkaMsg
-
-}
-
 func decodeResponse(response *ca.InterContainerMessage) (*ca.InterContainerResponseBody, error) {
 	//	Extract the message body
 	responseBody := ca.InterContainerResponseBody{}
-
-	log.Debugw("decodeResponse", log.Fields{"icr": &response})
 	if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
 		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
 		return nil, err
 	}
-	log.Debugw("decodeResponse", log.Fields{"icrbody": &responseBody})
+	//log.Debugw("response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
 
 	return &responseBody, nil
 
diff --git a/kafka/kafka_inter_container_library_test.go b/kafka/kafka_inter_container_library_test.go
index 6a3bb37..790425e 100644
--- a/kafka/kafka_inter_container_library_test.go
+++ b/kafka/kafka_inter_container_library_test.go
@@ -21,38 +21,35 @@
 )
 
 func TestDefaultKafkaProxy(t *testing.T) {
-	actualResult, error := NewKafkaMessagingProxy()
+	actualResult, error := NewInterContainerProxy()
 	assert.Equal(t, error, nil)
-	assert.Equal(t, actualResult.KafkaHost, DefaultKafkaHost)
-	assert.Equal(t, actualResult.KafkaPort, DefaultKafkaPort)
-	assert.Equal(t, actualResult.TargetInterface, interface{}(nil))
-	assert.Equal(t, actualResult.DefaultTopic.Name, "Core")
+	assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
+	assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
+	assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
 }
 
 func TestKafkaProxyOptionHost(t *testing.T) {
-	actualResult, error := NewKafkaMessagingProxy(KafkaHost("10.20.30.40"))
+	actualResult, error := NewInterContainerProxy(InterContainerHost("10.20.30.40"))
 	assert.Equal(t, error, nil)
-	assert.Equal(t, actualResult.KafkaHost, "10.20.30.40")
-	assert.Equal(t, actualResult.KafkaPort, DefaultKafkaPort)
-	assert.Equal(t, actualResult.TargetInterface, interface{}(nil))
-	assert.Equal(t, actualResult.DefaultTopic.Name, "Core")
+	assert.Equal(t, actualResult.kafkaHost, "10.20.30.40")
+	assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
+	assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
 }
 
 func TestKafkaProxyOptionPort(t *testing.T) {
-	actualResult, error := NewKafkaMessagingProxy(KafkaPort(1020))
+	actualResult, error := NewInterContainerProxy(InterContainerPort(1020))
 	assert.Equal(t, error, nil)
-	assert.Equal(t, actualResult.KafkaHost, DefaultKafkaHost)
-	assert.Equal(t, actualResult.KafkaPort, 1020)
-	assert.Equal(t, actualResult.TargetInterface, interface{}(nil))
-	assert.Equal(t, actualResult.DefaultTopic.Name, "Core")
+	assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
+	assert.Equal(t, actualResult.kafkaPort, 1020)
+	assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
 }
 
 func TestKafkaProxyOptionTopic(t *testing.T) {
-	actualResult, error := NewKafkaMessagingProxy(DefaultTopic(&Topic{Name: "Adapter"}))
+	actualResult, error := NewInterContainerProxy(DefaultTopic(&Topic{Name: "Adapter"}))
 	assert.Equal(t, error, nil)
-	assert.Equal(t, actualResult.KafkaHost, DefaultKafkaHost)
-	assert.Equal(t, actualResult.KafkaPort, DefaultKafkaPort)
-	assert.Equal(t, actualResult.TargetInterface, interface{}(nil))
+	assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
+	assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
+	assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
 	assert.Equal(t, actualResult.DefaultTopic.Name, "Adapter")
 }
 
@@ -64,24 +61,23 @@
 
 func TestKafkaProxyOptionTargetInterface(t *testing.T) {
 	var m *myInterface
-	actualResult, error := NewKafkaMessagingProxy(TargetInterface(m))
+	actualResult, error := NewInterContainerProxy(RequestHandlerInterface(m))
 	assert.Equal(t, error, nil)
-	assert.Equal(t, actualResult.KafkaHost, DefaultKafkaHost)
-	assert.Equal(t, actualResult.KafkaPort, DefaultKafkaPort)
-	assert.Equal(t, actualResult.TargetInterface, m)
-	assert.Equal(t, actualResult.DefaultTopic.Name, "Core")
+	assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
+	assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
+	assert.Equal(t, actualResult.defaultRequestHandlerInterface, m)
 }
 
 func TestKafkaProxyChangeAllOptions(t *testing.T) {
 	var m *myInterface
-	actualResult, error := NewKafkaMessagingProxy(
-		KafkaHost("10.20.30.40"),
-		KafkaPort(1020),
+	actualResult, error := NewInterContainerProxy(
+		InterContainerHost("10.20.30.40"),
+		InterContainerPort(1020),
 		DefaultTopic(&Topic{Name: "Adapter"}),
-		TargetInterface(m))
+		RequestHandlerInterface(m))
 	assert.Equal(t, error, nil)
-	assert.Equal(t, actualResult.KafkaHost, "10.20.30.40")
-	assert.Equal(t, actualResult.KafkaPort, 1020)
-	assert.Equal(t, actualResult.TargetInterface, m)
+	assert.Equal(t, actualResult.kafkaHost, "10.20.30.40")
+	assert.Equal(t, actualResult.kafkaPort, 1020)
+	assert.Equal(t, actualResult.defaultRequestHandlerInterface, m)
 	assert.Equal(t, actualResult.DefaultTopic.Name, "Adapter")
 }
diff --git a/kafka/messaging_interface.go b/kafka/messaging_interface.go
deleted file mode 100644
index 78d9e75..0000000
--- a/kafka/messaging_interface.go
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka
-
-// A Topic definition - may be augmented with additional attributes eventually
-type Topic struct {
-	// The name of the topic. It must start with a letter,
-	// and contain only letters (`[A-Za-z]`), numbers (`[0-9]`), dashes (`-`),
-	// underscores (`_`), periods (`.`), tildes (`~`), plus (`+`) or percent
-	// signs (`%`).
-	Name string
-}
-
-type KVArg struct {
-	Key   string
-	Value interface{}
-}
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
new file mode 100644
index 0000000..eed0588
--- /dev/null
+++ b/kafka/sarama_client.go
@@ -0,0 +1,537 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+	"errors"
+	"fmt"
+	"github.com/Shopify/sarama"
+	scc "github.com/bsm/sarama-cluster"
+	"github.com/golang/protobuf/proto"
+	"github.com/opencord/voltha-go/common/log"
+	ca "github.com/opencord/voltha-go/protos/core_adapter"
+	"sync"
+	"time"
+)
+
+// consumerChannels represents a consumer listening on a kafka topic.  Once it receives a message on that
+// topic it broadcasts the message to all the listening channels
+type consumerChannels struct {
+	consumer sarama.PartitionConsumer
+	//consumer *sc.Consumer
+	channels []chan *ca.InterContainerMessage
+}
+
+// SaramaClient represents the messaging proxy
+type SaramaClient struct {
+	broker                        *sarama.Broker
+	client                        sarama.Client
+	KafkaHost                     string
+	KafkaPort                     int
+	producer                      sarama.AsyncProducer
+	consumer                      sarama.Consumer
+	groupConsumer                 *scc.Consumer
+	doneCh                        chan int
+	topicToConsumerChannelMap     map[string]*consumerChannels
+	lockTopicToConsumerChannelMap sync.RWMutex
+}
+
+type SaramaClientOption func(*SaramaClient)
+
+func Host(host string) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.KafkaHost = host
+	}
+}
+
+func Port(port int) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.KafkaPort = port
+	}
+}
+
+func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
+	client := &SaramaClient{
+		KafkaHost: DefaultKafkaHost,
+		KafkaPort: DefaultKafkaPort,
+	}
+
+	for _, option := range opts {
+		option(client)
+	}
+
+	client.lockTopicToConsumerChannelMap = sync.RWMutex{}
+
+	return client
+}
+
+func (sc *SaramaClient) Start(retries int) error {
+	log.Info("Starting-Proxy")
+
+	// Create the Done channel
+	sc.doneCh = make(chan int, 1)
+
+	// Create the Publisher
+	if err := sc.createPublisher(retries); err != nil {
+		log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
+		return err
+	}
+
+	// Create the master consumer
+	if err := sc.createConsumer(retries); err != nil {
+		log.Errorw("Cannot-create-kafka-consumer", log.Fields{"error": err})
+		return err
+	}
+
+	// Create the topic to consumer/channel map
+	sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
+
+	return nil
+}
+
+func (sc *SaramaClient) Stop() {
+	log.Info("stopping-sarama-client")
+
+	//Send a message over the done channel to close all long running routines
+	sc.doneCh <- 1
+
+	// Clear the consumer map
+	//sc.clearConsumerChannelMap()
+
+	if sc.producer != nil {
+		if err := sc.producer.Close(); err != nil {
+			panic(err)
+		}
+	}
+	if sc.consumer != nil {
+		if err := sc.consumer.Close(); err != nil {
+			panic(err)
+		}
+	}
+}
+
+func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
+	sc.lockTopicToConsumerChannelMap.Lock()
+	defer sc.lockTopicToConsumerChannelMap.Unlock()
+	if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
+		sc.topicToConsumerChannelMap[id] = arg
+	}
+}
+
+func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
+	sc.lockTopicToConsumerChannelMap.Lock()
+	defer sc.lockTopicToConsumerChannelMap.Unlock()
+	if _, exist := sc.topicToConsumerChannelMap[id]; exist {
+		delete(sc.topicToConsumerChannelMap, id)
+	}
+}
+
+func (sc *SaramaClient) getConsumerChannel(topic Topic) *consumerChannels {
+	sc.lockTopicToConsumerChannelMap.Lock()
+	defer sc.lockTopicToConsumerChannelMap.Unlock()
+
+	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
+		return consumerCh
+	}
+	return nil
+}
+
+func (sc *SaramaClient) addChannelToConsumerChannelMap(topic Topic, ch chan *ca.InterContainerMessage) {
+	sc.lockTopicToConsumerChannelMap.Lock()
+	defer sc.lockTopicToConsumerChannelMap.Unlock()
+	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
+		consumerCh.channels = append(consumerCh.channels, ch)
+		return
+	}
+	log.Warnw("consumer-channel-not-exist", log.Fields{"topic": topic.Name})
+}
+
+func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ca.InterContainerMessage) error {
+	sc.lockTopicToConsumerChannelMap.Lock()
+	defer sc.lockTopicToConsumerChannelMap.Unlock()
+	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
+		// Channel will be closed in the removeChannel method
+		consumerCh.channels = removeChannel(consumerCh.channels, ch)
+		// If there are no more channels then we can close the consumer itself
+		if len(consumerCh.channels) == 0 {
+			log.Debugw("closing-consumer", log.Fields{"topic": topic})
+			err := consumerCh.consumer.Close()
+			delete(sc.topicToConsumerChannelMap, topic.Name)
+			return err
+		}
+		return nil
+	}
+	log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
+	return errors.New("topic-does-not-exist")
+}
+
+func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
+	sc.lockTopicToConsumerChannelMap.Lock()
+	defer sc.lockTopicToConsumerChannelMap.Unlock()
+	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
+		for _, ch := range consumerCh.channels {
+			// Channel will be closed in the removeChannel method
+			removeChannel(consumerCh.channels, ch)
+		}
+		err := consumerCh.consumer.Close()
+		delete(sc.topicToConsumerChannelMap, topic.Name)
+		return err
+	}
+	log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
+	return errors.New("topic-does-not-exist")
+}
+
+func (sc *SaramaClient) clearConsumerChannelMap() error {
+	sc.lockTopicToConsumerChannelMap.Lock()
+	defer sc.lockTopicToConsumerChannelMap.Unlock()
+	var err error
+	for topic, consumerCh := range sc.topicToConsumerChannelMap {
+		for _, ch := range consumerCh.channels {
+			// Channel will be closed in the removeChannel method
+			removeChannel(consumerCh.channels, ch)
+		}
+		err = consumerCh.consumer.Close()
+		delete(sc.topicToConsumerChannelMap, topic)
+	}
+	return err
+}
+
+func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int, retries int) error {
+	// This Creates the kafka topic
+	// Set broker configuration
+	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
+	broker := sarama.NewBroker(kafkaFullAddr)
+
+	// Additional configurations. Check sarama doc for more info
+	config := sarama.NewConfig()
+	config.Version = sarama.V1_0_0_0
+
+	// Open broker connection with configs defined above
+	broker.Open(config)
+
+	// check if the connection was OK
+	_, err := broker.Connected()
+	if err != nil {
+		return err
+	}
+
+	topicDetail := &sarama.TopicDetail{}
+	topicDetail.NumPartitions = int32(numPartition)
+	topicDetail.ReplicationFactor = int16(repFactor)
+	topicDetail.ConfigEntries = make(map[string]*string)
+
+	topicDetails := make(map[string]*sarama.TopicDetail)
+	topicDetails[topic.Name] = topicDetail
+
+	request := sarama.CreateTopicsRequest{
+		Timeout:      time.Second * 1,
+		TopicDetails: topicDetails,
+	}
+
+	for {
+		// Send request to Broker
+		if response, err := broker.CreateTopics(&request); err != nil {
+			if retries == 0 {
+				log.Errorw("error-creating-topic", log.Fields{"error": err})
+				return err
+			} else {
+				// If retries is negative then we will retry indefinitely
+				retries--
+			}
+			log.Debug("retrying-after-a-second-delay")
+			time.Sleep(time.Duration(DefaultSleepOnError) * time.Second)
+		} else {
+			log.Debug("topic-response", log.Fields{"response": response})
+			break
+		}
+	}
+
+	log.Debug("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
+	return nil
+}
+
+func (sc *SaramaClient) createPublisher(retries int) error {
+	// This Creates the publisher
+	config := sarama.NewConfig()
+	config.Producer.Partitioner = sarama.NewRandomPartitioner
+	config.Producer.Flush.Frequency = time.Duration(DefaultFlushFrequency)
+	config.Producer.Flush.Messages = DefaultFlushMessages
+	config.Producer.Flush.MaxMessages = DefaultFlushMaxmessages
+	config.Producer.Return.Errors = DefaultReturnErrors
+	config.Producer.Return.Successes = DefaultReturnSuccess
+	config.Producer.RequiredAcks = sarama.WaitForAll
+	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
+	brokers := []string{kafkaFullAddr}
+
+	for {
+		producer, err := sarama.NewAsyncProducer(brokers, config)
+		if err != nil {
+			if retries == 0 {
+				log.Errorw("error-starting-publisher", log.Fields{"error": err})
+				return err
+			} else {
+				// If retries is negative then we will retry indefinitely
+				retries--
+			}
+			log.Info("retrying-after-a-second-delay")
+			time.Sleep(time.Duration(DefaultSleepOnError) * time.Second)
+		} else {
+			sc.producer = producer
+			break
+		}
+	}
+	log.Info("Kafka-publisher-created")
+	return nil
+}
+
+func (sc *SaramaClient) createConsumer(retries int) error {
+	config := sarama.NewConfig()
+	config.Consumer.Return.Errors = true
+	config.Consumer.Fetch.Min = 1
+	config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
+	config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
+	config.Consumer.Offsets.Initial = sarama.OffsetNewest
+	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
+	brokers := []string{kafkaFullAddr}
+
+	for {
+		consumer, err := sarama.NewConsumer(brokers, config)
+		if err != nil {
+			if retries == 0 {
+				log.Errorw("error-starting-consumer", log.Fields{"error": err})
+				return err
+			} else {
+				// If retries is negative then we will retry indefinitely
+				retries--
+			}
+			log.Info("retrying-after-a-second-delay")
+			time.Sleep(time.Duration(DefaultSleepOnError) * time.Second)
+		} else {
+			sc.consumer = consumer
+			break
+		}
+	}
+	log.Info("Kafka-consumer-created")
+	return nil
+}
+
+// createGroupConsumer creates a consumer for the topic that joins the given consumer group (DefaultGroupName when groupId is nil)
+func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId *string, retries int) (*scc.Consumer, error) {
+	config := scc.NewConfig()
+	config.Group.Mode = scc.ConsumerModeMultiplex
+	config.Consumer.Return.Errors = true
+	config.Group.Return.Notifications = true
+	config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
+	config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
+	config.Consumer.Offsets.Initial = sarama.OffsetNewest
+	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
+	brokers := []string{kafkaFullAddr}
+
+	if groupId == nil {
+		g := DefaultGroupName
+		groupId = &g
+	}
+	topics := []string{topic.Name}
+	var consumer *scc.Consumer
+	var err error
+
+	// Create the topic with default attributes
+	// TODO: needs to be revisited
+	//sc.CreateTopic(&Topic{Name:topic.Name}, 3, 1, 1)
+
+	if consumer, err = scc.NewConsumer(brokers, *groupId, topics, config); err != nil {
+		log.Errorw("create-consumer-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
+		return nil, err
+	}
+	log.Debugw("create-consumer-success", log.Fields{"topic": topic.Name, "groupId": groupId})
+	//time.Sleep(10*time.Second)
+	sc.groupConsumer = consumer
+	return consumer, nil
+}
+
+// Send marshals msg, which must be a proto.Message, and queues it on the producer for the given kafka topic.  Only the first key, if any, is used.
+func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) {
+
+	// Assert message is a proto message
+	var protoMsg proto.Message
+	var ok bool
+	// ascertain the value interface type is a proto.Message
+	if protoMsg, ok = msg.(proto.Message); !ok {
+		log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
+		return
+	}
+
+	//	Create the Sarama producer message
+	marshalled, _ := proto.Marshal(protoMsg)
+	key := ""
+	if len(keys) > 0 {
+		key = keys[0] // Only the first key is relevant
+	}
+	kafkaMsg := &sarama.ProducerMessage{
+		Topic: topic.Name,
+		Key:   sarama.StringEncoder(key),
+		Value: sarama.ByteEncoder(marshalled),
+	}
+
+	// Send message to kafka
+	sc.producer.Input() <- kafkaMsg
+}
+
+// Subscribe registers a caller to a topic.   It returns a channel that the caller can use to receive
+// messages from that topic
+func (sc *SaramaClient) Subscribe(topic *Topic, retries int) (<-chan *ca.InterContainerMessage, error) {
+	log.Debugw("subscribe", log.Fields{"topic": topic.Name})
+
+	// If a consumer already exists for that topic then reuse it
+	if consumerCh := sc.getConsumerChannel(*topic); consumerCh != nil {
+		log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
+		// Create a channel specific for that consumer and add it to the consumer channel map
+		ch := make(chan *ca.InterContainerMessage)
+		sc.addChannelToConsumerChannelMap(*topic, ch)
+		return ch, nil
+	}
+
+	// Register for the topic and set it up
+	var consumerListeningChannel chan *ca.InterContainerMessage
+	var err error
+	if consumerListeningChannel, err = sc.setupConsumerChannel(topic); err != nil {
+		log.Warnw("create-consumer-channel-failure", log.Fields{"error": err, "topic": topic.Name})
+		return nil, err
+	}
+
+	return consumerListeningChannel, nil
+}
+
+// dispatchToConsumers sends the InterContainerMessage received on a given topic to all subscribers for that
+// topic via the unique channel each subscriber received during subscription
+func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ca.InterContainerMessage) {
+	// Need to go over all channels and publish messages to them - do we need to copy msg?
+	sc.lockTopicToConsumerChannelMap.Lock()
+	defer sc.lockTopicToConsumerChannelMap.Unlock()
+	for _, ch := range consumerCh.channels {
+		go func(c chan *ca.InterContainerMessage) {
+			c <- protoMessage
+		}(ch)
+	}
+}
+
+// consumeMessagesLoop relays what the topic's consumer receives to the subscribers until sc.doneCh is signalled.
+func (sc *SaramaClient) consumeMessagesLoop(topic Topic) {
+	log.Debugw("starting-consuming-messages", log.Fields{"topic": topic.Name})
+	var consumerCh *consumerChannels
+	if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
+		log.Errorw("consumer-not-exist", log.Fields{"topic": topic.Name})
+		return // no consumer registered for that topic, so there is nothing to read from
+	}
+startloop:
+	for {
+		select {
+		case err := <-consumerCh.consumer.Errors(): // consumer errors are only logged; the loop keeps going
+			log.Warnw("consumer-error", log.Fields{"error": err})
+		case msg := <-consumerCh.consumer.Messages():
+			//log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
+			// The only expected payload is a proto intercontainermessage, so it is decoded here before dispatch
+			msgBody := msg.Value
+			icm := &ca.InterContainerMessage{}
+			if err := proto.Unmarshal(msgBody, icm); err != nil {
+				log.Warnw("invalid-message", log.Fields{"error": err})
+				continue // drop the undecodable message and go back to the top of the for loop
+			}
+			go sc.dispatchToConsumers(consumerCh, icm) // the dispatch runs in its own goroutine
+
+			//consumerCh.consumer.MarkOffset(msg, "")
+			//// TODO:  Dispatch requests and responses separately
+		case <-sc.doneCh:
+			log.Infow("received-exit-signal", log.Fields{"topic": topic.Name})
+			break startloop // labelled break: a plain break would only leave the select
+		}
+	}
+	log.Infow("received-exit-signal-out-of-for-loop", log.Fields{"topic": topic.Name})
+}
+
+// setupConsumerChannel builds the consumerChannels entry for the given topic and stores it in the
+// consumerChannels map under the topic name.  It also starts the routine that listens for messages on
+// that topic, and returns the channel created for the first subscriber.
+func (sc *SaramaClient) setupConsumerChannel(topic *Topic) (chan *ca.InterContainerMessage, error) {
+	// TODO:  Replace this development partition consumer with a group consumer
+	partitionConsumer, err := sc.CreatePartionConsumer(topic, DefaultMaxRetries)
+	if err != nil {
+		log.Errorw("creating-partition-consumer-failure", log.Fields{"error": err, "topic": topic.Name})
+		return nil, err
+	}
+
+	// The channel handed to the subscriber - for now unbuffered to verify race conditions.
+	firstCh := make(chan *ca.InterContainerMessage)
+
+	// Pair the partition consumer with its, so far single, subscriber channel and add the pair to
+	// the map before the consuming routine, which looks the pair up by topic, is started
+	sc.addTopicToConsumerChannelMap(topic.Name, &consumerChannels{
+		consumer: *partitionConsumer,
+		channels: []chan *ca.InterContainerMessage{firstCh},
+	})
+
+	// Start a consumer to listen on that specific topic; the routine receives its own copy of
+	// the topic value
+	go sc.consumeMessagesLoop(*topic)
+
+	return firstCh, nil
+}
+
+// CreatePartionConsumer returns a consumer on the first partition of the topic, starting at the newest offset.
+func (sc *SaramaClient) CreatePartionConsumer(topic *Topic, retries int) (*sarama.PartitionConsumer, error) {
+	log.Debugw("creating-partition-consumer", log.Fields{"topic": topic.Name})
+	partitions, err := sc.consumer.Partitions(topic.Name)
+	if err != nil {
+		log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
+		return nil, err
+	}
+	// NOTE(review): partitions[0] assumes a nil error always comes with at least one partition - confirm
+	log.Debugw("partitions", log.Fields{"topic": topic.Name, "partitionList": partitions, "first": partitions[0]})
+	partitionConsumer, err := sc.consumer.ConsumePartition(topic.Name, partitions[0], sarama.OffsetNewest)
+	if err != nil {
+		log.Warnw("consumer-partition-failure", log.Fields{"error": err, "topic": topic.Name})
+		return nil, err
+	}
+	log.Debugw("partition-consumer-created", log.Fields{"topic": topic.Name})
+	return &partitionConsumer, nil
+}
+
+// UnSubscribe hands the topic and channel to removeChannelFromConsumerChannelMap and returns its error.
+func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ca.InterContainerMessage) error {
+	log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
+	return sc.removeChannelFromConsumerChannelMap(*topic, ch)
+}
+
+// removeChannel closes ch and drops it from channels (order is not kept); without a match channels is returned as is.
+func removeChannel(channels []chan *ca.InterContainerMessage, ch <-chan *ca.InterContainerMessage) []chan *ca.InterContainerMessage {
+	last := len(channels) - 1
+	for idx, candidate := range channels {
+		if candidate == ch {
+			channels[idx], channels[last] = channels[last], candidate // move the match into the last slot
+			close(candidate)
+			return channels[:last]
+		}
+	}
+	return channels
+}
+
+func (sc *SaramaClient) DeleteTopic(topic *Topic) error { // DeleteTopic delegates to clearTopicFromConsumerChannelMap
+	if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
+		log.Errorw("failure-clearing-channels", log.Fields{"topic": topic.Name, "error": err}) // name, not the pointer
+		return err
+	}
+	return nil // NOTE(review): nothing here removes the topic from the Kafka broker - confirm that is intended
+}
diff --git a/kafka/utils.go b/kafka/utils.go
new file mode 100644
index 0000000..beac9f9
--- /dev/null
+++ b/kafka/utils.go
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+	"fmt"
+	"strings"
+)
+
+const (
+	TopicSeparator = "_" // placed between the parts by CreateSubTopic; GetDeviceIdFromTopic splits at its last occurrence
+	DeviceIdLength = 24  // exact length GetDeviceIdFromTopic requires of the device id
+)
+
+// Topic is the definition of a topic - may be augmented with additional attributes eventually
+type Topic struct {
+	// The name of the topic. It must start with a letter,
+	// and contain only letters (`[A-Za-z]`), numbers (`[0-9]`), dashes (`-`),
+	// underscores (`_`), periods (`.`), tildes (`~`), plus (`+`) or percent
+	// signs (`%`).
+	Name string // NOTE(review): confirm the character set above against Kafka's topic naming rules
+}
+
+type KVArg struct { // KVArg pairs a string Key with a Value of any type
+	Key   string
+	Value interface{}
+}
+
+// CreateSubTopic joins its arguments, in order, with TopicSeparator and returns the result as a Topic.
+func CreateSubTopic(args ...string) Topic {
+	if len(args) == 0 {
+		// Without arguments the topic name is empty
+		return Topic{}
+	}
+	name := args[0]
+	for _, part := range args[1:] {
+		name = fmt.Sprintf("%s%s%s", name, TopicSeparator, part)
+	}
+	return Topic{Name: name}
+}
+
+// GetDeviceIdFromTopic returns the device id carried by a topic name.  A topic name is formatted either as
+// <any string> or <any string>_<deviceId>, where the device id is DeviceIdLength (24) characters long.
+func GetDeviceIdFromTopic(topic Topic) string {
+	name := topic.Name
+	sepIdx := strings.LastIndex(name, TopicSeparator)
+	if sepIdx < 0 {
+		// No separator at all: the name carries no device id
+		return ""
+	}
+	// Everything after the last separator is the candidate device id; an empty candidate
+	// (the separator is the last character) fails the length check below
+	candidate := name[sepIdx+len(TopicSeparator):]
+	if len(candidate) == DeviceIdLength {
+		return candidate
+	}
+	return ""
+}
