VOL-3244 - remove competing mode flag

- removed competing core command line argument
- changed references from affinity router to device discovery

Change-Id: I40aa553762ef7a4f1c87932c5a5b2ed3038ced8d
diff --git a/go.mod b/go.mod
index e3c5839..26a75dc 100644
--- a/go.mod
+++ b/go.mod
@@ -6,7 +6,7 @@
 	github.com/gogo/protobuf v1.3.0
 	github.com/golang/protobuf v1.3.2
 	github.com/google/uuid v1.1.1
-	github.com/opencord/voltha-lib-go/v3 v3.1.16
+	github.com/opencord/voltha-lib-go/v3 v3.1.21
 	github.com/opencord/voltha-protos/v3 v3.3.9
 	github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
 	github.com/stretchr/testify v1.4.0
diff --git a/go.sum b/go.sum
index 4040d4c..50886b7 100644
--- a/go.sum
+++ b/go.sum
@@ -198,8 +198,8 @@
 github.com/onsi/ginkgo v1.6.0/go.mod h1:lLunBs/Ym6LB5Z9jYTR76FiuTmxDTDusOGeTQH+WWjE=
 github.com/onsi/gomega v1.4.2 h1:3mYCb7aPxS/RU7TI1y4rkEn1oKmPRjNJLNEXgw7MH2I=
 github.com/onsi/gomega v1.4.2/go.mod h1:ex+gbHU/CVuBBDIJjb2X0qEXbFg53c61hWP/1CpauHY=
-github.com/opencord/voltha-lib-go/v3 v3.1.16 h1:l2C93dSlI4xKkkeJbnJLdVHa73DlWYPv74CeI+LNxto=
-github.com/opencord/voltha-lib-go/v3 v3.1.16/go.mod h1:sa508HZ5vlOauh0i+WC0XFX1JZnfHtJqNIms5XBT/Z0=
+github.com/opencord/voltha-lib-go/v3 v3.1.21 h1:a2j+BZoHH300DszfKm2/Cm6TkSrYbOJCRAXI7uXuw9c=
+github.com/opencord/voltha-lib-go/v3 v3.1.21/go.mod h1:sa508HZ5vlOauh0i+WC0XFX1JZnfHtJqNIms5XBT/Z0=
 github.com/opencord/voltha-protos/v3 v3.3.9 h1:BnfDN9oaRBgyAiH9ZN7LpBpEJYxjX/ZS7R4OT2hDrtY=
 github.com/opencord/voltha-protos/v3 v3.3.9/go.mod h1:nl1ETp5Iw3avxOaKD8BJlYY5wYI4KeV95aT1pL63nto=
 github.com/opentracing/opentracing-go v1.1.0 h1:pWlfV3Bxv7k65HYwkikxat0+s3pV4bsqf19k25Ur8rU=
diff --git a/rw_core/config/config.go b/rw_core/config/config.go
index dca40d0..3d95f33 100644
--- a/rw_core/config/config.go
+++ b/rw_core/config/config.go
@@ -41,8 +41,6 @@
 	defaultRWCoreKey                 = "pki/voltha.key"
 	defaultRWCoreCert                = "pki/voltha.crt"
 	defaultRWCoreCA                  = "pki/voltha-CA.pem"
-	defaultAffinityRouterTopic       = "affinityRouter"
-	defaultInCompetingMode           = true
 	defaultLongRunningRequestTimeout = 2000 * time.Millisecond
 	defaultDefaultRequestTimeout     = 1000 * time.Millisecond
 	defaultCoreTimeout               = 1000 * time.Millisecond
@@ -72,8 +70,6 @@
 	RWCoreKey                 string
 	RWCoreCert                string
 	RWCoreCA                  string
-	AffinityRouterTopic       string
-	InCompetingMode           bool
 	LongRunningRequestTimeout time.Duration
 	DefaultRequestTimeout     time.Duration
 	DefaultCoreTimeout        time.Duration
@@ -103,8 +99,6 @@
 		RWCoreKey:                 defaultRWCoreKey,
 		RWCoreCert:                defaultRWCoreCert,
 		RWCoreCA:                  defaultRWCoreCA,
-		AffinityRouterTopic:       defaultAffinityRouterTopic,
-		InCompetingMode:           defaultInCompetingMode,
 		DefaultRequestTimeout:     defaultDefaultRequestTimeout,
 		LongRunningRequestTimeout: defaultLongRunningRequestTimeout,
 		DefaultCoreTimeout:        defaultCoreTimeout,
@@ -136,11 +130,7 @@
 	help = fmt.Sprintf("RW Core topic")
 	flag.StringVar(&(cf.CoreTopic), "rw_core_topic", defaultCoreTopic, help)
 
-	help = fmt.Sprintf("Affinity Router topic")
-	flag.StringVar(&(cf.AffinityRouterTopic), "affinity_router_topic", defaultAffinityRouterTopic, help)
-
-	help = fmt.Sprintf("In competing Mode - two cores competing to handle a transaction ")
-	flag.BoolVar(&cf.InCompetingMode, "in_competing_mode", defaultInCompetingMode, help)
+	flag.Bool("in_competing_mode", false, "deprecated")
 
 	help = fmt.Sprintf("KV store type")
 	flag.StringVar(&(cf.KVStoreType), "kv_store_type", defaultKVStoreType, help)
diff --git a/rw_core/core/api/grpc_nbi_handler_test.go b/rw_core/core/api/grpc_nbi_handler_test.go
index 2eff4a4..77c0b06 100755
--- a/rw_core/core/api/grpc_nbi_handler_test.go
+++ b/rw_core/core/api/grpc_nbi_handler_test.go
@@ -94,7 +94,6 @@
 	cfg.CoreTopic = "rw_core"
 	cfg.DefaultRequestTimeout = nb.defaultTimeout
 	cfg.DefaultCoreTimeout = nb.defaultTimeout
-	cfg.InCompetingMode = inCompeteMode
 	cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(nb.kvClientPort)
 	grpcPort, err := freeport.GetFreePort()
 	if err != nil {
@@ -112,8 +111,7 @@
 	nb.kmp = kafka.NewInterContainerProxy(
 		kafka.InterContainerAddress(cfg.KafkaAdapterAddress),
 		kafka.MsgClient(nb.kClient),
-		kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}),
-		kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: cfg.AffinityRouterTopic}))
+		kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}))
 
 	endpointMgr := kafka.NewEndpointManager(backend)
 	proxy := model.NewDBPath(backend)
diff --git a/rw_core/core/core.go b/rw_core/core/core.go
index 0cfa915..801b72c 100644
--- a/rw_core/core/core.go
+++ b/rw_core/core/core.go
@@ -117,7 +117,7 @@
 
 	// connect to kafka, then wait until reachable and publisher/consumer created
 	// core.kmp must be created before deviceMgr and adapterMgr
-	kmp, err := startKafkInterContainerProxy(ctx, kafkaClient, cf.KafkaAdapterAddress, cf.CoreTopic, cf.AffinityRouterTopic, cf.ConnectionRetryInterval)
+	kmp, err := startKafkInterContainerProxy(ctx, kafkaClient, cf.KafkaAdapterAddress, cf.CoreTopic, cf.ConnectionRetryInterval)
 	if err != nil {
 		logger.Warn("Failed to setup kafka connection")
 		return
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
index 93277ff..5af75ec 100755
--- a/rw_core/core/device/agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -111,12 +111,11 @@
 	return test
 }
 
-func (dat *DATest) startCore(inCompeteMode bool) {
+func (dat *DATest) startCore() {
 	cfg := config.NewRWCoreFlags()
 	cfg.CoreTopic = "rw_core"
 	cfg.DefaultRequestTimeout = dat.defaultTimeout
 	cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(dat.kvClientPort)
-	cfg.InCompetingMode = inCompeteMode
 	grpcPort, err := freeport.GetFreePort()
 	if err != nil {
 		logger.Fatal("Cannot get a freeport for grpc")
@@ -132,8 +131,7 @@
 	dat.kmp = kafka.NewInterContainerProxy(
 		kafka.InterContainerAddress(cfg.KafkaAdapterAddress),
 		kafka.MsgClient(dat.kClient),
-		kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}),
-		kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: cfg.AffinityRouterTopic}))
+		kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}))
 
 	endpointMgr := kafka.NewEndpointManager(backend)
 	proxy := model.NewDBPath(backend)
@@ -252,7 +250,7 @@
 		defer da.stopAll()
 		log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
 		// Start the Core
-		da.startCore(false)
+		da.startCore()
 
 		var wg sync.WaitGroup
 		numConCurrentDeviceAgents := 20
@@ -272,7 +270,7 @@
 
 	log.SetPackageLogLevel("github.com/opencord/voltha-go/rw_core/core", log.DebugLevel)
 	// Start the Core
-	da.startCore(false)
+	da.startCore()
 	da.oltAdapter, da.onuAdapter = tst.CreateAndregisterAdapters(t, da.kClient, da.coreInstanceID, da.oltAdapterName, da.onuAdapterName, da.adapterMgr)
 
 	a := da.createDeviceAgent(t)
@@ -288,7 +286,7 @@
 	defer da.stopAll()
 
 	// Start the Core
-	da.startCore(false)
+	da.startCore()
 	da.oltAdapter, da.onuAdapter = tst.CreateAndregisterAdapters(t, da.kClient, da.coreInstanceID, da.oltAdapterName, da.onuAdapterName, da.adapterMgr)
 	a := da.createDeviceAgent(t)
 	cloned := a.getDeviceWithoutLock()
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
index 40c6b9c..1b1dc59 100644
--- a/rw_core/core/device/logical_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -136,7 +136,6 @@
 	cfg.CoreTopic = "rw_core"
 	cfg.DefaultRequestTimeout = lda.defaultTimeout
 	cfg.KVStoreAddress = "127.0.0.1" + ":" + strconv.Itoa(lda.kvClientPort)
-	cfg.InCompetingMode = inCompeteMode
 	grpcPort, err := freeport.GetFreePort()
 	if err != nil {
 		logger.Fatal("Cannot get a freeport for grpc")
@@ -152,8 +151,7 @@
 	lda.kmp = kafka.NewInterContainerProxy(
 		kafka.InterContainerAddress(cfg.KafkaAdapterAddress),
 		kafka.MsgClient(lda.kClient),
-		kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}),
-		kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: cfg.AffinityRouterTopic}))
+		kafka.DefaultTopic(&kafka.Topic{Name: cfg.CoreTopic}))
 
 	endpointMgr := kafka.NewEndpointManager(backend)
 	proxy := model.NewDBPath(backend)
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index 51da0e1..92b7f20 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -1055,14 +1055,6 @@
 		}()
 	}
 
-	// Publish on the messaging bus that we have discovered new devices
-	go func() {
-		err := dMgr.kafkaICProxy.DeviceDiscovered(agent.deviceID, deviceType, parentDeviceID, dMgr.coreInstanceID)
-		if err != nil {
-			logger.Errorw("unable-to-discover-the-device", log.Fields{"error": err})
-		}
-	}()
-
 	return childDevice, nil
 }
 
diff --git a/rw_core/core/kafka.go b/rw_core/core/kafka.go
index 3cb0292..0f28d66 100644
--- a/rw_core/core/kafka.go
+++ b/rw_core/core/kafka.go
@@ -29,7 +29,7 @@
 )
 
 // startKafkInterContainerProxy is responsible for starting the Kafka Interadapter Proxy
-func startKafkInterContainerProxy(ctx context.Context, kafkaClient kafka.Client, address string, coreTopic, affinityRouterTopic string, connectionRetryInterval time.Duration) (kafka.InterContainerProxy, error) {
+func startKafkInterContainerProxy(ctx context.Context, kafkaClient kafka.Client, address string, coreTopic string, connectionRetryInterval time.Duration) (kafka.InterContainerProxy, error) {
 	logger.Infow("initialize-kafka-manager", log.Fields{"address": address, "topic": coreTopic})
 
 	probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPreparing)
@@ -38,8 +38,7 @@
 	kmp := kafka.NewInterContainerProxy(
 		kafka.InterContainerAddress(address),
 		kafka.MsgClient(kafkaClient),
-		kafka.DefaultTopic(&kafka.Topic{Name: coreTopic}),
-		kafka.DeviceDiscoveryTopic(&kafka.Topic{Name: affinityRouterTopic}))
+		kafka.DefaultTopic(&kafka.Topic{Name: coreTopic}))
 
 	probe.UpdateStatusFromContext(ctx, "message-bus", probe.ServiceStatusPrepared)
 
diff --git a/tests/kafka/kafka_inter_container_messaging_test.go b/tests/kafka/kafka_inter_container_messaging_test.go
index 52927eb..b44bc96 100644
--- a/tests/kafka/kafka_inter_container_messaging_test.go
+++ b/tests/kafka/kafka_inter_container_messaging_test.go
@@ -19,6 +19,10 @@
 
 import (
 	"context"
+	"os"
+	"testing"
+	"time"
+
 	"github.com/golang/protobuf/ptypes"
 	"github.com/google/uuid"
 	"github.com/opencord/voltha-go/rw_core/core/api"
@@ -27,9 +31,6 @@
 	ic "github.com/opencord/voltha-protos/v3/go/inter_container"
 	"github.com/opencord/voltha-protos/v3/go/voltha"
 	"github.com/stretchr/testify/assert"
-	"os"
-	"testing"
-	"time"
 )
 
 /*
@@ -43,12 +44,12 @@
 var coreKafkaProxy *kk.InterContainerProxy
 var adapterKafkaProxy *kk.InterContainerProxy
 var kafkaPartitionClient kk.Client
-var affinityRouterTopic string
+var deviceDiscoveryTopic string
 var hostIP string
 var kafkaClient kk.Client
 
 func init() {
-	affinityRouterTopic = "AffinityRouter"
+	deviceDiscoveryTopic = "deviceDiscovery"
 	hostIP = os.Getenv("DOCKER_HOST_IP")
 	kafkaClient = kk.NewSaramaClient(
 		kk.Host(hostIP),
@@ -59,7 +60,7 @@
 		kk.InterContainerPort(9092),
 		kk.DefaultTopic(&kk.Topic{Name: "Core"}),
 		kk.MsgClient(kafkaClient),
-		kk.DeviceDiscoveryTopic(&kk.Topic{Name: affinityRouterTopic}))
+		kk.DeviceDiscoveryTopic(&kk.Topic{Name: deviceDiscoveryTopic}))
 
 	adapterKafkaProxy = kk.NewInterContainerProxy(
 		kk.InterContainerHost(hostIP),
@@ -517,11 +518,11 @@
 		kk.InterContainerPort(9092),
 		kk.DefaultTopic(&kk.Topic{Name: "Test"}),
 		kk.MsgClient(kafkaClient),
-		kk.DeviceDiscoveryTopic(&kk.Topic{Name: affinityRouterTopic}))
+		kk.DeviceDiscoveryTopic(&kk.Topic{Name: deviceDiscoveryTopic}))
 
 	//	First start to wait for the message
 	waitingChannel := make(chan *ic.InterContainerMessage)
-	go subscribeToTopic(&kk.Topic{Name: affinityRouterTopic}, waitingChannel)
+	go subscribeToTopic(&kk.Topic{Name: deviceDiscoveryTopic}, waitingChannel)
 
 	// Sleep to make sure the consumer is ready
 	time.Sleep(time.Millisecond * 100)
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
index 9f9fbfc..cbde834 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/kafka_inter_container_library.go
@@ -19,13 +19,14 @@
 	"context"
 	"errors"
 	"fmt"
-	"google.golang.org/grpc/codes"
-	"google.golang.org/grpc/status"
 	"reflect"
 	"strings"
 	"sync"
 	"time"
 
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+
 	"github.com/golang/protobuf/proto"
 	"github.com/golang/protobuf/ptypes"
 	"github.com/golang/protobuf/ptypes/any"
@@ -66,7 +67,6 @@
 	Start() error
 	Stop()
 	GetDefaultTopic() *Topic
-	DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error
 	InvokeRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) (bool, *any.Any)
 	InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic, waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse
 	SubscribeWithRequestHandlerInterface(topic Topic, handler interface{}) error
@@ -82,7 +82,6 @@
 	kafkaAddress                   string
 	defaultTopic                   *Topic
 	defaultRequestHandlerInterface interface{}
-	deviceDiscoveryTopic           *Topic
 	kafkaClient                    Client
 	doneCh                         chan struct{}
 	doneOnce                       sync.Once
@@ -118,12 +117,6 @@
 	}
 }
 
-func DeviceDiscoveryTopic(topic *Topic) InterContainerProxyOption {
-	return func(args *interContainerProxy) {
-		args.deviceDiscoveryTopic = topic
-	}
-}
-
 func RequestHandlerInterface(handler interface{}) InterContainerProxyOption {
 	return func(args *interContainerProxy) {
 		args.defaultRequestHandlerInterface = handler
@@ -199,48 +192,6 @@
 	return kp.defaultTopic
 }
 
-// DeviceDiscovered publish the discovered device onto the kafka messaging bus
-func (kp *interContainerProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
-	logger.Debugw("sending-device-discovery-msg", log.Fields{"deviceId": deviceId})
-	//	Simple validation
-	if deviceId == "" || deviceType == "" {
-		logger.Errorw("invalid-parameters", log.Fields{"id": deviceId, "type": deviceType})
-		return errors.New("invalid-parameters")
-	}
-	//	Create the device discovery message
-	header := &ic.Header{
-		Id:        uuid.New().String(),
-		Type:      ic.MessageType_DEVICE_DISCOVERED,
-		FromTopic: kp.defaultTopic.Name,
-		ToTopic:   kp.deviceDiscoveryTopic.Name,
-		Timestamp: ptypes.TimestampNow(),
-	}
-	body := &ic.DeviceDiscovered{
-		Id:         deviceId,
-		DeviceType: deviceType,
-		ParentId:   parentId,
-		Publisher:  publisher,
-	}
-
-	var marshalledData *any.Any
-	var err error
-	if marshalledData, err = ptypes.MarshalAny(body); err != nil {
-		logger.Errorw("cannot-marshal-request", log.Fields{"error": err})
-		return err
-	}
-	msg := &ic.InterContainerMessage{
-		Header: header,
-		Body:   marshalledData,
-	}
-
-	// Send the message
-	if err := kp.kafkaClient.Send(msg, kp.deviceDiscoveryTopic); err != nil {
-		logger.Errorw("cannot-send-device-discovery-message", log.Fields{"error": err})
-		return err
-	}
-	return nil
-}
-
 // InvokeAsyncRPC is used to make an RPC request asynchronously
 func (kp *interContainerProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *Topic, replyToTopic *Topic,
 	waitForResponse bool, key string, kvArgs ...*KVArg) chan *RpcResponse {
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/kafka_inter_container_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/kafka_inter_container_proxy.go
index 34aec95..bf8582d 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/kafka_inter_container_proxy.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/mocks/kafka/kafka_inter_container_proxy.go
@@ -18,6 +18,7 @@
 
 import (
 	"context"
+
 	"github.com/gogo/protobuf/proto"
 	"github.com/golang/protobuf/ptypes"
 	"github.com/golang/protobuf/ptypes/any"
@@ -61,9 +62,7 @@
 	return &t
 }
 func (s *MockKafkaICProxy) DeleteTopic(topic kafka.Topic) error { return nil }
-func (s *MockKafkaICProxy) DeviceDiscovered(deviceId string, deviceType string, parentId string, publisher string) error {
-	return nil
-}
+
 func (s *MockKafkaICProxy) Stop() {}
 
 func (s *MockKafkaICProxy) InvokeAsyncRPC(ctx context.Context, rpc string, toTopic *kafka.Topic, replyToTopic *kafka.Topic,
diff --git a/vendor/modules.txt b/vendor/modules.txt
index c23335f..b197853 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -99,7 +99,7 @@
 github.com/modern-go/concurrent
 # github.com/modern-go/reflect2 v1.0.1
 github.com/modern-go/reflect2
-# github.com/opencord/voltha-lib-go/v3 v3.1.16
+# github.com/opencord/voltha-lib-go/v3 v3.1.21
 github.com/opencord/voltha-lib-go/v3/pkg/adapters
 github.com/opencord/voltha-lib-go/v3/pkg/adapters/adapterif
 github.com/opencord/voltha-lib-go/v3/pkg/adapters/common