Make sure all events for a logical device are sent before it is deleted, to avoid race conditions
Change-Id: I5fcbc8e2c176cf866f8e7f68d1da777d6f0b573c
diff --git a/rw_core/core/device/logical_agent_port.go b/rw_core/core/device/logical_agent_port.go
index 7323c90..ab1b82b 100644
--- a/rw_core/core/device/logical_agent_port.go
+++ b/rw_core/core/device/logical_agent_port.go
@@ -397,6 +397,19 @@
}()
return nil
}
+func (e *orderedEvents) waitForAllEventsToBeSent(ctx context.Context, cancel context.CancelFunc) error { // blocks until the completion marker queued below fires (returns nil) or ctx is done (returns ctx.Err())
+ defer cancel() // the caller-supplied cancel func is always invoked when this function returns
+ ch := make(chan struct{}) // pure signal channel; closed by the queued completion marker when its turn arrives
+ e.sendCompletion(ch)
+ select {
+ case <-ctx.Done(): // ctx ended first; note the log labels this a timeout even if ctx was cancelled for another reason
+ logger.Error(ctx, "timeout-while-waiting-for-event-queue-to-be-cleared")
+ return ctx.Err()
+ case <-ch: // the marker's turn came, so positions queued ahead of it have presumably finished — confirm against assignQueuePosition
+ logger.Debug(ctx, "event-queue-is-empty")
+ return nil
+ }
+}
// send is a convenience to avoid calling both assignQueuePosition and qp.send
func (e *orderedEvents) send(ctx context.Context, agent *LogicalAgent, deviceID string, reason ofp.OfpPortReason, desc *ofp.OfpPort) {
@@ -404,6 +417,12 @@
go qp.send(log.WithSpanFromContext(context.Background(), ctx), agent, deviceID, reason, desc)
}
+// sendCompletion takes a queue position and starts a goroutine that closes ch once that position's turn comes
+func (e *orderedEvents) sendCompletion(ch chan struct{}) {
+ qp := e.assignQueuePosition() // presumably reserves a slot behind anything already queued — helper not shown here, confirm
+ go qp.sendCompletion(ch) // wait in a separate goroutine so this call returns immediately
+}
+
// TODO: shouldn't need to guarantee event ordering like this
// event ordering should really be protected by per-LogicalPort lock
// once routing uses on-demand calculation only, this should be changed
@@ -439,6 +458,16 @@
}
agent.ldeviceMgr.SendChangeEvent(ctx, deviceID, reason, desc)
close(qp.next) // notify next
+
+}
+
+// sendCompletion waits for its turn, then closes ch to signal the waiter and hands the turn to the next queue position
+func (qp queuePosition) sendCompletion(ch chan struct{}) {
+ if qp.prev != nil { // nil prev means there is nothing to wait for (presumably the first position in the queue — confirm)
+ <-qp.prev // block until the preceding position signals that it is done
+ }
+ close(ch) // wake whoever is receiving on ch
+ close(qp.next) // notify next
}
// GetWildcardInputPorts filters out the logical port number from the set of logical ports on the device and