VOL-4189
rwcore - RPC update-flows-incrementally() triggered during reconciling
Change-Id: I91b260bdaf7401927c03eed08847c2d7b4112095
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 9cd49f0..cd879c6 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -407,18 +407,20 @@
oldDevice := agent.getDeviceReadOnlyWithoutLock()
+ if !agent.proceedWithRequest(oldDevice) {
+ agent.requestQueue.RequestComplete()
+
+ desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconcile is in progress/failed.", agent.deviceID)
+ return status.Error(codes.FailedPrecondition, desc)
+ }
+
if oldDevice.AdminState == voltha.AdminState_ENABLED {
logger.Warnw(ctx, "device-already-enabled", log.Fields{"device-id": agent.deviceID})
agent.requestQueue.RequestComplete()
desc = fmt.Sprintf("cannot-enable-an-already-enabled-device: %s", oldDevice.Id)
return status.Error(codes.FailedPrecondition, desc)
}
- if !agent.proceedWithRequestNoLock() {
- agent.requestQueue.RequestComplete()
- desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as Device deletion/reconciling is in progress or reconcile failed.", agent.deviceID)
- return status.Error(codes.FailedPrecondition, desc)
- }
// First figure out which adapter will handle this device type. We do it at this stage as allow devices to be
// pre-provisioned with the required adapter not registered. At this stage, since we need to communicate
// with the adapter then we need to know the adapter that will handle this request
@@ -574,6 +576,12 @@
cloned := agent.cloneDeviceWithoutLock()
+ if !agent.proceedWithRequest(cloned) {
+ agent.requestQueue.RequestComplete()
+ desc = fmt.Sprintf("deviceId:%s,Cannot complete operation as device deletion is in progress or reconciling is in progress/failed.", agent.deviceID)
+ return status.Errorf(codes.FailedPrecondition, desc)
+ }
+
if cloned.AdminState == voltha.AdminState_DISABLED {
desc = "device-already-disabled"
logger.Debugw(ctx, "device-already-disabled", log.Fields{"device-id": agent.deviceID})
@@ -586,12 +594,6 @@
return status.Errorf(codes.FailedPrecondition, "deviceId:%s, invalid-admin-state:%s", agent.deviceID, cloned.AdminState)
}
- if !agent.proceedWithRequestNoLock() {
- agent.requestQueue.RequestComplete()
- desc = fmt.Sprintf("deviceId:%s,Cannot complete operation as Device deletion/reconciling is in progress or reconcile failed.", agent.deviceID)
- return status.Errorf(codes.FailedPrecondition, desc)
- }
-
// Update the Admin State and operational state before sending the request out
cloned.AdminState = voltha.AdminState_DISABLED
cloned.OperStatus = voltha.OperStatus_UNKNOWN
@@ -633,8 +635,9 @@
logger.Debugw(ctx, "reboot-device", log.Fields{"device-id": agent.deviceID})
device := agent.getDeviceReadOnlyWithoutLock()
- if !agent.proceedWithRequestNoLock() {
- desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as Device deletion/reconciling is in progress or reconcile failed.", agent.deviceID)
+
+ if !agent.proceedWithRequest(device) {
+ desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed.", agent.deviceID)
return status.Errorf(codes.FailedPrecondition, desc)
}
subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
@@ -719,21 +722,17 @@
return err
}
- if agent.isInReconcileState() {
+ device := agent.cloneDeviceWithoutLock()
+
+ if !agent.proceedWithRequest(device) {
agent.requestQueue.RequestComplete()
- desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as Reconciling is in progress or failed", agent.deviceID)
+ desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed", agent.deviceID)
return status.Error(codes.FailedPrecondition, desc)
}
// Get the device Transient state, return err if it is DELETING
previousDeviceTransientState := agent.getTransientState()
- if agent.isStateDeleting(previousDeviceTransientState) {
- agent.requestQueue.RequestComplete()
- desc = fmt.Sprintf("deviceId:%s, Device Deletion is in progress", agent.deviceID)
- return status.Error(codes.FailedPrecondition, desc)
- }
- device := agent.cloneDeviceWithoutLock()
previousAdminState := device.AdminState
// Change the device transient state to DELETING_FROM_ADAPTER state till the device is removed from adapters.
currentDeviceTransientState := voltha.DeviceTransientState_DELETING_FROM_ADAPTER
@@ -1262,9 +1261,8 @@
return resp, nil
}
-// The device lock MUST be held by the caller.
-func (agent *Agent) proceedWithRequestNoLock() bool {
- return !agent.isDeletionInProgress() && !agent.isInReconcileState()
+func (agent *Agent) proceedWithRequest(device *voltha.Device) bool {
+ return !agent.isDeletionInProgress() && !agent.isInReconcileState(device)
}
func (agent *Agent) stopReconcile() {
@@ -1285,9 +1283,9 @@
return
}
- if !agent.proceedWithRequestNoLock() {
+ if !agent.proceedWithRequest(device) {
agent.requestQueue.RequestComplete()
- desc = fmt.Sprintf("Cannot complete operation as Device deletion/reconciling is in progress or reconcile failed for device : %s", device.Id)
+ desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed", device.Id)
logger.Errorf(ctx, desc)
agent.logDeviceUpdate(ctx, "Reconciling", nil, nil, operStatus, &desc)
return
diff --git a/rw_core/core/device/agent_flow.go b/rw_core/core/device/agent_flow.go
index 90b57a7..f13003b 100644
--- a/rw_core/core/device/agent_flow.go
+++ b/rw_core/core/device/agent_flow.go
@@ -60,12 +60,20 @@
agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
}
+
+ if !agent.proceedWithRequest(device) {
+ desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed", agent.deviceID)
+ agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
+ return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "%s", desc)
+ }
+
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
agent.logDeviceUpdate(ctx, "addFlowsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
}
+
flowsToAdd := make([]*ofp.OfpFlowStats, 0)
flowsToDelete := make([]*ofp.OfpFlowStats, 0)
for _, flow := range newFlows {
@@ -161,11 +169,19 @@
desc = err.Error()
return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
}
+
+ if !agent.proceedWithRequest(device) {
+ desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed", device.Id)
+ agent.logDeviceUpdate(ctx, "deleteFlowsFromAdapter", nil, nil, operStatus, &desc)
+ return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "%s", desc)
+ }
+
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
}
+
for _, flow := range flowsToDel {
if flowHandle, have := agent.flowCache.Lock(flow.Id); have {
// Update the store and cache
@@ -230,11 +246,19 @@
if err != nil {
return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
}
+
+ if !agent.proceedWithRequest(device) {
+ desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed", device.Id)
+ agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
+ return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "%s", desc)
+ }
+
if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
desc = "invalid device states"
agent.logDeviceUpdate(ctx, "updateFlowsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states")
}
+
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
@@ -242,6 +266,7 @@
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
}
+
flowsToAdd := make([]*ofp.OfpFlowStats, 0, len(updatedFlows))
flowsToDelete := make([]*ofp.OfpFlowStats, 0, len(updatedFlows))
for _, flow := range updatedFlows {
diff --git a/rw_core/core/device/agent_group.go b/rw_core/core/device/agent_group.go
index 035bc7c..ba58d2f 100644
--- a/rw_core/core/device/agent_group.go
+++ b/rw_core/core/device/agent_group.go
@@ -61,6 +61,13 @@
agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
}
+
+ if !agent.proceedWithRequest(device) {
+ desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed", device.Id)
+ agent.logDeviceUpdate(ctx, "addGroupsToAdapter", nil, nil, operStatus, &desc)
+ return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "%s", desc)
+ }
+
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
@@ -163,6 +170,13 @@
desc = err.Error()
return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
}
+
+ if !agent.proceedWithRequest(device) {
+ desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed", device.Id)
+ agent.logDeviceUpdate(ctx, "deleteGroupsFromAdapter", nil, nil, operStatus, &desc)
+ return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "%s", desc)
+ }
+
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
@@ -234,11 +248,19 @@
agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), status.Errorf(codes.Aborted, "%s", err)
}
+
+ if !agent.proceedWithRequest(device) {
+ desc = fmt.Sprintf("deviceId:%s, Cannot complete operation as device deletion is in progress or Reconciling is in progress/failed", device.Id)
+ agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
+ return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "%s", desc)
+ }
+
if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
desc = fmt.Sprintf("invalid device states-oper-%s-connect-%s-admin-%s", device.OperStatus, device.ConnectStatus, device.AdminState)
agent.logDeviceUpdate(ctx, "updateGroupsToAdapter", nil, nil, operStatus, &desc)
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states-oper-%s-connect-%s-admin-%s", device.OperStatus, device.ConnectStatus, device.AdminState)
}
+
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
desc = fmt.Sprintf("non-existent-device-type-%s", device.Type)
diff --git a/rw_core/core/device/agent_image.go b/rw_core/core/device/agent_image.go
index 4dedc9a..e71af8a 100644
--- a/rw_core/core/device/agent_image.go
+++ b/rw_core/core/device/agent_image.go
@@ -42,13 +42,15 @@
return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, is an OLT. Image update "+
"not supported by VOLTHA. Use Device Manager or other means", agent.deviceID)
}
- if !agent.proceedWithRequestNoLock() {
+
+ device := agent.cloneDeviceWithoutLock()
+
+ if !agent.proceedWithRequest(device) {
agent.requestQueue.RequestComplete()
- return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, Cannot complete operation as Device deletion/reconciling is in progress or reconcile failed.",
+ return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed.",
agent.deviceID)
}
- device := agent.cloneDeviceWithoutLock()
if device.ImageDownloads != nil {
for _, image := range device.ImageDownloads {
if image.DownloadState == voltha.ImageDownload_DOWNLOAD_REQUESTED {
@@ -105,14 +107,15 @@
}
logger.Debugw(ctx, "cancel-image-download", log.Fields{"device-id": agent.deviceID})
- if !agent.proceedWithRequestNoLock() {
+ cloned := agent.cloneDeviceWithoutLock()
+
+ if !agent.proceedWithRequest(cloned) {
agent.requestQueue.RequestComplete()
- return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, Cannot complete operation as Device deletion/reconciling is in progress or reconcile failed.",
+ return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed.",
agent.deviceID)
}
// Update image download state
- cloned := agent.cloneDeviceWithoutLock()
_, index, err := getImage(img, cloned)
if err != nil {
agent.requestQueue.RequestComplete()
@@ -149,14 +152,15 @@
}
logger.Debugw(ctx, "activate-image", log.Fields{"device-id": agent.deviceID})
- if !agent.proceedWithRequestNoLock() {
+ cloned := agent.cloneDeviceWithoutLock()
+
+ if !agent.proceedWithRequest(cloned) {
agent.requestQueue.RequestComplete()
- return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, Cannot complete operation as Device deletion/reconciling is in progress or reconcile failed.",
+ return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, Cannot complete operation as device deletion is in progress or reconcile is in progress/failed.",
agent.deviceID)
}
// Update image download state
- cloned := agent.cloneDeviceWithoutLock()
image, index, err := getImage(img, cloned)
if err != nil {
agent.requestQueue.RequestComplete()
@@ -268,14 +272,15 @@
}
logger.Debugw(ctx, "updating-image-download", log.Fields{"device-id": agent.deviceID, "img": img})
- if !agent.proceedWithRequestNoLock() {
+ cloned := agent.cloneDeviceWithoutLock()
+
+ if !agent.proceedWithRequest(cloned) {
agent.requestQueue.RequestComplete()
- return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Cannot complete operation as Device deletion/reconciling is in progress or reconcile failed.",
+ return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed.",
agent.deviceID)
}
// Update the image as well as remove it if the download was cancelled
- cloned := agent.cloneDeviceWithoutLock()
clonedImages := make([]*voltha.ImageDownload, len(cloned.ImageDownloads))
for _, image := range cloned.ImageDownloads {
if image.Id == img.Id && image.Name == img.Name {
@@ -331,9 +336,12 @@
logger.Errorw(subCtx, "can't obtain lock", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": err, "args": reqArgs})
return
}
- if !agent.proceedWithRequestNoLock() {
+
+ device := agent.getDeviceReadOnlyWithoutLock()
+
+ if !agent.proceedWithRequest(device) {
agent.requestQueue.RequestComplete()
- logger.Errorw(subCtx, "Cannot complete operation as Device deletion/reconciling is in progress or reconcile failed.",
+ logger.Errorw(subCtx, "Cannot complete operation as device deletion is in progress or reconciling is in progress/failed.",
log.Fields{"rpc": rpc, "device-id": agent.deviceID})
return
}
@@ -383,14 +391,17 @@
logger.Errorw(ctx, "cannot-obtain-lock", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "error": err, "args": reqArgs})
return
}
- if !agent.proceedWithRequestNoLock() {
+
+ cloned := agent.cloneDeviceWithoutLock()
+
+ if !agent.proceedWithRequest(cloned) {
agent.requestQueue.RequestComplete()
- logger.Errorw(ctx, "Cannot complete operation as Device deletion/reconciling is in progress or reconcile failed.",
+ logger.Errorw(ctx, "Cannot complete operation as Device deletion is in progress or reconciling is in progress/failed.",
log.Fields{"rpc": rpc, "device-id": agent.deviceID})
return
}
logger.Infow(ctx, "rpc-successful", log.Fields{"rpc": rpc, "device-id": agent.deviceID, "response": response, "args": reqArgs})
- cloned := agent.cloneDeviceWithoutLock()
+
//TODO base this on IMAGE ID when created
var imageSucceeded *voltha.ImageDownload
var index int
diff --git a/rw_core/core/device/agent_pm_config.go b/rw_core/core/device/agent_pm_config.go
index 2e355aa..176a126 100644
--- a/rw_core/core/device/agent_pm_config.go
+++ b/rw_core/core/device/agent_pm_config.go
@@ -32,6 +32,11 @@
logger.Debugw(ctx, "update-pm-configs", log.Fields{"device-id": pmConfigs.Id})
cloned := agent.cloneDeviceWithoutLock()
+
+ if !agent.proceedWithRequest(cloned) {
+ return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed", cloned.Id)
+ }
+
cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
// Send the request to the adapter
@@ -95,6 +100,10 @@
logger.Debugw(ctx, "init-pm-configs", log.Fields{"device-id": pmConfigs.Id})
cloned := agent.cloneDeviceWithoutLock()
+
+ if !agent.proceedWithRequest(cloned) {
+ return status.Errorf(codes.FailedPrecondition, "deviceId:%s, Cannot complete operation as device deletion is in progress or reconciling is in progress/failed", cloned.Id)
+ }
cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
return agent.updateDeviceAndReleaseLock(ctx, cloned)
}
diff --git a/rw_core/core/device/agent_transient_state.go b/rw_core/core/device/agent_transient_state.go
index 6f75ecc..0adf57d 100644
--- a/rw_core/core/device/agent_transient_state.go
+++ b/rw_core/core/device/agent_transient_state.go
@@ -74,8 +74,7 @@
return nil
}
-func (agent *Agent) isInReconcileState() bool {
- device := agent.getDeviceReadOnlyWithoutLock()
+func (agent *Agent) isInReconcileState(device *voltha.Device) bool {
return device.OperStatus == common.OperStatus_RECONCILING || device.OperStatus == common.OperStatus_RECONCILING_FAILED ||
agent.matchTransientState(voltha.DeviceTransientState_RECONCILE_IN_PROGRESS)
}