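[VOL-4532] Remove duplicate maps in FlowManager and ResourceManager

In openolt_flowmgr.go the FlowManager-local caches (gemToFlowIDs,
flowIDToGems, onuGemInfoMap) and their locks are removed. The flow
manager now calls the ResourceManager (RegisterFlowIDForGem,
IsGemPortUsedByAnotherFlow, GetFlowIDsForGem, AddGemToOnuGemInfo)
instead of keeping its own copies of the same data.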

Change-Id: I0a0fee7dbd3b3a25f2f0eee062bf565ba3212df3
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 818b167..81f7fcb 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -204,20 +204,9 @@
 	grpMgr        *OpenOltGroupMgr
 	resourceMgr   *rsrcMgr.OpenOltResourceMgr
 
-	gemToFlowIDs    map[uint32][]uint64 // gem port id to flow ids
-	gemToFlowIDsKey sync.RWMutex        // lock to be used to access the gemToFlowIDs map
-
 	packetInGemPort     map[rsrcMgr.PacketInInfoKey]uint32 //packet in gem port local cache
 	packetInGemPortLock sync.RWMutex
 
-	// TODO create a type rsrcMgr.OnuGemInfos to be used instead of []rsrcMgr.OnuGemInfo
-	onuGemInfoMap map[uint32]*rsrcMgr.OnuGemInfo //onu, gem and uni info local cache -> map of onuID to OnuGemInfo
-	// We need to have a global lock on the onuGemInfo map
-	onuGemInfoLock sync.RWMutex
-
-	flowIDToGems     map[uint64][]uint32
-	flowIDToGemsLock sync.RWMutex
-
 	// Slice of channels. Each channel in slice, index by ONU ID, queues flows per ONU.
 	// A go routine per ONU, waits on the unique channel (indexed by ONU ID) for incoming flows (add/remove)
 	incomingFlows            []chan flowControlBlock
@@ -239,9 +228,7 @@
 		logger.Errorw(ctx, "error-while-populating-tech-profile-mgr", log.Fields{"err": err})
 		return nil
 	}
-	flowMgr.gemToFlowIDs = make(map[uint32][]uint64)
 	flowMgr.packetInGemPort = make(map[rsrcMgr.PacketInInfoKey]uint32)
-	flowMgr.flowIDToGems = make(map[uint64][]uint32)
 
 	// Create a slice of buffered channels for handling concurrent flows per ONU.
 	// The additional entry (+1) is to handle the NNI trap flows on a separate channel from individual ONUs channel
@@ -257,20 +244,6 @@
 		flowMgr.flowHandlerRoutineActive[i] = true
 		go flowMgr.perOnuFlowHandlerRoutine(i, flowMgr.incomingFlows[i], flowMgr.stopFlowHandlerRoutine[i])
 	}
-	flowMgr.onuGemInfoMap = make(map[uint32]*rsrcMgr.OnuGemInfo)
-	//Load the onugem info cache from kv store on flowmanager start
-	onuIDStart := flowMgr.deviceHandler.deviceInfo.OnuIdStart
-	onuIDEnd := flowMgr.deviceHandler.deviceInfo.OnuIdEnd
-	for onuID := onuIDStart; onuID <= onuIDEnd; onuID++ {
-		// check for a valid serial number in onuGem as GetOnuGemInfo can return nil error in case of nothing found in the path.
-		onugem, err := rMgr.GetOnuGemInfo(ctx, ponPortIdx, onuID)
-		if err == nil && onugem != nil && onugem.SerialNumber != "" {
-			flowMgr.onuGemInfoMap[onuID] = onugem
-		}
-	}
-
-	//Load flowID list per gem map And gemIDs per flow per interface from the kvstore.
-	flowMgr.loadFlowIDsForGemAndGemIDsForFlow(ctx)
 
 	//load interface to multicast queue map from kv store
 	flowMgr.grpMgr.LoadInterfaceToMulticastQueueMap(ctx)
@@ -281,11 +254,11 @@
 func (f *OpenOltFlowMgr) registerFlow(ctx context.Context, flowFromCore *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) error {
 	if !deviceFlow.ReplicateFlow && deviceFlow.GemportId > 0 {
 		// Flow is not replicated in this case, we need to register the flow for a single gem-port
-		return f.registerFlowIDForGemAndGemIDForFlow(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowFromCore)
+		return f.resourceMgr.RegisterFlowIDForGem(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowFromCore)
 	} else if deviceFlow.ReplicateFlow && len(deviceFlow.PbitToGemport) > 0 {
 		// Flow is replicated in this case. We need to register the flow for all the gem-ports it is replicated to.
 		for _, gemPort := range deviceFlow.PbitToGemport {
-			if err := f.registerFlowIDForGemAndGemIDForFlow(ctx, uint32(deviceFlow.AccessIntfId), gemPort, flowFromCore); err != nil {
+			if err := f.resourceMgr.RegisterFlowIDForGem(ctx, uint32(deviceFlow.AccessIntfId), gemPort, flowFromCore); err != nil {
 				return err
 			}
 		}
@@ -293,31 +266,6 @@
 	return nil
 }
 
-func (f *OpenOltFlowMgr) registerFlowIDForGemAndGemIDForFlow(ctx context.Context, accessIntfID uint32, gemPortID uint32, flowFromCore *ofp.OfpFlowStats) error {
-	// update gem->flows map
-	f.gemToFlowIDsKey.Lock()
-	flowIDList, ok := f.gemToFlowIDs[gemPortID]
-	if !ok {
-		flowIDList = []uint64{flowFromCore.Id}
-	} else {
-		flowIDList = appendUnique64bit(flowIDList, flowFromCore.Id)
-	}
-	f.gemToFlowIDs[gemPortID] = flowIDList
-	f.gemToFlowIDsKey.Unlock()
-
-	// update flow->gems map
-	f.flowIDToGemsLock.Lock()
-	if _, ok := f.flowIDToGems[flowFromCore.Id]; !ok {
-		f.flowIDToGems[flowFromCore.Id] = []uint32{gemPortID}
-	} else {
-		f.flowIDToGems[flowFromCore.Id] = appendUnique32bit(f.flowIDToGems[flowFromCore.Id], gemPortID)
-	}
-	f.flowIDToGemsLock.Unlock()
-
-	// update the flowids for a gem to the KVstore
-	return f.resourceMgr.UpdateFlowIDsForGem(ctx, accessIntfID, gemPortID, flowIDList)
-}
-
 func (f *OpenOltFlowMgr) processAddFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32,
 	classifierInfo map[string]interface{}, actionInfo map[string]interface{}, flow *ofp.OfpFlowStats, TpID uint32,
 	UsMeterID uint32, DsMeterID uint32, flowMetadata *ofp.FlowMetadata) error {
@@ -985,15 +933,17 @@
 			"device-id":   f.deviceHandler.device.Id})
 	/* Update the allocated alloc_id and gem_port_id for the ONU/UNI to KV store  */
 	if err := f.resourceMgr.UpdateAllocIdsForOnu(ctx, intfID, onuID, uniID, allocID); err != nil {
-		logger.Errorw(ctx, "error-while-uploading-allocid-to-kv-store", log.Fields{"device-id": f.deviceHandler.device.Id})
+		logger.Errorw(ctx, "error-while-uploading-allocid-to-kv-store", log.Fields{"device-id": f.deviceHandler.device.Id, "onuID": onuID, "allocID": allocID})
 	}
 	if err := f.resourceMgr.UpdateGEMPortIDsForOnu(ctx, intfID, onuID, uniID, gemPortIDs); err != nil {
-		logger.Errorw(ctx, "error-while-uploading-gemports-to-kv-store", log.Fields{"device-id": f.deviceHandler.device.Id})
+		logger.Errorw(ctx, "error-while-uploading-gemports-to-kv-store", log.Fields{"device-id": f.deviceHandler.device.Id, "onuID": onuID, "gemPort": gemPortIDs})
 	}
 
 	logger.Infow(ctx, "stored-tconts-and-gem-into-kv-store-successfully", log.Fields{"device-id": f.deviceHandler.device.Id})
 	for _, gemPort := range gemPortIDs {
-		f.addGemPortToOnuInfoMap(ctx, intfID, onuID, gemPort)
+		if err := f.resourceMgr.AddGemToOnuGemInfo(ctx, intfID, onuID, gemPort); err != nil {
+			logger.Errorw(ctx, "error-while-uploading-onugeminfos-to-kv-store", log.Fields{"device-id": f.deviceHandler.device.Id, "onuID": onuID, "gemPort": gemPort})
+		}
 	}
 }
 
@@ -1911,48 +1861,6 @@
 	return nil
 }
 
-// Once the gemport is released for a given onu, it also has to be cleared from local cache
-// which was used for deriving the gemport->logicalPortNo during packet-in.
-// Otherwise stale info continues to exist after gemport is freed and wrong logicalPortNo
-// is conveyed to ONOS during packet-in OF message.
-func (f *OpenOltFlowMgr) deleteGemPortFromLocalCache(ctx context.Context, intfID uint32, onuID uint32, gemPortID uint32) {
-	logger.Infow(ctx, "deleting-gem-from-local-cache",
-		log.Fields{
-			"gem-port-id": gemPortID,
-			"intf-id":     intfID,
-			"onu-id":      onuID,
-			"device-id":   f.deviceHandler.device.Id})
-	f.onuGemInfoLock.RLock()
-	onugem, ok := f.onuGemInfoMap[onuID]
-	f.onuGemInfoLock.RUnlock()
-	if !ok {
-		logger.Warnw(ctx, "onu gem info already cleared from cache", log.Fields{
-			"gem-port-id": gemPortID,
-			"intf-id":     intfID,
-			"onu-id":      onuID,
-			"device-id":   f.deviceHandler.device.Id})
-		return
-	}
-deleteLoop:
-	for j, gem := range onugem.GemPorts {
-		// If the gemport is found, delete it from local cache.
-		if gem == gemPortID {
-			onugem.GemPorts = append(onugem.GemPorts[:j], onugem.GemPorts[j+1:]...)
-			f.onuGemInfoLock.Lock()
-			f.onuGemInfoMap[onuID] = onugem
-			f.onuGemInfoLock.Unlock()
-			logger.Infow(ctx, "removed-gemport-from-local-cache",
-				log.Fields{
-					"intf-id":           intfID,
-					"onu-id":            onuID,
-					"deletedgemport-id": gemPortID,
-					"gemports":          onugem.GemPorts,
-					"device-id":         f.deviceHandler.device.Id})
-			break deleteLoop
-		}
-	}
-}
-
 //clearResources clears pon resources in kv store and the device
 // nolint: gocyclo
 func (f *OpenOltFlowMgr) clearResources(ctx context.Context, intfID uint32, onuID int32, uniID int32,
@@ -1988,20 +1896,15 @@
 	case *tp_pb.TechProfileInstance:
 		for _, gemPort := range techprofileInst.UpstreamGemPortAttributeList {
 			gemPortID := gemPort.GemportId
-			used := f.isGemPortUsedByAnotherFlow(gemPortID, flowID)
+			used := f.resourceMgr.IsGemPortUsedByAnotherFlow(gemPortID, flowID)
 			if used {
-				f.gemToFlowIDsKey.RLock()
-				flowIDs := f.gemToFlowIDs[gemPortID]
-				f.gemToFlowIDsKey.RUnlock()
-
+				flowIDs, err := f.resourceMgr.GetFlowIDsForGem(ctx, intfID, gemPortID)
+				if err != nil {
+					return err
+				}
 				for i, flowIDinMap := range flowIDs {
 					if flowIDinMap == flowID {
 						flowIDs = append(flowIDs[:i], flowIDs[i+1:]...)
-						f.gemToFlowIDsKey.Lock()
-						f.gemToFlowIDs[gemPortID] = flowIDs
-						f.gemToFlowIDsKey.Unlock()
-						// everytime gemToFlowIDs cache is updated the same should be updated
-						// in kv store by calling UpdateFlowIDsForGem
 						if err := f.resourceMgr.UpdateFlowIDsForGem(ctx, intfID, gemPortID, flowIDs); err != nil {
 							return err
 						}
@@ -2036,16 +1939,9 @@
 
 		for _, gemPort := range techprofileInst.UpstreamGemPortAttributeList {
 			gemPortID := gemPort.GemportId
-			f.deleteGemPortFromLocalCache(ctx, intfID, uint32(onuID), gemPortID)
 			_ = f.resourceMgr.RemoveGemFromOnuGemInfo(ctx, intfID, uint32(onuID), gemPortID) // ignore error and proceed.
 
-			if err := f.resourceMgr.DeleteFlowIDsForGem(ctx, intfID, gemPortID); err == nil {
-				//everytime an entry is deleted from gemToFlowIDs cache, the same should be updated in kv as well
-				// by calling DeleteFlowIDsForGem
-				f.gemToFlowIDsKey.Lock()
-				delete(f.gemToFlowIDs, gemPortID)
-				f.gemToFlowIDsKey.Unlock()
-			} else {
+			if err := f.resourceMgr.DeleteFlowIDsForGem(ctx, intfID, gemPortID); err != nil {
 				logger.Errorw(ctx, "error-removing-flow-ids-of-gem-port",
 					log.Fields{
 						"err":        err,
@@ -2214,11 +2110,6 @@
 		return err
 	}
 
-	// Delete the flow-id to gemport list entry from the map now the flow is deleted.
-	f.flowIDToGemsLock.Lock()
-	delete(f.flowIDToGems, flow.Id)
-	f.flowIDToGemsLock.Unlock()
-
 	if err = f.clearResources(ctx, Intf, onuID, uniID, flow.Id, portNum, tpID, true); err != nil {
 		logger.Errorw(ctx, "failed-to-clear-resources-for-flow", log.Fields{
 			"flow-id":   flow.Id,
@@ -2440,24 +2331,6 @@
 	}
 
 	f.resourceMgr.AddUniPortToOnuInfo(ctx, intfID, onuID, portNo)
-	// also update flowmgr cache
-	f.onuGemInfoLock.Lock()
-	onugem, ok := f.onuGemInfoMap[onuID]
-	if ok {
-		found := false
-		for _, uni := range onugem.UniPorts {
-			if uni == portNo {
-				found = true
-				break
-			}
-		}
-		if !found {
-			onugem.UniPorts = append(onugem.UniPorts, portNo)
-			f.onuGemInfoMap[onuID] = onugem
-			logger.Infow(ctx, "added uni port to onugem cache", log.Fields{"uni": portNo})
-		}
-	}
-	f.onuGemInfoLock.Unlock()
 
 	tpID, err := getTpIDFromFlow(ctx, flow)
 	if err != nil {
@@ -2596,118 +2469,6 @@
 	return nil
 }
 
-//AddOnuInfoToFlowMgrCacheAndKvStore function adds onu info to cache and kvstore
-func (f *OpenOltFlowMgr) AddOnuInfoToFlowMgrCacheAndKvStore(ctx context.Context, intfID uint32, onuID uint32, serialNum string) error {
-
-	f.onuGemInfoLock.RLock()
-	_, ok := f.onuGemInfoMap[onuID]
-	f.onuGemInfoLock.RUnlock()
-	// If the ONU already exists in onuGemInfo list, nothing to do
-	if ok {
-		logger.Debugw(ctx, "onu-id-already-exists-in-cache",
-			log.Fields{"onuID": onuID,
-				"serialNum": serialNum})
-		return nil
-	}
-
-	onuGemInfo := rsrcMgr.OnuGemInfo{OnuID: onuID, SerialNumber: serialNum, IntfID: intfID}
-	f.onuGemInfoLock.Lock()
-	f.onuGemInfoMap[onuID] = &onuGemInfo
-	f.onuGemInfoLock.Unlock()
-	if err := f.resourceMgr.AddOnuGemInfo(ctx, intfID, onuID, onuGemInfo); err != nil {
-		return err
-	}
-	logger.Infow(ctx, "added-onuinfo",
-		log.Fields{
-			"intf-id":    intfID,
-			"onu-id":     onuID,
-			"serial-num": serialNum,
-			"onu":        onuGemInfo,
-			"device-id":  f.deviceHandler.device.Id})
-	return nil
-}
-
-//RemoveOnuInfoFromFlowMgrCacheAndKvStore function adds onu info to cache and kvstore
-func (f *OpenOltFlowMgr) RemoveOnuInfoFromFlowMgrCacheAndKvStore(ctx context.Context, intfID uint32, onuID uint32) error {
-
-	f.onuGemInfoLock.Lock()
-	delete(f.onuGemInfoMap, onuID)
-	f.onuGemInfoLock.Unlock()
-
-	if err := f.resourceMgr.DelOnuGemInfo(ctx, intfID, onuID); err != nil {
-		return err
-	}
-	logger.Infow(ctx, "deleted-onuinfo",
-		log.Fields{
-			"intf-id":   intfID,
-			"onu-id":    onuID,
-			"device-id": f.deviceHandler.device.Id})
-	return nil
-}
-
-//addGemPortToOnuInfoMap function adds GEMport to ONU map
-func (f *OpenOltFlowMgr) addGemPortToOnuInfoMap(ctx context.Context, intfID uint32, onuID uint32, gemPort uint32) {
-
-	logger.Infow(ctx, "adding-gem-to-onu-info-map",
-		log.Fields{
-			"gem-port-id": gemPort,
-			"intf-id":     intfID,
-			"onu-id":      onuID,
-			"device-id":   f.deviceHandler.device.Id})
-	f.onuGemInfoLock.RLock()
-	onugem, ok := f.onuGemInfoMap[onuID]
-	f.onuGemInfoLock.RUnlock()
-	if !ok {
-		logger.Warnw(ctx, "onu gem info is missing", log.Fields{
-			"gem-port-id": gemPort,
-			"intf-id":     intfID,
-			"onu-id":      onuID,
-			"device-id":   f.deviceHandler.device.Id})
-		return
-	}
-
-	if onugem.OnuID == onuID {
-		// check if gem already exists , else update the cache and kvstore
-		for _, gem := range onugem.GemPorts {
-			if gem == gemPort {
-				logger.Debugw(ctx, "gem-already-in-cache-no-need-to-update-cache-and-kv-store",
-					log.Fields{
-						"gem":       gemPort,
-						"device-id": f.deviceHandler.device.Id})
-				return
-			}
-		}
-		onugem.GemPorts = append(onugem.GemPorts, gemPort)
-		f.onuGemInfoLock.Lock()
-		f.onuGemInfoMap[onuID] = onugem
-		f.onuGemInfoLock.Unlock()
-		logger.Debugw(ctx, "updated onu gem info from cache", log.Fields{"onugem": onugem})
-	} else {
-		logger.Warnw(ctx, "mismatched onu id", log.Fields{
-			"gem-port-id": gemPort,
-			"intf-id":     intfID,
-			"onu-id":      onuID,
-			"device-id":   f.deviceHandler.device.Id})
-		return
-	}
-	err := f.resourceMgr.AddGemToOnuGemInfo(ctx, intfID, onuID, gemPort)
-	if err != nil {
-		logger.Errorw(ctx, "failed-to-add-gem-to-onu",
-			log.Fields{
-				"intf-id":   intfID,
-				"onu-id":    onuID,
-				"gemPort":   gemPort,
-				"device-id": f.deviceHandler.device.Id})
-		return
-	}
-	logger.Infow(ctx, "gem-added-to-onu-info-map",
-		log.Fields{
-			"gem-port-id": gemPort,
-			"intf-id":     intfID,
-			"onu-id":      onuID,
-			"device-id":   f.deviceHandler.device.Id})
-}
-
 //GetLogicalPortFromPacketIn function computes logical port UNI/NNI port from packet-in indication and returns the same
 func (f *OpenOltFlowMgr) GetLogicalPortFromPacketIn(ctx context.Context, packetIn *openoltpb2.PacketIndication) (uint32, error) {
 	var logicalPortNum uint32
@@ -3128,20 +2889,6 @@
 	return nil
 }
 
-func (f *OpenOltFlowMgr) isGemPortUsedByAnotherFlow(gemPortID uint32, flowID uint64) bool {
-	f.gemToFlowIDsKey.RLock()
-	flowIDList := f.gemToFlowIDs[gemPortID]
-	f.gemToFlowIDsKey.RUnlock()
-	if len(flowIDList) > 0 {
-		for _, id := range flowIDList {
-			if flowID != id {
-				return true
-			}
-		}
-	}
-	return false
-}
-
 func (f *OpenOltFlowMgr) isAllocUsedByAnotherUNI(ctx context.Context, sq schedQueue) bool {
 	tpInst := sq.tpInst.(*tp_pb.TechProfileInstance)
 	if tpInst.InstanceControl.Onu == "single-instance" && sq.direction == tp_pb.Direction_UPSTREAM {
@@ -3357,15 +3104,6 @@
 	return uint32(TpID), nil
 }
 
-func appendUnique64bit(slice []uint64, item uint64) []uint64 {
-	for _, sliceElement := range slice {
-		if sliceElement == item {
-			return slice
-		}
-	}
-	return append(slice, item)
-}
-
 func appendUnique32bit(slice []uint32, item uint32) []uint32 {
 	for _, sliceElement := range slice {
 		if sliceElement == item {
@@ -3466,32 +3204,6 @@
 	return 0, 0, nil
 }
 
-func (f *OpenOltFlowMgr) loadFlowIDsForGemAndGemIDsForFlow(ctx context.Context) {
-	logger.Debug(ctx, "loadFlowIDsForGemAndGemIDsForFlow - start")
-	f.onuGemInfoLock.RLock()
-	f.gemToFlowIDsKey.Lock()
-	f.flowIDToGemsLock.Lock()
-	for _, og := range f.onuGemInfoMap {
-		for _, gem := range og.GemPorts {
-			flowIDs, err := f.resourceMgr.GetFlowIDsForGem(ctx, f.ponPortIdx, gem)
-			if err == nil {
-				f.gemToFlowIDs[gem] = flowIDs
-				for _, flowID := range flowIDs {
-					if _, ok := f.flowIDToGems[flowID]; !ok {
-						f.flowIDToGems[flowID] = []uint32{gem}
-					} else {
-						f.flowIDToGems[flowID] = appendUnique32bit(f.flowIDToGems[flowID], gem)
-					}
-				}
-			}
-		}
-	}
-	f.flowIDToGemsLock.Unlock()
-	f.gemToFlowIDsKey.Unlock()
-	f.onuGemInfoLock.RUnlock()
-	logger.Debug(ctx, "loadFlowIDsForGemAndGemIDsForFlow - end")
-}
-
 //clearMulticastFlowFromResourceManager  removes a multicast flow from the KV store and
 // clears resources reserved for this multicast flow
 func (f *OpenOltFlowMgr) clearMulticastFlowFromResourceManager(ctx context.Context, flow *ofp.OfpFlowStats) error {
@@ -3548,16 +3260,6 @@
 	}, nil
 }
 
-func (f *OpenOltFlowMgr) getOnuGemInfoList(ctx context.Context) []rsrcMgr.OnuGemInfo {
-	var onuGemInfoLst []rsrcMgr.OnuGemInfo
-	f.onuGemInfoLock.RLock()
-	defer f.onuGemInfoLock.RUnlock()
-	for _, v := range f.onuGemInfoMap {
-		onuGemInfoLst = append(onuGemInfoLst, *v)
-	}
-	return onuGemInfoLst
-}
-
 // revertTechProfileInstance is called when CreateScheduler or CreateQueues request fails
 func (f *OpenOltFlowMgr) revertTechProfileInstance(ctx context.Context, sq schedQueue) {