VOL-1775 VOL-1779 VOL-1780 : Fix several issues with overall stability

- Applied changes reported by the Go race detector
- Added version attribute in KV object
- Added a context object to the db/model API
- Carried timestamp info through the context to support decision
  making when applying a revision change
- Replaced proxy access control mechanism with etcd reservation mechanism

Change-Id: If3d142a73b1da0d64fa6a819530f297dbfada2d3
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 0ce1cce..0198254 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -29,6 +29,7 @@
 	"google.golang.org/grpc/status"
 	"reflect"
 	"sync"
+	"time"
 )
 
 type DeviceAgent struct {
@@ -85,7 +86,7 @@
 	defer agent.lockDevice.Unlock()
 	log.Debugw("starting-device-agent", log.Fields{"deviceId": agent.deviceId})
 	if loadFromdB {
-		if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
+		if device := agent.clusterDataProxy.Get(ctx, "/devices/"+agent.deviceId, 1, false, ""); device != nil {
 			if d, ok := device.(*voltha.Device); ok {
 				agent.lastData = proto.Clone(d).(*voltha.Device)
 				agent.deviceType = agent.lastData.Adapter
@@ -97,12 +98,12 @@
 		log.Debugw("device-loaded-from-dB", log.Fields{"device": agent.lastData})
 	} else {
 		// Add the initial device to the local model
-		if added := agent.clusterDataProxy.AddWithID("/devices", agent.deviceId, agent.lastData, ""); added == nil {
+		if added := agent.clusterDataProxy.AddWithID(ctx, "/devices", agent.deviceId, agent.lastData, ""); added == nil {
 			log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceId})
 		}
 	}
 
-	agent.deviceProxy = agent.clusterDataProxy.CreateProxy("/devices/"+agent.deviceId, false)
+	agent.deviceProxy = agent.clusterDataProxy.CreateProxy(ctx, "/devices/"+agent.deviceId, false)
 	agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate)
 
 	log.Debug("device-agent-started")
@@ -115,7 +116,7 @@
 	defer agent.lockDevice.Unlock()
 	log.Debug("stopping-device-agent")
 	//	Remove the device from the KV store
-	if removed := agent.clusterDataProxy.Remove("/devices/"+agent.deviceId, ""); removed == nil {
+	if removed := agent.clusterDataProxy.Remove(ctx, "/devices/"+agent.deviceId, ""); removed == nil {
 		log.Debugw("device-already-removed", log.Fields{"id": agent.deviceId})
 	}
 	agent.exitChannel <- 1
@@ -127,7 +128,7 @@
 func (agent *DeviceAgent) getDevice() (*voltha.Device, error) {
 	agent.lockDevice.RLock()
 	defer agent.lockDevice.RUnlock()
-	if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 0, false, ""); device != nil {
+	if device := agent.clusterDataProxy.Get(context.Background(), "/devices/"+agent.deviceId, 0, true, ""); device != nil {
 		if d, ok := device.(*voltha.Device); ok {
 			cloned := proto.Clone(d).(*voltha.Device)
 			return cloned, nil
@@ -139,7 +140,7 @@
 // getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
 // This function is meant so that we do not have duplicate code all over the device agent functions
 func (agent *DeviceAgent) getDeviceWithoutLock() (*voltha.Device, error) {
-	if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 0, false, ""); device != nil {
+	if device := agent.clusterDataProxy.Get(context.Background(), "/devices/"+agent.deviceId, 0, false, ""); device != nil {
 		if d, ok := device.(*voltha.Device); ok {
 			cloned := proto.Clone(d).(*voltha.Device)
 			return cloned, nil
@@ -186,7 +187,9 @@
 		cloned := proto.Clone(device).(*voltha.Device)
 		cloned.AdminState = voltha.AdminState_ENABLED
 		cloned.OperStatus = voltha.OperStatus_ACTIVATING
-		if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+
+		updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
+		if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
 			return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
 		}
 
@@ -547,7 +550,8 @@
 		cloned := proto.Clone(device).(*voltha.Device)
 		cloned.AdminState = voltha.AdminState_DISABLED
 		cloned.OperStatus = voltha.OperStatus_UNKNOWN
-		if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+		updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
+		if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
 			return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
 		}
 
@@ -574,7 +578,8 @@
 		// Received an Ack (no error found above).  Now update the device in the model to the expected state
 		cloned := proto.Clone(device).(*voltha.Device)
 		cloned.AdminState = adminState
-		if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+		updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+		if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
 			return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
 		}
 	}
@@ -626,7 +631,8 @@
 		//	the device as well as its association with the logical device
 		cloned := proto.Clone(device).(*voltha.Device)
 		cloned.AdminState = voltha.AdminState_DELETED
-		if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+		updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
+		if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
 			return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
 		}
 
@@ -660,7 +666,8 @@
 			cloned.ImageDownloads = append(cloned.ImageDownloads, clonedImg)
 		}
 		cloned.AdminState = voltha.AdminState_DOWNLOADING_IMAGE
-		if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+		updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
+		if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
 			return nil, status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
 		}
 		// Send the request to the adapter
@@ -706,7 +713,8 @@
 		if device.AdminState == voltha.AdminState_DOWNLOADING_IMAGE {
 			// Set the device to Enabled
 			cloned.AdminState = voltha.AdminState_ENABLED
-			if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+			updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
+			if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
 				return nil, status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
 			}
 			// Send the request to teh adapter
@@ -744,7 +752,8 @@
 		}
 		// Set the device to downloading_image
 		cloned.AdminState = voltha.AdminState_DOWNLOADING_IMAGE
-		if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+		updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
+		if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
 			return nil, status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
 		}
 
@@ -781,7 +790,8 @@
 				image.ImageState = voltha.ImageDownload_IMAGE_REVERTING
 			}
 		}
-		if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+		updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
+		if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
 			return nil, status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
 		}
 
@@ -836,7 +846,8 @@
 			cloned.AdminState = voltha.AdminState_ENABLED
 		}
 
-		if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+		updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+		if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
 			return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
 		}
 	}
@@ -965,7 +976,8 @@
 	defer agent.lockDevice.Unlock()
 	log.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
 	cloned := proto.Clone(device).(*voltha.Device)
-	afterUpdate := agent.clusterDataProxy.Update("/devices/"+device.Id, cloned, false, "")
+	updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+	afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+device.Id, cloned, false, "")
 	if afterUpdate == nil {
 		return status.Errorf(codes.Internal, "%s", device.Id)
 	}
@@ -975,7 +987,8 @@
 func (agent *DeviceAgent) updateDeviceWithoutLock(device *voltha.Device) error {
 	log.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
 	cloned := proto.Clone(device).(*voltha.Device)
-	afterUpdate := agent.clusterDataProxy.Update("/devices/"+device.Id, cloned, false, "")
+	updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+	afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+device.Id, cloned, false, "")
 	if afterUpdate == nil {
 		return status.Errorf(codes.Internal, "%s", device.Id)
 	}
@@ -1002,7 +1015,8 @@
 		}
 		log.Debugw("updateDeviceStatus", log.Fields{"deviceId": cloned.Id, "operStatus": cloned.OperStatus, "connectStatus": cloned.ConnectStatus})
 		// Store the device
-		if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+		updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+		if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
 			return status.Errorf(codes.Internal, "%s", agent.deviceId)
 		}
 		return nil
@@ -1022,7 +1036,8 @@
 			port.OperStatus = voltha.OperStatus_ACTIVE
 		}
 		// Store the device
-		if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+		updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+		if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
 			return status.Errorf(codes.Internal, "%s", agent.deviceId)
 		}
 		return nil
@@ -1043,7 +1058,8 @@
 			port.OperStatus = voltha.OperStatus_UNKNOWN
 		}
 		// Store the device
-		if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+		updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+		if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
 			return status.Errorf(codes.Internal, "%s", agent.deviceId)
 		}
 		return nil
@@ -1077,7 +1093,8 @@
 		}
 		log.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
 		// Store the device
-		if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+		updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+		if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
 			return status.Errorf(codes.Internal, "%s", agent.deviceId)
 		}
 		return nil
@@ -1106,7 +1123,8 @@
 		cloned.Ports = []*voltha.Port{}
 		log.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
 		// Store the device
-		if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+		updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+		if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
 			return status.Errorf(codes.Internal, "%s", agent.deviceId)
 		}
 		return nil
@@ -1125,7 +1143,8 @@
 		cloned := proto.Clone(storeDevice).(*voltha.Device)
 		cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
 		// Store the device
-		afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
+		updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+		afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, "")
 		if afterUpdate == nil {
 			return status.Errorf(codes.Internal, "%s", agent.deviceId)
 		}
@@ -1163,7 +1182,8 @@
 		}
 		cloned.Ports = append(cloned.Ports, cp)
 		// Store the device
-		afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
+		updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+		afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, "")
 		if afterUpdate == nil {
 			return status.Errorf(codes.Internal, "%s", agent.deviceId)
 		}
@@ -1191,7 +1211,8 @@
 			}
 		}
 		// Store the device
-		afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
+		updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+		afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, "")
 		if afterUpdate == nil {
 			return status.Errorf(codes.Internal, "%s", agent.deviceId)
 		}
@@ -1221,7 +1242,8 @@
 		}
 
 		// Store the device with updated peer ports
-		afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
+		updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+		afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, "")
 		if afterUpdate == nil {
 			return status.Errorf(codes.Internal, "%s", agent.deviceId)
 		}
@@ -1263,7 +1285,8 @@
 	log.Debugw("update-field-status", log.Fields{"deviceId": storeDevice.Id, "name": name, "updated": updated})
 	//	Save the data
 	cloned := proto.Clone(storeDevice).(*voltha.Device)
-	if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+	updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+	if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
 		log.Warnw("attribute-update-failed", log.Fields{"attribute": name, "value": value})
 	}
 	return