VOL-3356 - Changed the way devices are updated.

Devices are now updated so that state transitions execute in the calling thread.
Also changed the locking guarantees when accessing devices.

Change-Id: I0d40215bf35ffafd2ee4fcef6b34515001adcc9c
diff --git a/rw_core/core/device/agent.go b/rw_core/core/device/agent.go
index 9fbeb9d..6053d65 100755
--- a/rw_core/core/device/agent.go
+++ b/rw_core/core/device/agent.go
@@ -251,59 +251,63 @@
 	return proto.Clone(agent.device).(*voltha.Device), nil
 }
 
-// getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
-func (agent *Agent) getDeviceWithoutLock() *voltha.Device {
+// getDeviceReadOnly returns a device which MUST NOT be modified, but is safe to keep forever.  This is very efficient.
+// The device lock MUST be held by the caller.
+func (agent *Agent) getDeviceReadOnly() *voltha.Device {
 	return agent.device
 }
 
+// cloneDeviceWithoutLock returns a copy of the device which is safe to modify.
+// The device lock MUST be held by the caller.
+func (agent *Agent) cloneDeviceWithoutLock() *voltha.Device {
+	return proto.Clone(agent.device).(*voltha.Device)
+}
+
 // enableDevice activates a preprovisioned or a disable device
 func (agent *Agent) enableDevice(ctx context.Context) error {
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
-
 	logger.Debugw(ctx, "enableDevice", log.Fields{"device-id": agent.deviceID})
 
-	cloned := agent.getDeviceWithoutLock()
+	oldDevice := agent.getDeviceReadOnly()
+	if oldDevice.AdminState == voltha.AdminState_ENABLED {
+		logger.Warnw(ctx, "device-already-enabled", log.Fields{"device-id": agent.deviceID})
+		agent.requestQueue.RequestComplete()
+		return status.Error(codes.FailedPrecondition, fmt.Sprintf("cannot-enable-an-already-enabled-device: %s", oldDevice.Id))
+	}
+	if oldDevice.AdminState == voltha.AdminState_DELETED {
+		// This is a temporary state when a device is deleted before it gets removed from the model.
+		agent.requestQueue.RequestComplete()
+		return status.Error(codes.FailedPrecondition, fmt.Sprintf("cannot-enable-a-deleted-device: %s", oldDevice.Id))
+	}
 
 	// First figure out which adapter will handle this device type.  We do it at this stage as allow devices to be
 	// pre-provisioned with the required adapter not registered.   At this stage, since we need to communicate
 	// with the adapter then we need to know the adapter that will handle this request
-	adapterName, err := agent.adapterMgr.GetAdapterType(cloned.Type)
+	adapterName, err := agent.adapterMgr.GetAdapterType(oldDevice.Type)
 	if err != nil {
-		return err
-	}
-	cloned.Adapter = adapterName
-
-	if cloned.AdminState == voltha.AdminState_ENABLED {
-		logger.Warnw(ctx, "device-already-enabled", log.Fields{"device-id": agent.deviceID})
-		err = status.Error(codes.FailedPrecondition, fmt.Sprintf("cannot-enable-an-already-enabled-device: %s ", cloned.Id))
+		agent.requestQueue.RequestComplete()
 		return err
 	}
 
-	if cloned.AdminState == voltha.AdminState_DELETED {
-		// This is a temporary state when a device is deleted before it gets removed from the model.
-		err = status.Error(codes.FailedPrecondition, fmt.Sprintf("cannot-enable-a-deleted-device: %s ", cloned.Id))
-		return err
-	}
+	newDevice := agent.cloneDeviceWithoutLock()
+	newDevice.Adapter = adapterName
 
-	previousAdminState := cloned.AdminState
-
-	// Update the Admin State and set the operational state to activating before sending the request to the
-	// Adapters
-	if err := agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_ENABLED, cloned.ConnectStatus, voltha.OperStatus_ACTIVATING); err != nil {
+	// Update the Admin State and set the operational state to activating before sending the request to the Adapters
+	newDevice.AdminState = voltha.AdminState_ENABLED
+	newDevice.OperStatus = voltha.OperStatus_ACTIVATING
+	if err := agent.updateDeviceAndReleaseLock(ctx, newDevice); err != nil {
 		return err
 	}
 
 	// Adopt the device if it was in pre-provision state.  In all other cases, try to re-enable it.
-	device := proto.Clone(cloned).(*voltha.Device)
 	var ch chan *kafka.RpcResponse
 	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
-	if previousAdminState == voltha.AdminState_PREPROVISIONED {
-		ch, err = agent.adapterProxy.AdoptDevice(subCtx, device)
+	if oldDevice.AdminState == voltha.AdminState_PREPROVISIONED {
+		ch, err = agent.adapterProxy.AdoptDevice(subCtx, newDevice)
 	} else {
-		ch, err = agent.adapterProxy.ReEnableDevice(subCtx, device)
+		ch, err = agent.adapterProxy.ReEnableDevice(subCtx, newDevice)
 	}
 	if err != nil {
 		cancel()
@@ -391,27 +395,29 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
 	logger.Debugw(ctx, "disableDevice", log.Fields{"device-id": agent.deviceID})
 
-	cloned := agent.getDeviceWithoutLock()
+	cloned := agent.cloneDeviceWithoutLock()
 
 	if cloned.AdminState == voltha.AdminState_DISABLED {
 		logger.Debugw(ctx, "device-already-disabled", log.Fields{"id": agent.deviceID})
+		agent.requestQueue.RequestComplete()
 		return nil
 	}
-	if cloned.AdminState == voltha.AdminState_PREPROVISIONED ||
-		cloned.AdminState == voltha.AdminState_DELETED {
+	if cloned.AdminState == voltha.AdminState_PREPROVISIONED || cloned.AdminState == voltha.AdminState_DELETED {
+		agent.requestQueue.RequestComplete()
 		return status.Errorf(codes.FailedPrecondition, "deviceId:%s, invalid-admin-state:%s", agent.deviceID, cloned.AdminState)
 	}
 
 	// Update the Admin State and operational state before sending the request out
-	if err := agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_DISABLED, cloned.ConnectStatus, voltha.OperStatus_UNKNOWN); err != nil {
+	cloned.AdminState = voltha.AdminState_DISABLED
+	cloned.OperStatus = voltha.OperStatus_UNKNOWN
+	if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
 		return err
 	}
 
 	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
-	ch, err := agent.adapterProxy.DisableDevice(subCtx, proto.Clone(cloned).(*voltha.Device))
+	ch, err := agent.adapterProxy.DisableDevice(subCtx, cloned)
 	if err != nil {
 		cancel()
 		return err
@@ -428,7 +434,7 @@
 	defer agent.requestQueue.RequestComplete()
 	logger.Debugw(ctx, "rebootDevice", log.Fields{"device-id": agent.deviceID})
 
-	device := agent.getDeviceWithoutLock()
+	device := agent.getDeviceReadOnly()
 	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
 	ch, err := agent.adapterProxy.RebootDevice(subCtx, device)
 	if err != nil {
@@ -444,15 +450,14 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
 
-	cloned := agent.getDeviceWithoutLock()
-
+	cloned := agent.cloneDeviceWithoutLock()
 	previousState := cloned.AdminState
 
 	// No check is required when deleting a device.  Changing the state to DELETE will trigger the removal of this
 	// device by the state machine
-	if err := agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_DELETED, cloned.ConnectStatus, cloned.OperStatus); err != nil {
+	cloned.AdminState = voltha.AdminState_DELETED
+	if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
 		return err
 	}
 
@@ -474,18 +479,11 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
-
 	logger.Debugw(ctx, "setParentId", log.Fields{"device-id": device.Id, "parent-id": parentID})
 
-	cloned := agent.getDeviceWithoutLock()
+	cloned := agent.cloneDeviceWithoutLock()
 	cloned.ParentId = parentID
-	// Store the device
-	if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
-		return err
-	}
-
-	return nil
+	return agent.updateDeviceAndReleaseLock(ctx, cloned)
 }
 
 // getSwitchCapability retrieves the switch capability of a parent device
@@ -553,10 +551,13 @@
 	return nil
 }
 
-// updatePartialDeviceData updates a subset of a device that an Adapter can update.
-// TODO:  May need a specific proto to handle only a subset of a device that can be changed by an adapter
-func (agent *Agent) mergeDeviceInfoFromAdapter(device *voltha.Device) (*voltha.Device, error) {
-	cloned := agent.getDeviceWithoutLock()
+func (agent *Agent) updateDeviceUsingAdapterData(ctx context.Context, device *voltha.Device) error {
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		return err
+	}
+	logger.Debugw(ctx, "updateDeviceUsingAdapterData", log.Fields{"device-id": device.Id})
+
+	cloned := agent.cloneDeviceWithoutLock()
 	cloned.Root = device.Root
 	cloned.Vendor = device.Vendor
 	cloned.Model = device.Model
@@ -564,59 +565,41 @@
 	cloned.MacAddress = device.MacAddress
 	cloned.Vlan = device.Vlan
 	cloned.Reason = device.Reason
-	return cloned, nil
-}
-
-func (agent *Agent) updateDeviceUsingAdapterData(ctx context.Context, device *voltha.Device) error {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		return err
-	}
-	defer agent.requestQueue.RequestComplete()
-	logger.Debugw(ctx, "updateDeviceUsingAdapterData", log.Fields{"device-id": device.Id})
-
-	updatedDevice, err := agent.mergeDeviceInfoFromAdapter(device)
-	if err != nil {
-		return status.Errorf(codes.Internal, "%s", err.Error())
-	}
-	cloned := proto.Clone(updatedDevice).(*voltha.Device)
-	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+	return agent.updateDeviceAndReleaseLock(ctx, cloned)
 }
 
 func (agent *Agent) updateDeviceStatus(ctx context.Context, operStatus voltha.OperStatus_Types, connStatus voltha.ConnectStatus_Types) error {
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
 
-	cloned := agent.getDeviceWithoutLock()
-
-	newConnStatus, newOperStatus := cloned.ConnectStatus, cloned.OperStatus
+	cloned := agent.cloneDeviceWithoutLock()
 	// Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
-	if s, ok := voltha.ConnectStatus_Types_value[connStatus.String()]; ok {
+	if s, ok := voltha.ConnectStatus_Types_name[int32(connStatus)]; ok {
 		logger.Debugw(ctx, "updateDeviceStatus-conn", log.Fields{"ok": ok, "val": s})
-		newConnStatus = connStatus
+		cloned.ConnectStatus = connStatus
 	}
-	if s, ok := voltha.OperStatus_Types_value[operStatus.String()]; ok {
+	if s, ok := voltha.OperStatus_Types_name[int32(operStatus)]; ok {
 		logger.Debugw(ctx, "updateDeviceStatus-oper", log.Fields{"ok": ok, "val": s})
-		newOperStatus = operStatus
+		cloned.OperStatus = operStatus
 	}
 	logger.Debugw(ctx, "updateDeviceStatus", log.Fields{"deviceId": cloned.Id, "operStatus": cloned.OperStatus, "connectStatus": cloned.ConnectStatus})
 	// Store the device
-	return agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, cloned.AdminState, newConnStatus, newOperStatus)
+	return agent.updateDeviceAndReleaseLock(ctx, cloned)
 }
 
 // TODO: A generic device update by attribute
 func (agent *Agent) updateDeviceAttribute(ctx context.Context, name string, value interface{}) {
-	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
-		logger.Warnw(ctx, "request-aborted", log.Fields{"device-id": agent.deviceID, "name": name, "error": err})
-		return
-	}
-	defer agent.requestQueue.RequestComplete()
 	if value == nil {
 		return
 	}
 
-	cloned := agent.getDeviceWithoutLock()
+	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+		logger.Warnw(ctx, "request-aborted", log.Fields{"device-id": agent.deviceID, "name": name, "error": err})
+		return
+	}
+
+	cloned := agent.cloneDeviceWithoutLock()
 	updated := false
 	s := reflect.ValueOf(cloned).Elem()
 	if s.Kind() == reflect.Struct {
@@ -639,7 +622,7 @@
 	logger.Debugw(ctx, "update-field-status", log.Fields{"deviceId": cloned.Id, "name": name, "updated": updated})
 	//	Save the data
 
-	if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
+	if err := agent.updateDeviceAndReleaseLock(ctx, cloned); err != nil {
 		logger.Warnw(ctx, "attribute-update-failed", log.Fields{"attribute": name, "value": value})
 	}
 }
@@ -651,10 +634,10 @@
 	defer agent.requestQueue.RequestComplete()
 	logger.Debugw(ctx, "simulateAlarm", log.Fields{"id": agent.deviceID})
 
-	cloned := agent.getDeviceWithoutLock()
+	device := agent.getDeviceReadOnly()
 
 	subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
-	ch, err := agent.adapterProxy.SimulateAlarm(subCtx, cloned, simulateReq)
+	ch, err := agent.adapterProxy.SimulateAlarm(subCtx, device, simulateReq)
 	if err != nil {
 		cancel()
 		return err
@@ -663,42 +646,32 @@
 	return nil
 }
 
-func (agent *Agent) updateDeviceStateInStoreWithoutLock(
-	ctx context.Context,
-	device *voltha.Device,
-	adminState voltha.AdminState_Types,
-	connectStatus voltha.ConnectStatus_Types,
-	operStatus voltha.OperStatus_Types,
-) error {
-	previousState := getDeviceStates(device)
-	device.AdminState, device.ConnectStatus, device.OperStatus = adminState, connectStatus, operStatus
-
-	if err := agent.updateDeviceInStoreWithoutLock(ctx, device, false, ""); err != nil {
-		return err
-	}
-
-	// process state transition in its own thread
-	go func() {
-		if err := agent.deviceMgr.processTransition(context.Background(), device, previousState); err != nil {
-			log.Errorw("failed-process-transition", log.Fields{"deviceId": device.Id, "previousAdminState": previousState.Admin, "currentAdminState": device.AdminState})
-		}
-	}()
-	return nil
-}
-
-//This is an update operation to model without Lock.This function must never be invoked by another function unless the latter holds a lock on the device.
-// It is an internal helper function.
-func (agent *Agent) updateDeviceInStoreWithoutLock(ctx context.Context, device *voltha.Device, strict bool, txid string) error {
+// updateDeviceAndReleaseLock stores the device in the DB, releases the device lock, then runs any state transitions.
+// The caller MUST hold the device lock (it is released on every return path) and MUST NOT modify the device afterwards.
+func (agent *Agent) updateDeviceAndReleaseLock(ctx context.Context, device *voltha.Device) error {
+	// fail early if this agent is no longer valid
 	if agent.stopped {
+		agent.requestQueue.RequestComplete()
 		return errors.New("device agent stopped")
 	}
 
+	// persist first; agent.device is only replaced once the DB write has succeeded
 	if err := agent.dbProxy.Set(ctx, agent.deviceID, device); err != nil {
+		agent.requestQueue.RequestComplete()
 		return status.Errorf(codes.Internal, "failed-update-device:%s: %s", agent.deviceID, err)
 	}
 	logger.Debugw(ctx, "updated-device-in-store", log.Fields{"deviceId: ": agent.deviceID})
 
+	previousState := getDeviceStates(agent.device)
+	// states were captured above from the old device; now switch the agent over to the new one
 	agent.device = device
+
+	// release lock before processing transition
+	agent.requestQueue.RequestComplete()
+
+	if err := agent.deviceMgr.processTransition(context.Background(), device, previousState); err != nil {
+		logger.Errorw(ctx, "failed-process-transition", log.Fields{"deviceId": device.Id, "previousAdminState": previousState.Admin, "currentAdminState": device.AdminState, "error": err})
+	}
 	return nil
 }
 
@@ -706,13 +679,11 @@
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
 	}
-	defer agent.requestQueue.RequestComplete()
+	logger.Debugw(ctx, "updateDeviceReason", log.Fields{"deviceId": agent.deviceID, "reason": reason})
 
-	cloned := agent.getDeviceWithoutLock()
+	cloned := agent.cloneDeviceWithoutLock()
 	cloned.Reason = reason
-	logger.Debugw(ctx, "updateDeviceReason", log.Fields{"deviceId": cloned.Id, "reason": cloned.Reason})
-	// Store the device
-	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+	return agent.updateDeviceAndReleaseLock(ctx, cloned)
 }
 
 func (agent *Agent) ChildDeviceLost(ctx context.Context, device *voltha.Device) error {
@@ -754,19 +725,19 @@
 		return nil, err
 	}
 
-	device := agent.getDeviceWithoutLock()
+	cloned := agent.cloneDeviceWithoutLock()
 
-	if device.Adapter == "" {
-		adapterName, err := agent.adapterMgr.GetAdapterType(device.Type)
+	if cloned.Adapter == "" {
+		adapterName, err := agent.adapterMgr.GetAdapterType(cloned.Type)
 		if err != nil {
 			agent.requestQueue.RequestComplete()
 			return nil, err
 		}
-		device.Adapter = adapterName
+		cloned.Adapter = adapterName
 	}
 
 	// Send request to the adapter
-	ch, err := agent.adapterProxy.StartOmciTest(ctx, device, omcitestrequest)
+	ch, err := agent.adapterProxy.StartOmciTest(ctx, cloned, omcitestrequest)
 	agent.requestQueue.RequestComplete()
 	if err != nil {
 		return nil, err