[VOL-4421] Execute delete force following an adapater restart
Change-Id: I1f27568ac5587740682ce39eaac86a4e813973e7
diff --git a/VERSION b/VERSION
index 7364556..8e24c99 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.0.2-dev
+3.0.2-dev
\ No newline at end of file
diff --git a/go.mod b/go.mod
index dd4b289..903e49a 100644
--- a/go.mod
+++ b/go.mod
@@ -17,7 +17,7 @@
github.com/golang/mock v1.6.0
github.com/golang/protobuf v1.5.2
github.com/google/uuid v1.3.0
- github.com/opencord/voltha-lib-go/v7 v7.1.0
+ github.com/opencord/voltha-lib-go/v7 v7.1.1
github.com/opencord/voltha-protos/v5 v5.1.0
github.com/opentracing/opentracing-go v1.2.0
github.com/phayes/freeport v0.0.0-20180830031419-95f893ade6f2
diff --git a/go.sum b/go.sum
index 4bafae0..4114466 100644
--- a/go.sum
+++ b/go.sum
@@ -191,8 +191,8 @@
github.com/onsi/gomega v1.10.1/go.mod h1:iN09h71vgCQne3DLsj+A5owkum+a2tYe+TOCB1ybHNo=
github.com/onsi/gomega v1.14.0 h1:ep6kpPVwmr/nTbklSx2nrLNSIO62DoYAhnPNIMhK8gI=
github.com/onsi/gomega v1.14.0/go.mod h1:cIuvLEne0aoVhAgh/O6ac0Op8WWw9H6eYCriF+tEHG0=
-github.com/opencord/voltha-lib-go/v7 v7.1.0 h1:XTJGvaJ2oditHWguQe8iC17PlY7yr1w7acdM8urwNu0=
-github.com/opencord/voltha-lib-go/v7 v7.1.0/go.mod h1:LjoFfwqdf/OHKUmzMqzBDGmoxfjmq9l/Y47yeBsK1xI=
+github.com/opencord/voltha-lib-go/v7 v7.1.1 h1:GWh0Ix7A1gr3b9L7GNy5pSDkLaPyZzJk/4ZNsj/B3eg=
+github.com/opencord/voltha-lib-go/v7 v7.1.1/go.mod h1:LjoFfwqdf/OHKUmzMqzBDGmoxfjmq9l/Y47yeBsK1xI=
github.com/opencord/voltha-protos/v5 v5.1.0 h1:pGClPJIVhuLiM0d01e3MdhGeSQwA6o6+XLAxv3f/Frk=
github.com/opencord/voltha-protos/v5 v5.1.0/go.mod h1:Ff7eoWsL0K7oNtOBB4UcuWDZdv1zBQ9lLHvpJr02erE=
github.com/opentracing/opentracing-go v1.1.0/go.mod h1:UkNAQd3GIcIGf0SeVgPpRdFStlNbqXla1AfSYxPUl2o=
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 818b976..0713f20 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -33,6 +33,7 @@
"github.com/gogo/protobuf/proto"
"github.com/golang/protobuf/ptypes/empty"
"github.com/opencord/voltha-go/rw_core/config"
+ "github.com/opencord/voltha-go/rw_core/utils"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -53,6 +54,7 @@
var errReconcileAborted = errors.New("reconcile aborted")
var errContextExpired = errors.New("context expired")
+var errNoConnection = errors.New("no connection")
// Agent represents device agent attributes
type Agent struct {
@@ -658,7 +660,7 @@
"adapter-endpoint": device.AdapterEndpoint,
})
agent.requestQueue.RequestComplete()
- return err
+ return fmt.Errorf("remote-not-reachable %w", errNoConnection)
}
subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
requestStatus.Code = common.OperationResp_OPERATION_IN_PROGRESS
@@ -1352,6 +1354,9 @@
updatedState = core.DeviceTransientState_DELETE_FAILED
case core.DeviceTransientState_DELETING_FROM_ADAPTER:
updatedState = core.DeviceTransientState_DELETE_FAILED
+ case core.DeviceTransientState_DELETE_FAILED:
+ // do not change state
+ return nil
default:
updatedState = core.DeviceTransientState_NONE
}
@@ -1362,7 +1367,91 @@
return nil
}
+func (agent *Agent) DeleteDevicePostAdapterRestart(ctx context.Context) error {
+ logger.Debugw(ctx, "delete-post-restart", log.Fields{"device-id": agent.deviceID})
+ ctx = utils.WithNewSpanAndRPCMetadataContext(ctx, "DelteDevicePostAdapterRestart")
+
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+
+ device := agent.getDeviceReadOnlyWithoutLock()
+ if device.AdminState == voltha.AdminState_PREPROVISIONED {
+ logger.Debugw(ctx, "device-in-preprovisioning-state-reconcile-not-needed", log.Fields{"device-id": device.Id})
+ agent.requestQueue.RequestComplete()
+ return nil
+ }
+ // Change device transient state to FORCE_DELETING
+ if err := agent.updateTransientState(ctx, core.DeviceTransientState_FORCE_DELETING); err != nil {
+ logger.Errorw(ctx, "failure-updating-transient-state", log.Fields{"error": err, "device-id": agent.deviceID})
+ agent.requestQueue.RequestComplete()
+ return err
+ }
+
+ // Ensure we have a valid grpc client available as we have just restarted
+ deleteBackoff := backoff.NewExponentialBackOff()
+ deleteBackoff.InitialInterval = agent.config.BackoffRetryInitialInterval
+ deleteBackoff.MaxElapsedTime = agent.config.BackoffRetryMaxElapsedTime
+ deleteBackoff.MaxInterval = agent.config.BackoffRetryMaxInterval
+ var backoffTimer *time.Timer
+ var err error
+ var client adapter_service.AdapterServiceClient
+retry:
+ for {
+ client, err = agent.adapterMgr.GetAdapterClient(ctx, agent.adapterEndpoint)
+ if err == nil {
+ break retry
+ }
+ duration := deleteBackoff.NextBackOff()
+ if duration == backoff.Stop {
+ deleteBackoff.Reset()
+ duration = deleteBackoff.NextBackOff()
+ }
+ backoffTimer = time.NewTimer(duration)
+ select {
+ case <-backoffTimer.C:
+ logger.Debugw(ctx, "backoff-timer-expires", log.Fields{"device-id": agent.deviceID})
+ case <-ctx.Done():
+ err = ctx.Err()
+ break retry
+ }
+ }
+ if backoffTimer != nil && !backoffTimer.Stop() {
+ select {
+ case <-backoffTimer.C:
+ default:
+ }
+ }
+ if err != nil || client == nil {
+ agent.requestQueue.RequestComplete()
+ return err
+ }
+
+ // Release the device lock to allow for device state update, if any
+ agent.requestQueue.RequestComplete()
+
+ // Send the delete request to the adapter
+ subCtx, cancel := context.WithTimeout(coreutils.WithAllMetadataFromContext(ctx), agent.rpcTimeout)
+ defer cancel()
+ if _, err = client.DeleteDevice(subCtx, device); err != nil {
+ agent.onDeleteFailure(subCtx, err, nil, nil)
+ } else {
+ agent.onDeleteSuccess(subCtx, nil, nil)
+ }
+ return nil
+}
+
func (agent *Agent) ReconcileDevice(ctx context.Context) {
+ // Do not reconcile if the device was in DELETE_FAILED transient state. Just invoke the force delete on that device.
+ state := agent.getTransientState()
+ logger.Debugw(ctx, "starting-reconcile", log.Fields{"device-id": agent.deviceID, "state": state})
+ if agent.getTransientState() == core.DeviceTransientState_DELETE_FAILED {
+ if err := agent.DeleteDevicePostAdapterRestart(ctx); err != nil {
+ logger.Errorw(ctx, "delete-post-restart-failed", log.Fields{"error": err, "device-id": agent.deviceID})
+ }
+ return
+ }
+
requestStatus := &common.OperationResp{Code: common.OperationResp_OPERATION_FAILURE}
var desc string
diff --git a/rw_core/core/device/manager_state_callback.go b/rw_core/core/device/manager_state_callback.go
index 9990ef6..53ce6af 100644
--- a/rw_core/core/device/manager_state_callback.go
+++ b/rw_core/core/device/manager_state_callback.go
@@ -17,8 +17,10 @@
import (
"context"
+ "errors"
"github.com/opencord/voltha-lib-go/v7/pkg/log"
+ "github.com/opencord/voltha-protos/v5/go/core"
"github.com/opencord/voltha-protos/v5/go/voltha"
"google.golang.org/grpc/codes"
"google.golang.org/grpc/status"
@@ -129,7 +131,17 @@
logger.Debugw(ctx, "invoking-delete-device", log.Fields{"device-id": childDeviceID, "parent-device-id": parentCurrDevice.Id})
if err := agent.deleteDeviceForce(ctx); err != nil {
logger.Warnw(ctx, "delete-device-force-failed", log.Fields{"device-id": childDeviceID, "parent-device-id": parentCurrDevice.Id,
- "error": err.Error()})
+ "error": err})
+ // We got an error - if its a connection error we should just mark the device as delete failed and
+ // when connection is established then proceed with the deletion instead of reconciling the device.
+ // A DeviceTransientState_DELETE_FAILED does not perform any state transition
+ if errors.Is(err, errNoConnection) {
+ if err = agent.updateTransientState(ctx, core.DeviceTransientState_DELETE_FAILED); err != nil {
+ logger.Warnw(ctx, "failed-updating-transient-state", log.Fields{"device-id": childDeviceID, "parent-device-id": parentCurrDevice.Id,
+ "error": err})
+ }
+ logger.Debugw(ctx, "device-set-to-delete-failed", log.Fields{"device-id": childDeviceID, "parent-device-id": parentCurrDevice.Id})
+ }
}
// No further action is required here. The deleteDevice will change the device state where the resulting
// callback will take care of cleaning the child device agent.
diff --git a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/grpc/client.go b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/grpc/client.go
index bbec5a3..add2b28 100644
--- a/vendor/github.com/opencord/voltha-lib-go/v7/pkg/grpc/client.go
+++ b/vendor/github.com/opencord/voltha-lib-go/v7/pkg/grpc/client.go
@@ -378,9 +378,17 @@
select {
case <-ctx.Done():
logger.Debugw(ctx, "context-closing", log.Fields{"endpoint": c.apiEndPoint})
- return
+ break loop
case event := <-c.events:
logger.Debugw(ctx, "received-event", log.Fields{"event": event, "endpoint": c.apiEndPoint})
+ c.connectionLock.RLock()
+ // On a client stopped, just allow the stop event to go through
+ if c.done && event != eventStopped {
+ c.connectionLock.RUnlock()
+ logger.Debugw(ctx, "ignoring-event-on-client-stop", log.Fields{"event": event, "endpoint": c.apiEndPoint})
+ continue
+ }
+ c.connectionLock.RUnlock()
switch event {
case eventConnecting:
c.stateLock.Lock()
@@ -404,7 +412,11 @@
return
}
attempt += 1
- c.events <- eventConnecting
+ c.connectionLock.RLock()
+ if !c.done {
+ c.events <- eventConnecting
+ }
+ c.connectionLock.RUnlock()
} else {
backoff.Reset()
}
@@ -544,11 +556,14 @@
}
func (c *Client) Stop(ctx context.Context) {
+ c.connectionLock.Lock()
+ defer c.connectionLock.Unlock()
if !c.done {
+ c.done = true
c.events <- eventStopped
close(c.events)
- c.done = true
}
+ logger.Infow(ctx, "client-stopped", log.Fields{"endpoint": c.apiEndPoint})
}
// SetService is used for testing only
diff --git a/vendor/modules.txt b/vendor/modules.txt
index 5aec008..eaefcd4 100644
--- a/vendor/modules.txt
+++ b/vendor/modules.txt
@@ -211,7 +211,7 @@
github.com/modern-go/concurrent
# github.com/modern-go/reflect2 v1.0.1
github.com/modern-go/reflect2
-# github.com/opencord/voltha-lib-go/v7 v7.1.0
+# github.com/opencord/voltha-lib-go/v7 v7.1.1
## explicit
github.com/opencord/voltha-lib-go/v7/pkg/adapters/common
github.com/opencord/voltha-lib-go/v7/pkg/config