VOL-1775 VOL-1779 VOL-1780 : Fix several issues with overall stability
- Applied changes as reported by the Go race detector
- Added version attribute in KV object
- Added context object to db/model api
- Carried request timestamp info through the context to help with
  decision making when applying a revision change
- Replaced proxy access control mechanism with etcd reservation mechanism
Change-Id: If3d142a73b1da0d64fa6a819530f297dbfada2d3
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 0ce1cce..0198254 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -29,6 +29,7 @@
"google.golang.org/grpc/status"
"reflect"
"sync"
+ "time"
)
type DeviceAgent struct {
@@ -85,7 +86,7 @@
defer agent.lockDevice.Unlock()
log.Debugw("starting-device-agent", log.Fields{"deviceId": agent.deviceId})
if loadFromdB {
- if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 1, false, ""); device != nil {
+ if device := agent.clusterDataProxy.Get(ctx, "/devices/"+agent.deviceId, 1, false, ""); device != nil {
if d, ok := device.(*voltha.Device); ok {
agent.lastData = proto.Clone(d).(*voltha.Device)
agent.deviceType = agent.lastData.Adapter
@@ -97,12 +98,12 @@
log.Debugw("device-loaded-from-dB", log.Fields{"device": agent.lastData})
} else {
// Add the initial device to the local model
- if added := agent.clusterDataProxy.AddWithID("/devices", agent.deviceId, agent.lastData, ""); added == nil {
+ if added := agent.clusterDataProxy.AddWithID(ctx, "/devices", agent.deviceId, agent.lastData, ""); added == nil {
log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceId})
}
}
- agent.deviceProxy = agent.clusterDataProxy.CreateProxy("/devices/"+agent.deviceId, false)
+ agent.deviceProxy = agent.clusterDataProxy.CreateProxy(ctx, "/devices/"+agent.deviceId, false)
agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate)
log.Debug("device-agent-started")
@@ -115,7 +116,7 @@
defer agent.lockDevice.Unlock()
log.Debug("stopping-device-agent")
// Remove the device from the KV store
- if removed := agent.clusterDataProxy.Remove("/devices/"+agent.deviceId, ""); removed == nil {
+ if removed := agent.clusterDataProxy.Remove(ctx, "/devices/"+agent.deviceId, ""); removed == nil {
log.Debugw("device-already-removed", log.Fields{"id": agent.deviceId})
}
agent.exitChannel <- 1
@@ -127,7 +128,7 @@
func (agent *DeviceAgent) getDevice() (*voltha.Device, error) {
agent.lockDevice.RLock()
defer agent.lockDevice.RUnlock()
- if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 0, false, ""); device != nil {
+ if device := agent.clusterDataProxy.Get(context.Background(), "/devices/"+agent.deviceId, 0, true, ""); device != nil {
if d, ok := device.(*voltha.Device); ok {
cloned := proto.Clone(d).(*voltha.Device)
return cloned, nil
@@ -139,7 +140,7 @@
// getDeviceWithoutLock is a helper function to be used ONLY by any device agent function AFTER it has acquired the device lock.
// This function is meant so that we do not have duplicate code all over the device agent functions
func (agent *DeviceAgent) getDeviceWithoutLock() (*voltha.Device, error) {
- if device := agent.clusterDataProxy.Get("/devices/"+agent.deviceId, 0, false, ""); device != nil {
+ if device := agent.clusterDataProxy.Get(context.Background(), "/devices/"+agent.deviceId, 0, false, ""); device != nil {
if d, ok := device.(*voltha.Device); ok {
cloned := proto.Clone(d).(*voltha.Device)
return cloned, nil
@@ -186,7 +187,9 @@
cloned := proto.Clone(device).(*voltha.Device)
cloned.AdminState = voltha.AdminState_ENABLED
cloned.OperStatus = voltha.OperStatus_ACTIVATING
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+
+ updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
+ if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
}
@@ -547,7 +550,8 @@
cloned := proto.Clone(device).(*voltha.Device)
cloned.AdminState = voltha.AdminState_DISABLED
cloned.OperStatus = voltha.OperStatus_UNKNOWN
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
+ if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
}
@@ -574,7 +578,8 @@
// Received an Ack (no error found above). Now update the device in the model to the expected state
cloned := proto.Clone(device).(*voltha.Device)
cloned.AdminState = adminState
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
}
}
@@ -626,7 +631,8 @@
// the device as well as its association with the logical device
cloned := proto.Clone(device).(*voltha.Device)
cloned.AdminState = voltha.AdminState_DELETED
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
+ if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
}
@@ -660,7 +666,8 @@
cloned.ImageDownloads = append(cloned.ImageDownloads, clonedImg)
}
cloned.AdminState = voltha.AdminState_DOWNLOADING_IMAGE
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
+ if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
return nil, status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
}
// Send the request to the adapter
@@ -706,7 +713,8 @@
if device.AdminState == voltha.AdminState_DOWNLOADING_IMAGE {
// Set the device to Enabled
cloned.AdminState = voltha.AdminState_ENABLED
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
+ if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
return nil, status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
}
// Send the request to teh adapter
@@ -744,7 +752,8 @@
}
// Set the device to downloading_image
cloned.AdminState = voltha.AdminState_DOWNLOADING_IMAGE
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
+ if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
return nil, status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
}
@@ -781,7 +790,8 @@
image.ImageState = voltha.ImageDownload_IMAGE_REVERTING
}
}
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ updateCtx := context.WithValue(ctx, model.RequestTimestamp, time.Now().UnixNano())
+ if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
return nil, status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
}
@@ -836,7 +846,8 @@
cloned.AdminState = voltha.AdminState_ENABLED
}
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
}
}
@@ -965,7 +976,8 @@
defer agent.lockDevice.Unlock()
log.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
cloned := proto.Clone(device).(*voltha.Device)
- afterUpdate := agent.clusterDataProxy.Update("/devices/"+device.Id, cloned, false, "")
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+device.Id, cloned, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", device.Id)
}
@@ -975,7 +987,8 @@
func (agent *DeviceAgent) updateDeviceWithoutLock(device *voltha.Device) error {
log.Debugw("updateDevice", log.Fields{"deviceId": device.Id})
cloned := proto.Clone(device).(*voltha.Device)
- afterUpdate := agent.clusterDataProxy.Update("/devices/"+device.Id, cloned, false, "")
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+device.Id, cloned, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", device.Id)
}
@@ -1002,7 +1015,8 @@
}
log.Debugw("updateDeviceStatus", log.Fields{"deviceId": cloned.Id, "operStatus": cloned.OperStatus, "connectStatus": cloned.ConnectStatus})
// Store the device
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
return nil
@@ -1022,7 +1036,8 @@
port.OperStatus = voltha.OperStatus_ACTIVE
}
// Store the device
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
return nil
@@ -1043,7 +1058,8 @@
port.OperStatus = voltha.OperStatus_UNKNOWN
}
// Store the device
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
return nil
@@ -1077,7 +1093,8 @@
}
log.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
// Store the device
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
return nil
@@ -1106,7 +1123,8 @@
cloned.Ports = []*voltha.Port{}
log.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
// Store the device
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
return nil
@@ -1125,7 +1143,8 @@
cloned := proto.Clone(storeDevice).(*voltha.Device)
cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
// Store the device
- afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
@@ -1163,7 +1182,8 @@
}
cloned.Ports = append(cloned.Ports, cp)
// Store the device
- afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
@@ -1191,7 +1211,8 @@
}
}
// Store the device
- afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
@@ -1221,7 +1242,8 @@
}
// Store the device with updated peer ports
- afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, "")
if afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
@@ -1263,7 +1285,8 @@
log.Debugw("update-field-status", log.Fields{"deviceId": storeDevice.Id, "name": name, "updated": updated})
// Save the data
cloned := proto.Clone(storeDevice).(*voltha.Device)
- if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
+ if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
log.Warnw("attribute-update-failed", log.Fields{"attribute": name, "value": value})
}
return