[VOL-4663] Create the VOLTHA event topic (voltha.events) with a configurable number of partitions and replication factor

Change-Id: I55b40d97afaed0d75240fd6557f26da90950f6c5
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index 4375b9b..0f154f6 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -37,6 +37,8 @@
 	KVStoreTimeout              time.Duration
 	KVStoreAddress              string
 	EventTopic                  string
+	EventTopicPartitions        int
+	EventTopicReplicas          int
 	LogLevel                    string
 	Banner                      bool
 	DisplayVersionOnly          bool
@@ -85,6 +87,16 @@
 		"voltha.events",
 		"RW Core Event topic")
 
+	fs.IntVar(&cf.EventTopicPartitions,
+		"EventTopicPartitions",
+		3,
+		"RW Core Event topic partitions")
+
+	fs.IntVar(&cf.EventTopicReplicas,
+		"EventTopicReplicas",
+		1,
+		"RW Core Event topic replicas")
+
 	fs.StringVar(&cf.KVStoreType,
 		"kv_store_type",
 		EtcdStoreName,
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index c6ce503..58c6968 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -119,6 +119,14 @@
 	}
 	defer core.KafkaClient.Stop(ctx)
 
+	// Create the event topic named by cf.EventTopic with the configured partition and replica counts
+	topic := &kafka.Topic{Name: cf.EventTopic}
+	if err := core.KafkaClient.CreateTopic(ctx, topic, cf.EventTopicPartitions, cf.EventTopicReplicas); err != nil {
+		if err != nil {
+			logger.Fatal(ctx, "unable-to create topic", log.Fields{"topic": cf.EventTopic, "error": err})
+		}
+	}
+
 	// Create the event proxy to post events to KAFKA
 	eventProxy := events.NewEventProxy(events.MsgClient(core.KafkaClient), events.MsgTopic(kafka.Topic{Name: cf.EventTopic}))
 	go func() {