[VOL-2848] : Protect concurrent access for PON resources

Flows are currently serialized on a per-ONU basis. However, flows
from different ONUs can be added/removed concurrently, which means
the shared PON resources may be accessed concurrently. In some tests
it was seen that two ONUs got the same resource (such as a FlowID)
and traffic would fail.
This change protects concurrent get/put/delete access to the shared
PON resource pools on the KV store with locks. The adapter also
maintains certain data on the KV store on a per-ONU basis; this data
needs no protection, as its key paths are unique (and therefore
inherently safe) and any flow operation on a given ONU is already
serialized.
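
For illustration, below is a minimal, self-contained sketch of the
per-PON-port locking pattern introduced here. The pools type and its
allocate method are hypothetical stand-ins for the PONResourceManager
pool operations; only the slice-of-mutexes idea mirrors the adapter
code.

package main

import (
	"fmt"
	"sync"
)

// pools is a hypothetical stand-in for the shared PON resource pools
// that the real adapter keeps on the KV store.
type pools struct {
	nextID []uint32 // next free ID per PON port
}

// allocate hands out the next ID for a PON port; it is not safe for
// concurrent use on its own, just like the underlying pool operations.
func (p *pools) allocate(ponIntfID uint32) uint32 {
	id := p.nextID[ponIntfID]
	p.nextID[ponIntfID]++
	return id
}

func main() {
	const numPONPorts = 4
	p := &pools{nextID: make([]uint32, numPONPorts)}

	// One mutex per PON port, mirroring OnuIDMgmtLock, AllocIDMgmtLock
	// and GemPortIDMgmtLock in the resource manager.
	locks := make([]sync.RWMutex, numPONPorts)

	var wg sync.WaitGroup
	for onu := 0; onu < 8; onu++ {
		wg.Add(1)
		go func(onu int) {
			defer wg.Done()
			port := uint32(onu % numPONPorts)
			// Serialize allocations on the same PON port so that two
			// ONUs can never be handed the same resource ID.
			locks[port].Lock()
			id := p.allocate(port)
			locks[port].Unlock()
			fmt.Printf("ONU %d on PON %d got ID %d\n", onu, port, id)
		}(onu)
	}
	wg.Wait()
}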

Change-Id: I8a452a7ae84413741cbc2fa24ae42f4329748e32
diff --git a/internal/pkg/resourcemanager/resourcemanager.go b/internal/pkg/resourcemanager/resourcemanager.go
index 906fd94..b874591 100755
--- a/internal/pkg/resourcemanager/resourcemanager.go
+++ b/internal/pkg/resourcemanager/resourcemanager.go
@@ -25,6 +25,7 @@
 	"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
 	"strconv"
 	"strings"
+	"sync"
 
 	"github.com/opencord/voltha-lib-go/v3/pkg/db"
 	"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
@@ -108,6 +109,19 @@
 	DevInfo     *openolt.DeviceInfo // device information
 	// array of pon resource managers per interface technology
 	ResourceMgrs map[uint32]*ponrmgr.PONResourceManager
+
+	// This protects concurrent gemport_id allocate/delete calls on a per PON port basis
+	GemPortIDMgmtLock []sync.RWMutex
+	// This protects concurrent alloc_id allocate/delete calls on a per PON port basis
+	AllocIDMgmtLock []sync.RWMutex
+	// This protects concurrent onu_id allocate/delete calls on a per PON port basis
+	OnuIDMgmtLock []sync.RWMutex
+	// This protects concurrent flow_id allocate/delete calls. We do not need this on a
+	// per PON port basis as flow IDs are unique across the OLT.
+	FlowIDMgmtLock sync.RWMutex
+
+	// This protects concurrent access to flowids_per_gem info stored on KV store
+	flowIDToGemInfoLock sync.RWMutex
 }
 
 func newKVClient(storeType string, address string, timeout uint32) (kvstore.Client, error) {
@@ -154,6 +168,7 @@
 	IPPort := strings.Split(KVStoreHostPort, ":")
 	ResourceMgr.Host = IPPort[0]
 	ResourceMgr.Port, _ = strconv.Atoi(IPPort[1])
+	NumPONPorts := devInfo.GetPonPorts()
 
 	Backend := kvStoreType
 	ResourceMgr.KVStore = SetKVClient(Backend, ResourceMgr.Host,
@@ -165,6 +180,10 @@
 	RsrcMgrsByTech := make(map[string]*ponrmgr.PONResourceManager)
 	ResourceMgr.ResourceMgrs = make(map[uint32]*ponrmgr.PONResourceManager)
 
+	ResourceMgr.AllocIDMgmtLock = make([]sync.RWMutex, NumPONPorts)
+	ResourceMgr.GemPortIDMgmtLock = make([]sync.RWMutex, NumPONPorts)
+	ResourceMgr.OnuIDMgmtLock = make([]sync.RWMutex, NumPONPorts)
+
 	// TODO self.args = registry('main').get_args()
 
 	/*
@@ -176,7 +195,6 @@
 		var ranges openolt.DeviceInfo_DeviceResourceRanges
 		ranges.Technology = devInfo.GetTechnology()
 
-		NumPONPorts := devInfo.GetPonPorts()
 		var index uint32
 		for index = 0; index < NumPONPorts; index++ {
 			ranges.IntfIds = append(ranges.IntfIds, index)
@@ -425,9 +443,11 @@
 		err := errors.New("invalid-pon-interface-" + strconv.Itoa(int(ponIntfID)))
 		return 0, err
 	}
+	RsrcMgr.OnuIDMgmtLock[ponIntfID].Lock()
 	// Get ONU id for a provided pon interface ID.
 	ONUID, err := RsrcMgr.ResourceMgrs[ponIntfID].GetResourceID(ctx, ponIntfID,
 		ponrmgr.ONU_ID, 1)
+	RsrcMgr.OnuIDMgmtLock[ponIntfID].Unlock()
 	if err != nil {
 		logger.Errorf("Failed to get resource for interface %d for type %s",
 			ponIntfID, ponrmgr.ONU_ID)
@@ -500,8 +520,10 @@
 		}
 	}
 	logger.Debug("No matching flows with flow cookie or flow category, allocating new flowid")
+	RsrcMgr.FlowIDMgmtLock.Lock()
 	FlowIDs, err = RsrcMgr.ResourceMgrs[ponIntfID].GetResourceID(ctx, ponIntfID,
 		ponrmgr.FLOW_ID, 1)
+	RsrcMgr.FlowIDMgmtLock.Unlock()
 	if err != nil {
 		logger.Errorf("Failed to get resource for interface %d for type %s",
 			ponIntfID, ponrmgr.FLOW_ID)
@@ -530,8 +552,10 @@
 		logger.Debugw("Retrieved alloc ID from pon resource mgr", log.Fields{"AllocID": AllocID})
 		return AllocID[0]
 	}
+	RsrcMgr.AllocIDMgmtLock[intfID].Lock()
 	AllocID, err = RsrcMgr.ResourceMgrs[intfID].GetResourceID(ctx, intfID,
 		ponrmgr.ALLOC_ID, 1)
+	RsrcMgr.AllocIDMgmtLock[intfID].Unlock()
 
 	if AllocID == nil || err != nil {
 		logger.Error("Failed to allocate alloc id")
@@ -660,8 +684,10 @@
 		return GEMPortList, nil
 	}
 
+	RsrcMgr.GemPortIDMgmtLock[ponPort].Lock()
 	GEMPortList, err = RsrcMgr.ResourceMgrs[ponPort].GetResourceID(ctx, ponPort,
 		ponrmgr.GEMPORT_ID, NumOfPorts)
+	RsrcMgr.GemPortIDMgmtLock[ponPort].Unlock()
 	if err != nil && GEMPortList == nil {
 		logger.Errorf("Failed to get gem port id for %s", IntfOnuIDUniID)
 		return nil, err
@@ -692,7 +718,9 @@
 // FreeonuID releases(make free) onu id for a particular pon-port
 func (RsrcMgr *OpenOltResourceMgr) FreeonuID(ctx context.Context, intfID uint32, onuID []uint32) {
 
+	RsrcMgr.OnuIDMgmtLock[intfID].Lock()
 	RsrcMgr.ResourceMgrs[intfID].FreeResourceID(ctx, intfID, ponrmgr.ONU_ID, onuID)
+	RsrcMgr.OnuIDMgmtLock[intfID].Unlock()
 
 	/* Free onu id for a particular interface.*/
 	var IntfonuID string
@@ -716,14 +744,17 @@
 		logger.Errorw("Failed to Update flow id  for", log.Fields{"intf": IntfONUID})
 	}
 	RsrcMgr.ResourceMgrs[IntfID].RemoveFlowIDInfo(ctx, IntfONUID, FlowID)
+	RsrcMgr.FlowIDMgmtLock.Lock()
+	defer RsrcMgr.FlowIDMgmtLock.Unlock()
 	RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(ctx, IntfID, ponrmgr.FLOW_ID, FlowIds)
 }
 
 // FreeFlowIDs releases the flow Ids
 func (RsrcMgr *OpenOltResourceMgr) FreeFlowIDs(ctx context.Context, IntfID uint32, onuID uint32,
 	uniID uint32, FlowID []uint32) {
-
+	RsrcMgr.FlowIDMgmtLock.Lock()
 	RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(ctx, IntfID, ponrmgr.FLOW_ID, FlowID)
+	RsrcMgr.FlowIDMgmtLock.Unlock()
 
 	var IntfOnuIDUniID string
 	var err error
@@ -744,6 +775,8 @@
 	RsrcMgr.RemoveAllocIDForOnu(ctx, IntfID, onuID, uniID, allocID)
 	allocIDs := make([]uint32, 0)
 	allocIDs = append(allocIDs, allocID)
+	RsrcMgr.AllocIDMgmtLock[IntfID].Lock()
+	defer RsrcMgr.AllocIDMgmtLock[IntfID].Unlock()
 	RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(ctx, IntfID, ponrmgr.ALLOC_ID, allocIDs)
 }
 
@@ -754,6 +787,8 @@
 	RsrcMgr.RemoveGemPortIDForOnu(ctx, IntfID, onuID, uniID, gemPortID)
 	gemPortIDs := make([]uint32, 0)
 	gemPortIDs = append(gemPortIDs, gemPortID)
+	RsrcMgr.GemPortIDMgmtLock[IntfID].Lock()
+	defer RsrcMgr.GemPortIDMgmtLock[IntfID].Unlock()
 	RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(ctx, IntfID, ponrmgr.GEMPORT_ID, gemPortIDs)
 }
 
@@ -765,19 +800,26 @@
 
 	AllocIDs := RsrcMgr.ResourceMgrs[intfID].GetCurrentAllocIDForOnu(ctx, IntfOnuIDUniID)
 
+	RsrcMgr.AllocIDMgmtLock[intfID].Lock()
 	RsrcMgr.ResourceMgrs[intfID].FreeResourceID(ctx, intfID,
 		ponrmgr.ALLOC_ID,
 		AllocIDs)
+	RsrcMgr.AllocIDMgmtLock[intfID].Unlock()
 
+	RsrcMgr.GemPortIDMgmtLock[intfID].Lock()
 	GEMPortIDs := RsrcMgr.ResourceMgrs[intfID].GetCurrentGEMPortIDsForOnu(ctx, IntfOnuIDUniID)
 	RsrcMgr.ResourceMgrs[intfID].FreeResourceID(ctx, intfID,
 		ponrmgr.GEMPORT_ID,
 		GEMPortIDs)
+	RsrcMgr.GemPortIDMgmtLock[intfID].Unlock()
 
+	RsrcMgr.FlowIDMgmtLock.Lock()
 	FlowIDs := RsrcMgr.ResourceMgrs[intfID].GetCurrentFlowIDsForOnu(ctx, IntfOnuIDUniID)
 	RsrcMgr.ResourceMgrs[intfID].FreeResourceID(ctx, intfID,
 		ponrmgr.FLOW_ID,
 		FlowIDs)
+	RsrcMgr.FlowIDMgmtLock.Unlock()
+
 	// Clear resource map associated with (pon_intf_id, gemport_id) tuple.
 	RsrcMgr.ResourceMgrs[intfID].RemoveResourceMap(ctx, IntfOnuIDUniID)
 	// Clear the ONU Id associated with the (pon_intf_id, gemport_id) tuple.
@@ -1230,6 +1272,8 @@
 		logger.Error("Failed to marshal data", log.Fields{"error": err})
 		return err
 	}
+	RsrcMgr.flowIDToGemInfoLock.Lock()
+	defer RsrcMgr.flowIDToGemInfoLock.Unlock()
 	if err = RsrcMgr.KVStore.Put(ctx, path, val); err != nil {
 		logger.Errorw("Failed to put to kvstore", log.Fields{"error": err, "path": path, "value": val})
 		return err
@@ -1260,6 +1304,10 @@
 		logger.Error("Failed to marshal data", log.Fields{"error": err})
 		return
 	}
+
+	RsrcMgr.flowIDToGemInfoLock.Lock()
+	defer RsrcMgr.flowIDToGemInfoLock.Unlock()
+
 	if err = RsrcMgr.KVStore.Put(ctx, path, val); err != nil {
 		logger.Errorw("Failed to put to kvstore", log.Fields{"error": err, "path": path, "value": val})
 		return
@@ -1272,8 +1320,9 @@
 	path := fmt.Sprintf(FlowIDsForGem, intf)
 	var flowsForGem map[uint32][]uint32
 	var val []byte
-
+	RsrcMgr.flowIDToGemInfoLock.RLock()
 	value, err := RsrcMgr.KVStore.Get(ctx, path)
+	RsrcMgr.flowIDToGemInfoLock.RUnlock()
 	if err != nil {
 		logger.Error("failed to get data from kv store")
 		return nil, err
@@ -1294,6 +1343,8 @@
 //DeleteIntfIDGempMapPath deletes the intf id path used to store flow ids per gem to kvstore.
 func (RsrcMgr *OpenOltResourceMgr) DeleteIntfIDGempMapPath(ctx context.Context, intf uint32) {
 	path := fmt.Sprintf(FlowIDsForGem, intf)
+	RsrcMgr.flowIDToGemInfoLock.Lock()
+	defer RsrcMgr.flowIDToGemInfoLock.Unlock()
 	if err := RsrcMgr.KVStore.Delete(ctx, path); err != nil {
 		logger.Errorw("Failed to delete nni interfaces from kv store", log.Fields{"path": path})
 		return