[VOL-2961] Added configuration for kafka topic
Change-Id: I4dd609774938e775e00d7a0ed36177fa5f9ff8a2
diff --git a/internal/common/kafka_utils.go b/internal/common/kafka_utils.go
index e907aad..780093a 100644
--- a/internal/common/kafka_utils.go
+++ b/internal/common/kafka_utils.go
@@ -39,8 +39,12 @@
Timestamp string
}
+type saramaIf interface {
+ NewAsyncProducer(addrs []string, conf *sarama.Config) (sarama.AsyncProducer, error)
+}
+
// InitializePublisher initalizes kafka publisher
-func InitializePublisher(oltID int) error {
+func InitializePublisher(NewAsyncProducer func([]string, *sarama.Config) (sarama.AsyncProducer, error), oltID int) error {
var err error
sarama.Logger = log.New()
@@ -50,9 +54,13 @@
config.Metadata.Retry.Max = 10
config.Metadata.Retry.Backoff = 10 * time.Second
config.ClientID = "BBSim-OLT-" + strconv.Itoa(oltID)
- topic = "BBSim-OLT-" + strconv.Itoa(oltID) + "-Events"
+ if len(Options.BBSim.KafkaEventTopic) > 0 {
+ topic = Options.BBSim.KafkaEventTopic
+ } else {
+ topic = "BBSim-OLT-" + strconv.Itoa(oltID) + "-Events"
+ }
- producer, err = sarama.NewAsyncProducer([]string{Options.BBSim.KafkaAddress}, config)
+ producer, err = NewAsyncProducer([]string{Options.BBSim.KafkaAddress}, config)
return err
}