VOL-3503 Add a device operational state of RECONCILING
Change-Id: I55dad67a24acdfac0af9448e6f19ec9d35edc39e
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 003f77a..6e32bfe 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -25,9 +25,11 @@
"sync"
"time"
+ "github.com/cenkalti/backoff/v3"
"github.com/gogo/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/empty"
+ "github.com/opencord/voltha-go/rw_core/config"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -50,21 +52,24 @@
// Agent represents device agent attributes
type Agent struct {
- deviceID string
- parentID string
- deviceType string
- isRootDevice bool
- adapterProxy *remote.AdapterProxy
- adapterMgr *adapter.Manager
- deviceMgr *Manager
- dbProxy *model.Proxy
- exitChannel chan int
- device *voltha.Device
- requestQueue *coreutils.RequestQueue
- defaultTimeout time.Duration
- startOnce sync.Once
- stopOnce sync.Once
- stopped bool
+ deviceID string
+ parentID string
+ deviceType string
+ isRootDevice bool
+ adapterProxy *remote.AdapterProxy
+ adapterMgr *adapter.Manager
+ deviceMgr *Manager
+ dbProxy *model.Proxy
+ exitChannel chan int
+ device *voltha.Device
+ requestQueue *coreutils.RequestQueue
+ defaultTimeout time.Duration
+ startOnce sync.Once
+ stopOnce sync.Once
+ stopped bool
+ stopReconciling chan int
+ stopReconcilingMutex sync.RWMutex
+ config *config.RWCoreFlags
flowLoader *flow.Loader
groupLoader *group.Loader
@@ -92,6 +97,7 @@
defaultTimeout: timeout,
device: proto.Clone(device).(*voltha.Device),
requestQueue: coreutils.NewRequestQueue(),
+ config: deviceMgr.config,
flowLoader: flow.NewLoader(dbPath.SubPath("flows").Proxy(deviceID)),
groupLoader: group.NewLoader(dbPath.SubPath("groups").Proxy(deviceID)),
portLoader: port.NewLoader(dbPath.SubPath("ports").Proxy(deviceID)),
@@ -411,14 +417,11 @@
desc = fmt.Sprintf("cannot-enable-an-already-enabled-device: %s", oldDevice.Id)
return status.Error(codes.FailedPrecondition, desc)
}
- if agent.isDeletionInProgress() {
+ if !agent.proceedWithRequestNoLock() {
agent.requestQueue.RequestComplete()
- operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
-
- desc = fmt.Sprintf("deviceId:%s, Device deletion is in progress.", agent.deviceID)
+ desc = fmt.Sprintf("deviceId:%s, Device deletion or reconciling is in progress.", agent.deviceID)
return status.Error(codes.FailedPrecondition, desc)
-
}
// First figure out which adapter will handle this device type. We do it at this stage as allow devices to be
// pre-provisioned with the required adapter not registered. At this stage, since we need to communicate
@@ -586,10 +589,13 @@
desc = fmt.Sprintf("deviceId:%s, invalid-admin-state:%s", agent.deviceID, cloned.AdminState)
return status.Errorf(codes.FailedPrecondition, "deviceId:%s, invalid-admin-state:%s", agent.deviceID, cloned.AdminState)
}
- if agent.isDeletionInProgress() {
+
+ if !agent.proceedWithRequestNoLock() {
agent.requestQueue.RequestComplete()
- return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Device deletion is in progress.", agent.deviceID)
+ desc = fmt.Sprintf("deviceId:%s, Device deletion or reconciling is in progress.", agent.deviceID)
+ return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Device reconciling is in progress.", agent.deviceID)
}
+
// Update the Admin State and operational state before sending the request out
cloned.AdminState = voltha.AdminState_DISABLED
cloned.OperStatus = voltha.OperStatus_UNKNOWN
@@ -631,8 +637,9 @@
logger.Debugw(ctx, "reboot-device", log.Fields{"device-id": agent.deviceID})
device := agent.getDeviceReadOnlyWithoutLock()
- if agent.isDeletionInProgress() {
- return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Device deletion is in progress.", agent.deviceID)
+ if !agent.proceedWithRequestNoLock() {
+ desc = fmt.Sprintf("deviceId:%s, Device delection or reconciling is in progress.", agent.deviceID)
+ return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Device reconciling is in progress.", agent.deviceID)
}
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
@@ -671,6 +678,10 @@
agent.logDeviceUpdate(ctx, "deleteDeviceForce", nil, nil, operStatus, &desc)
return status.Error(codes.FailedPrecondition, desc)
}
+
+ //Send stop Reconcile if in progress
+ agent.stopReconcile()
+
device := agent.cloneDeviceWithoutLock()
if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device,
voltha.DeviceTransientState_FORCE_DELETING, previousDeviceTransientState); err != nil {
@@ -711,6 +722,13 @@
desc = err.Error()
return err
}
+
+ if agent.isReconcileInProgress() {
+ agent.requestQueue.RequestComplete()
+ desc = fmt.Sprintf("deviceId:%s, Device Reconciling is in progress", agent.deviceID)
+ return status.Error(codes.FailedPrecondition, desc)
+ }
+
// Get the device Transient state, return err if it is DELETING
previousDeviceTransientState := agent.getTransientState()
@@ -956,12 +974,14 @@
if prevDevice.OperStatus != device.OperStatus || prevDevice.ConnectStatus != device.ConnectStatus || prevDevice.AdminState != device.AdminState {
_ = agent.deviceMgr.Agent.SendDeviceStateChangeEvent(ctx, prevDevice.OperStatus, prevDevice.ConnectStatus, prevDevice.AdminState, device, time.Now().Unix())
}
+ deviceTransientState := agent.getTransientState()
+
// release lock before processing transition
agent.requestQueue.RequestComplete()
subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
if err := agent.deviceMgr.stateTransitions.ProcessTransition(subCtx,
- device, prevDevice, voltha.DeviceTransientState_NONE, voltha.DeviceTransientState_NONE); err != nil {
+ device, prevDevice, deviceTransientState, deviceTransientState); err != nil {
logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previous-admin-state": prevDevice.AdminState, "current-admin-state": device.AdminState})
// Sending RPC EVENT here
rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
@@ -1245,3 +1265,167 @@
return resp, nil
}
+
+// proceedWithRequestNoLock reports whether a new request may go ahead for this
+// device; it returns false while a deletion or a reconcile is in progress.
+// The device lock MUST be held by the caller.
+func (agent *Agent) proceedWithRequestNoLock() bool {
+	if agent.isDeletionInProgress() {
+		return false
+	}
+	return !agent.isReconcileInProgress()
+}
+
+// stopReconcile asks an in-progress reconcile, if any, to stop retrying.
+//
+// The notification is a non-blocking send on agent.stopReconciling. A blocking
+// send would be made while holding stopReconcilingMutex, so whenever the
+// reconcile goroutine is not parked on the channel (for example while it waits
+// to re-acquire the device lock) the caller would hang with the mutex held, and
+// any later attempt to lock that mutex would hang as well.
+//
+// NOTE(review): when nobody is receiving, the notification is dropped; confirm
+// that callers do not rely on the reconcile having stopped when this returns.
+func (agent *Agent) stopReconcile() {
+	agent.stopReconcilingMutex.Lock()
+	defer agent.stopReconcilingMutex.Unlock()
+
+	if agent.stopReconciling == nil {
+		// No reconcile has registered a stop channel.
+		return
+	}
+
+	select {
+	case agent.stopReconciling <- 0:
+	default:
+		// No receiver is ready; never block while holding the mutex.
+	}
+}
+
+// ReconcileDevice sends a reconcile request for this device to its adapter and
+// retries with an exponential backoff until waitForReconcileResponse reports no
+// error, a stop is notified on agent.stopReconciling, ctx is cancelled, or the
+// device lock cannot be re-acquired.
+//
+// The request carries agent.device; the device argument is only used for its Id
+// in log output. The device transient state is set to RECONCILE_IN_PROGRESS
+// before the first attempt and is not reset by this function. The outcome of
+// every attempt is recorded through logDeviceUpdate as a "Reconciling" update.
+//
+// The device lock is held while the request is sent to the adapter and released
+// while waiting for the response or for the backoff to elapse.
+func (agent *Agent) ReconcileDevice(ctx context.Context, device *voltha.Device) {
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		desc = err.Error()
+		agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+		return
+	}
+
+	if !agent.proceedWithRequestNoLock() {
+		agent.requestQueue.RequestComplete()
+		desc = fmt.Sprintf("Either device is in deletion or reconcile is already in progress for device : %s", device.Id)
+		// desc is runtime text, so it must not be used as the format string.
+		logger.Errorf(ctx, "%s", desc)
+		agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+		return
+	}
+
+	// Set the transient state to RECONCILE IN PROGRESS.
+	if err := agent.updateTransientState(ctx, voltha.DeviceTransientState_RECONCILE_IN_PROGRESS); err != nil {
+		agent.requestQueue.RequestComplete()
+		desc = fmt.Sprintf("Not able to set device transient state to Reconcile in progress."+
+			"Err: %s", err.Error())
+		logger.Errorf(ctx, "%s", desc)
+		agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+		return
+	}
+
+	logger.Debugw(ctx, "retrying-reconciling", log.Fields{"deviceID": device.Id})
+	reconcilingBackoff := backoff.NewExponentialBackOff()
+	reconcilingBackoff.InitialInterval = agent.config.BackoffRetryInitialInterval
+	reconcilingBackoff.MaxElapsedTime = agent.config.BackoffRetryMaxElapsedTime
+	reconcilingBackoff.MaxInterval = agent.config.BackoffRetryMaxInterval
+
+	// Create the stop channel here so that its lifecycle stays within the
+	// scope of the retry loop below.
+	agent.stopReconcilingMutex.Lock()
+	agent.stopReconciling = make(chan int)
+	agent.stopReconcilingMutex.Unlock()
+
+	for {
+		// Use an exponential backoff to prevent getting into a tight loop.
+		duration := reconcilingBackoff.NextBackOff()
+		// Not expected with the default configuration: a max elapsed time of 0
+		// means the backoff never returns Stop.
+		if duration == backoff.Stop {
+			// If we reach a maximum then warn, reset the backoff timer and
+			// keep attempting.
+			logger.Warnw(ctx, "maximum-reconciling-backoff-reached--resetting-backoff-timer",
+				log.Fields{"max-reconciling-backoff": reconcilingBackoff.MaxElapsedTime,
+					"device-id": device.Id})
+			reconcilingBackoff.Reset()
+			duration = reconcilingBackoff.NextBackOff()
+		}
+
+		backoffTimer := time.NewTimer(duration)
+
+		// Send a reconcile request to the adapter, then release the device
+		// lock before waiting on anything.
+		ch, err := agent.adapterProxy.ReconcileDevice(ctx, agent.device)
+		agent.requestQueue.RequestComplete()
+
+		// Failure text of this attempt only, so that it cannot leak into the
+		// record written for a later successful attempt.
+		var attemptDesc string
+		if err == nil {
+			if err = agent.waitForReconcileResponse(backoffTimer, ch); err == nil {
+				// No error: quit retrying. Release the timer in case it has
+				// not fired yet.
+				backoffTimer.Stop()
+				operStatus = &common.OperationResp{Code: common.OperationResp_OPERATION_IN_PROGRESS}
+				agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+				return
+			}
+			attemptDesc = err.Error()
+			logger.Errorf(ctx, "%s", attemptDesc)
+		} else {
+			attemptDesc = fmt.Sprintf("Failed reconciling from adapter side. Err: %s", err.Error())
+		}
+		agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &attemptDesc)
+
+		// Wait out the rest of the backoff before retrying, whichever way the
+		// attempt failed, but give up early on a stop notification or when
+		// ctx is cancelled.
+		select {
+		case <-backoffTimer.C:
+			// Backoff elapsed; retry.
+		case <-agent.stopReconciling:
+			backoffTimer.Stop()
+			agent.stopReconcilingMutex.Lock()
+			agent.stopReconciling = nil
+			agent.stopReconcilingMutex.Unlock()
+			logger.Debugw(ctx, "reconciling-stopped", log.Fields{"device-id": device.Id})
+			return
+		case <-ctx.Done():
+			backoffTimer.Stop()
+			attemptDesc = ctx.Err().Error()
+			agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &attemptDesc)
+			return
+		}
+
+		// Take the device lock back before retrying.
+		if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+			attemptDesc = err.Error()
+			agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &attemptDesc)
+			return
+		}
+	}
+}
+
+// waitForReconcileResponse waits for whichever comes first: a response on ch,
+// a notification on agent.stopReconciling, or the expiry of backoffTimer.
+//
+// It returns an error when ch is closed, when the response carries an error
+// (included in the returned error), or when the stop channel is closed. It
+// returns nil on a response without error, on a stop notification and on timer
+// expiry. After a response without error, and after anything is received on the
+// stop channel (including its closure), agent.stopReconciling is reset to nil
+// under stopReconcilingMutex.
+//
+// NOTE(review): timer expiry returns nil just like a successful response, and
+// the caller stops retrying on a nil return - confirm this is intended.
+func (agent *Agent) waitForReconcileResponse(backoffTimer *time.Timer, ch chan *kafka.RpcResponse) error {
+	// clearStopChannel drops the stop channel so that later stop requests see
+	// that there is nothing left to notify.
+	clearStopChannel := func() {
+		agent.stopReconcilingMutex.Lock()
+		agent.stopReconciling = nil
+		agent.stopReconcilingMutex.Unlock()
+	}
+
+	select {
+	// Wait for the response.
+	case resp, ok := <-ch:
+		if !ok {
+			// Channel closed.
+			return errors.New("channel on which reconcile response is awaited is closed")
+		}
+		if resp.Err != nil {
+			// Keep the reported error so that the failure can be diagnosed
+			// from the logged description.
+			return fmt.Errorf("error encountered while retrying reconcile: %v", resp.Err)
+		}
+
+		// In case of success quit retrying and wait for the adapter to reset
+		// the operational state of the device.
+		clearStopChannel()
+		return nil
+
+	// Reconciling needs to be stopped.
+	case _, ok := <-agent.stopReconciling:
+		clearStopChannel()
+		if !ok {
+			// Channel closed.
+			return errors.New("channel used to notify to stop reconcile is closed")
+		}
+		return nil
+
+	// Timer expired.
+	case <-backoffTimer.C:
+	}
+	return nil
+}
+
+// reconcilingCleanup sets the device transient state back to NONE.
+//
+// It acquires the device lock for the duration of the update and records the
+// outcome through logDeviceUpdate as a "Reconciling" update. It returns the
+// error from acquiring the lock or from updating the transient state, and nil
+// on success.
+func (agent *Agent) reconcilingCleanup(ctx context.Context) error {
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		desc = err.Error()
+		agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+
+	if err := agent.updateTransientState(ctx, voltha.DeviceTransientState_NONE); err != nil {
+		desc = fmt.Sprintf("Not able to clear device transient state from Reconcile in progress."+
+			"Err: %s", err.Error())
+		// desc is runtime text, so it must not be used as the format string.
+		logger.Errorf(ctx, "%s", desc)
+		agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+		return err
+	}
+
+	operStatus = &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
+	agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+	return nil
+}