[VOL-2736]host and port should be specified as a single argument not as two separate arguments
Change-Id: I5a3a494c38dafa1e7e18e1f1cd55c0035359c7a9
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index e7e5c4f..dca40d0 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -26,16 +26,12 @@
const (
ConsulStoreName = "consul"
EtcdStoreName = "etcd"
- defaultGrpcPort = 50057
- defaultGrpcHost = ""
- defaultKafkaAdapterHost = "127.0.0.1"
- defaultKafkaAdapterPort = 9092
- defaultKafkaClusterHost = "127.0.0.1"
- defaultKafkaClusterPort = 9094
+ defaultGrpcAddress = ":50057"
+ defaultKafkaAdapterAddress = "127.0.0.1:9092"
+ defaultKafkaClusterAddress = "127.0.0.1:9094"
defaultKVStoreType = EtcdStoreName
defaultKVStoreTimeout = 5 * time.Second
- defaultKVStoreHost = "127.0.0.1"
- defaultKVStorePort = 2379 // Consul = 8500; Etcd = 2379
+ defaultKVStoreAddress = "127.0.0.1:2379" // Consul = 8500; Etcd = 2379
defaultKVTxnKeyDelTime = 60
defaultLogLevel = "WARN"
defaultBanner = false
@@ -55,24 +51,19 @@
defaultConnectionRetryInterval = 2 * time.Second
defaultLiveProbeInterval = 60 * time.Second
defaultNotLiveProbeInterval = 5 * time.Second // Probe more frequently when not alive
- defaultProbeHost = ""
- defaultProbePort = 8080
+ defaultProbeAddress = ":8080"
)
// RWCoreFlags represents the set of configurations used by the read-write core service
type RWCoreFlags struct {
// Command line parameters
RWCoreEndpoint string
- GrpcHost string
- GrpcPort int
- KafkaAdapterHost string
- KafkaAdapterPort int
- KafkaClusterHost string
- KafkaClusterPort int
+ GrpcAddress string
+ KafkaAdapterAddress string
+ KafkaClusterAddress string
KVStoreType string
KVStoreTimeout time.Duration
- KVStoreHost string
- KVStorePort int
+ KVStoreAddress string
KVTxnKeyDelTime int
CoreTopic string
LogLevel string
@@ -91,24 +82,19 @@
ConnectionRetryInterval time.Duration
LiveProbeInterval time.Duration
NotLiveProbeInterval time.Duration
- ProbeHost string
- ProbePort int
+ ProbeAddress string
}
// NewRWCoreFlags returns a new RWCore config
func NewRWCoreFlags() *RWCoreFlags {
var rwCoreFlag = RWCoreFlags{ // Default values
RWCoreEndpoint: defaultRWCoreEndpoint,
- GrpcHost: defaultGrpcHost,
- GrpcPort: defaultGrpcPort,
- KafkaAdapterHost: defaultKafkaAdapterHost,
- KafkaAdapterPort: defaultKafkaAdapterPort,
- KafkaClusterHost: defaultKafkaClusterHost,
- KafkaClusterPort: defaultKafkaClusterPort,
+ GrpcAddress: defaultGrpcAddress,
+ KafkaAdapterAddress: defaultKafkaAdapterAddress,
+ KafkaClusterAddress: defaultKafkaClusterAddress,
KVStoreType: defaultKVStoreType,
KVStoreTimeout: defaultKVStoreTimeout,
- KVStoreHost: defaultKVStoreHost,
- KVStorePort: defaultKVStorePort,
+ KVStoreAddress: defaultKVStoreAddress,
KVTxnKeyDelTime: defaultKVTxnKeyDelTime,
CoreTopic: defaultCoreTopic,
LogLevel: defaultLogLevel,
@@ -127,8 +113,7 @@
ConnectionRetryInterval: defaultConnectionRetryInterval,
LiveProbeInterval: defaultLiveProbeInterval,
NotLiveProbeInterval: defaultNotLiveProbeInterval,
- ProbeHost: defaultProbeHost,
- ProbePort: defaultProbePort,
+ ProbeAddress: defaultProbeAddress,
}
return &rwCoreFlag
}
@@ -139,23 +124,14 @@
help := fmt.Sprintf("RW core endpoint address")
flag.StringVar(&(cf.RWCoreEndpoint), "vcore-endpoint", defaultRWCoreEndpoint, help)
- help = fmt.Sprintf("GRPC server - host")
- flag.StringVar(&(cf.GrpcHost), "grpc_host", defaultGrpcHost, help)
+ help = fmt.Sprintf("GRPC server - address")
+ flag.StringVar(&(cf.GrpcAddress), "grpc_address", defaultGrpcAddress, help)
- help = fmt.Sprintf("GRPC server - port")
- flag.IntVar(&(cf.GrpcPort), "grpc_port", defaultGrpcPort, help)
+ help = fmt.Sprintf("Kafka - Adapter messaging address")
+ flag.StringVar(&(cf.KafkaAdapterAddress), "kafka_adapter_address", defaultKafkaAdapterAddress, help)
- help = fmt.Sprintf("Kafka - Adapter messaging host")
- flag.StringVar(&(cf.KafkaAdapterHost), "kafka_adapter_host", defaultKafkaAdapterHost, help)
-
- help = fmt.Sprintf("Kafka - Adapter messaging port")
- flag.IntVar(&(cf.KafkaAdapterPort), "kafka_adapter_port", defaultKafkaAdapterPort, help)
-
- help = fmt.Sprintf("Kafka - Cluster messaging host")
- flag.StringVar(&(cf.KafkaClusterHost), "kafka_cluster_host", defaultKafkaClusterHost, help)
-
- help = fmt.Sprintf("Kafka - Cluster messaging port")
- flag.IntVar(&(cf.KafkaClusterPort), "kafka_cluster_port", defaultKafkaClusterPort, help)
+ help = fmt.Sprintf("Kafka - Cluster messaging address")
+ flag.StringVar(&(cf.KafkaClusterAddress), "kafka_cluster_address", defaultKafkaClusterAddress, help)
help = fmt.Sprintf("RW Core topic")
flag.StringVar(&(cf.CoreTopic), "rw_core_topic", defaultCoreTopic, help)
@@ -172,11 +148,8 @@
help = fmt.Sprintf("The default timeout when making a kv store request")
flag.DurationVar(&(cf.KVStoreTimeout), "kv_store_request_timeout", defaultKVStoreTimeout, help)
- help = fmt.Sprintf("KV store host")
- flag.StringVar(&(cf.KVStoreHost), "kv_store_host", defaultKVStoreHost, help)
-
- help = fmt.Sprintf("KV store port")
- flag.IntVar(&(cf.KVStorePort), "kv_store_port", defaultKVStorePort, help)
+ help = fmt.Sprintf("KV store address")
+ flag.StringVar(&(cf.KVStoreAddress), "kv_store_address", defaultKVStoreAddress, help)
help = fmt.Sprintf("The time to wait before deleting a completed transaction key")
flag.IntVar(&(cf.KVTxnKeyDelTime), "kv_txn_delete_time", defaultKVTxnKeyDelTime, help)
@@ -214,11 +187,8 @@
help = fmt.Sprintf("The number of seconds between liveness probes while in a not live state")
flag.DurationVar(&(cf.NotLiveProbeInterval), "not_live_probe_interval", defaultNotLiveProbeInterval, help)
- help = fmt.Sprintf("The host on which to listen to answer liveness and readiness probe queries over HTTP.")
- flag.StringVar(&(cf.ProbeHost), "probe_host", defaultProbeHost, help)
-
- help = fmt.Sprintf("The port on which to listen to answer liveness and readiness probe queries over HTTP.")
- flag.IntVar(&(cf.ProbePort), "probe_port", defaultProbePort, help)
+ help = fmt.Sprintf("The address on which to listen to answer liveness and readiness probe queries over HTTP.")
+ flag.StringVar(&(cf.ProbeAddress), "probe_address", defaultProbeAddress, help)
flag.Parse()
}
diff --git a/rw_core/core/api/grpc_nbi_handler_test.go b/rw_core/core/api/grpc_nbi_handler_test.go
index 0fe877e..5801f27 100755
--- a/rw_core/core/api/grpc_nbi_handler_test.go
+++ b/rw_core/core/api/grpc_nbi_handler_test.go
@@ -24,6 +24,7 @@
"os"
"runtime"
"runtime/pprof"
+ "strconv"
"strings"
"sync"
"testing"
@@ -93,26 +94,23 @@
cfg.CoreTopic = "rw_core"
cfg.DefaultRequestTimeout = nb.defaultTimeout
cfg.DefaultCoreTimeout = nb.defaultTimeout
- cfg.KVStorePort = nb.kvClientPort
cfg.InCompetingMode = inCompeteMode
+ cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(nb.kvClientPort)
grpcPort, err := freeport.GetFreePort()
if err != nil {
logger.Fatal("Cannot get a freeport for grpc")
}
- cfg.GrpcPort = grpcPort
- cfg.GrpcHost = "127.0.0.1"
+ cfg.GrpcAddress = "127.0.0.1" + ":" + strconv.Itoa(grpcPort)
setCoreCompeteMode(inCompeteMode)
client := tst.SetupKVClient(cfg, nb.coreInstanceID)
backend := &db.Backend{
Client: client,
StoreType: cfg.KVStoreType,
- Host: cfg.KVStoreHost,
- Port: cfg.KVStorePort,
+ Address: cfg.KVStoreAddress,
Timeout: cfg.KVStoreTimeout,
LivenessChannelInterval: cfg.LiveProbeInterval / 2}
nb.kmp = kafka.NewInterContainerProxy(
- kafka.InterContainerHost(cfg.KafkaAdapterHost),
- kafka.InterContainerPort(cfg.KafkaAdapterPort),
+ kafka.InterContainerAddress(cfg.KafkaAdapterAddress),
kafka.MsgClient(nb.kClient),
kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}),
kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: cfg.AffinityRouterTopic}))
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 0a76ca2..0cfa915 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -18,7 +18,6 @@
import (
"context"
- "strconv"
"time"
"github.com/opencord/voltha-go/db/model"
@@ -73,14 +72,14 @@
// setup kv client
logger.Debugw("create-kv-client", log.Fields{"kvstore": cf.KVStoreType})
- kvClient, err := newKVClient(cf.KVStoreType, cf.KVStoreHost+":"+strconv.Itoa(cf.KVStorePort), cf.KVStoreTimeout)
+ kvClient, err := newKVClient(cf.KVStoreType, cf.KVStoreAddress, cf.KVStoreTimeout)
if err != nil {
logger.Fatal(err)
}
defer stopKVClient(context.Background(), kvClient)
// sync logging config with kv store
- cm := conf.NewConfigManager(kvClient, cf.KVStoreType, cf.KVStoreHost, cf.KVStorePort, cf.KVStoreTimeout)
+ cm := conf.NewConfigManager(kvClient, cf.KVStoreType, cf.KVStoreAddress, cf.KVStoreTimeout)
go conf.StartLogLevelConfigProcessing(cm, ctx)
backend := cm.Backend
@@ -94,8 +93,7 @@
// create kafka client
kafkaClient := kafka.NewSaramaClient(
- kafka.Host(cf.KafkaAdapterHost),
- kafka.Port(cf.KafkaAdapterPort),
+ kafka.Address(cf.KafkaAdapterAddress),
kafka.ConsumerType(kafka.GroupCustomer),
kafka.ProducerReturnOnErrors(true),
kafka.ProducerReturnOnSuccess(true),
@@ -119,7 +117,7 @@
// connect to kafka, then wait until reachable and publisher/consumer created
// core.kmp must be created before deviceMgr and adapterMgr
- kmp, err := startKafkInterContainerProxy(ctx, kafkaClient, cf.KafkaAdapterHost, cf.KafkaAdapterPort, cf.CoreTopic, cf.AffinityRouterTopic, cf.ConnectionRetryInterval)
+ kmp, err := startKafkInterContainerProxy(ctx, kafkaClient, cf.KafkaAdapterAddress, cf.CoreTopic, cf.AffinityRouterTopic, cf.ConnectionRetryInterval)
if err != nil {
logger.Warn("Failed to setup kafka connection")
return
@@ -135,7 +133,7 @@
registerAdapterRequestHandlers(kmp, deviceMgr, adapterMgr, cf.CoreTopic)
// start gRPC handler
- grpcServer := grpcserver.NewGrpcServer(cf.GrpcHost, cf.GrpcPort, nil, false, probe.GetProbeFromContext(ctx))
+ grpcServer := grpcserver.NewGrpcServer(cf.GrpcAddress, nil, false, probe.GetProbeFromContext(ctx))
go startGRPCService(ctx, grpcServer, api.NewNBIHandler(deviceMgr, logicalDeviceMgr, adapterMgr))
defer grpcServer.Stop()
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
index cd49681..93277ff 100755
--- a/rw_core/core/device/agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -20,6 +20,7 @@
"context"
"math/rand"
"sort"
+ "strconv"
"strings"
"sync"
"testing"
@@ -114,25 +115,22 @@
cfg := config.NewRWCoreFlags()
cfg.CoreTopic = "rw_core"
cfg.DefaultRequestTimeout = dat.defaultTimeout
- cfg.KVStorePort = dat.kvClientPort
+ cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(dat.kvClientPort)
cfg.InCompetingMode = inCompeteMode
grpcPort, err := freeport.GetFreePort()
if err != nil {
logger.Fatal("Cannot get a freeport for grpc")
}
- cfg.GrpcPort = grpcPort
- cfg.GrpcHost = "127.0.0.1"
+ cfg.GrpcAddress = "127.0.0.1" + ":" + strconv.Itoa(grpcPort)
client := tst.SetupKVClient(cfg, dat.coreInstanceID)
backend := &db.Backend{
Client: client,
StoreType: cfg.KVStoreType,
- Host: cfg.KVStoreHost,
- Port: cfg.KVStorePort,
+ Address: cfg.KVStoreAddress,
Timeout: cfg.KVStoreTimeout,
LivenessChannelInterval: cfg.LiveProbeInterval / 2}
dat.kmp = kafka.NewInterContainerProxy(
- kafka.InterContainerHost(cfg.KafkaAdapterHost),
- kafka.InterContainerPort(cfg.KafkaAdapterPort),
+ kafka.InterContainerAddress(cfg.KafkaAdapterAddress),
kafka.MsgClient(dat.kClient),
kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}),
kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: cfg.AffinityRouterTopic}))
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
index 2649ee4..2e1b1d3 100644
--- a/rw_core/core/device/logical_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -18,6 +18,7 @@
import (
"context"
"math/rand"
+ "strconv"
"sync"
"testing"
"time"
@@ -456,25 +457,22 @@
cfg := config.NewRWCoreFlags()
cfg.CoreTopic = "rw_core"
cfg.DefaultRequestTimeout = lda.defaultTimeout
- cfg.KVStorePort = lda.kvClientPort
+ cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(lda.kvClientPort)
cfg.InCompetingMode = inCompeteMode
grpcPort, err := freeport.GetFreePort()
if err != nil {
logger.Fatal("Cannot get a freeport for grpc")
}
- cfg.GrpcPort = grpcPort
- cfg.GrpcHost = "127.0.0.1"
+ cfg.GrpcAddress = "127.0.0.1" + ":" + strconv.Itoa(grpcPort)
client := tst.SetupKVClient(cfg, lda.coreInstanceID)
backend := &db.Backend{
Client: client,
StoreType: cfg.KVStoreType,
- Host: cfg.KVStoreHost,
- Port: cfg.KVStorePort,
+ Address: cfg.KVStoreAddress,
Timeout: cfg.KVStoreTimeout,
LivenessChannelInterval: cfg.LiveProbeInterval / 2}
lda.kmp = kafka.NewInterContainerProxy(
- kafka.InterContainerHost(cfg.KafkaAdapterHost),
- kafka.InterContainerPort(cfg.KafkaAdapterPort),
+ kafka.InterContainerAddress(cfg.KafkaAdapterAddress),
kafka.MsgClient(lda.kClient),
kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}),
kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: cfg.AffinityRouterTopic}))
diff --git a/rw_core/core/kafka.go b/rw_core/core/kafka.go
index 18b9ec8..3cb0292 100644
--- a/rw_core/core/kafka.go
+++ b/rw_core/core/kafka.go
@@ -29,15 +29,14 @@
)
// startKafkInterContainerProxy is responsible for starting the Kafka Interadapter Proxy
-func startKafkInterContainerProxy(ctx context.Context, kafkaClient kafka.Client, host string, port int, coreTopic, affinityRouterTopic string, connectionRetryInterval time.Duration) (kafka.InterContainerProxy, error) {
- logger.Infow("initialize-kafka-manager", log.Fields{"host": host, "port": port, "topic": coreTopic})
+func startKafkInterContainerProxy(ctx context.Context, kafkaClient kafka.Client, address string, coreTopic, affinityRouterTopic string, connectionRetryInterval time.Duration) (kafka.InterContainerProxy, error) {
+ logger.Infow("initialize-kafka-manager", log.Fields{"address": address, "topic": coreTopic})
probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPreparing)
// create the kafka RPC proxy
kmp := kafka.NewInterContainerProxy(
- kafka.InterContainerHost(host),
- kafka.InterContainerPort(port),
+ kafka.InterContainerAddress(address),
kafka.MsgClient(kafkaClient),
kafka.DefaultTopic(&kafka.Topic{Name: coreTopic}),
kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: affinityRouterTopic}))
@@ -45,8 +44,8 @@
probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPrepared)
// wait for connectivity
- logger.Infow("starting-kafka-manager", log.Fields{"host": host,
- "port": port, "topic": coreTopic})
+ logger.Infow("starting-kafka-manager", log.Fields{"address": address,
+ "topic": coreTopic})
for {
// If we haven't started yet, then try to start
diff --git a/rw_core/main.go b/rw_core/main.go
index 6884993..ddfb2b1 100644
--- a/rw_core/main.go
+++ b/rw_core/main.go
@@ -136,7 +136,7 @@
* objects there can be a single probe end point for the process.
*/
p := &probe.Probe{}
- go p.ListenAndServe(fmt.Sprintf("%s:%d", cf.ProbeHost, cf.ProbePort))
+ go p.ListenAndServe(cf.ProbeAddress)
// Add the probe to the context to pass to all the services started
probeCtx := context.WithValue(ctx, probe.ProbeContextKey, p)
diff --git a/rw_core/test/utils.go b/rw_core/test/utils.go
index 7d2dda0..b831e4c 100644
--- a/rw_core/test/utils.go
+++ b/rw_core/test/utils.go
@@ -18,7 +18,6 @@
package test
import (
- "strconv"
"testing"
"github.com/opencord/voltha-go/rw_core/config"
@@ -147,8 +146,7 @@
//SetupKVClient creates a new etcd client
func SetupKVClient(cf *config.RWCoreFlags, coreInstanceID string) kvstore.Client {
- addr := cf.KVStoreHost + ":" + strconv.Itoa(cf.KVStorePort)
- client, err := kvstore.NewEtcdClient(addr, cf.KVStoreTimeout, log.FatalLevel)
+ client, err := kvstore.NewEtcdClient(cf.KVStoreAddress, cf.KVStoreTimeout, log.FatalLevel)
if err != nil {
panic("no kv client")
}