VOL-3356 - Changed the way devices are updated,
so that state transitions will execute in the calling thread.
Also changed the locking guarantees when accessing devices.
Change-Id: I0d40215bf35ffafd2ee4fcef6b34515001adcc9c
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 9fbeb9d..6053d65 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -251,59 +251,63 @@
return proto.Clone(agent.device).(*voltha.Device), nil
}
-// getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
-func (agent *Agent) getDeviceWithoutLock() *voltha.Device {
+// getDeviceReadOnly returns a device which MUST NOT be modified, but is safe to keep forever. This is very efficient.
+// The device lock MUST be held by the caller.
+func (agent *Agent) getDeviceReadOnly() *voltha.Device {
return agent.device
}
+// cloneDeviceWithoutLock returns a copy of the device which is safe to modify.
+// The device lock MUST be held by the caller.
+func (agent *Agent) cloneDeviceWithoutLock() *voltha.Device {
+ return proto.Clone(agent.device).(*voltha.Device)
+}
+
// enableDevice activates a preprovisioned or a disable device
func (agent *Agent) enableDevice(ctx context.Context) error {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
-
logger.Debugw(ctx, "enableDevice", log.Fields{"device-id": agent.deviceID})
- cloned := agent.getDeviceWithoutLock()
+ oldDevice := agent.getDeviceReadOnly()
+ if oldDevice.AdminState == voltha.AdminState_ENABLED {
+ logger.Warnw(ctx, "device-already-enabled", log.Fields{"device-id": agent.deviceID})
+ agent.requestQueue.RequestComplete()
+ return status.Error(codes.FailedPrecondition, fmt.Sprintf("cannot-enable-an-already-enabled-device: %s", oldDevice.Id))
+ }
+ if oldDevice.AdminState == voltha.AdminState_DELETED {
+ // This is a temporary state when a device is deleted before it gets removed from the model.
+ agent.requestQueue.RequestComplete()
+ return status.Error(codes.FailedPrecondition, fmt.Sprintf("cannot-enable-a-deleted-device: %s", oldDevice.Id))
+ }
// First figure out which adapter will handle this device type. We do it at this stage as allow devices to be
// pre-provisioned with the required adapter not registered. At this stage, since we need to communicate
// with the adapter then we need to know the adapter that will handle this request
- adapterName, err := agent.adapterMgr.GetAdapterType(cloned.Type)
+ adapterName, err := agent.adapterMgr.GetAdapterType(oldDevice.Type)
if err != nil {
- return err
- }
- cloned.Adapter = adapterName
-
- if cloned.AdminState == voltha.AdminState_ENABLED {
- logger.Warnw(ctx, "device-already-enabled", log.Fields{"device-id": agent.deviceID})
- err = status.Error(codes.FailedPrecondition, fmt.Sprintf("cannot-enable-an-already-enabled-device: %s ", cloned.Id))
+ agent.requestQueue.RequestComplete()
return err
}
- if cloned.AdminState == voltha.AdminState_DELETED {
- // This is a temporary state when a device is deleted before it gets removed from the model.
- err = status.Error(codes.FailedPrecondition, fmt.Sprintf("cannot-enable-a-deleted-device: %s ", cloned.Id))
- return err
- }
+ newDevice := agent.cloneDeviceWithoutLock()
+ newDevice.Adapter = adapterName
- previousAdminState := cloned.AdminState
-
- // Update the Admin State and set the operational state to activating before sending the request to the
- // Adapters
- if err := agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_ENABLED, cloned.ConnectStatus, voltha.OperStatus_ACTIVATING); err != nil {
+ // Update the Admin State and set the operational state to activating before sending the request to the Adapters
+ newDevice.AdminState = voltha.AdminState_ENABLED
+ newDevice.OperStatus = voltha.OperStatus_ACTIVATING
+ if err := agent.updateDeviceAndReleaseLock(ctx, newDevice); err != nil {
return err
}
// Adopt the device if it was in pre-provision state. In all other cases, try to re-enable it.
- device := proto.Clone(cloned).(*voltha.Device)
var ch chan *kafka.RpcResponse
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- if previousAdminState == voltha.AdminState_PREPROVISIONED {
- ch, err = agent.adapterProxy.AdoptDevice(subCtx, device)
+ if oldDevice.AdminState == voltha.AdminState_PREPROVISIONED {
+ ch, err = agent.adapterProxy.AdoptDevice(subCtx, newDevice)
} else {
- ch, err = agent.adapterProxy.ReEnableDevice(subCtx, device)
+ ch, err = agent.adapterProxy.ReEnableDevice(subCtx, newDevice)
}
if err != nil {
cancel()
@@ -391,27 +395,29 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
logger.Debugw(ctx, "disableDevice", log.Fields{"device-id": agent.deviceID})
- cloned := agent.getDeviceWithoutLock()
+ cloned := agent.cloneDeviceWithoutLock()
if cloned.AdminState == voltha.AdminState_DISABLED {
logger.Debugw(ctx, "device-already-disabled", log.Fields{"id": agent.deviceID})
+ agent.requestQueue.RequestComplete()
return nil
}
- if cloned.AdminState == voltha.AdminState_PREPROVISIONED ||
- cloned.AdminState == voltha.AdminState_DELETED {
+ if cloned.AdminState == voltha.AdminState_PREPROVISIONED || cloned.AdminState == voltha.AdminState_DELETED {
+ agent.requestQueue.RequestComplete()
return status.Errorf(codes.FailedPrecondition, "deviceId:%s, invalid-admin-state:%s", agent.deviceID, cloned.AdminState)
}
// Update the Admin State and operational state before sending the request out
- if err := agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_DISABLED, cloned.ConnectStatus, voltha.OperStatus_UNKNOWN); err != nil {
+ cloned.AdminState = voltha.AdminState_DISABLED
+ cloned.OperStatus = voltha.OperStatus_UNKNOWN
+ if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
return err
}
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.DisableDevice(subCtx, proto.Clone(cloned).(*voltha.Device))
+ ch, err := agent.adapterProxy.DisableDevice(subCtx, cloned)
if err != nil {
cancel()
return err
@@ -428,7 +434,7 @@
defer agent.requestQueue.RequestComplete()
logger.Debugw(ctx, "rebootDevice", log.Fields{"device-id": agent.deviceID})
- device := agent.getDeviceWithoutLock()
+ device := agent.getDeviceReadOnly()
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
ch, err := agent.adapterProxy.RebootDevice(subCtx, device)
if err != nil {
@@ -444,15 +450,14 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
- cloned := agent.getDeviceWithoutLock()
-
+ cloned := agent.cloneDeviceWithoutLock()
previousState := cloned.AdminState
// No check is required when deleting a device. Changing the state to DELETE will trigger the removal of this
// device by the state machine
- if err := agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_DELETED, cloned.ConnectStatus, cloned.OperStatus); err != nil {
+ cloned.AdminState = voltha.AdminState_DELETED
+ if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
return err
}
@@ -474,18 +479,11 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
-
logger.Debugw(ctx, "setParentId", log.Fields{"device-id": device.Id, "parent-id": parentID})
- cloned := agent.getDeviceWithoutLock()
+ cloned := agent.cloneDeviceWithoutLock()
cloned.ParentId = parentID
- // Store the device
- if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
- return err
- }
-
- return nil
+ return agent.updateDeviceAndReleaseLock(ctx, cloned)
}
// getSwitchCapability retrieves the switch capability of a parent device
@@ -553,10 +551,13 @@
return nil
}
-// updatePartialDeviceData updates a subset of a device that an Adapter can update.
-// TODO: May need a specific proto to handle only a subset of a device that can be changed by an adapter
-func (agent *Agent) mergeDeviceInfoFromAdapter(device *voltha.Device) (*voltha.Device, error) {
- cloned := agent.getDeviceWithoutLock()
+func (agent *Agent) updateDeviceUsingAdapterData(ctx context.Context, device *voltha.Device) error {
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ logger.Debugw(ctx, "updateDeviceUsingAdapterData", log.Fields{"device-id": device.Id})
+
+ cloned := agent.cloneDeviceWithoutLock()
cloned.Root = device.Root
cloned.Vendor = device.Vendor
cloned.Model = device.Model
@@ -564,59 +565,41 @@
cloned.MacAddress = device.MacAddress
cloned.Vlan = device.Vlan
cloned.Reason = device.Reason
- return cloned, nil
-}
-
-func (agent *Agent) updateDeviceUsingAdapterData(ctx context.Context, device *voltha.Device) error {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- return err
- }
- defer agent.requestQueue.RequestComplete()
- logger.Debugw(ctx, "updateDeviceUsingAdapterData", log.Fields{"device-id": device.Id})
-
- updatedDevice, err := agent.mergeDeviceInfoFromAdapter(device)
- if err != nil {
- return status.Errorf(codes.Internal, "%s", err.Error())
- }
- cloned := proto.Clone(updatedDevice).(*voltha.Device)
- return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+ return agent.updateDeviceAndReleaseLock(ctx, cloned)
}
func (agent *Agent) updateDeviceStatus(ctx context.Context, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
- cloned := agent.getDeviceWithoutLock()
-
- newConnStatus, newOperStatus := cloned.ConnectStatus, cloned.OperStatus
+ cloned := agent.cloneDeviceWithoutLock()
// Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
- if s, ok := voltha.ConnectStatus_Types_value[connStatus.String()]; ok {
+ if s, ok := voltha.ConnectStatus_Types_name[int32(connStatus)]; ok {
logger.Debugw(ctx, "updateDeviceStatus-conn", log.Fields{"ok": ok, "val": s})
- newConnStatus = connStatus
+ cloned.ConnectStatus = connStatus
}
- if s, ok := voltha.OperStatus_Types_value[operStatus.String()]; ok {
+ if s, ok := voltha.OperStatus_Types_name[int32(operStatus)]; ok {
logger.Debugw(ctx, "updateDeviceStatus-oper", log.Fields{"ok": ok, "val": s})
- newOperStatus = operStatus
+ cloned.OperStatus = operStatus
}
logger.Debugw(ctx, "updateDeviceStatus", log.Fields{"deviceId": cloned.Id, "operStatus": cloned.OperStatus, "connectStatus": cloned.ConnectStatus})
// Store the device
- return agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, cloned.AdminState, newConnStatus, newOperStatus)
+ return agent.updateDeviceAndReleaseLock(ctx, cloned)
}
// TODO: A generic device update by attribute
func (agent *Agent) updateDeviceAttribute(ctx context.Context, name string, value interface{}) {
- if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
- logger.Warnw(ctx, "request-aborted", log.Fields{"device-id": agent.deviceID, "name": name, "error": err})
- return
- }
- defer agent.requestQueue.RequestComplete()
if value == nil {
return
}
- cloned := agent.getDeviceWithoutLock()
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ logger.Warnw(ctx, "request-aborted", log.Fields{"device-id": agent.deviceID, "name": name, "error": err})
+ return
+ }
+
+ cloned := agent.cloneDeviceWithoutLock()
updated := false
s := reflect.ValueOf(cloned).Elem()
if s.Kind() == reflect.Struct {
@@ -639,7 +622,7 @@
logger.Debugw(ctx, "update-field-status", log.Fields{"deviceId": cloned.Id, "name": name, "updated": updated})
// Save the data
- if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
+ if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
logger.Warnw(ctx, "attribute-update-failed", log.Fields{"attribute": name, "value": value})
}
}
@@ -651,10 +634,10 @@
defer agent.requestQueue.RequestComplete()
logger.Debugw(ctx, "simulateAlarm", log.Fields{"id": agent.deviceID})
- cloned := agent.getDeviceWithoutLock()
+ device := agent.getDeviceReadOnly()
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
- ch, err := agent.adapterProxy.SimulateAlarm(subCtx, cloned, simulateReq)
+ ch, err := agent.adapterProxy.SimulateAlarm(subCtx, device, simulateReq)
if err != nil {
cancel()
return err
@@ -663,42 +646,32 @@
return nil
}
-func (agent *Agent) updateDeviceStateInStoreWithoutLock(
- ctx context.Context,
- device *voltha.Device,
- adminState voltha.AdminState_Types,
- connectStatus voltha.ConnectStatus_Types,
- operStatus voltha.OperStatus_Types,
-) error {
- previousState := getDeviceStates(device)
- device.AdminState, device.ConnectStatus, device.OperStatus = adminState, connectStatus, operStatus
-
- if err := agent.updateDeviceInStoreWithoutLock(ctx, device, false, ""); err != nil {
- return err
- }
-
- // process state transition in its own thread
- go func() {
- if err := agent.deviceMgr.processTransition(context.Background(), device, previousState); err != nil {
- log.Errorw("failed-process-transition", log.Fields{"deviceId": device.Id, "previousAdminState": previousState.Admin, "currentAdminState": device.AdminState})
- }
- }()
- return nil
-}
-
-//This is an update operation to model without Lock.This function must never be invoked by another function unless the latter holds a lock on the device.
-// It is an internal helper function.
-func (agent *Agent) updateDeviceInStoreWithoutLock(ctx context.Context, device *voltha.Device, strict bool, txid string) error {
+// This function updates the device in the DB, releases the device lock, and runs any state transitions.
+// The calling function MUST hold the device lock. The caller MUST NOT modify the device after this is called.
+func (agent *Agent) updateDeviceAndReleaseLock(ctx context.Context, device *voltha.Device) error {
+ // fail early if this agent is no longer valid
if agent.stopped {
+ agent.requestQueue.RequestComplete()
return errors.New("device agent stopped")
}
+ // update in db
if err := agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil {
+ agent.requestQueue.RequestComplete()
return status.Errorf(codes.Internal, "failed-update-device:%s: %s", agent.deviceID, err)
}
logger.Debugw(ctx, "updated-device-in-store", log.Fields{"deviceId: ": agent.deviceID})
+ previousState := getDeviceStates(agent.device)
+ // update the device
agent.device = device
+
+ // release lock before processing transition
+ agent.requestQueue.RequestComplete()
+
+ if err := agent.deviceMgr.processTransition(context.Background(), device, previousState); err != nil {
+ log.Errorw("failed-process-transition", log.Fields{"deviceId": device.Id, "previousAdminState": previousState.Admin, "currentAdminState": device.AdminState})
+ }
return nil
}
@@ -706,13 +679,11 @@
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
}
- defer agent.requestQueue.RequestComplete()
+ logger.Debugw(ctx, "updateDeviceReason", log.Fields{"deviceId": agent.deviceID, "reason": reason})
- cloned := agent.getDeviceWithoutLock()
+ cloned := agent.cloneDeviceWithoutLock()
cloned.Reason = reason
- logger.Debugw(ctx, "updateDeviceReason", log.Fields{"deviceId": cloned.Id, "reason": cloned.Reason})
- // Store the device
- return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+ return agent.updateDeviceAndReleaseLock(ctx, cloned)
}
func (agent *Agent) ChildDeviceLost(ctx context.Context, device *voltha.Device) error {
@@ -754,19 +725,19 @@
return nil, err
}
- device := agent.getDeviceWithoutLock()
+ cloned := agent.cloneDeviceWithoutLock()
- if device.Adapter == "" {
- adapterName, err := agent.adapterMgr.GetAdapterType(device.Type)
+ if cloned.Adapter == "" {
+ adapterName, err := agent.adapterMgr.GetAdapterType(cloned.Type)
if err != nil {
agent.requestQueue.RequestComplete()
return nil, err
}
- device.Adapter = adapterName
+ cloned.Adapter = adapterName
}
// Send request to the adapter
- ch, err := agent.adapterProxy.StartOmciTest(ctx, device, omcitestrequest)
+ ch, err := agent.adapterProxy.StartOmciTest(ctx, cloned, omcitestrequest)
agent.requestQueue.RequestComplete()
if err != nil {
return nil, err