[VOL-3842] Hardening of reconcile functionality

Change-Id: Iab1a585eac5d7e3e5a76c8801aecb65eb793e3e8
diff --git a/internal/pkg/onuadaptercore/device_handler.go b/internal/pkg/onuadaptercore/device_handler.go
index 5db246b..91f9a8d 100644
--- a/internal/pkg/onuadaptercore/device_handler.go
+++ b/internal/pkg/onuadaptercore/device_handler.go
@@ -91,6 +91,9 @@
 const (
 	cOnuActivatedEvent = "ONU_ACTIVATED"
 )
+const (
+	cReconcilingTimeout = 10 // maximum time, in seconds, to wait for reconciling to be reported as finished
+)
 
 type usedOmciConfigFsms int
 
@@ -201,6 +204,8 @@
 	lockVlanConfig             sync.Mutex
 	UniVlanConfigFsmMap        map[uint8]*UniVlanConfigFsm
 	reconciling                bool
+	mutexReconcilingFlag       sync.RWMutex
+	chReconcilingFinished      chan bool //channel to indicate that reconciling has been finished
 	ReadyForSpecificOmciConfig bool
 }
 
@@ -229,6 +234,7 @@
 	dh.lockVlanConfig = sync.Mutex{}
 	dh.UniVlanConfigFsmMap = make(map[uint8]*UniVlanConfigFsm)
 	dh.reconciling = false
+	dh.chReconcilingFinished = make(chan bool)
 	dh.ReadyForSpecificOmciConfig = false
 
 	if dh.device.PmConfigs != nil { // can happen after onu adapter restart
@@ -771,7 +777,7 @@
 		} else {
 			logger.Errorw(ctx, "reconciling - restoring OnuTp-data failed - abort", log.Fields{"err": err, "device-id": dh.deviceID})
 		}
-		dh.reconciling = false
+		dh.stopReconciling(ctx)
 		return
 	}
 	var onuIndication oop.OnuIndication
@@ -798,7 +804,7 @@
 	if len(pDevEntry.sOnuPersistentData.PersUniConfig) == 0 {
 		logger.Debugw(ctx, "reconciling - no uni-configs have been stored before adapter restart - terminate reconcilement",
 			log.Fields{"device-id": dh.deviceID})
-		dh.reconciling = false
+		dh.stopReconciling(ctx)
 		return
 	}
 	for _, uniData := range pDevEntry.sOnuPersistentData.PersUniConfig {
@@ -806,7 +812,7 @@
 		if len(uniData.PersTpPathMap) == 0 {
 			logger.Debugw(ctx, "reconciling - no TPs have been stored before adapter restart - terminate reconcilement",
 				log.Fields{"uni-id": uniData.PersUniID, "device-id": dh.deviceID})
-			dh.reconciling = false
+			dh.stopReconciling(ctx)
 			return
 		}
 		for tpID := range uniData.PersTpPathMap {
@@ -827,7 +833,7 @@
 		if len(uniData.PersFlowParams) == 0 {
 			logger.Debugw(ctx, "reconciling - no flows have been stored before adapter restart - terminate reconcilement",
 				log.Fields{"uni-id": uniData.PersUniID, "device-id": dh.deviceID})
-			dh.reconciling = false
+			dh.stopReconciling(ctx)
 		}
 	}
 }
@@ -846,7 +852,7 @@
 	if len(pDevEntry.sOnuPersistentData.PersUniConfig) == 0 {
 		logger.Debugw(ctx, "reconciling - no uni-configs have been stored before adapter restart - terminate reconcilement",
 			log.Fields{"device-id": dh.deviceID})
-		dh.reconciling = false
+		dh.stopReconciling(ctx)
 		return
 	}
 	for _, uniData := range pDevEntry.sOnuPersistentData.PersUniConfig {
@@ -854,7 +860,7 @@
 		if len(uniData.PersFlowParams) == 0 {
 			logger.Debugw(ctx, "reconciling - no flows have been stored before adapter restart - terminate reconcilement",
 				log.Fields{"uni-id": uniData.PersUniID, "device-id": dh.deviceID})
-			dh.reconciling = false
+			dh.stopReconciling(ctx)
 			return
 		}
 		var uniPort *onuUniPort
@@ -884,7 +890,7 @@
 		if len(uniData.PersTpPathMap) == 0 {
 			logger.Debugw(ctx, "reconciling - no TPs have been stored before adapter restart - terminate reconcilement",
 				log.Fields{"uni-id": uniData.PersUniID, "device-id": dh.deviceID})
-			dh.reconciling = false
+			dh.stopReconciling(ctx)
 		}
 	}
 }
@@ -893,7 +899,7 @@
 	logger.Debugw(ctx, "reconciling - trigger metrics - to be implemented in scope of VOL-3324!", log.Fields{"device-id": dh.deviceID})
 
 	//TODO: reset of reconciling-flag has always to be done in the last reconcile*() function
-	dh.reconciling = false
+	dh.stopReconciling(ctx)
 }
 
 func (dh *deviceHandler) deleteDevicePersistencyData(ctx context.Context) error {
@@ -991,7 +997,7 @@
 
 	dh.logicalDeviceID = dh.deviceID // really needed - what for ??? //TODO!!!
 
-	if !dh.reconciling {
+	if !dh.isReconciling() {
 		logger.Infow(ctx, "DeviceUpdate", log.Fields{"deviceReason": dh.device.Reason, "device-id": dh.deviceID})
 		_ = dh.coreProxy.DeviceUpdate(log.WithSpanFromContext(context.TODO(), ctx), dh.device)
 		//TODO Need to Update Device Reason To CORE as part of device update userstory
@@ -1019,7 +1025,7 @@
 				   oper_status=self._pon.get_port().oper_status,
 				   )
 	*/
-	if !dh.reconciling {
+	if !dh.isReconciling() {
 		logger.Debugw(ctx, "adding-pon-port", log.Fields{"device-id": dh.deviceID, "ponPortNo": dh.ponPortNumber})
 		var ponPortNo uint32 = 1
 		if dh.ponPortNumber != 0 {
@@ -1061,7 +1067,7 @@
 		return
 	}
 
-	if dh.reconciling {
+	if dh.isReconciling() {
 		go dh.reconcileDeviceOnuInd(ctx)
 		// reconcilement will be continued after mib download is done
 	}
@@ -1285,7 +1291,7 @@
 		logger.Errorw(ctx, "No valid OnuDevice - aborting", log.Fields{"device-id": dh.deviceID})
 		return fmt.Errorf("no valid OnuDevice: %s", dh.deviceID)
 	}
-	if !dh.reconciling {
+	if !dh.isReconciling() {
 		if err := dh.storePersistentData(ctx); err != nil {
 			logger.Warnw(ctx, "store persistent data error - continue as there will be additional write attempts",
 				log.Fields{"device-id": dh.deviceID, "err": err})
@@ -1304,7 +1310,7 @@
 		if !pDevEntry.sOnuPersistentData.PersUniUnlockDone {
 			logger.Debugw(ctx, "reconciling - uni-ports were not unlocked before adapter restart - resume with a normal start-up",
 				log.Fields{"device-id": dh.deviceID})
-			dh.reconciling = false
+			dh.stopReconciling(ctx)
 		}
 	}
 	// It does not look to me as if makes sense to work with the real core device here, (not the stored clone)?
@@ -1325,7 +1331,7 @@
 		return err
 	}
 
-	_ = dh.deviceReasonUpdate(ctx, drStartingOpenomci, !dh.reconciling)
+	_ = dh.deviceReasonUpdate(ctx, drStartingOpenomci, !dh.isReconciling())
 
 	/* this might be a good time for Omci Verify message?  */
 	verifyExec := make(chan bool)
@@ -1592,7 +1598,7 @@
 func (dh *deviceHandler) processMibDatabaseSyncEvent(ctx context.Context, devEvent OnuDeviceEvent) {
 	logger.Debugw(ctx, "MibInSync event received, adding uni ports and locking the ONU interfaces", log.Fields{"device-id": dh.deviceID})
 
-	_ = dh.deviceReasonUpdate(ctx, drDiscoveryMibsyncComplete, !dh.reconciling)
+	_ = dh.deviceReasonUpdate(ctx, drDiscoveryMibsyncComplete, !dh.isReconciling())
 	pDevEntry := dh.getOnuDeviceEntry(ctx, false)
 	if pDevEntry == nil {
 		logger.Errorw(ctx, "No valid OnuDevice - aborting", log.Fields{"device-id": dh.deviceID})
@@ -1684,7 +1690,7 @@
 func (dh *deviceHandler) processMibDownloadDoneEvent(ctx context.Context, devEvent OnuDeviceEvent) {
 	logger.Debugw(ctx, "MibDownloadDone event received, unlocking the ONU interfaces", log.Fields{"device-id": dh.deviceID})
 	//initiate DevStateUpdate
-	if !dh.reconciling {
+	if !dh.isReconciling() {
 		logger.Debugw(ctx, "call DeviceStateUpdate upon mib-download done", log.Fields{"ConnectStatus": voltha.ConnectStatus_REACHABLE,
 			"OperStatus": voltha.OperStatus_ACTIVE, "device-id": dh.deviceID})
 		if err := dh.coreProxy.DeviceStateUpdate(log.WithSpanFromContext(context.TODO(), ctx), dh.deviceID,
@@ -1703,13 +1709,13 @@
 		if pDevEntry.sOnuPersistentData.PersUniDisableDone {
 			logger.Debugw(ctx, "reconciling - uni-ports were disabled by admin before adapter restart - keep the ports locked and wait for re-enabling",
 				log.Fields{"device-id": dh.deviceID})
-			dh.reconciling = false
+			dh.stopReconciling(ctx)
 			return
 		}
 		logger.Debugw(ctx, "reconciling - don't notify core about DeviceStateUpdate to ACTIVE",
 			log.Fields{"device-id": dh.deviceID})
 	}
-	_ = dh.deviceReasonUpdate(ctx, drInitialMibDownloaded, !dh.reconciling)
+	_ = dh.deviceReasonUpdate(ctx, drInitialMibDownloaded, !dh.isReconciling())
 
 	// Initialize classical L2 PM Interval Counters
 	if err := dh.pOnuMetricsMgr.pAdaptFsm.pFsm.Event(l2PmEventInit); err != nil {
@@ -1731,7 +1737,7 @@
 func (dh *deviceHandler) processUniUnlockStateDoneEvent(ctx context.Context, devEvent OnuDeviceEvent) {
 	dh.enableUniPortStateUpdate(ctx) //cmp python yield self.enable_ports()
 
-	if !dh.reconciling {
+	if !dh.isReconciling() {
 		logger.Infow(ctx, "UniUnlockStateDone event: Sending OnuUp event", log.Fields{"device-id": dh.deviceID})
 		raisedTs := time.Now().UnixNano()
 		go dh.sendOnuOperStateEvent(ctx, voltha.OperStatus_ACTIVE, dh.deviceID, raisedTs) //cmp python onu_active_event
@@ -1817,9 +1823,9 @@
 		//  - which may cause some inconsistency
 		if dh.deviceReason != drTechProfileConfigDownloadSuccess {
 			// which may be the case from some previous actvity even on this UNI Port (but also other UNI ports)
-			_ = dh.deviceReasonUpdate(ctx, drTechProfileConfigDownloadSuccess, !dh.reconciling)
+			_ = dh.deviceReasonUpdate(ctx, drTechProfileConfigDownloadSuccess, !dh.isReconciling())
 		}
-		if dh.reconciling {
+		if dh.isReconciling() {
 			go dh.reconcileDeviceFlowConfig(ctx)
 		}
 	} else { // should be the OmciAniResourceRemoved block
@@ -1843,8 +1849,8 @@
 		if dh.deviceReason != drOmciFlowsPushed {
 			// which may be the case from some previous actvity on another UNI Port of the ONU
 			// or even some previous flow add activity on the same port
-			_ = dh.deviceReasonUpdate(ctx, drOmciFlowsPushed, !dh.reconciling)
-			if dh.reconciling {
+			_ = dh.deviceReasonUpdate(ctx, drOmciFlowsPushed, !dh.isReconciling())
+			if dh.isReconciling() {
 				go dh.reconcileMetrics(ctx)
 			}
 		}
@@ -1916,7 +1922,7 @@
 		} else {
 			//store UniPort with the System-PortNumber key
 			dh.uniEntityMap[uniNo] = pUniPort
-			if !dh.reconciling {
+			if !dh.isReconciling() {
 				// create announce the UniPort to the core as VOLTHA Port object
 				if err := pUniPort.createVolthaPort(ctx, dh); err == nil {
 					logger.Infow(ctx, "onuUniPort-added", log.Fields{"for PortNo": uniNo})
@@ -1943,7 +1949,7 @@
 		if (1<<uniPort.uniID)&activeUniPortStateUpdateMask == (1 << uniPort.uniID) {
 			logger.Infow(ctx, "onuUniPort-forced-OperState-ACTIVE", log.Fields{"for PortNo": uniNo})
 			uniPort.setOperState(vc.OperStatus_ACTIVE)
-			if !dh.reconciling {
+			if !dh.isReconciling() {
 				//maybe also use getter functions on uniPort - perhaps later ...
 				go dh.coreProxy.PortStateUpdate(log.WithSpanFromContext(context.TODO(), ctx), dh.deviceID, voltha.Port_ETHERNET_UNI, uniPort.portNo, uniPort.operState)
 			} else {
@@ -2497,7 +2503,7 @@
 
 func (dh *deviceHandler) storePersUniFlowConfig(ctx context.Context, aUniID uint8, aUniVlanFlowParams *[]uniVlanFlowParams) error {
 
-	if dh.reconciling {
+	if dh.isReconciling() {
 		logger.Debugw(ctx, "reconciling - don't store persistent UniFlowConfig", log.Fields{"device-id": dh.deviceID})
 		return nil
 	}
@@ -2852,7 +2858,7 @@
 		go dh.startCollector(ctx)
 	}
 	dh.uniEntityMap = make(map[uint32]*onuUniPort)
-	dh.reconciling = true
+	dh.startReconciling(ctx)
 }
 
 func (dh *deviceHandler) setCollectorIsRunning(flagValue bool) {
@@ -2878,3 +2884,70 @@
 		dh.pAlarmMgr.stopProcessingOmciMessages <- true // Stop the OMCI routines if any
 	}
 }
+
+// startReconciling marks the device as reconciling and starts a watchdog goroutine
+// that ends the reconciling state when stopReconciling() reports completion or when
+// cReconcilingTimeout seconds have elapsed, whichever happens first.
+// A call while reconciling is already running only logs a warning.
+func (dh *deviceHandler) startReconciling(ctx context.Context) {
+	logger.Debugw(ctx, "start reconciling", log.Fields{"device-id": dh.deviceID})
+	// check and set the flag in one critical section, so that concurrent callers cannot both start a watchdog
+	dh.mutexReconcilingFlag.Lock()
+	if dh.reconciling {
+		dh.mutexReconcilingFlag.Unlock()
+		logger.Warnw(ctx, "reconciling is already running", log.Fields{"device-id": dh.deviceID})
+		return
+	}
+	dh.reconciling = true
+	dh.mutexReconcilingFlag.Unlock()
+
+	go func() {
+		select {
+		case <-dh.chReconcilingFinished:
+			// the flag has already been reset by stopReconciling()
+			logger.Debugw(ctx, "reconciling has been finished in time",
+				log.Fields{"device-id": dh.deviceID})
+			return
+		case <-time.After(time.Duration(cReconcilingTimeout) * time.Second): // timed out - handled below
+		}
+		dh.mutexReconcilingFlag.Lock()
+		stopPending := !dh.reconciling
+		dh.reconciling = false
+		dh.mutexReconcilingFlag.Unlock()
+		if stopPending {
+			// stopReconciling() reset the flag just as the timer fired and is about to send (or is
+			// already blocked in sending) on the unbuffered channel - receive to release it
+			<-dh.chReconcilingFinished
+			logger.Debugw(ctx, "reconciling has been finished in time",
+				log.Fields{"device-id": dh.deviceID})
+			return
+		}
+		logger.Errorw(ctx, "timeout waiting for reconciling to be finished!",
+			log.Fields{"device-id": dh.deviceID})
+	}()
+}
+
+// stopReconciling ends the reconciling state and informs the watchdog goroutine started by
+// startReconciling(). The flag is reset before the function returns, so that isReconciling()
+// reports false immediately afterwards. A call while reconciling is not running only logs an info.
+func (dh *deviceHandler) stopReconciling(ctx context.Context) {
+	logger.Debugw(ctx, "stop reconciling", log.Fields{"device-id": dh.deviceID})
+	// check and reset the flag in one critical section: only one caller gets to send on the unbuffered
+	// channel, so repeated or late calls can never block on a channel nobody reads anymore
+	dh.mutexReconcilingFlag.Lock()
+	wasReconciling := dh.reconciling
+	dh.reconciling = false
+	dh.mutexReconcilingFlag.Unlock()
+	if !wasReconciling {
+		logger.Infow(ctx, "reconciling is not running", log.Fields{"device-id": dh.deviceID})
+		return
+	}
+	dh.chReconcilingFinished <- true
+}
+
+// isReconciling reports whether the device handler is currently in reconciling state.
+func (dh *deviceHandler) isReconciling() bool {
+	dh.mutexReconcilingFlag.RLock()
+	defer dh.mutexReconcilingFlag.RUnlock()
+	return dh.reconciling
+}