[VOL-3117] Read KV store data path prefix from an environment variable
- voltha-lib-go reads the KVStoreDataPrefix from env variable, so removed the command-line flag in voltha-go
- also removed unused corepair topic flag
Change-Id: Ibe8403bf187126b587a92cd9c58aa6d923f84cd0
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index 5eeae01..e7e5c4f 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -37,7 +37,6 @@
defaultKVStoreHost = "127.0.0.1"
defaultKVStorePort = 2379 // Consul = 8500; Etcd = 2379
defaultKVTxnKeyDelTime = 60
- defaultKVStoreDataPrefix = "service/voltha"
defaultLogLevel = "WARN"
defaultBanner = false
defaultDisplayVersionOnly = false
@@ -52,7 +51,6 @@
defaultDefaultRequestTimeout = 1000 * time.Millisecond
defaultCoreTimeout = 1000 * time.Millisecond
defaultCoreBindingKey = "voltha_backend_name"
- defaultCorePairTopic = "rwcore_1"
defaultMaxConnectionRetries = -1 // retries forever
defaultConnectionRetryInterval = 2 * time.Second
defaultLiveProbeInterval = 60 * time.Second
@@ -76,7 +74,6 @@
KVStoreHost string
KVStorePort int
KVTxnKeyDelTime int
- KVStoreDataPrefix string
CoreTopic string
LogLevel string
Banner bool
@@ -90,7 +87,6 @@
DefaultRequestTimeout time.Duration
DefaultCoreTimeout time.Duration
CoreBindingKey string
- CorePairTopic string
MaxConnectionRetries int
ConnectionRetryInterval time.Duration
LiveProbeInterval time.Duration
@@ -113,7 +109,6 @@
KVStoreTimeout: defaultKVStoreTimeout,
KVStoreHost: defaultKVStoreHost,
KVStorePort: defaultKVStorePort,
- KVStoreDataPrefix: defaultKVStoreDataPrefix,
KVTxnKeyDelTime: defaultKVTxnKeyDelTime,
CoreTopic: defaultCoreTopic,
LogLevel: defaultLogLevel,
@@ -128,7 +123,6 @@
LongRunningRequestTimeout: defaultLongRunningRequestTimeout,
DefaultCoreTimeout: defaultCoreTimeout,
CoreBindingKey: defaultCoreBindingKey,
- CorePairTopic: defaultCorePairTopic,
MaxConnectionRetries: defaultMaxConnectionRetries,
ConnectionRetryInterval: defaultConnectionRetryInterval,
LiveProbeInterval: defaultLiveProbeInterval,
@@ -187,9 +181,6 @@
help = fmt.Sprintf("The time to wait before deleting a completed transaction key")
flag.IntVar(&(cf.KVTxnKeyDelTime), "kv_txn_delete_time", defaultKVTxnKeyDelTime, help)
- help = fmt.Sprintf("KV store data prefix")
- flag.StringVar(&(cf.KVStoreDataPrefix), "kv_store_data_prefix", defaultKVStoreDataPrefix, help)
-
help = fmt.Sprintf("Log level")
flag.StringVar(&(cf.LogLevel), "log_level", defaultLogLevel, help)
@@ -211,9 +202,6 @@
help = fmt.Sprintf("The name of the meta-key whose value is the rw-core group to which the ofagent is bound")
flag.StringVar(&(cf.CoreBindingKey), "core_binding_key", defaultCoreBindingKey, help)
- help = fmt.Sprintf("Core pairing group topic")
- flag.StringVar(&cf.CorePairTopic, "core_pair_topic", defaultCorePairTopic, help)
-
help = fmt.Sprintf("The number of retries to connect to a dependent component")
flag.IntVar(&(cf.MaxConnectionRetries), "max_connection_retries", defaultMaxConnectionRetries, help)
diff --git a/rw_core/core/api/grpc_nbi_handler_test.go b/rw_core/core/api/grpc_nbi_handler_test.go
index 543a19d..0fe877e 100755
--- a/rw_core/core/api/grpc_nbi_handler_test.go
+++ b/rw_core/core/api/grpc_nbi_handler_test.go
@@ -90,7 +90,7 @@
ctx, cancel := context.WithTimeout(context.Background(), 1*time.Second)
defer cancel()
cfg := config.NewRWCoreFlags()
- cfg.CorePairTopic = "rw_core"
+ cfg.CoreTopic = "rw_core"
cfg.DefaultRequestTimeout = nb.defaultTimeout
cfg.DefaultCoreTimeout = nb.defaultTimeout
cfg.KVStorePort = nb.kvClientPort
@@ -109,8 +109,7 @@
Host: cfg.KVStoreHost,
Port: cfg.KVStorePort,
Timeout: cfg.KVStoreTimeout,
- LivenessChannelInterval: cfg.LiveProbeInterval / 2,
- PathPrefix: cfg.KVStoreDataPrefix}
+ LivenessChannelInterval: cfg.LiveProbeInterval / 2}
nb.kmp = kafka.NewInterContainerProxy(
kafka.InterContainerHost(cfg.KafkaAdapterHost),
kafka.InterContainerPort(cfg.KafkaAdapterPort),
@@ -121,7 +120,7 @@
endpointMgr := kafka.NewEndpointManager(backend)
proxy := model.NewDBPath(backend)
nb.adapterMgr = adapter.NewAdapterManager(proxy, nb.coreInstanceID, nb.kClient)
- nb.deviceMgr, nb.logicalDeviceMgr = device.NewManagers(proxy, nb.adapterMgr, nb.kmp, endpointMgr, cfg.CorePairTopic, nb.coreInstanceID, cfg.DefaultCoreTimeout)
+ nb.deviceMgr, nb.logicalDeviceMgr = device.NewManagers(proxy, nb.adapterMgr, nb.kmp, endpointMgr, cfg.CoreTopic, nb.coreInstanceID, cfg.DefaultCoreTimeout)
nb.adapterMgr.Start(ctx)
if err := nb.kmp.Start(); err != nil {
@@ -131,9 +130,6 @@
if err := nb.kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: cfg.CoreTopic}, requestProxy); err != nil {
logger.Fatalf("Cannot add request handler: %s", err)
}
- if err := nb.kmp.SubscribeWithDefaultRequestHandler(kafka.Topic{Name: cfg.CorePairTopic}, kafka.OffsetNewest); err != nil {
- logger.Fatalf("Cannot add default request handler: %s", err)
- }
}
func (nb *NBTest) stopAll() {
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 524b52c..0a76ca2 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -27,7 +27,6 @@
"github.com/opencord/voltha-go/rw_core/core/api"
"github.com/opencord/voltha-go/rw_core/core/device"
conf "github.com/opencord/voltha-lib-go/v3/pkg/config"
- "github.com/opencord/voltha-lib-go/v3/pkg/db"
grpcserver "github.com/opencord/voltha-lib-go/v3/pkg/grpc"
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
@@ -84,17 +83,8 @@
cm := conf.NewConfigManager(kvClient, cf.KVStoreType, cf.KVStoreHost, cf.KVStorePort, cf.KVStoreTimeout)
go conf.StartLogLevelConfigProcessing(cm, ctx)
- backend := &db.Backend{
- Client: kvClient,
- StoreType: cf.KVStoreType,
- Host: cf.KVStoreHost,
- Port: cf.KVStorePort,
- Timeout: cf.KVStoreTimeout,
- // Configure backend to push Liveness Status at least every (cf.LiveProbeInterval / 2) seconds
- // so as to avoid trigger of Liveness check (due to Liveness timeout) when backend is alive
- LivenessChannelInterval: cf.LiveProbeInterval / 2,
- PathPrefix: cf.KVStoreDataPrefix,
- }
+ backend := cm.Backend
+ backend.LivenessChannelInterval = cf.LiveProbeInterval / 2
// wait until connection to KV Store is up
if err := waitUntilKVStoreReachableOrMaxTries(ctx, kvClient, cf.MaxConnectionRetries, cf.ConnectionRetryInterval); err != nil {
@@ -139,10 +129,10 @@
// create the core of the system, the device managers
endpointMgr := kafka.NewEndpointManager(backend)
- deviceMgr, logicalDeviceMgr := device.NewManagers(dbPath, adapterMgr, kmp, endpointMgr, cf.CorePairTopic, id, cf.DefaultCoreTimeout)
+ deviceMgr, logicalDeviceMgr := device.NewManagers(dbPath, adapterMgr, kmp, endpointMgr, cf.CoreTopic, id, cf.DefaultCoreTimeout)
// register kafka RPC handler
- registerAdapterRequestHandlers(kmp, deviceMgr, adapterMgr, cf.CoreTopic, cf.CorePairTopic)
+ registerAdapterRequestHandlers(kmp, deviceMgr, adapterMgr, cf.CoreTopic)
// start gRPC handler
grpcServer := grpcserver.NewGrpcServer(cf.GrpcHost, cf.GrpcPort, nil, false, probe.GetProbeFromContext(ctx))
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
index 29d8062..cd49681 100755
--- a/rw_core/core/device/agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -112,7 +112,7 @@
func (dat *DATest) startCore(inCompeteMode bool) {
cfg := config.NewRWCoreFlags()
- cfg.CorePairTopic = "rw_core"
+ cfg.CoreTopic = "rw_core"
cfg.DefaultRequestTimeout = dat.defaultTimeout
cfg.KVStorePort = dat.kvClientPort
cfg.InCompetingMode = inCompeteMode
@@ -129,8 +129,7 @@
Host: cfg.KVStoreHost,
Port: cfg.KVStorePort,
Timeout: cfg.KVStoreTimeout,
- LivenessChannelInterval: cfg.LiveProbeInterval / 2,
- PathPrefix: cfg.KVStoreDataPrefix}
+ LivenessChannelInterval: cfg.LiveProbeInterval / 2}
dat.kmp = kafka.NewInterContainerProxy(
kafka.InterContainerHost(cfg.KafkaAdapterHost),
kafka.InterContainerPort(cfg.KafkaAdapterPort),
@@ -142,13 +141,13 @@
proxy := model.NewDBPath(backend)
dat.adapterMgr = adapter.NewAdapterManager(proxy, dat.coreInstanceID, dat.kClient)
- dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, dat.adapterMgr, dat.kmp, endpointMgr, cfg.CorePairTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout)
+ dat.deviceMgr, dat.logicalDeviceMgr = NewManagers(proxy, dat.adapterMgr, dat.kmp, endpointMgr, cfg.CoreTopic, dat.coreInstanceID, cfg.DefaultCoreTimeout)
dat.adapterMgr.Start(context.Background())
if err = dat.kmp.Start(); err != nil {
logger.Fatal("Cannot start InterContainerProxy")
}
- if err := dat.kmp.SubscribeWithDefaultRequestHandler(kafka.Topic{Name: cfg.CorePairTopic}, kafka.OffsetNewest); err != nil {
+ if err := dat.kmp.SubscribeWithDefaultRequestHandler(kafka.Topic{Name: cfg.CoreTopic}, kafka.OffsetNewest); err != nil {
logger.Fatalf("Cannot add default request handler: %s", err)
}
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
index 77c4b7c..2649ee4 100644
--- a/rw_core/core/device/logical_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -454,7 +454,7 @@
func (lda *LDATest) startCore(inCompeteMode bool) {
cfg := config.NewRWCoreFlags()
- cfg.CorePairTopic = "rw_core"
+ cfg.CoreTopic = "rw_core"
cfg.DefaultRequestTimeout = lda.defaultTimeout
cfg.KVStorePort = lda.kvClientPort
cfg.InCompetingMode = inCompeteMode
@@ -471,8 +471,7 @@
Host: cfg.KVStoreHost,
Port: cfg.KVStorePort,
Timeout: cfg.KVStoreTimeout,
- LivenessChannelInterval: cfg.LiveProbeInterval / 2,
- PathPrefix: cfg.KVStoreDataPrefix}
+ LivenessChannelInterval: cfg.LiveProbeInterval / 2}
lda.kmp = kafka.NewInterContainerProxy(
kafka.InterContainerHost(cfg.KafkaAdapterHost),
kafka.InterContainerPort(cfg.KafkaAdapterPort),
@@ -484,7 +483,7 @@
proxy := model.NewDBPath(backend)
adapterMgr := adapter.NewAdapterManager(proxy, lda.coreInstanceID, lda.kClient)
- lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg.CorePairTopic, lda.coreInstanceID, cfg.DefaultCoreTimeout)
+ lda.deviceMgr, lda.logicalDeviceMgr = NewManagers(proxy, adapterMgr, lda.kmp, endpointMgr, cfg.CoreTopic, lda.coreInstanceID, cfg.DefaultCoreTimeout)
if err = lda.kmp.Start(); err != nil {
logger.Fatal("Cannot start InterContainerProxy")
}
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index 17ac266..c7af54a 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -60,11 +60,11 @@
}
//NewManagers creates the Manager and the Logical Manager.
-func NewManagers(dbPath *model.Path, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, corePairTopic, coreInstanceID string, defaultCoreTimeout time.Duration) (*Manager, *LogicalManager) {
+func NewManagers(dbPath *model.Path, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, coreTopic, coreInstanceID string, defaultCoreTimeout time.Duration) (*Manager, *LogicalManager) {
deviceMgr := &Manager{
rootDevices: make(map[string]bool),
kafkaICProxy: kmp,
- adapterProxy: remote.NewAdapterProxy(kmp, corePairTopic, endpointMgr),
+ adapterProxy: remote.NewAdapterProxy(kmp, coreTopic, endpointMgr),
coreInstanceID: coreInstanceID,
dbPath: dbPath,
dProxy: dbPath.Proxy("devices"),
diff --git a/rw_core/core/device/remote/adapter_proxy.go b/rw_core/core/device/remote/adapter_proxy.go
index f4579ef..4cbb363 100755
--- a/rw_core/core/device/remote/adapter_proxy.go
+++ b/rw_core/core/device/remote/adapter_proxy.go
@@ -18,6 +18,7 @@
import (
"context"
+
"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
"github.com/opencord/voltha-lib-go/v3/pkg/log"
ic "github.com/opencord/voltha-protos/v3/go/inter_container"
@@ -29,22 +30,22 @@
type AdapterProxy struct {
kafka.EndpointManager
deviceTopicRegistered bool
- corePairTopic string
+ coreTopic string
kafkaICProxy kafka.InterContainerProxy
}
// NewAdapterProxy will return adapter proxy instance
-func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, corePairTopic string, endpointManager kafka.EndpointManager) *AdapterProxy {
+func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, coreTopic string, endpointManager kafka.EndpointManager) *AdapterProxy {
return &AdapterProxy{
EndpointManager: endpointManager,
kafkaICProxy: kafkaProxy,
- corePairTopic: corePairTopic,
+ coreTopic: coreTopic,
deviceTopicRegistered: false,
}
}
func (ap *AdapterProxy) getCoreTopic() kafka.Topic {
- return kafka.Topic{Name: ap.corePairTopic}
+ return kafka.Topic{Name: ap.coreTopic}
}
func (ap *AdapterProxy) getAdapterTopic(deviceID string, adapterType string) (*kafka.Topic, error) {
diff --git a/rw_core/core/kafka.go b/rw_core/core/kafka.go
index fcdf340..18b9ec8 100644
--- a/rw_core/core/kafka.go
+++ b/rw_core/core/kafka.go
@@ -147,18 +147,12 @@
}
}
-func registerAdapterRequestHandlers(kmp kafka.InterContainerProxy, dMgr *device.Manager, aMgr *adapter.Manager, coreTopic, corePairTopic string) {
+func registerAdapterRequestHandlers(kmp kafka.InterContainerProxy, dMgr *device.Manager, aMgr *adapter.Manager, coreTopic string) {
requestProxy := api.NewAdapterRequestHandlerProxy(dMgr, aMgr)
// Register the broadcast topic to handle any core-bound broadcast requests
if err := kmp.SubscribeWithRequestHandlerInterface(kafka.Topic{Name: coreTopic}, requestProxy); err != nil {
logger.Fatalw("Failed-registering-broadcast-handler", log.Fields{"topic": coreTopic})
}
-
- // Register the core-pair topic to handle core-bound requests destined to the core pair
- if err := kmp.SubscribeWithDefaultRequestHandler(kafka.Topic{Name: corePairTopic}, kafka.OffsetNewest); err != nil {
- logger.Fatalw("Failed-registering-pair-handler", log.Fields{"topic": corePairTopic})
- }
-
logger.Info("request-handler-registered")
}