[VOL-4368] openonu-adapter-go - omci prioritized sending
Change-Id: I71dc01de674e9ebf33624e264dd347402fb15e4b
diff --git a/VERSION b/VERSION
index 09bf7ca..9559721 100755
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-2.0.1-dev232
+2.0.2-dev233
diff --git a/internal/pkg/common/omci_cc.go b/internal/pkg/common/omci_cc.go
index 363a59f..e8c669d 100755
--- a/internal/pkg/common/omci_cc.go
+++ b/internal/pkg/common/omci_cc.go
@@ -120,12 +120,15 @@
UploadSequNo uint16
UploadNoOfCmds uint16
- mutexTxQueue sync.Mutex
- txQueue *list.List
- mutexRxSchedMap sync.Mutex
- rxSchedulerMap map[uint16]CallbackPairEntry
- mutexMonReq sync.RWMutex
- monitoredRequests map[uint16]OmciTransferStructure
+ mutexSendQueuedRequests sync.Mutex
+ mutexLowPrioTxQueue sync.Mutex
+ lowPrioTxQueue *list.List
+ mutexHighPrioTxQueue sync.Mutex
+ highPrioTxQueue *list.List
+ mutexRxSchedMap sync.Mutex
+ rxSchedulerMap map[uint16]CallbackPairEntry
+ mutexMonReq sync.RWMutex
+ monitoredRequests map[uint16]OmciTransferStructure
}
var responsesWithMibDataSync = []omci.MessageType{
@@ -162,7 +165,8 @@
omciCC.hpTid = 0x8000
omciCC.UploadSequNo = 0
omciCC.UploadNoOfCmds = 0
- omciCC.txQueue = list.New()
+ omciCC.lowPrioTxQueue = list.New()
+ omciCC.highPrioTxQueue = list.New()
omciCC.rxSchedulerMap = make(map[uint16]CallbackPairEntry)
omciCC.monitoredRequests = make(map[uint16]OmciTransferStructure)
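
For orientation, a minimal compilable sketch of the dual-queue layout introduced in the two hunks above, under assumed names (prioQueues and its fields are illustrative stand-ins, not the adapter's actual declarations): each priority level gets its own container/list FIFO guarded by a dedicated mutex, and a further mutex later serializes the drain routine.

    package omcidemo

    import (
        "container/list"
        "sync"
    )

    // prioQueues mirrors the pattern above: one FIFO per priority level,
    // each guarded by its own mutex, plus a mutex that serializes draining.
    type prioQueues struct {
        mutexSendQueued sync.Mutex // serializes the send/drain routine
        mutexHighPrio   sync.Mutex // guards highPrioQueue
        highPrioQueue   *list.List
        mutexLowPrio    sync.Mutex // guards lowPrioQueue
        lowPrioQueue    *list.List
    }

    // newPrioQueues initializes both FIFOs, as the constructor hunk above does.
    func newPrioQueues() *prioQueues {
        return &prioQueues{
            highPrioQueue: list.New(),
            lowPrioQueue:  list.New(),
        }
    }
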
@@ -174,19 +178,26 @@
logger.Debugw(ctx, "omciCC-stopping", log.Fields{"device-id": oo.deviceID})
//resetting all internal data, which might also be helpful for discarding any lingering tx/rx requests
oo.CancelRequestMonitoring(ctx)
- oo.mutexTxQueue.Lock()
- oo.txQueue.Init() // clear the tx queue
- oo.mutexTxQueue.Unlock()
+ // clear the tx queues
+ oo.mutexHighPrioTxQueue.Lock()
+ oo.highPrioTxQueue.Init()
+ oo.mutexHighPrioTxQueue.Unlock()
+ oo.mutexLowPrioTxQueue.Lock()
+ oo.lowPrioTxQueue.Init()
+ oo.mutexLowPrioTxQueue.Unlock()
+ //clear the scheduler map
oo.mutexRxSchedMap.Lock()
for k := range oo.rxSchedulerMap {
- delete(oo.rxSchedulerMap, k) //clear the scheduler map
+ delete(oo.rxSchedulerMap, k)
}
oo.mutexRxSchedMap.Unlock()
+ //reset the high prio transactionId
oo.mutexHpTid.Lock()
- oo.hpTid = 0x8000 //reset the high prio transactionId
+ oo.hpTid = 0x8000
oo.mutexHpTid.Unlock()
+ //reset the low prio transactionId
oo.mutexTid.Lock()
- oo.tid = 1 //reset the low prio transactionId
+ oo.tid = 1
oo.mutexTid.Unlock()
//reset control values
oo.UploadSequNo = 0
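
A side note on the clearing idiom in the Stop path above: (*list.List).Init() re-initializes the list in place and is the documented way to empty it, so the same call both creates and flushes a queue. A tiny standalone illustration:

    package main

    import (
        "container/list"
        "fmt"
    )

    func main() {
        q := list.New()
        q.PushBack([]byte{0x0b, 0x0a}) // a queued OMCI frame (placeholder bytes)
        q.Init()                       // empties the list in place
        fmt.Println(q.Len())           // prints 0
    }
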
@@ -523,92 +534,80 @@
return fmt.Errorf("message with tid is processed already %s", oo.deviceID)
}
-//Pull next tx request and send it
-func (oo *OmciCC) sendNextRequest(ctx context.Context) error {
- // return errors.New("sendNextRequest unimplemented")
+func (oo *OmciCC) sendQueuedRequests(ctx context.Context) {
+ // Serialize parallel send routines here so that concurrent OMCI send
+ // requests are blocked at least until SendIAP has 'committed' the frame.
+ // To guarantee a window size of 1 per ONU it would additionally be
+ // necessary to wait for the corresponding response (t.b.d.).
+ oo.mutexSendQueuedRequests.Lock()
+ defer oo.mutexSendQueuedRequests.Unlock()
+ if err := oo.sendQueuedHighPrioRequests(ctx); err != nil {
+ logger.Errorw(ctx, "Error during sending high prio requests!",
+ log.Fields{"err": err, "device-id": oo.deviceID})
+ return
+ }
+ if err := oo.sendQueuedLowPrioRequests(ctx); err != nil {
+ logger.Errorw(ctx, "Error during sending low prio requests!",
+ log.Fields{"err": err, "device-id": oo.deviceID})
+ return
+ }
+}
- // just try to get something transferred !!
- // avoid accessing the txQueue from parallel send requests
- // block parallel omci send requests at least until SendIAP is 'committed'
- // that should be feasible for an onu instance as on OMCI anyway window size 1 is assumed
- oo.mutexTxQueue.Lock()
- defer oo.mutexTxQueue.Unlock()
- for oo.txQueue.Len() > 0 {
- queueElement := oo.txQueue.Front() // First element
- omciTxRequest := queueElement.Value.(OmciTransferStructure)
- /* compare olt device handler code:
- func (dh *DeviceHandler) omciIndication(omciInd *oop.OmciIndication) {
- logger.Debugw(ctx,"omci indication", log.Fields{"intfID": omciInd.IntfId, "onuID": omciInd.OnuId})
- var deviceType string
- var deviceID string
- var proxyDeviceID string
-
- onuKey := dh.formOnuKey(omciInd.IntfId, omciInd.OnuId)
-
- if onuInCache, ok := dh.onus.Load(onuKey); !ok {
-
- logger.Debugw(ctx,"omci indication for a device not in cache.", log.Fields{"intfID": omciInd.IntfId, "onuID": omciInd.OnuId})
- ponPort := IntfIDToPortNo(omciInd.GetIntfId(), voltha.Port_PON_OLT)
- kwargs := make(map[string]interface{})
- kwargs["onu_id"] = omciInd.OnuId
- kwargs["parent_port_no"] = ponPort
-
- onuDevice, err := dh.coreProxy.GetChildDevice(log.WithSpanFromContext(context.TODO(), ctx), dh.device.Id, kwargs)
- if err != nil {
- logger.Errorw(ctx,"onu not found", log.Fields{"intfID": omciInd.IntfId, "onuID": omciInd.OnuId, "error": err})
- return
- }
- deviceType = onuDevice.Type
- deviceID = onuDevice.Id
- proxyDeviceID = onuDevice.ProxyAddress.DeviceId
- //if not exist in cache, then add to cache.
- dh.onus.Store(onuKey, NewOnuDevice(deviceID, deviceType, onuDevice.SerialNumber, omciInd.OnuId, omciInd.IntfId, proxyDeviceID))
- } else {
- //found in cache
- logger.Debugw(ctx,"omci indication for a device in cache.", log.Fields{"intfID": omciInd.IntfId, "onuID": omciInd.OnuId})
- deviceType = onuInCache.(*OnuDevice).deviceType
- deviceID = onuInCache.(*OnuDevice).deviceID
- proxyDeviceID = onuInCache.(*OnuDevice).proxyDeviceID
- }
- */
- /* and compare onu_adapter py code:
- omci_msg = InterAdapterOmciMessage(
- message=bytes(frame),
- proxy_address=self._proxy_address,
- connect_status=self._device.connect_status)
-
- self.logger.debug('sent-omci-msg', tid=tx_tid, omci_msg=hexlify(bytes(frame)))
-
- yield self._adapter_proxy.send_inter_adapter_message(
- msg=omci_msg,
- type=InterAdapterMessageType.OMCI_REQUEST,
- from_adapter=self._device.type,
- to_adapter=self._proxy_address.device_type,
- to_device_id=self._device_id,
- proxy_device_id=self._proxy_address.device_id
- )
- */
- if omciTxRequest.withFramePrint {
- logger.Debugw(ctx, "omci-message-to-send:", log.Fields{
- "TxOmciMessage": hex.EncodeToString(omciTxRequest.txFrame),
- "device-id": oo.deviceID,
- "toDeviceType": oo.pBaseDeviceHandler.GetProxyAddressType(),
- "proxyDeviceID": oo.pBaseDeviceHandler.GetProxyAddressID(),
- "proxyAddress": oo.pBaseDeviceHandler.GetProxyAddress()})
+func (oo *OmciCC) sendQueuedHighPrioRequests(ctx context.Context) error {
+ oo.mutexHighPrioTxQueue.Lock()
+ defer oo.mutexHighPrioTxQueue.Unlock()
+ for oo.highPrioTxQueue.Len() > 0 {
+ queueElement := oo.highPrioTxQueue.Front() // First element
+ if err := oo.sendOMCIRequest(ctx, queueElement.Value.(OmciTransferStructure)); err != nil {
+ return err
}
- omciMsg := &ic.OmciMessage{
- ParentDeviceId: oo.pBaseDeviceHandler.GetProxyAddressID(),
- ChildDeviceId: oo.deviceID,
- Message: omciTxRequest.txFrame,
- ProxyAddress: oo.pBaseDeviceHandler.GetProxyAddress(),
- ConnectStatus: common.ConnectStatus_REACHABLE, // If we are sending OMCI messages means we are connected, else we should not be here
+ oo.highPrioTxQueue.Remove(queueElement) // Dequeue
+ }
+ return nil
+}
+
+func (oo *OmciCC) sendQueuedLowPrioRequests(ctx context.Context) error {
+ oo.mutexLowPrioTxQueue.Lock()
+ for oo.lowPrioTxQueue.Len() > 0 {
+ queueElement := oo.lowPrioTxQueue.Front() // First element
+ if err := oo.sendOMCIRequest(ctx, queueElement.Value.(OmciTransferStructure)); err != nil {
+ oo.mutexLowPrioTxQueue.Unlock()
+ return err
}
- sendErr := oo.pBaseDeviceHandler.SendOMCIRequest(ctx, oo.pBaseDeviceHandler.GetProxyAddress().AdapterEndpoint, omciMsg)
- if sendErr != nil {
- logger.Errorw(ctx, "send omci request error", log.Fields{"ChildId": oo.deviceID, "error": sendErr})
- return sendErr
+ oo.lowPrioTxQueue.Remove(queueElement) // Dequeue
+ // Interrupt the sending of low priority requests to process any high priority requests
+ // that may have arrived in the meantime
+ oo.mutexLowPrioTxQueue.Unlock()
+ if err := oo.sendQueuedHighPrioRequests(ctx); err != nil {
+ return err
}
- oo.txQueue.Remove(queueElement) // Dequeue
+ oo.mutexLowPrioTxQueue.Lock()
+ }
+
+ oo.mutexLowPrioTxQueue.Unlock()
+ return nil
+}
+
+func (oo *OmciCC) sendOMCIRequest(ctx context.Context, omciTxRequest OmciTransferStructure) error {
+ if omciTxRequest.withFramePrint {
+ logger.Debugw(ctx, "omci-message-to-send:", log.Fields{
+ "TxOmciMessage": hex.EncodeToString(omciTxRequest.txFrame),
+ "device-id": oo.deviceID,
+ "toDeviceType": oo.pBaseDeviceHandler.GetProxyAddressType(),
+ "proxyDeviceID": oo.pBaseDeviceHandler.GetProxyAddressID(),
+ "proxyAddress": oo.pBaseDeviceHandler.GetProxyAddress()})
+ }
+ omciMsg := &ic.OmciMessage{
+ ParentDeviceId: oo.pBaseDeviceHandler.GetProxyAddressID(),
+ ChildDeviceId: oo.deviceID,
+ Message: omciTxRequest.txFrame,
+ ProxyAddress: oo.pBaseDeviceHandler.GetProxyAddress(),
+ ConnectStatus: common.ConnectStatus_REACHABLE, // if we are sending OMCI messages we must be connected, otherwise we should not be here
+ }
+ sendErr := oo.pBaseDeviceHandler.SendOMCIRequest(ctx, oo.pBaseDeviceHandler.GetProxyAddress().AdapterEndpoint, omciMsg)
+ if sendErr != nil {
+ logger.Errorw(ctx, "send omci request error", log.Fields{"ChildId": oo.deviceID, "error": sendErr})
+ return sendErr
}
return nil
}
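
The core of the change is the drain logic above: the high-priority queue is always emptied first, and the low-priority loop drops its lock after every element and re-runs the high-priority drain, so frames arriving mid-drain still jump the line. A runnable sketch of that pattern under assumed names (sender, transmit and the string payloads are illustrative stand-ins for the adapter's OmciTransferStructure plumbing):

    package main

    import (
        "container/list"
        "fmt"
        "sync"
    )

    type sender struct {
        mutexSend sync.Mutex // serializes drainAll across goroutines
        mutexHigh sync.Mutex
        high      *list.List
        mutexLow  sync.Mutex
        low       *list.List
    }

    func (s *sender) drainHigh() error {
        s.mutexHigh.Lock()
        defer s.mutexHigh.Unlock()
        for s.high.Len() > 0 {
            e := s.high.Front()
            if err := s.transmit(e.Value.(string)); err != nil {
                return err
            }
            s.high.Remove(e)
        }
        return nil
    }

    func (s *sender) drainAll() error {
        s.mutexSend.Lock()
        defer s.mutexSend.Unlock()
        if err := s.drainHigh(); err != nil {
            return err
        }
        s.mutexLow.Lock()
        for s.low.Len() > 0 {
            e := s.low.Front()
            if err := s.transmit(e.Value.(string)); err != nil {
                s.mutexLow.Unlock()
                return err
            }
            s.low.Remove(e)
            // Release the low-prio lock between elements so producers can
            // enqueue, and give newly arrived high-prio frames precedence.
            s.mutexLow.Unlock()
            if err := s.drainHigh(); err != nil {
                return err
            }
            s.mutexLow.Lock()
        }
        s.mutexLow.Unlock()
        return nil
    }

    func (s *sender) transmit(frame string) error {
        fmt.Println("tx:", frame) // stand-in for the inter-adapter OMCI send
        return nil
    }

    func main() {
        s := &sender{high: list.New(), low: list.New()}
        s.low.PushBack("low-1")
        s.high.PushBack("high-1")
        _ = s.drainAll() // prints high-1 before low-1
    }

Note that the low-priority loop re-checks Len() after reacquiring the lock, so elements enqueued while the lock was released are still picked up within the same drain pass.
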
@@ -3155,10 +3154,17 @@
timeout := aOmciTxRequest.timeout
if timeout == 0 {
//timeout 0 indicates that no response is expected - fire and forget
- oo.mutexTxQueue.Lock()
- oo.txQueue.PushBack(aOmciTxRequest) // enqueue
- oo.mutexTxQueue.Unlock()
- go oo.sendNextRequest(ctx)
+ // enqueue
+ if aOmciTxRequest.highPrio {
+ oo.mutexHighPrioTxQueue.Lock()
+ oo.highPrioTxQueue.PushBack(aOmciTxRequest)
+ oo.mutexHighPrioTxQueue.Unlock()
+ } else {
+ oo.mutexLowPrioTxQueue.Lock()
+ oo.lowPrioTxQueue.PushBack(aOmciTxRequest)
+ oo.mutexLowPrioTxQueue.Unlock()
+ }
+ go oo.sendQueuedRequests(ctx)
} else {
//the supervised sending with waiting on the response (based on TID) is called in background
// to avoid blocking of the sender for the complete OMCI handshake procedure
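
The dispatch above repeats a small enqueue-then-drain idiom: pick the queue by the request's highPrio flag (a real field per the diff), push under that queue's lock only, and kick off the drain in a goroutine so the caller never blocks on the wire. Extending the illustrative sender type from the previous sketch:

    // enqueue mirrors the fire-and-forget branch above: select the queue by
    // priority, push under its own lock, then drain asynchronously.
    func (s *sender) enqueue(frame string, highPrio bool) {
        if highPrio {
            s.mutexHigh.Lock()
            s.high.PushBack(frame)
            s.mutexHigh.Unlock()
        } else {
            s.mutexLow.Lock()
            s.low.PushBack(frame)
            s.mutexLow.Unlock()
        }
        go s.drainAll() // mutexSend ensures only one drain runs at a time
    }
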
@@ -3180,11 +3186,17 @@
retryCounter := 0
loop:
for retryCounter <= retries {
-
- oo.mutexTxQueue.Lock()
- oo.txQueue.PushBack(aOmciTxRequest) // enqueue
- oo.mutexTxQueue.Unlock()
- go oo.sendNextRequest(ctx)
+ // enqueue
+ if aOmciTxRequest.highPrio {
+ oo.mutexHighPrioTxQueue.Lock()
+ oo.highPrioTxQueue.PushBack(aOmciTxRequest)
+ oo.mutexHighPrioTxQueue.Unlock()
+ } else {
+ oo.mutexLowPrioTxQueue.Lock()
+ oo.lowPrioTxQueue.PushBack(aOmciTxRequest)
+ oo.mutexLowPrioTxQueue.Unlock()
+ }
+ go oo.sendQueuedRequests(ctx)
select {
case success := <-chSuccess: