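[VOL-4774] openonuAdapterGo: Panic during scale test

Abort OMCI communication after too many consecutive request timeouts:
count OMCI requests that still time out after all retries (in the config
and software-upgrade request monitors), and reset that count whenever a
valid OMCI response is received. Once the count has reached
cMaxConsecutiveOmciTimeouts and another timeout occurs, send the
corresponding OMCI communication-abort ONU device event and call
UpdateInterface to stop all running FSM processing. An in-progress flag
prevents concurrent abort handling.
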
Change-Id: I37cc31697f42cb50e44970c2b537db07381a3a3a
diff --git a/internal/pkg/common/omci_cc.go b/internal/pkg/common/omci_cc.go
index 60d37e5..551f389 100755
--- a/internal/pkg/common/omci_cc.go
+++ b/internal/pkg/common/omci_cc.go
@@ -73,6 +73,8 @@
 // CDefaultRetries - TODO: add comment
 const CDefaultRetries = 2
 
+const cMaxConsecutiveOmciTimeouts = 3 // consecutive final OMCI request timeouts tolerated; the next one triggers the communication abort (NOTE(review): abort fires on timeout #4 - confirm intended)
+
 // ### OMCI related definitions - end
 
 //CallbackPairEntry to be used for OMCI send/receive correlation
@@ -132,15 +134,19 @@
 	UploadSequNo   uint16
 	UploadNoOfCmds uint16
 
-	mutexSendQueuedRequests sync.Mutex
-	mutexLowPrioTxQueue     sync.Mutex
-	lowPrioTxQueue          *list.List
-	mutexHighPrioTxQueue    sync.Mutex
-	highPrioTxQueue         *list.List
-	mutexRxSchedMap         sync.Mutex
-	rxSchedulerMap          map[uint16]CallbackPairEntry
-	mutexMonReq             sync.RWMutex
-	monitoredRequests       map[uint16]OmciTransferStructure
+	mutexSendQueuedRequests      sync.Mutex
+	mutexLowPrioTxQueue          sync.Mutex
+	lowPrioTxQueue               *list.List
+	mutexHighPrioTxQueue         sync.Mutex
+	highPrioTxQueue              *list.List
+	mutexRxSchedMap              sync.Mutex
+	rxSchedulerMap               map[uint16]CallbackPairEntry
+	mutexMonReq                  sync.RWMutex
+	monitoredRequests            map[uint16]OmciTransferStructure
+	mutexConsecutiveOmciTimeouts sync.RWMutex
+	consecutiveOmciTimeouts      uint8
+	mutexOmciAbortInProgress     sync.RWMutex
+	omciAbortInProgress          bool
 }
 
 var responsesWithMibDataSync = []omci.MessageType{
@@ -180,7 +186,8 @@
 	omciCC.highPrioTxQueue = list.New()
 	omciCC.rxSchedulerMap = make(map[uint16]CallbackPairEntry)
 	omciCC.monitoredRequests = make(map[uint16]OmciTransferStructure)
-
+	omciCC.consecutiveOmciTimeouts = 0
+	omciCC.omciAbortInProgress = false
 	return &omciCC
 }
 
@@ -416,6 +423,12 @@
 	oo.mutexRxSchedMap.Lock()
 	rxCallbackEntry, ok := oo.rxSchedulerMap[omciMsg.TransactionID]
 	if ok && rxCallbackEntry.CbFunction != nil {
+
+		// valid OMCI Response Message received - reset counter of consecutive OMCI timeouts
+		oo.mutexConsecutiveOmciTimeouts.Lock()
+		oo.consecutiveOmciTimeouts = 0
+		oo.mutexConsecutiveOmciTimeouts.Unlock()
+
 		if rxCallbackEntry.FramePrint {
 			oo.printRxMessage(ctx, rxMsg)
 		}
@@ -4467,6 +4480,31 @@
 					log.Fields{"tid": tid, "retries": retryCounter, "device-id": oo.deviceID})
 				oo.pOnuDeviceEntry.SendOnuDeviceEvent(ctx, OnuOmciCommunicationFailureSwUpgrade, OnuOmciCommunicationFailureSwUpgradeDesc)
 				oo.incrementTxTimesouts()
+				oo.mutexConsecutiveOmciTimeouts.Lock()
+				if oo.consecutiveOmciTimeouts < cMaxConsecutiveOmciTimeouts {
+					oo.consecutiveOmciTimeouts++
+					oo.mutexConsecutiveOmciTimeouts.Unlock()
+				} else {
+					oo.consecutiveOmciTimeouts = 0
+					oo.mutexConsecutiveOmciTimeouts.Unlock()
+					oo.mutexOmciAbortInProgress.Lock()
+					if !oo.omciAbortInProgress {
+						oo.omciAbortInProgress = true
+						oo.mutexOmciAbortInProgress.Unlock()
+						logger.Errorw(ctx, "reqMon: communication aborted - no of max consecutive timeouts reached - stopping device and send ONU device event!",
+							log.Fields{"tid": tid, "device-id": oo.deviceID})
+						oo.pOnuDeviceEntry.SendOnuDeviceEvent(ctx, OnuOmciCommunicationAbortSwUpgrade, OnuOmciCommunicationAbortSwUpgradeDesc)
+						// stop all running FSM processing
+						_ = oo.pBaseDeviceHandler.UpdateInterface(ctx)
+						oo.mutexOmciAbortInProgress.Lock()
+						oo.omciAbortInProgress = false
+						oo.mutexOmciAbortInProgress.Unlock()
+					} else {
+						oo.mutexOmciAbortInProgress.Unlock()
+						logger.Infow(ctx, "reqMon: communication aborted - corresponding processing already running",
+							log.Fields{"tid": tid, "device-id": oo.deviceID})
+					}
+				}
 				break loop
 			} else {
 				logger.Infow(ctx, "reqMon: timeout waiting for response - retry",
@@ -4970,6 +5008,31 @@
 					log.Fields{"tid": tid, "retries": retryCounter, "device-id": oo.deviceID})
 				oo.pOnuDeviceEntry.SendOnuDeviceEvent(ctx, OnuOmciCommunicationFailureConfig, OnuOmciCommunicationFailureConfigDesc)
 				oo.incrementTxTimesouts()
+				oo.mutexConsecutiveOmciTimeouts.Lock()
+				if oo.consecutiveOmciTimeouts < cMaxConsecutiveOmciTimeouts {
+					oo.consecutiveOmciTimeouts++
+					oo.mutexConsecutiveOmciTimeouts.Unlock()
+				} else {
+					oo.consecutiveOmciTimeouts = 0
+					oo.mutexConsecutiveOmciTimeouts.Unlock()
+					oo.mutexOmciAbortInProgress.Lock()
+					if !oo.omciAbortInProgress {
+						oo.omciAbortInProgress = true
+						oo.mutexOmciAbortInProgress.Unlock()
+						logger.Errorw(ctx, "reqMon: communication aborted - no of max consecutive timeouts reached - stopping device and send ONU device event!",
+							log.Fields{"tid": tid, "device-id": oo.deviceID})
+						oo.pOnuDeviceEntry.SendOnuDeviceEvent(ctx, OnuOmciCommunicationAbortConfig, OnuOmciCommunicationAbortConfigDesc)
+						// stop all running FSM processing
+						_ = oo.pBaseDeviceHandler.UpdateInterface(ctx)
+						oo.mutexOmciAbortInProgress.Lock()
+						oo.omciAbortInProgress = false
+						oo.mutexOmciAbortInProgress.Unlock()
+					} else {
+						oo.mutexOmciAbortInProgress.Unlock()
+						logger.Infow(ctx, "reqMon: communication aborted - corresponding processing already running",
+							log.Fields{"tid": tid, "device-id": oo.deviceID})
+					}
+				}
 				break loop
 			} else {
 				logger.Infow(ctx, "reqMon: timeout waiting for response - retry",