[VOL-4531] - isFlowOnKVStore refactor
removed unused map (FlowIDsForOnu)
store nni trap flowIds into KV store with path intf=-1 and gem=-1
Change-Id: I14568f3558957329ca3b0bad8330d6fe22bbecfc

Change-Id: Iece645414a494b248bb463e2837427007a3ee67f
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index c17d92e..7e6ef2c 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -944,11 +944,14 @@
 	}
 	dh.totalPonPorts = dh.deviceInfo.GetPonPorts()
 	dh.agentPreviouslyConnected = dh.deviceInfo.PreviouslyConnected
-
-	dh.resourceMgr = make([]*rsrcMgr.OpenOltResourceMgr, dh.totalPonPorts)
-	dh.flowMgr = make([]*OpenOltFlowMgr, dh.totalPonPorts)
+	// +1 is for NNI
+	dh.resourceMgr = make([]*rsrcMgr.OpenOltResourceMgr, dh.totalPonPorts+1)
+	dh.flowMgr = make([]*OpenOltFlowMgr, dh.totalPonPorts+1)
 	var i uint32
-	for i = 0; i < dh.totalPonPorts; i++ {
+	// Index from 0 to until totalPonPorts ( Ex: 0 .. 15 ) 	-> 	PonPort Managers
+	// Index totalPonPorts ( Ex: 16 ) 				 	    -> 	NniPort Manager
+	// There is only one NNI manager since multiple NNI is not supported for now
+	for i = 0; i < dh.totalPonPorts+1; i++ {
 		// Instantiate resource manager
 		if dh.resourceMgr[i] = rsrcMgr.NewResourceMgr(ctx, i, dh.device.Id, dh.openOLT.KVStoreAddress, dh.openOLT.KVStoreType, dh.device.Type, dh.deviceInfo, dh.cm.Backend.PathPrefix); dh.resourceMgr[i] == nil {
 			return olterrors.ErrResourceManagerInstantiating
@@ -959,7 +962,7 @@
 	if dh.groupMgr = NewGroupManager(ctx, dh, dh.resourceMgr[0]); dh.groupMgr == nil {
 		return olterrors.ErrGroupManagerInstantiating
 	}
-	for i = 0; i < dh.totalPonPorts; i++ {
+	for i = 0; i < dh.totalPonPorts+1; i++ {
 		// Instantiate flow manager
 		if dh.flowMgr[i] = NewFlowManager(ctx, dh, dh.resourceMgr[i], dh.groupMgr, i); dh.flowMgr[i] == nil {
 			return olterrors.ErrFlowManagerInstantiating
@@ -1730,16 +1733,16 @@
 
 	if flows != nil {
 		for _, flow := range flows.ToRemove.Items {
-			ponIf := dh.getPonIfFromFlow(flow)
+			intfID := dh.getIntfIDFromFlow(ctx, flow)
 
 			logger.Debugw(ctx, "removing-flow",
 				log.Fields{"device-id": device.Id,
-					"ponIf":        ponIf,
+					"intfId":       intfID,
 					"flowToRemove": flow})
 			if flow_utils.HasGroup(flow) {
 				err = dh.RouteMcastFlowOrGroupMsgToChannel(ctx, flow, nil, McastFlowOrGroupRemove)
 			} else {
-				err = dh.flowMgr[ponIf].RouteFlowToOnuChannel(ctx, flow, false, nil)
+				err = dh.flowMgr[intfID].RouteFlowToOnuChannel(ctx, flow, false, nil)
 			}
 			if err != nil {
 				errorsList = append(errorsList, err)
@@ -1747,20 +1750,20 @@
 		}
 
 		for _, flow := range flows.ToAdd.Items {
-			ponIf := dh.getPonIfFromFlow(flow)
+			intfID := dh.getIntfIDFromFlow(ctx, flow)
 			logger.Debugw(ctx, "adding-flow",
 				log.Fields{"device-id": device.Id,
-					"ponIf":     ponIf,
+					"ponIf":     intfID,
 					"flowToAdd": flow})
 			if flow_utils.HasGroup(flow) {
 				err = dh.RouteMcastFlowOrGroupMsgToChannel(ctx, flow, nil, McastFlowOrGroupAdd)
 			} else {
-				if dh.flowMgr == nil || dh.flowMgr[ponIf] == nil {
+				if dh.flowMgr == nil || dh.flowMgr[intfID] == nil {
 					// The flow manager module could be uninitialized if the flow arrives too soon before the device has reconciled fully
 					logger.Errorw(ctx, "flow-manager-uninitialized", log.Fields{"device-id": device.Id})
 					err = fmt.Errorf("flow-manager-uninitialized-%v", device.Id)
 				} else {
-					err = dh.flowMgr[ponIf].RouteFlowToOnuChannel(ctx, flow, true, flowMetadata)
+					err = dh.flowMgr[intfID].RouteFlowToOnuChannel(ctx, flow, true, flowMetadata)
 				}
 			}
 			if err != nil {
@@ -2051,6 +2054,7 @@
 				logger.Debug(ctx, err)
 			}
 		}
+		_ = dh.resourceMgr[dh.totalPonPorts].DeleteAllFlowIDsForGemForIntf(ctx, dh.totalPonPorts)
 	}
 
 	/*Delete ONU map for the device*/
@@ -2612,9 +2616,9 @@
 	return resp, nil
 }
 
-func (dh *DeviceHandler) getPonIfFromFlow(flow *of.OfpFlowStats) uint32 {
-	// Default to PON0
-	var intfID uint32
+func (dh *DeviceHandler) getIntfIDFromFlow(ctx context.Context, flow *of.OfpFlowStats) uint32 {
+	// Default to NNI
+	var intfID = dh.totalPonPorts
 	inPort, outPort := getPorts(flow)
 	if inPort != InvalidPort && outPort != InvalidPort {
 		_, intfID, _, _ = plt.ExtractAccessFromFlow(inPort, outPort)
diff --git a/internal/pkg/core/device_handler_test.go b/internal/pkg/core/device_handler_test.go
index 867f821..ea40a5c 100644
--- a/internal/pkg/core/device_handler_test.go
+++ b/internal/pkg/core/device_handler_test.go
@@ -178,9 +178,9 @@
 	deviceInf := &oop.DeviceInfo{Vendor: "openolt", Ranges: oopRanges, Model: "openolt", DeviceId: dh.device.Id, PonPorts: NumPonPorts}
 	dh.deviceInfo = deviceInf
 	dh.device = device
-	dh.resourceMgr = make([]*resourcemanager.OpenOltResourceMgr, deviceInf.PonPorts)
+	dh.resourceMgr = make([]*resourcemanager.OpenOltResourceMgr, deviceInf.PonPorts+1)
 	var i uint32
-	for i = 0; i < deviceInf.PonPorts; i++ {
+	for i = 0; i < deviceInf.PonPorts+1; i++ {
 		dh.resourceMgr[i] = &resourcemanager.OpenOltResourceMgr{DeviceID: dh.device.Id, DeviceType: dh.device.Type, DevInfo: deviceInf,
 			KVStore: &db.Backend{
 				StoreType: "etcd",
@@ -225,8 +225,8 @@
 	defer cancel()
 	dh.groupMgr = NewGroupManager(ctx, dh, dh.resourceMgr[0])
 	dh.totalPonPorts = NumPonPorts
-	dh.flowMgr = make([]*OpenOltFlowMgr, dh.totalPonPorts)
-	for i = 0; i < dh.totalPonPorts; i++ {
+	dh.flowMgr = make([]*OpenOltFlowMgr, dh.totalPonPorts+1)
+	for i = 0; i < dh.totalPonPorts+1; i++ {
 		dh.flowMgr[i] = &OpenOltFlowMgr{}
 		dh.flowMgr[i].deviceHandler = dh
 		dh.flowMgr[i].ponPortIdx = i
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 81f7fcb..41988ec 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -224,9 +224,12 @@
 	flowMgr.ponPortIdx = ponPortIdx
 	flowMgr.grpMgr = grpMgr
 	flowMgr.resourceMgr = rMgr
-	if err = flowMgr.populateTechProfileForCurrentPonPort(ctx); err != nil {
-		logger.Errorw(ctx, "error-while-populating-tech-profile-mgr", log.Fields{"err": err})
-		return nil
+	// dh.totalPonPorts is reserved for NNI trap flows. It doesn't need a tech profile
+	if ponPortIdx != dh.totalPonPorts {
+		if err = flowMgr.populateTechProfileForCurrentPonPort(ctx); err != nil {
+			logger.Errorw(ctx, "error-while-populating-tech-profile-mgr", log.Fields{"err": err})
+			return nil
+		}
 	}
 	flowMgr.packetInGemPort = make(map[rsrcMgr.PacketInInfoKey]uint32)
 
@@ -252,6 +255,10 @@
 }
 
 func (f *OpenOltFlowMgr) registerFlow(ctx context.Context, flowFromCore *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) error {
+	// In case of nni trap flow
+	if deviceFlow.AccessIntfId == -1 {
+		return f.resourceMgr.RegisterFlowIDForGem(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.AccessIntfId), flowFromCore)
+	}
 	if !deviceFlow.ReplicateFlow && deviceFlow.GemportId > 0 {
 		// Flow is not replicated in this case, we need to register the flow for a single gem-port
 		return f.resourceMgr.RegisterFlowIDForGem(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowFromCore)
@@ -1048,14 +1055,20 @@
 			"gemport-id":  gemPortID,
 			"logicalflow": *logicalFlow})
 
-	if present := f.resourceMgr.IsFlowOnKvStore(ctx, intfID, int32(onuID), int32(uniID), logicalFlow.Id); present {
+	present, err := f.resourceMgr.IsFlowOnKvStore(ctx, intfID, int32(onuID), logicalFlow.Id)
+	if present {
 		logger.Infow(ctx, "flow-already-exists",
 			log.Fields{
 				"device-id": f.deviceHandler.device.Id,
 				"intf-id":   intfID,
 				"onu-id":    onuID})
 		return nil
+	} else if err != nil {
+		logger.Errorw(ctx, "aborting-addSymmetricDataPathFlow--flow-may-already-exist",
+			log.Fields{"intf-id": intfID, "onu-id": onuID, "flow-id": logicalFlow.Id})
+		return err
 	}
+
 	classifierProto, err := makeOpenOltClassifierField(classifier)
 	if err != nil {
 		return olterrors.NewErrInvalidValue(log.Fields{"classifier": classifier, "device-id": f.deviceHandler.device.Id}, err).Log()
@@ -1141,13 +1154,18 @@
 	classifier[UDPDst] = uint32(67)
 	classifier[PacketTagType] = SingleTag
 
-	if present := f.resourceMgr.IsFlowOnKvStore(ctx, intfID, int32(onuID), int32(uniID), logicalFlow.Id); present {
+	present, err := f.resourceMgr.IsFlowOnKvStore(ctx, intfID, int32(onuID), logicalFlow.Id)
+	if present {
 		logger.Infow(ctx, "flow-exists--not-re-adding",
 			log.Fields{
 				"device-id": f.deviceHandler.device.Id,
 				"intf-id":   intfID,
 				"onu-id":    onuID})
 		return nil
+	} else if err != nil {
+		logger.Errorw(ctx, "aborting-addDHCPTrapFlow--flow-may-already-exist",
+			log.Fields{"intf-id": intfID, "onu-id": onuID, "flow-id": logicalFlow.Id})
+		return err
 	}
 
 	logger.Debugw(ctx, "creating-ul-dhcp-flow",
@@ -1234,9 +1252,14 @@
 	action[TrapToHost] = true
 	classifier[PacketTagType] = SingleTag
 
-	if present := f.resourceMgr.IsFlowOnKvStore(ctx, networkIntfID, int32(onuID), int32(uniID), logicalFlow.Id); present {
+	present, err := f.resourceMgr.IsFlowOnKvStore(ctx, networkIntfID, int32(onuID), logicalFlow.Id)
+	if present {
 		logger.Infow(ctx, "flow-exists-not-re-adding", log.Fields{"device-id": f.deviceHandler.device.Id})
 		return nil
+	} else if err != nil {
+		logger.Errorw(ctx, "aborting-addUpstreamTrapFlow--flow-may-already-exist",
+			log.Fields{"intf-id": intfID, "onu-id": onuID, "flow-id": logicalFlow.Id})
+		return err
 	}
 
 	logger.Debugw(ctx, "creating-upstream-trap-flow",
@@ -1320,13 +1343,18 @@
 	uplinkClassifier[VlanPcp] = classifier[VlanPcp]
 	// Fill action
 	uplinkAction[TrapToHost] = true
-	if present := f.resourceMgr.IsFlowOnKvStore(ctx, intfID, int32(onuID), int32(uniID), logicalFlow.Id); present {
+	present, err := f.resourceMgr.IsFlowOnKvStore(ctx, intfID, int32(onuID), logicalFlow.Id)
+	if present {
 		logger.Infow(ctx, "flow-exists-not-re-adding", log.Fields{
 			"device-id": f.deviceHandler.device.Id,
 			"onu-id":    onuID,
 			"intf-id":   intfID,
 			"ethType":   ethType})
 		return nil
+	} else if err != nil {
+		logger.Errorw(ctx, "aborting-addEthTypeBasedFlow--flow-may-already-exist",
+			log.Fields{"intf-id": intfID, "onu-id": onuID, "flow-id": logicalFlow.Id})
+		return err
 	}
 	//Add Uplink EthType Flow
 	logger.Debugw(ctx, "creating-ul-ethType-flow",
@@ -1596,13 +1624,9 @@
 			"device-id": f.deviceHandler.device.Id,
 			"intf-id":   intfID})
 
-	// Case of trap-on-nni flow when deviceFlow.AccessIntfId is invalid (-1)
-	if deviceFlow.AccessIntfId != -1 {
-		// No need to register the flow if it is a trap on nni flow.
-		if err := f.registerFlow(ctx, logicalFlow, deviceFlow); err != nil {
-			logger.Errorw(ctx, "failed-to-register-flow", log.Fields{"err": err})
-			return err
-		}
+	if err := f.registerFlow(ctx, logicalFlow, deviceFlow); err != nil {
+		logger.Errorw(ctx, "failed-to-register-flow", log.Fields{"err": err})
+		return err
 	}
 	return nil
 }
@@ -1663,9 +1687,15 @@
 	if err != nil {
 		return olterrors.NewErrInvalidValue(log.Fields{"nni-port-number": portNo}, err).Log()
 	}
-	if present := f.resourceMgr.IsFlowOnKvStore(ctx, networkInterfaceID, int32(onuID), int32(uniID), flow.Id); present {
+
+	present, err := f.resourceMgr.IsFlowOnKvStore(ctx, f.ponPortIdx, int32(onuID), flow.Id)
+	if present {
 		logger.Infow(ctx, "flow-exists--not-re-adding", log.Fields{"device-id": f.deviceHandler.device.Id})
 		return nil
+	} else if err != nil {
+		logger.Errorw(ctx, "aborting-addLLDPFlow--flow-may-already-exist",
+			log.Fields{"intf-id": networkInterfaceID, "onu-id": onuID, "flow-id": flow.Id})
+		return err
 	}
 
 	classifierProto, err := makeOpenOltClassifierField(classifierInfo)
@@ -2380,12 +2410,17 @@
 	delete(classifierInfo, EthType)
 
 	onuID := NoneOnuID
-	uniID := NoneUniID
 
-	if present := f.resourceMgr.IsFlowOnKvStore(ctx, networkInterfaceID, int32(onuID), int32(uniID), flow.Id); present {
+	present, err := f.resourceMgr.IsFlowOnKvStore(ctx, networkInterfaceID, int32(onuID), flow.Id)
+	if present {
 		logger.Infow(ctx, "multicast-flow-exists-not-re-adding", log.Fields{"classifier-info": classifierInfo})
 		return nil
+	} else if err != nil {
+		logger.Errorw(ctx, "aborting-handleFlowWithGroup--flow-may-already-exist",
+			log.Fields{"intf-id": networkInterfaceID, "onu-id": onuID, "flow-id": flow.Id})
+		return err
 	}
+
 	classifierProto, err := makeOpenOltClassifierField(classifierInfo)
 	if err != nil {
 		return olterrors.NewErrInvalidValue(log.Fields{"classifier": classifierInfo}, err)
@@ -2573,9 +2608,14 @@
 			err)
 	}
 
-	if present := f.resourceMgr.IsFlowOnKvStore(ctx, networkInterfaceID, int32(onuID), int32(uniID), logicalFlow.Id); present {
+	present, err := f.resourceMgr.IsFlowOnKvStore(ctx, f.ponPortIdx, int32(onuID), logicalFlow.Id)
+	if present {
 		logger.Info(ctx, "flow-exists-not-re-adding")
 		return nil
+	} else if err != nil {
+		logger.Errorw(ctx, "aborting-addTrapFlowOnNNI--flow-may-already-exist",
+			log.Fields{"intf-id": networkInterfaceID, "onu-id": onuID, "flow-id": logicalFlow.Id})
+		return err
 	}
 
 	logger.Debugw(ctx, "creating-trap-of-nni-flow",
@@ -2669,9 +2709,15 @@
 			"action":     action},
 			err)
 	}
-	if present := f.resourceMgr.IsFlowOnKvStore(ctx, networkInterfaceID, int32(onuID), int32(uniID), logicalFlow.Id); present {
+
+	present, err := f.resourceMgr.IsFlowOnKvStore(ctx, f.ponPortIdx, int32(onuID), logicalFlow.Id)
+	if present {
 		logger.Info(ctx, "igmp-flow-exists-not-re-adding")
 		return nil
+	} else if err != nil {
+		logger.Errorw(ctx, "aborting-addIgmpTrapFlowOnNNI--flow-may-already-exist",
+			log.Fields{"intf-id": networkInterfaceID, "onu-id": onuID, "flow-id": logicalFlow.Id})
+		return err
 	}
 
 	classifierProto, err := makeOpenOltClassifierField(classifier)
diff --git a/internal/pkg/core/openolt_flowmgr_test.go b/internal/pkg/core/openolt_flowmgr_test.go
index a880297..218ecf5 100644
--- a/internal/pkg/core/openolt_flowmgr_test.go
+++ b/internal/pkg/core/openolt_flowmgr_test.go
@@ -48,7 +48,7 @@
 	// onuGemInfoMap := make([]rsrcMgr.onuGemInfoMap, NumPonPorts)
 	var i uint32
 
-	for i = 0; i < NumPonPorts; i++ {
+	for i = 0; i < NumPonPorts+1; i++ {
 		packetInGemPort := make(map[rsrcMgr.PacketInInfoKey]uint32)
 		packetInGemPort[rsrcMgr.PacketInInfoKey{IntfID: i, OnuID: i + 1, LogicalPort: i + 1, VlanID: uint16(i), Priority: uint8(i)}] = i + 1
 
@@ -1401,6 +1401,7 @@
 
 	type args struct {
 		ctx          context.Context
+		intfID       int32
 		flow         *ofp.OfpFlowStats
 		addFlow      bool
 		flowMetadata *ofp.FlowMetadata
@@ -1415,6 +1416,7 @@
 			name: "RouteFlowToOnuChannel-0",
 			args: args{
 				ctx:          ctx,
+				intfID:       NumPonPorts,
 				flow:         flow0,
 				addFlow:      true,
 				flowMetadata: &flowMetadata1,
@@ -1425,6 +1427,7 @@
 			name: "RouteFlowToOnuChannel-1",
 			args: args{
 				ctx:          ctx,
+				intfID:       0,
 				flow:         flow1,
 				addFlow:      true,
 				flowMetadata: &flowMetadata1,
@@ -1435,6 +1438,7 @@
 			name: "RouteFlowToOnuChannel-2",
 			args: args{
 				ctx:          ctx,
+				intfID:       0,
 				flow:         flow2,
 				addFlow:      true,
 				flowMetadata: &flowMetadata1,
@@ -1445,6 +1449,7 @@
 			name: "RouteFlowToOnuChannel-3",
 			args: args{
 				ctx:          ctx,
+				intfID:       0,
 				flow:         flow3,
 				addFlow:      true,
 				flowMetadata: &flowMetadata1,
@@ -1455,6 +1460,7 @@
 			name: "RouteFlowToOnuChannel-4",
 			args: args{
 				ctx:          ctx,
+				intfID:       0,
 				flow:         flow4,
 				addFlow:      true,
 				flowMetadata: &flowMetadata1,
@@ -1465,6 +1471,7 @@
 			name: "RouteFlowToOnuChannel-5",
 			args: args{
 				ctx:          ctx,
+				intfID:       0,
 				flow:         flow1,
 				addFlow:      false,
 				flowMetadata: &flowMetadata1,
@@ -1475,6 +1482,7 @@
 			name: "RouteFlowToOnuChannel-6",
 			args: args{
 				ctx:          ctx,
+				intfID:       0,
 				flow:         flow1,
 				addFlow:      true,
 				flowMetadata: &flowMetadata1,
@@ -1485,6 +1493,7 @@
 			name: "RouteFlowToOnuChannel-7",
 			args: args{
 				ctx:          ctx,
+				intfID:       15,
 				flow:         flow5,
 				addFlow:      true,
 				flowMetadata: &flowMetadata1,
@@ -1495,6 +1504,7 @@
 			name: "RouteFlowToOnuChannel-8",
 			args: args{
 				ctx:          ctx,
+				intfID:       15,
 				flow:         flow6,
 				addFlow:      true,
 				flowMetadata: &flowMetadata1,
@@ -1505,6 +1515,7 @@
 			name: "RouteFlowToOnuChannel-9",
 			args: args{
 				ctx:          ctx,
+				intfID:       0,
 				flow:         flow7,
 				addFlow:      true,
 				flowMetadata: &flowMetadata1,
@@ -1515,6 +1526,7 @@
 			name: "RouteFlowToOnuChannel-10",
 			args: args{
 				ctx:          ctx,
+				intfID:       15,
 				flow:         flow8,
 				addFlow:      true,
 				flowMetadata: &flowMetadata1,
@@ -1525,6 +1537,7 @@
 			name: "RouteFlowToOnuChannel-11", // Test Remove trap-from-nni LLDP flow
 			args: args{
 				ctx:          ctx,
+				intfID:       NumPonPorts,
 				flow:         flow0,
 				addFlow:      false,
 				flowMetadata: &flowMetadata1,
@@ -1543,7 +1556,7 @@
 		time.Sleep(5 * time.Millisecond)
 		t.Run(tt.name, func(t *testing.T) {
 			defer wg.Done()
-			tt.returnedErr = flowMgr[0].RouteFlowToOnuChannel(tt.args.ctx, tt.args.flow, tt.args.addFlow, tt.args.flowMetadata)
+			tt.returnedErr = flowMgr[tt.args.intfID].RouteFlowToOnuChannel(tt.args.ctx, tt.args.flow, tt.args.addFlow, tt.args.flowMetadata)
 			if (tt.wantErr == false && tt.returnedErr != nil) || (tt.wantErr == true && tt.returnedErr == nil) {
 				t.Errorf("OpenOltFlowMgr.RouteFlowToOnuChannel() error = %v, wantErr %v", tt.returnedErr, tt.wantErr)
 			}
diff --git a/internal/pkg/resourcemanager/resourcemanager.go b/internal/pkg/resourcemanager/resourcemanager.go
index cfc755e..301e2e9 100755
--- a/internal/pkg/resourcemanager/resourcemanager.go
+++ b/internal/pkg/resourcemanager/resourcemanager.go
@@ -82,6 +82,9 @@
 	//OnuGemInfoPath is path on the kvstore to store onugem info map
 	//format: onu_gem_info/<intfid>/<onu_id>
 	OnuGemInfoPath = OnuGemInfoPathPathPrefix + "/{%d}"
+
+	// NNI uint32 version of -1 which represents the NNI port
+	NNI = 4294967295
 )
 
 // FlowInfo holds the flow information
@@ -133,9 +136,6 @@
 	PonRsrMgr *ponrmgr.PONResourceManager
 
 	// Local maps used for write-through-cache - start
-	flowIDsForOnu     map[string][]uint64
-	flowIDsForOnuLock sync.RWMutex
-
 	allocIDsForOnu     map[string][]uint32
 	allocIDsForOnuLock sync.RWMutex
 
@@ -253,7 +253,6 @@
 
 //InitLocalCache initializes local maps used for write-through-cache
 func (rsrcMgr *OpenOltResourceMgr) InitLocalCache() {
-	rsrcMgr.flowIDsForOnu = make(map[string][]uint64)
 	rsrcMgr.allocIDsForOnu = make(map[string][]uint32)
 	rsrcMgr.gemPortIDsForOnu = make(map[string][]uint32)
 	rsrcMgr.techProfileIDsForOnu = make(map[string][]uint32)
@@ -386,42 +385,6 @@
 	return 0, fmt.Errorf("no-onu-id-allocated")
 }
 
-// GetCurrentFlowIDsForOnu fetches flow ID from the resource manager
-// Note: For flows which trap from the NNI and not really associated with any particular
-// ONU (like LLDP), the onu_id and uni_id is set as -1. The intf_id is the NNI intf_id.
-func (rsrcMgr *OpenOltResourceMgr) GetCurrentFlowIDsForOnu(ctx context.Context, PonIntfID uint32, onuID int32, uniID int32) ([]uint64, error) {
-
-	subs := fmt.Sprintf("%d,%d,%d", PonIntfID, onuID, uniID)
-	path := fmt.Sprintf(FlowIDPath, subs)
-
-	// fetch from cache
-	rsrcMgr.flowIDsForOnuLock.RLock()
-	flowIDsForOnu, ok := rsrcMgr.flowIDsForOnu[path]
-	rsrcMgr.flowIDsForOnuLock.RUnlock()
-
-	if ok {
-		return flowIDsForOnu, nil
-	}
-
-	var data []uint64
-	value, err := rsrcMgr.KVStore.Get(ctx, path)
-	if err == nil {
-		if value != nil {
-			Val, _ := toByte(value.Value)
-			if err = json.Unmarshal(Val, &data); err != nil {
-				logger.Error(ctx, "Failed to unmarshal")
-				return nil, err
-			}
-		}
-	}
-	// update cache
-	rsrcMgr.flowIDsForOnuLock.Lock()
-	rsrcMgr.flowIDsForOnu[path] = data
-	rsrcMgr.flowIDsForOnuLock.Unlock()
-
-	return data, nil
-}
-
 // UpdateAllocIdsForOnu updates alloc ids in kv store for a given pon interface id, onu id and uni id
 func (rsrcMgr *OpenOltResourceMgr) UpdateAllocIdsForOnu(ctx context.Context, ponPort uint32, onuID uint32, uniID uint32, allocIDs []uint32) error {
 
@@ -639,23 +602,44 @@
 
 // IsFlowOnKvStore checks if the given flowID is present on the kv store
 // Returns true if the flowID is found, otherwise it returns false
-func (rsrcMgr *OpenOltResourceMgr) IsFlowOnKvStore(ctx context.Context, intfID uint32, onuID int32, uniID int32,
-	flowID uint64) bool {
+func (rsrcMgr *OpenOltResourceMgr) IsFlowOnKvStore(ctx context.Context, intfID uint32, onuID int32, flowID uint64) (bool, error) {
+	var anyError error
 
-	FlowIDs, err := rsrcMgr.GetCurrentFlowIDsForOnu(ctx, intfID, onuID, uniID)
-	if err != nil {
-		// error logged in the called function
-		return false
-	}
-	if FlowIDs != nil {
-		logger.Debugw(ctx, "Found flowId(s) for this ONU", log.Fields{"pon": intfID, "onuID": onuID, "uniID": uniID})
-		for _, id := range FlowIDs {
+	// In case of nni trap flow
+	if onuID == -1 {
+		nniTrapflowIDs, err := rsrcMgr.GetFlowIDsForGem(ctx, NNI, NNI)
+		if err != nil {
+			logger.Warnw(ctx, "failed-to-get-nni-trap-flowIDs", log.Fields{"err": err})
+			return false, err
+		}
+		for _, id := range nniTrapflowIDs {
 			if flowID == id {
-				return true
+				return true, nil
 			}
 		}
 	}
-	return false
+
+	path := fmt.Sprintf(OnuGemInfoPath, intfID, onuID)
+	rsrcMgr.onuGemInfoLock.RLock()
+	val, ok := rsrcMgr.onuGemInfo[path]
+	rsrcMgr.onuGemInfoLock.RUnlock()
+
+	if ok {
+		for _, gem := range val.GemPorts {
+			flowIDs, err := rsrcMgr.GetFlowIDsForGem(ctx, intfID, gem)
+			if err != nil {
+				anyError = err
+				logger.Warnw(ctx, "failed-to-get-flowIDs-for-gem", log.Fields{"err": err, "onuID": onuID, "gem": gem})
+			} else {
+				for _, id := range flowIDs {
+					if flowID == id {
+						return true, nil
+					}
+				}
+			}
+		}
+	}
+	return false, anyError
 }
 
 // GetTechProfileIDForOnu fetches Tech-Profile-ID from the KV-Store for the given onu based on the path
@@ -1228,7 +1212,7 @@
 	if err != nil {
 		logger.Errorw(ctx, "Failed to get from kv store", log.Fields{"path": path})
 		return nil, err
-	} else if value == nil {
+	} else if value == nil || value.Value == nil {
 		logger.Debug(ctx, "no flow-ids found", log.Fields{"path": path})
 		return nil, nil
 	}
@@ -1326,6 +1310,9 @@
 //DeleteAllFlowIDsForGemForIntf deletes all the flow ids associated for all the gems on the given pon interface
 func (rsrcMgr *OpenOltResourceMgr) DeleteAllFlowIDsForGemForIntf(ctx context.Context, intfID uint32) error {
 
+	if intfID == rsrcMgr.DevInfo.PonPorts {
+		intfID = NNI
+	}
 	path := fmt.Sprintf(FlowIDsForGemPathPrefix, intfID)
 
 	logger.Debugw(ctx, "delete-flow-ids-for-gem-for-pon-intf", log.Fields{"intfID": intfID})
@@ -1541,19 +1528,6 @@
 	return onuGemInfoLst
 }
 
-// toByte converts an interface value to a []byte.  The interface should either be of
-// a string type or []byte.  Otherwise, an error is returned.
-func toByte(value interface{}) ([]byte, error) {
-	switch t := value.(type) {
-	case []byte:
-		return value.([]byte), nil
-	case string:
-		return []byte(value.(string)), nil
-	default:
-		return nil, fmt.Errorf("unexpected-type-%T", t)
-	}
-}
-
 func appendUnique64bit(slice []uint64, item uint64) []uint64 {
 	for _, sliceElement := range slice {
 		if sliceElement == item {
diff --git a/internal/pkg/resourcemanager/resourcemanager_test.go b/internal/pkg/resourcemanager/resourcemanager_test.go
index 85a5811..e3fa49c 100644
--- a/internal/pkg/resourcemanager/resourcemanager_test.go
+++ b/internal/pkg/resourcemanager/resourcemanager_test.go
@@ -110,6 +110,7 @@
 	ranges["gemport_id_shared"] = uint32(0)
 	ranges["flow_id_shared"] = uint32(0)
 	resMgr.NumOfPonPorts = 16
+	resMgr.DevInfo = &openolt.DeviceInfo{PonPorts: 16}
 	resMgr.PonRsrMgr.DeviceID = "onu-1"
 	resMgr.PonRsrMgr.IntfIDs = []uint32{0, 1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11, 12, 13, 14, 15}
 	resMgr.PonRsrMgr.KVStore = &db.Backend{
@@ -411,37 +412,6 @@
 	}
 }
 
-func TestOpenOltResourceMgr_GetCurrentFlowIDsForOnu(t *testing.T) {
-
-	type args struct {
-		PONIntfID uint32
-		ONUID     int32
-		UNIID     int32
-	}
-	tests := []struct {
-		name   string
-		fields *fields
-		args   args
-		want   []uint64
-	}{
-		{"GetCurrentFlowIDsForOnu-1", getResMgr(), args{1, 2, 2}, []uint64{1, 2}},
-	}
-	for _, tt := range tests {
-		t.Run(tt.name, func(t *testing.T) {
-			RsrcMgr := testResMgrObject(tt.fields)
-			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
-			defer cancel()
-			got, err := RsrcMgr.GetCurrentFlowIDsForOnu(ctx, tt.args.PONIntfID, tt.args.ONUID, tt.args.UNIID)
-			if err != nil {
-				t.Errorf("GetCurrentFlowIDsForOnu() returned error")
-			}
-			if reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
-				t.Errorf("GetCurrentFlowIDsForOnu() = %v, want %v", got, tt.want)
-			}
-		})
-	}
-}
-
 func TestOpenOltResourceMgr_DeleteAllFlowIDsForGemForIntf(t *testing.T) {
 
 	type args struct {