VOL-1904 openolt adapter reconcile. Along with the local caches, data is stored in the KV store as well.
The child device cache is not removed because it is used to get child devices;
it will be rebuilt anyway, even after an adapter restart.
One more cache (flowsUsedByGemPort) was reintroduced recently; it will be fixed later.
Updated test files as well.

Change-Id: I43378ff682f29477b22d61a76b0a721e83422853
diff --git a/adaptercore/openolt_flowmgr.go b/adaptercore/openolt_flowmgr.go
index 6bee5a2..9eddc44 100644
--- a/adaptercore/openolt_flowmgr.go
+++ b/adaptercore/openolt_flowmgr.go
@@ -148,28 +148,11 @@
 	AllocID = "allocId"
 )
 
-type onuInfo struct {
-	intfID       uint32
-	onuID        uint32
-	serialNumber string
-}
-
-type onuIDKey struct {
-	intfID uint32
-	onuID  uint32
-}
-
 type gemPortKey struct {
 	intfID  uint32
 	gemPort uint32
 }
 
-type packetInInfoKey struct {
-	intfID      uint32
-	onuID       uint32
-	logicalPort uint32
-}
-
 type schedQueue struct {
 	direction    tp_pb.Direction
 	intfID       uint32
@@ -187,32 +170,39 @@
 	techprofile        []tp.TechProfileIf
 	deviceHandler      *DeviceHandler
 	resourceMgr        *rsrcMgr.OpenOltResourceMgr
-	onuIds             map[onuIDKey]onuInfo       //OnuId -> OnuInfo
-	onuSerialNumbers   map[string]onuInfo         //onu serial_number (string) -> OnuInfo
-	onuGemPortIds      map[gemPortKey]onuInfo     //GemPortId -> OnuInfo
-	packetInGemPort    map[packetInInfoKey]uint32 //packet in gem port
-	storedDeviceFlows  []ofp.OfpFlowStats         /* Required during deletion to obtain device flows from logical flows */
 	onuIdsLock         sync.RWMutex
-	flowsUsedByGemPort map[gemPortKey][]uint32 //gem port id to flow ids
+	flowsUsedByGemPort map[gemPortKey][]uint32            //gem port id to flow ids
+	packetInGemPort    map[rsrcMgr.PacketInInfoKey]uint32 //packet in gem port local cache
+	onuGemInfo         map[uint32][]rsrcMgr.OnuGemInfo    //onu, gem and uni info local cache
+	lockCache          sync.RWMutex
 }
 
 //NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
-func NewFlowManager(dh *DeviceHandler, rsrcMgr *rsrcMgr.OpenOltResourceMgr) *OpenOltFlowMgr {
+func NewFlowManager(dh *DeviceHandler, rMgr *rsrcMgr.OpenOltResourceMgr) *OpenOltFlowMgr {
 	log.Info("Initializing flow manager")
 	var flowMgr OpenOltFlowMgr
+	var err error
+	var idx uint32
+
 	flowMgr.deviceHandler = dh
-	flowMgr.resourceMgr = rsrcMgr
+	flowMgr.resourceMgr = rMgr
 	flowMgr.techprofile = make([]tp.TechProfileIf, MaxPonPorts)
-	if err := flowMgr.populateTechProfilePerPonPort(); err != nil {
+	if err = flowMgr.populateTechProfilePerPonPort(); err != nil {
 		log.Error("Error while populating tech profile mgr\n")
 		return nil
 	}
-	flowMgr.onuIds = make(map[onuIDKey]onuInfo)
-	flowMgr.onuSerialNumbers = make(map[string]onuInfo)
-	flowMgr.onuGemPortIds = make(map[gemPortKey]onuInfo)
-	flowMgr.packetInGemPort = make(map[packetInInfoKey]uint32)
-	flowMgr.flowsUsedByGemPort = make(map[gemPortKey][]uint32)
 	flowMgr.onuIdsLock = sync.RWMutex{}
+	flowMgr.flowsUsedByGemPort = make(map[gemPortKey][]uint32)
+	flowMgr.packetInGemPort = make(map[rsrcMgr.PacketInInfoKey]uint32)
+	flowMgr.onuGemInfo = make(map[uint32][]rsrcMgr.OnuGemInfo)
+	ponPorts := rMgr.DevInfo.GetPonPorts()
+	//Load the onugem info cache from kv store on flowmanager start
+	for idx = 0; idx < ponPorts; idx++ {
+		if flowMgr.onuGemInfo[idx], err = rMgr.GetOnuGemInfo(idx); err != nil {
+			log.Error("Failed to load onu gem info cache")
+		}
+	}
+	flowMgr.lockCache = sync.RWMutex{}
 	log.Info("Initialization of  flow manager success!!")
 	return &flowMgr
 }
@@ -233,13 +223,6 @@
 func (f *OpenOltFlowMgr) registerFlow(flowFromCore *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) {
 	log.Debug("Registering Flow for Device ", log.Fields{"flow": flowFromCore},
 		log.Fields{"device": f.deviceHandler.deviceID})
-
-	var storedFlow ofp.OfpFlowStats
-	storedFlow.Id, _ = f.generateStoredFlowID(deviceFlow.FlowId, deviceFlow.FlowType)
-	log.Debug(fmt.Sprintf("Generated stored device flow. id = %d, flowId = %d, direction = %s", storedFlow.Id,
-		deviceFlow.FlowId, deviceFlow.FlowType))
-	storedFlow.Cookie = flowFromCore.Id
-	f.storedDeviceFlows = append(f.storedDeviceFlows, storedFlow)
 	gemPK := gemPortKey{uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId)}
 	flowIDList, ok := f.flowsUsedByGemPort[gemPK]
 	if !ok {
@@ -247,7 +230,6 @@
 	}
 	flowIDList = appendUnique(flowIDList, deviceFlow.FlowId)
 	f.flowsUsedByGemPort[gemPK] = flowIDList
-	log.Debugw("updated Stored flow info", log.Fields{"storedDeviceFlows": f.storedDeviceFlows})
 }
 
 func (f *OpenOltFlowMgr) divideAndAddFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32,
@@ -267,7 +249,7 @@
 		return
 	}
 
-	uni := getUniPortPath(intfID, onuID, uniID)
+	uni := getUniPortPath(intfID, int32(onuID), int32(uniID))
 	log.Debugw("Uni port name", log.Fields{"uni": uni})
 	allocID, gemPorts, TpInst = f.createTcontGemports(intfID, onuID, uniID, uni, portNo, TpID, UsMeterID, DsMeterID, flowMetadata)
 	if allocID == 0 || gemPorts == nil || TpInst == nil {
@@ -604,6 +586,7 @@
 func (f *OpenOltFlowMgr) addHSIAFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
 	action map[string]interface{}, direction string, logicalFlow *ofp.OfpFlowStats,
 	allocID uint32, gemPortID uint32) {
+	var networkIntfID uint32
 	/* One of the OLT platform (Broadcom BAL) requires that symmetric
 	   flows require the same flow_id to be used across UL and DL.
 	   Since HSIA flow is the only symmetric flow currently, we need to
@@ -620,7 +603,7 @@
 		log.Debugw("Found pbit in the flow", log.Fields{"VlanPbit": vlanPbit})
 	}
 	flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
-	flowID, err := f.resourceMgr.GetFlowID(intfID, onuID, uniID, gemPortID, flowStoreCookie, HsiaFlow, vlanPbit)
+	flowID, err := f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, HsiaFlow, vlanPbit)
 	if err != nil {
 		log.Errorw("Flow id unavailable for HSIA flow", log.Fields{"direction": direction})
 		return
@@ -637,7 +620,11 @@
 		return
 	}
 	log.Debugw("Created action proto", log.Fields{"action": *actionProto})
-	networkIntfID := f.deviceHandler.nniIntfID
+	networkIntfID, err = getNniIntfID(classifier, action)
+	if err != nil {
+		log.Error("Failed to get nniIntf ID")
+		return
+	}
 	flow := openoltpb2.Flow{AccessIntfId: int32(intfID),
 		OnuId:         int32(onuID),
 		UniId:         int32(uniID),
@@ -653,7 +640,7 @@
 		PortNo:        portNo}
 	if ok := f.addFlowToDevice(logicalFlow, &flow); ok {
 		log.Debug("HSIA flow added to device successfully", log.Fields{"direction": direction})
-		flowsToKVStore := f.getUpdatedFlowInfo(&flow, flowStoreCookie, HsiaFlow, flowID)
+		flowsToKVStore := f.getUpdatedFlowInfo(&flow, flowStoreCookie, HsiaFlow, flowID, logicalFlow.Id)
 		if err := f.updateFlowInfoToKVStore(flow.AccessIntfId,
 			flow.OnuId,
 			flow.UniId,
@@ -668,6 +655,12 @@
 	var dhcpFlow openoltpb2.Flow
 	var actionProto *openoltpb2.Action
 	var classifierProto *openoltpb2.Classifier
+	var flowID uint32
+	networkIntfID, err := getNniIntfID(classifier, action)
+	if err != nil {
+		log.Error("Failed to get nniIntf ID")
+		return
+	}
 
 	// Clear the action map
 	for k := range action {
@@ -682,7 +675,7 @@
 
 	flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
 
-	flowID, err := f.resourceMgr.GetFlowID(intfID, onuID, uniID, gemPortID, flowStoreCookie, "", 0 /*classifier[VLAN_PCP].(uint32)*/)
+	flowID, err = f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, "", 0 /*classifier[VLAN_PCP].(uint32)*/)
 
 	if err != nil {
 		log.Errorw("flowId unavailable for UL EAPOL", log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
@@ -700,7 +693,6 @@
 		log.Error("Error in making action protobuf for ul flow")
 		return
 	}
-	networkIntfID := f.deviceHandler.nniIntfID
 
 	dhcpFlow = openoltpb2.Flow{AccessIntfId: int32(intfID),
 		OnuId:         int32(onuID),
@@ -718,7 +710,7 @@
 
 	if ok := f.addFlowToDevice(logicalFlow, &dhcpFlow); ok {
 		log.Debug("DHCP UL flow added to device successfully")
-		flowsToKVStore := f.getUpdatedFlowInfo(&dhcpFlow, flowStoreCookie, "DHCP", flowID)
+		flowsToKVStore := f.getUpdatedFlowInfo(&dhcpFlow, flowStoreCookie, "DHCP", flowID, logicalFlow.Id)
 		if err := f.updateFlowInfoToKVStore(dhcpFlow.AccessIntfId,
 			dhcpFlow.OnuId,
 			dhcpFlow.UniId,
@@ -732,7 +724,7 @@
 }
 
 // Add EAPOL flow to  device with mac, vlanId as classifier for upstream and downstream
-func (f *OpenOltFlowMgr) addEAPOLFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32, vlanID uint32) {
+func (f *OpenOltFlowMgr) addEAPOLFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32, vlanID uint32, classifier map[string]interface{}, action map[string]interface{}) {
 	log.Debugw("Adding EAPOL to device", log.Fields{"intfId": intfID, "onuId": onuID, "portNo": portNo, "allocId": allocID, "gemPortId": gemPortID, "vlanId": vlanID, "flow": logicalFlow})
 
 	uplinkClassifier := make(map[string]interface{})
@@ -741,6 +733,7 @@
 	downlinkAction := make(map[string]interface{})
 	var upstreamFlow openoltpb2.Flow
 	var downstreamFlow openoltpb2.Flow
+	var networkIntfID uint32
 
 	// Fill Classfier
 	uplinkClassifier[EthType] = uint32(EapEthType)
@@ -750,7 +743,7 @@
 	uplinkAction[TrapToHost] = true
 	flowStoreCookie := getFlowStoreCookie(uplinkClassifier, gemPortID)
 	//Add Uplink EAPOL Flow
-	uplinkFlowID, err := f.resourceMgr.GetFlowID(intfID, onuID, uniID, gemPortID, flowStoreCookie, "", 0)
+	uplinkFlowID, err := f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, "", 0)
 	if err != nil {
 		log.Errorw("flowId unavailable for UL EAPOL", log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
 		return
@@ -769,7 +762,12 @@
 		return
 	}
 	log.Debugw("Created action proto", log.Fields{"action": *actionProto})
-	networkIntfID := f.deviceHandler.nniIntfID
+	networkIntfID, err = getNniIntfID(classifier, action)
+	if err != nil {
+		log.Error("Failed to get nniIntf ID")
+		return
+	}
+
 	upstreamFlow = openoltpb2.Flow{AccessIntfId: int32(intfID),
 		OnuId:         int32(onuID),
 		UniId:         int32(uniID),
@@ -786,7 +784,7 @@
 	if ok := f.addFlowToDevice(logicalFlow, &upstreamFlow); ok {
 		log.Debug("EAPOL UL flow added to device successfully")
 		flowCategory := "EAPOL"
-		flowsToKVStore := f.getUpdatedFlowInfo(&upstreamFlow, flowStoreCookie, flowCategory, uplinkFlowID)
+		flowsToKVStore := f.getUpdatedFlowInfo(&upstreamFlow, flowStoreCookie, flowCategory, uplinkFlowID, logicalFlow.Id)
 		if err := f.updateFlowInfoToKVStore(upstreamFlow.AccessIntfId,
 			upstreamFlow.OnuId,
 			upstreamFlow.UniId,
@@ -824,7 +822,7 @@
 		downlinkAction[PushVlan] = true
 		downlinkAction[VlanVid] = vlanID
 		flowStoreCookie := getFlowStoreCookie(downlinkClassifier, gemPortID)
-		downlinkFlowID, err := f.resourceMgr.GetFlowID(intfID, onuID, uniID, gemPortID, flowStoreCookie, "", 0)
+		downlinkFlowID, err := f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, "", 0)
 		if err != nil {
 			log.Errorw("flowId unavailable for DL EAPOL",
 				log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
@@ -857,7 +855,7 @@
 		if ok := f.addFlowToDevice(logicalFlow, &downstreamFlow); ok {
 			log.Debug("EAPOL DL flow added to device successfully")
 			flowCategory := ""
-			flowsToKVStore := f.getUpdatedFlowInfo(&downstreamFlow, flowStoreCookie, flowCategory, downlinkFlowID)
+			flowsToKVStore := f.getUpdatedFlowInfo(&downstreamFlow, flowStoreCookie, flowCategory, downlinkFlowID, logicalFlow.Id)
 			if err := f.updateFlowInfoToKVStore(downstreamFlow.AccessIntfId,
 				downstreamFlow.OnuId,
 				downstreamFlow.UniId,
@@ -987,8 +985,8 @@
 	return hash.Uint64()
 }
 
-func (f *OpenOltFlowMgr) getUpdatedFlowInfo(flow *openoltpb2.Flow, flowStoreCookie uint64, flowCategory string, deviceFlowID uint32) *[]rsrcMgr.FlowInfo {
-	var flows = []rsrcMgr.FlowInfo{{Flow: flow, FlowCategory: flowCategory, FlowStoreCookie: flowStoreCookie}}
+func (f *OpenOltFlowMgr) getUpdatedFlowInfo(flow *openoltpb2.Flow, flowStoreCookie uint64, flowCategory string, deviceFlowID uint32, logicalFlowID uint64) *[]rsrcMgr.FlowInfo {
+	var flows = []rsrcMgr.FlowInfo{{Flow: flow, FlowCategory: flowCategory, FlowStoreCookie: flowStoreCookie, LogicalFlowID: logicalFlowID}}
 	var intfID uint32
 	/* For flows which trap out of the NNI, the AccessIntfId is invalid
 	   (set to -1). In such cases, we need to refer to the NetworkIntfId .
@@ -999,7 +997,7 @@
 		intfID = uint32(flow.NetworkIntfId)
 	}
 	// Get existing flows matching flowid for given subscriber from KV store
-	existingFlows := f.resourceMgr.GetFlowIDInfo(intfID, uint32(flow.OnuId), uint32(flow.UniId), flow.FlowId)
+	existingFlows := f.resourceMgr.GetFlowIDInfo(intfID, flow.OnuId, flow.UniId, flow.FlowId)
 	if existingFlows != nil {
 		log.Debugw("Flow exists for given flowID, appending it to current flow", log.Fields{"flowID": flow.FlowId})
 		//for _, f := range *existingFlows {
@@ -1071,8 +1069,8 @@
 		f.resourceMgr.FreeFlowID(intfID, deviceFlow.OnuId, deviceFlow.UniId, deviceFlow.FlowId)
 		return false
 	}
-	log.Debugw("Flow added to device successfully ", log.Fields{"flow": *deviceFlow})
 	f.registerFlow(logicalFlow, deviceFlow)
+	log.Debugw("Flow added to device successfully ", log.Fields{"flow": *deviceFlow})
 	return true
 }
 
@@ -1140,11 +1138,11 @@
 
 	var networkInterfaceID = IntfIDFromNniPortNum(portNo)
 	var flowStoreCookie = getFlowStoreCookie(classifierInfo, uint32(0))
-	if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(networkInterfaceID), uint32(onuID), uint32(uniID), flowStoreCookie); present {
+	if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
 		log.Debug("Flow-exists--not-re-adding")
 		return
 	}
-	flowID, err := f.resourceMgr.GetFlowID(uint32(networkInterfaceID), uint32(onuID), uint32(uniID), uint32(gemPortID), flowStoreCookie, "", 0)
+	flowID, err := f.resourceMgr.GetFlowID(uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0)
 
 	if err != nil {
 		log.Errorw("Flow id unavailable for LLDP traponNNI flow", log.Fields{"error": err})
@@ -1177,7 +1175,7 @@
 		PortNo:        portNo}
 	if ok := f.addFlowToDevice(flow, &downstreamflow); ok {
 		log.Debug("LLDP trap on NNI flow added to device successfully")
-		flowsToKVStore := f.getUpdatedFlowInfo(&downstreamflow, flowStoreCookie, "", flowID)
+		flowsToKVStore := f.getUpdatedFlowInfo(&downstreamflow, flowStoreCookie, "", flowID, flow.Id)
 		if err := f.updateFlowInfoToKVStore(int32(networkInterfaceID),
 			int32(onuID),
 			int32(uniID),
@@ -1188,7 +1186,7 @@
 	return
 }
 
-func getUniPortPath(intfID uint32, onuID uint32, uniID uint32) string {
+func getUniPortPath(intfID uint32, onuID int32, uniID int32) string {
 	return fmt.Sprintf("pon-{%d}/onu-{%d}/uni-{%d}", intfID, onuID, uniID)
 }
 
@@ -1221,48 +1219,81 @@
 	return id, Downstream
 }
 
-func (f *OpenOltFlowMgr) clearFlowFromResourceManager(flow *ofp.OfpFlowStats, flowID uint32, flowDirection string) {
-	log.Debugw("clearFlowFromResourceManager", log.Fields{"flowID": flowID, "flowDirection": flowDirection, "flow": *flow})
-	portNum, ponIntf, onuID, uniID, inPort, ethType, err := FlowExtractInfo(flow, flowDirection)
+// FindAndRemoveFlow finds the flow in the kv store and makes a call to remove the flow from the device.
+// It returns the flows left under the matched flow ID, the gemport the removed flow was associated with, and the flow ID.
+func (f *OpenOltFlowMgr) FindAndRemoveFlow(flow *ofp.OfpFlowStats, Intf uint32, onuID int32, uniID int32, flowDirection string) ([]rsrcMgr.FlowInfo, int32, uint32) {
+
+	var updatedFlows []rsrcMgr.FlowInfo
+	var gemPortID int32
+	var flowID uint32
+
+	flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(Intf, onuID, uniID)
+FlowFound:
+	for _, flowID = range flowIds {
+		flowInfo := f.resourceMgr.GetFlowIDInfo(Intf, onuID, uniID, flowID)
+		if flowInfo == nil {
+			log.Debugw("No FlowInfo found found in KV store",
+				log.Fields{"Intf": Intf, "onuID": onuID, "uniID": uniID, "flowID": flowID})
+			return nil, 0, flowID
+		}
+		updatedFlows = nil
+		for _, flow := range *flowInfo {
+			updatedFlows = append(updatedFlows, flow)
+		}
+
+		for i, storedFlow := range updatedFlows {
+			if flowDirection == storedFlow.Flow.FlowType && flow.Id == storedFlow.LogicalFlowID {
+				removeFlowMessage := openoltpb2.Flow{FlowId: storedFlow.Flow.FlowId, FlowType: flowDirection}
+				if ok := f.removeFlowFromDevice(&removeFlowMessage); ok {
+					log.Debug("Flow removed from device successfully")
+					//Remove the Flow from FlowInfo
+					log.Debugw("Removing flow to be deleted", log.Fields{"flow": storedFlow})
+					updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
+					gemPortID = storedFlow.Flow.GemportId
+					break FlowFound
+				} else {
+					log.Error("Failed to remove flow from device")
+					return nil, 0, flowID
+				}
+			}
+		}
+	}
+	return updatedFlows, gemPortID, flowID
+}
+
+func (f *OpenOltFlowMgr) clearFlowFromResourceManager(flow *ofp.OfpFlowStats, flowDirection string) {
+
+	log.Debugw("clearFlowFromResourceManager", log.Fields{"flowDirection": flowDirection, "flow": *flow})
+	var updatedFlows []rsrcMgr.FlowInfo
+	var flowID uint32
+	var onuID, uniID int32
+	classifierInfo := make(map[string]interface{})
+
+	portNum, Intf, onu, uni, inPort, ethType, err := FlowExtractInfo(flow, flowDirection)
 	if err != nil {
 		log.Error(err)
 		return
 	}
-	log.Debugw("Extracted access info from flow to be deleted",
-		log.Fields{"ponIntf": ponIntf, "onuID": onuID, "uniID": uniID, "flowID": flowID})
-
-	if ethType == LldpEthType {
-		var networkInterfaceID uint32
-		var onuID = -1
-		var uniID = -1
-
-		networkInterfaceID = IntfIDFromNniPortNum(inPort)
-		f.resourceMgr.FreeFlowID(networkInterfaceID, int32(onuID), int32(uniID), flowID)
-		return
-	}
-
-	flowsInfo := f.resourceMgr.GetFlowIDInfo(ponIntf, onuID, uniID, flowID)
-	if flowsInfo == nil {
-		log.Debugw("No FlowInfo found in KV store",
-			log.Fields{"ponIntf": ponIntf, "onuID": onuID, "uniID": uniID, "flowID": flowID})
-		return
-	}
-	var updatedFlows []rsrcMgr.FlowInfo
+	onuID = int32(onu)
+	uniID = int32(uni)
 	var gemPortID int32
 
-	for _, flow := range *flowsInfo {
-		updatedFlows = append(updatedFlows, flow)
-	}
-
-	for i, storedFlow := range updatedFlows {
-		if flowDirection == storedFlow.Flow.FlowType {
-			//Remove the Flow from FlowInfo
-			log.Debugw("Removing flow to be deleted", log.Fields{"flow": storedFlow})
-			gemPortID = storedFlow.Flow.GemportId
-			updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
-			break
+	for _, field := range flows.GetOfbFields(flow) {
+		if field.Type == flows.IP_PROTO {
+			classifierInfo[IPProto] = field.GetIpProto()
+			log.Debug("field-type-ip-proto", log.Fields{"classifierInfo[IP_PROTO]": classifierInfo[IPProto].(uint32)})
 		}
 	}
+	log.Debugw("Extracted access info from flow to be deleted",
+		log.Fields{"ponIntf": Intf, "onuID": onuID, "uniID": uniID})
+
+	if ethType == LldpEthType || ((classifierInfo[IPProto] == IPProtoDhcp) && (flowDirection == "downstream")) {
+		onuID = -1
+		uniID = -1
+		log.Debug("Trap on nni flow set oni, uni to -1")
+		Intf = IntfIDFromNniPortNum(inPort)
+	}
+	updatedFlows, gemPortID, flowID = f.FindAndRemoveFlow(flow, Intf, onuID, uniID, flowDirection)
 
 	tpID := getTpIDFromFlow(flow)
 
@@ -1271,15 +1302,15 @@
 		// So the flow should not be freed yet.
 		// For ex: Case of HSIA where same flow is shared
 		// between DS and US.
-		f.updateFlowInfoToKVStore(int32(ponIntf), int32(onuID), int32(uniID), flowID, &updatedFlows)
+		f.updateFlowInfoToKVStore(int32(Intf), int32(onuID), int32(uniID), flowID, &updatedFlows)
 		if len(updatedFlows) == 0 {
-			log.Debugw("Releasing flow Id to resource manager", log.Fields{"ponIntf": ponIntf, "onuId": onuID, "uniId": uniID, "flowId": flowID})
-			f.resourceMgr.FreeFlowID(ponIntf, int32(onuID), int32(uniID), flowID)
+			log.Debugw("Releasing flow Id to resource manager", log.Fields{"Intf": Intf, "onuId": onuID, "uniId": uniID, "flowId": flowID})
+			f.resourceMgr.FreeFlowID(Intf, int32(onuID), int32(uniID), flowID)
 
-			uni := getUniPortPath(ponIntf, onuID, uniID)
-			tpPath := f.getTPpath(ponIntf, uni, tpID)
+			uni := getUniPortPath(Intf, onuID, uniID)
+			tpPath := f.getTPpath(Intf, uni, tpID)
 			log.Debugw("Getting-techprofile-instance-for-subscriber", log.Fields{"TP-PATH": tpPath})
-			techprofileInst, err := f.techprofile[ponIntf].GetTPInstanceFromKVStore(tpID, tpPath)
+			techprofileInst, err := f.techprofile[Intf].GetTPInstanceFromKVStore(tpID, tpPath)
 			if err != nil { // This should not happen, something wrong in KV backend transaction
 				log.Errorw("Error in fetching tech profile instance from KV store", log.Fields{"tpID": 20, "path": tpPath})
 				return
@@ -1289,7 +1320,7 @@
 				return
 			}
 
-			gemPK := gemPortKey{ponIntf, uint32(gemPortID)}
+			gemPK := gemPortKey{Intf, uint32(gemPortID)}
 			if f.isGemPortUsedByAnotherFlow(gemPK) {
 				flowIDs := f.flowsUsedByGemPort[gemPK]
 				for i, flowIDinMap := range flowIDs {
@@ -1304,23 +1335,23 @@
 			}
 
 			log.Debugf("Gem port id %d is not used by another flow - releasing the gem port", gemPortID)
-			f.resourceMgr.RemoveGemPortIDForOnu(ponIntf, onuID, uniID, uint32(gemPortID))
+			f.resourceMgr.RemoveGemPortIDForOnu(Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
 			// TODO: The TrafficQueue corresponding to this gem-port also should be removed immediately.
 			// But it is anyway eventually  removed later when the TechProfile is freed, so not a big issue for now.
-			f.resourceMgr.RemoveGEMportPonportToOnuMapOnKVStore(uint32(gemPortID), ponIntf)
+			f.resourceMgr.RemoveGEMportPonportToOnuMapOnKVStore(uint32(gemPortID), Intf)
 			f.onuIdsLock.Lock()
 			delete(f.flowsUsedByGemPort, gemPK)
-			delete(f.onuGemPortIds, gemPK)
-			f.resourceMgr.FreeGemPortID(ponIntf, onuID, uniID, uint32(gemPortID))
+			//delete(f.onuGemPortIds, gemPK)
+			f.resourceMgr.FreeGemPortID(Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
 			f.onuIdsLock.Unlock()
 
-			ok, _ := f.isTechProfileUsedByAnotherGem(ponIntf, onuID, uniID, techprofileInst, uint32(gemPortID))
+			ok, _ := f.isTechProfileUsedByAnotherGem(Intf, uint32(onuID), uint32(uniID), techprofileInst, uint32(gemPortID))
 			if !ok {
-				f.resourceMgr.RemoveTechProfileIDForOnu(ponIntf, onuID, uniID, tpID)
-				f.RemoveSchedulerQueues(schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: ponIntf, onuID: onuID, uniID: uniID, tpID: tpID, uniPort: portNum, tpInst: techprofileInst})
-				f.RemoveSchedulerQueues(schedQueue{direction: tp_pb.Direction_DOWNSTREAM, intfID: ponIntf, onuID: onuID, uniID: uniID, tpID: tpID, uniPort: portNum, tpInst: techprofileInst})
-				f.DeleteTechProfileInstance(ponIntf, onuID, uniID, "", tpID)
-				f.resourceMgr.FreeAllocID(ponIntf, onuID, uniID, techprofileInst.UsScheduler.AllocID)
+				f.resourceMgr.RemoveTechProfileIDForOnu(Intf, uint32(onuID), uint32(uniID), tpID)
+				f.RemoveSchedulerQueues(schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: Intf, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst})
+				f.RemoveSchedulerQueues(schedQueue{direction: tp_pb.Direction_DOWNSTREAM, intfID: Intf, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst})
+				f.DeleteTechProfileInstance(Intf, uint32(onuID), uint32(uniID), "", tpID)
+				f.resourceMgr.FreeAllocID(Intf, uint32(onuID), uint32(uniID), techprofileInst.UsScheduler.AllocID)
 				// TODO: Send a "Delete TechProfile" message to ONU to do its own clean up on ONU OMCI stack
 			}
 		}
@@ -1330,37 +1361,28 @@
 //RemoveFlow removes the flow from the device
 func (f *OpenOltFlowMgr) RemoveFlow(flow *ofp.OfpFlowStats) {
 	log.Debugw("Removing Flow", log.Fields{"flow": flow})
-	var deviceFlowsToRemove []ofp.OfpFlowStats
-	var deletedFlowsIdx []int
-	for _, curFlow := range f.storedDeviceFlows {
-		if curFlow.Cookie == flow.Id {
-			log.Debugw("Found found matching flow-cookie", log.Fields{"curFlow": curFlow})
-			deviceFlowsToRemove = append(deviceFlowsToRemove, curFlow)
-		}
-	}
-	log.Debugw("Flows to be deleted", log.Fields{"deviceFlowsToRemove": deviceFlowsToRemove})
-	for index, curFlow := range deviceFlowsToRemove {
-		id, direction := f.decodeStoredID(curFlow.GetId())
-		removeFlowMessage := openoltpb2.Flow{FlowId: uint32(id), FlowType: direction}
-		if ok := f.removeFlowFromDevice(&removeFlowMessage); ok {
-			log.Debug("Flow removed from device successfully")
-			deletedFlowsIdx = append(deletedFlowsIdx, index)
-			f.clearFlowFromResourceManager(flow, uint32(id), direction) //TODO: Take care of the limitations
-		}
+	var direction string
+	actionInfo := make(map[string]interface{})
 
-	}
-	// Can be done in separate go routine as it takes time ?
-	for _, flowToRemove := range deletedFlowsIdx {
-		for index, storedFlow := range f.storedDeviceFlows {
-			if deviceFlowsToRemove[flowToRemove].Cookie == storedFlow.Cookie {
-				log.Debugw("Removing flow from local Store", log.Fields{"flow": storedFlow})
-				f.storedDeviceFlows = append(f.storedDeviceFlows[:index], f.storedDeviceFlows[index+1:]...)
-				break
+	for _, action := range flows.GetActions(flow) {
+		if action.Type == flows.OUTPUT {
+			if out := action.GetOutput(); out != nil {
+				actionInfo[Output] = out.GetPort()
+				log.Debugw("action-type-output", log.Fields{"out_port": actionInfo[Output].(uint32)})
+			} else {
+				log.Error("Invalid output port in action")
+				return
 			}
 		}
 	}
-	log.Debugw("Flows removed from the data store",
-		log.Fields{"number_of_flows_removed": len(deviceFlowsToRemove), "updated_stored_flows": f.storedDeviceFlows})
+	if IsUpstream(actionInfo[Output].(uint32)) {
+		direction = Upstream
+	} else {
+		direction = Downstream
+	}
+
+	f.clearFlowFromResourceManager(flow, direction) //TODO: Take care of the limitations
+
 	return
 }
 
@@ -1411,6 +1433,7 @@
 	}
 
 	f.deviceHandler.AddUniPortToOnu(intfID, onuID, portNo)
+	f.resourceMgr.AddUniPortToOnuInfo(intfID, onuID, portNo)
 
 	TpID := getTpIDFromFlow(flow)
 
@@ -1456,45 +1479,63 @@
 	return nil
 }
 
-//UpdateOnuInfo function adds onu info to cache
+//UpdateOnuInfo function adds onu info to cache and kvstore
 func (f *OpenOltFlowMgr) UpdateOnuInfo(intfID uint32, onuID uint32, serialNum string) {
-	onu := onuInfo{intfID: intfID, onuID: onuID, serialNumber: serialNum}
-	onuIDkey := onuIDKey{intfID: intfID, onuID: onuID}
-	f.onuIdsLock.Lock()
-	defer f.onuIdsLock.Unlock()
-	f.onuIds[onuIDkey] = onu
+
+	f.lockCache.Lock()
+	defer f.lockCache.Unlock()
+	onu := rsrcMgr.OnuGemInfo{OnuID: onuID, SerialNumber: serialNum, IntfID: intfID}
+	f.onuGemInfo[intfID] = append(f.onuGemInfo[intfID], onu)
+	if err := f.resourceMgr.AddOnuInfo(intfID, onu); err != nil {
+		log.Errorw("failed to add onu info", log.Fields{"onu": onu})
+		return
+	}
 	log.Debugw("Updated onuinfo", log.Fields{"intfID": intfID, "onuID": onuID, "serialNum": serialNum})
 }
 
-//addGemPortToOnuInfoMap function stores adds GEMport to ONU map
+//addGemPortToOnuInfoMap function adds GEMport to ONU map
 func (f *OpenOltFlowMgr) addGemPortToOnuInfoMap(intfID uint32, onuID uint32, gemPort uint32) {
-	onuIDkey := onuIDKey{intfID: intfID, onuID: onuID}
-	f.onuIdsLock.RLock()
-	defer f.onuIdsLock.RUnlock()
-	if val, ok := f.onuIds[onuIDkey]; ok {
-		onuInf := val
-		gemportKey := gemPortKey{intfID: intfID, gemPort: gemPort}
-		f.onuGemPortIds[gemportKey] = onuInf
-		log.Debugw("Cached Gemport to Onuinfo map", log.Fields{"GemPort": gemPort, "intfId": onuInf.intfID, "onuId": onuInf.onuID})
+	f.lockCache.Lock()
+	defer f.lockCache.Unlock()
+	onugem := f.onuGemInfo[intfID]
+	// update the gem to the local cache as well as to kv store
+	for idx, onu := range onugem {
+		if onu.OnuID == onuID {
+			// check if gem already exists , else update the cache and kvstore
+			for _, gem := range onu.GemPorts {
+				if gem == gemPort {
+					log.Debugw("Gem already in cache, no need to update cache and kv store",
+						log.Fields{"gem": gemPort})
+					return
+				}
+			}
+			onugem[idx].GemPorts = append(onugem[idx].GemPorts, gemPort)
+			f.onuGemInfo[intfID] = onugem
+		}
+	}
+	err := f.resourceMgr.AddGemToOnuGemInfo(intfID, onuID, gemPort)
+	if err != nil {
+		log.Errorw("Failed to add gem to onu", log.Fields{"intfId": intfID, "onuId": onuID, "gemPort": gemPort})
 		return
 	}
-	log.Errorw("OnuInfo not found", log.Fields{"intfId": intfID, "onuId": onuID, "gemPort": gemPort})
 }
 
 // This function Lookup maps  by serialNumber or (intfId, gemPort)
 
 //getOnuIDfromGemPortMap Returns OnuID,nil if found or set 0,error if no onuId is found for serialNumber or (intfId, gemPort)
 func (f *OpenOltFlowMgr) getOnuIDfromGemPortMap(serialNumber string, intfID uint32, gemPortID uint32) (uint32, error) {
+
+	f.lockCache.Lock()
+	defer f.lockCache.Unlock()
+
 	log.Debugw("Getting ONU ID from GEM port and PON port", log.Fields{"serialNumber": serialNumber, "intfId": intfID, "gemPortId": gemPortID})
-	if serialNumber != "" {
-		if onuInf, ok := f.onuSerialNumbers[serialNumber]; ok {
-			return onuInf.onuID, nil
-		}
-	} else {
-		gemportKey := gemPortKey{intfID: intfID, gemPort: gemPortID}
-		if onuInf, ok := f.onuGemPortIds[gemportKey]; ok {
-			log.Debugw("Retrieved onu info from access", log.Fields{"intfId": intfID, "gemPortId": gemPortID, "onuId": onuInf.onuID})
-			return onuInf.onuID, nil
+	// get onuid from the onugem info cache
+	onugem := f.onuGemInfo[intfID]
+	for _, onu := range onugem {
+		for _, gem := range onu.GemPorts {
+			if gem == gemPortID {
+				return onu.OnuID, nil
+			}
 		}
 	}
 	log.Errorw("onuid is not found", log.Fields{"serialNumber": serialNumber, "intfId": intfID, "gemPort": gemPortID})
@@ -1520,8 +1561,7 @@
 			logicalPortNum = MkUniPortNum(packetIn.IntfId, onuID, uniID)
 		}
 		// Store the gem port through which the packet_in came. Use the same gem port for packet_out
-		pktInkey := packetInInfoKey{intfID: packetIn.IntfId, onuID: onuID, logicalPort: logicalPortNum}
-		f.packetInGemPort[pktInkey] = packetIn.GemportId
+		f.UpdateGemPortForPktIn(packetIn.IntfId, onuID, logicalPortNum, packetIn.GemportId)
 	} else if packetIn.IntfType == "nni" {
 		logicalPortNum = IntfIDToPortNo(packetIn.IntfId, voltha.Port_ETHERNET_NNI)
 	}
@@ -1533,14 +1573,28 @@
 func (f *OpenOltFlowMgr) GetPacketOutGemPortID(intfID uint32, onuID uint32, portNum uint32) (uint32, error) {
 	var gemPortID uint32
 	var err error
-	key := packetInInfoKey{intfID: intfID, onuID: onuID, logicalPort: portNum}
-	if val, ok := f.packetInGemPort[key]; ok {
-		gemPortID = val
-	} else {
-		log.Errorw("Key-Error while fetching packet-out GEM port", log.Fields{"key": key})
-		err = errors.New("key-error while fetching packet-out GEM port")
+
+	f.lockCache.Lock()
+	defer f.lockCache.Unlock()
+	pktInkey := rsrcMgr.PacketInInfoKey{IntfID: intfID, OnuID: onuID, LogicalPort: portNum}
+
+	gemPortID, ok := f.packetInGemPort[pktInkey]
+	if ok {
+		log.Debugw("Found gemport for pktin key", log.Fields{"pktinkey": pktInkey, "gem": gemPortID})
+		return gemPortID, err
 	}
-	return gemPortID, err
+	//If gem is not found in cache try to get it from kv store, if found in kv store, update the cache and return.
+	gemPortID, err = f.resourceMgr.GetGemPortFromOnuPktIn(intfID, onuID, portNum)
+	if err == nil {
+		if gemPortID != 0 {
+			f.packetInGemPort[pktInkey] = gemPortID
+			log.Debugw("Found gem port from kv store and updating cache with gemport",
+				log.Fields{"pktinkey": pktInkey, "gem": gemPortID})
+			return gemPortID, nil
+		}
+	}
+	log.Errorw("Failed to get gemport", log.Fields{"pktinkey": pktInkey, "gem": gemPortID})
+	return uint32(0), err
 }
 
 func installFlowOnAllGemports(
@@ -1548,7 +1602,8 @@
 		portNo uint32, classifier map[string]interface{}, action map[string]interface{},
 		logicalFlow *ofp.OfpFlowStats, allocId uint32, gemPortId uint32),
 	f2 func(intfId uint32, onuId uint32, uniId uint32, portNo uint32,
-		logicalFlow *ofp.OfpFlowStats, allocId uint32, gemPortId uint32, vlanId uint32),
+		logicalFlow *ofp.OfpFlowStats, allocId uint32, gemPortId uint32, vlanId uint32,
+		classifier map[string]interface{}, action map[string]interface{}),
 	args map[string]uint32,
 	classifier map[string]interface{}, action map[string]interface{},
 	logicalFlow *ofp.OfpFlowStats,
@@ -1560,7 +1615,7 @@
 		if FlowType == HsiaFlow || FlowType == DhcpFlow {
 			f1(args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID)
 		} else if FlowType == EapolFlow {
-			f2(args["intfId"], args["onuId"], args["uniId"], args["portNo"], logicalFlow, args["allocId"], gemPortID, vlanID[0])
+			f2(args["intfId"], args["onuId"], args["uniId"], args["portNo"], logicalFlow, args["allocId"], gemPortID, vlanID[0], classifier, action)
 		} else {
 			log.Errorw("Unrecognized Flow Type", log.Fields{"FlowType": FlowType})
 			return
@@ -1573,6 +1628,8 @@
 	action := make(map[string]interface{})
 	classifier[PacketTagType] = DoubleTag
 	action[TrapToHost] = true
+	var err error
+	var networkInterfaceID uint32
 	/* We manage flowId resource pool on per PON port basis.
 	   Since this situation is tricky, as a hack, we pass the NNI port
 	   index (network_intf_id) as PON port Index for the flowId resource
@@ -1588,13 +1645,18 @@
 	uniID := -1
 	gemPortID := -1
 	allocID := -1
-	networkInterfaceID := f.deviceHandler.nniIntfID
+	networkInterfaceID, err = getNniIntfID(classifier, action)
+	if err != nil {
+		log.Error("Failed to get nniIntf ID")
+		return
+	}
+
 	flowStoreCookie := getFlowStoreCookie(classifier, uint32(0))
-	if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(networkInterfaceID), uint32(onuID), uint32(uniID), flowStoreCookie); present {
+	if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
 		log.Debug("Flow-exists--not-re-adding")
 		return
 	}
-	flowID, err := f.resourceMgr.GetFlowID(uint32(networkInterfaceID), uint32(onuID), uint32(uniID), uint32(gemPortID), flowStoreCookie, "", 0)
+	flowID, err := f.resourceMgr.GetFlowID(uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0)
 	if err != nil {
 		log.Errorw("Flow id unavailable for DHCP traponNNI flow", log.Fields{"error": err})
 		return
@@ -1626,7 +1688,7 @@
 		PortNo:        portNo}
 	if ok := f.addFlowToDevice(logicalFlow, &downstreamflow); ok {
 		log.Debug("DHCP trap on NNI flow added to device successfully")
-		flowsToKVStore := f.getUpdatedFlowInfo(&downstreamflow, flowStoreCookie, "", flowID)
+		flowsToKVStore := f.getUpdatedFlowInfo(&downstreamflow, flowStoreCookie, "", flowID, logicalFlow.Id)
 		if err := f.updateFlowInfoToKVStore(int32(networkInterfaceID),
 			int32(onuID),
 			int32(uniID),
@@ -1694,7 +1756,7 @@
 					tp_pb.Direction_UPSTREAM,
 					pcp.(uint32))
 
-				f.addEAPOLFlow(intfID, onuID, uniID, portNo, flow, allocID, gemPort, vlanID)
+				f.addEAPOLFlow(intfID, onuID, uniID, portNo, flow, allocID, gemPort, vlanID, classifierInfo, actionInfo)
 			} else {
 				installFlowOnAllGemports(nil, f.addEAPOLFlow, args, classifierInfo, actionInfo, flow, gemPorts, EapolFlow, vlanID)
 			}
@@ -1913,3 +1975,60 @@
 	}
 	return append(slice, item)
 }
+
+// getNniIntfID gets the NNI interface ID from the flow classifier/action.
+func getNniIntfID(classifier map[string]interface{}, action map[string]interface{}) (uint32, error) {
+	// PON in-port: the NNI comes from action[Output]; NNI in-port: it is the in-port itself; any other port type yields 0 and a nil error.
+	switch IntfIDToPortTypeName(classifier[InPort].(uint32)) {
+	case voltha.Port_PON_OLT:
+		nniID := IntfIDFromNniPortNum(action[Output].(uint32))
+		log.Debugw("output Nni IntfID is", log.Fields{"intfid": nniID})
+		return nniID, nil
+	case voltha.Port_ETHERNET_NNI:
+		nniID := IntfIDFromNniPortNum(classifier[InPort].(uint32))
+		log.Debugw("input Nni IntfID is", log.Fields{"intfid": nniID})
+		return nniID, nil
+	}
+	return uint32(0), nil
+}
+
+// UpdateGemPortForPktIn records the gem port on which a packet-in arrived, in the local cache and in the kv store.
+// The recorded value is what GetPacketOutGemPortID later looks up for the same interface, ONU and logical port.
+func (f *OpenOltFlowMgr) UpdateGemPortForPktIn(intfID uint32, onuID uint32, logicalPort uint32, gemPort uint32) {
+	pktInkey := rsrcMgr.PacketInInfoKey{IntfID: intfID, OnuID: onuID, LogicalPort: logicalPort}
+
+	f.lockCache.Lock()
+	defer f.lockCache.Unlock()
+	// Skip the writes only when the cached gem port already matches; a changed gem port must
+	// replace the stale entry, otherwise later lookups keep returning the old gem port.
+	if cachedGem, ok := f.packetInGemPort[pktInkey]; ok && cachedGem == gemPort {
+		log.Debugw("pktin key found in cache , no need to update kv as we are assuming both will be in sync",
+			log.Fields{"pktinkey": pktInkey, "gem": gemPort})
+		return
+	}
+	f.packetInGemPort[pktInkey] = gemPort
+	f.resourceMgr.UpdateGemPortForPktIn(pktInkey, gemPort)
+	log.Debugw("pktin key not found in local cache updating cache and kv store", log.Fields{"pktinkey": pktInkey, "gem": gemPort})
+}
+
+// AddUniPortToOnuInfo registers a UNI port against an ONU in the local onu-gem cache and in the kv store.
+// If the port is already cached for that ONU nothing is written; if the ONU is not cached only the kv store is updated.
+func (f *OpenOltFlowMgr) AddUniPortToOnuInfo(intfID uint32, onuID uint32, portNum uint32) {
+	f.lockCache.Lock()
+	defer f.lockCache.Unlock()
+	onus := f.onuGemInfo[intfID]
+	for i := range onus {
+		if onus[i].OnuID != onuID {
+			continue
+		}
+		for _, cached := range onus[i].UniPorts {
+			if cached == portNum {
+				log.Debugw("uni already in cache, no need to update cache and kv store", log.Fields{"uni": portNum})
+				return
+			}
+		}
+		onus[i].UniPorts = append(onus[i].UniPorts, portNum)
+		f.onuGemInfo[intfID] = onus
+	}
+	f.resourceMgr.AddUniPortToOnuInfo(intfID, onuID, portNum)
+}