[VOL-4663] create voltha event topic (voltha.events) with conifgurable no of partitions and replication factor

Change-Id: I2eb23a374b548d13b3e47d5f65d6d37d14419e32
diff --git a/VERSION b/VERSION
index 2380dcf..2fe040f 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-7.1.7
+7.1.8
diff --git a/pkg/events/eventif/events_proxy_if.go b/pkg/events/eventif/events_proxy_if.go
index e4ebc36..625cd0b 100644
--- a/pkg/events/eventif/events_proxy_if.go
+++ b/pkg/events/eventif/events_proxy_if.go
@@ -26,6 +26,8 @@
 type EventProxy interface {
 	SendDeviceEvent(ctx context.Context, deviceEvent *voltha.DeviceEvent, category EventCategory,
 		subCategory EventSubCategory, raisedTs int64) error
+	SendDeviceEventWithKey(ctx context.Context, deviceEvent *voltha.DeviceEvent, category EventCategory,
+		subCategory EventSubCategory, raisedTs int64, key string) error
 	SendKpiEvent(ctx context.Context, id string, deviceEvent *voltha.KpiEvent2, category EventCategory,
 		subCategory EventSubCategory, raisedTs int64) error
 	SendRPCEvent(ctx context.Context, id string, deviceEvent *voltha.RPCEvent, category EventCategory,
diff --git a/pkg/events/events_proxy.go b/pkg/events/events_proxy.go
index e4493f9..4b46854 100644
--- a/pkg/events/events_proxy.go
+++ b/pkg/events/events_proxy.go
@@ -122,6 +122,11 @@
 
 /* Send out device events*/
 func (ep *EventProxy) SendDeviceEvent(ctx context.Context, deviceEvent *voltha.DeviceEvent, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64) error {
+	return ep.SendDeviceEventWithKey(ctx, deviceEvent, category, subCategory, raisedTs, "")
+}
+
+/* Send out device events with key*/
+func (ep *EventProxy) SendDeviceEventWithKey(ctx context.Context, deviceEvent *voltha.DeviceEvent, category eventif.EventCategory, subCategory eventif.EventSubCategory, raisedTs int64, key string) error {
 	if deviceEvent == nil {
 		logger.Error(ctx, "Recieved empty device event")
 		return errors.New("Device event nil")
@@ -134,11 +139,12 @@
 		return err
 	}
 	event.EventType = &de
-	if err := ep.sendEvent(ctx, &event); err != nil {
+
+	if err := ep.sendEvent(ctx, &event, key); err != nil {
 		logger.Errorw(ctx, "Failed to send device event to KAFKA bus", log.Fields{"device-event": deviceEvent})
 		return err
 	}
-	logger.Infow(ctx, "Successfully sent device event KAFKA", log.Fields{"Id": event.Header.Id, "Category": event.Header.Category,
+	logger.Infow(ctx, "Successfully sent device event KAFKA", log.Fields{"key": key, "Id": event.Header.Id, "Category": event.Header.Category,
 		"SubCategory": event.Header.SubCategory, "Type": event.Header.Type, "TypeVersion": event.Header.TypeVersion,
 		"ReportedTs": event.Header.ReportedTs, "ResourceId": deviceEvent.ResourceId, "Context": deviceEvent.Context,
 		"DeviceEventName": deviceEvent.DeviceEventName})
@@ -161,7 +167,8 @@
 		return err
 	}
 	event.EventType = &de
-	if err := ep.sendEvent(ctx, &event); err != nil {
+
+	if err := ep.sendEvent(ctx, &event, strconv.FormatInt(raisedTs, 10)); err != nil {
 		logger.Errorw(ctx, "Failed to send kpi event to KAFKA bus", log.Fields{"device-event": kpiEvent})
 		return err
 	}
@@ -173,9 +180,9 @@
 
 }
 
-func (ep *EventProxy) sendEvent(ctx context.Context, event *voltha.Event) error {
+func (ep *EventProxy) sendEvent(ctx context.Context, event *voltha.Event, key string) error {
 	logger.Debugw(ctx, "Send event to kafka", log.Fields{"event": event})
-	if err := ep.kafkaClient.Send(ctx, event, &ep.eventTopic); err != nil {
+	if err := ep.kafkaClient.Send(ctx, event, &ep.eventTopic, key); err != nil {
 		return err
 	}
 	logger.Debugw(ctx, "Sent event to kafka", log.Fields{"event": event})
@@ -193,7 +200,13 @@
 
 // Start the event proxy
 func (ep *EventProxy) Start() error {
+	if !ep.eventTopicExits(context.Background()) {
+		logger.Errorw(context.Background(), "event-topic-doesn't-exist-in-kafka", log.Fields{"element": ep.eventTopic.Name})
+		return fmt.Errorf("event topic doesn't exist in kafka")
+	}
+
 	eq := ep.eventQueue
+
 	go eq.start(ep.queueCtx)
 	logger.Debugw(context.Background(), "event-proxy-starting...", log.Fields{"events-threashold": EVENT_THRESHOLD})
 	for {
@@ -218,8 +231,8 @@
 			logger.Warnw(ctx, "invalid-event", log.Fields{"element": elem})
 			continue
 		}
-		if err := ep.sendEvent(ctx, event); err != nil {
-			logger.Errorw(ctx, "failed-to-send-event-to-kafka-bus", log.Fields{"event": event})
+		if err := ep.sendEvent(ctx, event, ""); err != nil {
+			logger.Warnw(ctx, "failed-to-send-event-to-kafka-bus", log.Fields{"event": event})
 		} else {
 			logger.Debugw(ctx, "successfully-sent-rpc-event-to-kafka-bus", log.Fields{"id": event.Header.Id, "category": event.Header.Category,
 				"sub-category": event.Header.SubCategory, "type": event.Header.Type, "type-version": event.Header.TypeVersion,
@@ -345,3 +358,21 @@
 	eq.mutex.Unlock()
 
 }
+
+func (ep *EventProxy) eventTopicExits(ctx context.Context) bool {
+
+	// check if voltha.events topic exists
+	topics, err := ep.kafkaClient.ListTopics(ctx)
+	if err != nil {
+		logger.Errorw(ctx, "fail-to-get-topics", log.Fields{"topic": ep.eventTopic.Name, "error": err})
+		return false
+	}
+
+	logger.Debugw(ctx, "topics in kafka", log.Fields{"topics": topics, "event-topic": ep.eventTopic.Name})
+	for _, topic := range topics {
+		if topic == ep.eventTopic.Name {
+			return true
+		}
+	}
+	return false
+}
diff --git a/pkg/kafka/client.go b/pkg/kafka/client.go
index fdc05bc..afc2955 100755
--- a/pkg/kafka/client.go
+++ b/pkg/kafka/client.go
@@ -77,4 +77,5 @@
 	SendLiveness(ctx context.Context) error
 	EnableLivenessChannel(ctx context.Context, enable bool) chan bool
 	EnableHealthinessChannel(ctx context.Context, enable bool) chan bool
+	ListTopics(ctx context.Context) ([]string, error)
 }
diff --git a/pkg/kafka/sarama_client.go b/pkg/kafka/sarama_client.go
index 185f6ec..680aa67 100755
--- a/pkg/kafka/sarama_client.go
+++ b/pkg/kafka/sarama_client.go
@@ -327,13 +327,16 @@
 	topicDetails[topic.Name] = topicDetail
 
 	if err := sc.cAdmin.CreateTopic(topic.Name, topicDetail, false); err != nil {
-		if err == sarama.ErrTopicAlreadyExists {
-			//	Not an error
-			logger.Debugw(ctx, "topic-already-exist", log.Fields{"topic": topic.Name})
-			return nil
+		switch typedErr := err.(type) {
+		case *sarama.TopicError:
+			if typedErr.Err == sarama.ErrTopicAlreadyExists {
+				err = nil
+			}
 		}
-		logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err})
-		return err
+		if err != nil {
+			logger.Errorw(ctx, "create-topic-failure", log.Fields{"error": err})
+			return err
+		}
 	}
 	// TODO: Wait until the topic has been created.  No API is available in the Sarama clusterAdmin to
 	// do so.
@@ -832,7 +835,7 @@
 	// This Creates the publisher
 	config := sarama.NewConfig()
 	config.Version = sarama.V1_0_0_0
-	config.Producer.Partitioner = sarama.NewRandomPartitioner
+	config.Producer.Partitioner = sarama.NewHashPartitioner
 	config.Producer.Flush.Frequency = time.Duration(sc.producerFlushFrequency)
 	config.Producer.Flush.Messages = sc.producerFlushMessages
 	config.Producer.Flush.MaxMessages = sc.producerFlushMaxmessages
@@ -1152,3 +1155,21 @@
 	}
 	return nil
 }
+
+func (sc *SaramaClient) ListTopics(ctx context.Context) ([]string, error) {
+
+	config := sarama.NewConfig()
+	client, err := sarama.NewClient([]string{sc.KafkaAddress}, config)
+	if err != nil {
+		logger.Debugw(ctx, "list-topics-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
+		return nil, err
+	}
+
+	topics, err := client.Topics()
+	if err != nil {
+		logger.Debugw(ctx, "list-topics-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
+		return nil, err
+	}
+
+	return topics, nil
+}
diff --git a/pkg/mocks/kafka/kafka_client.go b/pkg/mocks/kafka/kafka_client.go
index ea410ac..a9b434f 100644
--- a/pkg/mocks/kafka/kafka_client.go
+++ b/pkg/mocks/kafka/kafka_client.go
@@ -174,3 +174,8 @@
 	logger.Debug(ctx, "EnableHealthinessChannel - unimplemented")
 	return nil
 }
+
+func (kc *KafkaClient) ListTopics(ctx context.Context) ([]string, error) {
+	topics := []string{"voltha.events", "myTopic"}
+	return topics, nil
+}