[VOL-4291] Rw-core updates for gRPC migration
Change-Id: I8d5a554409115b29318089671ca4e1ab3fa98810
diff --git a/rw_core/core/adapter/manager.go b/rw_core/core/adapter/manager.go
index 772ff75..b592842 100644
--- a/rw_core/core/adapter/manager.go
+++ b/rw_core/core/adapter/manager.go
@@ -18,129 +18,149 @@
import (
"context"
+ "errors"
"fmt"
"sync"
"time"
+ "github.com/opencord/voltha-lib-go/v7/pkg/db"
+ vgrpc "github.com/opencord/voltha-lib-go/v7/pkg/grpc"
+ "github.com/opencord/voltha-protos/v5/go/adapter_services"
+ "github.com/opencord/voltha-protos/v5/go/common"
+ ic "github.com/opencord/voltha-protos/v5/go/inter_container"
+
"github.com/gogo/protobuf/proto"
"github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-go/db/model"
- "github.com/opencord/voltha-lib-go/v5/pkg/kafka"
- "github.com/opencord/voltha-lib-go/v5/pkg/log"
- "github.com/opencord/voltha-lib-go/v5/pkg/probe"
- "github.com/opencord/voltha-protos/v4/go/voltha"
+ "github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-lib-go/v7/pkg/probe"
+ "github.com/opencord/voltha-protos/v5/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
)
// Manager represents adapter manager attributes
type Manager struct {
- adapterAgents map[string]*agent
- deviceTypes map[string]*voltha.DeviceType
- adapterProxy *model.Proxy
- deviceTypeProxy *model.Proxy
- onAdapterRestart adapterRestartedHandler
- coreInstanceID string
- lockAdaptersMap sync.RWMutex
- lockdDeviceTypeToAdapterMap sync.RWMutex
+ adapterAgents map[string]*agent
+ adapterEndpoints map[Endpoint]*agent
+ deviceTypes map[string]*voltha.DeviceType
+ adapterDbProxy *model.Proxy
+ deviceTypeDbProxy *model.Proxy
+ onAdapterRestart vgrpc.RestartedHandler
+ endpointMgr EndpointManager
+ lockAdapterAgentsMap sync.RWMutex
+ lockDeviceTypesMap sync.RWMutex
+ lockAdapterEndPointsMap sync.RWMutex
+ liveProbeInterval time.Duration
}
-func NewAdapterManager(ctx context.Context, dbPath *model.Path, coreInstanceID string, kafkaClient kafka.Client) *Manager {
- aMgr := &Manager{
- coreInstanceID: coreInstanceID,
- adapterProxy: dbPath.Proxy("adapters"),
- deviceTypeProxy: dbPath.Proxy("device_types"),
- deviceTypes: make(map[string]*voltha.DeviceType),
- adapterAgents: make(map[string]*agent),
- }
- kafkaClient.SubscribeForMetadata(ctx, aMgr.updateLastAdapterCommunication)
- return aMgr
-}
-
-// an interface type for callbacks
-// if more than one callback is required, this should be converted to a proper interface
-type adapterRestartedHandler func(ctx context.Context, adapter *voltha.Adapter) error
-
-func (aMgr *Manager) SetAdapterRestartedCallback(onAdapterRestart adapterRestartedHandler) {
+// SetAdapterRestartedCallback is used to set the callback that needs to be invoked on an adapter restart
+func (aMgr *Manager) SetAdapterRestartedCallback(onAdapterRestart vgrpc.RestartedHandler) {
aMgr.onAdapterRestart = onAdapterRestart
}
-func (aMgr *Manager) Start(ctx context.Context) {
- probe.UpdateStatusFromContext(ctx, "adapter-manager", probe.ServiceStatusPreparing)
- logger.Info(ctx, "starting-adapter-manager")
+func NewAdapterManager(
+ dbPath *model.Path,
+ coreInstanceID string,
+ backend *db.Backend,
+ liveProbeInterval time.Duration,
+) *Manager {
+ return &Manager{
+ adapterDbProxy: dbPath.Proxy("adapters"),
+ deviceTypeDbProxy: dbPath.Proxy("device_types"),
+ deviceTypes: make(map[string]*voltha.DeviceType),
+ adapterAgents: make(map[string]*agent),
+ adapterEndpoints: make(map[Endpoint]*agent),
+ endpointMgr: NewEndpointManager(backend),
+ liveProbeInterval: liveProbeInterval,
+ }
+}
+
+func (aMgr *Manager) Start(ctx context.Context, serviceName string) {
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusPreparing)
+ logger.Infow(ctx, "starting-service", log.Fields{"service": serviceName})
// Load the existing adapterAgents and device types - this will also ensure the correct paths have been
// created if there are no data in the dB to start
err := aMgr.loadAdaptersAndDevicetypesInMemory(ctx)
if err != nil {
- logger.Fatalf(ctx, "failed-to-load-adapters-and-device-types-in-memory: %s", err)
+ logger.Fatalw(ctx, "failed-to-load-adapters-and-device-types-in-memory", log.Fields{"service": serviceName, "error": err})
}
- probe.UpdateStatusFromContext(ctx, "adapter-manager", probe.ServiceStatusRunning)
- logger.Info(ctx, "adapter-manager-started")
+ probe.UpdateStatusFromContext(ctx, serviceName, probe.ServiceStatusRunning)
+ logger.Infow(ctx, "service-started", log.Fields{"service": serviceName})
}
-//loadAdaptersAndDevicetypesInMemory loads the existing set of adapters and device types in memory
-func (aMgr *Manager) loadAdaptersAndDevicetypesInMemory(ctx context.Context) error {
- // Load the adapters
- var adapters []*voltha.Adapter
- if err := aMgr.adapterProxy.List(log.WithSpanFromContext(context.Background(), ctx), &adapters); err != nil {
- logger.Errorw(ctx, "Failed-to-list-adapters-from-cluster-data-proxy", log.Fields{"error": err})
- return err
+func (aMgr *Manager) Stop(ctx context.Context) {
+ // Stop all adapters
+ aMgr.lockAdapterAgentsMap.RLock()
+ defer aMgr.lockAdapterAgentsMap.RUnlock()
+ for _, adapterAgent := range aMgr.adapterAgents {
+ adapterAgent.stop(ctx)
}
- if len(adapters) != 0 {
- for _, adapter := range adapters {
- if err := aMgr.addAdapter(ctx, adapter, false); err != nil {
- logger.Errorw(ctx, "failed to add adapter", log.Fields{"adapterId": adapter.Id})
- } else {
- logger.Debugw(ctx, "adapter added successfully", log.Fields{"adapterId": adapter.Id})
- }
- }
- }
-
- // Load the device types
- var deviceTypes []*voltha.DeviceType
- if err := aMgr.deviceTypeProxy.List(log.WithSpanFromContext(context.Background(), ctx), &deviceTypes); err != nil {
- logger.Errorw(ctx, "Failed-to-list-device-types-from-cluster-data-proxy", log.Fields{"error": err})
- return err
- }
- if len(deviceTypes) != 0 {
- dTypes := &voltha.DeviceTypes{Items: []*voltha.DeviceType{}}
- for _, dType := range deviceTypes {
- logger.Debugw(ctx, "found-existing-device-types", log.Fields{"deviceTypes": dTypes})
- dTypes.Items = append(dTypes.Items, dType)
- }
- return aMgr.addDeviceTypes(ctx, dTypes, false)
- }
-
- logger.Debug(ctx, "no-existing-device-type-found")
-
- return nil
}
-func (aMgr *Manager) updateLastAdapterCommunication(adapterID string, timestamp time.Time) {
- aMgr.lockAdaptersMap.RLock()
- adapterAgent, have := aMgr.adapterAgents[adapterID]
- aMgr.lockAdaptersMap.RUnlock()
+func (aMgr *Manager) GetAdapterEndpoint(ctx context.Context, deviceID string, deviceType string) (string, error) {
+ endPoint, err := aMgr.endpointMgr.GetEndpoint(ctx, deviceID, deviceType)
+ if err != nil {
+ return "", err
+ }
+ return string(endPoint), nil
+}
+
+func (aMgr *Manager) GetAdapterWithEndpoint(ctx context.Context, endPoint string) (*voltha.Adapter, error) {
+ aMgr.lockAdapterEndPointsMap.RLock()
+ agent, have := aMgr.adapterEndpoints[Endpoint(endPoint)]
+ aMgr.lockAdapterEndPointsMap.RUnlock()
if have {
- adapterAgent.updateCommunicationTime(timestamp)
+ return agent.getAdapter(ctx), nil
}
+
+ return nil, errors.New("Not found")
+}
+
+func (aMgr *Manager) GetAdapterNameWithEndpoint(ctx context.Context, endPoint string) (string, error) {
+ aMgr.lockAdapterEndPointsMap.RLock()
+ agent, have := aMgr.adapterEndpoints[Endpoint(endPoint)]
+ aMgr.lockAdapterEndPointsMap.RUnlock()
+
+ if have {
+ return agent.adapter.Id, nil
+ }
+
+ return "", errors.New("Not found")
+}
+
+func (aMgr *Manager) GetAdapterClient(_ context.Context, endpoint string) (adapter_services.AdapterServiceClient, error) {
+ if endpoint == "" {
+ return nil, errors.New("endpoint-cannot-be-empty")
+ }
+ aMgr.lockAdapterEndPointsMap.RLock()
+ defer aMgr.lockAdapterEndPointsMap.RUnlock()
+
+ if agent, have := aMgr.adapterEndpoints[Endpoint(endpoint)]; have {
+ return agent.getClient()
+ }
+
+ return nil, fmt.Errorf("Endpoint-not-found-%s", endpoint)
}
func (aMgr *Manager) addAdapter(ctx context.Context, adapter *voltha.Adapter, saveToDb bool) error {
- aMgr.lockAdaptersMap.Lock()
- defer aMgr.lockAdaptersMap.Unlock()
+ aMgr.lockAdapterAgentsMap.Lock()
+ aMgr.lockAdapterEndPointsMap.Lock()
+ defer aMgr.lockAdapterEndPointsMap.Unlock()
+ defer aMgr.lockAdapterAgentsMap.Unlock()
logger.Debugw(ctx, "adding-adapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
if _, exist := aMgr.adapterAgents[adapter.Id]; !exist {
if saveToDb {
// Save the adapter to the KV store - first check if it already exist
- if have, err := aMgr.adapterProxy.Get(log.WithSpanFromContext(context.Background(), ctx), adapter.Id, &voltha.Adapter{}); err != nil {
+ if have, err := aMgr.adapterDbProxy.Get(log.WithSpanFromContext(context.Background(), ctx), adapter.Id, &voltha.Adapter{}); err != nil {
logger.Errorw(ctx, "failed-to-get-adapters-from-cluster-proxy", log.Fields{"error": err})
return err
} else if !have {
- if err := aMgr.adapterProxy.Set(log.WithSpanFromContext(context.Background(), ctx), adapter.Id, adapter); err != nil {
+ if err := aMgr.adapterDbProxy.Set(log.WithSpanFromContext(context.Background(), ctx), adapter.Id, adapter); err != nil {
logger.Errorw(ctx, "failed-to-save-adapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint, "replica": adapter.CurrentReplica, "total": adapter.TotalReplicas})
return err
@@ -155,7 +175,11 @@
}
}
clonedAdapter := (proto.Clone(adapter)).(*voltha.Adapter)
- aMgr.adapterAgents[adapter.Id] = newAdapterAgent(clonedAdapter)
+ // Use a muted adapter restart handler which is invoked by the corresponding gRPC client on an adapter restart.
+ // This handler just log the restart event. The actual action taken following an adapter restart
+ // will be done when an adapter re-registers itself.
+ aMgr.adapterAgents[adapter.Id] = newAdapterAgent(clonedAdapter, aMgr.mutedAdapterRestartedHandler, aMgr.liveProbeInterval)
+ aMgr.adapterEndpoints[Endpoint(adapter.Endpoint)] = aMgr.adapterAgents[adapter.Id]
}
return nil
}
@@ -165,10 +189,10 @@
return fmt.Errorf("no-device-type")
}
logger.Debugw(ctx, "adding-device-types", log.Fields{"deviceTypes": deviceTypes})
- aMgr.lockAdaptersMap.Lock()
- defer aMgr.lockAdaptersMap.Unlock()
- aMgr.lockdDeviceTypeToAdapterMap.Lock()
- defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
+ aMgr.lockAdapterAgentsMap.Lock()
+ defer aMgr.lockAdapterAgentsMap.Unlock()
+ aMgr.lockDeviceTypesMap.Lock()
+ defer aMgr.lockDeviceTypesMap.Unlock()
// create an in memory map to fetch the entire voltha.DeviceType from a device.Type string
for _, deviceType := range deviceTypes.Items {
@@ -178,13 +202,13 @@
if saveToDb {
// Save the device types to the KV store
for _, deviceType := range deviceTypes.Items {
- if have, err := aMgr.deviceTypeProxy.Get(log.WithSpanFromContext(context.Background(), ctx), deviceType.Id, &voltha.DeviceType{}); err != nil {
+ if have, err := aMgr.deviceTypeDbProxy.Get(log.WithSpanFromContext(context.Background(), ctx), deviceType.Id, &voltha.DeviceType{}); err != nil {
logger.Errorw(ctx, "Failed-to--device-types-from-cluster-data-proxy", log.Fields{"error": err})
return err
} else if !have {
// Does not exist - save it
clonedDType := (proto.Clone(deviceType)).(*voltha.DeviceType)
- if err := aMgr.deviceTypeProxy.Set(log.WithSpanFromContext(context.Background(), ctx), deviceType.Id, clonedDType); err != nil {
+ if err := aMgr.deviceTypeDbProxy.Set(log.WithSpanFromContext(context.Background(), ctx), deviceType.Id, clonedDType); err != nil {
logger.Errorw(ctx, "Failed-to-add-device-types-to-cluster-data-proxy", log.Fields{"error": err})
return err
}
@@ -192,35 +216,71 @@
}
}
}
-
return nil
}
-// ListAdapters returns the contents of all adapters known to the system
-func (aMgr *Manager) ListAdapters(ctx context.Context, _ *empty.Empty) (*voltha.Adapters, error) {
- result := &voltha.Adapters{Items: []*voltha.Adapter{}}
- aMgr.lockAdaptersMap.RLock()
- defer aMgr.lockAdaptersMap.RUnlock()
- for _, adapterAgent := range aMgr.adapterAgents {
- if a := adapterAgent.getAdapter(ctx); a != nil {
- result.Items = append(result.Items, (proto.Clone(a)).(*voltha.Adapter))
+//loadAdaptersAndDevicetypesInMemory loads the existing set of adapters and device types in memory
+func (aMgr *Manager) loadAdaptersAndDevicetypesInMemory(ctx context.Context) error {
+ // Load the adapters
+ var adapters []*voltha.Adapter
+ if err := aMgr.adapterDbProxy.List(log.WithSpanFromContext(context.Background(), ctx), &adapters); err != nil {
+ logger.Errorw(ctx, "Failed-to-list-adapters-from-cluster-data-proxy", log.Fields{"error": err})
+ return err
+ }
+
+ logger.Debugw(ctx, "retrieved-adapters", log.Fields{"count": len(adapters)})
+
+ if len(adapters) != 0 {
+ for _, adapter := range adapters {
+ if err := aMgr.addAdapter(ctx, adapter, false); err != nil {
+ logger.Errorw(ctx, "failed-to-add-adapter", log.Fields{"adapterId": adapter.Id})
+ } else {
+ logger.Debugw(ctx, "adapter-added-successfully", log.Fields{"adapterId": adapter.Id})
+ }
}
}
- return result, nil
-}
-func (aMgr *Manager) getAdapter(ctx context.Context, adapterID string) *voltha.Adapter {
- aMgr.lockAdaptersMap.RLock()
- defer aMgr.lockAdaptersMap.RUnlock()
- if adapterAgent, ok := aMgr.adapterAgents[adapterID]; ok {
- return adapterAgent.getAdapter(ctx)
+ // Load the device types
+ var deviceTypes []*voltha.DeviceType
+ if err := aMgr.deviceTypeDbProxy.List(log.WithSpanFromContext(context.Background(), ctx), &deviceTypes); err != nil {
+ logger.Errorw(ctx, "Failed-to-list-device-types-from-cluster-data-proxy", log.Fields{"error": err})
+ return err
}
+
+ logger.Debugw(ctx, "retrieved-devicetypes", log.Fields{"count": len(deviceTypes)})
+
+ if len(deviceTypes) != 0 {
+ dTypes := &voltha.DeviceTypes{Items: []*voltha.DeviceType{}}
+ for _, dType := range deviceTypes {
+ logger.Debugw(ctx, "found-existing-device-types", log.Fields{"deviceTypes": deviceTypes})
+ dTypes.Items = append(dTypes.Items, dType)
+ }
+ if err := aMgr.addDeviceTypes(ctx, dTypes, false); err != nil {
+ logger.Errorw(ctx, "failed-to-add-device-type", log.Fields{"deviceTypes": deviceTypes})
+ } else {
+ logger.Debugw(ctx, "device-type-added-successfully", log.Fields{"deviceTypes": deviceTypes})
+ }
+ }
+
+ // Start the adapter agents - this will trigger the connection to the adapter
+ aMgr.lockAdapterAgentsMap.RLock()
+ defer aMgr.lockAdapterAgentsMap.RUnlock()
+ for _, adapterAgent := range aMgr.adapterAgents {
+ subCtx := log.WithSpanFromContext(context.Background(), ctx)
+ if err := adapterAgent.start(subCtx); err != nil {
+ logger.Errorw(ctx, "failed-to-start-adapter", log.Fields{"adapter-endpoint": adapterAgent.adapterAPIEndPoint})
+ }
+ }
+
+ logger.Debug(ctx, "no-existing-device-type-found")
+
return nil
}
-func (aMgr *Manager) RegisterAdapter(ctx context.Context, adapter *voltha.Adapter, deviceTypes *voltha.DeviceTypes) (*voltha.CoreInstance, error) {
- logger.Debugw(ctx, "RegisterAdapter", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
- "currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint, "deviceTypes": deviceTypes.Items})
+func (aMgr *Manager) RegisterAdapter(ctx context.Context, registration *ic.AdapterRegistration) (*empty.Empty, error) {
+ adapter := registration.Adapter
+ deviceTypes := registration.DTypes
+ logger.Infow(ctx, "RegisterAdapter", log.Fields{"adapter": adapter, "deviceTypes": deviceTypes.Items})
if adapter.Type == "" {
logger.Errorw(ctx, "adapter-not-specifying-type", log.Fields{
@@ -231,15 +291,25 @@
return nil, status.Error(codes.InvalidArgument, "adapter-not-specifying-type")
}
- if aMgr.getAdapter(ctx, adapter.Id) != nil {
+ if adpt, _ := aMgr.getAdapter(ctx, adapter.Id); adpt != nil {
// Already registered - Adapter may have restarted. Trigger the reconcile process for that adapter
+ logger.Warnw(ctx, "adapter-restarted", log.Fields{"adapter": adpt.Id, "endpoint": adpt.Endpoint})
+
+ // First reset the adapter connection
+ agt, err := aMgr.getAgent(ctx, adpt.Id)
+ if err != nil {
+ logger.Errorw(ctx, "no-adapter-agent", log.Fields{"error": err})
+ return nil, err
+ }
+ agt.resetConnection(ctx)
+
go func() {
- err := aMgr.onAdapterRestart(log.WithSpanFromContext(context.Background(), ctx), adapter)
+ err := aMgr.onAdapterRestart(log.WithSpanFromContext(context.Background(), ctx), adpt.Endpoint)
if err != nil {
logger.Errorw(ctx, "unable-to-restart-adapter", log.Fields{"error": err})
}
}()
- return &voltha.CoreInstance{InstanceId: aMgr.coreInstanceID}, nil
+ return &empty.Empty{}, nil
}
// Save the adapter and the device types
if err := aMgr.addAdapter(ctx, adapter, true); err != nil {
@@ -254,16 +324,45 @@
logger.Debugw(ctx, "adapter-registered", log.Fields{"adapterId": adapter.Id, "vendor": adapter.Vendor,
"currentReplica": adapter.CurrentReplica, "totalReplicas": adapter.TotalReplicas, "endpoint": adapter.Endpoint})
- return &voltha.CoreInstance{InstanceId: aMgr.coreInstanceID}, nil
+ // Setup the endpoints for this adapter
+ if err := aMgr.endpointMgr.RegisterAdapter(ctx, adapter, deviceTypes); err != nil {
+ logger.Errorw(ctx, "failed-to-register-adapter", log.Fields{"error": err})
+ }
+
+ // Start adapter instance - this will trigger the connection to the adapter
+ if agent, err := aMgr.getAgent(ctx, adapter.Id); agent != nil {
+ subCtx := log.WithSpanFromContext(context.Background(), ctx)
+ if err := agent.start(subCtx); err != nil {
+ logger.Errorw(ctx, "failed-to-start-adapter", log.Fields{"error": err})
+ return nil, err
+ }
+ } else {
+ logger.Fatalw(ctx, "adapter-absent", log.Fields{"error": err, "adapter": adapter.Id})
+ }
+
+ return &empty.Empty{}, nil
}
-// GetAdapterType returns the name of the device adapter that service this device type
+func (aMgr *Manager) GetAdapterTypeByVendorID(vendorID string) (string, error) {
+ aMgr.lockDeviceTypesMap.RLock()
+ defer aMgr.lockDeviceTypesMap.RUnlock()
+ for _, dType := range aMgr.deviceTypes {
+ for _, v := range dType.VendorIds {
+ if v == vendorID {
+ return dType.AdapterType, nil
+ }
+ }
+ }
+ return "", fmt.Errorf("vendor id %s not found", vendorID)
+}
+
+// GetAdapterType returns the name of the device adapter that services this device type
func (aMgr *Manager) GetAdapterType(deviceType string) (string, error) {
- aMgr.lockdDeviceTypeToAdapterMap.Lock()
- defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
- for _, adapterAgent := range aMgr.adapterAgents {
- if deviceType == adapterAgent.adapter.Type {
- return adapterAgent.adapter.Type, nil
+ aMgr.lockDeviceTypesMap.Lock()
+ defer aMgr.lockDeviceTypesMap.Unlock()
+ for _, dt := range aMgr.deviceTypes {
+ if deviceType == dt.Id {
+ return dt.AdapterType, nil
}
}
return "", fmt.Errorf("adapter-not-registered-for-device-type %s", deviceType)
@@ -272,8 +371,8 @@
// ListDeviceTypes returns all the device types known to the system
func (aMgr *Manager) ListDeviceTypes(ctx context.Context, _ *empty.Empty) (*voltha.DeviceTypes, error) {
logger.Debug(ctx, "ListDeviceTypes")
- aMgr.lockdDeviceTypeToAdapterMap.Lock()
- defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
+ aMgr.lockDeviceTypesMap.Lock()
+ defer aMgr.lockDeviceTypesMap.Unlock()
deviceTypes := make([]*voltha.DeviceType, 0, len(aMgr.deviceTypes))
for _, deviceType := range aMgr.deviceTypes {
@@ -283,10 +382,10 @@
}
// GetDeviceType returns the device type proto definition given the name of the device type
-func (aMgr *Manager) GetDeviceType(ctx context.Context, deviceType *voltha.ID) (*voltha.DeviceType, error) {
+func (aMgr *Manager) GetDeviceType(ctx context.Context, deviceType *common.ID) (*voltha.DeviceType, error) {
logger.Debugw(ctx, "GetDeviceType", log.Fields{"typeid": deviceType.Id})
- aMgr.lockdDeviceTypeToAdapterMap.Lock()
- defer aMgr.lockdDeviceTypeToAdapterMap.Unlock()
+ aMgr.lockDeviceTypesMap.Lock()
+ defer aMgr.lockDeviceTypesMap.Unlock()
dType, exist := aMgr.deviceTypes[deviceType.Id]
if !exist {
@@ -294,3 +393,81 @@
}
return dType, nil
}
+
+// ListAdapters returns the contents of all adapters known to the system
+func (aMgr *Manager) ListAdapters(ctx context.Context, _ *empty.Empty) (*voltha.Adapters, error) {
+ logger.Debug(ctx, "Listing adapters")
+ result := &voltha.Adapters{Items: []*voltha.Adapter{}}
+ aMgr.lockAdapterAgentsMap.RLock()
+ defer aMgr.lockAdapterAgentsMap.RUnlock()
+ for _, adapterAgent := range aMgr.adapterAgents {
+ if a := adapterAgent.getAdapter(ctx); a != nil {
+ result.Items = append(result.Items, (proto.Clone(a)).(*voltha.Adapter))
+ }
+ }
+ logger.Debugw(ctx, "Listing adapters", log.Fields{"result": result})
+ return result, nil
+}
+
+func (aMgr *Manager) getAgent(ctx context.Context, adapterID string) (*agent, error) {
+ aMgr.lockAdapterAgentsMap.RLock()
+ defer aMgr.lockAdapterAgentsMap.RUnlock()
+ if adapterAgent, ok := aMgr.adapterAgents[adapterID]; ok {
+ return adapterAgent, nil
+ }
+ return nil, errors.New("Not found")
+}
+
+func (aMgr *Manager) getAdapter(ctx context.Context, adapterID string) (*voltha.Adapter, error) {
+ aMgr.lockAdapterAgentsMap.RLock()
+ defer aMgr.lockAdapterAgentsMap.RUnlock()
+ if adapterAgent, ok := aMgr.adapterAgents[adapterID]; ok {
+ return adapterAgent.getAdapter(ctx), nil
+ }
+ return nil, errors.New("Not found")
+}
+
+// mutedAdapterRestartedHandler will be invoked by the grpc client on an adapter restart.
+// Since the Adapter will re-register itself and that will trigger the reconcile process,
+// therefore this handler does nothing, other than logging the event.
+func (aMgr *Manager) mutedAdapterRestartedHandler(ctx context.Context, endpoint string) error {
+ logger.Infow(ctx, "muted-adapter-restarted", log.Fields{"endpoint": endpoint})
+ return nil
+}
+
+func (aMgr *Manager) WaitUntilConnectionsToAdaptersAreUp(ctx context.Context, connectionRetryInterval time.Duration) error {
+ logger.Infow(ctx, "waiting-for-adapters-to-be-up", log.Fields{"retry-interval": connectionRetryInterval})
+ for {
+ aMgr.lockAdapterAgentsMap.Lock()
+ numAdapters := len(aMgr.adapterAgents)
+ if numAdapters == 0 {
+ // No adapter registered yet
+ aMgr.lockAdapterAgentsMap.Unlock()
+ logger.Info(ctx, "no-adapter-registered")
+ return nil
+ }
+ // A case of Core restart
+ agentsUp := true
+ adapterloop:
+ for _, agt := range aMgr.adapterAgents {
+ agentsUp = agentsUp && agt.IsConnectionUp()
+ if !agentsUp {
+ break adapterloop
+ }
+ }
+ aMgr.lockAdapterAgentsMap.Unlock()
+ if agentsUp {
+ logger.Infow(ctx, "adapter-connections-ready", log.Fields{"adapter-count": numAdapters})
+ return nil
+ }
+ logger.Warnw(ctx, "adapter-connections-not-ready", log.Fields{"adapter-count": numAdapters})
+ select {
+ case <-time.After(connectionRetryInterval):
+ logger.Infow(ctx, "retrying-adapter-connections", log.Fields{"adapter-count": numAdapters})
+ continue
+ case <-ctx.Done():
+ logger.Errorw(ctx, "context-timeout", log.Fields{"adapter-count": numAdapters, "err": ctx.Err()})
+ return ctx.Err()
+ }
+ }
+}