[VOL-4531] - isFlowOnKVStore refactor
Removed the unused map (FlowIDsForOnu).
Store NNI trap flow IDs in the KV store under the path intf=-1 and gem=-1.
Change-Id: I14568f3558957329ca3b0bad8330d6fe22bbecfc
Change-Id: Iece645414a494b248bb463e2837427007a3ee67f
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index c17d92e..7e6ef2c 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -944,11 +944,14 @@
}
dh.totalPonPorts = dh.deviceInfo.GetPonPorts()
dh.agentPreviouslyConnected = dh.deviceInfo.PreviouslyConnected
-
- dh.resourceMgr = make([]*rsrcMgr.OpenOltResourceMgr, dh.totalPonPorts)
- dh.flowMgr = make([]*OpenOltFlowMgr, dh.totalPonPorts)
+ // +1 is for NNI
+ dh.resourceMgr = make([]*rsrcMgr.OpenOltResourceMgr, dh.totalPonPorts+1)
+ dh.flowMgr = make([]*OpenOltFlowMgr, dh.totalPonPorts+1)
var i uint32
- for i = 0; i < dh.totalPonPorts; i++ {
+ // Indices 0 to totalPonPorts-1 ( Ex: 0 .. 15 ) -> PonPort Managers
+ // Index totalPonPorts ( Ex: 16 ) -> NniPort Manager
+ // There is only one NNI manager since multiple NNI is not supported for now
+ for i = 0; i < dh.totalPonPorts+1; i++ {
// Instantiate resource manager
if dh.resourceMgr[i] = rsrcMgr.NewResourceMgr(ctx, i, dh.device.Id, dh.openOLT.KVStoreAddress, dh.openOLT.KVStoreType, dh.device.Type, dh.deviceInfo, dh.cm.Backend.PathPrefix); dh.resourceMgr[i] == nil {
return olterrors.ErrResourceManagerInstantiating
@@ -959,7 +962,7 @@
if dh.groupMgr = NewGroupManager(ctx, dh, dh.resourceMgr[0]); dh.groupMgr == nil {
return olterrors.ErrGroupManagerInstantiating
}
- for i = 0; i < dh.totalPonPorts; i++ {
+ for i = 0; i < dh.totalPonPorts+1; i++ {
// Instantiate flow manager
if dh.flowMgr[i] = NewFlowManager(ctx, dh, dh.resourceMgr[i], dh.groupMgr, i); dh.flowMgr[i] == nil {
return olterrors.ErrFlowManagerInstantiating
@@ -1730,16 +1733,16 @@
if flows != nil {
for _, flow := range flows.ToRemove.Items {
- ponIf := dh.getPonIfFromFlow(flow)
+ intfID := dh.getIntfIDFromFlow(ctx, flow)
logger.Debugw(ctx, "removing-flow",
log.Fields{"device-id": device.Id,
- "ponIf": ponIf,
+ "intfId": intfID,
"flowToRemove": flow})
if flow_utils.HasGroup(flow) {
err = dh.RouteMcastFlowOrGroupMsgToChannel(ctx, flow, nil, McastFlowOrGroupRemove)
} else {
- err = dh.flowMgr[ponIf].RouteFlowToOnuChannel(ctx, flow, false, nil)
+ err = dh.flowMgr[intfID].RouteFlowToOnuChannel(ctx, flow, false, nil)
}
if err != nil {
errorsList = append(errorsList, err)
@@ -1747,20 +1750,20 @@
}
for _, flow := range flows.ToAdd.Items {
- ponIf := dh.getPonIfFromFlow(flow)
+ intfID := dh.getIntfIDFromFlow(ctx, flow)
logger.Debugw(ctx, "adding-flow",
log.Fields{"device-id": device.Id,
- "ponIf": ponIf,
+ "ponIf": intfID,
"flowToAdd": flow})
if flow_utils.HasGroup(flow) {
err = dh.RouteMcastFlowOrGroupMsgToChannel(ctx, flow, nil, McastFlowOrGroupAdd)
} else {
- if dh.flowMgr == nil || dh.flowMgr[ponIf] == nil {
+ if dh.flowMgr == nil || dh.flowMgr[intfID] == nil {
// The flow manager module could be uninitialized if the flow arrives too soon before the device has reconciled fully
logger.Errorw(ctx, "flow-manager-uninitialized", log.Fields{"device-id": device.Id})
err = fmt.Errorf("flow-manager-uninitialized-%v", device.Id)
} else {
- err = dh.flowMgr[ponIf].RouteFlowToOnuChannel(ctx, flow, true, flowMetadata)
+ err = dh.flowMgr[intfID].RouteFlowToOnuChannel(ctx, flow, true, flowMetadata)
}
}
if err != nil {
@@ -2051,6 +2054,7 @@
logger.Debug(ctx, err)
}
}
+ _ = dh.resourceMgr[dh.totalPonPorts].DeleteAllFlowIDsForGemForIntf(ctx, dh.totalPonPorts)
}
/*Delete ONU map for the device*/
@@ -2612,9 +2616,9 @@
return resp, nil
}
-func (dh *DeviceHandler) getPonIfFromFlow(flow *of.OfpFlowStats) uint32 {
- // Default to PON0
- var intfID uint32
+func (dh *DeviceHandler) getIntfIDFromFlow(ctx context.Context, flow *of.OfpFlowStats) uint32 {
+ // Default to NNI
+ var intfID = dh.totalPonPorts
inPort, outPort := getPorts(flow)
if inPort != InvalidPort && outPort != InvalidPort {
_, intfID, _, _ = plt.ExtractAccessFromFlow(inPort, outPort)
diff --git a/internal/pkg/core/device_handler_test.go b/internal/pkg/core/device_handler_test.go
index 867f821..ea40a5c 100644
--- a/internal/pkg/core/device_handler_test.go
+++ b/internal/pkg/core/device_handler_test.go
@@ -178,9 +178,9 @@
deviceInf := &oop.DeviceInfo{Vendor: "openolt", Ranges: oopRanges, Model: "openolt", DeviceId: dh.device.Id, PonPorts: NumPonPorts}
dh.deviceInfo = deviceInf
dh.device = device
- dh.resourceMgr = make([]*resourcemanager.OpenOltResourceMgr, deviceInf.PonPorts)
+ dh.resourceMgr = make([]*resourcemanager.OpenOltResourceMgr, deviceInf.PonPorts+1)
var i uint32
- for i = 0; i < deviceInf.PonPorts; i++ {
+ for i = 0; i < deviceInf.PonPorts+1; i++ {
dh.resourceMgr[i] = &resourcemanager.OpenOltResourceMgr{DeviceID: dh.device.Id, DeviceType: dh.device.Type, DevInfo: deviceInf,
KVStore: &db.Backend{
StoreType: "etcd",
@@ -225,8 +225,8 @@
defer cancel()
dh.groupMgr = NewGroupManager(ctx, dh, dh.resourceMgr[0])
dh.totalPonPorts = NumPonPorts
- dh.flowMgr = make([]*OpenOltFlowMgr, dh.totalPonPorts)
- for i = 0; i < dh.totalPonPorts; i++ {
+ dh.flowMgr = make([]*OpenOltFlowMgr, dh.totalPonPorts+1)
+ for i = 0; i < dh.totalPonPorts+1; i++ {
dh.flowMgr[i] = &OpenOltFlowMgr{}
dh.flowMgr[i].deviceHandler = dh
dh.flowMgr[i].ponPortIdx = i
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 81f7fcb..41988ec 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -224,9 +224,12 @@
flowMgr.ponPortIdx = ponPortIdx
flowMgr.grpMgr = grpMgr
flowMgr.resourceMgr = rMgr
- if err = flowMgr.populateTechProfileForCurrentPonPort(ctx); err != nil {
- logger.Errorw(ctx, "error-while-populating-tech-profile-mgr", log.Fields{"err": err})
- return nil
+ // dh.totalPonPorts is reserved for NNI trap flows. It doesn't need a tech profile
+ if ponPortIdx != dh.totalPonPorts {
+ if err = flowMgr.populateTechProfileForCurrentPonPort(ctx); err != nil {
+ logger.Errorw(ctx, "error-while-populating-tech-profile-mgr", log.Fields{"err": err})
+ return nil
+ }
}
flowMgr.packetInGemPort = make(map[rsrcMgr.PacketInInfoKey]uint32)
@@ -252,6 +255,10 @@
}
func (f *OpenOltFlowMgr) registerFlow(ctx context.Context, flowFromCore *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) error {
+ // In case of nni trap flow
+ if deviceFlow.AccessIntfId == -1 {
+ return f.resourceMgr.RegisterFlowIDForGem(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.AccessIntfId), flowFromCore)
+ }
if !deviceFlow.ReplicateFlow && deviceFlow.GemportId > 0 {
// Flow is not replicated in this case, we need to register the flow for a single gem-port
return f.resourceMgr.RegisterFlowIDForGem(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowFromCore)
@@ -1048,14 +1055,20 @@
"gemport-id": gemPortID,
"logicalflow": *logicalFlow})
- if present := f.resourceMgr.IsFlowOnKvStore(ctx, intfID, int32(onuID), int32(uniID), logicalFlow.Id); present {
+ present, err := f.resourceMgr.IsFlowOnKvStore(ctx, intfID, int32(onuID), logicalFlow.Id)
+ if present {
logger.Infow(ctx, "flow-already-exists",
log.Fields{
"device-id": f.deviceHandler.device.Id,
"intf-id": intfID,
"onu-id": onuID})
return nil
+ } else if err != nil {
+ logger.Errorw(ctx, "aborting-addSymmetricDataPathFlow--flow-may-already-exist",
+ log.Fields{"intf-id": intfID, "onu-id": onuID, "flow-id": logicalFlow.Id})
+ return err
}
+
classifierProto, err := makeOpenOltClassifierField(classifier)
if err != nil {
return olterrors.NewErrInvalidValue(log.Fields{"classifier": classifier, "device-id": f.deviceHandler.device.Id}, err).Log()
@@ -1141,13 +1154,18 @@
classifier[UDPDst] = uint32(67)
classifier[PacketTagType] = SingleTag
- if present := f.resourceMgr.IsFlowOnKvStore(ctx, intfID, int32(onuID), int32(uniID), logicalFlow.Id); present {
+ present, err := f.resourceMgr.IsFlowOnKvStore(ctx, intfID, int32(onuID), logicalFlow.Id)
+ if present {
logger.Infow(ctx, "flow-exists--not-re-adding",
log.Fields{
"device-id": f.deviceHandler.device.Id,
"intf-id": intfID,
"onu-id": onuID})
return nil
+ } else if err != nil {
+ logger.Errorw(ctx, "aborting-addDHCPTrapFlow--flow-may-already-exist",
+ log.Fields{"intf-id": intfID, "onu-id": onuID, "flow-id": logicalFlow.Id})
+ return err
}
logger.Debugw(ctx, "creating-ul-dhcp-flow",
@@ -1234,9 +1252,14 @@
action[TrapToHost] = true
classifier[PacketTagType] = SingleTag
- if present := f.resourceMgr.IsFlowOnKvStore(ctx, networkIntfID, int32(onuID), int32(uniID), logicalFlow.Id); present {
+ present, err := f.resourceMgr.IsFlowOnKvStore(ctx, networkIntfID, int32(onuID), logicalFlow.Id)
+ if present {
logger.Infow(ctx, "flow-exists-not-re-adding", log.Fields{"device-id": f.deviceHandler.device.Id})
return nil
+ } else if err != nil {
+ logger.Errorw(ctx, "aborting-addUpstreamTrapFlow--flow-may-already-exist",
+ log.Fields{"intf-id": intfID, "onu-id": onuID, "flow-id": logicalFlow.Id})
+ return err
}
logger.Debugw(ctx, "creating-upstream-trap-flow",
@@ -1320,13 +1343,18 @@
uplinkClassifier[VlanPcp] = classifier[VlanPcp]
// Fill action
uplinkAction[TrapToHost] = true
- if present := f.resourceMgr.IsFlowOnKvStore(ctx, intfID, int32(onuID), int32(uniID), logicalFlow.Id); present {
+ present, err := f.resourceMgr.IsFlowOnKvStore(ctx, intfID, int32(onuID), logicalFlow.Id)
+ if present {
logger.Infow(ctx, "flow-exists-not-re-adding", log.Fields{
"device-id": f.deviceHandler.device.Id,
"onu-id": onuID,
"intf-id": intfID,
"ethType": ethType})
return nil
+ } else if err != nil {
+ logger.Errorw(ctx, "aborting-addEthTypeBasedFlow--flow-may-already-exist",
+ log.Fields{"intf-id": intfID, "onu-id": onuID, "flow-id": logicalFlow.Id})
+ return err
}
//Add Uplink EthType Flow
logger.Debugw(ctx, "creating-ul-ethType-flow",
@@ -1596,13 +1624,9 @@
"device-id": f.deviceHandler.device.Id,
"intf-id": intfID})
- // Case of trap-on-nni flow when deviceFlow.AccessIntfId is invalid (-1)
- if deviceFlow.AccessIntfId != -1 {
- // No need to register the flow if it is a trap on nni flow.
- if err := f.registerFlow(ctx, logicalFlow, deviceFlow); err != nil {
- logger.Errorw(ctx, "failed-to-register-flow", log.Fields{"err": err})
- return err
- }
+ if err := f.registerFlow(ctx, logicalFlow, deviceFlow); err != nil {
+ logger.Errorw(ctx, "failed-to-register-flow", log.Fields{"err": err})
+ return err
}
return nil
}
@@ -1663,9 +1687,15 @@
if err != nil {
return olterrors.NewErrInvalidValue(log.Fields{"nni-port-number": portNo}, err).Log()
}
- if present := f.resourceMgr.IsFlowOnKvStore(ctx, networkInterfaceID, int32(onuID), int32(uniID), flow.Id); present {
+
+ present, err := f.resourceMgr.IsFlowOnKvStore(ctx, f.ponPortIdx, int32(onuID), flow.Id)
+ if present {
logger.Infow(ctx, "flow-exists--not-re-adding", log.Fields{"device-id": f.deviceHandler.device.Id})
return nil
+ } else if err != nil {
+ logger.Errorw(ctx, "aborting-addLLDPFlow--flow-may-already-exist",
+ log.Fields{"intf-id": networkInterfaceID, "onu-id": onuID, "flow-id": flow.Id})
+ return err
}
classifierProto, err := makeOpenOltClassifierField(classifierInfo)
@@ -2380,12 +2410,17 @@
delete(classifierInfo, EthType)
onuID := NoneOnuID
- uniID := NoneUniID
- if present := f.resourceMgr.IsFlowOnKvStore(ctx, networkInterfaceID, int32(onuID), int32(uniID), flow.Id); present {
+ present, err := f.resourceMgr.IsFlowOnKvStore(ctx, networkInterfaceID, int32(onuID), flow.Id)
+ if present {
logger.Infow(ctx, "multicast-flow-exists-not-re-adding", log.Fields{"classifier-info": classifierInfo})
return nil
+ } else if err != nil {
+ logger.Errorw(ctx, "aborting-handleFlowWithGroup--flow-may-already-exist",
+ log.Fields{"intf-id": networkInterfaceID, "onu-id": onuID, "flow-id": flow.Id})
+ return err
}
+
classifierProto, err := makeOpenOltClassifierField(classifierInfo)
if err != nil {
return olterrors.NewErrInvalidValue(log.Fields{"classifier": classifierInfo}, err)
@@ -2573,9 +2608,14 @@
err)
}
- if present := f.resourceMgr.IsFlowOnKvStore(ctx, networkInterfaceID, int32(onuID), int32(uniID), logicalFlow.Id); present {
+ present, err := f.resourceMgr.IsFlowOnKvStore(ctx, f.ponPortIdx, int32(onuID), logicalFlow.Id)
+ if present {
logger.Info(ctx, "flow-exists-not-re-adding")
return nil
+ } else if err != nil {
+ logger.Errorw(ctx, "aborting-addTrapFlowOnNNI--flow-may-already-exist",
+ log.Fields{"intf-id": networkInterfaceID, "onu-id": onuID, "flow-id": logicalFlow.Id})
+ return err
}
logger.Debugw(ctx, "creating-trap-of-nni-flow",
@@ -2669,9 +2709,15 @@
"action": action},
err)
}
- if present := f.resourceMgr.IsFlowOnKvStore(ctx, networkInterfaceID, int32(onuID), int32(uniID), logicalFlow.Id); present {
+
+ present, err := f.resourceMgr.IsFlowOnKvStore(ctx, f.ponPortIdx, int32(onuID), logicalFlow.Id)
+ if present {
logger.Info(ctx, "igmp-flow-exists-not-re-adding")
return nil
+ } else if err != nil {
+ logger.Errorw(ctx, "aborting-addIgmpTrapFlowOnNNI--flow-may-already-exist",
+ log.Fields{"intf-id": networkInterfaceID, "onu-id": onuID, "flow-id": logicalFlow.Id})
+ return err
}
classifierProto, err := makeOpenOltClassifierField(classifier)
diff --git a/internal/pkg/core/openolt_flowmgr_test.go b/internal/pkg/core/openolt_flowmgr_test.go
index a880297..218ecf5 100644
--- a/internal/pkg/core/openolt_flowmgr_test.go
+++ b/internal/pkg/core/openolt_flowmgr_test.go
@@ -48,7 +48,7 @@
// onuGemInfoMap := make([]rsrcMgr.onuGemInfoMap, NumPonPorts)
var i uint32
- for i = 0; i < NumPonPorts; i++ {
+ for i = 0; i < NumPonPorts+1; i++ {
packetInGemPort := make(map[rsrcMgr.PacketInInfoKey]uint32)
packetInGemPort[rsrcMgr.PacketInInfoKey{IntfID: i, OnuID: i + 1, LogicalPort: i + 1, VlanID: uint16(i), Priority: uint8(i)}] = i + 1
@@ -1401,6 +1401,7 @@
type args struct {
ctx context.Context
+ intfID int32
flow *ofp.OfpFlowStats
addFlow bool
flowMetadata *ofp.FlowMetadata
@@ -1415,6 +1416,7 @@
name: "RouteFlowToOnuChannel-0",
args: args{
ctx: ctx,
+ intfID: NumPonPorts,
flow: flow0,
addFlow: true,
flowMetadata: &flowMetadata1,
@@ -1425,6 +1427,7 @@
name: "RouteFlowToOnuChannel-1",
args: args{
ctx: ctx,
+ intfID: 0,
flow: flow1,
addFlow: true,
flowMetadata: &flowMetadata1,
@@ -1435,6 +1438,7 @@
name: "RouteFlowToOnuChannel-2",
args: args{
ctx: ctx,
+ intfID: 0,
flow: flow2,
addFlow: true,
flowMetadata: &flowMetadata1,
@@ -1445,6 +1449,7 @@
name: "RouteFlowToOnuChannel-3",
args: args{
ctx: ctx,
+ intfID: 0,
flow: flow3,
addFlow: true,
flowMetadata: &flowMetadata1,
@@ -1455,6 +1460,7 @@
name: "RouteFlowToOnuChannel-4",
args: args{
ctx: ctx,
+ intfID: 0,
flow: flow4,
addFlow: true,
flowMetadata: &flowMetadata1,
@@ -1465,6 +1471,7 @@
name: "RouteFlowToOnuChannel-5",
args: args{
ctx: ctx,
+ intfID: 0,
flow: flow1,
addFlow: false,
flowMetadata: &flowMetadata1,
@@ -1475,6 +1482,7 @@
name: "RouteFlowToOnuChannel-6",
args: args{
ctx: ctx,
+ intfID: 0,
flow: flow1,
addFlow: true,
flowMetadata: &flowMetadata1,
@@ -1485,6 +1493,7 @@
name: "RouteFlowToOnuChannel-7",
args: args{
ctx: ctx,
+ intfID: 15,
flow: flow5,
addFlow: true,
flowMetadata: &flowMetadata1,
@@ -1495,6 +1504,7 @@
name: "RouteFlowToOnuChannel-8",
args: args{
ctx: ctx,
+ intfID: 15,
flow: flow6,
addFlow: true,
flowMetadata: &flowMetadata1,
@@ -1505,6 +1515,7 @@
name: "RouteFlowToOnuChannel-9",
args: args{
ctx: ctx,
+ intfID: 0,
flow: flow7,
addFlow: true,
flowMetadata: &flowMetadata1,
@@ -1515,6 +1526,7 @@
name: "RouteFlowToOnuChannel-10",
args: args{
ctx: ctx,
+ intfID: 15,
flow: flow8,
addFlow: true,
flowMetadata: &flowMetadata1,
@@ -1525,6 +1537,7 @@
name: "RouteFlowToOnuChannel-11", // Test Remove trap-from-nni LLDP flow
args: args{
ctx: ctx,
+ intfID: NumPonPorts,
flow: flow0,
addFlow: false,
flowMetadata: &flowMetadata1,
@@ -1543,7 +1556,7 @@
time.Sleep(5 * time.Millisecond)
t.Run(tt.name, func(t *testing.T) {
defer wg.Done()
- tt.returnedErr = flowMgr[0].RouteFlowToOnuChannel(tt.args.ctx, tt.args.flow, tt.args.addFlow, tt.args.flowMetadata)
+ tt.returnedErr = flowMgr[tt.args.intfID].RouteFlowToOnuChannel(tt.args.ctx, tt.args.flow, tt.args.addFlow, tt.args.flowMetadata)
if (tt.wantErr == false && tt.returnedErr != nil) || (tt.wantErr == true && tt.returnedErr == nil) {
t.Errorf("OpenOltFlowMgr.RouteFlowToOnuChannel() error = %v, wantErr %v", tt.returnedErr, tt.wantErr)
}