[VOL-3601] Process request batches in parallel, then send the correct BarrierReply
Change-Id: I7c4c4bfade0d85df08a649b32d475efeb14fdcd0
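
For context, here is a minimal runnable sketch of the batching pattern this change introduces in connection.go. The message type and handle function are hypothetical stand-ins for the OpenFlow header and ofc.parseHeader; only the sync.WaitGroup gating mirrors the actual diff below.

```go
package main

import (
	"fmt"
	"math/rand"
	"sync"
	"time"
)

// message is a hypothetical stand-in for an OpenFlow header;
// isBarrier marks a BarrierRequest.
type message struct {
	id        int
	isBarrier bool
}

// handle stands in for ofc.parseHeader: it processes one message
// asynchronously and signals completion on the shared WaitGroup.
func handle(m message, wg *sync.WaitGroup) {
	defer wg.Done()
	// Simulate workers completing out of order.
	time.Sleep(time.Duration(rand.Intn(50)) * time.Millisecond)
	if m.isBarrier {
		fmt.Println("barrier: all earlier messages are done, reply here")
		return
	}
	fmt.Println("processed message", m.id)
}

func main() {
	stream := []message{{id: 1}, {id: 2}, {id: 3}, {id: 4, isBarrier: true}, {id: 5}}

	var wg sync.WaitGroup
	for _, m := range stream {
		if m.isBarrier {
			// Drain all in-flight workers before dispatching the barrier,
			// so its reply cannot overtake any earlier operation.
			wg.Wait()
		}
		wg.Add(1)
		go handle(m, &wg)
	}
	wg.Wait() // drain before exiting
}
```

Running this prints messages 1-3 in arbitrary order but always before the barrier line; message 5 may interleave with the barrier, matching the agent's behavior after this change.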
diff --git a/VERSION b/VERSION
index c3f0d2b..88c5fb8 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-1.4.0-dev
+1.4.0
diff --git a/internal/pkg/openflow/connection.go b/internal/pkg/openflow/connection.go
index 08f848a..b9cb6e2 100644
--- a/internal/pkg/openflow/connection.go
+++ b/internal/pkg/openflow/connection.go
@@ -23,6 +23,7 @@
"errors"
"io"
"net"
+ "sync"
"time"
"github.com/opencord/goloxi"
@@ -232,6 +233,7 @@
* a message larger than this then we will have issues
*/
headerBuf := make([]byte, 8)
+ wg := sync.WaitGroup{}
top:
// Continue until we are told to stop
for {
@@ -308,17 +310,22 @@
"device-id": ofc.DeviceID,
"header": js})
}
- /*
- * Spawning a go routine for every incoming message removes processing ordering guarantees.
- * Removing such guarantees puts burden on the controller to ensure the correct ordering of
- * incoming messages and is a less optimal and safe agent implementation.
- * This is OK for now because ONOS keeps the order guaranteed but the agent needs to avoid
- * relying on external fairness. Particular care and attention has to be placed in flow add/delete
- * and relative barrier requests. e.g. a flowMod will be handled in thread different from a barrier,
- * with no guarantees of handling all messages before a barrier.
- * A multiple queue (incoming worker and outgoing) is a possible solution.
- */
- go ofc.parseHeader(ctx, msg)
+
+ // We can process in parallel all the operations that arrive
+ // before a BarrierRequest, but at the barrier we must wait.
+ // What we do is:
+ // - spawn a goroutine per message until we get a BarrierRequest
+ // - when we get a BarrierRequest, wait for the pending goroutines
+ //   to complete before continuing
+
+ msgType := msg.GetType()
+ if msgType == ofp.OFPTBarrierRequest {
+ logger.Debug(ctx, "received-barrier-request-waiting-for-pending-requests")
+ wg.Wait()
+ logger.Debug(ctx, "restarting-requests-processing")
+ }
+
+ wg.Add(1)
+ go ofc.parseHeader(ctx, msg, &wg)
}
}
logger.Debugw(ctx, "end-of-stream",
@@ -349,7 +356,8 @@
}
}
-func (ofc *OFConnection) parseHeader(ctx context.Context, header ofp.IHeader) {
+func (ofc *OFConnection) parseHeader(ctx context.Context, header ofp.IHeader, wg *sync.WaitGroup) {
+ defer wg.Done()
headerType := header.GetType()
logger.Debugw(ctx, "packet-header-type",
log.Fields{
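
A note on the WaitGroup usage in the hunk above (an observation, not part of the change): reusing a single sync.WaitGroup across successive batches is safe here because every wg.Add(1) is issued from the reader loop only after any preceding wg.Wait() has returned, which satisfies the reuse rule documented in the sync package. Note also that the BarrierRequest itself is handed to a goroutine, so messages received after the barrier may begin processing before the BarrierReply goes out; the guarantee is only that everything received before the barrier has completed first.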
diff --git a/internal/pkg/openflow/flowMod.go b/internal/pkg/openflow/flowMod.go
index 86088de..342375f 100644
--- a/internal/pkg/openflow/flowMod.go
+++ b/internal/pkg/openflow/flowMod.go
@@ -458,6 +458,7 @@
"device-id": ofc.DeviceID,
"flow-update": flowUpdateJs})
}
+
if _, err := volthaClient.UpdateLogicalDeviceFlowTable(log.WithSpanFromContext(context.Background(), ctx), &flowUpdate); err != nil {
logger.Errorw(ctx, "Error calling FlowDelete ",
log.Fields{