VOL-4471: Stale data in resource manager
Change-Id: I026774317ba577b1d5d0748c3d177b4b7bf2ac94
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 8c855a0..b596412 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -219,7 +219,9 @@
// Slice of channels. Each channel in slice, index by ONU ID, queues flows per ONU.
// A go routine per ONU, waits on the unique channel (indexed by ONU ID) for incoming flows (add/remove)
- incomingFlows []chan flowControlBlock
+ incomingFlows []chan flowControlBlock
+ stopFlowHandlerRoutine []chan bool // per-slot stop signal for the flow handler routine; indexed like incomingFlows
+ flowHandlerRoutineActive []bool // true from just before the slot's handler routine starts until it handles a stop; NOTE(review): read and written from several goroutines without a lock
}
//NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
@@ -243,12 +245,16 @@
// Create a slice of buffered channels for handling concurrent flows per ONU.
// The additional entry (+1) is to handle the NNI trap flows on a separate channel from individual ONUs channel
flowMgr.incomingFlows = make([]chan flowControlBlock, MaxOnusPerPon+1)
+ flowMgr.stopFlowHandlerRoutine = make([]chan bool, MaxOnusPerPon+1)
+ flowMgr.flowHandlerRoutineActive = make([]bool, MaxOnusPerPon+1)
for i := range flowMgr.incomingFlows {
flowMgr.incomingFlows[i] = make(chan flowControlBlock, maxConcurrentFlowsPerOnu)
+ flowMgr.stopFlowHandlerRoutine[i] = make(chan bool, 1) // one-slot buffer: a single stop request can be queued without blocking the sender
// Spin up a go routine to handling incoming flows (add/remove).
// There will be on go routine per ONU.
// This routine will be blocked on the flowMgr.incomingFlows[onu-id] channel for incoming flows.
- go flowMgr.perOnuFlowHandlerRoutine(flowMgr.incomingFlows[i])
+ flowMgr.flowHandlerRoutineActive[i] = true // marked active before the goroutine is started
+ go flowMgr.perOnuFlowHandlerRoutine(i, flowMgr.incomingFlows[i], flowMgr.stopFlowHandlerRoutine[i])
}
flowMgr.onuGemInfoMap = make(map[uint32]*rsrcMgr.OnuGemInfo)
//Load the onugem info cache from kv store on flowmanager start
@@ -2151,6 +2157,11 @@
// RouteFlowToOnuChannel routes incoming flow to ONU specific channel
func (f *OpenOltFlowMgr) RouteFlowToOnuChannel(ctx context.Context, flow *voltha.OfpFlowStats, addFlow bool, flowMetadata *voltha.FlowMetadata) error {
+ if f.deviceHandler.getDeviceDeletionInProgressFlag() {
+ // The device itself is going to be reset as part of deletion. So nothing to be done.
+ logger.Infow(ctx, "device-deletion-in-progress--not-handling-flows-or-groups", log.Fields{"device-id": f.deviceHandler.device.Id})
+ return nil // NOTE(review): the flow is dropped yet reported as success (nil error) — confirm callers expect this
+ }
// Step1 : Fill flowControlBlock
// Step2 : Push the flowControlBlock to ONU channel
// Step3 : Wait on response channel for response
@@ -2170,41 +2181,85 @@
if inPort != InvalidPort && outPort != InvalidPort {
_, _, onuID, _ = ExtractAccessFromFlow(inPort, outPort)
}
- // inPort or outPort is InvalidPort for trap-from-nni flows.
- // In the that case onuID is 0 which is the reserved index for trap-from-nni flows in the f.incomingFlows slice
- // Send the flowCb on the ONU flow channel
- f.incomingFlows[onuID] <- flowCb
- // Wait on the channel for flow handlers return value
- err := <-errChan
- logger.Infow(ctx, "process-flow--received-resp", log.Fields{"err": err, "totalTimeSeconds": time.Since(startTime).Seconds()})
- return err
+ // NOTE(review): flowHandlerRoutineActive is read here without synchronization; if the routine
+ // stops between this check and the send below, errChan may never be answered.
+ if f.flowHandlerRoutineActive[onuID] {
+ // inPort or outPort is InvalidPort for trap-from-nni flows.
+ // In that case onuID is 0 which is the reserved index for trap-from-nni flows in the f.incomingFlows slice
+ // Send the flowCb on the ONU flow channel
+ f.incomingFlows[onuID] <- flowCb
+ // Wait on the channel for flow handlers return value
+ err := <-errChan
+ logger.Infow(ctx, "process-flow-received-resp", log.Fields{"err": err, "totalTimeSeconds": time.Since(startTime).Seconds()})
+ return err
+ }
+ logger.Errorw(ctx, "flow handler routine not active for onu", log.Fields{"onuID": onuID, "ponPortIdx": f.ponPortIdx})
+ return fmt.Errorf("flow-handler-routine-not-active-for-onu-%v-pon-%d", onuID, f.ponPortIdx)
}
// This routine is unique per ONU ID and blocks on flowControlBlock channel for incoming flows
// Each incoming flow is processed in a synchronous manner, i.e., the flow is processed to completion before picking another
-func (f *OpenOltFlowMgr) perOnuFlowHandlerRoutine(subscriberFlowChannel chan flowControlBlock) {
+//
+// handlerRoutineIndex is the slot this routine owns in f.incomingFlows, f.stopFlowHandlerRoutine
+// and f.flowHandlerRoutineActive (the ONU ID; slot 0 carries the trap-from-nni flows).
+// The routine exits once a value arrives on stopHandler; flows still queued at that point
+// are answered with an error so their senders are not left blocked on errChan.
+func (f *OpenOltFlowMgr) perOnuFlowHandlerRoutine(handlerRoutineIndex int, subscriberFlowChannel chan flowControlBlock, stopHandler chan bool) {
for {
+ select {
// block on the channel to receive an incoming flow
// process the flow completely before proceeding to handle the next flow
- flowCb := <-subscriberFlowChannel
- if flowCb.addFlow {
- logger.Info(flowCb.ctx, "adding-flow-start")
- startTime := time.Now()
- err := f.AddFlow(flowCb.ctx, flowCb.flow, flowCb.flowMetadata)
- logger.Infow(flowCb.ctx, "adding-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
- // Pass the return value over the return channel
- *flowCb.errChan <- err
- } else {
- logger.Info(flowCb.ctx, "removing-flow-start")
- startTime := time.Now()
- err := f.RemoveFlow(flowCb.ctx, flowCb.flow)
- logger.Infow(flowCb.ctx, "removing-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
- // Pass the return value over the return channel
- *flowCb.errChan <- err
+ case flowCb := <-subscriberFlowChannel:
+ if flowCb.addFlow {
+ logger.Info(flowCb.ctx, "adding-flow-start")
+ startTime := time.Now()
+ err := f.AddFlow(flowCb.ctx, flowCb.flow, flowCb.flowMetadata)
+ logger.Infow(flowCb.ctx, "adding-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
+ // Pass the return value over the return channel
+ *flowCb.errChan <- err
+ } else {
+ logger.Info(flowCb.ctx, "removing-flow-start")
+ startTime := time.Now()
+ err := f.RemoveFlow(flowCb.ctx, flowCb.flow)
+ logger.Infow(flowCb.ctx, "removing-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
+ // Pass the return value over the return channel
+ *flowCb.errChan <- err
+ }
+ case <-stopHandler:
+ f.flowHandlerRoutineActive[handlerRoutineIndex] = false
+ // Flows queued before the stop would otherwise never be answered, leaving their
+ // senders blocked forever on errChan. Fail them explicitly, then exit.
+ for {
+ select {
+ case pending := <-subscriberFlowChannel:
+ *pending.errChan <- fmt.Errorf("flow-handler-routine-stopped-for-onu-%d-pon-%d", handlerRoutineIndex, f.ponPortIdx)
+ default:
+ return
+ }
+ }
}
}
}
+// StopAllFlowHandlerRoutines stops all flow handler routines. Call this when device is being rebooted or deleted
+//
+// A stop request is sent to every routine still marked active, waiting at most 5 seconds per
+// routine for the send to complete; the routines themselves exit asynchronously after receiving it.
+// wg.Done is deferred, so it runs exactly once when this method returns (after the final log line).
+func (f *OpenOltFlowMgr) StopAllFlowHandlerRoutines(ctx context.Context, wg *sync.WaitGroup) {
+ defer wg.Done()
+ for i, v := range f.stopFlowHandlerRoutine {
+ if f.flowHandlerRoutineActive[i] {
+ select {
+ case v <- true:
+ case <-time.After(time.Second * 5):
+ logger.Warnw(ctx, "timeout stopping flow handler routine", log.Fields{"onuID": i, "deviceID": f.deviceHandler.device.Id})
+ }
+ }
+ }
+ logger.Debugw(ctx, "stopped all flow handler routines", log.Fields{"ponPortIdx": f.ponPortIdx})
+}
+
// AddFlow add flow to device
// nolint: gocyclo
func (f *OpenOltFlowMgr) AddFlow(ctx context.Context, flow *ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) error {