[VOL-2836] Using a different topic per ONU device

Change-Id: Ic0328f789c19dfedd86616df527c5ba510de0de9
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go
index 02fa3de..bbae0ed 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/adapter_proxy.go
@@ -17,6 +17,7 @@
 
 import (
 	"context"
+	"github.com/opencord/voltha-lib-go/v3/pkg/db"
 	"time"
 
 	"github.com/golang/protobuf/proto"
@@ -32,14 +33,17 @@
 	kafkaICProxy kafka.InterContainerProxy
 	adapterTopic string
 	coreTopic    string
+	endpointMgr  kafka.EndpointManager
 }
 
-func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string) *AdapterProxy {
-	var proxy AdapterProxy
-	proxy.kafkaICProxy = kafkaProxy
-	proxy.adapterTopic = adapterTopic
-	proxy.coreTopic = coreTopic
-	logger.Debugw("TOPICS", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
+func NewAdapterProxy(kafkaProxy kafka.InterContainerProxy, adapterTopic string, coreTopic string, backend *db.Backend) *AdapterProxy {
+	proxy := AdapterProxy{
+		kafkaICProxy: kafkaProxy,
+		adapterTopic: adapterTopic,
+		coreTopic:    coreTopic,
+		endpointMgr:  kafka.NewEndpointManager(backend),
+	}
+	logger.Debugw("topics", log.Fields{"core": proxy.coreTopic, "adapter": proxy.adapterTopic})
 	return &proxy
 }
 
@@ -87,7 +91,11 @@
 	}
 
 	// Set up the required rpc arguments
-	topic := kafka.Topic{Name: toAdapter}
+	endpoint, err := ap.endpointMgr.GetEndpoint(toDeviceId, toAdapter)
+	if err != nil {
+		return err
+	}
+	topic := kafka.Topic{Name: string(endpoint)}
 	replyToTopic := kafka.Topic{Name: fromAdapter}
 	rpc := "process_inter_adapter_message"
 
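The hunk above changes the inter-adapter send path: instead of always publishing to the peer adapter's base topic, the proxy now asks the endpoint manager for the endpoint that owns the target device. A minimal wiring sketch of the new constructor, assuming the adapter already has an inter-container proxy and a KV-store backend; the topic names below are example values, not fixed by this change:

```go
package main

import (
	"github.com/opencord/voltha-lib-go/v3/pkg/adapters/common"
	"github.com/opencord/voltha-lib-go/v3/pkg/db"
	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
)

// newProxy shows the extra *db.Backend argument on NewAdapterProxy; the proxy
// uses it to build an EndpointManager so the inter-adapter send path can resolve
// a per-device topic instead of the static toAdapter topic.
func newProxy(icProxy kafka.InterContainerProxy, backend *db.Backend) *common.AdapterProxy {
	return common.NewAdapterProxy(icProxy, "brcm_openomci_onu", "rwcore", backend)
}
```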
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go
index 9582f33..20e1a52 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/adapters/common/core_proxy.go
@@ -99,6 +99,28 @@
 	topic := kafka.Topic{Name: ap.coreTopic}
 	replyToTopic := ap.getAdapterTopic()
 	args := make([]*kafka.KVArg, 2)
+
+	if adapter.TotalReplicas == 0 && adapter.CurrentReplica != 0 {
+		log.Fatal("totalReplicas can't be 0, if this code is running there is at least one replica")
+	}
+
+	if adapter.CurrentReplica == 0 && adapter.TotalReplicas != 0 {
+		log.Fatal("currentReplica can't be 0, it has to start from 1")
+	}
+
+	if adapter.CurrentReplica == 0 && adapter.TotalReplicas == 0 {
+		// If the adapter does not set these fields they default to 0; in that case the
+		// adapter is not ready to be scaled, so treat it as a single instance
+		adapter.CurrentReplica = 1
+		adapter.TotalReplicas = 1
+	}
+
+	if adapter.CurrentReplica > adapter.TotalReplicas {
+		log.Fatalf("CurrentReplica (%d) can't be greater than TotalReplicas (%d)",
+			adapter.CurrentReplica, adapter.TotalReplicas)
+	}
+
 	args[0] = &kafka.KVArg{
 		Key:   "adapter",
 		Value: adapter,
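The validation added above implies that a scale-aware adapter now reports its replica identity when it registers with the core (and, per the endpoint manager introduced below, presumably its per-replica topic in Endpoint). A hedged sketch of such a registration payload, using only field names that appear in this diff; the concrete values are illustrative:

```go
package main

import "github.com/opencord/voltha-protos/v3/go/voltha"

// buildAdapterRegistration sketches the payload a scaled adapter instance might send.
// The only constraints enforced by the validation above are that both replica fields
// are set together, start at 1, and CurrentReplica <= TotalReplicas.
func buildAdapterRegistration(instanceID string, currentReplica, totalReplicas int32) *voltha.Adapter {
	return &voltha.Adapter{
		Id:             instanceID,          // e.g. "brcm_openomci_onu_1" (example)
		Type:           "brcm_openomci_onu", // service type shared by all replicas (example)
		Vendor:         "VOLTHA OpenONU",    // example
		Version:        "2.3.0",             // example
		Endpoint:       instanceID,          // per-replica topic, assumed to match the instance id
		CurrentReplica: currentReplica,      // 1-based index of this instance
		TotalReplicas:  totalReplicas,       // total number of instances deployed
	}
}
```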
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/configmanager.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/configmanager.go
index 9f08b0d..724ad32 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/configmanager.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/configmanager.go
@@ -63,10 +63,10 @@
 	ConfigAttribute string
 }
 
-// ConfigManager is a wrapper over backend to maintain Configuration of voltha components
+// ConfigManager is a wrapper over Backend to maintain Configuration of voltha components
 // in kvstore based persistent storage
 type ConfigManager struct {
-	backend             *db.Backend
+	Backend             *db.Backend
 	KvStoreConfigPrefix string
 }
 
@@ -95,7 +95,7 @@
 
 	return &ConfigManager{
 		KvStoreConfigPrefix: defaultkvStoreConfigPath,
-		backend: &db.Backend{
+		Backend: &db.Backend{
 			Client:     kvClient,
 			StoreType:  kvStoreType,
 			Host:       kvStoreHost,
@@ -108,12 +108,12 @@
 
 // RetrieveComponentList list the component Names for which loglevel is stored in kvstore
 func (c *ConfigManager) RetrieveComponentList(ctx context.Context, configType ConfigType) ([]string, error) {
-	data, err := c.backend.List(ctx, c.KvStoreConfigPrefix)
+	data, err := c.Backend.List(ctx, c.KvStoreConfigPrefix)
 	if err != nil {
 		return nil, err
 	}
 
-	// Looping through the data recieved from the backend for config
+	// Looping through the data received from the Backend for config
 	// Trimming and Splitting the required key and value from data and  storing as componentName,PackageName and Level
 	// For Example, recieved key would be <Backend Prefix Path>/<Config Prefix>/<Component Name>/<Config Type>/default and value \"DEBUG\"
 	// Then in default will be stored as PackageName,componentName as <Component Name> and DEBUG will be stored as value in List struct
@@ -168,14 +168,14 @@
 
 	c.changeEventChan = make(chan *ConfigChangeEvent, 1)
 
-	c.kvStoreEventChan = c.cManager.backend.CreateWatch(ctx, key, true)
+	c.kvStoreEventChan = c.cManager.Backend.CreateWatch(ctx, key, true)
 
 	go c.processKVStoreWatchEvents()
 
 	return c.changeEventChan
 }
 
-// processKVStoreWatchEvents process event channel recieved from the backend for any ChangeType
+// processKVStoreWatchEvents processes the event channel received from the Backend for any ChangeType
 // It checks for the EventType is valid or not.For the valid EventTypes creates ConfigChangeEvent and send it on channel
 func (c *ComponentConfig) processKVStoreWatchEvents() {
 
@@ -183,7 +183,7 @@
 
 	logger.Debugw("processing-kvstore-event-change", log.Fields{"key-prefix": ccKeyPrefix})
 
-	ccPathPrefix := c.cManager.backend.PathPrefix + ccKeyPrefix + kvStorePathSeparator
+	ccPathPrefix := c.cManager.Backend.PathPrefix + ccKeyPrefix + kvStorePathSeparator
 
 	for watchResp := range c.kvStoreEventChan {
 
@@ -210,7 +210,7 @@
 
 	logger.Debugw("retrieving-config", log.Fields{"key": key})
 
-	if kvpair, err := c.cManager.backend.Get(ctx, key); err != nil {
+	if kvpair, err := c.cManager.Backend.Get(ctx, key); err != nil {
 		return "", err
 	} else {
 		if kvpair == nil {
@@ -228,17 +228,17 @@
 
 	logger.Debugw("retreiving-list", log.Fields{"key": key})
 
-	data, err := c.cManager.backend.List(ctx, key)
+	data, err := c.cManager.Backend.List(ctx, key)
 	if err != nil {
 		return nil, err
 	}
 
-	// Looping through the data recieved from the backend for the given key
+	// Looping through the data received from the Backend for the given key
 	// Trimming the required key and value from data and  storing as key/value pair
 	// For Example, recieved key would be <Backend Prefix Path>/<Config Prefix>/<Component Name>/<Config Type>/default and value \"DEBUG\"
 	// Then in default will be stored as key and DEBUG will be stored as value in map[string]string
 	res := make(map[string]string)
-	ccPathPrefix := c.cManager.backend.PathPrefix + kvStorePathSeparator + key + kvStorePathSeparator
+	ccPathPrefix := c.cManager.Backend.PathPrefix + kvStorePathSeparator + key + kvStorePathSeparator
 	for attr, val := range data {
 		res[strings.TrimPrefix(attr, ccPathPrefix)] = strings.Trim(fmt.Sprintf("%s", val.Value), "\"")
 	}
@@ -252,7 +252,7 @@
 	logger.Debugw("saving-config", log.Fields{"key": key, "value": configValue})
 
 	//save the data for update config
-	if err := c.cManager.backend.Put(ctx, key, configValue); err != nil {
+	if err := c.cManager.Backend.Put(ctx, key, configValue); err != nil {
 		return err
 	}
 	return nil
@@ -264,7 +264,7 @@
 
 	logger.Debugw("deleting-config", log.Fields{"key": key})
 	//delete the config
-	if err := c.cManager.backend.Delete(ctx, key); err != nil {
+	if err := c.cManager.Backend.Delete(ctx, key); err != nil {
 		return err
 	}
 	return nil
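Exporting the Backend field is what lets other code reuse the ConfigManager's KV-store connection; in particular, it can now be handed straight to the new endpoint manager. A minimal sketch, assuming the caller already holds a *config.ConfigManager:

```go
package main

import (
	"github.com/opencord/voltha-lib-go/v3/pkg/config"
	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
)

// endpointManagerFromConfig reuses the ConfigManager's exported Backend instead of
// opening a second KV-store client just for topic resolution.
func endpointManagerFromConfig(cm *config.ConfigManager) kafka.EndpointManager {
	return kafka.NewEndpointManager(cm.Backend)
}
```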
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/logcontroller.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/logcontroller.go
index b929c9d..9c36241 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/logcontroller.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/config/logcontroller.go
@@ -15,7 +15,7 @@
  */
 
 // Package Config provides dynamic logging configuration for specific Voltha component with loglevel lookup
-// from etcd kvstore implemented using backend.
+// from etcd kvstore implemented using Backend.
 // Any Voltha component can start utilizing dynamic logging by starting goroutine of StartLogLevelConfigProcessing after
 // starting kvClient for the component.
 
@@ -121,8 +121,8 @@
 }
 
 // ProcessLogConfig will first load and apply log config and then start waiting on component config and global config
-// channels for any changes. Event channel will be recieved from backend for valid change type
-// Then data for componentn log config and global log config will be retrieved from backend and stored in updatedLogConfig in precedence order
+// channels for any changes. Event channel will be received from Backend for valid change type
+// Then data for component log config and global log config will be retrieved from Backend and stored in updatedLogConfig in precedence order
 // If any changes in updatedLogConfig will be applied on component
 func (c *ComponentLogController) processLogConfig(ctx context.Context) {
 
@@ -247,7 +247,7 @@
 	return componentLogConfig, nil
 }
 
-// buildUpdatedLogConfig retrieve the global logConfig and component logConfig  from backend
+// buildUpdatedLogConfig retrieves the global logConfig and component logConfig from Backend
 // component logConfig stores the log config with precedence order
 // For example, If the global logConfig is set and component logConfig is set only for specific package then
 // component logConfig is stored with global logConfig  and component logConfig of specific package
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/endpoint_manager.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/endpoint_manager.go
new file mode 100644
index 0000000..4c13c76
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/endpoint_manager.go
@@ -0,0 +1,352 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+	"context"
+	"fmt"
+	"github.com/buraksezer/consistent"
+	"github.com/cespare/xxhash"
+	"github.com/golang/protobuf/proto"
+	"github.com/opencord/voltha-lib-go/v3/pkg/db"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	"github.com/opencord/voltha-protos/v3/go/voltha"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"sync"
+)
+
+const (
+	// All the values below can be tuned to get optimal data distribution.  The numbers below seem to work well when
+	// supporting 1000-10000 devices and 1-20 replicas of a service
+
+	// Keys are distributed among partitions. Prime numbers are good to distribute keys uniformly.
+	DefaultPartitionCount = 1117
+
+	// Represents how many times a node is replicated on the consistent ring.
+	DefaultReplicationFactor = 117
+
+	// Load is used to calculate average load.
+	DefaultLoad = 1.1
+)
+
+type Endpoint string // Endpoint of a service instance.  When using kafka, this is the topic name of a service
+type ReplicaID int32 // The replication ID of a service instance
+
+type EndpointManager interface {
+
+	// GetEndpoint is called to get the endpoint to communicate with for a specific device and service type.  For
+	// now this will return the topic name
+	GetEndpoint(deviceID string, serviceType string) (Endpoint, error)
+
+	// IsDeviceOwnedByService is invoked when a specific service (service type + replicaNumber) is restarted and
+	// devices owned by that service need to be reconciled
+	IsDeviceOwnedByService(deviceID string, serviceType string, replicaNumber int32) (bool, error)
+
+	// getReplicaAssignment returns the replica number of the service instance that owns the deviceID.  This is
+	// used by tests only
+	getReplicaAssignment(deviceID string, serviceType string) (ReplicaID, error)
+}
+
+type service struct {
+	id             string // Id of the service.  The same id is used for all replicas
+	totalReplicas  int32
+	replicas       map[ReplicaID]Endpoint
+	consistentRing *consistent.Consistent
+}
+
+type endpointManager struct {
+	partitionCount           int
+	replicationFactor        int
+	load                     float64
+	backend                  *db.Backend
+	services                 map[string]*service
+	servicesLock             sync.RWMutex
+	deviceTypeServiceMap     map[string]string
+	deviceTypeServiceMapLock sync.RWMutex
+}
+
+type EndpointManagerOption func(*endpointManager)
+
+func PartitionCount(count int) EndpointManagerOption {
+	return func(args *endpointManager) {
+		args.partitionCount = count
+	}
+}
+
+func ReplicationFactor(replicas int) EndpointManagerOption {
+	return func(args *endpointManager) {
+		args.replicationFactor = replicas
+	}
+}
+
+func Load(load float64) EndpointManagerOption {
+	return func(args *endpointManager) {
+		args.load = load
+	}
+}
+
+func newEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
+	tm := &endpointManager{
+		partitionCount:       DefaultPartitionCount,
+		replicationFactor:    DefaultReplicationFactor,
+		load:                 DefaultLoad,
+		backend:              backend,
+		services:             make(map[string]*service),
+		deviceTypeServiceMap: make(map[string]string),
+	}
+
+	for _, option := range opts {
+		option(tm)
+	}
+	return tm
+}
+
+func NewEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
+	return newEndpointManager(backend, opts...)
+}
+
+func (ep *endpointManager) GetEndpoint(deviceID string, serviceType string) (Endpoint, error) {
+	logger.Debugw("getting-endpoint", log.Fields{"device-id": deviceID, "service": serviceType})
+	owner, err := ep.getOwner(deviceID, serviceType)
+	if err != nil {
+		return "", err
+	}
+	m, ok := owner.(Member)
+	if !ok {
+		return "", status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+	}
+	endpoint := m.getEndPoint()
+	if endpoint == "" {
+		return "", status.Errorf(codes.Unavailable, "endpoint-not-set-%s", serviceType)
+	}
+	logger.Debugw("returning-endpoint", log.Fields{"device-id": deviceID, "service": serviceType, "endpoint": endpoint})
+	return endpoint, nil
+}
+
+func (ep *endpointManager) IsDeviceOwnedByService(deviceID string, serviceType string, replicaNumber int32) (bool, error) {
+	logger.Debugw("device-ownership", log.Fields{"device-id": deviceID, "service": serviceType, "replica-number": replicaNumber})
+	owner, err := ep.getOwner(deviceID, serviceType)
+	if err != nil {
+		return false, err
+	}
+	m, ok := owner.(Member)
+	if !ok {
+		return false, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+	}
+	return m.getReplica() == ReplicaID(replicaNumber), nil
+}
+
+func (ep *endpointManager) getReplicaAssignment(deviceID string, serviceType string) (ReplicaID, error) {
+	owner, err := ep.getOwner(deviceID, serviceType)
+	if err != nil {
+		return 0, err
+	}
+	m, ok := owner.(Member)
+	if !ok {
+		return 0, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+	}
+	return m.getReplica(), nil
+}
+
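+// getOwner returns the consistent-ring member that currently owns the given device for the given service type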
+func (ep *endpointManager) getOwner(deviceID string, serviceType string) (consistent.Member, error) {
+	serv, dType, err := ep.getServiceAndDeviceType(serviceType)
+	if err != nil {
+		return nil, err
+	}
+	key := ep.makeKey(deviceID, dType, serviceType)
+	return serv.consistentRing.LocateKey(key), nil
+}
+
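+// getServiceAndDeviceType returns the service entry for the given service type together with the device type mapped
+// to it, reloading services and device types from the KV store when the cached entry is missing or looks stale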
+func (ep *endpointManager) getServiceAndDeviceType(serviceType string) (*service, string, error) {
+	// Check whether service exist
+	ep.servicesLock.RLock()
+	serv, serviceExist := ep.services[serviceType]
+	ep.servicesLock.RUnlock()
+
+	// Load the service and device types if needed
+	if !serviceExist || serv == nil || int(serv.totalReplicas) != len(serv.consistentRing.GetMembers()) {
+		if err := ep.loadServices(); err != nil {
+			return nil, "", err
+		}
+
+		// Check whether the service exists now
+		ep.servicesLock.RLock()
+		serv, serviceExist = ep.services[serviceType]
+		ep.servicesLock.RUnlock()
+		if !serviceExist || serv == nil || int(serv.totalReplicas) != len(serv.consistentRing.GetMembers()) {
+			return nil, "", status.Errorf(codes.NotFound, "service-%s", serviceType)
+		}
+	}
+
+	ep.deviceTypeServiceMapLock.RLock()
+	defer ep.deviceTypeServiceMapLock.RUnlock()
+	for dType, sType := range ep.deviceTypeServiceMap {
+		if sType == serviceType {
+			return serv, dType, nil
+		}
+	}
+	return nil, "", status.Errorf(codes.NotFound, "service-%s", serviceType)
+}
+
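+// getConsistentConfig builds the consistent hashing ring configuration from the manager's tunable parameters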
+func (ep *endpointManager) getConsistentConfig() consistent.Config {
+	return consistent.Config{
+		PartitionCount:    ep.partitionCount,
+		ReplicationFactor: ep.replicationFactor,
+		Load:              ep.load,
+		Hasher:            hasher{},
+	}
+}
+
+// loadServices loads the services (adapters) and device types in memory. Because the data set is small and stored
+// in the DB as binary protobuf, it is simpler to reload all of it whenever an inconsistency is detected, instead of
+// watching for updates in the DB and acting on them.
+func (ep *endpointManager) loadServices() error {
+	ep.servicesLock.Lock()
+	defer ep.servicesLock.Unlock()
+	ep.deviceTypeServiceMapLock.Lock()
+	defer ep.deviceTypeServiceMapLock.Unlock()
+
+	if ep.backend == nil {
+		return status.Error(codes.Aborted, "backend-not-set")
+	}
+	ep.services = make(map[string]*service)
+	ep.deviceTypeServiceMap = make(map[string]string)
+
+	// Load the adapters
+	blobs, err := ep.backend.List(context.Background(), "adapters")
+	if err != nil {
+		return err
+	}
+
+	// Data is marshalled as proto bytes in the data store
+	for _, blob := range blobs {
+		data := blob.Value.([]byte)
+		adapter := &voltha.Adapter{}
+		if err := proto.Unmarshal(data, adapter); err != nil {
+			return err
+		}
+		// A valid adapter should have the vendor field set
+		if adapter.Vendor != "" {
+			if _, ok := ep.services[adapter.Type]; !ok {
+				ep.services[adapter.Type] = &service{
+					id:             adapter.Type,
+					totalReplicas:  adapter.TotalReplicas,
+					replicas:       make(map[ReplicaID]Endpoint),
+					consistentRing: consistent.New(nil, ep.getConsistentConfig()),
+				}
+
+			}
+			currentReplica := ReplicaID(adapter.CurrentReplica)
+			endpoint := Endpoint(adapter.Endpoint)
+			ep.services[adapter.Type].replicas[currentReplica] = endpoint
+			ep.services[adapter.Type].consistentRing.Add(newMember(adapter.Id, adapter.Type, adapter.Vendor, endpoint, adapter.Version, currentReplica))
+		}
+	}
+	// Load the device types
+	blobs, err = ep.backend.List(context.Background(), "device_types")
+	if err != nil {
+		return err
+	}
+	for _, blob := range blobs {
+		data := blob.Value.([]byte)
+		deviceType := &voltha.DeviceType{}
+		if err := proto.Unmarshal(data, deviceType); err != nil {
+			return err
+		}
+		if _, ok := ep.deviceTypeServiceMap[deviceType.Id]; !ok {
+			ep.deviceTypeServiceMap[deviceType.Id] = deviceType.Adapter
+		}
+	}
+
+	// Log the loaded data in debug mode to facilitate troubleshooting
+	if logger.V(log.DebugLevel) {
+		for key, val := range ep.services {
+			members := val.consistentRing.GetMembers()
+			logger.Debugw("service", log.Fields{"service": key, "expected-replica": val.totalReplicas, "replicas": len(val.consistentRing.GetMembers())})
+			for _, m := range members {
+				n := m.(Member)
+				logger.Debugw("service-loaded", log.Fields{"serviceId": n.getID(), "serviceType": n.getServiceType(), "replica": n.getReplica(), "endpoint": n.getEndPoint()})
+			}
+		}
+		logger.Debugw("device-types-loaded", log.Fields{"device-types": ep.deviceTypeServiceMap})
+	}
+	return nil
+}
+
+// makeKey creates the key that the hash function uses to locate a device on the consistent ring
+func (ep *endpointManager) makeKey(deviceID string, deviceType string, serviceType string) []byte {
+	return []byte(fmt.Sprintf("%s_%s_%s", serviceType, deviceType, deviceID))
+}
+
+// The consistent package requires a hasher function
+type hasher struct{}
+
+// Sum64 provides the hasher function.  Based upon numerous testing scenarios, the xxhash package seems to provide the
+// best distribution compared to other hash packages
+func (h hasher) Sum64(data []byte) uint64 {
+	return xxhash.Sum64(data)
+}
+
+// Member represents a member on the consistent ring
+type Member interface {
+	String() string
+	getReplica() ReplicaID
+	getEndPoint() Endpoint
+	getID() string
+	getServiceType() string
+}
+
+// member implements the Member interface
+type member struct {
+	id          string
+	serviceType string
+	vendor      string
+	version     string
+	replica     ReplicaID
+	endpoint    Endpoint
+}
+
+func newMember(ID string, serviceType string, vendor string, endPoint Endpoint, version string, replica ReplicaID) Member {
+	return &member{
+		id:          ID,
+		serviceType: serviceType,
+		vendor:      vendor,
+		version:     version,
+		replica:     replica,
+		endpoint:    endPoint,
+	}
+}
+
+func (m *member) String() string {
+	return string(m.endpoint)
+}
+
+func (m *member) getReplica() ReplicaID {
+	return m.replica
+}
+
+func (m *member) getEndPoint() Endpoint {
+	return m.endpoint
+}
+
+func (m *member) getID() string {
+	return m.id
+}
+
+func (m *member) getServiceType() string {
+	return m.serviceType
+}
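
Taken together, the new endpoint manager is what turns a (device, service type) pair into a replica-specific topic. A usage sketch built only from the API above; the backend is assumed to already point at the VOLTHA KV store, and "brcm_openomci_onu" is an example service type:

```go
package main

import (
	"fmt"

	"github.com/opencord/voltha-lib-go/v3/pkg/db"
	"github.com/opencord/voltha-lib-go/v3/pkg/kafka"
)

// resolveDeviceTopic asks the endpoint manager which per-replica topic owns a device.
func resolveDeviceTopic(backend *db.Backend, deviceID string) (string, error) {
	mgr := kafka.NewEndpointManager(backend,
		kafka.PartitionCount(kafka.DefaultPartitionCount),
		kafka.ReplicationFactor(kafka.DefaultReplicationFactor),
		kafka.Load(kafka.DefaultLoad))

	endpoint, err := mgr.GetEndpoint(deviceID, "brcm_openomci_onu")
	if err != nil {
		return "", err
	}
	return string(endpoint), nil
}

// reconcileAfterRestart sketches the restart path: replica N checks whether it still owns a device.
func reconcileAfterRestart(mgr kafka.EndpointManager, deviceID string, replica int32) {
	owned, err := mgr.IsDeviceOwnedByService(deviceID, "brcm_openomci_onu", replica)
	if err == nil && owned {
		fmt.Println("device", deviceID, "is owned by this replica")
	}
}
```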