VOL-1798:Restructuring KV store update calls in DeviceAgent
Device updates to KV store moved into a separate function.
Change-Id: I7be29ad8f1e21b44be8389e3ef412f31da5baf18
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index a202c82..25b12b5 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -188,9 +188,8 @@
cloned.AdminState = voltha.AdminState_ENABLED
cloned.OperStatus = voltha.OperStatus_ACTIVATING
- updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
- if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
- return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
+ if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+ return err
}
// Adopt the device if it was in preprovision state. In all other cases, try to reenable it.
@@ -550,9 +549,8 @@
cloned := proto.Clone(device).(*voltha.Device)
cloned.AdminState = voltha.AdminState_DISABLED
cloned.OperStatus = voltha.OperStatus_UNKNOWN
- updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
- if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
- return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
+ if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+ return err
}
if err := agent.adapterProxy.DisableDevice(ctx, device); err != nil {
@@ -578,9 +576,8 @@
// Received an Ack (no error found above). Now update the device in the model to the expected state
cloned := proto.Clone(device).(*voltha.Device)
cloned.AdminState = adminState
- updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
- if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
- return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
+ if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+ return err
}
}
return nil
@@ -631,11 +628,9 @@
// the device as well as its association with the logical device
cloned := proto.Clone(device).(*voltha.Device)
cloned.AdminState = voltha.AdminState_DELETED
- updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
- if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
- return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
+ if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+ return err
}
-
// If this is a child device then remove the associated peer ports on the parent device
if !device.Root {
go agent.deviceMgr.deletePeerPorts(device.ParentId, device.Id)
@@ -656,10 +651,8 @@
cloned := proto.Clone(storeDevice).(*voltha.Device)
cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
// Store the device
- updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
- afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, "")
- if afterUpdate == nil {
- return status.Errorf(codes.Internal, "%s", agent.deviceId)
+ if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+ return err
}
// Send the request to the adapter
if err := agent.adapterProxy.UpdatePmConfigs(ctx, cloned, pmConfigs); err != nil {
@@ -726,9 +719,8 @@
cloned.ImageDownloads = append(cloned.ImageDownloads, clonedImg)
}
cloned.AdminState = voltha.AdminState_DOWNLOADING_IMAGE
- updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
- if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
- return nil, status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
+ if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+ return nil, err
}
// Send the request to the adapter
if err := agent.adapterProxy.DownloadImage(ctx, cloned, clonedImg); err != nil {
@@ -773,9 +765,8 @@
if device.AdminState == voltha.AdminState_DOWNLOADING_IMAGE {
// Set the device to Enabled
cloned.AdminState = voltha.AdminState_ENABLED
- updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
- if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
- return nil, status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
+ if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+ return nil, err
}
// Send the request to teh adapter
if err := agent.adapterProxy.CancelImageDownload(ctx, device, img); err != nil {
@@ -812,9 +803,8 @@
}
// Set the device to downloading_image
cloned.AdminState = voltha.AdminState_DOWNLOADING_IMAGE
- updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
- if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
- return nil, status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
+ if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+ return nil, err
}
if err := agent.adapterProxy.ActivateImageUpdate(ctx, device, img); err != nil {
@@ -850,9 +840,9 @@
image.ImageState = voltha.ImageDownload_IMAGE_REVERTING
}
}
- updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
- if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
- return nil, status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
+
+ if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+ return nil, err
}
if err := agent.adapterProxy.RevertImageUpdate(ctx, device, img); err != nil {
@@ -906,9 +896,8 @@
cloned.AdminState = voltha.AdminState_ENABLED
}
- updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
- if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
- return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
+ if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+ return err
}
}
return nil
@@ -1036,23 +1025,13 @@
defer agent.lockDevice.Unlock()
log.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
cloned := proto.Clone(device).(*voltha.Device)
- updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
- afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+device.Id, cloned, false, "")
- if afterUpdate == nil {
- return status.Errorf(codes.Internal, "%s", device.Id)
- }
- return nil
+ return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
}
func (agent *DeviceAgent) updateDeviceWithoutLock(device *voltha.Device) error {
log.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
cloned := proto.Clone(device).(*voltha.Device)
- updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
- afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+device.Id, cloned, false, "")
- if afterUpdate == nil {
- return status.Errorf(codes.Internal, "%s", device.Id)
- }
- return nil
+ return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
}
func (agent *DeviceAgent) updateDeviceStatus(operStatus voltha.OperStatus_OperStatus, connStatus voltha.ConnectStatus_ConnectStatus) error {
@@ -1075,11 +1054,7 @@
}
log.Debugw("updateDeviceStatus", log.Fields{"deviceId": cloned.Id, "operStatus": cloned.OperStatus, "connectStatus": cloned.ConnectStatus})
// Store the device
- updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
- if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
- return status.Errorf(codes.Internal, "%s", agent.deviceId)
- }
- return nil
+ return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
}
}
@@ -1096,11 +1071,7 @@
port.OperStatus = voltha.OperStatus_ACTIVE
}
// Store the device
- updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
- if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
- return status.Errorf(codes.Internal, "%s", agent.deviceId)
- }
- return nil
+ return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
}
}
@@ -1118,11 +1089,7 @@
port.OperStatus = voltha.OperStatus_UNKNOWN
}
// Store the device
- updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
- if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
- return status.Errorf(codes.Internal, "%s", agent.deviceId)
- }
- return nil
+ return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
}
}
@@ -1153,11 +1120,7 @@
}
log.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
// Store the device
- updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
- if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
- return status.Errorf(codes.Internal, "%s", agent.deviceId)
- }
- return nil
+ return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
}
}
@@ -1183,11 +1146,7 @@
cloned.Ports = []*voltha.Port{}
log.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
// Store the device
- updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
- if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
- return status.Errorf(codes.Internal, "%s", agent.deviceId)
- }
- return nil
+ return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
}
}
@@ -1221,12 +1180,7 @@
}
cloned.Ports = append(cloned.Ports, cp)
// Store the device
- updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
- afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, "")
- if afterUpdate == nil {
- return status.Errorf(codes.Internal, "%s", agent.deviceId)
- }
- return nil
+ return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
}
}
@@ -1250,12 +1204,7 @@
}
}
// Store the device
- updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
- afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, "")
- if afterUpdate == nil {
- return status.Errorf(codes.Internal, "%s", agent.deviceId)
- }
- return nil
+ return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
}
}
@@ -1281,12 +1230,7 @@
}
// Store the device with updated peer ports
- updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
- afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, "")
- if afterUpdate == nil {
- return status.Errorf(codes.Internal, "%s", agent.deviceId)
- }
- return nil
+ return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
}
}
@@ -1324,8 +1268,7 @@
log.Debugw("update-field-status", log.Fields{"deviceId": storeDevice.Id, "name": name, "updated": updated})
// Save the data
cloned := proto.Clone(storeDevice).(*voltha.Device)
- updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
- if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ if err = agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
log.Warnw("attribute-update-failed", log.Fields{"attribute": name, "value": value})
}
return
@@ -1347,3 +1290,15 @@
}
return nil
}
+
+//This is an update operation to model without Lock.This function must never be invoked by another function unless the latter holds a lock on the device.
+// It is an internal helper function.
+func (agent *DeviceAgent) updateDeviceInStoreWithoutLock(device *voltha.Device, strict bool, txid string) error {
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, device, strict, txid); afterUpdate == nil {
+ return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
+ }
+ log.Debugw("updated-device-in-store", log.Fields{"deviceId: ": agent.deviceId})
+
+ return nil
+}