[VOL-2961] Added configuration for kafka topic
Change-Id: I4dd609774938e775e00d7a0ed36177fa5f9ff8a2
diff --git a/VERSION b/VERSION
index 0c62199..a0d4970 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-0.2.1
+0.2.2-dev
\ No newline at end of file
diff --git a/cmd/bbsim/bbsim.go b/cmd/bbsim/bbsim.go
index 6a4c6e9..4dade03 100644
--- a/cmd/bbsim/bbsim.go
+++ b/cmd/bbsim/bbsim.go
@@ -26,6 +26,7 @@
"sync"
"syscall"
+ "github.com/Shopify/sarama"
"github.com/grpc-ecosystem/grpc-gateway/runtime"
"github.com/opencord/bbsim/api/bbsim"
"github.com/opencord/bbsim/api/legacy"
@@ -155,6 +156,7 @@
"Dhcp": options.BBSim.EnableDhcp,
"Delay": options.BBSim.Delay,
"Events": options.BBSim.Events,
+ "KafkaEventTopic": options.BBSim.KafkaEventTopic,
"ControlledActivation": options.BBSim.ControlledActivation,
"EnablePerf": options.BBSim.EnablePerf,
"CTag": options.BBSim.CTag,
@@ -196,7 +198,7 @@
if options.BBSim.Events {
// initialize a publisher
- if err := common.InitializePublisher(olt.ID); err == nil {
+ if err := common.InitializePublisher(sarama.NewAsyncProducer, olt.ID); err == nil {
// start a go routine which will read from channel and publish on kafka
go common.KafkaPublisher(olt.EventChannel)
} else {
diff --git a/configs/bbsim.yaml b/configs/bbsim.yaml
index 3e3587f..8cb5e91 100644
--- a/configs/bbsim.yaml
+++ b/configs/bbsim.yaml
@@ -1,6 +1,6 @@
---
# CLI arguments override values specified here.
-# Commented values show defaults.
+# Commented values show defaults.
# BBSim specific settings
bbsim:
@@ -17,13 +17,14 @@
# sadis_format: att|dt|tt
# enable_events: false
# kafka_address: ":9092"
- # log_level: "debug"
+ # log_level: "debug"
# log_caller: false
# delay: 200
# c_tag_allocation: unique
# c_tag: 900
# s_tag_allocation: shared
# s_tag: 900
+ # kafka_event_topic: ""
# OLT device settings
olt:
@@ -33,7 +34,7 @@
device_serial_number: BBSM00000001
pon_ports: 1
nni_ports: 1
- onus_per_port: 1
+ onus_per_port: 1
onus_per_port: 1
technology: "XGS-PON"
id: 0 # OLT-ID of the device
diff --git a/docs/source/index.rst b/docs/source/index.rst
index b3ea17b..4cd9d46 100644
--- a/docs/source/index.rst
+++ b/docs/source/index.rst
@@ -137,6 +137,16 @@
Use 'unique' for incremental values, 'shared' to use the same value in all the ONUs (default "shared")
-sadisFormat string
Which format should sadis expose? [att|dt|tt] (default "att")
+ -enableEvents
+ Set this flag for publishing BBSim events on configured kafkaAddress
+ -kafkaAddress string
+ IP:Port for kafka, used only when bbsimEvents flag is set (default ":9092")
+ -ca string
+ Set the mode for controlled activation of PON ports and ONUs
+ -enableperf bool
+ Setting this flag will cause BBSim to not store data like traffic schedulers, flows of ONUs etc
+ -kafkaEventTopic string
+ Set the topic on which BBSim publishes events on kafka
``BBSim`` also looks for a configuration file in ``configs/bbsim.yaml`` from
which it reads a number of default settings. The command line options listed
diff --git a/internal/common/kafka_topic_config_test.go b/internal/common/kafka_topic_config_test.go
new file mode 100644
index 0000000..bd3dd2e
--- /dev/null
+++ b/internal/common/kafka_topic_config_test.go
@@ -0,0 +1,76 @@
+/*
+ * Copyright 2018-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package common
+
+import (
+ "testing"
+
+ "github.com/Shopify/sarama"
+ "gotest.tools/assert"
+)
+
+type mockAsyncProducer struct {
+ input chan *sarama.ProducerMessage
+ successes chan *sarama.ProducerMessage
+ errors chan *sarama.ProducerError
+}
+
+func (p mockAsyncProducer) AsyncClose() {
+
+}
+
+func (p mockAsyncProducer) Close() error {
+ return nil
+}
+
+func (p mockAsyncProducer) Input() chan<- *sarama.ProducerMessage {
+ return p.input
+}
+
+func (p mockAsyncProducer) Successes() <-chan *sarama.ProducerMessage {
+ return p.successes
+}
+
+func (p mockAsyncProducer) Errors() <-chan *sarama.ProducerError {
+ return p.errors
+}
+
+type mockSarama struct{}
+
+func (m mockSarama) NewAsyncProducer(addrs []string, conf *sarama.Config) (sarama.AsyncProducer, error) {
+
+ producer := &mockAsyncProducer{
+ errors: make(chan *sarama.ProducerError),
+ input: make(chan *sarama.ProducerMessage),
+ successes: make(chan *sarama.ProducerMessage),
+ }
+
+ return producer, nil
+}
+
+func TestInitializePublisher(t *testing.T) {
+ mockLib := mockSarama{}
+ err := InitializePublisher(mockLib.NewAsyncProducer, 0)
+
+ assert.Equal(t, err, nil)
+ assert.Equal(t, topic, "BBSim-OLT-0-Events")
+
+ Options.BBSim.KafkaEventTopic = "Testing-Topic"
+ err = InitializePublisher(mockLib.NewAsyncProducer, 0)
+ assert.Equal(t, topic, "Testing-Topic")
+ assert.Equal(t, err, nil)
+}
diff --git a/internal/common/kafka_utils.go b/internal/common/kafka_utils.go
index e907aad..780093a 100644
--- a/internal/common/kafka_utils.go
+++ b/internal/common/kafka_utils.go
@@ -39,8 +39,12 @@
Timestamp string
}
+type saramaIf interface {
+ NewAsyncProducer(addrs []string, conf *sarama.Config) (sarama.AsyncProducer, error)
+}
+
// InitializePublisher initalizes kafka publisher
-func InitializePublisher(oltID int) error {
+func InitializePublisher(NewAsyncProducer func([]string, *sarama.Config) (sarama.AsyncProducer, error), oltID int) error {
var err error
sarama.Logger = log.New()
@@ -50,9 +54,13 @@
config.Metadata.Retry.Max = 10
config.Metadata.Retry.Backoff = 10 * time.Second
config.ClientID = "BBSim-OLT-" + strconv.Itoa(oltID)
- topic = "BBSim-OLT-" + strconv.Itoa(oltID) + "-Events"
+ if len(Options.BBSim.KafkaEventTopic) > 0 {
+ topic = Options.BBSim.KafkaEventTopic
+ } else {
+ topic = "BBSim-OLT-" + strconv.Itoa(oltID) + "-Events"
+ }
- producer, err = sarama.NewAsyncProducer([]string{Options.BBSim.KafkaAddress}, config)
+ producer, err = NewAsyncProducer([]string{Options.BBSim.KafkaAddress}, config)
return err
}
diff --git a/internal/common/options.go b/internal/common/options.go
index 6088a1d..0ecb0c8 100644
--- a/internal/common/options.go
+++ b/internal/common/options.go
@@ -20,11 +20,12 @@
"errors"
"flag"
"fmt"
- "github.com/ghodss/yaml"
"io/ioutil"
- log "github.com/sirupsen/logrus"
"net"
"strings"
+
+ "github.com/ghodss/yaml"
+ log "github.com/sirupsen/logrus"
)
var tagAllocationValues = []string{
@@ -57,7 +58,6 @@
TagAllocationUnique
)
-
var sadisFormatValues = []string{
"unknown",
"att",
@@ -143,6 +143,7 @@
Events bool `yaml:"enable_events"`
ControlledActivation string `yaml:"controlled_activation"`
EnablePerf bool `yaml:"enable_perf"`
+ KafkaEventTopic string `yaml:"kafka_event_topic`
}
type BBRConfig struct {
@@ -183,6 +184,7 @@
Events: false,
ControlledActivation: "default",
EnablePerf: false,
+ KafkaEventTopic: "",
},
OltConfig{
Vendor: "BBSim",
@@ -261,6 +263,7 @@
enablePerf := flag.Bool("enableperf", conf.BBSim.EnablePerf, "Setting this flag will cause BBSim to not store data like traffic schedulers, flows of ONUs etc..")
enableEvents := flag.Bool("enableEvents", conf.BBSim.Events, "Enable sending BBSim events on configured kafka server")
kafkaAddress := flag.String("kafkaAddress", conf.BBSim.KafkaAddress, "IP:Port for kafka")
+ kafkaEventTopic := flag.String("kafkaEventTopic", conf.BBSim.KafkaEventTopic, "Ability to configure the topic on which BBSim publishes events on Kafka")
flag.Parse()
sTagAlloc, err := tagAllocationFromString(*s_tag_allocation)
@@ -304,6 +307,7 @@
conf.BBSim.ApiAddress = *api_address
conf.BBSim.RestApiAddress = *rest_api_address
conf.BBSim.SadisFormat = sf
+ conf.BBSim.KafkaEventTopic = *kafkaEventTopic
// update device id if not set
if conf.Olt.DeviceId == "" {