[VOL-2741]: code changes to support OLT reboot
Change-Id: I8a26347fee26802f5f32ed27f2030f1e0608fbb1
diff --git a/VERSION b/VERSION
index c68b9b3..5f0cf38 100644
--- a/VERSION
+++ b/VERSION
@@ -1,2 +1 @@
2.3.20-dev
-
diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go
index 1ba00e1..a9fa936 100644
--- a/internal/pkg/config/config.go
+++ b/internal/pkg/config/config.go
@@ -20,9 +20,10 @@
import (
"flag"
"fmt"
- "github.com/opencord/voltha-lib-go/v3/pkg/log"
"os"
"time"
+
+ "github.com/opencord/voltha-lib-go/v3/pkg/log"
)
// Open OLT default constants
@@ -49,9 +50,9 @@
defaultLiveProbeInterval = 60 * time.Second
defaultNotLiveProbeInterval = 5 * time.Second // Probe more frequently when not alive
//defaultHearbeatFailReportInterval is the time in seconds the adapter will keep checking the hardware for heartbeat.
- defaultHearbeatCheckInterval = 30 * time.Second
+ defaultHearbeatCheckInterval = 15 * time.Second
// defaultHearbeatFailReportInterval is the time adapter will wait before updating the state to the core.
- defaultHearbeatFailReportInterval = 180 * time.Second
+ defaultHearbeatFailReportInterval = 0 * time.Second
//defaultGrpcTimeoutInterval is the time in seconds a grpc call will wait before returning error.
defaultGrpcTimeoutInterval = 2 * time.Second
)
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 625ab63..2b0b52a 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -79,6 +79,7 @@
stopCollector chan bool
stopHeartbeatCheck chan bool
activePorts sync.Map
+ stopIndications chan bool
}
//OnuDevice represents ONU related info
@@ -136,6 +137,7 @@
dh.stopHeartbeatCheck = make(chan bool, 2)
dh.metrics = pmmetrics.NewPmMetrics(cloned.Id, pmmetrics.Frequency(150), pmmetrics.FrequencyOverride(false), pmmetrics.Grouped(false), pmmetrics.Metrics(pmNames))
dh.activePorts = sync.Map{}
+ dh.stopIndications = make(chan bool, 1)
//TODO initialize the support classes.
return &dh
}
@@ -309,50 +311,64 @@
indicationBackoff.MaxElapsedTime = 0
indicationBackoff.MaxInterval = 1 * time.Minute
for {
- indication, err := indications.Recv()
- if err == io.EOF {
- log.Infow("EOF for indications", log.Fields{"err": err})
- // Use an exponential back off to prevent getting into a tight loop
- duration := indicationBackoff.NextBackOff()
- if duration == backoff.Stop {
- // If we reach a maximum then warn and reset the backoff
- // timer and keep attempting.
- log.Warnw("Maximum indication backoff reached, resetting backoff timer",
- log.Fields{"max_indication_backoff": indicationBackoff.MaxElapsedTime})
- indicationBackoff.Reset()
+ select {
+ case <-dh.stopIndications:
+ log.Debugw("Stopping-collecting-indications-for-OLT", log.Fields{"deviceID:": dh.deviceID})
+ break
+ default:
+ indication, err := indications.Recv()
+ if err == io.EOF {
+ log.Infow("EOF for indications", log.Fields{"err": err})
+ // Use an exponential back off to prevent getting into a tight loop
+ duration := indicationBackoff.NextBackOff()
+ if duration == backoff.Stop {
+ // If we reach a maximum then warn and reset the backoff
+ // timer and keep attempting.
+ log.Warnw("Maximum indication backoff reached, resetting backoff timer",
+ log.Fields{"max_indication_backoff": indicationBackoff.MaxElapsedTime})
+ indicationBackoff.Reset()
+ }
+ time.Sleep(indicationBackoff.NextBackOff())
+ indications, err = dh.Client.EnableIndication(ctx, new(oop.Empty))
+ if err != nil {
+ return olterrors.NewErrCommunication("indication-read-failure", log.Fields{"device-id": dh.device.Id}, err).Log()
+ }
+ if indications == nil {
+ return olterrors.NewErrInvalidValue(log.Fields{"indications": nil, "device-id": dh.device.Id}, nil).Log()
+ }
+ continue
}
- time.Sleep(indicationBackoff.NextBackOff())
- indications, err = dh.Client.EnableIndication(ctx, new(oop.Empty))
if err != nil {
- return olterrors.NewErrCommunication("indication-read-failure", log.Fields{"device-id": dh.device.Id}, err)
+ if dh.adminState == "deleted" {
+ log.Debug("Device deleted stoping the read indication thread")
+ break
+ }
+ continue
}
- continue
- }
- if err != nil {
- log.Infow("Failed to read from indications", log.Fields{"err": err})
- if dh.adminState == "deleted" {
- log.Debug("Device deleted stoping the read indication thread")
- break
+ // Reset backoff if we have a successful receive
+ indicationBackoff.Reset()
+ dh.lockDevice.RLock()
+ adminState := dh.adminState
+ dh.lockDevice.RUnlock()
+ // When OLT is admin down, ignore all indications.
+ if adminState == "down" && !isIndicationAllowedDuringOltAdminDown(indication) {
+ log.Debugw("olt is admin down, ignore indication", log.Fields{"indication": indication})
+ continue
}
- dh.transitionMap.Handle(ctx, DeviceDownInd)
- dh.transitionMap.Handle(ctx, DeviceInit)
- return olterrors.NewErrCommunication("indication-read-failure", log.Fields{"device-id": dh.device.Id}, err)
+ dh.handleIndication(ctx, indication)
}
- // Reset backoff if we have a successful receive
- indicationBackoff.Reset()
- dh.lockDevice.RLock()
- adminState := dh.adminState
- dh.lockDevice.RUnlock()
- // When OLT is admin down, ignore all indications.
- if adminState == "down" {
-
- log.Infow("olt is admin down, ignore indication", log.Fields{"indication": indication})
- continue
- }
- dh.handleIndication(ctx, indication)
-
}
- return nil
+}
+
+// isIndicationAllowedDuringOltAdminDown returns true if the indication is allowed during OLT Admin down, else false
+func isIndicationAllowedDuringOltAdminDown(indication *oop.Indication) bool {
+ switch indication.Data.(type) {
+ case *oop.Indication_OltInd, *oop.Indication_IntfInd, *oop.Indication_IntfOperInd:
+ return true
+
+ default:
+ return false
+ }
}
func (dh *DeviceHandler) handleOltIndication(ctx context.Context, oltIndication *oop.OltIndication) error {
@@ -477,14 +493,9 @@
}
cloned := proto.Clone(device).(*voltha.Device)
- // Update the all ports state on that device to disable
- if err = dh.coreProxy.PortsStateUpdate(ctx, cloned.Id, voltha.OperStatus_UNKNOWN); err != nil {
- return olterrors.NewErrAdapter("port-update-failed", log.Fields{"device-id": device.Id}, err)
- }
//Update the device oper state and connection status
cloned.OperStatus = voltha.OperStatus_UNKNOWN
- cloned.ConnectStatus = common.ConnectStatus_UNREACHABLE
dh.device = cloned
if err = dh.coreProxy.DeviceStateUpdate(ctx, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
@@ -563,6 +574,12 @@
if err != nil {
olterrors.NewErrAdapter("olt-disable-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
}
+ // We should still go ahead an initialize various device handler modules so that when OLT is re-enabled, we have
+ // all the modules initialized and ready to handle incoming ONUs.
+
+ if err := dh.initializeDeviceHandlerModules(ctx); err != nil {
+ olterrors.NewErrAdapter("device-handler-initialization-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
+ }
// Start reading indications
go func() {
@@ -573,11 +590,6 @@
return nil
}
- deviceInfo, err := dh.populateDeviceInfo()
- if err != nil {
- return olterrors.NewErrAdapter("populate-device-info-failed", log.Fields{"device-id": dh.device.Id}, err)
- }
-
device, err := dh.coreProxy.GetDevice(context.TODO(), dh.device.Id, dh.device.Id)
if err != nil || device == nil {
/*TODO: needs to handle error scenarios */
@@ -588,20 +600,9 @@
return olterrors.NewErrAdapter("port-status-update-failed", log.Fields{"device": device}, err)
}
- KVStoreHostPort := fmt.Sprintf("%s:%d", dh.openOLT.KVStoreHost, dh.openOLT.KVStorePort)
- // Instantiate resource manager
- if dh.resourceMgr = rsrcMgr.NewResourceMgr(ctx, dh.deviceID, KVStoreHostPort, dh.openOLT.KVStoreType, dh.deviceType, deviceInfo); dh.resourceMgr == nil {
- return olterrors.ErrResourceManagerInstantiating
+ if err := dh.initializeDeviceHandlerModules(ctx); err != nil {
+ olterrors.NewErrAdapter("device-handler-initialization-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
}
- // Instantiate flow manager
- if dh.flowMgr = NewFlowManager(ctx, dh, dh.resourceMgr); dh.flowMgr == nil {
- return olterrors.ErrResourceManagerInstantiating
- }
- /* TODO: Instantiate Alarm , stats , BW managers */
- /* Instantiating Event Manager to handle Alarms and KPIs */
- dh.eventMgr = NewEventMgr(dh.EventProxy, dh)
- // Stats config for new device
- dh.portStats = NewOpenOltStatsMgr(dh)
// Start reading indications
go func() {
@@ -612,6 +613,34 @@
return nil
}
+func (dh *DeviceHandler) initializeDeviceHandlerModules(ctx context.Context) error {
+ deviceInfo, err := dh.populateDeviceInfo()
+
+ if err != nil {
+ return olterrors.NewErrAdapter("populate-device-info-failed", log.Fields{"device-id": dh.device.Id}, err)
+ }
+ KVStoreHostPort := fmt.Sprintf("%s:%d", dh.openOLT.KVStoreHost, dh.openOLT.KVStorePort)
+ // Instantiate resource manager
+ if dh.resourceMgr = rsrcMgr.NewResourceMgr(ctx, dh.deviceID, KVStoreHostPort, dh.openOLT.KVStoreType, dh.deviceType, deviceInfo); dh.resourceMgr == nil {
+ return olterrors.ErrResourceManagerInstantiating
+ }
+
+ // Instantiate flow manager
+ if dh.flowMgr = NewFlowManager(ctx, dh, dh.resourceMgr); dh.flowMgr == nil {
+ return olterrors.ErrResourceManagerInstantiating
+
+ }
+ /* TODO: Instantiate Alarm , stats , BW managers */
+ /* Instantiating Event Manager to handle Alarms and KPIs */
+ dh.eventMgr = NewEventMgr(dh.EventProxy, dh)
+
+ // Stats config for new device
+ dh.portStats = NewOpenOltStatsMgr(dh)
+
+ return nil
+
+}
+
func (dh *DeviceHandler) populateDeviceInfo() (*oop.DeviceInfo, error) {
var err error
var deviceInfo *oop.DeviceInfo
@@ -701,7 +730,7 @@
}
go startCollector(dh)
- go startHeartbeatCheck(dh)
+ go startHeartbeatCheck(ctx, dh)
}
//GetOfpDeviceInfo Gets the Ofp information of the given device
@@ -1461,6 +1490,30 @@
This clears up flow data and also resource map data for various
other pon resources like alloc_id and gemport_id
*/
+ go dh.cleanupDeviceResources(ctx)
+ log.Debug("Removed-device-from-Resource-manager-KV-store")
+ // Stop the Stats collector
+ dh.stopCollector <- true
+ // stop the heartbeat check routine
+ dh.stopHeartbeatCheck <- true
+ //Reset the state
+ if dh.Client != nil {
+ if _, err := dh.Client.Reboot(ctx, new(oop.Empty)); err != nil {
+ return olterrors.NewErrAdapter("olt-reboot-failed", log.Fields{"device-id": dh.deviceID}, err).Log()
+ }
+ }
+ cloned := proto.Clone(device).(*voltha.Device)
+ cloned.OperStatus = voltha.OperStatus_UNKNOWN
+ cloned.ConnectStatus = voltha.ConnectStatus_UNREACHABLE
+ if err := dh.coreProxy.DeviceStateUpdate(ctx, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
+ return olterrors.NewErrAdapter("device-state-update-failed", log.Fields{
+ "device-id": device.Id,
+ "connect-status": cloned.ConnectStatus,
+ "oper-status": cloned.OperStatus}, err).Log()
+ }
+ return nil
+}
+func (dh *DeviceHandler) cleanupDeviceResources(ctx context.Context) error {
if dh.resourceMgr != nil {
noOfPonPorts := dh.resourceMgr.DevInfo.GetPonPorts()
var ponPort uint32
@@ -1509,26 +1562,12 @@
return true
})
- log.Debug("Removed-device-from-Resource-manager-KV-store")
- // Stop the Stats collector
- dh.stopCollector <- true
- // stop the heartbeat check routine
- dh.stopHeartbeatCheck <- true
- //Reset the state
- if dh.Client != nil {
- if _, err := dh.Client.Reboot(ctx, new(oop.Empty)); err != nil {
- return olterrors.NewErrAdapter("olt-reboot-failed", log.Fields{"device-id": dh.deviceID}, err)
- }
- }
- cloned := proto.Clone(device).(*voltha.Device)
- cloned.OperStatus = voltha.OperStatus_UNKNOWN
- cloned.ConnectStatus = voltha.ConnectStatus_UNREACHABLE
- if err := dh.coreProxy.DeviceStateUpdate(ctx, cloned.Id, cloned.ConnectStatus, cloned.OperStatus); err != nil {
- return olterrors.NewErrAdapter("device-state-update-failed", log.Fields{
- "device-id": device.Id,
- "connect-status": cloned.ConnectStatus,
- "oper-status": cloned.OperStatus}, err)
- }
+ /*Delete discovered ONU map for the device*/
+ dh.discOnus.Range(func(key interface{}, value interface{}) bool {
+ dh.discOnus.Delete(key)
+ return true
+ })
+
return nil
}
@@ -1660,7 +1699,7 @@
return "" + strconv.Itoa(int(intfID)) + "." + strconv.Itoa(int(onuID))
}
-func startHeartbeatCheck(dh *DeviceHandler) {
+func startHeartbeatCheck(ctx context.Context, dh *DeviceHandler) {
// start the heartbeat check towards the OLT.
var timerCheck *time.Timer
@@ -1668,25 +1707,17 @@
heartbeatTimer := time.NewTimer(dh.openOLT.HeartbeatCheckInterval)
select {
case <-heartbeatTimer.C:
- ctx, cancel := context.WithTimeout(context.Background(), dh.openOLT.GrpcTimeoutInterval)
- if heartBeat, err := dh.Client.HeartbeatCheck(ctx, new(oop.Empty)); err != nil {
+ ctxWithTimeout, cancel := context.WithTimeout(context.Background(), dh.openOLT.GrpcTimeoutInterval)
+ if heartBeat, err := dh.Client.HeartbeatCheck(ctxWithTimeout, new(oop.Empty)); err != nil {
log.Error("Hearbeat failed")
if timerCheck == nil {
// start a after func, when expired will update the state to the core
- timerCheck = time.AfterFunc(dh.openOLT.HeartbeatFailReportInterval, dh.updateStateUnreachable)
+ timerCheck = time.AfterFunc(dh.openOLT.HeartbeatFailReportInterval, func() { dh.updateStateUnreachable(ctx) })
}
} else {
if timerCheck != nil {
if timerCheck.Stop() {
log.Debug("We got hearbeat within the timeout")
- } else {
-
- log.Debug("We got hearbeat after the timeout expired, changing the states")
- go dh.notifyChildDevices("up")
- if err := dh.coreProxy.DeviceStateUpdate(ctx, dh.device.Id, voltha.ConnectStatus_REACHABLE,
- voltha.OperStatus_ACTIVE); err != nil {
- log.Errorw("Failed to update device state", log.Fields{"deviceID": dh.device.Id, "error": err})
- }
}
timerCheck = nil
}
@@ -1700,11 +1731,24 @@
}
}
-func (dh *DeviceHandler) updateStateUnreachable() {
+func (dh *DeviceHandler) updateStateUnreachable(ctx context.Context) {
+ device, err := dh.coreProxy.GetDevice(ctx, dh.device.Id, dh.device.Id)
+ if err != nil || device == nil {
+ olterrors.NewErrNotFound("device", log.Fields{"device-id": dh.device.Id}, err).Log()
+ }
- go dh.notifyChildDevices("unreachable")
- if err := dh.coreProxy.DeviceStateUpdate(context.TODO(), dh.device.Id, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_UNKNOWN); err != nil {
- olterrors.NewErrAdapter("device-state-update-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
+ if device.ConnectStatus == voltha.ConnectStatus_REACHABLE {
+ if err = dh.coreProxy.DeviceStateUpdate(ctx, dh.device.Id, voltha.ConnectStatus_UNREACHABLE, voltha.OperStatus_UNKNOWN); err != nil {
+ olterrors.NewErrAdapter("device-state-update-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
+ }
+ if err = dh.coreProxy.PortsStateUpdate(ctx, dh.device.Id, voltha.OperStatus_UNKNOWN); err != nil {
+ olterrors.NewErrAdapter("port-update-failed", log.Fields{"device-id": dh.device.Id}, err).Log()
+ }
+ go dh.cleanupDeviceResources(ctx)
+
+ dh.stopIndications <- true
+ dh.transitionMap.Handle(ctx, DeviceInit)
+
}
}
diff --git a/internal/pkg/core/olt_state_transitions.go b/internal/pkg/core/olt_state_transitions.go
index c10d17d..c249e2e 100644
--- a/internal/pkg/core/olt_state_transitions.go
+++ b/internal/pkg/core/olt_state_transitions.go
@@ -91,7 +91,7 @@
// In doInit establish the grpc session
transitionMap.transitions[DeviceInit] =
Transition{
- previousState: []DeviceState{deviceStateNull, deviceStateDown},
+ previousState: []DeviceState{deviceStateNull, deviceStateUp, deviceStateDown},
currentState: deviceStateInit,
before: []TransitionHandler{dh.doStateInit},
after: []TransitionHandler{dh.postInit}}