[VOL-2741]: code changes to support OLT reboot

Change-Id: I8a26347fee26802f5f32ed27f2030f1e0608fbb1
diff --git a/VERSION b/VERSION
index c68b9b3..5f0cf38 100644
--- a/VERSION
+++ b/VERSION
@@ -1,2 +1 @@
 2.3.20-dev
-
diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go
index 1ba00e1..a9fa936 100644
--- a/internal/pkg/config/config.go
+++ b/internal/pkg/config/config.go
@@ -20,9 +20,10 @@
 import (
 	"flag"
 	"fmt"
-	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 	"os"
 	"time"
+
+	"github.com/opencord/voltha-lib-go/v3/pkg/log"
 )
 
 // Open OLT default constants
@@ -49,9 +50,9 @@
 	defaultLiveProbeInterval    = 60 * time.Second
 	defaultNotLiveProbeInterval = 5 * time.Second // Probe more frequently when not alive
 	//defaultHearbeatFailReportInterval is the time in seconds the adapter will keep checking the hardware for heartbeat.
-	defaultHearbeatCheckInterval = 30 * time.Second
+	defaultHearbeatCheckInterval = 15 * time.Second
 	// defaultHearbeatFailReportInterval is the time adapter will wait before updating the state to the core.
-	defaultHearbeatFailReportInterval = 180 * time.Second
+	defaultHearbeatFailReportInterval = 0 * time.Second
 	//defaultGrpcTimeoutInterval is the time in seconds a grpc call will wait before returning error.
 	defaultGrpcTimeoutInterval = 2 * time.Second
 )
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 625ab63..2b0b52a 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -79,6 +79,7 @@
 	stopCollector      chan bool
 	stopHeartbeatCheck chan bool
 	activePorts        sync.Map
+	stopIndications    chan bool
 }
 
 //OnuDevice represents ONU related info
@@ -136,6 +137,7 @@
 	dh.stopHeartbeatCheck = make(chan bool, 2)
 	dh.metrics = pmmetrics.NewPmMetrics(cloned.Id, pmmetrics.Frequency(150), pmmetrics.FrequencyOverride(false), pmmetrics.Grouped(false), pmmetrics.Metrics(pmNames))
 	dh.activePorts = sync.Map{}
+	dh.stopIndications = make(chan bool, 1)
 	//TODO initialize the support classes.
 	return &dh
 }
@@ -309,50 +311,64 @@
 	indicationBackoff.MaxElapsedTime = 0
 	indicationBackoff.MaxInterval = 1 * time.Minute
 	for {
-		indication, err := indications.Recv()
-		if err == io.EOF {
-			log.Infow("EOF for  indications", log.Fields{"err": err})
-			// Use an exponential back off to prevent getting into a tight loop
-			duration := indicationBackoff.NextBackOff()
-			if duration == backoff.Stop {
-				// If we reach a maximum then warn and reset the backoff
-				// timer and keep attempting.
-				log.Warnw("Maximum indication backoff reached, resetting backoff timer",
-					log.Fields{"max_indication_backoff": indicationBackoff.MaxElapsedTime})
-				indicationBackoff.Reset()
+		select {
+		case <-dh.stopIndications:
+			log.Debugw("Stopping-collecting-indications-for-OLT", log.Fields{"deviceID:": dh.deviceID})
+			break
+		default:
+			indication, err := indications.Recv()
+			if err == io.EOF {
+				log.Infow("EOF for  indications", log.Fields{"err": err})
+				// Use an exponential back off to prevent getting into a tight loop
+				duration := indicationBackoff.NextBackOff()
+				if duration == backoff.Stop {
+					// If we reach a maximum then warn and reset the backoff
+					// timer and keep attempting.
+					log.Warnw("Maximum indication backoff reached, resetting backoff timer",
+						log.Fields{"max_indication_backoff": indicationBackoff.MaxElapsedTime})
+					indicationBackoff.Reset()
+				}
+				time.Sleep(indicationBackoff.NextBackOff())
+				indications, err = dh.Client.EnableIndication(ctx, new(oop.Empty))
+				if err != nil {
+					return olterrors.NewErrCommunication("indication-read-failure", log.Fields{"device-id": dh.device.Id}, err).Log()
+				}
+				if indications == nil {
+					return olterrors.NewErrInvalidValue(log.Fields{"indications": nil, "device-id": dh.device.Id}, nil).Log()
+				}
+				continue
 			}
-			time.Sleep(indicationBackoff.NextBackOff())
-			indications, err = dh.Client.EnableIndication(ctx, new(oop.Empty))
 			if err != nil {
-				return olterrors.NewErrCommunication("indication-read-failure", log.Fields{"device-id": dh.device.Id}, err)
+				if dh.adminState == "deleted" {
+					log.Debug("Device deleted stoping the read indication thread")
+					break
+				}
+				continue
 			}
-			continue
-		}
-		if err != nil {
-			log.Infow("Failed to read from indications", log.Fields{"err": err})
-			if dh.adminState == "deleted" {
-				log.Debug("Device deleted stoping the read indication thread")
-				break
+			// Reset backoff if we have a successful receive
+			indicationBackoff.Reset()
+			dh.lockDevice.RLock()
+			adminState := dh.adminState
+			dh.lockDevice.RUnlock()
+			// When OLT is admin down, ignore all indications.
+			if adminState == "down" && !isIndicationAllowedDuringOltAdminDown(indication) {
+				log.Debugw("olt is admin down, ignore indication", log.Fields{"indication": indication})
+				continue
 			}
-			dh.transitionMap.Handle(ctx, DeviceDownInd)
-			dh.transitionMap.Handle(ctx, DeviceInit)
-			return olterrors.NewErrCommunication("indication-read-failure", log.Fields{"device-id": dh.device.Id}, err)
+			dh.handleIndication(ctx, indication)
 		}
-		// Reset backoff if we have a successful receive
-		indicationBackoff.Reset()
-		dh.lockDevice.RLock()
-		adminState := dh.adminState
-		dh.lockDevice.RUnlock()
-		// When OLT is admin down, ignore all indications.
-		if adminState == "down" {
-
-			log.Infow("olt is admin down, ignore indication", log.Fields{"indication": indication})
-			continue
-		}
-		dh.handleIndication(ctx, indication)
-
 	}
-	return nil
+}
+
+// isIndicationAllowedDuringOltAdminDown returns true if the indication is allowed during OLT Admin down, else false
+func isIndicationAllowedDuringOltAdminDown(indication *oop.Indication) bool {
+	switch indication.Data.(type) {
+	case *oop.Indication_OltInd, *oop.Indication_IntfInd, *oop.Indication_IntfOperInd:
+		return true
+
+	default:
+		return false
+	}
 }
 
 func (dh *DeviceHandler) handleOltIndication(ctx context.Context, oltIndication *oop.OltIndication) error {
@@ -477,14 +493,9 @@
 	}
 
 	cloned := proto.Clone(device).(*voltha.Device)
-	// Update the all ports state on that device to disable
-	if err = dh.coreProxy.PortsStateUpdate(ctx, cloned.Id, voltha.OperStatus_UNKNOWN); err != nil {
-		return olterrors.NewErrAdapter("port-update-failed", log.Fields{"device-id": device.Id}, err)
-	}
 
 	//Update the device oper state and connection status
 	cloned.OperStatus = voltha.OperStatus_UNKNOWN
-	cloned.ConnectStatus = common.ConnectStatus_UNREACHABLE
 	dh.device = cloned
 
 	if err = dh.coreProxy.DeviceStateUpdate(ctx, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
@@ -563,6 +574,12 @@
 		if err != nil {
 			olterrors.NewErrAdapter("olt-disable-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
 		}
+		// We should still go ahead an initialize various device handler modules so that when OLT is re-enabled, we have
+		// all the modules initialized and ready to handle incoming ONUs.
+
+		if err := dh.initializeDeviceHandlerModules(ctx); err != nil {
+			olterrors.NewErrAdapter("device-handler-initialization-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
+		}
 
 		// Start reading indications
 		go func() {
@@ -573,11 +590,6 @@
 		return nil
 	}
 
-	deviceInfo, err := dh.populateDeviceInfo()
-	if err != nil {
-		return olterrors.NewErrAdapter("populate-device-info-failed", log.Fields{"device-id": dh.device.Id}, err)
-	}
-
 	device, err := dh.coreProxy.GetDevice(context.TODO(), dh.device.Id, dh.device.Id)
 	if err != nil || device == nil {
 		/*TODO: needs to handle error scenarios */
@@ -588,20 +600,9 @@
 		return olterrors.NewErrAdapter("port-status-update-failed", log.Fields{"device": device}, err)
 	}
 
-	KVStoreHostPort := fmt.Sprintf("%s:%d", dh.openOLT.KVStoreHost, dh.openOLT.KVStorePort)
-	// Instantiate resource manager
-	if dh.resourceMgr = rsrcMgr.NewResourceMgr(ctx, dh.deviceID, KVStoreHostPort, dh.openOLT.KVStoreType, dh.deviceType, deviceInfo); dh.resourceMgr == nil {
-		return olterrors.ErrResourceManagerInstantiating
+	if err := dh.initializeDeviceHandlerModules(ctx); err != nil {
+		olterrors.NewErrAdapter("device-handler-initialization-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
 	}
-	// Instantiate flow manager
-	if dh.flowMgr = NewFlowManager(ctx, dh, dh.resourceMgr); dh.flowMgr == nil {
-		return olterrors.ErrResourceManagerInstantiating
-	}
-	/* TODO: Instantiate Alarm , stats , BW managers */
-	/* Instantiating Event Manager to handle Alarms and KPIs */
-	dh.eventMgr = NewEventMgr(dh.EventProxy, dh)
-	// Stats config for new device
-	dh.portStats = NewOpenOltStatsMgr(dh)
 
 	// Start reading indications
 	go func() {
@@ -612,6 +613,34 @@
 	return nil
 }
 
+func (dh *DeviceHandler) initializeDeviceHandlerModules(ctx context.Context) error {
+	deviceInfo, err := dh.populateDeviceInfo()
+
+	if err != nil {
+		return olterrors.NewErrAdapter("populate-device-info-failed", log.Fields{"device-id": dh.device.Id}, err)
+	}
+	KVStoreHostPort := fmt.Sprintf("%s:%d", dh.openOLT.KVStoreHost, dh.openOLT.KVStorePort)
+	// Instantiate resource manager
+	if dh.resourceMgr = rsrcMgr.NewResourceMgr(ctx, dh.deviceID, KVStoreHostPort, dh.openOLT.KVStoreType, dh.deviceType, deviceInfo); dh.resourceMgr == nil {
+		return olterrors.ErrResourceManagerInstantiating
+	}
+
+	// Instantiate flow manager
+	if dh.flowMgr = NewFlowManager(ctx, dh, dh.resourceMgr); dh.flowMgr == nil {
+		return olterrors.ErrResourceManagerInstantiating
+
+	}
+	/* TODO: Instantiate Alarm , stats , BW managers */
+	/* Instantiating Event Manager to handle Alarms and KPIs */
+	dh.eventMgr = NewEventMgr(dh.EventProxy, dh)
+
+	// Stats config for new device
+	dh.portStats = NewOpenOltStatsMgr(dh)
+
+	return nil
+
+}
+
 func (dh *DeviceHandler) populateDeviceInfo() (*oop.DeviceInfo, error) {
 	var err error
 	var deviceInfo *oop.DeviceInfo
@@ -701,7 +730,7 @@
 	}
 
 	go startCollector(dh)
-	go startHeartbeatCheck(dh)
+	go startHeartbeatCheck(ctx, dh)
 }
 
 //GetOfpDeviceInfo Gets the Ofp information of the given device
@@ -1461,6 +1490,30 @@
 	   This clears up flow data and also resource map data for various
 	   other pon resources like alloc_id and gemport_id
 	*/
+	go dh.cleanupDeviceResources(ctx)
+	log.Debug("Removed-device-from-Resource-manager-KV-store")
+	// Stop the Stats collector
+	dh.stopCollector <- true
+	// stop the heartbeat check routine
+	dh.stopHeartbeatCheck <- true
+	//Reset the state
+	if dh.Client != nil {
+		if _, err := dh.Client.Reboot(ctx, new(oop.Empty)); err != nil {
+			return olterrors.NewErrAdapter("olt-reboot-failed", log.Fields{"device-id": dh.deviceID}, err).Log()
+		}
+	}
+	cloned := proto.Clone(device).(*voltha.Device)
+	cloned.OperStatus = voltha.OperStatus_UNKNOWN
+	cloned.ConnectStatus = voltha.ConnectStatus_UNREACHABLE
+	if err := dh.coreProxy.DeviceStateUpdate(ctx, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
+		return olterrors.NewErrAdapter("device-state-update-failed", log.Fields{
+			"device-id":      device.Id,
+			"connect-status": cloned.ConnectStatus,
+			"oper-status":    cloned.OperStatus}, err).Log()
+	}
+	return nil
+}
+func (dh *DeviceHandler) cleanupDeviceResources(ctx context.Context) error {
 	if dh.resourceMgr != nil {
 		noOfPonPorts := dh.resourceMgr.DevInfo.GetPonPorts()
 		var ponPort uint32
@@ -1509,26 +1562,12 @@
 		return true
 	})
 
-	log.Debug("Removed-device-from-Resource-manager-KV-store")
-	// Stop the Stats collector
-	dh.stopCollector <- true
-	// stop the heartbeat check routine
-	dh.stopHeartbeatCheck <- true
-	//Reset the state
-	if dh.Client != nil {
-		if _, err := dh.Client.Reboot(ctx, new(oop.Empty)); err != nil {
-			return olterrors.NewErrAdapter("olt-reboot-failed", log.Fields{"device-id": dh.deviceID}, err)
-		}
-	}
-	cloned := proto.Clone(device).(*voltha.Device)
-	cloned.OperStatus = voltha.OperStatus_UNKNOWN
-	cloned.ConnectStatus = voltha.ConnectStatus_UNREACHABLE
-	if err := dh.coreProxy.DeviceStateUpdate(ctx, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
-		return olterrors.NewErrAdapter("device-state-update-failed", log.Fields{
-			"device-id":      device.Id,
-			"connect-status": cloned.ConnectStatus,
-			"oper-status":    cloned.OperStatus}, err)
-	}
+	/*Delete discovered ONU map for the device*/
+	dh.discOnus.Range(func(key interface{}, value interface{}) bool {
+		dh.discOnus.Delete(key)
+		return true
+	})
+
 	return nil
 }
 
@@ -1660,7 +1699,7 @@
 	return "" + strconv.Itoa(int(intfID)) + "." + strconv.Itoa(int(onuID))
 }
 
-func startHeartbeatCheck(dh *DeviceHandler) {
+func startHeartbeatCheck(ctx context.Context, dh *DeviceHandler) {
 	// start the heartbeat check towards the OLT.
 	var timerCheck *time.Timer
 
@@ -1668,25 +1707,17 @@
 		heartbeatTimer := time.NewTimer(dh.openOLT.HeartbeatCheckInterval)
 		select {
 		case <-heartbeatTimer.C:
-			ctx, cancel := context.WithTimeout(context.Background(), dh.openOLT.GrpcTimeoutInterval)
-			if heartBeat, err := dh.Client.HeartbeatCheck(ctx, new(oop.Empty)); err != nil {
+			ctxWithTimeout, cancel := context.WithTimeout(context.Background(), dh.openOLT.GrpcTimeoutInterval)
+			if heartBeat, err := dh.Client.HeartbeatCheck(ctxWithTimeout, new(oop.Empty)); err != nil {
 				log.Error("Hearbeat failed")
 				if timerCheck == nil {
 					// start a after func, when expired will update the state to the core
-					timerCheck = time.AfterFunc(dh.openOLT.HeartbeatFailReportInterval, dh.updateStateUnreachable)
+					timerCheck = time.AfterFunc(dh.openOLT.HeartbeatFailReportInterval, func() { dh.updateStateUnreachable(ctx) })
 				}
 			} else {
 				if timerCheck != nil {
 					if timerCheck.Stop() {
 						log.Debug("We got hearbeat within the timeout")
-					} else {
-
-						log.Debug("We got hearbeat after the timeout expired, changing the states")
-						go dh.notifyChildDevices("up")
-						if err := dh.coreProxy.DeviceStateUpdate(ctx, dh.device.Id, voltha.ConnectStatus_REACHABLE,
-							voltha.OperStatus_ACTIVE); err != nil {
-							log.Errorw("Failed to update device state", log.Fields{"deviceID": dh.device.Id, "error": err})
-						}
 					}
 					timerCheck = nil
 				}
@@ -1700,11 +1731,24 @@
 	}
 }
 
-func (dh *DeviceHandler) updateStateUnreachable() {
+func (dh *DeviceHandler) updateStateUnreachable(ctx context.Context) {
+	device, err := dh.coreProxy.GetDevice(ctx, dh.device.Id, dh.device.Id)
+	if err != nil || device == nil {
+		olterrors.NewErrNotFound("device", log.Fields{"device-id": dh.device.Id}, err).Log()
+	}
 
-	go dh.notifyChildDevices("unreachable")
-	if err := dh.coreProxy.DeviceStateUpdate(context.TODO(), dh.device.Id, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_UNKNOWN); err != nil {
-		olterrors.NewErrAdapter("device-state-update-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
+	if device.ConnectStatus == voltha.ConnectStatus_REACHABLE {
+		if err = dh.coreProxy.DeviceStateUpdate(ctx, dh.device.Id, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_UNKNOWN); err != nil {
+			olterrors.NewErrAdapter("device-state-update-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
+		}
+		if err = dh.coreProxy.PortsStateUpdate(ctx, dh.device.Id, voltha.OperStatus_UNKNOWN); err != nil {
+			olterrors.NewErrAdapter("port-update-failed", log.Fields{"device-id": dh.device.Id}, err).Log()
+		}
+		go dh.cleanupDeviceResources(ctx)
+
+		dh.stopIndications <- true
+		dh.transitionMap.Handle(ctx, DeviceInit)
+
 	}
 }
 
diff --git a/internal/pkg/core/olt_state_transitions.go b/internal/pkg/core/olt_state_transitions.go
index c10d17d..c249e2e 100644
--- a/internal/pkg/core/olt_state_transitions.go
+++ b/internal/pkg/core/olt_state_transitions.go
@@ -91,7 +91,7 @@
 	// In doInit establish the grpc session
 	transitionMap.transitions[DeviceInit] =
 		Transition{
-			previousState: []DeviceState{deviceStateNull, deviceStateDown},
+			previousState: []DeviceState{deviceStateNull, deviceStateUp, deviceStateDown},
 			currentState:  deviceStateInit,
 			before:        []TransitionHandler{dh.doStateInit},
 			after:         []TransitionHandler{dh.postInit}}