[VOL-2404] : RW-Core changes for handling OLT Reboot Scenario
- When the OLT's connection status goes from REACHABLE to UNREACHABLE
in ENABLED/DISABLED admin state, delete all the logical ports,
child devices, logical device and device flows.
- When OLT's connection status becomes reachable again, child devices,
ports will be re-discovered again. The logical device will be recreated
again.
- Will not handle the case where OLT goes UNREACHABLE when OLT is disabled
as part of voltha2.3 release
Change-Id: I34c0c538b44afa19e889e9631f0a738060a58fef
diff --git a/.gitignore b/.gitignore
index 8ae7909..9a6c87e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -56,3 +56,9 @@
# test output
tests/results
sca-report
+
+# CPU profile
+**/profile.cpu
+
+# etcd - ?
+rw_core/core/voltha.rwcore.nb.etcd/
diff --git a/rw_core/core/common_test.go b/rw_core/core/common_test.go
index d07a095..8375f70 100644
--- a/rw_core/core/common_test.go
+++ b/rw_core/core/common_test.go
@@ -201,6 +201,14 @@
if done {
break
}
+ } else if d != nil && d.ParentId == "" { // case where logical device deleted
+ if verificationFunction(nil) {
+ ch <- 1
+ break
+ }
+ if done {
+ break
+ }
}
time.Sleep(retryInterval)
}
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 5f946ba..ee8e731 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -741,6 +741,24 @@
return nil
}
+//deleteAllFlows deletes all flows in the device table
+func (agent *DeviceAgent) deleteAllFlows(ctx context.Context) error {
+ log.Debugw("deleteAllFlows", log.Fields{"deviceId": agent.deviceID})
+ if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
+ return err
+ }
+ defer agent.requestQueue.RequestComplete()
+
+ device := agent.getDeviceWithoutLock()
+ // purge all flows on the device by setting it to nil
+ device.Flows = &ofp.Flows{Items: nil}
+ if err := agent.updateDeviceWithoutLock(ctx, device); err != nil {
+ // The caller logs the error
+ return err
+ }
+ return nil
+}
+
//disableDevice disable a device
func (agent *DeviceAgent) disableDevice(ctx context.Context) error {
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 0f68432..1df2051 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -1297,6 +1297,28 @@
return nil
}
+//DeleteAllLogicalPorts is invoked as a callback when the parent device's connection status moves to UNREACHABLE
+func (dMgr *DeviceManager) DeleteAllLogicalPorts(ctx context.Context, parentDevice *voltha.Device, prev *voltha.Device) error {
+ log.Debugw("delete-all-logical-ports", log.Fields{"parent-device-id": parentDevice.Id})
+ if err := dMgr.logicalDeviceMgr.deleteAllLogicalPorts(ctx, parentDevice); err != nil {
+ return err
+ }
+ return nil
+}
+
+//DeleteAllDeviceFlows is invoked as a callback when the parent device's connection status moves to UNREACHABLE
+func (dMgr *DeviceManager) DeleteAllDeviceFlows(ctx context.Context, parentDevice *voltha.Device, prev *voltha.Device) error {
+ log.Debugw("delete-all-device-flows", log.Fields{"parent-device-id": parentDevice.Id})
+ if agent := dMgr.getDeviceAgent(ctx, parentDevice.Id); agent != nil {
+ if err := agent.deleteAllFlows(ctx); err != nil {
+ log.Errorw("error-deleting-all-device-flows", log.Fields{"parent-device-id": parentDevice.Id})
+ return err
+ }
+ return nil
+ }
+ return status.Errorf(codes.NotFound, "%s", parentDevice.Id)
+}
+
//getAllChildDeviceIds is a helper method to get all the child device IDs from the device passed as parameter
func (dMgr *DeviceManager) getAllChildDeviceIds(parentDevice *voltha.Device) ([]string, error) {
log.Debugw("getAllChildDeviceIds", log.Fields{"parentDeviceId": parentDevice.Id})
diff --git a/rw_core/core/device_state_transitions.go b/rw_core/core/device_state_transitions.go
index 69dfd78..610b3aa 100644
--- a/rw_core/core/device_state_transitions.go
+++ b/rw_core/core/device_state_transitions.go
@@ -135,6 +135,18 @@
handlers: []TransitionHandler{dMgr.DisableAllChildDevices, dMgr.DeleteAllUNILogicalPorts, dMgr.DeleteAllChildDevices, dMgr.DeleteLogicalDevice, dMgr.RunPostDeviceDelete}})
transitionMap.transitions = append(transitionMap.transitions,
Transition{
+ deviceType: parent,
+ previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_REACHABLE, Operational: voltha.OperStatus_ACTIVE},
+ currentState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNREACHABLE, Operational: voltha.OperStatus_UNKNOWN},
+ handlers: []TransitionHandler{dMgr.DeleteAllLogicalPorts, dMgr.DeleteLogicalDevice, dMgr.DeleteAllChildDevices, dMgr.DeleteAllDeviceFlows}})
+ transitionMap.transitions = append(transitionMap.transitions,
+ Transition{
+ deviceType: parent,
+ previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNREACHABLE, Operational: voltha.OperStatus_UNKNOWN},
+ currentState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_REACHABLE, Operational: voltha.OperStatus_ACTIVE},
+ handlers: []TransitionHandler{dMgr.CreateLogicalDevice}})
+ transitionMap.transitions = append(transitionMap.transitions,
+ Transition{
deviceType: child,
previousState: DeviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
currentState: DeviceState{Admin: voltha.AdminState_DELETED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
diff --git a/rw_core/core/device_state_transitions_test.go b/rw_core/core/device_state_transitions_test.go
index 8eecd70..161cf24 100644
--- a/rw_core/core/device_state_transitions_test.go
+++ b/rw_core/core/device_state_transitions_test.go
@@ -169,6 +169,29 @@
handlers = transitionMap.GetTransitionHandler(from, to)
assert.Equal(t, 1, len(handlers))
assert.True(t, reflect.ValueOf(tdm.RunPostDeviceDelete).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
+ from = getDevice(false, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_ACTIVE)
+
+ to = getDevice(false, voltha.AdminState_DELETED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_FAILED)
+ handlers = transitionMap.GetTransitionHandler(from, to)
+ assert.Equal(t, 3, len(handlers))
+ assert.True(t, reflect.ValueOf(tdm.ChildDeviceLost).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
+ assert.True(t, reflect.ValueOf(tdm.DeleteLogicalPorts).Pointer() == reflect.ValueOf(handlers[1]).Pointer())
+ assert.True(t, reflect.ValueOf(tdm.RunPostDeviceDelete).Pointer() == reflect.ValueOf(handlers[2]).Pointer())
+
+ from = getDevice(true, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
+ to = getDevice(true, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_UNKNOWN)
+ handlers = transitionMap.GetTransitionHandler(from, to)
+ assert.Equal(t, 4, len(handlers))
+ assert.True(t, reflect.ValueOf(tdm.DeleteAllLogicalPorts).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
+ assert.True(t, reflect.ValueOf(tdm.DeleteLogicalDevice).Pointer() == reflect.ValueOf(handlers[1]).Pointer())
+ assert.True(t, reflect.ValueOf(tdm.DeleteAllChildDevices).Pointer() == reflect.ValueOf(handlers[2]).Pointer())
+ assert.True(t, reflect.ValueOf(tdm.DeleteAllDeviceFlows).Pointer() == reflect.ValueOf(handlers[3]).Pointer())
+
+ from = getDevice(true, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_UNKNOWN)
+ to = getDevice(true, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
+ handlers = transitionMap.GetTransitionHandler(from, to)
+ assert.Equal(t, 1, len(handlers))
+ assert.True(t, reflect.ValueOf(tdm.CreateLogicalDevice).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
var deleteDeviceTest = struct {
from []*voltha.Device
diff --git a/rw_core/core/grpc_nbi_api_handler_test.go b/rw_core/core/grpc_nbi_api_handler_test.go
index 662302c..47a46e3 100755
--- a/rw_core/core/grpc_nbi_api_handler_test.go
+++ b/rw_core/core/grpc_nbi_api_handler_test.go
@@ -710,6 +710,104 @@
}
+func (nb *NBTest) testDeviceRebootWhenOltIsEnabled(t *testing.T, nbi *APIHandler) {
+ //Get an OLT device
+ oltDevice, err := nb.getADevice(true, nbi)
+ assert.Nil(t, err)
+ assert.NotNil(t, oltDevice)
+ assert.Equal(t, oltDevice.ConnectStatus, voltha.ConnectStatus_REACHABLE)
+ assert.Equal(t, oltDevice.AdminState, voltha.AdminState_ENABLED)
+
+ // Verify that we have one or more ONUs to start with
+ onuDevices, err := nb.core.deviceMgr.getAllChildDevices(getContext(), oltDevice.Id)
+ assert.Nil(t, err)
+ assert.NotNil(t, onuDevices)
+ assert.Greater(t, len(onuDevices.Items), 0)
+
+ // Reboot the OLT and very that Connection Status goes to UNREACHABLE and operation status to UNKNOWN
+ _, err = nbi.RebootDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+ assert.Nil(t, err)
+
+ var vlFunction0 = func(d *voltha.Device) bool {
+ return d.ConnectStatus == voltha.ConnectStatus_UNREACHABLE && d.OperStatus == voltha.OperStatus_UNKNOWN
+ }
+
+ err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vlFunction0, nbi)
+ assert.Nil(t, err)
+
+ // Wait for the logical device to satisfy the expected condition
+ var vlFunction1 = func(ld *voltha.LogicalDevice) bool {
+ return ld == nil
+ }
+
+ err = waitUntilLogicalDeviceReadiness(oltDevice.Id, nb.maxTimeout, nbi, vlFunction1)
+ assert.Nil(t, err)
+
+ // Wait for the device to satisfy the expected condition (device does not have flows)
+ var vlFunction2 = func(d *voltha.Device) bool {
+ var deviceFlows *ofp.Flows
+ var err error
+ if deviceFlows, err = nbi.ListDeviceFlows(getContext(), &voltha.ID{Id: d.Id}); err != nil {
+ return false
+ }
+ return len(deviceFlows.Items) == 0
+ }
+
+ err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vlFunction2, nbi)
+ assert.Nil(t, err)
+
+ // Wait for the device to satisfy the expected condition (there are no child devices)
+ var vlFunction3 = func(d *voltha.Device) bool {
+ var devices *voltha.Devices
+ var err error
+ if devices, err = nbi.ListDevices(getContext(), nil); err != nil {
+ return false
+ }
+ for _, device := range devices.Items {
+ if device.ParentId == d.Id {
+ // We have a child device still left
+ return false
+ }
+ }
+ return true
+ }
+
+ err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vlFunction3, nbi)
+ assert.Nil(t, err)
+
+ // Update the OLT Connection Status to REACHABLE and operation status to ACTIVE
+ // Normally, in a real adapter this happens after connection regain via a heartbeat mechanism with real hardware
+ deviceAgent := nbi.deviceMgr.getDeviceAgent(getContext(), oltDevice.Id)
+ err = deviceAgent.updateDeviceStatus(getContext(), voltha.OperStatus_ACTIVE, voltha.ConnectStatus_REACHABLE)
+ assert.Nil(t, err)
+
+ // Verify the device connection and operation states
+ oltDevice, err = nb.getADevice(true, nbi)
+ assert.Nil(t, err)
+ assert.NotNil(t, oltDevice)
+ assert.Equal(t, oltDevice.ConnectStatus, voltha.ConnectStatus_REACHABLE)
+ assert.Equal(t, oltDevice.AdminState, voltha.AdminState_ENABLED)
+
+ // Wait for the logical device to satisfy the expected condition
+ var vlFunction4 = func(ld *voltha.LogicalDevice) bool {
+ return ld != nil
+ }
+ err = waitUntilLogicalDeviceReadiness(oltDevice.Id, nb.maxTimeout, nbi, vlFunction4)
+ assert.Nil(t, err)
+
+ // Verify that logical device is created again
+ logicalDevices, err := nbi.ListLogicalDevices(getContext(), &empty.Empty{})
+ assert.Nil(t, err)
+ assert.NotNil(t, logicalDevices)
+ assert.Equal(t, 1, len(logicalDevices.Items))
+
+ // Verify that we have no ONUs left
+ onuDevices, err = nb.core.deviceMgr.getAllChildDevices(getContext(), oltDevice.Id)
+ assert.Nil(t, err)
+ assert.NotNil(t, onuDevices)
+ assert.Equal(t, 0, len(onuDevices.Items))
+}
+
func makeSimpleFlowMod(fa *flows.FlowArgs) *ofp.OfpFlowMod {
matchFields := make([]*ofp.OfpOxmField, 0)
for _, val := range fa.MatchFields {
@@ -883,20 +981,25 @@
nb.sendTrapFlows(t, nbi, logicalDevice, uint64(meterID), startingVlan)
// Listen for port events
- processedLogicalPorts := 0
start := time.Now()
+ processedNniLogicalPorts := 0
+ processedUniLogicalPorts := 0
+
for event := range nbi.changeEventQueue {
startingVlan++
if portStatus, ok := (event.Event).(*ofp.ChangeEvent_PortStatus); ok {
ps := portStatus.PortStatus
if ps.Reason == ofp.OfpPortReason_OFPPR_ADD {
- processedLogicalPorts++
if ps.Desc.PortNo >= uint32(nb.startingUNIPortNo) {
+ processedUniLogicalPorts++
nb.sendEAPFlows(t, nbi, logicalDevice.Id, ps.Desc, startingVlan, uint64(meterID))
+ } else {
+ processedNniLogicalPorts++
}
}
}
- if processedLogicalPorts >= numNNIPorts+numUNIPorts {
+
+ if processedNniLogicalPorts >= numNNIPorts && processedUniLogicalPorts >= numUNIPorts {
fmt.Println("Total time to send all flows:", time.Since(start))
break
}
@@ -968,10 +1071,13 @@
// 6. Test disable and Enable pon port of OLT device
nb.testDisableAndEnablePort(t, nbi)
- // 6. Test disable and delete all devices
+ // 7.Test Device unreachable when OLT is enabled
+ nb.testDeviceRebootWhenOltIsEnabled(t, nbi)
+
+ // 8. Test disable and delete all devices
nb.testDisableAndDeleteAllDevice(t, nbi)
- //7. Test enable and delete all devices
+ // 9. Test enable and delete all devices
nb.testEnableAndDeleteAllDevice(t, nbi)
}
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 4e029c8..a6ce9ab 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -529,8 +529,8 @@
return err
}
-// deleteAllLogicalPorts deletes all logical ports associated with this device
-func (agent *LogicalDeviceAgent) deleteAllLogicalPorts(ctx context.Context, device *voltha.Device) error {
+// deleteAllLogicalPorts deletes all logical ports associated with this logical device
+func (agent *LogicalDeviceAgent) deleteAllLogicalPorts(ctx context.Context) error {
log.Infow("updatePortsState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
return err
@@ -540,21 +540,13 @@
ld := agent.getLogicalDeviceWithoutLock()
cloned := (proto.Clone(ld)).(*voltha.LogicalDevice)
- updateLogicalPorts := []*voltha.LogicalPort{}
- for _, lport := range cloned.Ports {
- if lport.DeviceId != device.Id {
- updateLogicalPorts = append(updateLogicalPorts, lport)
- }
- }
- if len(updateLogicalPorts) < len(cloned.Ports) {
- cloned.Ports = updateLogicalPorts
- // Updating the logical device will trigger the poprt change events to be populated to the controller
- if err := agent.updateLogicalDeviceWithoutLock(ctx, cloned); err != nil {
- log.Warnw("logical-device-update-failed", log.Fields{"ldeviceId": agent.logicalDeviceID, "error": err})
- return err
- }
- } else {
- log.Debugw("no-change-required", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+ var updateLogicalPorts []*voltha.LogicalPort
+ // Update an empty ports slice to remove all the ports
+ cloned.Ports = updateLogicalPorts
+
+ if err := agent.updateLogicalDeviceWithoutLock(ctx, cloned); err != nil {
+ log.Warnw("logical-device-update-failed", log.Fields{"ldeviceId": agent.logicalDeviceID, "error": err})
+ return err
}
return nil
}
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index df11ac4..1859784 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -495,7 +495,7 @@
return err
}
if agent := ldMgr.getLogicalDeviceAgent(ctx, *ldID); agent != nil {
- if err := agent.deleteAllLogicalPorts(ctx, device); err != nil {
+ if err := agent.deleteAllLogicalPorts(ctx); err != nil {
return err
}
}
diff --git a/rw_core/coreif/device_manager_if.go b/rw_core/coreif/device_manager_if.go
index a3f65cf..f828ab0 100644
--- a/rw_core/coreif/device_manager_if.go
+++ b/rw_core/coreif/device_manager_if.go
@@ -39,4 +39,6 @@
RunPostDeviceDelete(ctx context.Context, curr *voltha.Device, prev *voltha.Device) error
ChildDeviceLost(ctx context.Context, curr *voltha.Device, prev *voltha.Device) error
DeleteAllUNILogicalPorts(ctx context.Context, curr *voltha.Device, prev *voltha.Device) error
+ DeleteAllLogicalPorts(ctx context.Context, curr *voltha.Device, prev *voltha.Device) error
+ DeleteAllDeviceFlows(ctx context.Context, curr *voltha.Device, prev *voltha.Device) error
}
diff --git a/rw_core/mocks/adapter_olt.go b/rw_core/mocks/adapter_olt.go
index f145ab6..46f0e28 100644
--- a/rw_core/mocks/adapter_olt.go
+++ b/rw_core/mocks/adapter_olt.go
@@ -193,9 +193,9 @@
log.Warnw("updating-ports-failed", log.Fields{"deviceId": device.Id, "error": err})
}
- //Update the device state
- cloned.ConnectStatus = voltha.ConnectStatus_UNREACHABLE
+ //Update the device operational state
cloned.OperStatus = voltha.OperStatus_UNKNOWN
+ // The device is still reachable after it has been disabled, so the connection status should not be changed.
if err := oltA.coreProxy.DeviceStateUpdate(context.TODO(), cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
// Device may already have been deleted in the core
@@ -230,7 +230,6 @@
}
//Update the device state
- cloned.ConnectStatus = voltha.ConnectStatus_REACHABLE
cloned.OperStatus = voltha.OperStatus_ACTIVE
if err := oltA.coreProxy.DeviceStateUpdate(context.TODO(), cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
@@ -296,6 +295,21 @@
return nil
}
+// Reboot_device -
+func (oltA *OLTAdapter) Reboot_device(device *voltha.Device) error { // nolint
+ log.Infow("reboot-device", log.Fields{"deviceId": device.Id})
+
+ go func() {
+ if err := oltA.coreProxy.DeviceStateUpdate(context.TODO(), device.Id, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_UNKNOWN); err != nil {
+ log.Fatalf("device-state-update-failed", log.Fields{"device-id": device.Id})
+ }
+ if err := oltA.coreProxy.PortsStateUpdate(context.TODO(), device.Id, voltha.OperStatus_UNKNOWN); err != nil {
+ log.Fatalf("port-update-failed", log.Fields{"device-id": device.Id})
+ }
+ }()
+ return nil
+}
+
// GetFlowCount returns the total number of flows presently under this adapter
func (oltA *OLTAdapter) GetFlowCount() int {
oltA.lock.Lock()
diff --git a/rw_core/mocks/device_manager.go b/rw_core/mocks/device_manager.go
index b3d63cb..7fb2983 100644
--- a/rw_core/mocks/device_manager.go
+++ b/rw_core/mocks/device_manager.go
@@ -78,6 +78,16 @@
return nil
}
+// DeleteAllLogicalPorts -
+func (dm *DeviceManager) DeleteAllLogicalPorts(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
+ return nil
+}
+
+// DeleteAllDeviceFlows -
+func (dm *DeviceManager) DeleteAllDeviceFlows(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
+ return nil
+}
+
// RunPostDeviceDelete -
func (dm *DeviceManager) RunPostDeviceDelete(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
return nil