VOL-2223 DHCP fails when subscribers have user defined bandwidth profiles

The issue was only upstrem or downstream EAPOL flow was deleted.
As there were still flows remaining the default meter id was not getting cleared in the locally maintained kv.
once the hsia/dhcp flows gets pushed with user defined meter id a check is done to see whether that uni has a meter id in kv store,
since it was not cleared earlier there would be a mismatch in the meterids stored in kv and the one coming in the flow

Change-Id: I30d595c699bddc608e1453490fb9fb426f8f333a
diff --git a/adaptercore/openolt_flowmgr.go b/adaptercore/openolt_flowmgr.go
index 9eddc44..4064cfe 100644
--- a/adaptercore/openolt_flowmgr.go
+++ b/adaptercore/openolt_flowmgr.go
@@ -1219,81 +1219,10 @@
 	return id, Downstream
 }
 
-// FindAndRemoveFlow finds the flow from kv store and makes a call to remove the flow from device
-// returns the flows and gemport the flow is associated with.
-func (f *OpenOltFlowMgr) FindAndRemoveFlow(flow *ofp.OfpFlowStats, Intf uint32, onuID int32, uniID int32, flowDirection string) ([]rsrcMgr.FlowInfo, int32, uint32) {
-
-	var updatedFlows []rsrcMgr.FlowInfo
-	var gemPortID int32
-	var flowID uint32
-
-	flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(Intf, onuID, uniID)
-FlowFound:
-	for _, flowID = range flowIds {
-		flowInfo := f.resourceMgr.GetFlowIDInfo(Intf, onuID, uniID, flowID)
-		if flowInfo == nil {
-			log.Debugw("No FlowInfo found found in KV store",
-				log.Fields{"Intf": Intf, "onuID": onuID, "uniID": uniID, "flowID": flowID})
-			return nil, 0, flowID
-		}
-		updatedFlows = nil
-		for _, flow := range *flowInfo {
-			updatedFlows = append(updatedFlows, flow)
-		}
-
-		for i, storedFlow := range updatedFlows {
-			if flowDirection == storedFlow.Flow.FlowType && flow.Id == storedFlow.LogicalFlowID {
-				removeFlowMessage := openoltpb2.Flow{FlowId: storedFlow.Flow.FlowId, FlowType: flowDirection}
-				if ok := f.removeFlowFromDevice(&removeFlowMessage); ok {
-					log.Debug("Flow removed from device successfully")
-					//Remove the Flow from FlowInfo
-					log.Debugw("Removing flow to be deleted", log.Fields{"flow": storedFlow})
-					updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
-					gemPortID = storedFlow.Flow.GemportId
-					break FlowFound
-				} else {
-					log.Error("Failed to remove flow from device")
-					return nil, 0, flowID
-				}
-			}
-		}
-	}
-	return updatedFlows, gemPortID, flowID
-}
-
-func (f *OpenOltFlowMgr) clearFlowFromResourceManager(flow *ofp.OfpFlowStats, flowDirection string) {
-
-	log.Debugw("clearFlowFromResourceManager", log.Fields{"flowDirection": flowDirection, "flow": *flow})
-	var updatedFlows []rsrcMgr.FlowInfo
-	var flowID uint32
-	var onuID, uniID int32
-	classifierInfo := make(map[string]interface{})
-
-	portNum, Intf, onu, uni, inPort, ethType, err := FlowExtractInfo(flow, flowDirection)
-	if err != nil {
-		log.Error(err)
-		return
-	}
-	onuID = int32(onu)
-	uniID = int32(uni)
-	var gemPortID int32
-
-	for _, field := range flows.GetOfbFields(flow) {
-		if field.Type == flows.IP_PROTO {
-			classifierInfo[IPProto] = field.GetIpProto()
-			log.Debug("field-type-ip-proto", log.Fields{"classifierInfo[IP_PROTO]": classifierInfo[IPProto].(uint32)})
-		}
-	}
-	log.Debugw("Extracted access info from flow to be deleted",
-		log.Fields{"ponIntf": Intf, "onuID": onuID, "uniID": uniID})
-
-	if ethType == LldpEthType || ((classifierInfo[IPProto] == IPProtoDhcp) && (flowDirection == "downstream")) {
-		onuID = -1
-		uniID = -1
-		log.Debug("Trap on nni flow set oni, uni to -1")
-		Intf = IntfIDFromNniPortNum(inPort)
-	}
-	updatedFlows, gemPortID, flowID = f.FindAndRemoveFlow(flow, Intf, onuID, uniID, flowDirection)
+//clearResources clears pon resources in kv store and the device
+func (f *OpenOltFlowMgr) clearResources(flow *ofp.OfpFlowStats, Intf uint32, onuID int32, uniID int32,
+	gemPortID int32, flowID uint32, flowDirection string,
+	portNum uint32, updatedFlows []rsrcMgr.FlowInfo) error {
 
 	tpID := getTpIDFromFlow(flow)
 
@@ -1313,11 +1242,11 @@
 			techprofileInst, err := f.techprofile[Intf].GetTPInstanceFromKVStore(tpID, tpPath)
 			if err != nil { // This should not happen, something wrong in KV backend transaction
 				log.Errorw("Error in fetching tech profile instance from KV store", log.Fields{"tpID": 20, "path": tpPath})
-				return
+				return err
 			}
 			if techprofileInst == nil {
 				log.Errorw("Tech-profile-instance-does-not-exist-in-KV Store", log.Fields{"tpPath": tpPath})
-				return
+				return err
 			}
 
 			gemPK := gemPortKey{Intf, uint32(gemPortID)}
@@ -1331,9 +1260,8 @@
 					}
 				}
 				log.Debugw("Gem port id is still used by other flows", log.Fields{"gemPortID": gemPortID, "usedByFlows": flowIDs})
-				return
+				return nil
 			}
-
 			log.Debugf("Gem port id %d is not used by another flow - releasing the gem port", gemPortID)
 			f.resourceMgr.RemoveGemPortIDForOnu(Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
 			// TODO: The TrafficQueue corresponding to this gem-port also should be removed immediately.
@@ -1356,6 +1284,74 @@
 			}
 		}
 	}
+	return nil
+}
+
+func (f *OpenOltFlowMgr) clearFlowFromResourceManager(flow *ofp.OfpFlowStats, flowDirection string) {
+
+	log.Debugw("clearFlowFromResourceManager", log.Fields{"flowDirection": flowDirection, "flow": *flow})
+	var updatedFlows []rsrcMgr.FlowInfo
+	var flowID uint32
+	var onuID, uniID int32
+	classifierInfo := make(map[string]interface{})
+
+	portNum, Intf, onu, uni, inPort, ethType, err := FlowExtractInfo(flow, flowDirection)
+	if err != nil {
+		log.Error(err)
+		return
+	}
+	onuID = int32(onu)
+	uniID = int32(uni)
+
+	for _, field := range flows.GetOfbFields(flow) {
+		if field.Type == flows.IP_PROTO {
+			classifierInfo[IPProto] = field.GetIpProto()
+			log.Debug("field-type-ip-proto", log.Fields{"classifierInfo[IP_PROTO]": classifierInfo[IPProto].(uint32)})
+		}
+	}
+	log.Debugw("Extracted access info from flow to be deleted",
+		log.Fields{"ponIntf": Intf, "onuID": onuID, "uniID": uniID})
+
+	if ethType == LldpEthType || ((classifierInfo[IPProto] == IPProtoDhcp) && (flowDirection == "downstream")) {
+		onuID = -1
+		uniID = -1
+		log.Debug("Trap on nni flow set oni, uni to -1")
+		Intf = IntfIDFromNniPortNum(inPort)
+	}
+	flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(Intf, onuID, uniID)
+	for _, flowID = range flowIds {
+		flowInfo := f.resourceMgr.GetFlowIDInfo(Intf, onuID, uniID, flowID)
+		if flowInfo == nil {
+			log.Debugw("No FlowInfo found found in KV store",
+				log.Fields{"Intf": Intf, "onuID": onuID, "uniID": uniID, "flowID": flowID})
+			return
+		}
+		updatedFlows = nil
+		for _, flow := range *flowInfo {
+			updatedFlows = append(updatedFlows, flow)
+		}
+
+		for i, storedFlow := range updatedFlows {
+			if flow.Id == storedFlow.LogicalFlowID {
+				removeFlowMessage := openoltpb2.Flow{FlowId: storedFlow.Flow.FlowId, FlowType: storedFlow.Flow.FlowType}
+				log.Debugw("Flow to be deleted", log.Fields{"flow": storedFlow})
+				if ok := f.removeFlowFromDevice(&removeFlowMessage); ok {
+					log.Debug("Flow removed from device successfully")
+					//Remove the Flow from FlowInfo
+					updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
+					err = f.clearResources(flow, Intf, onuID, uniID, storedFlow.Flow.GemportId,
+						flowID, flowDirection, portNum, updatedFlows)
+					if err != nil {
+						log.Error("Failed to clear resources for flow", log.Fields{"flow": storedFlow})
+						return
+					}
+				} else {
+					log.Error("Failed to remove flow from device")
+					return
+				}
+			}
+		}
+	}
 }
 
 //RemoveFlow removes the flow from the device