[VOL-4663] create voltha event topic (voltha.events) with conifgurable no of partitions and replication factor
Change-Id: I55b40d97afaed0d75240fd6557f26da90950f6c5
diff --git a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/sarama_client.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/sarama_client.go
index 185f6ec..680aa67 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/sarama_client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/sarama_client.go
@@ -327,13 +327,16 @@
topicDetails[topic.Name] = topicDetail
if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
- if err == sarama.ErrTopicAlreadyExists {
- // Not an error
- logger.Debugw(ctx, "topic-already-exist", log.Fields{"topic": topic.Name})
- return nil
+ switch typedErr := err.(type) {
+ case *sarama.TopicError:
+ if typedErr.Err == sarama.ErrTopicAlreadyExists {
+ err = nil
+ }
}
- logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err})
- return err
+ if err != nil {
+ logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err})
+ return err
+ }
}
// TODO: Wait until the topic has been created. No API is available in the Sarama clusterAdmin to
// do so.
@@ -832,7 +835,7 @@
// This Creates the publisher
config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0
- config.Producer.Partitioner = sarama.NewRandomPartitioner
+ config.Producer.Partitioner = sarama.NewHashPartitioner
config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
config.Producer.Flush.Messages = sc.producerFlushMessages
config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
@@ -1152,3 +1155,21 @@
}
return nil
}
+
+func (sc *SaramaClient) ListTopics(ctx context.Context) ([]string, error) {
+
+ config := sarama.NewConfig()
+ client, err := sarama.NewClient([]string{sc.KafkaAddress}, config)
+ if err != nil {
+ logger.Debugw(ctx, "list-topics-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
+ return nil, err
+ }
+
+ topics, err := client.Topics()
+ if err != nil {
+ logger.Debugw(ctx, "list-topics-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
+ return nil, err
+ }
+
+ return topics, nil
+}