[VOL-5450]: Fix ONU adapter exception during metrics collection
Change-Id: I887fe6398606e4e75374e72ea065ff4270a83271
Signed-off-by: balaji.nagarajan <balaji.nagarajan@radisys.com>
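The change guards the OMCI tx paths with a non-blocking check on the device-deletion channel: the queue-draining loops in sendQueuedHighPrioRequests and sendQueuedLowPrioRequests return early once the device handler closes GetDeviceDeleteCommChan, and the retry loop in sendWithRxSupervision breaks out instead of waiting for the omci-timeout. A minimal sketch of the queue-draining pattern follows; drainQueue, deleteChan and send are hypothetical stand-ins for the adapter's real types, not its API:

package main

import (
	"container/list"
	"errors"
	"fmt"
)

// drainQueue sends queued elements until the queue is empty or the delete
// channel is closed. Hypothetical helper for illustration only.
func drainQueue(queue *list.List, deleteChan <-chan struct{}, send func(interface{}) error) error {
	for queue.Len() > 0 {
		select {
		case _, ok := <-deleteChan:
			if !ok {
				// Closed channel: the device is being removed, stop sending.
				return errors.New("device deletion signalled, abort queued sends")
			}
		default:
			elem := queue.Front()
			err := send(elem.Value)
			queue.Remove(elem) // dequeue regardless of send success
			if err != nil {
				return err
			}
		}
	}
	return nil
}

func main() {
	q := list.New()
	q.PushBack("omci-get")
	del := make(chan struct{})
	send := func(v interface{}) error { fmt.Println("tx", v); return nil }

	fmt.Println("open channel :", drainQueue(q, del, send)) // element is sent
	q.PushBack("omci-set")
	close(del) // device delete signalled
	fmt.Println("after delete :", drainQueue(q, del, send)) // aborts, element stays queued
}

The default branch keeps the existing behaviour (send, then dequeue); only the new closed-channel case changes the outcome, turning a send attempt against a deleted device into an immediate error.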
diff --git a/internal/pkg/common/omci_cc.go b/internal/pkg/common/omci_cc.go
index 265bcab..01c6fbf 100755
--- a/internal/pkg/common/omci_cc.go
+++ b/internal/pkg/common/omci_cc.go
@@ -642,20 +642,27 @@
oo.mutexHighPrioTxQueue.Lock()
defer oo.mutexHighPrioTxQueue.Unlock()
for oo.highPrioTxQueue.Len() > 0 {
- queueElement := oo.highPrioTxQueue.Front() // First element
- if err := oo.sendOMCIRequest(ctx, queueElement.Value.(OmciTransferStructure)); err != nil {
- // Element will be removed from the queue regardless of the send success, to prevent
- // an accumulation of send requests for the same message in the event of an error.
- // In this case, resend attempts for the message are ensured by our retry
- // mechanism after omci-timeout.
+ select {
+ case _, ok := <-oo.pBaseDeviceHandler.GetDeviceDeleteCommChan(ctx):
+ if !ok {
+ return fmt.Errorf("device deletion channel is closed at sendQueuedHighPrioRequests %s", oo.deviceID)
+ }
+ default:
+ queueElement := oo.highPrioTxQueue.Front() // First element
+ if err := oo.sendOMCIRequest(ctx, queueElement.Value.(OmciTransferStructure)); err != nil {
+ // Element will be removed from the queue regardless of the send success, to prevent
+ // an accumulation of send requests for the same message in the event of an error.
+ // In this case, resend attempts for the message are ensured by our retry
+ // mechanism after omci-timeout.
+ oo.highPrioTxQueue.Remove(queueElement) // Dequeue
+ // Remove the callback from the rxSchedulerMap
+ oo.mutexRxSchedMap.Lock()
+ delete(oo.rxSchedulerMap, queueElement.Value.(OmciTransferStructure).cbPair.CbKey)
+ oo.mutexRxSchedMap.Unlock()
+ return err
+ }
oo.highPrioTxQueue.Remove(queueElement) // Dequeue
- //Remove the call back from the rxSchedulerMap
- oo.mutexRxSchedMap.Lock()
- delete(oo.rxSchedulerMap, queueElement.Value.(OmciTransferStructure).cbPair.CbKey)
- oo.mutexRxSchedMap.Unlock()
- return err
}
- oo.highPrioTxQueue.Remove(queueElement) // Dequeue
}
return nil
}
@@ -663,46 +670,54 @@
func (oo *OmciCC) sendQueuedLowPrioRequests(ctx context.Context) error {
oo.mutexLowPrioTxQueue.Lock()
for oo.lowPrioTxQueue.Len() > 0 {
- queueElement := oo.lowPrioTxQueue.Front() // First element
- // check if the element is for onu sw section
- aOmciTxReq := queueElement.Value.(OmciTransferStructure)
- if aOmciTxReq.OnuSwWindow != nil {
- if err := oo.sendOnuSwSectionsOfWindow(ctx, aOmciTxReq); err != nil {
- // Element will be removed from the queue regardless of the send success, to prevent
- // an accumulation of send requests for the same message in the event of an error.
- // In this case, resend attempts for the message are ensured by our retry
- // mechanism after omci-timeout.
- oo.lowPrioTxQueue.Remove(queueElement) // Dequeue
+ select {
+ case _, ok := <-oo.pBaseDeviceHandler.GetDeviceDeleteCommChan(ctx):
+ if !ok {
oo.mutexLowPrioTxQueue.Unlock()
- //Remove the call back from the rxSchedulerMap
- oo.mutexRxSchedMap.Lock()
- delete(oo.rxSchedulerMap, queueElement.Value.(OmciTransferStructure).cbPair.CbKey)
- oo.mutexRxSchedMap.Unlock()
+ return fmt.Errorf("device deletion channel is closed at sendQueuedLowPrioRequests %s", oo.deviceID)
+ }
+ default:
+ queueElement := oo.lowPrioTxQueue.Front() // First element
+ // Check whether the element is for an ONU SW section
+ aOmciTxReq := queueElement.Value.(OmciTransferStructure)
+ if aOmciTxReq.OnuSwWindow != nil {
+ if err := oo.sendOnuSwSectionsOfWindow(ctx, aOmciTxReq); err != nil {
+ // Element will be removed from the queue regardless of the send success, to prevent
+ // an accumulation of send requests for the same message in the event of an error.
+ // In this case, resend attempts for the message are ensured by our retry
+ // mechanism after omci-timeout.
+ oo.lowPrioTxQueue.Remove(queueElement) // Dequeue
+ oo.mutexLowPrioTxQueue.Unlock()
+ // Remove the callback from the rxSchedulerMap
+ oo.mutexRxSchedMap.Lock()
+ delete(oo.rxSchedulerMap, queueElement.Value.(OmciTransferStructure).cbPair.CbKey)
+ oo.mutexRxSchedMap.Unlock()
+ return err
+ }
+ } else {
+ err := oo.sendOMCIRequest(ctx, queueElement.Value.(OmciTransferStructure))
+ if err != nil {
+ // Element will be removed from the queue regardless of the send success, to prevent
+ // an accumulation of send requests for the same message in the event of an error.
+ // In this case, resend attempts for the message are ensured by our retry
+ // mechanism after omci-timeout.
+ oo.lowPrioTxQueue.Remove(queueElement) // Dequeue
+ oo.mutexLowPrioTxQueue.Unlock()
+ oo.mutexRxSchedMap.Lock()
+ delete(oo.rxSchedulerMap, queueElement.Value.(OmciTransferStructure).cbPair.CbKey)
+ oo.mutexRxSchedMap.Unlock()
+ return err
+ }
+ }
+ oo.lowPrioTxQueue.Remove(queueElement) // Dequeue
+ // Interrupt the sending of low priority requests to process any high priority requests
+ // that may have arrived in the meantime
+ oo.mutexLowPrioTxQueue.Unlock()
+ if err := oo.sendQueuedHighPrioRequests(ctx); err != nil {
return err
}
- } else {
- err := oo.sendOMCIRequest(ctx, queueElement.Value.(OmciTransferStructure))
- if err != nil {
- // Element will be removed from the queue regardless of the send success, to prevent
- // an accumulation of send requests for the same message in the event of an error.
- // In this case, resend attempts for the message are ensured by our retry
- // mechanism after omci-timeout.
- oo.lowPrioTxQueue.Remove(queueElement) // Dequeue
- oo.mutexLowPrioTxQueue.Unlock()
- oo.mutexRxSchedMap.Lock()
- delete(oo.rxSchedulerMap, queueElement.Value.(OmciTransferStructure).cbPair.CbKey)
- oo.mutexRxSchedMap.Unlock()
- return err
- }
+ oo.mutexLowPrioTxQueue.Lock()
}
- oo.lowPrioTxQueue.Remove(queueElement) // Dequeue
- // Interrupt the sending of low priority requests to process any high priority requests
- // that may have arrived in the meantime
- oo.mutexLowPrioTxQueue.Unlock()
- if err := oo.sendQueuedHighPrioRequests(ctx); err != nil {
- return err
- }
- oo.mutexLowPrioTxQueue.Lock()
}
oo.mutexLowPrioTxQueue.Unlock()
@@ -5033,6 +5048,11 @@
log.Fields{"tid": tid, "retries": retryCounter, "device-id": oo.deviceID})
oo.incrementTxRetries()
}
+ case _, ok := <-oo.pBaseDeviceHandler.GetDeviceDeleteCommChan(ctx):
+ if !ok {
+ logger.Warnw(ctx, "device deletion channel is closed at sendWithRxSupervision", log.Fields{"device-id": oo.deviceID})
+ break loop
+ }
}
retryCounter++
}
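
The last hunk adds the same channel as an extra case in the sendWithRxSupervision retry select, so a pending retry is abandoned as soon as deletion is signalled rather than running the remaining retries against a device that is going away. A hedged sketch of that shape; retryUntil, tx and deleteChan are illustrative placeholders, not the adapter code:

package main

import (
	"fmt"
	"time"
)

// retryUntil retries tx until the retry budget is spent or deleteChan is
// closed. Simplified illustration of the supervision loop only.
func retryUntil(tx func() error, timeout time.Duration, retries int, deleteChan <-chan struct{}) {
loop:
	for attempt := 0; attempt <= retries; attempt++ {
		_ = tx()
		select {
		case <-time.After(timeout):
			// No response within the timeout: loop and retry.
		case _, ok := <-deleteChan:
			if !ok {
				break loop // device delete signalled, give up immediately
			}
		}
	}
}

func main() {
	del := make(chan struct{})
	close(del) // simulate a device delete already in progress
	retryUntil(func() error { fmt.Println("tx attempt"); return nil }, 10*time.Millisecond, 2, del)
	fmt.Println("stopped after delete signal")
}

In the real method the other select case is the per-request timeout that drives oo.incrementTxRetries(); the new case simply gives the delete signal a way to win that race.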