[VOL-2961] Added configuration for kafka topic

Change-Id: I4dd609774938e775e00d7a0ed36177fa5f9ff8a2
diff --git a/internal/common/kafka_topic_config_test.go b/internal/common/kafka_topic_config_test.go
new file mode 100644
index 0000000..bd3dd2e
--- /dev/null
+++ b/internal/common/kafka_topic_config_test.go
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package common
+
+import (
+	"testing"
+
+	"github.com/Shopify/sarama"
+	"gotest.tools/assert"
+)
+
+type mockAsyncProducer struct {
+	input     chan *sarama.ProducerMessage
+	successes chan *sarama.ProducerMessage
+	errors    chan *sarama.ProducerError
+}
+
+func (p mockAsyncProducer) AsyncClose() {
+
+}
+
+func (p mockAsyncProducer) Close() error {
+	return nil
+}
+
+func (p mockAsyncProducer) Input() chan<- *sarama.ProducerMessage {
+	return p.input
+}
+
+func (p mockAsyncProducer) Successes() <-chan *sarama.ProducerMessage {
+	return p.successes
+}
+
+func (p mockAsyncProducer) Errors() <-chan *sarama.ProducerError {
+	return p.errors
+}
+
+type mockSarama struct{}
+
+func (m mockSarama) NewAsyncProducer(addrs []string, conf *sarama.Config) (sarama.AsyncProducer, error) {
+
+	producer := &mockAsyncProducer{
+		errors:    make(chan *sarama.ProducerError),
+		input:     make(chan *sarama.ProducerMessage),
+		successes: make(chan *sarama.ProducerMessage),
+	}
+
+	return producer, nil
+}
+
+func TestInitializePublisher(t *testing.T) {
+	mockLib := mockSarama{}
+	err := InitializePublisher(mockLib.NewAsyncProducer, 0)
+
+	assert.Equal(t, err, nil)
+	assert.Equal(t, topic, "BBSim-OLT-0-Events")
+
+	Options.BBSim.KafkaEventTopic = "Testing-Topic"
+	err = InitializePublisher(mockLib.NewAsyncProducer, 0)
+	assert.Equal(t, topic, "Testing-Topic")
+	assert.Equal(t, err, nil)
+}
diff --git a/internal/common/kafka_utils.go b/internal/common/kafka_utils.go
index e907aad..780093a 100644
--- a/internal/common/kafka_utils.go
+++ b/internal/common/kafka_utils.go
@@ -39,8 +39,12 @@
 	Timestamp string
 }
 
+type saramaIf interface {
+	NewAsyncProducer(addrs []string, conf *sarama.Config) (sarama.AsyncProducer, error)
+}
+
 // InitializePublisher initalizes kafka publisher
-func InitializePublisher(oltID int) error {
+func InitializePublisher(NewAsyncProducer func([]string, *sarama.Config) (sarama.AsyncProducer, error), oltID int) error {
 
 	var err error
 	sarama.Logger = log.New()
@@ -50,9 +54,13 @@
 	config.Metadata.Retry.Max = 10
 	config.Metadata.Retry.Backoff = 10 * time.Second
 	config.ClientID = "BBSim-OLT-" + strconv.Itoa(oltID)
-	topic = "BBSim-OLT-" + strconv.Itoa(oltID) + "-Events"
+	if len(Options.BBSim.KafkaEventTopic) > 0 {
+		topic = Options.BBSim.KafkaEventTopic
+	} else {
+		topic = "BBSim-OLT-" + strconv.Itoa(oltID) + "-Events"
+	}
 
-	producer, err = sarama.NewAsyncProducer([]string{Options.BBSim.KafkaAddress}, config)
+	producer, err = NewAsyncProducer([]string{Options.BBSim.KafkaAddress}, config)
 	return err
 }
 
diff --git a/internal/common/options.go b/internal/common/options.go
index 6088a1d..0ecb0c8 100644
--- a/internal/common/options.go
+++ b/internal/common/options.go
@@ -20,11 +20,12 @@
 	"errors"
 	"flag"
 	"fmt"
-	"github.com/ghodss/yaml"
 	"io/ioutil"
-	log "github.com/sirupsen/logrus"
 	"net"
 	"strings"
+
+	"github.com/ghodss/yaml"
+	log "github.com/sirupsen/logrus"
 )
 
 var tagAllocationValues = []string{
@@ -57,7 +58,6 @@
 	TagAllocationUnique
 )
 
-
 var sadisFormatValues = []string{
 	"unknown",
 	"att",
@@ -143,6 +143,7 @@
 	Events               bool          `yaml:"enable_events"`
 	ControlledActivation string        `yaml:"controlled_activation"`
 	EnablePerf           bool          `yaml:"enable_perf"`
+	KafkaEventTopic      string        `yaml:"kafka_event_topic`
 }
 
 type BBRConfig struct {
@@ -183,6 +184,7 @@
 			Events:               false,
 			ControlledActivation: "default",
 			EnablePerf:           false,
+			KafkaEventTopic:      "",
 		},
 		OltConfig{
 			Vendor:             "BBSim",
@@ -261,6 +263,7 @@
 	enablePerf := flag.Bool("enableperf", conf.BBSim.EnablePerf, "Setting this flag will cause BBSim to not store data like traffic schedulers, flows of ONUs etc..")
 	enableEvents := flag.Bool("enableEvents", conf.BBSim.Events, "Enable sending BBSim events on configured kafka server")
 	kafkaAddress := flag.String("kafkaAddress", conf.BBSim.KafkaAddress, "IP:Port for kafka")
+	kafkaEventTopic := flag.String("kafkaEventTopic", conf.BBSim.KafkaEventTopic, "Ability to configure the topic on which BBSim publishes events on Kafka")
 	flag.Parse()
 
 	sTagAlloc, err := tagAllocationFromString(*s_tag_allocation)
@@ -304,6 +307,7 @@
 	conf.BBSim.ApiAddress = *api_address
 	conf.BBSim.RestApiAddress = *rest_api_address
 	conf.BBSim.SadisFormat = sf
+	conf.BBSim.KafkaEventTopic = *kafkaEventTopic
 
 	// update device id if not set
 	if conf.Olt.DeviceId == "" {