VOL-1798:Restructuring KV store update calls in DeviceAgent

Device updates to KV store moved into a separate function.
Change-Id: I7be29ad8f1e21b44be8389e3ef412f31da5baf18
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index a202c82..25b12b5 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -188,9 +188,8 @@
 		cloned.AdminState = voltha.AdminState_ENABLED
 		cloned.OperStatus = voltha.OperStatus_ACTIVATING
 
-		updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
-		if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
-			return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
+		if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+			return err
 		}
 
 		// Adopt the device if it was in preprovision state.  In all other cases, try to reenable it.
@@ -550,9 +549,8 @@
 		cloned := proto.Clone(device).(*voltha.Device)
 		cloned.AdminState = voltha.AdminState_DISABLED
 		cloned.OperStatus = voltha.OperStatus_UNKNOWN
-		updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
-		if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
-			return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
+		if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+			return err
 		}
 
 		if err := agent.adapterProxy.DisableDevice(ctx, device); err != nil {
@@ -578,9 +576,8 @@
 		// Received an Ack (no error found above).  Now update the device in the model to the expected state
 		cloned := proto.Clone(device).(*voltha.Device)
 		cloned.AdminState = adminState
-		updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
-		if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
-			return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
+		if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+			return err
 		}
 	}
 	return nil
@@ -631,11 +628,9 @@
 		//	the device as well as its association with the logical device
 		cloned := proto.Clone(device).(*voltha.Device)
 		cloned.AdminState = voltha.AdminState_DELETED
-		updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
-		if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
-			return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
+		if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+			return err
 		}
-
 		//	If this is a child device then remove the associated peer ports on the parent device
 		if !device.Root {
 			go agent.deviceMgr.deletePeerPorts(device.ParentId, device.Id)
@@ -656,10 +651,8 @@
 		cloned := proto.Clone(storeDevice).(*voltha.Device)
 		cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
 		// Store the device
-		updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
-		afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, "")
-		if afterUpdate == nil {
-			return status.Errorf(codes.Internal, "%s", agent.deviceId)
+		if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+			return err
 		}
 		// Send the request to the adapter
 		if err := agent.adapterProxy.UpdatePmConfigs(ctx, cloned, pmConfigs); err != nil {
@@ -726,9 +719,8 @@
 			cloned.ImageDownloads = append(cloned.ImageDownloads, clonedImg)
 		}
 		cloned.AdminState = voltha.AdminState_DOWNLOADING_IMAGE
-		updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
-		if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
-			return nil, status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
+		if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+			return nil, err
 		}
 		// Send the request to the adapter
 		if err := agent.adapterProxy.DownloadImage(ctx, cloned, clonedImg); err != nil {
@@ -773,9 +765,8 @@
 		if device.AdminState == voltha.AdminState_DOWNLOADING_IMAGE {
 			// Set the device to Enabled
 			cloned.AdminState = voltha.AdminState_ENABLED
-			updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
-			if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
-				return nil, status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
+			if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+				return nil, err
 			}
 			// Send the request to teh adapter
 			if err := agent.adapterProxy.CancelImageDownload(ctx, device, img); err != nil {
@@ -812,9 +803,8 @@
 		}
 		// Set the device to downloading_image
 		cloned.AdminState = voltha.AdminState_DOWNLOADING_IMAGE
-		updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
-		if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
-			return nil, status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
+		if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+			return nil, err
 		}
 
 		if err := agent.adapterProxy.ActivateImageUpdate(ctx, device, img); err != nil {
@@ -850,9 +840,9 @@
 				image.ImageState = voltha.ImageDownload_IMAGE_REVERTING
 			}
 		}
-		updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
-		if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
-			return nil, status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
+
+		if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+			return nil, err
 		}
 
 		if err := agent.adapterProxy.RevertImageUpdate(ctx, device, img); err != nil {
@@ -906,9 +896,8 @@
 			cloned.AdminState = voltha.AdminState_ENABLED
 		}
 
-		updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
-		if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
-			return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
+		if err := agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
+			return err
 		}
 	}
 	return nil
@@ -1036,23 +1025,13 @@
 	defer agent.lockDevice.Unlock()
 	log.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
 	cloned := proto.Clone(device).(*voltha.Device)
-	updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
-	afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+device.Id, cloned, false, "")
-	if afterUpdate == nil {
-		return status.Errorf(codes.Internal, "%s", device.Id)
-	}
-	return nil
+	return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
 }
 
 func (agent *DeviceAgent) updateDeviceWithoutLock(device *voltha.Device) error {
 	log.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
 	cloned := proto.Clone(device).(*voltha.Device)
-	updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
-	afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+device.Id, cloned, false, "")
-	if afterUpdate == nil {
-		return status.Errorf(codes.Internal, "%s", device.Id)
-	}
-	return nil
+	return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
 }
 
 func (agent *DeviceAgent) updateDeviceStatus(operStatus voltha.OperStatus_OperStatus, connStatus voltha.ConnectStatus_ConnectStatus) error {
@@ -1075,11 +1054,7 @@
 		}
 		log.Debugw("updateDeviceStatus", log.Fields{"deviceId": cloned.Id, "operStatus": cloned.OperStatus, "connectStatus": cloned.ConnectStatus})
 		// Store the device
-		updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
-		if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
-			return status.Errorf(codes.Internal, "%s", agent.deviceId)
-		}
-		return nil
+		return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
 	}
 }
 
@@ -1096,11 +1071,7 @@
 			port.OperStatus = voltha.OperStatus_ACTIVE
 		}
 		// Store the device
-		updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
-		if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
-			return status.Errorf(codes.Internal, "%s", agent.deviceId)
-		}
-		return nil
+		return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
 	}
 }
 
@@ -1118,11 +1089,7 @@
 			port.OperStatus = voltha.OperStatus_UNKNOWN
 		}
 		// Store the device
-		updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
-		if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
-			return status.Errorf(codes.Internal, "%s", agent.deviceId)
-		}
-		return nil
+		return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
 	}
 }
 
@@ -1153,11 +1120,7 @@
 		}
 		log.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
 		// Store the device
-		updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
-		if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
-			return status.Errorf(codes.Internal, "%s", agent.deviceId)
-		}
-		return nil
+		return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
 	}
 }
 
@@ -1183,11 +1146,7 @@
 		cloned.Ports = []*voltha.Port{}
 		log.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
 		// Store the device
-		updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
-		if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
-			return status.Errorf(codes.Internal, "%s", agent.deviceId)
-		}
-		return nil
+		return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
 	}
 }
 
@@ -1221,12 +1180,7 @@
 		}
 		cloned.Ports = append(cloned.Ports, cp)
 		// Store the device
-		updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
-		afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, "")
-		if afterUpdate == nil {
-			return status.Errorf(codes.Internal, "%s", agent.deviceId)
-		}
-		return nil
+		return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
 	}
 }
 
@@ -1250,12 +1204,7 @@
 			}
 		}
 		// Store the device
-		updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
-		afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, "")
-		if afterUpdate == nil {
-			return status.Errorf(codes.Internal, "%s", agent.deviceId)
-		}
-		return nil
+		return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
 	}
 }
 
@@ -1281,12 +1230,7 @@
 		}
 
 		// Store the device with updated peer ports
-		updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
-		afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, "")
-		if afterUpdate == nil {
-			return status.Errorf(codes.Internal, "%s", agent.deviceId)
-		}
-		return nil
+		return agent.updateDeviceInStoreWithoutLock(cloned, false, "")
 	}
 }
 
@@ -1324,8 +1268,7 @@
 	log.Debugw("update-field-status", log.Fields{"deviceId": storeDevice.Id, "name": name, "updated": updated})
 	//	Save the data
 	cloned := proto.Clone(storeDevice).(*voltha.Device)
-	updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
-	if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+	if err = agent.updateDeviceInStoreWithoutLock(cloned, false, ""); err != nil {
 		log.Warnw("attribute-update-failed", log.Fields{"attribute": name, "value": value})
 	}
 	return
@@ -1347,3 +1290,15 @@
 	}
 	return nil
 }
+
+//This is an update operation to model without Lock.This function must never be invoked by another function unless the latter holds a lock on the device.
+// It is an internal helper function.
+func (agent *DeviceAgent) updateDeviceInStoreWithoutLock(device *voltha.Device, strict bool, txid string) error {
+	updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+	if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, device, strict, txid); afterUpdate == nil {
+		return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
+	}
+	log.Debugw("updated-device-in-store", log.Fields{"deviceId: ": agent.deviceId})
+
+	return nil
+}