[VOL-2961] Added configuration for kafka topic
Change-Id: I4dd609774938e775e00d7a0ed36177fa5f9ff8a2
diff --git a/internal/common/kafka_topic_config_test.go b/internal/common/kafka_topic_config_test.go
new file mode 100644
index 0000000..bd3dd2e
--- /dev/null
+++ b/internal/common/kafka_topic_config_test.go
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package common
+
+import (
+ "testing"
+
+ "github.com/Shopify/sarama"
+ "gotest.tools/assert"
+)
+
+type mockAsyncProducer struct {
+ input chan *sarama.ProducerMessage
+ successes chan *sarama.ProducerMessage
+ errors chan *sarama.ProducerError
+}
+
+func (p mockAsyncProducer) AsyncClose() {
+
+}
+
+func (p mockAsyncProducer) Close() error {
+ return nil
+}
+
+func (p mockAsyncProducer) Input() chan<- *sarama.ProducerMessage {
+ return p.input
+}
+
+func (p mockAsyncProducer) Successes() <-chan *sarama.ProducerMessage {
+ return p.successes
+}
+
+func (p mockAsyncProducer) Errors() <-chan *sarama.ProducerError {
+ return p.errors
+}
+
+type mockSarama struct{}
+
+func (m mockSarama) NewAsyncProducer(addrs []string, conf *sarama.Config) (sarama.AsyncProducer, error) {
+
+ producer := &mockAsyncProducer{
+ errors: make(chan *sarama.ProducerError),
+ input: make(chan *sarama.ProducerMessage),
+ successes: make(chan *sarama.ProducerMessage),
+ }
+
+ return producer, nil
+}
+
+func TestInitializePublisher(t *testing.T) {
+ mockLib := mockSarama{}
+ err := InitializePublisher(mockLib.NewAsyncProducer, 0)
+
+ assert.Equal(t, err, nil)
+ assert.Equal(t, topic, "BBSim-OLT-0-Events")
+
+ Options.BBSim.KafkaEventTopic = "Testing-Topic"
+ err = InitializePublisher(mockLib.NewAsyncProducer, 0)
+ assert.Equal(t, topic, "Testing-Topic")
+ assert.Equal(t, err, nil)
+}
diff --git a/internal/common/kafka_utils.go b/internal/common/kafka_utils.go
index e907aad..780093a 100644
--- a/internal/common/kafka_utils.go
+++ b/internal/common/kafka_utils.go
@@ -39,8 +39,12 @@
Timestamp string
}
+type saramaIf interface {
+ NewAsyncProducer(addrs []string, conf *sarama.Config) (sarama.AsyncProducer, error)
+}
+
// InitializePublisher initalizes kafka publisher
-func InitializePublisher(oltID int) error {
+func InitializePublisher(NewAsyncProducer func([]string, *sarama.Config) (sarama.AsyncProducer, error), oltID int) error {
var err error
sarama.Logger = log.New()
@@ -50,9 +54,13 @@
config.Metadata.Retry.Max = 10
config.Metadata.Retry.Backoff = 10 * time.Second
config.ClientID = "BBSim-OLT-" + strconv.Itoa(oltID)
- topic = "BBSim-OLT-" + strconv.Itoa(oltID) + "-Events"
+ if len(Options.BBSim.KafkaEventTopic) > 0 {
+ topic = Options.BBSim.KafkaEventTopic
+ } else {
+ topic = "BBSim-OLT-" + strconv.Itoa(oltID) + "-Events"
+ }
- producer, err = sarama.NewAsyncProducer([]string{Options.BBSim.KafkaAddress}, config)
+ producer, err = NewAsyncProducer([]string{Options.BBSim.KafkaAddress}, config)
return err
}
diff --git a/internal/common/options.go b/internal/common/options.go
index 6088a1d..0ecb0c8 100644
--- a/internal/common/options.go
+++ b/internal/common/options.go
@@ -20,11 +20,12 @@
"errors"
"flag"
"fmt"
- "github.com/ghodss/yaml"
"io/ioutil"
- log "github.com/sirupsen/logrus"
"net"
"strings"
+
+ "github.com/ghodss/yaml"
+ log "github.com/sirupsen/logrus"
)
var tagAllocationValues = []string{
@@ -57,7 +58,6 @@
TagAllocationUnique
)
-
var sadisFormatValues = []string{
"unknown",
"att",
@@ -143,6 +143,7 @@
Events bool `yaml:"enable_events"`
ControlledActivation string `yaml:"controlled_activation"`
EnablePerf bool `yaml:"enable_perf"`
+ KafkaEventTopic string `yaml:"kafka_event_topic`
}
type BBRConfig struct {
@@ -183,6 +184,7 @@
Events: false,
ControlledActivation: "default",
EnablePerf: false,
+ KafkaEventTopic: "",
},
OltConfig{
Vendor: "BBSim",
@@ -261,6 +263,7 @@
enablePerf := flag.Bool("enableperf", conf.BBSim.EnablePerf, "Setting this flag will cause BBSim to not store data like traffic schedulers, flows of ONUs etc..")
enableEvents := flag.Bool("enableEvents", conf.BBSim.Events, "Enable sending BBSim events on configured kafka server")
kafkaAddress := flag.String("kafkaAddress", conf.BBSim.KafkaAddress, "IP:Port for kafka")
+ kafkaEventTopic := flag.String("kafkaEventTopic", conf.BBSim.KafkaEventTopic, "Ability to configure the topic on which BBSim publishes events on Kafka")
flag.Parse()
sTagAlloc, err := tagAllocationFromString(*s_tag_allocation)
@@ -304,6 +307,7 @@
conf.BBSim.ApiAddress = *api_address
conf.BBSim.RestApiAddress = *rest_api_address
conf.BBSim.SadisFormat = sf
+ conf.BBSim.KafkaEventTopic = *kafkaEventTopic
// update device id if not set
if conf.Olt.DeviceId == "" {