This update consists of the following changes:
    1) Add GroupConsumer to the Go sarama_client and modify the Core
    code to use a groupConsumer instead of a partition consumer. This
    change will ensure that multiple consumers (with different group Ids)
    can consume kafka messages from the same topic.
    2) Remove afkak kafka client and replace it with confluent kakfa,
    a change done in voltha 1.x. Modify the code accordingly.
    3) Add a Group Consumer to the Python kakfa client such that
    several instances of an Adapter can consume the same messages from
    the same topic.
    4) Set the datapath_id for the logical device in the Core.

Change-Id: I5d7ced27c9aeca4f6211baa3dc8cb3db861545e4
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index 2df19e5..10c692a 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -30,7 +30,7 @@
 )
 
 func init() {
-	log.AddPackage(log.JSON, log.WarnLevel, nil)
+	log.AddPackage(log.JSON, log.DebugLevel, nil)
 }
 
 type returnErrorFunction func() error
@@ -51,9 +51,10 @@
 	KafkaPort                     int
 	producer                      sarama.AsyncProducer
 	consumer                      sarama.Consumer
-	groupConsumer                 *scc.Consumer
+	groupConsumers                map[string]*scc.Consumer
+	consumerGroupPrefix           string
 	consumerType                  int
-	groupName                     string
+	consumerGroupName             string
 	producerFlushFrequency        int
 	producerFlushMessages         int
 	producerFlushMaxmessages      int
@@ -87,6 +88,18 @@
 	}
 }
 
+func ConsumerGroupPrefix(prefix string) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.consumerGroupPrefix = prefix
+	}
+}
+
+func ConsumerGroupName(name string) SaramaClientOption {
+	return func(args *SaramaClient) {
+		args.consumerGroupName = name
+	}
+}
+
 func ConsumerType(consumer int) SaramaClientOption {
 	return func(args *SaramaClient) {
 		args.consumerType = consumer
@@ -188,6 +201,8 @@
 		option(client)
 	}
 
+	client.groupConsumers = make(map[string]*scc.Consumer)
+
 	client.lockTopicToConsumerChannelMap = sync.RWMutex{}
 	client.topicLockMap = make(map[string]*sync.RWMutex)
 	client.lockOfTopicLockMap = sync.RWMutex{}
@@ -214,15 +229,19 @@
 		return err
 	}
 
-	// Create the master consumers
-	if err := sc.createConsumer(); err != nil {
-		log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
-		return err
+	if sc.consumerType == DefaultConsumerType {
+		// Create the master consumers
+		if err := sc.createConsumer(); err != nil {
+			log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
+			return err
+		}
 	}
 
 	// Create the topic to consumers/channel map
 	sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
 
+	log.Info("kafka-sarama-client-started")
+
 	return nil
 }
 
@@ -234,39 +253,38 @@
 
 	if sc.producer != nil {
 		if err := sc.producer.Close(); err != nil {
-			panic(err)
+			log.Errorw("closing-producer-failed", log.Fields{"error": err})
 		}
 	}
 
 	if sc.consumer != nil {
 		if err := sc.consumer.Close(); err != nil {
-			panic(err)
+			log.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
 		}
 	}
 
-	if sc.groupConsumer != nil {
-		if err := sc.groupConsumer.Close(); err != nil {
-			panic(err)
+	for key, val := range sc.groupConsumers {
+		log.Debugw("closing-group-consumer", log.Fields{"topic": key})
+		if err := val.Close(); err != nil {
+			log.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
 		}
 	}
 
 	if sc.cAdmin != nil {
 		if err := sc.cAdmin.Close(); err != nil {
-			panic(err)
+			log.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
 		}
 	}
 
 	//TODO: Clear the consumers map
-	sc.clearConsumerChannelMap()
+	//sc.clearConsumerChannelMap()
 
 	log.Info("sarama-client-stopped")
 }
 
-//CreateTopic creates a topic on the Kafka Broker.
-func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
-	sc.lockTopic(topic)
-	defer sc.unLockTopic(topic)
-
+//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
+// the invoking function must hold the lock
+func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
 	// Set the topic details
 	topicDetail := &sarama.TopicDetail{}
 	topicDetail.NumPartitions = int32(numPartition)
@@ -290,6 +308,15 @@
 	return nil
 }
 
+//CreateTopic is a public API to create a topic on the Kafka Broker.  It uses a lock on a specific topic to
+// ensure no two go routines are performing operations on the same topic
+func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
+	sc.lockTopic(topic)
+	defer sc.unLockTopic(topic)
+
+	return sc.createTopic(topic, numPartition, repFactor)
+}
+
 //DeleteTopic removes a topic from the kafka Broker
 func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
 	sc.lockTopic(topic)
@@ -316,7 +343,7 @@
 
 // Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
 // messages from that topic
-func (sc *SaramaClient) Subscribe(topic *Topic) (<-chan *ic.InterContainerMessage, error) {
+func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
 	sc.lockTopic(topic)
 	defer sc.unLockTopic(topic)
 
@@ -338,7 +365,7 @@
 	// Use the consumerType option to figure out the type of consumer to launch
 	if sc.consumerType == PartitionConsumer {
 		if sc.autoCreateTopic {
-			if err = sc.CreateTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
+			if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
 				log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
 				return nil, err
 			}
@@ -350,10 +377,26 @@
 	} else if sc.consumerType == GroupCustomer {
 		// TODO: create topic if auto create is on.  There is an issue with the sarama cluster library that
 		// does not consume from a precreated topic in some scenarios
-		if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, "mytest"); err != nil {
-			log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
+		//if sc.autoCreateTopic {
+		//	if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
+		//		log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
+		//		return nil, err
+		//	}
+		//}
+		//groupId := sc.consumerGroupName
+		groupId := getGroupId(kvArgs...)
+		// Include the group prefix
+		if groupId != "" {
+			groupId = sc.consumerGroupPrefix + groupId
+		} else {
+			// Need to use a unique group Id per topic
+			groupId = sc.consumerGroupPrefix + topic.Name
+		}
+		if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId); err != nil {
+			log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
 			return nil, err
 		}
+
 	} else {
 		log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
 		return nil, errors.New("unknown-consumer-type")
@@ -416,6 +459,16 @@
 	return nil
 }
 
+// getGroupId returns the group id from the key-value args.
+func getGroupId(kvArgs ...*KVArg) string {
+	for _, arg := range kvArgs {
+		if arg.Key == GroupIdKey {
+			return arg.Value.(string)
+		}
+	}
+	return ""
+}
+
 func (sc *SaramaClient) createClusterAdmin() error {
 	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
 	config := sarama.NewConfig()
@@ -623,7 +676,7 @@
 }
 
 // createGroupConsumer creates a consumers group
-func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId *string, retries int) (*scc.Consumer, error) {
+func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, retries int) (*scc.Consumer, error) {
 	config := scc.NewConfig()
 	config.ClientID = uuid.New().String()
 	config.Group.Mode = scc.ConsumerModeMultiplex
@@ -632,24 +685,22 @@
 	//config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
 	//config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
 	config.Consumer.Offsets.Initial = sarama.OffsetNewest
+	//config.Consumer.Offsets.Initial = sarama.OffsetOldest
 	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
 	brokers := []string{kafkaFullAddr}
 
-	if groupId == nil {
-		g := DefaultGroupName
-		groupId = &g
-	}
 	topics := []string{topic.Name}
 	var consumer *scc.Consumer
 	var err error
 
-	if consumer, err = scc.NewConsumer(brokers, *groupId, topics, config); err != nil {
-		log.Errorw("create-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
+	if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
+		log.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
 		return nil, err
 	}
-	log.Debugw("create-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
+	log.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
 	//time.Sleep(10*time.Second)
-	sc.groupConsumer = consumer
+	//sc.groupConsumer = consumer
+	sc.groupConsumers[topic.Name] = consumer
 	return consumer, nil
 }
 
@@ -704,24 +755,36 @@
 func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
 	log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
 
+	//go func() {
+	//	for msg := range consumer.Errors() {
+	//		log.Warnw("group-consumers-error", log.Fields{"error": msg.Error()})
+	//	}
+	//}()
+	//
+	//go func() {
+	//	for ntf := range consumer.Notifications() {
+	//		log.Debugw("group-received-notification", log.Fields{"notification": ntf})
+	//	}
+	//}()
+
 startloop:
 	for {
 		select {
 		case err := <-consumer.Errors():
 			if err != nil {
-				log.Warnw("group-consumers-error", log.Fields{"error": err})
+				log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
 			} else {
 				// There is a race condition when this loop is stopped and the consumer is closed where
 				// the actual error comes as nil
-				log.Warn("group-consumers-error")
+				log.Warnw("group-consumers-error-nil", log.Fields{"topic": topic.Name})
 			}
 		case msg := <-consumer.Messages():
-			//log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
 			if msg == nil {
 				// There is a race condition when this loop is stopped and the consumer is closed where
 				// the actual msg comes as nil
 				break startloop
 			}
+			log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
 			msgBody := msg.Value
 			icm := &ic.InterContainerMessage{}
 			if err := proto.Unmarshal(msgBody, icm); err != nil {
@@ -800,7 +863,7 @@
 	// TODO:  Replace this development partition consumers with a group consumers
 	var pConsumer *scc.Consumer
 	var err error
-	if pConsumer, err = sc.createGroupConsumer(topic, &groupId, DefaultMaxRetries); err != nil {
+	if pConsumer, err = sc.createGroupConsumer(topic, groupId, DefaultMaxRetries); err != nil {
 		log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
 		return nil, err
 	}