VOL-3503 Add a device operational state of RECONCILING

Change-Id: I55dad67a24acdfac0af9448e6f19ec9d35edc39e
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 003f77a..6e32bfe 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -25,9 +25,11 @@
 	"sync"
 	"time"
 
+	"github.com/cenkalti/backoff/v3"
 	"github.com/gogo/protobuf/proto"
 	"github.com/golang/protobuf/ptypes"
 	"github.com/golang/protobuf/ptypes/empty"
+	"github.com/opencord/voltha-go/rw_core/config"
 	"google.golang.org/grpc/codes"
 	"google.golang.org/grpc/status"
 
@@ -50,21 +52,24 @@
 
 // Agent represents device agent attributes
 type Agent struct {
-	deviceID       string
-	parentID       string
-	deviceType     string
-	isRootDevice   bool
-	adapterProxy   *remote.AdapterProxy
-	adapterMgr     *adapter.Manager
-	deviceMgr      *Manager
-	dbProxy        *model.Proxy
-	exitChannel    chan int
-	device         *voltha.Device
-	requestQueue   *coreutils.RequestQueue
-	defaultTimeout time.Duration
-	startOnce      sync.Once
-	stopOnce       sync.Once
-	stopped        bool
+	deviceID             string
+	parentID             string
+	deviceType           string
+	isRootDevice         bool
+	adapterProxy         *remote.AdapterProxy
+	adapterMgr           *adapter.Manager
+	deviceMgr            *Manager
+	dbProxy              *model.Proxy
+	exitChannel          chan int
+	device               *voltha.Device
+	requestQueue         *coreutils.RequestQueue
+	defaultTimeout       time.Duration
+	startOnce            sync.Once
+	stopOnce             sync.Once
+	stopped              bool
+	stopReconciling      chan int
+	stopReconcilingMutex sync.RWMutex
+	config               *config.RWCoreFlags
 
 	flowLoader           *flow.Loader
 	groupLoader          *group.Loader
@@ -92,6 +97,7 @@
 		defaultTimeout:       timeout,
 		device:               proto.Clone(device).(*voltha.Device),
 		requestQueue:         coreutils.NewRequestQueue(),
+		config:               deviceMgr.config,
 		flowLoader:           flow.NewLoader(dbPath.SubPath("flows").Proxy(deviceID)),
 		groupLoader:          group.NewLoader(dbPath.SubPath("groups").Proxy(deviceID)),
 		portLoader:           port.NewLoader(dbPath.SubPath("ports").Proxy(deviceID)),
@@ -411,14 +417,11 @@
 		desc = fmt.Sprintf("cannot-enable-an-already-enabled-device: %s", oldDevice.Id)
 		return status.Error(codes.FailedPrecondition, desc)
 	}
-	if agent.isDeletionInProgress() {
+	if !agent.proceedWithRequestNoLock() {
 		agent.requestQueue.RequestComplete()
 
-		operStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
-
-		desc = fmt.Sprintf("deviceId:%s, Device deletion is in progress.", agent.deviceID)
+		desc = fmt.Sprintf("deviceId:%s, Device deletion or reconciling is in progress.", agent.deviceID)
 		return status.Error(codes.FailedPrecondition, desc)
-
 	}
 	// First figure out which adapter will handle this device type.  We do it at this stage as allow devices to be
 	// pre-provisioned with the required adapter not registered.   At this stage, since we need to communicate
@@ -586,10 +589,13 @@
 		desc = fmt.Sprintf("deviceId:%s, invalid-admin-state:%s", agent.deviceID, cloned.AdminState)
 		return status.Errorf(codes.FailedPrecondition, "deviceId:%s, invalid-admin-state:%s", agent.deviceID, cloned.AdminState)
 	}
-	if agent.isDeletionInProgress() {
+	// Reject the request while the device is being deleted or reconciled.
+	if !agent.proceedWithRequestNoLock() {
 		agent.requestQueue.RequestComplete()
-		return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Device deletion is in progress.", agent.deviceID)
+		desc = fmt.Sprintf("deviceId:%s, Device deletion or reconciling is in progress.", agent.deviceID)
+		return status.Error(codes.FailedPrecondition, desc)
 	}
+
 	// Update the Admin State and operational state before sending the request out
 	cloned.AdminState = voltha.AdminState_DISABLED
 	cloned.OperStatus = voltha.OperStatus_UNKNOWN
@@ -631,8 +637,9 @@
 	logger.Debugw(ctx, "reboot-device", log.Fields{"device-id": agent.deviceID})
 
 	device := agent.getDeviceReadOnlyWithoutLock()
-	if agent.isDeletionInProgress() {
-		return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Device deletion is in progress.", agent.deviceID)
+	if !agent.proceedWithRequestNoLock() {
+		desc = fmt.Sprintf("deviceId:%s, Device deletion or reconciling is in progress.", agent.deviceID)
+		return status.Error(codes.FailedPrecondition, desc)
 	}
 	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
 	subCtx = coreutils.WithRPCMetadataFromContext(subCtx, ctx)
@@ -671,6 +678,10 @@
 		agent.logDeviceUpdate(ctx, "deleteDeviceForce", nil, nil, operStatus, &desc)
 		return status.Error(codes.FailedPrecondition, desc)
 	}
+
+	//Send stop Reconcile if in progress
+	agent.stopReconcile()
+
 	device := agent.cloneDeviceWithoutLock()
 	if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device,
 		voltha.DeviceTransientState_FORCE_DELETING, previousDeviceTransientState); err != nil {
@@ -711,6 +722,13 @@
 		desc = err.Error()
 		return err
 	}
+
+	if agent.isReconcileInProgress() {
+		agent.requestQueue.RequestComplete()
+		desc = fmt.Sprintf("deviceId:%s, Device Reconciling is in progress", agent.deviceID)
+		return status.Error(codes.FailedPrecondition, desc)
+	}
+
 	// Get the device Transient state, return err if it is DELETING
 	previousDeviceTransientState := agent.getTransientState()
 
@@ -956,12 +974,14 @@
 	if prevDevice.OperStatus != device.OperStatus || prevDevice.ConnectStatus != device.ConnectStatus || prevDevice.AdminState != device.AdminState {
 		_ = agent.deviceMgr.Agent.SendDeviceStateChangeEvent(ctx, prevDevice.OperStatus, prevDevice.ConnectStatus, prevDevice.AdminState, device, time.Now().Unix())
 	}
+	deviceTransientState := agent.getTransientState()
+
 	// release lock before processing transition
 	agent.requestQueue.RequestComplete()
 	subCtx := coreutils.WithSpanAndRPCMetadataFromContext(ctx)
 
 	if err := agent.deviceMgr.stateTransitions.ProcessTransition(subCtx,
-		device, prevDevice, voltha.DeviceTransientState_NONE, voltha.DeviceTransientState_NONE); err != nil {
+		device, prevDevice, deviceTransientState, deviceTransientState); err != nil {
 		logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previous-admin-state": prevDevice.AdminState, "current-admin-state": device.AdminState})
 		// Sending RPC EVENT here
 		rpce := agent.deviceMgr.NewRPCEvent(ctx, agent.deviceID, err.Error(), nil)
@@ -1245,3 +1265,238 @@
 
 	return resp, nil
 }
+
+// proceedWithRequestNoLock reports whether a new request may be processed,
+// i.e. the device is neither being deleted nor being reconciled.
+// The device lock MUST be held by the caller.
+func (agent *Agent) proceedWithRequestNoLock() bool {
+	return !agent.isDeletionInProgress() && !agent.isReconcileInProgress()
+}
+
+// errReconcileStopped reports that stopReconcile interrupted a reconcile
+// retry loop.
+var errReconcileStopped = errors.New("reconcile stopped on request")
+
+// errReconcileResponseTimeout reports that the adapter did not answer a
+// reconcile request before the current backoff interval elapsed.
+var errReconcileResponseTimeout = errors.New("timed out waiting for reconcile response")
+
+// reconcileStopChannel returns the channel used to interrupt the running
+// reconcile retry loop, or nil when no retry loop is running.
+func (agent *Agent) reconcileStopChannel() chan int {
+	agent.stopReconcilingMutex.RLock()
+	defer agent.stopReconcilingMutex.RUnlock()
+	return agent.stopReconciling
+}
+
+// reconcileStopRequested consumes a pending stop request, if any, without
+// blocking, and reports whether one was found.
+func (agent *Agent) reconcileStopRequested() bool {
+	select {
+	case <-agent.reconcileStopChannel():
+		return true
+	default:
+		return false
+	}
+}
+
+// stopReconcile asks a running ReconcileDevice retry loop to give up; it is a
+// no-op when no retry loop is running.
+//
+// The send never blocks: stopReconciling is buffered (capacity 1) and an
+// already-pending request is not duplicated. Callers such as deleteDeviceForce
+// invoke this while holding the request-queue lock, and the retry loop may be
+// waiting for that very lock, so a blocking send could deadlock both.
+func (agent *Agent) stopReconcile() {
+	agent.stopReconcilingMutex.Lock()
+	defer agent.stopReconcilingMutex.Unlock()
+	if agent.stopReconciling == nil {
+		return
+	}
+	select {
+	case agent.stopReconciling <- 0:
+	default:
+		// A stop request is already pending.
+	}
+}
+
+// ReconcileDevice marks the device as RECONCILE_IN_PROGRESS and asks the
+// adapter to reconcile it, retrying with an exponential backoff until the
+// adapter accepts the request, stopReconcile is called, or the request queue
+// can no longer be acquired. Clearing the transient state afterwards is left
+// to reconcilingCleanup. Outcomes are reported through logDeviceUpdate.
+func (agent *Agent) ReconcileDevice(ctx context.Context, device *voltha.Device) {
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		desc = err.Error()
+		agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+		return
+	}
+
+	if !agent.proceedWithRequestNoLock() {
+		agent.requestQueue.RequestComplete()
+		desc = fmt.Sprintf("Either device is in deletion or reconcile is already in progress for device : %s", device.Id)
+		logger.Errorw(ctx, "reconcile-not-started", log.Fields{"device-id": device.Id, "reason": desc})
+		agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+		return
+	}
+
+	// Set the transient state so that concurrent requests are rejected while reconciling.
+	if err := agent.updateTransientState(ctx, voltha.DeviceTransientState_RECONCILE_IN_PROGRESS); err != nil {
+		agent.requestQueue.RequestComplete()
+		desc = fmt.Sprintf("Not able to set device transient state to Reconcile in progress. Err: %s", err.Error())
+		logger.Errorw(ctx, "reconcile-set-transient-state-failed", log.Fields{"device-id": device.Id, "error": err})
+		agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+		return
+	}
+
+	logger.Debugw(ctx, "retrying-reconciling", log.Fields{"deviceID": device.Id})
+	reconcilingBackoff := backoff.NewExponentialBackOff()
+	reconcilingBackoff.InitialInterval = agent.config.BackoffRetryInitialInterval
+	reconcilingBackoff.MaxElapsedTime = agent.config.BackoffRetryMaxElapsedTime
+	reconcilingBackoff.MaxInterval = agent.config.BackoffRetryMaxInterval
+
+	// The stop channel lives exactly as long as this retry loop: it is created
+	// while the request-queue lock is held and cleared on every exit path, so a
+	// later stopReconcile is a no-op. The buffer lets stopReconcile never block.
+	agent.stopReconcilingMutex.Lock()
+	agent.stopReconciling = make(chan int, 1)
+	agent.stopReconcilingMutex.Unlock()
+	defer func() {
+		agent.stopReconcilingMutex.Lock()
+		agent.stopReconciling = nil
+		agent.stopReconcilingMutex.Unlock()
+	}()
+
+	// The request-queue lock is held at the top of every iteration.
+	for {
+		// A stop may have been requested while this loop waited for the lock.
+		if agent.reconcileStopRequested() {
+			agent.requestQueue.RequestComplete()
+			desc = errReconcileStopped.Error()
+			agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+			return
+		}
+
+		// Use an exponential back off to prevent getting into a tight loop.
+		duration := reconcilingBackoff.NextBackOff()
+		// With the default max elapsed time of 0, NextBackOff never returns Stop.
+		// If a maximum is configured and reached, warn and start over.
+		if duration == backoff.Stop {
+			logger.Warnw(ctx, "maximum-reconciling-backoff-reached--resetting-backoff-timer",
+				log.Fields{"max-reconciling-backoff": reconcilingBackoff.MaxElapsedTime,
+					"device-id": device.Id})
+			reconcilingBackoff.Reset()
+			duration = reconcilingBackoff.NextBackOff()
+		}
+		backoffTimer := time.NewTimer(duration)
+
+		// Send a reconcile request to the adapter, then release the lock while
+		// waiting so that other requests (e.g. a forced delete) can proceed.
+		ch, err := agent.adapterProxy.ReconcileDevice(ctx, agent.device)
+		agent.requestQueue.RequestComplete()
+		if err != nil {
+			desc = fmt.Sprintf("Failed reconciling from adapter side. Err: %s", err.Error())
+		} else {
+			err = agent.waitForReconcileResponse(backoffTimer, ch)
+			if err == nil {
+				// The adapter accepted the request: stop retrying and let it move
+				// the device out of the reconciling state.
+				desc = ""
+				operStatus = &common.OperationResp{Code: common.OperationResp_OPERATION_IN_PROGRESS}
+				agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+				return
+			}
+			desc = err.Error()
+		}
+		logger.Errorw(ctx, "reconcile-attempt-failed", log.Fields{"device-id": device.Id, "reason": desc})
+		agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+		if errors.Is(err, errReconcileStopped) {
+			return
+		}
+
+		// Never retry before the backoff interval has elapsed (it already has
+		// when the previous attempt timed out), but still honour stop requests.
+		if !errors.Is(err, errReconcileResponseTimeout) {
+			if err := agent.waitForReconcileBackoff(ctx, backoffTimer); err != nil {
+				desc = err.Error()
+				agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+				return
+			}
+		}
+
+		// Take the lock back before retrying.
+		if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+			desc = err.Error()
+			agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+			return
+		}
+	}
+}
+
+// waitForReconcileResponse waits for whichever comes first: the adapter's
+// answer on ch, a stop request from stopReconcile, or the expiry of
+// backoffTimer. It returns nil only when the adapter reported success;
+// otherwise it returns errReconcileStopped, errReconcileResponseTimeout or an
+// error describing the failed response, so the caller can decide whether to
+// retry. The timer is left running only when the caller must still wait on it.
+func (agent *Agent) waitForReconcileResponse(backoffTimer *time.Timer, ch chan *kafka.RpcResponse) error {
+	select {
+	case resp, ok := <-ch:
+		if !ok {
+			return errors.New("channel on which reconcile response is awaited is closed")
+		}
+		if resp.Err != nil {
+			return fmt.Errorf("error encountered while retrying reconcile: %v", resp.Err)
+		}
+		backoffTimer.Stop()
+		return nil
+	case <-agent.reconcileStopChannel():
+		backoffTimer.Stop()
+		return errReconcileStopped
+	case <-backoffTimer.C:
+		// The adapter did not answer within the backoff interval; retry.
+		return errReconcileResponseTimeout
+	}
+}
+
+// waitForReconcileBackoff blocks until backoffTimer fires. It returns
+// errReconcileStopped if stopReconcile is called first, or ctx.Err() if ctx is
+// done first; in both cases the timer is stopped.
+func (agent *Agent) waitForReconcileBackoff(ctx context.Context, backoffTimer *time.Timer) error {
+	select {
+	case <-backoffTimer.C:
+		return nil
+	case <-agent.reconcileStopChannel():
+		backoffTimer.Stop()
+		return errReconcileStopped
+	case <-ctx.Done():
+		backoffTimer.Stop()
+		return ctx.Err()
+	}
+}
+
+// reconcilingCleanup clears the RECONCILE_IN_PROGRESS transient state,
+// presumably once the adapter has finished reconciling, and records the
+// outcome through logDeviceUpdate.
+func (agent *Agent) reconcilingCleanup(ctx context.Context) error {
+	var desc string
+	operStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		desc = err.Error()
+		agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+	if err := agent.updateTransientState(ctx, voltha.DeviceTransientState_NONE); err != nil {
+		desc = fmt.Sprintf("Not able to clear device transient state from Reconcile in progress. Err: %s", err.Error())
+		logger.Errorw(ctx, "reconcile-clear-transient-state-failed", log.Fields{"device-id": agent.deviceID, "error": err})
+		agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+		return err
+	}
+	operStatus = &common.OperationResp{Code: common.OperationResp_OPERATION_SUCCESS}
+	agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
+	return nil
+}