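VOL-1774 Etcd Crash Handling

DeviceAgent now checks the error returned by each clusterDataProxy call
(Get, AddWithID, CreateProxy, Remove, Update) and stops the operation on
failure, instead of relying only on a nil result to detect problems with
the KV store.
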
Change-Id: I1eeb726654c3972fd0a4fafae134607e5a810415
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index ca3ffbb..8cbc63a 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -88,7 +88,12 @@
log.Debugw("starting-device-agent", log.Fields{"deviceId": agent.deviceID})
if deviceToCreate == nil {
// Load the existing device
- if loadedDevice := agent.clusterDataProxy.Get(ctx, "/devices/"+agent.deviceID, 1, true, ""); loadedDevice != nil {
+ loadedDevice, err := agent.clusterDataProxy.Get(ctx, "/devices/"+agent.deviceID, 1, true, "")
+ if err != nil {
+ log.Errorw("failed-to-get-from-cluster-data-proxy", log.Fields{"error": err})
+ return nil, err
+ }
+ if loadedDevice != nil {
var ok bool
if device, ok = loadedDevice.(*voltha.Device); ok {
agent.deviceType = device.Adapter
@@ -119,14 +124,22 @@
}
// Add the initial device to the local model
- if added := agent.clusterDataProxy.AddWithID(ctx, "/devices", agent.deviceID, device, ""); added == nil {
+ added, err := agent.clusterDataProxy.AddWithID(ctx, "/devices", agent.deviceID, device, "")
+ if err != nil {
+ log.Errorw("failed-to-save-devices-to-cluster-proxy", log.Fields{"error": err})
+ return nil, err
+ }
+ if added == nil {
log.Errorw("failed-to-add-device", log.Fields{"deviceId": agent.deviceID})
return nil, status.Errorf(codes.Aborted, "failed-adding-device-%s", agent.deviceID)
}
agent.device = proto.Clone(device).(*voltha.Device)
}
-
- agent.deviceProxy = agent.clusterDataProxy.CreateProxy(ctx, "/devices/"+agent.deviceID, false)
+ var err error
+ if agent.deviceProxy, err = agent.clusterDataProxy.CreateProxy(ctx, "/devices/"+agent.deviceID, false); err != nil {
+ log.Errorw("failed-to-add-devices-to-cluster-proxy", log.Fields{"error": err})
+ return nil, err
+ }
agent.deviceProxy.RegisterCallback(model.POST_UPDATE, agent.processUpdate)
log.Debugw("device-agent-started", log.Fields{"deviceId": agent.deviceID})
@@ -143,7 +156,12 @@
agent.deviceProxy.UnregisterCallback(model.POST_UPDATE, agent.processUpdate)
// Remove the device from the KV store
- if removed := agent.clusterDataProxy.Remove(ctx, "/devices/"+agent.deviceID, ""); removed == nil {
+ removed, err := agent.clusterDataProxy.Remove(ctx, "/devices/"+agent.deviceID, "")
+ if err != nil {
+ log.Errorw("Failed-to-remove-device-from-cluster-data-proxy", log.Fields{"error": err})
+ return
+ }
+ if removed == nil {
log.Debugw("device-already-removed", log.Fields{"id": agent.deviceID})
}
agent.exitChannel <- 1
@@ -157,7 +175,12 @@
defer agent.lockDevice.Unlock()
log.Debug("reconciling-device-agent-devicetype")
// TODO: context timeout
- if device := agent.clusterDataProxy.Get(context.Background(), "/devices/"+agent.deviceID, 1, true, ""); device != nil {
+ device, err := agent.clusterDataProxy.Get(context.Background(), "/devices/"+agent.deviceID, 1, true, "")
+ if err != nil {
+ log.Errorw("Failed to get device info from cluster data proxy", log.Fields{"error": err})
+ return
+ }
+ if device != nil {
if d, ok := device.(*voltha.Device); ok {
agent.deviceType = d.Adapter
agent.device = proto.Clone(d).(*voltha.Device)
@@ -743,7 +766,10 @@
cloned.PmConfigs = proto.Clone(pmConfigs).(*voltha.PmConfigs)
// Store the device
updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
- afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceID, cloned, false, "")
+ afterUpdate, err := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceID, cloned, false, "")
+ if err != nil {
+ return status.Errorf(codes.Internal, "%s", agent.deviceID)
+ }
if afterUpdate == nil {
return status.Errorf(codes.Internal, "%s", agent.deviceID)
}
@@ -1331,7 +1357,11 @@
// It is an internal helper function.
func (agent *DeviceAgent) updateDeviceInStoreWithoutLock(device *voltha.Device, strict bool, txid string) error {
updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
- if afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceID, device, strict, txid); afterUpdate == nil {
+ afterUpdate, err := agent.clusterDataProxy.Update(updateCtx, "/devices/"+agent.deviceID, device, strict, txid)
+ if err != nil {
+ return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceID)
+ }
+ if afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceID)
}
log.Debugw("updated-device-in-store", log.Fields{"deviceId: ": agent.deviceID})