[VOL-2848] : Protect concurrent access for PON resources

Flows were previously serialized on a per-ONU basis. However, flows
from different ONUs could be added/removed concurrently. This meant
possible concurrent access to PON resources. In some tests it was
seen that two ONUs got the same resource (like FlowID) and traffic
would fail.
This change protects concurrent get/put/delete access to the shared
PON resource pools on the KV store with locks. The adapter also
maintains certain data on the KV store on a per-ONU basis; this does
not need any protection, as the key paths are unique and provide
inherent protection. Moreover, any flow operation on a given ONU is
serialized, so this is not a concern.

Change-Id: I8a452a7ae84413741cbc2fa24ae42f4329748e32
diff --git a/internal/pkg/resourcemanager/resourcemanager_test.go b/internal/pkg/resourcemanager/resourcemanager_test.go
index b44ba07..4786940 100644
--- a/internal/pkg/resourcemanager/resourcemanager_test.go
+++ b/internal/pkg/resourcemanager/resourcemanager_test.go
@@ -37,6 +37,7 @@
 	"reflect"
 	"strconv"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 )
@@ -68,15 +69,16 @@
 
 // fields mocks  OpenOltResourceMgr struct.
 type fields struct {
-	DeviceID     string
-	HostAndPort  string
-	Args         string
-	KVStore      *db.Backend
-	DeviceType   string
-	Host         string
-	Port         int
-	DevInfo      *openolt.DeviceInfo
-	ResourceMgrs map[uint32]*ponrmgr.PONResourceManager
+	DeviceID      string
+	HostAndPort   string
+	Args          string
+	KVStore       *db.Backend
+	DeviceType    string
+	Host          string
+	Port          int
+	DevInfo       *openolt.DeviceInfo
+	ResourceMgrs  map[uint32]*ponrmgr.PONResourceManager
+	NumOfPonPorts uint32
 }
 
 // MockKVClient mocks the AdapterProxy interface.
@@ -104,6 +106,7 @@
 	ranges["alloc_id_shared"] = uint32(0)
 	ranges["gemport_id_shared"] = uint32(0)
 	ranges["flow_id_shared"] = uint32(0)
+	resMgr.NumOfPonPorts = 2
 	ponMgr := &ponrmgr.PONResourceManager{
 		DeviceID: "onu-1",
 		IntfIDs:  []uint32{1, 2},
@@ -115,6 +118,7 @@
 	}
 	resMgr.ResourceMgrs[1] = ponMgr
 	resMgr.ResourceMgrs[2] = ponMgr
+
 	return &resMgr
 }
 
@@ -247,7 +251,7 @@
 
 // testResMgrObject maps fields type to OpenOltResourceMgr type.
 func testResMgrObject(testResMgr *fields) *OpenOltResourceMgr {
-	return &OpenOltResourceMgr{
+	var rsrMgr = OpenOltResourceMgr{
 		DeviceID:     testResMgr.DeviceID,
 		HostAndPort:  testResMgr.HostAndPort,
 		Args:         testResMgr.Args,
@@ -258,6 +262,12 @@
 		DevInfo:      testResMgr.DevInfo,
 		ResourceMgrs: testResMgr.ResourceMgrs,
 	}
+
+	rsrMgr.AllocIDMgmtLock = make([]sync.RWMutex, testResMgr.NumOfPonPorts)
+	rsrMgr.GemPortIDMgmtLock = make([]sync.RWMutex, testResMgr.NumOfPonPorts)
+	rsrMgr.OnuIDMgmtLock = make([]sync.RWMutex, testResMgr.NumOfPonPorts)
+
+	return &rsrMgr
 }
 
 func TestNewResourceMgr(t *testing.T) {
@@ -321,7 +331,7 @@
 		fields *fields
 		args   args
 	}{
-		{"FreeFlowID-1", getResMgr(), args{2, 2, 2, 2}},
+		{"FreeFlowID-1", getResMgr(), args{1, 2, 2, 2}},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -346,7 +356,7 @@
 		fields *fields
 		args   args
 	}{
-		{"FreeFlowIDs-1", getResMgr(), args{2, 2, 2, []uint32{1, 2}}},
+		{"FreeFlowIDs-1", getResMgr(), args{1, 2, 2, []uint32{1, 2}}},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -369,7 +379,7 @@
 		fields *fields
 		args   args
 	}{
-		{"FreePONResourcesForONU-1", getResMgr(), args{2, 0, 2}},
+		{"FreePONResourcesForONU-1", getResMgr(), args{1, 0, 2}},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -391,7 +401,7 @@
 		fields *fields
 		args   args
 	}{
-		{"FreeOnuID-1", getResMgr(), args{2, []uint32{1, 2}}},
+		{"FreeOnuID-1", getResMgr(), args{1, []uint32{1, 2}}},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -416,7 +426,7 @@
 		args   args
 		want   uint32
 	}{
-		{"GetAllocID-1", getResMgr(), args{2, 2, 2}, 0},
+		{"GetAllocID-1", getResMgr(), args{1, 2, 2}, 0},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -442,7 +452,7 @@
 		args   args
 		want   []uint32
 	}{
-		{"GetCurrentAllocIDForOnu-1", getResMgr(), args{2, 2, 2}, []uint32{}},
+		{"GetCurrentAllocIDForOnu-1", getResMgr(), args{1, 2, 2}, []uint32{}},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -469,7 +479,7 @@
 		args   args
 		want   []uint32
 	}{
-		{"GetCurrentFlowIDsForOnu-1", getResMgr(), args{2, 2, 2}, []uint32{}},
+		{"GetCurrentFlowIDsForOnu-1", getResMgr(), args{1, 2, 2}, []uint32{}},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -495,7 +505,7 @@
 		args   args
 		want   []uint32
 	}{
-		{"GetCurrentGEMPortIDsForOnu-1", getResMgr(), args{2, 2, 2}, []uint32{}},
+		{"GetCurrentGEMPortIDsForOnu-1", getResMgr(), args{1, 2, 2}, []uint32{}},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -527,7 +537,7 @@
 		want    uint32
 		wantErr error
 	}{
-		{"GetFlowID-1", getResMgr(), args{2, 2, 2, 2, 2,
+		{"GetFlowID-1", getResMgr(), args{1, 2, 2, 2, 2,
 			"HSIA", nil}, 0, errors.New("failed to get flows")},
 	}
 	for _, tt := range tests {
@@ -561,7 +571,7 @@
 		want    []uint32
 		wantErr error
 	}{
-		{"GetGEMPortID-1", getResMgr(), args{2, 2, 2, 2}, []uint32{},
+		{"GetGEMPortID-1", getResMgr(), args{1, 2, 2, 2}, []uint32{},
 			errors.New("failed to get gem port")},
 	}
 	for _, tt := range tests {
@@ -596,9 +606,9 @@
 		want    *ofp.OfpMeterConfig
 		wantErr error
 	}{
-		{"GetMeterIDOnu", getResMgr(), args{"DOWNSTREAM", 1, 1, 1, 64},
+		{"GetMeterIDOnu", getResMgr(), args{"DOWNSTREAM", 0, 1, 1, 64},
 			&ofp.OfpMeterConfig{}, errors.New("failed to get Meter config from kvstore for path")},
-		{"GetMeterIDOnu", getResMgr(), args{"DOWNSTREAM", 2, 2, 2, 65},
+		{"GetMeterIDOnu", getResMgr(), args{"DOWNSTREAM", 1, 2, 2, 65},
 			&ofp.OfpMeterConfig{}, errors.New("failed to get Meter config from kvstore for path")},
 	}
 	for _, tt := range tests {
@@ -625,7 +635,7 @@
 		want    uint32
 		wantErr error
 	}{
-		{"GetONUID-1", getResMgr(), args{2}, uint32(0), errors.New("json errors")},
+		{"GetONUID-1", getResMgr(), args{1}, uint32(0), errors.New("json errors")},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -653,7 +663,7 @@
 		args   args
 		want   []uint32
 	}{
-		{"GetTechProfileIDForOnu-1", getResMgr(), args{2, 2, 2},
+		{"GetTechProfileIDForOnu-1", getResMgr(), args{1, 2, 2},
 			[]uint32{1}},
 	}
 	for _, tt := range tests {
@@ -681,7 +691,7 @@
 		args   args
 		want   bool
 	}{
-		{"IsFlowCookieOnKVStore-1", getResMgr(), args{2, 2, 2, 2}, false},
+		{"IsFlowCookieOnKVStore-1", getResMgr(), args{1, 2, 2, 2}, false},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -739,7 +749,7 @@
 		args    args
 		wantErr error
 	}{
-		{"RemoveTechProfileIDForOnu-1", getResMgr(), args{2, 2, 2, 64},
+		{"RemoveTechProfileIDForOnu-1", getResMgr(), args{1, 2, 2, 64},
 			errors.New("failed to delete techprofile id resource %s in KV store")},
 	}
 	for _, tt := range tests {
@@ -768,7 +778,7 @@
 		args    args
 		wantErr error
 	}{
-		{"UpdateAllocIdsForOnu-1", getResMgr(), args{2, 2, 2, []uint32{1, 2}},
+		{"UpdateAllocIdsForOnu-1", getResMgr(), args{1, 2, 2, []uint32{1, 2}},
 			errors.New("")},
 	}
 	for _, tt := range tests {
@@ -797,7 +807,7 @@
 		args    args
 		wantErr error
 	}{
-		{"UpdateFlowIDInfo-1", getResMgr(), args{2, 2, 2, 2, &[]FlowInfo{}}, errors.New("")},
+		{"UpdateFlowIDInfo-1", getResMgr(), args{1, 2, 2, 2, &[]FlowInfo{}}, errors.New("")},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -825,7 +835,7 @@
 		args    args
 		wantErr error
 	}{
-		{"UpdateGEMPortIDsForOnu-1", getResMgr(), args{2, 2, 2,
+		{"UpdateGEMPortIDsForOnu-1", getResMgr(), args{1, 2, 2,
 			[]uint32{1, 2}}, errors.New("failed to update resource")},
 	}
 	for _, tt := range tests {
@@ -854,7 +864,7 @@
 		wantErr error
 	}{
 		{"UpdateGEMportsPonportToOnuMapOnKVStore-1", getResMgr(), args{[]uint32{1, 2},
-			2, 2, 2}, errors.New("failed to update resource")},
+			1, 2, 2}, errors.New("failed to update resource")},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -884,7 +894,7 @@
 		args    args
 		wantErr error
 	}{
-		{"UpdateMeterIDForOnu-1", getResMgr(), args{"DOWNSTREAM", 2, 2,
+		{"UpdateMeterIDForOnu-1", getResMgr(), args{"DOWNSTREAM", 1, 2,
 			2, 64, &ofp.OfpMeterConfig{}}, errors.New("failed to get Meter config from kvstore for path")},
 	}
 	for _, tt := range tests {
@@ -913,7 +923,7 @@
 		args    args
 		wantErr error
 	}{
-		{"UpdateTechProfileIDForOnu-1", getResMgr(), args{2, 2, 2,
+		{"UpdateTechProfileIDForOnu-1", getResMgr(), args{1, 2, 2,
 			2}, errors.New("failed to update resource")},
 	}
 	for _, tt := range tests {