This commit adds a complete partition consumer as well as a
group consumer to the sarama client library.  It also upgrades
the Kafka version used by the test compose environment.
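
A sketch of how the new client can be constructed and used, relying
only on the options introduced below (illustrative; the topic name and
host are placeholders, and error handling is elided):

    client := kafka.NewSaramaClient(
        kafka.Host("127.0.0.1"),
        kafka.Port(9092),
        kafka.ConsumerType(kafka.PartitionConsumer),
        kafka.AutoCreateTopic(true))
    if err := client.Start(); err != nil {
        // handle startup failure
    }
    // ch receives *ca.InterContainerMessage for the topic
    ch, _ := client.Subscribe(&kafka.Topic{Name: "my-topic"})
    // msg must be a proto.Message
    _ = client.Send(msg, &kafka.Topic{Name: "my-topic"})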

Change-Id: Idca3eb1aa31d668afa86d12b39d6a1b0ab1965bc
diff --git a/compose/docker-compose-zk-kafka-test.yml b/compose/docker-compose-zk-kafka-test.yml
index e339aa4..dac9216 100644
--- a/compose/docker-compose-zk-kafka-test.yml
+++ b/compose/docker-compose-zk-kafka-test.yml
@@ -28,7 +28,8 @@
   # Single-node kafka service
   #
   kafka:
-    image: "wurstmeister/kafka:1.1.0"
+    image: "wurstmeister/kafka:2.11-2.0.1"
     ports:
      - 9092:9092
     environment:
diff --git a/kafka/client.go b/kafka/client.go
index cb33a35..ad8f01a 100644
--- a/kafka/client.go
+++ b/kafka/client.go
@@ -17,29 +17,41 @@
 
 import (
 	ca "github.com/opencord/voltha-go/protos/core_adapter"
+	"time"
 )
 
 const (
-	DefaultKafkaHost         = "127.0.0.1"
-	DefaultKafkaPort         = 9092
-	DefaultGroupName         = "rw_core"
-	DefaultSleepOnError      = 1
-	DefaultFlushFrequency    = 1
-	DefaultFlushMessages     = 1
-	DefaultFlushMaxmessages  = 1
-	DefaultReturnSuccess     = false
-	DefaultReturnErrors      = true
-	DefaultConsumerMaxwait   = 10
-	DefaultMaxProcessingTime = 100
+	PartitionConsumer = iota
+	GroupConsumer
+)
+
+const (
+	DefaultKafkaHost                = "127.0.0.1"
+	DefaultKafkaPort                = 9092
+	DefaultGroupName                = "rw_core"
+	DefaultSleepOnError             = 1
+	DefaultProducerFlushFrequency   = 5
+	DefaultProducerFlushMessages    = 1
+	DefaultProducerFlushMaxmessages = 5
+	DefaultProducerReturnSuccess    = false
+	DefaultProducerReturnErrors     = true
+	DefaultProducerRetryMax         = 3
+	DefaultProducerRetryBackoff     = time.Millisecond * 100
+	DefaultConsumerMaxwait          = 10
+	DefaultMaxProcessingTime        = 100
+	DefaultConsumerType             = PartitionConsumer
+	DefaultNumberPartitions         = 3
+	DefaultNumberReplicas           = 1
+	DefaultAutoCreateTopic          = false
 )
 
 // Client represents the set of APIs a Kafka client must implement
 type Client interface {
-	Start(retries int) error
+	Start() error
 	Stop()
-	CreateTopic(topic *Topic, numPartition int, repFactor int, retries int) error
+	CreateTopic(topic *Topic, numPartition int, repFactor int) error
 	DeleteTopic(topic *Topic) error
-	Subscribe(topic *Topic, retries int) (<-chan *ca.InterContainerMessage, error)
+	Subscribe(topic *Topic) (<-chan *ca.InterContainerMessage, error)
 	UnSubscribe(topic *Topic, ch <-chan *ca.InterContainerMessage) error
-	Send(msg interface{}, topic *Topic, keys ...string)
+	Send(msg interface{}, topic *Topic, keys ...string) error
 }
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 25fc1b7..e2210c4 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -70,12 +70,12 @@
 	lockTopicRequestHandlerChannelMap sync.RWMutex
 
 	// This map is used to map a channel to a response topic.   This channel handles all responses on that
-	// channel for that topic and forward them to the appropriate consumer channel, using the
+	// channel for that topic and forwards them to the appropriate consumer channel, using the
 	// transactionIdToChannelMap.
 	topicToResponseChannelMap   map[string]<-chan *ca.InterContainerMessage
 	lockTopicResponseChannelMap sync.RWMutex
 
-	// This map is used to map a transaction to a consumer channel.  This is used whenever a request has been
+	// This map maps a transaction to a consumer channel.  This is used whenever a request has been
 	// sent out and we are waiting for a response.
 	transactionIdToChannelMap     map[string]*transactionChannel
 	lockTransactionIdToChannelMap sync.RWMutex
@@ -143,7 +143,7 @@
 	kp.doneCh = make(chan int, 1)
 
 	// Start the kafka client
-	if err := kp.kafkaClient.Start(DefaultMaxRetries); err != nil {
+	if err := kp.kafkaClient.Start(); err != nil {
 		log.Errorw("Cannot-create-kafka-proxy", log.Fields{"error": err})
 		return err
 	}
@@ -260,7 +260,7 @@
 	// Subscribe to receive messages for that topic
 	var ch <-chan *ca.InterContainerMessage
 	var err error
-	if ch, err = kp.kafkaClient.Subscribe(&topic, 1); err != nil {
+	if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
 		//if ch, err = kp.Subscribe(topic); err != nil {
 		log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
 	}
@@ -279,7 +279,7 @@
 	// Subscribe to receive messages for that topic
 	var ch <-chan *ca.InterContainerMessage
 	var err error
-	if ch, err = kp.kafkaClient.Subscribe(&topic, 1); err != nil {
+	if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
 		log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
 	}
 	kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
@@ -294,7 +294,7 @@
 	return kp.deleteFromTopicRequestHandlerChannelMap(topic.Name)
 }
 
-// setupTopicResponseChannelMap sets up single consumer channel that will act as a broadcast channel for all
+// setupTopicResponseChannelMap sets up a single consumer channel that will act as a broadcast channel for all
 // responses from that topic.
 func (kp *InterContainerProxy) setupTopicResponseChannelMap(topic string, arg <-chan *ca.InterContainerMessage) {
 	kp.lockTopicResponseChannelMap.Lock()
@@ -616,7 +616,7 @@
 }
 
 // waitForResponse listens for messages on the subscribedCh, ensure we get a response with the transaction ID,
-// and then dispatches to the consumer
+// and then dispatches to the consumers
 func (kp *InterContainerProxy) waitForResponseLoop(subscribedCh <-chan *ca.InterContainerMessage, topic *Topic) {
 	log.Debugw("starting-response-loop-for-topic", log.Fields{"topic": topic.Name})
 startloop:
@@ -649,7 +649,7 @@
 	if !kp.isTopicSubscribedForResponse(topic.Name) {
 		var subscribedCh <-chan *ca.InterContainerMessage
 		var err error
-		if subscribedCh, err = kp.kafkaClient.Subscribe(&topic, 1); err != nil {
+		if subscribedCh, err = kp.kafkaClient.Subscribe(&topic); err != nil {
 			log.Debugw("subscribe-failure", log.Fields{"topic": topic.Name})
 			return nil, err
 		}
@@ -657,7 +657,7 @@
 		go kp.waitForResponseLoop(subscribedCh, &topic)
 	}
 
-	// Create a specific channel for this consumer.  We cannot use the channel from the kafkaclient as it will
+	// Create a channel specific to this consumer.  We cannot use the channel from the kafkaclient as it will
 	// broadcast any message for this topic to all channels waiting on it.
 	ch := make(chan *ca.InterContainerMessage)
 	kp.addToTransactionIdToChannelMap(trnsId, &topic, ch)
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index eed0588..f2de01a 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -18,32 +18,54 @@
 import (
 	"errors"
 	"fmt"
-	"github.com/Shopify/sarama"
 	scc "github.com/bsm/sarama-cluster"
 	"github.com/golang/protobuf/proto"
+	"github.com/google/uuid"
 	"github.com/opencord/voltha-go/common/log"
 	ca "github.com/opencord/voltha-go/protos/core_adapter"
+	"gopkg.in/Shopify/sarama.v1"
+	"strings"
 	"sync"
 	"time"
 )
 
-// consumerChannels represents a consumer listening on a kafka topic.  Once it receives a message on that
-// topic it broadcasts the message to all the listening channels
+func init() {
+	log.AddPackage(log.JSON, log.WarnLevel, nil)
+}
+
+type returnErrorFunction func() error
+
+// consumerChannels represents one or more consumers listening on a kafka topic.  Once a message is received on that
+// topic, the consumers broadcast the message to all the listening channels.  A consumer can be either a partition
+// consumer or a group consumer.
 type consumerChannels struct {
-	consumer sarama.PartitionConsumer
-	//consumer *sc.Consumer
-	channels []chan *ca.InterContainerMessage
+	consumers []interface{}
+	channels  []chan *ca.InterContainerMessage
 }
 
 // SaramaClient represents the messaging proxy
 type SaramaClient struct {
-	broker                        *sarama.Broker
+	cAdmin                        sarama.ClusterAdmin
 	client                        sarama.Client
 	KafkaHost                     string
 	KafkaPort                     int
 	producer                      sarama.AsyncProducer
 	consumer                      sarama.Consumer
 	groupConsumer                 *scc.Consumer
+	consumerType                  int
+	groupName                     string
+	producerFlushFrequency        int
+	producerFlushMessages         int
+	producerFlushMaxmessages      int
+	producerRetryMax              int
+	producerRetryBackOff          time.Duration
+	producerReturnSuccess         bool
+	producerReturnErrors          bool
+	consumerMaxwait               int
+	maxProcessingTime             int
+	numPartitions                 int
+	numReplicas                   int
+	autoCreateTopic               bool
 	doneCh                        chan int
 	topicToConsumerChannelMap     map[string]*consumerChannels
 	lockTopicToConsumerChannelMap sync.RWMutex
@@ -63,11 +85,90 @@
 	}
 }
 
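+// ConsumerType sets the type of consumer to create: PartitionConsumer or GroupConsumer.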
+func ConsumerType(consumer int) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.consumerType = consumer
+	}
+}
+
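+// ProducerFlushFrequency overrides the default frequency at which the producer flushes batched messages.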
+func ProducerFlushFrequency(frequency int) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.producerFlushFrequency = frequency
+	}
+}
+
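+// ProducerFlushMessages overrides the default number of queued messages that triggers a producer flush.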
+func ProducerFlushMessages(num int) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.producerFlushMessages = num
+	}
+}
+
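+// ProducerFlushMaxMessages overrides the default maximum number of messages the producer sends in a single flush.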
+func ProducerFlushMaxMessages(num int) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.producerFlushMaxmessages = num
+	}
+}
+
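+// ReturnOnErrors sets whether the producer reports failed deliveries on its Errors channel.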
+func ReturnOnErrors(opt bool) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.producerReturnErrors = opt
+	}
+}
+
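+// ReturnOnSuccess sets whether the producer reports successful deliveries on its Successes channel.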
+func ReturnOnSuccess(opt bool) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.producerReturnSuccess = opt
+	}
+}
+
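+// ConsumerMaxWait overrides the default maximum time, in milliseconds, the broker may wait before answering a consumer fetch.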
+func ConsumerMaxWait(wait int) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.consumerMaxwait = wait
+	}
+}
+
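+// MaxProcessingTime overrides the default maximum time, in milliseconds, the consumer is expected to take to process a message.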
+func MaxProcessingTime(pTime int) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.maxProcessingTime = pTime
+	}
+}
+
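+// NumPartitions overrides the default number of partitions used when a topic is auto-created.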
+func NumPartitions(number int) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.numPartitions = number
+	}
+}
+
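+// NumReplicas overrides the default replication factor used when a topic is auto-created.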
+func NumReplicas(number int) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.numReplicas = number
+	}
+}
+
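+// AutoCreateTopic sets whether a topic should be created automatically when subscribing to a topic that does not yet exist.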
+func AutoCreateTopic(opt bool) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.autoCreateTopic = opt
+	}
+}
+
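+// NewSaramaClient returns a SaramaClient initialized with the default settings; any supplied options override those defaults.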
 func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
 	client := &SaramaClient{
 		KafkaHost: DefaultKafkaHost,
 		KafkaPort: DefaultKafkaPort,
 	}
+	client.consumerType = DefaultConsumerType
+	client.groupName = DefaultGroupName
+	client.producerFlushFrequency = DefaultProducerFlushFrequency
+	client.producerFlushMessages = DefaultProducerFlushMessages
+	client.producerFlushMaxmessages = DefaultProducerFlushMaxmessages
+	client.producerReturnErrors = DefaultProducerReturnErrors
+	client.producerReturnSuccess = DefaultProducerReturnSuccess
+	client.producerRetryMax = DefaultProducerRetryMax
+	client.producerRetryBackOff = DefaultProducerRetryBackoff
+	client.consumerMaxwait = DefaultConsumerMaxwait
+	client.maxProcessingTime = DefaultMaxProcessingTime
+	client.numPartitions = DefaultNumberPartitions
+	client.numReplicas = DefaultNumberReplicas
+	client.autoCreateTopic = DefaultAutoCreateTopic
 
 	for _, option := range opts {
 		option(client)
@@ -78,25 +179,33 @@
 	return client
 }
 
-func (sc *SaramaClient) Start(retries int) error {
-	log.Info("Starting-Proxy")
+func (sc *SaramaClient) Start() error {
+	log.Info("Starting-kafka-sarama-client")
 
 	// Create the Done channel
 	sc.doneCh = make(chan int, 1)
 
+	var err error
+
+	// Create the Cluster Admin
+	if err = sc.createClusterAdmin(); err != nil {
+		log.Errorw("Cannot-create-cluster-admin", log.Fields{"error": err})
+		return err
+	}
+
 	// Create the Publisher
-	if err := sc.createPublisher(retries); err != nil {
+	if err := sc.createPublisher(); err != nil {
 		log.Errorw("Cannot-create-kafka-publisher", log.Fields{"error": err})
 		return err
 	}
 
-	// Create the master consumer
-	if err := sc.createConsumer(retries); err != nil {
-		log.Errorw("Cannot-create-kafka-consumer", log.Fields{"error": err})
+	// Create the master consumers
+	if err := sc.createConsumer(); err != nil {
+		log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
 		return err
 	}
 
-	// Create the topic to consumer/channel map
+	// Create the topic to consumers/channel map
 	sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
 
 	return nil
@@ -108,19 +217,182 @@
 	//Send a message over the done channel to close all long running routines
 	sc.doneCh <- 1
 
-	// Clear the consumer map
-	//sc.clearConsumerChannelMap()
-
 	if sc.producer != nil {
 		if err := sc.producer.Close(); err != nil {
 			panic(err)
 		}
 	}
+
 	if sc.consumer != nil {
 		if err := sc.consumer.Close(); err != nil {
 			panic(err)
 		}
 	}
+
+	if sc.groupConsumer != nil {
+		if err := sc.groupConsumer.Close(); err != nil {
+			panic(err)
+		}
+	}
+
+	if sc.cAdmin != nil {
+		if err := sc.cAdmin.Close(); err != nil {
+			panic(err)
+		}
+	}
+
+	// Clear the consumer channel map
+	sc.clearConsumerChannelMap()
+
+	log.Info("sarama-client-stopped")
+}
+
+//CreateTopic creates a topic on the Kafka Broker.
+func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
+	// Set the topic details
+	topicDetail := &sarama.TopicDetail{}
+	topicDetail.NumPartitions = int32(numPartition)
+	topicDetail.ReplicationFactor = int16(repFactor)
+	topicDetail.ConfigEntries = make(map[string]*string)
+
+	if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
+		if err == sarama.ErrTopicAlreadyExists {
+			//	Not an error
+			log.Debugw("topic-already-exists", log.Fields{"topic": topic.Name})
+			return nil
+		}
+		log.Errorw("create-topic-failure", log.Fields{"error": err})
+		return err
+	}
+	// TODO: Wait until the topic has been created.  No API is available in the Sarama clusterAdmin to
+	// do so.
+	log.Debugw("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
+	return nil
+}
+
+//DeleteTopic removes a topic from the kafka Broker
+func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
+	// Remove the topic from the broker
+	if err := sc.cAdmin.DeleteTopic(topic.Name); err != nil {
+		if err == sarama.ErrUnknownTopicOrPartition {
+			//	Not an error as the topic does not exist
+			log.Debugw("topic-not-exist", log.Fields{"topic": topic.Name})
+			return nil
+		}
+		log.Errorw("delete-topic-failed", log.Fields{"topic": topic, "error": err})
+		return err
+	}
+
+	// Clear the topic from the consumer channel.  This will also close any consumers listening on that topic.
+	if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
+		log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
+		return err
+	}
+	return nil
+}
+
+// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
+// messages from that topic
+func (sc *SaramaClient) Subscribe(topic *Topic) (<-chan *ca.InterContainerMessage, error) {
+	log.Debugw("subscribe", log.Fields{"topic": topic.Name})
+
+	// If a consumer already exists for that topic then reuse it
+	if consumerCh := sc.getConsumerChannel(topic); consumerCh != nil {
+		log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
+		// Create a channel specific to that consumer and add it to the consumer channel map
+		ch := make(chan *ca.InterContainerMessage)
+		sc.addChannelToConsumerChannelMap(topic, ch)
+		return ch, nil
+	}
+
+	// Register for the topic and set it up
+	var consumerListeningChannel chan *ca.InterContainerMessage
+	var err error
+
+	// Use the consumerType option to figure out the type of consumer to launch
+	if sc.consumerType == PartitionConsumer {
+		if sc.autoCreateTopic {
+			if err = sc.CreateTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
+				log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
+				return nil, err
+			}
+		}
+		if consumerListeningChannel, err = sc.setupPartitionConsumerChannel(topic, sarama.OffsetNewest); err != nil {
+			log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
+			return nil, err
+		}
+	} else if sc.consumerType == GroupConsumer {
+		// TODO: create topic if auto create is on.  There is an issue with the sarama cluster library where it
+		// does not consume from a pre-created topic in some scenarios
+		if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, sc.groupName); err != nil {
+			log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
+			return nil, err
+		}
+	} else {
+		log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
+		return nil, errors.New("unknown-consumer-type")
+	}
+
+	return consumerListeningChannel, nil
+}
+
+//UnSubscribe unsubscribes a consumer from a given topic
+func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ca.InterContainerMessage) error {
+	log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
+	err := sc.removeChannelFromConsumerChannelMap(*topic, ch)
+	return err
+}
+
+// Send formats and sends the request onto the kafka messaging bus.
+func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) error {
+
+	// Assert message is a proto message
+	var protoMsg proto.Message
+	var ok bool
+	if protoMsg, ok = msg.(proto.Message); !ok {
+		log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
+		return fmt.Errorf("not-a-proto-msg-%v", msg)
+	}
+
+	var marshalled []byte
+	var err error
+	//	Create the Sarama producer message
+	if marshalled, err = proto.Marshal(protoMsg); err != nil {
+		log.Errorw("marshalling-failed", log.Fields{"msg": protoMsg, "error": err})
+		return err
+	}
+	key := ""
+	if len(keys) > 0 {
+		key = keys[0] // Only the first key is relevant
+	}
+	kafkaMsg := &sarama.ProducerMessage{
+		Topic: topic.Name,
+		Key:   sarama.StringEncoder(key),
+		Value: sarama.ByteEncoder(marshalled),
+	}
+
+	// Send message to kafka
+	sc.producer.Input() <- kafkaMsg
+	return nil
+}
+
+func (sc *SaramaClient) createClusterAdmin() error {
+	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
+	config := sarama.NewConfig()
+	config.Version = sarama.V1_0_0_0
+
+	// Create a cluster Admin
+	var cAdmin sarama.ClusterAdmin
+	var err error
+	if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
+		log.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
+		return err
+	}
+	sc.cAdmin = cAdmin
+	return nil
 }
 
 func (sc *SaramaClient) addTopicToConsumerChannelMap(id string, arg *consumerChannels) {
@@ -139,7 +411,7 @@
 	}
 }
 
-func (sc *SaramaClient) getConsumerChannel(topic Topic) *consumerChannels {
+func (sc *SaramaClient) getConsumerChannel(topic *Topic) *consumerChannels {
 	sc.lockTopicToConsumerChannelMap.Lock()
 	defer sc.lockTopicToConsumerChannelMap.Unlock()
 
@@ -149,14 +421,43 @@
 	return nil
 }
 
-func (sc *SaramaClient) addChannelToConsumerChannelMap(topic Topic, ch chan *ca.InterContainerMessage) {
+func (sc *SaramaClient) addChannelToConsumerChannelMap(topic *Topic, ch chan *ca.InterContainerMessage) {
 	sc.lockTopicToConsumerChannelMap.Lock()
 	defer sc.lockTopicToConsumerChannelMap.Unlock()
 	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
 		consumerCh.channels = append(consumerCh.channels, ch)
 		return
 	}
-	log.Warnw("consumer-channel-not-exist", log.Fields{"topic": topic.Name})
+	log.Warnw("consumers-channel-not-exist", log.Fields{"topic": topic.Name})
+}
+
+//closeConsumers closes a list of sarama consumers.  Each consumer can be either a partition consumer or a group consumer
+func closeConsumers(consumers []interface{}) error {
+	var err error
+	for _, consumer := range consumers {
+		//	Is it a partition consumer?
+		if partitionConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
+			if errTemp := partitionConsumer.Close(); errTemp != nil {
+				log.Debugw("partition-consumer-close-error", log.Fields{"error": errTemp})
+				if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
+					// This can occur due to a race condition
+					err = nil
+				} else {
+					err = errTemp
+				}
+			}
+		} else if groupConsumer, ok := consumer.(*scc.Consumer); ok {
+			if errTemp := groupConsumer.Close(); errTemp != nil {
+				if strings.Compare(errTemp.Error(), sarama.ErrUnknownTopicOrPartition.Error()) == 0 {
+					// This can occur due to a race condition
+					err = nil
+				} else {
+					err = errTemp
+				}
+			}
+		}
+	}
+	return err
 }
 
 func (sc *SaramaClient) removeChannelFromConsumerChannelMap(topic Topic, ch <-chan *ca.InterContainerMessage) error {
@@ -165,10 +466,11 @@
 	if consumerCh, exist := sc.topicToConsumerChannelMap[topic.Name]; exist {
 		// Channel will be closed in the removeChannel method
 		consumerCh.channels = removeChannel(consumerCh.channels, ch)
-		// If there are no more channels then we can close the consumer itself
+		// If there are no more channels then we can close the consumers themselves
 		if len(consumerCh.channels) == 0 {
-			log.Debugw("closing-consumer", log.Fields{"topic": topic})
-			err := consumerCh.consumer.Close()
+			log.Debugw("closing-consumers", log.Fields{"topic": topic})
+			err := closeConsumers(consumerCh.consumers)
 			delete(sc.topicToConsumerChannelMap, topic.Name)
 			return err
 		}
@@ -186,12 +488,17 @@
 			// Channel will be closed in the removeChannel method
 			removeChannel(consumerCh.channels, ch)
 		}
-		err := consumerCh.consumer.Close()
+		err := closeConsumers(consumerCh.consumers)
 		delete(sc.topicToConsumerChannelMap, topic.Name)
 		return err
 	}
-	log.Warnw("topic-does-not-exist", log.Fields{"topic": topic.Name})
-	return errors.New("topic-does-not-exist")
+	log.Debugw("topic-does-not-exist", log.Fields{"topic": topic.Name})
+	return nil
 }
 
 func (sc *SaramaClient) clearConsumerChannelMap() error {
@@ -203,139 +510,70 @@
 			// Channel will be closed in the removeChannel method
 			removeChannel(consumerCh.channels, ch)
 		}
-		err = consumerCh.consumer.Close()
+		if errTemp := closeConsumers(consumerCh.consumers); errTemp != nil {
+			err = errTemp
+		}
 		delete(sc.topicToConsumerChannelMap, topic)
 	}
 	return err
 }
 
-func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int, retries int) error {
-	// This Creates the kafka topic
-	// Set broker configuration
-	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
-	broker := sarama.NewBroker(kafkaFullAddr)
-
-	// Additional configurations. Check sarama doc for more info
-	config := sarama.NewConfig()
-	config.Version = sarama.V1_0_0_0
-
-	// Open broker connection with configs defined above
-	broker.Open(config)
-
-	// check if the connection was OK
-	_, err := broker.Connected()
-	if err != nil {
-		return err
-	}
-
-	topicDetail := &sarama.TopicDetail{}
-	topicDetail.NumPartitions = int32(numPartition)
-	topicDetail.ReplicationFactor = int16(repFactor)
-	topicDetail.ConfigEntries = make(map[string]*string)
-
-	topicDetails := make(map[string]*sarama.TopicDetail)
-	topicDetails[topic.Name] = topicDetail
-
-	request := sarama.CreateTopicsRequest{
-		Timeout:      time.Second * 1,
-		TopicDetails: topicDetails,
-	}
-
-	for {
-		// Send request to Broker
-		if response, err := broker.CreateTopics(&request); err != nil {
-			if retries == 0 {
-				log.Errorw("error-creating-topic", log.Fields{"error": err})
-				return err
-			} else {
-				// If retries is -ve then we will retry indefinitely
-				retries--
-			}
-			log.Debug("retrying-after-a-second-delay")
-			time.Sleep(time.Duration(DefaultSleepOnError) * time.Second)
-		} else {
-			log.Debug("topic-response", log.Fields{"response": response})
-			break
-		}
-	}
-
-	log.Debug("topic-created", log.Fields{"topic": topic, "numPartition": numPartition, "replicationFactor": repFactor})
-	return nil
-}
-
-func (sc *SaramaClient) createPublisher(retries int) error {
+//createPublisher creates the publisher used to send messages onto the kafka bus
+func (sc *SaramaClient) createPublisher() error {
 	// This Creates the publisher
 	config := sarama.NewConfig()
 	config.Producer.Partitioner = sarama.NewRandomPartitioner
-	config.Producer.Flush.Frequency = time.Duration(DefaultFlushFrequency)
-	config.Producer.Flush.Messages = DefaultFlushMessages
-	config.Producer.Flush.MaxMessages = DefaultFlushMaxmessages
-	config.Producer.Return.Errors = DefaultReturnErrors
-	config.Producer.Return.Successes = DefaultReturnSuccess
-	config.Producer.RequiredAcks = sarama.WaitForAll
+	config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency) * time.Millisecond
+	config.Producer.Flush.Messages = sc.producerFlushMessages
+	config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
+	config.Producer.Return.Errors = sc.producerReturnErrors
+	config.Producer.Return.Successes = sc.producerReturnSuccess
+	config.Producer.RequiredAcks = sarama.WaitForLocal
+
 	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
 	brokers := []string{kafkaFullAddr}
 
-	for {
-		producer, err := sarama.NewAsyncProducer(brokers, config)
-		if err != nil {
-			if retries == 0 {
-				log.Errorw("error-starting-publisher", log.Fields{"error": err})
-				return err
-			} else {
-				// If retries is -ve then we will retry indefinitely
-				retries--
-			}
-			log.Info("retrying-after-a-second-delay")
-			time.Sleep(time.Duration(DefaultSleepOnError) * time.Second)
-		} else {
-			sc.producer = producer
-			break
-		}
+	if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
+		log.Errorw("error-starting-publisher", log.Fields{"error": err})
+		return err
+	} else {
+		sc.producer = producer
 	}
 	log.Info("Kafka-publisher-created")
 	return nil
 }
 
-func (sc *SaramaClient) createConsumer(retries int) error {
+func (sc *SaramaClient) createConsumer() error {
 	config := sarama.NewConfig()
 	config.Consumer.Return.Errors = true
 	config.Consumer.Fetch.Min = 1
-	config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
-	config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
+	config.Consumer.MaxWaitTime = time.Duration(sc.consumerMaxwait) * time.Millisecond
+	config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
 	config.Consumer.Offsets.Initial = sarama.OffsetNewest
 	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
 	brokers := []string{kafkaFullAddr}
 
-	for {
-		consumer, err := sarama.NewConsumer(brokers, config)
-		if err != nil {
-			if retries == 0 {
-				log.Errorw("error-starting-consumer", log.Fields{"error": err})
-				return err
-			} else {
-				// If retries is -ve then we will retry indefinitely
-				retries--
-			}
-			log.Info("retrying-after-a-second-delay")
-			time.Sleep(time.Duration(DefaultSleepOnError) * time.Second)
-		} else {
-			sc.consumer = consumer
-			break
-		}
+	if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
+		log.Errorw("error-starting-consumers", log.Fields{"error": err})
+		return err
+	} else {
+		sc.consumer = consumer
 	}
-	log.Info("Kafka-consumer-created")
+	log.Info("Kafka-consumers-created")
 	return nil
 }
 
-// createGroupConsumer creates a consumer group
+// createGroupConsumer creates a group consumer
 func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId *string, retries int) (*scc.Consumer, error) {
 	config := scc.NewConfig()
+	config.ClientID = uuid.New().String()
 	config.Group.Mode = scc.ConsumerModeMultiplex
-	config.Consumer.Return.Errors = true
-	config.Group.Return.Notifications = true
-	config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
-	config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
+	//config.Consumer.Return.Errors = true
+	//config.Group.Return.Notifications = false
+	//config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
+	//config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
 	config.Consumer.Offsets.Initial = sarama.OffsetNewest
 	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
 	brokers := []string{kafkaFullAddr}
@@ -348,73 +586,16 @@
 	var consumer *scc.Consumer
 	var err error
 
-	// Create the topic with default attributes
-	// TODO: needs to be revisited
-	//sc.CreateTopic(&Topic{Name:topic.Name}, 3, 1, 1)
-
 	if consumer, err = scc.NewConsumer(brokers, *groupId, topics, config); err != nil {
-		log.Errorw("create-consumer-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
+		log.Errorw("create-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
 		return nil, err
 	}
-	log.Debugw("create-consumer-success", log.Fields{"topic": topic.Name, "groupId": groupId})
+	log.Debugw("create-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
 	//time.Sleep(10*time.Second)
 	sc.groupConsumer = consumer
 	return consumer, nil
 }
 
-// send formats and sends the request onto the kafka messaging bus.
-func (sc *SaramaClient) Send(msg interface{}, topic *Topic, keys ...string) {
-
-	// Assert message is a proto message
-	var protoMsg proto.Message
-	var ok bool
-	// ascertain the value interface type is a proto.Message
-	if protoMsg, ok = msg.(proto.Message); !ok {
-		log.Warnw("message-not-proto-message", log.Fields{"msg": msg})
-		return
-	}
-
-	//	Create the Sarama producer message
-	marshalled, _ := proto.Marshal(protoMsg)
-	key := ""
-	if len(keys) > 0 {
-		key = keys[0] // Only the first key is relevant
-	}
-	kafkaMsg := &sarama.ProducerMessage{
-		Topic: topic.Name,
-		Key:   sarama.StringEncoder(key),
-		Value: sarama.ByteEncoder(marshalled),
-	}
-
-	// Send message to kafka
-	sc.producer.Input() <- kafkaMsg
-}
-
-// Subscribe registers a caller to a topic.   It returns a channel that the caller can use to receive
-// messages from that topic
-func (sc *SaramaClient) Subscribe(topic *Topic, retries int) (<-chan *ca.InterContainerMessage, error) {
-	log.Debugw("subscribe", log.Fields{"topic": topic.Name})
-
-	// If a consumer already exist for that topic then resuse it
-	if consumerCh := sc.getConsumerChannel(*topic); consumerCh != nil {
-		log.Debugw("topic-already-subscribed", log.Fields{"topic": topic.Name})
-		// Create a channel specific for that consumer and add it to the consumer channel map
-		ch := make(chan *ca.InterContainerMessage)
-		sc.addChannelToConsumerChannelMap(*topic, ch)
-		return ch, nil
-	}
-
-	// Register for the topic and set it up
-	var consumerListeningChannel chan *ca.InterContainerMessage
-	var err error
-	if consumerListeningChannel, err = sc.setupConsumerChannel(topic); err != nil {
-		log.Warnw("create-consumer-channel-failure", log.Fields{"error": err, "topic": topic.Name})
-		return nil, err
-	}
-
-	return consumerListeningChannel, nil
-}
-
 // dispatchToConsumers sends the InterContainerMessage received on a given topic to all subscribers for that
 // topic via the unique channel each subscriber received during subscription
 func (sc *SaramaClient) dispatchToConsumers(consumerCh *consumerChannels, protoMessage *ca.InterContainerMessage) {
@@ -428,91 +609,179 @@
 	}
 }
 
-func (sc *SaramaClient) consumeMessagesLoop(topic Topic) {
-	log.Debugw("starting-consuming-messages", log.Fields{"topic": topic.Name})
-	var consumerCh *consumerChannels
-	if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
-		log.Errorw("consumer-not-exist", log.Fields{"topic": topic.Name})
-		return
-	}
+func (sc *SaramaClient) consumeFromAPartition(topic *Topic, consumer sarama.PartitionConsumer, consumerChnls *consumerChannels) {
+	log.Debugw("starting-partition-consumption-loop", log.Fields{"topic": topic.Name})
 startloop:
 	for {
 		select {
-		case err := <-consumerCh.consumer.Errors():
-			log.Warnw("consumer-error", log.Fields{"error": err})
-		case msg := <-consumerCh.consumer.Messages():
+		case err := <-consumer.Errors():
+			if err != nil {
+				log.Warnw("partition-consumers-error", log.Fields{"error": err})
+			} else {
+				// There is a race condition when this loop is stopped and the consumer is closed, in which
+				// case a nil error is received
+				log.Warn("partition-consumers-error")
+			}
+		case msg := <-consumer.Messages():
 			//log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
-			// Since the only expected message is a proto intercontainermessage then extract it right away
-			// instead of dispatching it to the consumers
+			if msg == nil {
+				// There is a race condition when this loop is stopped and the consumer is closed, in which
+				// case a nil message is received
+				break startloop
+			}
+			msgBody := msg.Value
+			icm := &ca.InterContainerMessage{}
+			if err := proto.Unmarshal(msgBody, icm); err != nil {
+				log.Warnw("partition-invalid-message", log.Fields{"error": err})
+				continue
+			}
+			go sc.dispatchToConsumers(consumerChnls, icm)
+		case <-sc.doneCh:
+			log.Infow("partition-received-exit-signal", log.Fields{"topic": topic.Name})
+			break startloop
+		}
+	}
+	log.Infow("partition-consumer-stopped", log.Fields{"topic": topic.Name})
+}
+
+func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
+	log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
+
+startloop:
+	for {
+		select {
+		case err := <-consumer.Errors():
+			if err != nil {
+				log.Warnw("group-consumers-error", log.Fields{"error": err})
+			} else {
+				// There is a race condition when this loop is stopped and the consumer is closed, in which
+				// case a nil error is received
+				log.Warn("group-consumers-error")
+			}
+		case msg := <-consumer.Messages():
+			//log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
+			if msg == nil {
+				// There is a race condition when this loop is stopped and the consumer is closed, in which
+				// case a nil message is received
+				break startloop
+			}
 			msgBody := msg.Value
 			icm := &ca.InterContainerMessage{}
 			if err := proto.Unmarshal(msgBody, icm); err != nil {
 				log.Warnw("invalid-message", log.Fields{"error": err})
 				continue
 			}
-			go sc.dispatchToConsumers(consumerCh, icm)
-
-			//consumerCh.consumer.MarkOffset(msg, "")
-			//// TODO:  Dispatch requests and responses separately
+			go sc.dispatchToConsumers(consumerChnls, icm)
+			consumer.MarkOffset(msg, "")
+		case ntf := <-consumer.Notifications():
+			log.Debugw("group-received-notification", log.Fields{"notification": ntf})
 		case <-sc.doneCh:
-			log.Infow("received-exit-signal", log.Fields{"topic": topic.Name})
+			log.Infow("group-received-exit-signal", log.Fields{"topic": topic.Name})
 			break startloop
 		}
 	}
-	log.Infow("received-exit-signal-out-of-for-loop", log.Fields{"topic": topic.Name})
+	log.Infow("group-consumer-stopped", log.Fields{"topic": topic.Name})
 }
 
-// setupConsumerChannel creates a consumerChannels object for that topic and add it to the consumerChannels map
-// for that topic.  It also starts the routine that listens for messages on that topic.
-func (sc *SaramaClient) setupConsumerChannel(topic *Topic) (chan *ca.InterContainerMessage, error) {
-	// TODO:  Replace this development partition consumer with a group consumer
-	var pConsumer *sarama.PartitionConsumer
+func (sc *SaramaClient) startConsumers(topic *Topic) error {
+	log.Debugw("starting-consumers", log.Fields{"topic": topic.Name})
+	var consumerCh *consumerChannels
+	if consumerCh = sc.getConsumerChannel(topic); consumerCh == nil {
+		log.Errorw("consumers-not-exist", log.Fields{"topic": topic.Name})
+		return errors.New("consumers-not-exist")
+	}
+	// For each consumer listening for that topic, start a consumption loop
+	for _, consumer := range consumerCh.consumers {
+		if pConsumer, ok := consumer.(sarama.PartitionConsumer); ok {
+			go sc.consumeFromAPartition(topic, pConsumer, consumerCh)
+		} else if gConsumer, ok := consumer.(*scc.Consumer); ok {
+			go sc.consumeGroupMessages(topic, gConsumer, consumerCh)
+		} else {
+			log.Errorw("invalid-consumer", log.Fields{"topic": topic})
+			return errors.New("invalid-consumer")
+		}
+	}
+	return nil
+}
+
+// setupPartitionConsumerChannel creates a consumerChannels object for that topic and adds it to the consumerChannels
+// map for that topic.  It also starts the routine that listens for messages on that topic.
+func (sc *SaramaClient) setupPartitionConsumerChannel(topic *Topic, initialOffset int64) (chan *ca.InterContainerMessage, error) {
+	var pConsumers []sarama.PartitionConsumer
 	var err error
-	if pConsumer, err = sc.CreatePartionConsumer(topic, DefaultMaxRetries); err != nil {
-		log.Errorw("creating-partition-consumer-failure", log.Fields{"error": err, "topic": topic.Name})
+
+	if pConsumers, err = sc.createPartitionConsumers(topic, initialOffset); err != nil {
+		log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
 		return nil, err
 	}
 
-	// Create the consumer/channel structure and set the consumer and create a channel on that topic - for now
+	consumersIf := make([]interface{}, 0)
+	for _, pConsumer := range pConsumers {
+		consumersIf = append(consumersIf, pConsumer)
+	}
+
+	// Create the consumers/channel structure, set the consumers, and create a channel on that topic - for now
 	// unbuffered to verify race conditions.
 	consumerListeningChannel := make(chan *ca.InterContainerMessage)
 	cc := &consumerChannels{
-		consumer: *pConsumer,
-		channels: []chan *ca.InterContainerMessage{consumerListeningChannel},
+		consumers: consumersIf,
+		channels:  []chan *ca.InterContainerMessage{consumerListeningChannel},
 	}
 
-	// Add the consumer channel to the map
+	// Add the consumer channels to the map
 	sc.addTopicToConsumerChannelMap(topic.Name, cc)
 
-	//Start a consumer to listen on that specific topic
-	go sc.consumeMessagesLoop(*topic)
+	//Start consumers to listen on that specific topic
+	go sc.startConsumers(topic)
 
 	return consumerListeningChannel, nil
 }
 
-func (sc *SaramaClient) CreatePartionConsumer(topic *Topic, retries int) (*sarama.PartitionConsumer, error) {
-	log.Debugw("creating-partition-consumer", log.Fields{"topic": topic.Name})
+// setupGroupConsumerChannel creates a consumerChannels object for that topic and adds it to the consumerChannels
+// map for that topic.  It also starts the routine that listens for messages on that topic.
+func (sc *SaramaClient) setupGroupConsumerChannel(topic *Topic, groupId string) (chan *ca.InterContainerMessage, error) {
+	var gConsumer *scc.Consumer
+	var err error
+	if gConsumer, err = sc.createGroupConsumer(topic, &groupId, DefaultMaxRetries); err != nil {
+		log.Errorw("creating-group-consumer-failure", log.Fields{"error": err, "topic": topic.Name})
+		return nil, err
+	}
+	// Create the consumers/channel structure, set the consumers, and create a channel on that topic - for now
+	// unbuffered to verify race conditions.
+	consumerListeningChannel := make(chan *ca.InterContainerMessage)
+	cc := &consumerChannels{
+		consumers: []interface{}{gConsumer},
+		channels:  []chan *ca.InterContainerMessage{consumerListeningChannel},
+	}
+
+	// Add the consumer channels to the map
+	sc.addTopicToConsumerChannelMap(topic.Name, cc)
+
+	//Start consumers to listen on that specific topic
+	go sc.startConsumers(topic)
+
+	return consumerListeningChannel, nil
+}
+
+func (sc *SaramaClient) createPartitionConsumers(topic *Topic, initialOffset int64) ([]sarama.PartitionConsumer, error) {
+	log.Debugw("creating-partition-consumers", log.Fields{"topic": topic.Name})
 	partitionList, err := sc.consumer.Partitions(topic.Name)
 	if err != nil {
 		log.Warnw("get-partition-failure", log.Fields{"error": err, "topic": topic.Name})
 		return nil, err
 	}
 
-	log.Debugw("partitions", log.Fields{"topic": topic.Name, "partitionList": partitionList, "first": partitionList[0]})
-	// Create a partition consumer for that topic - for now just use one partition
-	var pConsumer sarama.PartitionConsumer
-	if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partitionList[0], sarama.OffsetNewest); err != nil {
-		log.Warnw("consumer-partition-failure", log.Fields{"error": err, "topic": topic.Name})
-		return nil, err
+	pConsumers := make([]sarama.PartitionConsumer, 0)
+	for _, partition := range partitionList {
+		var pConsumer sarama.PartitionConsumer
+		if pConsumer, err = sc.consumer.ConsumePartition(topic.Name, partition, initialOffset); err != nil {
+			log.Warnw("partition-consumer-create-failure", log.Fields{"error": err, "topic": topic.Name})
+			return nil, err
+		}
+		pConsumers = append(pConsumers, pConsumer)
 	}
-	log.Debugw("partition-consumer-created", log.Fields{"topic": topic.Name})
-	return &pConsumer, nil
-}
-
-func (sc *SaramaClient) UnSubscribe(topic *Topic, ch <-chan *ca.InterContainerMessage) error {
-	log.Debugw("unsubscribing-channel-from-topic", log.Fields{"topic": topic.Name})
-	err := sc.removeChannelFromConsumerChannelMap(*topic, ch)
-	return err
+	return pConsumers, nil
 }
 
 func removeChannel(channels []chan *ca.InterContainerMessage, ch <-chan *ca.InterContainerMessage) []chan *ca.InterContainerMessage {
@@ -527,11 +796,3 @@
 	}
 	return channels
 }
-
-func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
-	if err := sc.clearTopicFromConsumerChannelMap(*topic); err != nil {
-		log.Errorw("failure-clearing-channels", log.Fields{"topic": topic, "error": err})
-		return err
-	}
-	return nil
-}
diff --git a/tests/kafka/kafka_client_test.go b/tests/kafka/kafka_client_test.go
new file mode 100644
index 0000000..76d63c6
--- /dev/null
+++ b/tests/kafka/kafka_client_test.go
@@ -0,0 +1,189 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+	"fmt"
+	"github.com/golang/protobuf/ptypes"
+	"github.com/golang/protobuf/ptypes/any"
+	"github.com/google/uuid"
+	"github.com/opencord/voltha-go/common/log"
+	kk "github.com/opencord/voltha-go/kafka"
+	ca "github.com/opencord/voltha-go/protos/core_adapter"
+	"github.com/stretchr/testify/assert"
+	"os"
+	"testing"
+	"time"
+)
+
+/*
+Prerequisite: Start the kafka/zookeeper containers.
+*/
+
+var partitionClient kk.Client
+var groupClient kk.Client
+var totalTime int64
+var numMessageToSend int
+var totalMessageReceived int
+
+type sendToKafka func(interface{}, *kk.Topic, ...string) error
+
+func init() {
+	log.AddPackage(log.JSON, log.ErrorLevel, nil)
+	hostIP := os.Getenv("DOCKER_HOST_IP")
+	log.UpdateAllLoggers(log.Fields{"instanceId": "testing"})
+	log.SetAllLogLevel(log.ErrorLevel)
+	partitionClient = kk.NewSaramaClient(
+		kk.ConsumerType(kk.PartitionConsumer),
+		kk.Host(hostIP),
+		kk.Port(9092),
+		kk.AutoCreateTopic(true),
+		kk.ProducerFlushFrequency(5))
+	partitionClient.Start()
+	groupClient = kk.NewSaramaClient(
+		kk.ConsumerType(kk.GroupConsumer),
+		kk.Host(hostIP),
+		kk.Port(9092),
+		kk.AutoCreateTopic(false),
+		kk.ProducerFlushFrequency(5))
+	groupClient.Start()
+	numMessageToSend = 1
+}
+
+func waitForMessage(ch <-chan *ca.InterContainerMessage, doneCh chan string, maxMessages int) {
+	totalTime = 0
+	totalMessageReceived = 0
+	mytime := time.Now()
+startloop:
+	for {
+		select {
+		case msg := <-ch:
+			if totalMessageReceived == 0 {
+				mytime = time.Now()
+			}
+			totalTime = totalTime + (time.Now().UnixNano()-msg.Header.Timestamp)/int64(time.Millisecond)
+			//log.Debugw("msg-received", log.Fields{"msg":msg})
+			totalMessageReceived = totalMessageReceived + 1
+			if totalMessageReceived == maxMessages {
+				doneCh <- "All received"
+				break startloop
+			}
+			if totalMessageReceived%10000 == 0 {
+				fmt.Println("received-so-far", totalMessageReceived, totalTime, totalTime/int64(totalMessageReceived))
+			}
+		}
+	}
+	log.Infow("Received all messages", log.Fields{"total": time.Since(mytime)})
+}
+
+func sendMessages(topic *kk.Topic, numMessages int, fn sendToKafka) error {
+	// Loop for numMessages
+	for i := 0; i < numMessages; i++ {
+		msg := &ca.InterContainerMessage{}
+		msg.Header = &ca.Header{
+			Id:        uuid.New().String(),
+			Type:      ca.MessageType_REQUEST,
+			FromTopic: topic.Name,
+			ToTopic:   topic.Name,
+			Timestamp: time.Now().UnixNano(),
+		}
+		var marshalledArg *any.Any
+		var err error
+		body := &ca.InterContainerRequestBody{Rpc: "testRPC", Args: []*ca.Argument{}}
+		if marshalledArg, err = ptypes.MarshalAny(body); err != nil {
+			log.Warnw("cannot-marshal-request", log.Fields{"error": err})
+			return err
+		}
+		msg.Body = marshalledArg
+		msg.Header.Timestamp = time.Now().UnixNano()
+		go fn(msg, topic)
+	}
+	return nil
+}
+
+func runWithPartitionConsumer(topic *kk.Topic, numMessages int, doneCh chan string) error {
+	var ch <-chan *ca.InterContainerMessage
+	var err error
+	if ch, err = partitionClient.Subscribe(topic); err != nil {
+		return err
+	}
+	go waitForMessage(ch, doneCh, numMessages)
+
+	//Now create a routine to send messages
+	go sendMessages(topic, numMessages, partitionClient.Send)
+
+	return nil
+}
+
+func runWithGroupConsumer(topic *kk.Topic, numMessages int, doneCh chan string) error {
+	var ch <-chan *ca.InterContainerMessage
+	var err error
+	if ch, err = groupClient.Subscribe(topic); err != nil {
+		return err
+	}
+	go waitForMessage(ch, doneCh, numMessages)
+
+	//Now create a routine to send messages
+	go sendMessages(topic, numMessages, groupClient.Send)
+
+	return nil
+}
+
+func TestPartitionConsumer(t *testing.T) {
+	done := make(chan string)
+	topic := &kk.Topic{Name: "CoreTest1"}
+	runWithPartitionConsumer(topic, numMessageToSend, done)
+	start := time.Now()
+	// Wait for done
+	val := <-done
+	err := partitionClient.DeleteTopic(topic)
+	assert.Nil(t, err)
+	partitionClient.Stop()
+	assert.Equal(t, numMessageToSend, totalMessageReceived)
+	log.Infow("Partition consumer completed", log.Fields{"TotalMessages": totalMessageReceived, "TotalTime": totalTime, "val": val, "AverageTime": totalTime / int64(totalMessageReceived), "execTime": time.Since(start)})
+}
+
+func TestGroupConsumer(t *testing.T) {
+	done := make(chan string)
+	topic := &kk.Topic{Name: "CoreTest2"}
+	runWithGroupConsumer(topic, numMessageToSend, done)
+	start := time.Now()
+	// Wait for done
+	val := <-done
+	err := groupClient.DeleteTopic(topic)
+	assert.Nil(t, err)
+	groupClient.Stop()
+	assert.Equal(t, numMessageToSend, totalMessageReceived)
+	log.Infow("Group consumer completed", log.Fields{"TotalMessages": totalMessageReceived, "TotalTime": totalTime, "val": val, "AverageTime": totalTime / int64(totalMessageReceived), "execTime": time.Since(start)})
+
+}
+
+func TestCreateDeleteTopic(t *testing.T) {
+	hostIP := os.Getenv("DOCKER_HOST_IP")
+	client := kk.NewSaramaClient(
+		kk.ConsumerType(kk.PartitionConsumer),
+		kk.Host(hostIP),
+		kk.Port(9092),
+		kk.AutoCreateTopic(true),
+		kk.ProducerFlushFrequency(5))
+	client.Start()
+	topic := &kk.Topic{Name: "CoreTest20"}
+	err := client.CreateTopic(topic, 3, 1)
+	assert.Nil(t, err)
+	err = client.DeleteTopic(topic)
+	assert.Nil(t, err)
+}