ONU SW upgrade API change - step 4: correction of DownloadSection frame sequence and abort response image version
Signed-off-by: mpagenko <michael.pagenkopf@adtran.com>
Change-Id: I33f4c67a2883185e9ff17116e64c5adce4e1036a
diff --git a/internal/pkg/onuadaptercore/device_handler.go b/internal/pkg/onuadaptercore/device_handler.go
index 81e3f79..929964c 100644
--- a/internal/pkg/onuadaptercore/device_handler.go
+++ b/internal/pkg/onuadaptercore/device_handler.go
@@ -1363,7 +1363,7 @@
func (dh *deviceHandler) cancelOnuSwUpgrade(ctx context.Context, aImageIdentifier string,
aVersion string, pDeviceImageState *voltha.DeviceImageState) {
pDeviceImageState.DeviceId = dh.deviceID
- pDeviceImageState.ImageState.Version = aImageIdentifier
+ pDeviceImageState.ImageState.Version = aVersion
pDeviceImageState.ImageState.ImageState = voltha.ImageState_IMAGE_UNKNOWN
dh.lockUpgradeFsm.RLock()
if dh.pOnuUpradeFsm != nil {
diff --git a/internal/pkg/onuadaptercore/omci_cc.go b/internal/pkg/onuadaptercore/omci_cc.go
index 649b58f..ed4d59d 100644
--- a/internal/pkg/onuadaptercore/omci_cc.go
+++ b/internal/pkg/onuadaptercore/omci_cc.go
@@ -510,7 +510,9 @@
oo.mutexMonReq.Lock()
defer oo.mutexMonReq.Unlock()
if _, exist := oo.monitoredRequests[receiveCallbackPair.cbKey]; !exist {
- go oo.processRequestMonitoring(ctx, omciTxRequest)
+ // do not call processRequestMonitoring in background here to ensure correct sequencing
+ // of requested messages into txQueue (especially for non-response-supervised messages)
+ oo.processRequestMonitoring(ctx, omciTxRequest)
return nil
}
logger.Errorw(ctx, "A message with this tid is processed already!",
@@ -3011,7 +3013,6 @@
}
func (oo *omciCC) processRequestMonitoring(ctx context.Context, aOmciTxRequest omciTransferStructure) {
-
timeout := aOmciTxRequest.timeout
if timeout == 0 {
//timeout 0 indicates that no response is expected - fire and forget
@@ -3020,52 +3021,59 @@
oo.mutexTxQueue.Unlock()
go oo.sendNextRequest(ctx)
} else {
- chSuccess := make(chan bool)
- aOmciTxRequest.chSuccess = chSuccess
- tid := aOmciTxRequest.cbPair.cbKey
- retries := aOmciTxRequest.retries
-
- oo.mutexMonReq.Lock()
- oo.monitoredRequests[tid] = aOmciTxRequest
- oo.mutexMonReq.Unlock()
-
- retryCounter := 0
- loop:
- for retryCounter <= retries {
-
- oo.mutexTxQueue.Lock()
- oo.txQueue.PushBack(aOmciTxRequest) // enqueue
- oo.mutexTxQueue.Unlock()
- go oo.sendNextRequest(ctx)
-
- select {
- case success := <-chSuccess:
- if success {
- logger.Debugw(ctx, "reqMon: response received in time",
- log.Fields{"tid": tid, "device-id": oo.deviceID})
- } else {
- logger.Debugw(ctx, "reqMon: wait for response aborted",
- log.Fields{"tid": tid, "device-id": oo.deviceID})
- }
- break loop
- case <-time.After(time.Duration(timeout) * time.Second):
- if retryCounter == retries {
- logger.Errorw(ctx, "reqMon: timeout waiting for response - no of max retries reached!",
- log.Fields{"tid": tid, "retries": retryCounter, "device-id": oo.deviceID})
- break loop
- } else {
- logger.Infow(ctx, "reqMon: timeout waiting for response - retry",
- log.Fields{"tid": tid, "retries": retryCounter, "device-id": oo.deviceID})
- }
- }
- retryCounter++
- }
- oo.mutexMonReq.Lock()
- delete(oo.monitoredRequests, tid)
- oo.mutexMonReq.Unlock()
+ //the supervised sending with waiting on the response (based on TID) is called in background
+ // to avoid blocking of the sender for the complete OMCI handshake procedure
+ // to stay consistent with the processing tested so far, sending of next messages of the same control procedure
+ // is ensured by the according control instances (FSM's etc.) (by waiting for the respective responses there)
+ go oo.sendWithRxSupervision(ctx, aOmciTxRequest, timeout)
}
}
+func (oo *omciCC) sendWithRxSupervision(ctx context.Context, aOmciTxRequest omciTransferStructure, aTimeout int) {
+ chSuccess := make(chan bool)
+ aOmciTxRequest.chSuccess = chSuccess
+ tid := aOmciTxRequest.cbPair.cbKey
+ oo.mutexMonReq.Lock()
+ oo.monitoredRequests[tid] = aOmciTxRequest
+ oo.mutexMonReq.Unlock()
+
+ retries := aOmciTxRequest.retries
+ retryCounter := 0
+loop:
+ for retryCounter <= retries {
+
+ oo.mutexTxQueue.Lock()
+ oo.txQueue.PushBack(aOmciTxRequest) // enqueue
+ oo.mutexTxQueue.Unlock()
+ go oo.sendNextRequest(ctx)
+
+ select {
+ case success := <-chSuccess:
+ if success {
+ logger.Debugw(ctx, "reqMon: response received in time",
+ log.Fields{"tid": tid, "device-id": oo.deviceID})
+ } else {
+ logger.Debugw(ctx, "reqMon: wait for response aborted",
+ log.Fields{"tid": tid, "device-id": oo.deviceID})
+ }
+ break loop
+ case <-time.After(time.Duration(aTimeout) * time.Second):
+ if retryCounter == retries {
+ logger.Errorw(ctx, "reqMon: timeout waiting for response - no of max retries reached!",
+ log.Fields{"tid": tid, "retries": retryCounter, "device-id": oo.deviceID})
+ break loop
+ } else {
+ logger.Infow(ctx, "reqMon: timeout waiting for response - retry",
+ log.Fields{"tid": tid, "retries": retryCounter, "device-id": oo.deviceID})
+ }
+ }
+ retryCounter++
+ }
+ oo.mutexMonReq.Lock()
+ delete(oo.monitoredRequests, tid)
+ oo.mutexMonReq.Unlock()
+}
+
//CancelRequestMonitoring terminates monitoring of outstanding omci requests
func (oo *omciCC) CancelRequestMonitoring() {
oo.mutexMonReq.RLock()