[VOL-4466] new rpc to send multi onu sw sections

Change-Id: I5a6f374b119024e3b729395c24a47f1525d38362
diff --git a/internal/pkg/common/interfaces.go b/internal/pkg/common/interfaces.go
index 39b37ba..4c4878a 100755
--- a/internal/pkg/common/interfaces.go
+++ b/internal/pkg/common/interfaces.go
@@ -117,6 +117,8 @@
 	GetDeletionInProgress() bool
 
 	SendOMCIRequest(context.Context, string, *ia.OmciMessage) error
+	SendOnuSwSectionsOfWindow(context.Context, string, *ia.OmciMessages) error
+
 	CreatePortInCore(context.Context, *voltha.Port) error
 
 	PerOnuFlowHandlerRoutine(uniID uint8)
diff --git a/internal/pkg/common/omci_cc.go b/internal/pkg/common/omci_cc.go
index 45c536d..a3df18a 100755
--- a/internal/pkg/common/omci_cc.go
+++ b/internal/pkg/common/omci_cc.go
@@ -94,6 +94,7 @@
 	withFramePrint bool
 	cbPair         CallbackPair
 	chSuccess      chan bool
+	OnuSwWindow    *ia.OmciMessages
 }
 
 //OmciCC structure holds information needed for OMCI communication (to/from OLT Adapter)
@@ -531,6 +532,7 @@
 		printFrame,
 		receiveCallbackPair,
 		nil,
+		nil,
 	}
 	oo.mutexMonReq.Lock()
 	defer oo.mutexMonReq.Unlock()
@@ -581,9 +583,19 @@
 	oo.mutexLowPrioTxQueue.Lock()
 	for oo.lowPrioTxQueue.Len() > 0 {
 		queueElement := oo.lowPrioTxQueue.Front() // First element
-		if err := oo.sendOMCIRequest(ctx, queueElement.Value.(OmciTransferStructure)); err != nil {
-			oo.mutexLowPrioTxQueue.Unlock()
-			return err
+		// check if the element is for onu sw section
+		aOmciTxReq := queueElement.Value.(OmciTransferStructure)
+		if aOmciTxReq.OnuSwWindow != nil {
+			if err := oo.sendOnuSwSectionsOfWindow(ctx, aOmciTxReq); err != nil {
+				oo.mutexLowPrioTxQueue.Unlock()
+				return err
+			}
+		} else {
+			err := oo.sendOMCIRequest(ctx, queueElement.Value.(OmciTransferStructure))
+			if err != nil {
+				oo.mutexLowPrioTxQueue.Unlock()
+				return err
+			}
 		}
 		oo.lowPrioTxQueue.Remove(queueElement) // Dequeue
 		// Interrupt the sending of low priority requests to process any high priority requests
@@ -646,6 +658,36 @@
 	return next
 }
 
+//GetOnuSwSecNextTid returns the current value of oo.tid as the next tid for an onu sw section and advances the counter
+//onu sw sections use only low priority tids; the counter restarts at 1 once it reaches 0x8000
+//The function does not lock by itself - the mutexTid lock should be taken prior to using this function
+func (oo *OmciCC) GetOnuSwSecNextTid() uint16 {
+	next := oo.tid
+	oo.tid++
+	if oo.tid >= 0x8000 { // wrap around, the value 0 is skipped
+		oo.tid = 1
+	}
+	return next
+}
+
+//GetOnuSwSecLastTid gets the last allocated tid; the mutexTid lock should be taken prior to using this function
+func (oo *OmciCC) GetOnuSwSecLastTid() uint16 {
+	if oo.tid == 1 { // GetOnuSwSecNextTid just wrapped the counter: the tid handed out was 0x7fff, not 0
+		return 0x7fff
+	}
+	return oo.tid - 1
+}
+
+//LockMutexTID locks mutexTid, e.g. to keep the tids of consecutively prepared onu sw sections in sequence
+func (oo *OmciCC) LockMutexTID() {
+	oo.mutexTid.Lock()
+}
+
+//UnLockMutexTID unlocks mutexTid again, the counterpart of LockMutexTID
+func (oo *OmciCC) UnLockMutexTID() {
+	oo.mutexTid.Unlock()
+}
+
 // ###################################################################################
 // # utility methods provided to work on OMCI messages
 
@@ -4132,6 +4174,161 @@
 	return nil
 }
 
+// PrepareOnuSectionsOfWindow serializes one DownloadSection request and appends it to omciMsgsPerWindow.Messages;
+// only for aAckRequest > 0 the returned OmciTransferStructure references the collected window, otherwise (and on error) it is empty.
+// Before invoking this function the oo.mutexTid needs to be locked so that GetOnuSwSecNextTid can be invoked without further locking
+func (oo *OmciCC) PrepareOnuSectionsOfWindow(ctx context.Context,
+	aImageMeID uint16, aAckRequest uint8, aDownloadSectionNo uint8, aSection []byte,
+	omciMsgsPerWindow *ia.OmciMessages) (OmciTransferStructure, error) {
+	//onu sw sections use only low priority tids
+	tid := oo.GetOnuSwSecNextTid()
+	logger.Infow(ctx, "send DlSectionRequest:", log.Fields{"device-id": oo.deviceID,
+		"SequNo": strconv.FormatInt(int64(tid), 16),
+		"InstId": strconv.FormatInt(int64(aImageMeID), 16), "omci-ack": aAckRequest, "sectionNo": aDownloadSectionNo, "sectionData": aSection})
+
+	var omciTxReq OmciTransferStructure
+	msgType := omci.DownloadSectionRequestType
+
+	if aAckRequest > 0 {
+		msgType = omci.DownloadSectionRequestWithResponseType
+
+	}
+	omciLayer := &omci.OMCI{
+		TransactionID: tid,
+		MessageType:   msgType,
+		// DeviceIdentifier: omci.BaselineIdent,		// Optional, defaults to Baseline
+		// Length:           0x28,						// Optional, defaults to 40 octets
+	}
+	localSectionData := make([]byte, len(aSection))
+
+	copy(localSectionData[:], aSection) // the request gets its own copy of the section data
+	request := &omci.DownloadSectionRequest{
+		MeBasePacket: omci.MeBasePacket{
+			EntityClass:    me.SoftwareImageClassID,
+			EntityInstance: aImageMeID, //inactive image
+		},
+		SectionNumber: aDownloadSectionNo,
+		SectionData:   localSectionData,
+	}
+
+	var options gopacket.SerializeOptions
+	options.FixLengths = true
+	buffer := gopacket.NewSerializeBuffer()
+	err := gopacket.SerializeLayers(buffer, options, omciLayer, request)
+	if err != nil {
+		logger.Errorw(ctx, "Cannot serialize DlSectionRequest", log.Fields{"Err": err,
+			"device-id": oo.deviceID})
+		return omciTxReq, err
+	}
+	outgoingPacket := buffer.Bytes()
+
+	omciMsgsPerWindow.Messages = append(omciMsgsPerWindow.Messages, outgoingPacket)
+
+	if aAckRequest > 0 {
+		// an ack is required only for the last section of the window, so only then the collected window is handed back for sending
+		omciTxReq = OmciTransferStructure{
+			withFramePrint: true,
+			OnuSwWindow:    omciMsgsPerWindow,
+		}
+		return omciTxReq, nil
+	}
+
+	return omciTxReq, nil
+}
+
+//SendOnuSwSectionsWindowWithRxSupervision enqueues a window of onu sw sections to the low prio queue and supervises the response to its last section, re-enqueuing the window on timeout (uses GetOnuSwSecLastTid, so mutexTid is expected to be held by the caller)
+func (oo *OmciCC) SendOnuSwSectionsWindowWithRxSupervision(ctx context.Context,
+	aOmciTxRequest OmciTransferStructure, aTimeout int, rxChan chan Message) {
+	if aOmciTxRequest.OnuSwWindow == nil {
+		logger.Errorw(ctx, "SendOnuSwSectionsWindowWithRxSupervision: omciTxRequest.OnuSwWindow is nil",
+			log.Fields{"device-id": oo.deviceID})
+		return
+	}
+	// reject a missing timeout before anything is registered - returning later would leave stale map entries behind
+	if aTimeout == 0 {
+		logger.Errorw(ctx, "no timeout present for last section of window", log.Fields{"device-id": oo.deviceID})
+		return
+	}
+
+	tid := oo.GetOnuSwSecLastTid() // only the last section of the window is acknowledged
+	logger.Debugw(ctx, "SendOnuSwSectionsWindowWithRxSupervision tid for the last segment is ", log.Fields{"TID": tid})
+	omciRxCallbackPair := CallbackPair{CbKey: tid,
+		CbEntry: CallbackPairEntry{rxChan, oo.receiveOmciResponse, true},
+	}
+
+	aOmciTxRequest.cbPair = omciRxCallbackPair
+	logger.Debugw(ctx, "register-response-callback:", log.Fields{"for TansCorrId": aOmciTxRequest.cbPair.CbKey})
+	oo.mutexRxSchedMap.Lock()
+	// it could be checked, if the callback key is already registered - but simply overwrite may be acceptable ...
+	oo.rxSchedulerMap[aOmciTxRequest.cbPair.CbKey] = aOmciTxRequest.cbPair.CbEntry
+	oo.mutexRxSchedMap.Unlock()
+
+	chSuccess := make(chan bool)
+	aOmciTxRequest.chSuccess = chSuccess
+	aOmciTxRequest.timeout = aTimeout
+	aOmciTxRequest.retries = CDefaultRetries
+
+	// the whole window is monitored under the tid of its last section
+	oo.mutexMonReq.Lock()
+	oo.monitoredRequests[tid] = aOmciTxRequest
+	oo.mutexMonReq.Unlock()
+
+	retries := aOmciTxRequest.retries
+	retryCounter := 0
+loop:
+	for retryCounter <= retries {
+		// the onu sw sections are enqueued only to the low priority queue
+		oo.mutexLowPrioTxQueue.Lock()
+		oo.lowPrioTxQueue.PushBack(aOmciTxRequest)
+		oo.mutexLowPrioTxQueue.Unlock()
+
+		go oo.sendQueuedRequests(ctx)
+
+		select {
+		case success := <-chSuccess:
+			if success {
+				logger.Debugw(ctx, "reqMon: response received in time",
+					log.Fields{"tid": tid, "device-id": oo.deviceID})
+			} else {
+				logger.Debugw(ctx, "reqMon: wait for response aborted",
+					log.Fields{"tid": tid, "device-id": oo.deviceID})
+			}
+			break loop
+		case <-time.After(time.Duration(aTimeout) * time.Second):
+			if retryCounter == retries {
+				logger.Errorw(ctx, "reqMon: timeout waiting for response - no of max retries reached!",
+					log.Fields{"tid": tid, "retries": retryCounter, "device-id": oo.deviceID})
+				break loop
+			} else {
+				logger.Infow(ctx, "reqMon: timeout waiting for response - retry",
+					log.Fields{"tid": tid, "retries": retryCounter, "device-id": oo.deviceID})
+			}
+		}
+		retryCounter++
+	}
+	oo.mutexMonReq.Lock()
+	delete(oo.monitoredRequests, tid)
+	oo.mutexMonReq.Unlock()
+}
+
+// sendOnuSwSectionsOfWindow passes a window of onu sw sections to the device handler for sending and returns its error; the debug print of the last section is skipped for an empty window instead of indexing out of range
+func (oo *OmciCC) sendOnuSwSectionsOfWindow(ctx context.Context, omciTxRequest OmciTransferStructure) error {
+	if omciTxRequest.withFramePrint && omciTxRequest.OnuSwWindow != nil && len(omciTxRequest.OnuSwWindow.Messages) > 0 {
+		lastSection := omciTxRequest.OnuSwWindow.Messages[len(omciTxRequest.OnuSwWindow.Messages)-1]
+		logger.Debugw(ctx, "omci-message-to-send:", log.Fields{
+			"TxOmciMessage": hex.EncodeToString(lastSection),
+			"device-id":     oo.deviceID,
+			"toDeviceType":  oo.pBaseDeviceHandler.GetProxyAddressType(),
+			"proxyDeviceID": oo.pBaseDeviceHandler.GetProxyAddressID(),
+			"proxyAddress":  oo.pBaseDeviceHandler.GetProxyAddress()})
+	}
+	sendErr := oo.pBaseDeviceHandler.SendOnuSwSectionsOfWindow(ctx, oo.pBaseDeviceHandler.GetProxyAddress().AdapterEndpoint, omciTxRequest.OnuSwWindow)
+	if sendErr != nil {
+		logger.Errorw(ctx, "send onu sw sections omci request error", log.Fields{"ChildId": oo.deviceID, "error": sendErr})
+	}
+	return sendErr // nil on success
+}
+
 // SendDownloadSection sends DownloadSectionRequestWithResponse
 func (oo *OmciCC) SendDownloadSection(ctx context.Context, aTimeout int, highPrio bool,
 	rxChan chan Message, aImageMeID uint16, aAckRequest uint8, aDownloadSectionNo uint8, aSection []byte, aPrint bool) error {
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 332861b..920802a 100755
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -44,6 +44,7 @@
 	pmmgr "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/pmmgr"
 	"github.com/opencord/voltha-openonu-adapter-go/internal/pkg/swupg"
 	uniprt "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/uniprt"
+	"github.com/opencord/voltha-protos/v5/go/common"
 	vc "github.com/opencord/voltha-protos/v5/go/common"
 	ca "github.com/opencord/voltha-protos/v5/go/core_adapter"
 	"github.com/opencord/voltha-protos/v5/go/extension"
@@ -4338,6 +4339,26 @@
 	}
 }
 
+// SendOnuSwSectionsOfWindow fills in the addressing fields of request and forwards the window of omci messages to the parent adapter reachable at parentEndpoint
+func (dh *deviceHandler) SendOnuSwSectionsOfWindow(ctx context.Context, parentEndpoint string, request *ia.OmciMessages) error {
+	request.ConnectStatus = common.ConnectStatus_REACHABLE
+	request.ChildDeviceId = dh.DeviceID
+	request.ParentDeviceId = dh.GetProxyAddressID()
+	request.ProxyAddress = dh.GetProxyAddress()
+
+	parentClient, err := dh.pOpenOnuAc.getParentAdapterServiceClient(parentEndpoint)
+	if parentClient == nil || err != nil {
+		return err
+	}
+	rpcCtx, cancelRPC := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), dh.config.MaxTimeoutInterAdapterComm)
+	defer cancelRPC()
+	logger.Debugw(rpcCtx, "send-omci-request", log.Fields{"request": request, "parent-endpoint": parentEndpoint})
+	if _, err = parentClient.ProxyOmciRequests(rpcCtx, request); err != nil {
+		logger.Errorw(ctx, "omci-failure", log.Fields{"request": request, "error": err, "request-parent": request.ParentDeviceId, "request-child": request.ChildDeviceId, "request-proxy": request.ProxyAddress})
+	}
+	return err
+}
+
 func (dh *deviceHandler) SendOMCIRequest(ctx context.Context, parentEndpoint string, request *ia.OmciMessage) error {
 	pgClient, err := dh.pOpenOnuAc.getParentAdapterServiceClient(parentEndpoint)
 	if err != nil || pgClient == nil {
diff --git a/internal/pkg/swupg/omci_onu_upgrade.go b/internal/pkg/swupg/omci_onu_upgrade.go
index 7809ef8..369890a 100755
--- a/internal/pkg/swupg/omci_onu_upgrade.go
+++ b/internal/pkg/swupg/omci_onu_upgrade.go
@@ -32,6 +32,7 @@
 	"github.com/opencord/voltha-lib-go/v7/pkg/log"
 	cmn "github.com/opencord/voltha-openonu-adapter-go/internal/pkg/common"
 	"github.com/opencord/voltha-openonu-adapter-go/internal/pkg/devdb"
+	ia "github.com/opencord/voltha-protos/v5/go/inter_adapter"
 	"github.com/opencord/voltha-protos/v5/go/voltha"
 )
 
@@ -723,14 +724,18 @@
 	var bufferStartOffset uint32
 	var bufferEndOffset uint32
 	var downloadSection []byte
-	framePrint := false //default no printing of downloadSection frames
+
 	oFsm.mutexUpgradeParams.Lock()
 	oFsm.upgradePhase = cUpgradeDownloading //start of downloading image to ONU
 	if oFsm.nextDownloadSectionsAbsolute == 0 {
-		//debug print of first section frame
-		framePrint = true
 		oFsm.volthaImageState = voltha.ImageState_IMAGE_DOWNLOADING
 	}
+	//var omuTxSecPerWindow []*common.OmciTransferStructure
+	omciMsgsPerSection := &ia.OmciMessages{}
+	//take the TID mutex
+	//to ensure the TIDs of all the onu sw sections within a window are in sequence
+	oFsm.pOmciCC.LockMutexTID()
+	defer oFsm.pOmciCC.UnLockMutexTID()
 	for {
 		oFsm.mutexAbortRequest.RLock()
 		// this way out of the section download loop on abort request
@@ -780,33 +785,43 @@
 		}
 		if oFsm.nextDownloadSectionsAbsolute+1 >= oFsm.noOfSections {
 			windowAckRequest = 1
-			framePrint = true //debug print of last frame
+
 			oFsm.omciDownloadWindowSizeLast = oFsm.nextDownloadSectionsWindow
 			logger.Infow(ctx, "DlSection expect Response for last window (section)", log.Fields{
 				"device-id": oFsm.deviceID, "DlSectionNoAbsolute": oFsm.nextDownloadSectionsAbsolute})
 		}
 		oFsm.mutexUpgradeParams.Unlock() //unlock here to give other functions some chance to process during/after the send request
-		err := oFsm.pOmciCC.SendDownloadSection(log.WithSpanFromContext(context.Background(), ctx), oFsm.pDeviceHandler.GetOmciTimeout(), false,
-			oFsm.PAdaptFsm.CommChan, oFsm.InactiveImageMeID, windowAckRequest, oFsm.nextDownloadSectionsWindow, downloadSection, framePrint)
+		omciTxReq, err := oFsm.pOmciCC.PrepareOnuSectionsOfWindow(log.WithSpanFromContext(context.Background(), ctx),
+			oFsm.InactiveImageMeID, windowAckRequest, oFsm.nextDownloadSectionsWindow, downloadSection, omciMsgsPerSection)
 		if err != nil {
 			logger.Errorw(ctx, "DlSection abort: can't send section", log.Fields{
 				"device-id": oFsm.deviceID, "section absolute": oFsm.nextDownloadSectionsAbsolute, "error": err})
 			oFsm.abortOnOmciError(ctx, false)
 			return
 		}
+
 		oFsm.mutexUpgradeParams.Lock()
 		oFsm.nextDownloadSectionsAbsolute++ //always increase the absolute section counter after having sent one
 		if windowAckRequest == 1 {
 			oFsm.mutexUpgradeParams.Unlock()
+
+			if omciTxReq.OnuSwWindow == nil {
+				logger.Errorw(ctx, "fail to send sections in a window", log.Fields{
+					"device-id": oFsm.deviceID, "section absolute": oFsm.nextDownloadSectionsAbsolute, "error": err})
+				oFsm.abortOnOmciError(ctx, false)
+				return
+			}
+
 			pUpgradeFsm := oFsm.PAdaptFsm
 			if pUpgradeFsm != nil {
 				_ = pUpgradeFsm.PFsm.Event(UpgradeEvWaitWindowAck) //state transition to upgradeStVerifyWindow
+				oFsm.pOmciCC.SendOnuSwSectionsWindowWithRxSupervision(ctx, omciTxReq, oFsm.pDeviceHandler.GetOmciTimeout(), oFsm.PAdaptFsm.CommChan)
 				return
 			}
 			logger.Warnw(ctx, "pUpgradeFsm is nil", log.Fields{"device-id": oFsm.deviceID})
 			return
 		}
-		framePrint = false                //for the next Section frame (if wanted, can be enabled in logic before sendXXX())
+
 		oFsm.nextDownloadSectionsWindow++ //increase the window related section counter only if not in the last section
 		if oFsm.omciSectionInterleaveDelay > 0 {
 			//ensure a defined intersection-time-gap to leave space for further processing, other ONU's ...