[VOL-4531] Refactor IsFlowOnKvStore
Remove the unused FlowIDsForOnu map.
Store NNI trap flow IDs in the KV store under the path with intf=-1 and gem=-1.
Change-Id: I14568f3558957329ca3b0bad8330d6fe22bbecfc

Change-Id: Iece645414a494b248bb463e2837427007a3ee67f
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 81f7fcb..41988ec 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -224,9 +224,12 @@
 	flowMgr.ponPortIdx = ponPortIdx
 	flowMgr.grpMgr = grpMgr
 	flowMgr.resourceMgr = rMgr
-	if err = flowMgr.populateTechProfileForCurrentPonPort(ctx); err != nil {
-		logger.Errorw(ctx, "error-while-populating-tech-profile-mgr", log.Fields{"err": err})
-		return nil
+	// The index dh.totalPonPorts is reserved for NNI trap flows, which do not need a tech profile.
+	if ponPortIdx != dh.totalPonPorts {
+		if err = flowMgr.populateTechProfileForCurrentPonPort(ctx); err != nil {
+			logger.Errorw(ctx, "error-while-populating-tech-profile-mgr", log.Fields{"err": err})
+			return nil
+		}
 	}
 	flowMgr.packetInGemPort = make(map[rsrcMgr.PacketInInfoKey]uint32)
 
@@ -252,6 +255,10 @@
 }
 
 func (f *OpenOltFlowMgr) registerFlow(ctx context.Context, flowFromCore *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) error {
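+	// NNI trap flows have no access interface (AccessIntfId == -1); register them using that value as both the interface ID and the gem-port ID.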
+	if deviceFlow.AccessIntfId == -1 {
+		return f.resourceMgr.RegisterFlowIDForGem(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.AccessIntfId), flowFromCore)
+	}
 	if !deviceFlow.ReplicateFlow && deviceFlow.GemportId > 0 {
 		// Flow is not replicated in this case, we need to register the flow for a single gem-port
 		return f.resourceMgr.RegisterFlowIDForGem(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowFromCore)
@@ -1048,14 +1055,20 @@
 			"gemport-id":  gemPortID,
 			"logicalflow": *logicalFlow})
 
-	if present := f.resourceMgr.IsFlowOnKvStore(ctx, intfID, int32(onuID), int32(uniID), logicalFlow.Id); present {
+	present, err := f.resourceMgr.IsFlowOnKvStore(ctx, intfID, int32(onuID), logicalFlow.Id)
+	if present {
 		logger.Infow(ctx, "flow-already-exists",
 			log.Fields{
 				"device-id": f.deviceHandler.device.Id,
 				"intf-id":   intfID,
 				"onu-id":    onuID})
 		return nil
+	} else if err != nil {
+		logger.Errorw(ctx, "aborting-addSymmetricDataPathFlow--flow-may-already-exist",
+			log.Fields{"intf-id": intfID, "onu-id": onuID, "flow-id": logicalFlow.Id})
+		return err
 	}
+
 	classifierProto, err := makeOpenOltClassifierField(classifier)
 	if err != nil {
 		return olterrors.NewErrInvalidValue(log.Fields{"classifier": classifier, "device-id": f.deviceHandler.device.Id}, err).Log()
@@ -1141,13 +1154,18 @@
 	classifier[UDPDst] = uint32(67)
 	classifier[PacketTagType] = SingleTag
 
-	if present := f.resourceMgr.IsFlowOnKvStore(ctx, intfID, int32(onuID), int32(uniID), logicalFlow.Id); present {
+	present, err := f.resourceMgr.IsFlowOnKvStore(ctx, intfID, int32(onuID), logicalFlow.Id)
+	if present {
 		logger.Infow(ctx, "flow-exists--not-re-adding",
 			log.Fields{
 				"device-id": f.deviceHandler.device.Id,
 				"intf-id":   intfID,
 				"onu-id":    onuID})
 		return nil
+	} else if err != nil {
+		logger.Errorw(ctx, "aborting-addDHCPTrapFlow--flow-may-already-exist",
+			log.Fields{"intf-id": intfID, "onu-id": onuID, "flow-id": logicalFlow.Id})
+		return err
 	}
 
 	logger.Debugw(ctx, "creating-ul-dhcp-flow",
@@ -1234,9 +1252,14 @@
 	action[TrapToHost] = true
 	classifier[PacketTagType] = SingleTag
 
-	if present := f.resourceMgr.IsFlowOnKvStore(ctx, networkIntfID, int32(onuID), int32(uniID), logicalFlow.Id); present {
+	present, err := f.resourceMgr.IsFlowOnKvStore(ctx, networkIntfID, int32(onuID), logicalFlow.Id)
+	if present {
 		logger.Infow(ctx, "flow-exists-not-re-adding", log.Fields{"device-id": f.deviceHandler.device.Id})
 		return nil
+	} else if err != nil {
+		logger.Errorw(ctx, "aborting-addUpstreamTrapFlow--flow-may-already-exist",
+			log.Fields{"intf-id": intfID, "onu-id": onuID, "flow-id": logicalFlow.Id})
+		return err
 	}
 
 	logger.Debugw(ctx, "creating-upstream-trap-flow",
@@ -1320,13 +1343,18 @@
 	uplinkClassifier[VlanPcp] = classifier[VlanPcp]
 	// Fill action
 	uplinkAction[TrapToHost] = true
-	if present := f.resourceMgr.IsFlowOnKvStore(ctx, intfID, int32(onuID), int32(uniID), logicalFlow.Id); present {
+	present, err := f.resourceMgr.IsFlowOnKvStore(ctx, intfID, int32(onuID), logicalFlow.Id)
+	if present {
 		logger.Infow(ctx, "flow-exists-not-re-adding", log.Fields{
 			"device-id": f.deviceHandler.device.Id,
 			"onu-id":    onuID,
 			"intf-id":   intfID,
 			"ethType":   ethType})
 		return nil
+	} else if err != nil {
+		logger.Errorw(ctx, "aborting-addEthTypeBasedFlow--flow-may-already-exist",
+			log.Fields{"intf-id": intfID, "onu-id": onuID, "flow-id": logicalFlow.Id})
+		return err
 	}
 	//Add Uplink EthType Flow
 	logger.Debugw(ctx, "creating-ul-ethType-flow",
@@ -1596,13 +1624,9 @@
 			"device-id": f.deviceHandler.device.Id,
 			"intf-id":   intfID})
 
-	// Case of trap-on-nni flow when deviceFlow.AccessIntfId is invalid (-1)
-	if deviceFlow.AccessIntfId != -1 {
-		// No need to register the flow if it is a trap on nni flow.
-		if err := f.registerFlow(ctx, logicalFlow, deviceFlow); err != nil {
-			logger.Errorw(ctx, "failed-to-register-flow", log.Fields{"err": err})
-			return err
-		}
+	if err := f.registerFlow(ctx, logicalFlow, deviceFlow); err != nil {
+		logger.Errorw(ctx, "failed-to-register-flow", log.Fields{"err": err})
+		return err
 	}
 	return nil
 }
@@ -1663,9 +1687,15 @@
 	if err != nil {
 		return olterrors.NewErrInvalidValue(log.Fields{"nni-port-number": portNo}, err).Log()
 	}
-	if present := f.resourceMgr.IsFlowOnKvStore(ctx, networkInterfaceID, int32(onuID), int32(uniID), flow.Id); present {
+
+	present, err := f.resourceMgr.IsFlowOnKvStore(ctx, f.ponPortIdx, int32(onuID), flow.Id)
+	if present {
 		logger.Infow(ctx, "flow-exists--not-re-adding", log.Fields{"device-id": f.deviceHandler.device.Id})
 		return nil
+	} else if err != nil {
+		logger.Errorw(ctx, "aborting-addLLDPFlow--flow-may-already-exist",
+			log.Fields{"intf-id": networkInterfaceID, "onu-id": onuID, "flow-id": flow.Id})
+		return err
 	}
 
 	classifierProto, err := makeOpenOltClassifierField(classifierInfo)
@@ -2380,12 +2410,17 @@
 	delete(classifierInfo, EthType)
 
 	onuID := NoneOnuID
-	uniID := NoneUniID
 
-	if present := f.resourceMgr.IsFlowOnKvStore(ctx, networkInterfaceID, int32(onuID), int32(uniID), flow.Id); present {
+	present, err := f.resourceMgr.IsFlowOnKvStore(ctx, networkInterfaceID, int32(onuID), flow.Id)
+	if present {
 		logger.Infow(ctx, "multicast-flow-exists-not-re-adding", log.Fields{"classifier-info": classifierInfo})
 		return nil
+	} else if err != nil {
+		logger.Errorw(ctx, "aborting-handleFlowWithGroup--flow-may-already-exist",
+			log.Fields{"intf-id": networkInterfaceID, "onu-id": onuID, "flow-id": flow.Id})
+		return err
 	}
+
 	classifierProto, err := makeOpenOltClassifierField(classifierInfo)
 	if err != nil {
 		return olterrors.NewErrInvalidValue(log.Fields{"classifier": classifierInfo}, err)
@@ -2573,9 +2608,14 @@
 			err)
 	}
 
-	if present := f.resourceMgr.IsFlowOnKvStore(ctx, networkInterfaceID, int32(onuID), int32(uniID), logicalFlow.Id); present {
+	present, err := f.resourceMgr.IsFlowOnKvStore(ctx, f.ponPortIdx, int32(onuID), logicalFlow.Id)
+	if present {
 		logger.Info(ctx, "flow-exists-not-re-adding")
 		return nil
+	} else if err != nil {
+		logger.Errorw(ctx, "aborting-addTrapFlowOnNNI--flow-may-already-exist",
+			log.Fields{"intf-id": networkInterfaceID, "onu-id": onuID, "flow-id": logicalFlow.Id})
+		return err
 	}
 
 	logger.Debugw(ctx, "creating-trap-of-nni-flow",
@@ -2669,9 +2709,15 @@
 			"action":     action},
 			err)
 	}
-	if present := f.resourceMgr.IsFlowOnKvStore(ctx, networkInterfaceID, int32(onuID), int32(uniID), logicalFlow.Id); present {
+
+	present, err := f.resourceMgr.IsFlowOnKvStore(ctx, f.ponPortIdx, int32(onuID), logicalFlow.Id)
+	if present {
 		logger.Info(ctx, "igmp-flow-exists-not-re-adding")
 		return nil
+	} else if err != nil {
+		logger.Errorw(ctx, "aborting-addIgmpTrapFlowOnNNI--flow-may-already-exist",
+			log.Fields{"intf-id": networkInterfaceID, "onu-id": onuID, "flow-id": logicalFlow.Id})
+		return err
 	}
 
 	classifierProto, err := makeOpenOltClassifierField(classifier)