[VOL-4291] Rw-core updates for gRPC migration

Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/rw_core/core/adapter/endpoint_manager.go b/rw_core/core/adapter/endpoint_manager.go
new file mode 100644
index 0000000..dac137c
--- /dev/null
+++ b/rw_core/core/adapter/endpoint_manager.go
@@ -0,0 +1,470 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package adapter
+
+import (
+	"context"
+	"fmt"
+	"sync"
+
+	"github.com/buraksezer/consistent"
+	"github.com/cespare/xxhash"
+	"github.com/golang/protobuf/proto"
+	"github.com/opencord/voltha-lib-go/v7/pkg/db"
+	"github.com/opencord/voltha-lib-go/v7/pkg/log"
+	"github.com/opencord/voltha-protos/v5/go/voltha"
+	"google.golang.org/grpc/codes"
+	"google.golang.org/grpc/status"
+)
+
+const (
+	// The values below can be tuned for optimal data distribution.  They seem to work well
+	// when supporting 1000-10000 devices and 1-20 replicas of an adapter.
+
+	// DefaultPartitionCount is the number of partitions keys are distributed among.
+	// A prime number helps distribute keys uniformly.
+	DefaultPartitionCount = 1117
+
+	// DefaultReplicationFactor is how many times each member is replicated on the consistent ring.
+	DefaultReplicationFactor = 117
+
+	// DefaultLoad bounds the load on a ring member: a member holds at most Load times
+	// the average number of partitions.
+	DefaultLoad = 1.1
+)
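+
+// As an illustration of the bounded-load behaviour (assuming the semantics of the
+// buraksezer/consistent package): with DefaultPartitionCount=1117, 10 ring members and
+// DefaultLoad=1.1, each member is capped at roughly ceil(1117/10*1.1) = 123 of the
+// 1117 partitions.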
+
+type Endpoint string // The gRPC endpoint of an adapter instance
+type ReplicaID int32 // The replication ID of an adapter instance
+
+type EndpointManager interface {
+
+	// RegisterAdapter registers an adapter and the device types it supports
+	RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error
+
+	// GetEndpoint returns the endpoint to communicate with for a specific device and device type.
+	GetEndpoint(ctx context.Context, deviceID string, deviceType string) (Endpoint, error)
+
+	// IsDeviceOwnedByAdapter is invoked when a specific adapter (adapter type + replicaNumber) is restarted and
+	// devices owned by that adapter need to be reconciled
+	IsDeviceOwnedByAdapter(ctx context.Context, deviceID string, adapterType string, replicaNumber int32) (bool, error)
+
+	// GetReplicaAssignment returns the replica number of the adapter that owns the deviceID.  This is
+	// used by tests only.
+	GetReplicaAssignment(ctx context.Context, deviceID string, adapterType string) (ReplicaID, error)
+}
+
+type adapterService struct {
+	adapterType    string // Type of the adapter.  The same type applies to all replicas of that adapter
+	totalReplicas  int32
+	replicas       map[ReplicaID]Endpoint
+	consistentRing *consistent.Consistent
+}
+
+type endpointManager struct {
+	partitionCount                    int
+	replicationFactor                 int
+	load                              float64
+	backend                           *db.Backend
+	adapterServices                   map[string]*adapterService
+	adapterServicesLock               sync.RWMutex
+	deviceTypeToAdapterServiceMap     map[string]string
+	deviceTypeToAdapterServiceMapLock sync.RWMutex
+}
+
+type EndpointManagerOption func(*endpointManager)
+
+func PartitionCount(count int) EndpointManagerOption {
+	return func(args *endpointManager) {
+		args.partitionCount = count
+	}
+}
+
+func ReplicationFactor(replicas int) EndpointManagerOption {
+	return func(args *endpointManager) {
+		args.replicationFactor = replicas
+	}
+}
+
+func Load(load float64) EndpointManagerOption {
+	return func(args *endpointManager) {
+		args.load = load
+	}
+}
+
+func newEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
+	tm := &endpointManager{
+		partitionCount:                DefaultPartitionCount,
+		replicationFactor:             DefaultReplicationFactor,
+		load:                          DefaultLoad,
+		backend:                       backend,
+		adapterServices:               make(map[string]*adapterService),
+		deviceTypeToAdapterServiceMap: make(map[string]string),
+	}
+
+	for _, option := range opts {
+		option(tm)
+	}
+	return tm
+}
+
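+// NewEndpointManager returns an EndpointManager backed by the given KV store backend.
+//
+// Illustrative usage sketch (backend/adapter/deviceTypes construction elided; "olt" and
+// "device-1234" are placeholder identifiers):
+//
+//	mgr := NewEndpointManager(backend, PartitionCount(1117), ReplicationFactor(117), Load(1.1))
+//	if err := mgr.RegisterAdapter(ctx, adapter, deviceTypes); err != nil {
+//		return err
+//	}
+//	endpoint, err := mgr.GetEndpoint(ctx, "device-1234", "olt")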
+func NewEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
+	return newEndpointManager(backend, opts...)
+}
+
+func (ep *endpointManager) GetEndpoint(ctx context.Context, deviceID string, deviceType string) (Endpoint, error) {
+	logger.Debugw(ctx, "getting-endpoint", log.Fields{"device-id": deviceID, "device-type": deviceType})
+	owner, err := ep.getOwnerByDeviceType(ctx, deviceID, deviceType)
+	if err != nil {
+		return "", err
+	}
+	m, ok := owner.(Member)
+	if !ok {
+		return "", status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+	}
+	endpoint := m.getEndPoint()
+	if endpoint == "" {
+		return "", status.Errorf(codes.Unavailable, "endpoint-not-set-%s", deviceType)
+	}
+	logger.Debugw(ctx, "returning-endpoint", log.Fields{"device-id": deviceID, "device-type": deviceType, "endpoint": endpoint})
+	return endpoint, nil
+}
+
+func (ep *endpointManager) IsDeviceOwnedByAdapter(ctx context.Context, deviceID string, adapterType string, replicaNumber int32) (bool, error) {
+	logger.Debugw(ctx, "device-ownership", log.Fields{"device-id": deviceID, "adapter-type": adapterType, "replica-number": replicaNumber})
+
+	serv, err := ep.getOwnerByAdapterType(ctx, deviceID, adapterType)
+	if err != nil {
+		return false, err
+	}
+	m, ok := serv.(Member)
+	if !ok {
+		return false, status.Errorf(codes.Aborted, "invalid-member-%v", serv)
+	}
+	return m.getReplica() == ReplicaID(replicaNumber), nil
+}
+
+func (ep *endpointManager) GetReplicaAssignment(ctx context.Context, deviceID string, adapterType string) (ReplicaID, error) {
+	owner, err := ep.getOwnerByAdapterType(ctx, deviceID, adapterType)
+	if err != nil {
+		return 0, err
+	}
+	m, ok := owner.(Member)
+	if !ok {
+		return 0, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+	}
+	return m.getReplica(), nil
+}
+
+func (ep *endpointManager) getOwnerByDeviceType(ctx context.Context, deviceID string, deviceType string) (consistent.Member, error) {
+	serv, err := ep.getAdapterService(ctx, deviceType)
+	if err != nil {
+		return nil, err
+	}
+	key := ep.makeKey(deviceID, deviceType, serv.adapterType)
+	return serv.consistentRing.LocateKey(key), nil
+}
+
+func (ep *endpointManager) getOwnerByAdapterType(ctx context.Context, deviceID string, adapterType string) (consistent.Member, error) {
+	// Check whether the adapter exists
+	ep.adapterServicesLock.RLock()
+	serv, adapterExist := ep.adapterServices[adapterType]
+	ep.adapterServicesLock.RUnlock()
+
+	if !adapterExist {
+		// Sync from the DB
+		if err := ep.loadAdapterServices(ctx); err != nil {
+			return nil, err
+		}
+		// Check again
+		ep.adapterServicesLock.RLock()
+		serv, adapterExist = ep.adapterServices[adapterType]
+		ep.adapterServicesLock.RUnlock()
+		if !adapterExist {
+			return nil, fmt.Errorf("adapter-type-not-exist-%s", adapterType)
+		}
+	}
+
+	// Find a device type associated with this adapter type (the first match is used)
+	deviceType := ""
+	ep.deviceTypeToAdapterServiceMapLock.RLock()
+	for dType, aType := range ep.deviceTypeToAdapterServiceMap {
+		if aType == adapterType {
+			deviceType = dType
+			break
+		}
+	}
+	ep.deviceTypeToAdapterServiceMapLock.RUnlock()
+
+	if deviceType == "" {
+		return nil, fmt.Errorf("device-type-not-exist-for-adapter-type-%s", adapterType)
+	}
+
+	owner := serv.consistentRing.LocateKey(ep.makeKey(deviceID, deviceType, serv.adapterType))
+	m, ok := owner.(Member)
+	if !ok {
+		return nil, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+	}
+	return m, nil
+}
+
+func (ep *endpointManager) getAdapterService(ctx context.Context, deviceType string) (*adapterService, error) {
+	// First get the adapter type for that device type
+	adapterType := ""
+	ep.deviceTypeToAdapterServiceMapLock.RLock()
+	for dType, aType := range ep.deviceTypeToAdapterServiceMap {
+		if dType == deviceType {
+			adapterType = aType
+			break
+		}
+	}
+	ep.deviceTypeToAdapterServiceMapLock.RUnlock()
+
+	// Check whether the adapter exists
+	adapterExist := false
+	var aServ *adapterService
+	if adapterType != "" {
+		ep.adapterServicesLock.RLock()
+		aServ, adapterExist = ep.adapterServices[adapterType]
+		ep.adapterServicesLock.RUnlock()
+	}
+
+	// Load the service and device types if not found, i.e. sync up with the DB
+	if !adapterExist || aServ == nil || int(aServ.totalReplicas) != len(aServ.consistentRing.GetMembers()) {
+		if err := ep.loadAdapterServices(ctx); err != nil {
+			return nil, err
+		}
+
+		// Get the adapter type if it was empty before
+		if adapterType == "" {
+			ep.deviceTypeToAdapterServiceMapLock.RLock()
+			for dType, aType := range ep.deviceTypeToAdapterServiceMap {
+				if dType == deviceType {
+					adapterType = aType
+					break
+				}
+			}
+			ep.deviceTypeToAdapterServiceMapLock.RUnlock()
+		}
+		// Error out if the adapter type is not set
+		if adapterType == "" {
+			return nil, fmt.Errorf("adapter-service-not-found-for-device-type-%s", deviceType)
+		}
+
+		// Get the service
+		ep.adapterServicesLock.RLock()
+		aServ, adapterExist = ep.adapterServices[adapterType]
+		ep.adapterServicesLock.RUnlock()
+	}
+
+	// Sanity check
+	if !adapterExist || aServ == nil || int(aServ.totalReplicas) != len(aServ.consistentRing.GetMembers()) {
+		return nil, fmt.Errorf("adapter-service-not-found-for-device-type-%s", deviceType)
+	}
+
+	return aServ, nil
+}
+
+func (ep *endpointManager) getConsistentConfig() consistent.Config {
+	return consistent.Config{
+		PartitionCount:    ep.partitionCount,
+		ReplicationFactor: ep.replicationFactor,
+		Load:              ep.load,
+		Hasher:            hasher{},
+	}
+}
+
+// loadAdapterServices loads the services (adapters) and device types into memory. Because the data set
+// is small and stored in the DB as binary protobuf, it is better to reload everything when an
+// inconsistency is detected than to watch for DB updates and act on them.
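+// The layout assumed here: one proto-encoded voltha.Adapter per key under the "adapters" prefix and
+// one proto-encoded voltha.DeviceType per key under the "device_types" prefix.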
+func (ep *endpointManager) loadAdapterServices(ctx context.Context) error {
+	ep.adapterServicesLock.Lock()
+	defer ep.adapterServicesLock.Unlock()
+	ep.deviceTypeToAdapterServiceMapLock.Lock()
+	defer ep.deviceTypeToAdapterServiceMapLock.Unlock()
+
+	if ep.backend == nil {
+		return status.Error(codes.Aborted, "backend-not-set")
+	}
+
+	ep.adapterServices = make(map[string]*adapterService)
+	ep.deviceTypeToAdapterServiceMap = make(map[string]string)
+
+	// Load the adapters
+	blobs, err := ep.backend.List(log.WithSpanFromContext(context.Background(), ctx), "adapters")
+	if err != nil {
+		return err
+	}
+
+	// Data is marshalled as proto bytes in the data store
+	for _, blob := range blobs {
+		data := blob.Value.([]byte)
+		adapter := &voltha.Adapter{}
+		if err := proto.Unmarshal(data, adapter); err != nil {
+			return err
+		}
+		// A valid adapter must have its vendor ID set
+		if err := ep.setupAdapterWithLock(ctx, adapter); err != nil {
+			logger.Errorw(ctx, "setup-adapter-failure", log.Fields{"adapter": adapter, "error": err})
+		}
+	}
+	// Load the device types
+	blobs, err = ep.backend.List(log.WithSpanFromContext(context.Background(), ctx), "device_types")
+	if err != nil {
+		return err
+	}
+	for _, blob := range blobs {
+		data := blob.Value.([]byte)
+		deviceType := &voltha.DeviceType{}
+		if err := proto.Unmarshal(data, deviceType); err != nil {
+			return err
+		}
+		ep.addDeviceTypeWithLock(deviceType)
+	}
+
+	ep.printServices(ctx)
+	return nil
+}
+
+func (ep *endpointManager) printServices(ctx context.Context) {
+	if logger.V(log.DebugLevel) {
+		for key, val := range ep.adapterServices {
+			members := val.consistentRing.GetMembers()
+			logger.Debugw(ctx, "adapter-service", log.Fields{"service": key, "expected-replicas": val.totalReplicas, "replicas": len(members)})
+			for _, m := range members {
+				n := m.(Member)
+				logger.Debugw(ctx, "adapter-instance-registered", log.Fields{"service-id": n.getID(), "adapter-type": n.getAdapterType(), "replica": n.getReplica(), "endpoint": n.getEndPoint()})
+			}
+		}
+		logger.Debugw(ctx, "device-types", log.Fields{"device-types": ep.deviceTypeToAdapterServiceMap})
+	}
+}
+
+func (ep *endpointManager) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error {
+	ep.adapterServicesLock.Lock()
+	defer ep.adapterServicesLock.Unlock()
+	ep.deviceTypeToAdapterServiceMapLock.Lock()
+	defer ep.deviceTypeToAdapterServiceMapLock.Unlock()
+
+	if err := ep.setupAdapterWithLock(ctx, adapter); err != nil {
+		return err
+	}
+	ep.addDeviceTypesWithLock(deviceTypes)
+	ep.printServices(ctx)
+	return nil
+}
+
+func (ep *endpointManager) setupAdapterWithLock(ctx context.Context, adapter *voltha.Adapter) error {
+	// A valid adapter must have its vendor ID set
+	if adapter.Vendor == "" {
+		logger.Errorw(ctx, "missing-vendor-id", log.Fields{"adapter": adapter})
+		return fmt.Errorf("missing vendor id for %s adapter", adapter.Id)
+	}
+	// Build the consistent ring for that adapter
+	if _, ok := ep.adapterServices[adapter.Type]; !ok {
+		ep.adapterServices[adapter.Type] = &adapterService{
+			adapterType:    adapter.Type,
+			totalReplicas:  adapter.TotalReplicas,
+			replicas:       make(map[ReplicaID]Endpoint),
+			consistentRing: consistent.New(nil, ep.getConsistentConfig()),
+		}
+	}
+	currentReplica := ReplicaID(adapter.CurrentReplica)
+	endpoint := Endpoint(adapter.Endpoint)
+	ep.adapterServices[adapter.Type].replicas[currentReplica] = endpoint
+	ep.adapterServices[adapter.Type].consistentRing.Add(newMember(adapter.Id, adapter.Type, adapter.Vendor, endpoint, adapter.Version, currentReplica))
+	return nil
+}
+
+func (ep *endpointManager) addDeviceTypesWithLock(deviceTypes *voltha.DeviceTypes) {
+	// Update the device types
+	for _, deviceType := range deviceTypes.Items {
+		if _, ok := ep.deviceTypeToAdapterServiceMap[deviceType.Id]; !ok {
+			ep.deviceTypeToAdapterServiceMap[deviceType.Id] = deviceType.AdapterType
+		}
+	}
+}
+
+func (ep *endpointManager) addDeviceTypeWithLock(deviceType *voltha.DeviceType) {
+	if _, ok := ep.deviceTypeToAdapterServiceMap[deviceType.Id]; !ok {
+		ep.deviceTypeToAdapterServiceMap[deviceType.Id] = deviceType.AdapterType
+	}
+}
+
+// makeKey creates the string that the hash function uses to compute the hash.
+// In most cases a deviceType is the same as a serviceType; they are differentiated here to allow a
+// serviceType to support multiple device types.
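+// For example, makeKey("device-1234", "olt", "openolt") returns the bytes of
+// "openolt_olt_device-1234".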
+func (ep *endpointManager) makeKey(deviceID string, deviceType string, serviceType string) []byte {
+	return []byte(fmt.Sprintf("%s_%s_%s", serviceType, deviceType, deviceID))
+}
+
+// The consistent package requires a hasher function
+type hasher struct{}
+
+// Sum64 provides the hasher function.  Based upon numerous testing scenarios, the xxhash package seems
+// to provide the best distribution compared to other hash packages.
+func (h hasher) Sum64(data []byte) uint64 {
+	return xxhash.Sum64(data)
+}
+
+// Member represents a member on the consistent ring
+type Member interface {
+	String() string
+	getReplica() ReplicaID
+	getEndPoint() Endpoint
+	getID() string
+	getAdapterType() string
+}
+
+// member implements the Member interface
+type member struct {
+	id          string
+	adapterType string
+	vendor      string
+	version     string
+	replica     ReplicaID
+	endpoint    Endpoint
+}
+
+func newMember(id string, adapterType string, vendor string, endPoint Endpoint, version string, replica ReplicaID) Member {
+	return &member{
+		id:          id,
+		adapterType: adapterType,
+		vendor:      vendor,
+		version:     version,
+		replica:     replica,
+		endpoint:    endPoint,
+	}
+}
+
+func (m *member) String() string {
+	return string(m.endpoint)
+}
+
+func (m *member) getReplica() ReplicaID {
+	return m.replica
+}
+
+func (m *member) getEndPoint() Endpoint {
+	return m.endpoint
+}
+
+func (m *member) getID() string {
+	return m.id
+}
+
+func (m *member) getAdapterType() string {
+	return m.adapterType
+}