[VOL-2736]host and port should be specified as a single argument not as two separate arguments

Change-Id: I312fe753ac0fe62c942f335371e6449809ecfb85
diff --git a/pkg/kafka/client.go b/pkg/kafka/client.go
index 0919a0c..0d9e3a5 100755
--- a/pkg/kafka/client.go
+++ b/pkg/kafka/client.go
@@ -39,6 +39,7 @@
 const (
 	DefaultKafkaHost                = "127.0.0.1"
 	DefaultKafkaPort                = 9092
+	DefaultKafkaAddress             = DefaultKafkaHost + ":" + string(DefaultKafkaPort)
 	DefaultGroupName                = "voltha"
 	DefaultSleepOnError             = 1
 	DefaultProducerFlushFrequency   = 10
diff --git a/pkg/kafka/endpoint_manager_test.go b/pkg/kafka/endpoint_manager_test.go
index de7d028..3790221 100644
--- a/pkg/kafka/endpoint_manager_test.go
+++ b/pkg/kafka/endpoint_manager_test.go
@@ -29,6 +29,7 @@
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 	"math"
+	"strconv"
 	"testing"
 	"time"
 )
@@ -77,7 +78,7 @@
 		return status.Error(codes.Internal, "Embedded server failed to start")
 	}
 
-	ep.backend = db.NewBackend("etcd", "127.0.0.1", kvClientPort, timeout, "service/voltha")
+	ep.backend = db.NewBackend("etcd", "127.0.0.1"+":"+strconv.Itoa(kvClientPort), timeout, "service/voltha")
 	return nil
 }
 
diff --git a/pkg/kafka/kafka_inter_container_library.go b/pkg/kafka/kafka_inter_container_library.go
index beda537..9f9fbfc 100644
--- a/pkg/kafka/kafka_inter_container_library.go
+++ b/pkg/kafka/kafka_inter_container_library.go
@@ -79,8 +79,7 @@
 
 // interContainerProxy represents the messaging proxy
 type interContainerProxy struct {
-	kafkaHost                      string
-	kafkaPort                      int
+	kafkaAddress                   string
 	defaultTopic                   *Topic
 	defaultRequestHandlerInterface interface{}
 	deviceDiscoveryTopic           *Topic
@@ -107,15 +106,9 @@
 
 type InterContainerProxyOption func(*interContainerProxy)
 
-func InterContainerHost(host string) InterContainerProxyOption {
+func InterContainerAddress(address string) InterContainerProxyOption {
 	return func(args *interContainerProxy) {
-		args.kafkaHost = host
-	}
-}
-
-func InterContainerPort(port int) InterContainerProxyOption {
-	return func(args *interContainerProxy) {
-		args.kafkaPort = port
+		args.kafkaAddress = address
 	}
 }
 
@@ -145,9 +138,8 @@
 
 func newInterContainerProxy(opts ...InterContainerProxyOption) *interContainerProxy {
 	proxy := &interContainerProxy{
-		kafkaHost: DefaultKafkaHost,
-		kafkaPort: DefaultKafkaPort,
-		doneCh:    make(chan struct{}),
+		kafkaAddress: DefaultKafkaAddress,
+		doneCh:       make(chan struct{}),
 	}
 
 	for _, option := range opts {
diff --git a/pkg/kafka/kafka_inter_container_library_test.go b/pkg/kafka/kafka_inter_container_library_test.go
index f32c16c..09286ad 100644
--- a/pkg/kafka/kafka_inter_container_library_test.go
+++ b/pkg/kafka/kafka_inter_container_library_test.go
@@ -22,29 +22,19 @@
 
 func TestDefaultKafkaProxy(t *testing.T) {
 	actualResult := newInterContainerProxy()
-	assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
-	assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
+	assert.Equal(t, actualResult.kafkaAddress, DefaultKafkaAddress)
 	assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
 }
 
-func TestKafkaProxyOptionHost(t *testing.T) {
-	actualResult := newInterContainerProxy(InterContainerHost("10.20.30.40"))
-	assert.Equal(t, actualResult.kafkaHost, "10.20.30.40")
-	assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
-	assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
-}
-
-func TestKafkaProxyOptionPort(t *testing.T) {
-	actualResult := newInterContainerProxy(InterContainerPort(1020))
-	assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
-	assert.Equal(t, actualResult.kafkaPort, 1020)
+func TestKafkaProxyOptionAddress(t *testing.T) {
+	actualResult := newInterContainerProxy(InterContainerAddress("10.20.30.40:1020"))
+	assert.Equal(t, actualResult.kafkaAddress, "10.20.30.40:1020")
 	assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
 }
 
 func TestKafkaProxyOptionTopic(t *testing.T) {
 	actualResult := newInterContainerProxy(DefaultTopic(&Topic{Name: "Adapter"}))
-	assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
-	assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
+	assert.Equal(t, actualResult.kafkaAddress, DefaultKafkaAddress)
 	assert.Equal(t, actualResult.defaultRequestHandlerInterface, interface{}(nil))
 	assert.Equal(t, actualResult.defaultTopic.Name, "Adapter")
 }
@@ -55,20 +45,17 @@
 func TestKafkaProxyOptionTargetInterface(t *testing.T) {
 	var m *myInterface
 	actualResult := newInterContainerProxy(RequestHandlerInterface(m))
-	assert.Equal(t, actualResult.kafkaHost, DefaultKafkaHost)
-	assert.Equal(t, actualResult.kafkaPort, DefaultKafkaPort)
+	assert.Equal(t, actualResult.kafkaAddress, DefaultKafkaAddress)
 	assert.Equal(t, actualResult.defaultRequestHandlerInterface, m)
 }
 
 func TestKafkaProxyChangeAllOptions(t *testing.T) {
 	var m *myInterface
 	actualResult := newInterContainerProxy(
-		InterContainerHost("10.20.30.40"),
-		InterContainerPort(1020),
+		InterContainerAddress("10.20.30.40:1020"),
 		DefaultTopic(&Topic{Name: "Adapter"}),
 		RequestHandlerInterface(m))
-	assert.Equal(t, actualResult.kafkaHost, "10.20.30.40")
-	assert.Equal(t, actualResult.kafkaPort, 1020)
+	assert.Equal(t, actualResult.kafkaAddress, "10.20.30.40:1020")
 	assert.Equal(t, actualResult.defaultRequestHandlerInterface, m)
 	assert.Equal(t, actualResult.defaultTopic.Name, "Adapter")
 }
@@ -80,8 +67,7 @@
 	client := NewSaramaClient()
 
 	probe := newInterContainerProxy(
-		InterContainerHost("10.20.30.40"),
-		InterContainerPort(1020),
+		InterContainerAddress("10.20.30.40:1020"),
 		DefaultTopic(&Topic{Name: "Adapter"}),
 		RequestHandlerInterface(m),
 		MsgClient(client),
diff --git a/pkg/kafka/sarama_client.go b/pkg/kafka/sarama_client.go
index 468e546..581cf49 100755
--- a/pkg/kafka/sarama_client.go
+++ b/pkg/kafka/sarama_client.go
@@ -47,8 +47,7 @@
 // SaramaClient represents the messaging proxy
 type SaramaClient struct {
 	cAdmin                        sarama.ClusterAdmin
-	KafkaHost                     string
-	KafkaPort                     int
+	KafkaAddress                  string
 	producer                      sarama.AsyncProducer
 	consumer                      sarama.Consumer
 	groupConsumers                map[string]*scc.Consumer
@@ -86,15 +85,9 @@
 
 type SaramaClientOption func(*SaramaClient)
 
-func Host(host string) SaramaClientOption {
+func Address(address string) SaramaClientOption {
 	return func(args *SaramaClient) {
-		args.KafkaHost = host
-	}
-}
-
-func Port(port int) SaramaClientOption {
-	return func(args *SaramaClient) {
-		args.KafkaPort = port
+		args.KafkaAddress = address
 	}
 }
 
@@ -202,8 +195,7 @@
 
 func NewSaramaClient(opts ...SaramaClientOption) *SaramaClient {
 	client := &SaramaClient{
-		KafkaHost: DefaultKafkaHost,
-		KafkaPort: DefaultKafkaPort,
+		KafkaAddress: DefaultKafkaAddress,
 	}
 	client.consumerType = DefaultConsumerType
 	client.producerFlushFrequency = DefaultProducerFlushFrequency
@@ -695,15 +687,14 @@
 }
 
 func (sc *SaramaClient) createClusterAdmin() error {
-	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
 	config := sarama.NewConfig()
 	config.Version = sarama.V1_0_0_0
 
 	// Create a cluster Admin
 	var cAdmin sarama.ClusterAdmin
 	var err error
-	if cAdmin, err = sarama.NewClusterAdmin([]string{kafkaFullAddr}, config); err != nil {
-		logger.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": kafkaFullAddr})
+	if cAdmin, err = sarama.NewClusterAdmin([]string{sc.KafkaAddress}, config); err != nil {
+		logger.Errorw("cluster-admin-failure", log.Fields{"error": err, "broker-address": sc.KafkaAddress})
 		return err
 	}
 	sc.cAdmin = cAdmin
@@ -841,8 +832,7 @@
 	//config.Producer.RequiredAcks = sarama.WaitForAll
 	config.Producer.RequiredAcks = sarama.WaitForLocal
 
-	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
-	brokers := []string{kafkaFullAddr}
+	brokers := []string{sc.KafkaAddress}
 
 	if producer, err := sarama.NewAsyncProducer(brokers, config); err != nil {
 		logger.Errorw("error-starting-publisher", log.Fields{"error": err})
@@ -862,8 +852,7 @@
 	config.Consumer.MaxProcessingTime = time.Duration(sc.maxProcessingTime) * time.Millisecond
 	config.Consumer.Offsets.Initial = sarama.OffsetNewest
 	config.Metadata.Retry.Max = sc.metadataMaxRetry
-	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
-	brokers := []string{kafkaFullAddr}
+	brokers := []string{sc.KafkaAddress}
 
 	if consumer, err := sarama.NewConsumer(brokers, config); err != nil {
 		logger.Errorw("error-starting-consumers", log.Fields{"error": err})
@@ -887,8 +876,7 @@
 	//config.Consumer.MaxProcessingTime = time.Duration(DefaultMaxProcessingTime) * time.Millisecond
 	config.Consumer.Offsets.Initial = initialOffset
 	//config.Consumer.Offsets.Initial = sarama.OffsetOldest
-	kafkaFullAddr := fmt.Sprintf("%s:%d", sc.KafkaHost, sc.KafkaPort)
-	brokers := []string{kafkaFullAddr}
+	brokers := []string{sc.KafkaAddress}
 
 	topics := []string{topic.Name}
 	var consumer *scc.Consumer