[VOL-3601] Processing requests batches in parallel and then send the correct BarrierReply

Change-Id: I7c4c4bfade0d85df08a649b32d475efeb14fdcd0
diff --git a/VERSION b/VERSION
index c3f0d2b..88c5fb8 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-1.4.0-dev
+1.4.0
diff --git a/internal/pkg/openflow/connection.go b/internal/pkg/openflow/connection.go
index 08f848a..b9cb6e2 100644
--- a/internal/pkg/openflow/connection.go
+++ b/internal/pkg/openflow/connection.go
@@ -23,6 +23,7 @@
 	"errors"
 	"io"
 	"net"
+	"sync"
 	"time"
 
 	"github.com/opencord/goloxi"
@@ -232,6 +233,7 @@
 	 * a message larger than this then we will have issues
 	 */
 	headerBuf := make([]byte, 8)
+	wg := sync.WaitGroup{}
 top:
 	// Continue until we are told to stop
 	for {
@@ -308,17 +310,22 @@
 						"device-id": ofc.DeviceID,
 						"header":    js})
 			}
-			/*
-			 * Spawning a go routine for every incoming message removes processing ordering guarantees.
-			 * Removing such guarantees puts burden on the controller to ensure the correct ordering of
-			 * incoming messages and is a less optimal and safe agent implementation.
-			 * This is OK for now because ONOS keeps the order guaranteed but the agent needs to avoid
-			 * relying on external fairness. Particular care and attention has to be placed in flow add/delete
-			 * and relative barrier requests. e.g. a flowMod will be handled in thread different from a barrier,
-			 * with no guarantees of handling all messages before a barrier.
-			 * A multiple queue (incoming worker and outgoing) is a possible solution.
-			 */
-			go ofc.parseHeader(ctx, msg)
+
+			// We can parallelize the processing of all the operations
+			// that we get before a BarrieRequest, then we need to wait.
+			// What we are doing is:
+			// - spawn threads until we get a Barrier
+			// - when we get a barrier wait for the threads to complete before continuing
+
+			msgType := msg.GetType()
+			if msgType == ofp.OFPTBarrierRequest {
+				logger.Debug(ctx, "received-barrier-request-waiting-for-pending-requests")
+				wg.Wait()
+				logger.Debug(ctx, "restarting-requests-processing")
+			}
+
+			wg.Add(1)
+			go ofc.parseHeader(ctx, msg, &wg)
 		}
 	}
 	logger.Debugw(ctx, "end-of-stream",
@@ -349,7 +356,8 @@
 	}
 }
 
-func (ofc *OFConnection) parseHeader(ctx context.Context, header ofp.IHeader) {
+func (ofc *OFConnection) parseHeader(ctx context.Context, header ofp.IHeader, wg *sync.WaitGroup) {
+	defer wg.Done()
 	headerType := header.GetType()
 	logger.Debugw(ctx, "packet-header-type",
 		log.Fields{
diff --git a/internal/pkg/openflow/flowMod.go b/internal/pkg/openflow/flowMod.go
index 86088de..342375f 100644
--- a/internal/pkg/openflow/flowMod.go
+++ b/internal/pkg/openflow/flowMod.go
@@ -458,6 +458,7 @@
 				"device-id":   ofc.DeviceID,
 				"flow-update": flowUpdateJs})
 	}
+
 	if _, err := volthaClient.UpdateLogicalDeviceFlowTable(log.WithSpanFromContext(context.Background(), ctx), &flowUpdate); err != nil {
 		logger.Errorw(ctx, "Error calling FlowDelete ",
 			log.Fields{