[VOL-2736] host and port should be specified as a single argument, not as two separate arguments
Change-Id: I312fe753ac0fe62c942f335371e6449809ecfb85
diff --git a/pkg/kafka/client.go b/pkg/kafka/client.go
index 0919a0c..0d9e3a5 100755
--- a/pkg/kafka/client.go
+++ b/pkg/kafka/client.go
@@ -39,6 +39,7 @@
const (
DefaultKafkaHost = "127.0.0.1"
DefaultKafkaPort = 9092
+ DefaultKafkaAddress = DefaultKafkaHost + ":9092" // NOTE: string(DefaultKafkaPort) would convert the int to the rune U+2384, not "9092"; strconv.Itoa is not usable in a const expression
DefaultGroupName = "voltha"
DefaultSleepOnError = 1
DefaultProducerFlushFrequency = 10
diff --git a/pkg/kafka/endpoint_manager_test.go b/pkg/kafka/endpoint_manager_test.go
index de7d028..3790221 100644
--- a/pkg/kafka/endpoint_manager_test.go
+++ b/pkg/kafka/endpoint_manager_test.go
@@ -29,6 +29,7 @@
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
"math"
+ "strconv"
"testing"
"time"
)
@@ -77,7 +78,7 @@
return status.Error(codes.Internal, "Embedded server failed to start")
}
- ep.backend = db.NewBackend("etcd", "127.0.0.1", kvClientPort, timeout, "service/voltha")
+ ep.backend = db.NewBackend("etcd", "127.0.0.1:"+strconv.Itoa(kvClientPort), timeout, "service/voltha")
return nil
}
diff --git a/pkg/kafka/kafka_inter_container_library.go b/pkg/kafka/kafka_inter_container_library.go
index beda537..9f9fbfc 100644
--- a/pkg/kafka/kafka_inter_container_library.go
+++ b/pkg/kafka/kafka_inter_container_library.go
@@ -79,8 +79,7 @@
// interContainerProxy represents the messaging proxy
type interContainerProxy struct {
- kafkaHost string
- kafkaPort int
+ kafkaAddress string
defaultTopic *Topic
defaultRequestHandlerInterface interface{}
deviceDiscoveryTopic *Topic
@@ -107,15 +106,9 @@
type InterContainerProxyOption func(*interContainerProxy)
-func InterContainerHost(host string) InterContainerProxyOption {
+func InterContainerAddress(address string) InterContainerProxyOption {
return func(args *interContainerProxy) {
- args.kafkaHost = host
- }
-}
-
-func InterContainerPort(port int) InterContainerProxyOption {
- return func(args *interContainerProxy) {
- args.kafkaPort = port
+ args.kafkaAddress = address
}
}
@@ -145,9 +138,8 @@
func newInterContainerProxy(opts ...InterContainerProxyOption) *interContainerProxy {
proxy := &interContainerProxy{
- kafkaHost: DefaultKafkaHost,
- kafkaPort: DefaultKafkaPort,
- doneCh: make(chan struct{}),
+ kafkaAddress: DefaultKafkaAddress,
+ doneCh: make(chan struct{}),
}
for _, option := range opts {
diff --git a/pkg/kafka/kafka_inter_container_library_test.go b/pkg/kafka/kafka_inter_container_library_test.go
index f32c16c..09286ad 100644
--- a/pkg/kafka/kafka_inter_container_library_test.go
+++ b/pkg/kafka/kafka_inter_container_library_test.go
@@ -22,29 +22,19 @@
func TestDefaultKafkaProxy(t *testing.T) {
actualResult := newInterContainerProxy()
- assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
- assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
+ assert.Equal(t, actualResult.kafkaAddress, DefaultKafkaAddress)
assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
}
-func TestKafkaProxyOptionHost(t *testing.T) {
- actualResult := newInterContainerProxy(InterContainerHost("10.20.30.40"))
- assert.Equal(t, actualResult.kafkaHost, "10.20.30.40")
- assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
- assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
-}
-
-func TestKafkaProxyOptionPort(t *testing.T) {
- actualResult := newInterContainerProxy(InterContainerPort(1020))
- assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
- assert.Equal(t, actualResult.kafkaPort, 1020)
+func TestKafkaProxyOptionAddress(t *testing.T) {
+ actualResult := newInterContainerProxy(InterContainerAddress("10.20.30.40:1020"))
+ assert.Equal(t, actualResult.kafkaAddress, "10.20.30.40:1020")
assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
}
func TestKafkaProxyOptionTopic(t *testing.T) {
actualResult := newInterContainerProxy(DefaultTopic(&Topic{Name: "Adapter"}))
- assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
- assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
+ assert.Equal(t, actualResult.kafkaAddress, DefaultKafkaAddress)
assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
assert.Equal(t, actualResult.defaultTopic.Name, "Adapter")
}
@@ -55,20 +45,17 @@
func TestKafkaProxyOptionTargetInterface(t *testing.T) {
var m *myInterface
actualResult := newInterContainerProxy(RequestHandlerInterface(m))
- assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
- assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
+ assert.Equal(t, actualResult.kafkaAddress, DefaultKafkaAddress)
assert.Equal(t, actualResult.defaultRequestHandlerInterface, m)
}
func TestKafkaProxyChangeAllOptions(t *testing.T) {
var m *myInterface
actualResult := newInterContainerProxy(
- InterContainerHost("10.20.30.40"),
- InterContainerPort(1020),
+ InterContainerAddress("10.20.30.40:1020"),
DefaultTopic(&Topic{Name: "Adapter"}),
RequestHandlerInterface(m))
- assert.Equal(t, actualResult.kafkaHost, "10.20.30.40")
- assert.Equal(t, actualResult.kafkaPort, 1020)
+ assert.Equal(t, actualResult.kafkaAddress, "10.20.30.40:1020")
assert.Equal(t, actualResult.defaultRequestHandlerInterface, m)
assert.Equal(t, actualResult.defaultTopic.Name, "Adapter")
}
@@ -80,8 +67,7 @@
client := NewSaramaClient()
probe := newInterContainerProxy(
- InterContainerHost("10.20.30.40"),
- InterContainerPort(1020),
+ InterContainerAddress("10.20.30.40:1020"),
DefaultTopic(&Topic{Name: "Adapter"}),
RequestHandlerInterface(m),
MsgClient(client),
diff --git a/pkg/kafka/sarama_client.go b/pkg/kafka/sarama_client.go
index 468e546..581cf49 100755
--- a/pkg/kafka/sarama_client.go
+++ b/pkg/kafka/sarama_client.go
@@ -47,8 +47,7 @@
// SaramaClient represents the messaging proxy
type SaramaClient struct {
cAdmin sarama.ClusterAdmin
- KafkaHost string
- KafkaPort int
+ KafkaAddress string
producer sarama.AsyncProducer
consumer sarama.Consumer
groupConsumers map[string]*scc.Consumer
@@ -86,15 +85,9 @@
type SaramaClientOption func(*SaramaClient)
-func Host(host string) SaramaClientOption {
+func Address(address string) SaramaClientOption {
return func(args *SaramaClient) {
- args.KafkaHost = host
- }
-}
-
-func Port(port int) SaramaClientOption {
- return func(args *SaramaClient) {
- args.KafkaPort = port
+ args.KafkaAddress = address
}
}
@@ -202,8 +195,7 @@
func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
client := &SaramaClient{
- KafkaHost: DefaultKafkaHost,
- KafkaPort: DefaultKafkaPort,
+ KafkaAddress: DefaultKafkaAddress,
}
client.consumerType = DefaultConsumerType
client.producerFlushFrequency = DefaultProducerFlushFrequency
@@ -695,15 +687,14 @@
}
func (sc *SaramaClient) createClusterAdmin() error {
- kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0
// Create a cluster Admin
var cAdmin sarama.ClusterAdmin
var err error
- if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
- logger.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
+ if cAdmin, err = sarama.NewClusterAdmin([]string{sc.KafkaAddress}, config); err != nil {
+ logger.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
return err
}
sc.cAdmin = cAdmin
@@ -841,8 +832,7 @@
//config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.RequiredAcks = sarama.WaitForLocal
- kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
- brokers := []string{kafkaFullAddr}
+ brokers := []string{sc.KafkaAddress}
if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
logger.Errorw("error-starting-publisher", log.Fields{"error": err})
@@ -862,8 +852,7 @@
config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
config.Consumer.Offsets.Initial = sarama.OffsetNewest
config.Metadata.Retry.Max = sc.metadataMaxRetry
- kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
- brokers := []string{kafkaFullAddr}
+ brokers := []string{sc.KafkaAddress}
if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
logger.Errorw("error-starting-consumers", log.Fields{"error": err})
@@ -887,8 +876,7 @@
//config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
config.Consumer.Offsets.Initial = initialOffset
//config.Consumer.Offsets.Initial = sarama.OffsetOldest
- kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
- brokers := []string{kafkaFullAddr}
+ brokers := []string{sc.KafkaAddress}
topics := []string{topic.Name}
var consumer *scc.Consumer