This update consists of the following changes:
1) Add GroupConsumer to the Go sarama_client and modify the Core
code to use a groupConsumer instead of a partition consumer. This
change ensures that multiple consumers (with different group IDs)
can consume Kafka messages from the same topic.
2) Remove the afkak Kafka client and replace it with the Confluent Kafka
client, a change already made in voltha 1.x. Modify the code accordingly.
3) Add a Group Consumer to the Python Kafka client so that
several instances of an Adapter can consume the same messages from
the same topic.
4) Set the datapath_id for the logical device in the Core.
Change-Id: I5d7ced27c9aeca4f6211baa3dc8cb3db861545e4
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index 2df19e5..10c692a 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -30,7 +30,7 @@
)
func init() {
- log.AddPackage(log.JSON, log.WarnLevel, nil)
+ log.AddPackage(log.JSON, log.DebugLevel, nil)
}
type returnErrorFunction func() error
@@ -51,9 +51,10 @@
KafkaPort int
producer sarama.AsyncProducer
consumer sarama.Consumer
- groupConsumer *scc.Consumer
+ groupConsumers map[string]*scc.Consumer
+ consumerGroupPrefix string
consumerType int
- groupName string
+ consumerGroupName string
producerFlushFrequency int
producerFlushMessages int
producerFlushMaxmessages int
@@ -87,6 +88,18 @@
}
}
+func ConsumerGroupPrefix(prefix string) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.consumerGroupPrefix = prefix
+ }
+}
+
+func ConsumerGroupName(name string) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.consumerGroupName = name
+ }
+}
+
func ConsumerType(consumer int) SaramaClientOption {
return func(args *SaramaClient) {
args.consumerType = consumer
@@ -188,6 +201,8 @@
option(client)
}
+ client.groupConsumers = make(map[string]*scc.Consumer)
+
client.lockTopicToConsumerChannelMap = sync.RWMutex{}
client.topicLockMap = make(map[string]*sync.RWMutex)
client.lockOfTopicLockMap = sync.RWMutex{}
@@ -214,15 +229,19 @@
return err
}
- // Create the master consumers
- if err := sc.createConsumer(); err != nil {
- log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
- return err
+ if sc.consumerType == DefaultConsumerType {
+ // Create the master consumers
+ if err := sc.createConsumer(); err != nil {
+ log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
+ return err
+ }
}
// Create the topic to consumers/channel map
sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
+ log.Info("kafka-sarama-client-started")
+
return nil
}
@@ -234,39 +253,38 @@
if sc.producer != nil {
if err := sc.producer.Close(); err != nil {
- panic(err)
+ log.Errorw("closing-producer-failed", log.Fields{"error": err})
}
}
if sc.consumer != nil {
if err := sc.consumer.Close(); err != nil {
- panic(err)
+ log.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
}
}
- if sc.groupConsumer != nil {
- if err := sc.groupConsumer.Close(); err != nil {
- panic(err)
+ for key, val := range sc.groupConsumers {
+ log.Debugw("closing-group-consumer", log.Fields{"topic": key})
+ if err := val.Close(); err != nil {
+ log.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
}
}
if sc.cAdmin != nil {
if err := sc.cAdmin.Close(); err != nil {
- panic(err)
+ log.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
}
}
//TODO: Clear the consumers map
- sc.clearConsumerChannelMap()
+ //sc.clearConsumerChannelMap()
log.Info("sarama-client-stopped")
}
-//CreateTopic creates a topic on the Kafka Broker.
-func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
- sc.lockTopic(topic)
- defer sc.unLockTopic(topic)
-
+//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
+// the invoking function must hold the lock
+func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
// Set the topic details
topicDetail := &sarama.TopicDetail{}
topicDetail.NumPartitions = int32(numPartition)
@@ -290,6 +308,15 @@
return nil
}
+//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
+// ensure no two go routines are performing operations on the same topic
+func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
+ sc.lockTopic(topic)
+ defer sc.unLockTopic(topic)
+
+ return sc.createTopic(topic, numPartition, repFactor)
+}
+
//DeleteTopic removes a topic from the kafka Broker
func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
sc.lockTopic(topic)
@@ -316,7 +343,7 @@
// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
// messages from that topic
-func (sc *SaramaClient) Subscribe(topic *Topic) (<-chan *ic.InterContainerMessage, error) {
+func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
sc.lockTopic(topic)
defer sc.unLockTopic(topic)
@@ -338,7 +365,7 @@
// Use the consumerType option to figure out the type of consumer to launch
if sc.consumerType == PartitionConsumer {
if sc.autoCreateTopic {
- if err = sc.CreateTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
+ if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
return nil, err
}
@@ -350,10 +377,26 @@
} else if sc.consumerType == GroupCustomer {
// TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
// does not consume from a precreated topic in some scenarios
- if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, "mytest"); err != nil {
- log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
+ //if sc.autoCreateTopic {
+ // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
+ // log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
+ // return nil, err
+ // }
+ //}
+ //groupId := sc.consumerGroupName
+ groupId := getGroupId(kvArgs...)
+ // Include the group prefix
+ if groupId != "" {
+ groupId = sc.consumerGroupPrefix + groupId
+ } else {
+ // Need to use a unique group Id per topic
+ groupId = sc.consumerGroupPrefix + topic.Name
+ }
+ if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId); err != nil {
+ log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
return nil, err
}
+
} else {
log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
return nil, errors.New("unknown-consumer-type")
@@ -416,6 +459,16 @@
return nil
}
+// getGroupId returns the group id from the key-value args.
+func getGroupId(kvArgs ...*KVArg) string {
+ for _, arg := range kvArgs {
+ if arg.Key == GroupIdKey {
+ return arg.Value.(string)
+ }
+ }
+ return ""
+}
+
func (sc *SaramaClient) createClusterAdmin() error {
kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
config := sarama.NewConfig()
@@ -623,7 +676,7 @@
}
// createGroupConsumer creates a consumers group
-func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId *string, retries int) (*scc.Consumer, error) {
+func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, retries int) (*scc.Consumer, error) {
config := scc.NewConfig()
config.ClientID = uuid.New().String()
config.Group.Mode = scc.ConsumerModeMultiplex
@@ -632,24 +685,22 @@
//config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
//config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
config.Consumer.Offsets.Initial = sarama.OffsetNewest
+ //config.Consumer.Offsets.Initial = sarama.OffsetOldest
kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
brokers := []string{kafkaFullAddr}
- if groupId == nil {
- g := DefaultGroupName
- groupId = &g
- }
topics := []string{topic.Name}
var consumer *scc.Consumer
var err error
- if consumer, err = scc.NewConsumer(brokers, *groupId, topics, config); err != nil {
- log.Errorw("create-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
+ if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
+ log.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
return nil, err
}
- log.Debugw("create-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
+ log.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
//time.Sleep(10*time.Second)
- sc.groupConsumer = consumer
+ //sc.groupConsumer = consumer
+ sc.groupConsumers[topic.Name] = consumer
return consumer, nil
}
@@ -704,24 +755,36 @@
func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
+ //go func() {
+ // for msg := range consumer.Errors() {
+ // log.Warnw("group-consumers-error", log.Fields{"error": msg.Error()})
+ // }
+ //}()
+ //
+ //go func() {
+ // for ntf := range consumer.Notifications() {
+ // log.Debugw("group-received-notification", log.Fields{"notification": ntf})
+ // }
+ //}()
+
startloop:
for {
select {
case err := <-consumer.Errors():
if err != nil {
- log.Warnw("group-consumers-error", log.Fields{"error": err})
+ log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
} else {
// There is a race condition when this loop is stopped and the consumer is closed where
// the actual error comes as nil
- log.Warn("group-consumers-error")
+ log.Warnw("group-consumers-error-nil", log.Fields{"topic": topic.Name})
}
case msg := <-consumer.Messages():
- //log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
if msg == nil {
// There is a race condition when this loop is stopped and the consumer is closed where
// the actual msg comes as nil
break startloop
}
+ log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
msgBody := msg.Value
icm := &ic.InterContainerMessage{}
if err := proto.Unmarshal(msgBody, icm); err != nil {
@@ -800,7 +863,7 @@
// TODO: Replace this development partition consumers with a group consumers
var pConsumer *scc.Consumer
var err error
- if pConsumer, err = sc.createGroupConsumer(topic, &groupId, DefaultMaxRetries); err != nil {
+ if pConsumer, err = sc.createGroupConsumer(topic, groupId, DefaultMaxRetries); err != nil {
log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
return nil, err
}