[VOL-4291] Rw-core updates for gRPC migration
Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/rw_core/core/adapter/agent.go b/rw_core/core/adapter/agent.go
index a6d8186..c2d45de 100644
--- a/rw_core/core/adapter/agent.go
+++ b/rw_core/core/adapter/agent.go
@@ -18,44 +18,108 @@
import (
"context"
- "github.com/golang/protobuf/ptypes"
- "github.com/opencord/voltha-lib-go/v5/pkg/log"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+ "errors"
"sync"
"time"
+
+ "github.com/golang/protobuf/ptypes/empty"
+ vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-protos/v5/go/adapter_services"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
+ "google.golang.org/grpc"
)
// agent represents adapter agent
type agent struct {
- adapter *voltha.Adapter
- lock sync.RWMutex
+ adapter *voltha.Adapter
+ lock sync.RWMutex // held by updateCommunicationTime while it updates adapter.LastCommunication
+ adapterAPIEndPoint string // copy of adapter.Endpoint taken by newAdapterAgent; passed to vgrpc.NewClient in start
+ vClient *vgrpc.Client // set by start; stop and resetConnection skip their work while it is nil
+ adapterLock sync.RWMutex // read-held by getAdapter
+ onAdapterRestart vgrpc.RestartedHandler // handed to vgrpc.NewClient in start
+ liveProbeInterval time.Duration
}
-func newAdapterAgent(adapter *voltha.Adapter) *agent {
+// setAndTestAdapterServiceHandler builds an AdapterService client on top of conn and probes it with
+// GetHealthStatus. The client is returned only when the probe succeeds and reports HEALTHY; in every
+// other case the outcome is logged at debug level and nil is returned.
+func setAndTestAdapterServiceHandler(ctx context.Context, conn *grpc.ClientConn) interface{} {
+	client := adapter_services.NewAdapterServiceClient(conn)
+	health, err := client.GetHealthStatus(ctx, &empty.Empty{})
+	// health is only inspected when the call itself succeeded
+	healthy := err == nil && health.State == voltha.HealthStatus_HEALTHY
+	if !healthy {
+		logger.Debugw(ctx, "connection-not-ready", log.Fields{"error": err, "health": health})
+		return nil
+	}
+	return client
+}
+
+// newAdapterAgent returns an agent wrapping adapter. The adapter's Endpoint is copied into
+// adapterAPIEndPoint, and the restart handler and liveness probe interval are stored as given.
+// No gRPC client is created here; that happens in start.
+func newAdapterAgent(adapter *voltha.Adapter, onAdapterRestart vgrpc.RestartedHandler, liveProbeInterval time.Duration) *agent {
return &agent{
- adapter: adapter,
+ adapter: adapter,
+ onAdapterRestart: onAdapterRestart,
+ adapterAPIEndPoint: adapter.Endpoint,
+ liveProbeInterval: liveProbeInterval,
+ }
+}
+
+// start creates the gRPC client for the adapter endpoint held in adapterAPIEndPoint, subscribes
+// updateCommunicationTime to the client's liveness notifications and launches the client in its own
+// goroutine. An error is returned only when the client cannot be created.
+func (aa *agent) start(ctx context.Context) error {
+	// Create the gRPC client towards the adapter endpoint
+	var err error
+	aa.vClient, err = vgrpc.NewClient(aa.adapterAPIEndPoint, aa.onAdapterRestart, vgrpc.ActivityCheck(true))
+	if err != nil {
+		return err
+	}
+
+	// Liveness notifications feed the adapter's last communication time
+	aa.vClient.SubscribeForLiveness(aa.updateCommunicationTime)
+
+	go aa.vClient.Start(ctx, setAndTestAdapterServiceHandler)
+	return nil
+}
+
+// stop stops the gRPC client when one has been created; it does nothing while vClient is nil.
+func (aa *agent) stop(ctx context.Context) {
+ // Close the client
+ if aa.vClient != nil {
+ aa.vClient.Stop(ctx)
}
}
+// getAdapter returns the adapter pointer held by this agent, read under adapterLock. The pointer is
+// returned as is (no copy is made).
func (aa *agent) getAdapter(ctx context.Context) *voltha.Adapter {
- aa.lock.RLock()
- defer aa.lock.RUnlock()
- logger.Debugw(ctx, "getAdapter", log.Fields{"adapter": aa.adapter})
+ aa.adapterLock.RLock()
+ defer aa.adapterLock.RUnlock()
return aa.adapter
}
+// getClient returns the adapter service client exposed by the underlying gRPC client.
+//
+// An error is returned when the gRPC client has not been created yet (start not called or failed),
+// when the gRPC client itself reports an error, or when the value it hands back is not an
+// adapter_services.AdapterServiceClient.
+func (aa *agent) getClient() (adapter_services.AdapterServiceClient, error) {
+	// stop and resetConnection tolerate a nil vClient; do the same here instead of panicking
+	if aa.vClient == nil {
+		return nil, errors.New("client not started")
+	}
+	client, err := aa.vClient.GetClient()
+	if err != nil {
+		return nil, err
+	}
+	c, ok := client.(adapter_services.AdapterServiceClient)
+	if !ok {
+		return nil, errors.New("invalid client returned")
+	}
+	return c, nil
+}
+
+// resetConnection asks the gRPC client to reset itself. It is a no-op while no client exists.
+func (aa *agent) resetConnection(ctx context.Context) {
+	if aa.vClient == nil {
+		return
+	}
+	aa.vClient.Reset(ctx)
+}
+
// updateCommunicationTime updates the message to the specified time.
// No attempt is made to save the time to the db, so only recent times are guaranteed to be accurate.
func (aa *agent) updateCommunicationTime(new time.Time) {
// only update if new time is not in the future, and either the old time is invalid or new time > old time
aa.lock.Lock()
defer aa.lock.Unlock()
- if last, err := ptypes.Timestamp(aa.adapter.LastCommunication); !new.After(time.Now()) && (err != nil || new.After(last)) {
- timestamp, err := ptypes.TimestampProto(new)
- if err != nil {
- return // if the new time cannot be encoded, just ignore it
- }
-
- aa.adapter.LastCommunication = timestamp
+ // LastCommunication is read and written as whole seconds since the Unix epoch (time.Unix / Time.Unix).
+ // NOTE(review): this writes through aa.adapter under aa.lock while getAdapter reads under aa.adapterLock - confirm the two locks are meant to be distinct.
+ timestamp := time.Unix(aa.adapter.LastCommunication, 0)
+ if !new.After(time.Now()) && new.After(timestamp) {
+ timestamp = new
+ aa.adapter.LastCommunication = timestamp.Unix()
}
}
+
+// IsConnectionUp reports whether getClient currently succeeds for this agent.
+func (aa *agent) IsConnectionUp() bool {
+	if _, err := aa.getClient(); err != nil {
+		return false
+	}
+	return true
+}
diff --git a/rw_core/core/adapter/common.go b/rw_core/core/adapter/common.go
index 0c6b2c3..354ccc9 100644
--- a/rw_core/core/adapter/common.go
+++ b/rw_core/core/adapter/common.go
@@ -18,7 +18,7 @@
package adapter
import (
- "github.com/opencord/voltha-lib-go/v5/pkg/log"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
)
var logger log.CLogger
diff --git a/rw_core/core/adapter/endpoint_manager.go b/rw_core/core/adapter/endpoint_manager.go
new file mode 100644
index 0000000..dac137c
--- /dev/null
+++ b/rw_core/core/adapter/endpoint_manager.go
@@ -0,0 +1,470 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package adapter
+
+import (
+ "context"
+ "fmt"
+ "sync"
+
+ "github.com/buraksezer/consistent"
+ "github.com/cespare/xxhash"
+ "github.com/golang/protobuf/proto"
+ "github.com/opencord/voltha-lib-go/v7/pkg/db"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+const (
+ // All the values below can be tuned to get optimal data distribution. The numbers below seem to work well when
+ // supporting 1000-10000 devices and 1 - 20 replicas of an adapter
+
+ // DefaultPartitionCount is the default number of partitions. Keys are distributed among partitions.
+ // Prime numbers are good to distribute keys uniformly.
+ DefaultPartitionCount = 1117
+
+ // DefaultReplicationFactor is the default number of times a node is replicated on the consistent ring.
+ DefaultReplicationFactor = 117
+
+ // DefaultLoad is the default load value. Load is used to calculate average load.
+ DefaultLoad = 1.1
+)
+
+// Endpoint is the gRPC endpoint of an adapter instance.
+type Endpoint string
+
+// ReplicaID is the replication ID of an adapter instance.
+type ReplicaID int32
+
+// EndpointManager maps devices to the adapter replica (and its gRPC endpoint) responsible for them.
+type EndpointManager interface {
+
+ // RegisterAdapter registers an adapter replica together with the device types it supports.
+ RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error
+
+ // GetEndpoint is called to get the endpoint to communicate with for a specific device and device type.
+ GetEndpoint(ctx context.Context, deviceID string, deviceType string) (Endpoint, error)
+
+ // IsDeviceOwnedByAdapter is invoked when a specific adapter (adapter type + replicaNumber) is restarted and
+ // devices owned by that adapter need to be reconciled. It reports whether the replica that owns deviceID
+ // is replicaNumber.
+ IsDeviceOwnedByAdapter(ctx context.Context, deviceID string, adapterType string, replicaNumber int32) (bool, error)
+
+ // GetReplicaAssignment returns the replica number of the adapter that owns the deviceID. This is used by the
+ // test only
+ GetReplicaAssignment(ctx context.Context, deviceID string, adapterType string) (ReplicaID, error)
+}
+
+// adapterService groups the registered replicas of one adapter type.
+type adapterService struct {
+ adapterType string // Type of the adapter. The same type applies for all replicas of that adapter
+ totalReplicas int32 // expected replica count, taken from Adapter.TotalReplicas when the service entry is created
+ replicas map[ReplicaID]Endpoint // endpoint recorded for each registered replica
+ consistentRing *consistent.Consistent // ring used to locate the member owning a device key
+}
+
+// endpointManager is the EndpointManager implementation returned by NewEndpointManager.
+type endpointManager struct {
+ partitionCount int // consistent.Config.PartitionCount used for every ring
+ replicationFactor int // consistent.Config.ReplicationFactor used for every ring
+ load float64 // consistent.Config.Load used for every ring
+ backend *db.Backend // store listed by loadAdapterServices ("adapters" and "device_types")
+ adapterServices map[string]*adapterService // keyed by adapter type
+ adapterServicesLock sync.RWMutex // guards adapterServices
+ deviceTypeToAdapterServiceMap map[string]string // device type ID -> adapter type
+ deviceTypeToAdapterServiceMapLock sync.RWMutex // guards deviceTypeToAdapterServiceMap
+}
+
+// EndpointManagerOption is a functional option applied to the endpoint manager while it is being built.
+type EndpointManagerOption func(*endpointManager)
+
+// PartitionCount returns an option that overrides the number of partitions used by the consistent rings.
+func PartitionCount(count int) EndpointManagerOption {
+	return func(em *endpointManager) { em.partitionCount = count }
+}
+
+// ReplicationFactor returns an option that overrides how many times a member is replicated on a ring.
+func ReplicationFactor(replicas int) EndpointManagerOption {
+	return func(em *endpointManager) { em.replicationFactor = replicas }
+}
+
+// Load returns an option that overrides the load value handed to the consistent rings.
+func Load(load float64) EndpointManagerOption {
+	return func(em *endpointManager) { em.load = load }
+}
+
+// newEndpointManager builds an endpoint manager backed by backend. It starts from the Default*
+// ring settings and empty lookup maps, then applies opts in the order given.
+func newEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
+	mgr := new(endpointManager)
+	mgr.partitionCount = DefaultPartitionCount
+	mgr.replicationFactor = DefaultReplicationFactor
+	mgr.load = DefaultLoad
+	mgr.backend = backend
+	mgr.adapterServices = make(map[string]*adapterService)
+	mgr.deviceTypeToAdapterServiceMap = make(map[string]string)
+
+	for i := range opts {
+		opts[i](mgr)
+	}
+	return mgr
+}
+
+// NewEndpointManager is the exported constructor; it forwards to newEndpointManager unchanged.
+func NewEndpointManager(backend *db.Backend, opts ...EndpointManagerOption) EndpointManager {
+	mgr := newEndpointManager(backend, opts...)
+	return mgr
+}
+
+// GetEndpoint returns the endpoint of the ring member that owns deviceID for the given device type.
+// It fails when the owner cannot be determined, when the owner is not a Member (codes.Aborted) or
+// when the owner has an empty endpoint (codes.Unavailable).
+func (ep *endpointManager) GetEndpoint(ctx context.Context, deviceID string, deviceType string) (Endpoint, error) {
+	logger.Debugw(ctx, "getting-endpoint", log.Fields{"device-id": deviceID, "device-type": deviceType})
+
+	ringOwner, err := ep.getOwnerByDeviceType(ctx, deviceID, deviceType)
+	if err != nil {
+		return "", err
+	}
+
+	var endpoint Endpoint
+	switch m := ringOwner.(type) {
+	case Member:
+		endpoint = m.getEndPoint()
+	default:
+		return "", status.Errorf(codes.Aborted, "invalid-member-%v", ringOwner)
+	}
+	if len(endpoint) == 0 {
+		return "", status.Errorf(codes.Unavailable, "endpoint-not-set-%s", deviceType)
+	}
+
+	logger.Debugw(ctx, "returning-endpoint", log.Fields{"device-id": deviceID, "device-type": deviceType, "endpoint": endpoint})
+	return endpoint, nil
+}
+
+// IsDeviceOwnedByAdapter reports whether the ring member owning deviceID for adapterType is the
+// replica numbered replicaNumber. Lookup failures are returned as is; an owner that is not a Member
+// yields a codes.Aborted error.
+func (ep *endpointManager) IsDeviceOwnedByAdapter(ctx context.Context, deviceID string, adapterType string, replicaNumber int32) (bool, error) {
+	logger.Debugw(ctx, "device-ownership", log.Fields{"device-id": deviceID, "adapter-type": adapterType, "replica-number": replicaNumber})
+
+	ringOwner, err := ep.getOwnerByAdapterType(ctx, deviceID, adapterType)
+	if err != nil {
+		return false, err
+	}
+	if m, isMember := ringOwner.(Member); isMember {
+		return ReplicaID(replicaNumber) == m.getReplica(), nil
+	}
+	return false, status.Errorf(codes.Aborted, "invalid-member-%v", ringOwner)
+}
+
+// GetReplicaAssignment returns the replica number of the ring member that owns deviceID for the
+// given adapter type.
+//
+// Any error from the owner lookup is returned to the caller; returning (0, nil) in that case would
+// make a failed lookup indistinguishable from a genuine assignment to replica 0. An owner that is
+// not a Member yields a codes.Aborted error.
+func (ep *endpointManager) GetReplicaAssignment(ctx context.Context, deviceID string, adapterType string) (ReplicaID, error) {
+	owner, err := ep.getOwnerByAdapterType(ctx, deviceID, adapterType)
+	if err != nil {
+		return 0, err
+	}
+	m, ok := owner.(Member)
+	if !ok {
+		return 0, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+	}
+	return m.getReplica(), nil
+}
+
+// getOwnerByDeviceType resolves the adapter service handling deviceType and returns the ring
+// member that the key built from (adapter type, device type, device ID) maps to.
+func (ep *endpointManager) getOwnerByDeviceType(ctx context.Context, deviceID string, deviceType string) (consistent.Member, error) {
+	service, err := ep.getAdapterService(ctx, deviceType)
+	if err != nil {
+		return nil, err
+	}
+	ringKey := ep.makeKey(deviceID, deviceType, service.adapterType)
+	owner := service.consistentRing.LocateKey(ringKey)
+	return owner, nil
+}
+
+// getOwnerByAdapterType returns the ring member that owns deviceID within the service registered
+// for adapterType. When the adapter type is unknown the in-memory state is reloaded from the
+// backend once before giving up.
+//
+// The ring key also contains a device type. It is derived here from the device-type map; when
+// several device types point at the same adapter type the lexicographically smallest one is used,
+// so that repeated calls always build the same key (map iteration order is random in Go).
+// NOTE(review): with several device types per adapter type the key built here can still differ from
+// the one getOwnerByDeviceType builds for another device type - confirm whether that case matters.
+func (ep *endpointManager) getOwnerByAdapterType(ctx context.Context, deviceID string, adapterType string) (consistent.Member, error) {
+	// Check whether the adapter exists
+	ep.adapterServicesLock.RLock()
+	serv, adapterExist := ep.adapterServices[adapterType]
+	ep.adapterServicesLock.RUnlock()
+
+	if !adapterExist {
+		// Sync from the dB
+		if err := ep.loadAdapterServices(ctx); err != nil {
+			return nil, err
+		}
+		// Check again
+		ep.adapterServicesLock.RLock()
+		serv, adapterExist = ep.adapterServices[adapterType]
+		ep.adapterServicesLock.RUnlock()
+		if !adapterExist {
+			return nil, fmt.Errorf("adapter-type-not-exist-%s", adapterType)
+		}
+	}
+
+	// Get the device type: smallest non-empty device type mapped to this adapter type
+	deviceType := ""
+	ep.deviceTypeToAdapterServiceMapLock.RLock()
+	for dType, aType := range ep.deviceTypeToAdapterServiceMap {
+		if aType == adapterType && dType != "" && (deviceType == "" || dType < deviceType) {
+			deviceType = dType
+		}
+	}
+	ep.deviceTypeToAdapterServiceMapLock.RUnlock()
+
+	if deviceType == "" {
+		return nil, fmt.Errorf("device-type-not-exist-for-adapter-type-%s", adapterType)
+	}
+
+	owner := serv.consistentRing.LocateKey(ep.makeKey(deviceID, deviceType, serv.adapterType))
+	// Reject anything on the ring that is not one of our members
+	m, ok := owner.(Member)
+	if !ok {
+		return nil, status.Errorf(codes.Aborted, "invalid-member-%v", owner)
+	}
+	return m, nil
+}
+
+// getAdapterService returns the adapter service that handles deviceType.
+//
+// The in-memory state is trusted only when the service exists and its ring holds exactly
+// totalReplicas members. Otherwise everything is reloaded from the backend once and the lookup is
+// repeated; if the service is still missing or incomplete an error is returned.
+func (ep *endpointManager) getAdapterService(ctx context.Context, deviceType string) (*adapterService, error) {
+	// lookupAdapterType returns the adapter type mapped to deviceType, or "" when there is none.
+	lookupAdapterType := func() string {
+		ep.deviceTypeToAdapterServiceMapLock.RLock()
+		defer ep.deviceTypeToAdapterServiceMapLock.RUnlock()
+		return ep.deviceTypeToAdapterServiceMap[deviceType]
+	}
+	// lookupService returns the service registered for adapterType, if any.
+	lookupService := func(adapterType string) (*adapterService, bool) {
+		ep.adapterServicesLock.RLock()
+		defer ep.adapterServicesLock.RUnlock()
+		serv, ok := ep.adapterServices[adapterType]
+		return serv, ok
+	}
+	// isComplete reports whether the service exists and all expected replicas are on its ring.
+	isComplete := func(serv *adapterService, found bool) bool {
+		return found && serv != nil && int(serv.totalReplicas) == len(serv.consistentRing.GetMembers())
+	}
+
+	// First get the adapter type for that device type, then the matching service
+	adapterType := lookupAdapterType()
+	var aServ *adapterService
+	adapterExist := false
+	if adapterType != "" {
+		aServ, adapterExist = lookupService(adapterType)
+	}
+
+	// Load the service and device types if not found, i.e. sync up with the dB
+	if !isComplete(aServ, adapterExist) {
+		if err := ep.loadAdapterServices(ctx); err != nil {
+			return nil, err
+		}
+
+		// Get the adapter type if it was empty before
+		if adapterType == "" {
+			adapterType = lookupAdapterType()
+		}
+		// Error out if the adapter type is still not set
+		if adapterType == "" {
+			return nil, fmt.Errorf("adapter-service-not-found-for-device-type-%s", deviceType)
+		}
+
+		// Get the service
+		aServ, adapterExist = lookupService(adapterType)
+	}
+
+	// Sanity check
+	if !isComplete(aServ, adapterExist) {
+		return nil, fmt.Errorf("adapter-service-not-found-for-device-type-%s", deviceType)
+	}
+
+	return aServ, nil
+}
+
+// getConsistentConfig assembles the consistent.Config used for every ring from the manager's
+// partition count, replication factor and load, with hasher as the hash function.
+func (ep *endpointManager) getConsistentConfig() consistent.Config {
+	var cfg consistent.Config
+	cfg.PartitionCount = ep.partitionCount
+	cfg.ReplicationFactor = ep.replicationFactor
+	cfg.Load = ep.load
+	cfg.Hasher = hasher{}
+	return cfg
+}
+
+// loadAdapterServices loads the services (adapters) and device types in memory. Because of the small size of the data and
+// the data format in the dB being binary protobuf then it is better to load all the data if inconsistency is detected,
+// instead of watching for updates in the dB and acting on it.
+//
+// Both maps are replaced with fresh ones before loading, and both locks are held for the whole
+// operation. An error is returned when the backend is not set, when listing fails, or when a stored
+// value is not a byte slice or cannot be unmarshalled. An adapter that setupAdapterWithLock rejects
+// is logged and skipped.
+func (ep *endpointManager) loadAdapterServices(ctx context.Context) error {
+	ep.adapterServicesLock.Lock()
+	defer ep.adapterServicesLock.Unlock()
+	ep.deviceTypeToAdapterServiceMapLock.Lock()
+	defer ep.deviceTypeToAdapterServiceMapLock.Unlock()
+
+	if ep.backend == nil {
+		return status.Error(codes.Aborted, "backend-not-set")
+	}
+
+	ep.adapterServices = make(map[string]*adapterService)
+	ep.deviceTypeToAdapterServiceMap = make(map[string]string)
+
+	// Load the adapters
+	blobs, err := ep.backend.List(log.WithSpanFromContext(context.Background(), ctx), "adapters")
+	if err != nil {
+		return err
+	}
+
+	// Data is marshalled as proto bytes in the data store
+	for key, blob := range blobs {
+		// Checked assertion: an unexpected value type is reported instead of panicking
+		data, ok := blob.Value.([]byte)
+		if !ok {
+			return status.Errorf(codes.Internal, "unexpected-value-type-%T-for-adapter-%v", blob.Value, key)
+		}
+		adapter := &voltha.Adapter{}
+		if err := proto.Unmarshal(data, adapter); err != nil {
+			return err
+		}
+		// A valid adapter should have the vendorID set
+		if err := ep.setupAdapterWithLock(ctx, adapter); err != nil {
+			logger.Errorw(ctx, "missing vendor id", log.Fields{"adapter": adapter, "error": err})
+		}
+	}
+	// Load the device types
+	blobs, err = ep.backend.List(log.WithSpanFromContext(context.Background(), ctx), "device_types")
+	if err != nil {
+		return err
+	}
+	for key, blob := range blobs {
+		data, ok := blob.Value.([]byte)
+		if !ok {
+			return status.Errorf(codes.Internal, "unexpected-value-type-%T-for-device-type-%v", blob.Value, key)
+		}
+		deviceType := &voltha.DeviceType{}
+		if err := proto.Unmarshal(data, deviceType); err != nil {
+			return err
+		}
+		ep.addDeviceTypeWithLock(deviceType)
+	}
+
+	ep.printServices(ctx)
+	return nil
+}
+
+// printServices dumps every adapter service, its ring members and the device-type map to the debug
+// log. Nothing is done unless debug logging is enabled. The maps are read without taking any lock
+// here; the callers in this file invoke it while holding both.
+func (ep *endpointManager) printServices(ctx context.Context) {
+	if !logger.V(log.DebugLevel) {
+		return
+	}
+	for name, service := range ep.adapterServices {
+		ringMembers := service.consistentRing.GetMembers()
+		logger.Debugw(ctx, "adapter-service", log.Fields{"service": name, "expected-replica": service.totalReplicas, "replicas": len(ringMembers)})
+		for i := range ringMembers {
+			inst := ringMembers[i].(Member)
+			logger.Debugw(ctx, "adapter-instance-registered", log.Fields{"service-id": inst.getID(), "adapter-type": inst.getAdapterType(), "replica": inst.getReplica(), "endpoint": inst.getEndPoint()})
+		}
+	}
+	logger.Debugw(ctx, "device-types", log.Fields{"device-types": ep.deviceTypeToAdapterServiceMap})
+}
+
+// RegisterAdapter adds the adapter replica to the ring of its adapter type and records the device
+// types it supports. Both map locks are held for the duration of the call. The error from
+// setupAdapterWithLock (adapter without vendor) is returned unchanged, in which case no device type
+// is recorded.
+func (ep *endpointManager) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) error {
+	ep.adapterServicesLock.Lock()
+	defer ep.adapterServicesLock.Unlock()
+	ep.deviceTypeToAdapterServiceMapLock.Lock()
+	defer ep.deviceTypeToAdapterServiceMapLock.Unlock()
+
+	err := ep.setupAdapterWithLock(ctx, adapter)
+	if err != nil {
+		return err
+	}
+
+	ep.addDeviceTypesWithLock(deviceTypes)
+	ep.printServices(ctx)
+	return nil
+}
+
+// setupAdapterWithLock registers one adapter replica: the service entry (and its consistent ring)
+// for the adapter type is created on first use, then the replica's endpoint is recorded and a
+// member for it is added to the ring. Adapters without a vendor are rejected with an error.
+// The caller must hold adapterServicesLock for writing.
+func (ep *endpointManager) setupAdapterWithLock(ctx context.Context, adapter *voltha.Adapter) error {
+	if adapter.Vendor == "" {
+		logger.Errorw(ctx, "missing-vendor-id", log.Fields{"adapter": adapter})
+		return fmt.Errorf("missing vendor id for %s adapter", adapter.Id)
+	}
+
+	// Build the consistent ring for that adapter type if it does not exist yet
+	service, known := ep.adapterServices[adapter.Type]
+	if !known {
+		service = &adapterService{
+			adapterType:    adapter.Type,
+			totalReplicas:  adapter.TotalReplicas,
+			replicas:       make(map[ReplicaID]Endpoint),
+			consistentRing: consistent.New(nil, ep.getConsistentConfig()),
+		}
+		ep.adapterServices[adapter.Type] = service
+	}
+
+	replica := ReplicaID(adapter.CurrentReplica)
+	target := Endpoint(adapter.Endpoint)
+	service.replicas[replica] = target
+	service.consistentRing.Add(newMember(adapter.Id, adapter.Type, adapter.Vendor, target, adapter.Version, replica))
+	return nil
+}
+
+// addDeviceTypesWithLock records the adapter type of every listed device type that is not mapped
+// yet; existing mappings are left untouched. A nil list and nil entries are ignored, so that
+// RegisterAdapter can be called without device types. The caller must hold
+// deviceTypeToAdapterServiceMapLock for writing.
+func (ep *endpointManager) addDeviceTypesWithLock(deviceTypes *voltha.DeviceTypes) {
+	if deviceTypes == nil {
+		return
+	}
+	// Update the device types
+	for _, deviceType := range deviceTypes.Items {
+		if deviceType == nil {
+			continue
+		}
+		if _, ok := ep.deviceTypeToAdapterServiceMap[deviceType.Id]; !ok {
+			ep.deviceTypeToAdapterServiceMap[deviceType.Id] = deviceType.AdapterType
+		}
+	}
+}
+
+// addDeviceTypeWithLock maps the device type to its adapter type unless a mapping already exists.
+// The caller must hold deviceTypeToAdapterServiceMapLock for writing.
+func (ep *endpointManager) addDeviceTypeWithLock(deviceType *voltha.DeviceType) {
+	_, mapped := ep.deviceTypeToAdapterServiceMap[deviceType.Id]
+	if mapped {
+		return
+	}
+	ep.deviceTypeToAdapterServiceMap[deviceType.Id] = deviceType.AdapterType
+}
+
+// makeKey builds the byte string that is hashed to place a device on the ring. The layout is
+// "<serviceType>_<deviceType>_<deviceID>". A deviceType is usually the same as a serviceType; both
+// are part of the key so that one serviceType can support multiple device types.
+func (ep *endpointManager) makeKey(deviceID string, deviceType string, serviceType string) []byte {
+	key := serviceType + "_" + deviceType + "_" + deviceID
+	return []byte(key)
+}
+
+// hasher is the hash function handed to the consistent package through consistent.Config.Hasher.
+type hasher struct{}
+
+// Sum64 hashes data with xxhash. Based upon numerous testing scenarios, the xxhash package seems to
+// provide the best distribution compared to other hash packages.
+func (hasher) Sum64(data []byte) uint64 {
+	digest := xxhash.Sum64(data)
+	return digest
+}
+
+// Member represents a member on the consistent ring, i.e. one registered adapter replica.
+type Member interface {
+ // String returns the member's endpoint as a plain string.
+ String() string
+ // getReplica returns the replica number of the adapter instance.
+ getReplica() ReplicaID
+ // getEndPoint returns the endpoint of the adapter instance.
+ getEndPoint() Endpoint
+ // getID returns the adapter ID the member was created with.
+ getID() string
+ // getAdapterType returns the adapter type the member was created with.
+ getAdapterType() string
+}
+
+// member implements the Member interface. Its fields are set once by newMember.
+type member struct {
+ id string // adapter ID
+ adapterType string
+ vendor string
+ version string
+ replica ReplicaID
+ endpoint Endpoint // also the value returned by String()
+}
+
+// newMember returns a ring member populated with the given adapter attributes.
+func newMember(id string, adapterType string, vendor string, endPoint Endpoint, version string, replica ReplicaID) Member {
+	m := new(member)
+	m.id = id
+	m.adapterType = adapterType
+	m.vendor = vendor
+	m.version = version
+	m.replica = replica
+	m.endpoint = endPoint
+	return m
+}
+
+// String returns the member's endpoint as a plain string.
+func (m *member) String() string {
+	return string(m.getEndPoint())
+}
+
+// getReplica returns the member's replica number.
+func (m *member) getReplica() ReplicaID {
+	return m.replica
+}
+
+// getEndPoint returns the member's endpoint.
+func (m *member) getEndPoint() Endpoint {
+	return m.endpoint
+}
+
+// getID returns the member's adapter ID.
+func (m *member) getID() string {
+	return m.id
+}
+
+// getAdapterType returns the member's adapter type.
+func (m *member) getAdapterType() string {
+	return m.adapterType
+}
diff --git a/rw_core/core/adapter/endpoint_manager_test.go b/rw_core/core/adapter/endpoint_manager_test.go
new file mode 100644
index 0000000..78635d1
--- /dev/null
+++ b/rw_core/core/adapter/endpoint_manager_test.go
@@ -0,0 +1,258 @@
+/*
+ * Copyright 2021-present Open Networking Foundation
+
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package adapter
+
+import (
+ "context"
+ "fmt"
+ "math"
+ "strconv"
+ "testing"
+ "time"
+
+ "github.com/golang/protobuf/proto"
+ "github.com/google/uuid"
+ "github.com/opencord/voltha-lib-go/v7/pkg/db"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-lib-go/v7/pkg/mocks/etcd"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
+ "github.com/phayes/freeport"
+ "github.com/stretchr/testify/assert"
+ "google.golang.org/grpc/codes"
+ "google.golang.org/grpc/status"
+)
+
+// EPTest is the fixture for the endpoint manager tests: an embedded etcd server, a backend that
+// talks to it, and the replica counts used when populating the backend.
+type EPTest struct {
+ etcdServer *etcd.EtcdServer // started by initBackend, stopped by stopAll
+ backend *db.Backend // created by initBackend
+ maxReplicas int // replica count of the adapter_brcm_openomci_onu adapter
+ minReplicas int // replica count of the adapter_openolt adapter
+}
+
+// newEPTest creates the fixture, starts its backend and fills it with test adapters and device
+// types. Failures in either step are reported through logger.Fatalw.
+func newEPTest(minReplicas, maxReplicas int) *EPTest {
+	ctx := context.Background()
+	test := new(EPTest)
+	test.minReplicas, test.maxReplicas = minReplicas, maxReplicas
+
+	// Create backend
+	err := test.initBackend()
+	if err != nil {
+		logger.Fatalw(ctx, "setting-backend-failed", log.Fields{"error": err})
+	}
+
+	// Populate backend with data
+	err = test.populateBackend()
+	if err != nil {
+		logger.Fatalw(ctx, "populating-db-failed", log.Fields{"error": err})
+	}
+	return test
+}
+
+// initBackend starts an embedded etcd server on two free ports and creates a backend connected to
+// its client port on 127.0.0.1 under the "service/voltha" prefix.
+func (ep *EPTest) initBackend() error {
+	const (
+		configName = "voltha-go.adapter.ep.test"
+		storageDir = "voltha-go.adapter.ep.etcd"
+		logLevel   = "error"
+		timeout    = 5 * time.Second
+	)
+	ctx := context.Background()
+
+	clientPort, err := freeport.GetFreePort()
+	if err != nil {
+		return err
+	}
+	peerPort, err := freeport.GetFreePort()
+	if err != nil {
+		return err
+	}
+
+	server := etcd.StartEtcdServer(ctx, etcd.MKConfig(ctx, configName, clientPort, peerPort, storageDir, logLevel))
+	ep.etcdServer = server
+	if server == nil {
+		return status.Error(codes.Internal, "Embedded server failed to start")
+	}
+
+	ep.backend = db.NewBackend(ctx, "etcd", "127.0.0.1:"+strconv.Itoa(clientPort), timeout, "service/voltha")
+	return nil
+}
+
+// stopAll stops the embedded etcd server if it was started.
+func (ep *EPTest) stopAll() {
+	if ep.etcdServer == nil {
+		return
+	}
+	ep.etcdServer.Stop(context.Background())
+}
+
+// populateBackend stores the test data in the backend:
+//   - maxReplicas replicas of adapter_brcm_openomci_onu and minReplicas replicas of adapter_openolt,
+//     each under "adapters/<adapter type>/<replica>";
+//   - the brcm_openomci_onu and openolt device types under "device_types/<id>".
+//
+// All values are protobuf-marshalled. The first marshal or put error aborts the operation.
+func (ep *EPTest) populateBackend() error {
+	ctx := context.Background()
+
+	// putAdapters stores numReplicas replicas of one adapter type.
+	putAdapters := func(adapterType, vendor, version string, numReplicas int) error {
+		for i := 0; i < numReplicas; i++ {
+			instance := fmt.Sprintf("%s_%d", adapterType, i)
+			blob, err := proto.Marshal(&voltha.Adapter{
+				Id:             instance,
+				Vendor:         vendor,
+				Version:        version,
+				Type:           adapterType,
+				CurrentReplica: int32(i),
+				TotalReplicas:  int32(numReplicas),
+				Endpoint:       instance,
+			})
+			if err != nil {
+				return err
+			}
+			if err := ep.backend.Put(ctx, fmt.Sprintf("adapters/%s/%d", adapterType, i), blob); err != nil {
+				return err
+			}
+		}
+		return nil
+	}
+
+	// putDeviceType stores one device type under its ID.
+	putDeviceType := func(deviceType *voltha.DeviceType) error {
+		blob, err := proto.Marshal(deviceType)
+		if err != nil {
+			return err
+		}
+		return ep.backend.Put(ctx, "device_types/"+deviceType.Id, blob)
+	}
+
+	// Add an adapter with multiple replicas
+	if err := putAdapters("adapter_brcm_openomci_onu", "VOLTHA OpenONU", "2.4.0-dev0", ep.maxReplicas); err != nil {
+		return err
+	}
+
+	// Add an adapter with minreplicas
+	if err := putAdapters("adapter_openolt", "VOLTHA OpenOLT", "2.3.1-dev", ep.minReplicas); err != nil {
+		return err
+	}
+
+	// Add the brcm_openomci_onu device type
+	if err := putDeviceType(&voltha.DeviceType{
+		Id:                          "brcm_openomci_onu",
+		VendorIds:                   []string{"OPEN", "ALCL", "BRCM", "TWSH", "ALPH", "ISKT", "SFAA", "BBSM", "SCOM", "ARPX", "DACM", "ERSN", "HWTC", "CIGG"},
+		AdapterType:                 "adapter_brcm_openomci_onu",
+		AcceptsAddRemoveFlowUpdates: true,
+	}); err != nil {
+		return err
+	}
+
+	// Add the openolt device type
+	return putDeviceType(&voltha.DeviceType{
+		Id:                          "openolt",
+		AdapterType:                 "adapter_openolt",
+		AcceptsAddRemoveFlowUpdates: true,
+	})
+}
+
+// getMeanAndStdDeviation returns the arithmetic mean and the population standard deviation of the
+// first replicas entries of val.
+func getMeanAndStdDeviation(val []int, replicas int) (float64, float64) {
+	count := float64(replicas)
+
+	var total float64
+	for idx := 0; idx < replicas; idx++ {
+		total += float64(val[idx])
+	}
+	mean := total / count
+
+	var squares float64
+	for idx := 0; idx < replicas; idx++ {
+		delta := float64(val[idx]) - mean
+		squares += math.Pow(delta, 2)
+	}
+	return mean, math.Sqrt(squares / count)
+}
+
+// testEndpointManagerAPIs exercises tm for one adapter type / device type pair with the given
+// number of replicas:
+//  1. 1000 random device IDs are resolved to an endpoint and a replica, and the distribution over
+//     the replicas is printed;
+//  2. every device must keep resolving to the same endpoint over repeated lookups;
+//  3. IsDeviceOwnedByAdapter must be true for exactly the replica GetReplicaAssignment reports.
+//
+// Failures are reported through t, so deferred cleanup in the calling test still runs.
+func (ep *EPTest) testEndpointManagerAPIs(t *testing.T, tm EndpointManager, adapterType string, deviceType string, replicas int) {
+	ctx := context.Background()
+	// Map of device ids to the endpoint first returned for them
+	deviceIDs := make(map[string]Endpoint)
+	numDevices := 1000
+	total := make([]int, replicas)
+	for i := 0; i < numDevices; i++ {
+		deviceID := uuid.New().String()
+		endpoint, err := tm.GetEndpoint(ctx, deviceID, deviceType)
+		if err != nil {
+			t.Fatalf("error-getting-endpoint: %v", err)
+		}
+		deviceIDs[deviceID] = endpoint
+		replicaID, err := tm.GetReplicaAssignment(ctx, deviceID, adapterType)
+		if err != nil {
+			t.Fatalf("error-getting-replica-assignment: %v", err)
+		}
+		// Guard the index so an unexpected replica fails the test instead of panicking
+		if replicaID < 0 || int(replicaID) >= replicas {
+			t.Fatalf("replica %d out of range [0,%d) for device %s", replicaID, replicas, deviceID)
+		}
+		total[replicaID]++
+	}
+
+	mean, sdtDev := getMeanAndStdDeviation(total, replicas)
+	fmt.Printf("Device distributions => devices:%d adapter_replicas:%d mean:%d standard_deviation:%d, distributions:%v\n", numDevices, replicas, int(mean), int(sdtDev), total)
+
+	// Verify that we get the same endpoint for a given device ID, irrespective of the number of iterations
+	numIterations := 10
+	for i := 0; i < numIterations; i++ {
+		for deviceID, expectedEndpoint := range deviceIDs {
+			endpointByDeviceType, err := tm.GetEndpoint(ctx, deviceID, deviceType)
+			if err != nil {
+				t.Fatalf("error-getting-endpoint: %v", err)
+			}
+			assert.Equal(t, expectedEndpoint, endpointByDeviceType)
+		}
+	}
+
+	// Verify that a device belongs to the correct replica
+	for deviceID := range deviceIDs {
+		replicaID, err := tm.GetReplicaAssignment(ctx, deviceID, adapterType)
+		if err != nil {
+			t.Fatalf("error-getting-replica-assignment: %v", err)
+		}
+		for k := 0; k < replicas; k++ {
+			owned, err := tm.IsDeviceOwnedByAdapter(ctx, deviceID, adapterType, int32(k))
+			if err != nil {
+				t.Fatalf("error-verifying-device-ownership: %v", err)
+			}
+			assert.Equal(t, ReplicaID(k) == replicaID, owned)
+		}
+	}
+}
+
+// TestEndpointManagerSuite runs the endpoint manager API checks against a backend populated with a
+// 10-replica adapter and a single-replica adapter.
+func TestEndpointManagerSuite(t *testing.T) {
+	tmt := newEPTest(1, 10)
+	assert.NotNil(t, tmt)
+	defer tmt.stopAll()
+
+	tm := NewEndpointManager(tmt.backend, PartitionCount(1117), ReplicationFactor(200), Load(1.1))
+
+	scenarios := []struct {
+		adapterType string
+		deviceType  string
+		replicas    int
+	}{
+		// 1. Test APIs with multiple replicas
+		{"adapter_brcm_openomci_onu", "brcm_openomci_onu", tmt.maxReplicas},
+		// 2. Test APIs with single replica
+		{"adapter_openolt", "openolt", tmt.minReplicas},
+	}
+	for _, sc := range scenarios {
+		tmt.testEndpointManagerAPIs(t, tm, sc.adapterType, sc.deviceType, sc.replicas)
+	}
+}
diff --git a/rw_core/core/adapter/manager.go b/rw_core/core/adapter/manager.go
index 772ff75..b592842 100644
--- a/rw_core/core/adapter/manager.go
+++ b/rw_core/core/adapter/manager.go
@@ -18,129 +18,149 @@
import (
"context"
+ "errors"
"fmt"
"sync"
"time"
+ "github.com/opencord/voltha-lib-go/v7/pkg/db"
+ vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
+ "github.com/opencord/voltha-protos/v5/go/adapter_services"
+ "github.com/opencord/voltha-protos/v5/go/common"
+ ic "github.com/opencord/voltha-protos/v5/go/inter_container"
+
"github.com/gogo/protobuf/proto"
"github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-go/db/model"
- "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v5/pkg/log"
- "github.com/opencord/voltha-lib-go/v5/pkg/probe"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-lib-go/v7/pkg/probe"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// Manager represents adapter manager attributes
type Manager struct {
- adapterAgents map[string]*agent
- deviceTypes map[string]*voltha.DeviceType
- adapterProxy *model.Proxy
- deviceTypeProxy *model.Proxy
- onAdapterRestart adapterRestartedHandler
- coreInstanceID string
- lockAdaptersMap sync.RWMutex
- lockdDeviceTypeToAdapterMap sync.RWMutex
+ adapterAgents map[string]*agent // iterated by Stop under lockAdapterAgentsMap
+ adapterEndpoints map[Endpoint]*agent
+ deviceTypes map[string]*voltha.DeviceType
+ adapterDbProxy *model.Proxy // dbPath.Proxy("adapters")
+ deviceTypeDbProxy *model.Proxy // dbPath.Proxy("device_types")
+ onAdapterRestart vgrpc.RestartedHandler // set through SetAdapterRestartedCallback
+ endpointMgr EndpointManager // created by NewAdapterManager from the backend
+ lockAdapterAgentsMap sync.RWMutex
+ lockDeviceTypesMap sync.RWMutex
+ lockAdapterEndPointsMap sync.RWMutex
+ liveProbeInterval time.Duration
}
-func NewAdapterManager(ctx context.Context, dbPath *model.Path, coreInstanceID string, kafkaClient kafka.Client) *Manager {
- aMgr := &Manager{
- coreInstanceID: coreInstanceID,
- adapterProxy: dbPath.Proxy("adapters"),
- deviceTypeProxy: dbPath.Proxy("device_types"),
- deviceTypes: make(map[string]*voltha.DeviceType),
- adapterAgents: make(map[string]*agent),
- }
- kafkaClient.SubscribeForMetadata(ctx, aMgr.updateLastAdapterCommunication)
- return aMgr
-}
-
-// an interface type for callbacks
-// if more than one callback is required, this should be converted to a proper interface
-type adapterRestartedHandler func(ctx context.Context, adapter *voltha.Adapter) error
-
-func (aMgr *Manager) SetAdapterRestartedCallback(onAdapterRestart adapterRestartedHandler) {
+// SetAdapterRestartedCallback stores the callback that needs to be invoked on an adapter restart.
+// The field is assigned without taking any lock.
+func (aMgr *Manager) SetAdapterRestartedCallback(onAdapterRestart vgrpc.RestartedHandler) {
aMgr.onAdapterRestart = onAdapterRestart
}
-func (aMgr *Manager) Start(ctx context.Context) {
- probe.UpdateStatusFromContext(ctx, "adapter-manager", probe.ServiceStatusPreparing)
- logger.Info(ctx, "starting-adapter-manager")
+// NewAdapterManager returns a Manager with empty adapter, endpoint and device-type maps, DB proxies
+// for "adapters" and "device_types" under dbPath, and an endpoint manager built on backend.
+// coreInstanceID is accepted but not used by this constructor.
+func NewAdapterManager(
+	dbPath *model.Path,
+	coreInstanceID string,
+	backend *db.Backend,
+	liveProbeInterval time.Duration,
+) *Manager {
+	mgr := new(Manager)
+	mgr.adapterDbProxy = dbPath.Proxy("adapters")
+	mgr.deviceTypeDbProxy = dbPath.Proxy("device_types")
+	mgr.deviceTypes = make(map[string]*voltha.DeviceType)
+	mgr.adapterAgents = make(map[string]*agent)
+	mgr.adapterEndpoints = make(map[Endpoint]*agent)
+	mgr.endpointMgr = NewEndpointManager(backend)
+	mgr.liveProbeInterval = liveProbeInterval
+	return mgr
+}
+
+// Start marks serviceName as preparing on the probe found in ctx, loads the adapters and device
+// types in memory, and then marks the service as running. A load failure is reported through
+// logger.Fatalw.
+func (aMgr *Manager) Start(ctx context.Context, serviceName string) {
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusPreparing)
+ logger.Infow(ctx, "starting-service", log.Fields{"service": serviceName})
// Load the existing adapterAgents and device types - this will also ensure the correct paths have been
// created if there are no data in the dB to start
err := aMgr.loadAdaptersAndDevicetypesInMemory(ctx)
if err != nil {
- logger.Fatalf(ctx, "failed-to-load-adapters-and-device-types-in-memory: %s", err)
+ logger.Fatalw(ctx, "failed-to-load-adapters-and-device-types-in-memory", log.Fields{"service": serviceName, "error": err})
}
- probe.UpdateStatusFromContext(ctx, "adapter-manager", probe.ServiceStatusRunning)
- logger.Info(ctx, "adapter-manager-started")
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
+ logger.Infow(ctx, "service-started", log.Fields{"service": serviceName})
}
-//loadAdaptersAndDevicetypesInMemory loads the existing set of adapters and device types in memory
-func (aMgr *Manager) loadAdaptersAndDevicetypesInMemory(ctx context.Context) error {
- // Load the adapters
- var adapters []*voltha.Adapter
- if err := aMgr.adapterProxy.List(log.WithSpanFromContext(context.Background(), ctx), &adapters); err != nil {
- logger.Errorw(ctx, "Failed-to-list-adapters-from-cluster-data-proxy", log.Fields{"error": err})
- return err
+func (aMgr *Manager) Stop(ctx context.Context) {
+ // Stop all adapters
+ aMgr.lockAdapterAgentsMap.RLock()
+ defer aMgr.lockAdapterAgentsMap.RUnlock()
+ for _, adapterAgent := range aMgr.adapterAgents {
+ adapterAgent.stop(ctx)
}
- if len(adapters) != 0 {
- for _, adapter := range adapters {
- if err := aMgr.addAdapter(ctx, adapter, false); err != nil {
- logger.Errorw(ctx, "failed to add adapter", log.Fields{"adapterId": adapter.Id})
- } else {
- logger.Debugw(ctx, "adapter added successfully", log.Fields{"adapterId": adapter.Id})
- }
- }
- }
-
- // Load the device types
- var deviceTypes []*voltha.DeviceType
- if err := aMgr.deviceTypeProxy.List(log.WithSpanFromContext(context.Background(), ctx), &deviceTypes); err != nil {
- logger.Errorw(ctx, "Failed-to-list-device-types-from-cluster-data-proxy", log.Fields{"error": err})
- return err
- }
- if len(deviceTypes) != 0 {
- dTypes := &voltha.DeviceTypes{Items: []*voltha.DeviceType{}}
- for _, dType := range deviceTypes {
- logger.Debugw(ctx, "found-existing-device-types", log.Fields{"deviceTypes": dTypes})
- dTypes.Items = append(dTypes.Items, dType)
- }
- return aMgr.addDeviceTypes(ctx, dTypes, false)
- }
-
- logger.Debug(ctx, "no-existing-device-type-found")
-
- return nil
}
-func (aMgr *Manager) updateLastAdapterCommunication(adapterID string, timestamp time.Time) {
- aMgr.lockAdaptersMap.RLock()
- adapterAgent, have := aMgr.adapterAgents[adapterID]
- aMgr.lockAdaptersMap.RUnlock()
+// GetAdapterEndpoint calls the endpoint manager's GetEndpoint with the given device ID and
+// device type and returns the result converted to a plain string. When the lookup fails,
+// an empty string is returned together with the lookup error.
+func (aMgr *Manager) GetAdapterEndpoint(ctx context.Context, deviceID string, deviceType string) (string, error) {
+	ep, err := aMgr.endpointMgr.GetEndpoint(ctx, deviceID, deviceType)
+	if err == nil {
+		return string(ep), nil
+	}
+	return "", err
+}
+
+func (aMgr *Manager) GetAdapterWithEndpoint(ctx context.Context, endPoint string) (*voltha.Adapter, error) {
+ aMgr.lockAdapterEndPointsMap.RLock()
+ agent, have := aMgr.adapterEndpoints[Endpoint(endPoint)]
+ aMgr.lockAdapterEndPointsMap.RUnlock()
if have {
- adapterAgent.updateCommunicationTime(timestamp)
+ return agent.getAdapter(ctx), nil
}
+
+ return nil, errors.New("Not found")
+}
+
+// GetAdapterNameWithEndpoint returns the ID of the adapter whose agent is mapped to endPoint.
+// A "Not found" error is returned when no agent is mapped to that endpoint or when the
+// agent reports no adapter.
+func (aMgr *Manager) GetAdapterNameWithEndpoint(ctx context.Context, endPoint string) (string, error) {
+	// Hold the endpoints lock only for the map lookup.
+	aMgr.lockAdapterEndPointsMap.RLock()
+	adapterAgent, have := aMgr.adapterEndpoints[Endpoint(endPoint)]
+	aMgr.lockAdapterEndPointsMap.RUnlock()
+
+	if have {
+		// Read the adapter through the agent's accessor rather than its field, and treat a
+		// nil adapter as "not found" instead of dereferencing it.
+		// NOTE(review): presumably getAdapter synchronises access to the adapter - confirm in agent.go.
+		if adapter := adapterAgent.getAdapter(ctx); adapter != nil {
+			return adapter.Id, nil
+		}
+	}
+
+	return "", errors.New("Not found")
+}
+
+// GetAdapterClient returns the client obtained from the agent mapped to endpoint.
+// An empty endpoint, or an endpoint with no mapped agent, yields an error. The endpoints
+// map stays read-locked until the agent's getClient call has returned.
+func (aMgr *Manager) GetAdapterClient(_ context.Context, endpoint string) (adapter_services.AdapterServiceClient, error) {
+	if endpoint == "" {
+		return nil, errors.New("endpoint-cannot-be-empty")
+	}
+
+	aMgr.lockAdapterEndPointsMap.RLock()
+	defer aMgr.lockAdapterEndPointsMap.RUnlock()
+
+	adapterAgent, found := aMgr.adapterEndpoints[Endpoint(endpoint)]
+	if !found {
+		return nil, fmt.Errorf("Endpoint-not-found-%s", endpoint)
+	}
+	return adapterAgent.getClient()
}
func (aMgr *Manager) addAdapter(ctx context.Context, adapter *voltha.Adapter, saveToDb bool) error {
- aMgr.lockAdaptersMap.Lock()
- defer aMgr.lockAdaptersMap.Unlock()
+ aMgr.lockAdapterAgentsMap.Lock()
+ aMgr.lockAdapterEndPointsMap.Lock()
+ defer aMgr.lockAdapterEndPointsMap.Unlock()
+ defer aMgr.lockAdapterAgentsMap.Unlock()
logger.Debugw(ctx, "adding-adapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
if _, exist := aMgr.adapterAgents[adapter.Id]; !exist {
if saveToDb {
// Save the adapter to the KV store - first check if it already exist
- if have, err := aMgr.adapterProxy.Get(log.WithSpanFromContext(context.Background(), ctx), adapter.Id, &voltha.Adapter{}); err != nil {
+ if have, err := aMgr.adapterDbProxy.Get(log.WithSpanFromContext(context.Background(), ctx), adapter.Id, &voltha.Adapter{}); err != nil {
logger.Errorw(ctx, "failed-to-get-adapters-from-cluster-proxy", log.Fields{"error": err})
return err
} else if !have {
- if err := aMgr.adapterProxy.Set(log.WithSpanFromContext(context.Background(), ctx), adapter.Id, adapter); err != nil {
+ if err := aMgr.adapterDbProxy.Set(log.WithSpanFromContext(context.Background(), ctx), adapter.Id, adapter); err != nil {
logger.Errorw(ctx, "failed-to-save-adapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint, "replica": adapter.CurrentReplica, "total": adapter.TotalReplicas})
return err
@@ -155,7 +175,11 @@
}
}
clonedAdapter := (proto.Clone(adapter)).(*voltha.Adapter)
- aMgr.adapterAgents[adapter.Id] = newAdapterAgent(clonedAdapter)
+ // Use a muted adapter restart handler which is invoked by the corresponding gRPC client on an adapter restart.
+ // This handler just logs the restart event. The actual action taken following an adapter restart
+ // will be done when an adapter re-registers itself.
+ aMgr.adapterAgents[adapter.Id] = newAdapterAgent(clonedAdapter, aMgr.mutedAdapterRestartedHandler, aMgr.liveProbeInterval)
+ aMgr.adapterEndpoints[Endpoint(adapter.Endpoint)] = aMgr.adapterAgents[adapter.Id]
}
return nil
}
@@ -165,10 +189,10 @@
return fmt.Errorf("no-device-type")
}
logger.Debugw(ctx, "adding-device-types", log.Fields{"deviceTypes": deviceTypes})
- aMgr.lockAdaptersMap.Lock()
- defer aMgr.lockAdaptersMap.Unlock()
- aMgr.lockdDeviceTypeToAdapterMap.Lock()
- defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
+ aMgr.lockAdapterAgentsMap.Lock()
+ defer aMgr.lockAdapterAgentsMap.Unlock()
+ aMgr.lockDeviceTypesMap.Lock()
+ defer aMgr.lockDeviceTypesMap.Unlock()
// create an in memory map to fetch the entire voltha.DeviceType from a device.Type string
for _, deviceType := range deviceTypes.Items {
@@ -178,13 +202,13 @@
if saveToDb {
// Save the device types to the KV store
for _, deviceType := range deviceTypes.Items {
- if have, err := aMgr.deviceTypeProxy.Get(log.WithSpanFromContext(context.Background(), ctx), deviceType.Id, &voltha.DeviceType{}); err != nil {
+ if have, err := aMgr.deviceTypeDbProxy.Get(log.WithSpanFromContext(context.Background(), ctx), deviceType.Id, &voltha.DeviceType{}); err != nil {
logger.Errorw(ctx, "Failed-to--device-types-from-cluster-data-proxy", log.Fields{"error": err})
return err
} else if !have {
// Does not exist - save it
clonedDType := (proto.Clone(deviceType)).(*voltha.DeviceType)
- if err := aMgr.deviceTypeProxy.Set(log.WithSpanFromContext(context.Background(), ctx), deviceType.Id, clonedDType); err != nil {
+ if err := aMgr.deviceTypeDbProxy.Set(log.WithSpanFromContext(context.Background(), ctx), deviceType.Id, clonedDType); err != nil {
logger.Errorw(ctx, "Failed-to-add-device-types-to-cluster-data-proxy", log.Fields{"error": err})
return err
}
@@ -192,35 +216,71 @@
}
}
}
-
return nil
}
-// ListAdapters returns the contents of all adapters known to the system
-func (aMgr *Manager) ListAdapters(ctx context.Context, _ *empty.Empty) (*voltha.Adapters, error) {
- result := &voltha.Adapters{Items: []*voltha.Adapter{}}
- aMgr.lockAdaptersMap.RLock()
- defer aMgr.lockAdaptersMap.RUnlock()
- for _, adapterAgent := range aMgr.adapterAgents {
- if a := adapterAgent.getAdapter(ctx); a != nil {
- result.Items = append(result.Items, (proto.Clone(a)).(*voltha.Adapter))
+// loadAdaptersAndDevicetypesInMemory lists the adapters and device types held behind the DB
+// proxies, adds them to the in-memory maps (saveToDb is false, so nothing is written back)
+// and then starts every adapter agent. Only a failure to list from the DB is returned as an
+// error; a failure to add or start an individual entry is logged and processing continues.
+func (aMgr *Manager) loadAdaptersAndDevicetypesInMemory(ctx context.Context) error {
+	// Load the adapters
+	var adapters []*voltha.Adapter
+	if err := aMgr.adapterDbProxy.List(log.WithSpanFromContext(context.Background(), ctx), &adapters); err != nil {
+		logger.Errorw(ctx, "Failed-to-list-adapters-from-cluster-data-proxy", log.Fields{"error": err})
+		return err
+	}
+
+	logger.Debugw(ctx, "retrieved-adapters", log.Fields{"count": len(adapters)})
+
+	if len(adapters) != 0 {
+		for _, adapter := range adapters {
+			if err := aMgr.addAdapter(ctx, adapter, false); err != nil {
+				// Include the error itself so the failure can be diagnosed from the log.
+				logger.Errorw(ctx, "failed-to-add-adapter", log.Fields{"adapterId": adapter.Id, "error": err})
+			} else {
+				logger.Debugw(ctx, "adapter-added-successfully", log.Fields{"adapterId": adapter.Id})
+			}
+		}
}
- return result, nil
-}
-func (aMgr *Manager) getAdapter(ctx context.Context, adapterID string) *voltha.Adapter {
- aMgr.lockAdaptersMap.RLock()
- defer aMgr.lockAdaptersMap.RUnlock()
- if adapterAgent, ok := aMgr.adapterAgents[adapterID]; ok {
- return adapterAgent.getAdapter(ctx)
+	// Load the device types
+	var deviceTypes []*voltha.DeviceType
+	if err := aMgr.deviceTypeDbProxy.List(log.WithSpanFromContext(context.Background(), ctx), &deviceTypes); err != nil {
+		logger.Errorw(ctx, "Failed-to-list-device-types-from-cluster-data-proxy", log.Fields{"error": err})
+		return err
}
+
+	logger.Debugw(ctx, "retrieved-devicetypes", log.Fields{"count": len(deviceTypes)})
+
+	if len(deviceTypes) != 0 {
+		// Log the retrieved set once instead of once per element.
+		logger.Debugw(ctx, "found-existing-device-types", log.Fields{"deviceTypes": deviceTypes})
+		dTypes := &voltha.DeviceTypes{Items: []*voltha.DeviceType{}}
+		dTypes.Items = append(dTypes.Items, deviceTypes...)
+		if err := aMgr.addDeviceTypes(ctx, dTypes, false); err != nil {
+			logger.Errorw(ctx, "failed-to-add-device-type", log.Fields{"deviceTypes": deviceTypes, "error": err})
+		} else {
+			logger.Debugw(ctx, "device-type-added-successfully", log.Fields{"deviceTypes": deviceTypes})
+		}
+	}
+
+	// Start the adapter agents - this will trigger the connection to the adapter
+	aMgr.lockAdapterAgentsMap.RLock()
+	defer aMgr.lockAdapterAgentsMap.RUnlock()
+	for _, adapterAgent := range aMgr.adapterAgents {
+		subCtx := log.WithSpanFromContext(context.Background(), ctx)
+		if err := adapterAgent.start(subCtx); err != nil {
+			logger.Errorw(ctx, "failed-to-start-adapter", log.Fields{"adapter-endpoint": adapterAgent.adapterAPIEndPoint, "error": err})
+		}
+	}
+
+	// Report the absence of device types only when none were actually retrieved.
+	if len(deviceTypes) == 0 {
logger.Debug(ctx, "no-existing-device-type-found")
+	}
+
return nil
}
-func (aMgr *Manager) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) (*voltha.CoreInstance, error) {
- logger.Debugw(ctx, "RegisterAdapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
- "currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint, "deviceTypes": deviceTypes.Items})
+func (aMgr *Manager) RegisterAdapter(ctx context.Context, registration *ic.AdapterRegistration) (*empty.Empty, error) {
+ adapter := registration.Adapter
+ deviceTypes := registration.DTypes
+ logger.Infow(ctx, "RegisterAdapter", log.Fields{"adapter": adapter, "deviceTypes": deviceTypes.Items})
if adapter.Type == "" {
logger.Errorw(ctx, "adapter-not-specifying-type", log.Fields{
@@ -231,15 +291,25 @@
return nil, status.Error(codes.InvalidArgument, "adapter-not-specifying-type")
}
- if aMgr.getAdapter(ctx, adapter.Id) != nil {
+ if adpt, _ := aMgr.getAdapter(ctx, adapter.Id); adpt != nil {
// Already registered - Adapter may have restarted. Trigger the reconcile process for that adapter
+ logger.Warnw(ctx, "adapter-restarted", log.Fields{"adapter": adpt.Id, "endpoint": adpt.Endpoint})
+
+ // First reset the adapter connection
+ agt, err := aMgr.getAgent(ctx, adpt.Id)
+ if err != nil {
+ logger.Errorw(ctx, "no-adapter-agent", log.Fields{"error": err})
+ return nil, err
+ }
+ agt.resetConnection(ctx)
+
go func() {
- err := aMgr.onAdapterRestart(log.WithSpanFromContext(context.Background(), ctx), adapter)
+ err := aMgr.onAdapterRestart(log.WithSpanFromContext(context.Background(), ctx), adpt.Endpoint)
if err != nil {
logger.Errorw(ctx, "unable-to-restart-adapter", log.Fields{"error": err})
}
}()
- return &voltha.CoreInstance{InstanceId: aMgr.coreInstanceID}, nil
+ return &empty.Empty{}, nil
}
// Save the adapter and the device types
if err := aMgr.addAdapter(ctx, adapter, true); err != nil {
@@ -254,16 +324,45 @@
logger.Debugw(ctx, "adapter-registered", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
- return &voltha.CoreInstance{InstanceId: aMgr.coreInstanceID}, nil
+ // Setup the endpoints for this adapter
+ if err := aMgr.endpointMgr.RegisterAdapter(ctx, adapter, deviceTypes); err != nil {
+ logger.Errorw(ctx, "failed-to-register-adapter", log.Fields{"error": err})
+ }
+
+ // Start adapter instance - this will trigger the connection to the adapter
+ if agent, err := aMgr.getAgent(ctx, adapter.Id); agent != nil {
+ subCtx := log.WithSpanFromContext(context.Background(), ctx)
+ if err := agent.start(subCtx); err != nil {
+ logger.Errorw(ctx, "failed-to-start-adapter", log.Fields{"error": err})
+ return nil, err
+ }
+ } else {
+ logger.Fatalw(ctx, "adapter-absent", log.Fields{"error": err, "adapter": adapter.Id})
+ }
+
+ return &empty.Empty{}, nil
}
-// GetAdapterType returns the name of the device adapter that service this device type
+// GetAdapterTypeByVendorID scans the known device types under a read lock and returns the
+// adapter type of a device type whose vendor ID list contains vendorID. An error naming
+// the vendor ID is returned when no device type lists it.
+func (aMgr *Manager) GetAdapterTypeByVendorID(vendorID string) (string, error) {
+	aMgr.lockDeviceTypesMap.RLock()
+	defer aMgr.lockDeviceTypesMap.RUnlock()
+
+	for _, deviceType := range aMgr.deviceTypes {
+		for _, id := range deviceType.VendorIds {
+			if id != vendorID {
+				continue
+			}
+			return deviceType.AdapterType, nil
+		}
+	}
+	return "", fmt.Errorf("vendor id %s not found", vendorID)
+}
+
+// GetAdapterType returns the name of the device adapter that services this device type
func (aMgr *Manager) GetAdapterType(deviceType string) (string, error) {
- aMgr.lockdDeviceTypeToAdapterMap.Lock()
- defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
- for _, adapterAgent := range aMgr.adapterAgents {
- if deviceType == adapterAgent.adapter.Type {
- return adapterAgent.adapter.Type, nil
+ aMgr.lockDeviceTypesMap.Lock()
+ defer aMgr.lockDeviceTypesMap.Unlock()
+ for _, dt := range aMgr.deviceTypes {
+ if deviceType == dt.Id {
+ return dt.AdapterType, nil
}
}
return "", fmt.Errorf("adapter-not-registered-for-device-type %s", deviceType)
@@ -272,8 +371,8 @@
// ListDeviceTypes returns all the device types known to the system
func (aMgr *Manager) ListDeviceTypes(ctx context.Context, _ *empty.Empty) (*voltha.DeviceTypes, error) {
logger.Debug(ctx, "ListDeviceTypes")
- aMgr.lockdDeviceTypeToAdapterMap.Lock()
- defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
+ aMgr.lockDeviceTypesMap.Lock()
+ defer aMgr.lockDeviceTypesMap.Unlock()
deviceTypes := make([]*voltha.DeviceType, 0, len(aMgr.deviceTypes))
for _, deviceType := range aMgr.deviceTypes {
@@ -283,10 +382,10 @@
}
// GetDeviceType returns the device type proto definition given the name of the device type
-func (aMgr *Manager) GetDeviceType(ctx context.Context, deviceType *voltha.ID) (*voltha.DeviceType, error) {
+func (aMgr *Manager) GetDeviceType(ctx context.Context, deviceType *common.ID) (*voltha.DeviceType, error) {
logger.Debugw(ctx, "GetDeviceType", log.Fields{"typeid": deviceType.Id})
- aMgr.lockdDeviceTypeToAdapterMap.Lock()
- defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
+ aMgr.lockDeviceTypesMap.Lock()
+ defer aMgr.lockDeviceTypesMap.Unlock()
dType, exist := aMgr.deviceTypes[deviceType.Id]
if !exist {
@@ -294,3 +393,81 @@
}
return dType, nil
}
+
+// ListAdapters returns a clone of every adapter reported by the adapter agents.
+// Agents whose getAdapter returns nil are left out; the returned error is always nil.
+func (aMgr *Manager) ListAdapters(ctx context.Context, _ *empty.Empty) (*voltha.Adapters, error) {
+	logger.Debug(ctx, "Listing adapters")
+	adapters := &voltha.Adapters{Items: []*voltha.Adapter{}}
+
+	aMgr.lockAdapterAgentsMap.RLock()
+	defer aMgr.lockAdapterAgentsMap.RUnlock()
+
+	for _, agt := range aMgr.adapterAgents {
+		adapter := agt.getAdapter(ctx)
+		if adapter == nil {
+			continue
+		}
+		// Hand out a clone so callers never share the agent's own adapter object.
+		adapters.Items = append(adapters.Items, proto.Clone(adapter).(*voltha.Adapter))
+	}
+	logger.Debugw(ctx, "Listing adapters", log.Fields{"result": adapters})
+	return adapters, nil
+}
+
+// getAgent looks up, under a read lock, the agent stored for adapterID and returns it.
+// A "Not found" error is returned when no agent is stored under that ID.
+func (aMgr *Manager) getAgent(ctx context.Context, adapterID string) (*agent, error) {
+	aMgr.lockAdapterAgentsMap.RLock()
+	defer aMgr.lockAdapterAgentsMap.RUnlock()
+
+	adapterAgent, found := aMgr.adapterAgents[adapterID]
+	if !found {
+		return nil, errors.New("Not found")
+	}
+	return adapterAgent, nil
+}
+
+// getAdapter returns the adapter reported by the agent stored for adapterID. The agents map
+// stays read-locked while the agent is queried. A "Not found" error is returned when no
+// agent is stored under that ID.
+func (aMgr *Manager) getAdapter(ctx context.Context, adapterID string) (*voltha.Adapter, error) {
+	aMgr.lockAdapterAgentsMap.RLock()
+	defer aMgr.lockAdapterAgentsMap.RUnlock()
+
+	adapterAgent, found := aMgr.adapterAgents[adapterID]
+	if !found {
+		return nil, errors.New("Not found")
+	}
+	return adapterAgent.getAdapter(ctx), nil
+}
+
+// mutedAdapterRestartedHandler will be invoked by the grpc client on an adapter restart.
+// The adapter re-registers itself after a restart and that registration triggers the
+// reconcile process, so this handler only logs the event and returns nil.
+func (aMgr *Manager) mutedAdapterRestartedHandler(ctx context.Context, endpoint string) error {
+ logger.Infow(ctx, "muted-adapter-restarted", log.Fields{"endpoint": endpoint})
+ return nil
+}
+
+// WaitUntilConnectionsToAdaptersAreUp blocks until every adapter agent reports its connection
+// as up, re-checking every connectionRetryInterval. It returns nil right away when no
+// adapter is registered, nil once all connections are up, and ctx.Err() when ctx is done
+// before that happens.
+func (aMgr *Manager) WaitUntilConnectionsToAdaptersAreUp(ctx context.Context, connectionRetryInterval time.Duration) error {
+	logger.Infow(ctx, "waiting-for-adapters-to-be-up", log.Fields{"retry-interval": connectionRetryInterval})
+	for {
+		// The agents map is only read here, so a read lock is enough and does not shut out
+		// the other readers of the map while the connections are being checked.
+		aMgr.lockAdapterAgentsMap.RLock()
+		numAdapters := len(aMgr.adapterAgents)
+		agentsUp := true
+		for _, agt := range aMgr.adapterAgents {
+			if !agt.IsConnectionUp() {
+				agentsUp = false
+				break
+			}
+		}
+		aMgr.lockAdapterAgentsMap.RUnlock()
+
+		if numAdapters == 0 {
+			// No adapter registered yet
+			logger.Info(ctx, "no-adapter-registered")
+			return nil
+		}
+		// A case of Core restart
+		if agentsUp {
+			logger.Infow(ctx, "adapter-connections-ready", log.Fields{"adapter-count": numAdapters})
+			return nil
+		}
+		logger.Warnw(ctx, "adapter-connections-not-ready", log.Fields{"adapter-count": numAdapters})
+		select {
+		case <-time.After(connectionRetryInterval):
+			logger.Infow(ctx, "retrying-adapter-connections", log.Fields{"adapter-count": numAdapters})
+		case <-ctx.Done():
+			logger.Errorw(ctx, "context-timeout", log.Fields{"adapter-count": numAdapters, "err": ctx.Err()})
+			return ctx.Err()
+		}
+	}
+}