This update consists of the following changes:
1) Add a GroupConsumer to the Go sarama_client and modify the Core
code to use a group consumer instead of a partition consumer. This
ensures that multiple consumers (each with a different group ID) can
consume Kafka messages from the same topic; a caller-side sketch
follows this list.
2) Remove the afkak Kafka client and replace it with confluent-kafka,
a change already made in VOLTHA 1.x. Modify the code accordingly.
3) Add a group consumer to the Python Kafka client so that several
instances of an Adapter can consume the same messages from the same
topic (the group semantics are the same as in the Go sketch below).
4) Set the datapath_id for the logical device in the Core (a
hypothetical illustration follows the Go sketch below).
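
For reference, a minimal caller-side sketch of the group-based
subscription introduced by changes 1 and 3 (illustration only, not
part of this change: it assumes the package's NewSaramaClient
constructor and a Start method, and the topic and group names are
placeholders):

    // Create a client that uses group consumers, with the new
    // ConsumerGroupPrefix option added in this change.
    client := kafka.NewSaramaClient(
        kafka.ConsumerType(kafka.GroupCustomer),
        kafka.ConsumerGroupPrefix("voltha-"),
    )
    if err := client.Start(); err != nil {
        panic(err) // sketch only; real code should log and return
    }
    defer client.Stop()

    // Subscribe with an explicit group ID via the new KVArg
    // parameter. Every distinct group ID receives its own copy of
    // the topic's messages, while instances sharing a group ID split
    // the load; omitting the KVArg falls back to
    // consumerGroupPrefix + topic name.
    topic := kafka.Topic{Name: "core-topic"}
    ch, err := client.Subscribe(&topic,
        &kafka.KVArg{Key: kafka.GroupIdKey, Value: "rw_core"})
    if err != nil {
        panic(err)
    }
    for msg := range ch {
        _ = msg // handle the *ic.InterContainerMessage here
    }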
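
Change 4 touches Core code outside this diff; the helper below is a
hypothetical illustration of the kind of mapping involved, not the
Core's actual implementation (it assumes the logical device ID is a
hex string, so a hex parse into the 64-bit OpenFlow datapath ID is
one plausible derivation):

    import "strconv"

    // datapathIDFromLogicalDeviceID is a hypothetical helper that
    // derives the 64-bit OpenFlow datapath_id from a hex logical
    // device ID; the real Core logic may differ.
    func datapathIDFromLogicalDeviceID(id string) (uint64, error) {
        return strconv.ParseUint(id, 16, 64)
    }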
Change-Id: I5d7ced27c9aeca4f6211baa3dc8cb3db861545e4
diff --git a/kafka/client.go b/kafka/client.go
index 1df700e..8cc1999 100644
--- a/kafka/client.go
+++ b/kafka/client.go
@@ -26,9 +26,13 @@
)
const (
+ GroupIdKey = "groupId"
+)
+
+const (
DefaultKafkaHost = "127.0.0.1"
DefaultKafkaPort = 9092
- DefaultGroupName = "rw_core"
+ DefaultGroupName = "voltha"
DefaultSleepOnError = 1
DefaultProducerFlushFrequency = 5
DefaultProducerFlushMessages = 1
@@ -51,7 +55,7 @@
Stop()
CreateTopic(topic *Topic, numPartition int, repFactor int) error
DeleteTopic(topic *Topic) error
- Subscribe(topic *Topic) (<-chan *ca.InterContainerMessage, error)
+ Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ca.InterContainerMessage, error)
UnSubscribe(topic *Topic, ch <-chan *ca.InterContainerMessage) error
Send(msg interface{}, topic *Topic, keys ...string) error
}
diff --git a/kafka/kafka_inter_container_library.go b/kafka/kafka_inter_container_library.go
index 3381fdc..c527619 100644
--- a/kafka/kafka_inter_container_library.go
+++ b/kafka/kafka_inter_container_library.go
@@ -33,12 +33,12 @@
// Initialize the logger - gets the default until the main function sets up the logger
func init() {
- log.AddPackage(log.JSON, log.WarnLevel, nil)
+ log.AddPackage(log.JSON, log.DebugLevel, nil)
}
const (
DefaultMaxRetries = 3
- DefaultRequestTimeout = 500 // 500 milliseconds - to handle a wider latency range
+ DefaultRequestTimeout = 3000 // 3000 milliseconds - to handle a wider latency range
)
// requestHandlerChannel represents an interface associated with a channel. Whenever an event is
@@ -172,7 +172,7 @@
log.Info("stopping-intercontainer-proxy")
kp.doneCh <- 1
// TODO : Perform cleanup
- //kp.kafkaClient.Stop()
+ kp.kafkaClient.Stop()
//kp.deleteAllTopicRequestHandlerChannelMap()
//kp.deleteAllTopicResponseChannelMap()
//kp.deleteAllTransactionIdToChannelMap()
@@ -331,6 +331,7 @@
var err error
if ch, err = kp.kafkaClient.Subscribe(&topic); err != nil {
log.Errorw("failed-to-subscribe", log.Fields{"error": err, "topic": topic.Name})
+ return err
}
kp.addToTopicRequestHandlerChannelMap(topic.Name, &requestHandlerChannel{requesthandlerInterface: kp.defaultRequestHandlerInterface, ch: ch})
@@ -676,7 +677,7 @@
for {
select {
case msg := <-subscribedCh:
- //log.Debugw("message-received", log.Fields{"msg": msg, "fromTopic": msg.Header.FromTopic})
+ log.Debugw("message-received", log.Fields{"msg": msg, "fromTopic": msg.Header.FromTopic})
if msg.Header.Type == ic.MessageType_RESPONSE {
go kp.dispatchResponse(msg)
}
@@ -700,6 +701,7 @@
// First check whether we already have a channel listening for response on that topic. If there is
// already one then it will be reused. If not, it will be created.
if !kp.isTopicSubscribedForResponse(topic.Name) {
+ log.Debugw("not-subscribed-for-response", log.Fields{"topic": topic.Name, "trnsid": trnsId})
var subscribedCh <-chan *ic.InterContainerMessage
var err error
if subscribedCh, err = kp.kafkaClient.Subscribe(&topic); err != nil {
diff --git a/kafka/sarama_client.go b/kafka/sarama_client.go
index 2df19e5..10c692a 100644
--- a/kafka/sarama_client.go
+++ b/kafka/sarama_client.go
@@ -30,7 +30,7 @@
)
func init() {
- log.AddPackage(log.JSON, log.WarnLevel, nil)
+ log.AddPackage(log.JSON, log.DebugLevel, nil)
}
type returnErrorFunction func() error
@@ -51,9 +51,10 @@
KafkaPort int
producer sarama.AsyncProducer
consumer sarama.Consumer
- groupConsumer *scc.Consumer
+ groupConsumers map[string]*scc.Consumer
+ consumerGroupPrefix string
consumerType int
- groupName string
+ consumerGroupName string
producerFlushFrequency int
producerFlushMessages int
producerFlushMaxmessages int
@@ -87,6 +88,18 @@
}
}
+func ConsumerGroupPrefix(prefix string) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.consumerGroupPrefix = prefix
+ }
+}
+
+func ConsumerGroupName(name string) SaramaClientOption {
+ return func(args *SaramaClient) {
+ args.consumerGroupName = name
+ }
+}
+
func ConsumerType(consumer int) SaramaClientOption {
return func(args *SaramaClient) {
args.consumerType = consumer
@@ -188,6 +201,8 @@
option(client)
}
+ client.groupConsumers = make(map[string]*scc.Consumer)
+
client.lockTopicToConsumerChannelMap = sync.RWMutex{}
client.topicLockMap = make(map[string]*sync.RWMutex)
client.lockOfTopicLockMap = sync.RWMutex{}
@@ -214,15 +229,19 @@
return err
}
- // Create the master consumers
- if err := sc.createConsumer(); err != nil {
- log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
- return err
+ if sc.consumerType == DefaultConsumerType {
+ // Create the master consumers
+ if err := sc.createConsumer(); err != nil {
+ log.Errorw("Cannot-create-kafka-consumers", log.Fields{"error": err})
+ return err
+ }
}
// Create the topic to consumers/channel map
sc.topicToConsumerChannelMap = make(map[string]*consumerChannels)
+ log.Info("kafka-sarama-client-started")
+
return nil
}
@@ -234,39 +253,38 @@
if sc.producer != nil {
if err := sc.producer.Close(); err != nil {
- panic(err)
+ log.Errorw("closing-producer-failed", log.Fields{"error": err})
}
}
if sc.consumer != nil {
if err := sc.consumer.Close(); err != nil {
- panic(err)
+ log.Errorw("closing-partition-consumer-failed", log.Fields{"error": err})
}
}
- if sc.groupConsumer != nil {
- if err := sc.groupConsumer.Close(); err != nil {
- panic(err)
+ for key, val := range sc.groupConsumers {
+ log.Debugw("closing-group-consumer", log.Fields{"topic": key})
+ if err := val.Close(); err != nil {
+ log.Errorw("closing-group-consumer-failed", log.Fields{"error": err, "topic": key})
}
}
if sc.cAdmin != nil {
if err := sc.cAdmin.Close(); err != nil {
- panic(err)
+ log.Errorw("closing-cluster-admin-failed", log.Fields{"error": err})
}
}
//TODO: Clear the consumers map
- sc.clearConsumerChannelMap()
+ //sc.clearConsumerChannelMap()
log.Info("sarama-client-stopped")
}
-//CreateTopic creates a topic on the Kafka Broker.
-func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
- sc.lockTopic(topic)
- defer sc.unLockTopic(topic)
-
+//createTopic is an internal function to create a topic on the Kafka Broker. No locking is required as
+// the invoking function must hold the lock
+func (sc *SaramaClient) createTopic(topic *Topic, numPartition int, repFactor int) error {
// Set the topic details
topicDetail := &sarama.TopicDetail{}
topicDetail.NumPartitions = int32(numPartition)
@@ -290,6 +308,15 @@
return nil
}
+//CreateTopic is a public API to create a topic on the Kafka Broker. It uses a lock on a specific topic to
+// ensure no two goroutines are performing operations on the same topic
+func (sc *SaramaClient) CreateTopic(topic *Topic, numPartition int, repFactor int) error {
+ sc.lockTopic(topic)
+ defer sc.unLockTopic(topic)
+
+ return sc.createTopic(topic, numPartition, repFactor)
+}
+
//DeleteTopic removes a topic from the kafka Broker
func (sc *SaramaClient) DeleteTopic(topic *Topic) error {
sc.lockTopic(topic)
@@ -316,7 +343,7 @@
// Subscribe registers a caller to a topic. It returns a channel that the caller can use to receive
// messages from that topic
-func (sc *SaramaClient) Subscribe(topic *Topic) (<-chan *ic.InterContainerMessage, error) {
+func (sc *SaramaClient) Subscribe(topic *Topic, kvArgs ...*KVArg) (<-chan *ic.InterContainerMessage, error) {
sc.lockTopic(topic)
defer sc.unLockTopic(topic)
@@ -338,7 +365,7 @@
// Use the consumerType option to figure out the type of consumer to launch
if sc.consumerType == PartitionConsumer {
if sc.autoCreateTopic {
- if err = sc.CreateTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
+ if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
return nil, err
}
@@ -350,10 +377,26 @@
} else if sc.consumerType == GroupCustomer {
// TODO: create topic if auto create is on. There is an issue with the sarama cluster library that
// prevents consumption from a pre-created topic in some scenarios
- if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, "mytest"); err != nil {
- log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name})
+ //if sc.autoCreateTopic {
+ // if err = sc.createTopic(topic, sc.numPartitions, sc.numReplicas); err != nil {
+ // log.Errorw("create-topic-failure", log.Fields{"error": err, "topic": topic.Name})
+ // return nil, err
+ // }
+ //}
+ //groupId := sc.consumerGroupName
+ groupId := getGroupId(kvArgs...)
+ // Include the group prefix
+ if groupId != "" {
+ groupId = sc.consumerGroupPrefix + groupId
+ } else {
+ // Need to use a unique group Id per topic
+ groupId = sc.consumerGroupPrefix + topic.Name
+ }
+ if consumerListeningChannel, err = sc.setupGroupConsumerChannel(topic, groupId); err != nil {
+ log.Warnw("create-consumers-channel-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
return nil, err
}
+
} else {
log.Warnw("unknown-consumer-type", log.Fields{"consumer-type": sc.consumerType})
return nil, errors.New("unknown-consumer-type")
@@ -416,6 +459,16 @@
return nil
}
+// getGroupId returns the group id from the key-value args.
+func getGroupId(kvArgs ...*KVArg) string {
+ for _, arg := range kvArgs {
+ if arg.Key == GroupIdKey {
+ return arg.Value.(string)
+ }
+ }
+ return ""
+}
+
func (sc *SaramaClient) createClusterAdmin() error {
kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
config := sarama.NewConfig()
@@ -623,7 +676,7 @@
}
// createGroupConsumer creates a consumer group
-func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId *string, retries int) (*scc.Consumer, error) {
+func (sc *SaramaClient) createGroupConsumer(topic *Topic, groupId string, retries int) (*scc.Consumer, error) {
config := scc.NewConfig()
config.ClientID = uuid.New().String()
config.Group.Mode = scc.ConsumerModeMultiplex
@@ -632,24 +685,22 @@
//config.Consumer.MaxWaitTime = time.Duration(DefaultConsumerMaxwait) * time.Millisecond
//config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
config.Consumer.Offsets.Initial = sarama.OffsetNewest
+ //config.Consumer.Offsets.Initial = sarama.OffsetOldest
kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
brokers := []string{kafkaFullAddr}
- if groupId == nil {
- g := DefaultGroupName
- groupId = &g
- }
topics := []string{topic.Name}
var consumer *scc.Consumer
var err error
- if consumer, err = scc.NewConsumer(brokers, *groupId, topics, config); err != nil {
- log.Errorw("create-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
+ if consumer, err = scc.NewConsumer(brokers, groupId, topics, config); err != nil {
+ log.Errorw("create-group-consumers-failure", log.Fields{"error": err, "topic": topic.Name, "groupId": groupId})
return nil, err
}
- log.Debugw("create-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
+ log.Debugw("create-group-consumers-success", log.Fields{"topic": topic.Name, "groupId": groupId})
//time.Sleep(10*time.Second)
- sc.groupConsumer = consumer
+ //sc.groupConsumer = consumer
+ sc.groupConsumers[topic.Name] = consumer
return consumer, nil
}
@@ -704,24 +755,36 @@
func (sc *SaramaClient) consumeGroupMessages(topic *Topic, consumer *scc.Consumer, consumerChnls *consumerChannels) {
log.Debugw("starting-group-consumption-loop", log.Fields{"topic": topic.Name})
+ //go func() {
+ // for msg := range consumer.Errors() {
+ // log.Warnw("group-consumers-error", log.Fields{"error": msg.Error()})
+ // }
+ //}()
+ //
+ //go func() {
+ // for ntf := range consumer.Notifications() {
+ // log.Debugw("group-received-notification", log.Fields{"notification": ntf})
+ // }
+ //}()
+
startloop:
for {
select {
case err := <-consumer.Errors():
if err != nil {
- log.Warnw("group-consumers-error", log.Fields{"error": err})
+ log.Warnw("group-consumers-error", log.Fields{"topic": topic.Name, "error": err})
} else {
// There is a race condition when this loop is stopped and the consumer is closed where
// the actual error comes as nil
- log.Warn("group-consumers-error")
+ log.Warnw("group-consumers-error-nil", log.Fields{"topic": topic.Name})
}
case msg := <-consumer.Messages():
- //log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
if msg == nil {
// There is a race condition when this loop is stopped and the consumer is closed where
// the actual msg comes as nil
break startloop
}
+ log.Debugw("message-received", log.Fields{"msg": msg, "receivedTopic": msg.Topic})
msgBody := msg.Value
icm := &ic.InterContainerMessage{}
if err := proto.Unmarshal(msgBody, icm); err != nil {
@@ -800,7 +863,7 @@
// TODO: Replace this development partition consumer with a group consumer
var pConsumer *scc.Consumer
var err error
- if pConsumer, err = sc.createGroupConsumer(topic, &groupId, DefaultMaxRetries); err != nil {
+ if pConsumer, err = sc.createGroupConsumer(topic, groupId, DefaultMaxRetries); err != nil {
log.Errorw("creating-partition-consumers-failure", log.Fields{"error": err, "topic": topic.Name})
return nil, err
}