VOL-3244 - remove competing mode flag
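- removed the competing-core configuration; the in_competing_mode command line argument is still accepted but is deprecated and ignored, and the affinity_router_topic argument is removed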
- changed references from affinity router to device discovery
Change-Id: I40aa553762ef7a4f1c87932c5a5b2ed3038ced8d
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index dca40d0..3d95f33 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -41,8 +41,6 @@
defaultRWCoreKey = "pki/voltha.key"
defaultRWCoreCert = "pki/voltha.crt"
defaultRWCoreCA = "pki/voltha-CA.pem"
- defaultAffinityRouterTopic = "affinityRouter"
- defaultInCompetingMode = true
defaultLongRunningRequestTimeout = 2000 * time.Millisecond
defaultDefaultRequestTimeout = 1000 * time.Millisecond
defaultCoreTimeout = 1000 * time.Millisecond
@@ -72,8 +70,6 @@
RWCoreKey string
RWCoreCert string
RWCoreCA string
- AffinityRouterTopic string
- InCompetingMode bool
LongRunningRequestTimeout time.Duration
DefaultRequestTimeout time.Duration
DefaultCoreTimeout time.Duration
@@ -103,8 +99,6 @@
RWCoreKey: defaultRWCoreKey,
RWCoreCert: defaultRWCoreCert,
RWCoreCA: defaultRWCoreCA,
- AffinityRouterTopic: defaultAffinityRouterTopic,
- InCompetingMode: defaultInCompetingMode,
DefaultRequestTimeout: defaultDefaultRequestTimeout,
LongRunningRequestTimeout: defaultLongRunningRequestTimeout,
DefaultCoreTimeout: defaultCoreTimeout,
@@ -136,11 +130,7 @@
help = fmt.Sprintf("RW Core topic")
flag.StringVar(&(cf.CoreTopic), "rw_core_topic", defaultCoreTopic, help)
- help = fmt.Sprintf("Affinity Router topic")
- flag.StringVar(&(cf.AffinityRouterTopic), "affinity_router_topic", defaultAffinityRouterTopic, help)
-
- help = fmt.Sprintf("In competing Mode - two cores competing to handle a transaction ")
- flag.BoolVar(&cf.InCompetingMode, "in_competing_mode", defaultInCompetingMode, help)
+ flag.Bool("in_competing_mode", false, "deprecated")
help = fmt.Sprintf("KV store type")
flag.StringVar(&(cf.KVStoreType), "kv_store_type", defaultKVStoreType, help)
diff --git a/rw_core/core/api/grpc_nbi_handler_test.go b/rw_core/core/api/grpc_nbi_handler_test.go
index 2eff4a4..77c0b06 100755
--- a/rw_core/core/api/grpc_nbi_handler_test.go
+++ b/rw_core/core/api/grpc_nbi_handler_test.go
@@ -94,7 +94,6 @@
cfg.CoreTopic = "rw_core"
cfg.DefaultRequestTimeout = nb.defaultTimeout
cfg.DefaultCoreTimeout = nb.defaultTimeout
- cfg.InCompetingMode = inCompeteMode
cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(nb.kvClientPort)
grpcPort, err := freeport.GetFreePort()
if err != nil {
@@ -112,8 +111,7 @@
nb.kmp = kafka.NewInterContainerProxy(
kafka.InterContainerAddress(cfg.KafkaAdapterAddress),
kafka.MsgClient(nb.kClient),
- kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}),
- kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: cfg.AffinityRouterTopic}))
+ kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}))
endpointMgr := kafka.NewEndpointManager(backend)
proxy := model.NewDBPath(backend)
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 0cfa915..801b72c 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -117,7 +117,7 @@
// connect to kafka, then wait until reachable and publisher/consumer created
// core.kmp must be created before deviceMgr and adapterMgr
- kmp, err := startKafkInterContainerProxy(ctx, kafkaClient, cf.KafkaAdapterAddress, cf.CoreTopic, cf.AffinityRouterTopic, cf.ConnectionRetryInterval)
+ kmp, err := startKafkInterContainerProxy(ctx, kafkaClient, cf.KafkaAdapterAddress, cf.CoreTopic, cf.ConnectionRetryInterval)
if err != nil {
logger.Warn("Failed to setup kafka connection")
return
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
index 93277ff..5af75ec 100755
--- a/rw_core/core/device/agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -111,12 +111,11 @@
return test
}
-func (dat *DATest) startCore(inCompeteMode bool) {
+func (dat *DATest) startCore() {
cfg := config.NewRWCoreFlags()
cfg.CoreTopic = "rw_core"
cfg.DefaultRequestTimeout = dat.defaultTimeout
cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(dat.kvClientPort)
- cfg.InCompetingMode = inCompeteMode
grpcPort, err := freeport.GetFreePort()
if err != nil {
logger.Fatal("Cannot get a freeport for grpc")
@@ -132,8 +131,7 @@
dat.kmp = kafka.NewInterContainerProxy(
kafka.InterContainerAddress(cfg.KafkaAdapterAddress),
kafka.MsgClient(dat.kClient),
- kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}),
- kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: cfg.AffinityRouterTopic}))
+ kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}))
endpointMgr := kafka.NewEndpointManager(backend)
proxy := model.NewDBPath(backend)
@@ -252,7 +250,7 @@
defer da.stopAll()
log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
// Start the Core
- da.startCore(false)
+ da.startCore()
var wg sync.WaitGroup
numConCurrentDeviceAgents := 20
@@ -272,7 +270,7 @@
log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
// Start the Core
- da.startCore(false)
+ da.startCore()
da.oltAdapter, da.onuAdapter = tst.CreateAndregisterAdapters(t, da.kClient, da.coreInstanceID, da.oltAdapterName, da.onuAdapterName, da.adapterMgr)
a := da.createDeviceAgent(t)
@@ -288,7 +286,7 @@
defer da.stopAll()
// Start the Core
- da.startCore(false)
+ da.startCore()
da.oltAdapter, da.onuAdapter = tst.CreateAndregisterAdapters(t, da.kClient, da.coreInstanceID, da.oltAdapterName, da.onuAdapterName, da.adapterMgr)
a := da.createDeviceAgent(t)
cloned := a.getDeviceWithoutLock()
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
index 40c6b9c..1b1dc59 100644
--- a/rw_core/core/device/logical_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -136,7 +136,6 @@
cfg.CoreTopic = "rw_core"
cfg.DefaultRequestTimeout = lda.defaultTimeout
cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(lda.kvClientPort)
- cfg.InCompetingMode = inCompeteMode
grpcPort, err := freeport.GetFreePort()
if err != nil {
logger.Fatal("Cannot get a freeport for grpc")
@@ -152,8 +151,7 @@
lda.kmp = kafka.NewInterContainerProxy(
kafka.InterContainerAddress(cfg.KafkaAdapterAddress),
kafka.MsgClient(lda.kClient),
- kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}),
- kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: cfg.AffinityRouterTopic}))
+ kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}))
endpointMgr := kafka.NewEndpointManager(backend)
proxy := model.NewDBPath(backend)
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index 51da0e1..92b7f20 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -1055,14 +1055,6 @@
}()
}
- // Publish on the messaging bus that we have discovered new devices
- go func() {
- err := dMgr.kafkaICProxy.DeviceDiscovered(agent.deviceID, deviceType, parentDeviceID, dMgr.coreInstanceID)
- if err != nil {
- logger.Errorw("unable-to-discover-the-device", log.Fields{"error": err})
- }
- }()
-
return childDevice, nil
}
diff --git a/rw_core/core/kafka.go b/rw_core/core/kafka.go
index 3cb0292..0f28d66 100644
--- a/rw_core/core/kafka.go
+++ b/rw_core/core/kafka.go
@@ -29,7 +29,7 @@
)
// startKafkInterContainerProxy is responsible for starting the Kafka Interadapter Proxy
-func startKafkInterContainerProxy(ctx context.Context, kafkaClient kafka.Client, address string, coreTopic, affinityRouterTopic string, connectionRetryInterval time.Duration) (kafka.InterContainerProxy, error) {
+func startKafkInterContainerProxy(ctx context.Context, kafkaClient kafka.Client, address string, coreTopic string, connectionRetryInterval time.Duration) (kafka.InterContainerProxy, error) {
logger.Infow("initialize-kafka-manager", log.Fields{"address": address, "topic": coreTopic})
probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPreparing)
@@ -38,8 +38,7 @@
kmp := kafka.NewInterContainerProxy(
kafka.InterContainerAddress(address),
kafka.MsgClient(kafkaClient),
- kafka.DefaultTopic(&kafka.Topic{Name: coreTopic}),
- kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: affinityRouterTopic}))
+ kafka.DefaultTopic(&kafka.Topic{Name: coreTopic}))
probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPrepared)