[VOL-2848] : Protect concurrent access for PON resources

Flows currently were serialized on a per ONU basis. However flows
from different ONUs could add/remove concurrently. This meant
possible concurrent access of PON resources. In some tests it was
seen that two ONUs got same resource (like FlowID) and traffic
would fail.
This new change makes concurrent access to get/put/delete on shared PON resource pools
on KV store protected by locks. The adapter maintains certain data on
the KV store on a per ONU basis and this does not need any protection
as the key path are unique and provide inherant protection and moreover
any flow operation on a given ONU is serialized, this is not to worry.

Change-Id: I8a452a7ae84413741cbc2fa24ae42f4329748e32
diff --git a/internal/pkg/core/device_handler_test.go b/internal/pkg/core/device_handler_test.go
index 5f5ce2a..4db1b93 100644
--- a/internal/pkg/core/device_handler_test.go
+++ b/internal/pkg/core/device_handler_test.go
@@ -149,11 +149,22 @@
 	ep := &mocks.MockEventProxy{}
 	openOLT := &OpenOLT{coreProxy: cp, adapterProxy: ap, eventProxy: ep}
 	dh := NewDeviceHandler(cp, ap, ep, device, openOLT)
-	deviceInf := &oop.DeviceInfo{Vendor: "openolt", Ranges: nil, Model: "openolt", DeviceId: dh.deviceID}
-	dh.resourceMgr = &resourcemanager.OpenOltResourceMgr{DeviceID: dh.deviceID, DeviceType: dh.deviceType, DevInfo: deviceInf,
+	oopRanges := []*oop.DeviceInfo_DeviceResourceRanges{{
+		IntfIds:    []uint32{0, 1},
+		Technology: "xgs-pon",
+		Pools:      []*oop.DeviceInfo_DeviceResourceRanges_Pool{{}},
+	}}
+
+	deviceInf := &oop.DeviceInfo{Vendor: "openolt", Ranges: oopRanges, Model: "openolt", DeviceId: dh.deviceID, PonPorts: 2}
+	rsrMgr := resourcemanager.OpenOltResourceMgr{DeviceID: dh.deviceID, DeviceType: dh.deviceType, DevInfo: deviceInf,
 		KVStore: &db.Backend{
 			Client: &mocks.MockKVClient{},
 		}}
+	rsrMgr.AllocIDMgmtLock = make([]sync.RWMutex, deviceInf.PonPorts)
+	rsrMgr.GemPortIDMgmtLock = make([]sync.RWMutex, deviceInf.PonPorts)
+	rsrMgr.OnuIDMgmtLock = make([]sync.RWMutex, deviceInf.PonPorts)
+
+	dh.resourceMgr = &rsrMgr
 	dh.resourceMgr.ResourceMgrs = make(map[uint32]*ponrmgr.PONResourceManager)
 	ranges := make(map[string]interface{})
 	sharedIdxByType := make(map[string]string)
@@ -172,15 +183,15 @@
 
 	ponmgr := &ponrmgr.PONResourceManager{
 		DeviceID: "onu-1",
-		IntfIDs:  []uint32{1, 2},
+		IntfIDs:  []uint32{0, 1},
 		KVStore: &db.Backend{
 			Client: &mocks.MockKVClient{},
 		},
 		PonResourceRanges: ranges,
 		SharedIdxByType:   sharedIdxByType,
 	}
+	dh.resourceMgr.ResourceMgrs[0] = ponmgr
 	dh.resourceMgr.ResourceMgrs[1] = ponmgr
-	dh.resourceMgr.ResourceMgrs[2] = ponmgr
 	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 	defer cancel()
 	dh.flowMgr = NewFlowManager(ctx, dh, dh.resourceMgr)
@@ -864,10 +875,10 @@
 		devicehandler *DeviceHandler
 		args          args
 	}{
-		{"activateONU-1", dh, args{intfID: 1, onuID: 1, serialNum: &oop.SerialNumber{VendorId: []byte("onu1")}}},
-		{"activateONU-2", dh, args{intfID: 2, onuID: 2, serialNum: &oop.SerialNumber{VendorId: []byte("onu2")}}},
-		{"activateONU-3", dh1, args{intfID: 1, onuID: 1, serialNum: &oop.SerialNumber{VendorId: []byte("onu1")}}},
-		{"activateONU-4", dh1, args{intfID: 2, onuID: 2, serialNum: &oop.SerialNumber{VendorId: []byte("onu2")}}},
+		{"activateONU-1", dh, args{intfID: 0, onuID: 1, serialNum: &oop.SerialNumber{VendorId: []byte("onu1")}}},
+		{"activateONU-2", dh, args{intfID: 1, onuID: 2, serialNum: &oop.SerialNumber{VendorId: []byte("onu2")}}},
+		{"activateONU-3", dh1, args{intfID: 0, onuID: 1, serialNum: &oop.SerialNumber{VendorId: []byte("onu1")}}},
+		{"activateONU-4", dh1, args{intfID: 1, onuID: 2, serialNum: &oop.SerialNumber{VendorId: []byte("onu2")}}},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
diff --git a/internal/pkg/resourcemanager/resourcemanager.go b/internal/pkg/resourcemanager/resourcemanager.go
index 906fd94..b874591 100755
--- a/internal/pkg/resourcemanager/resourcemanager.go
+++ b/internal/pkg/resourcemanager/resourcemanager.go
@@ -25,6 +25,7 @@
 	"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
 	"strconv"
 	"strings"
+	"sync"
 
 	"github.com/opencord/voltha-lib-go/v3/pkg/db"
 	"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
@@ -108,6 +109,19 @@
 	DevInfo     *openolt.DeviceInfo // device information
 	// array of pon resource managers per interface technology
 	ResourceMgrs map[uint32]*ponrmgr.PONResourceManager
+
+	// This protects concurrent gemport_id allocate/delete calls on a per PON port basis
+	GemPortIDMgmtLock []sync.RWMutex
+	// This protects concurrent alloc_id allocate/delete calls on a per PON port basis
+	AllocIDMgmtLock []sync.RWMutex
+	// This protects concurrent onu_id allocate/delete calls on a per PON port basis
+	OnuIDMgmtLock []sync.RWMutex
+	// This protects concurrent flow_id allocate/delete calls. We do not need this on a
+	// per PON port basis as flow IDs are unique across the OLT.
+	FlowIDMgmtLock sync.RWMutex
+
+	// This protects concurrent access to flowids_per_gem info stored on KV store
+	flowIDToGemInfoLock sync.RWMutex
 }
 
 func newKVClient(storeType string, address string, timeout uint32) (kvstore.Client, error) {
@@ -154,6 +168,7 @@
 	IPPort := strings.Split(KVStoreHostPort, ":")
 	ResourceMgr.Host = IPPort[0]
 	ResourceMgr.Port, _ = strconv.Atoi(IPPort[1])
+	NumPONPorts := devInfo.GetPonPorts()
 
 	Backend := kvStoreType
 	ResourceMgr.KVStore = SetKVClient(Backend, ResourceMgr.Host,
@@ -165,6 +180,10 @@
 	RsrcMgrsByTech := make(map[string]*ponrmgr.PONResourceManager)
 	ResourceMgr.ResourceMgrs = make(map[uint32]*ponrmgr.PONResourceManager)
 
+	ResourceMgr.AllocIDMgmtLock = make([]sync.RWMutex, NumPONPorts)
+	ResourceMgr.GemPortIDMgmtLock = make([]sync.RWMutex, NumPONPorts)
+	ResourceMgr.OnuIDMgmtLock = make([]sync.RWMutex, NumPONPorts)
+
 	// TODO self.args = registry('main').get_args()
 
 	/*
@@ -176,7 +195,6 @@
 		var ranges openolt.DeviceInfo_DeviceResourceRanges
 		ranges.Technology = devInfo.GetTechnology()
 
-		NumPONPorts := devInfo.GetPonPorts()
 		var index uint32
 		for index = 0; index < NumPONPorts; index++ {
 			ranges.IntfIds = append(ranges.IntfIds, index)
@@ -425,9 +443,11 @@
 		err := errors.New("invalid-pon-interface-" + strconv.Itoa(int(ponIntfID)))
 		return 0, err
 	}
+	RsrcMgr.OnuIDMgmtLock[ponIntfID].Lock()
 	// Get ONU id for a provided pon interface ID.
 	ONUID, err := RsrcMgr.ResourceMgrs[ponIntfID].GetResourceID(ctx, ponIntfID,
 		ponrmgr.ONU_ID, 1)
+	RsrcMgr.OnuIDMgmtLock[ponIntfID].Unlock()
 	if err != nil {
 		logger.Errorf("Failed to get resource for interface %d for type %s",
 			ponIntfID, ponrmgr.ONU_ID)
@@ -500,8 +520,10 @@
 		}
 	}
 	logger.Debug("No matching flows with flow cookie or flow category, allocating new flowid")
+	RsrcMgr.FlowIDMgmtLock.Lock()
 	FlowIDs, err = RsrcMgr.ResourceMgrs[ponIntfID].GetResourceID(ctx, ponIntfID,
 		ponrmgr.FLOW_ID, 1)
+	RsrcMgr.FlowIDMgmtLock.Unlock()
 	if err != nil {
 		logger.Errorf("Failed to get resource for interface %d for type %s",
 			ponIntfID, ponrmgr.FLOW_ID)
@@ -530,8 +552,10 @@
 		logger.Debugw("Retrieved alloc ID from pon resource mgr", log.Fields{"AllocID": AllocID})
 		return AllocID[0]
 	}
+	RsrcMgr.AllocIDMgmtLock[intfID].Lock()
 	AllocID, err = RsrcMgr.ResourceMgrs[intfID].GetResourceID(ctx, intfID,
 		ponrmgr.ALLOC_ID, 1)
+	RsrcMgr.AllocIDMgmtLock[intfID].Unlock()
 
 	if AllocID == nil || err != nil {
 		logger.Error("Failed to allocate alloc id")
@@ -660,8 +684,10 @@
 		return GEMPortList, nil
 	}
 
+	RsrcMgr.GemPortIDMgmtLock[ponPort].Lock()
 	GEMPortList, err = RsrcMgr.ResourceMgrs[ponPort].GetResourceID(ctx, ponPort,
 		ponrmgr.GEMPORT_ID, NumOfPorts)
+	RsrcMgr.GemPortIDMgmtLock[ponPort].Unlock()
 	if err != nil && GEMPortList == nil {
 		logger.Errorf("Failed to get gem port id for %s", IntfOnuIDUniID)
 		return nil, err
@@ -692,7 +718,9 @@
 // FreeonuID releases(make free) onu id for a particular pon-port
 func (RsrcMgr *OpenOltResourceMgr) FreeonuID(ctx context.Context, intfID uint32, onuID []uint32) {
 
+	RsrcMgr.OnuIDMgmtLock[intfID].Lock()
 	RsrcMgr.ResourceMgrs[intfID].FreeResourceID(ctx, intfID, ponrmgr.ONU_ID, onuID)
+	RsrcMgr.OnuIDMgmtLock[intfID].Unlock()
 
 	/* Free onu id for a particular interface.*/
 	var IntfonuID string
@@ -716,14 +744,17 @@
 		logger.Errorw("Failed to Update flow id  for", log.Fields{"intf": IntfONUID})
 	}
 	RsrcMgr.ResourceMgrs[IntfID].RemoveFlowIDInfo(ctx, IntfONUID, FlowID)
+	RsrcMgr.FlowIDMgmtLock.Lock()
+	defer RsrcMgr.FlowIDMgmtLock.Unlock()
 	RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(ctx, IntfID, ponrmgr.FLOW_ID, FlowIds)
 }
 
 // FreeFlowIDs releases the flow Ids
 func (RsrcMgr *OpenOltResourceMgr) FreeFlowIDs(ctx context.Context, IntfID uint32, onuID uint32,
 	uniID uint32, FlowID []uint32) {
-
+	RsrcMgr.FlowIDMgmtLock.Lock()
 	RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(ctx, IntfID, ponrmgr.FLOW_ID, FlowID)
+	RsrcMgr.FlowIDMgmtLock.Unlock()
 
 	var IntfOnuIDUniID string
 	var err error
@@ -744,6 +775,8 @@
 	RsrcMgr.RemoveAllocIDForOnu(ctx, IntfID, onuID, uniID, allocID)
 	allocIDs := make([]uint32, 0)
 	allocIDs = append(allocIDs, allocID)
+	RsrcMgr.AllocIDMgmtLock[IntfID].Lock()
+	defer RsrcMgr.AllocIDMgmtLock[IntfID].Unlock()
 	RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(ctx, IntfID, ponrmgr.ALLOC_ID, allocIDs)
 }
 
@@ -754,6 +787,8 @@
 	RsrcMgr.RemoveGemPortIDForOnu(ctx, IntfID, onuID, uniID, gemPortID)
 	gemPortIDs := make([]uint32, 0)
 	gemPortIDs = append(gemPortIDs, gemPortID)
+	RsrcMgr.GemPortIDMgmtLock[IntfID].Lock()
+	defer RsrcMgr.GemPortIDMgmtLock[IntfID].Unlock()
 	RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(ctx, IntfID, ponrmgr.GEMPORT_ID, gemPortIDs)
 }
 
@@ -765,19 +800,26 @@
 
 	AllocIDs := RsrcMgr.ResourceMgrs[intfID].GetCurrentAllocIDForOnu(ctx, IntfOnuIDUniID)
 
+	RsrcMgr.AllocIDMgmtLock[onuID].Lock()
 	RsrcMgr.ResourceMgrs[intfID].FreeResourceID(ctx, intfID,
 		ponrmgr.ALLOC_ID,
 		AllocIDs)
+	RsrcMgr.AllocIDMgmtLock[onuID].Unlock()
 
+	RsrcMgr.GemPortIDMgmtLock[onuID].Lock()
 	GEMPortIDs := RsrcMgr.ResourceMgrs[intfID].GetCurrentGEMPortIDsForOnu(ctx, IntfOnuIDUniID)
 	RsrcMgr.ResourceMgrs[intfID].FreeResourceID(ctx, intfID,
 		ponrmgr.GEMPORT_ID,
 		GEMPortIDs)
+	RsrcMgr.GemPortIDMgmtLock[onuID].Unlock()
 
+	RsrcMgr.FlowIDMgmtLock.Lock()
 	FlowIDs := RsrcMgr.ResourceMgrs[intfID].GetCurrentFlowIDsForOnu(ctx, IntfOnuIDUniID)
 	RsrcMgr.ResourceMgrs[intfID].FreeResourceID(ctx, intfID,
 		ponrmgr.FLOW_ID,
 		FlowIDs)
+	RsrcMgr.FlowIDMgmtLock.Unlock()
+
 	// Clear resource map associated with (pon_intf_id, gemport_id) tuple.
 	RsrcMgr.ResourceMgrs[intfID].RemoveResourceMap(ctx, IntfOnuIDUniID)
 	// Clear the ONU Id associated with the (pon_intf_id, gemport_id) tuple.
@@ -1230,6 +1272,8 @@
 		logger.Error("Failed to marshal data", log.Fields{"error": err})
 		return err
 	}
+	RsrcMgr.flowIDToGemInfoLock.Lock()
+	defer RsrcMgr.flowIDToGemInfoLock.Unlock()
 	if err = RsrcMgr.KVStore.Put(ctx, path, val); err != nil {
 		logger.Errorw("Failed to put to kvstore", log.Fields{"error": err, "path": path, "value": val})
 		return err
@@ -1260,6 +1304,10 @@
 		logger.Error("Failed to marshal data", log.Fields{"error": err})
 		return
 	}
+
+	RsrcMgr.flowIDToGemInfoLock.Lock()
+	defer RsrcMgr.flowIDToGemInfoLock.Unlock()
+
 	if err = RsrcMgr.KVStore.Put(ctx, path, val); err != nil {
 		logger.Errorw("Failed to put to kvstore", log.Fields{"error": err, "path": path, "value": val})
 		return
@@ -1272,8 +1320,9 @@
 	path := fmt.Sprintf(FlowIDsForGem, intf)
 	var flowsForGem map[uint32][]uint32
 	var val []byte
-
+	RsrcMgr.flowIDToGemInfoLock.RLock()
 	value, err := RsrcMgr.KVStore.Get(ctx, path)
+	RsrcMgr.flowIDToGemInfoLock.RUnlock()
 	if err != nil {
 		logger.Error("failed to get data from kv store")
 		return nil, err
@@ -1294,6 +1343,8 @@
 //DeleteIntfIDGempMapPath deletes the intf id path used to store flow ids per gem to kvstore.
 func (RsrcMgr *OpenOltResourceMgr) DeleteIntfIDGempMapPath(ctx context.Context, intf uint32) {
 	path := fmt.Sprintf(FlowIDsForGem, intf)
+	RsrcMgr.flowIDToGemInfoLock.Lock()
+	defer RsrcMgr.flowIDToGemInfoLock.Unlock()
 	if err := RsrcMgr.KVStore.Delete(ctx, path); err != nil {
 		logger.Errorw("Failed to delete nni interfaces from kv store", log.Fields{"path": path})
 		return
diff --git a/internal/pkg/resourcemanager/resourcemanager_test.go b/internal/pkg/resourcemanager/resourcemanager_test.go
index b44ba07..4786940 100644
--- a/internal/pkg/resourcemanager/resourcemanager_test.go
+++ b/internal/pkg/resourcemanager/resourcemanager_test.go
@@ -37,6 +37,7 @@
 	"reflect"
 	"strconv"
 	"strings"
+	"sync"
 	"testing"
 	"time"
 )
@@ -68,15 +69,16 @@
 
 // fields mocks  OpenOltResourceMgr struct.
 type fields struct {
-	DeviceID     string
-	HostAndPort  string
-	Args         string
-	KVStore      *db.Backend
-	DeviceType   string
-	Host         string
-	Port         int
-	DevInfo      *openolt.DeviceInfo
-	ResourceMgrs map[uint32]*ponrmgr.PONResourceManager
+	DeviceID      string
+	HostAndPort   string
+	Args          string
+	KVStore       *db.Backend
+	DeviceType    string
+	Host          string
+	Port          int
+	DevInfo       *openolt.DeviceInfo
+	ResourceMgrs  map[uint32]*ponrmgr.PONResourceManager
+	NumOfPonPorts uint32
 }
 
 // MockKVClient mocks the AdapterProxy interface.
@@ -104,6 +106,7 @@
 	ranges["alloc_id_shared"] = uint32(0)
 	ranges["gemport_id_shared"] = uint32(0)
 	ranges["flow_id_shared"] = uint32(0)
+	resMgr.NumOfPonPorts = 2
 	ponMgr := &ponrmgr.PONResourceManager{
 		DeviceID: "onu-1",
 		IntfIDs:  []uint32{1, 2},
@@ -115,6 +118,7 @@
 	}
 	resMgr.ResourceMgrs[1] = ponMgr
 	resMgr.ResourceMgrs[2] = ponMgr
+
 	return &resMgr
 }
 
@@ -247,7 +251,7 @@
 
 // testResMgrObject maps fields type to OpenOltResourceMgr type.
 func testResMgrObject(testResMgr *fields) *OpenOltResourceMgr {
-	return &OpenOltResourceMgr{
+	var rsrMgr = OpenOltResourceMgr{
 		DeviceID:     testResMgr.DeviceID,
 		HostAndPort:  testResMgr.HostAndPort,
 		Args:         testResMgr.Args,
@@ -258,6 +262,12 @@
 		DevInfo:      testResMgr.DevInfo,
 		ResourceMgrs: testResMgr.ResourceMgrs,
 	}
+
+	rsrMgr.AllocIDMgmtLock = make([]sync.RWMutex, testResMgr.NumOfPonPorts)
+	rsrMgr.GemPortIDMgmtLock = make([]sync.RWMutex, testResMgr.NumOfPonPorts)
+	rsrMgr.OnuIDMgmtLock = make([]sync.RWMutex, testResMgr.NumOfPonPorts)
+
+	return &rsrMgr
 }
 
 func TestNewResourceMgr(t *testing.T) {
@@ -321,7 +331,7 @@
 		fields *fields
 		args   args
 	}{
-		{"FreeFlowID-1", getResMgr(), args{2, 2, 2, 2}},
+		{"FreeFlowID-1", getResMgr(), args{1, 2, 2, 2}},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -346,7 +356,7 @@
 		fields *fields
 		args   args
 	}{
-		{"FreeFlowIDs-1", getResMgr(), args{2, 2, 2, []uint32{1, 2}}},
+		{"FreeFlowIDs-1", getResMgr(), args{1, 2, 2, []uint32{1, 2}}},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -369,7 +379,7 @@
 		fields *fields
 		args   args
 	}{
-		{"FreePONResourcesForONU-1", getResMgr(), args{2, 0, 2}},
+		{"FreePONResourcesForONU-1", getResMgr(), args{1, 0, 2}},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -391,7 +401,7 @@
 		fields *fields
 		args   args
 	}{
-		{"FreeOnuID-1", getResMgr(), args{2, []uint32{1, 2}}},
+		{"FreeOnuID-1", getResMgr(), args{1, []uint32{1, 2}}},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -416,7 +426,7 @@
 		args   args
 		want   uint32
 	}{
-		{"GetAllocID-1", getResMgr(), args{2, 2, 2}, 0},
+		{"GetAllocID-1", getResMgr(), args{1, 2, 2}, 0},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -442,7 +452,7 @@
 		args   args
 		want   []uint32
 	}{
-		{"GetCurrentAllocIDForOnu-1", getResMgr(), args{2, 2, 2}, []uint32{}},
+		{"GetCurrentAllocIDForOnu-1", getResMgr(), args{1, 2, 2}, []uint32{}},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -469,7 +479,7 @@
 		args   args
 		want   []uint32
 	}{
-		{"GetCurrentFlowIDsForOnu-1", getResMgr(), args{2, 2, 2}, []uint32{}},
+		{"GetCurrentFlowIDsForOnu-1", getResMgr(), args{1, 2, 2}, []uint32{}},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -495,7 +505,7 @@
 		args   args
 		want   []uint32
 	}{
-		{"GetCurrentGEMPortIDsForOnu-1", getResMgr(), args{2, 2, 2}, []uint32{}},
+		{"GetCurrentGEMPortIDsForOnu-1", getResMgr(), args{1, 2, 2}, []uint32{}},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -527,7 +537,7 @@
 		want    uint32
 		wantErr error
 	}{
-		{"GetFlowID-1", getResMgr(), args{2, 2, 2, 2, 2,
+		{"GetFlowID-1", getResMgr(), args{1, 2, 2, 2, 2,
 			"HSIA", nil}, 0, errors.New("failed to get flows")},
 	}
 	for _, tt := range tests {
@@ -561,7 +571,7 @@
 		want    []uint32
 		wantErr error
 	}{
-		{"GetGEMPortID-1", getResMgr(), args{2, 2, 2, 2}, []uint32{},
+		{"GetGEMPortID-1", getResMgr(), args{1, 2, 2, 2}, []uint32{},
 			errors.New("failed to get gem port")},
 	}
 	for _, tt := range tests {
@@ -596,9 +606,9 @@
 		want    *ofp.OfpMeterConfig
 		wantErr error
 	}{
-		{"GetMeterIDOnu", getResMgr(), args{"DOWNSTREAM", 1, 1, 1, 64},
+		{"GetMeterIDOnu", getResMgr(), args{"DOWNSTREAM", 0, 1, 1, 64},
 			&ofp.OfpMeterConfig{}, errors.New("failed to get Meter config from kvstore for path")},
-		{"GetMeterIDOnu", getResMgr(), args{"DOWNSTREAM", 2, 2, 2, 65},
+		{"GetMeterIDOnu", getResMgr(), args{"DOWNSTREAM", 1, 2, 2, 65},
 			&ofp.OfpMeterConfig{}, errors.New("failed to get Meter config from kvstore for path")},
 	}
 	for _, tt := range tests {
@@ -625,7 +635,7 @@
 		want    uint32
 		wantErr error
 	}{
-		{"GetONUID-1", getResMgr(), args{2}, uint32(0), errors.New("json errors")},
+		{"GetONUID-1", getResMgr(), args{1}, uint32(0), errors.New("json errors")},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -653,7 +663,7 @@
 		args   args
 		want   []uint32
 	}{
-		{"GetTechProfileIDForOnu-1", getResMgr(), args{2, 2, 2},
+		{"GetTechProfileIDForOnu-1", getResMgr(), args{1, 2, 2},
 			[]uint32{1}},
 	}
 	for _, tt := range tests {
@@ -681,7 +691,7 @@
 		args   args
 		want   bool
 	}{
-		{"IsFlowCookieOnKVStore-1", getResMgr(), args{2, 2, 2, 2}, false},
+		{"IsFlowCookieOnKVStore-1", getResMgr(), args{1, 2, 2, 2}, false},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -739,7 +749,7 @@
 		args    args
 		wantErr error
 	}{
-		{"RemoveTechProfileIDForOnu-1", getResMgr(), args{2, 2, 2, 64},
+		{"RemoveTechProfileIDForOnu-1", getResMgr(), args{1, 2, 2, 64},
 			errors.New("failed to delete techprofile id resource %s in KV store")},
 	}
 	for _, tt := range tests {
@@ -768,7 +778,7 @@
 		args    args
 		wantErr error
 	}{
-		{"UpdateAllocIdsForOnu-1", getResMgr(), args{2, 2, 2, []uint32{1, 2}},
+		{"UpdateAllocIdsForOnu-1", getResMgr(), args{1, 2, 2, []uint32{1, 2}},
 			errors.New("")},
 	}
 	for _, tt := range tests {
@@ -797,7 +807,7 @@
 		args    args
 		wantErr error
 	}{
-		{"UpdateFlowIDInfo-1", getResMgr(), args{2, 2, 2, 2, &[]FlowInfo{}}, errors.New("")},
+		{"UpdateFlowIDInfo-1", getResMgr(), args{1, 2, 2, 2, &[]FlowInfo{}}, errors.New("")},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -825,7 +835,7 @@
 		args    args
 		wantErr error
 	}{
-		{"UpdateGEMPortIDsForOnu-1", getResMgr(), args{2, 2, 2,
+		{"UpdateGEMPortIDsForOnu-1", getResMgr(), args{1, 2, 2,
 			[]uint32{1, 2}}, errors.New("failed to update resource")},
 	}
 	for _, tt := range tests {
@@ -854,7 +864,7 @@
 		wantErr error
 	}{
 		{"UpdateGEMportsPonportToOnuMapOnKVStore-1", getResMgr(), args{[]uint32{1, 2},
-			2, 2, 2}, errors.New("failed to update resource")},
+			1, 2, 2}, errors.New("failed to update resource")},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
@@ -884,7 +894,7 @@
 		args    args
 		wantErr error
 	}{
-		{"UpdateMeterIDForOnu-1", getResMgr(), args{"DOWNSTREAM", 2, 2,
+		{"UpdateMeterIDForOnu-1", getResMgr(), args{"DOWNSTREAM", 1, 2,
 			2, 64, &ofp.OfpMeterConfig{}}, errors.New("failed to get Meter config from kvstore for path")},
 	}
 	for _, tt := range tests {
@@ -913,7 +923,7 @@
 		args    args
 		wantErr error
 	}{
-		{"UpdateTechProfileIDForOnu-1", getResMgr(), args{2, 2, 2,
+		{"UpdateTechProfileIDForOnu-1", getResMgr(), args{1, 2, 2,
 			2}, errors.New("failed to update resource")},
 	}
 	for _, tt := range tests {