diff --git a/kafka/client.go b/kafka/client.go
index 1df700e..8cc1999 100644
--- a/kafka/client.go
+++ b/kafka/client.go
@@ -26,9 +26,13 @@
 )
 
 const (
+	GroupIdKey = "groupId"
+)
+
+const (
 	DefaultKafkaHost                = "127.0.0.1"
 	DefaultKafkaPort                = 9092
-	DefaultGroupName                = "rw_core"
+	DefaultGroupName                = "voltha"
 	DefaultSleepOnError             = 1
 	DefaultProducerFlushFrequency   = 5
 	DefaultProducerFlushMessages    = 1
@@ -51,7 +55,7 @@
 	Stop()
 	CreateTopic(topic *Topic, numPartition int, repFactor int) error
 	DeleteTopic(topic *Topic) error
-	Subscribe(topic *Topic) (<-chan *ca.InterContainerMessage, error)
+	Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ca.InterContainerMessage, error)
 	UnSubscribe(topic *Topic, ch <-chan *ca.InterContainerMessage) error
 	Send(msg interface{}, topic *Topic, keys ...string) error
 }
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 3381fdc..c527619 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -33,12 +33,12 @@
 
 // Initialize the logger - gets the default until the main function setup the logger
 func init() {
-	log.AddPackage(log.JSON, log.WarnLevel, nil)
+	log.AddPackage(log.JSON, log.DebugLevel, nil)
 }
 
 const (
 	DefaultMaxRetries     = 3
-	DefaultRequestTimeout = 500 // 500 milliseconds - to handle a wider latency range
+	DefaultRequestTimeout = 3000 // 3000 milliseconds - to handle a wider latency range
 )
 
 // requestHandlerChannel represents an interface associated with a channel.  Whenever, an event is
@@ -172,7 +172,7 @@
 	log.Info("stopping-intercontainer-proxy")
 	kp.doneCh <- 1
 	// TODO : Perform cleanup
-	//kp.kafkaClient.Stop()
+	kp.kafkaClient.Stop()
 	//kp.deleteAllTopicRequestHandlerChannelMap()
 	//kp.deleteAllTopicResponseChannelMap()
 	//kp.deleteAllTransactionIdToChannelMap()
@@ -331,6 +331,7 @@
 	var err error
 	if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
 		log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
+		return err
 	}
 	kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
 
@@ -676,7 +677,7 @@
 	for {
 		select {
 		case msg := <-subscribedCh:
-			//log.Debugw("message-received", log.Fields{"msg": msg, "fromTopic": msg.Header.FromTopic})
+			log.Debugw("message-received", log.Fields{"msg": msg, "fromTopic": msg.Header.FromTopic})
 			if msg.Header.Type == ic.MessageType_RESPONSE {
 				go kp.dispatchResponse(msg)
 			}
@@ -700,6 +701,7 @@
 	// First check whether we already have a channel listening for response on that topic.  If there is
 	// already one then it will be reused.  If not, it will be created.
 	if !kp.isTopicSubscribedForResponse(topic.Name) {
+		log.Debugw("not-subscribed-for-response", log.Fields{"topic": topic.Name, "trnsid": trnsId})
 		var subscribedCh <-chan *ic.InterContainerMessage
 		var err error
 		if subscribedCh, err = kp.kafkaClient.Subscribe(&topic); err != nil {
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index 2df19e5..10c692a 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -30,7 +30,7 @@
 )
 
 func init() {
-	log.AddPackage(log.JSON, log.WarnLevel, nil)
+	log.AddPackage(log.JSON, log.DebugLevel, nil)
 }
 
 type returnErrorFunction func() error
@@ -51,9 +51,10 @@
 	KafkaPort                     int
 	producer                      sarama.AsyncProducer
 	consumer                      sarama.Consumer
-	groupConsumer                 *scc.Consumer
+	groupConsumers                map[string]*scc.Consumer
+	consumerGroupPrefix           string
 	consumerType                  int
-	groupName                     string
+	consumerGroupName             string
 	producerFlushFrequency        int
 	producerFlushMessages         int
 	producerFlushMaxmessages      int
@@ -87,6 +88,18 @@
 	}
 }
 
+func ConsumerGroupPrefix(prefix string) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.consumerGroupPrefix = prefix
+	}
+}
+
+func ConsumerGroupName(name string) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.consumerGroupName = name
+	}
+}
+
 func ConsumerType(consumer int) SaramaClientOption {
 	return func(args *SaramaClient) {
 		args.consumerType = consumer
@@ -188,6 +201,8 @@
 		option(client)
 	}
 
+	client.groupConsumers = make(map[string]*scc.Consumer)
+
 	client.lockTopicToConsumerChannelMap = sync.RWMutex{}
 	client.topicLockMap = make(map[string]*sync.RWMutex)
 	client.lockOfTopicLockMap = sync.RWMutex{}
@@ -214,15 +229,19 @@
 		return err
 	}
 
-	// Create the master consumers
-	if err := sc.createConsumer(); err != nil {
-		log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
-		return err
+	if sc.consumerType == DefaultConsumerType {
+		// Create the master consumers
+		if err := sc.createConsumer(); err != nil {
+			log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
+			return err
+		}
 	}
 
 	// Create the topic to consumers/channel map
 	sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
 
+	log.Info("kafka-sarama-client-started")
+
 	return nil
 }
 
@@ -234,39 +253,38 @@
 
 	if sc.producer != nil {
 		if err := sc.producer.Close(); err != nil {
-			panic(err)
+			log.Errorw("closing-producer-failed", log.Fields{"error": err})
 		}
 	}
 
 	if sc.consumer != nil {
 		if err := sc.consumer.Close(); err != nil {
-			panic(err)
+			log.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
 		}
 	}
 
-	if sc.groupConsumer != nil {
-		if err := sc.groupConsumer.Close(); err != nil {
-			panic(err)
+	for key, val := range sc.groupConsumers {
+		log.Debugw("closing-group-consumer", log.Fields{"topic": key})
+		if err := val.Close(); err != nil {
+			log.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
 		}
 	}
 
 	if sc.cAdmin != nil {
 		if err := sc.cAdmin.Close(); err != nil {
-			panic(err)
+			log.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
 		}
 	}
 
 	//TODO: Clear the consumers map
-	sc.clearConsumerChannelMap()
+	//sc.clearConsumerChannelMap()
 
 	log.Info("sarama-client-stopped")
 }
 
-//CreateTopic creates a topic on the Kafka Broker.
-func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
-	sc.lockTopic(topic)
-	defer sc.unLockTopic(topic)
-
+//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
+// the invoking function must hold the lock
+func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
 	// Set the topic details
 	topicDetail := &sarama.TopicDetail{}
 	topicDetail.NumPartitions = int32(numPartition)
@@ -290,6 +308,15 @@
 	return nil
 }
 
+//CreateTopic is a public API to create a topic on the Kafka Broker.  It uses a lock on a specific topic to
+// ensure no two go routines are performing operations on the same topic
+func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
+	sc.lockTopic(topic)
+	defer sc.unLockTopic(topic)
+
+	return sc.createTopic(topic, numPartition, repFactor)
+}
+
 //DeleteTopic removes a topic from the kafka Broker
 func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
 	sc.lockTopic(topic)
@@ -316,7 +343,7 @@
 
 // Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
 // messages from that topic
-func (sc *SaramaClient) Subscribe(topic *Topic) (<-chan *ic.InterContainerMessage, error) {
+func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
 	sc.lockTopic(topic)
 	defer sc.unLockTopic(topic)
 
@@ -338,7 +365,7 @@
 	// Use the consumerType option to figure out the type of consumer to launch
 	if sc.consumerType == PartitionConsumer {
 		if sc.autoCreateTopic {
-			if err = sc.CreateTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
+			if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
 				log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
 				return nil, err
 			}
@@ -350,10 +377,26 @@
 	} else if sc.consumerType == GroupCustomer {
 		// TODO: create topic if auto create is on.  There is an issue with the sarama cluster library that
 		// does not consume from a precreated topic in some scenarios
-		if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, "mytest"); err != nil {
-			log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
+		//if sc.autoCreateTopic {
+		//	if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
+		//		log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
+		//		return nil, err
+		//	}
+		//}
+		//groupId := sc.consumerGroupName
+		groupId := getGroupId(kvArgs...)
+		// Include the group prefix
+		if groupId != "" {
+			groupId = sc.consumerGroupPrefix + groupId
+		} else {
+			// Need to use a unique group Id per topic
+			groupId = sc.consumerGroupPrefix + topic.Name
+		}
+		if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId); err != nil {
+			log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
 			return nil, err
 		}
+
 	} else {
 		log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
 		return nil, errors.New("unknown-consumer-type")
@@ -416,6 +459,16 @@
 	return nil
 }
 
+// getGroupId returns the group id from the key-value args.
+func getGroupId(kvArgs ...*KVArg) string {
+	for _, arg := range kvArgs {
+		if arg.Key == GroupIdKey {
+			return arg.Value.(string)
+		}
+	}
+	return ""
+}
+
 func (sc *SaramaClient) createClusterAdmin() error {
 	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
 	config := sarama.NewConfig()
@@ -623,7 +676,7 @@
 }
 
 // createGroupConsumer creates a consumers group
-func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId *string, retries int) (*scc.Consumer, error) {
+func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, retries int) (*scc.Consumer, error) {
 	config := scc.NewConfig()
 	config.ClientID = uuid.New().String()
 	config.Group.Mode = scc.ConsumerModeMultiplex
@@ -632,24 +685,22 @@
 	//config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
 	//config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
 	config.Consumer.Offsets.Initial = sarama.OffsetNewest
+	//config.Consumer.Offsets.Initial = sarama.OffsetOldest
 	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
 	brokers := []string{kafkaFullAddr}
 
-	if groupId == nil {
-		g := DefaultGroupName
-		groupId = &g
-	}
 	topics := []string{topic.Name}
 	var consumer *scc.Consumer
 	var err error
 
-	if consumer, err = scc.NewConsumer(brokers, *groupId, topics, config); err != nil {
-		log.Errorw("create-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
+	if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
+		log.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
 		return nil, err
 	}
-	log.Debugw("create-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
+	log.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
 	//time.Sleep(10*time.Second)
-	sc.groupConsumer = consumer
+	//sc.groupConsumer = consumer
+	sc.groupConsumers[topic.Name] = consumer
 	return consumer, nil
 }
 
@@ -704,24 +755,36 @@
 func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
 	log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
 
+	//go func() {
+	//	for msg := range consumer.Errors() {
+	//		log.Warnw("group-consumers-error", log.Fields{"error": msg.Error()})
+	//	}
+	//}()
+	//
+	//go func() {
+	//	for ntf := range consumer.Notifications() {
+	//		log.Debugw("group-received-notification", log.Fields{"notification": ntf})
+	//	}
+	//}()
+
 startloop:
 	for {
 		select {
 		case err := <-consumer.Errors():
 			if err != nil {
-				log.Warnw("group-consumers-error", log.Fields{"error": err})
+				log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
 			} else {
 				// There is a race condition when this loop is stopped and the consumer is closed where
 				// the actual error comes as nil
-				log.Warn("group-consumers-error")
+				log.Warnw("group-consumers-error-nil", log.Fields{"topic": topic.Name})
 			}
 		case msg := <-consumer.Messages():
-			//log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
 			if msg == nil {
 				// There is a race condition when this loop is stopped and the consumer is closed where
 				// the actual msg comes as nil
 				break startloop
 			}
+			log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
 			msgBody := msg.Value
 			icm := &ic.InterContainerMessage{}
 			if err := proto.Unmarshal(msgBody, icm); err != nil {
@@ -800,7 +863,7 @@
 	// TODO:  Replace this development partition consumers with a group consumers
 	var pConsumer *scc.Consumer
 	var err error
-	if pConsumer, err = sc.createGroupConsumer(topic, &groupId, DefaultMaxRetries); err != nil {
+	if pConsumer, err = sc.createGroupConsumer(topic, groupId, DefaultMaxRetries); err != nil {
 		log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
 		return nil, err
 	}
diff --git a/python/adapters/kafka/adapter_request_facade.py b/python/adapters/kafka/adapter_request_facade.py
index 0efb811..de8ae0b 100644
--- a/python/adapters/kafka/adapter_request_facade.py
+++ b/python/adapters/kafka/adapter_request_facade.py
@@ -22,6 +22,7 @@
 from twisted.internet.defer import inlineCallbacks
 from zope.interface import implementer
 from twisted.internet import reactor
+
 from afkak.consumer import OFFSET_LATEST, OFFSET_EARLIEST
 from python.adapters.interface import IAdapterInterface
 from python.protos.inter_container_pb2 import IntType, InterAdapterMessage, StrType, Error, ErrorCode
@@ -29,7 +30,7 @@
 from python.protos.openflow_13_pb2 import FlowChanges, FlowGroups, Flows, \
     FlowGroupChanges, ofp_packet_out
 from python.adapters.kafka.kafka_inter_container_library import IKafkaMessagingProxy, \
-    get_messaging_proxy
+    get_messaging_proxy, KAFKA_OFFSET_LATEST, KAFKA_OFFSET_EARLIEST
 
 log = structlog.get_logger()
 
@@ -72,7 +73,7 @@
         kafka_proxy = get_messaging_proxy()
         device_topic = kafka_proxy.get_default_topic() + "_" + deviceId
         # yield kafka_proxy.create_topic(topic=device_topic)
-        yield kafka_proxy.subscribe(topic=device_topic, target_cls=self, offset=OFFSET_EARLIEST)
+        yield kafka_proxy.subscribe(topic=device_topic, group_id=device_topic, target_cls=self, offset=KAFKA_OFFSET_EARLIEST)
         log.debug("subscribed-to-topic", topic=device_topic)
 
     def adopt_device(self, device):
diff --git a/python/adapters/kafka/kafka_inter_container_library.py b/python/adapters/kafka/kafka_inter_container_library.py
index fbb0834..5cad2e8 100644
--- a/python/adapters/kafka/kafka_inter_container_library.py
+++ b/python/adapters/kafka/kafka_inter_container_library.py
@@ -18,8 +18,6 @@
 from uuid import uuid4
 
 import structlog
-from afkak.client import KafkaClient
-from afkak.consumer import OFFSET_LATEST, Consumer
 from twisted.internet import reactor
 from twisted.internet.defer import inlineCallbacks, returnValue, Deferred, \
     DeferredQueue, gatherResults
@@ -34,6 +32,9 @@
 
 log = structlog.get_logger()
 
+KAFKA_OFFSET_LATEST = 'latest'
+KAFKA_OFFSET_EARLIEST = 'earliest'
+
 
 class KafkaMessagingError(BaseException):
     def __init__(self, error):
@@ -48,6 +49,7 @@
                  kafka_host_port,
                  kv_store,
                  default_topic,
+                 group_id_prefix,
                  target_cls):
         """
         Initialize the kafka proxy.  This is a singleton (may change to
@@ -67,15 +69,15 @@
         self.kafka_host_port = kafka_host_port
         self.kv_store = kv_store
         self.default_topic = default_topic
+        self.default_group_id = "_".join((group_id_prefix, default_topic))
         self.target_cls = target_cls
         self.topic_target_cls_map = {}
-        self.topic_consumer_map = {}
         self.topic_callback_map = {}
         self.subscribers = {}
-        self.kafka_client = None
         self.kafka_proxy = None
         self.transaction_id_deferred_map = {}
         self.received_msg_queue = DeferredQueue()
+        self.stopped = False
 
         self.init_time = 0
         self.init_received_time = 0
@@ -91,11 +93,7 @@
 
     def start(self):
         try:
-            # Create the kafka client
-            # assert self.kafka_host is not None
-            # assert self.kafka_port is not None
-            # kafka_host_port = ":".join((self.kafka_host, self.kafka_port))
-            self.kafka_client = KafkaClient(self.kafka_host_port)
+            log.debug("KafkaProxy-starting")
 
             # Get the kafka proxy instance.  If it does not exist then
             # create it
@@ -110,12 +108,17 @@
             # Start the queue to handle incoming messages
             reactor.callLater(0, self._received_message_processing_loop)
 
-            # Start listening for incoming messages
-            reactor.callLater(0, self.subscribe, self.default_topic,
-                              target_cls=self.target_cls)
+            # Subscribe using the default topic and default group id.  Whenever
+            # a message is received on that topic then teh target_cls will be
+            # invoked.
+            reactor.callLater(0, self.subscribe,
+                              topic=self.default_topic,
+                              target_cls=self.target_cls,
+                              group_id=self.default_group_id)
 
             # Setup the singleton instance
             IKafkaMessagingProxy._kafka_messaging_instance = self
+            log.debug("KafkaProxy-started")
         except Exception as e:
             log.exception("Failed-to-start-proxy", e=e)
 
@@ -126,37 +129,14 @@
         """
         log.debug("Stopping-messaging-proxy ...")
         try:
-            # Stop all the consumers
-            deferred_list = []
-            for key, values in self.topic_consumer_map.iteritems():
-                deferred_list.extend([c.stop() for c in values])
-
-            if not deferred_list:
-                d = gatherResults(deferred_list)
-                d.addCallback(lambda result: self.kafka_client.close())
+            # Stop the kafka proxy.  This will stop all the consumers
+            # and producers
+            self.stopped = True
+            self.kafka_proxy.stop()
             log.debug("Messaging-proxy-stopped.")
         except Exception as e:
             log.exception("Exception-when-stopping-messaging-proxy:", e=e)
 
-
-    @inlineCallbacks
-    def create_topic(self, topic):
-        yield self._wait_until_topic_is_ready(self.kafka_client, topic)
-
-    @inlineCallbacks
-    def _wait_until_topic_is_ready(self, client, topic):
-        e = True
-        while e:
-            yield client.load_metadata_for_topics(topic)
-            e = client.metadata_error_for_topic(topic)
-            if e:
-                log.debug("Topic-not-ready-retrying...", topic=topic)
-
-    def _clear_backoff(self):
-        if self.retries:
-            log.info('reconnected-to-consul', after_retries=self.retries)
-            self.retries = 0
-
     def get_target_cls(self):
         return self.target_cls
 
@@ -164,24 +144,13 @@
         return self.default_topic
 
     @inlineCallbacks
-    def _subscribe(self, topic, offset, callback=None, target_cls=None):
+    def _subscribe_group_consumer(self, group_id, topic, offset, callback=None,
+                                  target_cls=None):
         try:
             log.debug("subscribing-to-topic-start", topic=topic)
-            yield self._wait_until_topic_is_ready(self.kafka_client, topic)
-            partitions = self.kafka_client.topic_partitions[topic]
-            consumers = []
-
-            # First setup the generic callback - all received messages will
-            # go through that queue
-            if topic not in self.topic_consumer_map:
-                log.debug("topic-not-in-consumer-map", topic=topic)
-                consumers = [Consumer(self.kafka_client, topic, partition,
-                                      self._enqueue_received_message)
-                             for partition in partitions]
-                self.topic_consumer_map[topic] = consumers
-
-            log.debug("_subscribe", topic=topic,
-                      consumermap=self.topic_consumer_map)
+            yield self.kafka_proxy.subscribe(topic,
+                                             self._enqueue_received_group_message,
+                                             group_id, offset)
 
             if target_cls is not None and callback is None:
                 # Scenario #1
@@ -198,25 +167,6 @@
             else:
                 log.warn("invalid-parameters")
 
-            def cb_closed(result):
-                """
-                Called when a consumer cleanly stops.
-                """
-                log.debug("Consumers-cleanly-stopped")
-
-            def eb_failed(failure):
-                """
-                Called when a consumer fails due to an uncaught exception in the
-                processing callback or a network error on shutdown. In this case we
-                simply log the error.
-                """
-                log.warn("Consumers-failed", failure=failure)
-
-            for c in consumers:
-                c.start(offset).addCallbacks(cb_closed, eb_failed)
-
-            log.debug("subscribed-to-topic", topic=topic)
-
             returnValue(True)
         except Exception as e:
             log.exception("Exception-during-subscription", e=e)
@@ -224,7 +174,7 @@
 
     @inlineCallbacks
     def subscribe(self, topic, callback=None, target_cls=None,
-                  max_retry=3, offset=OFFSET_LATEST):
+                  max_retry=3, group_id=None, offset=KAFKA_OFFSET_LATEST):
         """
         Scenario 1:  invoked to subscribe to a specific topic with a
         target_cls to invoke when a message is received on that topic.  This
@@ -245,6 +195,8 @@
         :param max_retry:  the number of retries before reporting failure
         to subscribe.  This caters for scenario where the kafka topic is not
         ready.
+        :param group_id:  The ID of the group the consumer is subscribing to
+        :param offset: The topic offset on the kafka bus from where message consumption will start
         :return: True on success, False on failure
         """
         RETRY_BACKOFF = [0.05, 0.1, 0.2, 0.5, 1, 2, 5]
@@ -253,13 +205,20 @@
             wait_time = RETRY_BACKOFF[min(retries,
                                           len(RETRY_BACKOFF) - 1)]
             log.info(msg, retry_in=wait_time)
-            return asleep(wait_time)
+            return asleep.asleep(wait_time)
+
+        log.debug("subscribing", topic=topic, group_id=group_id,
+                  callback=callback, target=target_cls)
 
         retry = 0
         subscribed = False
+        if group_id is None:
+            group_id = self.default_group_id
         while not subscribed:
-            subscribed = yield self._subscribe(topic, callback=callback,
-                                               target_cls=target_cls, offset=offset)
+            subscribed = yield self._subscribe_group_consumer(group_id, topic,
+                                                              callback=callback,
+                                                              target_cls=target_cls,
+                                                              offset=offset)
             if subscribed:
                 returnValue(True)
             elif retry > max_retry:
@@ -268,51 +227,56 @@
                 _backoff("subscription-not-complete", retry)
                 retry += 1
 
-        # while not self._subscribe(topic, callback=callback,
-        #                           target_cls=target_cls):
-        #     if retry > max_retry:
-        #         return False
-        #     else:
-        #         _backoff("subscription-not-complete", retry)
-        #         retry += 1
-        # return True
-
-    def unsubscribe(self, topic):
+    def unsubscribe(self, topic, callback=None, target_cls=None):
         """
         Invoked when unsubscribing to a topic
-        :param topic: topic to unsubscibe from
+        :param topic: topic to unsubscribe from
+        :param callback:  the callback used when subscribing to the topic, if any
+        :param target_cls: the targert class used when subscribing to the topic, if any
         :return: None on success or Exception on failure
         """
         log.debug("Unsubscribing-to-topic", topic=topic)
 
-        def remove_topic(topic):
-            if topic in self.topic_consumer_map:
-                del self.topic_consumer_map[topic]
-
         try:
-            if topic in self.topic_consumer_map:
-                consumers = self.topic_consumer_map[topic]
-                d = gatherResults([c.stop() for c in consumers])
-                d.addCallback(remove_topic, topic)
-                log.debug("Unsubscribed-to-topic.", topic=topic)
-            else:
-                log.debug("Topic-does-not-exist.", topic=topic)
+            self.kafka_proxy.unsubscribe(topic,
+                                         self._enqueue_received_group_message)
+
+            if callback is None and target_cls is None:
+                log.error("both-call-and-target-cls-cannot-be-none",
+                          topic=topic)
+                raise KafkaMessagingError(
+                    error="both-call-and-target-cls-cannot-be-none")
+
+            if target_cls is not None and topic in self.topic_target_cls_map:
+                del self.topic_target_cls_map[topic]
+
+            if callback is not None and topic in self.topic_callback_map:
+                index = 0
+                for cb in self.topic_callback_map[topic]:
+                    if cb == callback:
+                        break
+                    index += 1
+                if index < len(self.topic_callback_map[topic]):
+                    self.topic_callback_map[topic].pop(index)
+
+                if len(self.topic_callback_map[topic]) == 0:
+                    del self.topic_callback_map[topic]
         except Exception as e:
-            log.exception("Exception-when-stopping-messaging-proxy:", e=e)
+            log.exception("Exception-when-unsubscribing-to-topic", topic=topic,
+                          e=e)
+            return e
 
     @inlineCallbacks
-    def _enqueue_received_message(self, reactor, message_list):
+    def _enqueue_received_group_message(self, msg):
         """
         Internal method to continuously queue all received messaged
         irrespective of topic
-        :param reactor: A requirement by the Twisted Python kafka library
-        :param message_list: Received list of messages
+        :param msg: Received message
         :return: None on success, Exception on failure
         """
         try:
-            for m in message_list:
-                log.debug("received-msg", msg=m)
-                yield self.received_msg_queue.put(m)
+            log.debug("received-msg", msg=msg)
+            yield self.received_msg_queue.put(msg)
         except Exception as e:
             log.exception("Failed-enqueueing-received-message", e=e)
 
@@ -327,6 +291,8 @@
             try:
                 message = yield self.received_msg_queue.get()
                 yield self._process_message(message)
+                if self.stopped:
+                    break
             except Exception as e:
                 log.exception("Failed-dequeueing-received-message", e=e)
 
@@ -457,16 +423,18 @@
         current_time = int(round(time.time() * 1000))
         # log.debug("Got Message", message=m)
         try:
-            val = m.message.value
+            val = m.value()
+            # val = m.message.value
             # print m.topic
 
             # Go over customized callbacks first
-            if m.topic in self.topic_callback_map:
-                for c in self.topic_callback_map[m.topic]:
+            m_topic = m.topic()
+            if m_topic in self.topic_callback_map:
+                for c in self.topic_callback_map[m_topic]:
                     yield c(val)
 
             #  Check whether we need to process request/response scenario
-            if m.topic not in self.topic_target_cls_map:
+            if m_topic not in self.topic_target_cls_map:
                 return
 
             # Process request/response scenario
@@ -506,7 +474,8 @@
                                 response.header.to_topic)
                             self._send_kafka_message(res_topic, response)
 
-                        log.debug("Response-sent", response=response.body, to_topic=res_topic)
+                        log.debug("Response-sent", response=response.body,
+                                  to_topic=res_topic)
             elif message.header.type == MessageType.Value("RESPONSE"):
                 trns_id = self._to_string(message.header.id)
                 if trns_id in self.transaction_id_deferred_map:
diff --git a/python/adapters/kafka/kafka_proxy.py b/python/adapters/kafka/kafka_proxy.py
index d596334..cefc590 100644
--- a/python/adapters/kafka/kafka_proxy.py
+++ b/python/adapters/kafka/kafka_proxy.py
@@ -14,18 +14,19 @@
 # limitations under the License.
 #
 
-from afkak.client import KafkaClient as _KafkaClient
-from afkak.common import (
-    PRODUCER_ACK_NOT_REQUIRED
-)
-from afkak.producer import Producer as _kafkaProducer
+
+from confluent_kafka import Producer as _kafkaProducer
 from structlog import get_logger
+from twisted.internet import reactor
 from twisted.internet.defer import inlineCallbacks, returnValue
+from twisted.internet.threads import deferToThread
 from zope.interface import implementer
 
 from python.common.utils.consulhelpers import get_endpoint_from_consul
-from python.common.utils.registry import IComponent
 from event_bus_publisher import EventBusPublisher
+from python.common.utils.registry import IComponent
+from confluent_kafka import Consumer, KafkaError
+import threading
 
 log = get_logger()
 
@@ -33,7 +34,11 @@
 @implementer(IComponent)
 class KafkaProxy(object):
     """
-    This is a singleton proxy kafka class to hide the kafka client details.
+    This is a singleton proxy kafka class to hide the kafka client details. This
+    proxy uses confluent-kafka-python as the kafka client. Since that client is
+    not a Twisted client then requests to that client are wrapped with
+    twisted.internet.threads.deferToThread to avoid any potential blocking of
+    the Twisted loop.
     """
     _kafka_instance = None
 
@@ -42,6 +47,7 @@
                  kafka_endpoint='localhost:9092',
                  ack_timeout=1000,
                  max_req_attempts=10,
+                 consumer_poll_timeout=10,
                  config={}):
 
         # return an exception if the object already exist
@@ -59,6 +65,10 @@
         self.event_bus_publisher = None
         self.stopping = False
         self.faulty = False
+        self.consumer_poll_timeout = consumer_poll_timeout
+        self.topic_consumer_map = {}
+        self.topic_callbacks_map = {}
+        self.topic_any_map_lock = threading.Lock()
         log.debug('initialized', endpoint=kafka_endpoint)
 
     @inlineCallbacks
@@ -77,6 +87,7 @@
     def stop(self):
         try:
             log.debug('stopping-kafka-proxy')
+            self.stopping = True
             try:
                 if self.kclient:
                     yield self.kclient.close()
@@ -84,16 +95,38 @@
                     log.debug('stopped-kclient-kafka-proxy')
             except Exception, e:
                 log.exception('failed-stopped-kclient-kafka-proxy', e=e)
-                pass
 
             try:
                 if self.kproducer:
-                    yield self.kproducer.stop()
+                    yield self.kproducer.flush()
                     self.kproducer = None
                     log.debug('stopped-kproducer-kafka-proxy')
             except Exception, e:
                 log.exception('failed-stopped-kproducer-kafka-proxy', e=e)
-                pass
+
+            # Stop all consumers
+            try:
+                self.topic_any_map_lock.acquire()
+                log.debug('stopping-consumers-kafka-proxy')
+                for _, c in self.topic_consumer_map.iteritems():
+                    yield deferToThread(c.close)
+                self.topic_consumer_map.clear()
+                self.topic_callbacks_map.clear()
+                log.debug('stopped-consumers-kafka-proxy')
+            except Exception, e:
+                log.exception('failed-stopped-consumers-kafka-proxy', e=e)
+            finally:
+                self.topic_any_map_lock.release()
+                log.debug('stopping-consumers-kafka-proxy-released-lock')
+
+            # try:
+            #    if self.event_bus_publisher:
+            #        yield self.event_bus_publisher.stop()
+            #        self.event_bus_publisher = None
+            #        log.debug('stopped-event-bus-publisher-kafka-proxy')
+            # except Exception, e:
+            #    log.debug('failed-stopped-event-bus-publisher-kafka-proxy')
+            #    pass
 
             log.debug('stopped-kafka-proxy')
 
@@ -105,8 +138,7 @@
             pass
 
     def _get_kafka_producer(self):
-        # PRODUCER_ACK_LOCAL_WRITE : server will wait till the data is written
-        #  to a local log before sending response
+
         try:
 
             if self.kafka_endpoint.startswith('@'):
@@ -124,39 +156,125 @@
                     return
             else:
                 _k_endpoint = self.kafka_endpoint
-
-            self.kclient = _KafkaClient(_k_endpoint)
-            self.kproducer = _kafkaProducer(self.kclient,
-                                            req_acks=PRODUCER_ACK_NOT_REQUIRED,
-                                            # req_acks=PRODUCER_ACK_LOCAL_WRITE,
-                                            # ack_timeout=self.ack_timeout,
-                                            # max_req_attempts=self.max_req_attempts)
-                                            )
+            self.kproducer = _kafkaProducer(
+                {'bootstrap.servers': _k_endpoint,
+                 }
+            )
+            pass
         except Exception, e:
             log.exception('failed-get-kafka-producer', e=e)
             return
 
+    @inlineCallbacks
+    def _wait_for_messages(self, consumer, topic):
+        while True:
+            try:
+                msg = yield deferToThread(consumer.poll,
+                                          self.consumer_poll_timeout)
 
-    # @inlineCallbacks
-    # def wait_until_topic_is_ready(self, topic):
-    #     #  Assumes "auto.create.topics.enable" is set in the broker
-    #     #  configuration
-    #     e = True
-    #     while e:
-    #         yield self.kclient.load_metadata_for_topics(topic)
-    #         e = self.kclient.metadata_error_for_topic(topic)
-    #         if e:
-    #             log.debug("Topic-not-ready-retrying...", topic=topic)
+                if self.stopping:
+                    log.debug("stop-request-recieved", topic=topic)
+                    break
 
+                if msg is None:
+                    continue
+                if msg.error():
+                    # This typically is received when there are no more messages
+                    # to read from kafka. Ignore.
+                    continue
+
+                # Invoke callbacks
+                for cb in self.topic_callbacks_map[topic]:
+                    yield cb(msg)
+            except Exception as e:
+                log.debug("exception-receiving-msg", topic=topic, e=e)
 
     @inlineCallbacks
-    def create_topic(self, topic):
-        # Assume auto create is enabled on the broker configuration
-        yield self.wait_until_topic_is_ready(topic)
+    def subscribe(self, topic, callback, groupId, offset='latest'):
+        """
+        subscribe allows a caller to subscribe to a given kafka topic.  This API
+        always create a group consumer.
+        :param topic - the topic to subscribe to
+        :param callback - the callback to invoke whenever a message is received
+        on that topic
+        :param groupId - the groupId for this consumer.  In the current
+        implementation there is a one-to-one mapping between a topic and a
+        groupId.  In other words, once a groupId is used for a given topic then
+        we won't be able to create another groupId for the same topic.
+        :param offset:  the kafka offset from where the consumer will start
+        consuming messages
+        """
+        try:
+            self.topic_any_map_lock.acquire()
+            if topic in self.topic_consumer_map:
+                # Just add the callback
+                if topic in self.topic_callbacks_map:
+                    self.topic_callbacks_map[topic].append(callback)
+                else:
+                    self.topic_callbacks_map[topic] = [callback]
+                return
 
+            # Create consumer for that topic
+            c = Consumer({
+                'bootstrap.servers': self.kafka_endpoint,
+                'group.id': groupId,
+                'auto.offset.reset': offset
+            })
+            yield deferToThread(c.subscribe, [topic])
+            # c.subscribe([topic])
+            self.topic_consumer_map[topic] = c
+            self.topic_callbacks_map[topic] = [callback]
+            # Start the consumer
+            reactor.callLater(0, self._wait_for_messages, c, topic)
+        except Exception, e:
+            log.exception("topic-subscription-error", e=e)
+        finally:
+            self.topic_any_map_lock.release()
 
     @inlineCallbacks
-    def send_message(self, topic, msg):
+    def unsubscribe(self, topic, callback):
+        """
+        Unsubscribe to a given topic.  Since there they be multiple callers
+        consuming from the same topic then to ensure only the relevant caller
+        gets unsubscribe then the callback is used as a differentiator.   The
+        kafka consumer will be closed when there are no callbacks required.
+        :param topic: topic to unsubscribe
+        :param callback: callback the caller used when subscribing to the topic.
+        If multiple callers have subscribed to a topic using the same callback
+        then the first callback on the list will be removed.
+        :return:None
+        """
+        try:
+            self.topic_any_map_lock.acquire()
+            log.debug("unsubscribing-to-topic", topic=topic)
+            if topic in self.topic_callbacks_map:
+                index = 0
+                for cb in self.topic_callbacks_map[topic]:
+                    if cb == callback:
+                        break
+                    index += 1
+                if index < len(self.topic_callbacks_map[topic]):
+                    self.topic_callbacks_map[topic].pop(index)
+
+                if len(self.topic_callbacks_map[topic]) == 0:
+                    # Stop the consumer
+                    if topic in self.topic_consumer_map:
+                        yield deferToThread(
+                            self.topic_consumer_map[topic].close)
+                        del self.topic_consumer_map[topic]
+                    del self.topic_callbacks_map[topic]
+                    log.debug("unsubscribed-to-topic", topic=topic)
+                else:
+                    log.debug("consumers-for-topic-still-exist", topic=topic,
+                              num=len(self.topic_callbacks_map[topic]))
+        except Exception, e:
+            log.exception("topic-unsubscription-error", e=e)
+        finally:
+            self.topic_any_map_lock.release()
+            log.debug("unsubscribing-to-topic-release-lock", topic=topic)
+
+    @inlineCallbacks
+    def send_message(self, topic, msg, key=None):
         assert topic is not None
         assert msg is not None
 
@@ -174,22 +292,23 @@
                                   endpoint=self.kafka_endpoint)
                         return
 
-                # log.debug('sending-kafka-msg', topic=topic, msg=msg)
+                log.debug('sending-kafka-msg', topic=topic, kafka_msg=msg)
                 msgs = [msg]
 
-                if self.kproducer and self.kclient and \
-                        self.event_bus_publisher and self.faulty is False:
-                    # log.debug('sending-kafka-msg-I-am-here0', time=int(round(time.time() * 1000)))
-
-                    yield self.kproducer.send_messages(topic, msgs=msgs)
-                    # self.kproducer.send_messages(topic, msgs=msgs)
-                    # log.debug('sent-kafka-msg', topic=topic, msg=msg)
+                if self.kproducer is not None and self.event_bus_publisher and self.faulty is False:
+                    d = deferToThread(self.kproducer.produce, topic, msg, key)
+                    yield d
+                    log.debug('sent-kafka-msg', topic=topic, kafka_msg=msg)
+                    # send a lightweight poll to avoid an exception after 100k messages.
+                    d1 = deferToThread(self.kproducer.poll, 0)
+                    yield d1
                 else:
                     return
 
         except Exception, e:
             self.faulty = True
-            log.error('failed-to-send-kafka-msg', topic=topic, msg=msg, e=e)
+            log.error('failed-to-send-kafka-msg', topic=topic, kafka_msg=msg,
+                      e=e)
 
             # set the kafka producer to None.  This is needed if the
             # kafka docker went down and comes back up with a different
diff --git a/python/adapters/ponsim_olt/main.py b/python/adapters/ponsim_olt/main.py
index 09b78fc..86b6be0 100755
--- a/python/adapters/ponsim_olt/main.py
+++ b/python/adapters/ponsim_olt/main.py
@@ -63,8 +63,8 @@
     core_topic=os.environ.get('CORE_TOPIC', 'rwcore'),
     interface=os.environ.get('INTERFACE', get_my_primary_interface()),
     instance_id=os.environ.get('INSTANCE_ID', os.environ.get('HOSTNAME', '1')),
-    kafka_adapter=os.environ.get('KAFKA_ADAPTER', '192.168.0.20:9092'),
-    kafka_cluster=os.environ.get('KAFKA_CLUSTER', '10.100.198.220:9092'),
+    kafka_adapter=os.environ.get('KAFKA_ADAPTER', '172.20.10.3:9092'),
+    kafka_cluster=os.environ.get('KAFKA_CLUSTER', '172.20.10.3:9092'),
     backend=os.environ.get('BACKEND', 'none'),
     retry_interval=os.environ.get('RETRY_INTERVAL', 2),
     heartbeat_topic=os.environ.get('HEARTBEAT_TOPIC', "adapters.heartbeat"),
@@ -380,7 +380,7 @@
                     # TODO: Add KV Store object reference
                     kv_store=self.args.backend,
                     default_topic=self.args.name,
-                    # Needs to assign a real class
+                    group_id_prefix=self.args.instance_id,
                     target_cls=ponsim_request_handler
                 )
             ).start()
diff --git a/python/adapters/ponsim_onu/main.py b/python/adapters/ponsim_onu/main.py
index d6418e9..c4c12db 100755
--- a/python/adapters/ponsim_onu/main.py
+++ b/python/adapters/ponsim_onu/main.py
@@ -379,6 +379,7 @@
                     # TODO: Add KV Store object reference
                     kv_store=self.args.backend,
                     default_topic=self.args.name,
+                    group_id_prefix=self.args.instance_id,
                     target_cls=ponsim_request_handler
                 )
             ).start()
diff --git a/rw_core/core/id.go b/rw_core/core/id.go
index b28151f..369b4f1 100644
--- a/rw_core/core/id.go
+++ b/rw_core/core/id.go
@@ -18,7 +18,10 @@
 import (
 	"crypto/rand"
 	"encoding/hex"
+	"errors"
+	"fmt"
 	m "math/rand"
+	"strconv"
 )
 
 func randomHex(n int) (string, error) {
@@ -49,3 +52,16 @@
 	//	A logical port is a uint32
 	return m.Uint32()
 }
+
+func CreateDataPathId(idInHexString string) (uint64, error) {
+	if idInHexString == "" {
+		return 0, errors.New("id-empty")
+	}
+	// First prepend 0x to the string
+	newId := fmt.Sprintf("0x%s", idInHexString)
+	if d, err := strconv.ParseUint(newId, 0, 64); err != nil {
+		return 0, err
+	} else {
+		return d, nil
+	}
+}
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 86b79f9..ee3d82a 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -75,7 +75,16 @@
 		log.Errorw("error-creating-logical-device", log.Fields{"error": err})
 		return err
 	}
+
 	ld := &voltha.LogicalDevice{Id: agent.logicalDeviceId, RootDeviceId: agent.rootDeviceId}
+
+	// Create the datapath ID (uint64) using the logical device ID (based on the MAC Address)
+	var datapathID uint64
+	if datapathID, err = CreateDataPathId(agent.logicalDeviceId); err != nil {
+		log.Errorw("error-creating-datapath-id", log.Fields{"error": err})
+		return err
+	}
+	ld.DatapathId = datapathID
 	ld.Desc = (proto.Clone(switchCap.Desc)).(*ofp.OfpDesc)
 	ld.SwitchFeatures = (proto.Clone(switchCap.SwitchFeatures)).(*ofp.OfpSwitchFeatures)
 	ld.Flows = &ofp.Flows{Items: nil}
@@ -596,7 +605,7 @@
 	groupsChanged := false
 	groupId := groupMod.GroupId
 	if idx := fu.FindGroup(groups, groupId); idx == -1 {
-		return errors.New(fmt.Sprintf("group-absent:%s", groupId))
+		return errors.New(fmt.Sprintf("group-absent:%d", groupId))
 	} else {
 		//replace existing group entry with new group definition
 		groupEntry := fd.GroupEntryFromGroupMod(groupMod)
@@ -878,7 +887,7 @@
 	}
 
 	deviceNodeIds := agent.deviceGraph.GetDeviceNodeIds()
-	for deviceId, _ := range deviceNodeIds {
+	for deviceId := range deviceNodeIds {
 		if deviceId == ld.RootDeviceId {
 			rules.AddFlowsAndGroup(deviceId, agent.rootDeviceDefaultRules())
 		} else {
@@ -960,9 +969,14 @@
 	deviceRules := agent.flowDecomposer.DecomposeRules(agent, *latestData, *groups)
 	log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
 
+	var err error
 	for deviceId, value := range deviceRules.GetRules() {
-		agent.deviceMgr.updateFlows(deviceId, value.ListFlows())
-		agent.deviceMgr.updateGroups(deviceId, value.ListGroups())
+		if err = agent.deviceMgr.updateFlows(deviceId, value.ListFlows()); err != nil {
+			log.Error("update-flows-failed", log.Fields{"deviceID":deviceId})
+		}
+		if err = agent.deviceMgr.updateGroups(deviceId, value.ListGroups()); err != nil {
+			log.Error("update-groups-failed", log.Fields{"deviceID":deviceId})
+		}
 	}
 
 	return nil
@@ -996,11 +1010,16 @@
 	log.Debugw("groupsinfo", log.Fields{"groups": latestData, "flows": flows})
 	deviceRules := agent.flowDecomposer.DecomposeRules(agent, *flows, *latestData)
 	log.Debugw("rules", log.Fields{"rules": deviceRules.String()})
+	var err error
 	for deviceId, value := range deviceRules.GetRules() {
-		agent.deviceMgr.updateFlows(deviceId, value.ListFlows())
-		agent.deviceMgr.updateGroups(deviceId, value.ListGroups())
-	}
+		if err = agent.deviceMgr.updateFlows(deviceId, value.ListFlows()); err != nil {
+			log.Error("update-flows-failed", log.Fields{"deviceID":deviceId})
+		}
+		if err = agent.deviceMgr.updateGroups(deviceId, value.ListGroups()); err != nil {
+			log.Error("update-groups-failed", log.Fields{"deviceID":deviceId})
+		}
 
+	}
 	return nil
 }
 
@@ -1009,12 +1028,14 @@
 	outPort := fd.GetPacketOutPort(packet)
 	//frame := packet.GetData()
 	//TODO: Use a channel between the logical agent and the device agent
-	agent.deviceMgr.packetOut(agent.rootDeviceId, outPort, packet)
+	if err := agent.deviceMgr.packetOut(agent.rootDeviceId, outPort, packet); err != nil {
+		log.Error("packetout-failed", log.Fields{"logicalDeviceID":agent.rootDeviceId})
+	}
 }
 
 func (agent *LogicalDeviceAgent) packetIn(port uint32, packet []byte) {
 	log.Debugw("packet-in", log.Fields{"port": port, "packet": packet})
-	packet_in := fd.MkPacketIn(port, packet)
-	agent.ldeviceMgr.grpcNbiHdlr.sendPacketIn(agent.logicalDeviceId, packet_in)
-	log.Debugw("sending-packet-in", log.Fields{"packet-in": packet_in})
+	packetIn := fd.MkPacketIn(port, packet)
+	agent.ldeviceMgr.grpcNbiHdlr.sendPacketIn(agent.logicalDeviceId, packetIn)
+	log.Debugw("sending-packet-in", log.Fields{"packet-in": packetIn})
 }
diff --git a/rw_core/main.go b/rw_core/main.go
index dd830c1..336e731 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -62,7 +62,7 @@
 	return nil, errors.New("unsupported-kv-store")
 }
 
-func newKafkaClient(clientType string, host string, port int) (kafka.Client, error) {
+func newKafkaClient(clientType string, host string, port int, instanceID string) (kafka.Client, error) {
 
 	log.Infow("kafka-client-type", log.Fields{"client": clientType})
 	switch clientType {
@@ -70,9 +70,15 @@
 		return kafka.NewSaramaClient(
 			kafka.Host(host),
 			kafka.Port(port),
+			kafka.ConsumerType(kafka.GroupCustomer),
 			kafka.ProducerReturnOnErrors(true),
 			kafka.ProducerReturnOnSuccess(true),
 			kafka.ProducerMaxRetries(6),
+			kafka.NumPartitions(3),
+			kafka.ConsumerGroupName(instanceID),
+			kafka.ConsumerGroupPrefix(instanceID),
+			kafka.AutoCreateTopic(false),
+			kafka.ProducerFlushFrequency(5),
 			kafka.ProducerRetryBackoff(time.Millisecond*30)), nil
 	}
 	return nil, errors.New("unsupported-client-type")
@@ -126,7 +132,7 @@
 	}
 
 	// Setup Kafka Client
-	if rw.kafkaClient, err = newKafkaClient("sarama", rw.config.KafkaAdapterHost, rw.config.KafkaAdapterPort); err != nil {
+	if rw.kafkaClient, err = newKafkaClient("sarama", rw.config.KafkaAdapterHost, rw.config.KafkaAdapterPort, rw.config.InstanceID); err != nil {
 		log.Fatal("Unsupported-kafka-client")
 	}
 
@@ -219,7 +225,7 @@
 	}
 
 	log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
-	//log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.DebugLevel)
+	log.SetPackageLogLevel("github.com/opencord/voltha-go/kafka", log.DebugLevel)
 
 	defer log.CleanUp()
 
