[VOL-1614] Device Management update in the Core
This commit reworks the management of devices in the Core
and makes the following changes:
1) Update the device state machine so that the logical device
and its ports are not removed when a device is disabled.
2) Fix several issues around device deletion.
3) Add new APIs between the Core and the Adapters to handle
enabling, disabling, and deleting a device.
4) Update the simulated Adapters to handle disable, re-enable, and delete.
5) Add a new set of tests for the device state machine.
Change-Id: Ib2be87ec011762d5315a6d54581a87c1891e92be
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 9782fee..d45fca6 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -112,8 +112,13 @@
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
log.Debug("stopping-device-agent")
+ // Remove the device from the KV store
+ if removed := agent.clusterDataProxy.Remove("/devices/"+agent.deviceId, ""); removed == nil {
+ log.Errorw("failed-removing-device", log.Fields{"id": agent.deviceId})
+ }
agent.exitChannel <- 1
log.Debug("device-agent-stopped")
+
}
// GetDevice retrieves the latest device information from the data model
@@ -186,16 +191,19 @@
log.Debugw("adoptDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
return err
}
- } else {
+ } else if device.AdminState == voltha.AdminState_DISABLED {
// First send the request to an Adapter and wait for a response
if err := agent.adapterProxy.ReEnableDevice(ctx, device); err != nil {
log.Debugw("renableDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
return err
}
+ } else {
+ err = status.Error(codes.FailedPrecondition, fmt.Sprintf("cannot-delete-a-deleted-device: %s ", device.Id))
+ log.Warnw("invalid-state", log.Fields{"id": agent.deviceId, "state": device.AdminState, "error": err})
+ return err
}
// Received an Ack (no error found above). Now update the device in the model to the expected state
cloned := proto.Clone(device).(*voltha.Device)
- cloned.AdminState = voltha.AdminState_ENABLED
cloned.OperStatus = voltha.OperStatus_ACTIVATING
if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
@@ -323,32 +331,48 @@
//disableDevice disable a device
func (agent *DeviceAgent) disableDevice(ctx context.Context) error {
- agent.lockDevice.Lock()
+ agent.lockDevice.RLock()
log.Debugw("disableDevice", log.Fields{"id": agent.deviceId})
// Get the most up to date the device info
if device, err := agent.getDeviceWithoutLock(); err != nil {
- agent.lockDevice.Unlock()
+ agent.lockDevice.RUnlock()
return status.Errorf(codes.NotFound, "%s", agent.deviceId)
} else {
+ agent.lockDevice.RUnlock()
if device.AdminState == voltha.AdminState_DISABLED {
log.Debugw("device-already-disabled", log.Fields{"id": agent.deviceId})
- agent.lockDevice.Unlock()
return nil
}
// First send the request to an Adapter and wait for a response
if err := agent.adapterProxy.DisableDevice(ctx, device); err != nil {
log.Debugw("disableDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
- agent.lockDevice.Unlock()
return err
}
+ if err = agent.updateAdminState(voltha.AdminState_DISABLED); err != nil {
+ log.Errorw("failed-update-device", log.Fields{"deviceId": device.Id, "currentState": device.AdminState, "expectedState": voltha.AdminState_DISABLED})
+ }
+ }
+ return nil
+}
+
+func (agent *DeviceAgent) updateAdminState(adminState voltha.AdminState_AdminState) error {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ log.Debugw("updateAdminState", log.Fields{"id": agent.deviceId})
+ // Get the most up to date the device info
+ if device, err := agent.getDeviceWithoutLock(); err != nil {
+ return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+ } else {
+ if device.AdminState == adminState {
+ log.Debugw("no-change-needed", log.Fields{"id": agent.deviceId, "state": adminState})
+ return nil
+ }
// Received an Ack (no error found above). Now update the device in the model to the expected state
cloned := proto.Clone(device).(*voltha.Device)
- cloned.AdminState = voltha.AdminState_DISABLED
+ cloned.AdminState = adminState
if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
- agent.lockDevice.Unlock()
return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
}
- agent.lockDevice.Unlock()
}
return nil
}
@@ -377,32 +401,41 @@
func (agent *DeviceAgent) deleteDevice(ctx context.Context) error {
agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
log.Debugw("deleteDevice", log.Fields{"id": agent.deviceId})
// Get the most up to date the device info
if device, err := agent.getDeviceWithoutLock(); err != nil {
- agent.lockDevice.Unlock()
return status.Errorf(codes.NotFound, "%s", agent.deviceId)
} else {
+ if device.AdminState == voltha.AdminState_DELETED {
+ log.Debugw("device-already-in-deleted-state", log.Fields{"id": agent.deviceId})
+ return nil
+ }
if (device.AdminState != voltha.AdminState_DISABLED) &&
(device.AdminState != voltha.AdminState_PREPROVISIONED) {
log.Debugw("device-not-disabled", log.Fields{"id": agent.deviceId})
//TODO: Needs customized error message
- agent.lockDevice.Unlock()
return status.Errorf(codes.FailedPrecondition, "deviceId:%s, expected-admin-state:%s", agent.deviceId, voltha.AdminState_DISABLED)
}
- if device.AdminState != voltha.AdminState_PREPROVISIONED {
- // Send the request to an Adapter and wait for a response
- if err := agent.adapterProxy.DeleteDevice(ctx, device); err != nil {
- log.Debugw("deleteDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
- agent.lockDevice.Unlock()
- return err
- }
+ // Send the request to an Adapter and wait for a response
+ if err := agent.adapterProxy.DeleteDevice(ctx, device); err != nil {
+ log.Debugw("deleteDevice-error", log.Fields{"id": agent.lastData.Id, "error": err})
+ return err
}
- if removed := agent.clusterDataProxy.Remove("/devices/"+agent.deviceId, ""); removed == nil {
- agent.lockDevice.Unlock()
+
+ // Set the state to deleted - this will trigger some background process to clean up the device as well
+ // as its association with the logical device
+ cloned := proto.Clone(device).(*voltha.Device)
+ cloned.AdminState = voltha.AdminState_DELETED
+ if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
return status.Errorf(codes.Internal, "failed-update-device:%s", agent.deviceId)
}
- agent.lockDevice.Unlock()
+
+ // If this is a child device then remove the associated peer ports on the parent device
+ if !device.Root {
+ go agent.deviceMgr.deletePeerPorts(device.ParentId, device.Id)
+ }
+
}
return nil
}
@@ -753,9 +786,9 @@
func (agent *DeviceAgent) updateDeviceStatus(operStatus voltha.OperStatus_OperStatus, connStatus voltha.ConnectStatus_ConnectStatus) error {
agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
// Work only on latest data
if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
- agent.lockDevice.Unlock()
return status.Errorf(codes.NotFound, "%s", agent.deviceId)
} else {
// clone the device
@@ -772,10 +805,8 @@
log.Debugw("updateDeviceStatus", log.Fields{"deviceId": cloned.Id, "operStatus": cloned.OperStatus, "connectStatus": cloned.ConnectStatus})
// Store the device
if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
- agent.lockDevice.Unlock()
return status.Errorf(codes.Internal, "%s", agent.deviceId)
}
- agent.lockDevice.Unlock()
return nil
}
}
@@ -801,6 +832,7 @@
}
func (agent *DeviceAgent) disablePorts() error {
+ log.Debugw("disablePorts", log.Fields{"deviceid": agent.deviceId})
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
@@ -857,6 +889,35 @@
}
}
+func (agent *DeviceAgent) deleteAllPorts() error {
+ log.Debugw("deleteAllPorts", log.Fields{"deviceId": agent.deviceId})
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ // Work only on latest data
+ if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
+ return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+ } else {
+ if storeDevice.AdminState != voltha.AdminState_DISABLED && storeDevice.AdminState != voltha.AdminState_DELETED {
+ err = status.Error(codes.FailedPrecondition, fmt.Sprintf("invalid-state-%v", storeDevice.AdminState))
+ log.Warnw("invalid-state-removing-ports", log.Fields{"state": storeDevice.AdminState, "error": err})
+ return err
+ }
+ if len(storeDevice.Ports) == 0 {
+ log.Debugw("no-ports-present", log.Fields{"deviceId": agent.deviceId})
+ return nil
+ }
+ // clone the device & set the fields to empty
+ cloned := proto.Clone(storeDevice).(*voltha.Device)
+ cloned.Ports = []*voltha.Port{}
+ log.Debugw("portStatusUpdate", log.Fields{"deviceId": cloned.Id})
+ // Store the device
+ if afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, ""); afterUpdate == nil {
+ return status.Errorf(codes.Internal, "%s", agent.deviceId)
+ }
+ return nil
+ }
+}
+
func (agent *DeviceAgent) updatePmConfigs(pmConfigs *voltha.PmConfigs) error {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
@@ -880,7 +941,7 @@
func (agent *DeviceAgent) addPort(port *voltha.Port) error {
agent.lockDevice.Lock()
defer agent.lockDevice.Unlock()
- log.Debugw("addLogicalPortToMap", log.Fields{"deviceId": agent.deviceId})
+ log.Debugw("addPort", log.Fields{"deviceId": agent.deviceId})
// Work only on latest data
if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
return status.Errorf(codes.NotFound, "%s", agent.deviceId)
@@ -889,7 +950,7 @@
cloned := proto.Clone(storeDevice).(*voltha.Device)
if cloned.Ports == nil {
// First port
- log.Debugw("addLogicalPortToMap-first-port-to-add", log.Fields{"deviceId": agent.deviceId})
+ log.Debugw("addPort-first-port-to-add", log.Fields{"deviceId": agent.deviceId})
cloned.Ports = make([]*voltha.Port, 0)
} else {
for _, p := range cloned.Ports {
@@ -898,7 +959,6 @@
return nil
}
}
-
}
cp := proto.Clone(port).(*voltha.Port)
// Set the admin state of the port to ENABLE if the operational state is ACTIVE
@@ -944,6 +1004,36 @@
}
}
+func (agent *DeviceAgent) deletePeerPorts(deviceId string) error {
+ agent.lockDevice.Lock()
+ defer agent.lockDevice.Unlock()
+ log.Debug("deletePeerPorts")
+ // Work only on latest data
+ if storeDevice, err := agent.getDeviceWithoutLock(); err != nil {
+ return status.Errorf(codes.NotFound, "%s", agent.deviceId)
+ } else {
+ // clone the device
+ cloned := proto.Clone(storeDevice).(*voltha.Device)
+ var updatedPeers []*voltha.Port_PeerPort
+ for _, port := range cloned.Ports {
+ updatedPeers = make([]*voltha.Port_PeerPort, 0)
+ for _, peerPort := range port.Peers {
+ if peerPort.DeviceId != deviceId {
+ updatedPeers = append(updatedPeers, peerPort)
+ }
+ }
+ port.Peers = updatedPeers
+ }
+
+ // Store the device with updated peer ports
+ afterUpdate := agent.clusterDataProxy.Update("/devices/"+agent.deviceId, cloned, false, "")
+ if afterUpdate == nil {
+ return status.Errorf(codes.Internal, "%s", agent.deviceId)
+ }
+ return nil
+ }
+}
+
// TODO: A generic device update by attribute
func (agent *DeviceAgent) updateDeviceAttribute(name string, value interface{}) {
agent.lockDevice.Lock()