[VOL-2404] : RW-Core changes for handling OLT Reboot Scenario

- When the OLT's connection status goes from REACHABLE to UNREACHABLE
  while its admin state is ENABLED, delete all the logical ports, the
  child devices, the logical device and the device flows.

- When the OLT's connection status becomes REACHABLE again, the child
  devices and ports are re-discovered and the logical device is
  recreated.

- The case where the OLT goes UNREACHABLE while it is DISABLED is not
  handled as part of the voltha 2.3 release.

Change-Id: I34c0c538b44afa19e889e9631f0a738060a58fef
diff --git a/.gitignore b/.gitignore
index 8ae7909..9a6c87e 100644
--- a/.gitignore
+++ b/.gitignore
@@ -56,3 +56,9 @@
 # test output
 tests/results
 sca-report
+
+# CPU profile
+**/profile.cpu
+
+# etcd data directory (presumably left behind by the rw_core NB tests - confirm)
+rw_core/core/voltha.rwcore.nb.etcd/
diff --git a/rw_core/core/common_test.go b/rw_core/core/common_test.go
index d07a095..8375f70 100644
--- a/rw_core/core/common_test.go
+++ b/rw_core/core/common_test.go
@@ -201,6 +201,14 @@
 				if done {
 					break
 				}
+			} else if d != nil && d.ParentId == "" { // case where logical device deleted
+				if verificationFunction(nil) {
+					ch <- 1
+					break
+				}
+				if done {
+					break
+				}
 			}
 			time.Sleep(retryInterval)
 		}
diff --git a/rw_core/core/device_agent.go b/rw_core/core/device_agent.go
index 5f946ba..ee8e731 100755
--- a/rw_core/core/device_agent.go
+++ b/rw_core/core/device_agent.go
@@ -741,6 +741,24 @@
 	return nil
 }
 
+// deleteAllFlows clears every flow held on this agent's device and pushes the change via updateDeviceWithoutLock.
+func (agent *DeviceAgent) deleteAllFlows(ctx context.Context) error {
+	log.Debugw("deleteAllFlows", log.Fields{"deviceId": agent.deviceID})
+	// Wait for the agent's request queue to give the go-ahead; release it on exit.
+	err := agent.requestQueue.WaitForGreenLight(ctx)
+	if err != nil {
+		return err
+	}
+	defer agent.requestQueue.RequestComplete()
+
+	// Replacing Flows with an empty message drops whatever flows were stored.
+	dev := agent.getDeviceWithoutLock()
+	dev.Flows = &ofp.Flows{}
+
+	// Update failures are not logged here; they go back to the caller unchanged.
+	return agent.updateDeviceWithoutLock(ctx, dev)
+}
+
 //disableDevice disable a device
 func (agent *DeviceAgent) disableDevice(ctx context.Context) error {
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
diff --git a/rw_core/core/device_manager.go b/rw_core/core/device_manager.go
index 0f68432..1df2051 100755
--- a/rw_core/core/device_manager.go
+++ b/rw_core/core/device_manager.go
@@ -1297,6 +1297,28 @@
 	return nil
 }
 
+// DeleteAllLogicalPorts is the callback run when the parent device's connection status moves to
+// UNREACHABLE; it asks the logical device manager to delete all logical ports. prev is not used.
+func (dMgr *DeviceManager) DeleteAllLogicalPorts(ctx context.Context, parentDevice *voltha.Device, prev *voltha.Device) error {
+	log.Debugw("delete-all-logical-ports", log.Fields{"parent-device-id": parentDevice.Id})
+	// Nothing to add here: the logical device manager's result, nil or error, is returned
+	// to the caller unchanged.
+	return dMgr.logicalDeviceMgr.deleteAllLogicalPorts(ctx, parentDevice)
+}
+
+// DeleteAllDeviceFlows is the UNREACHABLE-transition callback that deletes all flows of parentDevice; it returns NotFound when the device has no agent (prev is unused).
+func (dMgr *DeviceManager) DeleteAllDeviceFlows(ctx context.Context, parentDevice *voltha.Device, prev *voltha.Device) error {
+	log.Debugw("delete-all-device-flows", log.Fields{"parent-device-id": parentDevice.Id})
+	if agent := dMgr.getDeviceAgent(ctx, parentDevice.Id); agent != nil {
+		if err := agent.deleteAllFlows(ctx); err != nil {
+			log.Errorw("error-deleting-all-device-flows", log.Fields{"parent-device-id": parentDevice.Id, "error": err})
+			return err
+		}
+		return nil
+	}
+	return status.Errorf(codes.NotFound, "%s", parentDevice.Id)
+}
+
 //getAllChildDeviceIds is a helper method to get all the child device IDs from the device passed as parameter
 func (dMgr *DeviceManager) getAllChildDeviceIds(parentDevice *voltha.Device) ([]string, error) {
 	log.Debugw("getAllChildDeviceIds", log.Fields{"parentDeviceId": parentDevice.Id})
diff --git a/rw_core/core/device_state_transitions.go b/rw_core/core/device_state_transitions.go
index 69dfd78..610b3aa 100644
--- a/rw_core/core/device_state_transitions.go
+++ b/rw_core/core/device_state_transitions.go
@@ -135,6 +135,18 @@
 			handlers:      []TransitionHandler{dMgr.DisableAllChildDevices, dMgr.DeleteAllUNILogicalPorts, dMgr.DeleteAllChildDevices, dMgr.DeleteLogicalDevice, dMgr.RunPostDeviceDelete}})
 	transitionMap.transitions = append(transitionMap.transitions,
 		Transition{
+			deviceType:    parent,
+			previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_REACHABLE, Operational: voltha.OperStatus_ACTIVE},
+			currentState:  DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNREACHABLE, Operational: voltha.OperStatus_UNKNOWN},
+			handlers:      []TransitionHandler{dMgr.DeleteAllLogicalPorts, dMgr.DeleteLogicalDevice, dMgr.DeleteAllChildDevices, dMgr.DeleteAllDeviceFlows}})
+	transitionMap.transitions = append(transitionMap.transitions,
+		Transition{
+			deviceType:    parent,
+			previousState: DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_UNREACHABLE, Operational: voltha.OperStatus_UNKNOWN},
+			currentState:  DeviceState{Admin: voltha.AdminState_ENABLED, Connection: voltha.ConnectStatus_REACHABLE, Operational: voltha.OperStatus_ACTIVE},
+			handlers:      []TransitionHandler{dMgr.CreateLogicalDevice}})
+	transitionMap.transitions = append(transitionMap.transitions,
+		Transition{
 			deviceType:    child,
 			previousState: DeviceState{Admin: voltha.AdminState_UNKNOWN, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
 			currentState:  DeviceState{Admin: voltha.AdminState_DELETED, Connection: voltha.ConnectStatus_UNKNOWN, Operational: voltha.OperStatus_UNKNOWN},
diff --git a/rw_core/core/device_state_transitions_test.go b/rw_core/core/device_state_transitions_test.go
index 8eecd70..161cf24 100644
--- a/rw_core/core/device_state_transitions_test.go
+++ b/rw_core/core/device_state_transitions_test.go
@@ -169,6 +169,29 @@
 	handlers = transitionMap.GetTransitionHandler(from, to)
 	assert.Equal(t, 1, len(handlers))
 	assert.True(t, reflect.ValueOf(tdm.RunPostDeviceDelete).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
+	from = getDevice(false, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_ACTIVE)
+
+	to = getDevice(false, voltha.AdminState_DELETED, voltha.ConnectStatus_UNKNOWN, voltha.OperStatus_FAILED)
+	handlers = transitionMap.GetTransitionHandler(from, to)
+	assert.Equal(t, 3, len(handlers))
+	assert.True(t, reflect.ValueOf(tdm.ChildDeviceLost).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
+	assert.True(t, reflect.ValueOf(tdm.DeleteLogicalPorts).Pointer() == reflect.ValueOf(handlers[1]).Pointer())
+	assert.True(t, reflect.ValueOf(tdm.RunPostDeviceDelete).Pointer() == reflect.ValueOf(handlers[2]).Pointer())
+
+	from = getDevice(true, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
+	to = getDevice(true, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_UNKNOWN)
+	handlers = transitionMap.GetTransitionHandler(from, to)
+	assert.Equal(t, 4, len(handlers))
+	assert.True(t, reflect.ValueOf(tdm.DeleteAllLogicalPorts).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
+	assert.True(t, reflect.ValueOf(tdm.DeleteLogicalDevice).Pointer() == reflect.ValueOf(handlers[1]).Pointer())
+	assert.True(t, reflect.ValueOf(tdm.DeleteAllChildDevices).Pointer() == reflect.ValueOf(handlers[2]).Pointer())
+	assert.True(t, reflect.ValueOf(tdm.DeleteAllDeviceFlows).Pointer() == reflect.ValueOf(handlers[3]).Pointer())
+
+	from = getDevice(true, voltha.AdminState_ENABLED, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_UNKNOWN)
+	to = getDevice(true, voltha.AdminState_ENABLED, voltha.ConnectStatus_REACHABLE, voltha.OperStatus_ACTIVE)
+	handlers = transitionMap.GetTransitionHandler(from, to)
+	assert.Equal(t, 1, len(handlers))
+	assert.True(t, reflect.ValueOf(tdm.CreateLogicalDevice).Pointer() == reflect.ValueOf(handlers[0]).Pointer())
 
 	var deleteDeviceTest = struct {
 		from                   []*voltha.Device
diff --git a/rw_core/core/grpc_nbi_api_handler_test.go b/rw_core/core/grpc_nbi_api_handler_test.go
index 662302c..47a46e3 100755
--- a/rw_core/core/grpc_nbi_api_handler_test.go
+++ b/rw_core/core/grpc_nbi_api_handler_test.go
@@ -710,6 +710,104 @@
 
 }
 
+// testDeviceRebootWhenOltIsEnabled reboots an enabled OLT, checks that its logical device, device flows and
+// child devices go away, then marks it REACHABLE/ACTIVE again and checks that the logical device comes back.
+func (nb *NBTest) testDeviceRebootWhenOltIsEnabled(t *testing.T, nbi *APIHandler) {
+	//Get an OLT device
+	oltDevice, err := nb.getADevice(true, nbi)
+	assert.Nil(t, err)
+	assert.NotNil(t, oltDevice)
+	assert.Equal(t, voltha.ConnectStatus_REACHABLE, oltDevice.ConnectStatus)
+	assert.Equal(t, voltha.AdminState_ENABLED, oltDevice.AdminState)
+
+	// Verify that we have one or more ONUs to start with
+	onuDevices, err := nb.core.deviceMgr.getAllChildDevices(getContext(), oltDevice.Id)
+	assert.Nil(t, err)
+	assert.NotNil(t, onuDevices)
+	assert.Greater(t, len(onuDevices.Items), 0)
+
+	// Reboot the OLT and verify that Connection Status goes to UNREACHABLE and operation status to UNKNOWN
+	_, err = nbi.RebootDevice(getContext(), &voltha.ID{Id: oltDevice.Id})
+	assert.Nil(t, err)
+
+	var vlFunction0 = func(d *voltha.Device) bool {
+		return d.ConnectStatus == voltha.ConnectStatus_UNREACHABLE && d.OperStatus == voltha.OperStatus_UNKNOWN
+	}
+
+	err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vlFunction0, nbi)
+	assert.Nil(t, err)
+
+	// Wait for the logical device to be deleted (the verification function receives nil)
+	var vlFunction1 = func(ld *voltha.LogicalDevice) bool { return ld == nil }
+
+	err = waitUntilLogicalDeviceReadiness(oltDevice.Id, nb.maxTimeout, nbi, vlFunction1)
+	assert.Nil(t, err)
+
+	// Wait for the device to satisfy the expected condition (device does not have flows)
+	var vlFunction2 = func(d *voltha.Device) bool {
+		deviceFlows, err := nbi.ListDeviceFlows(getContext(), &voltha.ID{Id: d.Id})
+		if err != nil {
+			return false
+		}
+		return len(deviceFlows.Items) == 0
+	}
+
+	err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vlFunction2, nbi)
+	assert.Nil(t, err)
+
+	// Wait for the device to satisfy the expected condition (there are no child devices)
+	var vlFunction3 = func(d *voltha.Device) bool {
+		devices, err := nbi.ListDevices(getContext(), nil)
+		if err != nil {
+			return false
+		}
+		for _, device := range devices.Items {
+			if device.ParentId == d.Id {
+				// We have a child device still left
+				return false
+			}
+		}
+		return true
+	}
+
+	err = waitUntilDeviceReadiness(oltDevice.Id, nb.maxTimeout, vlFunction3, nbi)
+	assert.Nil(t, err)
+
+	// Update the OLT Connection Status to REACHABLE and operation status to ACTIVE
+	// Normally, in a real adapter this happens after connection regain via a heartbeat mechanism with real hardware
+	deviceAgent := nbi.deviceMgr.getDeviceAgent(getContext(), oltDevice.Id)
+	if !assert.NotNil(t, deviceAgent) {
+		// Without an agent the status cannot be updated; stop here instead of dereferencing nil
+		return
+	}
+	err = deviceAgent.updateDeviceStatus(getContext(), voltha.OperStatus_ACTIVE, voltha.ConnectStatus_REACHABLE)
+	assert.Nil(t, err)
+
+	// Verify the device connection and operation states
+	oltDevice, err = nb.getADevice(true, nbi)
+	assert.Nil(t, err)
+	assert.NotNil(t, oltDevice)
+	assert.Equal(t, voltha.ConnectStatus_REACHABLE, oltDevice.ConnectStatus)
+	assert.Equal(t, voltha.AdminState_ENABLED, oltDevice.AdminState)
+
+	// Wait for the logical device to exist again
+	var vlFunction4 = func(ld *voltha.LogicalDevice) bool { return ld != nil }
+	err = waitUntilLogicalDeviceReadiness(oltDevice.Id, nb.maxTimeout, nbi, vlFunction4)
+	assert.Nil(t, err)
+
+	// Verify that logical device is created again
+	logicalDevices, err := nbi.ListLogicalDevices(getContext(), &empty.Empty{})
+	assert.Nil(t, err)
+	assert.NotNil(t, logicalDevices)
+	assert.Equal(t, 1, len(logicalDevices.Items))
+
+	// Verify that we have no ONUs left
+	onuDevices, err = nb.core.deviceMgr.getAllChildDevices(getContext(), oltDevice.Id)
+	assert.Nil(t, err)
+	assert.NotNil(t, onuDevices)
+	assert.Equal(t, 0, len(onuDevices.Items))
+}
+
 func makeSimpleFlowMod(fa *flows.FlowArgs) *ofp.OfpFlowMod {
 	matchFields := make([]*ofp.OfpOxmField, 0)
 	for _, val := range fa.MatchFields {
@@ -883,20 +981,25 @@
 	nb.sendTrapFlows(t, nbi, logicalDevice, uint64(meterID), startingVlan)
 
 	// Listen for port events
-	processedLogicalPorts := 0
 	start := time.Now()
+	processedNniLogicalPorts := 0
+	processedUniLogicalPorts := 0
+
 	for event := range nbi.changeEventQueue {
 		startingVlan++
 		if portStatus, ok := (event.Event).(*ofp.ChangeEvent_PortStatus); ok {
 			ps := portStatus.PortStatus
 			if ps.Reason == ofp.OfpPortReason_OFPPR_ADD {
-				processedLogicalPorts++
 				if ps.Desc.PortNo >= uint32(nb.startingUNIPortNo) {
+					processedUniLogicalPorts++
 					nb.sendEAPFlows(t, nbi, logicalDevice.Id, ps.Desc, startingVlan, uint64(meterID))
+				} else {
+					processedNniLogicalPorts++
 				}
 			}
 		}
-		if processedLogicalPorts >= numNNIPorts+numUNIPorts {
+
+		if processedNniLogicalPorts >= numNNIPorts && processedUniLogicalPorts >= numUNIPorts {
 			fmt.Println("Total time to send all flows:", time.Since(start))
 			break
 		}
@@ -968,10 +1071,13 @@
 		// 6. Test disable and Enable pon port of OLT device
 		nb.testDisableAndEnablePort(t, nbi)
 
-		// 6. Test disable and delete all devices
+		// 7. Test device reboot (OLT becomes unreachable) when OLT is enabled
+		nb.testDeviceRebootWhenOltIsEnabled(t, nbi)
+
+		// 8. Test disable and delete all devices
 		nb.testDisableAndDeleteAllDevice(t, nbi)
 
-		//7. Test enable and delete all devices
+		// 9. Test enable and delete all devices
 		nb.testEnableAndDeleteAllDevice(t, nbi)
 	}
 
diff --git a/rw_core/core/logical_device_agent.go b/rw_core/core/logical_device_agent.go
index 4e029c8..a6ce9ab 100644
--- a/rw_core/core/logical_device_agent.go
+++ b/rw_core/core/logical_device_agent.go
@@ -529,8 +529,8 @@
 	return err
 }
 
-// deleteAllLogicalPorts deletes all logical ports associated with this device
-func (agent *LogicalDeviceAgent) deleteAllLogicalPorts(ctx context.Context, device *voltha.Device) error {
+// deleteAllLogicalPorts deletes all logical ports associated with this logical device
+func (agent *LogicalDeviceAgent) deleteAllLogicalPorts(ctx context.Context) error {
 	log.Infow("updatePortsState-start", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
 	if err := agent.requestQueue.WaitForGreenLight(ctx); err != nil {
 		return err
@@ -540,21 +540,13 @@
 	ld := agent.getLogicalDeviceWithoutLock()
 
 	cloned := (proto.Clone(ld)).(*voltha.LogicalDevice)
-	updateLogicalPorts := []*voltha.LogicalPort{}
-	for _, lport := range cloned.Ports {
-		if lport.DeviceId != device.Id {
-			updateLogicalPorts = append(updateLogicalPorts, lport)
-		}
-	}
-	if len(updateLogicalPorts) < len(cloned.Ports) {
-		cloned.Ports = updateLogicalPorts
-		// Updating the logical device will trigger the poprt change events to be populated to the controller
-		if err := agent.updateLogicalDeviceWithoutLock(ctx, cloned); err != nil {
-			log.Warnw("logical-device-update-failed", log.Fields{"ldeviceId": agent.logicalDeviceID, "error": err})
-			return err
-		}
-	} else {
-		log.Debugw("no-change-required", log.Fields{"logicalDeviceId": agent.logicalDeviceID})
+	var updateLogicalPorts []*voltha.LogicalPort
+	// Update an empty ports slice to remove all the ports
+	cloned.Ports = updateLogicalPorts
+
+	if err := agent.updateLogicalDeviceWithoutLock(ctx, cloned); err != nil {
+		log.Warnw("logical-device-update-failed", log.Fields{"ldeviceId": agent.logicalDeviceID, "error": err})
+		return err
 	}
 	return nil
 }
diff --git a/rw_core/core/logical_device_manager.go b/rw_core/core/logical_device_manager.go
index df11ac4..1859784 100644
--- a/rw_core/core/logical_device_manager.go
+++ b/rw_core/core/logical_device_manager.go
@@ -495,7 +495,7 @@
 		return err
 	}
 	if agent := ldMgr.getLogicalDeviceAgent(ctx, *ldID); agent != nil {
-		if err := agent.deleteAllLogicalPorts(ctx, device); err != nil {
+		if err := agent.deleteAllLogicalPorts(ctx); err != nil {
 			return err
 		}
 	}
diff --git a/rw_core/coreif/device_manager_if.go b/rw_core/coreif/device_manager_if.go
index a3f65cf..f828ab0 100644
--- a/rw_core/coreif/device_manager_if.go
+++ b/rw_core/coreif/device_manager_if.go
@@ -39,4 +39,6 @@
 	RunPostDeviceDelete(ctx context.Context, curr *voltha.Device, prev *voltha.Device) error
 	ChildDeviceLost(ctx context.Context, curr *voltha.Device, prev *voltha.Device) error
 	DeleteAllUNILogicalPorts(ctx context.Context, curr *voltha.Device, prev *voltha.Device) error
+	DeleteAllLogicalPorts(ctx context.Context, curr *voltha.Device, prev *voltha.Device) error
+	DeleteAllDeviceFlows(ctx context.Context, curr *voltha.Device, prev *voltha.Device) error
 }
diff --git a/rw_core/mocks/adapter_olt.go b/rw_core/mocks/adapter_olt.go
index f145ab6..46f0e28 100644
--- a/rw_core/mocks/adapter_olt.go
+++ b/rw_core/mocks/adapter_olt.go
@@ -193,9 +193,9 @@
 			log.Warnw("updating-ports-failed", log.Fields{"deviceId": device.Id, "error": err})
 		}
 
-		//Update the device state
-		cloned.ConnectStatus = voltha.ConnectStatus_UNREACHABLE
+		//Update the device operational state
 		cloned.OperStatus = voltha.OperStatus_UNKNOWN
+		// The device is still reachable after it has been disabled, so the connection status should not be changed.
 
 		if err := oltA.coreProxy.DeviceStateUpdate(context.TODO(), cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
 			// Device may already have been deleted in the core
@@ -230,7 +230,6 @@
 		}
 
 		//Update the device state
-		cloned.ConnectStatus = voltha.ConnectStatus_REACHABLE
 		cloned.OperStatus = voltha.OperStatus_ACTIVE
 
 		if err := oltA.coreProxy.DeviceStateUpdate(context.TODO(), cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
@@ -296,6 +295,21 @@
 	return nil
 }
 
+// Reboot_device simulates an OLT reboot by asynchronously reporting the device, then its ports, as UNREACHABLE/UNKNOWN to the core; it always returns nil.
+func (oltA *OLTAdapter) Reboot_device(device *voltha.Device) error { // nolint
+	log.Infow("reboot-device", log.Fields{"deviceId": device.Id})
+
+	go func() {
+		if err := oltA.coreProxy.DeviceStateUpdate(context.TODO(), device.Id, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_UNKNOWN); err != nil {
+			log.Fatalw("device-state-update-failed", log.Fields{"device-id": device.Id, "error": err})
+		}
+		if err := oltA.coreProxy.PortsStateUpdate(context.TODO(), device.Id, voltha.OperStatus_UNKNOWN); err != nil {
+			log.Fatalw("port-update-failed", log.Fields{"device-id": device.Id, "error": err})
+		}
+	}()
+	return nil
+}
+
 // GetFlowCount returns the total number of flows presently under this adapter
 func (oltA *OLTAdapter) GetFlowCount() int {
 	oltA.lock.Lock()
diff --git a/rw_core/mocks/device_manager.go b/rw_core/mocks/device_manager.go
index b3d63cb..7fb2983 100644
--- a/rw_core/mocks/device_manager.go
+++ b/rw_core/mocks/device_manager.go
@@ -78,6 +78,16 @@
 	return nil
 }
 
+// DeleteAllLogicalPorts is a no-op mock of the DeleteAllLogicalPorts callback; it ignores its arguments and always returns nil.
+func (dm *DeviceManager) DeleteAllLogicalPorts(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
+	return nil
+}
+
+// DeleteAllDeviceFlows is a no-op mock of the DeleteAllDeviceFlows callback; it ignores its arguments and always returns nil.
+func (dm *DeviceManager) DeleteAllDeviceFlows(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
+	return nil
+}
+
 // RunPostDeviceDelete -
 func (dm *DeviceManager) RunPostDeviceDelete(ctx context.Context, cDevice *voltha.Device, pDevice *voltha.Device) error {
 	return nil