[VOL-4774] openonuAdapterGo: Panic during scale test

Change-Id: I37cc31697f42cb50e44970c2b537db07381a3a3a
diff --git a/VERSION b/VERSION
index 197c4d5..005119b 100755
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.4.0
+2.4.1
diff --git a/internal/pkg/common/defines.go b/internal/pkg/common/defines.go
index c2eecbd..9cb4ba8 100755
--- a/internal/pkg/common/defines.go
+++ b/internal/pkg/common/defines.go
@@ -361,6 +361,12 @@
 	OnuOmciCommunicationFailureConfig     = "ONU_OMCI_COMMUNICATION_FAILURE_CONFIG"
 	OnuOmciCommunicationFailureConfigDesc = "OMCI communication during ONU configuration failed"
 
+	OnuOmciCommunicationAbortConfig     = "ONU_OMCI_COMMUNICATION_ABORT_CONFIG"
+	OnuOmciCommunicationAbortConfigDesc = "OMCI communication during ONU configuration aborted - max failures reached: stopping device"
+
 	OnuOmciCommunicationFailureSwUpgrade     = "ONU_OMCI_COMMUNICATION_FAILURE_SW_UPGRADE"
 	OnuOmciCommunicationFailureSwUpgradeDesc = "OMCI communication during ONU SW upgrade failed"
+
+	OnuOmciCommunicationAbortSwUpgrade     = "ONU_OMCI_COMMUNICATION_ABORT_SW_UPGRADE"
+	OnuOmciCommunicationAbortSwUpgradeDesc = "OMCI communication during ONU SW upgrade aborted - max failures reached: stopping device"
 )
diff --git a/internal/pkg/common/interfaces.go b/internal/pkg/common/interfaces.go
index 98f644c..69fa26f 100755
--- a/internal/pkg/common/interfaces.go
+++ b/internal/pkg/common/interfaces.go
@@ -124,6 +124,8 @@
 	CreatePortInCore(context.Context, *voltha.Port) error
 
 	PerOnuFlowHandlerRoutine(uniID uint8)
+
+	UpdateInterface(context.Context) error
 }
 
 // IonuDeviceEntry interface to onuDeviceEntry
diff --git a/internal/pkg/common/omci_cc.go b/internal/pkg/common/omci_cc.go
index 60d37e5..551f389 100755
--- a/internal/pkg/common/omci_cc.go
+++ b/internal/pkg/common/omci_cc.go
@@ -73,6 +73,8 @@
 // CDefaultRetries - TODO: add comment
 const CDefaultRetries = 2
 
+const cMaxConsecutiveOmciTimeouts = 3 // number of consecutive timed-out OMCI requests tolerated; the next timeout triggers the abort handling
+
 // ### OMCI related definitions - end
 
 //CallbackPairEntry to be used for OMCI send/receive correlation
@@ -132,15 +134,19 @@
 	UploadSequNo   uint16
 	UploadNoOfCmds uint16
 
-	mutexSendQueuedRequests sync.Mutex
-	mutexLowPrioTxQueue     sync.Mutex
-	lowPrioTxQueue          *list.List
-	mutexHighPrioTxQueue    sync.Mutex
-	highPrioTxQueue         *list.List
-	mutexRxSchedMap         sync.Mutex
-	rxSchedulerMap          map[uint16]CallbackPairEntry
-	mutexMonReq             sync.RWMutex
-	monitoredRequests       map[uint16]OmciTransferStructure
+	mutexSendQueuedRequests      sync.Mutex
+	mutexLowPrioTxQueue          sync.Mutex
+	lowPrioTxQueue               *list.List
+	mutexHighPrioTxQueue         sync.Mutex
+	highPrioTxQueue              *list.List
+	mutexRxSchedMap              sync.Mutex
+	rxSchedulerMap               map[uint16]CallbackPairEntry
+	mutexMonReq                  sync.RWMutex
+	monitoredRequests            map[uint16]OmciTransferStructure
+	mutexConsecutiveOmciTimeouts sync.RWMutex
+	consecutiveOmciTimeouts      uint8
+	mutexOmciAbortInProgress     sync.RWMutex
+	omciAbortInProgress          bool
 }
 
 var responsesWithMibDataSync = []omci.MessageType{
@@ -180,7 +186,8 @@
 	omciCC.highPrioTxQueue = list.New()
 	omciCC.rxSchedulerMap = make(map[uint16]CallbackPairEntry)
 	omciCC.monitoredRequests = make(map[uint16]OmciTransferStructure)
-
+	omciCC.consecutiveOmciTimeouts = 0
+	omciCC.omciAbortInProgress = false
 	return &omciCC
 }
 
@@ -416,6 +423,12 @@
 	oo.mutexRxSchedMap.Lock()
 	rxCallbackEntry, ok := oo.rxSchedulerMap[omciMsg.TransactionID]
 	if ok && rxCallbackEntry.CbFunction != nil {
+
+		// valid OMCI Response Message received - reset counter of consecutive OMCI timeouts
+		oo.mutexConsecutiveOmciTimeouts.Lock()
+		oo.consecutiveOmciTimeouts = 0
+		oo.mutexConsecutiveOmciTimeouts.Unlock()
+
 		if rxCallbackEntry.FramePrint {
 			oo.printRxMessage(ctx, rxMsg)
 		}
@@ -4467,6 +4480,31 @@
 					log.Fields{"tid": tid, "retries": retryCounter, "device-id": oo.deviceID})
 				oo.pOnuDeviceEntry.SendOnuDeviceEvent(ctx, OnuOmciCommunicationFailureSwUpgrade, OnuOmciCommunicationFailureSwUpgradeDesc)
 				oo.incrementTxTimesouts()
+				oo.mutexConsecutiveOmciTimeouts.Lock()
+				if oo.consecutiveOmciTimeouts < cMaxConsecutiveOmciTimeouts {
+					oo.consecutiveOmciTimeouts++
+					oo.mutexConsecutiveOmciTimeouts.Unlock()
+				} else {
+					oo.consecutiveOmciTimeouts = 0
+					oo.mutexConsecutiveOmciTimeouts.Unlock()
+					oo.mutexOmciAbortInProgress.Lock()
+					if !oo.omciAbortInProgress {
+						oo.omciAbortInProgress = true
+						oo.mutexOmciAbortInProgress.Unlock()
+						logger.Errorw(ctx, "reqMon: communication aborted - no of max consecutive timeouts reached - stopping device and send ONU device event!",
+							log.Fields{"tid": tid, "device-id": oo.deviceID})
+						oo.pOnuDeviceEntry.SendOnuDeviceEvent(ctx, OnuOmciCommunicationAbortSwUpgrade, OnuOmciCommunicationAbortSwUpgradeDesc)
+						// stop all running FSM processing
+						_ = oo.pBaseDeviceHandler.UpdateInterface(ctx)
+						oo.mutexOmciAbortInProgress.Lock()
+						oo.omciAbortInProgress = false
+						oo.mutexOmciAbortInProgress.Unlock()
+					} else {
+						oo.mutexOmciAbortInProgress.Unlock()
+						logger.Infow(ctx, "reqMon: communication aborted - corresponding processing already running",
+							log.Fields{"tid": tid, "device-id": oo.deviceID})
+					}
+				}
 				break loop
 			} else {
 				logger.Infow(ctx, "reqMon: timeout waiting for response - retry",
@@ -4970,6 +5008,31 @@
 					log.Fields{"tid": tid, "retries": retryCounter, "device-id": oo.deviceID})
 				oo.pOnuDeviceEntry.SendOnuDeviceEvent(ctx, OnuOmciCommunicationFailureConfig, OnuOmciCommunicationFailureConfigDesc)
 				oo.incrementTxTimesouts()
+				oo.mutexConsecutiveOmciTimeouts.Lock()
+				if oo.consecutiveOmciTimeouts < cMaxConsecutiveOmciTimeouts {
+					oo.consecutiveOmciTimeouts++
+					oo.mutexConsecutiveOmciTimeouts.Unlock()
+				} else {
+					oo.consecutiveOmciTimeouts = 0
+					oo.mutexConsecutiveOmciTimeouts.Unlock()
+					oo.mutexOmciAbortInProgress.Lock()
+					if !oo.omciAbortInProgress {
+						oo.omciAbortInProgress = true
+						oo.mutexOmciAbortInProgress.Unlock()
+						logger.Errorw(ctx, "reqMon: communication aborted - no of max consecutive timeouts reached - stopping device and send ONU device event!",
+							log.Fields{"tid": tid, "device-id": oo.deviceID})
+						oo.pOnuDeviceEntry.SendOnuDeviceEvent(ctx, OnuOmciCommunicationAbortConfig, OnuOmciCommunicationAbortConfigDesc)
+						// stop all running FSM processing
+						_ = oo.pBaseDeviceHandler.UpdateInterface(ctx)
+						oo.mutexOmciAbortInProgress.Lock()
+						oo.omciAbortInProgress = false
+						oo.mutexOmciAbortInProgress.Unlock()
+					} else {
+						oo.mutexOmciAbortInProgress.Unlock()
+						logger.Infow(ctx, "reqMon: communication aborted - corresponding processing already running",
+							log.Fields{"tid": tid, "device-id": oo.deviceID})
+					}
+				}
 				break loop
 			} else {
 				logger.Infow(ctx, "reqMon: timeout waiting for response - retry",
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index ddbfe7a..9ad2b7d 100755
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -2131,7 +2131,7 @@
 	return nil
 }
 
-func (dh *deviceHandler) updateInterface(ctx context.Context, onuind *oop.OnuIndication) error {
+func (dh *deviceHandler) UpdateInterface(ctx context.Context) error {
 	//state checking to prevent unneeded processing (eg. on ONU 'unreachable' and 'down')
 	// (but note that the deviceReason may also have changed to e.g. TechProf*Delete_Success in between)
 	if dh.getDeviceReason() != cmn.DrStoppingOpenomci {
diff --git a/internal/pkg/core/openonu.go b/internal/pkg/core/openonu.go
index 1995a9a..f502c31 100755
--- a/internal/pkg/core/openonu.go
+++ b/internal/pkg/core/openonu.go
@@ -899,7 +899,7 @@
 			}
 			return &empty.Empty{}, nil
 		} else if (onuOperstate == "down") || (onuOperstate == "unreachable") {
-			if err := handler.updateInterface(ctx, onuIndication); err != nil {
+			if err := handler.UpdateInterface(ctx); err != nil {
 				return nil, err
 			}
 			return &empty.Empty{}, nil