VOL-4125: Respond with success/failure to flow insertion according to the
operation result in the adapter
VOL-4323: Support serialization of flow messages (execute a flow to
completion before picking up another flow)
Change-Id: I934fc68b103042971109923491a3f27aaf885328
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 449d96e..b84bb83 100755
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -120,6 +120,16 @@
cSkipOnuConfigReconciling
)
+// FlowCb is the flow control block containing flow add/delete information along with a response channel
+type FlowCb struct {
+ ctx context.Context // Flow handler context
+ addFlow bool // if true flow to be added, else removed
+ flowItem *of.OfpFlowStats
+ uniPort *cmn.OnuUniPort
+ flowMetaData *voltha.FlowMetadata
+ respChan *chan error // channel to report the Flow handling error
+}
+
//deviceHandler will interact with the ONU ? device.
type deviceHandler struct {
DeviceID string
@@ -186,6 +196,11 @@
mutexDeletionInProgressFlag sync.RWMutex
pLastUpgradeImageState *voltha.ImageState
upgradeFsmChan chan struct{}
+
+ flowCbChan []chan FlowCb
+ mutexFlowMonitoringRoutineFlag sync.RWMutex
+ stopFlowMonitoringRoutine []chan bool // length of slice equal to number of uni ports
+ isFlowMonitoringRoutineActive []bool // length of slice equal to number of uni ports
}
//newDeviceHandler creates a new device handler
@@ -532,8 +547,9 @@
func (dh *deviceHandler) FlowUpdateIncremental(ctx context.Context,
apOfFlowChanges *of.FlowChanges,
apOfGroupChanges *of.FlowGroupChanges, apFlowMetaData *voltha.FlowMetadata) error {
- logger.Debugw(ctx, "FlowUpdateIncremental started", log.Fields{"device-id": dh.DeviceID, "metadata": apFlowMetaData})
- var retError error = nil
+ logger.Debugw(ctx, "FlowUpdateIncremental started", log.Fields{"device-id": dh.DeviceID, "flow": apOfFlowChanges, "metadata": apFlowMetaData})
+ var errorsList []error
+ var retError error
//Remove flows (always remove flows first - remove old and add new with same cookie may be part of the same request)
if apOfFlowChanges.ToRemove != nil {
for _, flowItem := range apOfFlowChanges.ToRemove.Items {
@@ -541,12 +557,14 @@
logger.Warnw(ctx, "flow-remove no cookie: ignore and continuing on checking further flows", log.Fields{
"device-id": dh.DeviceID})
retError = fmt.Errorf("flow-remove no cookie, device-id %s", dh.DeviceID)
+ errorsList = append(errorsList, retError)
continue
}
flowInPort := flow.GetInPort(flowItem)
if flowInPort == uint32(of.OfpPortNo_OFPP_INVALID) {
logger.Warnw(ctx, "flow-remove inPort invalid: ignore and continuing on checking further flows", log.Fields{"device-id": dh.DeviceID})
retError = fmt.Errorf("flow-remove inPort invalid, device-id %s", dh.DeviceID)
+ errorsList = append(errorsList, retError)
continue
//return fmt.Errorf("flow inPort invalid: %s", dh.DeviceID)
} else if flowInPort == dh.ponPortNumber {
@@ -564,22 +582,43 @@
log.Fields{"device-id": dh.DeviceID, "inPort": flowInPort})
retError = fmt.Errorf("flow-remove inPort not found in UniPorts, inPort %d, device-id %s",
flowInPort, dh.DeviceID)
+ errorsList = append(errorsList, retError)
continue
}
flowOutPort := flow.GetOutPort(flowItem)
logger.Debugw(ctx, "flow-remove port indications", log.Fields{
"device-id": dh.DeviceID, "inPort": flowInPort, "outPort": flowOutPort,
"uniPortName": loUniPort.Name})
- err := dh.removeFlowItemFromUniPort(ctx, flowItem, loUniPort)
- //try next flow after processing error
- if err != nil {
- logger.Warnw(ctx, "flow-remove processing error: continuing on checking further flows",
- log.Fields{"device-id": dh.DeviceID, "error": err})
- retError = err
- continue
- //return err
- } else { // if last setting succeeds, overwrite possibly previously set error
- retError = nil
+
+ if dh.GetFlowMonitoringIsRunning(loUniPort.UniID) {
+ // Step1 : Fill flowControlBlock
+ // Step2 : Push the flowControlBlock to ONU channel
+ // Step3 : Wait on response channel for response
+ // Step4 : Return error value
+ startTime := time.Now()
+ respChan := make(chan error)
+ flowCb := FlowCb{
+ ctx: ctx,
+ addFlow: false,
+ flowItem: flowItem,
+ flowMetaData: nil,
+ uniPort: loUniPort,
+ respChan: &respChan,
+ }
+ dh.flowCbChan[loUniPort.UniID] <- flowCb
+ logger.Infow(ctx, "process-flow-remove-start", log.Fields{"device-id": dh.DeviceID})
+ // Wait on the channel for flow handlers return value
+ retError = <-respChan
+ logger.Infow(ctx, "process-flow-remove-end", log.Fields{"device-id": dh.DeviceID, "err": retError, "totalTimeSeconds": time.Since(startTime).Seconds()})
+ if retError != nil {
+ logger.Warnw(ctx, "flow-delete processing error: continuing on checking further flows",
+ log.Fields{"device-id": dh.DeviceID, "error": retError})
+ errorsList = append(errorsList, retError)
+ continue
+ }
+ } else {
+ retError = fmt.Errorf("flow-handler-routine-not-active-for-onu--device-id-%v", dh.DeviceID)
+ errorsList = append(errorsList, retError)
}
}
}
@@ -590,12 +629,14 @@
logger.Debugw(ctx, "incremental flow-add no cookie: ignore and continuing on checking further flows", log.Fields{
"device-id": dh.DeviceID})
retError = fmt.Errorf("flow-add no cookie, device-id %s", dh.DeviceID)
+ errorsList = append(errorsList, retError)
continue
}
flowInPort := flow.GetInPort(flowItem)
if flowInPort == uint32(of.OfpPortNo_OFPP_INVALID) {
logger.Warnw(ctx, "flow-add inPort invalid: ignore and continuing on checking further flows", log.Fields{"device-id": dh.DeviceID})
retError = fmt.Errorf("flow-add inPort invalid, device-id %s", dh.DeviceID)
+ errorsList = append(errorsList, retError)
continue
//return fmt.Errorf("flow inPort invalid: %s", dh.DeviceID)
} else if flowInPort == dh.ponPortNumber {
@@ -613,8 +654,8 @@
log.Fields{"device-id": dh.DeviceID, "inPort": flowInPort})
retError = fmt.Errorf("flow-add inPort not found in UniPorts, inPort %d, device-id %s",
flowInPort, dh.DeviceID)
+ errorsList = append(errorsList, retError)
continue
- //return fmt.Errorf("flow-parameter inPort %d not found in internal UniPorts", flowInPort)
}
// let's still assume that we receive the flow-add only in some 'active' device state (as so far observed)
// if not, we just throw some error here to have an indication about that, if we really need to support that
@@ -625,28 +666,53 @@
if !dh.IsReadyForOmciConfig() {
logger.Errorw(ctx, "flow-add rejected: improper device state", log.Fields{"device-id": dh.DeviceID,
"last device-reason": dh.GetDeviceReasonString()})
- return fmt.Errorf("improper device state on device %s", dh.DeviceID)
+ retError = fmt.Errorf("improper device state on device %s", dh.DeviceID)
+ errorsList = append(errorsList, retError)
+ continue
}
flowOutPort := flow.GetOutPort(flowItem)
logger.Debugw(ctx, "flow-add port indications", log.Fields{
"device-id": dh.DeviceID, "inPort": flowInPort, "outPort": flowOutPort,
"uniPortName": loUniPort.Name})
- err := dh.addFlowItemToUniPort(ctx, flowItem, loUniPort, apFlowMetaData)
- //try next flow after processing error
- if err != nil {
- logger.Warnw(ctx, "flow-add processing error: continuing on checking further flows",
- log.Fields{"device-id": dh.DeviceID, "error": err})
- retError = err
- continue
- //return err
- } else { // if last setting succeeds, overwrite possibly previously set error
- retError = nil
+ if dh.GetFlowMonitoringIsRunning(loUniPort.UniID) {
+ // Step1 : Fill flowControlBlock
+ // Step2 : Push the flowControlBlock to ONU channel
+ // Step3 : Wait on response channel for response
+ // Step4 : Return error value
+ startTime := time.Now()
+ respChan := make(chan error)
+ flowCb := FlowCb{
+ ctx: ctx,
+ addFlow: true,
+ flowItem: flowItem,
+ flowMetaData: apFlowMetaData,
+ uniPort: loUniPort,
+ respChan: &respChan,
+ }
+ dh.flowCbChan[loUniPort.UniID] <- flowCb
+ logger.Infow(ctx, "process-flow-add-start", log.Fields{"device-id": dh.DeviceID})
+ // Wait on the channel for flow handlers return value
+ retError = <-respChan
+ logger.Infow(ctx, "process-flow-add-end", log.Fields{"device-id": dh.DeviceID, "err": retError, "totalTimeSeconds": time.Since(startTime).Seconds()})
+ if retError != nil {
+ logger.Warnw(ctx, "flow-add processing error: continuing on checking further flows",
+ log.Fields{"device-id": dh.DeviceID, "error": retError})
+ errorsList = append(errorsList, retError)
+ continue
+ }
+ } else {
+ retError = fmt.Errorf("flow-handler-routine-not-active-for-onu--device-id-%v", dh.DeviceID)
+ errorsList = append(errorsList, retError)
}
}
}
}
- return retError
+ if len(errorsList) > 0 {
+ logger.Errorw(ctx, "error-processing-flow", log.Fields{"device-id": dh.DeviceID, "errList": errorsList})
+ return fmt.Errorf("errors-installing-one-or-more-flows-groups, errors:%v", errorsList)
+ }
+ return nil
}
//disableDevice locks the ONU and its UNI/VEIP ports (admin lock via OMCI)
@@ -939,13 +1005,13 @@
if _, exist = dh.UniVlanConfigFsmMap[uniData.PersUniID]; exist {
if err := dh.UniVlanConfigFsmMap[uniData.PersUniID].SetUniFlowParams(ctx, flowData.VlanRuleParams.TpID,
flowData.CookieSlice, uint16(flowData.VlanRuleParams.MatchVid), uint16(flowData.VlanRuleParams.SetVid),
- uint8(flowData.VlanRuleParams.SetPcp), lastFlowToReconcile, flowData.Meter); err != nil {
+ uint8(flowData.VlanRuleParams.SetPcp), lastFlowToReconcile, flowData.Meter, nil); err != nil {
logger.Errorw(ctx, err.Error(), log.Fields{"device-id": dh.DeviceID})
}
} else {
if err := dh.createVlanFilterFsm(ctx, uniPort, flowData.VlanRuleParams.TpID, flowData.CookieSlice,
uint16(flowData.VlanRuleParams.MatchVid), uint16(flowData.VlanRuleParams.SetVid),
- uint8(flowData.VlanRuleParams.SetPcp), cmn.OmciVlanFilterAddDone, lastFlowToReconcile, flowData.Meter); err != nil {
+ uint8(flowData.VlanRuleParams.SetPcp), cmn.OmciVlanFilterAddDone, lastFlowToReconcile, flowData.Meter, nil); err != nil {
logger.Errorw(ctx, err.Error(), log.Fields{"device-id": dh.DeviceID})
}
}
@@ -2025,6 +2091,8 @@
dh.pSelfTestHdlr.StopSelfTestModule <- true
}
+ // Note: We want flow deletes to be processed on onu down, so do not stop flow monitoring routines
+
//reset a possibly running upgrade FSM
// (note the Upgrade FSM may stay alive e.g. in state UpgradeStWaitForCommit to endure the ONU reboot)
dh.lockUpgradeFsm.RLock()
@@ -2146,6 +2214,16 @@
go dh.StartAlarmManager(ctx)
}
+ // Start flow handler routines per UNI
+ for _, uniPort := range dh.uniEntityMap {
+ // only if this port was enabled for use by the operator at startup
+ if (1<<uniPort.UniID)&dh.pOpenOnuAc.config.UniPortMask == (1 << uniPort.UniID) {
+ if !dh.GetFlowMonitoringIsRunning(uniPort.UniID) {
+ go dh.PerOnuFlowHandlerRoutine(uniPort.UniID)
+ }
+ }
+ }
+
// Initialize classical L2 PM Interval Counters
if err := dh.pOnuMetricsMgr.PAdaptFsm.PFsm.Event(pmmgr.L2PmEventInit); err != nil {
// There is no way we should be landing here, but if we do then
@@ -2405,14 +2483,14 @@
logger.Errorw(ctx, "No valid OnuDevice - aborting", log.Fields{"device-id": dh.DeviceID})
return
}
- i := uint8(0) //UNI Port limit: see MaxUnisPerOnu (by now 16) (OMCI supports max 255 p.b.)
+ uniCnt := uint8(0) //UNI Port limit: see MaxUnisPerOnu (by now 16) (OMCI supports max 255 p.b.)
if pptpInstKeys := pDevEntry.GetOnuDB().GetSortedInstKeys(
ctx, me.PhysicalPathTerminationPointEthernetUniClassID); len(pptpInstKeys) > 0 {
for _, mgmtEntityID := range pptpInstKeys {
logger.Debugw(ctx, "Add PPTPEthUni port for MIB-stored instance:", log.Fields{
"device-id": dh.DeviceID, "PPTPEthUni EntityID": mgmtEntityID})
- dh.addUniPort(ctx, mgmtEntityID, i, cmn.UniPPTP)
- i++
+ dh.addUniPort(ctx, mgmtEntityID, uniCnt, cmn.UniPPTP)
+ uniCnt++
}
} else {
logger.Debugw(ctx, "No PPTP instances found", log.Fields{"device-id": dh.DeviceID})
@@ -2422,8 +2500,8 @@
for _, mgmtEntityID := range veipInstKeys {
logger.Debugw(ctx, "Add VEIP for MIB-stored instance:", log.Fields{
"device-id": dh.DeviceID, "VEIP EntityID": mgmtEntityID})
- dh.addUniPort(ctx, mgmtEntityID, i, cmn.UniVEIP)
- i++
+ dh.addUniPort(ctx, mgmtEntityID, uniCnt, cmn.UniVEIP)
+ uniCnt++
}
} else {
logger.Debugw(ctx, "No VEIP instances found", log.Fields{"device-id": dh.DeviceID})
@@ -2433,14 +2511,23 @@
for _, mgmtEntityID := range potsInstKeys {
logger.Debugw(ctx, "Add PPTP Pots UNI for MIB-stored instance:", log.Fields{
"device-id": dh.DeviceID, "PPTP Pots UNI EntityID": mgmtEntityID})
- dh.addUniPort(ctx, mgmtEntityID, i, cmn.UniPPTPPots)
- i++
+ dh.addUniPort(ctx, mgmtEntityID, uniCnt, cmn.UniPPTPPots)
+ uniCnt++
}
} else {
logger.Debugw(ctx, "No PPTP Pots UNI instances found", log.Fields{"device-id": dh.DeviceID})
}
- if i == 0 {
+ if uniCnt == 0 {
logger.Warnw(ctx, "No UniG instances found", log.Fields{"device-id": dh.DeviceID})
+ return
+ }
+
+ dh.flowCbChan = make([]chan FlowCb, uniCnt)
+ dh.stopFlowMonitoringRoutine = make([]chan bool, uniCnt)
+ dh.isFlowMonitoringRoutineActive = make([]bool, uniCnt)
+ for i := 0; i < int(uniCnt); i++ {
+ dh.flowCbChan[i] = make(chan FlowCb, dh.pOpenOnuAc.config.MaxConcurrentFlowsPerUni)
+ dh.stopFlowMonitoringRoutine[i] = make(chan bool)
}
}
@@ -2931,7 +3018,7 @@
//addFlowItemToUniPort parses the actual flow item to add it to the UniPort
func (dh *deviceHandler) addFlowItemToUniPort(ctx context.Context, apFlowItem *of.OfpFlowStats, apUniPort *cmn.OnuUniPort,
- apFlowMetaData *voltha.FlowMetadata) error {
+ apFlowMetaData *voltha.FlowMetadata, respChan *chan error) {
var loSetVlan uint16 = uint16(of.OfpVlanId_OFPVID_NONE) //noValidEntry
var loMatchVlan uint16 = uint16(of.OfpVlanId_OFPVID_PRESENT) //reserved VLANID entry
var loAddPcp, loSetPcp uint8
@@ -2950,7 +3037,7 @@
if metadata == 0 {
logger.Debugw(ctx, "flow-add invalid metadata - abort",
log.Fields{"device-id": dh.DeviceID})
- return fmt.Errorf("flow-add invalid metadata: %s", dh.DeviceID)
+ *respChan <- fmt.Errorf("flow-add invalid metadata: %s", dh.DeviceID)
}
loTpID := uint8(flow.GetTechProfileIDFromWriteMetaData(ctx, metadata))
loCookie := apFlowItem.GetCookie()
@@ -2977,7 +3064,7 @@
"match_vid": strconv.FormatInt(int64(loMatchVlan), 16)})
//TODO!!: Use DeviceId within the error response to rwCore
// likewise also in other error response cases to calling components as requested in [VOL-3458]
- return fmt.Errorf("flow-add Set/Match VlanId inconsistent: %s", dh.DeviceID)
+ *respChan <- fmt.Errorf("flow-add Set/Match VlanId inconsistent: %s", dh.DeviceID)
}
if loSetVlan == uint16(of.OfpVlanId_OFPVID_NONE) && loMatchVlan == uint16(of.OfpVlanId_OFPVID_PRESENT) {
logger.Debugw(ctx, "flow-add vlan-any/copy", log.Fields{"device-id": dh.DeviceID})
@@ -3005,23 +3092,26 @@
if _, exist := dh.UniVlanConfigFsmMap[apUniPort.UniID]; exist {
//SetUniFlowParams() may block on some rule that is suspended-to-add
// in order to allow for according flow removal lockVlanConfig may only be used with RLock here
- err := dh.UniVlanConfigFsmMap[apUniPort.UniID].SetUniFlowParams(ctx, loTpID, loCookieSlice,
- loMatchVlan, loSetVlan, loSetPcp, false, meter)
+ // Also the error is returned to caller via response channel
+ _ = dh.UniVlanConfigFsmMap[apUniPort.UniID].SetUniFlowParams(ctx, loTpID, loCookieSlice,
+ loMatchVlan, loSetVlan, loSetPcp, false, meter, respChan)
dh.lockVlanConfig.RUnlock()
dh.lockVlanAdd.Unlock() //re-admit new Add-flow-processing
- return err
+ return
}
dh.lockVlanConfig.RUnlock()
dh.lockVlanConfig.Lock() //createVlanFilterFsm should always be a non-blocking operation and requires r+w lock
err := dh.createVlanFilterFsm(ctx, apUniPort, loTpID, loCookieSlice,
- loMatchVlan, loSetVlan, loSetPcp, cmn.OmciVlanFilterAddDone, false, meter)
+ loMatchVlan, loSetVlan, loSetPcp, cmn.OmciVlanFilterAddDone, false, meter, respChan)
dh.lockVlanConfig.Unlock()
dh.lockVlanAdd.Unlock() //re-admit new Add-flow-processing
- return err
+ if err != nil {
+ *respChan <- err
+ }
}
//removeFlowItemFromUniPort parses the actual flow item to remove it from the UniPort
-func (dh *deviceHandler) removeFlowItemFromUniPort(ctx context.Context, apFlowItem *of.OfpFlowStats, apUniPort *cmn.OnuUniPort) error {
+func (dh *deviceHandler) removeFlowItemFromUniPort(ctx context.Context, apFlowItem *of.OfpFlowStats, apUniPort *cmn.OnuUniPort, respChan *chan error) {
//optimization and assumption: the flow cookie uniquely identifies the flow and with that the internal rule
//hence only the cookie is used here to find the relevant flow and possibly remove the rule
//no extra check is done on the rule parameters
@@ -3054,22 +3144,30 @@
defer dh.lockVlanConfig.RUnlock()
logger.Debugw(ctx, "flow-remove got RLock", log.Fields{"device-id": dh.DeviceID, "uniID": apUniPort.UniID})
if _, exist := dh.UniVlanConfigFsmMap[apUniPort.UniID]; exist {
- return dh.UniVlanConfigFsmMap[apUniPort.UniID].RemoveUniFlowParams(ctx, loCookie)
+ _ = dh.UniVlanConfigFsmMap[apUniPort.UniID].RemoveUniFlowParams(ctx, loCookie, respChan)
+ return
}
logger.Debugw(ctx, "flow-remove called, but no flow is configured (no VlanConfigFsm, flow already removed) ",
log.Fields{"device-id": dh.DeviceID})
//but as we regard the flow as not existing = removed we respond just ok
// and treat the reason accordingly (which in the normal removal procedure is initiated by the FSM)
+ // Push response on the response channel
+ if respChan != nil {
+ // Do it in a non blocking fashion, so that in case the flow handler routine has shutdown for any reason, we do not block here
+ select {
+ case *respChan <- nil:
+ logger.Debugw(ctx, "submitted-response-for-flow", log.Fields{"device-id": dh.DeviceID, "err": nil})
+ default:
+ }
+ }
go dh.DeviceProcStatusUpdate(ctx, cmn.OmciVlanFilterRemDone)
-
- return nil
}
// createVlanFilterFsm initializes and runs the VlanFilter FSM to transfer OMCI related VLAN config
// if this function is called from possibly concurrent processes it must be mutex-protected from the caller!
// precondition: dh.lockVlanConfig is locked by the caller!
func (dh *deviceHandler) createVlanFilterFsm(ctx context.Context, apUniPort *cmn.OnuUniPort, aTpID uint8, aCookieSlice []uint64,
- aMatchVlan uint16, aSetVlan uint16, aSetPcp uint8, aDevEvent cmn.OnuDeviceEvent, lastFlowToReconcile bool, aMeter *voltha.OfpMeterConfig) error {
+ aMatchVlan uint16, aSetVlan uint16, aSetPcp uint8, aDevEvent cmn.OnuDeviceEvent, lastFlowToReconcile bool, aMeter *voltha.OfpMeterConfig, respChan *chan error) error {
chVlanFilterFsm := make(chan cmn.Message, 2048)
pDevEntry := dh.GetOnuDeviceEntry(ctx, true)
@@ -3080,7 +3178,7 @@
pVlanFilterFsm := avcfg.NewUniVlanConfigFsm(ctx, dh, pDevEntry, pDevEntry.PDevOmciCC, apUniPort, dh.pOnuTP,
pDevEntry.GetOnuDB(), aTpID, aDevEvent, "UniVlanConfigFsm", chVlanFilterFsm,
- dh.pOpenOnuAc.AcceptIncrementalEvto, aCookieSlice, aMatchVlan, aSetVlan, aSetPcp, lastFlowToReconcile, aMeter)
+ dh.pOpenOnuAc.AcceptIncrementalEvto, aCookieSlice, aMatchVlan, aSetVlan, aSetPcp, lastFlowToReconcile, aMeter, respChan)
if pVlanFilterFsm != nil {
//dh.lockVlanConfig is locked (by caller) throughout the state transition to 'starting'
// to prevent unintended (ignored) events to be sent there (from parallel processing)
@@ -3636,6 +3734,21 @@
}
}
+func (dh *deviceHandler) setFlowMonitoringIsRunning(uniID uint8, flag bool) {
+ dh.mutexFlowMonitoringRoutineFlag.Lock()
+ defer dh.mutexFlowMonitoringRoutineFlag.Unlock()
+ logger.Debugw(context.Background(), "set-flow-monitoring-routine", log.Fields{"flag": flag})
+ dh.isFlowMonitoringRoutineActive[uniID] = flag
+}
+
+func (dh *deviceHandler) GetFlowMonitoringIsRunning(uniID uint8) bool {
+ dh.mutexFlowMonitoringRoutineFlag.RLock()
+ defer dh.mutexFlowMonitoringRoutineFlag.RUnlock()
+ logger.Debugw(context.Background(), "get-flow-monitoring-routine",
+ log.Fields{"isFlowMonitoringRoutineActive": dh.isFlowMonitoringRoutineActive})
+ return dh.isFlowMonitoringRoutineActive[uniID]
+}
+
func (dh *deviceHandler) StartReconciling(ctx context.Context, skipOnuConfig bool) {
logger.Debugw(ctx, "start reconciling", log.Fields{"skipOnuConfig": skipOnuConfig, "device-id": dh.DeviceID})
@@ -3891,6 +4004,36 @@
return pgClient.GetTechProfileInstance(subCtx, request)
}
+// One instance of this routine runs per UNI port of the ONU and blocks on that UNI's flowControlBlock channel for incoming flows
+// Each incoming flow is processed in a synchronous manner, i.e., the flow is processed to completion before picking another
+func (dh *deviceHandler) PerOnuFlowHandlerRoutine(uniID uint8) {
+ logger.Infow(context.Background(), "starting-flow-handler-routine", log.Fields{"device-id": dh.DeviceID})
+ dh.setFlowMonitoringIsRunning(uniID, true)
+ for {
+ select {
+ // block on the channel to receive an incoming flow
+ // process the flow completely before proceeding to handle the next flow
+ case flowCb := <-dh.flowCbChan[uniID]:
+ startTime := time.Now()
+ logger.Debugw(flowCb.ctx, "serial-flow-processor--start", log.Fields{"device-id": dh.DeviceID})
+ respChan := make(chan error)
+ if flowCb.addFlow {
+ go dh.addFlowItemToUniPort(flowCb.ctx, flowCb.flowItem, flowCb.uniPort, flowCb.flowMetaData, &respChan)
+ } else {
+ go dh.removeFlowItemFromUniPort(flowCb.ctx, flowCb.flowItem, flowCb.uniPort, &respChan)
+ }
+ // Block on response and tunnel it back to the caller
+ *flowCb.respChan <- <-respChan
+ logger.Debugw(flowCb.ctx, "serial-flow-processor--end",
+ log.Fields{"device-id": dh.DeviceID, "absoluteTimeForFlowProcessingInSecs": time.Since(startTime).Seconds()})
+ case <-dh.stopFlowMonitoringRoutine[uniID]:
+ logger.Infow(context.Background(), "stopping-flow-handler-routine", log.Fields{"device-id": dh.DeviceID})
+ dh.setFlowMonitoringIsRunning(uniID, false)
+ return
+ }
+ }
+}
+
func (dh *deviceHandler) SendOMCIRequest(ctx context.Context, parentEndpoint string, request *ic.OmciMessage) error {
pgClient, err := dh.pOpenOnuAc.getParentAdapterServiceClient(parentEndpoint)
if err != nil || pgClient == nil {
diff --git a/internal/pkg/core/openonu.go b/internal/pkg/core/openonu.go
index c14655e..f0d9ecc 100755
--- a/internal/pkg/core/openonu.go
+++ b/internal/pkg/core/openonu.go
@@ -79,6 +79,7 @@
alarmAuditInterval time.Duration
dlToOnuTimeout4M time.Duration
rpcTimeout time.Duration
+ maxConcurrentFlowsPerUni int
}
//NewOpenONUAC returns a new instance of OpenONU_AC
@@ -114,6 +115,7 @@
openOnuAc.alarmAuditInterval = cfg.AlarmAuditInterval
openOnuAc.dlToOnuTimeout4M = cfg.DownloadToOnuTimeout4MB
openOnuAc.rpcTimeout = cfg.RPCTimeout
+ openOnuAc.maxConcurrentFlowsPerUni = cfg.MaxConcurrentFlowsPerUni
openOnuAc.pSupportedFsms = &cmn.OmciDeviceFsms{
"mib-synchronizer": {
@@ -331,6 +333,12 @@
handler.pSelfTestHdlr.StopSelfTestModule <- true
logger.Debugw(ctx, "sent stop signal to self test handler module", log.Fields{"device-id": device.Id})
}
+ for _, uni := range handler.uniEntityMap {
+ if handler.GetFlowMonitoringIsRunning(uni.UniID) {
+ handler.stopFlowMonitoringRoutine[uni.UniID] <- true
+ logger.Debugw(ctx, "sent stop signal to self flow monitoring routine", log.Fields{"device-id": device.Id})
+ }
+ }
// Clear PM data on the KV store
if handler.pOnuMetricsMgr != nil {