VOL-3504 Add support for force-deleting devices
Change-Id: I041ab2101a607b99e0372e432819a3f10f3a774c
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 4fc5197..a9767d4 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -25,18 +25,18 @@
"sync"
"time"
+ "github.com/gogo/protobuf/proto"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/empty"
+ "github.com/opencord/voltha-go/db/model"
"github.com/opencord/voltha-go/rw_core/core/adapter"
"github.com/opencord/voltha-go/rw_core/core/device/flow"
"github.com/opencord/voltha-go/rw_core/core/device/group"
"github.com/opencord/voltha-go/rw_core/core/device/port"
"github.com/opencord/voltha-go/rw_core/core/device/remote"
- "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
-
- "github.com/gogo/protobuf/proto"
- "github.com/opencord/voltha-go/db/model"
+ "github.com/opencord/voltha-go/rw_core/core/device/transientstate"
coreutils "github.com/opencord/voltha-go/rw_core/utils"
+ "github.com/opencord/voltha-lib-go/v4/pkg/kafka"
"github.com/opencord/voltha-lib-go/v4/pkg/log"
ic "github.com/opencord/voltha-protos/v4/go/inter_container"
ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
@@ -63,9 +63,10 @@
stopOnce sync.Once
stopped bool
- flowLoader *flow.Loader
- groupLoader *group.Loader
- portLoader *port.Loader
+ flowLoader *flow.Loader
+ groupLoader *group.Loader
+ portLoader *port.Loader
+ transientStateLoader *transientstate.Loader
}
//newAgent creates a new device agent. The device will be initialized when start() is called.
@@ -76,21 +77,22 @@
}
return &Agent{
- deviceID: deviceID,
- adapterProxy: ap,
- isRootDevice: device.Root,
- parentID: device.ParentId,
- deviceType: device.Type,
- deviceMgr: deviceMgr,
- adapterMgr: deviceMgr.adapterMgr,
- exitChannel: make(chan int, 1),
- dbProxy: deviceProxy,
- defaultTimeout: timeout,
- device: proto.Clone(device).(*voltha.Device),
- requestQueue: coreutils.NewRequestQueue(),
- flowLoader: flow.NewLoader(dbPath.SubPath("flows").Proxy(deviceID)),
- groupLoader: group.NewLoader(dbPath.SubPath("groups").Proxy(deviceID)),
- portLoader: port.NewLoader(dbPath.SubPath("ports").Proxy(deviceID)),
+ deviceID: deviceID,
+ adapterProxy: ap,
+ isRootDevice: device.Root,
+ parentID: device.ParentId,
+ deviceType: device.Type,
+ deviceMgr: deviceMgr,
+ adapterMgr: deviceMgr.adapterMgr,
+ exitChannel: make(chan int, 1),
+ dbProxy: deviceProxy,
+ defaultTimeout: timeout,
+ device: proto.Clone(device).(*voltha.Device),
+ requestQueue: coreutils.NewRequestQueue(),
+ flowLoader: flow.NewLoader(dbPath.SubPath("flows").Proxy(deviceID)),
+ groupLoader: group.NewLoader(dbPath.SubPath("groups").Proxy(deviceID)),
+ portLoader: port.NewLoader(dbPath.SubPath("ports").Proxy(deviceID)),
+ transientStateLoader: transientstate.NewLoader(dbPath.SubPath("core").Proxy("transientstate"), deviceID),
}
}
@@ -128,6 +130,7 @@
agent.flowLoader.Load(ctx)
agent.groupLoader.Load(ctx)
agent.portLoader.Load(ctx)
+ agent.transientStateLoader.Load(ctx)
logger.Infow(ctx, "device-loaded-from-dB", log.Fields{"device-id": agent.deviceID})
} else {
@@ -169,7 +172,10 @@
defer agent.requestQueue.RequestComplete()
logger.Infow(ctx, "stopping-device-agent", log.Fields{"device-id": agent.deviceID, "parentId": agent.parentID})
-
+ // Remove the device transient loader
+ if err := agent.deleteTransientState(ctx); err != nil {
+ return err
+ }
// Remove the device from the KV store
if err := agent.dbProxy.Remove(ctx, agent.deviceID); err != nil {
return err
@@ -206,13 +212,15 @@
agent.flowLoader.Load(ctx)
agent.groupLoader.Load(ctx)
agent.portLoader.Load(ctx)
+ agent.transientStateLoader.Load(ctx)
+
logger.Debugw(ctx, "reconciled-device-agent-devicetype", log.Fields{"device-id": agent.deviceID, "type": agent.deviceType})
}
// onSuccess is a common callback for scenarios where we receive a nil response following a request to an adapter
// and the only action required is to publish a successful result on kafka
func (agent *Agent) onSuccess(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
- logger.Debugw(ctx, "response successful", log.Fields{"rpc": rpc, "device-id": agent.deviceID})
+ logger.Debugw(ctx, "response-successful", log.Fields{"rpc": rpc, "device-id": agent.deviceID})
+	// Until the TODO below is implemented, the response is only logged at debug level;
+	// response and reqArgs are not used.
// TODO: Post success message onto kafka
}
@@ -244,6 +252,53 @@
}
}
+// onDeleteSuccess is the callback invoked when the adapter answers a delete request without an error.
+// It acquires the device lock and moves the device to the DELETING_POST_ADAPTER_RESPONSE transient
+// state; updateDeviceWithTransientStateAndReleaseLock then releases the lock and runs the state
+// transitions for that change. response is ignored; reqArgs is only logged.
+func (agent *Agent) onDeleteSuccess(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
+	logger.Debugw(ctx, "response-successful", log.Fields{"rpc": rpc, "device-id": agent.deviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		// The lock was not acquired: the device must not be read or updated here, and
+		// RequestComplete must not be called without a matching successful WaitForGreenLight.
+		logger.Errorw(ctx, "delete-device-failure", log.Fields{"device-id": agent.deviceID, "error": err, "args": reqArgs})
+		return
+	}
+	previousDeviceTransientState := agent.getTransientState()
+	newDevice := agent.cloneDeviceWithoutLock()
+	if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, newDevice,
+		voltha.DeviceTransientState_DELETING_POST_ADAPTER_RESPONSE, previousDeviceTransientState); err != nil {
+		logger.Errorw(ctx, "delete-device-failure", log.Fields{"device-id": agent.deviceID, "error": err, "args": reqArgs})
+	}
+}
+
+// onDeleteFailure is the callback invoked when a delete request to the adapter does not succeed
+// (as dispatched by waitForAdapterDeleteResponse: an error reply, a closed response channel, or an
+// expired/cancelled context). It logs the failure and records DELETE_FAILED as the device's
+// transient state. No state transition is run; only the transient state is updated.
+func (agent *Agent) onDeleteFailure(ctx context.Context, rpc string, response interface{}, reqArgs ...interface{}) {
+	if res, ok := response.(error); ok {
+		logger.Errorw(ctx, "rpc-failed", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": res, "args": reqArgs})
+	} else {
+		logger.Errorw(ctx, "rpc-failed-invalid-error", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "args": reqArgs})
+	}
+	if err := agent.updateTransientState(ctx, voltha.DeviceTransientState_DELETE_FAILED); err != nil {
+		// Include the cause so a device stuck in a deleting state can be diagnosed.
+		logger.Errorw(ctx, "failed-to-update-transient-state-as-delete-failed", log.Fields{"device-id": agent.deviceID, "error": err})
+	}
+}
+
+// waitForAdapterDeleteResponse blocks until the adapter replies on ch or ctx is done, then dispatches
+// exactly one callback: onSuccess for a reply without error, onFailure for an error reply, a closed
+// channel (reported as codes.Aborted) or ctx expiry/cancellation (reported as ctx.Err()).
+// cancel is always called on return so the request context is released.
+// reqArgs are forwarded to the callbacks unchanged (expanded, not wrapped in another slice).
+func (agent *Agent) waitForAdapterDeleteResponse(ctx context.Context, cancel context.CancelFunc, rpc string, ch chan *kafka.RpcResponse,
+	onSuccess coreutils.ResponseCallback, onFailure coreutils.ResponseCallback, reqArgs ...interface{}) {
+	defer cancel()
+	select {
+	case rpcResponse, ok := <-ch:
+		switch {
+		case !ok:
+			onFailure(ctx, rpc, status.Errorf(codes.Aborted, "channel-closed"), reqArgs...)
+		case rpcResponse.Err != nil:
+			onFailure(ctx, rpc, rpcResponse.Err, reqArgs...)
+		default:
+			onSuccess(ctx, rpc, rpcResponse.Reply, reqArgs...)
+		}
+	case <-ctx.Done():
+		onFailure(ctx, rpc, ctx.Err(), reqArgs...)
+	}
+}
+
// getDeviceReadOnly returns a device which MUST NOT be modified, but is safe to keep forever.
func (agent *Agent) getDeviceReadOnly(ctx context.Context) (*voltha.Device, error) {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
@@ -278,12 +333,10 @@
agent.requestQueue.RequestComplete()
return status.Error(codes.FailedPrecondition, fmt.Sprintf("cannot-enable-an-already-enabled-device: %s", oldDevice.Id))
}
- if oldDevice.AdminState == voltha.AdminState_DELETED {
- // This is a temporary state when a device is deleted before it gets removed from the model.
+ if agent.isDeletionInProgress() {
agent.requestQueue.RequestComplete()
- return status.Error(codes.FailedPrecondition, fmt.Sprintf("cannot-enable-a-deleted-device: %s", oldDevice.Id))
+ return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Device deletion is in progress.", agent.deviceID)
}
-
// First figure out which adapter will handle this device type. We do it at this stage as allow devices to be
// pre-provisioned with the required adapter not registered. At this stage, since we need to communicate
// with the adapter then we need to know the adapter that will handle this request
@@ -406,11 +459,14 @@
agent.requestQueue.RequestComplete()
return nil
}
- if cloned.AdminState == voltha.AdminState_PREPROVISIONED || cloned.AdminState == voltha.AdminState_DELETED {
+ if cloned.AdminState == voltha.AdminState_PREPROVISIONED {
agent.requestQueue.RequestComplete()
return status.Errorf(codes.FailedPrecondition, "deviceId:%s, invalid-admin-state:%s", agent.deviceID, cloned.AdminState)
}
-
+ if agent.isDeletionInProgress() {
+ agent.requestQueue.RequestComplete()
+ return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Device deletion is in progress.", agent.deviceID)
+ }
// Update the Admin State and operational state before sending the request out
cloned.AdminState = voltha.AdminState_DISABLED
cloned.OperStatus = voltha.OperStatus_UNKNOWN
@@ -437,6 +493,9 @@
logger.Debugw(ctx, "rebootDevice", log.Fields{"device-id": agent.deviceID})
device := agent.getDeviceReadOnlyWithoutLock()
+ if agent.isDeletionInProgress() {
+ return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Device deletion is in progress.", agent.deviceID)
+ }
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
ch, err := agent.adapterProxy.RebootDevice(subCtx, device)
if err != nil {
@@ -447,32 +506,79 @@
return nil
}
+// deleteDeviceForce deletes the device without waiting for the adapter's answer.
+// The device is moved to the FORCE_DELETING transient state (which releases the device lock and runs
+// the state transitions); afterwards, unless the device is pre-provisioned, a delete request is sent
+// to the adapter and its response is handed to the generic onSuccess/onFailure callbacks.
+// The request is rejected with FailedPrecondition if a deletion is already in progress.
+func (agent *Agent) deleteDeviceForce(ctx context.Context) error {
+	logger.Debugw(ctx, "deleteDeviceForce", log.Fields{"device-id": agent.deviceID})
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+
+	// Reject the request if the current transient state is already a deleting state.
+	prevTransientState := agent.getTransientState()
+	if agent.isStateDeleting(prevTransientState) {
+		agent.requestQueue.RequestComplete()
+		return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Device Deletion is in progress",
+			agent.deviceID)
+	}
+
+	clonedDevice := agent.cloneDeviceWithoutLock()
+	err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, clonedDevice,
+		voltha.DeviceTransientState_FORCE_DELETING, prevTransientState)
+	if err != nil {
+		return err
+	}
+
+	// A pre-provisioned device has never been handed to an adapter, so there is nothing to ask it to delete.
+	if clonedDevice.AdminState == ic.AdminState_PREPROVISIONED {
+		return nil
+	}
+
+	subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+	respCh, err := agent.adapterProxy.DeleteDevice(subCtx, clonedDevice)
+	if err != nil {
+		cancel()
+		return err
+	}
+	// Force delete does not depend on the adapter's answer, so the generic callbacks are used.
+	go agent.waitForAdapterResponse(subCtx, cancel, "deleteDeviceForce", respCh, agent.onSuccess, agent.onFailure)
+	return nil
+}
+
+// deleteDevice starts a regular (non-forced) deletion of the device.
+// It rejects the request with FailedPrecondition when the current transient state is already a
+// deleting state. Otherwise it persists DELETING_FROM_ADAPTER (or DELETING_POST_ADAPTER_RESPONSE for
+// a pre-provisioned device, which no adapter knows about) and, for non-pre-provisioned devices, sends
+// the delete request to the adapter; the adapter's answer is handled asynchronously by
+// onDeleteSuccess / onDeleteFailure. If the request cannot be sent, DELETE_FAILED is recorded and the
+// error is returned.
func (agent *Agent) deleteDevice(ctx context.Context) error {
logger.Debugw(ctx, "deleteDevice", log.Fields{"device-id": agent.deviceID})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
+ // Get the device Transient state, return err if it is DELETING
+ previousDeviceTransientState := agent.getTransientState()
- cloned := agent.cloneDeviceWithoutLock()
- previousState := cloned.AdminState
+ if agent.isStateDeleting(previousDeviceTransientState) {
+ agent.requestQueue.RequestComplete()
+ return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Device Deletion is in progress", agent.deviceID)
+ }
+ device := agent.cloneDeviceWithoutLock()
+ previousAdminState := device.AdminState
+ // Change the device transient state to DELETING_FROM_ADAPTER state till the device is removed from adapters.
+ currentDeviceTransientState := voltha.DeviceTransientState_DELETING_FROM_ADAPTER
- // No check is required when deleting a device. Changing the state to DELETE will trigger the removal of this
- // device by the state machine
- cloned.AdminState = voltha.AdminState_DELETED
- if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
+ if previousAdminState == ic.AdminState_PREPROVISIONED {
+ // Change the state to DELETING POST ADAPTER RESPONSE directly as adapters have no info of the device.
+ currentDeviceTransientState = voltha.DeviceTransientState_DELETING_POST_ADAPTER_RESPONSE
+ }
+ if err := agent.updateDeviceWithTransientStateAndReleaseLock(ctx, device, currentDeviceTransientState,
+ previousDeviceTransientState); err != nil {
return err
}
-
// If the device was in pre-prov state (only parent device are in that state) then do not send the request to the
// adapter
- if previousState != ic.AdminState_PREPROVISIONED {
+ if previousAdminState != ic.AdminState_PREPROVISIONED {
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
- ch, err := agent.adapterProxy.DeleteDevice(subCtx, cloned)
+ ch, err := agent.adapterProxy.DeleteDevice(subCtx, device)
if err != nil {
cancel()
+ // The delete request never reached the adapter: record DELETE_FAILED as the transient state.
+ if err := agent.updateTransientState(ctx, voltha.DeviceTransientState_DELETE_FAILED); err != nil {
+ logger.Errorw(ctx, "failed-to-update-transient-state-as-delete-failed", log.Fields{"device-id": agent.deviceID})
+ }
return err
}
- go agent.waitForAdapterResponse(subCtx, cancel, "deleteDevice", ch, agent.onSuccess, agent.onFailure)
+ go agent.waitForAdapterDeleteResponse(subCtx, cancel, "deleteDevice", ch, agent.onDeleteSuccess,
+ agent.onDeleteFailure)
}
return nil
}
@@ -654,7 +760,7 @@
// fail early if this agent is no longer valid
if agent.stopped {
agent.requestQueue.RequestComplete()
- return errors.New("device agent stopped")
+ return errors.New("device-agent-stopped")
}
// update in db
@@ -671,12 +777,52 @@
// release lock before processing transition
agent.requestQueue.RequestComplete()
- if err := agent.deviceMgr.stateTransitions.ProcessTransition(log.WithSpanFromContext(context.Background(), ctx), device, prevDevice); err != nil {
+ if err := agent.deviceMgr.stateTransitions.ProcessTransition(log.WithSpanFromContext(context.Background(), ctx),
+ device, prevDevice, voltha.DeviceTransientState_NONE, voltha.DeviceTransientState_NONE); err != nil {
logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previousAdminState": prevDevice.AdminState, "currentAdminState": device.AdminState})
}
return nil
}
+// updateDeviceWithTransientStateAndReleaseLock persists transientState (through the transient-state
+// loader) and device (through dbProxy), makes device the agent's current device, releases the device
+// lock and then runs the state transitions for (device, prevDevice, transientState, prevTransientState).
+// The calling function MUST hold the device lock; the lock is released on every return path.
+// The caller MUST NOT modify the device after this is called.
+// If storing the device fails, the transient state is reverted to prevTransientState and the storage
+// error is returned as codes.Internal.
+func (agent *Agent) updateDeviceWithTransientStateAndReleaseLock(ctx context.Context, device *voltha.Device,
+	transientState, prevTransientState voltha.DeviceTransientState_Types) error {
+	// fail early if this agent is no longer valid
+	if agent.stopped {
+		agent.requestQueue.RequestComplete()
+		return errors.New("device-agent-stopped")
+	}
+	// update device TransientState
+	if err := agent.updateTransientState(ctx, transientState); err != nil {
+		agent.requestQueue.RequestComplete()
+		return err
+	}
+	// update in db
+	if err := agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil {
+		// Revert the transient state so it matches the device still stored in the DB. A separate
+		// variable keeps the DB error (returned below) from being shadowed by the revert result.
+		if revertErr := agent.updateTransientState(ctx, prevTransientState); revertErr != nil {
+			logger.Errorw(ctx, "failed-to-revert-transient-state-update-on-error", log.Fields{"device-id": device.Id,
+				"previousTransientState": prevTransientState, "currentTransientState": transientState, "error": revertErr})
+		}
+		agent.requestQueue.RequestComplete()
+		return status.Errorf(codes.Internal, "failed-update-device:%s: %s", agent.deviceID, err)
+	}
+
+	logger.Debugw(ctx, "updated-device-in-store", log.Fields{"device-id": agent.deviceID})
+
+	prevDevice := agent.device
+	// update the device
+	agent.device = device
+
+	// release lock before processing transition
+	agent.requestQueue.RequestComplete()
+
+	if err := agent.deviceMgr.stateTransitions.ProcessTransition(log.WithSpanFromContext(context.Background(), ctx),
+		device, prevDevice, transientState, prevTransientState); err != nil {
+		logger.Errorw(ctx, "failed-process-transition", log.Fields{"device-id": device.Id, "previousAdminState": prevDevice.AdminState,
+			"currentAdminState": device.AdminState, "error": err})
+	}
+	return nil
+}
func (agent *Agent) updateDeviceReason(ctx context.Context, reason string) error {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err