[VOL-4663] create voltha event topic (voltha.events) with conifgurable no of partitions and replication factor

Change-Id: I55b40d97afaed0d75240fd6557f26da90950f6c5
diff --git a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/sarama_client.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/sarama_client.go
index 185f6ec..680aa67 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/sarama_client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/kafka/sarama_client.go
@@ -327,13 +327,16 @@
 	topicDetails[topic.Name] = topicDetail
 
 	if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
-		if err == sarama.ErrTopicAlreadyExists {
-			//	Not an error
-			logger.Debugw(ctx, "topic-already-exist", log.Fields{"topic": topic.Name})
-			return nil
+		switch typedErr := err.(type) {
+		case *sarama.TopicError:
+			if typedErr.Err == sarama.ErrTopicAlreadyExists {
+				err = nil
+			}
 		}
-		logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err})
-		return err
+		if err != nil {
+			logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err})
+			return err
+		}
 	}
 	// TODO: Wait until the topic has been created.  No API is available in the Sarama clusterAdmin to
 	// do so.
@@ -832,7 +835,7 @@
 	// This Creates the publisher
 	config := sarama.NewConfig()
 	config.Version = sarama.V1_0_0_0
-	config.Producer.Partitioner = sarama.NewRandomPartitioner
+	config.Producer.Partitioner = sarama.NewHashPartitioner
 	config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
 	config.Producer.Flush.Messages = sc.producerFlushMessages
 	config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
@@ -1152,3 +1155,21 @@
 	}
 	return nil
 }
+
+func (sc *SaramaClient) ListTopics(ctx context.Context) ([]string, error) {
+
+	config := sarama.NewConfig()
+	client, err := sarama.NewClient([]string{sc.KafkaAddress}, config)
+	if err != nil {
+		logger.Debugw(ctx, "list-topics-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
+		return nil, err
+	}
+
+	topics, err := client.Topics()
+	if err != nil {
+		logger.Debugw(ctx, "list-topics-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
+		return nil, err
+	}
+
+	return topics, nil
+}