[VOL-2961] Added configuration for kafka topic

Change-Id: I4dd609774938e775e00d7a0ed36177fa5f9ff8a2
diff --git a/VERSION b/VERSION
index 0c62199..a0d4970 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.2.1
+0.2.2-dev
\ No newline at end of file
diff --git a/cmd/bbsim/bbsim.go b/cmd/bbsim/bbsim.go
index 6a4c6e9..4dade03 100644
--- a/cmd/bbsim/bbsim.go
+++ b/cmd/bbsim/bbsim.go
@@ -26,6 +26,7 @@
 	"sync"
 	"syscall"
 
+	"github.com/Shopify/sarama"
 	"github.com/grpc-ecosystem/grpc-gateway/runtime"
 	"github.com/opencord/bbsim/api/bbsim"
 	"github.com/opencord/bbsim/api/legacy"
@@ -155,6 +156,7 @@
 		"Dhcp":                 options.BBSim.EnableDhcp,
 		"Delay":                options.BBSim.Delay,
 		"Events":               options.BBSim.Events,
+		"KafkaEventTopic":      options.BBSim.KafkaEventTopic,
 		"ControlledActivation": options.BBSim.ControlledActivation,
 		"EnablePerf":           options.BBSim.EnablePerf,
 		"CTag":                 options.BBSim.CTag,
@@ -196,7 +198,7 @@
 
 	if options.BBSim.Events {
 		// initialize a publisher
-		if err := common.InitializePublisher(olt.ID); err == nil {
+		if err := common.InitializePublisher(sarama.NewAsyncProducer, olt.ID); err == nil {
 			// start a go routine which will read from channel and publish on kafka
 			go common.KafkaPublisher(olt.EventChannel)
 		} else {
diff --git a/configs/bbsim.yaml b/configs/bbsim.yaml
index 3e3587f..8cb5e91 100644
--- a/configs/bbsim.yaml
+++ b/configs/bbsim.yaml
@@ -1,6 +1,6 @@
 ---
 # CLI arguments override values specified here.
-# Commented values show defaults. 
+# Commented values show defaults.
 
 # BBSim specific settings
 bbsim:
@@ -17,13 +17,14 @@
   # sadis_format: att|dt|tt
   # enable_events: false
   # kafka_address: ":9092"
-  # log_level: "debug" 
+  # log_level: "debug"
   # log_caller: false
   # delay: 200
   # c_tag_allocation: unique
   # c_tag: 900
   # s_tag_allocation: shared
   # s_tag: 900
+  # kafka_event_topic: ""
 
 # OLT device settings
 olt:
@@ -33,7 +34,7 @@
   device_serial_number: BBSM00000001
   pon_ports: 1
   nni_ports: 1
-  onus_per_port: 1 
+  onus_per_port: 1
   onus_per_port: 1
   technology: "XGS-PON"
   id: 0                 # OLT-ID of the device
diff --git a/docs/source/index.rst b/docs/source/index.rst
index b3ea17b..4cd9d46 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -137,6 +137,16 @@
            Use 'unique' for incremental values, 'shared' to use the same value in all the ONUs (default "shared")
      -sadisFormat string
            Which format should sadis expose? [att|dt|tt] (default "att")
+     -enableEvents
+           Set this flag for publishing BBSim events on configured kafkaAddress
+     -kafkaAddress string
+           IP:Port for kafka, used only when bbsimEvents flag is set (default ":9092")
+     -ca string
+           Set the mode for controlled activation of PON ports and ONUs
+     -enableperf bool
+           Setting this flag will cause BBSim to not store data like traffic schedulers, flows of ONUs etc
+     -kafkaEventTopic string
+           Set the topic on which BBSim publishes events on kafka
 
 ``BBSim`` also looks for a configuration file in ``configs/bbsim.yaml`` from
 which it reads a number of default settings. The command line options listed
diff --git a/internal/common/kafka_topic_config_test.go b/internal/common/kafka_topic_config_test.go
new file mode 100644
index 0000000..bd3dd2e
--- /dev/null
+++ b/internal/common/kafka_topic_config_test.go
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package common
+
+import (
+	"testing"
+
+	"github.com/Shopify/sarama"
+	"gotest.tools/assert"
+)
+
+type mockAsyncProducer struct {
+	input     chan *sarama.ProducerMessage
+	successes chan *sarama.ProducerMessage
+	errors    chan *sarama.ProducerError
+}
+
+func (p mockAsyncProducer) AsyncClose() {
+
+}
+
+func (p mockAsyncProducer) Close() error {
+	return nil
+}
+
+func (p mockAsyncProducer) Input() chan<- *sarama.ProducerMessage {
+	return p.input
+}
+
+func (p mockAsyncProducer) Successes() <-chan *sarama.ProducerMessage {
+	return p.successes
+}
+
+func (p mockAsyncProducer) Errors() <-chan *sarama.ProducerError {
+	return p.errors
+}
+
+type mockSarama struct{}
+
+func (m mockSarama) NewAsyncProducer(addrs []string, conf *sarama.Config) (sarama.AsyncProducer, error) {
+
+	producer := &mockAsyncProducer{
+		errors:    make(chan *sarama.ProducerError),
+		input:     make(chan *sarama.ProducerMessage),
+		successes: make(chan *sarama.ProducerMessage),
+	}
+
+	return producer, nil
+}
+
+func TestInitializePublisher(t *testing.T) {
+	mockLib := mockSarama{}
+	err := InitializePublisher(mockLib.NewAsyncProducer, 0)
+
+	assert.Equal(t, err, nil)
+	assert.Equal(t, topic, "BBSim-OLT-0-Events")
+
+	Options.BBSim.KafkaEventTopic = "Testing-Topic"
+	err = InitializePublisher(mockLib.NewAsyncProducer, 0)
+	assert.Equal(t, topic, "Testing-Topic")
+	assert.Equal(t, err, nil)
+}
diff --git a/internal/common/kafka_utils.go b/internal/common/kafka_utils.go
index e907aad..780093a 100644
--- a/internal/common/kafka_utils.go
+++ b/internal/common/kafka_utils.go
@@ -39,8 +39,12 @@
 	Timestamp string
 }
 
+type saramaIf interface {
+	NewAsyncProducer(addrs []string, conf *sarama.Config) (sarama.AsyncProducer, error)
+}
+
 // InitializePublisher initalizes kafka publisher
-func InitializePublisher(oltID int) error {
+func InitializePublisher(NewAsyncProducer func([]string, *sarama.Config) (sarama.AsyncProducer, error), oltID int) error {
 
 	var err error
 	sarama.Logger = log.New()
@@ -50,9 +54,13 @@
 	config.Metadata.Retry.Max = 10
 	config.Metadata.Retry.Backoff = 10 * time.Second
 	config.ClientID = "BBSim-OLT-" + strconv.Itoa(oltID)
-	topic = "BBSim-OLT-" + strconv.Itoa(oltID) + "-Events"
+	if len(Options.BBSim.KafkaEventTopic) > 0 {
+		topic = Options.BBSim.KafkaEventTopic
+	} else {
+		topic = "BBSim-OLT-" + strconv.Itoa(oltID) + "-Events"
+	}
 
-	producer, err = sarama.NewAsyncProducer([]string{Options.BBSim.KafkaAddress}, config)
+	producer, err = NewAsyncProducer([]string{Options.BBSim.KafkaAddress}, config)
 	return err
 }
 
diff --git a/internal/common/options.go b/internal/common/options.go
index 6088a1d..0ecb0c8 100644
--- a/internal/common/options.go
+++ b/internal/common/options.go
@@ -20,11 +20,12 @@
 	"errors"
 	"flag"
 	"fmt"
-	"github.com/ghodss/yaml"
 	"io/ioutil"
-	log "github.com/sirupsen/logrus"
 	"net"
 	"strings"
+
+	"github.com/ghodss/yaml"
+	log "github.com/sirupsen/logrus"
 )
 
 var tagAllocationValues = []string{
@@ -57,7 +58,6 @@
 	TagAllocationUnique
 )
 
-
 var sadisFormatValues = []string{
 	"unknown",
 	"att",
@@ -143,6 +143,7 @@
 	Events               bool          `yaml:"enable_events"`
 	ControlledActivation string        `yaml:"controlled_activation"`
 	EnablePerf           bool          `yaml:"enable_perf"`
+	KafkaEventTopic      string        `yaml:"kafka_event_topic`
 }
 
 type BBRConfig struct {
@@ -183,6 +184,7 @@
 			Events:               false,
 			ControlledActivation: "default",
 			EnablePerf:           false,
+			KafkaEventTopic:      "",
 		},
 		OltConfig{
 			Vendor:             "BBSim",
@@ -261,6 +263,7 @@
 	enablePerf := flag.Bool("enableperf", conf.BBSim.EnablePerf, "Setting this flag will cause BBSim to not store data like traffic schedulers, flows of ONUs etc..")
 	enableEvents := flag.Bool("enableEvents", conf.BBSim.Events, "Enable sending BBSim events on configured kafka server")
 	kafkaAddress := flag.String("kafkaAddress", conf.BBSim.KafkaAddress, "IP:Port for kafka")
+	kafkaEventTopic := flag.String("kafkaEventTopic", conf.BBSim.KafkaEventTopic, "Ability to configure the topic on which BBSim publishes events on Kafka")
 	flag.Parse()
 
 	sTagAlloc, err := tagAllocationFromString(*s_tag_allocation)
@@ -304,6 +307,7 @@
 	conf.BBSim.ApiAddress = *api_address
 	conf.BBSim.RestApiAddress = *rest_api_address
 	conf.BBSim.SadisFormat = sf
+	conf.BBSim.KafkaEventTopic = *kafkaEventTopic
 
 	// update device id if not set
 	if conf.Olt.DeviceId == "" {