Make sure all events for logical device are sent before its deletion to avoid race conditions

Change-Id: I5fcbc8e2c176cf866f8e7f68d1da777d6f0b573c
diff --git a/rw_core/core/device/logical_agent.go b/rw_core/core/device/logical_agent.go
index d168bf3..6b0b6f7 100644
--- a/rw_core/core/device/logical_agent.go
+++ b/rw_core/core/device/logical_agent.go
@@ -191,7 +191,13 @@
 			return
 		}
 		defer agent.requestQueue.RequestComplete()
-
+		subCtx, cancel := context.WithTimeout(log.WithSpanFromContext(context.Background(), ctx), agent.defaultTimeout)
+		// Before deletion of the logical agent, make sure all events for ldagent are sent to avoid race conditions
+		if err := agent.orderedEvents.waitForAllEventsToBeSent(subCtx, cancel); err != nil {
+			//Log the error here
+			logger.Errorw(ctx, "failed-to-send-all-events-on-the-logical-device-before-deletion",
+				log.Fields{"error": err, "logical-device-id": agent.logicalDeviceID})
+		}
 		//Remove the logical device from the model
 		if err := agent.ldProxy.Remove(ctx, agent.logicalDeviceID); err != nil {
 			returnErr = err
diff --git a/rw_core/core/device/logical_agent_port.go b/rw_core/core/device/logical_agent_port.go
index 7323c90..ab1b82b 100644
--- a/rw_core/core/device/logical_agent_port.go
+++ b/rw_core/core/device/logical_agent_port.go
@@ -397,6 +397,19 @@
 	}()
 	return nil
 }
+func (e *orderedEvents) waitForAllEventsToBeSent(ctx context.Context, cancel context.CancelFunc) error {
+	defer cancel()
+	ch := make(chan struct{})
+	e.sendCompletion(ch)
+	select {
+	case <-ctx.Done():
+		logger.Error(ctx, "timeout-while-waiting-for-event-queue-to-be-cleared")
+		return ctx.Err()
+	case <-ch:
+		logger.Debug(ctx, "event-queue-is-empty")
+		return nil
+	}
+}
 
 // send is a convenience to avoid calling both assignQueuePosition and qp.send
 func (e *orderedEvents) send(ctx context.Context, agent *LogicalAgent, deviceID string, reason ofp.OfpPortReason, desc *ofp.OfpPort) {
@@ -404,6 +417,12 @@
 	go qp.send(log.WithSpanFromContext(context.Background(), ctx), agent, deviceID, reason, desc)
 }
 
+// sendCompletion will make sure that given channel is notified when queue is empty
+func (e *orderedEvents) sendCompletion(ch chan struct{}) {
+	qp := e.assignQueuePosition()
+	go qp.sendCompletion(ch)
+}
+
 // TODO: shouldn't need to guarantee event ordering like this
 //       event ordering should really be protected by per-LogicalPort lock
 //       once routing uses on-demand calculation only, this should be changed
@@ -439,6 +458,16 @@
 	}
 	agent.ldeviceMgr.SendChangeEvent(ctx, deviceID, reason, desc)
 	close(qp.next) // notify next
+
+}
+
+// sendCompletion waits for its turn, then notifies the given channel that queue is empty
+func (qp queuePosition) sendCompletion(ch chan struct{}) {
+	if qp.prev != nil {
+		<-qp.prev // wait for turn
+	}
+	close(ch)
+	close(qp.next)
 }
 
 // GetWildcardInputPorts filters out the logical port number from the set of logical ports on the device and
diff --git a/rw_core/core/device/logical_agent_test.go b/rw_core/core/device/logical_agent_test.go
index 1db9b25..dcd6691 100644
--- a/rw_core/core/device/logical_agent_test.go
+++ b/rw_core/core/device/logical_agent_test.go
@@ -289,6 +289,45 @@
 	globalWG.Done()
 }
 
+func (lda *LDATest) stopLogicalAgentAndCheckEventQueueIsEmpty(ctx context.Context, t *testing.T, ldAgent *LogicalAgent) {
+	queueIsEmpty := false
+	err := ldAgent.stop(ctx)
+	assert.Nil(t, err)
+	qp := ldAgent.orderedEvents.assignQueuePosition()
+	if qp.prev != nil { // we will be definitely hitting this case as we pushed events on the queue before
+		// If previous channel is closed which it should be now,
+		// only then we can know that queue is empty.
+		_, ok := <-qp.prev
+		if !ok {
+			queueIsEmpty = true
+		} else {
+			queueIsEmpty = false
+		}
+	} else {
+		queueIsEmpty = true
+	}
+	close(qp.next)
+	assert.True(t, queueIsEmpty)
+}
+
+func (lda *LDATest) updateLogicalDevice(t *testing.T, ldAgent *LogicalAgent) {
+	originalLogicalPorts := ldAgent.listLogicalDevicePorts(context.Background())
+	assert.NotNil(t, originalLogicalPorts)
+
+	// Change the state of the first port to FAILED
+	err := ldAgent.updatePortState(context.Background(), 1, voltha.OperStatus_FAILED)
+	assert.Nil(t, err)
+
+	// Change the state of the second port to TESTING
+	err = ldAgent.updatePortState(context.Background(), 2, voltha.OperStatus_TESTING)
+	assert.Nil(t, err)
+
+	// Change the state of the third port to ACTIVE
+	err = ldAgent.updatePortState(context.Background(), 3, voltha.OperStatus_ACTIVE)
+	assert.Nil(t, err)
+
+}
+
 func TestConcurrentLogicalDeviceUpdate(t *testing.T) {
 	ctx := context.Background()
 	lda := newLDATest(ctx)
@@ -308,3 +347,17 @@
 
 	wg.Wait()
 }
+
+func TestLogicalAgentStopWithEventsInQueue(t *testing.T) {
+	ctx := context.Background()
+	lda := newLDATest(ctx)
+	assert.NotNil(t, lda)
+	defer lda.stopAll(ctx)
+
+	// Start the Core
+	lda.startCore(ctx, false)
+
+	a := lda.createLogicalDeviceAgent(t)
+	lda.updateLogicalDevice(t, a)
+	lda.stopLogicalAgentAndCheckEventQueueIsEmpty(ctx, t, a)
+}