[VOL-4466] new rpc to send multi onu sw sections
Change-Id: I5a6f374b119024e3b729395c24a47f1525d38362
diff --git a/internal/pkg/common/interfaces.go b/internal/pkg/common/interfaces.go
index 39b37ba..4c4878a 100755
--- a/internal/pkg/common/interfaces.go
+++ b/internal/pkg/common/interfaces.go
@@ -117,6 +117,8 @@
GetDeletionInProgress() bool
SendOMCIRequest(context.Context, string, *ia.OmciMessage) error
+ SendOnuSwSectionsOfWindow(context.Context, string, *ia.OmciMessages) error
+
CreatePortInCore(context.Context, *voltha.Port) error
PerOnuFlowHandlerRoutine(uniID uint8)
diff --git a/internal/pkg/common/omci_cc.go b/internal/pkg/common/omci_cc.go
index 45c536d..a3df18a 100755
--- a/internal/pkg/common/omci_cc.go
+++ b/internal/pkg/common/omci_cc.go
@@ -94,6 +94,7 @@
withFramePrint bool
cbPair CallbackPair
chSuccess chan bool
+ OnuSwWindow *ia.OmciMessages
}
//OmciCC structure holds information needed for OMCI communication (to/from OLT Adapter)
@@ -531,6 +532,7 @@
printFrame,
receiveCallbackPair,
nil,
+ nil,
}
oo.mutexMonReq.Lock()
defer oo.mutexMonReq.Unlock()
@@ -581,9 +583,19 @@
oo.mutexLowPrioTxQueue.Lock()
for oo.lowPrioTxQueue.Len() > 0 {
queueElement := oo.lowPrioTxQueue.Front() // First element
- if err := oo.sendOMCIRequest(ctx, queueElement.Value.(OmciTransferStructure)); err != nil {
- oo.mutexLowPrioTxQueue.Unlock()
- return err
+ // check if the element is for onu sw section
+ aOmciTxReq := queueElement.Value.(OmciTransferStructure)
+ if aOmciTxReq.OnuSwWindow != nil {
+ if err := oo.sendOnuSwSectionsOfWindow(ctx, aOmciTxReq); err != nil {
+ oo.mutexLowPrioTxQueue.Unlock()
+ return err
+ }
+ } else {
+ err := oo.sendOMCIRequest(ctx, queueElement.Value.(OmciTransferStructure))
+ if err != nil {
+ oo.mutexLowPrioTxQueue.Unlock()
+ return err
+ }
}
oo.lowPrioTxQueue.Remove(queueElement) // Dequeue
// Interrupt the sending of low priority requests to process any high priority requests
@@ -646,6 +658,36 @@
return next
}
+//GetOnuSwSecNextTid get the next low prio tid for the onu sw sections
+//onu sw sections uses only low priority tids
+//The mutexTid lock should be taken prior to using this function
+func (oo *OmciCC) GetOnuSwSecNextTid() uint16 {
+ next := oo.tid
+ oo.tid++
+ if oo.tid >= 0x8000 {
+ oo.tid = 1
+ }
+ return next
+}
+
+//GetOnuSwSecLastTid gets the last allocated tid
+//The mutexTid lock should be taken prior to using this function
+func (oo *OmciCC) GetOnuSwSecLastTid() uint16 {
+ next := oo.tid
+ lastAllocatedTid := next - 1
+ return lastAllocatedTid
+}
+
+//LockMutexTID locks mutexTid
+func (oo *OmciCC) LockMutexTID() {
+ oo.mutexTid.Lock()
+}
+
+//UnLockMutexTID unlocks mutexTid
+func (oo *OmciCC) UnLockMutexTID() {
+ oo.mutexTid.Unlock()
+}
+
// ###################################################################################
// # utility methods provided to work on OMCI messages
@@ -4132,6 +4174,161 @@
return nil
}
+// PrepareOnuSectionsOfWindow prepares a list of sections for each window
+//Before invoking this function the oo.mutexTid needs to be be locked so that
+//GetOnuSwSecNextTid can be invoked without further locking
+func (oo *OmciCC) PrepareOnuSectionsOfWindow(ctx context.Context,
+ aImageMeID uint16, aAckRequest uint8, aDownloadSectionNo uint8, aSection []byte,
+ omciMsgsPerWindow *ia.OmciMessages) (OmciTransferStructure, error) {
+ //onuswsections uses only low prioirity tids
+ tid := oo.GetOnuSwSecNextTid()
+ logger.Infow(ctx, "send DlSectionRequest:", log.Fields{"device-id": oo.deviceID,
+ "SequNo": strconv.FormatInt(int64(tid), 16),
+ "InstId": strconv.FormatInt(int64(aImageMeID), 16), "omci-ack": aAckRequest, "sectionNo": aDownloadSectionNo, "sectionData": aSection})
+
+ var omciTxReq OmciTransferStructure
+ msgType := omci.DownloadSectionRequestType
+
+ if aAckRequest > 0 {
+ msgType = omci.DownloadSectionRequestWithResponseType
+
+ }
+ omciLayer := &omci.OMCI{
+ TransactionID: tid,
+ MessageType: msgType,
+ // DeviceIdentifier: omci.BaselineIdent, // Optional, defaults to Baseline
+ // Length: 0x28, // Optional, defaults to 40 octets
+ }
+ localSectionData := make([]byte, len(aSection))
+
+ copy(localSectionData[:], aSection) // as long as DownloadSectionRequest defines array for SectionData we need to copy into the array
+ request := &omci.DownloadSectionRequest{
+ MeBasePacket: omci.MeBasePacket{
+ EntityClass: me.SoftwareImageClassID,
+ EntityInstance: aImageMeID, //inactive image
+ },
+ SectionNumber: aDownloadSectionNo,
+ SectionData: localSectionData,
+ }
+
+ var options gopacket.SerializeOptions
+ options.FixLengths = true
+ buffer := gopacket.NewSerializeBuffer()
+ err := gopacket.SerializeLayers(buffer, options, omciLayer, request)
+ if err != nil {
+ logger.Errorw(ctx, "Cannot serialize DlSectionRequest", log.Fields{"Err": err,
+ "device-id": oo.deviceID})
+ return omciTxReq, err
+ }
+ outgoingPacket := buffer.Bytes()
+
+ omciMsgsPerWindow.Messages = append(omciMsgsPerWindow.Messages, outgoingPacket)
+
+ if aAckRequest > 0 {
+ // only the last section should have a timeout as an ack is required only for the last section of the window
+ omciTxReq = OmciTransferStructure{
+ withFramePrint: true,
+ OnuSwWindow: omciMsgsPerWindow,
+ }
+ return omciTxReq, nil
+ }
+
+ return omciTxReq, nil
+}
+
+//SendOnuSwSectionsWindowWithRxSupervision sends onu swd sections
+func (oo *OmciCC) SendOnuSwSectionsWindowWithRxSupervision(ctx context.Context,
+ aOmciTxRequest OmciTransferStructure, aTimeout int, rxChan chan Message) {
+ if aOmciTxRequest.OnuSwWindow == nil {
+ logger.Errorw(ctx, "SendOnuSwSectionsWindowWithRxSupervision: omciTxRequest.OnuSwWindow is nil",
+ log.Fields{"device-id": oo.deviceID})
+ return
+
+ }
+
+ tid := oo.GetOnuSwSecLastTid()
+ logger.Debugw(ctx, "SendOnuSwSectionsWindowWithRxSupervision tid for the last segment is ", log.Fields{"TID": tid})
+ omciRxCallbackPair := CallbackPair{CbKey: tid,
+ CbEntry: CallbackPairEntry{rxChan, oo.receiveOmciResponse, true},
+ }
+
+ aOmciTxRequest.cbPair = omciRxCallbackPair
+ logger.Debugw(ctx, "register-response-callback:", log.Fields{"for TansCorrId": aOmciTxRequest.cbPair.CbKey})
+ oo.mutexRxSchedMap.Lock()
+ // it could be checked, if the callback key is already registered - but simply overwrite may be acceptable ...
+ oo.rxSchedulerMap[aOmciTxRequest.cbPair.CbKey] = aOmciTxRequest.cbPair.CbEntry
+ oo.mutexRxSchedMap.Unlock()
+
+ chSuccess := make(chan bool)
+ aOmciTxRequest.chSuccess = chSuccess
+ aOmciTxRequest.timeout = aTimeout
+ aOmciTxRequest.retries = CDefaultRetries
+
+ //tid := aOmciTxRequest.cbPair.CbKey
+ oo.mutexMonReq.Lock()
+ oo.monitoredRequests[tid] = aOmciTxRequest
+ oo.mutexMonReq.Unlock()
+
+ retries := aOmciTxRequest.retries
+ retryCounter := 0
+ if aTimeout == 0 {
+ logger.Errorw(ctx, "no timeout present for last section of window", log.Fields{"device-id": oo.deviceID})
+ return
+ }
+loop:
+ for retryCounter <= retries {
+ // the onu sw sections are enqueued only to the low priority queue
+ oo.mutexLowPrioTxQueue.Lock()
+ oo.lowPrioTxQueue.PushBack(aOmciTxRequest)
+ oo.mutexLowPrioTxQueue.Unlock()
+
+ go oo.sendQueuedRequests(ctx)
+
+ select {
+ case success := <-chSuccess:
+ if success {
+ logger.Debugw(ctx, "reqMon: response received in time",
+ log.Fields{"tid": tid, "device-id": oo.deviceID})
+ } else {
+ logger.Debugw(ctx, "reqMon: wait for response aborted",
+ log.Fields{"tid": tid, "device-id": oo.deviceID})
+ }
+ break loop
+ case <-time.After(time.Duration(aTimeout) * time.Second):
+ if retryCounter == retries {
+ logger.Errorw(ctx, "reqMon: timeout waiting for response - no of max retries reached!",
+ log.Fields{"tid": tid, "retries": retryCounter, "device-id": oo.deviceID})
+ break loop
+ } else {
+ logger.Infow(ctx, "reqMon: timeout waiting for response - retry",
+ log.Fields{"tid": tid, "retries": retryCounter, "device-id": oo.deviceID})
+ }
+ }
+ retryCounter++
+ }
+ oo.mutexMonReq.Lock()
+ delete(oo.monitoredRequests, tid)
+ oo.mutexMonReq.Unlock()
+}
+
+func (oo *OmciCC) sendOnuSwSectionsOfWindow(ctx context.Context, omciTxRequest OmciTransferStructure) error {
+ if omciTxRequest.withFramePrint && omciTxRequest.OnuSwWindow != nil {
+ lastSection := omciTxRequest.OnuSwWindow.Messages[len(omciTxRequest.OnuSwWindow.Messages)-1]
+ logger.Debugw(ctx, "omci-message-to-send:", log.Fields{
+ "TxOmciMessage": hex.EncodeToString(lastSection),
+ "device-id": oo.deviceID,
+ "toDeviceType": oo.pBaseDeviceHandler.GetProxyAddressType(),
+ "proxyDeviceID": oo.pBaseDeviceHandler.GetProxyAddressID(),
+ "proxyAddress": oo.pBaseDeviceHandler.GetProxyAddress()})
+ }
+ sendErr := oo.pBaseDeviceHandler.SendOnuSwSectionsOfWindow(ctx, oo.pBaseDeviceHandler.GetProxyAddress().AdapterEndpoint, omciTxRequest.OnuSwWindow)
+ if sendErr != nil {
+ logger.Errorw(ctx, "send onu sw sections omci request error", log.Fields{"ChildId": oo.deviceID, "error": sendErr})
+ return sendErr
+ }
+ return nil
+}
+
// SendDownloadSection sends DownloadSectionRequestWithResponse
func (oo *OmciCC) SendDownloadSection(ctx context.Context, aTimeout int, highPrio bool,
rxChan chan Message, aImageMeID uint16, aAckRequest uint8, aDownloadSectionNo uint8, aSection []byte, aPrint bool) error {
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 332861b..920802a 100755
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -44,6 +44,7 @@
pmmgr "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/pmmgr"
"github.com/opencord/voltha-openonu-adapter-go/internal/pkg/swupg"
uniprt "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/uniprt"
+ "github.com/opencord/voltha-protos/v5/go/common"
vc "github.com/opencord/voltha-protos/v5/go/common"
ca "github.com/opencord/voltha-protos/v5/go/core_adapter"
"github.com/opencord/voltha-protos/v5/go/extension"
@@ -4338,6 +4339,26 @@
}
}
+func (dh *deviceHandler) SendOnuSwSectionsOfWindow(ctx context.Context, parentEndpoint string, request *ia.OmciMessages) error {
+ request.ParentDeviceId = dh.GetProxyAddressID()
+ request.ChildDeviceId = dh.DeviceID
+ request.ProxyAddress = dh.GetProxyAddress()
+ request.ConnectStatus = common.ConnectStatus_REACHABLE
+
+ pgClient, err := dh.pOpenOnuAc.getParentAdapterServiceClient(parentEndpoint)
+ if err != nil || pgClient == nil {
+ return err
+ }
+ subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.config.MaxTimeoutInterAdapterComm)
+ defer cancel()
+ logger.Debugw(subCtx, "send-omci-request", log.Fields{"request": request, "parent-endpoint": parentEndpoint})
+ _, err = pgClient.ProxyOmciRequests(subCtx, request)
+ if err != nil {
+ logger.Errorw(ctx, "omci-failure", log.Fields{"request": request, "error": err, "request-parent": request.ParentDeviceId, "request-child": request.ChildDeviceId, "request-proxy": request.ProxyAddress})
+ }
+ return err
+}
+
func (dh *deviceHandler) SendOMCIRequest(ctx context.Context, parentEndpoint string, request *ia.OmciMessage) error {
pgClient, err := dh.pOpenOnuAc.getParentAdapterServiceClient(parentEndpoint)
if err != nil || pgClient == nil {
diff --git a/internal/pkg/swupg/omci_onu_upgrade.go b/internal/pkg/swupg/omci_onu_upgrade.go
index 7809ef8..369890a 100755
--- a/internal/pkg/swupg/omci_onu_upgrade.go
+++ b/internal/pkg/swupg/omci_onu_upgrade.go
@@ -32,6 +32,7 @@
"github.com/opencord/voltha-lib-go/v7/pkg/log"
cmn "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/common"
"github.com/opencord/voltha-openonu-adapter-go/internal/pkg/devdb"
+ ia "github.com/opencord/voltha-protos/v5/go/inter_adapter"
"github.com/opencord/voltha-protos/v5/go/voltha"
)
@@ -723,14 +724,18 @@
var bufferStartOffset uint32
var bufferEndOffset uint32
var downloadSection []byte
- framePrint := false //default no printing of downloadSection frames
+
oFsm.mutexUpgradeParams.Lock()
oFsm.upgradePhase = cUpgradeDownloading //start of downloading image to ONU
if oFsm.nextDownloadSectionsAbsolute == 0 {
- //debug print of first section frame
- framePrint = true
oFsm.volthaImageState = voltha.ImageState_IMAGE_DOWNLOADING
}
+ //var omuTxSecPerWindow []*common.OmciTransferStructure
+ omciMsgsPerSection := &ia.OmciMessages{}
+ //take the TID mutex
+ //To ensure the insequesnce of TIDS for all the onusw sections within a window
+ oFsm.pOmciCC.LockMutexTID()
+ defer oFsm.pOmciCC.UnLockMutexTID()
for {
oFsm.mutexAbortRequest.RLock()
// this way out of the section download loop on abort request
@@ -780,33 +785,43 @@
}
if oFsm.nextDownloadSectionsAbsolute+1 >= oFsm.noOfSections {
windowAckRequest = 1
- framePrint = true //debug print of last frame
+
oFsm.omciDownloadWindowSizeLast = oFsm.nextDownloadSectionsWindow
logger.Infow(ctx, "DlSection expect Response for last window (section)", log.Fields{
"device-id": oFsm.deviceID, "DlSectionNoAbsolute": oFsm.nextDownloadSectionsAbsolute})
}
oFsm.mutexUpgradeParams.Unlock() //unlock here to give other functions some chance to process during/after the send request
- err := oFsm.pOmciCC.SendDownloadSection(log.WithSpanFromContext(context.Background(), ctx), oFsm.pDeviceHandler.GetOmciTimeout(), false,
- oFsm.PAdaptFsm.CommChan, oFsm.InactiveImageMeID, windowAckRequest, oFsm.nextDownloadSectionsWindow, downloadSection, framePrint)
+ omciTxReq, err := oFsm.pOmciCC.PrepareOnuSectionsOfWindow(log.WithSpanFromContext(context.Background(), ctx),
+ oFsm.InactiveImageMeID, windowAckRequest, oFsm.nextDownloadSectionsWindow, downloadSection, omciMsgsPerSection)
if err != nil {
logger.Errorw(ctx, "DlSection abort: can't send section", log.Fields{
"device-id": oFsm.deviceID, "section absolute": oFsm.nextDownloadSectionsAbsolute, "error": err})
oFsm.abortOnOmciError(ctx, false)
return
}
+
oFsm.mutexUpgradeParams.Lock()
oFsm.nextDownloadSectionsAbsolute++ //always increase the absolute section counter after having sent one
if windowAckRequest == 1 {
oFsm.mutexUpgradeParams.Unlock()
+
+ if omciTxReq.OnuSwWindow == nil {
+ logger.Errorw(ctx, "fail to send sections in a window", log.Fields{
+ "device-id": oFsm.deviceID, "section absolute": oFsm.nextDownloadSectionsAbsolute, "error": err})
+ oFsm.abortOnOmciError(ctx, false)
+ return
+ }
+
pUpgradeFsm := oFsm.PAdaptFsm
if pUpgradeFsm != nil {
_ = pUpgradeFsm.PFsm.Event(UpgradeEvWaitWindowAck) //state transition to upgradeStVerifyWindow
+ oFsm.pOmciCC.SendOnuSwSectionsWindowWithRxSupervision(ctx, omciTxReq, oFsm.pDeviceHandler.GetOmciTimeout(), oFsm.PAdaptFsm.CommChan)
return
}
logger.Warnw(ctx, "pUpgradeFsm is nil", log.Fields{"device-id": oFsm.deviceID})
return
}
- framePrint = false //for the next Section frame (if wanted, can be enabled in logic before sendXXX())
+
oFsm.nextDownloadSectionsWindow++ //increase the window related section counter only if not in the last section
if oFsm.omciSectionInterleaveDelay > 0 {
//ensure a defined intersection-time-gap to leave space for further processing, other ONU's ...