VOL-3504 Code changes to support force delete

Change-Id: I041ab2101a607b99e0372e432819a3f10f3a774c
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 4fc5197..a9767d4 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -25,18 +25,18 @@
 	"sync"
 	"time"
 
+	"github.com/gogo/protobuf/proto"
 	"github.com/golang/protobuf/ptypes"
 	"github.com/golang/protobuf/ptypes/empty"
+	"github.com/opencord/voltha-go/db/model"
 	"github.com/opencord/voltha-go/rw_core/core/adapter"
 	"github.com/opencord/voltha-go/rw_core/core/device/flow"
 	"github.com/opencord/voltha-go/rw_core/core/device/group"
 	"github.com/opencord/voltha-go/rw_core/core/device/port"
 	"github.com/opencord/voltha-go/rw_core/core/device/remote"
-	"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
-
-	"github.com/gogo/protobuf/proto"
-	"github.com/opencord/voltha-go/db/model"
+	"github.com/opencord/voltha-go/rw_core/core/device/transientstate"
 	coreutils "github.com/opencord/voltha-go/rw_core/utils"
+	"github.com/opencord/voltha-lib-go/v4/pkg/kafka"
 	"github.com/opencord/voltha-lib-go/v4/pkg/log"
 	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
 	ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
@@ -63,9 +63,10 @@
 	stopOnce       sync.Once
 	stopped        bool
 
-	flowLoader  *flow.Loader
-	groupLoader *group.Loader
-	portLoader  *port.Loader
+	flowLoader           *flow.Loader
+	groupLoader          *group.Loader
+	portLoader           *port.Loader
+	transientStateLoader *transientstate.Loader
 }
 
 //newAgent creates a new device agent. The device will be initialized when start() is called.
@@ -76,21 +77,22 @@
 	}
 
 	return &Agent{
-		deviceID:       deviceID,
-		adapterProxy:   ap,
-		isRootDevice:   device.Root,
-		parentID:       device.ParentId,
-		deviceType:     device.Type,
-		deviceMgr:      deviceMgr,
-		adapterMgr:     deviceMgr.adapterMgr,
-		exitChannel:    make(chan int, 1),
-		dbProxy:        deviceProxy,
-		defaultTimeout: timeout,
-		device:         proto.Clone(device).(*voltha.Device),
-		requestQueue:   coreutils.NewRequestQueue(),
-		flowLoader:     flow.NewLoader(dbPath.SubPath("flows").Proxy(deviceID)),
-		groupLoader:    group.NewLoader(dbPath.SubPath("groups").Proxy(deviceID)),
-		portLoader:     port.NewLoader(dbPath.SubPath("ports").Proxy(deviceID)),
+		deviceID:             deviceID,
+		adapterProxy:         ap,
+		isRootDevice:         device.Root,
+		parentID:             device.ParentId,
+		deviceType:           device.Type,
+		deviceMgr:            deviceMgr,
+		adapterMgr:           deviceMgr.adapterMgr,
+		exitChannel:          make(chan int, 1),
+		dbProxy:              deviceProxy,
+		defaultTimeout:       timeout,
+		device:               proto.Clone(device).(*voltha.Device),
+		requestQueue:         coreutils.NewRequestQueue(),
+		flowLoader:           flow.NewLoader(dbPath.SubPath("flows").Proxy(deviceID)),
+		groupLoader:          group.NewLoader(dbPath.SubPath("groups").Proxy(deviceID)),
+		portLoader:           port.NewLoader(dbPath.SubPath("ports").Proxy(deviceID)),
+		transientStateLoader: transientstate.NewLoader(dbPath.SubPath("core").Proxy("transientstate"), deviceID),
 	}
 }
 
@@ -128,6 +130,7 @@
 		agent.flowLoader.Load(ctx)
 		agent.groupLoader.Load(ctx)
 		agent.portLoader.Load(ctx)
+		agent.transientStateLoader.Load(ctx)
 
 		logger.Infow(ctx, "device-loaded-from-dB", log.Fields{"device-id": agent.deviceID})
 	} else {
@@ -169,7 +172,10 @@
 	defer agent.requestQueue.RequestComplete()
 
 	logger.Infow(ctx, "stopping-device-agent", log.Fields{"device-id": agent.deviceID, "parentId": agent.parentID})
-
+	// Remove the device transient state
+	if err := agent.deleteTransientState(ctx); err != nil {
+		return err
+	}
 	//	Remove the device from the KV store
 	if err := agent.dbProxy.Remove(ctx, agent.deviceID); err != nil {
 		return err
@@ -206,13 +212,15 @@
 	agent.flowLoader.Load(ctx)
 	agent.groupLoader.Load(ctx)
 	agent.portLoader.Load(ctx)
+	agent.transientStateLoader.Load(ctx)
+
 	logger.Debugw(ctx, "reconciled-device-agent-devicetype", log.Fields{"device-id": agent.deviceID, "type": agent.deviceType})
 }
 
 // onSuccess is a common callback for scenarios where we receive a nil response following a request to an adapter
 // and the only action required is to publish a successful result on kafka
 func (agent *Agent) onSuccess(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
-	logger.Debugw(ctx, "response successful", log.Fields{"rpc": rpc, "device-id": agent.deviceID})
+	logger.Debugw(ctx, "response-successful", log.Fields{"rpc": rpc, "device-id": agent.deviceID})
 	// TODO: Post success message onto kafka
 }
 
@@ -244,6 +252,53 @@
 	}
 }
 
+// onDeleteSuccess is a common callback for scenarios where we receive a nil response following a delete request
+// to an adapter. It moves the device to the DELETING_POST_ADAPTER_RESPONSE transient state.
+func (agent *Agent) onDeleteSuccess(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
+	logger.Debugw(ctx, "response-successful", log.Fields{"rpc": rpc, "device-id": agent.deviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		logger.Errorw(ctx, "delete-device-failure", log.Fields{"device-id": agent.deviceID, "error": err, "args": reqArgs})
+		return // lock not acquired: the update below releases the lock and must not run without it
+	}
+	previousDeviceTransientState := agent.getTransientState()
+	newDevice := agent.cloneDeviceWithoutLock()
+	if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, newDevice, voltha.DeviceTransientState_DELETING_POST_ADAPTER_RESPONSE, previousDeviceTransientState); err != nil {
+		logger.Errorw(ctx, "delete-device-failure", log.Fields{"device-id": agent.deviceID, "error": err, "args": reqArgs})
+	}
+}
+
+// onDeleteFailure is a common callback for scenarios where we receive an error response following a delete request
+// to an adapter. It logs the failure and sets the device transient state to DELETE_FAILED.
+// The response is expected to be an error; any other response type is logged as an invalid error.
+func (agent *Agent) onDeleteFailure(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
+	if res, ok := response.(error); ok {
+		logger.Errorw(ctx, "rpc-failed", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": res, "args": reqArgs})
+	} else {
+		logger.Errorw(ctx, "rpc-failed-invalid-error", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "args": reqArgs})
+	}
+	//Only updating of transient state is required, no transition.
+	if err := agent.updateTransientState(ctx, voltha.DeviceTransientState_DELETE_FAILED); err != nil {
+		logger.Errorw(ctx, "failed-to-update-transient-state-as-delete-failed", log.Fields{"device-id": agent.deviceID, "error": err})
+	}
+}
+
+// waitForAdapterDeleteResponse waits for the adapter reply on ch (or for ctx to end) and invokes onSuccess or onFailure; cancel is always called on return.
+func (agent *Agent) waitForAdapterDeleteResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse, onSuccess coreutils.ResponseCallback, onFailure coreutils.ResponseCallback, reqArgs ...interface{}) {
+	defer cancel()
+	select {
+	case rpcResponse, ok := <-ch:
+		if !ok {
+			onFailure(ctx, rpc, status.Errorf(codes.Aborted, "channel-closed"), reqArgs...)
+		} else if rpcResponse.Err != nil {
+			onFailure(ctx, rpc, rpcResponse.Err, reqArgs...)
+		} else {
+			onSuccess(ctx, rpc, rpcResponse.Reply, reqArgs...)
+		}
+	case <-ctx.Done(): // NOTE(review): ctx is already done when onFailure runs; confirm the callback tolerates that
+		onFailure(ctx, rpc, ctx.Err(), reqArgs...)
+	}
+}
+
 // getDeviceReadOnly returns a device which MUST NOT be modified, but is safe to keep forever.
 func (agent *Agent) getDeviceReadOnly(ctx context.Context) (*voltha.Device, error) {
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
@@ -278,12 +333,10 @@
 		agent.requestQueue.RequestComplete()
 		return status.Error(codes.FailedPrecondition, fmt.Sprintf("cannot-enable-an-already-enabled-device: %s", oldDevice.Id))
 	}
-	if oldDevice.AdminState == voltha.AdminState_DELETED {
-		// This is a temporary state when a device is deleted before it gets removed from the model.
+	if agent.isDeletionInProgress() {
 		agent.requestQueue.RequestComplete()
-		return status.Error(codes.FailedPrecondition, fmt.Sprintf("cannot-enable-a-deleted-device: %s", oldDevice.Id))
+		return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Device deletion is in progress.", agent.deviceID)
 	}
-
 	// First figure out which adapter will handle this device type.  We do it at this stage as allow devices to be
 	// pre-provisioned with the required adapter not registered.   At this stage, since we need to communicate
 	// with the adapter then we need to know the adapter that will handle this request
@@ -406,11 +459,14 @@
 		agent.requestQueue.RequestComplete()
 		return nil
 	}
-	if cloned.AdminState == voltha.AdminState_PREPROVISIONED || cloned.AdminState == voltha.AdminState_DELETED {
+	if cloned.AdminState == voltha.AdminState_PREPROVISIONED {
 		agent.requestQueue.RequestComplete()
 		return status.Errorf(codes.FailedPrecondition, "deviceId:%s, invalid-admin-state:%s", agent.deviceID, cloned.AdminState)
 	}
-
+	if agent.isDeletionInProgress() {
+		agent.requestQueue.RequestComplete()
+		return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Device deletion is in progress.", agent.deviceID)
+	}
 	// Update the Admin State and operational state before sending the request out
 	cloned.AdminState = voltha.AdminState_DISABLED
 	cloned.OperStatus = voltha.OperStatus_UNKNOWN
@@ -437,6 +493,9 @@
 	logger.Debugw(ctx, "rebootDevice", log.Fields{"device-id": agent.deviceID})
 
 	device := agent.getDeviceReadOnlyWithoutLock()
+	if agent.isDeletionInProgress() {
+		return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Device deletion is in progress.", agent.deviceID)
+	}
 	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
 	ch, err := agent.adapterProxy.RebootDevice(subCtx, device)
 	if err != nil {
@@ -447,32 +506,79 @@
 	return nil
 }
 
+// deleteDeviceForce sets the FORCE_DELETING transient state and, unless the device was PREPROVISIONED, sends a delete request to the adapter.
+func (agent *Agent) deleteDeviceForce(ctx context.Context) error {
+	logger.Debugw(ctx, "deleteDeviceForce", log.Fields{"device-id": agent.deviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	// Read the current transient state; reject the request if isStateDeleting reports it as deleting.
+	previousDeviceTransientState := agent.getTransientState()
+	if agent.isStateDeleting(previousDeviceTransientState) {
+		agent.requestQueue.RequestComplete()
+		return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Device Deletion is in progress",
+			agent.deviceID)
+	}
+	device := agent.cloneDeviceWithoutLock()
+	if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device, voltha.DeviceTransientState_FORCE_DELETING,
+		previousDeviceTransientState); err != nil {
+		return err // the lock has already been released by the helper
+	}
+	previousAdminState := device.AdminState
+	if previousAdminState != ic.AdminState_PREPROVISIONED { // no adapter request is sent for a PREPROVISIONED device
+		subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+		ch, err := agent.adapterProxy.DeleteDevice(subCtx, device)
+		if err != nil {
+			cancel()
+			return err
+		}
+		// Since it is a case of force delete, nothing needs to be done on adapter responses.
+		go agent.waitForAdapterResponse(subCtx, cancel, "deleteDeviceForce", ch, agent.onSuccess,
+			agent.onFailure)
+	}
+	return nil
+}
+
 func (agent *Agent) deleteDevice(ctx context.Context) error {
 	logger.Debugw(ctx, "deleteDevice", log.Fields{"device-id": agent.deviceID})
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
+	// Read the current transient state; reject the request if isStateDeleting reports it as deleting.
+	previousDeviceTransientState := agent.getTransientState()
 
-	cloned := agent.cloneDeviceWithoutLock()
-	previousState := cloned.AdminState
+	if agent.isStateDeleting(previousDeviceTransientState) {
+		agent.requestQueue.RequestComplete()
+		return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Device Deletion is in progress", agent.deviceID)
+	}
+	device := agent.cloneDeviceWithoutLock()
+	previousAdminState := device.AdminState
+	// Change the device transient state to DELETING_FROM_ADAPTER until the device is removed from the adapters.
+	currentDeviceTransientState := voltha.DeviceTransientState_DELETING_FROM_ADAPTER
 
-	// No check is required when deleting a device.  Changing the state to DELETE will trigger the removal of this
-	// device by the state machine
-	cloned.AdminState = voltha.AdminState_DELETED
-	if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
+	if previousAdminState == ic.AdminState_PREPROVISIONED {
+		// Go directly to DELETING_POST_ADAPTER_RESPONSE, as adapters have no info of the device.
+		currentDeviceTransientState = voltha.DeviceTransientState_DELETING_POST_ADAPTER_RESPONSE
+	}
+	if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device, currentDeviceTransientState,
+		previousDeviceTransientState); err != nil {
 		return err
 	}
-
 	// If the device was in pre-prov state (only parent device are in that state) then do not send the request to the
 	// adapter
-	if previousState != ic.AdminState_PREPROVISIONED {
+	if previousAdminState != ic.AdminState_PREPROVISIONED {
 		subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
-		ch, err := agent.adapterProxy.DeleteDevice(subCtx, cloned)
+		ch, err := agent.adapterProxy.DeleteDevice(subCtx, device)
 		if err != nil {
 			cancel()
+			// Record the failure in the transient state; the adapter error is still the one returned to the caller.
+			if err := agent.updateTransientState(ctx, voltha.DeviceTransientState_DELETE_FAILED); err != nil {
+				logger.Errorw(ctx, "failed-to-update-transient-state-as-delete-failed", log.Fields{"device-id": agent.deviceID, "error": err})
+			}
 			return err
 		}
-		go agent.waitForAdapterResponse(subCtx, cancel, "deleteDevice", ch, agent.onSuccess, agent.onFailure)
+		go agent.waitForAdapterDeleteResponse(subCtx, cancel, "deleteDevice", ch, agent.onDeleteSuccess,
+			agent.onDeleteFailure)
 	}
 	return nil
 }
@@ -654,7 +760,7 @@
 	// fail early if this agent is no longer valid
 	if agent.stopped {
 		agent.requestQueue.RequestComplete()
-		return errors.New("device agent stopped")
+		return errors.New("device-agent-stopped")
 	}
 
 	// update in db
@@ -671,12 +777,52 @@
 	// release lock before processing transition
 	agent.requestQueue.RequestComplete()
 
-	if err := agent.deviceMgr.stateTransitions.ProcessTransition(log.WithSpanFromContext(context.Background(), ctx), device, prevDevice); err != nil {
+	if err := agent.deviceMgr.stateTransitions.ProcessTransition(log.WithSpanFromContext(context.Background(), ctx),
+		device, prevDevice, voltha.DeviceTransientState_NONE, voltha.DeviceTransientState_NONE); err != nil {
 		logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previousAdminState": prevDevice.AdminState, "currentAdminState": device.AdminState})
 	}
 	return nil
 }
 
+// updateDeviceWithTransientStateAndReleaseLock updates the transient state (via updateTransientState), stores the device in the DB, releases the device lock, and runs any state transitions.
+// The calling function MUST hold the device lock.  The caller MUST NOT modify the device after this is called.
+func (agent *Agent) updateDeviceWithTransientStateAndReleaseLock(ctx context.Context, device *voltha.Device,
+	transientState, prevTransientState voltha.DeviceTransientState_Types) error {
+	// fail early if this agent is no longer valid
+	if agent.stopped {
+		agent.requestQueue.RequestComplete()
+		return errors.New("device-agent-stopped")
+	}
+	// update the device transient state first; the lock is released on failure
+	if err := agent.updateTransientState(ctx, transientState); err != nil {
+		agent.requestQueue.RequestComplete()
+		return err
+	}
+	// update in db
+	if err := agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil {
+		// Revert the transient state; the Set error (not the revert result) is the one returned to the caller.
+		if revertErr := agent.updateTransientState(ctx, prevTransientState); revertErr != nil {
+			logger.Errorw(ctx, "failed-to-revert-transient-state-update-on-error", log.Fields{"device-id": device.Id, "previousTransientState": prevTransientState, "currentTransientState": transientState, "error": revertErr})
+		}
+		agent.requestQueue.RequestComplete()
+		return status.Errorf(codes.Internal, "failed-update-device:%s: %s", agent.deviceID, err)
+	}
+
+	logger.Debugw(ctx, "updated-device-in-store", log.Fields{"device-id": agent.deviceID})
+
+	prevDevice := agent.device
+	// update the device
+	agent.device = device
+
+	// release lock before processing transition
+	agent.requestQueue.RequestComplete()
+
+	if err := agent.deviceMgr.stateTransitions.ProcessTransition(log.WithSpanFromContext(context.Background(), ctx),
+		device, prevDevice, transientState, prevTransientState); err != nil {
+		logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previousAdminState": prevDevice.AdminState, "currentAdminState": device.AdminState})
+	}
+	return nil
+}
 func (agent *Agent) updateDeviceReason(ctx context.Context, reason string) error {
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err