VOL-3503 Add a device operational state of RECONCILING
Change-Id: I55dad67a24acdfac0af9448e6f19ec9d35edc39e
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index dd64ab9..2c77186 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -19,6 +19,7 @@
import (
"context"
"errors"
+ "github.com/opencord/voltha-go/rw_core/config"
"sync"
"time"
@@ -58,31 +59,33 @@
defaultTimeout time.Duration
devicesLoadingLock sync.RWMutex
deviceLoadingInProgress map[string][]chan int
+ config *config.RWCoreFlags
}
//NewManagers creates the Manager and the Logical Manager.
-func NewManagers(dbPath *model.Path, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, coreTopic, coreInstanceID string, defaultCoreTimeout time.Duration, eventProxy *events.EventProxy, stackID string) (*Manager, *LogicalManager) {
+func NewManagers(dbPath *model.Path, adapterMgr *adapter.Manager, kmp kafka.InterContainerProxy, endpointMgr kafka.EndpointManager, cf *config.RWCoreFlags, coreInstanceID string, eventProxy *events.EventProxy) (*Manager, *LogicalManager) {
deviceMgr := &Manager{
rootDevices: make(map[string]bool),
kafkaICProxy: kmp,
- adapterProxy: remote.NewAdapterProxy(kmp, coreTopic, endpointMgr),
+ adapterProxy: remote.NewAdapterProxy(kmp, cf.CoreTopic, endpointMgr),
coreInstanceID: coreInstanceID,
dbPath: dbPath,
dProxy: dbPath.Proxy("devices"),
adapterMgr: adapterMgr,
- defaultTimeout: defaultCoreTimeout,
- Agent: event.NewAgent(eventProxy, coreInstanceID, stackID),
+ defaultTimeout: cf.DefaultCoreTimeout,
+ Agent: event.NewAgent(eventProxy, coreInstanceID, cf.VolthaStackID),
deviceLoadingInProgress: make(map[string][]chan int),
+ config: cf,
}
deviceMgr.stateTransitions = state.NewTransitionMap(deviceMgr)
logicalDeviceMgr := &LogicalManager{
- Manager: event.NewManager(eventProxy, coreInstanceID, stackID),
+ Manager: event.NewManager(eventProxy, coreInstanceID, cf.VolthaStackID),
deviceMgr: deviceMgr,
kafkaICProxy: kmp,
dbPath: dbPath,
ldProxy: dbPath.Proxy("logical_devices"),
- defaultTimeout: defaultCoreTimeout,
+ defaultTimeout: cf.DefaultCoreTimeout,
logicalDeviceLoadingInProgress: make(map[string][]chan int),
}
deviceMgr.logicalDeviceMgr = logicalDeviceMgr
@@ -651,17 +654,6 @@
return &empty.Empty{}, nil
}
-// isOkToReconcile validates whether a device is in the correct status to be reconciled
-func (dMgr *Manager) isOkToReconcile(ctx context.Context, device *voltha.Device) bool {
- if device == nil {
- return false
- }
- if agent := dMgr.getDeviceAgent(ctx, device.Id); agent != nil {
- return device.AdminState != voltha.AdminState_PREPROVISIONED && (!agent.isDeletionInProgress())
- }
- return false
-}
-
// adapterRestarted is invoked whenever an adapter is restarted
func (dMgr *Manager) adapterRestarted(ctx context.Context, adapter *voltha.Adapter) error {
logger.Debugw(ctx, "adapter-restarted", log.Fields{"adapter-id": adapter.Id, "vendor": adapter.Vendor,
@@ -673,7 +665,11 @@
return nil
}
- responses := make([]utils.Response, 0)
+ if len(dMgr.rootDevices) == 0 {
+ logger.Debugw(ctx, "no-managed-device-to-reconcile", log.Fields{"adapter-id": adapter.Id})
+ return nil
+ }
+
for rootDeviceID := range dMgr.rootDevices {
dAgent := dMgr.getDeviceAgent(ctx, rootDeviceID)
if dAgent == nil {
@@ -691,9 +687,9 @@
continue
}
if isDeviceOwnedByService {
- if dMgr.isOkToReconcile(ctx, rootDevice) {
+ if rootDevice.AdminState != voltha.AdminState_PREPROVISIONED {
logger.Debugw(ctx, "reconciling-root-device", log.Fields{"rootId": rootDevice.Id})
- responses = append(responses, dMgr.sendReconcileDeviceRequest(ctx, rootDevice))
+ go dAgent.ReconcileDevice(ctx, rootDevice)
} else {
logger.Debugw(ctx, "not-reconciling-root-device", log.Fields{"rootId": rootDevice.Id, "state": rootDevice.AdminState})
}
@@ -708,9 +704,9 @@
logger.Warnw(ctx, "is-device-owned-by-service", log.Fields{"error": err, "child-device-id": childDevice.Id, "adapter-type": adapter.Type, "replica-number": adapter.CurrentReplica})
}
if isDeviceOwnedByService {
- if dMgr.isOkToReconcile(ctx, childDevice) {
+ if childDevice.AdminState != voltha.AdminState_PREPROVISIONED {
logger.Debugw(ctx, "reconciling-child-device", log.Fields{"child-device-id": childDevice.Id})
- responses = append(responses, dMgr.sendReconcileDeviceRequest(ctx, childDevice))
+ go dAgent.ReconcileDevice(ctx, childDevice)
} else {
logger.Debugw(ctx, "not-reconciling-child-device", log.Fields{"child-device-id": childDevice.Id, "state": childDevice.AdminState})
}
@@ -725,54 +721,25 @@
}
}
}
- if len(responses) > 0 {
- // Wait for completion
- if res := utils.WaitForNilOrErrorResponses(dMgr.defaultTimeout, responses...); res != nil {
- return status.Errorf(codes.Aborted, "errors-%s", res)
- }
- } else {
- logger.Debugw(ctx, "no-managed-device-to-reconcile", log.Fields{"adapter-id": adapter.Id})
- }
+ logger.Debugw(ctx, "Reconciling for device on adapter restart is initiated", log.Fields{"adapter-id": adapter.Id})
+
return nil
}
-func (dMgr *Manager) sendReconcileDeviceRequest(ctx context.Context, device *voltha.Device) utils.Response {
- // Send a reconcile request to the adapter. Since this Core may not be managing this device then there is no
- // point of creating a device agent (if the device is not being managed by this Core) before sending the request
- // to the adapter. We will therefore bypass the adapter adapter and send the request directly to the adapter via
- // the adapter proxy.
- response := utils.NewResponse()
- ch, err := dMgr.adapterProxy.ReconcileDevice(ctx, device)
- if err != nil {
- response.Error(err)
- }
- // Wait for adapter response in its own routine
- go func() {
- resp, ok := <-ch
- if !ok {
- response.Error(status.Errorf(codes.Aborted, "channel-closed-device: %s", device.Id))
- } else if resp.Err != nil {
- response.Error(resp.Err)
- }
- response.Done()
- }()
- return response
-}
-
func (dMgr *Manager) ReconcileChildDevices(ctx context.Context, parentDeviceID string) error {
+ dAgent := dMgr.getDeviceAgent(ctx, parentDeviceID)
+ if dAgent == nil {
+ return status.Errorf(codes.NotFound, "error-unable to get agent from device")
+ }
if parentDevicePorts, err := dMgr.listDevicePorts(ctx, parentDeviceID); err == nil {
- responses := make([]utils.Response, 0)
for _, port := range parentDevicePorts {
for _, peer := range port.Peers {
if childDevice, err := dMgr.getDeviceFromModel(ctx, peer.DeviceId); err == nil {
- responses = append(responses, dMgr.sendReconcileDeviceRequest(ctx, childDevice))
+ go dAgent.ReconcileDevice(ctx, childDevice)
}
}
}
- // Wait for completion
- if res := utils.WaitForNilOrErrorResponses(dMgr.defaultTimeout, responses...); res != nil {
- return status.Errorf(codes.Aborted, "errors-%s", res)
- }
+ logger.Debugw(ctx, "Reconcile initiated for child devices", log.Fields{"parent-device-id": parentDeviceID})
}
return nil
}
@@ -1961,3 +1928,17 @@
}
}
}
+
+// ReconcilingCleanup runs the reconciling cleanup of the agent that manages device; it returns a NotFound status when no agent is found and an Internal status when the cleanup fails.
+func (dMgr *Manager) ReconcilingCleanup(ctx context.Context, device *voltha.Device) error {
+	agent := dMgr.getDeviceAgent(ctx, device.Id)
+	if agent == nil {
+		logger.Errorf(ctx, "Not able to get device agent.")
+		return status.Errorf(codes.NotFound, "Not able to get device agent for device : %s", device.Id)
+	}
+	if err := agent.reconcilingCleanup(ctx); err != nil {
+		logger.Errorf(ctx, "%s", err.Error()) // message is data, not a format string: a '%' in it must not be parsed as a verb
+		return status.Error(codes.Internal, err.Error()) // status.Error takes the message verbatim (no format expansion)
+	}
+	return nil
+}