This update addresses the following:
1. Decouple the kafka messaging proxy from the kafka client.  This
will allow us to try out different kafka clients as well as test
the client separately from the proxy (a mock-client sketch follows
the new kafka/client.go diff below).
2. Create unique device topics for the core, olt adapter and onu
adapters.  This will ensure that only the cores and adapters handling
these devices listen to the device messages (see the topic-naming
sketch after this list).
3. Update the core with the latest device model APIs and changes.
While most of the model issues have been fixed, there is still an
issue with updating a child branch.  This will be dealt with in a
separate update.
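
As an illustration of point 2, the device topics rely on encoding the
device ID in the topic name.  GetDeviceIdFromTopic is used by this
change but defined outside this diff; the sketch below shows one
plausible shape for it, assuming a "<type>_<deviceId>" topic layout
(the actual separator and layout may differ):

    // Hypothetical sketch only - assumes a device topic of the form
    // "<type>_<deviceId>", e.g. "core_0001f8d2", and uses the standard
    // library "strings" package.
    func GetDeviceIdFromTopic(topic Topic) string {
        parts := strings.Split(topic.Name, "_")
        if len(parts) != 2 {
            return "" // no device ID embedded; the message key stays empty
        }
        return parts[1]
    }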

Change-Id: I622ef5c636d7466bb3adefaa4ac4c85d7c450bea
diff --git a/db/model/root.go b/db/model/root.go
index a05fbdd..c4339a4 100644
--- a/db/model/root.go
+++ b/db/model/root.go
@@ -48,7 +48,7 @@
 	Loading       bool
 	RevisionClass interface{}
 
-	mutex    sync.RWMutex
+	mutex sync.RWMutex
 }
 
 // NewRoot creates an new instance of a root object
@@ -116,12 +116,12 @@
 	for len(r.Callbacks) > 0 {
 		callback := r.Callbacks[0]
 		r.Callbacks = r.Callbacks[1:]
-		callback.Execute(nil)
+		go callback.Execute(nil)
 	}
 	for len(r.NotificationCallbacks) > 0 {
 		callback := r.NotificationCallbacks[0]
 		r.NotificationCallbacks = r.NotificationCallbacks[1:]
-		callback.Execute(nil)
+		go callback.Execute(nil)
 	}
 }
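
Note on the root.go change above: dispatching each callback with
go callback.Execute(nil) means the drain loop no longer waits, so
callbacks may still be running, and may interleave, after the loop
exits.  If completion ever needs to be awaited, a WaitGroup-style
variant (a sketch, not part of this change) would be:

    var wg sync.WaitGroup
    for len(r.Callbacks) > 0 {
        callback := r.Callbacks[0]
        r.Callbacks = r.Callbacks[1:]
        wg.Add(1)
        go func() {
            defer wg.Done()
            // callback is re-declared on each iteration, so capturing it is safe
            callback.Execute(nil)
        }()
    }
    wg.Wait() // block until every callback has finished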
 
diff --git a/kafka/client.go b/kafka/client.go
new file mode 100644
index 0000000..cb33a35
--- /dev/null
+++ b/kafka/client.go
@@ -0,0 +1,45 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+	ca "github.com/opencord/voltha-go/protos/core_adapter"
+)
+
+const (
+	DefaultKafkaHost         = "127.0.0.1"
+	DefaultKafkaPort         = 9092
+	DefaultGroupName         = "rw_core"
+	DefaultSleepOnError      = 1
+	DefaultFlushFrequency    = 1
+	DefaultFlushMessages     = 1
+	DefaultFlushMaxmessages  = 1
+	DefaultReturnSuccess     = false
+	DefaultReturnErrors      = true
+	DefaultConsumerMaxwait   = 10
+	DefaultMaxProcessingTime = 100
+)
+
+// Client represents the set of APIs a Kafka client must implement
+type Client interface {
+	Start(retries int) error
+	Stop()
+	CreateTopic(topic *Topic, numPartition int, repFactor int, retries int) error
+	DeleteTopic(topic *Topic) error
+	Subscribe(topic *Topic, retries int) (<-chan *ca.InterContainerMessage, error)
+	UnSubscribe(topic *Topic, ch <-chan *ca.InterContainerMessage) error
+	Send(msg interface{}, topic *Topic, keys ...string)
+}
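
Because the proxy now depends only on the Client interface above, a
test can substitute an in-memory implementation.  Below is a minimal
sketch of such a stub (hypothetical, not part of this change): it
satisfies the interface but simply loops messages back to local
subscribers, which is enough to exercise the proxy without a broker.

    package kafka

    import ca "github.com/opencord/voltha-go/protos/core_adapter"

    // mockClient is an illustrative in-memory Client.
    type mockClient struct {
        topics map[string]chan *ca.InterContainerMessage
    }

    func newMockClient() *mockClient {
        return &mockClient{topics: make(map[string]chan *ca.InterContainerMessage)}
    }

    func (m *mockClient) Start(retries int) error { return nil }
    func (m *mockClient) Stop()                   {}

    func (m *mockClient) CreateTopic(topic *Topic, numPartition int, repFactor int, retries int) error {
        return nil
    }

    func (m *mockClient) DeleteTopic(topic *Topic) error {
        delete(m.topics, topic.Name)
        return nil
    }

    func (m *mockClient) Subscribe(topic *Topic, retries int) (<-chan *ca.InterContainerMessage, error) {
        ch := make(chan *ca.InterContainerMessage, 1)
        m.topics[topic.Name] = ch
        return ch, nil
    }

    func (m *mockClient) UnSubscribe(topic *Topic, ch <-chan *ca.InterContainerMessage) error {
        return nil
    }

    func (m *mockClient) Send(msg interface{}, topic *Topic, keys ...string) {
        // Deliver only if a subscriber exists and the payload is an InterContainerMessage
        if ch, ok := m.topics[topic.Name]; ok {
            if icm, ok := msg.(*ca.InterContainerMessage); ok {
                ch <- icm
            }
        }
    }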
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 78a8a5a..25fc1b7 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -19,7 +19,6 @@
 	"context"
 	"errors"
 	"fmt"
-	"github.com/Shopify/sarama"
 	"github.com/golang/protobuf/proto"
 	"github.com/golang/protobuf/ptypes"
 	"github.com/golang/protobuf/ptypes/any"
@@ -37,73 +36,87 @@
 }
 
 const (
-	DefaultKafkaHost         = "10.100.198.240"
-	DefaultKafkaPort         = 9092
-	DefaultTopicName         = "Core"
-	DefaultSleepOnError      = 1
-	DefaultFlushFrequency    = 1
-	DefaultFlushMessages     = 1
-	DefaultFlushMaxmessages  = 1
-	DefaultMaxRetries        = 3
-	DefaultReturnSuccess     = false
-	DefaultReturnErrors      = true
-	DefaultConsumerMaxwait   = 50
-	DefaultMaxProcessingTime = 100
-	DefaultRequestTimeout    = 500 // 500 milliseconds - to handle a wider latency range
+	DefaultMaxRetries     = 3
+	DefaultRequestTimeout = 500 // 500 milliseconds - to handle a wider latency range
 )
 
-type consumerChannels struct {
-	consumer sarama.PartitionConsumer
-	channels []chan *ca.InterContainerMessage
+// requestHandlerChannel represents an interface associated with a channel.  Whenever an event is
+// received on that channel, this interface is invoked.  This is used to handle
+// async requests into the Core via the kafka messaging bus.
+type requestHandlerChannel struct {
+	requesthandlerInterface interface{}
+	ch                      <-chan *ca.InterContainerMessage
 }
 
-// KafkaMessagingProxy represents the messaging proxy
-type KafkaMessagingProxy struct {
-	KafkaHost                     string
-	KafkaPort                     int
-	DefaultTopic                  *Topic
-	TargetInterface               interface{}
-	producer                      sarama.AsyncProducer
-	consumer                      sarama.Consumer
-	doneCh                        chan int
-	waitForResponseRoutineStarted bool
-	topicToConsumerChannelMap     map[string]*consumerChannels
-	transactionIdToChannelMap     map[string]chan *ca.InterContainerMessage
-	lockTopicToConsumerChannelMap sync.RWMutex
+// transactionChannel represents a combination of a topic and a channel onto which a response received
+// on the kafka bus will be sent.
+type transactionChannel struct {
+	topic *Topic
+	ch    chan *ca.InterContainerMessage
+}
+
+// InterContainerProxy represents the messaging proxy
+type InterContainerProxy struct {
+	kafkaHost                      string
+	kafkaPort                      int
+	DefaultTopic                   *Topic
+	defaultRequestHandlerInterface interface{}
+	kafkaClient                    Client
+	doneCh                         chan int
+
+	// This map maps a topic to an interface and a channel.  When a request is received
+	// on that channel (registered to the topic), that interface is invoked.
+	topicToRequestHandlerChannelMap   map[string]*requestHandlerChannel
+	lockTopicRequestHandlerChannelMap sync.RWMutex
+
+	// This map maps a response topic to a channel.  That channel receives all responses on the
+	// topic and forwards them to the appropriate consumer channel, using the
+	// transactionIdToChannelMap.
+	topicToResponseChannelMap   map[string]<-chan *ca.InterContainerMessage
+	lockTopicResponseChannelMap sync.RWMutex
+
+	// This map is used to map a transaction to a consumer channel.  This is used whenever a request has been
+	// sent out and we are waiting for a response.
+	transactionIdToChannelMap     map[string]*transactionChannel
 	lockTransactionIdToChannelMap sync.RWMutex
 }
 
-type KafkaProxyOption func(*KafkaMessagingProxy)
+type InterContainerProxyOption func(*InterContainerProxy)
 
-func KafkaHost(host string) KafkaProxyOption {
-	return func(args *KafkaMessagingProxy) {
-		args.KafkaHost = host
+func InterContainerHost(host string) InterContainerProxyOption {
+	return func(args *InterContainerProxy) {
+		args.kafkaHost = host
 	}
 }
 
-func KafkaPort(port int) KafkaProxyOption {
-	return func(args *KafkaMessagingProxy) {
-		args.KafkaPort = port
+func InterContainerPort(port int) InterContainerProxyOption {
+	return func(args *InterContainerProxy) {
+		args.kafkaPort = port
 	}
 }
 
-func DefaultTopic(topic *Topic) KafkaProxyOption {
-	return func(args *KafkaMessagingProxy) {
+func DefaultTopic(topic *Topic) InterContainerProxyOption {
+	return func(args *InterContainerProxy) {
 		args.DefaultTopic = topic
 	}
 }
 
-func TargetInterface(target interface{}) KafkaProxyOption {
-	return func(args *KafkaMessagingProxy) {
-		args.TargetInterface = target
+func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
+	return func(args *InterContainerProxy) {
+		args.defaultRequestHandlerInterface = handler
 	}
 }
 
-func NewKafkaMessagingProxy(opts ...KafkaProxyOption) (*KafkaMessagingProxy, error) {
-	proxy := &KafkaMessagingProxy{
-		KafkaHost:    DefaultKafkaHost,
-		KafkaPort:    DefaultKafkaPort,
-		DefaultTopic: &Topic{Name: DefaultTopicName},
+func MsgClient(client Client) InterContainerProxyOption {
+	return func(args *InterContainerProxy) {
+		args.kafkaClient = client
+	}
+}
+
+func NewInterContainerProxy(opts ...InterContainerProxyOption) (*InterContainerProxy, error) {
+	proxy := &InterContainerProxy{
+		kafkaHost: DefaultKafkaHost,
+		kafkaPort: DefaultKafkaPort,
 	}
 
 	for _, option := range opts {
@@ -111,59 +124,65 @@
 	}
 
 	// Create the locks for all the maps
-	proxy.lockTopicToConsumerChannelMap = sync.RWMutex{}
+	proxy.lockTopicRequestHandlerChannelMap = sync.RWMutex{}
 	proxy.lockTransactionIdToChannelMap = sync.RWMutex{}
+	proxy.lockTopicResponseChannelMap = sync.RWMutex{}
 
 	return proxy, nil
 }
 
-func (kp *KafkaMessagingProxy) Start() error {
+func (kp *InterContainerProxy) Start() error {
 	log.Info("Starting-Proxy")
 
+	// The kafka client should already have been created.  If not, log a fatal error
+	if kp.kafkaClient == nil {
+		log.Fatal("kafka-client-not-set")
+	}
+
 	// Create the Done channel
 	kp.doneCh = make(chan int, 1)
 
-	// Create the Publisher
-	if err := kp.createPublisher(DefaultMaxRetries); err != nil {
-		log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
+	// Start the kafka client
+	if err := kp.kafkaClient.Start(DefaultMaxRetries); err != nil {
+		log.Errorw("Cannot-start-kafka-client", log.Fields{"error": err})
 		return err
 	}
 
-	// Create the master consumer
-	if err := kp.createConsumer(DefaultMaxRetries); err != nil {
-		log.Errorw("Cannot-create-kafka-consumer", log.Fields{"error": err})
-		return err
-	}
-
-	// Create the topic to consumer/channel map
-	kp.topicToConsumerChannelMap = make(map[string]*consumerChannels)
-
+	// Create the topic to response channel map
+	kp.topicToResponseChannelMap = make(map[string]<-chan *ca.InterContainerMessage)
 	// Create the transactionId to Channel Map
-	kp.transactionIdToChannelMap = make(map[string]chan *ca.InterContainerMessage)
+	kp.transactionIdToChannelMap = make(map[string]*transactionChannel)
+
+	// Create the topic to request channel map
+	kp.topicToRequestHandlerChannelMap = make(map[string]*requestHandlerChannel)
 
 	return nil
 }
 
-func (kp *KafkaMessagingProxy) Stop() {
-	log.Info("Stopping-Proxy")
-	if kp.producer != nil {
-		if err := kp.producer.Close(); err != nil {
-			panic(err)
-		}
-	}
-	if kp.consumer != nil {
-		if err := kp.consumer.Close(); err != nil {
-			panic(err)
-		}
-	}
-	//Close the done channel to close all long processing Go routines
-	close(kp.doneCh)
+func (kp *InterContainerProxy) Stop() {
+	log.Info("stopping-intercontainer-proxy")
+	kp.doneCh <- 1
+	// TODO : Perform cleanup
+	//kp.kafkaClient.Stop()
+	//kp.deleteAllTopicRequestHandlerChannelMap()
+	//kp.deleteAllTopicResponseChannelMap()
+	//kp.deleteAllTransactionIdToChannelMap()
 }
 
-func (kp *KafkaMessagingProxy) InvokeRPC(ctx context.Context, rpc string, topic *Topic, waitForResponse bool,
-	kvArgs ...*KVArg) (bool, *any.Any) {
+// InvokeRPC is used to send a request to a given topic
+func (kp *InterContainerProxy) InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
+	waitForResponse bool, kvArgs ...*KVArg) (bool, *any.Any) {
+
+	// If a replyToTopic is provided then we use it, otherwise fall back to the proxy's DefaultTopic.
+	// The replyToTopic is typically the device ID.
+	responseTopic := replyToTopic
+	if responseTopic == nil {
+		responseTopic = kp.DefaultTopic
+	}
+
 	// Encode the request
-	protoRequest, err := encodeRequest(rpc, topic, kp.DefaultTopic, kvArgs...)
+	protoRequest, err := encodeRequest(rpc, toTopic, responseTopic, kvArgs...)
 	if err != nil {
 		log.Warnw("cannot-format-request", log.Fields{"rpc": rpc, "error": err})
 		return false, nil
@@ -173,13 +192,17 @@
 	var ch <-chan *ca.InterContainerMessage
 	if waitForResponse {
 		var err error
-		if ch, err = kp.subscribeForResponse(*kp.DefaultTopic, protoRequest.Header.Id); err != nil {
-			log.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "topic": topic.Name})
+		if ch, err = kp.subscribeForResponse(*responseTopic, protoRequest.Header.Id); err != nil {
+			log.Errorw("failed-to-subscribe-for-response", log.Fields{"error": err, "toTopic": toTopic.Name})
 		}
 	}
 
-	// Send request
-	go kp.sendToKafkaTopic(protoRequest, topic)
+	// Send request - if the topic is formatted with a device Id then we will send the request using a
+	// specific key, hence ensuring a single partition is used to publish the request.  This ensures that the
+	// subscriber on that topic will receive the request in the order it was sent.  The key used is the deviceId.
+	key := GetDeviceIdFromTopic(*toTopic)
+	log.Debugw("sending-msg", log.Fields{"rpc": rpc, "toTopic": toTopic, "replyTopic": responseTopic, "key": key})
+	go kp.kafkaClient.Send(protoRequest, toTopic, key)
 
 	if waitForResponse {
 		// Create a child context based on the parent context, if any
@@ -197,8 +220,7 @@
 		defer kp.unSubscribeForResponse(protoRequest.Header.Id)
 		select {
 		case msg := <-ch:
-			log.Debugw("received-response", log.Fields{"rpc": rpc, "msg": msg})
-
+			log.Debugw("received-response", log.Fields{"rpc": rpc, "msgHeader": msg.Header})
 			var responseBody *ca.InterContainerResponseBody
 			var err error
 			if responseBody, err = decodeResponse(msg); err != nil {
@@ -224,194 +246,183 @@
 			}
 			return false, marshalledArg
 		case <-kp.doneCh:
-			log.Infow("received-exit-signal", log.Fields{"topic": topic.Name, "rpc": rpc})
+			log.Infow("received-exit-signal", log.Fields{"toTopic": toTopic.Name, "rpc": rpc})
 			return true, nil
 		}
 	}
 	return true, nil
 }
 
-// Subscribe allows a caller to subscribe to a given topic.  A channel is returned to the
-// caller to receive messages from that topic.
-func (kp *KafkaMessagingProxy) Subscribe(topic Topic) (<-chan *ca.InterContainerMessage, error) {
-
-	log.Debugw("subscribe", log.Fields{"topic": topic.Name})
-
-	if consumerCh := kp.getConsumerChannel(topic); consumerCh != nil {
-		log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
-		// Create a channel specific for that consumer and add it to the consumer channel map
-		ch := make(chan *ca.InterContainerMessage)
-		kp.addChannelToConsumerChannelMap(topic, ch)
-		return ch, nil
-	}
-
-	// Register for the topic and set it up
-	var consumerListeningChannel chan *ca.InterContainerMessage
-	var err error
-	if consumerListeningChannel, err = kp.setupConsumerChannel(topic); err != nil {
-		log.Warnw("create-consumer-channel-failure", log.Fields{"error": err, "topic": topic.Name})
-		return nil, err
-	}
-
-	return consumerListeningChannel, nil
-}
-
-func (kp *KafkaMessagingProxy) UnSubscribe(topic Topic, ch <-chan *ca.InterContainerMessage) error {
-	log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
-	err := kp.removeChannelFromConsumerChannelMap(topic, ch)
-	return err
-}
-
-// SubscribeWithTarget allows a caller to assign a target object to be invoked automatically
+// SubscribeWithRequestHandlerInterface allows a caller to assign a target object to be invoked automatically
 // when a message is received on a given topic
-func (kp *KafkaMessagingProxy) SubscribeWithTarget(topic Topic, targetInterface interface{}) error {
+func (kp *InterContainerProxy) SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error {
 
 	// Subscribe to receive messages for that topic
 	var ch <-chan *ca.InterContainerMessage
 	var err error
-	if ch, err = kp.Subscribe(topic); err != nil {
+	if ch, err = kp.kafkaClient.Subscribe(&topic, 1); err != nil {
 		log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
 	}
+
+	kp.defaultRequestHandlerInterface = handler
+	kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: handler, ch: ch})
 	// Launch a go routine to receive and process kafka messages
-	go kp.waitForRequest(ch, topic, targetInterface)
+	go kp.waitForRequest(ch, topic, handler)
 
 	return nil
 }
 
-func (kp *KafkaMessagingProxy) UnSubscribeTarget(ctx context.Context, topic Topic, targetInterface interface{}) error {
-	// TODO - mostly relevant with multiple interfaces
+// SubscribeWithDefaultRequestHandler subscribes the default request handler to an additional topic; the
+// handler is invoked automatically when a message is received on that topic.  So far only one handler
+// is registered per microservice.
+func (kp *InterContainerProxy) SubscribeWithDefaultRequestHandler(topic Topic) error {
+	// Subscribe to receive messages for that topic
+	var ch <-chan *ca.InterContainerMessage
+	var err error
+	if ch, err = kp.kafkaClient.Subscribe(&topic, 1); err != nil {
+		log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
+	}
+	kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
+
+	// Launch a go routine to receive and process kafka messages
+	go kp.waitForRequest(ch, topic, kp.defaultRequestHandlerInterface)
+
 	return nil
 }
 
-func (kp *KafkaMessagingProxy) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
-	kp.lockTopicToConsumerChannelMap.Lock()
-	defer kp.lockTopicToConsumerChannelMap.Unlock()
-	if _, exist := kp.topicToConsumerChannelMap[id]; !exist {
-		kp.topicToConsumerChannelMap[id] = arg
+func (kp *InterContainerProxy) UnSubscribeFromRequestHandler(topic Topic) error {
+	return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
+}
+
+// setupTopicResponseChannelMap sets up a single consumer channel that will act as a broadcast channel for all
+// responses from that topic.
+func (kp *InterContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ca.InterContainerMessage) {
+	kp.lockTopicResponseChannelMap.Lock()
+	defer kp.lockTopicResponseChannelMap.Unlock()
+	if _, exist := kp.topicToResponseChannelMap[topic]; !exist {
+		kp.topicToResponseChannelMap[topic] = arg
 	}
 }
 
-func (kp *KafkaMessagingProxy) deleteFromTopicToConsumerChannelMap(id string) {
-	kp.lockTopicToConsumerChannelMap.Lock()
-	defer kp.lockTopicToConsumerChannelMap.Unlock()
-	if _, exist := kp.topicToConsumerChannelMap[id]; exist {
-		delete(kp.topicToConsumerChannelMap, id)
+func (kp *InterContainerProxy) isTopicSubscribedForResponse(topic string) bool {
+	kp.lockTopicResponseChannelMap.Lock()
+	defer kp.lockTopicResponseChannelMap.Unlock()
+	_, exist := kp.topicToResponseChannelMap[topic]
+	return exist
+}
+
+func (kp *InterContainerProxy) deleteFromTopicResponseChannelMap(topic string) error {
+	kp.lockTopicResponseChannelMap.Lock()
+	defer kp.lockTopicResponseChannelMap.Unlock()
+	if _, exist := kp.topicToResponseChannelMap[topic]; exist {
+		// Unsubscribe from this topic first - this will close the subscribed channel
+		var err error
+		if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+			log.Errorw("unsubscribing-error", log.Fields{"topic": topic})
+		}
+		delete(kp.topicToResponseChannelMap, topic)
+		return err
+	} else {
+		return fmt.Errorf("%s-Topic-not-found", topic)
 	}
 }
 
-func (kp *KafkaMessagingProxy) getConsumerChannel(topic Topic) *consumerChannels {
-	kp.lockTopicToConsumerChannelMap.Lock()
-	defer kp.lockTopicToConsumerChannelMap.Unlock()
-
-	if consumerCh, exist := kp.topicToConsumerChannelMap[topic.Name]; exist {
-		return consumerCh
+func (kp *InterContainerProxy) deleteAllTopicResponseChannelMap() error {
+	kp.lockTopicResponseChannelMap.Lock()
+	defer kp.lockTopicResponseChannelMap.Unlock()
+	var err error
+	for topic := range kp.topicToResponseChannelMap {
+		// Unsubscribe from this topic first - this will close the subscribed channel
+		if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToResponseChannelMap[topic]); err != nil {
+			log.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+		}
+		delete(kp.topicToResponseChannelMap, topic)
 	}
-	return nil
+	return err
 }
 
-func (kp *KafkaMessagingProxy) addChannelToConsumerChannelMap(topic Topic, ch chan *ca.InterContainerMessage) {
-	kp.lockTopicToConsumerChannelMap.Lock()
-	defer kp.lockTopicToConsumerChannelMap.Unlock()
-	if consumerCh, exist := kp.topicToConsumerChannelMap[topic.Name]; exist {
-		consumerCh.channels = append(consumerCh.channels, ch)
-		return
+func (kp *InterContainerProxy) addToTopicRequestHandlerChannelMap(topic string, arg *requestHandlerChannel) {
+	kp.lockTopicRequestHandlerChannelMap.Lock()
+	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
+	if _, exist := kp.topicToRequestHandlerChannelMap[topic]; !exist {
+		kp.topicToRequestHandlerChannelMap[topic] = arg
 	}
-	log.Warnw("consumer-channel-not-exist", log.Fields{"topic": topic.Name})
 }
 
-func (kp *KafkaMessagingProxy) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ca.InterContainerMessage) error {
-	kp.lockTopicToConsumerChannelMap.Lock()
-	defer kp.lockTopicToConsumerChannelMap.Unlock()
-	if consumerCh, exist := kp.topicToConsumerChannelMap[topic.Name]; exist {
-		// Channel will be closed in the removeChannel method
-		consumerCh.channels = removeChannel(consumerCh.channels, ch)
+func (kp *InterContainerProxy) deleteFromTopicRequestHandlerChannelMap(topic string) error {
+	kp.lockTopicRequestHandlerChannelMap.Lock()
+	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
+	if _, exist := kp.topicToRequestHandlerChannelMap[topic]; exist {
+		// Close the kafka client channel first by unsubscribing from this topic
+		kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch)
+		delete(kp.topicToRequestHandlerChannelMap, topic)
 		return nil
+	} else {
+		return fmt.Errorf("%s-Topic-not-found", topic)
 	}
-	log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
-	return errors.New("topic-does-not-exist")
 }
 
-func (kp *KafkaMessagingProxy) addToTransactionIdToChannelMap(id string, arg chan *ca.InterContainerMessage) {
+func (kp *InterContainerProxy) deleteAllTopicRequestHandlerChannelMap() error {
+	kp.lockTopicRequestHandlerChannelMap.Lock()
+	defer kp.lockTopicRequestHandlerChannelMap.Unlock()
+	var err error
+	for topic := range kp.topicToRequestHandlerChannelMap {
+		// Close the kafka client channel first by unsubscribing from this topic
+		if err = kp.kafkaClient.UnSubscribe(&Topic{Name: topic}, kp.topicToRequestHandlerChannelMap[topic].ch); err != nil {
+			log.Errorw("unsubscribing-error", log.Fields{"topic": topic, "error": err})
+		}
+		delete(kp.topicToRequestHandlerChannelMap, topic)
+	}
+	return err
+}
+
+func (kp *InterContainerProxy) addToTransactionIdToChannelMap(id string, topic *Topic, arg chan *ca.InterContainerMessage) {
 	kp.lockTransactionIdToChannelMap.Lock()
 	defer kp.lockTransactionIdToChannelMap.Unlock()
 	if _, exist := kp.transactionIdToChannelMap[id]; !exist {
-		kp.transactionIdToChannelMap[id] = arg
+		kp.transactionIdToChannelMap[id] = &transactionChannel{topic: topic, ch: arg}
 	}
 }
 
-func (kp *KafkaMessagingProxy) deleteFromTransactionIdToChannelMap(id string) {
+func (kp *InterContainerProxy) deleteFromTransactionIdToChannelMap(id string) {
 	kp.lockTransactionIdToChannelMap.Lock()
 	defer kp.lockTransactionIdToChannelMap.Unlock()
-	if _, exist := kp.transactionIdToChannelMap[id]; exist {
+	if transChannel, exist := kp.transactionIdToChannelMap[id]; exist {
+		// Close the channel first
+		close(transChannel.ch)
 		delete(kp.transactionIdToChannelMap, id)
 	}
 }
 
-func (kp *KafkaMessagingProxy) createPublisher(retries int) error {
-	// This Creates the publisher
-	config := sarama.NewConfig()
-	config.Producer.Partitioner = sarama.NewRandomPartitioner
-	config.Producer.Flush.Frequency = time.Duration(DefaultFlushFrequency)
-	config.Producer.Flush.Messages = DefaultFlushMessages
-	config.Producer.Flush.MaxMessages = DefaultFlushMaxmessages
-	config.Producer.Return.Errors = DefaultReturnErrors
-	config.Producer.Return.Successes = DefaultReturnSuccess
-	config.Producer.RequiredAcks = sarama.WaitForAll
-	kafkaFullAddr := fmt.Sprintf("%s:%d", kp.KafkaHost, kp.KafkaPort)
-	brokers := []string{kafkaFullAddr}
-
-	for {
-		producer, err := sarama.NewAsyncProducer(brokers, config)
-		if err != nil {
-			if retries == 0 {
-				log.Errorw("error-starting-publisher", log.Fields{"error": err})
-				return err
-			} else {
-				// If retries is -ve then we will retry indefinitely
-				retries--
-			}
-			log.Info("retrying-after-a-second-delay")
-			time.Sleep(time.Duration(DefaultSleepOnError) * time.Second)
-		} else {
-			kp.producer = producer
-			break
+func (kp *InterContainerProxy) deleteTopicTransactionIdToChannelMap(id string) {
+	kp.lockTransactionIdToChannelMap.Lock()
+	defer kp.lockTransactionIdToChannelMap.Unlock()
+	for key, value := range kp.transactionIdToChannelMap {
+		if value.topic.Name == id {
+			close(value.ch)
+			delete(kp.transactionIdToChannelMap, key)
 		}
 	}
-	log.Info("Kafka-publisher-created")
-	return nil
 }
 
-func (kp *KafkaMessagingProxy) createConsumer(retries int) error {
-	config := sarama.NewConfig()
-	config.Consumer.Return.Errors = true
-	config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
-	config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
-	config.Consumer.Offsets.Initial = sarama.OffsetNewest
-	kafkaFullAddr := fmt.Sprintf("%s:%d", kp.KafkaHost, kp.KafkaPort)
-	brokers := []string{kafkaFullAddr}
-
-	for {
-		consumer, err := sarama.NewConsumer(brokers, config)
-		if err != nil {
-			if retries == 0 {
-				log.Errorw("error-starting-consumer", log.Fields{"error": err})
-				return err
-			} else {
-				// If retries is -ve then we will retry indefinitely
-				retries--
-			}
-			log.Info("retrying-after-a-second-delay")
-			time.Sleep(time.Duration(DefaultSleepOnError) * time.Second)
-		} else {
-			kp.consumer = consumer
-			break
-		}
+func (kp *InterContainerProxy) deleteAllTransactionIdToChannelMap() {
+	kp.lockTransactionIdToChannelMap.Lock()
+	defer kp.lockTransactionIdToChannelMap.Unlock()
+	for key, value := range kp.transactionIdToChannelMap {
+		close(value.ch)
+		delete(kp.transactionIdToChannelMap, key)
 	}
-	log.Info("Kafka-consumer-created")
-	return nil
 }
 
-func encodeReturnedValue(request *ca.InterContainerMessage, returnedVal interface{}) (*any.Any, error) {
+func (kp *InterContainerProxy) DeleteTopic(topic Topic) error {
+	// If we have any consumers on that topic we need to close them
+	kp.deleteFromTopicResponseChannelMap(topic.Name)
+	kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
+	kp.deleteTopicTransactionIdToChannelMap(topic.Name)
+	return kp.kafkaClient.DeleteTopic(&topic)
+}
+
+func encodeReturnedValue(returnedVal interface{}) (*any.Any, error) {
 	// Encode the response argument - needs to be a proto message
 	if returnedVal == nil {
 		return nil, nil
@@ -462,8 +473,7 @@
 //formatRequest formats a request to send over kafka and returns an InterContainerMessage message on success
 //or an error on failure
 func encodeResponse(request *ca.InterContainerMessage, success bool, returnedValues ...interface{}) (*ca.InterContainerMessage, error) {
-
-	log.Infow("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
+	//log.Debugw("encodeResponse", log.Fields{"success": success, "returnedValues": returnedValues})
 	responseHeader := &ca.Header{
 		Id:        request.Header.Id,
 		Type:      ca.MessageType_RESPONSE,
@@ -476,7 +486,7 @@
 	var marshalledReturnedVal *any.Any
 	var err error
 	for _, returnVal := range returnedValues {
-		if marshalledReturnedVal, err = encodeReturnedValue(request, returnVal); err != nil {
+		if marshalledReturnedVal, err = encodeReturnedValue(returnVal); err != nil {
 			log.Warnw("cannot-marshal-response-body", log.Fields{"error": err})
 		}
 		break // for now we support only 1 returned value - (excluding the error)
@@ -504,7 +514,7 @@
 	myClassValue := reflect.ValueOf(myClass)
 	m := myClassValue.MethodByName(funcName)
 	if !m.IsValid() {
-		return make([]reflect.Value, 0), fmt.Errorf("Method not found \"%s\"", funcName)
+		return make([]reflect.Value, 0), fmt.Errorf("method-not-found \"%s\"", funcName)
 	}
 	in := make([]reflect.Value, len(params))
 	for i, param := range params {
@@ -514,11 +524,10 @@
 	return
 }
 
-func (kp *KafkaMessagingProxy) handleRequest(msg *ca.InterContainerMessage, targetInterface interface{}) {
+func (kp *InterContainerProxy) handleRequest(msg *ca.InterContainerMessage, targetInterface interface{}) {
 
-	// First extract the header to know whether this is a request of a response
+	// First extract the header to know whether this is a request - responses are handled by a different handler
 	if msg.Header.Type == ca.MessageType_REQUEST {
-		log.Debugw("received-request", log.Fields{"header": msg.Header})
 
 		var out []reflect.Value
 		var err error
@@ -528,6 +537,7 @@
 		if err = ptypes.UnmarshalAny(msg.Body, requestBody); err != nil {
 			log.Warnw("cannot-unmarshal-request", log.Fields{"error": err})
 		} else {
+			log.Debugw("received-request", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header})
 			// let the callee unpack the arguments as its the only one that knows the real proto type
 			out, err = CallFuncByName(targetInterface, requestBody.Rpc, requestBody.Args)
 			if err != nil {
@@ -545,7 +555,7 @@
 				returnedValues = make([]interface{}, 1)
 				returnedValues[0] = returnError
 			} else {
-				log.Debugw("returned-api-response", log.Fields{"len": len(out), "err": err})
+				//log.Debugw("returned-api-response", log.Fields{"len": len(out), "err": err})
 				returnedValues = make([]interface{}, 0)
 				// Check for errors first
 				lastIndex := len(out) - 1
@@ -560,7 +570,7 @@
 				} else { // Non-error case
 					success = true
 					for idx, val := range out {
-						log.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
+						//log.Debugw("returned-api-response-loop", log.Fields{"idx": idx, "val": val.Interface()})
 						if idx != lastIndex {
 							returnedValues = append(returnedValues, val.Interface())
 						}
@@ -573,185 +583,94 @@
 				log.Warnw("error-encoding-response-returning-failure-result", log.Fields{"error": err})
 				icm = encodeDefaultFailedResponse(msg)
 			}
-			log.Debugw("sending-to-kafka", log.Fields{"msg": icm, "send-to-topic": msg.Header.FromTopic})
-			kp.sendToKafkaTopic(icm, &Topic{Name: msg.Header.FromTopic})
+			// To preserve ordering of messages, all messages to a given topic are sent to the same partition
+			// by providing a message key.  The key is encoded in the topic name.  If the deviceId is not
+			// present then the key will be empty, in which case messages for the topic may be spread across
+			// partitions.
+			replyTopic := &Topic{Name: msg.Header.FromTopic}
+			key := GetDeviceIdFromTopic(*replyTopic)
+			log.Debugw("sending-response-to-kafka", log.Fields{"rpc": requestBody.Rpc, "header": msg.Header, "key": key})
+			// TODO: handle error response.
+			kp.kafkaClient.Send(icm, replyTopic, key)
 		}
 
-	} else if msg.Header.Type == ca.MessageType_RESPONSE {
-		log.Warnw("received-response-on-request-handler", log.Fields{"header": msg.Header})
-	} else {
-		log.Errorw("invalid-message", log.Fields{"header": msg.Header})
 	}
 }
 
-func (kp *KafkaMessagingProxy) waitForRequest(ch <-chan *ca.InterContainerMessage, topic Topic, targetInterface interface{}) {
+func (kp *InterContainerProxy) waitForRequest(ch <-chan *ca.InterContainerMessage, topic Topic, targetInterface interface{}) {
 	//	Wait for messages
 	for msg := range ch {
-		log.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
+		//log.Debugw("request-received", log.Fields{"msg": msg, "topic": topic.Name, "target": targetInterface})
 		go kp.handleRequest(msg, targetInterface)
 	}
 }
 
-// dispatchToConsumers sends the intercontainermessage received on a given topic to all subscribers for that
-// topic via the unique channel each subsciber received during subscription
-func (kp *KafkaMessagingProxy) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ca.InterContainerMessage) {
-	// Need to go over all channels and publish messages to them - do we need to copy msg?
-	kp.lockTopicToConsumerChannelMap.Lock()
-	defer kp.lockTopicToConsumerChannelMap.Unlock()
-	for _, ch := range consumerCh.channels {
-		go func(c chan *ca.InterContainerMessage) {
-			c <- protoMessage
-		}(ch)
-	}
-}
-
-func (kp *KafkaMessagingProxy) consumeMessagesLoop(topic Topic) {
-	log.Debugw("starting-consuming-messages", log.Fields{"topic": topic.Name})
-	var consumerCh *consumerChannels
-	if consumerCh = kp.getConsumerChannel(topic); consumerCh == nil {
-		log.Errorw("consumer-not-exist", log.Fields{"topic": topic.Name})
-		return
-	}
-startloop:
-	for {
-		select {
-		case err := <-consumerCh.consumer.Errors():
-			log.Warnw("consumer-error", log.Fields{"error": err})
-		case msg := <-consumerCh.consumer.Messages():
-			//log.Debugw("message-received", log.Fields{"msg": msg})
-			// Since the only expected message is a proto intercontainermessage then extract it right away
-			// instead of dispatching it to the consumers
-			msgBody := msg.Value
-			icm := &ca.InterContainerMessage{}
-			if err := proto.Unmarshal(msgBody, icm); err != nil {
-				log.Warnw("invalid-message", log.Fields{"error": err})
-				continue
-			}
-			if icm.Header.Type == ca.MessageType_REQUEST {
-				log.Debugw("request-received", log.Fields{"msg": *icm, "len": len(consumerCh.channels)})
-				go kp.dispatchToConsumers(consumerCh, icm)
-			} else if icm.Header.Type == ca.MessageType_RESPONSE {
-				log.Debugw("response-received", log.Fields{"msg": *icm, "len": len(consumerCh.channels)})
-				go kp.dispatchResponse(icm)
-			} else {
-				log.Debugw("unsupported-msg-received", log.Fields{"msg": *icm})
-			}
-			//// TODO:  Dispatch requests and responses separately
-			//go kp.dispatchToConsumers(consumerCh, icm)
-		case <-kp.doneCh:
-			log.Infow("received-exit-signal", log.Fields{"topic": topic.Name})
-			break startloop
-		}
-	}
-}
-
-func (kp *KafkaMessagingProxy) dispatchResponse(msg *ca.InterContainerMessage) {
+func (kp *InterContainerProxy) dispatchResponse(msg *ca.InterContainerMessage) {
 	kp.lockTransactionIdToChannelMap.Lock()
 	defer kp.lockTransactionIdToChannelMap.Unlock()
 	if _, exist := kp.transactionIdToChannelMap[msg.Header.Id]; !exist {
 		log.Debugw("no-waiting-channel", log.Fields{"transaction": msg.Header.Id})
 		return
 	}
-	kp.transactionIdToChannelMap[msg.Header.Id] <- msg
+	kp.transactionIdToChannelMap[msg.Header.Id].ch <- msg
 }
 
-func (kp *KafkaMessagingProxy) waitForResponse(ch chan *ca.InterContainerMessage, topic Topic) {
-	log.Debugw("starting-consuming-responses-loop", log.Fields{"topic": topic.Name})
-	kp.waitForResponseRoutineStarted = true
+// waitForResponseLoop listens for messages on subscribedCh, ensures a message is a response carrying a
+// known transaction ID, and then dispatches it to the waiting consumer
+func (kp *InterContainerProxy) waitForResponseLoop(subscribedCh <-chan *ca.InterContainerMessage, topic *Topic) {
+	log.Debugw("starting-response-loop-for-topic", log.Fields{"topic": topic.Name})
 startloop:
 	for {
 		select {
-		case msg := <-ch:
-			log.Debugw("message-received", log.Fields{"topic": topic.Name, "msg": msg})
-			go kp.dispatchResponse(msg)
-			//	Need to handle program exit - TODO
+		case msg := <-subscribedCh:
+			//log.Debugw("message-received", log.Fields{"msg": msg, "fromTopic": msg.Header.FromTopic})
+			if msg.Header.Type == ca.MessageType_RESPONSE {
+				go kp.dispatchResponse(msg)
+			}
 		case <-kp.doneCh:
 			log.Infow("received-exit-signal", log.Fields{"topic": topic.Name})
 			break startloop
 		}
 	}
-}
-
-// createConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
-// for that topic.  It also starts the routine that listens for messages on that topic.
-func (kp *KafkaMessagingProxy) setupConsumerChannel(topic Topic) (chan *ca.InterContainerMessage, error) {
-
-	if consumerCh := kp.getConsumerChannel(topic); consumerCh != nil {
-		return nil, nil // Already created, so just ignore
-	}
-
-	partitionList, err := kp.consumer.Partitions(topic.Name)
-	if err != nil {
-		log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
-		return nil, err
-	}
-
-	log.Debugw("partitions", log.Fields{"topic": topic.Name, "partitionList": partitionList, "first": partitionList[0]})
-	// Create a partition consumer for that topic - for now just use one partition
-	var pConsumer sarama.PartitionConsumer
-	if pConsumer, err = kp.consumer.ConsumePartition(topic.Name, partitionList[0], sarama.OffsetNewest); err != nil {
-		log.Warnw("consumer-partition-failure", log.Fields{"error": err, "topic": topic.Name})
-		return nil, err
-	}
-
-	// Create the consumer/channel structure and set the consumer and create a channel on that topic - for now
-	// unbuffered to verify race conditions.
-	consumerListeningChannel := make(chan *ca.InterContainerMessage)
-	cc := &consumerChannels{
-		consumer: pConsumer,
-		channels: []chan *ca.InterContainerMessage{consumerListeningChannel},
-	}
-
-	// Add the consumer channel to the map
-	kp.addTopicToConsumerChannelMap(topic.Name, cc)
-
-	//Start a consumer to listen on that specific topic
-	go kp.consumeMessagesLoop(topic)
-
-	return consumerListeningChannel, nil
+	//log.Infow("received-exit-signal-out-of-for-loop", log.Fields{"topic": topic.Name})
+	//	We got an exit signal.  Unsubscribe to the channel
+	//kp.kafkaClient.UnSubscribe(topic, subscribedCh)
 }
 
 // subscribeForResponse allows a caller to subscribe to a given topic when waiting for a response.
 // This method is built to prevent all subscribers to receive all messages as is the case of the Subscribe
 // API. There is one response channel waiting for kafka messages before dispatching the message to the
 // corresponding waiting channel
-func (kp *KafkaMessagingProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ca.InterContainerMessage, error) {
+func (kp *InterContainerProxy) subscribeForResponse(topic Topic, trnsId string) (chan *ca.InterContainerMessage, error) {
 	log.Debugw("subscribeForResponse", log.Fields{"topic": topic.Name, "trnsid": trnsId})
 
-	if consumerCh := kp.getConsumerChannel(topic); consumerCh == nil {
-		log.Debugw("topic-not-subscribed", log.Fields{"topic": topic.Name})
+	// First check whether we already have a channel listening for a response on that topic.  If there is
+	// already one then it will be reused.  If not, it will be created.
+	if !kp.isTopicSubscribedForResponse(topic.Name) {
+		var subscribedCh <-chan *ca.InterContainerMessage
 		var err error
-
-		if _, err = kp.setupConsumerChannel(topic); err != nil {
-			log.Warnw("create-consumer-channel-failure", log.Fields{"error": err, "topic": topic.Name})
+		if subscribedCh, err = kp.kafkaClient.Subscribe(&topic, 1); err != nil {
+			log.Debugw("subscribe-failure", log.Fields{"topic": topic.Name})
 			return nil, err
 		}
+		kp.setupTopicResponseChannelMap(topic.Name, subscribedCh)
+		go kp.waitForResponseLoop(subscribedCh, &topic)
 	}
 
+	// Create a specific channel for this consumer.  We cannot use the channel from the kafka client as it will
+	// broadcast any message for this topic to all channels waiting on it.
 	ch := make(chan *ca.InterContainerMessage)
-	kp.addToTransactionIdToChannelMap(trnsId, ch)
+	kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
 
 	return ch, nil
 }
 
-func removeChannel(channels []chan *ca.InterContainerMessage, ch <-chan *ca.InterContainerMessage) []chan *ca.InterContainerMessage {
-	var i int
-	var channel chan *ca.InterContainerMessage
-	for i, channel = range channels {
-		if channel == ch {
-			channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
-			close(channel)
-			return channels[:len(channels)-1]
-		}
-	}
-	return channels
-}
-
-func (kp *KafkaMessagingProxy) unSubscribeForResponse(trnsId string) error {
+func (kp *InterContainerProxy) unSubscribeForResponse(trnsId string) error {
 	log.Debugw("unsubscribe-for-response", log.Fields{"trnsId": trnsId})
-	// Close the channel first
-	close(kp.transactionIdToChannelMap[trnsId])
-	kp.deleteFromTransactionIdToChannelMap(trnsId)
+	if _, exist := kp.transactionIdToChannelMap[trnsId]; exist {
+		// The delete operation will close the channel
+		kp.deleteFromTransactionIdToChannelMap(trnsId)
+	}
 	return nil
 }
 
@@ -809,34 +728,14 @@
 	return request, nil
 }
 
-// sendRequest formats and sends the request onto the kafka messaging bus.  It waits for the
-// response if needed.  This function must, therefore, be run in its own routine.
-func (kp *KafkaMessagingProxy) sendToKafkaTopic(msg *ca.InterContainerMessage, topic *Topic) {
-
-	//	Create the Sarama producer message
-	time := time.Now().Unix()
-	marshalled, _ := proto.Marshal(msg)
-	kafkaMsg := &sarama.ProducerMessage{
-		Topic: topic.Name,
-		Key:   sarama.StringEncoder(time),
-		Value: sarama.ByteEncoder(marshalled),
-	}
-
-	// Send message to kafka
-	kp.producer.Input() <- kafkaMsg
-
-}
-
 func decodeResponse(response *ca.InterContainerMessage) (*ca.InterContainerResponseBody, error) {
 	//	Extract the message body
 	responseBody := ca.InterContainerResponseBody{}
-
-	log.Debugw("decodeResponse", log.Fields{"icr": &response})
 	if err := ptypes.UnmarshalAny(response.Body, &responseBody); err != nil {
 		log.Warnw("cannot-unmarshal-response", log.Fields{"error": err})
 		return nil, err
 	}
-	log.Debugw("decodeResponse", log.Fields{"icrbody": &responseBody})
+	//log.Debugw("response-decoded-successfully", log.Fields{"response-status": &responseBody.Success})
 
 	return &responseBody, nil
 
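Putting the pieces together, wiring now looks roughly like the sketch
below.  The broker address, topic names and the "GetDevice" rpc are
placeholders, and the Value of a KVArg must be a proto message since
encodeRequest marshals it with ptypes:

    // Sketch under assumed defaults; imports (context, time,
    // github.com/golang/protobuf/proto and .../ptypes/any) elided.
    func buildProxy() (*InterContainerProxy, error) {
        client := NewSaramaClient(Host("127.0.0.1"), Port(9092))
        proxy, err := NewInterContainerProxy(
            InterContainerHost("127.0.0.1"),
            InterContainerPort(9092),
            DefaultTopic(&Topic{Name: "rwcore"}), // placeholder topic
            MsgClient(client),
        )
        if err != nil {
            return nil, err
        }
        // Start brings up the kafka client and the internal maps
        if err := proxy.Start(); err != nil {
            return nil, err
        }
        return proxy, nil
    }

    // getDevice invokes an RPC on a (placeholder) device-scoped topic;
    // passing nil as replyToTopic falls back to the proxy's DefaultTopic.
    func getDevice(proxy *InterContainerProxy, deviceID proto.Message) (bool, *any.Any) {
        ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
        defer cancel()
        return proxy.InvokeRPC(ctx, "GetDevice", &Topic{Name: "core_0001"}, nil, true,
            &KVArg{Key: "device_id", Value: deviceID})
    }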
diff --git a/kafka/kafka_inter_container_library_test.go b/kafka/kafka_inter_container_library_test.go
index 6a3bb37..790425e 100644
--- a/kafka/kafka_inter_container_library_test.go
+++ b/kafka/kafka_inter_container_library_test.go
@@ -21,38 +21,35 @@
 )
 
 func TestDefaultKafkaProxy(t *testing.T) {
-	actualResult, error := NewKafkaMessagingProxy()
+	actualResult, error := NewInterContainerProxy()
 	assert.Equal(t, error, nil)
-	assert.Equal(t, actualResult.KafkaHost, DefaultKafkaHost)
-	assert.Equal(t, actualResult.KafkaPort, DefaultKafkaPort)
-	assert.Equal(t, actualResult.TargetInterface, interface{}(nil))
-	assert.Equal(t, actualResult.DefaultTopic.Name, "Core")
+	assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
+	assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
+	assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
 }
 
 func TestKafkaProxyOptionHost(t *testing.T) {
-	actualResult, error := NewKafkaMessagingProxy(KafkaHost("10.20.30.40"))
+	actualResult, error := NewInterContainerProxy(InterContainerHost("10.20.30.40"))
 	assert.Equal(t, error, nil)
-	assert.Equal(t, actualResult.KafkaHost, "10.20.30.40")
-	assert.Equal(t, actualResult.KafkaPort, DefaultKafkaPort)
-	assert.Equal(t, actualResult.TargetInterface, interface{}(nil))
-	assert.Equal(t, actualResult.DefaultTopic.Name, "Core")
+	assert.Equal(t, actualResult.kafkaHost, "10.20.30.40")
+	assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
+	assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
 }
 
 func TestKafkaProxyOptionPort(t *testing.T) {
-	actualResult, error := NewKafkaMessagingProxy(KafkaPort(1020))
+	actualResult, error := NewInterContainerProxy(InterContainerPort(1020))
 	assert.Equal(t, error, nil)
-	assert.Equal(t, actualResult.KafkaHost, DefaultKafkaHost)
-	assert.Equal(t, actualResult.KafkaPort, 1020)
-	assert.Equal(t, actualResult.TargetInterface, interface{}(nil))
-	assert.Equal(t, actualResult.DefaultTopic.Name, "Core")
+	assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
+	assert.Equal(t, actualResult.kafkaPort, 1020)
+	assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
 }
 
 func TestKafkaProxyOptionTopic(t *testing.T) {
-	actualResult, error := NewKafkaMessagingProxy(DefaultTopic(&Topic{Name: "Adapter"}))
+	actualResult, error := NewInterContainerProxy(DefaultTopic(&Topic{Name: "Adapter"}))
 	assert.Equal(t, error, nil)
-	assert.Equal(t, actualResult.KafkaHost, DefaultKafkaHost)
-	assert.Equal(t, actualResult.KafkaPort, DefaultKafkaPort)
-	assert.Equal(t, actualResult.TargetInterface, interface{}(nil))
+	assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
+	assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
+	assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
 	assert.Equal(t, actualResult.DefaultTopic.Name, "Adapter")
 }
 
@@ -64,24 +61,23 @@
 
 func TestKafkaProxyOptionTargetInterface(t *testing.T) {
 	var m *myInterface
-	actualResult, error := NewKafkaMessagingProxy(TargetInterface(m))
+	actualResult, error := NewInterContainerProxy(RequestHandlerInterface(m))
 	assert.Equal(t, error, nil)
-	assert.Equal(t, actualResult.KafkaHost, DefaultKafkaHost)
-	assert.Equal(t, actualResult.KafkaPort, DefaultKafkaPort)
-	assert.Equal(t, actualResult.TargetInterface, m)
-	assert.Equal(t, actualResult.DefaultTopic.Name, "Core")
+	assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
+	assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
+	assert.Equal(t, actualResult.defaultRequestHandlerInterface, m)
 }
 
 func TestKafkaProxyChangeAllOptions(t *testing.T) {
 	var m *myInterface
-	actualResult, error := NewKafkaMessagingProxy(
-		KafkaHost("10.20.30.40"),
-		KafkaPort(1020),
+	actualResult, error := NewInterContainerProxy(
+		InterContainerHost("10.20.30.40"),
+		InterContainerPort(1020),
 		DefaultTopic(&Topic{Name: "Adapter"}),
-		TargetInterface(m))
+		RequestHandlerInterface(m))
 	assert.Equal(t, error, nil)
-	assert.Equal(t, actualResult.KafkaHost, "10.20.30.40")
-	assert.Equal(t, actualResult.KafkaPort, 1020)
-	assert.Equal(t, actualResult.TargetInterface, m)
+	assert.Equal(t, actualResult.kafkaHost, "10.20.30.40")
+	assert.Equal(t, actualResult.kafkaPort, 1020)
+	assert.Equal(t, actualResult.defaultRequestHandlerInterface, m)
 	assert.Equal(t, actualResult.DefaultTopic.Name, "Adapter")
 }
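
The new MsgClient option is not covered by the tests above; a test in
the same style might look like this (a sketch, not part of this change,
assuming *SaramaClient satisfies the Client interface):

    func TestKafkaProxyOptionMsgClient(t *testing.T) {
        client := NewSaramaClient()
        actualResult, error := NewInterContainerProxy(MsgClient(client))
        assert.Equal(t, error, nil)
        // The option should store the supplied client on the proxy
        assert.Equal(t, actualResult.kafkaClient, client)
    }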
diff --git a/kafka/messaging_interface.go b/kafka/messaging_interface.go
deleted file mode 100644
index 78d9e75..0000000
--- a/kafka/messaging_interface.go
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Copyright 2018-present Open Networking Foundation
-
- * Licensed under the Apache License, Version 2.0 (the "License");
- * you may not use this file except in compliance with the License.
- * You may obtain a copy of the License at
-
- * http://www.apache.org/licenses/LICENSE-2.0
-
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package kafka
-
-// A Topic definition - may be augmented with additional attributes eventually
-type Topic struct {
-	// The name of the topic. It must start with a letter,
-	// and contain only letters (`[A-Za-z]`), numbers (`[0-9]`), dashes (`-`),
-	// underscores (`_`), periods (`.`), tildes (`~`), plus (`+`) or percent
-	// signs (`%`).
-	Name string
-}
-
-type KVArg struct {
-	Key   string
-	Value interface{}
-}
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
new file mode 100644
index 0000000..eed0588
--- /dev/null
+++ b/kafka/sarama_client.go
@@ -0,0 +1,537 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+	"errors"
+	"fmt"
+	"github.com/Shopify/sarama"
+	scc "github.com/bsm/sarama-cluster"
+	"github.com/golang/protobuf/proto"
+	"github.com/opencord/voltha-go/common/log"
+	ca "github.com/opencord/voltha-go/protos/core_adapter"
+	"sync"
+	"time"
+)
+
+// consumerChannels represents a consumer listening on a kafka topic.  Once it receives a message on that
+// topic it broadcasts the message to all the listening channels
+type consumerChannels struct {
+	consumer sarama.PartitionConsumer
+	//consumer *sc.Consumer
+	channels []chan *ca.InterContainerMessage
+}
+
+// SaramaClient is a Kafka client implementation based on the Sarama library
+type SaramaClient struct {
+	broker                        *sarama.Broker
+	client                        sarama.Client
+	KafkaHost                     string
+	KafkaPort                     int
+	producer                      sarama.AsyncProducer
+	consumer                      sarama.Consumer
+	groupConsumer                 *scc.Consumer
+	doneCh                        chan int
+	topicToConsumerChannelMap     map[string]*consumerChannels
+	lockTopicToConsumerChannelMap sync.RWMutex
+}
+
+type SaramaClientOption func(*SaramaClient)
+
+func Host(host string) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.KafkaHost = host
+	}
+}
+
+func Port(port int) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.KafkaPort = port
+	}
+}
+
+func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
+	client := &SaramaClient{
+		KafkaHost: DefaultKafkaHost,
+		KafkaPort: DefaultKafkaPort,
+	}
+
+	for _, option := range opts {
+		option(client)
+	}
+
+	client.lockTopicToConsumerChannelMap = sync.RWMutex{}
+
+	return client
+}
+
+func (sc *SaramaClient) Start(retries int) error {
+	log.Info("starting-sarama-client")
+
+	// Create the Done channel
+	sc.doneCh = make(chan int, 1)
+
+	// Create the Publisher
+	if err := sc.createPublisher(retries); err != nil {
+		log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
+		return err
+	}
+
+	// Create the master consumer
+	if err := sc.createConsumer(retries); err != nil {
+		log.Errorw("Cannot-create-kafka-consumer", log.Fields{"error": err})
+		return err
+	}
+
+	// Create the topic to consumer/channel map
+	sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
+
+	return nil
+}
+
+func (sc *SaramaClient) Stop() {
+	log.Info("stopping-sarama-client")
+
+	//Send a message over the done channel to close all long running routines
+	sc.doneCh <- 1
+
+	// Clear the consumer map
+	//sc.clearConsumerChannelMap()
+
+	if sc.producer != nil {
+		if err := sc.producer.Close(); err != nil {
+			panic(err)
+		}
+	}
+	if sc.consumer != nil {
+		if err := sc.consumer.Close(); err != nil {
+			panic(err)
+		}
+	}
+}
+
+func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
+	sc.lockTopicToConsumerChannelMap.Lock()
+	defer sc.lockTopicToConsumerChannelMap.Unlock()
+	if _, exist := sc.topicToConsumerChannelMap[id]; !exist {
+		sc.topicToConsumerChannelMap[id] = arg
+	}
+}
+
+func (sc *SaramaClient) deleteFromTopicToConsumerChannelMap(id string) {
+	sc.lockTopicToConsumerChannelMap.Lock()
+	defer sc.lockTopicToConsumerChannelMap.Unlock()
+	if _, exist := sc.topicToConsumerChannelMap[id]; exist {
+		delete(sc.topicToConsumerChannelMap, id)
+	}
+}
+
+func (sc *SaramaClient) getConsumerChannel(topic Topic) *consumerChannels {
+	sc.lockTopicToConsumerChannelMap.Lock()
+	defer sc.lockTopicToConsumerChannelMap.Unlock()
+
+	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
+		return consumerCh
+	}
+	return nil
+}
+
+func (sc *SaramaClient) addChannelToConsumerChannelMap(topic Topic, ch chan *ca.InterContainerMessage) {
+	sc.lockTopicToConsumerChannelMap.Lock()
+	defer sc.lockTopicToConsumerChannelMap.Unlock()
+	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
+		consumerCh.channels = append(consumerCh.channels, ch)
+		return
+	}
+	log.Warnw("consumer-channel-not-exist", log.Fields{"topic": topic.Name})
+}
+
+func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ca.InterContainerMessage) error {
+	sc.lockTopicToConsumerChannelMap.Lock()
+	defer sc.lockTopicToConsumerChannelMap.Unlock()
+	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
+		// Channel will be closed in the removeChannel method
+		consumerCh.channels = removeChannel(consumerCh.channels, ch)
+		// If there are no more channels then we can close the consumer itself
+		if len(consumerCh.channels) == 0 {
+			log.Debugw("closing-consumer", log.Fields{"topic": topic})
+			err := consumerCh.consumer.Close()
+			delete(sc.topicToConsumerChannelMap, topic.Name)
+			return err
+		}
+		return nil
+	}
+	log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
+	return errors.New("topic-does-not-exist")
+}
+
+func (sc *SaramaClient) clearTopicFromConsumerChannelMap(topic Topic) error {
+	sc.lockTopicToConsumerChannelMap.Lock()
+	defer sc.lockTopicToConsumerChannelMap.Unlock()
+	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
+		for _, ch := range consumerCh.channels {
+			// Channel will be closed in the removeChannel method
+			removeChannel(consumerCh.channels, ch)
+		}
+		err := consumerCh.consumer.Close()
+		delete(sc.topicToConsumerChannelMap, topic.Name)
+		return err
+	}
+	log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
+	return errors.New("topic-does-not-exist")
+}
+
+func (sc *SaramaClient) clearConsumerChannelMap() error {
+	sc.lockTopicToConsumerChannelMap.Lock()
+	defer sc.lockTopicToConsumerChannelMap.Unlock()
+	var err error
+	for topic, consumerCh := range sc.topicToConsumerChannelMap {
+		for _, ch := range consumerCh.channels {
+			// Channel will be closed in the removeChannel method
+			removeChannel(consumerCh.channels, ch)
+		}
+		err = consumerCh.consumer.Close()
+		delete(sc.topicToConsumerChannelMap, topic)
+	}
+	return err
+}
+
+func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int, retries int) error {
+	// This creates the kafka topic
+	// Set broker configuration
+	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
+	broker := sarama.NewBroker(kafkaFullAddr)
+
+	// Additional configurations. Check sarama doc for more info
+	config := sarama.NewConfig()
+	config.Version = sarama.V1_0_0_0
+
+	// Open broker connection with configs defined above
+	broker.Open(config)
+
+	// check if the connection was OK
+	_, err := broker.Connected()
+	if err != nil {
+		return err
+	}
+
+	topicDetail := &sarama.TopicDetail{}
+	topicDetail.NumPartitions = int32(numPartition)
+	topicDetail.ReplicationFactor = int16(repFactor)
+	topicDetail.ConfigEntries = make(map[string]*string)
+
+	topicDetails := make(map[string]*sarama.TopicDetail)
+	topicDetails[topic.Name] = topicDetail
+
+	request := sarama.CreateTopicsRequest{
+		Timeout:      time.Second * 1,
+		TopicDetails: topicDetails,
+	}
+
+	for {
+		// Send request to Broker
+		if response, err := broker.CreateTopics(&request); err != nil {
+			if retries == 0 {
+				log.Errorw("error-creating-topic", log.Fields{"error": err})
+				return err
+			} else {
+				// If retries is -ve then we will retry indefinitely
+				retries--
+			}
+			log.Debug("retrying-after-a-second-delay")
+			time.Sleep(time.Duration(DefaultSleepOnError) * time.Second)
+		} else {
+			log.Debugw("topic-response", log.Fields{"response": response})
+			break
+		}
+	}
+
+	log.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
+	return nil
+}
+
+func (sc *SaramaClient) createPublisher(retries int) error {
+	// This creates the publisher
+	config := sarama.NewConfig()
+	config.Producer.Partitioner = sarama.NewRandomPartitioner
+	config.Producer.Flush.Frequency = time.Duration(DefaultFlushFrequency)
+	config.Producer.Flush.Messages = DefaultFlushMessages
+	config.Producer.Flush.MaxMessages = DefaultFlushMaxmessages
+	config.Producer.Return.Errors = DefaultReturnErrors
+	config.Producer.Return.Successes = DefaultReturnSuccess
+	config.Producer.RequiredAcks = sarama.WaitForAll
+	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
+	brokers := []string{kafkaFullAddr}
+
+	for {
+		producer, err := sarama.NewAsyncProducer(brokers, config)
+		if err != nil {
+			if retries == 0 {
+				log.Errorw("error-starting-publisher", log.Fields{"error": err})
+				return err
+			} else {
+				// If retries is negative then we will retry indefinitely
+				retries--
+			}
+			log.Info("retrying-after-a-second-delay")
+			time.Sleep(time.Duration(DefaultSleepOnError) * time.Second)
+		} else {
+			sc.producer = producer
+			break
+		}
+	}
+	log.Info("Kafka-publisher-created")
+	return nil
+}
+
+func (sc *SaramaClient) createConsumer(retries int) error {
+	config := sarama.NewConfig()
+	config.Consumer.Return.Errors = true
+	config.Consumer.Fetch.Min = 1
+	config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
+	config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
+	config.Consumer.Offsets.Initial = sarama.OffsetNewest
+	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
+	brokers := []string{kafkaFullAddr}
+
+	for {
+		consumer, err := sarama.NewConsumer(brokers, config)
+		if err != nil {
+			if retries == 0 {
+				log.Errorw("error-starting-consumer", log.Fields{"error": err})
+				return err
+			} else {
+				// If retries is negative then we will retry indefinitely
+				retries--
+			}
+			log.Info("retrying-after-a-second-delay")
+			time.Sleep(time.Duration(DefaultSleepOnError) * time.Second)
+		} else {
+			sc.consumer = consumer
+			break
+		}
+	}
+	log.Info("Kafka-consumer-created")
+	return nil
+}
+
+// createGroupConsumer creates a consumer that joins the given consumer group on the specified topic
+func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId *string, retries int) (*scc.Consumer, error) {
+	config := scc.NewConfig()
+	config.Group.Mode = scc.ConsumerModeMultiplex
+	config.Consumer.Return.Errors = true
+	config.Group.Return.Notifications = true
+	config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
+	config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
+	config.Consumer.Offsets.Initial = sarama.OffsetNewest
+	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
+	brokers := []string{kafkaFullAddr}
+
+	if groupId == nil {
+		g := DefaultGroupName
+		groupId = &g
+	}
+	topics := []string{topic.Name}
+	var consumer *scc.Consumer
+	var err error
+
+	// Create the topic with default attributes
+	// TODO: needs to be revisited
+	//sc.CreateTopic(&Topic{Name:topic.Name}, 3, 1, 1)
+
+	if consumer, err = scc.NewConsumer(brokers, *groupId, topics, config); err != nil {
+		log.Errorw("create-consumer-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
+		return nil, err
+	}
+	log.Debugw("create-consumer-success", log.Fields{"topic": topic.Name, "groupId": groupId})
+	//time.Sleep(10*time.Second)
+	sc.groupConsumer = consumer
+	return consumer, nil
+}
+
+// Send formats and sends the request onto the kafka messaging bus.
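+// Note: the message is handed to an async producer, so Send does not report
+// delivery failures to the caller; they surface on the producer's Errors()
+// channel (Return.Errors is enabled in createPublisher).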
+func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) {
+
+	// Assert message is a proto message
+	var protoMsg proto.Message
+	var ok bool
+	// ascertain the value interface type is a proto.Message
+	if protoMsg, ok = msg.(proto.Message); !ok {
+		log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
+		return
+	}
+
+	// Create the Sarama producer message
+	marshalled, err := proto.Marshal(protoMsg)
+	if err != nil {
+		log.Warnw("message-marshalling-failed", log.Fields{"msg": msg, "error": err})
+		return
+	}
+	key := ""
+	if len(keys) > 0 {
+		key = keys[0] // Only the first key is relevant
+	}
+	kafkaMsg := &sarama.ProducerMessage{
+		Topic: topic.Name,
+		Key:   sarama.StringEncoder(key),
+		Value: sarama.ByteEncoder(marshalled),
+	}
+
+	// Send message to kafka
+	sc.producer.Input() <- kafkaMsg
+}
+
+// Subscribe registers a caller to a topic.   It returns a channel that the caller can use to receive
+// messages from that topic
+func (sc *SaramaClient) Subscribe(topic *Topic, retries int) (<-chan *ca.InterContainerMessage, error) {
+	log.Debugw("subscribe", log.Fields{"topic": topic.Name})
+
+	// If a consumer already exists for that topic then reuse it
+	if consumerCh := sc.getConsumerChannel(*topic); consumerCh != nil {
+		log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
+		// Create a channel specific for that consumer and add it to the consumer channel map
+		ch := make(chan *ca.InterContainerMessage)
+		sc.addChannelToConsumerChannelMap(*topic, ch)
+		return ch, nil
+	}
+
+	// Register for the topic and set it up
+	var consumerListeningChannel chan *ca.InterContainerMessage
+	var err error
+	if consumerListeningChannel, err = sc.setupConsumerChannel(topic); err != nil {
+		log.Warnw("create-consumer-channel-failure", log.Fields{"error": err, "topic": topic.Name})
+		return nil, err
+	}
+
+	return consumerListeningChannel, nil
+}
+
+// dispatchToConsumers sends the InterContainerMessage received on a given topic to all subscribers for that
+// topic via the unique channel each subscriber received during subscription
+func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ca.InterContainerMessage) {
+	// Publish the message to every channel registered for this topic
+	// TODO: determine whether the message needs to be copied per subscriber
+	sc.lockTopicToConsumerChannelMap.Lock()
+	defer sc.lockTopicToConsumerChannelMap.Unlock()
+	for _, ch := range consumerCh.channels {
+		go func(c chan *ca.InterContainerMessage) {
+			c <- protoMessage
+		}(ch)
+	}
+}
+
+func (sc *SaramaClient) consumeMessagesLoop(topic Topic) {
+	log.Debugw("starting-consuming-messages", log.Fields{"topic": topic.Name})
+	var consumerCh *consumerChannels
+	if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
+		log.Errorw("consumer-not-exist", log.Fields{"topic": topic.Name})
+		return
+	}
+startloop:
+	for {
+		select {
+		case err := <-consumerCh.consumer.Errors():
+			log.Warnw("consumer-error", log.Fields{"error": err})
+		case msg := <-consumerCh.consumer.Messages():
+			//log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
+			// The only expected message is a proto InterContainerMessage, so unmarshal it
+			// here before dispatching it to the consumers
+			msgBody := msg.Value
+			icm := &ca.InterContainerMessage{}
+			if err := proto.Unmarshal(msgBody, icm); err != nil {
+				log.Warnw("invalid-message", log.Fields{"error": err})
+				continue
+			}
+			go sc.dispatchToConsumers(consumerCh, icm)
+
+			//consumerCh.consumer.MarkOffset(msg, "")
+			// TODO:  Dispatch requests and responses separately
+		case <-sc.doneCh:
+			log.Infow("received-exit-signal", log.Fields{"topic": topic.Name})
+			break startloop
+		}
+	}
+	log.Infow("received-exit-signal-out-of-for-loop", log.Fields{"topic": topic.Name})
+}
+
+// setupConsumerChannel creates a consumerChannels object for the given topic, adds it to the consumer
+// channel map and starts the routine that listens for messages on that topic.
+func (sc *SaramaClient) setupConsumerChannel(topic *Topic) (chan *ca.InterContainerMessage, error) {
+	// TODO:  Replace this development partition consumer with a group consumer
+	var pConsumer sarama.PartitionConsumer
+	var err error
+	if pConsumer, err = sc.CreatePartitionConsumer(topic, DefaultMaxRetries); err != nil {
+		log.Errorw("creating-partition-consumer-failure", log.Fields{"error": err, "topic": topic.Name})
+		return nil, err
+	}
+
+	// Create the consumer/channel structure and set the consumer and create a channel on that topic - for now
+	// unbuffered to verify race conditions.
+	consumerListeningChannel := make(chan *ca.InterContainerMessage)
+	cc := &consumerChannels{
+		consumer: pConsumer,
+		channels: []chan *ca.InterContainerMessage{consumerListeningChannel},
+	}
+
+	// Add the consumer channel to the map
+	sc.addTopicToConsumerChannelMap(topic.Name, cc)
+
+	//Start a consumer to listen on that specific topic
+	go sc.consumeMessagesLoop(*topic)
+
+	return consumerListeningChannel, nil
+}
+
+func (sc *SaramaClient) CreatePartitionConsumer(topic *Topic, retries int) (sarama.PartitionConsumer, error) {
+	log.Debugw("creating-partition-consumer", log.Fields{"topic": topic.Name})
+	partitionList, err := sc.consumer.Partitions(topic.Name)
+	if err != nil {
+		log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
+		return nil, err
+	}
+
+	log.Debugw("partitions", log.Fields{"topic": topic.Name, "partitionList": partitionList, "first": partitionList[0]})
+	// Create a partition consumer for that topic - for now just use one partition
+	var pConsumer sarama.PartitionConsumer
+	if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partitionList[0], sarama.OffsetNewest); err != nil {
+		log.Warnw("consumer-partition-failure", log.Fields{"error": err, "topic": topic.Name})
+		return nil, err
+	}
+	log.Debugw("partition-consumer-created", log.Fields{"topic": topic.Name})
+	return &pConsumer, nil
+}
+
+func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ca.InterContainerMessage) error {
+	log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
+	err := sc.removeChannelFromConsumerChannelMap(*topic, ch)
+	return err
+}
+
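+// removeChannel closes the given channel and removes it from the channel list
+// using a swap-with-last delete; the order of the remaining channels is not
+// preserved.  It returns the updated slice.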
+func removeChannel(channels []chan *ca.InterContainerMessage, ch <-chan *ca.InterContainerMessage) []chan *ca.InterContainerMessage {
+	var i int
+	var channel chan *ca.InterContainerMessage
+	for i, channel = range channels {
+		if channel == ch {
+			channels[len(channels)-1], channels[i] = channels[i], channels[len(channels)-1]
+			close(channel)
+			return channels[:len(channels)-1]
+		}
+	}
+	return channels
+}
+
+func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
+	if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
+		log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
+		return err
+	}
+	return nil
+}
diff --git a/kafka/utils.go b/kafka/utils.go
new file mode 100644
index 0000000..beac9f9
--- /dev/null
+++ b/kafka/utils.go
@@ -0,0 +1,71 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+	"fmt"
+	"strings"
+)
+
+const (
+	TopicSeparator = "_"
+	DeviceIdLength = 24
+)
+
+// A Topic definition - may be augmented with additional attributes eventually
+type Topic struct {
+	// The name of the topic. Kafka topic names may contain only letters
+	// (`[A-Za-z]`), numbers (`[0-9]`), dashes (`-`), underscores (`_`)
+	// and periods (`.`).
+	Name string
+}
+
+type KVArg struct {
+	Key   string
+	Value interface{}
+}
+
+// CreateSubTopic concatenates a list of arguments together using underscores.
+func CreateSubTopic(args ...string) Topic {
+	topic := ""
+	for index, arg := range args {
+		if index == 0 {
+			topic = arg
+		} else {
+			topic = fmt.Sprintf("%s%s%s", topic, TopicSeparator, arg)
+		}
+	}
+	return Topic{Name: topic}
+}
+
+// GetDeviceIdFromTopic extracts the deviceId from the topic name.  The topic name is formatted either as
+// <any string> or <any string>_<deviceId>.  The device Id is 24 characters long.
+func GetDeviceIdFromTopic(topic Topic) string {
+	pos := strings.LastIndex(topic.Name, TopicSeparator)
+	if pos == -1 {
+		return ""
+	}
+	adjustedPos := pos + len(TopicSeparator)
+	if adjustedPos >= len(topic.Name) {
+		return ""
+	}
+	deviceId := topic.Name[adjustedPos:]
+	if len(deviceId) != DeviceIdLength {
+		return ""
+	}
+	return deviceId
+}
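+
+// Illustrative usage of the helpers above (the device id below is a
+// hypothetical 24-character id):
+//
+//	t := CreateSubTopic("rwcore", "0123456789abcdef01234567")
+//	// t.Name == "rwcore_0123456789abcdef01234567"
+//	GetDeviceIdFromTopic(t)                     // "0123456789abcdef01234567"
+//	GetDeviceIdFromTopic(Topic{Name: "rwcore"}) // "" (no device id suffix)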
diff --git a/python/adapters/iadapter.py b/python/adapters/iadapter.py
index 04cb303..78cf42a 100644
--- a/python/adapters/iadapter.py
+++ b/python/adapters/iadapter.py
@@ -30,6 +30,7 @@
 from python.protos.device_pb2 import DeviceType, DeviceTypes
 from python.protos.health_pb2 import HealthStatus
 
+
 log = structlog.get_logger()
 
 
diff --git a/python/adapters/kafka/adapter_request_facade.py b/python/adapters/kafka/adapter_request_facade.py
index cbae56d..978f57d 100644
--- a/python/adapters/kafka/adapter_request_facade.py
+++ b/python/adapters/kafka/adapter_request_facade.py
@@ -21,12 +21,15 @@
 
 from twisted.internet.defer import inlineCallbacks
 from zope.interface import implementer
+from twisted.internet import reactor
 
 from python.adapters.interface import IAdapterInterface
 from python.protos.core_adapter_pb2 import IntType, InterAdapterMessage, StrType, Error, ErrorCode
 from python.protos.device_pb2 import Device
 from python.protos.openflow_13_pb2 import FlowChanges, FlowGroups, Flows, \
     FlowGroupChanges, ofp_packet_out
+from python.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
+    get_messaging_proxy
 
 
 class MacAddressError(BaseException):
@@ -62,11 +65,26 @@
     def stop(self):
         self.log.debug('stopping')
 
+
+    def createKafkaDeviceTopic(self, deviceId):
+        kafka_proxy = get_messaging_proxy()
+        device_topic = kafka_proxy.get_default_topic() + "_" + deviceId
+        kafka_proxy.subscribe(topic=device_topic, target_cls=self)
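+        # e.g. with a default topic of "openolt" (hypothetical) and a device
+        # id of "0123456789abcdef01234567", the device topic becomes
+        # "openolt_0123456789abcdef01234567"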
+
     def adopt_device(self, device):
         d = Device()
         if device:
             device.Unpack(d)
-            return True, self.adapter.adopt_device(d)
+
+            result = self.adapter.adopt_device(d)
+
+            # Before we return, create a device specific topic to handle all
+            # subsequent requests from the Core. This adapter instance will
+            # handle all requests for that device
+            reactor.callLater(0, self.createKafkaDeviceTopic, d.id)
+
+            return True, result
         else:
             return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                                 reason="device-invalid")
@@ -151,7 +169,16 @@
         d = Device()
         if device:
             device.Unpack(d)
-            return (True, self.adapter.delete_device(d))
+            result = self.adapter.delete_device(d)
+
+            # Before we return, delete the device specific topic as we will no
+            # longer receive requests from the Core for that device
+            kafka_proxy = get_messaging_proxy()
+            device_topic = kafka_proxy.get_default_topic() + "_" + d.id
+            kafka_proxy.unsubscribe(topic=device_topic)
+
+            return (True, result)
         else:
             return False, Error(code=ErrorCode.INVALID_PARAMETERS,
                                 reason="device-invalid")
diff --git a/python/adapters/kafka/container_proxy.py b/python/adapters/kafka/container_proxy.py
index 8c4e828..efcf558 100644
--- a/python/adapters/kafka/container_proxy.py
+++ b/python/adapters/kafka/container_proxy.py
@@ -83,16 +83,21 @@
         return real_wrapper
 
     @inlineCallbacks
-    def invoke(self, rpc, to_topic=None, **kwargs):
+    def invoke(self, rpc, to_topic=None, reply_topic=None, **kwargs):
         @inlineCallbacks
-        def _send_request(rpc, m_callback, to_topic, **kwargs):
+        def _send_request(rpc, m_callback, to_topic, reply_topic, **kwargs):
             try:
-                log.debug("sending-request", rpc=rpc)
+                log.debug("sending-request",
+                          rpc=rpc,
+                          to_topic=to_topic,
+                          reply_topic=reply_topic)
                 if to_topic is None:
                     to_topic = self.core_topic
+                if reply_topic is None:
+                    reply_topic = self.listening_topic
                 result = yield self.kafka_proxy.send_request(rpc=rpc,
                                                              to_topic=to_topic,
-                                                             reply_topic=self.listening_topic,
+                                                             reply_topic=reply_topic,
                                                              callback=None,
                                                              **kwargs)
                 if not m_callback.called:
@@ -104,11 +109,25 @@
                 if not m_callback.called:
                     m_callback.errback(failure.Failure())
 
-        cb = DeferredWithTimeout(timeout=self.default_timeout)
-        _send_request(rpc, cb, to_topic, **kwargs)
-        try:
-            res = yield cb
-            returnValue(res)
-        except TimeOutError as e:
-            log.warn('invoke-timeout', e=e)
-            raise e
+        # If the request times out, resend it on the same to_topic with a
+        # longer timeout.  If that second request also times out then fall
+        # back to the default core_topic for a final attempt.
+        timeouts = [self.default_timeout,
+                    self.default_timeout * 2,
+                    self.default_timeout]
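+        # e.g. with default_timeout = 10s: device topic @ 10s, device topic
+        # again @ 20s, then the core topic @ 10s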
+        retry = 0
+        max_retry = 2
+        for timeout in timeouts:
+            cb = DeferredWithTimeout(timeout=timeout)
+            _send_request(rpc, cb, to_topic, reply_topic, **kwargs)
+            try:
+                res = yield cb
+                returnValue(res)
+            except TimeOutError as e:
+                log.warn('invoke-timeout', e=e)
+                if retry == max_retry:
+                    raise e
+                retry += 1
+                if retry == max_retry:
+                    to_topic = self.core_topic
diff --git a/python/adapters/kafka/core_proxy.py b/python/adapters/kafka/core_proxy.py
index 4bab30d..47f6a61 100644
--- a/python/adapters/kafka/core_proxy.py
+++ b/python/adapters/kafka/core_proxy.py
@@ -30,6 +30,9 @@
 log = structlog.get_logger()
 
 
+def createSubTopic(*args):
+    return '_'.join(args)
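+# e.g. createSubTopic("rwcore", "0123456789abcdef01234567")
+#      returns "rwcore_0123456789abcdef01234567"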
+
 class CoreProxy(ContainerProxy):
 
     def __init__(self, kafka_proxy, core_topic, my_listening_topic):
@@ -56,7 +59,15 @@
         log.debug("get-device")
         id = ID()
         id.id = device_id
-        res = yield self.invoke(rpc="GetDevice", device_id=id)
+        # Once a device is being managed, all communications between the
+        # adapter and the core occur over a topic associated with that
+        # device
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
+        res = yield self.invoke(rpc="GetDevice",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
+                                device_id=id)
         returnValue(res)
 
     @ContainerProxy.wrap_request(Device)
@@ -71,7 +82,11 @@
         id.id = device_id
         p_type = IntType()
         p_type.val = port_type
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="GetPorts",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 device_id=id,
                                 port_type=p_type)
         returnValue(res)
@@ -117,9 +132,12 @@
         cdt.val = child_device_type
         channel = IntType()
         channel.val = channel_id
-
+        to_topic = createSubTopic(self.core_topic, parent_device_id)
+        reply_topic = createSubTopic(self.listening_topic, parent_device_id)
         args = self._to_proto(**kw)
         res = yield self.invoke(rpc="ChildDeviceDetected",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 parent_device_id=id,
                                 parent_port_no=ppn,
                                 child_device_type=cdt,
@@ -131,7 +149,12 @@
     @inlineCallbacks
     def device_update(self, device):
         log.debug("device_update")
-        res = yield self.invoke(rpc="DeviceUpdate", device=device)
+        to_topic = createSubTopic(self.core_topic, device.id)
+        reply_topic = createSubTopic(self.listening_topic, device.id)
+        res = yield self.invoke(rpc="DeviceUpdate",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
+                                device=device)
         returnValue(res)
 
     def child_device_removed(parent_device_id, child_device_id):
@@ -155,7 +178,11 @@
         else:
             c_status.val = -1
 
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="DeviceStateUpdate",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 device_id=id,
                                 oper_status=o_status,
                                 connect_status=c_status)
@@ -179,7 +206,11 @@
         else:
             c_status.val = -1
 
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="ChildrenStateUpdate",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 device_id=id,
                                 oper_status=o_status,
                                 connect_status=c_status)
@@ -201,7 +232,11 @@
         o_status = IntType()
         o_status.val = oper_status
 
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="PortStateUpdate",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 device_id=id,
                                 port_type=pt,
                                 port_no=pNo,
@@ -227,7 +262,11 @@
         else:
             c_status.val = -1
 
+        to_topic = createSubTopic(self.core_topic, parent_device_id)
+        reply_topic = createSubTopic(self.listening_topic, parent_device_id)
         res = yield self.invoke(rpc="child_devices_state_update",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 parent_device_id=id,
                                 oper_status=o_status,
                                 connect_status=c_status)
@@ -242,7 +281,11 @@
         log.debug("device_pm_config_update")
         b = BoolType()
         b.val = init
+        to_topic = createSubTopic(self.core_topic, device_pm_config.id)
+        reply_topic = createSubTopic(self.listening_topic, device_pm_config.id)
         res = yield self.invoke(rpc="DevicePMConfigUpdate",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 device_pm_config=device_pm_config,
                                 init=b)
         returnValue(res)
@@ -253,7 +296,11 @@
         log.debug("port_created")
         proto_id = ID()
         proto_id.id = device_id
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="PortCreated",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 device_id=proto_id,
                                 port=port)
         returnValue(res)
@@ -284,7 +331,11 @@
         p.val = port
         pac = Packet()
         pac.payload = packet
+        to_topic = createSubTopic(self.core_topic, device_id)
+        reply_topic = createSubTopic(self.listening_topic, device_id)
         res = yield self.invoke(rpc="PacketIn",
+                                to_topic=to_topic,
+                                reply_topic=reply_topic,
                                 device_id=proto_id,
                                 port=p,
                                 packet=pac)
diff --git a/python/adapters/kafka/kafka_inter_container_library.py b/python/adapters/kafka/kafka_inter_container_library.py
index 1d2b05c..aaa0c3c 100644
--- a/python/adapters/kafka/kafka_inter_container_library.py
+++ b/python/adapters/kafka/kafka_inter_container_library.py
@@ -152,6 +152,12 @@
             log.info('reconnected-to-consul', after_retries=self.retries)
             self.retries = 0
 
+    def get_target_cls(self):
+        return self.target_cls
+
+    def get_default_topic(self):
+        return self.default_topic
+
     @inlineCallbacks
     def _subscribe(self, topic, callback=None, target_cls=None):
         try:
@@ -332,12 +338,13 @@
             request_body = InterContainerRequestBody()
             request.header.id = transaction_id
             request.header.type = MessageType.Value("REQUEST")
-            request.header.from_topic = self.default_topic
+            request.header.from_topic = reply_topic
             request.header.to_topic = to_topic
 
             response_required = False
             if reply_topic:
                 request_body.reply_to_topic = reply_topic
+                request_body.response_required = True
                 response_required = True
 
             request.header.timestamp = int(round(time.time() * 1000))
@@ -350,8 +357,6 @@
                     request_body.args.extend([arg])
                 except Exception as e:
                     log.exception("Failed-parsing-value", e=e)
-            request_body.reply_to_topic = self.default_topic
-            request_body.response_required = response_required
             request.body.Pack(request_body)
             return request, transaction_id, response_required
         except Exception as e:
@@ -479,7 +484,7 @@
                                 response.header.to_topic)
                             self._send_kafka_message(res_topic, response)
 
-                        log.debug("Response-sent", response=response.body)
+                        log.debug("Response-sent", response=response.body, to_topic=res_topic)
             elif message.header.type == MessageType.Value("RESPONSE"):
                 trns_id = self._to_string(message.header.id)
                 if trns_id in self.transaction_id_deferred_map:
diff --git a/rw_core/core/adapter_proxy.go b/rw_core/core/adapter_proxy.go
index ab35037..4af4fb0 100644
--- a/rw_core/core/adapter_proxy.go
+++ b/rw_core/core/adapter_proxy.go
@@ -29,13 +29,13 @@
 )
 
 type AdapterProxy struct {
-	TestMode   bool
-	kafkaProxy *kafka.KafkaMessagingProxy
+	TestMode     bool
+	kafkaICProxy *kafka.InterContainerProxy
 }
 
-func NewAdapterProxy(kafkaProxy *kafka.KafkaMessagingProxy) *AdapterProxy {
+func NewAdapterProxy(kafkaProxy *kafka.InterContainerProxy) *AdapterProxy {
 	var proxy AdapterProxy
-	proxy.kafkaProxy = kafkaProxy
+	proxy.kafkaICProxy = kafkaProxy
 	return &proxy
 }
 
@@ -54,6 +54,18 @@
 	}
 }
 
 func (ap *AdapterProxy) AdoptDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("AdoptDevice", log.Fields{"device": device})
 	rpc := "adopt_device"
@@ -63,21 +75,35 @@
 		Key:   "device",
 		Value: device,
 	}
-	success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
-	log.Debugw("AdoptDevice-response", log.Fields{"deviceid": device.Id, "success": success})
+	// Use a device topic for the response as we are the only core handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &topic, &replyToTopic, true, args...)
+	log.Debugw("AdoptDevice-response", log.Fields{"replyTopic": replyToTopic, "deviceid": device.Id, "success": success})
+	if success {
+		// From now on, any unsolicited requests from the adapters for this device will come over the device topic.
+		// We therefore register the default request handler against the replyToTopic to process those messages.
+		if err := ap.kafkaICProxy.SubscribeWithDefaultRequestHandler(replyToTopic); err != nil {
+			log.Errorw("Unable-to-subscribe-new-topic", log.Fields{"topic": replyToTopic, "error": err})
+			return err
+		}
+	}
 	return unPackResponse(rpc, device.Id, success, result)
 }
 
 func (ap *AdapterProxy) DisableDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("DisableDevice", log.Fields{"deviceId": device.Id})
 	rpc := "disable_device"
-	topic := kafka.Topic{Name: device.Type}
+	// Use a device specific topic to send the request.  The adapter handling the device creates a device
+	// specific topic
+	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
-	success, result := ap.kafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	// Use a device specific topic as we are the only core handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("DisableDevice-response", log.Fields{"deviceId": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }
@@ -85,13 +111,15 @@
 func (ap *AdapterProxy) ReEnableDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("ReEnableDevice", log.Fields{"deviceId": device.Id})
 	rpc := "reenable_device"
-	topic := kafka.Topic{Name: device.Type}
+	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
-	success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
+	// Use a device specific topic as we are the only core handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("ReEnableDevice-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }
@@ -99,13 +127,15 @@
 func (ap *AdapterProxy) RebootDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("RebootDevice", log.Fields{"deviceId": device.Id})
 	rpc := "reboot_device"
-	topic := kafka.Topic{Name: device.Type}
+	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
-	success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
+	// Use a device specific topic as we are the only core handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("RebootDevice-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }
@@ -113,26 +143,40 @@
 func (ap *AdapterProxy) DeleteDevice(ctx context.Context, device *voltha.Device) error {
 	log.Debugw("DeleteDevice", log.Fields{"deviceId": device.Id})
 	rpc := "delete_device"
-	topic := kafka.Topic{Name: device.Type}
+	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
-	success, result := ap.kafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
+	// Use a device specific topic as we are the only core handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("DeleteDevice-response", log.Fields{"deviceid": device.Id, "success": success})
+
+	// We no longer need a request handler on that topic as we won't receive any further unsolicited messages
+	// on it
+	if err := ap.kafkaICProxy.UnSubscribeFromRequestHandler(replyToTopic); err != nil {
+		log.Errorw("Unable-to-unsubscribe-from-request-handler", log.Fields{"topic": replyToTopic, "error": err})
+		return err
+	}
+	// Now delete the topic altogether
+	ap.kafkaICProxy.DeleteTopic(replyToTopic)
+
 	return unPackResponse(rpc, device.Id, success, result)
 }
 
 func (ap *AdapterProxy) GetOfpDeviceInfo(ctx context.Context, device *voltha.Device) (*ca.SwitchCapability, error) {
 	log.Debugw("GetOfpDeviceInfo", log.Fields{"deviceId": device.Id})
-	topic := kafka.Topic{Name: device.Type}
+	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
 	args := make([]*kafka.KVArg, 1)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
 		Value: device,
 	}
-	success, result := ap.kafkaProxy.InvokeRPC(ctx, "get_ofp_device_info", &topic, true, args...)
+	// Use a device specific topic as we are the only core handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_device_info", &toTopic, &replyToTopic, true, args...)
 	log.Debugw("GetOfpDeviceInfo-response", log.Fields{"deviceId": device.Id, "success": success, "result": result})
 	if success {
 		unpackResult := &ca.SwitchCapability{}
@@ -155,7 +199,7 @@
 
 func (ap *AdapterProxy) GetOfpPortInfo(ctx context.Context, device *voltha.Device, portNo uint32) (*ca.PortCapability, error) {
 	log.Debugw("GetOfpPortInfo", log.Fields{"deviceId": device.Id})
-	topic := kafka.Topic{Name: device.Type}
+	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
 	args := make([]*kafka.KVArg, 2)
 	args[0] = &kafka.KVArg{
 		Key:   "device",
@@ -166,8 +210,9 @@
 		Key:   "port_no",
 		Value: pNo,
 	}
-
-	success, result := ap.kafkaProxy.InvokeRPC(ctx, "get_ofp_port_info", &topic, true, args...)
+	// Use a device specific topic as we are the only core handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	success, result := ap.kafkaICProxy.InvokeRPC(ctx, "get_ofp_port_info", &toTopic, &replyToTopic, true, args...)
 	log.Debugw("GetOfpPortInfo-response", log.Fields{"deviceid": device.Id, "success": success})
 	if success {
 		unpackResult := &ca.PortCapability{}
@@ -252,15 +297,15 @@
 
 func (ap *AdapterProxy) packetOut(deviceType string, deviceId string, outPort uint32, packet *openflow_13.OfpPacketOut) error {
 	log.Debugw("packetOut", log.Fields{"deviceId": deviceId})
-	topic := kafka.Topic{Name: deviceType}
+	toTopic := kafka.CreateSubTopic(deviceType, deviceId)
 	rpc := "receive_packet_out"
-	dId := &ca.StrType{Val:deviceId}
+	dId := &ca.StrType{Val: deviceId}
 	args := make([]*kafka.KVArg, 3)
 	args[0] = &kafka.KVArg{
 		Key:   "deviceId",
 		Value: dId,
 	}
-	op := &ca.IntType{Val:int64(outPort)}
+	op := &ca.IntType{Val: int64(outPort)}
 	args[1] = &kafka.KVArg{
 		Key:   "outPort",
 		Value: op,
@@ -271,15 +316,16 @@
 	}
 
 	// TODO:  Do we need to wait for an ACK on a packet Out?
-	success, result := ap.kafkaProxy.InvokeRPC(nil, rpc, &topic, false, args...)
+	// Use a device specific topic as we are the only core handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, deviceId)
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, false, args...)
 	log.Debugw("packetOut", log.Fields{"deviceid": deviceId, "success": success})
 	return unPackResponse(rpc, deviceId, success, result)
 }
 
-
 func (ap *AdapterProxy) UpdateFlowsBulk(device *voltha.Device, flows *voltha.Flows, groups *voltha.FlowGroups) error {
 	log.Debugw("UpdateFlowsBulk", log.Fields{"deviceId": device.Id})
-	topic := kafka.Topic{Name: device.Type}
+	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
 	rpc := "update_flows_bulk"
 	args := make([]*kafka.KVArg, 3)
 	args[0] = &kafka.KVArg{
@@ -295,14 +341,16 @@
 		Value: groups,
 	}
 
-	success, result := ap.kafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	// Use a device specific topic as we are the only core handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("UpdateFlowsBulk-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }
 
 func (ap *AdapterProxy) UpdateFlowsIncremental(device *voltha.Device, flowChanges *openflow_13.FlowChanges, groupChanges *openflow_13.FlowGroupChanges) error {
 	log.Debugw("UpdateFlowsIncremental", log.Fields{"deviceId": device.Id})
-	topic := kafka.Topic{Name: device.Type}
+	toTopic := kafka.CreateSubTopic(device.Type, device.Id)
 	rpc := "update_flows_bulk"
 	args := make([]*kafka.KVArg, 3)
 	args[0] = &kafka.KVArg{
@@ -318,7 +366,9 @@
 		Value: groupChanges,
 	}
 
-	success, result := ap.kafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	// Use a device specific topic as we are the only core handling requests for this device
+	replyToTopic := kafka.CreateSubTopic(ap.kafkaICProxy.DefaultTopic.Name, device.Id)
+	success, result := ap.kafkaICProxy.InvokeRPC(nil, rpc, &toTopic, &replyToTopic, true, args...)
 	log.Debugw("UpdateFlowsIncremental-response", log.Fields{"deviceid": device.Id, "success": success})
 	return unPackResponse(rpc, device.Id, success, result)
 }
diff --git a/rw_core/core/adapter_request_handler.go b/rw_core/core/adapter_request_handler.go
index 85e43be..40563d4 100644
--- a/rw_core/core/adapter_request_handler.go
+++ b/rw_core/core/adapter_request_handler.go
@@ -30,7 +30,7 @@
 
 type AdapterRequestHandlerProxy struct {
 	TestMode         bool
-	coreInstanceId	string
+	coreInstanceId   string
 	deviceMgr        *DeviceManager
 	lDeviceMgr       *LogicalDeviceManager
 	localDataProxy   *model.Proxy
@@ -458,7 +458,6 @@
 	return new(empty.Empty), nil
 }
 
-
 func (rhp *AdapterRequestHandlerProxy) PacketIn(args []*ca.Argument) (*empty.Empty, error) {
 	if len(args) < 3 {
 		log.Warn("invalid-number-of-args", log.Fields{"args": args})
@@ -488,7 +487,7 @@
 
 		}
 	}
-	log.Debugw("PacketIn", log.Fields{"deviceId": deviceId.Id, "port": portNo.Val,  "packet": packet})
+	log.Debugw("PacketIn", log.Fields{"deviceId": deviceId.Id, "port": portNo.Val, "packet": packet})
 	if rhp.TestMode { // Execute only for test cases
 		return nil, nil
 	}
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 6015e7f..7423563 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -34,32 +34,34 @@
 	grpcServer        *grpcserver.GrpcServer
 	grpcNBIAPIHanfler *APIHandler
 	config            *config.RWCoreFlags
-	kmp               *kafka.KafkaMessagingProxy
+	kmp               *kafka.InterContainerProxy
 	clusterDataRoot   model.Root
 	localDataRoot     model.Root
 	clusterDataProxy  *model.Proxy
 	localDataProxy    *model.Proxy
 	exitChannel       chan int
 	kvClient          kvstore.Client
+	kafkaClient       kafka.Client
 }
 
 func init() {
 	log.AddPackage(log.JSON, log.WarnLevel, nil)
 }
 
-func NewCore(id string, cf *config.RWCoreFlags, kvClient kvstore.Client) *Core {
+func NewCore(id string, cf *config.RWCoreFlags, kvClient kvstore.Client, kafkaClient kafka.Client) *Core {
 	var core Core
 	core.instanceId = id
 	core.exitChannel = make(chan int, 1)
 	core.config = cf
 	core.kvClient = kvClient
+	core.kafkaClient = kafkaClient
 
 	// Setup the KV store
 	// Do not call NewBackend constructor; it creates its own KV client
 	// Commented the backend for now until the issue between the model and the KV store
 	// is resolved.
 	//backend := model.Backend{
 	//	Client:     kvClient,
 	//	StoreType:  cf.KVStoreType,
 	//	Host:       cf.KVStoreHost,
 	//	Port:       cf.KVStorePort,
@@ -67,8 +69,8 @@
 	//	PathPrefix: "service/voltha"}
 	core.clusterDataRoot = model.NewRoot(&voltha.Voltha{}, nil)
 	core.localDataRoot = model.NewRoot(&voltha.CoreInstance{}, nil)
-	core.clusterDataProxy = core.clusterDataRoot.GetProxy("/", false)
-	core.localDataProxy = core.localDataRoot.GetProxy("/", false)
+	core.clusterDataProxy = core.clusterDataRoot.CreateProxy("/", false)
+	core.localDataProxy = core.localDataRoot.CreateProxy("/", false)
 	return &core
 }
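+
+// Illustrative wiring only (the SaramaClient constructor named below is an
+// assumption, not part of this change; any kafka.Client implementation works):
+//
+//	kafkaClient := kafka.NewSaramaClient(...) // hypothetical constructor
+//	core := NewCore(instanceId, cf, kvClient, kafkaClient)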
 
@@ -78,7 +80,7 @@
 	log.Info("values", log.Fields{"kmp": core.kmp})
 	core.deviceMgr = newDeviceManager(core.kmp, core.clusterDataProxy)
 	core.logicalDeviceMgr = newLogicalDeviceManager(core.deviceMgr, core.kmp, core.clusterDataProxy)
-	core.registerAdapterRequestHandler(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.localDataProxy, core.clusterDataProxy)
+	core.registerAdapterRequestHandler(ctx, core.instanceId, core.deviceMgr, core.logicalDeviceMgr, core.clusterDataProxy, core.localDataProxy)
 	go core.startDeviceManager(ctx)
 	go core.startLogicalDeviceManager(ctx)
 	go core.startGRPCService(ctx)
@@ -89,6 +91,11 @@
 func (core *Core) Stop(ctx context.Context) {
 	log.Info("stopping-core")
 	core.exitChannel <- 1
+	// Stop all the started services
+	core.grpcServer.Stop()
+	core.logicalDeviceMgr.stop(ctx)
+	core.deviceMgr.stop(ctx)
+	core.kmp.Stop()
 	log.Info("core-stopped")
 }
 
@@ -120,9 +127,10 @@
 	log.Infow("starting-kafka-messaging-proxy", log.Fields{"host": core.config.KafkaAdapterHost,
 		"port": core.config.KafkaAdapterPort, "topic": core.config.CoreTopic})
 	var err error
-	if core.kmp, err = kafka.NewKafkaMessagingProxy(
-		kafka.KafkaHost(core.config.KafkaAdapterHost),
-		kafka.KafkaPort(core.config.KafkaAdapterPort),
+	if core.kmp, err = kafka.NewInterContainerProxy(
+		kafka.InterContainerHost(core.config.KafkaAdapterHost),
+		kafka.InterContainerPort(core.config.KafkaAdapterPort),
+		kafka.MsgClient(core.kafkaClient),
 		kafka.DefaultTopic(&kafka.Topic{Name: core.config.CoreTopic})); err != nil {
 		log.Errorw("fail-to-create-kafka-proxy", log.Fields{"error": err})
 		return err
@@ -140,7 +148,7 @@
 func (core *Core) registerAdapterRequestHandler(ctx context.Context, coreInstanceId string, dMgr *DeviceManager, ldMgr *LogicalDeviceManager,
 	cdProxy *model.Proxy, ldProxy *model.Proxy) error {
 	requestProxy := NewAdapterRequestHandlerProxy(coreInstanceId, dMgr, ldMgr, cdProxy, ldProxy)
-	core.kmp.SubscribeWithTarget(kafka.Topic{Name: core.config.CoreTopic}, requestProxy)
+	core.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: core.config.CoreTopic}, requestProxy)
 
 	log.Info("request-handlers")
 	return nil
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 92f00bf..7e7f42a 100644
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -33,7 +33,7 @@
 
 type DeviceAgent struct {
 	deviceId         string
-	deviceType 		string
+	deviceType       string
 	lastData         *voltha.Device
 	adapterProxy     *AdapterProxy
 	deviceMgr        *DeviceManager
@@ -79,18 +79,18 @@
 	if added := agent.clusterDataProxy.Add("/devices", agent.lastData, ""); added == nil {
 		log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceId})
 	}
-	agent.deviceProxy = agent.clusterDataProxy.Root.GetProxy("/devices/"+agent.deviceId, false)
-	agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate, nil)
+	agent.deviceProxy = agent.clusterDataProxy.Root.CreateProxy("/devices/"+agent.deviceId, false)
+	agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate)
 
-	agent.flowProxy = agent.clusterDataProxy.Root.GetProxy(
+	agent.flowProxy = agent.clusterDataProxy.Root.CreateProxy(
 		fmt.Sprintf("/devices/%s/flows", agent.deviceId),
 		false)
-	agent.groupProxy = agent.clusterDataProxy.Root.GetProxy(
+	agent.groupProxy = agent.clusterDataProxy.Root.CreateProxy(
 		fmt.Sprintf("/devices/%s/flow_groups", agent.deviceId),
 		false)
 
 	agent.flowProxy.RegisterCallback(model.POST_UPDATE, agent.flowTableUpdated)
-	//agent.groupProxy.RegisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
+	agent.groupProxy.RegisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
 
 	log.Debug("device-agent-started")
 }
@@ -249,12 +249,6 @@
 			return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
 		}
 		agent.lockDevice.Unlock()
-		//TODO: callback will be invoked to handle this state change
-		//For now force the state transition to happen
-		if err := agent.deviceMgr.processTransition(device, cloned); err != nil {
-			log.Warnw("process-transition-error", log.Fields{"deviceid": device.Id, "error": err})
-			return err
-		}
 	}
 	return nil
 }
@@ -289,7 +283,8 @@
 		agent.lockDevice.Unlock()
 		return status.Errorf(codes.NotFound, "%s", agent.deviceId)
 	} else {
-		if device.AdminState != voltha.AdminState_DISABLED {
+		if (device.AdminState != voltha.AdminState_DISABLED) &&
+			(device.AdminState != voltha.AdminState_PREPROVISIONED) {
 			log.Debugw("device-not-disabled", log.Fields{"id": agent.deviceId})
 			//TODO:  Needs customized error message
 			agent.lockDevice.Unlock()
@@ -311,13 +306,6 @@
 			return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
 		}
 		agent.lockDevice.Unlock()
-		//TODO: callback will be invoked to handle this state change
-		//For now force the state transition to happen
-		if err := agent.deviceMgr.processTransition(device, cloned); err != nil {
-			log.Warnw("process-transition-error", log.Fields{"deviceid": device.Id, "error": err})
-			return err
-		}
-
 	}
 	return nil
 }
@@ -379,38 +367,54 @@
 	return nil
 }
 
-
-// TODO: implement when callback from the data model is ready
-// processUpdate is a callback invoked whenever there is a change on the device manages by this device agent
+// processUpdate is a callback invoked whenever there is a change on the device managed by this device agent
 func (agent *DeviceAgent) processUpdate(args ...interface{}) interface{} {
-	log.Debug("!!!!!!!!!!!!!!!!!!!!!!!!!")
-	log.Debugw("processUpdate", log.Fields{"deviceId": agent.deviceId, "args": args})
+	// Run this callback in its own goroutine
+	go func(args ...interface{}) {
+		var previous *voltha.Device
+		var current *voltha.Device
+		var ok bool
+		if len(args) == 2 {
+			if previous, ok = args[0].(*voltha.Device); !ok {
+				log.Errorw("invalid-callback-type", log.Fields{"data": args[0]})
+				return
+			}
+			if current, ok = args[1].(*voltha.Device); !ok {
+				log.Errorw("invalid-callback-type", log.Fields{"data": args[1]})
+				return
+			}
+		} else {
+			log.Errorw("invalid-number-of-callback-args", log.Fields{"len": len(args)})
+			return
+		}
+		// Perform the state transition; as this runs in a goroutine the error can only be logged
+		if err := agent.deviceMgr.processTransition(previous, current); err != nil {
+			log.Warnw("process-transition-error", log.Fields{"deviceId": agent.deviceId, "error": err})
+		}
+	}(args...)
+
 	return nil
 }
 
 func (agent *DeviceAgent) updateDevice(device *voltha.Device) error {
 	agent.lockDevice.Lock()
-	//defer agent.lockDevice.Unlock()
+	defer agent.lockDevice.Unlock()
 	log.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
-	// Get the dev info from the model
-	if storedData, err := agent.getDeviceWithoutLock(); err != nil {
-		agent.lockDevice.Unlock()
-		return status.Errorf(codes.NotFound, "%s", device.Id)
-	} else {
-		// store the changed data
-		cloned := proto.Clone(device).(*voltha.Device)
-		afterUpdate := agent.clusterDataProxy.Update("/devices/"+device.Id, cloned, false, "")
-		agent.lockDevice.Unlock()
-		if afterUpdate == nil {
-			return status.Errorf(codes.Internal, "%s", device.Id)
-		}
-		// Perform the state transition
-		if err := agent.deviceMgr.processTransition(storedData, cloned); err != nil {
-			log.Warnw("process-transition-error", log.Fields{"deviceid": device.Id, "error": err})
-			return err
-		}
-		return nil
+	cloned := proto.Clone(device).(*voltha.Device)
+	afterUpdate := agent.clusterDataProxy.Update("/devices/"+device.Id, cloned, false, "")
+	if afterUpdate == nil {
+		return status.Errorf(codes.Internal, "%s", device.Id)
 	}
+	return nil
+}
+
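+// updateDeviceWithoutLock is the same as updateDevice except that it assumes
+// the caller already holds the device agent lock.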
+func (agent *DeviceAgent) updateDeviceWithoutLock(device *voltha.Device) error {
+	log.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
+	cloned := proto.Clone(device).(*voltha.Device)
+	afterUpdate := agent.clusterDataProxy.Update("/devices/"+device.Id, cloned, false, "")
+	if afterUpdate == nil {
+		return status.Errorf(codes.Internal, "%s", device.Id)
+	}
+	return nil
 }
 
 func (agent *DeviceAgent) updateDeviceStatus(operStatus voltha.OperStatus_OperStatus, connStatus voltha.ConnectStatus_ConnectStatus) error {
@@ -439,11 +443,6 @@
 			return status.Errorf(codes.Internal, "%s", agent.deviceId)
 		}
 		agent.lockDevice.Unlock()
-		// Perform the state transition
-		if err := agent.deviceMgr.processTransition(storeDevice, cloned); err != nil {
-			log.Warnw("process-transition-error", log.Fields{"deviceid": agent.deviceId, "error": err})
-			return err
-		}
 		return nil
 	}
 }
@@ -482,11 +481,6 @@
 			return status.Errorf(codes.Internal, "%s", agent.deviceId)
 		}
 		agent.lockDevice.Unlock()
-		// Perform the state transition
-		if err := agent.deviceMgr.processTransition(storeDevice, cloned); err != nil {
-			log.Warnw("process-transition-error", log.Fields{"deviceid": agent.deviceId, "error": err})
-			return err
-		}
 		return nil
 	}
 }
@@ -561,20 +555,11 @@
 				break
 			}
 		}
-		//To track an issue when adding peer-port.
-		log.Debugw("before-peer-added", log.Fields{"device": cloned})
 		// Store the device
 		afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
 		if afterUpdate == nil {
 			return status.Errorf(codes.Internal, "%s", agent.deviceId)
 		}
-		//To track an issue when adding peer-port.
-		if d, ok := afterUpdate.(*voltha.Device); ok {
-			log.Debugw("after-peer-added", log.Fields{"device": d})
-		} else {
-			log.Debug("after-peer-added-incorrect-type", log.Fields{"type": reflect.ValueOf(afterUpdate).Type()})
-		}
-
 		return nil
 	}
 }
@@ -615,7 +600,7 @@
 	groups := device.FlowGroups
 
 	// Send update to adapters
-	// Check whether the device supports incremental flow changes
+	// TODO: Check whether the device supports incremental flow changes
 	// Assume false for test
 	acceptsAddRemoveFlowUpdates := false
 	if !acceptsAddRemoveFlowUpdates {
@@ -691,7 +676,7 @@
 	flows := device.Flows
 
 	// Send update to adapters
-	// Check whether the device supports incremental flow changes
+	// TODO: Check whether the device supports incremental flow changes
 	// Assume false for test
 	acceptsAddRemoveFlowUpdates := false
 	if !acceptsAddRemoveFlowUpdates {
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index c4ac343..6f4a874 100644
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -36,19 +36,19 @@
 	deviceAgents        map[string]*DeviceAgent
 	adapterProxy        *AdapterProxy
 	logicalDeviceMgr    *LogicalDeviceManager
-	kafkaProxy          *kafka.KafkaMessagingProxy
+	kafkaICProxy        *kafka.InterContainerProxy
 	stateTransitions    *TransitionMap
 	clusterDataProxy    *model.Proxy
 	exitChannel         chan int
 	lockDeviceAgentsMap sync.RWMutex
 }
 
-func newDeviceManager(kafkaProxy *kafka.KafkaMessagingProxy, cdProxy *model.Proxy) *DeviceManager {
+func newDeviceManager(kafkaICProxy *kafka.InterContainerProxy, cdProxy *model.Proxy) *DeviceManager {
 	var deviceMgr DeviceManager
 	deviceMgr.exitChannel = make(chan int, 1)
 	deviceMgr.deviceAgents = make(map[string]*DeviceAgent)
-	deviceMgr.adapterProxy = NewAdapterProxy(kafkaProxy)
-	deviceMgr.kafkaProxy = kafkaProxy
+	deviceMgr.adapterProxy = NewAdapterProxy(kafkaICProxy)
+	deviceMgr.kafkaICProxy = kafkaICProxy
 	deviceMgr.clusterDataProxy = cdProxy
 	deviceMgr.lockDeviceAgentsMap = sync.RWMutex{}
 	return &deviceMgr
@@ -327,7 +327,7 @@
 	//Get parent device type
 	parent, err := dMgr.GetDevice(parentDeviceId)
 	if err != nil {
-		log.Error("no-parent-found", log.Fields{"parentId":parentDeviceId})
+		log.Error("no-parent-found", log.Fields{"parentId": parentDeviceId})
 		return status.Errorf(codes.NotFound, "%s", parentDeviceId)
 	}
 
@@ -350,7 +350,7 @@
 	// This will be triggered on every update to the device.
 	handlers := dMgr.stateTransitions.GetTransitionHandler(previous, current)
 	if handlers == nil {
-		log.Debugw("handlers-not-found", log.Fields{"deviceId": current.Id})
+		log.Debugw("no-op-transition", log.Fields{"deviceId": current.Id})
 		return nil
 	}
 	for _, handler := range handlers {
@@ -379,7 +379,7 @@
 		log.Errorw("device-not-found", log.Fields{"deviceId": deviceId})
 		return err
 	}
-	if !device.Root{
+	if !device.Root {
 		log.Errorw("device-not-root", log.Fields{"deviceId": deviceId})
 		return status.Errorf(codes.FailedPrecondition, "%s", deviceId)
 	}
diff --git a/rw_core/core/device_state_transitions.go b/rw_core/core/device_state_transitions.go
index b3fecc0..e46a418 100644
--- a/rw_core/core/device_state_transitions.go
+++ b/rw_core/core/device_state_transitions.go
@@ -50,24 +50,24 @@
 func NewTransitionMap(dMgr *DeviceManager) *TransitionMap {
 	var transitionMap TransitionMap
 	transitionMap.transitions = make([]Transition, 0)
-	transitionMap.transitions = append(transitionMap.transitions,
-		Transition{
-			deviceType:    any,
-			previousState: DeviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
-			currentState:  DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
-			handlers:      []TransitionHandler{dMgr.activateDevice}})
+	//transitionMap.transitions = append(transitionMap.transitions,
+	//	Transition{
+	//		deviceType:    any,
+	//		previousState: DeviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+	//		currentState:  DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+	//		handlers:      []TransitionHandler{dMgr.activateDevice}})
 	transitionMap.transitions = append(transitionMap.transitions,
 		Transition{
 			deviceType:    any,
 			previousState: DeviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
 			currentState:  DeviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
 			handlers:      []TransitionHandler{dMgr.notAllowed}})
-	transitionMap.transitions = append(transitionMap.transitions,
-		Transition{
-			deviceType:    any,
-			previousState: DeviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
-			currentState:  DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
-			handlers:      []TransitionHandler{dMgr.activateDevice}})
+	//transitionMap.transitions = append(transitionMap.transitions,
+	//	Transition{
+	//		deviceType:    any,
+	//		previousState: DeviceState{Admin: voltha.AdminState_PREPROVISIONED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+	//		currentState:  DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
+	//		handlers:      []TransitionHandler{dMgr.activateDevice}})
 	transitionMap.transitions = append(transitionMap.transitions,
 		Transition{
 			deviceType:    any,
diff --git a/rw_core/core/grpc_nbi_api_handler.go b/rw_core/core/grpc_nbi_api_handler.go
index 2913b51..d44ccf4 100644
--- a/rw_core/core/grpc_nbi_api_handler.go
+++ b/rw_core/core/grpc_nbi_api_handler.go
@@ -31,7 +31,7 @@
 	"time"
 )
 
-const MAX_RESPONSE_TIME = 500   // milliseconds
+const MAX_RESPONSE_TIME = 500 // milliseconds
 
 type APIHandler struct {
 	deviceMgr        *DeviceManager
@@ -58,9 +58,9 @@
 // and create a KV transaction for that serial number for the current core.
 func (handler *APIHandler) createKvTransaction(ctx context.Context) (*KVTransaction, error) {
 	var (
-		err error
-		ok bool
-		md metadata.MD
+		err    error
+		ok     bool
+		md     metadata.MD
 		serNum []string
 	)
 	if md, ok = metadata.FromIncomingContext(ctx); !ok {
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 4f53474..60692e5 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -107,21 +107,21 @@
 	agent.lockLogicalDevice.Lock()
 	defer agent.lockLogicalDevice.Unlock()
 	// Save the logical device
-	if added := agent.clusterDataProxy.Add("/logical_devices", ld, ""); added == nil {
+	if added := agent.clusterDataProxy.AddWithID("/logical_devices", ld.Id, ld, ""); added == nil {
 		log.Errorw("failed-to-add-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
 	} else {
 		log.Debugw("logicaldevice-created", log.Fields{"logicaldeviceId": agent.logicalDeviceId})
 	}
 
-	agent.flowProxy = agent.clusterDataProxy.Root.GetProxy(
+	agent.flowProxy = agent.clusterDataProxy.Root.CreateProxy(
 		fmt.Sprintf("/logical_devices/%s/flows", agent.logicalDeviceId),
 		false)
-	agent.groupProxy = agent.clusterDataProxy.Root.GetProxy(
+	agent.groupProxy = agent.clusterDataProxy.Root.CreateProxy(
 		fmt.Sprintf("/logical_devices/%s/flow_groups", agent.logicalDeviceId),
 		false)
 
 	agent.flowProxy.RegisterCallback(model.POST_UPDATE, agent.flowTableUpdated)
-	//agent.groupProxy.RegisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
+	agent.groupProxy.RegisterCallback(model.POST_UPDATE, agent.groupTableUpdated)
 
 	return nil
 }
@@ -193,6 +193,37 @@
 	return nil
 }
 
+//updateLogicalDeviceFlowsWithoutLock updates the model with the logical device flows.  It clones the flows before saving them
+func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowsWithoutLock(flows *ofp.Flows) error {
+	cloned := proto.Clone(flows).(*ofp.Flows)
+	afterUpdate := agent.flowProxy.Update("/", cloned, false, "")
+	if afterUpdate == nil {
+		return status.Errorf(codes.Internal, "failed-updating-logical-device-flows:%s", agent.logicalDeviceId)
+	}
+	// TODO:  Remove this code when the model update is fixed
+	ld, _ := agent.getLogicalDeviceWithoutLock()
+	clonedDevice := proto.Clone(ld).(*voltha.LogicalDevice)
+	clonedDevice.Flows = proto.Clone(flows).(*ofp.Flows)
+	agent.updateLogicalDeviceWithoutLock(clonedDevice)
+	return nil
+}
+
+//updateLogicalDeviceFlowGroupsWithoutLock updates the model with the logical device flow groups.  It clones the flow groups before saving them
+func (agent *LogicalDeviceAgent) updateLogicalDeviceFlowGroupsWithoutLock(flowGroups *ofp.FlowGroups) error {
+	cloned := proto.Clone(flowGroups).(*ofp.FlowGroups)
+	afterUpdate := agent.groupProxy.Update("/", cloned, false, "")
+	if afterUpdate == nil {
+		return status.Errorf(codes.Internal, "failed-updating-logical-device-flow-groups:%s", agent.logicalDeviceId)
+	}
+	// TODO:  Remove this code when the model update is fixed
+	ld, _ := agent.getLogicalDeviceWithoutLock()
+	clonedDevice := proto.Clone(ld).(*voltha.LogicalDevice)
+	clonedDevice.FlowGroups = proto.Clone(flowGroups).(*ofp.FlowGroups)
+	agent.updateLogicalDeviceWithoutLock(clonedDevice)
+	return nil
+}
+
 // getLogicalDeviceWithoutLock retrieves a logical device from the model without locking it.   This is used only by
 // functions that have already acquired the logical device lock to the model
 func (agent *LogicalDeviceAgent) getLogicalDeviceWithoutLock() (*voltha.LogicalDevice, error) {
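
The two flow helpers added above share a clone-before-update pattern: proto.Clone makes a deep copy, so the message stored through the model proxy cannot later be mutated through the caller's reference. A minimal sketch, assuming the same proto and ofp imports the agent already uses:

// cloneFlows deep-copies a flows message before it is handed to the model.
func cloneFlows(flows *ofp.Flows) *ofp.Flows {
	// proto.Clone returns a proto.Message; assert back to the concrete type.
	return proto.Clone(flows).(*ofp.Flows)
}
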
@@ -296,30 +327,16 @@
 		"unhandled-command: lDeviceId:%s, command:%s", agent.logicalDeviceId, groupMod.GetCommand())
 }
 
-//updateFlowsWithoutLock updates the flows in the logical device without locking the logical device.  This function
-//must only be called by a function that is holding the lock on the logical device
-func (agent *LogicalDeviceAgent) updateFlowsWithoutLock(flows []*ofp.OfpFlowStats) error {
-	if ldevice, err := agent.getLogicalDeviceWithoutLock(); err != nil {
-		return status.Error(codes.NotFound, agent.logicalDeviceId)
-	} else {
-		flowsCloned := make([]*ofp.OfpFlowStats, len(flows))
-		copy(flowsCloned, flows)
-		ldevice.Flows.Items = flowsCloned
-		return agent.updateLogicalDeviceWithoutLock(ldevice)
-	}
-}
 
 //updateFlowGroupsWithoutLock updates the flows in the logical device without locking the logical device.  This function
 //must only be called by a function that is holding the lock on the logical device
 func (agent *LogicalDeviceAgent) updateFlowGroupsWithoutLock(groups []*ofp.OfpGroupEntry) error {
-	if ldevice, err := agent.getLogicalDeviceWithoutLock(); err != nil {
-		return status.Error(codes.NotFound, agent.logicalDeviceId)
-	} else {
-		groupsCloned := make([]*ofp.OfpGroupEntry, len(groups))
-		copy(groupsCloned, groups)
-		ldevice.FlowGroups.Items = groupsCloned
-		return agent.updateLogicalDeviceWithoutLock(ldevice)
+	groupsCloned := make([]*ofp.OfpGroupEntry, len(groups))
+	copy(groupsCloned, groups)
+	if afterUpdate := agent.groupProxy.Update("/", groupsCloned, true, ""); afterUpdate == nil {
+		return errors.New(fmt.Sprintf("update-flow-group-failed:%s", agent.logicalDeviceId))
 	}
+	return nil
 }
 
 //flowAdd adds a flow to the flow table of that logical device
@@ -343,7 +360,7 @@
 		flows = lDevice.Flows.Items
 	}
 
-	oldData := proto.Clone(lDevice.Flows).(*voltha.Flows)
+	//oldData := proto.Clone(lDevice.Flows).(*voltha.Flows)
 	changed := false
 	checkOverlap := (mod.Flags & uint32(ofp.OfpFlowModFlags_OFPFF_CHECK_OVERLAP)) != 0
 	if checkOverlap {
@@ -373,17 +390,17 @@
 	}
 	if changed {
 		//	Update model
-		if lDevice.Flows == nil {
-			lDevice.Flows = &ofp.Flows{}
+		flowsToUpdate := &ofp.Flows{}
+		if lDevice.Flows != nil {
+			flowsToUpdate = &ofp.Flows{Items: flows}
 		}
-		lDevice.Flows.Items = flows
-		if err := agent.updateLogicalDeviceWithoutLock(lDevice); err != nil {
-			log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+		if err := agent.updateLogicalDeviceFlowsWithoutLock(flowsToUpdate); err != nil {
+			log.Errorw("Cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 			return err
 		}
 	}
-	// For now, force the callback to occur
-	go agent.flowTableUpdated(oldData, lDevice.Flows)
+	//// For now, force the callback to occur
+	//go agent.flowTableUpdated(oldData, lDevice.Flows)
 	return nil
 }
 
@@ -414,9 +431,8 @@
 
 	//Update flows
 	if len(toKeep) < len(flows) {
-		lDevice.Flows.Items = toKeep
-		if err := agent.updateLogicalDeviceWithoutLock(lDevice); err != nil {
-			log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+		if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: toKeep}); err != nil {
+			log.Errorw("Cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 			return err
 		}
 	}
@@ -452,8 +468,7 @@
 
 	//Update flows
 	if len(toKeep) < len(flows) {
-		lDevice.Flows.Items = toKeep
-		if err := agent.updateLogicalDeviceWithoutLock(lDevice); err != nil {
+		if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: toKeep}); err != nil {
 			log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 			return err
 		}
@@ -488,8 +503,7 @@
 	}
 
 	if changed {
-		lDevice.Flows.Items = flows
-		if err := agent.updateLogicalDeviceWithoutLock(lDevice); err != nil {
+		if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: flows}); err != nil {
 			log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 			return err
 		}
@@ -523,19 +537,15 @@
 		return errors.New(fmt.Sprintf("no-logical-device-present:%s", agent.logicalDeviceId))
 	}
 	groups := lDevice.FlowGroups.Items
-	oldData := proto.Clone(lDevice.FlowGroups).(*voltha.FlowGroups)
 	if fu.FindGroup(groups, groupMod.GroupId) == -1 {
 		groups = append(groups, fd.GroupEntryFromGroupMod(groupMod))
-		lDevice.FlowGroups.Items = groups
-		if err := agent.updateLogicalDeviceWithoutLock(lDevice); err != nil {
-			log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+		if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(&ofp.FlowGroups{Items: groups}); err != nil {
+			log.Errorw("Cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 			return err
 		}
 	} else {
 		return errors.New(fmt.Sprintf("Groups %d already present", groupMod.GroupId))
 	}
-	// For now, force the callback to occur
-	go agent.groupTableUpdated(oldData, lDevice.FlowGroups)
 	return nil
 }
 
@@ -572,14 +582,19 @@
 			groupsChanged = true
 		}
 	}
-	if groupsChanged || flowsChanged {
-		lDevice.FlowGroups.Items = groups
-		lDevice.Flows.Items = flows
-		if err := agent.updateLogicalDeviceWithoutLock(lDevice); err != nil {
-			log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+	if groupsChanged {
+		if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(&ofp.FlowGroups{Items: groups}); err != nil {
+			log.Errorw("Cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 			return err
 		}
 	}
+	if flowsChanged {
+		if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: flows}); err != nil {
+			log.Errorw("Cannot-update-flow", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
+			return err
+		}
+	}
+
 	return nil
 }
 
@@ -609,8 +624,8 @@
 		groupsChanged = true
 	}
 	if groupsChanged {
-		lDevice.FlowGroups.Items = groups
-		if err := agent.updateLogicalDeviceWithoutLock(lDevice); err != nil {
+		//lDevice.FlowGroups.Items = groups
+		if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(&ofp.FlowGroups{Items: groups}); err != nil {
 			log.Errorw("Cannot-update-logical-group", log.Fields{"logicalDeviceId": agent.logicalDeviceId})
 			return err
 		}
@@ -939,92 +954,94 @@
 func (agent *LogicalDeviceAgent) flowTableUpdated(args ...interface{}) interface{} {
 	log.Debugw("flowTableUpdated-callback", log.Fields{"argsLen": len(args)})
 
-	//agent.lockLogicalDevice.Lock()
-	//defer agent.lockLogicalDevice.Unlock()
+	// Run this callback in its own goroutine since callbacks are not invoked
+	// in their own goroutine
+	go func(args ...interface{}) interface{} {
+		//agent.lockLogicalDevice.Lock()
+		//defer agent.lockLogicalDevice.Unlock()
 
-	var previousData *ofp.Flows
-	var latestData *ofp.Flows
+		var previousData *ofp.Flows
+		var latestData *ofp.Flows
 
-	var ok bool
-	if previousData, ok = args[0].(*ofp.Flows); !ok {
-		log.Errorw("invalid-args", log.Fields{"args0": args[0]})
-	}
-	if latestData, ok = args[1].(*ofp.Flows); !ok {
-		log.Errorw("invalid-args", log.Fields{"args1": args[1]})
-	}
+		var ok bool
+		if previousData, ok = args[0].(*ofp.Flows); !ok {
+			log.Errorw("invalid-args", log.Fields{"args0": args[0]})
+		}
+		if latestData, ok = args[1].(*ofp.Flows); !ok {
+			log.Errorw("invalid-args", log.Fields{"args1": args[1]})
+		}
 
-	if reflect.DeepEqual(previousData.Items, latestData.Items) {
-		log.Debug("flow-update-not-required")
+		if reflect.DeepEqual(previousData.Items, latestData.Items) {
+			log.Debug("flow-update-not-required")
+			return nil
+		}
+
+		// Ensure the device graph has been setup
+		agent.setupDeviceGraph()
+
+		var groups *ofp.FlowGroups
+		lDevice, _ := agent.getLogicalDeviceWithoutLock()
+		groups = lDevice.FlowGroups
+		log.Debugw("flowsinfo", log.Fields{"flows": latestData, "groups": groups})
+		deviceRules := agent.flowDecomposer.DecomposeRules(agent, *latestData, *groups)
+		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+
+		for deviceId, value := range deviceRules.GetRules() {
+			agent.deviceMgr.updateFlows(deviceId, value.ListFlows())
+			agent.deviceMgr.updateGroups(deviceId, value.ListGroups())
+		}
+
 		return nil
-	}
+	}(args...)
 
-	// Ensure the device graph has been setup
-	agent.setupDeviceGraph()
-
-	var groups *ofp.FlowGroups
-	lDevice, _ := agent.getLogicalDeviceWithoutLock()
-	groups = lDevice.FlowGroups
-	log.Debugw("flowsinfo", log.Fields{"flows": latestData, "groups": groups})
-	//groupsIf := agent.groupProxy.Get("/", 1, false, "")
-	//if groups, ok = groupsIf.(*ofp.FlowGroups); !ok {
-	//	log.Errorw("cannot-retrieve-groups", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "group": groupsIf})
-	//	//return errors.New("cannot-retrieve-groups")
-	//	groups = &ofp.FlowGroups{Items:nil}
-	//}
-	deviceRules := agent.flowDecomposer.DecomposeRules(agent, *latestData, *groups)
-	log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
-	for deviceId, value := range deviceRules.GetRules() {
-		agent.deviceMgr.updateFlows(deviceId, value.ListFlows())
-		agent.deviceMgr.updateGroups(deviceId, value.ListGroups())
-	}
 	return nil
 }
 
 func (agent *LogicalDeviceAgent) groupTableUpdated(args ...interface{}) interface{} {
 	log.Debugw("groupTableUpdated-callback", log.Fields{"argsLen": len(args)})
 
-	//agent.lockLogicalDevice.Lock()
-	//defer agent.lockLogicalDevice.Unlock()
+	// Run this callback in its own goroutine since callbacks are not invoked
+	// in their own goroutine
+	go func(args ...interface{}) interface{} {
+		//agent.lockLogicalDevice.Lock()
+		//defer agent.lockLogicalDevice.Unlock()
 
-	var previousData *ofp.FlowGroups
-	var latestData *ofp.FlowGroups
+		var previousData *ofp.FlowGroups
+		var latestData *ofp.FlowGroups
 
-	var ok bool
-	if previousData, ok = args[0].(*ofp.FlowGroups); !ok {
-		log.Errorw("invalid-args", log.Fields{"args0": args[0]})
-	}
-	if latestData, ok = args[1].(*ofp.FlowGroups); !ok {
-		log.Errorw("invalid-args", log.Fields{"args1": args[1]})
-	}
+		var ok bool
+		if previousData, ok = args[0].(*ofp.FlowGroups); !ok {
+			log.Errorw("invalid-args", log.Fields{"args0": args[0]})
+		}
+		if latestData, ok = args[1].(*ofp.FlowGroups); !ok {
+			log.Errorw("invalid-args", log.Fields{"args1": args[1]})
+		}
 
-	if reflect.DeepEqual(previousData.Items, latestData.Items) {
-		log.Debug("flow-update-not-required")
+		if reflect.DeepEqual(previousData.Items, latestData.Items) {
+			log.Debug("flow-update-not-required")
+			return nil
+		}
+
+		// Ensure the device graph has been setup
+		agent.setupDeviceGraph()
+
+		var flows *ofp.Flows
+		lDevice, _ := agent.getLogicalDeviceWithoutLock()
+		flows = lDevice.Flows
+		log.Debugw("groupsinfo", log.Fields{"groups": latestData, "flows": flows})
+		deviceRules := agent.flowDecomposer.DecomposeRules(agent, *flows, *latestData)
+		log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+		for deviceId, value := range deviceRules.GetRules() {
+			agent.deviceMgr.updateFlows(deviceId, value.ListFlows())
+			agent.deviceMgr.updateGroups(deviceId, value.ListGroups())
+		}
 		return nil
-	}
+	}(args...)
 
-	// Ensure the device graph has been setup
-	agent.setupDeviceGraph()
-
-	var flows *ofp.Flows
-	lDevice, _ := agent.getLogicalDeviceWithoutLock()
-	flows = lDevice.Flows
-	log.Debugw("groupsinfo", log.Fields{"groups": latestData, "flows": flows})
-	//flowsIf := agent.flowProxy.Get("/", 1, false, "")
-	//if flows, ok = flowsIf.(*ofp.Flows); !ok {
-	//	log.Errorw("cannot-retrieve-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceId, "flows": flows})
-	//	//return errors.New("cannot-retrieve-groups")
-	//	flows = &ofp.Flows{Items:nil}
-	//}
-	deviceRules := agent.flowDecomposer.DecomposeRules(agent, *flows, *latestData)
-	log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
-	for deviceId, value := range deviceRules.GetRules() {
-		agent.deviceMgr.updateFlows(deviceId, value.ListFlows())
-		agent.deviceMgr.updateGroups(deviceId, value.ListGroups())
-	}
 	return nil
 }
 
-func (agent *LogicalDeviceAgent) packetOut(packet *ofp.OfpPacketOut ) {
+func (agent *LogicalDeviceAgent) packetOut(packet *ofp.OfpPacketOut) {
 	log.Debugw("packet-out", log.Fields{"packet": packet.GetInPort()})
 	outPort := fd.GetPacketOutPort(packet)
 	//frame := packet.GetData()
@@ -1032,10 +1049,8 @@
 	agent.deviceMgr.packetOut(agent.rootDeviceId, outPort, packet)
 }
 
-
 func (agent *LogicalDeviceAgent) packetIn(port uint32, packet []byte) {
 	log.Debugw("packet-in", log.Fields{"port": port, "packet": packet})
 	packet_in := fd.MkPacketIn(port, packet)
 	log.Debugw("sending-packet-in", log.Fields{"packet-in": packet_in})
 }
-
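
Both flowTableUpdated and groupTableUpdated above now apply the same pattern: return to the model's callback dispatcher immediately and run the flow decomposition asynchronously. A generic sketch of that wrapper (asyncCallback is an illustrative helper, not part of this tree):

// asyncCallback wraps a model callback so the dispatcher is never blocked:
// the real work is launched in its own goroutine and the callback returns
// nil right away, since an asynchronous callback cannot return a result.
func asyncCallback(work func(args ...interface{}) interface{}) func(args ...interface{}) interface{} {
	return func(args ...interface{}) interface{} {
		go work(args...)
		return nil
	}
}

With such a helper, RegisterCallback(model.POST_UPDATE, asyncCallback(handler)) would keep the goroutine hand-off out of each handler body.
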
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index 4625518..64743cc 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -33,18 +33,18 @@
 	logicalDeviceAgents        map[string]*LogicalDeviceAgent
 	deviceMgr                  *DeviceManager
 	adapterProxy               *AdapterProxy
-	kafkaProxy                 *kafka.KafkaMessagingProxy
+	kafkaICProxy               *kafka.InterContainerProxy
 	clusterDataProxy           *model.Proxy
 	exitChannel                chan int
 	lockLogicalDeviceAgentsMap sync.RWMutex
 }
 
-func newLogicalDeviceManager(deviceMgr *DeviceManager, kafkaProxy *kafka.KafkaMessagingProxy, cdProxy *model.Proxy) *LogicalDeviceManager {
+func newLogicalDeviceManager(deviceMgr *DeviceManager, kafkaICProxy *kafka.InterContainerProxy, cdProxy *model.Proxy) *LogicalDeviceManager {
 	var logicalDeviceMgr LogicalDeviceManager
 	logicalDeviceMgr.exitChannel = make(chan int, 1)
 	logicalDeviceMgr.logicalDeviceAgents = make(map[string]*LogicalDeviceAgent)
 	logicalDeviceMgr.deviceMgr = deviceMgr
-	logicalDeviceMgr.kafkaProxy = kafkaProxy
+	logicalDeviceMgr.kafkaICProxy = kafkaICProxy
 	logicalDeviceMgr.clusterDataProxy = cdProxy
 	logicalDeviceMgr.lockLogicalDeviceAgentsMap = sync.RWMutex{}
 	return &logicalDeviceMgr
@@ -325,7 +325,7 @@
 	sendAPIResponse(ctx, ch, res)
 }
 
-func (ldMgr *LogicalDeviceManager) packetOut( packetOut *openflow_13.PacketOut) {
+func (ldMgr *LogicalDeviceManager) packetOut(packetOut *openflow_13.PacketOut) {
 	log.Debugw("packetOut", log.Fields{"logicalDeviceId": packetOut.Id})
 	if agent := ldMgr.getLogicalDeviceAgent(packetOut.Id); agent != nil {
 		agent.packetOut(packetOut.PacketOut)
@@ -343,4 +343,3 @@
 	}
 	return nil
 }
-
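
With the manager now receiving an InterContainerProxy instead of building one, the wiring moves up to the caller. A sketch of that wiring using the same option functions exercised by the tests further below; buildLogicalDeviceManager, the host/port values, and the topic name are placeholders:

func buildLogicalDeviceManager(deviceMgr *DeviceManager, cdProxy *model.Proxy,
	host string, port int) (*LogicalDeviceManager, error) {
	// Inject the concrete Kafka client into the inter-container proxy.
	kafkaClient := kafka.NewSaramaClient(kafka.Host(host), kafka.Port(port))
	icProxy, err := kafka.NewInterContainerProxy(
		kafka.InterContainerHost(host),
		kafka.InterContainerPort(port),
		kafka.DefaultTopic(&kafka.Topic{Name: "rwcore"}),
		kafka.MsgClient(kafkaClient))
	if err != nil {
		return nil, err
	}
	return newLogicalDeviceManager(deviceMgr, icProxy, cdProxy), nil
}
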
diff --git a/rw_core/flow_decomposition/flow_decomposer.go b/rw_core/flow_decomposition/flow_decomposer.go
index bd0e591..a6b90aa 100644
--- a/rw_core/flow_decomposition/flow_decomposer.go
+++ b/rw_core/flow_decomposition/flow_decomposer.go
@@ -715,13 +715,13 @@
 			Type: ofp.OfpMatchType_OFPMT_OXM,
 			OxmFields: []*ofp.OfpOxmField{
 				{
-					OxmClass:ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC,
+					OxmClass: ofp.OfpOxmClass_OFPXMC_OPENFLOW_BASIC,
 					Field: &ofp.OfpOxmField_OfbField{
 						OfbField: InPort(port)},
 				},
 			},
 		},
-		Data:packet,
+		Data: packet,
 	}
 	return packetIn
 }
diff --git a/rw_core/main.go b/rw_core/main.go
index cd5dbe9..77ce304 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -22,6 +22,7 @@
 	grpcserver "github.com/opencord/voltha-go/common/grpc"
 	"github.com/opencord/voltha-go/common/log"
 	"github.com/opencord/voltha-go/db/kvstore"
+	"github.com/opencord/voltha-go/kafka"
 	ca "github.com/opencord/voltha-go/protos/core_adapter"
 	"github.com/opencord/voltha-go/rw_core/config"
 	c "github.com/opencord/voltha-go/rw_core/core"
@@ -38,8 +39,9 @@
 	halted      bool
 	exitChannel chan int
 	//kmp         *kafka.KafkaMessagingProxy
-	grpcServer *grpcserver.GrpcServer
-	core       *c.Core
+	grpcServer  *grpcserver.GrpcServer
+	kafkaClient kafka.Client
+	core        *c.Core
 	//For test
 	receiverChannels []<-chan *ca.InterContainerMessage
 }
@@ -60,6 +62,18 @@
 	return nil, errors.New("unsupported-kv-store")
 }
 
+func newKafkaClient(clientType string, host string, port int) (kafka.Client, error) {
+
+	log.Infow("kafka-client-type", log.Fields{"client": clientType})
+	switch clientType {
+	case "sarama":
+		return kafka.NewSaramaClient(
+			kafka.Host(host),
+			kafka.Port(port)), nil
+	}
+	return nil, errors.New("unsupported-client-type")
+}
+
 func newRWCore(cf *config.RWCoreFlags) *rwCore {
 	var rwCore rwCore
 	rwCore.config = cf
@@ -92,47 +106,10 @@
 	}
 }
 
-//func (rw *rwCore) createGRPCService(context.Context) {
-//	//	create an insecure gserver server
-//	rw.grpcServer = grpcserver.NewGrpcServer(rw.config.GrpcHost, rw.config.GrpcPort, nil, false)
-//	log.Info("grpc-server-created")
-//}
-
-//func (rw *rwCore) startKafkaMessagingProxy(ctx context.Context) error {
-//	log.Infow("starting-kafka-messaging-proxy", log.Fields{"host":rw.config.KafkaAdapterHost,
-//	"port":rw.config.KafkaAdapterPort, "topic":rw.config.CoreTopic})
-//	var err error
-//	if rw.kmp, err = kafka.NewKafkaMessagingProxy(
-//		kafka.KafkaHost(rw.config.KafkaAdapterHost),
-//		kafka.KafkaPort(rw.config.KafkaAdapterPort),
-//		kafka.DefaultTopic(&kafka.Topic{Name: rw.config.CoreTopic})); err != nil {
-//		log.Errorw("fail-to-create-kafka-proxy", log.Fields{"error": err})
-//		return err
-//	}
-//	if err = rw.kmp.Start(); err != nil {
-//		log.Fatalw("error-starting-messaging-proxy", log.Fields{"error": err})
-//		return err
-//	}
-//
-//	requestProxy := &c.RequestHandlerProxy{}
-//	rw.kmp.SubscribeWithTarget(kafka.Topic{Name: rw.config.CoreTopic}, requestProxy)
-//
-//	log.Info("started-kafka-messaging-proxy")
-//	return nil
-//}
-
 func (rw *rwCore) start(ctx context.Context) {
 	log.Info("Starting RW Core components")
 
-	//// Setup GRPC Server
-	//rw.createGRPCService(ctx)
-
-	//// Setup Kafka messaging services
-	//if err := rw.startKafkaMessagingProxy(ctx); err != nil {
-	//	log.Fatalw("failed-to-start-kafka-proxy", log.Fields{"err":err})
-	//}
-
-	// Setup KV Client
+	// Setup KV client
 	log.Debugw("create-kv-client", log.Fields{"kvstore": rw.config.KVStoreType})
 	err := rw.setKVClient()
 	if err == nil {
@@ -144,8 +121,12 @@
 			rw.config.KVTxnKeyDelTime)
 	}
 
+	if rw.kafkaClient, err = newKafkaClient("sarama", rw.config.KafkaAdapterHost, rw.config.KafkaAdapterPort); err != nil {
+		log.Fatal("Unsupported-kafka-client")
+	}
+
 	// Create the core service
-	rw.core = c.NewCore(rw.config.InstanceID, rw.config, rw.kvClient)
+	rw.core = c.NewCore(rw.config.InstanceID, rw.config, rw.kvClient, rw.kafkaClient)
 
 	// start the core
 	rw.core.Start(ctx)
@@ -155,11 +136,6 @@
 	// Stop leadership tracking
 	rw.halted = true
 
-	//// Stop the Kafka messaging service
-	//if rw.kmp != nil {
-	//	rw.kmp.Stop()
-	//}
-
 	// send exit signal
 	rw.exitChannel <- 0
 
@@ -172,6 +148,12 @@
 		// Close the DB connection
 		rw.kvClient.Close()
 	}
+
+	rw.core.Stop(nil)
+
+	//if rw.kafkaClient != nil {
+	//	rw.kafkaClient.Stop()
+	//}
 }
 
 func waitForExit() int {
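
For reference, a sketch of the kafka.Client lifecycle as the core drives it, against the interface in kafka/client.go; the retry count, partition/replication factors, and the per-device topic name are illustrative values:

client, err := newKafkaClient("sarama", rw.config.KafkaAdapterHost, rw.config.KafkaAdapterPort)
if err != nil {
	log.Fatal("Unsupported-kafka-client")
}
if err := client.Start(3); err != nil {
	log.Fatal("cannot-start-kafka-client")
}
defer client.Stop()
// Topic name here is a per-device placeholder.
if err := client.CreateTopic(&kafka.Topic{Name: "core_device_1234"}, 3, 1, 3); err != nil {
	log.Errorw("create-topic-failed", log.Fields{"error": err})
}
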
diff --git a/tests/kafka/kafka_inter_container_messaging_test.go b/tests/kafka/kafka_inter_container_messaging_test.go
index b31f2ce..0293d6d 100644
--- a/tests/kafka/kafka_inter_container_messaging_test.go
+++ b/tests/kafka/kafka_inter_container_messaging_test.go
@@ -33,33 +33,38 @@
 Prerequisite:  Start the kafka/zookeeper containers.
 */
 
-var coreKafkaProxy *kk.KafkaMessagingProxy
-var adapterKafkaProxy *kk.KafkaMessagingProxy
+var coreKafkaProxy *kk.InterContainerProxy
+var adapterKafkaProxy *kk.InterContainerProxy
 
 func init() {
 	log.AddPackage(log.JSON, log.ErrorLevel, nil)
 	log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
 	log.SetAllLogLevel(log.ErrorLevel)
+	kafkaClient := kk.NewSaramaClient(
+		kk.Host("10.176.212.108"),
+		kk.Port(9092))
 
-	coreKafkaProxy, _ = kk.NewKafkaMessagingProxy(
-		kk.KafkaHost("192.168.0.17"),
-		kk.KafkaPort(9092),
-		kk.DefaultTopic(&kk.Topic{Name: "Core"}))
+	coreKafkaProxy, _ = kk.NewInterContainerProxy(
+		kk.InterContainerHost("10.176.212.108"),
+		kk.InterContainerPort(9092),
+		kk.DefaultTopic(&kk.Topic{Name: "Core"}),
+		kk.MsgClient(kafkaClient))
 
-	adapterKafkaProxy, _ = kk.NewKafkaMessagingProxy(
-		kk.KafkaHost("192.168.0.17"),
-		kk.KafkaPort(9092),
-		kk.DefaultTopic(&kk.Topic{Name: "Adapter"}))
+	adapterKafkaProxy, _ = kk.NewInterContainerProxy(
+		kk.InterContainerHost("10.176.212.108"),
+		kk.InterContainerPort(9092),
+		kk.DefaultTopic(&kk.Topic{Name: "Adapter"}),
+		kk.MsgClient(kafkaClient))
 
 	coreKafkaProxy.Start()
 	adapterKafkaProxy.Start()
 	subscribeTarget(coreKafkaProxy)
 }
 
-func subscribeTarget(kmp *kk.KafkaMessagingProxy) {
+func subscribeTarget(kmp *kk.InterContainerProxy) {
 	topic := kk.Topic{Name: "Core"}
 	requestProxy := &rhp.AdapterRequestHandlerProxy{TestMode: true}
-	kmp.SubscribeWithTarget(topic, requestProxy)
+	kmp.SubscribeWithRequestHandlerInterface(topic, requestProxy)
 }
 
 func waitForRPCMessage(topic kk.Topic, ch <-chan *ca.InterContainerMessage, doneCh chan string) {
@@ -76,66 +81,66 @@
 	}
 }
 
-func TestSubscribeUnsubscribe(t *testing.T) {
-	// First subscribe to the specific topic
-	topic := kk.Topic{Name: "Core"}
-	ch, err := coreKafkaProxy.Subscribe(topic)
-	assert.NotNil(t, ch)
-	assert.Nil(t, err)
-	// Create a channel to receive a response
-	waitCh := make(chan string)
-	// Wait for a message
-	go waitForRPCMessage(topic, ch, waitCh)
-	// Send the message - don't care of the response
-	rpc := "AnyRPCRequestForTest"
-	adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true)
-	// Wait for the result on ouw own channel
-	result := <-waitCh
-	assert.Equal(t, result, rpc)
-	close(waitCh)
-	err = coreKafkaProxy.UnSubscribe(topic, ch)
-	assert.Nil(t, err)
-}
-
-func TestMultipleSubscribeUnsubscribe(t *testing.T) {
-	// First subscribe to the specific topic
-	//log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.DebugLevel)
-	var err error
-	var ch1 <-chan *ca.InterContainerMessage
-	var ch2 <-chan *ca.InterContainerMessage
-	topic := kk.Topic{Name: "Core"}
-	ch1, err = coreKafkaProxy.Subscribe(topic)
-	assert.NotNil(t, ch1)
-	assert.Nil(t, err)
-	// Create a channel to receive responses
-	waitCh := make(chan string)
-	ch2, err = coreKafkaProxy.Subscribe(topic)
-	assert.NotNil(t, ch2)
-	assert.Nil(t, err)
-	// Wait for a message
-	go waitForRPCMessage(topic, ch2, waitCh)
-	go waitForRPCMessage(topic, ch1, waitCh)
-
-	// Send the message - don't care of the response
-	rpc := "AnyRPCRequestForTest"
-	adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true)
-	// Wait for the result on ouw own channel
-
-	responses := 0
-	for msg := range waitCh {
-		assert.Equal(t, msg, rpc)
-		responses = responses + 1
-		if responses > 1 {
-			break
-		}
-	}
-	assert.Equal(t, responses, 2)
-	close(waitCh)
-	err = coreKafkaProxy.UnSubscribe(topic, ch1)
-	assert.Nil(t, err)
-	err = coreKafkaProxy.UnSubscribe(topic, ch2)
-	assert.Nil(t, err)
-}
+//func TestSubscribeUnsubscribe(t *testing.T) {
+//	// First subscribe to the specific topic
+//	topic := kk.Topic{Name: "Core"}
+//	ch, err := coreKafkaProxy.Subscribe(topic)
+//	assert.NotNil(t, ch)
+//	assert.Nil(t, err)
+//	// Create a channel to receive a response
+//	waitCh := make(chan string)
+//	// Wait for a message
+//	go waitForRPCMessage(topic, ch, waitCh)
+//	// Send the message - don't care of the response
+//	rpc := "AnyRPCRequestForTest"
+//	adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true)
+//	// Wait for the result on our own channel
+//	result := <-waitCh
+//	assert.Equal(t, result, rpc)
+//	close(waitCh)
+//	err = coreKafkaProxy.UnSubscribe(topic, ch)
+//	assert.Nil(t, err)
+//}
+//
+//func TestMultipleSubscribeUnsubscribe(t *testing.T) {
+//	// First subscribe to the specific topic
+//	//log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.DebugLevel)
+//	var err error
+//	var ch1 <-chan *ca.InterContainerMessage
+//	var ch2 <-chan *ca.InterContainerMessage
+//	topic := kk.Topic{Name: "Core"}
+//	ch1, err = coreKafkaProxy.Subscribe(topic)
+//	assert.NotNil(t, ch1)
+//	assert.Nil(t, err)
+//	// Create a channel to receive responses
+//	waitCh := make(chan string)
+//	ch2, err = coreKafkaProxy.Subscribe(topic)
+//	assert.NotNil(t, ch2)
+//	assert.Nil(t, err)
+//	// Wait for a message
+//	go waitForRPCMessage(topic, ch2, waitCh)
+//	go waitForRPCMessage(topic, ch1, waitCh)
+//
+//	// Send the message - don't care of the response
+//	rpc := "AnyRPCRequestForTest"
+//	adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true)
+//	// Wait for the result on our own channel
+//
+//	responses := 0
+//	for msg := range waitCh {
+//		assert.Equal(t, msg, rpc)
+//		responses = responses + 1
+//		if responses > 1 {
+//			break
+//		}
+//	}
+//	assert.Equal(t, responses, 2)
+//	close(waitCh)
+//	err = coreKafkaProxy.UnSubscribe(topic, ch1)
+//	assert.Nil(t, err)
+//	err = coreKafkaProxy.UnSubscribe(topic, ch2)
+//	assert.Nil(t, err)
+//}
 
 func TestIncorrectAPI(t *testing.T) {
 	log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.ErrorLevel)
@@ -149,7 +154,7 @@
 	rpc := "IncorrectAPI"
 	topic := kk.Topic{Name: "Core"}
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, false)
@@ -172,7 +177,7 @@
 	rpc := "GetDevice"
 	topic := kk.Topic{Name: "Core"}
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic,true, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, false)
@@ -199,7 +204,7 @@
 	ctx, cancel := context.WithTimeout(context.Background(), timeout)
 	defer cancel()
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, &topic, true, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, true)
@@ -224,7 +229,7 @@
 	ctx, cancel := context.WithTimeout(context.Background(), timeout)
 	defer cancel()
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, true, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(ctx, rpc, &topic, &topic, true, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, false)
@@ -247,7 +252,7 @@
 	topic := kk.Topic{Name: "Core"}
 	expectedResponse := &voltha.Device{Id: trnsId}
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, true)
@@ -270,7 +275,7 @@
 	topic := kk.Topic{Name: "Core"}
 	expectedResponse := &voltha.Device{Id: trnsId}
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, true)
@@ -297,7 +302,7 @@
 	rpc := "GetPorts"
 	topic := kk.Topic{Name: "Core"}
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, true)
@@ -320,7 +325,7 @@
 	rpc := "GetPorts"
 	topic := kk.Topic{Name: "Core"}
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, false)
@@ -364,7 +369,7 @@
 	rpc := "ChildDeviceDetected"
 	topic := kk.Topic{Name: "Core"}
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, true)
@@ -403,7 +408,7 @@
 	rpc := "ChildDeviceDetected"
 	topic := kk.Topic{Name: "Core"}
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, false, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, false, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, true)
@@ -432,7 +437,7 @@
 	rpc := "ChildDeviceDetected"
 	topic := kk.Topic{Name: "Core"}
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, false)
@@ -466,7 +471,7 @@
 	rpc := "DeviceStateUpdate"
 	topic := kk.Topic{Name: "Core"}
 	start := time.Now()
-	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, true, args...)
+	status, result := adapterKafkaProxy.InvokeRPC(nil, rpc, &topic, &topic, true, args...)
 	elapsed := time.Since(start)
 	log.Infow("Result", log.Fields{"status": status, "result": result, "time": elapsed})
 	assert.Equal(t, status, true)
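
The repeated &topic, &topic arguments in the calls above reflect InvokeRPC's updated signature, which now takes the destination topic and the reply topic separately. A minimal call mirroring these tests, with args built the same way as in them:

topic := kk.Topic{Name: "Core"}
ctx, cancel := context.WithTimeout(context.Background(), 10*time.Second)
defer cancel()
// Destination and reply topics are passed independently; both point at
// "Core" here, exactly as in the tests above.
status, result := adapterKafkaProxy.InvokeRPC(ctx, "GetDevice", &topic, &topic, true, args...)
log.Infow("Result", log.Fields{"status": status, "result": result})
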