VOL-3356 - Changed the way devices are updated.
so that state transitions will execute in the calling thread.
Also changed the locking guarantees when accessing devices.
Change-Id: I0d40215bf35ffafd2ee4fcef6b34515001adcc9c
diff --git a/VERSION b/VERSION
index 73462a5..499bd1e 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.5.1
+2.5.2-dev
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 9fbeb9d..6053d65 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -251,59 +251,63 @@
return proto.Clone(agent.device).(*voltha.Device), nil
}
-// getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
-func (agent *Agent) getDeviceWithoutLock() *voltha.Device {
+// getDeviceReadOnly returns a device which MUST NOT be modified, but is safe to keep forever. This is very efficient.
+// The device lock MUST be held by the caller.
+func (agent *Agent) getDeviceReadOnly() *voltha.Device {
return agent.device
}
+// cloneDeviceWithoutLock returns a copy of the device which is safe to modify.
+// The device lock MUST be held by the caller.
+func (agent *Agent) cloneDeviceWithoutLock() *voltha.Device {
+ return proto.Clone(agent.device).(*voltha.Device)
+}
+
// enableDevice activates a preprovisioned or a disable device
func (agent *Agent) enableDevice(ctx context.Context) error {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
-
logger.Debugw(ctx, "enableDevice", log.Fields{"device-id": agent.deviceID})
- cloned := agent.getDeviceWithoutLock()
+ oldDevice := agent.getDeviceReadOnly()
+ if oldDevice.AdminState == voltha.AdminState_ENABLED {
+ logger.Warnw(ctx, "device-already-enabled", log.Fields{"device-id": agent.deviceID})
+ agent.requestQueue.RequestComplete()
+ return status.Error(codes.FailedPrecondition, fmt.Sprintf("cannot-enable-an-already-enabled-device: %s", oldDevice.Id))
+ }
+ if oldDevice.AdminState == voltha.AdminState_DELETED {
+ // This is a temporary state when a device is deleted before it gets removed from the model.
+ agent.requestQueue.RequestComplete()
+ return status.Error(codes.FailedPrecondition, fmt.Sprintf("cannot-enable-a-deleted-device: %s", oldDevice.Id))
+ }
// First figure out which adapter will handle this device type. We do it at this stage as allow devices to be
// pre-provisioned with the required adapter not registered. At this stage, since we need to communicate
// with the adapter then we need to know the adapter that will handle this request
- adapterName, err := agent.adapterMgr.GetAdapterType(cloned.Type)
+ adapterName, err := agent.adapterMgr.GetAdapterType(oldDevice.Type)
if err != nil {
- return err
- }
- cloned.Adapter = adapterName
-
- if cloned.AdminState == voltha.AdminState_ENABLED {
- logger.Warnw(ctx, "device-already-enabled", log.Fields{"device-id": agent.deviceID})
- err = status.Error(codes.FailedPrecondition, fmt.Sprintf("cannot-enable-an-already-enabled-device: %s ", cloned.Id))
+ agent.requestQueue.RequestComplete()
return err
}
- if cloned.AdminState == voltha.AdminState_DELETED {
- // This is a temporary state when a device is deleted before it gets removed from the model.
- err = status.Error(codes.FailedPrecondition, fmt.Sprintf("cannot-enable-a-deleted-device: %s ", cloned.Id))
- return err
- }
+ newDevice := agent.cloneDeviceWithoutLock()
+ newDevice.Adapter = adapterName
- previousAdminState := cloned.AdminState
-
- // Update the Admin State and set the operational state to activating before sending the request to the
- // Adapters
- if err := agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_ENABLED, cloned.ConnectStatus, voltha.OperStatus_ACTIVATING); err != nil {
+ // Update the Admin State and set the operational state to activating before sending the request to the Adapters
+ newDevice.AdminState = voltha.AdminState_ENABLED
+ newDevice.OperStatus = voltha.OperStatus_ACTIVATING
+ if err := agent.updateDeviceAndReleaseLock(ctx, newDevice); err != nil {
return err
}
// Adopt the device if it was in pre-provision state. In all other cases, try to re-enable it.
- device := proto.Clone(cloned).(*voltha.Device)
var ch chan *kafka.RpcResponse
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- if previousAdminState == voltha.AdminState_PREPROVISIONED {
- ch, err = agent.adapterProxy.AdoptDevice(subCtx, device)
+ if oldDevice.AdminState == voltha.AdminState_PREPROVISIONED {
+ ch, err = agent.adapterProxy.AdoptDevice(subCtx, newDevice)
} else {
- ch, err = agent.adapterProxy.ReEnableDevice(subCtx, device)
+ ch, err = agent.adapterProxy.ReEnableDevice(subCtx, newDevice)
}
if err != nil {
cancel()
@@ -391,27 +395,29 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
logger.Debugw(ctx, "disableDevice", log.Fields{"device-id": agent.deviceID})
- cloned := agent.getDeviceWithoutLock()
+ cloned := agent.cloneDeviceWithoutLock()
if cloned.AdminState == voltha.AdminState_DISABLED {
logger.Debugw(ctx, "device-already-disabled", log.Fields{"id": agent.deviceID})
+ agent.requestQueue.RequestComplete()
return nil
}
- if cloned.AdminState == voltha.AdminState_PREPROVISIONED ||
- cloned.AdminState == voltha.AdminState_DELETED {
+ if cloned.AdminState == voltha.AdminState_PREPROVISIONED || cloned.AdminState == voltha.AdminState_DELETED {
+ agent.requestQueue.RequestComplete()
return status.Errorf(codes.FailedPrecondition, "deviceId:%s, invalid-admin-state:%s", agent.deviceID, cloned.AdminState)
}
// Update the Admin State and operational state before sending the request out
- if err := agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_DISABLED, cloned.ConnectStatus, voltha.OperStatus_UNKNOWN); err != nil {
+ cloned.AdminState = voltha.AdminState_DISABLED
+ cloned.OperStatus = voltha.OperStatus_UNKNOWN
+ if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
return err
}
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.DisableDevice(subCtx, proto.Clone(cloned).(*voltha.Device))
+ ch, err := agent.adapterProxy.DisableDevice(subCtx, cloned)
if err != nil {
cancel()
return err
@@ -428,7 +434,7 @@
defer agent.requestQueue.RequestComplete()
logger.Debugw(ctx, "rebootDevice", log.Fields{"device-id": agent.deviceID})
- device := agent.getDeviceWithoutLock()
+ device := agent.getDeviceReadOnly()
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
ch, err := agent.adapterProxy.RebootDevice(subCtx, device)
if err != nil {
@@ -444,15 +450,14 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
- cloned := agent.getDeviceWithoutLock()
-
+ cloned := agent.cloneDeviceWithoutLock()
previousState := cloned.AdminState
// No check is required when deleting a device. Changing the state to DELETE will trigger the removal of this
// device by the state machine
- if err := agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_DELETED, cloned.ConnectStatus, cloned.OperStatus); err != nil {
+ cloned.AdminState = voltha.AdminState_DELETED
+ if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
return err
}
@@ -474,18 +479,11 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
-
logger.Debugw(ctx, "setParentId", log.Fields{"device-id": device.Id, "parent-id": parentID})
- cloned := agent.getDeviceWithoutLock()
+ cloned := agent.cloneDeviceWithoutLock()
cloned.ParentId = parentID
- // Store the device
- if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
- return err
- }
-
- return nil
+ return agent.updateDeviceAndReleaseLock(ctx, cloned)
}
// getSwitchCapability retrieves the switch capability of a parent device
@@ -553,10 +551,13 @@
return nil
}
-// updatePartialDeviceData updates a subset of a device that an Adapter can update.
-// TODO: May need a specific proto to handle only a subset of a device that can be changed by an adapter
-func (agent *Agent) mergeDeviceInfoFromAdapter(device *voltha.Device) (*voltha.Device, error) {
- cloned := agent.getDeviceWithoutLock()
+func (agent *Agent) updateDeviceUsingAdapterData(ctx context.Context, device *voltha.Device) error {
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ logger.Debugw(ctx, "updateDeviceUsingAdapterData", log.Fields{"device-id": device.Id})
+
+ cloned := agent.cloneDeviceWithoutLock()
cloned.Root = device.Root
cloned.Vendor = device.Vendor
cloned.Model = device.Model
@@ -564,59 +565,41 @@
cloned.MacAddress = device.MacAddress
cloned.Vlan = device.Vlan
cloned.Reason = device.Reason
- return cloned, nil
-}
-
-func (agent *Agent) updateDeviceUsingAdapterData(ctx context.Context, device *voltha.Device) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
- logger.Debugw(ctx, "updateDeviceUsingAdapterData", log.Fields{"device-id": device.Id})
-
- updatedDevice, err := agent.mergeDeviceInfoFromAdapter(device)
- if err != nil {
- return status.Errorf(codes.Internal, "%s", err.Error())
- }
- cloned := proto.Clone(updatedDevice).(*voltha.Device)
- return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+ return agent.updateDeviceAndReleaseLock(ctx, cloned)
}
func (agent *Agent) updateDeviceStatus(ctx context.Context, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
- cloned := agent.getDeviceWithoutLock()
-
- newConnStatus, newOperStatus := cloned.ConnectStatus, cloned.OperStatus
+ cloned := agent.cloneDeviceWithoutLock()
// Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
- if s, ok := voltha.ConnectStatus_Types_value[connStatus.String()]; ok {
+ if s, ok := voltha.ConnectStatus_Types_name[int32(connStatus)]; ok {
logger.Debugw(ctx, "updateDeviceStatus-conn", log.Fields{"ok": ok, "val": s})
- newConnStatus = connStatus
+ cloned.ConnectStatus = connStatus
}
- if s, ok := voltha.OperStatus_Types_value[operStatus.String()]; ok {
+ if s, ok := voltha.OperStatus_Types_name[int32(operStatus)]; ok {
logger.Debugw(ctx, "updateDeviceStatus-oper", log.Fields{"ok": ok, "val": s})
- newOperStatus = operStatus
+ cloned.OperStatus = operStatus
}
logger.Debugw(ctx, "updateDeviceStatus", log.Fields{"deviceId": cloned.Id, "operStatus": cloned.OperStatus, "connectStatus": cloned.ConnectStatus})
// Store the device
- return agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, cloned.AdminState, newConnStatus, newOperStatus)
+ return agent.updateDeviceAndReleaseLock(ctx, cloned)
}
// TODO: A generic device update by attribute
func (agent *Agent) updateDeviceAttribute(ctx context.Context, name string, value interface{}) {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- logger.Warnw(ctx, "request-aborted", log.Fields{"device-id": agent.deviceID, "name": name, "error": err})
- return
- }
- defer agent.requestQueue.RequestComplete()
if value == nil {
return
}
- cloned := agent.getDeviceWithoutLock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ logger.Warnw(ctx, "request-aborted", log.Fields{"device-id": agent.deviceID, "name": name, "error": err})
+ return
+ }
+
+ cloned := agent.cloneDeviceWithoutLock()
updated := false
s := reflect.ValueOf(cloned).Elem()
if s.Kind() == reflect.Struct {
@@ -639,7 +622,7 @@
logger.Debugw(ctx, "update-field-status", log.Fields{"deviceId": cloned.Id, "name": name, "updated": updated})
// Save the data
- if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
+ if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
logger.Warnw(ctx, "attribute-update-failed", log.Fields{"attribute": name, "value": value})
}
}
@@ -651,10 +634,10 @@
defer agent.requestQueue.RequestComplete()
logger.Debugw(ctx, "simulateAlarm", log.Fields{"id": agent.deviceID})
- cloned := agent.getDeviceWithoutLock()
+ device := agent.getDeviceReadOnly()
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.SimulateAlarm(subCtx, cloned, simulateReq)
+ ch, err := agent.adapterProxy.SimulateAlarm(subCtx, device, simulateReq)
if err != nil {
cancel()
return err
@@ -663,42 +646,32 @@
return nil
}
-func (agent *Agent) updateDeviceStateInStoreWithoutLock(
- ctx context.Context,
- device *voltha.Device,
- adminState voltha.AdminState_Types,
- connectStatus voltha.ConnectStatus_Types,
- operStatus voltha.OperStatus_Types,
-) error {
- previousState := getDeviceStates(device)
- device.AdminState, device.ConnectStatus, device.OperStatus = adminState, connectStatus, operStatus
-
- if err := agent.updateDeviceInStoreWithoutLock(ctx, device, false, ""); err != nil {
- return err
- }
-
- // process state transition in its own thread
- go func() {
- if err := agent.deviceMgr.processTransition(context.Background(), device, previousState); err != nil {
- log.Errorw("failed-process-transition", log.Fields{"deviceId": device.Id, "previousAdminState": previousState.Admin, "currentAdminState": device.AdminState})
- }
- }()
- return nil
-}
-
-//This is an update operation to model without Lock.This function must never be invoked by another function unless the latter holds a lock on the device.
-// It is an internal helper function.
-func (agent *Agent) updateDeviceInStoreWithoutLock(ctx context.Context, device *voltha.Device, strict bool, txid string) error {
+// This function updates the device in the DB, releases the device lock, and runs any state transitions.
+// The calling function MUST hold the device lock. The caller MUST NOT modify the device after this is called.
+func (agent *Agent) updateDeviceAndReleaseLock(ctx context.Context, device *voltha.Device) error {
+ // fail early if this agent is no longer valid
if agent.stopped {
+ agent.requestQueue.RequestComplete()
return errors.New("device agent stopped")
}
+ // update in db
if err := agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil {
+ agent.requestQueue.RequestComplete()
return status.Errorf(codes.Internal, "failed-update-device:%s: %s", agent.deviceID, err)
}
logger.Debugw(ctx, "updated-device-in-store", log.Fields{"deviceId: ": agent.deviceID})
+ previousState := getDeviceStates(agent.device)
+ // update the device
agent.device = device
+
+ // release lock before processing transition
+ agent.requestQueue.RequestComplete()
+
+ if err := agent.deviceMgr.processTransition(context.Background(), device, previousState); err != nil {
+ log.Errorw("failed-process-transition", log.Fields{"deviceId": device.Id, "previousAdminState": previousState.Admin, "currentAdminState": device.AdminState})
+ }
return nil
}
@@ -706,13 +679,11 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ logger.Debugw(ctx, "updateDeviceReason", log.Fields{"deviceId": agent.deviceID, "reason": reason})
- cloned := agent.getDeviceWithoutLock()
+ cloned := agent.cloneDeviceWithoutLock()
cloned.Reason = reason
- logger.Debugw(ctx, "updateDeviceReason", log.Fields{"deviceId": cloned.Id, "reason": cloned.Reason})
- // Store the device
- return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+ return agent.updateDeviceAndReleaseLock(ctx, cloned)
}
func (agent *Agent) ChildDeviceLost(ctx context.Context, device *voltha.Device) error {
@@ -754,19 +725,19 @@
return nil, err
}
- device := agent.getDeviceWithoutLock()
+ cloned := agent.cloneDeviceWithoutLock()
- if device.Adapter == "" {
- adapterName, err := agent.adapterMgr.GetAdapterType(device.Type)
+ if cloned.Adapter == "" {
+ adapterName, err := agent.adapterMgr.GetAdapterType(cloned.Type)
if err != nil {
agent.requestQueue.RequestComplete()
return nil, err
}
- device.Adapter = adapterName
+ cloned.Adapter = adapterName
}
// Send request to the adapter
- ch, err := agent.adapterProxy.StartOmciTest(ctx, device, omcitestrequest)
+ ch, err := agent.adapterProxy.StartOmciTest(ctx, cloned, omcitestrequest)
agent.requestQueue.RequestComplete()
if err != nil {
return nil, err
diff --git a/rw_core/core/device/agent_flow.go b/rw_core/core/device/agent_flow.go
index 52c7706..09ca7ed 100644
--- a/rw_core/core/device/agent_flow.go
+++ b/rw_core/core/device/agent_flow.go
@@ -49,7 +49,7 @@
logger.Debugw(ctx, "nothing-to-update", log.Fields{"device-id": agent.deviceID, "flows": newFlows})
return coreutils.DoneResponse(), nil
}
- device := agent.getDeviceWithoutLock()
+ device := agent.getDeviceReadOnly()
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
@@ -127,7 +127,7 @@
return coreutils.DoneResponse(), nil
}
- device := agent.getDeviceWithoutLock()
+ device := agent.getDeviceReadOnly()
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
@@ -183,7 +183,7 @@
return coreutils.DoneResponse(), nil
}
- device := agent.getDeviceWithoutLock()
+ device := agent.getDeviceReadOnly()
if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states")
}
diff --git a/rw_core/core/device/agent_group.go b/rw_core/core/device/agent_group.go
index 86cc108..da2b7c5 100644
--- a/rw_core/core/device/agent_group.go
+++ b/rw_core/core/device/agent_group.go
@@ -50,7 +50,7 @@
return coreutils.DoneResponse(), nil
}
- device := agent.getDeviceWithoutLock()
+ device := agent.getDeviceReadOnly()
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
@@ -128,7 +128,7 @@
logger.Debugw(ctx, "nothing-to-delete", log.Fields{"device-id": agent.deviceID})
return coreutils.DoneResponse(), nil
}
- device := agent.getDeviceWithoutLock()
+ device := agent.getDeviceReadOnly()
dType, err := agent.adapterMgr.GetDeviceType(ctx, &voltha.ID{Id: device.Type})
if err != nil {
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "non-existent-device-type-%s", device.Type)
@@ -184,7 +184,7 @@
return coreutils.DoneResponse(), nil
}
- device := agent.getDeviceWithoutLock()
+ device := agent.getDeviceReadOnly()
if device.OperStatus != voltha.OperStatus_ACTIVE || device.ConnectStatus != voltha.ConnectStatus_REACHABLE || device.AdminState != voltha.AdminState_ENABLED {
return coreutils.DoneResponse(), status.Errorf(codes.FailedPrecondition, "invalid device states-oper-%s-connect-%s-admin-%s", device.OperStatus, device.ConnectStatus, device.AdminState)
}
diff --git a/rw_core/core/device/agent_image.go b/rw_core/core/device/agent_image.go
index 8d60531..3bc91ca 100644
--- a/rw_core/core/device/agent_image.go
+++ b/rw_core/core/device/agent_image.go
@@ -31,47 +31,39 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
- defer agent.requestQueue.RequestComplete()
-
logger.Debugw(ctx, "downloadImage", log.Fields{"device-id": agent.deviceID})
- device := agent.getDeviceWithoutLock()
-
+ device := agent.cloneDeviceWithoutLock()
if device.AdminState != voltha.AdminState_ENABLED {
+ agent.requestQueue.RequestComplete()
return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, expected-admin-state:%s", agent.deviceID, voltha.AdminState_ENABLED)
}
+ if device.AdminState != voltha.AdminState_ENABLED {
+ logger.Debugw(ctx, "device-not-enabled", log.Fields{"id": agent.deviceID})
+ agent.requestQueue.RequestComplete()
+ return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, expected-admin-state:%s", agent.deviceID, voltha.AdminState_ENABLED)
+ }
+
// Save the image
clonedImg := proto.Clone(img).(*voltha.ImageDownload)
clonedImg.DownloadState = voltha.ImageDownload_DOWNLOAD_REQUESTED
- cloned := proto.Clone(device).(*voltha.Device)
- if cloned.ImageDownloads == nil {
- cloned.ImageDownloads = []*voltha.ImageDownload{clonedImg}
- } else {
- if device.AdminState != voltha.AdminState_ENABLED {
- logger.Debugw(ctx, "device-not-enabled", log.Fields{"id": agent.deviceID})
- return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, expected-admin-state:%s", agent.deviceID, voltha.AdminState_ENABLED)
- }
- // Save the image
- clonedImg := proto.Clone(img).(*voltha.ImageDownload)
- clonedImg.DownloadState = voltha.ImageDownload_DOWNLOAD_REQUESTED
- if device.ImageDownloads == nil {
- device.ImageDownloads = []*voltha.ImageDownload{clonedImg}
- } else {
- device.ImageDownloads = append(device.ImageDownloads, clonedImg)
- }
- if err := agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_DOWNLOADING_IMAGE, device.ConnectStatus, device.OperStatus); err != nil {
- return nil, err
- }
+ cloned := agent.cloneDeviceWithoutLock()
+ cloned.ImageDownloads = append(device.ImageDownloads, clonedImg)
- // Send the request to the adapter
- subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.DownloadImage(ctx, cloned, clonedImg)
- if err != nil {
- cancel()
- return nil, err
- }
- go agent.waitForAdapterResponse(subCtx, cancel, "downloadImage", ch, agent.onSuccess, agent.onFailure)
+ cloned.AdminState = voltha.AdminState_DOWNLOADING_IMAGE
+ if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
+ return nil, err
}
+
+ // Send the request to the adapter
+ subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
+ ch, err := agent.adapterProxy.DownloadImage(ctx, cloned, clonedImg)
+ if err != nil {
+ cancel()
+ return nil, err
+ }
+ go agent.waitForAdapterResponse(subCtx, cancel, "downloadImage", ch, agent.onSuccess, agent.onFailure)
+
return &voltha.OperationResp{Code: voltha.OperationResp_OPERATION_SUCCESS}, nil
}
@@ -89,31 +81,33 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
- defer agent.requestQueue.RequestComplete()
-
logger.Debugw(ctx, "cancelImageDownload", log.Fields{"device-id": agent.deviceID})
- device := agent.getDeviceWithoutLock()
-
// Verify whether the Image is in the list of image being downloaded
+ device := agent.getDeviceReadOnly()
if !isImageRegistered(img, device) {
+ agent.requestQueue.RequestComplete()
return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, image-not-registered:%s", agent.deviceID, img.Name)
}
// Update image download state
- for _, image := range device.ImageDownloads {
+ cloned := agent.cloneDeviceWithoutLock()
+ for _, image := range cloned.ImageDownloads {
if image.Id == img.Id && image.Name == img.Name {
image.DownloadState = voltha.ImageDownload_DOWNLOAD_CANCELLED
}
}
- if device.AdminState == voltha.AdminState_DOWNLOADING_IMAGE {
+ if cloned.AdminState != voltha.AdminState_DOWNLOADING_IMAGE {
+ agent.requestQueue.RequestComplete()
+ } else {
// Set the device to Enabled
- if err := agent.updateDeviceStateInStoreWithoutLock(ctx, device, voltha.AdminState_ENABLED, device.ConnectStatus, device.OperStatus); err != nil {
+ cloned.AdminState = voltha.AdminState_ENABLED
+ if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
return nil, err
}
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.CancelImageDownload(subCtx, device, img)
+ ch, err := agent.adapterProxy.CancelImageDownload(subCtx, cloned, img)
if err != nil {
cancel()
return nil, err
@@ -127,31 +121,34 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
- defer agent.requestQueue.RequestComplete()
logger.Debugw(ctx, "activateImage", log.Fields{"device-id": agent.deviceID})
- cloned := agent.getDeviceWithoutLock()
// Verify whether the Image is in the list of image being downloaded
- if !isImageRegistered(img, cloned) {
+ device := agent.getDeviceReadOnly()
+ if !isImageRegistered(img, device) {
+ agent.requestQueue.RequestComplete()
return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, image-not-registered:%s", agent.deviceID, img.Name)
}
-
- if cloned.AdminState == voltha.AdminState_DOWNLOADING_IMAGE {
+ if device.AdminState == voltha.AdminState_DOWNLOADING_IMAGE {
+ agent.requestQueue.RequestComplete()
return nil, status.Errorf(codes.FailedPrecondition, "device-id:%s, device-in-downloading-state:%s", agent.deviceID, img.Name)
}
+
// Update image download state
+ cloned := agent.cloneDeviceWithoutLock()
for _, image := range cloned.ImageDownloads {
if image.Id == img.Id && image.Name == img.Name {
image.ImageState = voltha.ImageDownload_IMAGE_ACTIVATING
}
}
// Set the device to downloading_image
- if err := agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_DOWNLOADING_IMAGE, cloned.ConnectStatus, cloned.OperStatus); err != nil {
+ cloned.AdminState = voltha.AdminState_DOWNLOADING_IMAGE
+ if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
return nil, err
}
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.ActivateImageUpdate(subCtx, proto.Clone(cloned).(*voltha.Device), img)
+ ch, err := agent.adapterProxy.ActivateImageUpdate(subCtx, cloned, img)
if err != nil {
cancel()
return nil, err
@@ -167,32 +164,32 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
- defer agent.requestQueue.RequestComplete()
logger.Debugw(ctx, "revertImage", log.Fields{"device-id": agent.deviceID})
- cloned := agent.getDeviceWithoutLock()
-
// Verify whether the Image is in the list of image being downloaded
- if !isImageRegistered(img, cloned) {
+ device := agent.getDeviceReadOnly()
+ if !isImageRegistered(img, device) {
+ agent.requestQueue.RequestComplete()
return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, image-not-registered:%s", agent.deviceID, img.Name)
}
-
- if cloned.AdminState != voltha.AdminState_ENABLED {
+ if device.AdminState != voltha.AdminState_ENABLED {
+ agent.requestQueue.RequestComplete()
return nil, status.Errorf(codes.FailedPrecondition, "deviceId:%s, device-not-enabled-state:%s", agent.deviceID, img.Name)
}
// Update image download state
+ cloned := agent.cloneDeviceWithoutLock()
for _, image := range cloned.ImageDownloads {
if image.Id == img.Id && image.Name == img.Name {
image.ImageState = voltha.ImageDownload_IMAGE_REVERTING
}
}
- if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
+ if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
return nil, err
}
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.RevertImageUpdate(subCtx, proto.Clone(cloned).(*voltha.Device), img)
+ ch, err := agent.adapterProxy.RevertImageUpdate(subCtx, cloned, img)
if err != nil {
cancel()
return nil, err
@@ -208,7 +205,7 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return nil, err
}
- device := agent.getDeviceWithoutLock()
+ device := agent.getDeviceReadOnly()
ch, err := agent.adapterProxy.GetImageDownloadStatus(ctx, device, img)
agent.requestQueue.RequestComplete()
if err != nil {
@@ -234,12 +231,10 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
logger.Debugw(ctx, "updating-image-download", log.Fields{"device-id": agent.deviceID, "img": img})
- cloned := agent.getDeviceWithoutLock()
-
// Update the image as well as remove it if the download was cancelled
+ cloned := agent.cloneDeviceWithoutLock()
clonedImages := make([]*voltha.ImageDownload, len(cloned.ImageDownloads))
for _, image := range cloned.ImageDownloads {
if image.Id == img.Id && image.Name == img.Name {
@@ -249,13 +244,14 @@
}
}
cloned.ImageDownloads = clonedImages
+
// Set the Admin state to enabled if required
if (img.DownloadState != voltha.ImageDownload_DOWNLOAD_REQUESTED &&
img.DownloadState != voltha.ImageDownload_DOWNLOAD_STARTED) ||
- (img.ImageState != voltha.ImageDownload_IMAGE_ACTIVATING) {
- return agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_ENABLED, cloned.ConnectStatus, cloned.OperStatus)
+ img.ImageState != voltha.ImageDownload_IMAGE_ACTIVATING {
+ cloned.AdminState = voltha.AdminState_ENABLED
}
- return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+ return agent.updateDeviceAndReleaseLock(ctx, cloned)
}
func (agent *Agent) getImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
@@ -265,8 +261,8 @@
defer agent.requestQueue.RequestComplete()
logger.Debugw(ctx, "getImageDownload", log.Fields{"device-id": agent.deviceID})
- cloned := agent.getDeviceWithoutLock()
- for _, image := range cloned.ImageDownloads {
+ device := agent.getDeviceReadOnly()
+ for _, image := range device.ImageDownloads {
if image.Id == img.Id && image.Name == img.Name {
return image, nil
}
@@ -281,5 +277,5 @@
defer agent.requestQueue.RequestComplete()
logger.Debugw(ctx, "listImageDownloads", log.Fields{"device-id": agent.deviceID})
- return &voltha.ImageDownloads{Items: agent.getDeviceWithoutLock().ImageDownloads}, nil
+ return &voltha.ImageDownloads{Items: agent.getDeviceReadOnly().ImageDownloads}, nil
}
diff --git a/rw_core/core/device/agent_pm_config.go b/rw_core/core/device/agent_pm_config.go
index d4e48fd..033e6f4 100644
--- a/rw_core/core/device/agent_pm_config.go
+++ b/rw_core/core/device/agent_pm_config.go
@@ -28,13 +28,12 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
logger.Debugw(ctx, "updatePmConfigs", log.Fields{"device-id": pmConfigs.Id})
- cloned := agent.getDeviceWithoutLock()
+ cloned := agent.cloneDeviceWithoutLock()
cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
// Store the device
- if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
+ if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
return err
}
// Send the request to the adapter
@@ -52,12 +51,11 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
logger.Debugw(ctx, "initPmConfigs", log.Fields{"device-id": pmConfigs.Id})
- cloned := agent.getDeviceWithoutLock()
+ cloned := agent.cloneDeviceWithoutLock()
cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
- return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+ return agent.updateDeviceAndReleaseLock(ctx, cloned)
}
func (agent *Agent) listPmConfigs(ctx context.Context) (*voltha.PmConfigs, error) {
@@ -67,5 +65,5 @@
defer agent.requestQueue.RequestComplete()
logger.Debugw(ctx, "listPmConfigs", log.Fields{"device-id": agent.deviceID})
- return agent.getDeviceWithoutLock().PmConfigs, nil
+ return agent.getDeviceReadOnly().PmConfigs, nil
}
diff --git a/rw_core/core/device/agent_test.go b/rw_core/core/device/agent_test.go
index 92977ac..9249198 100755
--- a/rw_core/core/device/agent_test.go
+++ b/rw_core/core/device/agent_test.go
@@ -18,6 +18,7 @@
import (
"context"
+ "fmt"
"math/rand"
"sort"
"strconv"
@@ -244,6 +245,8 @@
updatedDevice, _ := da.getDevice(context.Background())
updatedDevicePorts := da.listDevicePorts()
assert.NotNil(t, updatedDevice)
+ fmt.Printf("1 %+v\n", expectedChange)
+ fmt.Printf("2 %+v\n", updatedDevice)
assert.True(t, proto.Equal(expectedChange, updatedDevice))
assert.Equal(t, len(originalDevicePorts)+1, len(updatedDevicePorts))
assert.True(t, proto.Equal(updatedDevicePorts[portToAdd.PortNo], portToAdd))
@@ -284,9 +287,12 @@
da.oltAdapter, da.onuAdapter = tst.CreateAndregisterAdapters(ctx, t, da.kClient, da.coreInstanceID, da.oltAdapterName, da.onuAdapterName, da.adapterMgr)
a := da.createDeviceAgent(t)
- cloned := a.getDeviceWithoutLock()
- err := a.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
- assert.Nil(t, err)
+ err1 := a.requestQueue.WaitForGreenLight(ctx)
+ assert.Nil(t, err1)
+ cloned := a.cloneDeviceWithoutLock()
+ cloned.AdminState, cloned.ConnectStatus, cloned.OperStatus = voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE
+ err2 := a.updateDeviceAndReleaseLock(ctx, cloned)
+ assert.Nil(t, err2)
da.testFlowAddDeletes(t, a)
}
@@ -300,9 +306,12 @@
da.startCore(ctx)
da.oltAdapter, da.onuAdapter = tst.CreateAndregisterAdapters(ctx, t, da.kClient, da.coreInstanceID, da.oltAdapterName, da.onuAdapterName, da.adapterMgr)
a := da.createDeviceAgent(t)
- cloned := a.getDeviceWithoutLock()
- err := a.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
- assert.Nil(t, err)
+ err1 := a.requestQueue.WaitForGreenLight(ctx)
+ assert.Nil(t, err1)
+ cloned := a.cloneDeviceWithoutLock()
+ cloned.AdminState, cloned.ConnectStatus, cloned.OperStatus = voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE
+ err2 := a.updateDeviceAndReleaseLock(ctx, cloned)
+ assert.Nil(t, err2)
da.testGroupAddDeletes(t, a)
}
diff --git a/rw_core/core/device/logical_agent_port.go b/rw_core/core/device/logical_agent_port.go
index 40f058e..3ae6534 100644
--- a/rw_core/core/device/logical_agent_port.go
+++ b/rw_core/core/device/logical_agent_port.go
@@ -179,7 +179,7 @@
// setupUNILogicalPorts creates a UNI port on the logical device that represents a child UNI interface
func (agent *LogicalAgent) setupUNILogicalPorts(ctx context.Context, childDevice *voltha.Device, childDevicePorts map[uint32]*voltha.Port) error {
- logger.Infow(ctx, "setupUNILogicalPort", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+ logger.Infow(ctx, "setupUNILogicalPorts", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
// Build the logical device based on information retrieved from the device adapter
var err error
//Get UNI port number
diff --git a/rw_core/core/device/manager.go b/rw_core/core/device/manager.go
index 77da2f0..ad20aac 100755
--- a/rw_core/core/device/manager.go
+++ b/rw_core/core/device/manager.go
@@ -1306,13 +1306,13 @@
// SetupUNILogicalPorts creates UNI ports on the logical device that represents a child UNI interface
func (dMgr *Manager) SetupUNILogicalPorts(ctx context.Context, cDevice *voltha.Device) error {
- logger.Info(ctx, "addUNILogicalPort")
+ logger.Info(ctx, "SetupUNILogicalPorts")
cDevicePorts, err := dMgr.listDevicePorts(ctx, cDevice.Id)
if err != nil {
return err
}
if err := dMgr.logicalDeviceMgr.setupUNILogicalPorts(ctx, cDevice, cDevicePorts); err != nil {
- logger.Warnw(ctx, "addUNILogicalPort-error", log.Fields{"device": cDevice, "err": err})
+ logger.Warnw(ctx, "setupUNILogicalPorts-error", log.Fields{"device": cDevice, "err": err})
return err
}
return nil