[VOL-3842] Hardening of reconcile functionality
Change-Id: Iab1a585eac5d7e3e5a76c8801aecb65eb793e3e8
diff --git a/VERSION b/VERSION
index 1abf7ca..815b2eb 100755
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-1.2.4-dev165
+1.2.4-dev166
diff --git a/internal/pkg/onuadaptercore/device_handler.go b/internal/pkg/onuadaptercore/device_handler.go
index 5db246b..91f9a8d 100644
--- a/internal/pkg/onuadaptercore/device_handler.go
+++ b/internal/pkg/onuadaptercore/device_handler.go
@@ -91,6 +91,9 @@
const (
cOnuActivatedEvent = "ONU_ACTIVATED"
)
+const (
+ cReconcilingTimeout = 10 //seconds
+)
type usedOmciConfigFsms int
@@ -201,6 +204,8 @@
lockVlanConfig sync.Mutex
UniVlanConfigFsmMap map[uint8]*UniVlanConfigFsm
reconciling bool
+ mutexReconcilingFlag sync.RWMutex
+ chReconcilingFinished chan bool //channel to indicate that reconciling has been finished
ReadyForSpecificOmciConfig bool
}
@@ -229,6 +234,7 @@
dh.lockVlanConfig = sync.Mutex{}
dh.UniVlanConfigFsmMap = make(map[uint8]*UniVlanConfigFsm)
dh.reconciling = false
+ dh.chReconcilingFinished = make(chan bool)
dh.ReadyForSpecificOmciConfig = false
if dh.device.PmConfigs != nil { // can happen after onu adapter restart
@@ -771,7 +777,7 @@
} else {
logger.Errorw(ctx, "reconciling - restoring OnuTp-data failed - abort", log.Fields{"err": err, "device-id": dh.deviceID})
}
- dh.reconciling = false
+ dh.stopReconciling(ctx)
return
}
var onuIndication oop.OnuIndication
@@ -798,7 +804,7 @@
if len(pDevEntry.sOnuPersistentData.PersUniConfig) == 0 {
logger.Debugw(ctx, "reconciling - no uni-configs have been stored before adapter restart - terminate reconcilement",
log.Fields{"device-id": dh.deviceID})
- dh.reconciling = false
+ dh.stopReconciling(ctx)
return
}
for _, uniData := range pDevEntry.sOnuPersistentData.PersUniConfig {
@@ -806,7 +812,7 @@
if len(uniData.PersTpPathMap) == 0 {
logger.Debugw(ctx, "reconciling - no TPs have been stored before adapter restart - terminate reconcilement",
log.Fields{"uni-id": uniData.PersUniID, "device-id": dh.deviceID})
- dh.reconciling = false
+ dh.stopReconciling(ctx)
return
}
for tpID := range uniData.PersTpPathMap {
@@ -827,7 +833,7 @@
if len(uniData.PersFlowParams) == 0 {
logger.Debugw(ctx, "reconciling - no flows have been stored before adapter restart - terminate reconcilement",
log.Fields{"uni-id": uniData.PersUniID, "device-id": dh.deviceID})
- dh.reconciling = false
+ dh.stopReconciling(ctx)
}
}
}
@@ -846,7 +852,7 @@
if len(pDevEntry.sOnuPersistentData.PersUniConfig) == 0 {
logger.Debugw(ctx, "reconciling - no uni-configs have been stored before adapter restart - terminate reconcilement",
log.Fields{"device-id": dh.deviceID})
- dh.reconciling = false
+ dh.stopReconciling(ctx)
return
}
for _, uniData := range pDevEntry.sOnuPersistentData.PersUniConfig {
@@ -854,7 +860,7 @@
if len(uniData.PersFlowParams) == 0 {
logger.Debugw(ctx, "reconciling - no flows have been stored before adapter restart - terminate reconcilement",
log.Fields{"uni-id": uniData.PersUniID, "device-id": dh.deviceID})
- dh.reconciling = false
+ dh.stopReconciling(ctx)
return
}
var uniPort *onuUniPort
@@ -884,7 +890,7 @@
if len(uniData.PersTpPathMap) == 0 {
logger.Debugw(ctx, "reconciling - no TPs have been stored before adapter restart - terminate reconcilement",
log.Fields{"uni-id": uniData.PersUniID, "device-id": dh.deviceID})
- dh.reconciling = false
+ dh.stopReconciling(ctx)
}
}
}
@@ -893,7 +899,7 @@
logger.Debugw(ctx, "reconciling - trigger metrics - to be implemented in scope of VOL-3324!", log.Fields{"device-id": dh.deviceID})
//TODO: reset of reconciling-flag has always to be done in the last reconcile*() function
- dh.reconciling = false
+ dh.stopReconciling(ctx)
}
func (dh *deviceHandler) deleteDevicePersistencyData(ctx context.Context) error {
@@ -991,7 +997,7 @@
dh.logicalDeviceID = dh.deviceID // really needed - what for ??? //TODO!!!
- if !dh.reconciling {
+ if !dh.isReconciling() {
logger.Infow(ctx, "DeviceUpdate", log.Fields{"deviceReason": dh.device.Reason, "device-id": dh.deviceID})
_ = dh.coreProxy.DeviceUpdate(log.WithSpanFromContext(context.TODO(), ctx), dh.device)
//TODO Need to Update Device Reason To CORE as part of device update userstory
@@ -1019,7 +1025,7 @@
oper_status=self._pon.get_port().oper_status,
)
*/
- if !dh.reconciling {
+ if !dh.isReconciling() {
logger.Debugw(ctx, "adding-pon-port", log.Fields{"device-id": dh.deviceID, "ponPortNo": dh.ponPortNumber})
var ponPortNo uint32 = 1
if dh.ponPortNumber != 0 {
@@ -1061,7 +1067,7 @@
return
}
- if dh.reconciling {
+ if dh.isReconciling() {
go dh.reconcileDeviceOnuInd(ctx)
// reconcilement will be continued after mib download is done
}
@@ -1285,7 +1291,7 @@
logger.Errorw(ctx, "No valid OnuDevice - aborting", log.Fields{"device-id": dh.deviceID})
return fmt.Errorf("no valid OnuDevice: %s", dh.deviceID)
}
- if !dh.reconciling {
+ if !dh.isReconciling() {
if err := dh.storePersistentData(ctx); err != nil {
logger.Warnw(ctx, "store persistent data error - continue as there will be additional write attempts",
log.Fields{"device-id": dh.deviceID, "err": err})
@@ -1304,7 +1310,7 @@
if !pDevEntry.sOnuPersistentData.PersUniUnlockDone {
logger.Debugw(ctx, "reconciling - uni-ports were not unlocked before adapter restart - resume with a normal start-up",
log.Fields{"device-id": dh.deviceID})
- dh.reconciling = false
+ dh.stopReconciling(ctx)
}
}
// It does not look to me as if makes sense to work with the real core device here, (not the stored clone)?
@@ -1325,7 +1331,7 @@
return err
}
- _ = dh.deviceReasonUpdate(ctx, drStartingOpenomci, !dh.reconciling)
+ _ = dh.deviceReasonUpdate(ctx, drStartingOpenomci, !dh.isReconciling())
/* this might be a good time for Omci Verify message? */
verifyExec := make(chan bool)
@@ -1592,7 +1598,7 @@
func (dh *deviceHandler) processMibDatabaseSyncEvent(ctx context.Context, devEvent OnuDeviceEvent) {
logger.Debugw(ctx, "MibInSync event received, adding uni ports and locking the ONU interfaces", log.Fields{"device-id": dh.deviceID})
- _ = dh.deviceReasonUpdate(ctx, drDiscoveryMibsyncComplete, !dh.reconciling)
+ _ = dh.deviceReasonUpdate(ctx, drDiscoveryMibsyncComplete, !dh.isReconciling())
pDevEntry := dh.getOnuDeviceEntry(ctx, false)
if pDevEntry == nil {
logger.Errorw(ctx, "No valid OnuDevice - aborting", log.Fields{"device-id": dh.deviceID})
@@ -1684,7 +1690,7 @@
func (dh *deviceHandler) processMibDownloadDoneEvent(ctx context.Context, devEvent OnuDeviceEvent) {
logger.Debugw(ctx, "MibDownloadDone event received, unlocking the ONU interfaces", log.Fields{"device-id": dh.deviceID})
//initiate DevStateUpdate
- if !dh.reconciling {
+ if !dh.isReconciling() {
logger.Debugw(ctx, "call DeviceStateUpdate upon mib-download done", log.Fields{"ConnectStatus": voltha.ConnectStatus_REACHABLE,
"OperStatus": voltha.OperStatus_ACTIVE, "device-id": dh.deviceID})
if err := dh.coreProxy.DeviceStateUpdate(log.WithSpanFromContext(context.TODO(), ctx), dh.deviceID,
@@ -1703,13 +1709,13 @@
if pDevEntry.sOnuPersistentData.PersUniDisableDone {
logger.Debugw(ctx, "reconciling - uni-ports were disabled by admin before adapter restart - keep the ports locked and wait for re-enabling",
log.Fields{"device-id": dh.deviceID})
- dh.reconciling = false
+ dh.stopReconciling(ctx)
return
}
logger.Debugw(ctx, "reconciling - don't notify core about DeviceStateUpdate to ACTIVE",
log.Fields{"device-id": dh.deviceID})
}
- _ = dh.deviceReasonUpdate(ctx, drInitialMibDownloaded, !dh.reconciling)
+ _ = dh.deviceReasonUpdate(ctx, drInitialMibDownloaded, !dh.isReconciling())
// Initialize classical L2 PM Interval Counters
if err := dh.pOnuMetricsMgr.pAdaptFsm.pFsm.Event(l2PmEventInit); err != nil {
@@ -1731,7 +1737,7 @@
func (dh *deviceHandler) processUniUnlockStateDoneEvent(ctx context.Context, devEvent OnuDeviceEvent) {
dh.enableUniPortStateUpdate(ctx) //cmp python yield self.enable_ports()
- if !dh.reconciling {
+ if !dh.isReconciling() {
logger.Infow(ctx, "UniUnlockStateDone event: Sending OnuUp event", log.Fields{"device-id": dh.deviceID})
raisedTs := time.Now().UnixNano()
go dh.sendOnuOperStateEvent(ctx, voltha.OperStatus_ACTIVE, dh.deviceID, raisedTs) //cmp python onu_active_event
@@ -1817,9 +1823,9 @@
// - which may cause some inconsistency
if dh.deviceReason != drTechProfileConfigDownloadSuccess {
// which may be the case from some previous actvity even on this UNI Port (but also other UNI ports)
- _ = dh.deviceReasonUpdate(ctx, drTechProfileConfigDownloadSuccess, !dh.reconciling)
+ _ = dh.deviceReasonUpdate(ctx, drTechProfileConfigDownloadSuccess, !dh.isReconciling())
}
- if dh.reconciling {
+ if dh.isReconciling() {
go dh.reconcileDeviceFlowConfig(ctx)
}
} else { // should be the OmciAniResourceRemoved block
@@ -1843,8 +1849,8 @@
if dh.deviceReason != drOmciFlowsPushed {
// which may be the case from some previous actvity on another UNI Port of the ONU
// or even some previous flow add activity on the same port
- _ = dh.deviceReasonUpdate(ctx, drOmciFlowsPushed, !dh.reconciling)
- if dh.reconciling {
+ _ = dh.deviceReasonUpdate(ctx, drOmciFlowsPushed, !dh.isReconciling())
+ if dh.isReconciling() {
go dh.reconcileMetrics(ctx)
}
}
@@ -1916,7 +1922,7 @@
} else {
//store UniPort with the System-PortNumber key
dh.uniEntityMap[uniNo] = pUniPort
- if !dh.reconciling {
+ if !dh.isReconciling() {
// create announce the UniPort to the core as VOLTHA Port object
if err := pUniPort.createVolthaPort(ctx, dh); err == nil {
logger.Infow(ctx, "onuUniPort-added", log.Fields{"for PortNo": uniNo})
@@ -1943,7 +1949,7 @@
if (1<<uniPort.uniID)&activeUniPortStateUpdateMask == (1 << uniPort.uniID) {
logger.Infow(ctx, "onuUniPort-forced-OperState-ACTIVE", log.Fields{"for PortNo": uniNo})
uniPort.setOperState(vc.OperStatus_ACTIVE)
- if !dh.reconciling {
+ if !dh.isReconciling() {
//maybe also use getter functions on uniPort - perhaps later ...
go dh.coreProxy.PortStateUpdate(log.WithSpanFromContext(context.TODO(), ctx), dh.deviceID, voltha.Port_ETHERNET_UNI, uniPort.portNo, uniPort.operState)
} else {
@@ -2497,7 +2503,7 @@
func (dh *deviceHandler) storePersUniFlowConfig(ctx context.Context, aUniID uint8, aUniVlanFlowParams *[]uniVlanFlowParams) error {
- if dh.reconciling {
+ if dh.isReconciling() {
logger.Debugw(ctx, "reconciling - don't store persistent UniFlowConfig", log.Fields{"device-id": dh.deviceID})
return nil
}
@@ -2852,7 +2858,7 @@
go dh.startCollector(ctx)
}
dh.uniEntityMap = make(map[uint32]*onuUniPort)
- dh.reconciling = true
+ dh.startReconciling(ctx)
}
func (dh *deviceHandler) setCollectorIsRunning(flagValue bool) {
@@ -2878,3 +2884,42 @@
dh.pAlarmMgr.stopProcessingOmciMessages <- true // Stop the OMCI routines if any
}
}
+func (dh *deviceHandler) startReconciling(ctx context.Context) {
+ logger.Debugw(ctx, "start reconciling", log.Fields{"device-id": dh.deviceID})
+ if !dh.isReconciling() {
+ go func() {
+ select {
+ case <-dh.chReconcilingFinished:
+ logger.Debugw(ctx, "reconciling has been finished in time",
+ log.Fields{"device-id": dh.deviceID})
+ case <-time.After(time.Duration(cReconcilingTimeout) * time.Second):
+ logger.Errorw(ctx, "timeout waiting for reconciling to be finished!",
+ log.Fields{"device-id": dh.deviceID})
+ }
+ dh.mutexReconcilingFlag.Lock()
+ dh.reconciling = false
+ dh.mutexReconcilingFlag.Unlock()
+ }()
+ dh.mutexReconcilingFlag.Lock()
+ dh.reconciling = true
+ dh.mutexReconcilingFlag.Unlock()
+ } else {
+ logger.Warnw(ctx, "reconciling is already running", log.Fields{"device-id": dh.deviceID})
+ }
+}
+
+func (dh *deviceHandler) stopReconciling(ctx context.Context) {
+ logger.Debugw(ctx, "stop reconciling", log.Fields{"device-id": dh.deviceID})
+ if dh.isReconciling() {
+ dh.chReconcilingFinished <- true
+ } else {
+ logger.Infow(ctx, "reconciling is not running", log.Fields{"device-id": dh.deviceID})
+ }
+}
+
+func (dh *deviceHandler) isReconciling() bool {
+ dh.mutexReconcilingFlag.RLock()
+ value := dh.reconciling
+ dh.mutexReconcilingFlag.RUnlock()
+ return value
+}
diff --git a/internal/pkg/onuadaptercore/mib_sync.go b/internal/pkg/onuadaptercore/mib_sync.go
index 4f1ac22..eb7dce6 100644
--- a/internal/pkg/onuadaptercore/mib_sync.go
+++ b/internal/pkg/onuadaptercore/mib_sync.go
@@ -74,7 +74,7 @@
func (oo *OnuDeviceEntry) enterResettingMibState(ctx context.Context, e *fsm.Event) {
logger.Debugw(ctx, "MibSync FSM", log.Fields{"Start MibTemplate processing in State": e.FSM.Current(), "device-id": oo.deviceID})
- if !oo.isNewOnu() {
+ if !oo.isNewOnu() && !oo.baseDeviceHandler.isReconciling() {
oo.baseDeviceHandler.prepareReconcilingWithActiveAdapter(ctx)
oo.devState = DeviceStatusInit
}
diff --git a/internal/pkg/onuadaptercore/onu_device_entry.go b/internal/pkg/onuadaptercore/onu_device_entry.go
index 81899a1..e01de18 100644
--- a/internal/pkg/onuadaptercore/onu_device_entry.go
+++ b/internal/pkg/onuadaptercore/onu_device_entry.go
@@ -316,7 +316,7 @@
onuDeviceEntry.mibDbClass = onuDeviceEntry.supportedFsms["mib-synchronizer"].databaseClass
logger.Debug(ctx, "access2mibDbClass")
go onuDeviceEntry.mibDbClass(ctx)
- if !dh.reconciling {
+ if !dh.isReconciling() {
onuDeviceEntry.mibAuditInterval = onuDeviceEntry.supportedFsms["mib-synchronizer"].auditInterval
onuDeviceEntry.sOnuPersistentData.PersMibAuditInterval = onuDeviceEntry.mibAuditInterval
} else {
diff --git a/internal/pkg/onuadaptercore/openonu.go b/internal/pkg/onuadaptercore/openonu.go
index 3a2780a..f073282 100644
--- a/internal/pkg/onuadaptercore/openonu.go
+++ b/internal/pkg/onuadaptercore/openonu.go
@@ -320,7 +320,7 @@
handler := newDeviceHandler(ctx, oo.coreProxy, oo.adapterProxy, oo.eventProxy, device, oo)
oo.addDeviceHandlerToMap(ctx, handler)
handler.device = device
- handler.reconciling = true
+ handler.startReconciling(ctx)
go handler.adoptOrReconcileDevice(ctx, handler.device)
// reconcilement will be continued after onu-device entry is added
} else {