VOL-2868 Remove all uses of Proxy.RegisterCallback(...)

Change-Id: I05d47a9915071adb80ebc3c5f9b129ed6c36b54b
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 37a79c8..4c2b9f6 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -48,7 +48,6 @@
 	adapterMgr       *AdapterManager
 	deviceMgr        *DeviceManager
 	clusterDataProxy *model.Proxy
-	deviceProxy      *model.Proxy
 	exitChannel      chan int
 	device           *voltha.Device
 	requestQueue     *coreutils.RequestQueue
@@ -143,11 +142,6 @@
 		}
 		agent.device = device
 	}
-	var err error
-	if agent.deviceProxy, err = agent.clusterDataProxy.CreateProxy(ctx, "/devices/"+agent.deviceID, false); err != nil {
-		return nil, err
-	}
-	agent.deviceProxy.RegisterCallback(model.PostUpdate, agent.processUpdate)
 
 	startSucceeded = true
 	logger.Debugw("device-agent-started", log.Fields{"device-id": agent.deviceID})
@@ -168,11 +162,6 @@
 
 	logger.Infow("stopping-device-agent", log.Fields{"deviceId": agent.deviceID, "parentId": agent.parentID})
 
-	// First unregister any callbacks
-	if agent.deviceProxy != nil {
-		agent.deviceProxy.UnregisterCallback(model.PostUpdate, agent.processUpdate)
-	}
-
 	//	Remove the device from the KV store
 	removed, err := agent.clusterDataProxy.Remove(ctx, "/devices/"+agent.deviceID, "")
 	if err != nil {
@@ -298,10 +287,7 @@
 
 	// Update the Admin State and set the operational state to activating before sending the request to the
 	// Adapters
-	cloned.AdminState = voltha.AdminState_ENABLED
-	cloned.OperStatus = voltha.OperStatus_ACTIVATING
-
-	if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
+	if err := agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_ENABLED, cloned.ConnectStatus, voltha.OperStatus_ACTIVATING); err != nil {
 		return err
 	}
 
@@ -768,9 +754,7 @@
 	}
 
 	// Update the Admin State and operational state before sending the request out
-	cloned.AdminState = voltha.AdminState_DISABLED
-	cloned.OperStatus = voltha.OperStatus_UNKNOWN
-	if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
+	if err := agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_DISABLED, cloned.ConnectStatus, voltha.OperStatus_UNKNOWN); err != nil {
 		return err
 	}
 
@@ -816,8 +800,7 @@
 
 	// No check is required when deleting a device.  Changing the state to DELETE will trigger the removal of this
 	// device by the state machine
-	cloned.AdminState = voltha.AdminState_DELETED
-	if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
+	if err := agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_DELETED, cloned.ConnectStatus, cloned.OperStatus); err != nil {
 		return err
 	}
 
@@ -887,14 +870,7 @@
 	cloned := agent.getDeviceWithoutLock()
 	cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
 	updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
-	afterUpdate, err := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceID, cloned, false, "")
-	if err != nil {
-		return err
-	}
-	if afterUpdate == nil {
-		return status.Errorf(codes.Internal, "pm-kv-update-failed-for-device-id-%s", agent.deviceID)
-	}
-	return nil
+	return agent.updateDeviceInStoreWithoutLock(updateCtx, cloned, false, "")
 }
 
 func (agent *DeviceAgent) listPmConfigs(ctx context.Context) (*voltha.PmConfigs, error) {
@@ -934,16 +910,15 @@
 		// Save the image
 		clonedImg := proto.Clone(img).(*voltha.ImageDownload)
 		clonedImg.DownloadState = voltha.ImageDownload_DOWNLOAD_REQUESTED
-		cloned := proto.Clone(device).(*voltha.Device)
-		if cloned.ImageDownloads == nil {
-			cloned.ImageDownloads = []*voltha.ImageDownload{clonedImg}
+		if device.ImageDownloads == nil {
+			device.ImageDownloads = []*voltha.ImageDownload{clonedImg}
 		} else {
-			cloned.ImageDownloads = append(cloned.ImageDownloads, clonedImg)
+			device.ImageDownloads = append(device.ImageDownloads, clonedImg)
 		}
-		cloned.AdminState = voltha.AdminState_DOWNLOADING_IMAGE
-		if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
+		// Store the device that now carries the new image entry, moving it to DOWNLOADING_IMAGE
+		if err := agent.updateDeviceStateInStoreWithoutLock(ctx, device, voltha.AdminState_DOWNLOADING_IMAGE, device.ConnectStatus, device.OperStatus); err != nil {
 			return nil, err
 		}
 		// Send the request to the adapter
 		subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
-		ch, err := agent.adapterProxy.downloadImage(ctx, cloned, clonedImg)
+		ch, err := agent.adapterProxy.downloadImage(ctx, device, clonedImg)
@@ -982,8 +957,7 @@
 	}
 
 	// Update image download state
-	cloned := proto.Clone(device).(*voltha.Device)
-	for _, image := range cloned.ImageDownloads {
+	for _, image := range device.ImageDownloads {
 		if image.Id == img.Id && image.Name == img.Name {
 			image.DownloadState = voltha.ImageDownload_DOWNLOAD_CANCELLED
 		}
@@ -991,8 +965,7 @@
 
 	if device.AdminState == voltha.AdminState_DOWNLOADING_IMAGE {
 		// Set the device to Enabled
-		cloned.AdminState = voltha.AdminState_ENABLED
-		if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
+		if err := agent.updateDeviceStateInStoreWithoutLock(ctx, device, voltha.AdminState_ENABLED, device.ConnectStatus, device.OperStatus); err != nil {
 			return nil, err
 		}
 		subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
@@ -1029,8 +1002,7 @@
 		}
 	}
 	// Set the device to downloading_image
-	cloned.AdminState = voltha.AdminState_DOWNLOADING_IMAGE
-	if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
+	if err := agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_DOWNLOADING_IMAGE, cloned.ConnectStatus, cloned.OperStatus); err != nil {
 		return nil, err
 	}
 
@@ -1137,13 +1109,9 @@
 	if (img.DownloadState != voltha.ImageDownload_DOWNLOAD_REQUESTED &&
 		img.DownloadState != voltha.ImageDownload_DOWNLOAD_STARTED) ||
 		(img.ImageState != voltha.ImageDownload_IMAGE_ACTIVATING) {
-		cloned.AdminState = voltha.AdminState_ENABLED
+		return agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_ENABLED, cloned.ConnectStatus, cloned.OperStatus)
 	}
-
-	if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
-		return err
-	}
-	return nil
+	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
 }
 
 func (agent *DeviceAgent) getImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
@@ -1278,37 +1246,6 @@
 	return nil
 }
 
-// processUpdate is a respCallback invoked whenever there is a change on the device manages by this device agent
-func (agent *DeviceAgent) processUpdate(ctx context.Context, args ...interface{}) interface{} {
-	//// Run this respCallback in its own go routine
-	go func(args ...interface{}) interface{} {
-		var previous *voltha.Device
-		var current *voltha.Device
-		var ok bool
-		if len(args) == 2 {
-			if previous, ok = args[0].(*voltha.Device); !ok {
-				logger.Errorw("invalid-callback-type", log.Fields{"data": args[0]})
-				return nil
-			}
-			if current, ok = args[1].(*voltha.Device); !ok {
-				logger.Errorw("invalid-callback-type", log.Fields{"data": args[1]})
-				return nil
-			}
-		} else {
-			logger.Errorw("too-many-args-in-callback", log.Fields{"len": len(args)})
-			return nil
-		}
-		// Perform the state transition in it's own go routine
-		if err := agent.deviceMgr.processTransition(context.Background(), previous, current); err != nil {
-			logger.Errorw("failed-process-transition", log.Fields{"device-id": previous.Id,
-				"previous-admin-state": previous.AdminState, "current-admin-state": current.AdminState})
-		}
-		return nil
-	}(args...)
-
-	return nil
-}
-
 // updatePartialDeviceData updates a subset of a device that an Adapter can update.
 // TODO:  May need a specific proto to handle only a subset of a device that can be changed by an adapter
 func (agent *DeviceAgent) mergeDeviceInfoFromAdapter(device *voltha.Device) (*voltha.Device, error) {
@@ -1353,18 +1290,19 @@
 
 	cloned := agent.getDeviceWithoutLock()
 
+	newConnStatus, newOperStatus := cloned.ConnectStatus, cloned.OperStatus
 	// Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
 	if s, ok := voltha.ConnectStatus_Types_value[connStatus.String()]; ok {
 		logger.Debugw("updateDeviceStatus-conn", log.Fields{"ok": ok, "val": s})
-		cloned.ConnectStatus = connStatus
+		newConnStatus = connStatus
 	}
 	if s, ok := voltha.OperStatus_Types_value[operStatus.String()]; ok {
 		logger.Debugw("updateDeviceStatus-oper", log.Fields{"ok": ok, "val": s})
-		cloned.OperStatus = operStatus
+		newOperStatus = operStatus
 	}
-	logger.Debugw("updateDeviceStatus", log.Fields{"deviceId": cloned.Id, "operStatus": cloned.OperStatus, "connectStatus": cloned.ConnectStatus})
+	logger.Debugw("updateDeviceStatus", log.Fields{"deviceId": cloned.Id, "operStatus": newOperStatus, "connectStatus": newConnStatus})
 	// Store the device
-	return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+	return agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, cloned.AdminState, newConnStatus, newOperStatus)
 }
 
 func (agent *DeviceAgent) updatePortsOperState(ctx context.Context, operStatus voltha.OperStatus_Types) error {
@@ -1563,13 +1501,36 @@
 	return nil
 }
 
+// updateDeviceStateInStoreWithoutLock sets the admin, connect and operational states on the supplied device,
+// saves it with updateDeviceInStoreWithoutLock and, once the save succeeds, passes the device together with
+// its pre-update states to deviceMgr.processTransition in a separate goroutine.  The device is modified in
+// place.  Only a failure to save is returned; a failed transition is logged.  No lock is taken here, so the
+// same locking rule as updateDeviceInStoreWithoutLock applies to callers.
+func (agent *DeviceAgent) updateDeviceStateInStoreWithoutLock(ctx context.Context, device *voltha.Device, adminState voltha.AdminState_Types, connectStatus voltha.ConnectStatus_Types, operStatus voltha.OperStatus_Types) error {
+	// Snapshot taken before the three states are overwritten on the next line
+	previousState := getDeviceStates(device)
+	device.AdminState, device.ConnectStatus, device.OperStatus = adminState, connectStatus, operStatus
+
+	if err := agent.updateDeviceInStoreWithoutLock(ctx, device, false, ""); err != nil {
+		return err
+	}
+
+	// process state transition in its own thread
+	go func() {
+		if err := agent.deviceMgr.processTransition(context.Background(), device, previousState); err != nil {
+			logger.Errorw("failed-process-transition", log.Fields{"deviceId": device.Id, "previousAdminState": previousState.Admin, "currentAdminState": device.AdminState, "error": err})
+		}
+	}()
+	return nil
+}
+
 //This is an update operation to model without Lock.This function must never be invoked by another function unless the latter holds a lock on the device.
 // It is an internal helper function.
 func (agent *DeviceAgent) updateDeviceInStoreWithoutLock(ctx context.Context, device *voltha.Device, strict bool, txid string) error {
 	updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
 	afterUpdate, err := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceID, device, strict, txid)
 	if err != nil {
-		return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceID)
+		return status.Errorf(codes.Internal, "failed-update-device:%s: %s", agent.deviceID, err)
 	}
 	if afterUpdate == nil {
 		return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceID)
@@ -1577,7 +1538,6 @@
 	logger.Debugw("updated-device-in-store", log.Fields{"deviceId: ": agent.deviceID})
 
 	agent.device = proto.Clone(device).(*voltha.Device)
-
 	return nil
 }