[VOL-3982] Improving locks on GemPort adn AllocId maps in Olt struct to avoid concurrency issues

Change-Id: Iaea4915856af6057e273e05a5f34087cfdf73cad
diff --git a/internal/bbsim/devices/olt.go b/internal/bbsim/devices/olt.go
index ad0a8d0..6ad068d 100644
--- a/internal/bbsim/devices/olt.go
+++ b/internal/bbsim/devices/olt.go
@@ -1610,11 +1610,22 @@
 	o.AllocIDsLock.Lock()
 	defer o.AllocIDsLock.Unlock()
 
+	if _, ok := o.AllocIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)]; !ok {
+		oltLogger.WithFields(log.Fields{
+			"IntfId":    flow.AccessIntfId,
+			"OnuId":     flow.OnuId,
+			"PortNo":    flow.PortNo,
+			"GemportId": flow.GemportId,
+			"FlowId":    flow.FlowId,
+		}).Error("trying-to-store-alloc-id-for-unknown-onu")
+	}
+
 	oltLogger.WithFields(log.Fields{
-		"IntfId":  flow.AccessIntfId,
-		"OnuId":   flow.OnuId,
-		"PortNo":  flow.PortNo,
-		"AllocId": flow.AllocId,
+		"IntfId":    flow.AccessIntfId,
+		"OnuId":     flow.OnuId,
+		"PortNo":    flow.PortNo,
+		"GemportId": flow.GemportId,
+		"FlowId":    flow.FlowId,
 	}).Trace("storing-alloc-id-via-flow")
 
 	if _, ok := o.AllocIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo]; !ok {
@@ -1660,6 +1671,27 @@
 }
 
 func (o *OltDevice) storeGemPortId(ponId uint32, onuId uint32, portNo uint32, gemId int32, flowId uint64) {
+	o.GemPortIDsLock.Lock()
+	defer o.GemPortIDsLock.Unlock()
+
+	if _, ok := o.GemPortIDs[ponId][onuId]; !ok {
+		oltLogger.WithFields(log.Fields{
+			"IntfId":    ponId,
+			"OnuId":     onuId,
+			"PortNo":    portNo,
+			"GemportId": gemId,
+			"FlowId":    flowId,
+		}).Error("trying-to-store-gemport-for-unknown-onu")
+	}
+
+	oltLogger.WithFields(log.Fields{
+		"IntfId":    ponId,
+		"OnuId":     onuId,
+		"PortNo":    portNo,
+		"GemportId": gemId,
+		"FlowId":    flowId,
+	}).Trace("storing-alloc-id-via-flow")
+
 	if _, ok := o.GemPortIDs[ponId][onuId][portNo]; !ok {
 		o.GemPortIDs[ponId][onuId][portNo] = make(map[int32]map[uint64]bool)
 	}
@@ -1670,14 +1702,14 @@
 }
 
 func (o *OltDevice) storeGemPortIdByFlow(flow *openolt.Flow) {
-	o.GemPortIDsLock.Lock()
-	defer o.GemPortIDsLock.Unlock()
-
 	oltLogger.WithFields(log.Fields{
-		"IntfId":    flow.AccessIntfId,
-		"OnuId":     flow.OnuId,
-		"PortNo":    flow.PortNo,
-		"GemportId": flow.GemportId,
+		"IntfId":        flow.AccessIntfId,
+		"OnuId":         flow.OnuId,
+		"PortNo":        flow.PortNo,
+		"GemportId":     flow.GemportId,
+		"FlowId":        flow.FlowId,
+		"ReplicateFlow": flow.ReplicateFlow,
+		"PbitToGemport": flow.PbitToGemport,
 	}).Trace("storing-gem-port-id-via-flow")
 
 	if flow.ReplicateFlow {
@@ -1687,7 +1719,6 @@
 	} else {
 		o.storeGemPortId(uint32(flow.AccessIntfId), uint32(flow.OnuId), flow.PortNo, flow.GemportId, flow.FlowId)
 	}
-
 }
 
 func (o *OltDevice) freeGemPortId(flow *openolt.Flow) {
@@ -1736,12 +1767,10 @@
 // - the AllocId is not used in any flow referencing other ONUs/UNIs on the same PON
 // - the GemPortId is not used in any flow referencing other ONUs/UNIs on the same PON
 func (o *OltDevice) validateFlow(flow *openolt.Flow) error {
-
 	// validate gemPort
 	o.GemPortIDsLock.RLock()
-	allocatedGems := o.GemPortIDs[uint32(flow.AccessIntfId)]
-	o.GemPortIDsLock.RUnlock()
-	for onuId, onu := range allocatedGems {
+	defer o.GemPortIDsLock.RUnlock()
+	for onuId, onu := range o.GemPortIDs[uint32(flow.AccessIntfId)] {
 		if onuId == uint32(flow.OnuId) {
 			continue
 		}
@@ -1763,9 +1792,8 @@
 	}
 
 	o.AllocIDsLock.RLock()
-	allocatedAllocIds := o.AllocIDs[uint32(flow.AccessIntfId)]
-	o.AllocIDsLock.RUnlock()
-	for onuId, onu := range allocatedAllocIds {
+	defer o.AllocIDsLock.RUnlock()
+	for onuId, onu := range o.AllocIDs[uint32(flow.AccessIntfId)] {
 		if onuId == uint32(flow.OnuId) {
 			continue
 		}
diff --git a/internal/bbsim/devices/olt_test.go b/internal/bbsim/devices/olt_test.go
index 4de6609..4544539 100644
--- a/internal/bbsim/devices/olt_test.go
+++ b/internal/bbsim/devices/olt_test.go
@@ -23,11 +23,11 @@
 	"github.com/opencord/bbsim/internal/bbsim/types"
 	bbsim "github.com/opencord/bbsim/internal/bbsim/types"
 	"github.com/opencord/bbsim/internal/common"
+	"github.com/opencord/voltha-protos/v4/go/openolt"
 	"github.com/stretchr/testify/assert"
 	"net"
+	"sync"
 	"testing"
-
-	"github.com/opencord/voltha-protos/v4/go/openolt"
 )
 
 func createMockOlt(numPon int, numOnu int, services []ServiceIf) *OltDevice {
@@ -436,6 +436,57 @@
 	assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni]), 0)
 }
 
+// testing that we can validate flows while we are adding them
+func Benchmark_validateAndAddFlows(b *testing.B) {
+	const (
+		pon   = 0
+		start = 0
+		end   = 512
+	)
+
+	for r := 0; r < b.N; r++ {
+		olt := createMockOlt(1, 512, []ServiceIf{})
+
+		wg := sync.WaitGroup{}
+
+		// concurrently adding 1K gems
+		for i := start; i < end; i++ {
+			wg.Add(1)
+			flow := &openolt.Flow{
+				AccessIntfId: pon,
+				OnuId:        int32(i),
+				PortNo:       uint32(i),
+				GemportId:    int32(i),
+				FlowId:       uint64(i),
+			}
+			go func(wg *sync.WaitGroup) {
+				olt.storeGemPortIdByFlow(flow)
+				olt.storeAllocId(flow)
+				wg.Done()
+			}(&wg)
+		}
+
+		// at the same time validate flows
+		for i := start; i < end; i++ {
+			wg.Add(1)
+			flow := &openolt.Flow{
+				AccessIntfId: pon,
+				OnuId:        int32(i),
+				PortNo:       uint32(i),
+				GemportId:    1,
+				FlowId:       uint64(i),
+			}
+			go func(wg *sync.WaitGroup) {
+				_ = olt.validateFlow(flow)
+				wg.Done()
+			}(&wg)
+		}
+
+		wg.Wait()
+		// NOTE this tests only fails if there is concurrent access to the map
+	}
+}
+
 func Test_Olt_validateFlow(t *testing.T) {
 
 	const (