[VOL-2848] : Protect concurrent access for PON resources
Flows are currently serialized on a per-ONU basis. However, flows
from different ONUs could be added/removed concurrently. This meant
that concurrent access to PON resources was possible. In some tests it was
seen that two ONUs got the same resource (like FlowID) and traffic
would fail.
This change protects concurrent get/put/delete access to the shared PON resource pools
on the KV store with locks. The adapter also maintains certain data on
the KV store on a per-ONU basis; this does not need any protection,
as the key paths are unique and provide inherent protection. Moreover,
any flow operation on a given ONU is serialized, so this is not a concern.
Change-Id: I8a452a7ae84413741cbc2fa24ae42f4329748e32
diff --git a/internal/pkg/resourcemanager/resourcemanager_test.go b/internal/pkg/resourcemanager/resourcemanager_test.go
index b44ba07..4786940 100644
--- a/internal/pkg/resourcemanager/resourcemanager_test.go
+++ b/internal/pkg/resourcemanager/resourcemanager_test.go
@@ -37,6 +37,7 @@
"reflect"
"strconv"
"strings"
+ "sync"
"testing"
"time"
)
@@ -68,15 +69,16 @@
// fields mocks OpenOltResourceMgr struct.
type fields struct {
- DeviceID string
- HostAndPort string
- Args string
- KVStore *db.Backend
- DeviceType string
- Host string
- Port int
- DevInfo *openolt.DeviceInfo
- ResourceMgrs map[uint32]*ponrmgr.PONResourceManager
+ DeviceID string
+ HostAndPort string
+ Args string
+ KVStore *db.Backend
+ DeviceType string
+ Host string
+ Port int
+ DevInfo *openolt.DeviceInfo
+ ResourceMgrs map[uint32]*ponrmgr.PONResourceManager
+ NumOfPonPorts uint32
}
// MockKVClient mocks the AdapterProxy interface.
@@ -104,6 +106,7 @@
ranges["alloc_id_shared"] = uint32(0)
ranges["gemport_id_shared"] = uint32(0)
ranges["flow_id_shared"] = uint32(0)
+ resMgr.NumOfPonPorts = 2
ponMgr := &ponrmgr.PONResourceManager{
DeviceID: "onu-1",
IntfIDs: []uint32{1, 2},
@@ -115,6 +118,7 @@
}
resMgr.ResourceMgrs[1] = ponMgr
resMgr.ResourceMgrs[2] = ponMgr
+
return &resMgr
}
@@ -247,7 +251,7 @@
// testResMgrObject maps fields type to OpenOltResourceMgr type.
func testResMgrObject(testResMgr *fields) *OpenOltResourceMgr {
- return &OpenOltResourceMgr{
+ var rsrMgr = OpenOltResourceMgr{
DeviceID: testResMgr.DeviceID,
HostAndPort: testResMgr.HostAndPort,
Args: testResMgr.Args,
@@ -258,6 +262,12 @@
DevInfo: testResMgr.DevInfo,
ResourceMgrs: testResMgr.ResourceMgrs,
}
+
+ rsrMgr.AllocIDMgmtLock = make([]sync.RWMutex, testResMgr.NumOfPonPorts)
+ rsrMgr.GemPortIDMgmtLock = make([]sync.RWMutex, testResMgr.NumOfPonPorts)
+ rsrMgr.OnuIDMgmtLock = make([]sync.RWMutex, testResMgr.NumOfPonPorts)
+
+ return &rsrMgr
}
func TestNewResourceMgr(t *testing.T) {
@@ -321,7 +331,7 @@
fields *fields
args args
}{
- {"FreeFlowID-1", getResMgr(), args{2, 2, 2, 2}},
+ {"FreeFlowID-1", getResMgr(), args{1, 2, 2, 2}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -346,7 +356,7 @@
fields *fields
args args
}{
- {"FreeFlowIDs-1", getResMgr(), args{2, 2, 2, []uint32{1, 2}}},
+ {"FreeFlowIDs-1", getResMgr(), args{1, 2, 2, []uint32{1, 2}}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -369,7 +379,7 @@
fields *fields
args args
}{
- {"FreePONResourcesForONU-1", getResMgr(), args{2, 0, 2}},
+ {"FreePONResourcesForONU-1", getResMgr(), args{1, 0, 2}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -391,7 +401,7 @@
fields *fields
args args
}{
- {"FreeOnuID-1", getResMgr(), args{2, []uint32{1, 2}}},
+ {"FreeOnuID-1", getResMgr(), args{1, []uint32{1, 2}}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -416,7 +426,7 @@
args args
want uint32
}{
- {"GetAllocID-1", getResMgr(), args{2, 2, 2}, 0},
+ {"GetAllocID-1", getResMgr(), args{1, 2, 2}, 0},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -442,7 +452,7 @@
args args
want []uint32
}{
- {"GetCurrentAllocIDForOnu-1", getResMgr(), args{2, 2, 2}, []uint32{}},
+ {"GetCurrentAllocIDForOnu-1", getResMgr(), args{1, 2, 2}, []uint32{}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -469,7 +479,7 @@
args args
want []uint32
}{
- {"GetCurrentFlowIDsForOnu-1", getResMgr(), args{2, 2, 2}, []uint32{}},
+ {"GetCurrentFlowIDsForOnu-1", getResMgr(), args{1, 2, 2}, []uint32{}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -495,7 +505,7 @@
args args
want []uint32
}{
- {"GetCurrentGEMPortIDsForOnu-1", getResMgr(), args{2, 2, 2}, []uint32{}},
+ {"GetCurrentGEMPortIDsForOnu-1", getResMgr(), args{1, 2, 2}, []uint32{}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -527,7 +537,7 @@
want uint32
wantErr error
}{
- {"GetFlowID-1", getResMgr(), args{2, 2, 2, 2, 2,
+ {"GetFlowID-1", getResMgr(), args{1, 2, 2, 2, 2,
"HSIA", nil}, 0, errors.New("failed to get flows")},
}
for _, tt := range tests {
@@ -561,7 +571,7 @@
want []uint32
wantErr error
}{
- {"GetGEMPortID-1", getResMgr(), args{2, 2, 2, 2}, []uint32{},
+ {"GetGEMPortID-1", getResMgr(), args{1, 2, 2, 2}, []uint32{},
errors.New("failed to get gem port")},
}
for _, tt := range tests {
@@ -596,9 +606,9 @@
want *ofp.OfpMeterConfig
wantErr error
}{
- {"GetMeterIDOnu", getResMgr(), args{"DOWNSTREAM", 1, 1, 1, 64},
+ {"GetMeterIDOnu", getResMgr(), args{"DOWNSTREAM", 0, 1, 1, 64},
&ofp.OfpMeterConfig{}, errors.New("failed to get Meter config from kvstore for path")},
- {"GetMeterIDOnu", getResMgr(), args{"DOWNSTREAM", 2, 2, 2, 65},
+ {"GetMeterIDOnu", getResMgr(), args{"DOWNSTREAM", 1, 2, 2, 65},
&ofp.OfpMeterConfig{}, errors.New("failed to get Meter config from kvstore for path")},
}
for _, tt := range tests {
@@ -625,7 +635,7 @@
want uint32
wantErr error
}{
- {"GetONUID-1", getResMgr(), args{2}, uint32(0), errors.New("json errors")},
+ {"GetONUID-1", getResMgr(), args{1}, uint32(0), errors.New("json errors")},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -653,7 +663,7 @@
args args
want []uint32
}{
- {"GetTechProfileIDForOnu-1", getResMgr(), args{2, 2, 2},
+ {"GetTechProfileIDForOnu-1", getResMgr(), args{1, 2, 2},
[]uint32{1}},
}
for _, tt := range tests {
@@ -681,7 +691,7 @@
args args
want bool
}{
- {"IsFlowCookieOnKVStore-1", getResMgr(), args{2, 2, 2, 2}, false},
+ {"IsFlowCookieOnKVStore-1", getResMgr(), args{1, 2, 2, 2}, false},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -739,7 +749,7 @@
args args
wantErr error
}{
- {"RemoveTechProfileIDForOnu-1", getResMgr(), args{2, 2, 2, 64},
+ {"RemoveTechProfileIDForOnu-1", getResMgr(), args{1, 2, 2, 64},
errors.New("failed to delete techprofile id resource %s in KV store")},
}
for _, tt := range tests {
@@ -768,7 +778,7 @@
args args
wantErr error
}{
- {"UpdateAllocIdsForOnu-1", getResMgr(), args{2, 2, 2, []uint32{1, 2}},
+ {"UpdateAllocIdsForOnu-1", getResMgr(), args{1, 2, 2, []uint32{1, 2}},
errors.New("")},
}
for _, tt := range tests {
@@ -797,7 +807,7 @@
args args
wantErr error
}{
- {"UpdateFlowIDInfo-1", getResMgr(), args{2, 2, 2, 2, &[]FlowInfo{}}, errors.New("")},
+ {"UpdateFlowIDInfo-1", getResMgr(), args{1, 2, 2, 2, &[]FlowInfo{}}, errors.New("")},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -825,7 +835,7 @@
args args
wantErr error
}{
- {"UpdateGEMPortIDsForOnu-1", getResMgr(), args{2, 2, 2,
+ {"UpdateGEMPortIDsForOnu-1", getResMgr(), args{1, 2, 2,
[]uint32{1, 2}}, errors.New("failed to update resource")},
}
for _, tt := range tests {
@@ -854,7 +864,7 @@
wantErr error
}{
{"UpdateGEMportsPonportToOnuMapOnKVStore-1", getResMgr(), args{[]uint32{1, 2},
- 2, 2, 2}, errors.New("failed to update resource")},
+ 1, 2, 2}, errors.New("failed to update resource")},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -884,7 +894,7 @@
args args
wantErr error
}{
- {"UpdateMeterIDForOnu-1", getResMgr(), args{"DOWNSTREAM", 2, 2,
+ {"UpdateMeterIDForOnu-1", getResMgr(), args{"DOWNSTREAM", 1, 2,
2, 64, &ofp.OfpMeterConfig{}}, errors.New("failed to get Meter config from kvstore for path")},
}
for _, tt := range tests {
@@ -913,7 +923,7 @@
args args
wantErr error
}{
- {"UpdateTechProfileIDForOnu-1", getResMgr(), args{2, 2, 2,
+ {"UpdateTechProfileIDForOnu-1", getResMgr(), args{1, 2, 2,
2}, errors.New("failed to update resource")},
}
for _, tt := range tests {