VOL-2868 Remove all uses of Proxy.RegisterCallback(...)
Change-Id: I05d47a9915071adb80ebc3c5f9b129ed6c36b54b
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 37a79c8..4c2b9f6 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -48,7 +48,6 @@
adapterMgr *AdapterManager
deviceMgr *DeviceManager
clusterDataProxy *model.Proxy
- deviceProxy *model.Proxy
exitChannel chan int
device *voltha.Device
requestQueue *coreutils.RequestQueue
@@ -143,11 +142,6 @@
}
agent.device = device
}
- var err error
- if agent.deviceProxy, err = agent.clusterDataProxy.CreateProxy(ctx, "/devices/"+agent.deviceID, false); err != nil {
- return nil, err
- }
- agent.deviceProxy.RegisterCallback(model.PostUpdate, agent.processUpdate)
startSucceeded = true
logger.Debugw("device-agent-started", log.Fields{"device-id": agent.deviceID})
@@ -168,11 +162,6 @@
logger.Infow("stopping-device-agent", log.Fields{"deviceId": agent.deviceID, "parentId": agent.parentID})
- // First unregister any callbacks
- if agent.deviceProxy != nil {
- agent.deviceProxy.UnregisterCallback(model.PostUpdate, agent.processUpdate)
- }
-
// Remove the device from the KV store
removed, err := agent.clusterDataProxy.Remove(ctx, "/devices/"+agent.deviceID, "")
if err != nil {
@@ -298,10 +287,7 @@
// Update the Admin State and set the operational state to activating before sending the request to the
// Adapters
- cloned.AdminState = voltha.AdminState_ENABLED
- cloned.OperStatus = voltha.OperStatus_ACTIVATING
-
- if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
+ if err := agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_ENABLED, cloned.ConnectStatus, voltha.OperStatus_ACTIVATING); err != nil {
return err
}
@@ -768,9 +754,7 @@
}
// Update the Admin State and operational state before sending the request out
- cloned.AdminState = voltha.AdminState_DISABLED
- cloned.OperStatus = voltha.OperStatus_UNKNOWN
- if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
+ if err := agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_DISABLED, cloned.ConnectStatus, voltha.OperStatus_UNKNOWN); err != nil {
return err
}
@@ -816,8 +800,7 @@
// No check is required when deleting a device. Changing the state to DELETE will trigger the removal of this
// device by the state machine
- cloned.AdminState = voltha.AdminState_DELETED
- if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
+ if err := agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_DELETED, cloned.ConnectStatus, cloned.OperStatus); err != nil {
return err
}
@@ -887,14 +870,7 @@
cloned := agent.getDeviceWithoutLock()
cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
- afterUpdate, err := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceID, cloned, false, "")
- if err != nil {
- return err
- }
- if afterUpdate == nil {
- return status.Errorf(codes.Internal, "pm-kv-update-failed-for-device-id-%s", agent.deviceID)
- }
- return nil
+ return agent.updateDeviceInStoreWithoutLock(updateCtx, cloned, false, "")
}
func (agent *DeviceAgent) listPmConfigs(ctx context.Context) (*voltha.PmConfigs, error) {
@@ -934,16 +910,15 @@
// Save the image
clonedImg := proto.Clone(img).(*voltha.ImageDownload)
clonedImg.DownloadState = voltha.ImageDownload_DOWNLOAD_REQUESTED
- cloned := proto.Clone(device).(*voltha.Device)
- if cloned.ImageDownloads == nil {
- cloned.ImageDownloads = []*voltha.ImageDownload{clonedImg}
+ if device.ImageDownloads == nil {
+ device.ImageDownloads = []*voltha.ImageDownload{clonedImg}
} else {
- cloned.ImageDownloads = append(cloned.ImageDownloads, clonedImg)
+ device.ImageDownloads = append(device.ImageDownloads, clonedImg)
}
- cloned.AdminState = voltha.AdminState_DOWNLOADING_IMAGE
- if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
+ if err := agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_DOWNLOADING_IMAGE, device.ConnectStatus, device.OperStatus); err != nil {
return nil, err
}
+
// Send the request to the adapter
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
ch, err := agent.adapterProxy.downloadImage(ctx, cloned, clonedImg)
@@ -982,8 +957,7 @@
}
// Update image download state
- cloned := proto.Clone(device).(*voltha.Device)
- for _, image := range cloned.ImageDownloads {
+ for _, image := range device.ImageDownloads {
if image.Id == img.Id && image.Name == img.Name {
image.DownloadState = voltha.ImageDownload_DOWNLOAD_CANCELLED
}
@@ -991,8 +965,7 @@
if device.AdminState == voltha.AdminState_DOWNLOADING_IMAGE {
// Set the device to Enabled
- cloned.AdminState = voltha.AdminState_ENABLED
- if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
+ if err := agent.updateDeviceStateInStoreWithoutLock(ctx, device, voltha.AdminState_ENABLED, device.ConnectStatus, device.OperStatus); err != nil {
return nil, err
}
subCtx, cancel := context.WithTimeout(context.Background(), agent.defaultTimeout)
@@ -1029,8 +1002,7 @@
}
}
// Set the device to downloading_image
- cloned.AdminState = voltha.AdminState_DOWNLOADING_IMAGE
- if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
+ if err := agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_DOWNLOADING_IMAGE, cloned.ConnectStatus, cloned.OperStatus); err != nil {
return nil, err
}
@@ -1137,13 +1109,9 @@
if (img.DownloadState != voltha.ImageDownload_DOWNLOAD_REQUESTED &&
img.DownloadState != voltha.ImageDownload_DOWNLOAD_STARTED) ||
(img.ImageState != voltha.ImageDownload_IMAGE_ACTIVATING) {
- cloned.AdminState = voltha.AdminState_ENABLED
+ return agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, voltha.AdminState_ENABLED, cloned.ConnectStatus, cloned.OperStatus)
}
-
- if err := agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, ""); err != nil {
- return err
- }
- return nil
+ return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
}
func (agent *DeviceAgent) getImageDownload(ctx context.Context, img *voltha.ImageDownload) (*voltha.ImageDownload, error) {
@@ -1278,37 +1246,6 @@
return nil
}
-// processUpdate is a respCallback invoked whenever there is a change on the device manages by this device agent
-func (agent *DeviceAgent) processUpdate(ctx context.Context, args ...interface{}) interface{} {
- //// Run this respCallback in its own go routine
- go func(args ...interface{}) interface{} {
- var previous *voltha.Device
- var current *voltha.Device
- var ok bool
- if len(args) == 2 {
- if previous, ok = args[0].(*voltha.Device); !ok {
- logger.Errorw("invalid-callback-type", log.Fields{"data": args[0]})
- return nil
- }
- if current, ok = args[1].(*voltha.Device); !ok {
- logger.Errorw("invalid-callback-type", log.Fields{"data": args[1]})
- return nil
- }
- } else {
- logger.Errorw("too-many-args-in-callback", log.Fields{"len": len(args)})
- return nil
- }
- // Perform the state transition in it's own go routine
- if err := agent.deviceMgr.processTransition(context.Background(), previous, current); err != nil {
- logger.Errorw("failed-process-transition", log.Fields{"device-id": previous.Id,
- "previous-admin-state": previous.AdminState, "current-admin-state": current.AdminState})
- }
- return nil
- }(args...)
-
- return nil
-}
-
// updatePartialDeviceData updates a subset of a device that an Adapter can update.
// TODO: May need a specific proto to handle only a subset of a device that can be changed by an adapter
func (agent *DeviceAgent) mergeDeviceInfoFromAdapter(device *voltha.Device) (*voltha.Device, error) {
@@ -1353,18 +1290,19 @@
cloned := agent.getDeviceWithoutLock()
+ newConnStatus, newOperStatus := cloned.ConnectStatus, cloned.OperStatus
// Ensure the enums passed in are valid - they will be invalid if they are not set when this function is invoked
if s, ok := voltha.ConnectStatus_Types_value[connStatus.String()]; ok {
logger.Debugw("updateDeviceStatus-conn", log.Fields{"ok": ok, "val": s})
- cloned.ConnectStatus = connStatus
+ newConnStatus = connStatus
}
if s, ok := voltha.OperStatus_Types_value[operStatus.String()]; ok {
logger.Debugw("updateDeviceStatus-oper", log.Fields{"ok": ok, "val": s})
- cloned.OperStatus = operStatus
+ newOperStatus = operStatus
}
logger.Debugw("updateDeviceStatus", log.Fields{"deviceId": cloned.Id, "operStatus": cloned.OperStatus, "connectStatus": cloned.ConnectStatus})
// Store the device
- return agent.updateDeviceInStoreWithoutLock(ctx, cloned, false, "")
+ return agent.updateDeviceStateInStoreWithoutLock(ctx, cloned, cloned.AdminState, newConnStatus, newOperStatus)
}
func (agent *DeviceAgent) updatePortsOperState(ctx context.Context, operStatus voltha.OperStatus_Types) error {
@@ -1563,13 +1501,36 @@
return nil
}
+// updateDeviceStateInStoreWithoutLock sets the given admin/connect/oper states on device, persists it via
+// updateDeviceInStoreWithoutLock and, only if that succeeds, asynchronously runs deviceMgr.processTransition. Returns the store error, if any.
+func (agent *DeviceAgent) updateDeviceStateInStoreWithoutLock(
+	ctx context.Context,
+	device *voltha.Device,
+	adminState voltha.AdminState_Types,
+	connectStatus voltha.ConnectStatus_Types,
+	operStatus voltha.OperStatus_Types,
+) error {
+	previousState := getDeviceStates(device) // taken before the states are overwritten on the next line
+	device.AdminState, device.ConnectStatus, device.OperStatus = adminState, connectStatus, operStatus
+	if err := agent.updateDeviceInStoreWithoutLock(ctx, device, false, ""); err != nil {
+		return err
+	}
+	// Process the state transition in its own goroutine; a failure there is logged, not returned to the caller.
+	go func() {
+		if err := agent.deviceMgr.processTransition(context.Background(), device, previousState); err != nil {
+			logger.Errorw("failed-process-transition", log.Fields{"deviceId": device.Id, "previousAdminState": previousState.Admin, "currentAdminState": device.AdminState, "error": err})
+		}
+	}()
+	return nil
+}
+
//This is an update operation to model without Lock.This function must never be invoked by another function unless the latter holds a lock on the device.
// It is an internal helper function.
func (agent *DeviceAgent) updateDeviceInStoreWithoutLock(ctx context.Context, device *voltha.Device, strict bool, txid string) error {
updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
afterUpdate, err := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceID, device, strict, txid)
if err != nil {
- return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceID)
+ return status.Errorf(codes.Internal, "failed-update-device:%s: %s", agent.deviceID, err)
}
if afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceID)
@@ -1577,7 +1538,6 @@
logger.Debugw("updated-device-in-store", log.Fields{"deviceId: ": agent.deviceID})
agent.device = proto.Clone(device).(*voltha.Device)
-
return nil
}