[VOL-2836] Using different topic per ONU device

Change-Id: Ic0328f789c19dfedd86616df527c5ba510de0de9
diff --git a/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/endpoint_manager.go b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/endpoint_manager.go
new file mode 100644
index 0000000..4c13c76
--- /dev/null
+++ b/vendor/github.com/opencord/voltha-lib-go/v3/pkg/kafka/endpoint_manager.go
@@ -0,0 +1,352 @@
+/*
+ * Copyright 2020-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package kafka
+
+import (
+	"context"
+	"fmt"
+	"github.com/buraksezer/consistent"
+	"github.com/cespare/xxhash"
+	"github.com/golang/protobuf/proto"
+	"github.com/opencord/voltha-lib-go/v3/pkg/db"
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
+	"github.com/opencord/voltha-protos/v3/go/voltha"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+	"sync"
+)
+
+const (
+	// All the values below can be tuned to get optimal data distribution.  The numbers below seems to work well when
+	// supporting 1000-10000 devices and 1 - 20 replicas of a service
+
+	// Keys are distributed among partitions. Prime numbers are good to distribute keys uniformly.
+	DefaultPartitionCount = 1117
+
+	// Represents how many times a node is replicated on the consistent ring.
+	DefaultReplicationFactor = 117
+
+	// Load is used to calculate average load.
+	DefaultLoad = 1.1
+)
+
+type Endpoint string // Endpoint of a service instance.  When using kafka, this is the topic name of a service
+type ReplicaID int32 // The replication ID of a service instance
+
+type EndpointManager interface {
+
+	// GetEndpoint is called to get the endpoint to communicate with for a specific device and service type.  For
+	// now this will return the topic name
+	GetEndpoint(deviceID string, serviceType string) (Endpoint, error)
+
+	// IsDeviceOwnedByService is invoked when a specific service (service type + replicaNumber) is restarted and
+	// devices owned by that service need to be reconciled
+	IsDeviceOwnedByService(deviceID string, serviceType string, replicaNumber int32) (bool, error)
+
+	// getReplicaAssignment returns the replica number of the service that owns the deviceID.  This is used by the
+	// test only
+	getReplicaAssignment(deviceID string, serviceType string) (ReplicaID, error)
+}
+
+type service struct {
+	id             string // Id of the service.  The same id is used for all replicas
+	totalReplicas  int32
+	replicas       map[ReplicaID]Endpoint
+	consistentRing *consistent.Consistent
+}
+
+type endpointManager struct {
+	partitionCount           int
+	replicationFactor        int
+	load                     float64
+	backend                  *db.Backend
+	services                 map[string]*service
+	servicesLock             sync.RWMutex
+	deviceTypeServiceMap     map[string]string
+	deviceTypeServiceMapLock sync.RWMutex
+}
+
+type EndpointManagerOption func(*endpointManager)
+
+func PartitionCount(count int) EndpointManagerOption {
+	return func(args *endpointManager) {
+		args.partitionCount = count
+	}
+}
+
+func ReplicationFactor(replicas int) EndpointManagerOption {
+	return func(args *endpointManager) {
+		args.replicationFactor = replicas
+	}
+}
+
+func Load(load float64) EndpointManagerOption {
+	return func(args *endpointManager) {
+		args.load = load
+	}
+}
+
+func newEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
+	tm := &endpointManager{
+		partitionCount:       DefaultPartitionCount,
+		replicationFactor:    DefaultReplicationFactor,
+		load:                 DefaultLoad,
+		backend:              backend,
+		services:             make(map[string]*service),
+		deviceTypeServiceMap: make(map[string]string),
+	}
+
+	for _, option := range opts {
+		option(tm)
+	}
+	return tm
+}
+
+func NewEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
+	return newEndpointManager(backend, opts...)
+}
+
+func (ep *endpointManager) GetEndpoint(deviceID string, serviceType string) (Endpoint, error) {
+	logger.Debugw("getting-endpoint", log.Fields{"device-id": deviceID, "service": serviceType})
+	owner, err := ep.getOwner(deviceID, serviceType)
+	if err != nil {
+		return "", err
+	}
+	m, ok := owner.(Member)
+	if !ok {
+		return "", status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+	}
+	endpoint := m.getEndPoint()
+	if endpoint == "" {
+		return "", status.Errorf(codes.Unavailable, "endpoint-not-set-%s", serviceType)
+	}
+	logger.Debugw("returning-endpoint", log.Fields{"device-id": deviceID, "service": serviceType, "endpoint": endpoint})
+	return endpoint, nil
+}
+
+func (ep *endpointManager) IsDeviceOwnedByService(deviceID string, serviceType string, replicaNumber int32) (bool, error) {
+	logger.Debugw("device-ownership", log.Fields{"device-id": deviceID, "service": serviceType, "replica-number": replicaNumber})
+	owner, err := ep.getOwner(deviceID, serviceType)
+	if err != nil {
+		return false, nil
+	}
+	m, ok := owner.(Member)
+	if !ok {
+		return false, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+	}
+	return m.getReplica() == ReplicaID(replicaNumber), nil
+}
+
+func (ep *endpointManager) getReplicaAssignment(deviceID string, serviceType string) (ReplicaID, error) {
+	owner, err := ep.getOwner(deviceID, serviceType)
+	if err != nil {
+		return 0, nil
+	}
+	m, ok := owner.(Member)
+	if !ok {
+		return 0, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+	}
+	return m.getReplica(), nil
+}
+
+func (ep *endpointManager) getOwner(deviceID string, serviceType string) (consistent.Member, error) {
+	serv, dType, err := ep.getServiceAndDeviceType(serviceType)
+	if err != nil {
+		return nil, err
+	}
+	key := ep.makeKey(deviceID, dType, serviceType)
+	return serv.consistentRing.LocateKey(key), nil
+}
+
+func (ep *endpointManager) getServiceAndDeviceType(serviceType string) (*service, string, error) {
+	// Check whether service exist
+	ep.servicesLock.RLock()
+	serv, serviceExist := ep.services[serviceType]
+	ep.servicesLock.RUnlock()
+
+	// Load the service and device types if needed
+	if !serviceExist || serv == nil || int(serv.totalReplicas) != len(serv.consistentRing.GetMembers()) {
+		if err := ep.loadServices(); err != nil {
+			return nil, "", err
+		}
+
+		// Check whether the service exists now
+		ep.servicesLock.RLock()
+		serv, serviceExist = ep.services[serviceType]
+		ep.servicesLock.RUnlock()
+		if !serviceExist || serv == nil || int(serv.totalReplicas) != len(serv.consistentRing.GetMembers()) {
+			return nil, "", status.Errorf(codes.NotFound, "service-%s", serviceType)
+		}
+	}
+
+	ep.deviceTypeServiceMapLock.RLock()
+	defer ep.deviceTypeServiceMapLock.RUnlock()
+	for dType, sType := range ep.deviceTypeServiceMap {
+		if sType == serviceType {
+			return serv, dType, nil
+		}
+	}
+	return nil, "", status.Errorf(codes.NotFound, "service-%s", serviceType)
+}
+
+func (ep *endpointManager) getConsistentConfig() consistent.Config {
+	return consistent.Config{
+		PartitionCount:    ep.partitionCount,
+		ReplicationFactor: ep.replicationFactor,
+		Load:              ep.load,
+		Hasher:            hasher{},
+	}
+}
+
+// loadServices loads the services (adapters) and device types in memory. Because of the small size of the data and
+// the data format in the dB being binary protobuf then it is better to load all the data if inconsistency is detected,
+// instead of watching for updates in the dB and acting on it.
+func (ep *endpointManager) loadServices() error {
+	ep.servicesLock.Lock()
+	defer ep.servicesLock.Unlock()
+	ep.deviceTypeServiceMapLock.Lock()
+	defer ep.deviceTypeServiceMapLock.Unlock()
+
+	if ep.backend == nil {
+		return status.Error(codes.Aborted, "backend-not-set")
+	}
+	ep.services = make(map[string]*service)
+	ep.deviceTypeServiceMap = make(map[string]string)
+
+	// Load the adapters
+	blobs, err := ep.backend.List(context.Background(), "adapters")
+	if err != nil {
+		return err
+	}
+
+	// Data is marshalled as proto bytes in the data store
+	for _, blob := range blobs {
+		data := blob.Value.([]byte)
+		adapter := &voltha.Adapter{}
+		if err := proto.Unmarshal(data, adapter); err != nil {
+			return err
+		}
+		// A valid adapter should have the vendorID set
+		if adapter.Vendor != "" {
+			if _, ok := ep.services[adapter.Type]; !ok {
+				ep.services[adapter.Type] = &service{
+					id:             adapter.Type,
+					totalReplicas:  adapter.TotalReplicas,
+					replicas:       make(map[ReplicaID]Endpoint),
+					consistentRing: consistent.New(nil, ep.getConsistentConfig()),
+				}
+
+			}
+			currentReplica := ReplicaID(adapter.CurrentReplica)
+			endpoint := Endpoint(adapter.Endpoint)
+			ep.services[adapter.Type].replicas[currentReplica] = endpoint
+			ep.services[adapter.Type].consistentRing.Add(newMember(adapter.Id, adapter.Type, adapter.Vendor, endpoint, adapter.Version, currentReplica))
+		}
+	}
+	// Load the device types
+	blobs, err = ep.backend.List(context.Background(), "device_types")
+	if err != nil {
+		return err
+	}
+	for _, blob := range blobs {
+		data := blob.Value.([]byte)
+		deviceType := &voltha.DeviceType{}
+		if err := proto.Unmarshal(data, deviceType); err != nil {
+			return err
+		}
+		if _, ok := ep.deviceTypeServiceMap[deviceType.Id]; !ok {
+			ep.deviceTypeServiceMap[deviceType.Id] = deviceType.Adapter
+		}
+	}
+
+	// Log the loaded data in debug mode to facilitate trouble shooting
+	if logger.V(log.DebugLevel) {
+		for key, val := range ep.services {
+			members := val.consistentRing.GetMembers()
+			logger.Debugw("service", log.Fields{"service": key, "expected-replica": val.totalReplicas, "replicas": len(val.consistentRing.GetMembers())})
+			for _, m := range members {
+				n := m.(Member)
+				logger.Debugw("service-loaded", log.Fields{"serviceId": n.getID(), "serviceType": n.getServiceType(), "replica": n.getReplica(), "endpoint": n.getEndPoint()})
+			}
+		}
+		logger.Debugw("device-types-loaded", log.Fields{"device-types": ep.deviceTypeServiceMap})
+	}
+	return nil
+}
+
+// makeKey creates the string that the hash function uses to create the hash
+func (ep *endpointManager) makeKey(deviceID string, deviceType string, serviceType string) []byte {
+	return []byte(fmt.Sprintf("%s_%s_%s", serviceType, deviceType, deviceID))
+}
+
+// The consistent package requires a hasher function
+type hasher struct{}
+
+// Sum64 provides the hasher function.  Based upon numerous testing scenarios, the xxhash package seems to provide the
+// best distribution compare to other hash packages
+func (h hasher) Sum64(data []byte) uint64 {
+	return xxhash.Sum64(data)
+}
+
+// Member represents a member on the consistent ring
+type Member interface {
+	String() string
+	getReplica() ReplicaID
+	getEndPoint() Endpoint
+	getID() string
+	getServiceType() string
+}
+
+// member implements the Member interface
+type member struct {
+	id          string
+	serviceType string
+	vendor      string
+	version     string
+	replica     ReplicaID
+	endpoint    Endpoint
+}
+
+func newMember(ID string, serviceType string, vendor string, endPoint Endpoint, version string, replica ReplicaID) Member {
+	return &member{
+		id:          ID,
+		serviceType: serviceType,
+		vendor:      vendor,
+		version:     version,
+		replica:     replica,
+		endpoint:    endPoint,
+	}
+}
+
+func (m *member) String() string {
+	return string(m.endpoint)
+}
+
+func (m *member) getReplica() ReplicaID {
+	return m.replica
+}
+
+func (m *member) getEndPoint() Endpoint {
+	return m.endpoint
+}
+
+func (m *member) getID() string {
+	return m.id
+}
+
+func (m *member) getServiceType() string {
+	return m.serviceType
+}