[VOL-2736]: host and port should be specified as a single argument, not as two separate arguments
Change-Id: I14b59b4e42b1cf0821807cdb3dd6eef2094077da
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/sarama_client.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/sarama_client.go
index 468e546..581cf49 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/sarama_client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/sarama_client.go
@@ -47,8 +47,7 @@
// SaramaClient represents the messaging proxy
type SaramaClient struct {
cAdmin sarama.ClusterAdmin
- KafkaHost string
- KafkaPort int
+ KafkaAddress string
producer sarama.AsyncProducer
consumer sarama.Consumer
groupConsumers map[string]*scc.Consumer
@@ -86,15 +85,9 @@
type SaramaClientOption func(*SaramaClient)
-func Host(host string) SaramaClientOption {
+func Address(address string) SaramaClientOption {
return func(args *SaramaClient) {
- args.KafkaHost = host
- }
-}
-
-func Port(port int) SaramaClientOption {
- return func(args *SaramaClient) {
- args.KafkaPort = port
+ args.KafkaAddress = address
}
}
@@ -202,8 +195,7 @@
func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
client := &SaramaClient{
- KafkaHost: DefaultKafkaHost,
- KafkaPort: DefaultKafkaPort,
+ KafkaAddress: DefaultKafkaAddress,
}
client.consumerType = DefaultConsumerType
client.producerFlushFrequency = DefaultProducerFlushFrequency
@@ -695,15 +687,14 @@
}
func (sc *SaramaClient) createClusterAdmin() error {
- kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
config := sarama.NewConfig()
config.Version = sarama.V1_0_0_0
// Create a cluster Admin
var cAdmin sarama.ClusterAdmin
var err error
- if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
- logger.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
+ if cAdmin, err = sarama.NewClusterAdmin([]string{sc.KafkaAddress}, config); err != nil {
+ logger.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
return err
}
sc.cAdmin = cAdmin
@@ -841,8 +832,7 @@
//config.Producer.RequiredAcks = sarama.WaitForAll
config.Producer.RequiredAcks = sarama.WaitForLocal
- kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
- brokers := []string{kafkaFullAddr}
+ brokers := []string{sc.KafkaAddress}
if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
logger.Errorw("error-starting-publisher", log.Fields{"error": err})
@@ -862,8 +852,7 @@
config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
config.Consumer.Offsets.Initial = sarama.OffsetNewest
config.Metadata.Retry.Max = sc.metadataMaxRetry
- kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
- brokers := []string{kafkaFullAddr}
+ brokers := []string{sc.KafkaAddress}
if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
logger.Errorw("error-starting-consumers", log.Fields{"error": err})
@@ -887,8 +876,7 @@
//config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
config.Consumer.Offsets.Initial = initialOffset
//config.Consumer.Offsets.Initial = sarama.OffsetOldest
- kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
- brokers := []string{kafkaFullAddr}
+ brokers := []string{sc.KafkaAddress}
topics := []string{topic.Name}
var consumer *scc.Consumer