[VOL-2961] Add configuration option for the Kafka event topic

Change-Id: I4dd609774938e775e00d7a0ed36177fa5f9ff8a2
diff --git a/internal/common/kafka_utils.go b/internal/common/kafka_utils.go
index e907aad..780093a 100644
--- a/internal/common/kafka_utils.go
+++ b/internal/common/kafka_utils.go
@@ -39,8 +39,12 @@
 	Timestamp string
 }
 
+type saramaIf interface {
+	NewAsyncProducer(addrs []string, conf *sarama.Config) (sarama.AsyncProducer, error)
+}
+
 // InitializePublisher initalizes kafka publisher
-func InitializePublisher(oltID int) error {
+func InitializePublisher(NewAsyncProducer func([]string, *sarama.Config) (sarama.AsyncProducer, error), oltID int) error {
 
 	var err error
 	sarama.Logger = log.New()
@@ -50,9 +54,13 @@
 	config.Metadata.Retry.Max = 10
 	config.Metadata.Retry.Backoff = 10 * time.Second
 	config.ClientID = "BBSim-OLT-" + strconv.Itoa(oltID)
-	topic = "BBSim-OLT-" + strconv.Itoa(oltID) + "-Events"
+	if len(Options.BBSim.KafkaEventTopic) > 0 {
+		topic = Options.BBSim.KafkaEventTopic
+	} else {
+		topic = "BBSim-OLT-" + strconv.Itoa(oltID) + "-Events"
+	}
 
-	producer, err = sarama.NewAsyncProducer([]string{Options.BBSim.KafkaAddress}, config)
+	producer, err = NewAsyncProducer([]string{Options.BBSim.KafkaAddress}, config)
 	return err
 }