[VOL-4368] openonu-adapter-go - omci prioritized sending

Change-Id: I71dc01de674e9ebf33624e264dd347402fb15e4b
diff --git a/internal/pkg/common/omci_cc.go b/internal/pkg/common/omci_cc.go
index 363a59f..e8c669d 100755
--- a/internal/pkg/common/omci_cc.go
+++ b/internal/pkg/common/omci_cc.go
@@ -120,12 +120,15 @@
 	UploadSequNo   uint16
 	UploadNoOfCmds uint16
 
-	mutexTxQueue      sync.Mutex
-	txQueue           *list.List
-	mutexRxSchedMap   sync.Mutex
-	rxSchedulerMap    map[uint16]CallbackPairEntry
-	mutexMonReq       sync.RWMutex
-	monitoredRequests map[uint16]OmciTransferStructure
+	mutexSendQueuedRequests sync.Mutex
+	mutexLowPrioTxQueue     sync.Mutex
+	lowPrioTxQueue          *list.List
+	mutexHighPrioTxQueue    sync.Mutex
+	highPrioTxQueue         *list.List
+	mutexRxSchedMap         sync.Mutex
+	rxSchedulerMap          map[uint16]CallbackPairEntry
+	mutexMonReq             sync.RWMutex
+	monitoredRequests       map[uint16]OmciTransferStructure
 }
 
 var responsesWithMibDataSync = []omci.MessageType{
@@ -162,7 +165,8 @@
 	omciCC.hpTid = 0x8000
 	omciCC.UploadSequNo = 0
 	omciCC.UploadNoOfCmds = 0
-	omciCC.txQueue = list.New()
+	omciCC.lowPrioTxQueue = list.New()
+	omciCC.highPrioTxQueue = list.New()
 	omciCC.rxSchedulerMap = make(map[uint16]CallbackPairEntry)
 	omciCC.monitoredRequests = make(map[uint16]OmciTransferStructure)
 
@@ -174,19 +178,26 @@
 	logger.Debugw(ctx, "omciCC-stopping", log.Fields{"device-id": oo.deviceID})
 	//reseting all internal data, which might also be helpful for discarding any lingering tx/rx requests
 	oo.CancelRequestMonitoring(ctx)
-	oo.mutexTxQueue.Lock()
-	oo.txQueue.Init() // clear the tx queue
-	oo.mutexTxQueue.Unlock()
+	// clear the tx queues
+	oo.mutexHighPrioTxQueue.Lock()
+	oo.highPrioTxQueue.Init()
+	oo.mutexHighPrioTxQueue.Unlock()
+	oo.mutexLowPrioTxQueue.Lock()
+	oo.lowPrioTxQueue.Init()
+	oo.mutexLowPrioTxQueue.Unlock()
+	//clear the scheduler map
 	oo.mutexRxSchedMap.Lock()
 	for k := range oo.rxSchedulerMap {
-		delete(oo.rxSchedulerMap, k) //clear the scheduler map
+		delete(oo.rxSchedulerMap, k)
 	}
 	oo.mutexRxSchedMap.Unlock()
+	//reset the high prio transactionId
 	oo.mutexHpTid.Lock()
-	oo.hpTid = 0x8000 //reset the high prio transactionId
+	oo.hpTid = 0x8000
 	oo.mutexHpTid.Unlock()
+	//reset the low prio transactionId
 	oo.mutexTid.Lock()
-	oo.tid = 1 //reset the low prio transactionId
+	oo.tid = 1
 	oo.mutexTid.Unlock()
 	//reset control values
 	oo.UploadSequNo = 0
@@ -523,92 +534,80 @@
 	return fmt.Errorf("message with tid is processed already %s", oo.deviceID)
 }
 
-//Pull next tx request and send it
-func (oo *OmciCC) sendNextRequest(ctx context.Context) error {
-	//	return errors.New("sendNextRequest unimplemented")
+func (oo *OmciCC) sendQueuedRequests(ctx context.Context) {
+	// Serialize the send routines: only one of them may work on the tx queues at a
+	// time, so parallel omci send requests are blocked at least until SendIAP is
+	// 'committed'. Guaranteeing window size 1 for one ONU would additionally require
+	// waiting for the corresponding response (t.b.d.).
+	oo.mutexSendQueuedRequests.Lock()
+	defer oo.mutexSendQueuedRequests.Unlock()
+	if err := oo.sendQueuedHighPrioRequests(ctx); err != nil {
+		logger.Errorw(ctx, "Error during sending high prio requests!",
+			log.Fields{"err": err, "device-id": oo.deviceID})
+		return
+	}
+	if err := oo.sendQueuedLowPrioRequests(ctx); err != nil {
+		logger.Errorw(ctx, "Error during sending low prio requests!",
+			log.Fields{"err": err, "device-id": oo.deviceID})
+		return
+	}
+}
 
-	// just try to get something transferred !!
-	// avoid accessing the txQueue from parallel send requests
-	// block parallel omci send requests at least until SendIAP is 'committed'
-	// that should be feasible for an onu instance as on OMCI anyway window size 1 is assumed
-	oo.mutexTxQueue.Lock()
-	defer oo.mutexTxQueue.Unlock()
-	for oo.txQueue.Len() > 0 {
-		queueElement := oo.txQueue.Front() // First element
-		omciTxRequest := queueElement.Value.(OmciTransferStructure)
-		/* compare olt device handler code:
-		func (dh *DeviceHandler) omciIndication(omciInd *oop.OmciIndication) {
-			logger.Debugw(ctx,"omci indication", log.Fields{"intfID": omciInd.IntfId, "onuID": omciInd.OnuId})
-			var deviceType string
-			var deviceID string
-			var proxyDeviceID string
-
-			onuKey := dh.formOnuKey(omciInd.IntfId, omciInd.OnuId)
-
-			if onuInCache, ok := dh.onus.Load(onuKey); !ok {
-
-				logger.Debugw(ctx,"omci indication for a device not in cache.", log.Fields{"intfID": omciInd.IntfId, "onuID": omciInd.OnuId})
-				ponPort := IntfIDToPortNo(omciInd.GetIntfId(), voltha.Port_PON_OLT)
-				kwargs := make(map[string]interface{})
-				kwargs["onu_id"] = omciInd.OnuId
-				kwargs["parent_port_no"] = ponPort
-
-				onuDevice, err := dh.coreProxy.GetChildDevice(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, kwargs)
-				if err != nil {
-					logger.Errorw(ctx,"onu not found", log.Fields{"intfID": omciInd.IntfId, "onuID": omciInd.OnuId, "error": err})
-					return
-				}
-				deviceType = onuDevice.Type
-				deviceID = onuDevice.Id
-				proxyDeviceID = onuDevice.ProxyAddress.DeviceId
-				//if not exist in cache, then add to cache.
-				dh.onus.Store(onuKey, NewOnuDevice(deviceID, deviceType, onuDevice.SerialNumber, omciInd.OnuId, omciInd.IntfId, proxyDeviceID))
-			} else {
-				//found in cache
-				logger.Debugw(ctx,"omci indication for a device in cache.", log.Fields{"intfID": omciInd.IntfId, "onuID": omciInd.OnuId})
-				deviceType = onuInCache.(*OnuDevice).deviceType
-				deviceID = onuInCache.(*OnuDevice).deviceID
-				proxyDeviceID = onuInCache.(*OnuDevice).proxyDeviceID
-			}
-		*/
-		/* and compare onu_adapter py code:
-		omci_msg = InterAdapterOmciMessage(
-			message=bytes(frame),
-			proxy_address=self._proxy_address,
-			connect_status=self._device.connect_status)
-
-		self.logger.debug('sent-omci-msg', tid=tx_tid, omci_msg=hexlify(bytes(frame)))
-
-		yield self._adapter_proxy.send_inter_adapter_message(
-			msg=omci_msg,
-			type=InterAdapterMessageType.OMCI_REQUEST,
-			from_adapter=self._device.type,
-			to_adapter=self._proxy_address.device_type,
-			to_device_id=self._device_id,
-			proxy_device_id=self._proxy_address.device_id
-		)
-		*/
-		if omciTxRequest.withFramePrint {
-			logger.Debugw(ctx, "omci-message-to-send:", log.Fields{
-				"TxOmciMessage": hex.EncodeToString(omciTxRequest.txFrame),
-				"device-id":     oo.deviceID,
-				"toDeviceType":  oo.pBaseDeviceHandler.GetProxyAddressType(),
-				"proxyDeviceID": oo.pBaseDeviceHandler.GetProxyAddressID(),
-				"proxyAddress":  oo.pBaseDeviceHandler.GetProxyAddress()})
+func (oo *OmciCC) sendQueuedHighPrioRequests(ctx context.Context) error {
+	oo.mutexHighPrioTxQueue.Lock()
+	defer oo.mutexHighPrioTxQueue.Unlock()
+	for oo.highPrioTxQueue.Len() > 0 {
+		queueElement := oo.highPrioTxQueue.Front() // First element
+		if err := oo.sendOMCIRequest(ctx, queueElement.Value.(OmciTransferStructure)); err != nil {
+			return err
 		}
-		omciMsg := &ic.OmciMessage{
-			ParentDeviceId: oo.pBaseDeviceHandler.GetProxyAddressID(),
-			ChildDeviceId:  oo.deviceID,
-			Message:        omciTxRequest.txFrame,
-			ProxyAddress:   oo.pBaseDeviceHandler.GetProxyAddress(),
-			ConnectStatus:  common.ConnectStatus_REACHABLE, // If we are sending OMCI messages means we are connected, else we should not be here
+		oo.highPrioTxQueue.Remove(queueElement) // Dequeue
+	}
+	return nil
+}
+
+func (oo *OmciCC) sendQueuedLowPrioRequests(ctx context.Context) error {
+	oo.mutexLowPrioTxQueue.Lock()
+	for oo.lowPrioTxQueue.Len() > 0 {
+		queueElement := oo.lowPrioTxQueue.Front() // First element
+		if err := oo.sendOMCIRequest(ctx, queueElement.Value.(OmciTransferStructure)); err != nil {
+			oo.mutexLowPrioTxQueue.Unlock()
+			return err
 		}
-		sendErr := oo.pBaseDeviceHandler.SendOMCIRequest(ctx, oo.pBaseDeviceHandler.GetProxyAddress().AdapterEndpoint, omciMsg)
-		if sendErr != nil {
-			logger.Errorw(ctx, "send omci request error", log.Fields{"ChildId": oo.deviceID, "error": sendErr})
-			return sendErr
+		oo.lowPrioTxQueue.Remove(queueElement) // Dequeue
+		// Interrupt the sending of low priority requests to process any high priority requests
+		// that may have arrived in the meantime
+		oo.mutexLowPrioTxQueue.Unlock()
+		if err := oo.sendQueuedHighPrioRequests(ctx); err != nil {
+			return err
 		}
-		oo.txQueue.Remove(queueElement) // Dequeue
+		oo.mutexLowPrioTxQueue.Lock()
+	}
+
+	oo.mutexLowPrioTxQueue.Unlock()
+	return nil
+}
+
+func (oo *OmciCC) sendOMCIRequest(ctx context.Context, omciTxRequest OmciTransferStructure) error {
+	if omciTxRequest.withFramePrint {
+		logger.Debugw(ctx, "omci-message-to-send:", log.Fields{
+			"TxOmciMessage": hex.EncodeToString(omciTxRequest.txFrame),
+			"device-id":     oo.deviceID,
+			"toDeviceType":  oo.pBaseDeviceHandler.GetProxyAddressType(),
+			"proxyDeviceID": oo.pBaseDeviceHandler.GetProxyAddressID(),
+			"proxyAddress":  oo.pBaseDeviceHandler.GetProxyAddress()})
+	}
+	omciMsg := &ic.OmciMessage{
+		ParentDeviceId: oo.pBaseDeviceHandler.GetProxyAddressID(),
+		ChildDeviceId:  oo.deviceID,
+		Message:        omciTxRequest.txFrame,
+		ProxyAddress:   oo.pBaseDeviceHandler.GetProxyAddress(),
+		ConnectStatus:  common.ConnectStatus_REACHABLE, // If we are sending OMCI messages means we are connected, else we should not be here
+	}
+	sendErr := oo.pBaseDeviceHandler.SendOMCIRequest(ctx, oo.pBaseDeviceHandler.GetProxyAddress().AdapterEndpoint, omciMsg)
+	if sendErr != nil {
+		logger.Errorw(ctx, "send omci request error", log.Fields{"ChildId": oo.deviceID, "error": sendErr})
+		return sendErr
 	}
 	return nil
 }
@@ -3155,10 +3154,17 @@
 	timeout := aOmciTxRequest.timeout
 	if timeout == 0 {
 		//timeout 0 indicates that no response is expected - fire and forget
-		oo.mutexTxQueue.Lock()
-		oo.txQueue.PushBack(aOmciTxRequest) // enqueue
-		oo.mutexTxQueue.Unlock()
-		go oo.sendNextRequest(ctx)
+		// enqueue
+		if aOmciTxRequest.highPrio {
+			oo.mutexHighPrioTxQueue.Lock()
+			oo.highPrioTxQueue.PushBack(aOmciTxRequest)
+			oo.mutexHighPrioTxQueue.Unlock()
+		} else {
+			oo.mutexLowPrioTxQueue.Lock()
+			oo.lowPrioTxQueue.PushBack(aOmciTxRequest)
+			oo.mutexLowPrioTxQueue.Unlock()
+		}
+		go oo.sendQueuedRequests(ctx)
 	} else {
 		//the supervised sending with waiting on the response (based on TID) is called in background
 		//  to avoid blocking of the sender for the complete OMCI handshake procedure
@@ -3180,11 +3186,17 @@
 	retryCounter := 0
 loop:
 	for retryCounter <= retries {
-
-		oo.mutexTxQueue.Lock()
-		oo.txQueue.PushBack(aOmciTxRequest) // enqueue
-		oo.mutexTxQueue.Unlock()
-		go oo.sendNextRequest(ctx)
+		// enqueue
+		if aOmciTxRequest.highPrio {
+			oo.mutexHighPrioTxQueue.Lock()
+			oo.highPrioTxQueue.PushBack(aOmciTxRequest)
+			oo.mutexHighPrioTxQueue.Unlock()
+		} else {
+			oo.mutexLowPrioTxQueue.Lock()
+			oo.lowPrioTxQueue.PushBack(aOmciTxRequest)
+			oo.mutexLowPrioTxQueue.Unlock()
+		}
+		go oo.sendQueuedRequests(ctx)
 
 		select {
 		case success := <-chSuccess: