changes for olt reconnect, reboot redesign
Change-Id: I192c01f7672b62956b4b55e19b8cd0d33ca7cfae
fixes for device state handling
Change-Id: I6765b7b53b2c130c70ac37cad28373cec7397908
olt reconnect, reboot redesign changes
Change-Id: I2a4981bc815d0961ffbf7e36ba7cfb06243e8319
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 2d3c7c5..500dbf1 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -21,6 +21,7 @@
"context"
"encoding/binary"
"encoding/hex"
+ "encoding/json"
"errors"
"fmt"
"net"
@@ -72,6 +73,7 @@
oltPortInfoTimeout = 3
defaultPortSpeedMbps = 1000
+ heartbeatPath = "heartbeat"
)
//DeviceHandler will interact with the OLT device.
@@ -93,6 +95,7 @@
groupMgr *OpenOltGroupMgr
eventMgr *OpenOltEventMgr
resourceMgr []*rsrcMgr.OpenOltResourceMgr
+ kvStore *db.Backend // backend kv store connection handle
deviceInfo *oop.DeviceInfo
@@ -120,6 +123,7 @@
agentPreviouslyConnected bool
isDeviceDeletionInProgress bool
+ heartbeatSignature uint32
}
//OnuDevice represents ONU related info
@@ -185,6 +189,7 @@
//NewDeviceHandler creates a new device handler
func NewDeviceHandler(cc *vgrpc.Client, ep eventif.EventProxy, device *voltha.Device, adapter *OpenOLT, cm *config.ConfigManager, cfg *conf.AdapterFlags) *DeviceHandler {
var dh DeviceHandler
+ ctx := context.Background()
dh.cm = cm
dh.coreClient = cc
dh.EventProxy = ep
@@ -201,6 +206,13 @@
dh.perPonOnuIndicationChannel = make(map[uint32]onuIndicationChannels)
dh.childAdapterClients = make(map[string]*vgrpc.Client)
dh.cfg = cfg
+ kvStoreDevicePath := fmt.Sprintf(dh.cm.Backend.PathPrefix, "/%s/", dh.device.Id)
+ dh.kvStore = SetKVClient(ctx, dh.openOLT.KVStoreType, dh.openOLT.KVStoreAddress, dh.device.Id, kvStoreDevicePath)
+ if dh.kvStore == nil {
+ logger.Error(ctx, "Failed to setup KV store")
+ return nil
+ }
+
// Create a slice of buffered channels for handling concurrent mcast flow/group.
dh.incomingMcastFlowOrGroup = make([]chan McastFlowOrGroupControlBlock, MaxNumOfGroupHandlerChannels)
dh.stopMcastHandlerRoutine = make([]chan bool, MaxNumOfGroupHandlerChannels)
@@ -219,6 +231,33 @@
return &dh
}
+// newKVClient builds a kvstore client for storeType at address; "etcd" is the only store type handled here.
+func newKVClient(ctx context.Context, storeType string, address string, timeout time.Duration) (kvstore.Client, error) {
+	logger.Infow(ctx, "kv-store-type", log.Fields{"store": storeType})
+	if storeType != "etcd" {
+		return nil, errors.New("unsupported-kv-store")
+	}
+	return kvstore.NewEtcdClient(ctx, address, timeout, log.FatalLevel)
+}
+
+// SetKVClient builds a KV client for backend/addr and wraps it in a db.Backend whose path prefix is formatted
+// from basePathKvStore and DeviceID. NOTE(review): if Fatalw exits the process, 'return nil' is unreachable - confirm.
+func SetKVClient(ctx context.Context, backend string, addr string, DeviceID string, basePathKvStore string) *db.Backend {
+	client, clientErr := newKVClient(ctx, backend, addr, rsrcMgr.KvstoreTimeout)
+	if clientErr != nil {
+		logger.Fatalw(ctx, "Failed to init KV client\n", log.Fields{"err": clientErr})
+		return nil
+	}
+	prefix := fmt.Sprintf(rsrcMgr.BasePathKvStore, basePathKvStore, DeviceID)
+	return &db.Backend{
+		Client:     client,
+		StoreType:  backend,
+		Address:    addr,
+		Timeout:    rsrcMgr.KvstoreTimeout,
+		PathPrefix: prefix,
+	}
+}
+
// start save the device to the data model
func (dh *DeviceHandler) start(ctx context.Context) {
dh.lockDevice.Lock()
@@ -744,18 +783,6 @@
raisedTs := time.Now().Unix()
go dh.eventMgr.oltCommunicationEvent(ctx, dh.device, raisedTs)
- //check adapter and agent reconcile status
- //reboot olt if needed (olt disconnection case)
- if dh.adapterPreviouslyConnected != dh.agentPreviouslyConnected {
- logger.Warnw(ctx, "different-reconcile-status-between-adapter-and-agent-rebooting-device",
- log.Fields{
- "device-id": dh.device.Id,
- "adapter-status": dh.adapterPreviouslyConnected,
- "agent-status": dh.agentPreviouslyConnected,
- })
- _ = dh.RebootDevice(ctx, dh.device)
- }
-
return nil
}
@@ -839,6 +866,7 @@
}
}
+ logger.Debugw(ctx, "Dailing grpc", log.Fields{"device-id": dh.device.Id})
// Use Interceptors to automatically inject and publish Open Tracing Spans by this GRPC client
dh.clientCon, err = grpc.Dial(dh.device.GetHostAndPort(),
grpc.WithInsecure(),
@@ -905,13 +933,6 @@
return olterrors.NewErrAdapter("device-handler-initialization-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
}
- // Start reading indications
- go func() {
- if err = dh.readIndications(ctx); err != nil {
- _ = olterrors.NewErrAdapter("indication-read-failure", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
- }
- }()
-
go startHeartbeatCheck(ctx, dh)
return nil
@@ -931,12 +952,6 @@
return olterrors.NewErrAdapter("device-handler-initialization-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
}
- // Start reading indications
- go func() {
- if err := dh.readIndications(ctx); err != nil {
- _ = olterrors.NewErrAdapter("read-indications-failure", log.Fields{"device-id": dh.device.Id}, err).Log()
- }
- }()
go dh.updateLocalDevice(ctx)
if device.PmConfigs != nil {
@@ -1368,13 +1383,88 @@
return nil
}
-func (dh *DeviceHandler) onuDiscIndication(ctx context.Context, onuDiscInd *oop.OnuDiscIndication) error {
+// getChildDevice looks up the ONU with serial number sn, first in the local onus cache and, on a miss,
+// in the core. A device found in the core is stored in the cache. It returns nil when neither has it.
+func (dh *DeviceHandler) getChildDevice(ctx context.Context, sn string, parentPortNo uint32) *OnuDevice {
+	var cached *OnuDevice
+	dh.onus.Range(func(_ interface{}, entry interface{}) bool {
+		// Returning false stops the iteration at the first entry with a matching serial number.
+		if candidate := entry.(*OnuDevice); candidate.serialNumber == sn {
+			cached = candidate
+			return false
+		}
+		return true
+	})
+	if cached != nil {
+		logger.Debugw(ctx, "Got child device from cache", log.Fields{"onudev": cached.serialNumber})
+		return cached
+	}
+	// Cache miss: query the core. The returned error is discarded; only a nil device counts as a miss.
+	filter := &ca.ChildDeviceFilter{
+		ParentId:     dh.device.Id,
+		SerialNumber: sn,
+		ParentPortNo: parentPortNo,
+	}
+	coreDev, _ := dh.getChildDeviceFromCore(ctx, filter)
+	if coreDev == nil {
+		return nil
+	}
+	proxy := coreDev.ProxyAddress
+	ponIntf := plt.PortNoToIntfID(parentPortNo, voltha.Port_PON_OLT)
+	// Build the cache entry and store it under the key formed from the PON interface and ONU id.
+	fresh := NewOnuDevice(coreDev.Id, coreDev.Type, coreDev.SerialNumber, proxy.OnuId, ponIntf, proxy.DeviceId, false, coreDev.AdapterEndpoint)
+	dh.onus.Store(dh.formOnuKey(ponIntf, proxy.OnuId), fresh)
+	logger.Debugw(ctx, "got child device from core", log.Fields{"onudev": coreDev})
+	return fresh
+}
+
+func (dh *DeviceHandler) checkForResourceExistance(ctx context.Context, onuDiscInd *oop.OnuDiscIndication, sn string) (bool, error) {
channelID := onuDiscInd.GetIntfId()
parentPortNo := plt.IntfIDToPortNo(onuDiscInd.GetIntfId(), voltha.Port_PON_OLT)
+ tpInstExists := false
- sn := dh.stringifySerialNumber(onuDiscInd.SerialNumber)
- logger.Infow(ctx, "new-discovery-indication", log.Fields{"sn": sn})
+ //When CheckOnuDevExistenceAtOnuDiscovery is false, the ONU is looked up via getChildDevice and the discovery is reported
+ // as ignorable (true) only if a techprofile ID is found on one of its UNI ports; when true, the core is asked further below.
+ if !dh.openOLT.CheckOnuDevExistenceAtOnuDiscovery {
+ onuDev := dh.getChildDevice(ctx, sn, parentPortNo)
+ if onuDev != nil {
+ var onuGemInfo *rsrcMgr.OnuGemInfo
+ var err error
+ if onuGemInfo, err = dh.resourceMgr[channelID].GetOnuGemInfo(ctx, onuDev.onuID); err != nil {
+ logger.Warnw(ctx, "Unable to find onuGemInfo", log.Fields{"onuID": onuDev.onuID})
+ return false, err
+ }
+ if onuGemInfo != nil {
+ for _, uni := range onuGemInfo.UniPorts {
+ uniID := plt.UniIDFromPortNum(uni)
+ tpIDs := dh.resourceMgr[channelID].GetTechProfileIDForOnu(ctx, onuDev.onuID, uniID)
+ if len(tpIDs) != 0 {
+ logger.Warnw(ctx, "Techprofile present for ONU, ignoring onu discovery", log.Fields{"onuID": onuDev.onuID})
+ tpInstExists = true
+ break
+ }
+ }
+ }
+ }
+ return tpInstExists, nil
+ }
+ onuDevice, _ := dh.getChildDeviceFromCore(ctx, &ca.ChildDeviceFilter{
+ ParentId: dh.device.Id,
+ SerialNumber: sn,
+ ParentPortNo: parentPortNo,
+ })
+ if onuDevice != nil {
+ logger.Infow(ctx, "Child device still present ignoring discovery indication", log.Fields{"sn": sn})
+ return true, nil
+ }
+ logger.Infow(ctx, "No device present in core , continuing with discovery", log.Fields{"sn": sn})
+
+ return false, nil
+
+}
+
+func (dh *DeviceHandler) processDiscONULOSClear(ctx context.Context, onuDiscInd *oop.OnuDiscIndication, sn string) bool {
var alarmInd oop.OnuAlarmIndication
raisedTs := time.Now().Unix()
if _, loaded := dh.discOnus.LoadOrStore(sn, true); loaded {
@@ -1405,9 +1495,29 @@
})
logger.Warnw(ctx, "onu-sn-is-already-being-processed", log.Fields{"sn": sn})
+ return true
+ }
+ return false
+}
+
+func (dh *DeviceHandler) onuDiscIndication(ctx context.Context, onuDiscInd *oop.OnuDiscIndication) error {
+ channelID := onuDiscInd.GetIntfId()
+ parentPortNo := plt.IntfIDToPortNo(onuDiscInd.GetIntfId(), voltha.Port_PON_OLT)
+
+ sn := dh.stringifySerialNumber(onuDiscInd.SerialNumber)
+ logger.Infow(ctx, "new-discovery-indication", log.Fields{"sn": sn})
+
+ tpInstExists, errtp := dh.checkForResourceExistance(ctx, onuDiscInd, sn)
+ if errtp != nil {
+ return errtp
+ }
+ if tpInstExists {
+ //ignore the discovery if tpinstance is present.
return nil
}
-
+ if onuBeingProcessed := dh.processDiscONULOSClear(ctx, onuDiscInd, sn); onuBeingProcessed {
+ return nil
+ }
var onuID uint32
// check the ONU is already know to the OLT
@@ -2283,6 +2393,7 @@
// start the heartbeat check towards the OLT.
var timerCheck *time.Timer
+ dh.heartbeatSignature = dh.getHeartbeatSignature(ctx)
for {
heartbeatTimer := time.NewTimer(dh.openOLT.HeartbeatCheckInterval)
@@ -2302,9 +2413,36 @@
}
timerCheck = nil
}
- logger.Debugw(ctx, "heartbeat",
- log.Fields{"signature": heartBeat,
- "device-id": dh.device.Id})
+ if dh.heartbeatSignature == 0 || dh.heartbeatSignature == heartBeat.HeartbeatSignature {
+ if dh.heartbeatSignature == 0 {
+ // The stored signature is 0 the first time (nothing found in the DB), so persist the signature just received.
+ dh.updateHeartbeatSignature(ctx, heartBeat.HeartbeatSignature)
+ dh.heartbeatSignature = heartBeat.HeartbeatSignature
+ }
+ logger.Infow(ctx, "heartbeat signature", log.Fields{"sign": dh.heartbeatSignature})
+
+ dh.lockDevice.RLock()
+ // Start the read-indication routine only if it is not already active.
+ // The routine may have stopped earlier, e.g. after a failure on the gRPC stream when the OLT went unreachable.
+ // The isReadIndicationRoutineActive flag is checked while holding the device read lock, with the aim of
+ // not starting a second reader while one is still running.
+ if !dh.isReadIndicationRoutineActive {
+ // Start reading indications
+ go func() {
+ if err = dh.readIndications(ctx); err != nil {
+ _ = olterrors.NewErrAdapter("indication-read-failure", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
+ }
+ }()
+ }
+ dh.lockDevice.RUnlock()
+
+ } else {
+ logger.Warn(ctx, "Heartbeat signature changed, OLT is rebooted. Cleaningup resources.")
+ dh.updateHeartbeatSignature(ctx, heartBeat.HeartbeatSignature)
+ dh.heartbeatSignature = heartBeat.HeartbeatSignature
+ go dh.updateStateRebooted(ctx)
+ }
+
}
cancel()
case <-dh.stopHeartbeatCheck:
@@ -2327,7 +2465,7 @@
return
}
- logger.Debugw(ctx, "update-state-unreachable", log.Fields{"device-id": dh.device.Id, "connect-status": device.ConnectStatus,
+ logger.Warnw(ctx, "update-state-unreachable", log.Fields{"device-id": dh.device.Id, "connect-status": device.ConnectStatus,
"admin-state": device.AdminState, "oper-status": device.OperStatus})
if device.ConnectStatus == voltha.ConnectStatus_REACHABLE {
if err = dh.updateDeviceStateInCore(ctx, &ca.DeviceStateFilter{
@@ -2337,14 +2475,15 @@
}); err != nil {
_ = olterrors.NewErrAdapter("device-state-update-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
}
-
- if err = dh.updatePortsStateInCore(ctx, &ca.PortStateFilter{
- DeviceId: dh.device.Id,
- PortTypeFilter: 0,
- OperStatus: voltha.OperStatus_UNKNOWN,
- }); err != nil {
- _ = olterrors.NewErrAdapter("port-update-failed", log.Fields{"device-id": dh.device.Id}, err).Log()
- }
+ /*
+ if err = dh.updatePortsStateInCore(ctx, &ca.PortStateFilter{
+ DeviceId: dh.device.Id,
+ PortTypeFilter: 0,
+ OperStatus: voltha.OperStatus_UNKNOWN,
+ }); err != nil {
+ _ = olterrors.NewErrAdapter("port-update-failed", log.Fields{"device-id": dh.device.Id}, err).Log()
+ }
+ */
//raise olt communication failure event
raisedTs := time.Now().Unix()
@@ -2354,12 +2493,10 @@
dh.device = cloned // update local copy of the device
go dh.eventMgr.oltCommunicationEvent(ctx, cloned, raisedTs)
- dh.cleanupDeviceResources(ctx)
// Stop the Stats collector
dh.stopCollector <- true
// stop the heartbeat check routine
dh.stopHeartbeatCheck <- true
-
dh.lockDevice.RLock()
// Stop the read indication only if it the routine is active
// The read indication would have already stopped due to failure on the gRPC stream following OLT going unreachable
@@ -2369,28 +2506,94 @@
dh.stopIndications <- true
}
dh.lockDevice.RUnlock()
-
- var wg sync.WaitGroup
- wg.Add(1) // for the multicast handler routine
- go dh.StopAllMcastHandlerRoutines(ctx, &wg)
- for _, flMgr := range dh.flowMgr {
- wg.Add(1) // for the flow handler routine
- go flMgr.StopAllFlowHandlerRoutines(ctx, &wg)
- }
- if !dh.waitForTimeoutOrCompletion(&wg, time.Second*30) {
- logger.Warnw(ctx, "timed out waiting for stopping flow and group handlers", log.Fields{"deviceID": device.Id})
- } else {
- logger.Infow(ctx, "all flow and group handlers shutdown gracefully", log.Fields{"deviceID": device.Id})
- }
-
- //reset adapter reconcile flag
- dh.adapterPreviouslyConnected = false
-
dh.transitionMap.Handle(ctx, DeviceInit)
}
}
+// updateStateRebooted handles an OLT reboot: it reports OperStatus REBOOTED to the core, stops the
+// indication reader, stats collector and heartbeat check, cleans up device resources and the flow/group
+// handlers, waits until the core lists no child devices, and then triggers the DeviceInit transition.
+func (dh *DeviceHandler) updateStateRebooted(ctx context.Context) {
+	device, err := dh.getDeviceFromCore(ctx, dh.device.Id)
+	if err != nil || device == nil {
+		// The core can fail to return the device; the note this handling was copied from cites an OLT
+		// delete as one such case, where the core has already removed the device and the adapter-side
+		// cleanup was done by the DeleteDevice handler. Log and return immediately: continuing would
+		// dereference a nil 'device' below and panic.
+		_ = olterrors.NewErrNotFound("device", log.Fields{"device-id": dh.device.Id}, err).Log()
+		return
+	}
+
+	// NOTE(review): "conn-status" logs UNREACHABLE while the update below sends REACHABLE - confirm which is intended.
+	logger.Warnw(ctx, "update-state-rebooted", log.Fields{"device-id": dh.device.Id, "connect-status": device.ConnectStatus,
+		"admin-state": device.AdminState, "oper-status": device.OperStatus, "conn-status": voltha.ConnectStatus_UNREACHABLE})
+	if err = dh.updateDeviceStateInCore(ctx, &ca.DeviceStateFilter{
+		DeviceId:   dh.device.Id,
+		OperStatus: voltha.OperStatus_REBOOTED,
+		ConnStatus: voltha.ConnectStatus_REACHABLE,
+	}); err != nil {
+		_ = olterrors.NewErrAdapter("device-state-update-failed", log.Fields{"device-id": dh.device.Id}, err).LogAt(log.ErrorLevel)
+	}
+
+	dh.lockDevice.RLock()
+	// Signal the read-indication routine to stop only if it is currently active; it may already have
+	// stopped on its own (the flag is read while holding the device read lock).
+	if dh.isReadIndicationRoutineActive {
+		dh.stopIndications <- true
+	}
+	dh.lockDevice.RUnlock()
+
+	//raise olt communication failure event
+	raisedTs := time.Now().Unix()
+	cloned := proto.Clone(device).(*voltha.Device)
+	cloned.ConnectStatus = voltha.ConnectStatus_UNREACHABLE
+	cloned.OperStatus = voltha.OperStatus_UNKNOWN
+	dh.device = cloned // update local copy of the device
+	go dh.eventMgr.oltCommunicationEvent(ctx, cloned, raisedTs)
+
+	dh.cleanupDeviceResources(ctx)
+	// Stop the Stats collector
+	dh.stopCollector <- true
+	// stop the heartbeat check routine
+	dh.stopHeartbeatCheck <- true
+
+	var wg sync.WaitGroup
+	wg.Add(1) // for the multicast handler routine
+	go dh.StopAllMcastHandlerRoutines(ctx, &wg)
+	for _, flMgr := range dh.flowMgr {
+		wg.Add(1) // for the flow handler routine
+		go flMgr.StopAllFlowHandlerRoutines(ctx, &wg)
+	}
+	if !dh.waitForTimeoutOrCompletion(&wg, time.Second*30) {
+		logger.Warnw(ctx, "timed out waiting for stopping flow and group handlers", log.Fields{"deviceID": device.Id})
+	} else {
+		logger.Infow(ctx, "all flow and group handlers shutdown gracefully", log.Fields{"deviceID": device.Id})
+	}
+
+	//reset adapter reconcile flag
+	dh.adapterPreviouslyConnected = false
+
+	// Poll the core until it lists no child devices. Every retry, including a failed lookup, is followed
+	// by a pause so that the core is never queried in a tight loop.
+	for {
+		childDevices, lookupErr := dh.getChildDevicesFromCore(ctx, dh.device.Id)
+		if lookupErr == nil && childDevices != nil && len(childDevices.Items) == 0 {
+			logger.Infow(ctx, "All childDevices cleared from core, proceed with device init", log.Fields{"deviceID": dh.device.Id})
+			break
+		}
+		if lookupErr != nil || childDevices == nil {
+			logger.Errorw(ctx, "Failed to get child devices from core", log.Fields{"deviceID": dh.device.Id, "err": lookupErr})
+		} else {
+			logger.Warn(ctx, "Not all child devices are cleared, continuing to wait")
+		}
+		time.Sleep(5 * time.Second)
+	}
+
+	logger.Infow(ctx, "cleanup complete after reboot , moving to init", log.Fields{"deviceID": device.Id})
+	dh.transitionMap.Handle(ctx, DeviceInit)
+}
+
// EnablePort to enable Pon interface
func (dh *DeviceHandler) EnablePort(ctx context.Context, port *voltha.Port) error {
logger.Debugw(ctx, "enable-port", log.Fields{"Device": dh.device, "port": port})
@@ -3368,3 +3571,34 @@
return false // timed out
}
}
+
+// updateHeartbeatSignature JSON-encodes signature and stores it under heartbeatPath in the device KV store.
+// Failures are logged together with the underlying error and are not returned to the caller.
+func (dh *DeviceHandler) updateHeartbeatSignature(ctx context.Context, signature uint32) {
+	val, err := json.Marshal(signature)
+	if err != nil {
+		logger.Errorw(ctx, "failed-to-marshal", log.Fields{"device-id": dh.device.Id, "err": err})
+	} else if err = dh.kvStore.Put(ctx, heartbeatPath, val); err != nil {
+		logger.Errorw(ctx, "failed-to-store-hearbeat-signature", log.Fields{"device-id": dh.device.Id, "err": err})
+	}
+}
+
+// getHeartbeatSignature returns the signature stored under heartbeatPath, or 0 if it is absent, unreadable or undecodable.
+func (dh *DeviceHandler) getHeartbeatSignature(ctx context.Context) uint32 {
+	kvPair, err := dh.kvStore.Get(ctx, heartbeatPath)
+	if err != nil || kvPair == nil {
+		logger.Infow(ctx, "no stored heartbeat signature available", log.Fields{"err": err})
+		return 0
+	}
+	raw, err := kvstore.ToByte(kvPair.Value)
+	if err != nil {
+		logger.Errorw(ctx, "Failed to convert into byte array", log.Fields{"err": err})
+		return 0
+	}
+	var signature uint32
+	if err = json.Unmarshal(raw, &signature); err != nil {
+		logger.Errorw(ctx, "Failed to unmarshal signature", log.Fields{"err": err})
+		return 0
+	}
+	return signature
+}