VOL-1774 Etcd Crash Handling
Change-Id: I1eeb726654c3972fd0a4fafae134607e5a810415
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index e0aac9d..b85d572 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -88,6 +88,7 @@
func (agent *LogicalDeviceAgent) start(ctx context.Context, loadFromdB bool) error {
log.Infow("starting-logical_device-agent", log.Fields{"logicaldeviceId": agent.logicalDeviceID, "loadFromdB": loadFromdB})
var ld *voltha.LogicalDevice
+ var err error
if !loadFromdB {
//Build the logical device based on information retrieved from the device adapter
var switchCap *ic.SwitchCapability
@@ -113,7 +114,14 @@
agent.lockLogicalDevice.Lock()
// Save the logical device
- if added := agent.clusterDataProxy.AddWithID(ctx, "/logical_devices", ld.Id, ld, ""); added == nil {
+ added, err := agent.clusterDataProxy.AddWithID(ctx, "/logical_devices", ld.Id, ld, "")
+ if err != nil {
+ log.Errorw("failed-to-save-logical-devices-to-cluster-proxy", log.Fields{"logicaldeviceId": agent.logicalDeviceID, "error": err})
+ // The Lock() above is not deferred, so it must be released explicitly before this early return.
+ agent.lockLogicalDevice.Unlock()
+ return err
+ }
+ if added == nil {
log.Errorw("failed-to-add-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
} else {
log.Debugw("logicaldevice-created", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
@@ -134,7 +141,13 @@
// load from dB - the logical may not exist at this time. On error, just return and the calling function
// will destroy this agent.
agent.lockLogicalDevice.Lock()
- logicalDevice := agent.clusterDataProxy.Get(context.Background(), "/logical_devices/"+agent.logicalDeviceID, 0, true, "")
+ logicalDevice, err := agent.clusterDataProxy.Get(context.Background(), "/logical_devices/"+agent.logicalDeviceID, 0, true, "")
+ if err != nil {
+ // Release the lock before returning, exactly as the !ok branch below does; otherwise the agent's mutex stays held.
+ agent.lockLogicalDevice.Unlock()
+ log.Errorw("failed-to-get-logical-device-from-cluster-proxy", log.Fields{"logicaldeviceId": agent.logicalDeviceID, "error": err})
+ return status.Errorf(codes.NotFound, "logical_device-%s", agent.logicalDeviceID)
+ }
ld, ok := logicalDevice.(*voltha.LogicalDevice)
if !ok {
agent.lockLogicalDevice.Unlock()
@@ -154,23 +164,39 @@
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
- agent.flowProxy = agent.clusterDataProxy.CreateProxy(
+ // Abort on the first proxy that cannot be created; the deferred Unlock above releases the lock.
+ agent.flowProxy, err = agent.clusterDataProxy.CreateProxy(
ctx,
fmt.Sprintf("/logical_devices/%s/flows", agent.logicalDeviceID),
false)
- agent.meterProxy = agent.clusterDataProxy.CreateProxy(
+ if err != nil {
+ log.Errorw("failed-to-create-flow-proxy", log.Fields{"logicaldeviceId": agent.logicalDeviceID, "error": err})
+ return err
+ }
+ agent.meterProxy, err = agent.clusterDataProxy.CreateProxy(
ctx,
fmt.Sprintf("/logical_devices/%s/meters", agent.logicalDeviceID),
false)
- agent.groupProxy = agent.clusterDataProxy.CreateProxy(
+ if err != nil {
+ log.Errorw("failed-to-create-meter-proxy", log.Fields{"logicaldeviceId": agent.logicalDeviceID, "error": err})
+ return err
+ }
+ agent.groupProxy, err = agent.clusterDataProxy.CreateProxy(
ctx,
fmt.Sprintf("/logical_devices/%s/flow_groups", agent.logicalDeviceID),
false)
- agent.ldProxy = agent.clusterDataProxy.CreateProxy(
+ if err != nil {
+ log.Errorw("failed-to-create-group-proxy", log.Fields{"logicaldeviceId": agent.logicalDeviceID, "error": err})
+ return err
+ }
+ agent.ldProxy, err = agent.clusterDataProxy.CreateProxy(
ctx,
fmt.Sprintf("/logical_devices/%s", agent.logicalDeviceID),
false)
-
+ if err != nil {
+ log.Errorw("failed-to-create-logical-device-proxy", log.Fields{"logicaldeviceId": agent.logicalDeviceID, "error": err})
+ return err
+ }
// TODO: Use a port proxy once the POST_ADD is fixed
if agent.ldProxy != nil {
agent.ldProxy.RegisterCallback(model.POST_UPDATE, agent.portUpdated)
@@ -187,19 +212,24 @@
}
-// stop stops the logical devuce agent. This removes the logical device from the data model.
-func (agent *LogicalDeviceAgent) stop(ctx context.Context) {
+// stop stops the logical device agent. This removes the logical device from the data model and returns any cluster data proxy error.
+func (agent *LogicalDeviceAgent) stop(ctx context.Context) error {
log.Info("stopping-logical_device-agent")
agent.lockLogicalDevice.Lock()
defer agent.lockLogicalDevice.Unlock()
//Remove the logical device from the model
- if removed := agent.clusterDataProxy.Remove(ctx, "/logical_devices/"+agent.logicalDeviceID, ""); removed == nil {
+ if removed, err := agent.clusterDataProxy.Remove(ctx, "/logical_devices/"+agent.logicalDeviceID, ""); err != nil {
+ log.Errorw("failed-to-remove-logical-device-from-cluster-proxy", log.Fields{"logicaldeviceId": agent.logicalDeviceID, "error": err})
+ // NOTE(review): this early return skips the exitChannel signal below — confirm nothing blocks waiting on it.
+ return err
+ } else if removed == nil {
log.Errorw("failed-to-remove-logical-device", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
} else {
log.Debugw("logicaldevice-removed", log.Fields{"logicaldeviceId": agent.logicalDeviceID})
}
agent.exitChannel <- 1
log.Info("logical_device-agent-stopped")
+ return nil
}
// GetLogicalDevice returns the latest logical device data
@@ -246,7 +275,6 @@
// ListLogicalDevicePorts returns logical device ports
func (agent *LogicalDeviceAgent) ListLogicalDevicePorts() *voltha.LogicalPorts {
log.Debug("ListLogicalDevicePorts")
-
logicalDevice := agent.GetLogicalDevice()
lPorts := make([]*voltha.LogicalPort, 0)
lPorts = append(lPorts, logicalDevice.Ports...)
@@ -491,7 +519,12 @@
//updateLogicalDeviceWithoutLock updates the model with the logical device. It clones the logicaldevice before saving it
func (agent *LogicalDeviceAgent) updateLogicalDeviceWithoutLock(logicalDevice *voltha.LogicalDevice) error {
updateCtx := context.WithValue(context.Background(), model.RequestTimestamp, time.Now().UnixNano())
- afterUpdate := agent.clusterDataProxy.Update(updateCtx, "/logical_devices/"+agent.logicalDeviceID, logicalDevice, false, "")
+ afterUpdate, err := agent.clusterDataProxy.Update(updateCtx, "/logical_devices/"+agent.logicalDeviceID, logicalDevice, false, "")
+ // A proxy error is distinct from a nil result; propagate it to the caller unchanged.
+ if err != nil {
+ log.Errorw("failed-to-update-logical-devices-to-cluster-proxy", log.Fields{"logicaldeviceId": agent.logicalDeviceID, "error": err})
+ return err
+ }
if afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-updating-logical-device:%s", agent.logicalDeviceID)
}
@@ -945,7 +977,7 @@
}
if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: toKeep}); err != nil {
- log.Errorw("Cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+ log.Errorw("cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
return err
}
}
@@ -1068,7 +1100,7 @@
if changedFlow {
var flowMetadata voltha.FlowMetadata
if err := agent.GetMeterConfig(flowsToDelete, meters, &flowMetadata); err != nil {
- log.Error("Meter-referred-in-flows-not-present")
+ log.Errorw("meter-referred-in-flows-not-present", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
return err
}
deviceRules := agent.flowDecomposer.DecomposeRules(agent, ofp.Flows{Items: flowsToDelete}, ofp.FlowGroups{})
@@ -1080,7 +1112,7 @@
}
if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: flows}); err != nil {
- log.Errorw("Cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+ log.Errorw("cannot-update-flows", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
return err
}
}
@@ -1124,7 +1156,7 @@
}
if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(&ofp.FlowGroups{Items: groups}); err != nil {
- log.Errorw("Cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+ log.Errorw("cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
return err
}
} else {
@@ -1173,13 +1205,13 @@
if groupsChanged {
if err := agent.updateLogicalDeviceFlowGroupsWithoutLock(&ofp.FlowGroups{Items: groups}); err != nil {
- log.Errorw("Cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+ log.Errorw("cannot-update-group", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
return err
}
}
if flowsChanged {
if err := agent.updateLogicalDeviceFlowsWithoutLock(&ofp.Flows{Items: flows}); err != nil {
- log.Errorw("Cannot-update-flow", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+ log.Errorw("cannot-update-flow", log.Fields{"logicalDeviceId": agent.logicalDeviceID, "error": err})
return err
}
}