[VOL-4532] Remove maps duplicated between FlowManager and ResourceManager
Change-Id: I0a0fee7dbd3b3a25f2f0eee062bf565ba3212df3
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 818b167..81f7fcb 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -204,20 +204,9 @@
grpMgr *OpenOltGroupMgr
resourceMgr *rsrcMgr.OpenOltResourceMgr
- gemToFlowIDs map[uint32][]uint64 // gem port id to flow ids
- gemToFlowIDsKey sync.RWMutex // lock to be used to access the gemToFlowIDs map
-
packetInGemPort map[rsrcMgr.PacketInInfoKey]uint32 //packet in gem port local cache
packetInGemPortLock sync.RWMutex
- // TODO create a type rsrcMgr.OnuGemInfos to be used instead of []rsrcMgr.OnuGemInfo
- onuGemInfoMap map[uint32]*rsrcMgr.OnuGemInfo //onu, gem and uni info local cache -> map of onuID to OnuGemInfo
- // We need to have a global lock on the onuGemInfo map
- onuGemInfoLock sync.RWMutex
-
- flowIDToGems map[uint64][]uint32
- flowIDToGemsLock sync.RWMutex
-
// Slice of channels. Each channel in slice, index by ONU ID, queues flows per ONU.
// A go routine per ONU, waits on the unique channel (indexed by ONU ID) for incoming flows (add/remove)
incomingFlows []chan flowControlBlock
@@ -239,9 +228,7 @@
logger.Errorw(ctx, "error-while-populating-tech-profile-mgr", log.Fields{"err": err})
return nil
}
- flowMgr.gemToFlowIDs = make(map[uint32][]uint64)
flowMgr.packetInGemPort = make(map[rsrcMgr.PacketInInfoKey]uint32)
- flowMgr.flowIDToGems = make(map[uint64][]uint32)
// Create a slice of buffered channels for handling concurrent flows per ONU.
// The additional entry (+1) is to handle the NNI trap flows on a separate channel from individual ONUs channel
@@ -257,20 +244,6 @@
flowMgr.flowHandlerRoutineActive[i] = true
go flowMgr.perOnuFlowHandlerRoutine(i, flowMgr.incomingFlows[i], flowMgr.stopFlowHandlerRoutine[i])
}
- flowMgr.onuGemInfoMap = make(map[uint32]*rsrcMgr.OnuGemInfo)
- //Load the onugem info cache from kv store on flowmanager start
- onuIDStart := flowMgr.deviceHandler.deviceInfo.OnuIdStart
- onuIDEnd := flowMgr.deviceHandler.deviceInfo.OnuIdEnd
- for onuID := onuIDStart; onuID <= onuIDEnd; onuID++ {
- // check for a valid serial number in onuGem as GetOnuGemInfo can return nil error in case of nothing found in the path.
- onugem, err := rMgr.GetOnuGemInfo(ctx, ponPortIdx, onuID)
- if err == nil && onugem != nil && onugem.SerialNumber != "" {
- flowMgr.onuGemInfoMap[onuID] = onugem
- }
- }
-
- //Load flowID list per gem map And gemIDs per flow per interface from the kvstore.
- flowMgr.loadFlowIDsForGemAndGemIDsForFlow(ctx)
//load interface to multicast queue map from kv store
flowMgr.grpMgr.LoadInterfaceToMulticastQueueMap(ctx)
@@ -281,11 +254,11 @@
func (f *OpenOltFlowMgr) registerFlow(ctx context.Context, flowFromCore *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) error {
if !deviceFlow.ReplicateFlow && deviceFlow.GemportId > 0 {
// Flow is not replicated in this case, we need to register the flow for a single gem-port
- return f.registerFlowIDForGemAndGemIDForFlow(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowFromCore)
+ return f.resourceMgr.RegisterFlowIDForGem(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowFromCore)
} else if deviceFlow.ReplicateFlow && len(deviceFlow.PbitToGemport) > 0 {
// Flow is replicated in this case. We need to register the flow for all the gem-ports it is replicated to.
for _, gemPort := range deviceFlow.PbitToGemport {
- if err := f.registerFlowIDForGemAndGemIDForFlow(ctx, uint32(deviceFlow.AccessIntfId), gemPort, flowFromCore); err != nil {
+ if err := f.resourceMgr.RegisterFlowIDForGem(ctx, uint32(deviceFlow.AccessIntfId), gemPort, flowFromCore); err != nil {
return err
}
}
@@ -293,31 +266,6 @@
return nil
}
-func (f *OpenOltFlowMgr) registerFlowIDForGemAndGemIDForFlow(ctx context.Context, accessIntfID uint32, gemPortID uint32, flowFromCore *ofp.OfpFlowStats) error {
- // update gem->flows map
- f.gemToFlowIDsKey.Lock()
- flowIDList, ok := f.gemToFlowIDs[gemPortID]
- if !ok {
- flowIDList = []uint64{flowFromCore.Id}
- } else {
- flowIDList = appendUnique64bit(flowIDList, flowFromCore.Id)
- }
- f.gemToFlowIDs[gemPortID] = flowIDList
- f.gemToFlowIDsKey.Unlock()
-
- // update flow->gems map
- f.flowIDToGemsLock.Lock()
- if _, ok := f.flowIDToGems[flowFromCore.Id]; !ok {
- f.flowIDToGems[flowFromCore.Id] = []uint32{gemPortID}
- } else {
- f.flowIDToGems[flowFromCore.Id] = appendUnique32bit(f.flowIDToGems[flowFromCore.Id], gemPortID)
- }
- f.flowIDToGemsLock.Unlock()
-
- // update the flowids for a gem to the KVstore
- return f.resourceMgr.UpdateFlowIDsForGem(ctx, accessIntfID, gemPortID, flowIDList)
-}
-
func (f *OpenOltFlowMgr) processAddFlow(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, portNo uint32,
classifierInfo map[string]interface{}, actionInfo map[string]interface{}, flow *ofp.OfpFlowStats, TpID uint32,
UsMeterID uint32, DsMeterID uint32, flowMetadata *ofp.FlowMetadata) error {
@@ -985,15 +933,17 @@
"device-id": f.deviceHandler.device.Id})
/* Update the allocated alloc_id and gem_port_id for the ONU/UNI to KV store */
if err := f.resourceMgr.UpdateAllocIdsForOnu(ctx, intfID, onuID, uniID, allocID); err != nil {
- logger.Errorw(ctx, "error-while-uploading-allocid-to-kv-store", log.Fields{"device-id": f.deviceHandler.device.Id})
+ logger.Errorw(ctx, "error-while-uploading-allocid-to-kv-store", log.Fields{"device-id": f.deviceHandler.device.Id, "onuID": onuID, "allocID": allocID})
}
if err := f.resourceMgr.UpdateGEMPortIDsForOnu(ctx, intfID, onuID, uniID, gemPortIDs); err != nil {
- logger.Errorw(ctx, "error-while-uploading-gemports-to-kv-store", log.Fields{"device-id": f.deviceHandler.device.Id})
+ logger.Errorw(ctx, "error-while-uploading-gemports-to-kv-store", log.Fields{"device-id": f.deviceHandler.device.Id, "onuID": onuID, "gemPort": gemPortIDs})
}
logger.Infow(ctx, "stored-tconts-and-gem-into-kv-store-successfully", log.Fields{"device-id": f.deviceHandler.device.Id})
for _, gemPort := range gemPortIDs {
- f.addGemPortToOnuInfoMap(ctx, intfID, onuID, gemPort)
+ if err := f.resourceMgr.AddGemToOnuGemInfo(ctx, intfID, onuID, gemPort); err != nil {
+ logger.Errorw(ctx, "error-while-uploading-onugeminfos-to-kv-store", log.Fields{"device-id": f.deviceHandler.device.Id, "onuID": onuID, "gemPort": gemPort})
+ }
}
}
@@ -1911,48 +1861,6 @@
return nil
}
-// Once the gemport is released for a given onu, it also has to be cleared from local cache
-// which was used for deriving the gemport->logicalPortNo during packet-in.
-// Otherwise stale info continues to exist after gemport is freed and wrong logicalPortNo
-// is conveyed to ONOS during packet-in OF message.
-func (f *OpenOltFlowMgr) deleteGemPortFromLocalCache(ctx context.Context, intfID uint32, onuID uint32, gemPortID uint32) {
- logger.Infow(ctx, "deleting-gem-from-local-cache",
- log.Fields{
- "gem-port-id": gemPortID,
- "intf-id": intfID,
- "onu-id": onuID,
- "device-id": f.deviceHandler.device.Id})
- f.onuGemInfoLock.RLock()
- onugem, ok := f.onuGemInfoMap[onuID]
- f.onuGemInfoLock.RUnlock()
- if !ok {
- logger.Warnw(ctx, "onu gem info already cleared from cache", log.Fields{
- "gem-port-id": gemPortID,
- "intf-id": intfID,
- "onu-id": onuID,
- "device-id": f.deviceHandler.device.Id})
- return
- }
-deleteLoop:
- for j, gem := range onugem.GemPorts {
- // If the gemport is found, delete it from local cache.
- if gem == gemPortID {
- onugem.GemPorts = append(onugem.GemPorts[:j], onugem.GemPorts[j+1:]...)
- f.onuGemInfoLock.Lock()
- f.onuGemInfoMap[onuID] = onugem
- f.onuGemInfoLock.Unlock()
- logger.Infow(ctx, "removed-gemport-from-local-cache",
- log.Fields{
- "intf-id": intfID,
- "onu-id": onuID,
- "deletedgemport-id": gemPortID,
- "gemports": onugem.GemPorts,
- "device-id": f.deviceHandler.device.Id})
- break deleteLoop
- }
- }
-}
-
//clearResources clears pon resources in kv store and the device
// nolint: gocyclo
func (f *OpenOltFlowMgr) clearResources(ctx context.Context, intfID uint32, onuID int32, uniID int32,
@@ -1988,20 +1896,15 @@
case *tp_pb.TechProfileInstance:
for _, gemPort := range techprofileInst.UpstreamGemPortAttributeList {
gemPortID := gemPort.GemportId
- used := f.isGemPortUsedByAnotherFlow(gemPortID, flowID)
+ used := f.resourceMgr.IsGemPortUsedByAnotherFlow(gemPortID, flowID)
if used {
- f.gemToFlowIDsKey.RLock()
- flowIDs := f.gemToFlowIDs[gemPortID]
- f.gemToFlowIDsKey.RUnlock()
-
+ flowIDs, err := f.resourceMgr.GetFlowIDsForGem(ctx, intfID, gemPortID)
+ if err != nil {
+ return err
+ }
for i, flowIDinMap := range flowIDs {
if flowIDinMap == flowID {
flowIDs = append(flowIDs[:i], flowIDs[i+1:]...)
- f.gemToFlowIDsKey.Lock()
- f.gemToFlowIDs[gemPortID] = flowIDs
- f.gemToFlowIDsKey.Unlock()
- // everytime gemToFlowIDs cache is updated the same should be updated
- // in kv store by calling UpdateFlowIDsForGem
if err := f.resourceMgr.UpdateFlowIDsForGem(ctx, intfID, gemPortID, flowIDs); err != nil {
return err
}
@@ -2036,16 +1939,9 @@
for _, gemPort := range techprofileInst.UpstreamGemPortAttributeList {
gemPortID := gemPort.GemportId
- f.deleteGemPortFromLocalCache(ctx, intfID, uint32(onuID), gemPortID)
_ = f.resourceMgr.RemoveGemFromOnuGemInfo(ctx, intfID, uint32(onuID), gemPortID) // ignore error and proceed.
- if err := f.resourceMgr.DeleteFlowIDsForGem(ctx, intfID, gemPortID); err == nil {
- //everytime an entry is deleted from gemToFlowIDs cache, the same should be updated in kv as well
- // by calling DeleteFlowIDsForGem
- f.gemToFlowIDsKey.Lock()
- delete(f.gemToFlowIDs, gemPortID)
- f.gemToFlowIDsKey.Unlock()
- } else {
+ if err := f.resourceMgr.DeleteFlowIDsForGem(ctx, intfID, gemPortID); err != nil {
logger.Errorw(ctx, "error-removing-flow-ids-of-gem-port",
log.Fields{
"err": err,
@@ -2214,11 +2110,6 @@
return err
}
- // Delete the flow-id to gemport list entry from the map now the flow is deleted.
- f.flowIDToGemsLock.Lock()
- delete(f.flowIDToGems, flow.Id)
- f.flowIDToGemsLock.Unlock()
-
if err = f.clearResources(ctx, Intf, onuID, uniID, flow.Id, portNum, tpID, true); err != nil {
logger.Errorw(ctx, "failed-to-clear-resources-for-flow", log.Fields{
"flow-id": flow.Id,
@@ -2440,24 +2331,6 @@
}
f.resourceMgr.AddUniPortToOnuInfo(ctx, intfID, onuID, portNo)
- // also update flowmgr cache
- f.onuGemInfoLock.Lock()
- onugem, ok := f.onuGemInfoMap[onuID]
- if ok {
- found := false
- for _, uni := range onugem.UniPorts {
- if uni == portNo {
- found = true
- break
- }
- }
- if !found {
- onugem.UniPorts = append(onugem.UniPorts, portNo)
- f.onuGemInfoMap[onuID] = onugem
- logger.Infow(ctx, "added uni port to onugem cache", log.Fields{"uni": portNo})
- }
- }
- f.onuGemInfoLock.Unlock()
tpID, err := getTpIDFromFlow(ctx, flow)
if err != nil {
@@ -2596,118 +2469,6 @@
return nil
}
-//AddOnuInfoToFlowMgrCacheAndKvStore function adds onu info to cache and kvstore
-func (f *OpenOltFlowMgr) AddOnuInfoToFlowMgrCacheAndKvStore(ctx context.Context, intfID uint32, onuID uint32, serialNum string) error {
-
- f.onuGemInfoLock.RLock()
- _, ok := f.onuGemInfoMap[onuID]
- f.onuGemInfoLock.RUnlock()
- // If the ONU already exists in onuGemInfo list, nothing to do
- if ok {
- logger.Debugw(ctx, "onu-id-already-exists-in-cache",
- log.Fields{"onuID": onuID,
- "serialNum": serialNum})
- return nil
- }
-
- onuGemInfo := rsrcMgr.OnuGemInfo{OnuID: onuID, SerialNumber: serialNum, IntfID: intfID}
- f.onuGemInfoLock.Lock()
- f.onuGemInfoMap[onuID] = &onuGemInfo
- f.onuGemInfoLock.Unlock()
- if err := f.resourceMgr.AddOnuGemInfo(ctx, intfID, onuID, onuGemInfo); err != nil {
- return err
- }
- logger.Infow(ctx, "added-onuinfo",
- log.Fields{
- "intf-id": intfID,
- "onu-id": onuID,
- "serial-num": serialNum,
- "onu": onuGemInfo,
- "device-id": f.deviceHandler.device.Id})
- return nil
-}
-
-//RemoveOnuInfoFromFlowMgrCacheAndKvStore function adds onu info to cache and kvstore
-func (f *OpenOltFlowMgr) RemoveOnuInfoFromFlowMgrCacheAndKvStore(ctx context.Context, intfID uint32, onuID uint32) error {
-
- f.onuGemInfoLock.Lock()
- delete(f.onuGemInfoMap, onuID)
- f.onuGemInfoLock.Unlock()
-
- if err := f.resourceMgr.DelOnuGemInfo(ctx, intfID, onuID); err != nil {
- return err
- }
- logger.Infow(ctx, "deleted-onuinfo",
- log.Fields{
- "intf-id": intfID,
- "onu-id": onuID,
- "device-id": f.deviceHandler.device.Id})
- return nil
-}
-
-//addGemPortToOnuInfoMap function adds GEMport to ONU map
-func (f *OpenOltFlowMgr) addGemPortToOnuInfoMap(ctx context.Context, intfID uint32, onuID uint32, gemPort uint32) {
-
- logger.Infow(ctx, "adding-gem-to-onu-info-map",
- log.Fields{
- "gem-port-id": gemPort,
- "intf-id": intfID,
- "onu-id": onuID,
- "device-id": f.deviceHandler.device.Id})
- f.onuGemInfoLock.RLock()
- onugem, ok := f.onuGemInfoMap[onuID]
- f.onuGemInfoLock.RUnlock()
- if !ok {
- logger.Warnw(ctx, "onu gem info is missing", log.Fields{
- "gem-port-id": gemPort,
- "intf-id": intfID,
- "onu-id": onuID,
- "device-id": f.deviceHandler.device.Id})
- return
- }
-
- if onugem.OnuID == onuID {
- // check if gem already exists , else update the cache and kvstore
- for _, gem := range onugem.GemPorts {
- if gem == gemPort {
- logger.Debugw(ctx, "gem-already-in-cache-no-need-to-update-cache-and-kv-store",
- log.Fields{
- "gem": gemPort,
- "device-id": f.deviceHandler.device.Id})
- return
- }
- }
- onugem.GemPorts = append(onugem.GemPorts, gemPort)
- f.onuGemInfoLock.Lock()
- f.onuGemInfoMap[onuID] = onugem
- f.onuGemInfoLock.Unlock()
- logger.Debugw(ctx, "updated onu gem info from cache", log.Fields{"onugem": onugem})
- } else {
- logger.Warnw(ctx, "mismatched onu id", log.Fields{
- "gem-port-id": gemPort,
- "intf-id": intfID,
- "onu-id": onuID,
- "device-id": f.deviceHandler.device.Id})
- return
- }
- err := f.resourceMgr.AddGemToOnuGemInfo(ctx, intfID, onuID, gemPort)
- if err != nil {
- logger.Errorw(ctx, "failed-to-add-gem-to-onu",
- log.Fields{
- "intf-id": intfID,
- "onu-id": onuID,
- "gemPort": gemPort,
- "device-id": f.deviceHandler.device.Id})
- return
- }
- logger.Infow(ctx, "gem-added-to-onu-info-map",
- log.Fields{
- "gem-port-id": gemPort,
- "intf-id": intfID,
- "onu-id": onuID,
- "device-id": f.deviceHandler.device.Id})
-}
-
//GetLogicalPortFromPacketIn function computes logical port UNI/NNI port from packet-in indication and returns the same
func (f *OpenOltFlowMgr) GetLogicalPortFromPacketIn(ctx context.Context, packetIn *openoltpb2.PacketIndication) (uint32, error) {
var logicalPortNum uint32
@@ -3128,20 +2889,6 @@
return nil
}
-func (f *OpenOltFlowMgr) isGemPortUsedByAnotherFlow(gemPortID uint32, flowID uint64) bool {
- f.gemToFlowIDsKey.RLock()
- flowIDList := f.gemToFlowIDs[gemPortID]
- f.gemToFlowIDsKey.RUnlock()
- if len(flowIDList) > 0 {
- for _, id := range flowIDList {
- if flowID != id {
- return true
- }
- }
- }
- return false
-}
-
func (f *OpenOltFlowMgr) isAllocUsedByAnotherUNI(ctx context.Context, sq schedQueue) bool {
tpInst := sq.tpInst.(*tp_pb.TechProfileInstance)
if tpInst.InstanceControl.Onu == "single-instance" && sq.direction == tp_pb.Direction_UPSTREAM {
@@ -3357,15 +3104,6 @@
return uint32(TpID), nil
}
-func appendUnique64bit(slice []uint64, item uint64) []uint64 {
- for _, sliceElement := range slice {
- if sliceElement == item {
- return slice
- }
- }
- return append(slice, item)
-}
-
func appendUnique32bit(slice []uint32, item uint32) []uint32 {
for _, sliceElement := range slice {
if sliceElement == item {
@@ -3466,32 +3204,6 @@
return 0, 0, nil
}
-func (f *OpenOltFlowMgr) loadFlowIDsForGemAndGemIDsForFlow(ctx context.Context) {
- logger.Debug(ctx, "loadFlowIDsForGemAndGemIDsForFlow - start")
- f.onuGemInfoLock.RLock()
- f.gemToFlowIDsKey.Lock()
- f.flowIDToGemsLock.Lock()
- for _, og := range f.onuGemInfoMap {
- for _, gem := range og.GemPorts {
- flowIDs, err := f.resourceMgr.GetFlowIDsForGem(ctx, f.ponPortIdx, gem)
- if err == nil {
- f.gemToFlowIDs[gem] = flowIDs
- for _, flowID := range flowIDs {
- if _, ok := f.flowIDToGems[flowID]; !ok {
- f.flowIDToGems[flowID] = []uint32{gem}
- } else {
- f.flowIDToGems[flowID] = appendUnique32bit(f.flowIDToGems[flowID], gem)
- }
- }
- }
- }
- }
- f.flowIDToGemsLock.Unlock()
- f.gemToFlowIDsKey.Unlock()
- f.onuGemInfoLock.RUnlock()
- logger.Debug(ctx, "loadFlowIDsForGemAndGemIDsForFlow - end")
-}
-
//clearMulticastFlowFromResourceManager removes a multicast flow from the KV store and
// clears resources reserved for this multicast flow
func (f *OpenOltFlowMgr) clearMulticastFlowFromResourceManager(ctx context.Context, flow *ofp.OfpFlowStats) error {
@@ -3548,16 +3260,6 @@
}, nil
}
-func (f *OpenOltFlowMgr) getOnuGemInfoList(ctx context.Context) []rsrcMgr.OnuGemInfo {
- var onuGemInfoLst []rsrcMgr.OnuGemInfo
- f.onuGemInfoLock.RLock()
- defer f.onuGemInfoLock.RUnlock()
- for _, v := range f.onuGemInfoMap {
- onuGemInfoLst = append(onuGemInfoLst, *v)
- }
- return onuGemInfoLst
-}
-
// revertTechProfileInstance is called when CreateScheduler or CreateQueues request fails
func (f *OpenOltFlowMgr) revertTechProfileInstance(ctx context.Context, sq schedQueue) {