[VOL-3783] Unifying service address and port for kafka and etcd
Change-Id: Id85168f46f93e445ee2bc9cf5cab493322f10efe
diff --git a/VERSION b/VERSION
index f4f2e28..26aaba0 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-1.1.2-dev159
+1.2.0
diff --git a/cmd/openonu-adapter/main.go b/cmd/openonu-adapter/main.go
index 506321b..1b1807e 100644
--- a/cmd/openonu-adapter/main.go
+++ b/cmd/openonu-adapter/main.go
@@ -24,7 +24,6 @@
"io/ioutil"
"os"
"os/signal"
- "strconv"
"strings"
"syscall"
"time"
@@ -103,13 +102,11 @@
}
// Setup Log Config
- /* address config update acc. to [VOL-2736] */
- addr := a.config.KVStoreHost + ":" + strconv.Itoa(a.config.KVStorePort)
- cm := conf.NewConfigManager(ctx, a.kvClient, a.config.KVStoreType, addr, a.config.KVStoreTimeout)
+ cm := conf.NewConfigManager(ctx, a.kvClient, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
go conf.StartLogLevelConfigProcessing(cm, ctx)
// Setup Kafka Client
- if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config.KafkaAdapterHost, a.config.KafkaAdapterPort); err != nil {
+ if a.kafkaClient, err = newKafkaClient(ctx, "sarama", a.config.KafkaAdapterAddress); err != nil {
logger.Fatalw(ctx, "Unsupported-common-client", log.Fields{"error": err})
}
@@ -190,11 +187,9 @@
return nil, errors.New("unsupported-kv-store")
}
-func newKafkaClient(ctx context.Context, clientType, host string, port int) (kafka.Client, error) {
+func newKafkaClient(ctx context.Context, clientType, addr string) (kafka.Client, error) {
logger.Infow(ctx, "common-client-type", log.Fields{"client": clientType})
- /* address config update acc. to [VOL-2736] */
- addr := host + ":" + strconv.Itoa(port)
switch clientType {
case "sarama":
@@ -211,8 +206,7 @@
}
func (a *adapter) setKVClient(ctx context.Context) error {
- addr := a.config.KVStoreHost + ":" + strconv.Itoa(a.config.KVStorePort)
- client, err := newKVClient(ctx, a.config.KVStoreType, addr, a.config.KVStoreTimeout)
+ client, err := newKVClient(ctx, a.config.KVStoreType, a.config.KVStoreAddress, a.config.KVStoreTimeout)
if err != nil {
a.kvClient = nil
logger.Errorw(ctx, "error-starting-KVClient", log.Fields{"error": err})
@@ -223,13 +217,11 @@
}
func (a *adapter) startInterContainerProxy(ctx context.Context, retries int) (kafka.InterContainerProxy, error) {
- logger.Infow(ctx, "starting-intercontainer-messaging-proxy", log.Fields{"host": a.config.KafkaAdapterHost,
- "port": a.config.KafkaAdapterPort, "topic": a.config.Topic})
+ logger.Infow(ctx, "starting-intercontainer-messaging-proxy", log.Fields{"addr": a.config.KafkaAdapterAddress, "topic": a.config.Topic})
var err error
/* address config update acc. to [VOL-2736] */
- addr := a.config.KafkaAdapterHost + ":" + strconv.Itoa(a.config.KafkaAdapterPort)
kip := kafka.NewInterContainerProxy(
- kafka.InterContainerAddress(addr),
+ kafka.InterContainerAddress(a.config.KafkaAdapterAddress),
kafka.MsgClient(a.kafkaClient),
kafka.DefaultTopic(&kafka.Topic{Name: a.config.Topic}))
count := 0
diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go
index 9ceca40..5e76275 100644
--- a/internal/pkg/config/config.go
+++ b/internal/pkg/config/config.go
@@ -28,14 +28,11 @@
const (
etcdStoreName = "etcd"
defaultInstanceid = "openonu"
- defaultKafkaadapterhost = "192.168.0.20"
- defaultKafkaadapterport = 9092
- defaultKafkaclusterhost = "10.100.198.220"
- defaultKafkaclusterport = 9092
+ defaultKafkaadapteraddress = "127.0.0.1:9092"
+ defaultKafkaclusteraddress = "127.0.0.1:9092"
defaultKvstoretype = etcdStoreName
defaultKvstoretimeout = 5 * time.Second
- defaultKvstorehost = "localhost"
- defaultKvstoreport = 2379 // Consul = 8500; Etcd = 2379
+ defaultKvstoreaddress = "127.0.0.1:2379"
defaultLoglevel = "WARN"
defaultBanner = false
defaultDisplayVersionOnly = false
@@ -71,14 +68,11 @@
type AdapterFlags struct {
// Command line parameters
InstanceID string
- KafkaAdapterHost string
- KafkaAdapterPort int
- KafkaClusterHost string
- KafkaClusterPort int
+ KafkaAdapterAddress string
+ KafkaClusterAddress string // NOTE this is unused across the adapter
KVStoreType string
KVStoreTimeout time.Duration
- KVStoreHost string
- KVStorePort int
+ KVStoreAddress string
Topic string
CoreTopic string
EventTopic string
@@ -108,14 +102,11 @@
func NewAdapterFlags() *AdapterFlags {
var adapterFlags = AdapterFlags{ // Default values
InstanceID: defaultInstanceid,
- KafkaAdapterHost: defaultKafkaadapterhost,
- KafkaAdapterPort: defaultKafkaadapterport,
- KafkaClusterHost: defaultKafkaclusterhost,
- KafkaClusterPort: defaultKafkaclusterport,
+ KafkaAdapterAddress: defaultKafkaadapteraddress,
+ KafkaClusterAddress: defaultKafkaclusteraddress,
KVStoreType: defaultKvstoretype,
KVStoreTimeout: defaultKvstoretimeout,
- KVStoreHost: defaultKvstorehost,
- KVStorePort: defaultKvstoreport,
+ KVStoreAddress: defaultKvstoreaddress,
Topic: defaultTopic,
CoreTopic: defaultCoreTopic,
EventTopic: defaultEventTopic,
@@ -146,17 +137,11 @@
// ParseCommandArguments parses the arguments when running read-write adaptercore service
func (so *AdapterFlags) ParseCommandArguments() {
- help := fmt.Sprintf("Kafka - Adapter messaging host")
- flag.StringVar(&(so.KafkaAdapterHost), "kafka_adapter_host", defaultKafkaadapterhost, help)
+ help := fmt.Sprintf("Kafka - Adapter messaging address")
+ flag.StringVar(&(so.KafkaAdapterAddress), "kafka_adapter_address", defaultKafkaadapteraddress, help)
- help = fmt.Sprintf("Kafka - Adapter messaging port")
- flag.IntVar(&(so.KafkaAdapterPort), "kafka_adapter_port", defaultKafkaadapterport, help)
-
- help = fmt.Sprintf("Kafka - Cluster messaging host")
- flag.StringVar(&(so.KafkaClusterHost), "kafka_cluster_host", defaultKafkaclusterhost, help)
-
- help = fmt.Sprintf("Kafka - Cluster messaging port")
- flag.IntVar(&(so.KafkaClusterPort), "kafka_cluster_port", defaultKafkaclusterport, help)
+ help = fmt.Sprintf("Kafka - Cluster messaging address")
+ flag.StringVar(&(so.KafkaClusterAddress), "kafka_cluster_address", defaultKafkaclusteraddress, help)
help = fmt.Sprintf("Open ONU topic")
baseAdapterTopic := flag.String("adapter_topic", defaultTopic, help)
@@ -173,11 +158,8 @@
help = fmt.Sprintf("The default timeout when making a kv store request")
flag.DurationVar(&(so.KVStoreTimeout), "kv_store_request_timeout", defaultKvstoretimeout, help)
- help = fmt.Sprintf("KV store host")
- flag.StringVar(&(so.KVStoreHost), "kv_store_host", defaultKvstorehost, help)
-
- help = fmt.Sprintf("KV store port")
- flag.IntVar(&(so.KVStorePort), "kv_store_port", defaultKvstoreport, help)
+ help = fmt.Sprintf("KV store address")
+ flag.StringVar(&(so.KVStoreAddress), "kv_store_address", defaultKvstoreaddress, help)
help = fmt.Sprintf("Log level")
flag.StringVar(&(so.LogLevel), "log_level", defaultLoglevel, help)
diff --git a/internal/pkg/onuadaptercore/device_handler.go b/internal/pkg/onuadaptercore/device_handler.go
index a9683dd..5e6e499 100644
--- a/internal/pkg/onuadaptercore/device_handler.go
+++ b/internal/pkg/onuadaptercore/device_handler.go
@@ -2039,14 +2039,14 @@
//setBackend provides a DB backend for the specified path on the existing KV client
func (dh *deviceHandler) setBackend(ctx context.Context, aBasePathKvStore string) *db.Backend {
- addr := dh.pOpenOnuAc.KVStoreHost + ":" + strconv.Itoa(dh.pOpenOnuAc.KVStorePort)
- logger.Debugw(ctx, "SetKVStoreBackend", log.Fields{"IpTarget": addr,
+
+ logger.Debugw(ctx, "SetKVStoreBackend", log.Fields{"IpTarget": dh.pOpenOnuAc.KVStoreAddress,
"BasePathKvStore": aBasePathKvStore, "device-id": dh.deviceID})
kvbackend := &db.Backend{
Client: dh.pOpenOnuAc.kvClient,
StoreType: dh.pOpenOnuAc.KVStoreType,
/* address config update acc. to [VOL-2736] */
- Address: addr,
+ Address: dh.pOpenOnuAc.KVStoreAddress,
Timeout: dh.pOpenOnuAc.KVStoreTimeout,
PathPrefix: aBasePathKvStore}
diff --git a/internal/pkg/onuadaptercore/openonu.go b/internal/pkg/onuadaptercore/openonu.go
index e933173..598be4a 100644
--- a/internal/pkg/onuadaptercore/openonu.go
+++ b/internal/pkg/onuadaptercore/openonu.go
@@ -54,8 +54,7 @@
cm *conf.ConfigManager
config *config.AdapterFlags
numOnus int
- KVStoreHost string
- KVStorePort int
+ KVStoreAddress string
KVStoreType string
KVStoreTimeout time.Duration
mibTemplatesGenerated map[string]bool
@@ -88,8 +87,7 @@
openOnuAc.adapterProxy = adapterProxy
openOnuAc.eventProxy = eventProxy
openOnuAc.kvClient = kvClient
- openOnuAc.KVStoreHost = cfg.KVStoreHost
- openOnuAc.KVStorePort = cfg.KVStorePort
+ openOnuAc.KVStoreAddress = cfg.KVStoreAddress
openOnuAc.KVStoreType = cfg.KVStoreType
openOnuAc.KVStoreTimeout = cfg.KVStoreTimeout
openOnuAc.mibTemplatesGenerated = make(map[string]bool)