[VOL-3982] Improving locks on GemPort adn AllocId maps in Olt struct to avoid concurrency issues
Change-Id: Iaea4915856af6057e273e05a5f34087cfdf73cad
diff --git a/internal/bbsim/devices/olt.go b/internal/bbsim/devices/olt.go
index ad0a8d0..6ad068d 100644
--- a/internal/bbsim/devices/olt.go
+++ b/internal/bbsim/devices/olt.go
@@ -1610,11 +1610,22 @@
o.AllocIDsLock.Lock()
defer o.AllocIDsLock.Unlock()
+ if _, ok := o.AllocIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)]; !ok {
+ oltLogger.WithFields(log.Fields{
+ "IntfId": flow.AccessIntfId,
+ "OnuId": flow.OnuId,
+ "PortNo": flow.PortNo,
+ "GemportId": flow.GemportId,
+ "FlowId": flow.FlowId,
+ }).Error("trying-to-store-alloc-id-for-unknown-onu")
+ }
+
oltLogger.WithFields(log.Fields{
- "IntfId": flow.AccessIntfId,
- "OnuId": flow.OnuId,
- "PortNo": flow.PortNo,
- "AllocId": flow.AllocId,
+ "IntfId": flow.AccessIntfId,
+ "OnuId": flow.OnuId,
+ "PortNo": flow.PortNo,
+ "GemportId": flow.GemportId,
+ "FlowId": flow.FlowId,
}).Trace("storing-alloc-id-via-flow")
if _, ok := o.AllocIDs[uint32(flow.AccessIntfId)][uint32(flow.OnuId)][flow.PortNo]; !ok {
@@ -1660,6 +1671,27 @@
}
func (o *OltDevice) storeGemPortId(ponId uint32, onuId uint32, portNo uint32, gemId int32, flowId uint64) {
+ o.GemPortIDsLock.Lock()
+ defer o.GemPortIDsLock.Unlock()
+
+ if _, ok := o.GemPortIDs[ponId][onuId]; !ok {
+ oltLogger.WithFields(log.Fields{
+ "IntfId": ponId,
+ "OnuId": onuId,
+ "PortNo": portNo,
+ "GemportId": gemId,
+ "FlowId": flowId,
+ }).Error("trying-to-store-gemport-for-unknown-onu")
+ }
+
+ oltLogger.WithFields(log.Fields{
+ "IntfId": ponId,
+ "OnuId": onuId,
+ "PortNo": portNo,
+ "GemportId": gemId,
+ "FlowId": flowId,
+ }).Trace("storing-alloc-id-via-flow")
+
if _, ok := o.GemPortIDs[ponId][onuId][portNo]; !ok {
o.GemPortIDs[ponId][onuId][portNo] = make(map[int32]map[uint64]bool)
}
@@ -1670,14 +1702,14 @@
}
func (o *OltDevice) storeGemPortIdByFlow(flow *openolt.Flow) {
- o.GemPortIDsLock.Lock()
- defer o.GemPortIDsLock.Unlock()
-
oltLogger.WithFields(log.Fields{
- "IntfId": flow.AccessIntfId,
- "OnuId": flow.OnuId,
- "PortNo": flow.PortNo,
- "GemportId": flow.GemportId,
+ "IntfId": flow.AccessIntfId,
+ "OnuId": flow.OnuId,
+ "PortNo": flow.PortNo,
+ "GemportId": flow.GemportId,
+ "FlowId": flow.FlowId,
+ "ReplicateFlow": flow.ReplicateFlow,
+ "PbitToGemport": flow.PbitToGemport,
}).Trace("storing-gem-port-id-via-flow")
if flow.ReplicateFlow {
@@ -1687,7 +1719,6 @@
} else {
o.storeGemPortId(uint32(flow.AccessIntfId), uint32(flow.OnuId), flow.PortNo, flow.GemportId, flow.FlowId)
}
-
}
func (o *OltDevice) freeGemPortId(flow *openolt.Flow) {
@@ -1736,12 +1767,10 @@
// - the AllocId is not used in any flow referencing other ONUs/UNIs on the same PON
// - the GemPortId is not used in any flow referencing other ONUs/UNIs on the same PON
func (o *OltDevice) validateFlow(flow *openolt.Flow) error {
-
// validate gemPort
o.GemPortIDsLock.RLock()
- allocatedGems := o.GemPortIDs[uint32(flow.AccessIntfId)]
- o.GemPortIDsLock.RUnlock()
- for onuId, onu := range allocatedGems {
+ defer o.GemPortIDsLock.RUnlock()
+ for onuId, onu := range o.GemPortIDs[uint32(flow.AccessIntfId)] {
if onuId == uint32(flow.OnuId) {
continue
}
@@ -1763,9 +1792,8 @@
}
o.AllocIDsLock.RLock()
- allocatedAllocIds := o.AllocIDs[uint32(flow.AccessIntfId)]
- o.AllocIDsLock.RUnlock()
- for onuId, onu := range allocatedAllocIds {
+ defer o.AllocIDsLock.RUnlock()
+ for onuId, onu := range o.AllocIDs[uint32(flow.AccessIntfId)] {
if onuId == uint32(flow.OnuId) {
continue
}
diff --git a/internal/bbsim/devices/olt_test.go b/internal/bbsim/devices/olt_test.go
index 4de6609..4544539 100644
--- a/internal/bbsim/devices/olt_test.go
+++ b/internal/bbsim/devices/olt_test.go
@@ -23,11 +23,11 @@
"github.com/opencord/bbsim/internal/bbsim/types"
bbsim "github.com/opencord/bbsim/internal/bbsim/types"
"github.com/opencord/bbsim/internal/common"
+ "github.com/opencord/voltha-protos/v4/go/openolt"
"github.com/stretchr/testify/assert"
"net"
+ "sync"
"testing"
-
- "github.com/opencord/voltha-protos/v4/go/openolt"
)
func createMockOlt(numPon int, numOnu int, services []ServiceIf) *OltDevice {
@@ -436,6 +436,57 @@
assert.Equal(t, len(olt.GemPortIDs[pon][onu][uni]), 0)
}
+// testing that we can validate flows while we are adding them
+func Benchmark_validateAndAddFlows(b *testing.B) {
+ const (
+ pon = 0
+ start = 0
+ end = 512
+ )
+
+ for r := 0; r < b.N; r++ {
+ olt := createMockOlt(1, 512, []ServiceIf{})
+
+ wg := sync.WaitGroup{}
+
+ // concurrently adding 1K gems
+ for i := start; i < end; i++ {
+ wg.Add(1)
+ flow := &openolt.Flow{
+ AccessIntfId: pon,
+ OnuId: int32(i),
+ PortNo: uint32(i),
+ GemportId: int32(i),
+ FlowId: uint64(i),
+ }
+ go func(wg *sync.WaitGroup) {
+ olt.storeGemPortIdByFlow(flow)
+ olt.storeAllocId(flow)
+ wg.Done()
+ }(&wg)
+ }
+
+ // at the same time validate flows
+ for i := start; i < end; i++ {
+ wg.Add(1)
+ flow := &openolt.Flow{
+ AccessIntfId: pon,
+ OnuId: int32(i),
+ PortNo: uint32(i),
+ GemportId: 1,
+ FlowId: uint64(i),
+ }
+ go func(wg *sync.WaitGroup) {
+ _ = olt.validateFlow(flow)
+ wg.Done()
+ }(&wg)
+ }
+
+ wg.Wait()
+ // NOTE this tests only fails if there is concurrent access to the map
+ }
+}
+
func Test_Olt_validateFlow(t *testing.T) {
const (
diff --git a/internal/common/omci/get.go b/internal/common/omci/get.go
index 5172f60..4b02bda 100644
--- a/internal/common/omci/get.go
+++ b/internal/common/omci/get.go
@@ -204,7 +204,7 @@
omciLogger.WithFields(log.Fields{
"EntityInstance": entityInstance,
- }).Info("received-get-software-image-request")
+ }).Trace("received-get-software-image-request")
// Only one image can be active and committed
committed := 0
@@ -240,7 +240,7 @@
"entityId": entityInstance,
"active": active,
"committed": committed,
- }).Info("Reporting SoftwareImage")
+ }).Trace("Reporting SoftwareImage")
return res
}