blob: dac137ca81de3390890869482a50205eb9619654 [file] [log] [blame]
/*
* Copyright 2021-present Open Networking Foundation
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package adapter
import (
"context"
"fmt"
"sync"
"github.com/buraksezer/consistent"
"github.com/cespare/xxhash"
"github.com/golang/protobuf/proto"
"github.com/opencord/voltha-lib-go/v7/pkg/db"
"github.com/opencord/voltha-lib-go/v7/pkg/log"
"github.com/opencord/voltha-protos/v5/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
const (
// All the values below can be tuned to get optimal data distribution. The numbers below seems to work well when
// supporting 1000-10000 devices and 1 - 20 replicas of an adapter
// Keys are distributed among partitions. Prime numbers are good to distribute keys uniformly.
DefaultPartitionCount = 1117
// Represents how many times a node is replicated on the consistent ring.
DefaultReplicationFactor = 117
// Load is used to calculate average load.
DefaultLoad = 1.1
)
type Endpoint string // The gRPC endpoint of an adapter instance
type ReplicaID int32 // The replication ID of an adapter instance
type EndpointManager interface {
// Registers an adapter
RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error
// GetEndpoint is called to get the endpoint to communicate with for a specific device and device type.
GetEndpoint(ctx context.Context, deviceID string, deviceType string) (Endpoint, error)
// IsDeviceOwnedByAdapter is invoked when a specific adapter (adapter type + replicaNumber) is restarted and
// devices owned by that adapter need to be reconciled
IsDeviceOwnedByAdapter(ctx context.Context, deviceID string, adapterType string, replicaNumber int32) (bool, error)
// GetReplicaAssignment returns the replica number of the adapter that owns the deviceID. This is used by the
// test only
GetReplicaAssignment(ctx context.Context, deviceID string, adapterType string) (ReplicaID, error)
}
type adapterService struct {
adapterType string // Type of the adapter. The same type applies for all replicas of that adapter
totalReplicas int32
replicas map[ReplicaID]Endpoint
consistentRing *consistent.Consistent
}
type endpointManager struct {
partitionCount int
replicationFactor int
load float64
backend *db.Backend
adapterServices map[string]*adapterService
adapterServicesLock sync.RWMutex
deviceTypeToAdapterServiceMap map[string]string
deviceTypeToAdapterServiceMapLock sync.RWMutex
}
type EndpointManagerOption func(*endpointManager)
func PartitionCount(count int) EndpointManagerOption {
return func(args *endpointManager) {
args.partitionCount = count
}
}
func ReplicationFactor(replicas int) EndpointManagerOption {
return func(args *endpointManager) {
args.replicationFactor = replicas
}
}
func Load(load float64) EndpointManagerOption {
return func(args *endpointManager) {
args.load = load
}
}
func newEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
tm := &endpointManager{
partitionCount: DefaultPartitionCount,
replicationFactor: DefaultReplicationFactor,
load: DefaultLoad,
backend: backend,
adapterServices: make(map[string]*adapterService),
deviceTypeToAdapterServiceMap: make(map[string]string),
}
for _, option := range opts {
option(tm)
}
return tm
}
func NewEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
return newEndpointManager(backend, opts...)
}
func (ep *endpointManager) GetEndpoint(ctx context.Context, deviceID string, deviceType string) (Endpoint, error) {
logger.Debugw(ctx, "getting-endpoint", log.Fields{"device-id": deviceID, "device-type": deviceType})
owner, err := ep.getOwnerByDeviceType(ctx, deviceID, deviceType)
if err != nil {
return "", err
}
m, ok := owner.(Member)
if !ok {
return "", status.Errorf(codes.Aborted, "invalid-member-%v", owner)
}
endpoint := m.getEndPoint()
if endpoint == "" {
return "", status.Errorf(codes.Unavailable, "endpoint-not-set-%s", deviceType)
}
logger.Debugw(ctx, "returning-endpoint", log.Fields{"device-id": deviceID, "device-type": deviceType, "endpoint": endpoint})
return endpoint, nil
}
func (ep *endpointManager) IsDeviceOwnedByAdapter(ctx context.Context, deviceID string, adapterType string, replicaNumber int32) (bool, error) {
logger.Debugw(ctx, "device-ownership", log.Fields{"device-id": deviceID, "adapter-type": adapterType, "replica-number": replicaNumber})
serv, err := ep.getOwnerByAdapterType(ctx, deviceID, adapterType)
if err != nil {
return false, err
}
m, ok := serv.(Member)
if !ok {
return false, status.Errorf(codes.Aborted, "invalid-member-%v", serv)
}
return m.getReplica() == ReplicaID(replicaNumber), nil
}
func (ep *endpointManager) GetReplicaAssignment(ctx context.Context, deviceID string, adapterType string) (ReplicaID, error) {
owner, err := ep.getOwnerByAdapterType(ctx, deviceID, adapterType)
if err != nil {
return 0, nil
}
m, ok := owner.(Member)
if !ok {
return 0, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
}
return m.getReplica(), nil
}
func (ep *endpointManager) getOwnerByDeviceType(ctx context.Context, deviceID string, deviceType string) (consistent.Member, error) {
serv, err := ep.getAdapterService(ctx, deviceType)
if err != nil {
return nil, err
}
key := ep.makeKey(deviceID, deviceType, serv.adapterType)
return serv.consistentRing.LocateKey(key), nil
}
func (ep *endpointManager) getOwnerByAdapterType(ctx context.Context, deviceID string, adapterType string) (consistent.Member, error) {
// Check whether the adapter exist
ep.adapterServicesLock.RLock()
serv, adapterExist := ep.adapterServices[adapterType]
ep.adapterServicesLock.RUnlock()
if !adapterExist {
// Sync from the dB
if err := ep.loadAdapterServices(ctx); err != nil {
return nil, err
}
// Check again
ep.adapterServicesLock.RLock()
serv, adapterExist = ep.adapterServices[adapterType]
ep.adapterServicesLock.RUnlock()
if !adapterExist {
return nil, fmt.Errorf("adapter-type-not-exist-%s", adapterType)
}
}
// Get the device type
deviceType := ""
ep.deviceTypeToAdapterServiceMapLock.RLock()
for dType, aType := range ep.deviceTypeToAdapterServiceMap {
if aType == adapterType {
deviceType = dType
break
}
}
ep.deviceTypeToAdapterServiceMapLock.RUnlock()
if deviceType == "" {
return nil, fmt.Errorf("device-type-not-exist-for-adapter-type-%s", adapterType)
}
owner := serv.consistentRing.LocateKey(ep.makeKey(deviceID, deviceType, serv.adapterType))
m, ok := owner.(Member)
if !ok {
return nil, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
}
return m, nil
}
func (ep *endpointManager) getAdapterService(ctx context.Context, deviceType string) (*adapterService, error) {
// First get the adapter type for that device type
adapterType := ""
ep.deviceTypeToAdapterServiceMapLock.RLock()
for dType, aType := range ep.deviceTypeToAdapterServiceMap {
if dType == deviceType {
adapterType = aType
break
}
}
ep.deviceTypeToAdapterServiceMapLock.RUnlock()
// Check whether the adapter exist
adapterExist := false
var aServ *adapterService
if adapterType != "" {
ep.adapterServicesLock.RLock()
aServ, adapterExist = ep.adapterServices[adapterType]
ep.adapterServicesLock.RUnlock()
}
// Load the service and device types if not found, i.e. sync up with the dB
if !adapterExist || aServ == nil || int(aServ.totalReplicas) != len(aServ.consistentRing.GetMembers()) {
if err := ep.loadAdapterServices(ctx); err != nil {
return nil, err
}
// Get the adapter type if it was empty before
if adapterType == "" {
ep.deviceTypeToAdapterServiceMapLock.RLock()
for dType, aType := range ep.deviceTypeToAdapterServiceMap {
if dType == deviceType {
adapterType = aType
break
}
}
ep.deviceTypeToAdapterServiceMapLock.RUnlock()
}
// Error put if the adapter type is not set
if adapterType == "" {
return nil, fmt.Errorf("adapter-service-not-found-for-device-type-%s", deviceType)
}
// Get the service
ep.adapterServicesLock.RLock()
aServ, adapterExist = ep.adapterServices[adapterType]
ep.adapterServicesLock.RUnlock()
}
// Sanity check
if !adapterExist || aServ == nil || int(aServ.totalReplicas) != len(aServ.consistentRing.GetMembers()) {
return nil, fmt.Errorf("adapter-service-not-found-for-device-type-%s", deviceType)
}
return aServ, nil
}
func (ep *endpointManager) getConsistentConfig() consistent.Config {
return consistent.Config{
PartitionCount: ep.partitionCount,
ReplicationFactor: ep.replicationFactor,
Load: ep.load,
Hasher: hasher{},
}
}
// loadAdapterServices loads the services (adapters) and device types in memory. Because of the small size of the data and
// the data format in the dB being binary protobuf then it is better to load all the data if inconsistency is detected,
// instead of watching for updates in the dB and acting on it.
func (ep *endpointManager) loadAdapterServices(ctx context.Context) error {
ep.adapterServicesLock.Lock()
defer ep.adapterServicesLock.Unlock()
ep.deviceTypeToAdapterServiceMapLock.Lock()
defer ep.deviceTypeToAdapterServiceMapLock.Unlock()
if ep.backend == nil {
return status.Error(codes.Aborted, "backend-not-set")
}
ep.adapterServices = make(map[string]*adapterService)
ep.deviceTypeToAdapterServiceMap = make(map[string]string)
// Load the adapters
blobs, err := ep.backend.List(log.WithSpanFromContext(context.Background(), ctx), "adapters")
if err != nil {
return err
}
// Data is marshalled as proto bytes in the data store
for _, blob := range blobs {
data := blob.Value.([]byte)
adapter := &voltha.Adapter{}
if err := proto.Unmarshal(data, adapter); err != nil {
return err
}
// A valid adapter should have the vendorID set
if err := ep.setupAdapterWithLock(ctx, adapter); err != nil {
logger.Errorw(ctx, "missing vendor id", log.Fields{"adapter": adapter})
}
}
// Load the device types
blobs, err = ep.backend.List(log.WithSpanFromContext(context.Background(), ctx), "device_types")
if err != nil {
return err
}
for _, blob := range blobs {
data := blob.Value.([]byte)
deviceType := &voltha.DeviceType{}
if err := proto.Unmarshal(data, deviceType); err != nil {
return err
}
ep.addDeviceTypeWithLock(deviceType)
}
ep.printServices(ctx)
return nil
}
func (ep *endpointManager) printServices(ctx context.Context) {
if logger.V(log.DebugLevel) {
for key, val := range ep.adapterServices {
members := val.consistentRing.GetMembers()
logger.Debugw(ctx, "adapter-service", log.Fields{"service": key, "expected-replica": val.totalReplicas, "replicas": len(val.consistentRing.GetMembers())})
for _, m := range members {
n := m.(Member)
logger.Debugw(ctx, "adapter-instance-registered", log.Fields{"service-id": n.getID(), "adapter-type": n.getAdapterType(), "replica": n.getReplica(), "endpoint": n.getEndPoint()})
}
}
logger.Debugw(ctx, "device-types", log.Fields{"device-types": ep.deviceTypeToAdapterServiceMap})
}
}
func (ep *endpointManager) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error {
ep.adapterServicesLock.Lock()
defer ep.adapterServicesLock.Unlock()
ep.deviceTypeToAdapterServiceMapLock.Lock()
defer ep.deviceTypeToAdapterServiceMapLock.Unlock()
if err := ep.setupAdapterWithLock(ctx, adapter); err != nil {
return err
}
ep.addDeviceTypesWithLock(deviceTypes)
ep.printServices(ctx)
return nil
}
func (ep *endpointManager) setupAdapterWithLock(ctx context.Context, adapter *voltha.Adapter) error {
// Build the consistent ring for that adapter
if adapter.Vendor != "" {
if _, ok := ep.adapterServices[adapter.Type]; !ok {
ep.adapterServices[adapter.Type] = &adapterService{
adapterType: adapter.Type,
totalReplicas: adapter.TotalReplicas,
replicas: make(map[ReplicaID]Endpoint),
consistentRing: consistent.New(nil, ep.getConsistentConfig()),
}
}
currentReplica := ReplicaID(adapter.CurrentReplica)
endpoint := Endpoint(adapter.Endpoint)
ep.adapterServices[adapter.Type].replicas[currentReplica] = endpoint
ep.adapterServices[adapter.Type].consistentRing.Add(newMember(adapter.Id, adapter.Type, adapter.Vendor, endpoint, adapter.Version, currentReplica))
} else {
logger.Errorw(ctx, "missing-vendor-id", log.Fields{"adapter": adapter})
return fmt.Errorf("missing vendor id for %s adapter", adapter.Id)
}
return nil
}
func (ep *endpointManager) addDeviceTypesWithLock(deviceTypes *voltha.DeviceTypes) {
// Update the device types
for _, deviceType := range deviceTypes.Items {
if _, ok := ep.deviceTypeToAdapterServiceMap[deviceType.Id]; !ok {
ep.deviceTypeToAdapterServiceMap[deviceType.Id] = deviceType.AdapterType
}
}
}
func (ep *endpointManager) addDeviceTypeWithLock(deviceType *voltha.DeviceType) {
if _, ok := ep.deviceTypeToAdapterServiceMap[deviceType.Id]; !ok {
ep.deviceTypeToAdapterServiceMap[deviceType.Id] = deviceType.AdapterType
}
}
// makeKey creates the string that the hash function uses to create the hash
// In most cases, a deviceType is the same as a serviceType. It is being differentiated here to allow a
// serviceType to support multiple device types
func (ep *endpointManager) makeKey(deviceID string, deviceType string, serviceType string) []byte {
return []byte(fmt.Sprintf("%s_%s_%s", serviceType, deviceType, deviceID))
}
// The consistent package requires a hasher function
type hasher struct{}
// Sum64 provides the hasher function. Based upon numerous testing scenarios, the xxhash package seems to provide the
// best distribution compare to other hash packages
func (h hasher) Sum64(data []byte) uint64 {
return xxhash.Sum64(data)
}
// Member represents a member on the consistent ring
type Member interface {
String() string
getReplica() ReplicaID
getEndPoint() Endpoint
getID() string
getAdapterType() string
}
// member implements the Member interface
type member struct {
id string
adapterType string
vendor string
version string
replica ReplicaID
endpoint Endpoint
}
func newMember(id string, adapterType string, vendor string, endPoint Endpoint, version string, replica ReplicaID) Member {
return &member{
id: id,
adapterType: adapterType,
vendor: vendor,
version: version,
replica: replica,
endpoint: endPoint,
}
}
func (m *member) String() string {
return string(m.endpoint)
}
func (m *member) getReplica() ReplicaID {
return m.replica
}
func (m *member) getEndPoint() Endpoint {
return m.endpoint
}
func (m *member) getID() string {
return m.id
}
func (m *member) getAdapterType() string {
return m.adapterType
}