[VOL-4531] - isFlowOnKVStore refactor
removed unused map (FlowIDsForOnu)
store nni trap flowIds into KV store with path intf=-1 and gem=-1
Change-Id: I14568f3558957329ca3b0bad8330d6fe22bbecfc
Change-Id: Iece645414a494b248bb463e2837427007a3ee67f
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index c17d92e..7e6ef2c 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -944,11 +944,14 @@
}
dh.totalPonPorts = dh.deviceInfo.GetPonPorts()
dh.agentPreviouslyConnected = dh.deviceInfo.PreviouslyConnected
-
- dh.resourceMgr = make([]*rsrcMgr.OpenOltResourceMgr, dh.totalPonPorts)
- dh.flowMgr = make([]*OpenOltFlowMgr, dh.totalPonPorts)
+ // +1 is for NNI
+ dh.resourceMgr = make([]*rsrcMgr.OpenOltResourceMgr, dh.totalPonPorts+1)
+ dh.flowMgr = make([]*OpenOltFlowMgr, dh.totalPonPorts+1)
var i uint32
- for i = 0; i < dh.totalPonPorts; i++ {
+ // Index from 0 to until totalPonPorts ( Ex: 0 .. 15 ) -> PonPort Managers
+ // Index totalPonPorts ( Ex: 16 ) -> NniPort Manager
+ // There is only one NNI manager since multiple NNI is not supported for now
+ for i = 0; i < dh.totalPonPorts+1; i++ {
// Instantiate resource manager
if dh.resourceMgr[i] = rsrcMgr.NewResourceMgr(ctx, i, dh.device.Id, dh.openOLT.KVStoreAddress, dh.openOLT.KVStoreType, dh.device.Type, dh.deviceInfo, dh.cm.Backend.PathPrefix); dh.resourceMgr[i] == nil {
return olterrors.ErrResourceManagerInstantiating
@@ -959,7 +962,7 @@
if dh.groupMgr = NewGroupManager(ctx, dh, dh.resourceMgr[0]); dh.groupMgr == nil {
return olterrors.ErrGroupManagerInstantiating
}
- for i = 0; i < dh.totalPonPorts; i++ {
+ for i = 0; i < dh.totalPonPorts+1; i++ {
// Instantiate flow manager
if dh.flowMgr[i] = NewFlowManager(ctx, dh, dh.resourceMgr[i], dh.groupMgr, i); dh.flowMgr[i] == nil {
return olterrors.ErrFlowManagerInstantiating
@@ -1730,16 +1733,16 @@
if flows != nil {
for _, flow := range flows.ToRemove.Items {
- ponIf := dh.getPonIfFromFlow(flow)
+ intfID := dh.getIntfIDFromFlow(ctx, flow)
logger.Debugw(ctx, "removing-flow",
log.Fields{"device-id": device.Id,
- "ponIf": ponIf,
+ "intfId": intfID,
"flowToRemove": flow})
if flow_utils.HasGroup(flow) {
err = dh.RouteMcastFlowOrGroupMsgToChannel(ctx, flow, nil, McastFlowOrGroupRemove)
} else {
- err = dh.flowMgr[ponIf].RouteFlowToOnuChannel(ctx, flow, false, nil)
+ err = dh.flowMgr[intfID].RouteFlowToOnuChannel(ctx, flow, false, nil)
}
if err != nil {
errorsList = append(errorsList, err)
@@ -1747,20 +1750,20 @@
}
for _, flow := range flows.ToAdd.Items {
- ponIf := dh.getPonIfFromFlow(flow)
+ intfID := dh.getIntfIDFromFlow(ctx, flow)
logger.Debugw(ctx, "adding-flow",
log.Fields{"device-id": device.Id,
- "ponIf": ponIf,
+ "ponIf": intfID,
"flowToAdd": flow})
if flow_utils.HasGroup(flow) {
err = dh.RouteMcastFlowOrGroupMsgToChannel(ctx, flow, nil, McastFlowOrGroupAdd)
} else {
- if dh.flowMgr == nil || dh.flowMgr[ponIf] == nil {
+ if dh.flowMgr == nil || dh.flowMgr[intfID] == nil {
// The flow manager module could be uninitialized if the flow arrives too soon before the device has reconciled fully
logger.Errorw(ctx, "flow-manager-uninitialized", log.Fields{"device-id": device.Id})
err = fmt.Errorf("flow-manager-uninitialized-%v", device.Id)
} else {
- err = dh.flowMgr[ponIf].RouteFlowToOnuChannel(ctx, flow, true, flowMetadata)
+ err = dh.flowMgr[intfID].RouteFlowToOnuChannel(ctx, flow, true, flowMetadata)
}
}
if err != nil {
@@ -2051,6 +2054,7 @@
logger.Debug(ctx, err)
}
}
+ _ = dh.resourceMgr[dh.totalPonPorts].DeleteAllFlowIDsForGemForIntf(ctx, dh.totalPonPorts)
}
/*Delete ONU map for the device*/
@@ -2612,9 +2616,9 @@
return resp, nil
}
-func (dh *DeviceHandler) getPonIfFromFlow(flow *of.OfpFlowStats) uint32 {
- // Default to PON0
- var intfID uint32
+func (dh *DeviceHandler) getIntfIDFromFlow(ctx context.Context, flow *of.OfpFlowStats) uint32 {
+ // Default to NNI
+ var intfID = dh.totalPonPorts
inPort, outPort := getPorts(flow)
if inPort != InvalidPort && outPort != InvalidPort {
_, intfID, _, _ = plt.ExtractAccessFromFlow(inPort, outPort)
diff --git a/internal/pkg/core/device_handler_test.go b/internal/pkg/core/device_handler_test.go
index 867f821..ea40a5c 100644
--- a/internal/pkg/core/device_handler_test.go
+++ b/internal/pkg/core/device_handler_test.go
@@ -178,9 +178,9 @@
deviceInf := &oop.DeviceInfo{Vendor: "openolt", Ranges: oopRanges, Model: "openolt", DeviceId: dh.device.Id, PonPorts: NumPonPorts}
dh.deviceInfo = deviceInf
dh.device = device
- dh.resourceMgr = make([]*resourcemanager.OpenOltResourceMgr, deviceInf.PonPorts)
+ dh.resourceMgr = make([]*resourcemanager.OpenOltResourceMgr, deviceInf.PonPorts+1)
var i uint32
- for i = 0; i < deviceInf.PonPorts; i++ {
+ for i = 0; i < deviceInf.PonPorts+1; i++ {
dh.resourceMgr[i] = &resourcemanager.OpenOltResourceMgr{DeviceID: dh.device.Id, DeviceType: dh.device.Type, DevInfo: deviceInf,
KVStore: &db.Backend{
StoreType: "etcd",
@@ -225,8 +225,8 @@
defer cancel()
dh.groupMgr = NewGroupManager(ctx, dh, dh.resourceMgr[0])
dh.totalPonPorts = NumPonPorts
- dh.flowMgr = make([]*OpenOltFlowMgr, dh.totalPonPorts)
- for i = 0; i < dh.totalPonPorts; i++ {
+ dh.flowMgr = make([]*OpenOltFlowMgr, dh.totalPonPorts+1)
+ for i = 0; i < dh.totalPonPorts+1; i++ {
dh.flowMgr[i] = &OpenOltFlowMgr{}
dh.flowMgr[i].deviceHandler = dh
dh.flowMgr[i].ponPortIdx = i
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 81f7fcb..41988ec 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -224,9 +224,12 @@
flowMgr.ponPortIdx = ponPortIdx
flowMgr.grpMgr = grpMgr
flowMgr.resourceMgr = rMgr
- if err = flowMgr.populateTechProfileForCurrentPonPort(ctx); err != nil {
- logger.Errorw(ctx, "error-while-populating-tech-profile-mgr", log.Fields{"err": err})
- return nil
+ // dh.totalPonPorts is reserved for NNI trap flows. It doesn't need a tech profile
+ if ponPortIdx != dh.totalPonPorts {
+ if err = flowMgr.populateTechProfileForCurrentPonPort(ctx); err != nil {
+ logger.Errorw(ctx, "error-while-populating-tech-profile-mgr", log.Fields{"err": err})
+ return nil
+ }
}
flowMgr.packetInGemPort = make(map[rsrcMgr.PacketInInfoKey]uint32)
@@ -252,6 +255,10 @@
}
func (f *OpenOltFlowMgr) registerFlow(ctx context.Context, flowFromCore *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) error {
+ // In case of nni trap flow
+ if deviceFlow.AccessIntfId == -1 {
+ return f.resourceMgr.RegisterFlowIDForGem(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.AccessIntfId), flowFromCore)
+ }
if !deviceFlow.ReplicateFlow && deviceFlow.GemportId > 0 {
// Flow is not replicated in this case, we need to register the flow for a single gem-port
return f.resourceMgr.RegisterFlowIDForGem(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowFromCore)
@@ -1048,14 +1055,20 @@
"gemport-id": gemPortID,
"logicalflow": *logicalFlow})
- if present := f.resourceMgr.IsFlowOnKvStore(ctx, intfID, int32(onuID), int32(uniID), logicalFlow.Id); present {
+ present, err := f.resourceMgr.IsFlowOnKvStore(ctx, intfID, int32(onuID), logicalFlow.Id)
+ if present {
logger.Infow(ctx, "flow-already-exists",
log.Fields{
"device-id": f.deviceHandler.device.Id,
"intf-id": intfID,
"onu-id": onuID})
return nil
+ } else if err != nil {
+ logger.Errorw(ctx, "aborting-addSymmetricDataPathFlow--flow-may-already-exist",
+ log.Fields{"intf-id": intfID, "onu-id": onuID, "flow-id": logicalFlow.Id})
+ return err
}
+
classifierProto, err := makeOpenOltClassifierField(classifier)
if err != nil {
return olterrors.NewErrInvalidValue(log.Fields{"classifier": classifier, "device-id": f.deviceHandler.device.Id}, err).Log()
@@ -1141,13 +1154,18 @@
classifier[UDPDst] = uint32(67)
classifier[PacketTagType] = SingleTag
- if present := f.resourceMgr.IsFlowOnKvStore(ctx, intfID, int32(onuID), int32(uniID), logicalFlow.Id); present {
+ present, err := f.resourceMgr.IsFlowOnKvStore(ctx, intfID, int32(onuID), logicalFlow.Id)
+ if present {
logger.Infow(ctx, "flow-exists--not-re-adding",
log.Fields{
"device-id": f.deviceHandler.device.Id,
"intf-id": intfID,
"onu-id": onuID})
return nil
+ } else if err != nil {
+ logger.Errorw(ctx, "aborting-addDHCPTrapFlow--flow-may-already-exist",
+ log.Fields{"intf-id": intfID, "onu-id": onuID, "flow-id": logicalFlow.Id})
+ return err
}
logger.Debugw(ctx, "creating-ul-dhcp-flow",
@@ -1234,9 +1252,14 @@
action[TrapToHost] = true
classifier[PacketTagType] = SingleTag
- if present := f.resourceMgr.IsFlowOnKvStore(ctx, networkIntfID, int32(onuID), int32(uniID), logicalFlow.Id); present {
+ present, err := f.resourceMgr.IsFlowOnKvStore(ctx, networkIntfID, int32(onuID), logicalFlow.Id)
+ if present {
logger.Infow(ctx, "flow-exists-not-re-adding", log.Fields{"device-id": f.deviceHandler.device.Id})
return nil
+ } else if err != nil {
+ logger.Errorw(ctx, "aborting-addUpstreamTrapFlow--flow-may-already-exist",
+ log.Fields{"intf-id": intfID, "onu-id": onuID, "flow-id": logicalFlow.Id})
+ return err
}
logger.Debugw(ctx, "creating-upstream-trap-flow",
@@ -1320,13 +1343,18 @@
uplinkClassifier[VlanPcp] = classifier[VlanPcp]
// Fill action
uplinkAction[TrapToHost] = true
- if present := f.resourceMgr.IsFlowOnKvStore(ctx, intfID, int32(onuID), int32(uniID), logicalFlow.Id); present {
+ present, err := f.resourceMgr.IsFlowOnKvStore(ctx, intfID, int32(onuID), logicalFlow.Id)
+ if present {
logger.Infow(ctx, "flow-exists-not-re-adding", log.Fields{
"device-id": f.deviceHandler.device.Id,
"onu-id": onuID,
"intf-id": intfID,
"ethType": ethType})
return nil
+ } else if err != nil {
+ logger.Errorw(ctx, "aborting-addEthTypeBasedFlow--flow-may-already-exist",
+ log.Fields{"intf-id": intfID, "onu-id": onuID, "flow-id": logicalFlow.Id})
+ return err
}
//Add Uplink EthType Flow
logger.Debugw(ctx, "creating-ul-ethType-flow",
@@ -1596,13 +1624,9 @@
"device-id": f.deviceHandler.device.Id,
"intf-id": intfID})
- // Case of trap-on-nni flow when deviceFlow.AccessIntfId is invalid (-1)
- if deviceFlow.AccessIntfId != -1 {
- // No need to register the flow if it is a trap on nni flow.
- if err := f.registerFlow(ctx, logicalFlow, deviceFlow); err != nil {
- logger.Errorw(ctx, "failed-to-register-flow", log.Fields{"err": err})
- return err
- }
+ if err := f.registerFlow(ctx, logicalFlow, deviceFlow); err != nil {
+ logger.Errorw(ctx, "failed-to-register-flow", log.Fields{"err": err})
+ return err
}
return nil
}
@@ -1663,9 +1687,15 @@
if err != nil {
return olterrors.NewErrInvalidValue(log.Fields{"nni-port-number": portNo}, err).Log()
}
- if present := f.resourceMgr.IsFlowOnKvStore(ctx, networkInterfaceID, int32(onuID), int32(uniID), flow.Id); present {
+
+ present, err := f.resourceMgr.IsFlowOnKvStore(ctx, f.ponPortIdx, int32(onuID), flow.Id)
+ if present {
logger.Infow(ctx, "flow-exists--not-re-adding", log.Fields{"device-id": f.deviceHandler.device.Id})
return nil
+ } else if err != nil {
+ logger.Errorw(ctx, "aborting-addLLDPFlow--flow-may-already-exist",
+ log.Fields{"intf-id": networkInterfaceID, "onu-id": onuID, "flow-id": flow.Id})
+ return err
}
classifierProto, err := makeOpenOltClassifierField(classifierInfo)
@@ -2380,12 +2410,17 @@
delete(classifierInfo, EthType)
onuID := NoneOnuID
- uniID := NoneUniID
- if present := f.resourceMgr.IsFlowOnKvStore(ctx, networkInterfaceID, int32(onuID), int32(uniID), flow.Id); present {
+ present, err := f.resourceMgr.IsFlowOnKvStore(ctx, networkInterfaceID, int32(onuID), flow.Id)
+ if present {
logger.Infow(ctx, "multicast-flow-exists-not-re-adding", log.Fields{"classifier-info": classifierInfo})
return nil
+ } else if err != nil {
+ logger.Errorw(ctx, "aborting-handleFlowWithGroup--flow-may-already-exist",
+ log.Fields{"intf-id": networkInterfaceID, "onu-id": onuID, "flow-id": flow.Id})
+ return err
}
+
classifierProto, err := makeOpenOltClassifierField(classifierInfo)
if err != nil {
return olterrors.NewErrInvalidValue(log.Fields{"classifier": classifierInfo}, err)
@@ -2573,9 +2608,14 @@
err)
}
- if present := f.resourceMgr.IsFlowOnKvStore(ctx, networkInterfaceID, int32(onuID), int32(uniID), logicalFlow.Id); present {
+ present, err := f.resourceMgr.IsFlowOnKvStore(ctx, f.ponPortIdx, int32(onuID), logicalFlow.Id)
+ if present {
logger.Info(ctx, "flow-exists-not-re-adding")
return nil
+ } else if err != nil {
+ logger.Errorw(ctx, "aborting-addTrapFlowOnNNI--flow-may-already-exist",
+ log.Fields{"intf-id": networkInterfaceID, "onu-id": onuID, "flow-id": logicalFlow.Id})
+ return err
}
logger.Debugw(ctx, "creating-trap-of-nni-flow",
@@ -2669,9 +2709,15 @@
"action": action},
err)
}
- if present := f.resourceMgr.IsFlowOnKvStore(ctx, networkInterfaceID, int32(onuID), int32(uniID), logicalFlow.Id); present {
+
+ present, err := f.resourceMgr.IsFlowOnKvStore(ctx, f.ponPortIdx, int32(onuID), logicalFlow.Id)
+ if present {
logger.Info(ctx, "igmp-flow-exists-not-re-adding")
return nil
+ } else if err != nil {
+ logger.Errorw(ctx, "aborting-addIgmpTrapFlowOnNNI--flow-may-already-exist",
+ log.Fields{"intf-id": networkInterfaceID, "onu-id": onuID, "flow-id": logicalFlow.Id})
+ return err
}
classifierProto, err := makeOpenOltClassifierField(classifier)
diff --git a/internal/pkg/core/openolt_flowmgr_test.go b/internal/pkg/core/openolt_flowmgr_test.go
index a880297..218ecf5 100644
--- a/internal/pkg/core/openolt_flowmgr_test.go
+++ b/internal/pkg/core/openolt_flowmgr_test.go
@@ -48,7 +48,7 @@
// onuGemInfoMap := make([]rsrcMgr.onuGemInfoMap, NumPonPorts)
var i uint32
- for i = 0; i < NumPonPorts; i++ {
+ for i = 0; i < NumPonPorts+1; i++ {
packetInGemPort := make(map[rsrcMgr.PacketInInfoKey]uint32)
packetInGemPort[rsrcMgr.PacketInInfoKey{IntfID: i, OnuID: i + 1, LogicalPort: i + 1, VlanID: uint16(i), Priority: uint8(i)}] = i + 1
@@ -1401,6 +1401,7 @@
type args struct {
ctx context.Context
+ intfID int32
flow *ofp.OfpFlowStats
addFlow bool
flowMetadata *ofp.FlowMetadata
@@ -1415,6 +1416,7 @@
name: "RouteFlowToOnuChannel-0",
args: args{
ctx: ctx,
+ intfID: NumPonPorts,
flow: flow0,
addFlow: true,
flowMetadata: &flowMetadata1,
@@ -1425,6 +1427,7 @@
name: "RouteFlowToOnuChannel-1",
args: args{
ctx: ctx,
+ intfID: 0,
flow: flow1,
addFlow: true,
flowMetadata: &flowMetadata1,
@@ -1435,6 +1438,7 @@
name: "RouteFlowToOnuChannel-2",
args: args{
ctx: ctx,
+ intfID: 0,
flow: flow2,
addFlow: true,
flowMetadata: &flowMetadata1,
@@ -1445,6 +1449,7 @@
name: "RouteFlowToOnuChannel-3",
args: args{
ctx: ctx,
+ intfID: 0,
flow: flow3,
addFlow: true,
flowMetadata: &flowMetadata1,
@@ -1455,6 +1460,7 @@
name: "RouteFlowToOnuChannel-4",
args: args{
ctx: ctx,
+ intfID: 0,
flow: flow4,
addFlow: true,
flowMetadata: &flowMetadata1,
@@ -1465,6 +1471,7 @@
name: "RouteFlowToOnuChannel-5",
args: args{
ctx: ctx,
+ intfID: 0,
flow: flow1,
addFlow: false,
flowMetadata: &flowMetadata1,
@@ -1475,6 +1482,7 @@
name: "RouteFlowToOnuChannel-6",
args: args{
ctx: ctx,
+ intfID: 0,
flow: flow1,
addFlow: true,
flowMetadata: &flowMetadata1,
@@ -1485,6 +1493,7 @@
name: "RouteFlowToOnuChannel-7",
args: args{
ctx: ctx,
+ intfID: 15,
flow: flow5,
addFlow: true,
flowMetadata: &flowMetadata1,
@@ -1495,6 +1504,7 @@
name: "RouteFlowToOnuChannel-8",
args: args{
ctx: ctx,
+ intfID: 15,
flow: flow6,
addFlow: true,
flowMetadata: &flowMetadata1,
@@ -1505,6 +1515,7 @@
name: "RouteFlowToOnuChannel-9",
args: args{
ctx: ctx,
+ intfID: 0,
flow: flow7,
addFlow: true,
flowMetadata: &flowMetadata1,
@@ -1515,6 +1526,7 @@
name: "RouteFlowToOnuChannel-10",
args: args{
ctx: ctx,
+ intfID: 15,
flow: flow8,
addFlow: true,
flowMetadata: &flowMetadata1,
@@ -1525,6 +1537,7 @@
name: "RouteFlowToOnuChannel-11", // Test Remove trap-from-nni LLDP flow
args: args{
ctx: ctx,
+ intfID: NumPonPorts,
flow: flow0,
addFlow: false,
flowMetadata: &flowMetadata1,
@@ -1543,7 +1556,7 @@
time.Sleep(5 * time.Millisecond)
t.Run(tt.name, func(t *testing.T) {
defer wg.Done()
- tt.returnedErr = flowMgr[0].RouteFlowToOnuChannel(tt.args.ctx, tt.args.flow, tt.args.addFlow, tt.args.flowMetadata)
+ tt.returnedErr = flowMgr[tt.args.intfID].RouteFlowToOnuChannel(tt.args.ctx, tt.args.flow, tt.args.addFlow, tt.args.flowMetadata)
if (tt.wantErr == false && tt.returnedErr != nil) || (tt.wantErr == true && tt.returnedErr == nil) {
t.Errorf("OpenOltFlowMgr.RouteFlowToOnuChannel() error = %v, wantErr %v", tt.returnedErr, tt.wantErr)
}
diff --git a/internal/pkg/resourcemanager/resourcemanager.go b/internal/pkg/resourcemanager/resourcemanager.go
index cfc755e..301e2e9 100755
--- a/internal/pkg/resourcemanager/resourcemanager.go
+++ b/internal/pkg/resourcemanager/resourcemanager.go
@@ -82,6 +82,9 @@
//OnuGemInfoPath is path on the kvstore to store onugem info map
//format: onu_gem_info/<intfid>/<onu_id>
OnuGemInfoPath = OnuGemInfoPathPathPrefix + "/{%d}"
+
+ // NNI uint32 version of -1 which represents the NNI port
+ NNI = 4294967295
)
// FlowInfo holds the flow information
@@ -133,9 +136,6 @@
PonRsrMgr *ponrmgr.PONResourceManager
// Local maps used for write-through-cache - start
- flowIDsForOnu map[string][]uint64
- flowIDsForOnuLock sync.RWMutex
-
allocIDsForOnu map[string][]uint32
allocIDsForOnuLock sync.RWMutex
@@ -253,7 +253,6 @@
//InitLocalCache initializes local maps used for write-through-cache
func (rsrcMgr *OpenOltResourceMgr) InitLocalCache() {
- rsrcMgr.flowIDsForOnu = make(map[string][]uint64)
rsrcMgr.allocIDsForOnu = make(map[string][]uint32)
rsrcMgr.gemPortIDsForOnu = make(map[string][]uint32)
rsrcMgr.techProfileIDsForOnu = make(map[string][]uint32)
@@ -386,42 +385,6 @@
return 0, fmt.Errorf("no-onu-id-allocated")
}
-// GetCurrentFlowIDsForOnu fetches flow ID from the resource manager
-// Note: For flows which trap from the NNI and not really associated with any particular
-// ONU (like LLDP), the onu_id and uni_id is set as -1. The intf_id is the NNI intf_id.
-func (rsrcMgr *OpenOltResourceMgr) GetCurrentFlowIDsForOnu(ctx context.Context, PonIntfID uint32, onuID int32, uniID int32) ([]uint64, error) {
-
- subs := fmt.Sprintf("%d,%d,%d", PonIntfID, onuID, uniID)
- path := fmt.Sprintf(FlowIDPath, subs)
-
- // fetch from cache
- rsrcMgr.flowIDsForOnuLock.RLock()
- flowIDsForOnu, ok := rsrcMgr.flowIDsForOnu[path]
- rsrcMgr.flowIDsForOnuLock.RUnlock()
-
- if ok {
- return flowIDsForOnu, nil
- }
-
- var data []uint64
- value, err := rsrcMgr.KVStore.Get(ctx, path)
- if err == nil {
- if value != nil {
- Val, _ := toByte(value.Value)
- if err = json.Unmarshal(Val, &data); err != nil {
- logger.Error(ctx, "Failed to unmarshal")
- return nil, err
- }
- }
- }
- // update cache
- rsrcMgr.flowIDsForOnuLock.Lock()
- rsrcMgr.flowIDsForOnu[path] = data
- rsrcMgr.flowIDsForOnuLock.Unlock()
-
- return data, nil
-}
-
// UpdateAllocIdsForOnu updates alloc ids in kv store for a given pon interface id, onu id and uni id
func (rsrcMgr *OpenOltResourceMgr) UpdateAllocIdsForOnu(ctx context.Context, ponPort uint32, onuID uint32, uniID uint32, allocIDs []uint32) error {
@@ -639,23 +602,44 @@
// IsFlowOnKvStore checks if the given flowID is present on the kv store
// Returns true if the flowID is found, otherwise it returns false
-func (rsrcMgr *OpenOltResourceMgr) IsFlowOnKvStore(ctx context.Context, intfID uint32, onuID int32, uniID int32,
- flowID uint64) bool {
+func (rsrcMgr *OpenOltResourceMgr) IsFlowOnKvStore(ctx context.Context, intfID uint32, onuID int32, flowID uint64) (bool, error) {
+ var anyError error
- FlowIDs, err := rsrcMgr.GetCurrentFlowIDsForOnu(ctx, intfID, onuID, uniID)
- if err != nil {
- // error logged in the called function
- return false
- }
- if FlowIDs != nil {
- logger.Debugw(ctx, "Found flowId(s) for this ONU", log.Fields{"pon": intfID, "onuID": onuID, "uniID": uniID})
- for _, id := range FlowIDs {
+ // In case of nni trap flow
+ if onuID == -1 {
+ nniTrapflowIDs, err := rsrcMgr.GetFlowIDsForGem(ctx, NNI, NNI)
+ if err != nil {
+ logger.Warnw(ctx, "failed-to-get-nni-trap-flowIDs", log.Fields{"err": err})
+ return false, err
+ }
+ for _, id := range nniTrapflowIDs {
if flowID == id {
- return true
+ return true, nil
}
}
}
- return false
+
+ path := fmt.Sprintf(OnuGemInfoPath, intfID, onuID)
+ rsrcMgr.onuGemInfoLock.RLock()
+ val, ok := rsrcMgr.onuGemInfo[path]
+ rsrcMgr.onuGemInfoLock.RUnlock()
+
+ if ok {
+ for _, gem := range val.GemPorts {
+ flowIDs, err := rsrcMgr.GetFlowIDsForGem(ctx, intfID, gem)
+ if err != nil {
+ anyError = err
+ logger.Warnw(ctx, "failed-to-get-flowIDs-for-gem", log.Fields{"err": err, "onuID": onuID, "gem": gem})
+ } else {
+ for _, id := range flowIDs {
+ if flowID == id {
+ return true, nil
+ }
+ }
+ }
+ }
+ }
+ return false, anyError
}
// GetTechProfileIDForOnu fetches Tech-Profile-ID from the KV-Store for the given onu based on the path
@@ -1228,7 +1212,7 @@
if err != nil {
logger.Errorw(ctx, "Failed to get from kv store", log.Fields{"path": path})
return nil, err
- } else if value == nil {
+ } else if value == nil || value.Value == nil {
logger.Debug(ctx, "no flow-ids found", log.Fields{"path": path})
return nil, nil
}
@@ -1326,6 +1310,9 @@
//DeleteAllFlowIDsForGemForIntf deletes all the flow ids associated for all the gems on the given pon interface
func (rsrcMgr *OpenOltResourceMgr) DeleteAllFlowIDsForGemForIntf(ctx context.Context, intfID uint32) error {
+ if intfID == rsrcMgr.DevInfo.PonPorts {
+ intfID = NNI
+ }
path := fmt.Sprintf(FlowIDsForGemPathPrefix, intfID)
logger.Debugw(ctx, "delete-flow-ids-for-gem-for-pon-intf", log.Fields{"intfID": intfID})
@@ -1541,19 +1528,6 @@
return onuGemInfoLst
}
-// toByte converts an interface value to a []byte. The interface should either be of
-// a string type or []byte. Otherwise, an error is returned.
-func toByte(value interface{}) ([]byte, error) {
- switch t := value.(type) {
- case []byte:
- return value.([]byte), nil
- case string:
- return []byte(value.(string)), nil
- default:
- return nil, fmt.Errorf("unexpected-type-%T", t)
- }
-}
-
func appendUnique64bit(slice []uint64, item uint64) []uint64 {
for _, sliceElement := range slice {
if sliceElement == item {
diff --git a/internal/pkg/resourcemanager/resourcemanager_test.go b/internal/pkg/resourcemanager/resourcemanager_test.go
index 85a5811..e3fa49c 100644
--- a/internal/pkg/resourcemanager/resourcemanager_test.go
+++ b/internal/pkg/resourcemanager/resourcemanager_test.go
@@ -110,6 +110,7 @@
ranges["gemport_id_shared"] = uint32(0)
ranges["flow_id_shared"] = uint32(0)
resMgr.NumOfPonPorts = 16
+ resMgr.DevInfo = &openolt.DeviceInfo{PonPorts: 16}
resMgr.PonRsrMgr.DeviceID = "onu-1"
resMgr.PonRsrMgr.IntfIDs = []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}
resMgr.PonRsrMgr.KVStore = &db.Backend{
@@ -411,37 +412,6 @@
}
}
-func TestOpenOltResourceMgr_GetCurrentFlowIDsForOnu(t *testing.T) {
-
- type args struct {
- PONIntfID uint32
- ONUID int32
- UNIID int32
- }
- tests := []struct {
- name string
- fields *fields
- args args
- want []uint64
- }{
- {"GetCurrentFlowIDsForOnu-1", getResMgr(), args{1, 2, 2}, []uint64{1, 2}},
- }
- for _, tt := range tests {
- t.Run(tt.name, func(t *testing.T) {
- RsrcMgr := testResMgrObject(tt.fields)
- ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
- defer cancel()
- got, err := RsrcMgr.GetCurrentFlowIDsForOnu(ctx, tt.args.PONIntfID, tt.args.ONUID, tt.args.UNIID)
- if err != nil {
- t.Errorf("GetCurrentFlowIDsForOnu() returned error")
- }
- if reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
- t.Errorf("GetCurrentFlowIDsForOnu() = %v, want %v", got, tt.want)
- }
- })
- }
-}
-
func TestOpenOltResourceMgr_DeleteAllFlowIDsForGemForIntf(t *testing.T) {
type args struct {