[VOL-4663] create voltha event topic (voltha.events) with conifgurable no of partitions and replication factor
Change-Id: I55b40d97afaed0d75240fd6557f26da90950f6c5
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index 4375b9b..0f154f6 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -37,6 +37,8 @@
KVStoreTimeout time.Duration
KVStoreAddress string
EventTopic string
+ EventTopicPartitions int
+ EventTopicReplicas int
LogLevel string
Banner bool
DisplayVersionOnly bool
@@ -85,6 +87,16 @@
"voltha.events",
"RW Core Event topic")
+ fs.IntVar(&cf.EventTopicPartitions,
+ "EventTopicPartitions",
+ 3,
+ "RW Core Event topic partitions")
+
+ fs.IntVar(&cf.EventTopicReplicas,
+ "EventTopicReplicas",
+ 1,
+ "RW Core Event topic replicas")
+
fs.StringVar(&cf.KVStoreType,
"kv_store_type",
EtcdStoreName,
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index c6ce503..58c6968 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -119,6 +119,14 @@
}
defer core.KafkaClient.Stop(ctx)
+ // create the voltha.events topic
+ topic := &kafka.Topic{Name: cf.EventTopic}
+ if err := core.KafkaClient.CreateTopic(ctx, topic, cf.EventTopicPartitions, cf.EventTopicReplicas); err != nil {
+ if err != nil {
+ logger.Fatal(ctx, "unable-to create topic", log.Fields{"topic": cf.EventTopic, "error": err})
+ }
+ }
+
// Create the event proxy to post events to KAFKA
eventProxy := events.NewEventProxy(events.MsgClient(core.KafkaClient), events.MsgTopic(kafka.Topic{Name: cf.EventTopic}))
go func() {