diff --git a/kafka/client.go b/kafka/client.go
index 1df700e..8cc1999 100644
--- a/kafka/client.go
+++ b/kafka/client.go
@@ -26,9 +26,13 @@
 )
 
 const (
+	GroupIdKey = "groupId" // KVArg key under which a caller passes a consumer group id to Subscribe
+)
+
+const (
 	DefaultKafkaHost                = "127.0.0.1"
 	DefaultKafkaPort                = 9092
-	DefaultGroupName                = "rw_core"
+	DefaultGroupName                = "voltha"
 	DefaultSleepOnError             = 1
 	DefaultProducerFlushFrequency   = 5
 	DefaultProducerFlushMessages    = 1
@@ -51,7 +55,7 @@
 	Stop()
 	CreateTopic(topic *Topic, numPartition int, repFactor int) error
 	DeleteTopic(topic *Topic) error
-	Subscribe(topic *Topic) (<-chan *ca.InterContainerMessage, error)
+	Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ca.InterContainerMessage, error)
 	UnSubscribe(topic *Topic, ch <-chan *ca.InterContainerMessage) error
 	Send(msg interface{}, topic *Topic, keys ...string) error
 }
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 3381fdc..c527619 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -33,12 +33,12 @@
 
 // Initialize the logger - gets the default until the main function setup the logger
 func init() {
-	log.AddPackage(log.JSON, log.WarnLevel, nil)
+	log.AddPackage(log.JSON, log.DebugLevel, nil) // NOTE(review): default level lowered from Warn to Debug - confirm this is intended outside development
 }
 
 const (
 	DefaultMaxRetries     = 3
-	DefaultRequestTimeout = 500 // 500 milliseconds - to handle a wider latency range
+	DefaultRequestTimeout = 3000 // 3000 milliseconds - to handle a wider latency range
 )
 
 // requestHandlerChannel represents an interface associated with a channel.  Whenever, an event is
@@ -172,7 +172,7 @@
 	log.Info("stopping-intercontainer-proxy")
 	kp.doneCh <- 1
 	// TODO : Perform cleanup
-	//kp.kafkaClient.Stop()
+	kp.kafkaClient.Stop()
 	//kp.deleteAllTopicRequestHandlerChannelMap()
 	//kp.deleteAllTopicResponseChannelMap()
 	//kp.deleteAllTransactionIdToChannelMap()
@@ -331,6 +331,7 @@
 	var err error
 	if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
 		log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
+		return err
 	}
 	kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
 
@@ -676,7 +677,7 @@
 	for {
 		select {
 		case msg := <-subscribedCh:
-			//log.Debugw("message-received", log.Fields{"msg": msg, "fromTopic": msg.Header.FromTopic})
+			log.Debugw("message-received", log.Fields{"msg": msg, "fromTopic": msg.Header.FromTopic})
 			if msg.Header.Type == ic.MessageType_RESPONSE {
 				go kp.dispatchResponse(msg)
 			}
@@ -700,6 +701,7 @@
 	// First check whether we already have a channel listening for response on that topic.  If there is
 	// already one then it will be reused.  If not, it will be created.
 	if !kp.isTopicSubscribedForResponse(topic.Name) {
+		log.Debugw("not-subscribed-for-response", log.Fields{"topic": topic.Name, "trnsid": trnsId})
 		var subscribedCh <-chan *ic.InterContainerMessage
 		var err error
 		if subscribedCh, err = kp.kafkaClient.Subscribe(&topic); err != nil {
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index 2df19e5..10c692a 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -30,7 +30,7 @@
 )
 
 func init() {
-	log.AddPackage(log.JSON, log.WarnLevel, nil)
+	log.AddPackage(log.JSON, log.DebugLevel, nil) // NOTE(review): default level lowered from Warn to Debug - confirm this is intended outside development
 }
 
 type returnErrorFunction func() error
@@ -51,9 +51,10 @@
 	KafkaPort                     int
 	producer                      sarama.AsyncProducer
 	consumer                      sarama.Consumer
-	groupConsumer                 *scc.Consumer
+	groupConsumers                map[string]*scc.Consumer
+	consumerGroupPrefix           string
 	consumerType                  int
-	groupName                     string
+	consumerGroupName             string
 	producerFlushFrequency        int
 	producerFlushMessages         int
 	producerFlushMaxmessages      int
@@ -87,6 +88,18 @@
 	}
 }
 
+// ConsumerGroupPrefix returns a SaramaClientOption that sets the prefix which
+// Subscribe prepends to the group id of every group consumer it sets up.
+func ConsumerGroupPrefix(prefix string) SaramaClientOption {
+	return func(client *SaramaClient) { client.consumerGroupPrefix = prefix }
+}
+
+// ConsumerGroupName returns a SaramaClientOption that stores name in the client's consumerGroupName
+// field. NOTE(review): the only visible read of that field (in Subscribe) is commented out - confirm use.
+func ConsumerGroupName(name string) SaramaClientOption {
+	return func(client *SaramaClient) { client.consumerGroupName = name }
+}
+
 func ConsumerType(consumer int) SaramaClientOption {
 	return func(args *SaramaClient) {
 		args.consumerType = consumer
@@ -188,6 +201,8 @@
 		option(client)
 	}
 
+	client.groupConsumers = make(map[string]*scc.Consumer)
+
 	client.lockTopicToConsumerChannelMap = sync.RWMutex{}
 	client.topicLockMap = make(map[string]*sync.RWMutex)
 	client.lockOfTopicLockMap = sync.RWMutex{}
@@ -214,15 +229,19 @@
 		return err
 	}
 
-	// Create the master consumers
-	if err := sc.createConsumer(); err != nil {
-		log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
-		return err
+	if sc.consumerType == DefaultConsumerType {
+		// Create the master consumers
+		if err := sc.createConsumer(); err != nil {
+			log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
+			return err
+		}
 	}
 
 	// Create the topic to consumers/channel map
 	sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
 
+	log.Info("kafka-sarama-client-started")
+
 	return nil
 }
 
@@ -234,39 +253,38 @@
 
 	if sc.producer != nil {
 		if err := sc.producer.Close(); err != nil {
-			panic(err)
+			log.Errorw("closing-producer-failed", log.Fields{"error": err})
 		}
 	}
 
 	if sc.consumer != nil {
 		if err := sc.consumer.Close(); err != nil {
-			panic(err)
+			log.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
 		}
 	}
 
-	if sc.groupConsumer != nil {
-		if err := sc.groupConsumer.Close(); err != nil {
-			panic(err)
+	for key, val := range sc.groupConsumers {
+		log.Debugw("closing-group-consumer", log.Fields{"topic": key})
+		if err := val.Close(); err != nil {
+			log.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
 		}
 	}
 
 	if sc.cAdmin != nil {
 		if err := sc.cAdmin.Close(); err != nil {
-			panic(err)
+			log.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
 		}
 	}
 
 	//TODO: Clear the consumers map
-	sc.clearConsumerChannelMap()
+	//sc.clearConsumerChannelMap()
 
 	log.Info("sarama-client-stopped")
 }
 
-//CreateTopic creates a topic on the Kafka Broker.
-func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
-	sc.lockTopic(topic)
-	defer sc.unLockTopic(topic)
-
+// createTopic is an internal function that creates a topic on the Kafka Broker. No locking is required
+// here because the invoking function must already hold the lock on the topic.
+func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
 	// Set the topic details
 	topicDetail := &sarama.TopicDetail{}
 	topicDetail.NumPartitions = int32(numPartition)
@@ -290,6 +308,15 @@
 	return nil
 }
 
+// CreateTopic creates the given topic on the Kafka Broker. It holds the lock of that specific topic for the
+// whole call, so that no two goroutines operate on the same topic at once, and delegates to createTopic.
+func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
+	sc.lockTopic(topic)
+	defer sc.unLockTopic(topic)
+	err := sc.createTopic(topic, numPartition, repFactor)
+	return err
+}
+
 //DeleteTopic removes a topic from the kafka Broker
 func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
 	sc.lockTopic(topic)
@@ -316,7 +343,7 @@
 
 // Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
 // messages from that topic
-func (sc *SaramaClient) Subscribe(topic *Topic) (<-chan *ic.InterContainerMessage, error) {
+func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
 	sc.lockTopic(topic)
 	defer sc.unLockTopic(topic)
 
@@ -338,7 +365,7 @@
 	// Use the consumerType option to figure out the type of consumer to launch
 	if sc.consumerType == PartitionConsumer {
 		if sc.autoCreateTopic {
-			if err = sc.CreateTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
+			if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
 				log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
 				return nil, err
 			}
@@ -350,10 +377,26 @@
 	} else if sc.consumerType == GroupCustomer {
 		// TODO: create topic if auto create is on.  There is an issue with the sarama cluster library that
 		// does not consume from a precreated topic in some scenarios
-		if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, "mytest"); err != nil {
-			log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
+		//if sc.autoCreateTopic {
+		//	if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
+		//		log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
+		//		return nil, err
+		//	}
+		//}
+		//groupId := sc.consumerGroupName
+		groupId := getGroupId(kvArgs...)
+		// Include the group prefix
+		if groupId != "" {
+			groupId = sc.consumerGroupPrefix + groupId
+		} else {
+			// Need to use a unique group Id per topic
+			groupId = sc.consumerGroupPrefix + topic.Name
+		}
+		if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId); err != nil {
+			log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
 			return nil, err
 		}
+
 	} else {
 		log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
 		return nil, errors.New("unknown-consumer-type")
@@ -416,6 +459,16 @@
 	return nil
 }
 
+// getGroupId returns the first string Value stored under GroupIdKey in kvArgs, or "" if there is none (a non-string Value is skipped, not a panic).
+func getGroupId(kvArgs ...*KVArg) string {
+	for _, arg := range kvArgs {
+		if groupId, ok := arg.Value.(string); ok && arg.Key == GroupIdKey {
+			return groupId
+		}
+	}
+	return ""
+}
+
 func (sc *SaramaClient) createClusterAdmin() error {
 	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
 	config := sarama.NewConfig()
@@ -623,7 +676,7 @@
 }
 
 // createGroupConsumer creates a consumers group
-func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId *string, retries int) (*scc.Consumer, error) {
+func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, retries int) (*scc.Consumer, error) {
 	config := scc.NewConfig()
 	config.ClientID = uuid.New().String()
 	config.Group.Mode = scc.ConsumerModeMultiplex
@@ -632,24 +685,22 @@
 	//config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
 	//config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
 	config.Consumer.Offsets.Initial = sarama.OffsetNewest
+	//config.Consumer.Offsets.Initial = sarama.OffsetOldest
 	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
 	brokers := []string{kafkaFullAddr}
 
-	if groupId == nil {
-		g := DefaultGroupName
-		groupId = &g
-	}
 	topics := []string{topic.Name}
 	var consumer *scc.Consumer
 	var err error
 
-	if consumer, err = scc.NewConsumer(brokers, *groupId, topics, config); err != nil {
-		log.Errorw("create-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
+	if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
+		log.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
 		return nil, err
 	}
-	log.Debugw("create-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
+	log.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
 	//time.Sleep(10*time.Second)
-	sc.groupConsumer = consumer
+	//sc.groupConsumer = consumer
+	sc.groupConsumers[topic.Name] = consumer
 	return consumer, nil
 }
 
@@ -704,24 +755,36 @@
 func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
 	log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
 
+	//go func() {
+	//	for msg := range consumer.Errors() {
+	//		log.Warnw("group-consumers-error", log.Fields{"error": msg.Error()})
+	//	}
+	//}()
+	//
+	//go func() {
+	//	for ntf := range consumer.Notifications() {
+	//		log.Debugw("group-received-notification", log.Fields{"notification": ntf})
+	//	}
+	//}()
+
 startloop:
 	for {
 		select {
 		case err := <-consumer.Errors():
 			if err != nil {
-				log.Warnw("group-consumers-error", log.Fields{"error": err})
+				log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
 			} else {
 				// There is a race condition when this loop is stopped and the consumer is closed where
 				// the actual error comes as nil
-				log.Warn("group-consumers-error")
+				log.Warnw("group-consumers-error-nil", log.Fields{"topic": topic.Name})
 			}
 		case msg := <-consumer.Messages():
-			//log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
 			if msg == nil {
 				// There is a race condition when this loop is stopped and the consumer is closed where
 				// the actual msg comes as nil
 				break startloop
 			}
+			log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
 			msgBody := msg.Value
 			icm := &ic.InterContainerMessage{}
 			if err := proto.Unmarshal(msgBody, icm); err != nil {
@@ -800,7 +863,7 @@
 	// TODO:  Replace this development partition consumers with a group consumers
 	var pConsumer *scc.Consumer
 	var err error
-	if pConsumer, err = sc.createGroupConsumer(topic, &groupId, DefaultMaxRetries); err != nil {
+	if pConsumer, err = sc.createGroupConsumer(topic, groupId, DefaultMaxRetries); err != nil {
 		log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
 		return nil, err
 	}
