[VOL-4663] create voltha event topic (voltha.events) with conifgurable no of partitions and replication factor
Change-Id: I55b40d97afaed0d75240fd6557f26da90950f6c5
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index c6ce503..58c6968 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -119,6 +119,14 @@
}
defer core.KafkaClient.Stop(ctx)
+ // create the voltha.events topic
+ topic := &kafka.Topic{Name: cf.EventTopic}
+ if err := core.KafkaClient.CreateTopic(ctx, topic, cf.EventTopicPartitions, cf.EventTopicReplicas); err != nil {
+ if err != nil {
+ logger.Fatal(ctx, "unable-to create topic", log.Fields{"topic": cf.EventTopic, "error": err})
+ }
+ }
+
// Create the event proxy to post events to KAFKA
eventProxy := events.NewEventProxy(events.MsgClient(core.KafkaClient), events.MsgTopic(kafka.Topic{Name: cf.EventTopic}))
go func() {