VOL-4471: Stale data in resource manager
- Make sure to stop the flow and mcast group handlers before deleting
data from the DB. Otherwise, stray processing still running in these
routines can re-create entries in the DB after the cleanup.
Change-Id: If73c4ea5f972b7333bd2d6c7a8b88dcf0c31638d
diff --git a/VERSION b/VERSION
index a95f288..6f2dee9 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-4.1.4
+4.1.5-dev
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 646b8f9..86cce43 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -1612,6 +1612,12 @@
var err error
var errorsList []error
+ if dh.getDeviceDeletionInProgressFlag() {
+ // The device itself is going to be reset as part of deletion. So nothing to be done.
+ logger.Infow(ctx, "device-deletion-in-progress--not-handling-flows-or-groups", log.Fields{"device-id": device.Id})
+ return nil
+ }
+
if flows != nil {
for _, flow := range flows.ToRemove.Items {
ponIf := dh.getPonIfFromFlow(flow)
@@ -1660,6 +1666,12 @@
var err error
var errorsList []error
+ if dh.getDeviceDeletionInProgressFlag() {
+ // The device itself is going to be reset as part of deletion. So nothing to be done.
+ logger.Infow(ctx, "device-deletion-in-progress--not-handling-flows-or-groups", log.Fields{"device-id": dh.device.Id})
+ return nil
+ }
+
// Whether we need to synchronize multicast group adds and modifies like flow add and delete needs to be investigated
if groups != nil {
for _, group := range groups.ToAdd.Items {
@@ -1868,6 +1880,18 @@
*/
dh.setDeviceDeletionInProgressFlag(true)
+ var wg sync.WaitGroup
+ wg.Add(1) // for the mcast routine below to finish
+ go dh.StopAllMcastHandlerRoutines(ctx, &wg)
+ for _, flMgr := range dh.flowMgr {
+ wg.Add(1) // for the flow handler routine below to finish
+ go flMgr.StopAllFlowHandlerRoutines(ctx, &wg)
+ }
+ if !dh.waitForTimeoutOrCompletion(&wg, time.Second*30) {
+ logger.Warnw(ctx, "timed out waiting for stopping flow and group handlers", log.Fields{"deviceID": device.Id})
+ } else {
+ logger.Infow(ctx, "all flow and group handlers shutdown gracefully", log.Fields{"deviceID": device.Id})
+ }
dh.cleanupDeviceResources(ctx)
logger.Debugw(ctx, "removed-device-from-Resource-manager-KV-store", log.Fields{"device-id": dh.device.Id})
@@ -1882,10 +1906,6 @@
}
dh.lockDevice.RUnlock()
dh.removeOnuIndicationChannels(ctx)
- go dh.StopAllMcastHandlerRoutines(ctx)
- for _, flMgr := range dh.flowMgr {
- go flMgr.StopAllFlowHandlerRoutines(ctx)
- }
//Reset the state
if dh.Client != nil {
if _, err := dh.Client.Reboot(ctx, new(oop.Empty)); err != nil {
@@ -2198,9 +2218,17 @@
}
dh.lockDevice.RUnlock()
- go dh.StopAllMcastHandlerRoutines(ctx)
+ var wg sync.WaitGroup
+ wg.Add(1) // for the multicast handler routine
+ go dh.StopAllMcastHandlerRoutines(ctx, &wg)
for _, flMgr := range dh.flowMgr {
- go flMgr.StopAllFlowHandlerRoutines(ctx)
+ wg.Add(1) // for the flow handler routine
+ go flMgr.StopAllFlowHandlerRoutines(ctx, &wg)
+ }
+ if !dh.waitForTimeoutOrCompletion(&wg, time.Second*30) {
+ logger.Warnw(ctx, "timed out waiting for stopping flow and group handlers", log.Fields{"deviceID": device.Id})
+ } else {
+ logger.Infow(ctx, "all flow and group handlers shutdown gracefully", log.Fields{"deviceID": device.Id})
}
//reset adapter reconcile flag
@@ -2556,6 +2584,12 @@
// RouteMcastFlowOrGroupMsgToChannel routes incoming mcast flow or group to a channel to be handled by the a specific
// instance of mcastFlowOrGroupChannelHandlerRoutine meant to handle messages for that group.
func (dh *DeviceHandler) RouteMcastFlowOrGroupMsgToChannel(ctx context.Context, flow *of.OfpFlowStats, group *of.OfpGroupEntry, action string) error {
+ if dh.getDeviceDeletionInProgressFlag() {
+ // The device itself is going to be reset as part of deletion. So nothing to be done.
+ logger.Infow(ctx, "device-deletion-in-progress--not-handling-flows-or-groups", log.Fields{"device-id": dh.device.Id})
+ return nil
+ }
+
// Step1 : Fill McastFlowOrGroupControlBlock
// Step2 : Push the McastFlowOrGroupControlBlock to appropriate channel
// Step3 : Wait on response channel for response
@@ -2649,12 +2683,17 @@
}
// StopAllMcastHandlerRoutines stops all flow handler routines. Call this when device is being rebooted or deleted
-func (dh *DeviceHandler) StopAllMcastHandlerRoutines(ctx context.Context) {
+func (dh *DeviceHandler) StopAllMcastHandlerRoutines(ctx context.Context, wg *sync.WaitGroup) {
for i, v := range dh.stopMcastHandlerRoutine {
if dh.mcastHandlerRoutineActive[i] {
- v <- true
+ select {
+ case v <- true:
+ case <-time.After(time.Second * 5):
+ logger.Warnw(ctx, "timeout stopping mcast handler routine", log.Fields{"idx": i, "deviceID": dh.device.Id})
+ }
}
}
+ wg.Done()
logger.Debug(ctx, "stopped all mcast handler routines")
}
@@ -3126,3 +3165,19 @@
defer dh.lockDevice.RUnlock()
return dh.isDeviceDeletionInProgress
}
+
+// waitForTimeoutOrCompletion waits for the waitgroup for the specified max timeout.
+// Returns false if waiting timed out.
+func (dh *DeviceHandler) waitForTimeoutOrCompletion(wg *sync.WaitGroup, timeout time.Duration) bool {
+ c := make(chan struct{})
+ go func() {
+ defer close(c)
+ wg.Wait()
+ }()
+ select {
+ case <-c:
+ return true // completed normally
+ case <-time.After(timeout):
+ return false // timed out
+ }
+}
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 78a7480..be74049 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -2202,6 +2202,11 @@
// RouteFlowToOnuChannel routes incoming flow to ONU specific channel
func (f *OpenOltFlowMgr) RouteFlowToOnuChannel(ctx context.Context, flow *ofp.OfpFlowStats, addFlow bool, flowMetadata *ofp.FlowMetadata) error {
+ if f.deviceHandler.getDeviceDeletionInProgressFlag() {
+ // The device itself is going to be reset as part of deletion. So nothing to be done.
+ logger.Infow(ctx, "device-deletion-in-progress--not-handling-flows-or-groups", log.Fields{"device-id": f.deviceHandler.device.Id})
+ return nil
+ }
// Step1 : Fill flowControlBlock
// Step2 : Push the flowControlBlock to ONU channel
// Step3 : Wait on response channel for response
@@ -2238,11 +2243,12 @@
// This routine is unique per ONU ID and blocks on flowControlBlock channel for incoming flows
// Each incoming flow is processed in a synchronous manner, i.e., the flow is processed to completion before picking another
func (f *OpenOltFlowMgr) perOnuFlowHandlerRoutine(handlerRoutineIndex int, subscriberFlowChannel chan flowControlBlock, stopHandler chan bool) {
+ var flowCb flowControlBlock
for {
select {
// block on the channel to receive an incoming flow
// process the flow completely before proceeding to handle the next flow
- case flowCb := <-subscriberFlowChannel:
+ case flowCb = <-subscriberFlowChannel:
if flowCb.addFlow {
logger.Info(flowCb.ctx, "adding-flow-start")
startTime := time.Now()
@@ -2266,12 +2272,17 @@
}
// StopAllFlowHandlerRoutines stops all flow handler routines. Call this when device is being rebooted or deleted
-func (f *OpenOltFlowMgr) StopAllFlowHandlerRoutines(ctx context.Context) {
+func (f *OpenOltFlowMgr) StopAllFlowHandlerRoutines(ctx context.Context, wg *sync.WaitGroup) {
for i, v := range f.stopFlowHandlerRoutine {
if f.flowHandlerRoutineActive[i] {
- v <- true
+ select {
+ case v <- true:
+ case <-time.After(time.Second * 5):
+ logger.Warnw(ctx, "timeout stopping flow handler routine", log.Fields{"onuID": i, "deviceID": f.deviceHandler.device.Id})
+ }
}
}
+ wg.Done()
logger.Debugw(ctx, "stopped all flow handler routines", log.Fields{"ponPortIdx": f.ponPortIdx})
}