[VOL-4633] openonuAdapterGo - multiple OLTs: reconcile for disabled ONU-devices in TT workflow fails sporadically
Change-Id: I3431d47847d4030f60697e69b7413e76d8ad97cf
diff --git a/internal/pkg/onuadaptercore/device_handler.go b/internal/pkg/onuadaptercore/device_handler.go
index 478df56..3aa6835 100644
--- a/internal/pkg/onuadaptercore/device_handler.go
+++ b/internal/pkg/onuadaptercore/device_handler.go
@@ -46,6 +46,13 @@
"github.com/opencord/voltha-protos/v4/go/voltha"
)
+const (
+ //constants for reconcile flow check channel
+ cWaitReconcileFlowAbortOnSuccess = 0xFFFD
+ cWaitReconcileFlowAbortOnError = 0xFFFE
+ cWaitReconcileFlowNoActivity = 0xFFFF
+)
+
// Constants for timeouts
const (
cTimeOutRemoveUpgrade = 1 //for usage in seconds
@@ -173,6 +180,11 @@
cSkipOnuConfigReconciling
)
+//WaitGroupWithTimeOut definitions to have waitGroup functionality with timeout
+type WaitGroupWithTimeOut struct {
+ sync.WaitGroup
+}
+
//deviceHandler will interact with the ONU ? device.
type deviceHandler struct {
deviceID string
@@ -215,33 +227,37 @@
//discOnus sync.Map
//onus sync.Map
//portStats *OpenOltStatisticsMgr
- collectorIsRunning bool
- mutexCollectorFlag sync.RWMutex
- stopCollector chan bool
- alarmManagerIsRunning bool
- mutextAlarmManagerFlag sync.RWMutex
- stopAlarmManager chan bool
- stopHeartbeatCheck chan bool
- uniEntityMap map[uint32]*onuUniPort
- mutexKvStoreContext sync.Mutex
- lockVlanConfig sync.RWMutex
- lockVlanAdd sync.RWMutex
- UniVlanConfigFsmMap map[uint8]*UniVlanConfigFsm
- lockUpgradeFsm sync.RWMutex
- pOnuUpradeFsm *OnuUpgradeFsm
- upgradeCanceled bool
- reconciling uint8
- mutexReconcilingFlag sync.RWMutex
- chReconcilingFinished chan bool //channel to indicate that reconciling has been finished
- reconcilingFlows bool
- mutexReconcilingFlowsFlag sync.RWMutex
- chReconcilingFlowsFinished chan bool //channel to indicate that reconciling of flows has been finished
- mutexReadyForOmciConfig sync.RWMutex
- readyForOmciConfig bool
- deletionInProgress bool
- mutexDeletionInProgressFlag sync.RWMutex
- pLastUpgradeImageState *voltha.ImageState
- upgradeFsmChan chan struct{}
+ collectorIsRunning bool
+ mutexCollectorFlag sync.RWMutex
+ stopCollector chan bool
+ alarmManagerIsRunning bool
+ mutextAlarmManagerFlag sync.RWMutex
+ stopAlarmManager chan bool
+ stopHeartbeatCheck chan bool
+ uniEntityMap map[uint32]*onuUniPort
+ mutexKvStoreContext sync.Mutex
+ lockVlanConfig sync.RWMutex
+ lockVlanAdd sync.RWMutex
+ UniVlanConfigFsmMap map[uint8]*UniVlanConfigFsm
+ lockUpgradeFsm sync.RWMutex
+ pOnuUpradeFsm *OnuUpgradeFsm
+ upgradeCanceled bool
+ reconciling uint8
+ mutexReconcilingFlag sync.RWMutex
+ reconcilingFirstPass bool
+ mutexReconcilingFirstPassFlag sync.RWMutex
+ reconcilingReasonUpdate bool
+ mutexReconcilingReasonUpdate sync.RWMutex
+ chUniVlanConfigReconcilingDone chan uint16 //channel to indicate that VlanConfig reconciling for a specific UNI has been finished
+ chReconcilingFinished chan bool //channel to indicate that reconciling has been finished
+ reconcileExpiryComplete time.Duration
+ reconcileExpiryVlanConfig time.Duration
+ mutexReadyForOmciConfig sync.RWMutex
+ readyForOmciConfig bool
+ deletionInProgress bool
+ mutexDeletionInProgressFlag sync.RWMutex
+ pLastUpgradeImageState *voltha.ImageState
+ upgradeFsmChan chan struct{}
}
//newDeviceHandler creates a new device handler
@@ -272,9 +288,18 @@
dh.lockUpgradeFsm = sync.RWMutex{}
dh.UniVlanConfigFsmMap = make(map[uint8]*UniVlanConfigFsm)
dh.reconciling = cNoReconciling
+ dh.reconcilingReasonUpdate = false
+ dh.reconcilingFirstPass = true
dh.chReconcilingFinished = make(chan bool)
- dh.reconcilingFlows = false
- dh.chReconcilingFlowsFinished = make(chan bool)
+ dh.reconcileExpiryComplete = adapter.maxTimeoutReconciling //assumption is to have it as duration in s!
+ rECSeconds := int(dh.reconcileExpiryComplete / time.Second)
+ if rECSeconds < 2 {
+ dh.reconcileExpiryComplete = time.Duration(2) * time.Second //ensure a minimum expiry time of 2s for complete reconciling
+ rECSeconds = 2
+ }
+ rEVCSeconds := rECSeconds / 2
+ dh.reconcileExpiryVlanConfig = time.Duration(rEVCSeconds) * time.Second //set this duration to some according lower value
+
dh.readyForOmciConfig = false
dh.deletionInProgress = false
dh.pLastUpgradeImageState = &voltha.ImageState{
@@ -867,7 +892,7 @@
} else {
logger.Errorw(ctx, "reconciling - restoring OnuTp-data failed - abort", log.Fields{"err": err, "device-id": dh.deviceID})
}
- dh.stopReconciling(ctx, false)
+ dh.stopReconciling(ctx, false, cWaitReconcileFlowNoActivity)
return
}
var onuIndication oop.OnuIndication
@@ -880,16 +905,16 @@
_ = dh.createInterface(ctx, &onuIndication)
}
-func (dh *deviceHandler) reconcileDeviceTechProf(ctx context.Context) {
+func (dh *deviceHandler) reconcileDeviceTechProf(ctx context.Context) bool {
logger.Debugw(ctx, "reconciling - trigger tech profile config", log.Fields{"device-id": dh.deviceID})
+ continueWithFlowConfig := false
+
pDevEntry := dh.getOnuDeviceEntry(ctx, true)
if pDevEntry == nil {
- logger.Errorw(ctx, "No valid OnuDevice - aborting", log.Fields{"device-id": dh.deviceID})
- if !dh.isSkipOnuConfigReconciling() {
- dh.stopReconciling(ctx, false)
- }
- return
+ logger.Errorw(ctx, "reconciling - no valid OnuDevice - aborting", log.Fields{"device-id": dh.deviceID})
+ dh.stopReconciling(ctx, false, cWaitReconcileFlowNoActivity)
+ return continueWithFlowConfig
}
dh.pOnuTP.lockTpProcMutex()
defer dh.pOnuTP.unlockTpProcMutex()
@@ -900,10 +925,8 @@
pDevEntry.mutexPersOnuConfig.RUnlock()
logger.Debugw(ctx, "reconciling - no uni-configs have been stored before adapter restart - terminate reconcilement",
log.Fields{"device-id": dh.deviceID})
- if !dh.isSkipOnuConfigReconciling() {
- dh.stopReconciling(ctx, true)
- }
- return
+ dh.stopReconciling(ctx, true, cWaitReconcileFlowNoActivity)
+ return continueWithFlowConfig
}
flowsFound := false
techProfsFound := false
@@ -934,14 +957,15 @@
techProfInstLoadFailed = true // stop loading tp instance as soon as we hit failure
break outerLoop
}
+ continueWithFlowConfig = true // valid TP found - try flow configuration later
var tpInst tech_profile.TechProfileInstance
switch techTpInst := iaTechTpInst.TechTpInstance.(type) {
case *ic.InterAdapterTechProfileDownloadMessage_TpInstance: // supports only GPON, XGPON, XGS-PON
tpInst = *techTpInst.TpInstance
- logger.Debugw(ctx, "received-tp-instance-successfully-after-reconcile", log.Fields{
+ logger.Debugw(ctx, "reconciling - received-tp-instance-successfully-after-reconcile", log.Fields{
"tp-id": tpID, "tpPath": uniData.PersTpPathMap[tpID], "uni-id": uniData.PersUniID, "device-id": dh.deviceID})
default: // do not support epon or other tech
- logger.Errorw(ctx, "unsupported-tech-profile", log.Fields{
+ logger.Errorw(ctx, "reconciling - unsupported-tech-profile", log.Fields{
"tp-id": tpID, "tpPath": uniData.PersTpPathMap[tpID], "uni-id": uniData.PersUniID, "device-id": dh.deviceID})
techProfInstLoadFailed = true // stop loading tp instance as soon as we hit failure
break outerLoop
@@ -975,6 +999,8 @@
//had to move techProf/flow result evaluation into separate function due to SCA complexity limit
dh.updateReconcileStates(ctx, techProfsFound, techProfInstLoadFailed, flowsFound)
+
+ return continueWithFlowConfig
}
func (dh *deviceHandler) updateReconcileStates(ctx context.Context,
@@ -982,14 +1008,12 @@
if !abTechProfsFound {
logger.Debugw(ctx, "reconciling - no TPs have been stored before adapter restart - terminate reconcilement",
log.Fields{"device-id": dh.deviceID})
- if !dh.isSkipOnuConfigReconciling() {
- dh.stopReconciling(ctx, true)
- }
+ dh.stopReconciling(ctx, true, cWaitReconcileFlowNoActivity)
return
}
if abTechProfInstLoadFailed {
dh.setDeviceReason(drTechProfileConfigDownloadFailed)
- dh.stopReconciling(ctx, false)
+ dh.stopReconciling(ctx, false, cWaitReconcileFlowNoActivity)
return
} else if dh.isSkipOnuConfigReconciling() {
dh.setDeviceReason(drTechProfileConfigDownloadSuccess)
@@ -997,9 +1021,7 @@
if !abFlowsFound {
logger.Debugw(ctx, "reconciling - no flows have been stored before adapter restart - terminate reconcilement",
log.Fields{"device-id": dh.deviceID})
- if !dh.isSkipOnuConfigReconciling() {
- dh.stopReconciling(ctx, true)
- }
+ dh.stopReconciling(ctx, true, cWaitReconcileFlowNoActivity)
}
}
@@ -1008,10 +1030,8 @@
pDevEntry := dh.getOnuDeviceEntry(ctx, true)
if pDevEntry == nil {
- logger.Errorw(ctx, "No valid OnuDevice - aborting", log.Fields{"device-id": dh.deviceID})
- if !dh.isSkipOnuConfigReconciling() {
- dh.stopReconciling(ctx, false)
- }
+ logger.Errorw(ctx, "reconciling - no valid OnuDevice - aborting", log.Fields{"device-id": dh.deviceID})
+ dh.stopReconciling(ctx, false, cWaitReconcileFlowNoActivity)
return
}
@@ -1020,12 +1040,13 @@
pDevEntry.mutexPersOnuConfig.RUnlock()
logger.Debugw(ctx, "reconciling - no uni-configs have been stored before adapter restart - terminate reconcilement",
log.Fields{"device-id": dh.deviceID})
- if !dh.isSkipOnuConfigReconciling() {
- dh.stopReconciling(ctx, true)
- }
+ dh.stopReconciling(ctx, true, cWaitReconcileFlowNoActivity)
return
}
flowsFound := false
+ var uniVlanConfigEntries []uint8
+ var loWaitGroupWTO WaitGroupWithTimeOut
+
for _, uniData := range pDevEntry.sOnuPersistentData.PersUniConfig {
//TODO: check for uni-port specific reconcilement in case of multi-uni-port-per-onu-support
if len(uniData.PersFlowParams) == 0 {
@@ -1050,49 +1071,20 @@
if uniPort, exist = dh.uniEntityMap[uniNo]; !exist {
logger.Errorw(ctx, "reconciling - onuUniPort data not found - terminate reconcilement",
log.Fields{"uniNo": uniNo, "device-id": dh.deviceID})
- if !dh.isSkipOnuConfigReconciling() {
- dh.stopReconciling(ctx, false)
- }
+ dh.stopReconciling(ctx, false, cWaitReconcileFlowNoActivity)
+
return
}
- flowsFound = true
- lastFlowToReconcile := false
- flowsProcessed := 0
- dh.setReconcilingFlows(true)
- for _, flowData := range uniData.PersFlowParams {
- logger.Debugw(ctx, "reconciling - add flow with cookie slice", log.Fields{
- "device-id": dh.deviceID, "uni-id": uniData.PersUniID, "cookies": flowData.CookieSlice})
- // If this is the last flow for the device we need to announce it the waiting
- // chReconcilingFlowsFinished channel
- if flowsProcessed == len(uniData.PersFlowParams)-1 {
- lastFlowToReconcile = true
- }
- //the slice can be passed 'by value' here, - which internally passes its reference copy
- dh.lockVlanConfig.Lock()
- if _, exist = dh.UniVlanConfigFsmMap[uniData.PersUniID]; exist {
- if err := dh.UniVlanConfigFsmMap[uniData.PersUniID].SetUniFlowParams(ctx, flowData.VlanRuleParams.TpID,
- flowData.CookieSlice, uint16(flowData.VlanRuleParams.MatchVid), uint16(flowData.VlanRuleParams.SetVid),
- uint8(flowData.VlanRuleParams.SetPcp), lastFlowToReconcile, flowData.Meter); err != nil {
- logger.Errorw(ctx, err.Error(), log.Fields{"device-id": dh.deviceID})
- }
- } else {
- if err := dh.createVlanFilterFsm(ctx, uniPort, flowData.VlanRuleParams.TpID, flowData.CookieSlice,
- uint16(flowData.VlanRuleParams.MatchVid), uint16(flowData.VlanRuleParams.SetVid),
- uint8(flowData.VlanRuleParams.SetPcp), OmciVlanFilterAddDone, lastFlowToReconcile, flowData.Meter); err != nil {
- logger.Errorw(ctx, err.Error(), log.Fields{"device-id": dh.deviceID})
- }
- }
- dh.lockVlanConfig.Unlock()
- flowsProcessed++
- } //for all flows of this UNI
+ //needed to split up function due to sca complexity
+ dh.updateReconcileFlowConfig(ctx, uniPort, uniData.PersFlowParams, uniVlanConfigEntries, &loWaitGroupWTO, &flowsFound)
+
logger.Debugw(ctx, "reconciling - flows processed", log.Fields{
- "device-id": dh.deviceID, "uni-id": uniData.PersUniID, "flowsProcessed": flowsProcessed,
+ "device-id": dh.deviceID, "uni-id": uniData.PersUniID,
"numUniFlows": dh.UniVlanConfigFsmMap[uniData.PersUniID].numUniFlows,
"configuredUniFlow": dh.UniVlanConfigFsmMap[uniData.PersUniID].configuredUniFlow})
// this can't be used as global finished reconciling flag because
// assumes is getting called before the state machines for the last flow is completed,
// while this is not guaranteed.
- //dh.setReconcilingFlows(false)
pDevEntry.mutexPersOnuConfig.RLock() //set protection again for loop test on sOnuPersistentData
} // for all UNI entries from sOnuPersistentData
pDevEntry.mutexPersOnuConfig.RUnlock()
@@ -1100,19 +1092,172 @@
if !flowsFound {
logger.Debugw(ctx, "reconciling - no flows have been stored before adapter restart - terminate reconcilement",
log.Fields{"device-id": dh.deviceID})
- if !dh.isSkipOnuConfigReconciling() {
- dh.stopReconciling(ctx, true)
+ dh.stopReconciling(ctx, true, cWaitReconcileFlowNoActivity)
+ return
+ }
+ logger.Debugw(ctx, "reconciling flows - waiting on ready indication of requested UNIs", log.Fields{
+ "device-id": dh.deviceID, "expiry": dh.reconcileExpiryVlanConfig})
+ if executed := loWaitGroupWTO.WaitTimeout(dh.reconcileExpiryVlanConfig); executed {
+ logger.Debugw(ctx, "reconciling flows for all UNI's has been finished in time",
+ log.Fields{"device-id": dh.deviceID})
+ dh.stopReconciling(ctx, true, cWaitReconcileFlowAbortOnSuccess)
+ if pDevEntry != nil {
+ pDevEntry.SendChReconcilingFlowsFinished(ctx, true)
+ }
+ } else {
+ logger.Errorw(ctx, "reconciling - timeout waiting for reconciling flows for all UNI's to be finished!",
+ log.Fields{"device-id": dh.deviceID})
+ dh.stopReconciling(ctx, false, cWaitReconcileFlowAbortOnError)
+ if pDevEntry != nil {
+ pDevEntry.SendChReconcilingFlowsFinished(ctx, false)
}
return
}
- if dh.isSkipOnuConfigReconciling() {
- dh.setDeviceReason(drOmciFlowsPushed)
+ dh.setDeviceReason(drOmciFlowsPushed)
+}
+
+func (dh *deviceHandler) updateReconcileFlowConfig(ctx context.Context, apUniPort *onuUniPort,
+ aPersFlowParam []uniVlanFlowParams, aUniVlanConfigEntries []uint8,
+ apWaitGroup *WaitGroupWithTimeOut, apFlowsFound *bool) {
+ flowsProcessed := 0
+ lastFlowToReconcile := false
+ loUniID := apUniPort.uniID
+ for _, flowData := range aPersFlowParam {
+ if !(*apFlowsFound) {
+ *apFlowsFound = true
+ syncChannel := make(chan struct{})
+ // start go routine with select() on reconciling vlan config channel before
+ // starting vlan config reconciling process to prevent loss of any signal
+ // this routine just collects all the received 'flow-reconciled' signals - possibly from different UNI's
+ go dh.waitOnUniVlanConfigReconcilingReady(ctx, syncChannel, apWaitGroup)
+ //block until the wait routine is really blocked on channel input
+ // in order to prevent to early ready signal from VlanConfig processing
+ <-syncChannel
+ }
+ if flowsProcessed == len(aPersFlowParam)-1 {
+ var uniAdded bool
+ lastFlowToReconcile = true
+ if aUniVlanConfigEntries, uniAdded = dh.appendIfMissing(aUniVlanConfigEntries, loUniID); uniAdded {
+ apWaitGroup.Add(1) //increment the waiting group
+ }
+ }
+ logger.Debugw(ctx, "reconciling - add flow with cookie slice", log.Fields{
+ "device-id": dh.deviceID, "uni-id": loUniID,
+ "flowsProcessed": flowsProcessed, "cookies": flowData.CookieSlice})
+ dh.lockVlanConfig.Lock()
+ //the CookieSlice can be passed 'by value' here, - which internally passes its reference
+ if _, exist := dh.UniVlanConfigFsmMap[loUniID]; exist {
+ if err := dh.UniVlanConfigFsmMap[loUniID].SetUniFlowParams(ctx, flowData.VlanRuleParams.TpID,
+ flowData.CookieSlice, uint16(flowData.VlanRuleParams.MatchVid), uint16(flowData.VlanRuleParams.SetVid),
+ uint8(flowData.VlanRuleParams.SetPcp), lastFlowToReconcile, flowData.Meter); err != nil {
+ logger.Errorw(ctx, err.Error(), log.Fields{"device-id": dh.deviceID})
+ }
+ } else {
+ if err := dh.createVlanFilterFsm(ctx, apUniPort, flowData.VlanRuleParams.TpID, flowData.CookieSlice,
+ uint16(flowData.VlanRuleParams.MatchVid), uint16(flowData.VlanRuleParams.SetVid),
+ uint8(flowData.VlanRuleParams.SetPcp), OmciVlanFilterAddDone, lastFlowToReconcile, flowData.Meter); err != nil {
+ logger.Errorw(ctx, err.Error(), log.Fields{"device-id": dh.deviceID})
+ }
+ }
+ dh.lockVlanConfig.Unlock()
+ flowsProcessed++
+ } //for all flows of this UNI
+}
+
+//waitOnUniVlanConfigReconcilingReady collects all VlanConfigReady signals from VlanConfig FSM processing in reconciling
+// and decrements the according handler wait group waiting for these indications
+func (dh *deviceHandler) waitOnUniVlanConfigReconcilingReady(ctx context.Context, aSyncChannel chan<- struct{},
+ waitGroup *WaitGroupWithTimeOut) {
+ var reconciledUniVlanConfigEntries []uint8
+ var appended bool
+ expiry := dh.GetReconcileExpiryVlanConfigAbort()
+ logger.Debugw(ctx, "start waiting on reconcile vlanConfig ready indications", log.Fields{
+ "device-id": dh.deviceID, "expiry": expiry})
+ // indicate blocking on channel now to the caller
+ aSyncChannel <- struct{}{}
+ cycle := 1
+ for {
+ logger.Debugw(ctx, "waiting on reconcile vlanConfig ready indications",
+ log.Fields{"device-id": dh.deviceID, "cycle": cycle})
+ cycle++
+ select {
+ case uniIndication := <-dh.chUniVlanConfigReconcilingDone:
+ switch uniIndication {
+ // no activity requested (should normally not be received) - just continue waiting
+ case cWaitReconcileFlowNoActivity:
+ logger.Debugw(ctx, "WaitReconcileFlow no activity",
+ log.Fields{"device-id": dh.deviceID, "rxEntries": reconciledUniVlanConfigEntries})
+ // waiting on channel inputs from VlanConfig for all UNI's to be aborted on error condition
+ case cWaitReconcileFlowAbortOnError:
+ logger.Debugw(ctx, "waitReconcileFlow aborted on error",
+ log.Fields{"device-id": dh.deviceID, "rxEntries": reconciledUniVlanConfigEntries})
+ return
+ // waiting on channel inputs from VlanConfig for all UNI's to be aborted on success condition
+ case cWaitReconcileFlowAbortOnSuccess:
+ logger.Debugw(ctx, "waitReconcileFlow aborted on success",
+ log.Fields{"device-id": dh.deviceID, "rxEntries": reconciledUniVlanConfigEntries})
+ return
+ // this should be a valid UNI vlan config done indication
+ default:
+ if uniIndication < maxUnisPerOnu {
+ logger.Debugw(ctx, "reconciling flows has been finished in time for this UNI",
+ log.Fields{"device-id": dh.deviceID, "uni-id": uniIndication})
+ if reconciledUniVlanConfigEntries, appended =
+ dh.appendIfMissing(reconciledUniVlanConfigEntries, uint8(uniIndication)); appended {
+ waitGroup.Done()
+ }
+ } else {
+ logger.Errorw(ctx, "received unexpected UNI flowConfig done indication - is ignored",
+ log.Fields{"device-id": dh.deviceID, "uni-id": uniIndication})
+ }
+ } //switch uniIndication
+
+ case <-time.After(expiry): //a bit longer than reconcileExpiryVlanConfig
+ logger.Errorw(ctx, "timeout waiting for reconciling all UNI flows to be finished!",
+ log.Fields{"device-id": dh.deviceID})
+ return
+ }
}
}
-func (dh *deviceHandler) reconcileEnd(ctx context.Context) {
- logger.Debugw(ctx, "reconciling - completed!", log.Fields{"device-id": dh.deviceID})
- dh.stopReconciling(ctx, true)
+func (dh *deviceHandler) GetReconcileExpiryVlanConfigAbort() time.Duration {
+ return dh.reconcileExpiryVlanConfig + (500 * time.Millisecond)
+}
+
+func (dh *deviceHandler) appendIfMissing(slice []uint8, val uint8) ([]uint8, bool) {
+ for _, ele := range slice {
+ if ele == val {
+ return slice, false
+ }
+ }
+ return append(slice, val), true
+}
+
+// sendChReconcileFinished - sends true or false on reconcileFinish channel
+func (dh *deviceHandler) sendChReconcileFinished(success bool) {
+ if dh != nil { //if the object still exists (might have been already deleted in background)
+ //use asynchronous channel sending to avoid stucking on non-waiting receiver
+ select {
+ case dh.chReconcilingFinished <- success:
+ default:
+ }
+ }
+}
+
+// SendChUniVlanConfigFinished - sends the Uni number on channel if the flow reconcilement for this UNI is finished
+func (dh *deviceHandler) SendChUniVlanConfigFinished(ctx context.Context, value uint16) {
+ if dh != nil { //if the object still exists (might have been already deleted in background)
+ logger.Debugw(ctx, "reconciling - send to chUniVlanConfigReconcilingDone", log.Fields{
+ "device-id": dh.deviceID, "value": value})
+
+ //use asynchronous channel sending to avoid stucking on non-waiting receiver
+ select {
+ case dh.chUniVlanConfigReconcilingDone <- value:
+ default:
+ logger.Infow(ctx, "reconciling - could not send to chUniVlanConfigReconcilingDone", log.Fields{
+ "device-id": dh.deviceID, "value": value})
+ }
+ }
}
func (dh *deviceHandler) deleteDevicePersistencyData(ctx context.Context) error {
@@ -1864,7 +2009,7 @@
pDevEntry.mutexPersOnuConfig.RUnlock()
logger.Debugw(ctx, "reconciling - uni-ports were not unlocked before adapter restart - resume with a normal start-up",
log.Fields{"device-id": dh.deviceID})
- dh.stopReconciling(ctx, true)
+ dh.stopReconciling(ctx, true, cWaitReconcileFlowNoActivity)
} else {
pDevEntry.mutexPersOnuConfig.RUnlock()
}
@@ -2108,6 +2253,8 @@
if pMibDlFsm != nil {
_ = pMibDlFsm.Event(dlEvReset)
}
+ //stop any deviceHandler reconcile processing (if running)
+ dh.stopReconciling(ctx, false, cWaitReconcileFlowAbortOnError)
//port lock/unlock FSM's may be active
if dh.pUnlockStateFsm != nil {
_ = dh.pUnlockStateFsm.pAdaptFsm.pFsm.Event(uniEvReset)
@@ -2408,7 +2555,15 @@
_ = dh.deviceReasonUpdate(ctx, drTechProfileConfigDownloadSuccess, !dh.isReconciling())
}
if dh.isReconciling() {
- go dh.reconcileDeviceFlowConfig(ctx)
+ // during reconciling with OMCI configuration in TT multi-UNI scenario, OmciAniConfigDone is reached several times
+ // therefore it must be ensured that reconciling of flow config is only started on the first pass of this code position
+ dh.mutexReconcilingFirstPassFlag.Lock()
+ if dh.reconcilingFirstPass {
+ logger.Debugw(ctx, "reconciling - OmciAniConfigDone first pass, start flow processing", log.Fields{"device-id": dh.deviceID})
+ dh.reconcilingFirstPass = false
+ go dh.reconcileDeviceFlowConfig(ctx)
+ }
+ dh.mutexReconcilingFirstPassFlag.Unlock()
}
} else { // should be the OmciAniResourceRemoved block
logger.Debugw(ctx, "OmciAniResourceRemoved event received", log.Fields{"device-id": dh.deviceID})
@@ -2432,9 +2587,6 @@
// which may be the case from some previous actvity on another UNI Port of the ONU
// or even some previous flow add activity on the same port
_ = dh.deviceReasonUpdate(ctx, drOmciFlowsPushed, !dh.isReconciling())
- if dh.isReconciling() {
- go dh.reconcileEnd(ctx)
- }
}
} else {
if dh.getDeviceReason() != drOmciFlowsDeleted {
@@ -2533,14 +2685,14 @@
logger.Errorw(ctx, "No valid OnuDevice - aborting", log.Fields{"device-id": dh.deviceID})
return
}
- i := uint8(0) //UNI Port limit: see MaxUnisPerOnu (by now 16) (OMCI supports max 255 p.b.)
+ uniCnt := uint8(0) //UNI Port limit: see MaxUnisPerOnu (by now 16) (OMCI supports max 255 p.b.)
if pptpInstKeys := pDevEntry.pOnuDB.getSortedInstKeys(
ctx, me.PhysicalPathTerminationPointEthernetUniClassID); len(pptpInstKeys) > 0 {
for _, mgmtEntityID := range pptpInstKeys {
logger.Debugw(ctx, "Add PPTPEthUni port for MIB-stored instance:", log.Fields{
"device-id": dh.deviceID, "PPTPEthUni EntityID": mgmtEntityID})
- dh.addUniPort(ctx, mgmtEntityID, i, uniPPTP)
- i++
+ dh.addUniPort(ctx, mgmtEntityID, uniCnt, uniPPTP)
+ uniCnt++
}
} else {
logger.Debugw(ctx, "No PPTP instances found", log.Fields{"device-id": dh.deviceID})
@@ -2550,15 +2702,17 @@
for _, mgmtEntityID := range veipInstKeys {
logger.Debugw(ctx, "Add VEIP for MIB-stored instance:", log.Fields{
"device-id": dh.deviceID, "VEIP EntityID": mgmtEntityID})
- dh.addUniPort(ctx, mgmtEntityID, i, uniVEIP)
- i++
+ dh.addUniPort(ctx, mgmtEntityID, uniCnt, uniVEIP)
+ uniCnt++
}
} else {
logger.Debugw(ctx, "No VEIP instances found", log.Fields{"device-id": dh.deviceID})
}
- if i == 0 {
+ if uniCnt == 0 {
logger.Warnw(ctx, "No UniG instances found", log.Fields{"device-id": dh.deviceID})
}
+ //chUniVlanConfigReconcilingDone needs to have the capacity of all UniPorts as flow reconcile may run parallel for all of them
+ dh.chUniVlanConfigReconcilingDone = make(chan uint16, uniCnt)
}
// enableUniPortStateUpdate enables UniPortState and update core port state accordingly
@@ -2743,7 +2897,6 @@
// createOnuUpgradeFsm initializes and runs the Onu Software upgrade FSM
// precondition: lockUpgradeFsm is already locked from caller of this function
func (dh *deviceHandler) createOnuUpgradeFsm(ctx context.Context, apDevEntry *OnuDeviceEntry, aDevEvent OnuDeviceEvent) error {
- //in here lockUpgradeFsm is already locked
chUpgradeFsm := make(chan Message, 2048)
var sFsmName = "OnuSwUpgradeFSM"
logger.Debugw(ctx, "create OnuSwUpgradeFSM", log.Fields{"device-id": dh.deviceID})
@@ -3770,7 +3923,7 @@
if !dh.isReconciling() {
go func() {
logger.Debugw(ctx, "wait for channel signal or timeout",
- log.Fields{"timeout": dh.pOpenOnuAc.maxTimeoutReconciling, "device-id": dh.deviceID})
+ log.Fields{"timeout": dh.reconcileExpiryComplete, "device-id": dh.deviceID})
select {
case success := <-dh.chReconcilingFinished:
logger.Debugw(ctx, "reconciling finished signal received",
@@ -3804,7 +3957,8 @@
operState = voltha.OperStatus_DISCOVERED
}
onuDevEntry.mutexPersOnuConfig.RUnlock()
- logger.Debugw(ctx, "Core DeviceStateUpdate", log.Fields{"connectStatus": connectStatus, "operState": operState})
+ logger.Debugw(ctx, "Core DeviceStateUpdate",
+ log.Fields{"device-id": dh.device.Id, "connectStatus": connectStatus, "operState": operState})
}
logger.Debugw(ctx, "reconciling has been finished in time",
log.Fields{"device-id": dh.deviceID})
@@ -3829,7 +3983,7 @@
dh.deviceReconcileFailedUpdate(ctx, drReconcileCanceled, connectStatus)
}
- case <-time.After(dh.pOpenOnuAc.maxTimeoutReconciling):
+ case <-time.After(dh.reconcileExpiryComplete):
logger.Errorw(ctx, "timeout waiting for reconciling to be finished!",
log.Fields{"device-id": dh.deviceID})
dh.mutexReconcilingFlag.Lock()
@@ -3848,6 +4002,8 @@
}
dh.reconciling = cNoReconciling
dh.mutexReconcilingFlag.Unlock()
+ dh.SetReconcilingReasonUpdate(false)
+ dh.SetReconcilingFirstPass(true)
}()
}
dh.mutexReconcilingFlag.Lock()
@@ -3859,12 +4015,15 @@
dh.mutexReconcilingFlag.Unlock()
}
-func (dh *deviceHandler) stopReconciling(ctx context.Context, success bool) {
+func (dh *deviceHandler) stopReconciling(ctx context.Context, success bool, reconcileFlowResult uint16) {
logger.Debugw(ctx, "stop reconciling", log.Fields{"device-id": dh.deviceID, "success": success})
if dh.isReconciling() {
- dh.chReconcilingFinished <- success
+ dh.sendChReconcileFinished(success)
+ if reconcileFlowResult != cWaitReconcileFlowNoActivity {
+ dh.SendChUniVlanConfigFinished(ctx, reconcileFlowResult)
+ }
} else {
- logger.Infow(ctx, "reconciling is not running", log.Fields{"device-id": dh.deviceID})
+ logger.Debugw(ctx, "nothing to stop - reconciling is not running", log.Fields{"device-id": dh.deviceID})
}
}
@@ -3880,10 +4039,22 @@
return dh.reconciling == cSkipOnuConfigReconciling
}
-func (dh *deviceHandler) setDeviceReason(value uint8) {
- dh.mutexDeviceReason.Lock()
- dh.deviceReason = value
- dh.mutexDeviceReason.Unlock()
+func (dh *deviceHandler) SetReconcilingFirstPass(value bool) {
+ dh.mutexReconcilingFirstPassFlag.Lock()
+ dh.reconcilingFirstPass = value
+ dh.mutexReconcilingFirstPassFlag.Unlock()
+}
+
+func (dh *deviceHandler) SetReconcilingReasonUpdate(value bool) {
+ dh.mutexReconcilingReasonUpdate.Lock()
+ dh.reconcilingReasonUpdate = value
+ dh.mutexReconcilingReasonUpdate.Unlock()
+}
+
+func (dh *deviceHandler) IsReconcilingReasonUpdate() bool {
+ dh.mutexReconcilingReasonUpdate.RLock()
+ defer dh.mutexReconcilingReasonUpdate.RUnlock()
+ return dh.reconcilingReasonUpdate
}
func (dh *deviceHandler) getDeviceReason() uint8 {
@@ -3893,23 +4064,16 @@
return value
}
+func (dh *deviceHandler) setDeviceReason(value uint8) {
+ dh.mutexDeviceReason.Lock()
+ dh.deviceReason = value
+ dh.mutexDeviceReason.Unlock()
+}
+
func (dh *deviceHandler) getDeviceReasonString() string {
return deviceReasonMap[dh.getDeviceReason()]
}
-func (dh *deviceHandler) setReconcilingFlows(value bool) {
- dh.mutexReconcilingFlowsFlag.Lock()
- dh.reconcilingFlows = value
- dh.mutexReconcilingFlowsFlag.Unlock()
-}
-
-func (dh *deviceHandler) isReconcilingFlows() bool {
- dh.mutexReconcilingFlowsFlag.RLock()
- value := dh.reconcilingFlows
- dh.mutexReconcilingFlowsFlag.RUnlock()
- return value
-}
-
func (dh *deviceHandler) setReadyForOmciConfig(flagValue bool) {
dh.mutexReadyForOmciConfig.Lock()
dh.readyForOmciConfig = flagValue
@@ -3989,3 +4153,19 @@
dh.pUnlockStateFsm = nil
dh.pOnuUpradeFsm = nil
}
+
+//WaitTimeout of waitGroupWithTimeOut is blocking
+// returns true, if the wg request was executed successfully, false on timeout
+func (wg *WaitGroupWithTimeOut) WaitTimeout(timeout time.Duration) bool {
+ done := make(chan struct{})
+ go func() {
+ defer close(done)
+ wg.Wait()
+ }()
+ select {
+ case <-done:
+ return true
+ case <-time.After(timeout):
+ return false
+ }
+}
diff --git a/internal/pkg/onuadaptercore/mib_sync.go b/internal/pkg/onuadaptercore/mib_sync.go
index 5ac2d15..4e96d6d 100644
--- a/internal/pkg/onuadaptercore/mib_sync.go
+++ b/internal/pkg/onuadaptercore/mib_sync.go
@@ -316,43 +316,6 @@
if !oo.baseDeviceHandler.getAlarmManagerIsRunning(ctx) {
go oo.baseDeviceHandler.startAlarmManager(ctx)
}
- // no need to reconcile additional data for MibDownloadFsm, LockStateFsm, or UnlockStateFsm
- oo.baseDeviceHandler.reconcileDeviceTechProf(ctx)
-
- // start go routine with select() on reconciling flow channel before
- // starting flow reconciling process to prevent loss of any signal
- syncChannel := make(chan struct{})
- go func(aSyncChannel chan struct{}) {
- // In multi-ONU/multi-flow environment stopping reconcilement has to be delayed until
- // we get a signal that the processing of the last step to rebuild the adapter internal
- // flow data is finished.
- aSyncChannel <- struct{}{}
- select {
- case success := <-oo.baseDeviceHandler.chReconcilingFlowsFinished:
- if success {
- logger.Debugw(ctx, "reconciling flows has been finished in time",
- log.Fields{"device-id": oo.deviceID})
- oo.baseDeviceHandler.stopReconciling(ctx, true)
- _ = oo.pMibUploadFsm.pFsm.Event(ulEvSuccess)
-
- } else {
- logger.Debugw(ctx, "wait for reconciling flows aborted",
- log.Fields{"device-id": oo.deviceID})
- oo.baseDeviceHandler.setReconcilingFlows(false)
- }
- case <-time.After(500 * time.Millisecond):
- logger.Errorw(ctx, "timeout waiting for reconciling flows to be finished!",
- log.Fields{"device-id": oo.deviceID})
- oo.baseDeviceHandler.setReconcilingFlows(false)
- _ = oo.pMibUploadFsm.pFsm.Event(ulEvMismatch)
- }
- }(syncChannel)
- // block further processing until the above Go routine has really started
- // and is ready to receive values from chReconcilingFlowsFinished
- <-syncChannel
-
- oo.baseDeviceHandler.reconcileDeviceFlowConfig(ctx)
-
oo.mutexPersOnuConfig.RLock()
if oo.sOnuPersistentData.PersUniDisableDone {
oo.mutexPersOnuConfig.RUnlock()
@@ -362,6 +325,43 @@
oo.mutexPersOnuConfig.RUnlock()
oo.baseDeviceHandler.enableUniPortStateUpdate(ctx)
}
+
+ // no need to reconcile additional data for MibDownloadFsm, LockStateFsm, or UnlockStateFsm
+ if oo.baseDeviceHandler.reconcileDeviceTechProf(ctx) {
+
+ // start go routine with select() on reconciling flow channel before
+ // starting flow reconciling process to prevent loss of any signal
+ syncChannel := make(chan struct{})
+ go func(aSyncChannel chan struct{}) {
+ // In multi-ONU/multi-flow environment stopping reconcilement has to be delayed until
+ // we get a signal that the processing of the last step to rebuild the adapter internal
+ // flow data is finished.
+ expiry := oo.baseDeviceHandler.GetReconcileExpiryVlanConfigAbort()
+ oo.setReconcilingFlows(true)
+ aSyncChannel <- struct{}{}
+ select {
+ case success := <-oo.chReconcilingFlowsFinished:
+ if success {
+ logger.Debugw(ctx, "reconciling flows has been finished in time",
+ log.Fields{"device-id": oo.deviceID})
+ _ = oo.pMibUploadFsm.pFsm.Event(ulEvSuccess)
+
+ } else {
+ logger.Debugw(ctx, "wait for reconciling flows aborted",
+ log.Fields{"device-id": oo.deviceID})
+ }
+ case <-time.After(expiry):
+ logger.Errorw(ctx, "timeout waiting for reconciling flows to be finished!",
+ log.Fields{"device-id": oo.deviceID})
+ _ = oo.pMibUploadFsm.pFsm.Event(ulEvMismatch)
+ }
+ oo.setReconcilingFlows(false)
+ }(syncChannel)
+ // block further processing until the above Go routine has really started
+ // and is ready to receive values from chReconcilingFlowsFinished
+ <-syncChannel
+ oo.baseDeviceHandler.reconcileDeviceFlowConfig(ctx)
+ }
} else {
logger.Debugw(ctx, "MibSync FSM",
log.Fields{"Getting MIB from template not successful": e.FSM.Current(), "device-id": oo.deviceID})
@@ -1108,26 +1108,25 @@
 
 //CancelProcessing terminates potentially running reconciling processes and stops the FSM
 func (oo *OnuDeviceEntry) CancelProcessing(ctx context.Context) {
-	if oo.baseDeviceHandler.isReconcilingFlows() {
-		oo.baseDeviceHandler.chReconcilingFlowsFinished <- false
-	}
-	if oo.baseDeviceHandler.isReconciling() {
-		oo.baseDeviceHandler.stopReconciling(ctx, false)
+	if oo.isReconcilingFlows() {
+		oo.SendChReconcilingFlowsFinished(ctx, false)
 	}
 	oo.mutexMibSyncMsgProcessorRunning.RLock()
 	defer oo.mutexMibSyncMsgProcessorRunning.RUnlock()
-	//the MibSync FSM might be active all the ONU-active time,
-	// hence it must be stopped unconditionally
-	pMibUlFsm := oo.pMibUploadFsm
-	if pMibUlFsm != nil {
-		// abort running message processing
-		fsmAbortMsg := Message{
-			Type: TestMsg,
-			Data: TestMessage{
-				TestMessageVal: AbortMessageProcessing,
-			},
+	//the MibSync FSM might be active all the ONU-active time, hence it must be stopped whenever
+	// its message processor is running (flag guarded by the single RLock above - no recursive RLock)
+	if oo.mibSyncMsgProcessorRunning {
+		pMibUlFsm := oo.pMibUploadFsm
+		if pMibUlFsm != nil {
+			// abort running message processing
+			fsmAbortMsg := Message{
+				Type: TestMsg,
+				Data: TestMessage{
+					TestMessageVal: AbortMessageProcessing,
+				},
+			}
+			pMibUlFsm.commChan <- fsmAbortMsg
+			_ = pMibUlFsm.pFsm.Event(ulEvStop)
 		}
-		pMibUlFsm.commChan <- fsmAbortMsg
-		_ = pMibUlFsm.pFsm.Event(ulEvStop)
 	}
 }
diff --git a/internal/pkg/onuadaptercore/omci_vlan_config.go b/internal/pkg/onuadaptercore/omci_vlan_config.go
index 5311952..c7e8013 100644
--- a/internal/pkg/onuadaptercore/omci_vlan_config.go
+++ b/internal/pkg/onuadaptercore/omci_vlan_config.go
@@ -1288,6 +1288,7 @@
logger.Infow(ctx, "UniVlanConfigFsm config done - checking on more flows", log.Fields{
"device-id": oFsm.deviceID,
"overall-uni-rules": oFsm.numUniFlows, "configured-uni-rules": oFsm.configuredUniFlow})
+
pConfigVlanStateAFsm := oFsm.pAdaptFsm
if pConfigVlanStateAFsm == nil {
oFsm.mutexFlowParams.Unlock()
@@ -1310,18 +1311,15 @@
}(pConfigVlanStateBaseFsm)
return
}
+ if oFsm.lastFlowToReconcile {
+ //note: lastFlowToReconcile does not mean that this block may run only once within reconcilement here,
+ // due to asynchronous event processing from SetUniFlowParams() it may be executed multiple times
+ logger.Debugw(ctx, "reconciling - flow processing finished", log.Fields{
+ "device-id": oFsm.deviceID, "uni-id": oFsm.pOnuUniPort.uniID})
+ oFsm.pDeviceHandler.SendChUniVlanConfigFinished(ctx, uint16(oFsm.pOnuUniPort.uniID))
+ }
if oFsm.pDeviceHandler.isSkipOnuConfigReconciling() {
oFsm.configuredUniFlow = oFsm.numUniFlows
- if oFsm.lastFlowToReconcile {
- logger.Debugw(ctx, "reconciling - flow processing finished", log.Fields{
- "device-id": oFsm.deviceID, "uni-id": oFsm.pOnuUniPort.uniID})
- oFsm.pDeviceHandler.setReconcilingFlows(false)
- //use asynchronous channel sending to avoid stucking on non-waiting receiver
- select {
- case oFsm.pDeviceHandler.chReconcilingFlowsFinished <- true:
- default:
- }
- }
logger.Debugw(ctx, "reconciling - skip enterVlanConfigDone processing",
log.Fields{"numUniFlows": oFsm.numUniFlows, "configuredUniFlow": oFsm.configuredUniFlow, "device-id": oFsm.deviceID})
oFsm.mutexFlowParams.Unlock()
diff --git a/internal/pkg/onuadaptercore/onu_device_entry.go b/internal/pkg/onuadaptercore/onu_device_entry.go
index 18bd1ef..2f6756c 100644
--- a/internal/pkg/onuadaptercore/onu_device_entry.go
+++ b/internal/pkg/onuadaptercore/onu_device_entry.go
@@ -278,6 +278,9 @@
mibTemplateKVStore *db.Backend
mutexPersOnuConfig sync.RWMutex
sOnuPersistentData onuPersistentData
+ reconcilingFlows bool
+ mutexReconcilingFlowsFlag sync.RWMutex
+ chReconcilingFlowsFinished chan bool //channel to indicate that reconciling of flows has been finished
mibTemplatePath string
mutexOnuKVStore sync.RWMutex
onuKVStore *db.Backend
@@ -326,6 +329,8 @@
onuDeviceEntry.devState = DeviceStatusInit
onuDeviceEntry.sOnuPersistentData.PersUniConfig = make([]uniPersConfig, 0)
onuDeviceEntry.sOnuPersistentData.PersTcontMap = make(map[uint16]uint16)
+ onuDeviceEntry.chReconcilingFlowsFinished = make(chan bool)
+ onuDeviceEntry.reconcilingFlows = false
onuDeviceEntry.chOnuKvProcessingStep = make(chan uint8)
onuDeviceEntry.omciRebootMessageReceivedChannel = make(chan Message, 2048)
//openomciagent.lockDeviceHandlersMap = sync.RWMutex{}
@@ -983,6 +988,34 @@
delete(oo.sOnuPersistentData.PersTcontMap, allocID)
}
+// setReconcilingFlows stores value in the reconcilingFlows flag while holding mutexReconcilingFlowsFlag
+func (oo *OnuDeviceEntry) setReconcilingFlows(value bool) {
+	oo.mutexReconcilingFlowsFlag.Lock()
+	defer oo.mutexReconcilingFlowsFlag.Unlock()
+	oo.reconcilingFlows = value
+}
+
+// SendChReconcilingFlowsFinished offers value on chReconcilingFlowsFinished without blocking: if no receiver is waiting the signal is dropped (and logged); a nil receiver is ignored
+func (oo *OnuDeviceEntry) SendChReconcilingFlowsFinished(ctx context.Context, value bool) {
+	if oo == nil { //the object might have been already deleted in background
+		return
+	}
+	select {
+	case oo.chReconcilingFlowsFinished <- value:
+		logger.Debugw(ctx, "reconciling - flows finished sent", log.Fields{"device-id": oo.deviceID})
+	default:
+		logger.Infow(ctx, "reconciling - flows finished not sent!", log.Fields{"device-id": oo.deviceID})
+	}
+}
+
+// isReconcilingFlows returns the current value of the reconcilingFlows flag,
+// read while holding mutexReconcilingFlowsFlag for reading
+func (oo *OnuDeviceEntry) isReconcilingFlows() bool {
+	oo.mutexReconcilingFlowsFlag.RLock()
+	defer oo.mutexReconcilingFlowsFlag.RUnlock()
+	return oo.reconcilingFlows
+}
+
// PrepareForGarbageCollection - remove references to prepare for garbage collection
func (oo *OnuDeviceEntry) PrepareForGarbageCollection(ctx context.Context, aDeviceID string) {
logger.Debugw(ctx, "prepare for garbage collection", log.Fields{"device-id": aDeviceID})