VOL-4471: Stale data in resource manager
Change-Id: I026774317ba577b1d5d0748c3d177b4b7bf2ac94
diff --git a/VERSION b/VERSION
index 6284111..efe3085 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.5.9
+3.5.10
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 3f23fb4..f7d1405 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -102,7 +102,9 @@
// Slice of channels. Each channel in slice, index by (mcast-group-id modulo MaxNumOfGroupHandlerChannels)
// A go routine per index, waits on a unique channel for incoming mcast flow or group (add/modify/remove).
- incomingMcastFlowOrGroup []chan McastFlowOrGroupControlBlock
+ incomingMcastFlowOrGroup []chan McastFlowOrGroupControlBlock
+ stopMcastHandlerRoutine []chan bool
+ mcastHandlerRoutineActive []bool
adapterPreviouslyConnected bool
agentPreviouslyConnected bool
@@ -188,13 +190,17 @@
dh.perPonOnuIndicationChannel = make(map[uint32]onuIndicationChannels)
// Create a slice of buffered channels for handling concurrent mcast flow/group.
dh.incomingMcastFlowOrGroup = make([]chan McastFlowOrGroupControlBlock, MaxNumOfGroupHandlerChannels)
+ dh.stopMcastHandlerRoutine = make([]chan bool, MaxNumOfGroupHandlerChannels)
+ dh.mcastHandlerRoutineActive = make([]bool, MaxNumOfGroupHandlerChannels)
for i := range dh.incomingMcastFlowOrGroup {
dh.incomingMcastFlowOrGroup[i] = make(chan McastFlowOrGroupControlBlock, MaxNumOfGroupHandlerChannels)
+ dh.stopMcastHandlerRoutine[i] = make(chan bool, 1)
// Spin up a go routine to handling incoming mcast flow/group (add/modify/remove).
// There will be MaxNumOfGroupHandlerChannels number of mcastFlowOrGroupChannelHandlerRoutine go routines.
// These routines will be blocked on the dh.incomingMcastFlowOrGroup[mcast-group-id modulo MaxNumOfGroupHandlerChannels] channel
// for incoming mcast flow/group to be processed serially.
- go dh.mcastFlowOrGroupChannelHandlerRoutine(dh.incomingMcastFlowOrGroup[i])
+ dh.mcastHandlerRoutineActive[i] = true
+ go dh.mcastFlowOrGroupChannelHandlerRoutine(i, dh.incomingMcastFlowOrGroup[i], dh.stopMcastHandlerRoutine[i])
}
//TODO initialize the support classes.
return &dh
@@ -639,6 +645,20 @@
//starting the stat collector
go startCollector(ctx, dh)
+ // instantiate the mcast handler routines.
+ for i := range dh.incomingMcastFlowOrGroup {
+ // We land inside the below "if" code path, after the OLT comes back from a reboot, otherwise the routines
+ // are already active when the DeviceHandler module is first instantiated (as part of Adopt_device RPC invocation).
+ if !dh.mcastHandlerRoutineActive[i] {
+ // Spin up a go routine to handling incoming mcast flow/group (add/modify/remove).
+ // There will be MaxNumOfGroupHandlerChannels number of mcastFlowOrGroupChannelHandlerRoutine go routines.
+ // These routines will be blocked on the dh.incomingMcastFlowOrGroup[mcast-group-id modulo MaxNumOfGroupHandlerChannels] channel
+ // for incoming mcast flow/group to be processed serially.
+ dh.mcastHandlerRoutineActive[i] = true
+ go dh.mcastFlowOrGroupChannelHandlerRoutine(i, dh.incomingMcastFlowOrGroup[i], dh.stopMcastHandlerRoutine[i])
+ }
+ }
+
// Synchronous call to update device state - this method is run in its own go routine
if err := dh.coreProxy.DeviceStateUpdate(ctx, dh.device.Id, voltha.ConnectStatus_REACHABLE,
voltha.OperStatus_ACTIVE); err != nil {
@@ -1525,9 +1545,7 @@
}
}
-//UpdateFlowsIncrementally updates the device flow
-func (dh *DeviceHandler) UpdateFlowsIncrementally(ctx context.Context, device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error {
- logger.Debugw(ctx, "received-incremental-flowupdate-in-device-handler", log.Fields{"device-id": device.Id, "flows": flows, "groups": groups, "flowMetadata": flowMetadata})
+func (dh *DeviceHandler) handleFlows(ctx context.Context, device *voltha.Device, flows *of.FlowChanges, flowMetadata *voltha.FlowMetadata) []error {
var err error
var errorsList []error
@@ -1564,7 +1582,13 @@
if flow_utils.HasGroup(flow) {
err = dh.RouteMcastFlowOrGroupMsgToChannel(ctx, flow, nil, McastFlowOrGroupAdd)
} else {
- err = dh.flowMgr[ponIf].RouteFlowToOnuChannel(ctx, flow, true, flowMetadata)
+ if dh.flowMgr == nil || dh.flowMgr[ponIf] == nil {
+ // The flow manager module could be uninitialized if the flow arrives too soon before the device has reconciled fully
+ logger.Errorw(ctx, "flow-manager-uninitialized", log.Fields{"device-id": device.Id})
+ err = fmt.Errorf("flow-manager-uninitialized-%v", device.Id)
+ } else {
+ err = dh.flowMgr[ponIf].RouteFlowToOnuChannel(ctx, flow, true, flowMetadata)
+ }
}
if err != nil {
errorsList = append(errorsList, err)
@@ -1572,6 +1596,19 @@
}
}
+ return errorsList
+}
+
+func (dh *DeviceHandler) handleGroups(ctx context.Context, groups *of.FlowGroupChanges) []error {
+ var err error
+ var errorsList []error
+
+ if dh.getDeviceDeletionInProgressFlag() {
+ // The device itself is going to be reset as part of deletion. So nothing to be done.
+ logger.Infow(ctx, "device-deletion-in-progress--not-handling-flows-or-groups", log.Fields{"device-id": dh.device.Id})
+ return nil
+ }
+
// Whether we need to synchronize multicast group adds and modifies like flow add and delete needs to be investigated
if groups != nil {
for _, group := range groups.ToAdd.Items {
@@ -1596,6 +1633,24 @@
}
}
}
+
+ return errorsList
+}
+
+//UpdateFlowsIncrementally updates the device flow
+func (dh *DeviceHandler) UpdateFlowsIncrementally(ctx context.Context, device *voltha.Device, flows *of.FlowChanges, groups *of.FlowGroupChanges, flowMetadata *voltha.FlowMetadata) error {
+
+ var errorsList []error
+
+ if dh.getDeviceDeletionInProgressFlag() {
+ // The device itself is going to be reset as part of deletion. So nothing to be done.
+ logger.Infow(ctx, "device-deletion-in-progress--not-handling-flows-or-groups", log.Fields{"device-id": device.Id})
+ return nil
+ }
+
+ logger.Debugw(ctx, "received-incremental-flowupdate-in-device-handler", log.Fields{"device-id": device.Id, "flows": flows, "groups": groups, "flowMetadata": flowMetadata})
+ errorsList = append(errorsList, dh.handleFlows(ctx, device, flows, flowMetadata)...)
+ errorsList = append(errorsList, dh.handleGroups(ctx, groups)...)
if len(errorsList) > 0 {
return fmt.Errorf("errors-installing-flows-groups, errors:%v", errorsList)
}
@@ -1744,6 +1799,18 @@
*/
dh.setDeviceDeletionInProgressFlag(true)
+ var wg sync.WaitGroup
+ wg.Add(1) // for the mcast routine below to finish
+ go dh.StopAllMcastHandlerRoutines(ctx, &wg)
+ for _, flMgr := range dh.flowMgr {
+ wg.Add(1) // for the flow handler routine below to finish
+ go flMgr.StopAllFlowHandlerRoutines(ctx, &wg)
+ }
+ if !dh.waitForTimeoutOrCompletion(&wg, time.Second*30) {
+ logger.Warnw(ctx, "timed out waiting for stopping flow and group handlers", log.Fields{"deviceID": device.Id})
+ } else {
+ logger.Infow(ctx, "all flow and group handlers shutdown gracefully", log.Fields{"deviceID": device.Id})
+ }
dh.cleanupDeviceResources(ctx)
logger.Debugw(ctx, "removed-device-from-Resource-manager-KV-store", log.Fields{"device-id": dh.device.Id})
@@ -2054,6 +2121,18 @@
}
dh.lockDevice.RUnlock()
+ var wg sync.WaitGroup
+ wg.Add(1) // for the multicast handler routine
+ go dh.StopAllMcastHandlerRoutines(ctx, &wg)
+ for _, flMgr := range dh.flowMgr {
+ wg.Add(1) // for the flow handler routine
+ go flMgr.StopAllFlowHandlerRoutines(ctx, &wg)
+ }
+ if !dh.waitForTimeoutOrCompletion(&wg, time.Second*30) {
+ logger.Warnw(ctx, "timed out waiting for stopping flow and group handlers", log.Fields{"deviceID": device.Id})
+ } else {
+ logger.Infow(ctx, "all flow and group handlers shutdown gracefully", log.Fields{"deviceID": device.Id})
+ }
//reset adapter reconcile flag
dh.adapterPreviouslyConnected = false
@@ -2400,6 +2479,11 @@
// RouteMcastFlowOrGroupMsgToChannel routes incoming mcast flow or group to a channel to be handled by the a specific
// instance of mcastFlowOrGroupChannelHandlerRoutine meant to handle messages for that group.
func (dh *DeviceHandler) RouteMcastFlowOrGroupMsgToChannel(ctx context.Context, flow *voltha.OfpFlowStats, group *voltha.OfpGroupEntry, action string) error {
+ if dh.getDeviceDeletionInProgressFlag() {
+ // The device itself is going to be reset as part of deletion. So nothing to be done.
+ logger.Infow(ctx, "device-deletion-in-progress--not-handling-flows-or-groups", log.Fields{"device-id": dh.device.Id})
+ return nil
+ }
// Step1 : Fill McastFlowOrGroupControlBlock
// Step2 : Push the McastFlowOrGroupControlBlock to appropriate channel
// Step3 : Wait on response channel for response
@@ -2422,64 +2506,89 @@
} else {
return errors.New("flow-and-group-both-nil")
}
- // Derive the appropriate go routine to handle the request by a simple module operation.
- // There are only MaxNumOfGroupHandlerChannels number of channels to handle the mcast flow or group
- dh.incomingMcastFlowOrGroup[groupID%MaxNumOfGroupHandlerChannels] <- mcastFlowOrGroupCb
- // Wait for handler to return error value
- err := <-errChan
- logger.Debugw(ctx, "process-flow-or-group--received-resp", log.Fields{"err": err, "totalTimeInSeconds": time.Since(startTime).Milliseconds()})
- return err
+ mcastRoutineIdx := groupID % MaxNumOfGroupHandlerChannels
+ if dh.mcastHandlerRoutineActive[mcastRoutineIdx] {
+ // Derive the appropriate go routine to handle the request by a simple module operation.
+ // There are only MaxNumOfGroupHandlerChannels number of channels to handle the mcast flow or group
+ dh.incomingMcastFlowOrGroup[groupID%MaxNumOfGroupHandlerChannels] <- mcastFlowOrGroupCb
+ // Wait for handler to return error value
+ err := <-errChan
+ logger.Debugw(ctx, "process-flow-or-group--received-resp", log.Fields{"err": err, "totalTimeInSeconds": time.Since(startTime).Milliseconds()})
+ return err
+ }
+ logger.Errorw(ctx, "mcast handler routine not active for onu", log.Fields{"mcastRoutineIdx": mcastRoutineIdx})
+ return fmt.Errorf("mcast-handler-routine-not-active-for-index-%v", mcastRoutineIdx)
}
// mcastFlowOrGroupChannelHandlerRoutine routine to handle incoming mcast flow/group message
-func (dh *DeviceHandler) mcastFlowOrGroupChannelHandlerRoutine(mcastFlowOrGroupChannel chan McastFlowOrGroupControlBlock) {
+func (dh *DeviceHandler) mcastFlowOrGroupChannelHandlerRoutine(routineIndex int, mcastFlowOrGroupChannel chan McastFlowOrGroupControlBlock, stopHandler chan bool) {
for {
+ select {
// block on the channel to receive an incoming mcast flow/group
// process the flow completely before proceeding to handle the next flow
- mcastFlowOrGroupCb := <-mcastFlowOrGroupChannel
- if mcastFlowOrGroupCb.flow != nil {
- if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupAdd {
- logger.Debugw(mcastFlowOrGroupCb.ctx, "adding-mcast-flow",
- log.Fields{"device-id": dh.device.Id,
- "flowToAdd": mcastFlowOrGroupCb.flow})
- // The mcast flow is not unique to any particular PON port, so it is OK to default to PON0
- err := dh.flowMgr[0].AddFlow(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.flow, nil)
- // Pass the return value over the return channel
- *mcastFlowOrGroupCb.errChan <- err
- } else { // flow remove
- logger.Debugw(mcastFlowOrGroupCb.ctx, "removing-mcast-flow",
- log.Fields{"device-id": dh.device.Id,
- "flowToRemove": mcastFlowOrGroupCb.flow})
- // The mcast flow is not unique to any particular PON port, so it is OK to default to PON0
- err := dh.flowMgr[0].RemoveFlow(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.flow)
- // Pass the return value over the return channel
- *mcastFlowOrGroupCb.errChan <- err
+ case mcastFlowOrGroupCb := <-mcastFlowOrGroupChannel:
+ if mcastFlowOrGroupCb.flow != nil {
+ if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupAdd {
+ logger.Debugw(mcastFlowOrGroupCb.ctx, "adding-mcast-flow",
+ log.Fields{"device-id": dh.device.Id,
+ "flowToAdd": mcastFlowOrGroupCb.flow})
+ // The mcast flow is not unique to any particular PON port, so it is OK to default to PON0
+ err := dh.flowMgr[0].AddFlow(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.flow, nil)
+ // Pass the return value over the return channel
+ *mcastFlowOrGroupCb.errChan <- err
+ } else { // flow remove
+ logger.Debugw(mcastFlowOrGroupCb.ctx, "removing-mcast-flow",
+ log.Fields{"device-id": dh.device.Id,
+ "flowToRemove": mcastFlowOrGroupCb.flow})
+ // The mcast flow is not unique to any particular PON port, so it is OK to default to PON0
+ err := dh.flowMgr[0].RemoveFlow(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.flow)
+ // Pass the return value over the return channel
+ *mcastFlowOrGroupCb.errChan <- err
+ }
+ } else { // mcast group
+ if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupAdd {
+ logger.Debugw(mcastFlowOrGroupCb.ctx, "adding-mcast-group",
+ log.Fields{"device-id": dh.device.Id,
+ "groupToAdd": mcastFlowOrGroupCb.group})
+ err := dh.groupMgr.AddGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
+ // Pass the return value over the return channel
+ *mcastFlowOrGroupCb.errChan <- err
+ } else if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupModify { // group modify
+ logger.Debugw(mcastFlowOrGroupCb.ctx, "modifying-mcast-group",
+ log.Fields{"device-id": dh.device.Id,
+ "groupToModify": mcastFlowOrGroupCb.group})
+ err := dh.groupMgr.ModifyGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
+ // Pass the return value over the return channel
+ *mcastFlowOrGroupCb.errChan <- err
+ } else { // group remove
+ logger.Debugw(mcastFlowOrGroupCb.ctx, "removing-mcast-group",
+ log.Fields{"device-id": dh.device.Id,
+ "groupToRemove": mcastFlowOrGroupCb.group})
+ err := dh.groupMgr.DeleteGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
+ // Pass the return value over the return channel
+ *mcastFlowOrGroupCb.errChan <- err
+ }
}
- } else { // mcast group
- if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupAdd {
- logger.Debugw(mcastFlowOrGroupCb.ctx, "adding-mcast-group",
- log.Fields{"device-id": dh.device.Id,
- "groupToAdd": mcastFlowOrGroupCb.group})
- err := dh.groupMgr.AddGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
- // Pass the return value over the return channel
- *mcastFlowOrGroupCb.errChan <- err
- } else if mcastFlowOrGroupCb.flowOrGroupAction == McastFlowOrGroupModify { // group modify
- logger.Debugw(mcastFlowOrGroupCb.ctx, "modifying-mcast-group",
- log.Fields{"device-id": dh.device.Id,
- "groupToModify": mcastFlowOrGroupCb.group})
- err := dh.groupMgr.ModifyGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
- // Pass the return value over the return channel
- *mcastFlowOrGroupCb.errChan <- err
- } else { // group remove
- logger.Debugw(mcastFlowOrGroupCb.ctx, "removing-mcast-group",
- log.Fields{"device-id": dh.device.Id,
- "groupToRemove": mcastFlowOrGroupCb.group})
- err := dh.groupMgr.DeleteGroup(mcastFlowOrGroupCb.ctx, mcastFlowOrGroupCb.group)
- // Pass the return value over the return channel
- *mcastFlowOrGroupCb.errChan <- err
+ case <-stopHandler:
+ dh.mcastHandlerRoutineActive[routineIndex] = false
+ return
+ }
+ }
+}
+
+// StopAllMcastHandlerRoutines stops all flow handler routines. Call this when device is being rebooted or deleted
+func (dh *DeviceHandler) StopAllMcastHandlerRoutines(ctx context.Context, wg *sync.WaitGroup) {
+ for i, v := range dh.stopMcastHandlerRoutine {
+ if dh.mcastHandlerRoutineActive[i] {
+ select {
+ case v <- true:
+ case <-time.After(time.Second * 5):
+ logger.Warnw(ctx, "timeout stopping mcast handler routine", log.Fields{"idx": i, "deviceID": dh.device.Id})
}
}
}
+ wg.Done()
+ logger.Debug(ctx, "stopped all mcast handler routines")
}
func (dh *DeviceHandler) getOltPortCounters(ctx context.Context, oltPortInfo *extension.GetOltPortCounters) *extension.SingleGetValueResponse {
@@ -2659,3 +2768,19 @@
defer dh.lockDevice.RUnlock()
return dh.isDeviceDeletionInProgress
}
+
+// waitForTimeoutOrCompletion waits for the waitgroup for the specified max timeout.
+// Returns false if waiting timed out.
+func (dh *DeviceHandler) waitForTimeoutOrCompletion(wg *sync.WaitGroup, timeout time.Duration) bool {
+ c := make(chan struct{})
+ go func() {
+ defer close(c)
+ wg.Wait()
+ }()
+ select {
+ case <-c:
+ return true // completed normally
+ case <-time.After(timeout):
+ return false // timed out
+ }
+}
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 8c855a0..b596412 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -219,7 +219,9 @@
// Slice of channels. Each channel in slice, index by ONU ID, queues flows per ONU.
// A go routine per ONU, waits on the unique channel (indexed by ONU ID) for incoming flows (add/remove)
- incomingFlows []chan flowControlBlock
+ incomingFlows []chan flowControlBlock
+ stopFlowHandlerRoutine []chan bool
+ flowHandlerRoutineActive []bool
}
//NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
@@ -243,12 +245,16 @@
// Create a slice of buffered channels for handling concurrent flows per ONU.
// The additional entry (+1) is to handle the NNI trap flows on a separate channel from individual ONUs channel
flowMgr.incomingFlows = make([]chan flowControlBlock, MaxOnusPerPon+1)
+ flowMgr.stopFlowHandlerRoutine = make([]chan bool, MaxOnusPerPon+1)
+ flowMgr.flowHandlerRoutineActive = make([]bool, MaxOnusPerPon+1)
for i := range flowMgr.incomingFlows {
flowMgr.incomingFlows[i] = make(chan flowControlBlock, maxConcurrentFlowsPerOnu)
+ flowMgr.stopFlowHandlerRoutine[i] = make(chan bool, 1)
// Spin up a go routine to handling incoming flows (add/remove).
// There will be on go routine per ONU.
// This routine will be blocked on the flowMgr.incomingFlows[onu-id] channel for incoming flows.
- go flowMgr.perOnuFlowHandlerRoutine(flowMgr.incomingFlows[i])
+ flowMgr.flowHandlerRoutineActive[i] = true
+ go flowMgr.perOnuFlowHandlerRoutine(i, flowMgr.incomingFlows[i], flowMgr.stopFlowHandlerRoutine[i])
}
flowMgr.onuGemInfoMap = make(map[uint32]*rsrcMgr.OnuGemInfo)
//Load the onugem info cache from kv store on flowmanager start
@@ -2151,6 +2157,11 @@
// RouteFlowToOnuChannel routes incoming flow to ONU specific channel
func (f *OpenOltFlowMgr) RouteFlowToOnuChannel(ctx context.Context, flow *voltha.OfpFlowStats, addFlow bool, flowMetadata *voltha.FlowMetadata) error {
+ if f.deviceHandler.getDeviceDeletionInProgressFlag() {
+ // The device itself is going to be reset as part of deletion. So nothing to be done.
+ logger.Infow(ctx, "device-deletion-in-progress--not-handling-flows-or-groups", log.Fields{"device-id": f.deviceHandler.device.Id})
+ return nil
+ }
// Step1 : Fill flowControlBlock
// Step2 : Push the flowControlBlock to ONU channel
// Step3 : Wait on response channel for response
@@ -2170,41 +2181,66 @@
if inPort != InvalidPort && outPort != InvalidPort {
_, _, onuID, _ = ExtractAccessFromFlow(inPort, outPort)
}
- // inPort or outPort is InvalidPort for trap-from-nni flows.
- // In the that case onuID is 0 which is the reserved index for trap-from-nni flows in the f.incomingFlows slice
- // Send the flowCb on the ONU flow channel
- f.incomingFlows[onuID] <- flowCb
- // Wait on the channel for flow handlers return value
- err := <-errChan
- logger.Infow(ctx, "process-flow--received-resp", log.Fields{"err": err, "totalTimeSeconds": time.Since(startTime).Seconds()})
- return err
+ if f.flowHandlerRoutineActive[onuID] {
+ // inPort or outPort is InvalidPort for trap-from-nni flows.
+ // In the that case onuID is 0 which is the reserved index for trap-from-nni flows in the f.incomingFlows slice
+ // Send the flowCb on the ONU flow channel
+ f.incomingFlows[onuID] <- flowCb
+ // Wait on the channel for flow handlers return value
+ err := <-errChan
+ logger.Infow(ctx, "process-flow-received-resp", log.Fields{"err": err, "totalTimeSeconds": time.Since(startTime).Seconds()})
+ return err
+ }
+ logger.Errorw(ctx, "flow handler routine not active for onu", log.Fields{"onuID": onuID, "ponPortIdx": f.ponPortIdx})
+ return fmt.Errorf("flow-handler-routine-not-active-for-onu-%v-pon-%d", onuID, f.ponPortIdx)
}
// This routine is unique per ONU ID and blocks on flowControlBlock channel for incoming flows
// Each incoming flow is processed in a synchronous manner, i.e., the flow is processed to completion before picking another
-func (f *OpenOltFlowMgr) perOnuFlowHandlerRoutine(subscriberFlowChannel chan flowControlBlock) {
+func (f *OpenOltFlowMgr) perOnuFlowHandlerRoutine(handlerRoutineIndex int, subscriberFlowChannel chan flowControlBlock, stopHandler chan bool) {
+ var flowCb flowControlBlock
for {
+ select {
// block on the channel to receive an incoming flow
// process the flow completely before proceeding to handle the next flow
- flowCb := <-subscriberFlowChannel
- if flowCb.addFlow {
- logger.Info(flowCb.ctx, "adding-flow-start")
- startTime := time.Now()
- err := f.AddFlow(flowCb.ctx, flowCb.flow, flowCb.flowMetadata)
- logger.Infow(flowCb.ctx, "adding-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
- // Pass the return value over the return channel
- *flowCb.errChan <- err
- } else {
- logger.Info(flowCb.ctx, "removing-flow-start")
- startTime := time.Now()
- err := f.RemoveFlow(flowCb.ctx, flowCb.flow)
- logger.Infow(flowCb.ctx, "removing-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
- // Pass the return value over the return channel
- *flowCb.errChan <- err
+ case flowCb = <-subscriberFlowChannel:
+ if flowCb.addFlow {
+ logger.Info(flowCb.ctx, "adding-flow-start")
+ startTime := time.Now()
+ err := f.AddFlow(flowCb.ctx, flowCb.flow, flowCb.flowMetadata)
+ logger.Infow(flowCb.ctx, "adding-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
+ // Pass the return value over the return channel
+ *flowCb.errChan <- err
+ } else {
+ logger.Info(flowCb.ctx, "removing-flow-start")
+ startTime := time.Now()
+ err := f.RemoveFlow(flowCb.ctx, flowCb.flow)
+ logger.Infow(flowCb.ctx, "removing-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
+ // Pass the return value over the return channel
+ *flowCb.errChan <- err
+ }
+ case <-stopHandler:
+ f.flowHandlerRoutineActive[handlerRoutineIndex] = false
+ return
}
}
}
+// StopAllFlowHandlerRoutines stops all flow handler routines. Call this when device is being rebooted or deleted
+func (f *OpenOltFlowMgr) StopAllFlowHandlerRoutines(ctx context.Context, wg *sync.WaitGroup) {
+ for i, v := range f.stopFlowHandlerRoutine {
+ if f.flowHandlerRoutineActive[i] {
+ select {
+ case v <- true:
+ case <-time.After(time.Second * 5):
+ logger.Warnw(ctx, "timeout stopping flow handler routine", log.Fields{"onuID": i, "deviceID": f.deviceHandler.device.Id})
+ }
+ }
+ }
+ wg.Done()
+ logger.Debugw(ctx, "stopped all flow handler routines", log.Fields{"ponPortIdx": f.ponPortIdx})
+}
+
// AddFlow add flow to device
// nolint: gocyclo
func (f *OpenOltFlowMgr) AddFlow(ctx context.Context, flow *ofp.OfpFlowStats, flowMetadata *voltha.FlowMetadata) error {