VOL-1904 openolt adapter reconcile. along with the local caches data is stored in kv store as well.
child device cache is not removed as it is used to get child devices,
it will anyway be rebuilt even after adapter restart.
One more cache (flowsUsedByGemPort) got reintroduced recently which will be fixed later.
Updated test files as well.

Change-Id: I43378ff682f29477b22d61a76b0a721e83422853
diff --git a/adaptercore/device_handler.go b/adaptercore/device_handler.go
index a97a37d..26b62b3 100644
--- a/adaptercore/device_handler.go
+++ b/adaptercore/device_handler.go
@@ -76,7 +76,6 @@
 	resourceMgr   *rsrcMgr.OpenOltResourceMgr
 	discOnus      map[string]bool
 	onus          map[string]*OnuDevice
-	nniIntfID     int
 	portStats     *OpenOltStatisticsMgr
 	metrics       *pmmetrics.PmMetrics
 	stopCollector chan bool
@@ -133,10 +132,6 @@
 	dh.discOnus = make(map[string]bool)
 	dh.lockDevice = sync.RWMutex{}
 	dh.onus = make(map[string]*OnuDevice)
-	// The nniIntfID is initialized to -1 (invalid) and set to right value
-	// when the first IntfOperInd with status as "up" is received for
-	// any one of the available NNI port on the OLT device.
-	dh.nniIntfID = -1
 	dh.stopCollector = make(chan bool, 2)
 	dh.metrics = pmmetrics.NewPmMetrics(cloned.Id, pmmetrics.Frequency(150), pmmetrics.FrequencyOverride(false), pmmetrics.Grouped(false), pmmetrics.Metrics(pmNames))
 	//TODO initialize the support classes.
@@ -255,12 +250,6 @@
 	if err := dh.coreProxy.PortCreated(context.TODO(), dh.device.Id, port); err != nil {
 		log.Errorw("error-creating-nni-port", log.Fields{"deviceID": dh.device.Id, "portType": portType, "error": err})
 	}
-
-	// Once we have successfully added the NNI port to the core, if the
-	// locally cached nniIntfID is set to invalid (-1), set it to the right value.
-	if portType == voltha.Port_ETHERNET_NNI && dh.nniIntfID == -1 {
-		dh.nniIntfID = int(intfID)
-	}
 }
 
 // readIndications to read the indications from the OLT device
@@ -293,7 +282,13 @@
 	for {
 		indication, err := indications.Recv()
 		if err == io.EOF {
-			break
+			log.Infow("EOF for  indications", log.Fields{"err": err})
+			indications, err = dh.Client.EnableIndication(context.Background(), new(oop.Empty))
+			if err != nil {
+				log.Errorw("Failed to read indications", log.Fields{"err": err})
+				return
+			}
+			continue
 		}
 		if err != nil {
 			log.Infow("Failed to read from indications", log.Fields{"err": err})
@@ -343,6 +338,7 @@
 		intfOperInd := indication.GetIntfOperInd()
 		if intfOperInd.GetType() == "nni" {
 			go dh.addPort(intfOperInd.GetIntfId(), voltha.Port_ETHERNET_NNI, intfOperInd.GetOperState())
+			dh.resourceMgr.AddNNIToKVStore(intfOperInd.GetIntfId())
 		} else if intfOperInd.GetType() == "pon" {
 			// TODO: Check what needs to be handled here for When PON PORT down, ONU will be down
 			// Handle pon port update
@@ -1165,37 +1161,39 @@
 	return nil
 }
 
-func (dh *DeviceHandler) clearUNIData(onu *OnuDevice) error {
+func (dh *DeviceHandler) clearUNIData(onu *rsrcMgr.OnuGemInfo) error {
 	var uniID uint32
 	var err error
-	for port := range onu.uniPorts {
-		delete(onu.uniPorts, port)
-		uniID = UniIDFromPortNum(port)
+	for _, port := range onu.UniPorts {
+		uniID = UniIDFromPortNum(uint32(port))
 		log.Debugw("clearing-resource-data-for-uni-port", log.Fields{"port": port, "uniID": uniID})
 		/* Delete tech-profile instance from the KV store */
-		if err = dh.flowMgr.DeleteTechProfileInstances(onu.intfID, onu.onuID, uniID, onu.serialNumber); err != nil {
-			log.Debugw("Failed-to-remove-tech-profile-instance-for-onu", log.Fields{"onu-id": onu.onuID})
+		if err = dh.flowMgr.DeleteTechProfileInstances(onu.IntfID, onu.OnuID, uniID, onu.SerialNumber); err != nil {
+			log.Debugw("Failed-to-remove-tech-profile-instance-for-onu", log.Fields{"onu-id": onu.OnuID})
 		}
-		log.Debugw("Deleted-tech-profile-instance-for-onu", log.Fields{"onu-id": onu.onuID})
-		flowIDs := dh.resourceMgr.GetCurrentFlowIDsForOnu(onu.intfID, onu.onuID, uniID)
+		log.Debugw("Deleted-tech-profile-instance-for-onu", log.Fields{"onu-id": onu.OnuID})
+		flowIDs := dh.resourceMgr.GetCurrentFlowIDsForOnu(onu.IntfID, int32(onu.OnuID), int32(uniID))
 		for _, flowID := range flowIDs {
-			dh.resourceMgr.FreeFlowID(onu.intfID, int32(onu.onuID), int32(uniID), flowID)
+			dh.resourceMgr.FreeFlowID(onu.IntfID, int32(onu.OnuID), int32(uniID), flowID)
 		}
-		dh.resourceMgr.FreePONResourcesForONU(onu.intfID, onu.onuID, uniID)
-		if err = dh.resourceMgr.RemoveTechProfileIDsForOnu(onu.intfID, onu.onuID, uniID); err != nil {
-			log.Debugw("Failed-to-remove-tech-profile-id-for-onu", log.Fields{"onu-id": onu.onuID})
-		}
-		log.Debugw("Removed-tech-profile-id-for-onu", log.Fields{"onu-id": onu.onuID})
-		tpIDList := dh.resourceMgr.GetTechProfileIDForOnu(onu.intfID, onu.onuID, uniID)
+		tpIDList := dh.resourceMgr.GetTechProfileIDForOnu(onu.IntfID, onu.OnuID, uniID)
 		for _, tpID := range tpIDList {
-			if err = dh.resourceMgr.RemoveMeterIDForOnu("upstream", onu.intfID, onu.onuID, uniID, tpID); err != nil {
-				log.Debugw("Failed-to-remove-meter-id-for-onu-upstream", log.Fields{"onu-id": onu.onuID})
+			if err = dh.resourceMgr.RemoveMeterIDForOnu("upstream", onu.IntfID, onu.OnuID, uniID, tpID); err != nil {
+				log.Debugw("Failed-to-remove-meter-id-for-onu-upstream", log.Fields{"onu-id": onu.OnuID})
 			}
-			log.Debugw("Removed-meter-id-for-onu-upstream", log.Fields{"onu-id": onu.onuID})
-			if err = dh.resourceMgr.RemoveMeterIDForOnu("downstream", onu.intfID, onu.onuID, uniID, tpID); err != nil {
-				log.Debugw("Failed-to-remove-meter-id-for-onu-downstream", log.Fields{"onu-id": onu.onuID})
+			log.Debugw("Removed-meter-id-for-onu-upstream", log.Fields{"onu-id": onu.OnuID})
+			if err = dh.resourceMgr.RemoveMeterIDForOnu("downstream", onu.IntfID, onu.OnuID, uniID, tpID); err != nil {
+				log.Debugw("Failed-to-remove-meter-id-for-onu-downstream", log.Fields{"onu-id": onu.OnuID})
 			}
-			log.Debugw("Removed-meter-id-for-onu-downstream", log.Fields{"onu-id": onu.onuID})
+			log.Debugw("Removed-meter-id-for-onu-downstream", log.Fields{"onu-id": onu.OnuID})
+		}
+		dh.resourceMgr.FreePONResourcesForONU(onu.IntfID, onu.OnuID, uniID)
+		if err = dh.resourceMgr.RemoveTechProfileIDsForOnu(onu.IntfID, onu.OnuID, uniID); err != nil {
+			log.Debugw("Failed-to-remove-tech-profile-id-for-onu", log.Fields{"onu-id": onu.OnuID})
+		}
+		log.Debugw("Removed-tech-profile-id-for-onu", log.Fields{"onu-id": onu.OnuID})
+		if err = dh.resourceMgr.DelGemPortPktIn(onu.IntfID, onu.OnuID, uint32(port)); err != nil {
+			log.Debugw("Failed-to-remove-gemport-pkt-in", log.Fields{"intfid": onu.IntfID, "onuid": onu.OnuID, "uniId": uniID})
 		}
 	}
 	return nil
@@ -1204,26 +1202,29 @@
 func (dh *DeviceHandler) clearNNIData() error {
 	nniUniID := -1
 	nniOnuID := -1
+
 	if dh.resourceMgr == nil {
 		return fmt.Errorf("no resource manager for deviceID %s", dh.deviceID)
 	}
-	flowIDs := dh.resourceMgr.GetCurrentFlowIDsForOnu(uint32(dh.nniIntfID), uint32(nniOnuID), uint32(nniUniID))
-	log.Debugw("Current flow ids for nni", log.Fields{"flow-ids": flowIDs})
-	for _, flowID := range flowIDs {
-		dh.resourceMgr.FreeFlowID(uint32(dh.nniIntfID), -1, -1, uint32(flowID))
-	}
 	//Free the flow-ids for the NNI port
-	dh.resourceMgr.FreePONResourcesForONU(uint32(dh.nniIntfID), uint32(nniOnuID), uint32(nniUniID))
-	/* Free ONU IDs for each pon port
-	   intfIDToONUIds is a map of intf-id: [onu-ids]*/
-	intfIDToONUIds := make(map[uint32][]uint32)
-	for _, onu := range dh.onus {
-		intfIDToONUIds[onu.intfID] = append(intfIDToONUIds[onu.intfID], onu.onuID)
+	nni, err := dh.resourceMgr.GetNNIFromKVStore()
+	if err != nil {
+		log.Error("Failed to fetch nni from kv store")
+		return err
 	}
-	for intfID, onuIds := range intfIDToONUIds {
-		dh.resourceMgr.FreeonuID(intfID, onuIds)
+	log.Debugw("NNI are ", log.Fields{"nni": nni})
+	for _, nniIntfID := range nni {
+		flowIDs := dh.resourceMgr.GetCurrentFlowIDsForOnu(uint32(nniIntfID), int32(nniOnuID), int32(nniUniID))
+		log.Debugw("Current flow ids for nni", log.Fields{"flow-ids": flowIDs})
+		for _, flowID := range flowIDs {
+			dh.resourceMgr.FreeFlowID(uint32(nniIntfID), -1, -1, uint32(flowID))
+		}
 	}
-	return nil
+	if err = dh.resourceMgr.DelNNiFromKVStore(); err != nil {
+		log.Error("Failed to clear nni from kv store")
+		return err
+	}
+	return err
 }
 
 // DeleteDevice deletes the device instance from openolt handler array.  Also clears allocated resource manager resources.  Also reboots the OLT hardware!
@@ -1241,16 +1242,32 @@
 	   other pon resources like alloc_id and gemport_id
 	*/
 	if dh.resourceMgr != nil {
-		for _, onu := range dh.onus {
-			if err := dh.clearUNIData(onu); err != nil {
-				log.Debugw("Failed to clear data for onu", log.Fields{"onu-device": onu})
+		noOfPonPorts := dh.resourceMgr.DevInfo.GetPonPorts()
+		var ponPort uint32
+		for ponPort = 0; ponPort < noOfPonPorts; ponPort++ {
+			var onuGemData []rsrcMgr.OnuGemInfo
+			err := dh.resourceMgr.ResourceMgrs[ponPort].GetOnuGemInfo(ponPort, &onuGemData)
+			if err != nil {
+				log.Errorw("Failed to get onu info for port ", log.Fields{"ponport": ponPort})
+				return err
+			}
+			for _, onu := range onuGemData {
+				log.Debugw("onu data ", log.Fields{"onu": onu})
+				if err = dh.clearUNIData(&onu); err != nil {
+					log.Errorw("Failed to clear data for onu", log.Fields{"onu-device": onu})
+				}
+			}
+			onuGemData = nil
+			err = dh.resourceMgr.DelOnuGemInfoForIntf(ponPort)
+			if err != nil {
+				log.Errorw("Failed to update onugem info", log.Fields{"intfid": ponPort, "onugeminfo": onuGemData})
 			}
 		}
 		/* Clear the flows from KV store associated with NNI port.
 		   There are mostly trap rules from NNI port (like LLDP)
 		*/
 		if err := dh.clearNNIData(); err != nil {
-			log.Debugw("Failed to clear data for NNI port", log.Fields{"deviceID": dh.deviceID})
+			log.Errorw("Failed to clear data for NNI port", log.Fields{"device-id": dh.deviceID})
 		}
 
 		/* Clear the resource pool for each PON port in the background */
diff --git a/adaptercore/device_handler_test.go b/adaptercore/device_handler_test.go
index 6f52609..0e26d29 100644
--- a/adaptercore/device_handler_test.go
+++ b/adaptercore/device_handler_test.go
@@ -28,8 +28,10 @@
 
 	"github.com/golang/protobuf/ptypes"
 	"github.com/golang/protobuf/ptypes/any"
+	"github.com/opencord/voltha-lib-go/v2/pkg/db"
 	fu "github.com/opencord/voltha-lib-go/v2/pkg/flows"
 	"github.com/opencord/voltha-lib-go/v2/pkg/log"
+	ponrmgr "github.com/opencord/voltha-lib-go/v2/pkg/ponresourcemanager"
 	"github.com/opencord/voltha-openolt-adapter/adaptercore/resourcemanager"
 	"github.com/opencord/voltha-openolt-adapter/mocks"
 	ic "github.com/opencord/voltha-protos/v2/go/inter_container"
@@ -149,9 +151,38 @@
 	ep := &mocks.MockEventProxy{}
 	openOLT := &OpenOLT{coreProxy: cp, adapterProxy: ap, eventProxy: ep}
 	dh := NewDeviceHandler(cp, ap, ep, device, openOLT)
-	dh.nniIntfID = 1
 	deviceInf := &oop.DeviceInfo{Vendor: "openolt", Ranges: nil, Model: "openolt", DeviceId: dh.deviceID}
-	dh.resourceMgr = &resourcemanager.OpenOltResourceMgr{DeviceID: dh.deviceID, DeviceType: dh.deviceType, DevInfo: deviceInf}
+	dh.resourceMgr = &resourcemanager.OpenOltResourceMgr{DeviceID: dh.deviceID, DeviceType: dh.deviceType, DevInfo: deviceInf,
+		KVStore: &db.Backend{
+			Client: &mocks.MockKVClient{},
+		}}
+	dh.resourceMgr.ResourceMgrs = make(map[uint32]*ponrmgr.PONResourceManager)
+	ranges := make(map[string]interface{})
+	sharedIdxByType := make(map[string]string)
+	sharedIdxByType["ALLOC_ID"] = "ALLOC_ID"
+	sharedIdxByType["ONU_ID"] = "ONU_ID"
+	sharedIdxByType["GEMPORT_ID"] = "GEMPORT_ID"
+	sharedIdxByType["FLOW_ID"] = "FLOW_ID"
+	ranges["ONU_ID"] = uint32(0)
+	ranges["GEMPORT_ID"] = uint32(0)
+	ranges["ALLOC_ID"] = uint32(0)
+	ranges["FLOW_ID"] = uint32(0)
+	ranges["onu_id_shared"] = uint32(0)
+	ranges["alloc_id_shared"] = uint32(0)
+	ranges["gemport_id_shared"] = uint32(0)
+	ranges["flow_id_shared"] = uint32(0)
+
+	ponmgr := &ponrmgr.PONResourceManager{
+		DeviceID: "onu-1",
+		IntfIDs:  []uint32{1, 2},
+		KVStore: &db.Backend{
+			Client: &mocks.MockKVClient{},
+		},
+		PonResourceRanges: ranges,
+		SharedIdxByType:   sharedIdxByType,
+	}
+	dh.resourceMgr.ResourceMgrs[1] = ponmgr
+	dh.resourceMgr.ResourceMgrs[2] = ponmgr
 	dh.flowMgr = NewFlowManager(dh, dh.resourceMgr)
 	dh.Client = &mocks.MockOpenoltClient{}
 	dh.eventMgr = &OpenOltEventMgr{eventProxy: &mocks.MockEventProxy{}}
diff --git a/adaptercore/openolt.go b/adaptercore/openolt.go
index 603f75e..9b5b5ac 100644
--- a/adaptercore/openolt.go
+++ b/adaptercore/openolt.go
@@ -197,7 +197,19 @@
 
 //Reconcile_device unimplemented
 func (oo *OpenOLT) Reconcile_device(device *voltha.Device) error {
-	return errors.New("unImplemented")
+	if device == nil {
+		log.Warn("device-is-nil")
+		return errors.New("nil-device")
+	}
+	log.Infow("reconcile-device", log.Fields{"deviceId": device.Id})
+	var handler *DeviceHandler
+	if handler = oo.getDeviceHandler(device.Id); handler == nil {
+		handler := NewDeviceHandler(oo.coreProxy, oo.adapterProxy, oo.eventProxy, device, oo)
+		oo.addDeviceHandlerToMap(handler)
+		handler.transitionMap = NewTransitionMap(handler)
+		handler.transitionMap.Handle(DeviceInit)
+	}
+	return nil
 }
 
 //Abandon_device unimplemented
diff --git a/adaptercore/openolt_flowmgr.go b/adaptercore/openolt_flowmgr.go
index 6bee5a2..9eddc44 100644
--- a/adaptercore/openolt_flowmgr.go
+++ b/adaptercore/openolt_flowmgr.go
@@ -148,28 +148,11 @@
 	AllocID = "allocId"
 )
 
-type onuInfo struct {
-	intfID       uint32
-	onuID        uint32
-	serialNumber string
-}
-
-type onuIDKey struct {
-	intfID uint32
-	onuID  uint32
-}
-
 type gemPortKey struct {
 	intfID  uint32
 	gemPort uint32
 }
 
-type packetInInfoKey struct {
-	intfID      uint32
-	onuID       uint32
-	logicalPort uint32
-}
-
 type schedQueue struct {
 	direction    tp_pb.Direction
 	intfID       uint32
@@ -187,32 +170,39 @@
 	techprofile        []tp.TechProfileIf
 	deviceHandler      *DeviceHandler
 	resourceMgr        *rsrcMgr.OpenOltResourceMgr
-	onuIds             map[onuIDKey]onuInfo       //OnuId -> OnuInfo
-	onuSerialNumbers   map[string]onuInfo         //onu serial_number (string) -> OnuInfo
-	onuGemPortIds      map[gemPortKey]onuInfo     //GemPortId -> OnuInfo
-	packetInGemPort    map[packetInInfoKey]uint32 //packet in gem port
-	storedDeviceFlows  []ofp.OfpFlowStats         /* Required during deletion to obtain device flows from logical flows */
 	onuIdsLock         sync.RWMutex
-	flowsUsedByGemPort map[gemPortKey][]uint32 //gem port id to flow ids
+	flowsUsedByGemPort map[gemPortKey][]uint32            //gem port id to flow ids
+	packetInGemPort    map[rsrcMgr.PacketInInfoKey]uint32 //packet in gem port local cache
+	onuGemInfo         map[uint32][]rsrcMgr.OnuGemInfo    //onu, gem and uni info local cache
+	lockCache          sync.RWMutex
 }
 
 //NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
-func NewFlowManager(dh *DeviceHandler, rsrcMgr *rsrcMgr.OpenOltResourceMgr) *OpenOltFlowMgr {
+func NewFlowManager(dh *DeviceHandler, rMgr *rsrcMgr.OpenOltResourceMgr) *OpenOltFlowMgr {
 	log.Info("Initializing flow manager")
 	var flowMgr OpenOltFlowMgr
+	var err error
+	var idx uint32
+
 	flowMgr.deviceHandler = dh
-	flowMgr.resourceMgr = rsrcMgr
+	flowMgr.resourceMgr = rMgr
 	flowMgr.techprofile = make([]tp.TechProfileIf, MaxPonPorts)
-	if err := flowMgr.populateTechProfilePerPonPort(); err != nil {
+	if err = flowMgr.populateTechProfilePerPonPort(); err != nil {
 		log.Error("Error while populating tech profile mgr\n")
 		return nil
 	}
-	flowMgr.onuIds = make(map[onuIDKey]onuInfo)
-	flowMgr.onuSerialNumbers = make(map[string]onuInfo)
-	flowMgr.onuGemPortIds = make(map[gemPortKey]onuInfo)
-	flowMgr.packetInGemPort = make(map[packetInInfoKey]uint32)
-	flowMgr.flowsUsedByGemPort = make(map[gemPortKey][]uint32)
 	flowMgr.onuIdsLock = sync.RWMutex{}
+	flowMgr.flowsUsedByGemPort = make(map[gemPortKey][]uint32)
+	flowMgr.packetInGemPort = make(map[rsrcMgr.PacketInInfoKey]uint32)
+	flowMgr.onuGemInfo = make(map[uint32][]rsrcMgr.OnuGemInfo)
+	ponPorts := rMgr.DevInfo.GetPonPorts()
+	//Load the onugem info cache from kv store on flowmanager start
+	for idx = 0; idx < ponPorts; idx++ {
+		if flowMgr.onuGemInfo[idx], err = rMgr.GetOnuGemInfo(idx); err != nil {
+			log.Error("Failed to load onu gem info cache")
+		}
+	}
+	flowMgr.lockCache = sync.RWMutex{}
 	log.Info("Initialization of  flow manager success!!")
 	return &flowMgr
 }
@@ -233,13 +223,6 @@
 func (f *OpenOltFlowMgr) registerFlow(flowFromCore *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) {
 	log.Debug("Registering Flow for Device ", log.Fields{"flow": flowFromCore},
 		log.Fields{"device": f.deviceHandler.deviceID})
-
-	var storedFlow ofp.OfpFlowStats
-	storedFlow.Id, _ = f.generateStoredFlowID(deviceFlow.FlowId, deviceFlow.FlowType)
-	log.Debug(fmt.Sprintf("Generated stored device flow. id = %d, flowId = %d, direction = %s", storedFlow.Id,
-		deviceFlow.FlowId, deviceFlow.FlowType))
-	storedFlow.Cookie = flowFromCore.Id
-	f.storedDeviceFlows = append(f.storedDeviceFlows, storedFlow)
 	gemPK := gemPortKey{uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId)}
 	flowIDList, ok := f.flowsUsedByGemPort[gemPK]
 	if !ok {
@@ -247,7 +230,6 @@
 	}
 	flowIDList = appendUnique(flowIDList, deviceFlow.FlowId)
 	f.flowsUsedByGemPort[gemPK] = flowIDList
-	log.Debugw("updated Stored flow info", log.Fields{"storedDeviceFlows": f.storedDeviceFlows})
 }
 
 func (f *OpenOltFlowMgr) divideAndAddFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32,
@@ -267,7 +249,7 @@
 		return
 	}
 
-	uni := getUniPortPath(intfID, onuID, uniID)
+	uni := getUniPortPath(intfID, int32(onuID), int32(uniID))
 	log.Debugw("Uni port name", log.Fields{"uni": uni})
 	allocID, gemPorts, TpInst = f.createTcontGemports(intfID, onuID, uniID, uni, portNo, TpID, UsMeterID, DsMeterID, flowMetadata)
 	if allocID == 0 || gemPorts == nil || TpInst == nil {
@@ -604,6 +586,7 @@
 func (f *OpenOltFlowMgr) addHSIAFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32, classifier map[string]interface{},
 	action map[string]interface{}, direction string, logicalFlow *ofp.OfpFlowStats,
 	allocID uint32, gemPortID uint32) {
+	var networkIntfID uint32
 	/* One of the OLT platform (Broadcom BAL) requires that symmetric
 	   flows require the same flow_id to be used across UL and DL.
 	   Since HSIA flow is the only symmetric flow currently, we need to
@@ -620,7 +603,7 @@
 		log.Debugw("Found pbit in the flow", log.Fields{"VlanPbit": vlanPbit})
 	}
 	flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
-	flowID, err := f.resourceMgr.GetFlowID(intfID, onuID, uniID, gemPortID, flowStoreCookie, HsiaFlow, vlanPbit)
+	flowID, err := f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, HsiaFlow, vlanPbit)
 	if err != nil {
 		log.Errorw("Flow id unavailable for HSIA flow", log.Fields{"direction": direction})
 		return
@@ -637,7 +620,11 @@
 		return
 	}
 	log.Debugw("Created action proto", log.Fields{"action": *actionProto})
-	networkIntfID := f.deviceHandler.nniIntfID
+	networkIntfID, err = getNniIntfID(classifier, action)
+	if err != nil {
+		log.Error("Failed to get nniIntf ID")
+		return
+	}
 	flow := openoltpb2.Flow{AccessIntfId: int32(intfID),
 		OnuId:         int32(onuID),
 		UniId:         int32(uniID),
@@ -653,7 +640,7 @@
 		PortNo:        portNo}
 	if ok := f.addFlowToDevice(logicalFlow, &flow); ok {
 		log.Debug("HSIA flow added to device successfully", log.Fields{"direction": direction})
-		flowsToKVStore := f.getUpdatedFlowInfo(&flow, flowStoreCookie, HsiaFlow, flowID)
+		flowsToKVStore := f.getUpdatedFlowInfo(&flow, flowStoreCookie, HsiaFlow, flowID, logicalFlow.Id)
 		if err := f.updateFlowInfoToKVStore(flow.AccessIntfId,
 			flow.OnuId,
 			flow.UniId,
@@ -668,6 +655,12 @@
 	var dhcpFlow openoltpb2.Flow
 	var actionProto *openoltpb2.Action
 	var classifierProto *openoltpb2.Classifier
+	var flowID uint32
+	networkIntfID, err := getNniIntfID(classifier, action)
+	if err != nil {
+		log.Error("Failed to get nniIntf ID")
+		return
+	}
 
 	// Clear the action map
 	for k := range action {
@@ -682,7 +675,7 @@
 
 	flowStoreCookie := getFlowStoreCookie(classifier, gemPortID)
 
-	flowID, err := f.resourceMgr.GetFlowID(intfID, onuID, uniID, gemPortID, flowStoreCookie, "", 0 /*classifier[VLAN_PCP].(uint32)*/)
+	flowID, err = f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, "", 0 /*classifier[VLAN_PCP].(uint32)*/)
 
 	if err != nil {
 		log.Errorw("flowId unavailable for UL EAPOL", log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
@@ -700,7 +693,6 @@
 		log.Error("Error in making action protobuf for ul flow")
 		return
 	}
-	networkIntfID := f.deviceHandler.nniIntfID
 
 	dhcpFlow = openoltpb2.Flow{AccessIntfId: int32(intfID),
 		OnuId:         int32(onuID),
@@ -718,7 +710,7 @@
 
 	if ok := f.addFlowToDevice(logicalFlow, &dhcpFlow); ok {
 		log.Debug("DHCP UL flow added to device successfully")
-		flowsToKVStore := f.getUpdatedFlowInfo(&dhcpFlow, flowStoreCookie, "DHCP", flowID)
+		flowsToKVStore := f.getUpdatedFlowInfo(&dhcpFlow, flowStoreCookie, "DHCP", flowID, logicalFlow.Id)
 		if err := f.updateFlowInfoToKVStore(dhcpFlow.AccessIntfId,
 			dhcpFlow.OnuId,
 			dhcpFlow.UniId,
@@ -732,7 +724,7 @@
 }
 
 // Add EAPOL flow to  device with mac, vlanId as classifier for upstream and downstream
-func (f *OpenOltFlowMgr) addEAPOLFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32, vlanID uint32) {
+func (f *OpenOltFlowMgr) addEAPOLFlow(intfID uint32, onuID uint32, uniID uint32, portNo uint32, logicalFlow *ofp.OfpFlowStats, allocID uint32, gemPortID uint32, vlanID uint32, classifier map[string]interface{}, action map[string]interface{}) {
 	log.Debugw("Adding EAPOL to device", log.Fields{"intfId": intfID, "onuId": onuID, "portNo": portNo, "allocId": allocID, "gemPortId": gemPortID, "vlanId": vlanID, "flow": logicalFlow})
 
 	uplinkClassifier := make(map[string]interface{})
@@ -741,6 +733,7 @@
 	downlinkAction := make(map[string]interface{})
 	var upstreamFlow openoltpb2.Flow
 	var downstreamFlow openoltpb2.Flow
+	var networkIntfID uint32
 
 	// Fill Classfier
 	uplinkClassifier[EthType] = uint32(EapEthType)
@@ -750,7 +743,7 @@
 	uplinkAction[TrapToHost] = true
 	flowStoreCookie := getFlowStoreCookie(uplinkClassifier, gemPortID)
 	//Add Uplink EAPOL Flow
-	uplinkFlowID, err := f.resourceMgr.GetFlowID(intfID, onuID, uniID, gemPortID, flowStoreCookie, "", 0)
+	uplinkFlowID, err := f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, "", 0)
 	if err != nil {
 		log.Errorw("flowId unavailable for UL EAPOL", log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
 		return
@@ -769,7 +762,12 @@
 		return
 	}
 	log.Debugw("Created action proto", log.Fields{"action": *actionProto})
-	networkIntfID := f.deviceHandler.nniIntfID
+	networkIntfID, err = getNniIntfID(classifier, action)
+	if err != nil {
+		log.Error("Failed to get nniIntf ID")
+		return
+	}
+
 	upstreamFlow = openoltpb2.Flow{AccessIntfId: int32(intfID),
 		OnuId:         int32(onuID),
 		UniId:         int32(uniID),
@@ -786,7 +784,7 @@
 	if ok := f.addFlowToDevice(logicalFlow, &upstreamFlow); ok {
 		log.Debug("EAPOL UL flow added to device successfully")
 		flowCategory := "EAPOL"
-		flowsToKVStore := f.getUpdatedFlowInfo(&upstreamFlow, flowStoreCookie, flowCategory, uplinkFlowID)
+		flowsToKVStore := f.getUpdatedFlowInfo(&upstreamFlow, flowStoreCookie, flowCategory, uplinkFlowID, logicalFlow.Id)
 		if err := f.updateFlowInfoToKVStore(upstreamFlow.AccessIntfId,
 			upstreamFlow.OnuId,
 			upstreamFlow.UniId,
@@ -824,7 +822,7 @@
 		downlinkAction[PushVlan] = true
 		downlinkAction[VlanVid] = vlanID
 		flowStoreCookie := getFlowStoreCookie(downlinkClassifier, gemPortID)
-		downlinkFlowID, err := f.resourceMgr.GetFlowID(intfID, onuID, uniID, gemPortID, flowStoreCookie, "", 0)
+		downlinkFlowID, err := f.resourceMgr.GetFlowID(intfID, int32(onuID), int32(uniID), gemPortID, flowStoreCookie, "", 0)
 		if err != nil {
 			log.Errorw("flowId unavailable for DL EAPOL",
 				log.Fields{"intfId": intfID, "onuId": onuID, "flowStoreCookie": flowStoreCookie})
@@ -857,7 +855,7 @@
 		if ok := f.addFlowToDevice(logicalFlow, &downstreamFlow); ok {
 			log.Debug("EAPOL DL flow added to device successfully")
 			flowCategory := ""
-			flowsToKVStore := f.getUpdatedFlowInfo(&downstreamFlow, flowStoreCookie, flowCategory, downlinkFlowID)
+			flowsToKVStore := f.getUpdatedFlowInfo(&downstreamFlow, flowStoreCookie, flowCategory, downlinkFlowID, logicalFlow.Id)
 			if err := f.updateFlowInfoToKVStore(downstreamFlow.AccessIntfId,
 				downstreamFlow.OnuId,
 				downstreamFlow.UniId,
@@ -987,8 +985,8 @@
 	return hash.Uint64()
 }
 
-func (f *OpenOltFlowMgr) getUpdatedFlowInfo(flow *openoltpb2.Flow, flowStoreCookie uint64, flowCategory string, deviceFlowID uint32) *[]rsrcMgr.FlowInfo {
-	var flows = []rsrcMgr.FlowInfo{{Flow: flow, FlowCategory: flowCategory, FlowStoreCookie: flowStoreCookie}}
+func (f *OpenOltFlowMgr) getUpdatedFlowInfo(flow *openoltpb2.Flow, flowStoreCookie uint64, flowCategory string, deviceFlowID uint32, logicalFlowID uint64) *[]rsrcMgr.FlowInfo {
+	var flows = []rsrcMgr.FlowInfo{{Flow: flow, FlowCategory: flowCategory, FlowStoreCookie: flowStoreCookie, LogicalFlowID: logicalFlowID}}
 	var intfID uint32
 	/* For flows which trap out of the NNI, the AccessIntfId is invalid
 	   (set to -1). In such cases, we need to refer to the NetworkIntfId .
@@ -999,7 +997,7 @@
 		intfID = uint32(flow.NetworkIntfId)
 	}
 	// Get existing flows matching flowid for given subscriber from KV store
-	existingFlows := f.resourceMgr.GetFlowIDInfo(intfID, uint32(flow.OnuId), uint32(flow.UniId), flow.FlowId)
+	existingFlows := f.resourceMgr.GetFlowIDInfo(intfID, flow.OnuId, flow.UniId, flow.FlowId)
 	if existingFlows != nil {
 		log.Debugw("Flow exists for given flowID, appending it to current flow", log.Fields{"flowID": flow.FlowId})
 		//for _, f := range *existingFlows {
@@ -1071,8 +1069,8 @@
 		f.resourceMgr.FreeFlowID(intfID, deviceFlow.OnuId, deviceFlow.UniId, deviceFlow.FlowId)
 		return false
 	}
-	log.Debugw("Flow added to device successfully ", log.Fields{"flow": *deviceFlow})
 	f.registerFlow(logicalFlow, deviceFlow)
+	log.Debugw("Flow added to device successfully ", log.Fields{"flow": *deviceFlow})
 	return true
 }
 
@@ -1140,11 +1138,11 @@
 
 	var networkInterfaceID = IntfIDFromNniPortNum(portNo)
 	var flowStoreCookie = getFlowStoreCookie(classifierInfo, uint32(0))
-	if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(networkInterfaceID), uint32(onuID), uint32(uniID), flowStoreCookie); present {
+	if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
 		log.Debug("Flow-exists--not-re-adding")
 		return
 	}
-	flowID, err := f.resourceMgr.GetFlowID(uint32(networkInterfaceID), uint32(onuID), uint32(uniID), uint32(gemPortID), flowStoreCookie, "", 0)
+	flowID, err := f.resourceMgr.GetFlowID(uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0)
 
 	if err != nil {
 		log.Errorw("Flow id unavailable for LLDP traponNNI flow", log.Fields{"error": err})
@@ -1177,7 +1175,7 @@
 		PortNo:        portNo}
 	if ok := f.addFlowToDevice(flow, &downstreamflow); ok {
 		log.Debug("LLDP trap on NNI flow added to device successfully")
-		flowsToKVStore := f.getUpdatedFlowInfo(&downstreamflow, flowStoreCookie, "", flowID)
+		flowsToKVStore := f.getUpdatedFlowInfo(&downstreamflow, flowStoreCookie, "", flowID, flow.Id)
 		if err := f.updateFlowInfoToKVStore(int32(networkInterfaceID),
 			int32(onuID),
 			int32(uniID),
@@ -1188,7 +1186,7 @@
 	return
 }
 
-func getUniPortPath(intfID uint32, onuID uint32, uniID uint32) string {
+func getUniPortPath(intfID uint32, onuID int32, uniID int32) string {
 	return fmt.Sprintf("pon-{%d}/onu-{%d}/uni-{%d}", intfID, onuID, uniID)
 }
 
@@ -1221,48 +1219,81 @@
 	return id, Downstream
 }
 
-func (f *OpenOltFlowMgr) clearFlowFromResourceManager(flow *ofp.OfpFlowStats, flowID uint32, flowDirection string) {
-	log.Debugw("clearFlowFromResourceManager", log.Fields{"flowID": flowID, "flowDirection": flowDirection, "flow": *flow})
-	portNum, ponIntf, onuID, uniID, inPort, ethType, err := FlowExtractInfo(flow, flowDirection)
+// FindAndRemoveFlow finds the flow from kv store and makes a call to remove the flow from device
+// returns the flows and gemport the flow is associated with.
+func (f *OpenOltFlowMgr) FindAndRemoveFlow(flow *ofp.OfpFlowStats, Intf uint32, onuID int32, uniID int32, flowDirection string) ([]rsrcMgr.FlowInfo, int32, uint32) {
+
+	var updatedFlows []rsrcMgr.FlowInfo
+	var gemPortID int32
+	var flowID uint32
+
+	flowIds := f.resourceMgr.GetCurrentFlowIDsForOnu(Intf, onuID, uniID)
+FlowFound:
+	for _, flowID = range flowIds {
+		flowInfo := f.resourceMgr.GetFlowIDInfo(Intf, onuID, uniID, flowID)
+		if flowInfo == nil {
+			log.Debugw("No FlowInfo found found in KV store",
+				log.Fields{"Intf": Intf, "onuID": onuID, "uniID": uniID, "flowID": flowID})
+			return nil, 0, flowID
+		}
+		updatedFlows = nil
+		for _, flow := range *flowInfo {
+			updatedFlows = append(updatedFlows, flow)
+		}
+
+		for i, storedFlow := range updatedFlows {
+			if flowDirection == storedFlow.Flow.FlowType && flow.Id == storedFlow.LogicalFlowID {
+				removeFlowMessage := openoltpb2.Flow{FlowId: storedFlow.Flow.FlowId, FlowType: flowDirection}
+				if ok := f.removeFlowFromDevice(&removeFlowMessage); ok {
+					log.Debug("Flow removed from device successfully")
+					//Remove the Flow from FlowInfo
+					log.Debugw("Removing flow to be deleted", log.Fields{"flow": storedFlow})
+					updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
+					gemPortID = storedFlow.Flow.GemportId
+					break FlowFound
+				} else {
+					log.Error("Failed to remove flow from device")
+					return nil, 0, flowID
+				}
+			}
+		}
+	}
+	return updatedFlows, gemPortID, flowID
+}
+
+func (f *OpenOltFlowMgr) clearFlowFromResourceManager(flow *ofp.OfpFlowStats, flowDirection string) {
+
+	log.Debugw("clearFlowFromResourceManager", log.Fields{"flowDirection": flowDirection, "flow": *flow})
+	var updatedFlows []rsrcMgr.FlowInfo
+	var flowID uint32
+	var onuID, uniID int32
+	classifierInfo := make(map[string]interface{})
+
+	portNum, Intf, onu, uni, inPort, ethType, err := FlowExtractInfo(flow, flowDirection)
 	if err != nil {
 		log.Error(err)
 		return
 	}
-	log.Debugw("Extracted access info from flow to be deleted",
-		log.Fields{"ponIntf": ponIntf, "onuID": onuID, "uniID": uniID, "flowID": flowID})
-
-	if ethType == LldpEthType {
-		var networkInterfaceID uint32
-		var onuID = -1
-		var uniID = -1
-
-		networkInterfaceID = IntfIDFromNniPortNum(inPort)
-		f.resourceMgr.FreeFlowID(networkInterfaceID, int32(onuID), int32(uniID), flowID)
-		return
-	}
-
-	flowsInfo := f.resourceMgr.GetFlowIDInfo(ponIntf, onuID, uniID, flowID)
-	if flowsInfo == nil {
-		log.Debugw("No FlowInfo found in KV store",
-			log.Fields{"ponIntf": ponIntf, "onuID": onuID, "uniID": uniID, "flowID": flowID})
-		return
-	}
-	var updatedFlows []rsrcMgr.FlowInfo
+	onuID = int32(onu)
+	uniID = int32(uni)
 	var gemPortID int32
 
-	for _, flow := range *flowsInfo {
-		updatedFlows = append(updatedFlows, flow)
-	}
-
-	for i, storedFlow := range updatedFlows {
-		if flowDirection == storedFlow.Flow.FlowType {
-			//Remove the Flow from FlowInfo
-			log.Debugw("Removing flow to be deleted", log.Fields{"flow": storedFlow})
-			gemPortID = storedFlow.Flow.GemportId
-			updatedFlows = append(updatedFlows[:i], updatedFlows[i+1:]...)
-			break
+	for _, field := range flows.GetOfbFields(flow) {
+		if field.Type == flows.IP_PROTO {
+			classifierInfo[IPProto] = field.GetIpProto()
+			log.Debug("field-type-ip-proto", log.Fields{"classifierInfo[IP_PROTO]": classifierInfo[IPProto].(uint32)})
 		}
 	}
+	log.Debugw("Extracted access info from flow to be deleted",
+		log.Fields{"ponIntf": Intf, "onuID": onuID, "uniID": uniID})
+
+	if ethType == LldpEthType || ((classifierInfo[IPProto] == IPProtoDhcp) && (flowDirection == "downstream")) {
+		onuID = -1
+		uniID = -1
+		log.Debug("Trap on nni flow set oni, uni to -1")
+		Intf = IntfIDFromNniPortNum(inPort)
+	}
+	updatedFlows, gemPortID, flowID = f.FindAndRemoveFlow(flow, Intf, onuID, uniID, flowDirection)
 
 	tpID := getTpIDFromFlow(flow)
 
@@ -1271,15 +1302,15 @@
 		// So the flow should not be freed yet.
 		// For ex: Case of HSIA where same flow is shared
 		// between DS and US.
-		f.updateFlowInfoToKVStore(int32(ponIntf), int32(onuID), int32(uniID), flowID, &updatedFlows)
+		f.updateFlowInfoToKVStore(int32(Intf), int32(onuID), int32(uniID), flowID, &updatedFlows)
 		if len(updatedFlows) == 0 {
-			log.Debugw("Releasing flow Id to resource manager", log.Fields{"ponIntf": ponIntf, "onuId": onuID, "uniId": uniID, "flowId": flowID})
-			f.resourceMgr.FreeFlowID(ponIntf, int32(onuID), int32(uniID), flowID)
+			log.Debugw("Releasing flow Id to resource manager", log.Fields{"Intf": Intf, "onuId": onuID, "uniId": uniID, "flowId": flowID})
+			f.resourceMgr.FreeFlowID(Intf, int32(onuID), int32(uniID), flowID)
 
-			uni := getUniPortPath(ponIntf, onuID, uniID)
-			tpPath := f.getTPpath(ponIntf, uni, tpID)
+			uni := getUniPortPath(Intf, onuID, uniID)
+			tpPath := f.getTPpath(Intf, uni, tpID)
 			log.Debugw("Getting-techprofile-instance-for-subscriber", log.Fields{"TP-PATH": tpPath})
-			techprofileInst, err := f.techprofile[ponIntf].GetTPInstanceFromKVStore(tpID, tpPath)
+			techprofileInst, err := f.techprofile[Intf].GetTPInstanceFromKVStore(tpID, tpPath)
 			if err != nil { // This should not happen, something wrong in KV backend transaction
 				log.Errorw("Error in fetching tech profile instance from KV store", log.Fields{"tpID": 20, "path": tpPath})
 				return
@@ -1289,7 +1320,7 @@
 				return
 			}
 
-			gemPK := gemPortKey{ponIntf, uint32(gemPortID)}
+			gemPK := gemPortKey{Intf, uint32(gemPortID)}
 			if f.isGemPortUsedByAnotherFlow(gemPK) {
 				flowIDs := f.flowsUsedByGemPort[gemPK]
 				for i, flowIDinMap := range flowIDs {
@@ -1304,23 +1335,23 @@
 			}
 
 			log.Debugf("Gem port id %d is not used by another flow - releasing the gem port", gemPortID)
-			f.resourceMgr.RemoveGemPortIDForOnu(ponIntf, onuID, uniID, uint32(gemPortID))
+			f.resourceMgr.RemoveGemPortIDForOnu(Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
 			// TODO: The TrafficQueue corresponding to this gem-port also should be removed immediately.
 			// But it is anyway eventually  removed later when the TechProfile is freed, so not a big issue for now.
-			f.resourceMgr.RemoveGEMportPonportToOnuMapOnKVStore(uint32(gemPortID), ponIntf)
+			f.resourceMgr.RemoveGEMportPonportToOnuMapOnKVStore(uint32(gemPortID), Intf)
 			f.onuIdsLock.Lock()
 			delete(f.flowsUsedByGemPort, gemPK)
-			delete(f.onuGemPortIds, gemPK)
-			f.resourceMgr.FreeGemPortID(ponIntf, onuID, uniID, uint32(gemPortID))
+			//delete(f.onuGemPortIds, gemPK)
+			f.resourceMgr.FreeGemPortID(Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
 			f.onuIdsLock.Unlock()
 
-			ok, _ := f.isTechProfileUsedByAnotherGem(ponIntf, onuID, uniID, techprofileInst, uint32(gemPortID))
+			ok, _ := f.isTechProfileUsedByAnotherGem(Intf, uint32(onuID), uint32(uniID), techprofileInst, uint32(gemPortID))
 			if !ok {
-				f.resourceMgr.RemoveTechProfileIDForOnu(ponIntf, onuID, uniID, tpID)
-				f.RemoveSchedulerQueues(schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: ponIntf, onuID: onuID, uniID: uniID, tpID: tpID, uniPort: portNum, tpInst: techprofileInst})
-				f.RemoveSchedulerQueues(schedQueue{direction: tp_pb.Direction_DOWNSTREAM, intfID: ponIntf, onuID: onuID, uniID: uniID, tpID: tpID, uniPort: portNum, tpInst: techprofileInst})
-				f.DeleteTechProfileInstance(ponIntf, onuID, uniID, "", tpID)
-				f.resourceMgr.FreeAllocID(ponIntf, onuID, uniID, techprofileInst.UsScheduler.AllocID)
+				f.resourceMgr.RemoveTechProfileIDForOnu(Intf, uint32(onuID), uint32(uniID), tpID)
+				f.RemoveSchedulerQueues(schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: Intf, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst})
+				f.RemoveSchedulerQueues(schedQueue{direction: tp_pb.Direction_DOWNSTREAM, intfID: Intf, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst})
+				f.DeleteTechProfileInstance(Intf, uint32(onuID), uint32(uniID), "", tpID)
+				f.resourceMgr.FreeAllocID(Intf, uint32(onuID), uint32(uniID), techprofileInst.UsScheduler.AllocID)
 				// TODO: Send a "Delete TechProfile" message to ONU to do its own clean up on ONU OMCI stack
 			}
 		}
@@ -1330,37 +1361,28 @@
 //RemoveFlow removes the flow from the device
 func (f *OpenOltFlowMgr) RemoveFlow(flow *ofp.OfpFlowStats) {
 	log.Debugw("Removing Flow", log.Fields{"flow": flow})
-	var deviceFlowsToRemove []ofp.OfpFlowStats
-	var deletedFlowsIdx []int
-	for _, curFlow := range f.storedDeviceFlows {
-		if curFlow.Cookie == flow.Id {
-			log.Debugw("Found found matching flow-cookie", log.Fields{"curFlow": curFlow})
-			deviceFlowsToRemove = append(deviceFlowsToRemove, curFlow)
-		}
-	}
-	log.Debugw("Flows to be deleted", log.Fields{"deviceFlowsToRemove": deviceFlowsToRemove})
-	for index, curFlow := range deviceFlowsToRemove {
-		id, direction := f.decodeStoredID(curFlow.GetId())
-		removeFlowMessage := openoltpb2.Flow{FlowId: uint32(id), FlowType: direction}
-		if ok := f.removeFlowFromDevice(&removeFlowMessage); ok {
-			log.Debug("Flow removed from device successfully")
-			deletedFlowsIdx = append(deletedFlowsIdx, index)
-			f.clearFlowFromResourceManager(flow, uint32(id), direction) //TODO: Take care of the limitations
-		}
+	var direction string
+	actionInfo := make(map[string]interface{})
 
-	}
-	// Can be done in separate go routine as it takes time ?
-	for _, flowToRemove := range deletedFlowsIdx {
-		for index, storedFlow := range f.storedDeviceFlows {
-			if deviceFlowsToRemove[flowToRemove].Cookie == storedFlow.Cookie {
-				log.Debugw("Removing flow from local Store", log.Fields{"flow": storedFlow})
-				f.storedDeviceFlows = append(f.storedDeviceFlows[:index], f.storedDeviceFlows[index+1:]...)
-				break
+	for _, action := range flows.GetActions(flow) {
+		if action.Type == flows.OUTPUT {
+			if out := action.GetOutput(); out != nil {
+				actionInfo[Output] = out.GetPort()
+				log.Debugw("action-type-output", log.Fields{"out_port": actionInfo[Output].(uint32)})
+			} else {
+				log.Error("Invalid output port in action")
+				return
 			}
 		}
 	}
-	log.Debugw("Flows removed from the data store",
-		log.Fields{"number_of_flows_removed": len(deviceFlowsToRemove), "updated_stored_flows": f.storedDeviceFlows})
+	if IsUpstream(actionInfo[Output].(uint32)) {
+		direction = Upstream
+	} else {
+		direction = Downstream
+	}
+
+	f.clearFlowFromResourceManager(flow, direction) //TODO: Take care of the limitations
+
 	return
 }
 
@@ -1411,6 +1433,7 @@
 	}
 
 	f.deviceHandler.AddUniPortToOnu(intfID, onuID, portNo)
+	f.resourceMgr.AddUniPortToOnuInfo(intfID, onuID, portNo)
 
 	TpID := getTpIDFromFlow(flow)
 
@@ -1456,45 +1479,63 @@
 	return nil
 }
 
-//UpdateOnuInfo function adds onu info to cache
+//UpdateOnuInfo function adds onu info to cache and kvstore
 func (f *OpenOltFlowMgr) UpdateOnuInfo(intfID uint32, onuID uint32, serialNum string) {
-	onu := onuInfo{intfID: intfID, onuID: onuID, serialNumber: serialNum}
-	onuIDkey := onuIDKey{intfID: intfID, onuID: onuID}
-	f.onuIdsLock.Lock()
-	defer f.onuIdsLock.Unlock()
-	f.onuIds[onuIDkey] = onu
+
+	f.lockCache.Lock()
+	defer f.lockCache.Unlock()
+	onu := rsrcMgr.OnuGemInfo{OnuID: onuID, SerialNumber: serialNum, IntfID: intfID}
+	f.onuGemInfo[intfID] = append(f.onuGemInfo[intfID], onu)
+	if err := f.resourceMgr.AddOnuInfo(intfID, onu); err != nil {
+		log.Errorw("failed to add onu info", log.Fields{"onu": onu})
+		return
+	}
 	log.Debugw("Updated onuinfo", log.Fields{"intfID": intfID, "onuID": onuID, "serialNum": serialNum})
 }
 
-//addGemPortToOnuInfoMap function stores adds GEMport to ONU map
+//addGemPortToOnuInfoMap function adds GEMport to ONU map
 func (f *OpenOltFlowMgr) addGemPortToOnuInfoMap(intfID uint32, onuID uint32, gemPort uint32) {
-	onuIDkey := onuIDKey{intfID: intfID, onuID: onuID}
-	f.onuIdsLock.RLock()
-	defer f.onuIdsLock.RUnlock()
-	if val, ok := f.onuIds[onuIDkey]; ok {
-		onuInf := val
-		gemportKey := gemPortKey{intfID: intfID, gemPort: gemPort}
-		f.onuGemPortIds[gemportKey] = onuInf
-		log.Debugw("Cached Gemport to Onuinfo map", log.Fields{"GemPort": gemPort, "intfId": onuInf.intfID, "onuId": onuInf.onuID})
+	f.lockCache.Lock()
+	defer f.lockCache.Unlock()
+	onugem := f.onuGemInfo[intfID]
+	// update the gem to the local cache as well as to kv strore
+	for idx, onu := range onugem {
+		if onu.OnuID == onuID {
+			// check if gem already exists , else update the cache and kvstore
+			for _, gem := range onu.GemPorts {
+				if gem == gemPort {
+					log.Debugw("Gem already in cache, no need to update cache and kv store",
+						log.Fields{"gem": gemPort})
+					return
+				}
+			}
+			onugem[idx].GemPorts = append(onugem[idx].GemPorts, gemPort)
+			f.onuGemInfo[intfID] = onugem
+		}
+	}
+	err := f.resourceMgr.AddGemToOnuGemInfo(intfID, onuID, gemPort)
+	if err != nil {
+		log.Errorw("Failed to add gem to onu", log.Fields{"intfId": intfID, "onuId": onuID, "gemPort": gemPort})
 		return
 	}
-	log.Errorw("OnuInfo not found", log.Fields{"intfId": intfID, "onuId": onuID, "gemPort": gemPort})
 }
 
 // This function Lookup maps  by serialNumber or (intfId, gemPort)
 
 //getOnuIDfromGemPortMap Returns OnuID,nil if found or set 0,error if no onuId is found for serialNumber or (intfId, gemPort)
 func (f *OpenOltFlowMgr) getOnuIDfromGemPortMap(serialNumber string, intfID uint32, gemPortID uint32) (uint32, error) {
+
+	f.lockCache.Lock()
+	defer f.lockCache.Unlock()
+
 	log.Debugw("Getting ONU ID from GEM port and PON port", log.Fields{"serialNumber": serialNumber, "intfId": intfID, "gemPortId": gemPortID})
-	if serialNumber != "" {
-		if onuInf, ok := f.onuSerialNumbers[serialNumber]; ok {
-			return onuInf.onuID, nil
-		}
-	} else {
-		gemportKey := gemPortKey{intfID: intfID, gemPort: gemPortID}
-		if onuInf, ok := f.onuGemPortIds[gemportKey]; ok {
-			log.Debugw("Retrieved onu info from access", log.Fields{"intfId": intfID, "gemPortId": gemPortID, "onuId": onuInf.onuID})
-			return onuInf.onuID, nil
+	// get onuid from the onugem info cache
+	onugem := f.onuGemInfo[intfID]
+	for _, onu := range onugem {
+		for _, gem := range onu.GemPorts {
+			if gem == gemPortID {
+				return onu.OnuID, nil
+			}
 		}
 	}
 	log.Errorw("onuid is not found", log.Fields{"serialNumber": serialNumber, "intfId": intfID, "gemPort": gemPortID})
@@ -1520,8 +1561,7 @@
 			logicalPortNum = MkUniPortNum(packetIn.IntfId, onuID, uniID)
 		}
 		// Store the gem port through which the packet_in came. Use the same gem port for packet_out
-		pktInkey := packetInInfoKey{intfID: packetIn.IntfId, onuID: onuID, logicalPort: logicalPortNum}
-		f.packetInGemPort[pktInkey] = packetIn.GemportId
+		f.UpdateGemPortForPktIn(packetIn.IntfId, onuID, logicalPortNum, packetIn.GemportId)
 	} else if packetIn.IntfType == "nni" {
 		logicalPortNum = IntfIDToPortNo(packetIn.IntfId, voltha.Port_ETHERNET_NNI)
 	}
@@ -1533,14 +1573,28 @@
 func (f *OpenOltFlowMgr) GetPacketOutGemPortID(intfID uint32, onuID uint32, portNum uint32) (uint32, error) {
 	var gemPortID uint32
 	var err error
-	key := packetInInfoKey{intfID: intfID, onuID: onuID, logicalPort: portNum}
-	if val, ok := f.packetInGemPort[key]; ok {
-		gemPortID = val
-	} else {
-		log.Errorw("Key-Error while fetching packet-out GEM port", log.Fields{"key": key})
-		err = errors.New("key-error while fetching packet-out GEM port")
+
+	f.lockCache.Lock()
+	defer f.lockCache.Unlock()
+	pktInkey := rsrcMgr.PacketInInfoKey{IntfID: intfID, OnuID: onuID, LogicalPort: portNum}
+
+	gemPortID, ok := f.packetInGemPort[pktInkey]
+	if ok {
+		log.Debugw("Found gemport for pktin key", log.Fields{"pktinkey": pktInkey, "gem": gemPortID})
+		return gemPortID, err
 	}
-	return gemPortID, err
+	//If gem is not found in cache try to get it from kv store, if found in kv store, update the cache and return.
+	gemPortID, err = f.resourceMgr.GetGemPortFromOnuPktIn(intfID, onuID, portNum)
+	if err == nil {
+		if gemPortID != 0 {
+			f.packetInGemPort[pktInkey] = gemPortID
+			log.Debugw("Found gem port from kv store and updating cache with gemport",
+				log.Fields{"pktinkey": pktInkey, "gem": gemPortID})
+			return gemPortID, nil
+		}
+	}
+	log.Errorw("Failed to get gemport", log.Fields{"pktinkey": pktInkey, "gem": gemPortID})
+	return uint32(0), err
 }
 
 func installFlowOnAllGemports(
@@ -1548,7 +1602,8 @@
 		portNo uint32, classifier map[string]interface{}, action map[string]interface{},
 		logicalFlow *ofp.OfpFlowStats, allocId uint32, gemPortId uint32),
 	f2 func(intfId uint32, onuId uint32, uniId uint32, portNo uint32,
-		logicalFlow *ofp.OfpFlowStats, allocId uint32, gemPortId uint32, vlanId uint32),
+		logicalFlow *ofp.OfpFlowStats, allocId uint32, gemPortId uint32, vlanId uint32,
+		classifier map[string]interface{}, action map[string]interface{}),
 	args map[string]uint32,
 	classifier map[string]interface{}, action map[string]interface{},
 	logicalFlow *ofp.OfpFlowStats,
@@ -1560,7 +1615,7 @@
 		if FlowType == HsiaFlow || FlowType == DhcpFlow {
 			f1(args["intfId"], args["onuId"], args["uniId"], args["portNo"], classifier, action, logicalFlow, args["allocId"], gemPortID)
 		} else if FlowType == EapolFlow {
-			f2(args["intfId"], args["onuId"], args["uniId"], args["portNo"], logicalFlow, args["allocId"], gemPortID, vlanID[0])
+			f2(args["intfId"], args["onuId"], args["uniId"], args["portNo"], logicalFlow, args["allocId"], gemPortID, vlanID[0], classifier, action)
 		} else {
 			log.Errorw("Unrecognized Flow Type", log.Fields{"FlowType": FlowType})
 			return
@@ -1573,6 +1628,8 @@
 	action := make(map[string]interface{})
 	classifier[PacketTagType] = DoubleTag
 	action[TrapToHost] = true
+	var err error
+	var networkInterfaceID uint32
 	/* We manage flowId resource pool on per PON port basis.
 	   Since this situation is tricky, as a hack, we pass the NNI port
 	   index (network_intf_id) as PON port Index for the flowId resource
@@ -1588,13 +1645,18 @@
 	uniID := -1
 	gemPortID := -1
 	allocID := -1
-	networkInterfaceID := f.deviceHandler.nniIntfID
+	networkInterfaceID, err = getNniIntfID(classifier, action)
+	if err != nil {
+		log.Error("Failed to get nniIntf ID")
+		return
+	}
+
 	flowStoreCookie := getFlowStoreCookie(classifier, uint32(0))
-	if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(networkInterfaceID), uint32(onuID), uint32(uniID), flowStoreCookie); present {
+	if present := f.resourceMgr.IsFlowCookieOnKVStore(uint32(networkInterfaceID), int32(onuID), int32(uniID), flowStoreCookie); present {
 		log.Debug("Flow-exists--not-re-adding")
 		return
 	}
-	flowID, err := f.resourceMgr.GetFlowID(uint32(networkInterfaceID), uint32(onuID), uint32(uniID), uint32(gemPortID), flowStoreCookie, "", 0)
+	flowID, err := f.resourceMgr.GetFlowID(uint32(networkInterfaceID), int32(onuID), int32(uniID), uint32(gemPortID), flowStoreCookie, "", 0)
 	if err != nil {
 		log.Errorw("Flow id unavailable for DHCP traponNNI flow", log.Fields{"error": err})
 		return
@@ -1626,7 +1688,7 @@
 		PortNo:        portNo}
 	if ok := f.addFlowToDevice(logicalFlow, &downstreamflow); ok {
 		log.Debug("DHCP trap on NNI flow added to device successfully")
-		flowsToKVStore := f.getUpdatedFlowInfo(&downstreamflow, flowStoreCookie, "", flowID)
+		flowsToKVStore := f.getUpdatedFlowInfo(&downstreamflow, flowStoreCookie, "", flowID, logicalFlow.Id)
 		if err := f.updateFlowInfoToKVStore(int32(networkInterfaceID),
 			int32(onuID),
 			int32(uniID),
@@ -1694,7 +1756,7 @@
 					tp_pb.Direction_UPSTREAM,
 					pcp.(uint32))
 
-				f.addEAPOLFlow(intfID, onuID, uniID, portNo, flow, allocID, gemPort, vlanID)
+				f.addEAPOLFlow(intfID, onuID, uniID, portNo, flow, allocID, gemPort, vlanID, classifierInfo, actionInfo)
 			} else {
 				installFlowOnAllGemports(nil, f.addEAPOLFlow, args, classifierInfo, actionInfo, flow, gemPorts, EapolFlow, vlanID)
 			}
@@ -1913,3 +1975,60 @@
 	}
 	return append(slice, item)
 }
+
+// getNniIntfID gets nni intf id from the flow classifier/action
+func getNniIntfID(classifier map[string]interface{}, action map[string]interface{}) (uint32, error) {
+
+	portType := IntfIDToPortTypeName(classifier[InPort].(uint32))
+	if portType == voltha.Port_PON_OLT {
+		intfID := IntfIDFromNniPortNum(action[Output].(uint32))
+		log.Debugw("output Nni IntfID is", log.Fields{"intfid": intfID})
+		return intfID, nil
+	} else if portType == voltha.Port_ETHERNET_NNI {
+		intfID := IntfIDFromNniPortNum(classifier[InPort].(uint32))
+		log.Debugw("input Nni IntfID is", log.Fields{"intfid": intfID})
+		return intfID, nil
+	}
+	return uint32(0), nil
+}
+
+// UpdateGemPortForPktIn updates gemport for packet-in in to the cache and to the kv store as well.
+func (f *OpenOltFlowMgr) UpdateGemPortForPktIn(intfID uint32, onuID uint32, logicalPort uint32, gemPort uint32) {
+	pktInkey := rsrcMgr.PacketInInfoKey{IntfID: intfID, OnuID: onuID, LogicalPort: logicalPort}
+
+	f.lockCache.Lock()
+	defer f.lockCache.Unlock()
+	_, ok := f.packetInGemPort[pktInkey]
+	if ok {
+		log.Debugw("pktin key found in cache , no need to update kv as we are assuming both will be in sync",
+			log.Fields{"pktinkey": pktInkey, "gem": gemPort})
+	} else {
+		f.packetInGemPort[pktInkey] = gemPort
+
+		f.resourceMgr.UpdateGemPortForPktIn(pktInkey, gemPort)
+		log.Debugw("pktin key not found in local cache updating cache and kv store", log.Fields{"pktinkey": pktInkey, "gem": gemPort})
+	}
+	return
+}
+
+// AddUniPortToOnuInfo adds uni port to the onugem info both in cache and kvstore.
+func (f *OpenOltFlowMgr) AddUniPortToOnuInfo(intfID uint32, onuID uint32, portNum uint32) {
+
+	f.lockCache.Lock()
+	defer f.lockCache.Unlock()
+	onugem := f.onuGemInfo[intfID]
+	for idx, onu := range onugem {
+		if onu.OnuID == onuID {
+			for _, uni := range onu.UniPorts {
+				if uni == portNum {
+					log.Debugw("uni already in cache, no need to update cache and kv store",
+						log.Fields{"uni": portNum})
+					return
+				}
+			}
+			onugem[idx].UniPorts = append(onugem[idx].UniPorts, portNum)
+			f.onuGemInfo[intfID] = onugem
+		}
+	}
+	f.resourceMgr.AddUniPortToOnuInfo(intfID, onuID, portNum)
+}
diff --git a/adaptercore/openolt_flowmgr_test.go b/adaptercore/openolt_flowmgr_test.go
index 6dfb6f3..57d922b 100644
--- a/adaptercore/openolt_flowmgr_test.go
+++ b/adaptercore/openolt_flowmgr_test.go
@@ -62,36 +62,29 @@
 }
 
 func newMockFlowmgr() *OpenOltFlowMgr {
-	rsrMgr := newMockResourceMgr()
+	rMgr := newMockResourceMgr()
 	dh := newMockDeviceHandler()
 
-	rsrMgr.KVStore = &db.Backend{}
-	rsrMgr.KVStore.Client = &mocks.MockKVClient{}
+	rMgr.KVStore = &db.Backend{}
+	rMgr.KVStore.Client = &mocks.MockKVClient{}
 
-	dh.resourceMgr = rsrMgr
-	flwMgr := NewFlowManager(dh, rsrMgr)
-	onuIds := make(map[onuIDKey]onuInfo)
-	onuIds[onuIDKey{intfID: 1, onuID: 1}] = onuInfo{intfID: 1, onuID: 1, serialNumber: "1"}
-	onuIds[onuIDKey{intfID: 2, onuID: 2}] = onuInfo{intfID: 2, onuID: 2, serialNumber: "2"}
-	flwMgr.onuIds = onuIds
+	dh.resourceMgr = rMgr
+	flwMgr := NewFlowManager(dh, rMgr)
 
-	onuSerialNumbers := make(map[string]onuInfo)
-	onuSerialNumbers["1"] = onuInfo{intfID: 1, onuID: 1, serialNumber: "1"}
-	onuSerialNumbers["2"] = onuInfo{intfID: 2, onuID: 1, serialNumber: "2"}
-	flwMgr.onuSerialNumbers = onuSerialNumbers
+	onuGemInfo1 := make([]rsrcMgr.OnuGemInfo, 2)
+	onuGemInfo2 := make([]rsrcMgr.OnuGemInfo, 2)
+	onuGemInfo1[0] = rsrcMgr.OnuGemInfo{OnuID: 1, SerialNumber: "1", IntfID: 1, GemPorts: []uint32{1}}
+	onuGemInfo2[1] = rsrcMgr.OnuGemInfo{OnuID: 2, SerialNumber: "2", IntfID: 2, GemPorts: []uint32{2}}
+	flwMgr.onuGemInfo[1] = onuGemInfo1
+	flwMgr.onuGemInfo[2] = onuGemInfo2
 
-	onuGemPortIds := make(map[gemPortKey]onuInfo)
-	onuGemPortIds[gemPortKey{intfID: 1, gemPort: 1}] = onuInfo{intfID: 1, onuID: 1, serialNumber: "1"}
-	onuGemPortIds[gemPortKey{intfID: 2, gemPort: 2}] = onuInfo{intfID: 2, onuID: 2, serialNumber: "2"}
-	flwMgr.onuGemPortIds = onuGemPortIds
-
-	packetInGemPort := make(map[packetInInfoKey]uint32)
-	packetInGemPort[packetInInfoKey{intfID: 1, onuID: 1, logicalPort: 1}] = 1
-	packetInGemPort[packetInInfoKey{intfID: 2, onuID: 2, logicalPort: 2}] = 2
+	packetInGemPort := make(map[rsrcMgr.PacketInInfoKey]uint32)
+	packetInGemPort[rsrcMgr.PacketInInfoKey{IntfID: 1, OnuID: 1, LogicalPort: 1}] = 1
+	packetInGemPort[rsrcMgr.PacketInInfoKey{IntfID: 2, OnuID: 2, LogicalPort: 2}] = 2
 
 	flwMgr.packetInGemPort = packetInGemPort
-	tps := make([]tp.TechProfileIf, len(rsrMgr.ResourceMgrs))
-	for key := range rsrMgr.ResourceMgrs {
+	tps := make([]tp.TechProfileIf, len(rMgr.ResourceMgrs))
+	for key := range rMgr.ResourceMgrs {
 		tps[key] = mocks.MockTechProfile{TpID: key}
 	}
 	flwMgr.techprofile = tps
@@ -223,7 +216,6 @@
 	}
 	ofpstats := fu.MkFlowStat(fa)
 	ofpstats.Cookie = ofpstats.Id
-	flowMgr.storedDeviceFlows = append(flowMgr.storedDeviceFlows, *ofpstats)
 	lldpFa := &fu.FlowArgs{
 		KV: fu.OfpFlowModArgs{"priority": 1000, "cookie": 48132224281636694},
 		MatchFields: []*ofp.OfpOxmOfbField{
@@ -755,14 +747,9 @@
 	}
 
 	type fields struct {
-		techprofile       []tp.TechProfileIf
-		deviceHandler     *DeviceHandler
-		resourceMgr       *rsrcMgr.OpenOltResourceMgr
-		onuIds            map[onuIDKey]onuInfo
-		onuSerialNumbers  map[string]onuInfo
-		onuGemPortIds     map[gemPortKey]onuInfo
-		packetInGemPort   map[packetInInfoKey]uint32
-		storedDeviceFlows []ofp.OfpFlowStats
+		techprofile   []tp.TechProfileIf
+		deviceHandler *DeviceHandler
+		resourceMgr   *rsrcMgr.OpenOltResourceMgr
 	}
 	type args struct {
 		args           map[string]uint32
diff --git a/adaptercore/resourcemanager/resourcemanager.go b/adaptercore/resourcemanager/resourcemanager.go
index 3dbbec8..3c01117 100755
--- a/adaptercore/resourcemanager/resourcemanager.go
+++ b/adaptercore/resourcemanager/resourcemanager.go
@@ -41,6 +41,11 @@
 	TpIDPathSuffix = "{%d,%d,%d}/tp_id"
 	//MeterIDPathSuffix - <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
 	MeterIDPathSuffix = "{%d,%d,%d}/{%d}/meter_id/{%s}"
+	//NnniIntfID - nniintfids
+	NnniIntfID = "nniintfids"
+	// OnuPacketINPath path on the kvstore to store packetin gemport,which will be used for packetin, pcketout
+	//format: onu_packetin/<intfid>,<onuid>,<logicalport>
+	OnuPacketINPath = "onu_packetin/{%d,%d,%d}"
 )
 
 // FlowInfo holds the flow information
@@ -48,6 +53,23 @@
 	Flow            *openolt.Flow
 	FlowStoreCookie uint64
 	FlowCategory    string
+	LogicalFlowID   uint64
+}
+
+// OnuGemInfo holds onu information along with gem port list and uni port list
+type OnuGemInfo struct {
+	OnuID        uint32
+	SerialNumber string
+	IntfID       uint32
+	GemPorts     []uint32
+	UniPorts     []uint32
+}
+
+// PacketInInfoKey is the key for packet in gemport
+type PacketInInfoKey struct {
+	IntfID      uint32
+	OnuID       uint32
+	LogicalPort uint32
 }
 
 // OpenOltResourceMgr holds resource related information as provided below for each field
@@ -398,7 +420,7 @@
 // GetFlowIDInfo returns the slice of flow info of the given pon-port
 // Note: For flows which trap from the NNI and not really associated with any particular
 // ONU (like LLDP), the onu_id and uni_id is set as -1. The intf_id is the NNI intf_id.
-func (RsrcMgr *OpenOltResourceMgr) GetFlowIDInfo(ponIntfID uint32, onuID uint32, uniID uint32, flowID uint32) *[]FlowInfo {
+func (RsrcMgr *OpenOltResourceMgr) GetFlowIDInfo(ponIntfID uint32, onuID int32, uniID int32, flowID uint32) *[]FlowInfo {
 	var flows []FlowInfo
 
 	FlowPath := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
@@ -416,7 +438,7 @@
 // GetCurrentFlowIDsForOnu fetches flow ID from the resource manager
 // Note: For flows which trap from the NNI and not really associated with any particular
 // ONU (like LLDP), the onu_id and uni_id is set as -1. The intf_id is the NNI intf_id.
-func (RsrcMgr *OpenOltResourceMgr) GetCurrentFlowIDsForOnu(PONIntfID uint32, ONUID uint32, UNIID uint32) []uint32 {
+func (RsrcMgr *OpenOltResourceMgr) GetCurrentFlowIDsForOnu(PONIntfID uint32, ONUID int32, UNIID int32) []uint32 {
 
 	FlowPath := fmt.Sprintf("%d,%d,%d", PONIntfID, ONUID, UNIID)
 	if mgrs, exist := RsrcMgr.ResourceMgrs[PONIntfID]; exist {
@@ -435,7 +457,7 @@
 }
 
 // GetFlowID return flow ID for a given pon interface id, onu id and uni id
-func (RsrcMgr *OpenOltResourceMgr) GetFlowID(ponIntfID uint32, ONUID uint32, uniID uint32,
+func (RsrcMgr *OpenOltResourceMgr) GetFlowID(ponIntfID uint32, ONUID int32, uniID int32,
 	gemportID uint32,
 	flowStoreCookie uint64,
 	flowCategory string, vlanPcp ...uint32) (uint32, error) {
@@ -446,7 +468,7 @@
 	if FlowIDs != nil {
 		log.Debugw("Found flowId(s) for this ONU", log.Fields{"pon": ponIntfID, "ONUID": ONUID, "uniID": uniID, "KVpath": FlowPath})
 		for _, flowID := range FlowIDs {
-			FlowInfo := RsrcMgr.GetFlowIDInfo(ponIntfID, ONUID, uniID, uint32(flowID))
+			FlowInfo := RsrcMgr.GetFlowIDInfo(ponIntfID, int32(ONUID), int32(uniID), uint32(flowID))
 			er := getFlowIDFromFlowInfo(FlowInfo, flowID, gemportID, flowStoreCookie, flowCategory, vlanPcp...)
 			if er == nil {
 				return flowID, er
@@ -667,7 +689,7 @@
 	IntfONUID = fmt.Sprintf("%d,%d,%d", IntfID, onuID, uniID)
 	err = RsrcMgr.ResourceMgrs[IntfID].UpdateFlowIDForOnu(IntfONUID, FlowID, false)
 	if err != nil {
-		log.Error("Failed to Update flow id infor for %s", IntfONUID)
+		log.Errorw("Failed to Update flow id  for", log.Fields{"intf": IntfONUID})
 	}
 	RsrcMgr.ResourceMgrs[IntfID].RemoveFlowIDInfo(IntfONUID, FlowID)
 	RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(IntfID, ponrmgr.FLOW_ID, FlowIds)
@@ -685,7 +707,7 @@
 		IntfOnuIDUniID = fmt.Sprintf("%d,%d,%d", IntfID, onuID, uniID)
 		err = RsrcMgr.ResourceMgrs[IntfID].UpdateFlowIDForOnu(IntfOnuIDUniID, flow, false)
 		if err != nil {
-			log.Error("Failed to Update flow id infor for %s", IntfOnuIDUniID)
+			log.Errorw("Failed to Update flow id for", log.Fields{"intf": IntfOnuIDUniID})
 		}
 		RsrcMgr.ResourceMgrs[IntfID].RemoveFlowIDInfo(IntfOnuIDUniID, flow)
 	}
@@ -750,7 +772,7 @@
 
 // IsFlowCookieOnKVStore checks if the given flow cookie is present on the kv store
 // Returns true if the flow cookie is found, otherwise it returns false
-func (RsrcMgr *OpenOltResourceMgr) IsFlowCookieOnKVStore(ponIntfID uint32, onuID uint32, uniID uint32,
+func (RsrcMgr *OpenOltResourceMgr) IsFlowCookieOnKVStore(ponIntfID uint32, onuID int32, uniID int32,
 	flowStoreCookie uint64) bool {
 
 	FlowPath := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
@@ -758,7 +780,7 @@
 	if FlowIDs != nil {
 		log.Debugw("Found flowId(s) for this ONU", log.Fields{"pon": ponIntfID, "onuID": onuID, "uniID": uniID, "KVpath": FlowPath})
 		for _, flowID := range FlowIDs {
-			FlowInfo := RsrcMgr.GetFlowIDInfo(ponIntfID, onuID, uniID, uint32(flowID))
+			FlowInfo := RsrcMgr.GetFlowIDInfo(ponIntfID, int32(onuID), int32(uniID), uint32(flowID))
 			if FlowInfo != nil {
 				log.Debugw("Found flows", log.Fields{"flows": *FlowInfo, "flowId": flowID})
 				for _, Info := range *FlowInfo {
@@ -804,7 +826,7 @@
 func (RsrcMgr *OpenOltResourceMgr) RemoveTechProfileIDsForOnu(IntfID uint32, OnuID uint32, UniID uint32) error {
 	IntfOnuUniID := fmt.Sprintf(TpIDPathSuffix, IntfID, OnuID, UniID)
 	if err := RsrcMgr.KVStore.Delete(IntfOnuUniID); err != nil {
-		log.Error("Failed to delete techprofile id resource %s in KV store", IntfOnuUniID)
+		log.Errorw("Failed to delete techprofile id resource in KV store", log.Fields{"path": IntfOnuUniID})
 		return err
 	}
 	return nil
@@ -945,3 +967,243 @@
 	log.Debugw("the flow can be related to a different service", log.Fields{"flow_info": FlowInfo})
 	return errors.New("invalid flow-info")
 }
+
+//AddGemToOnuGemInfo adds gemport to onugem info kvstore
+func (RsrcMgr *OpenOltResourceMgr) AddGemToOnuGemInfo(intfID uint32, onuID uint32, gemPort uint32) error {
+	var onuGemData []OnuGemInfo
+	var err error
+
+	if err = RsrcMgr.ResourceMgrs[intfID].GetOnuGemInfo(intfID, &onuGemData); err != nil {
+		log.Errorf("failed to get onuifo for intfid %d", intfID)
+		return err
+	}
+	if len(onuGemData) == 0 {
+		log.Errorw("failed to ger Onuid info ", log.Fields{"intfid": intfID, "onuid": onuID})
+		return err
+	}
+
+	for idx, onugem := range onuGemData {
+		if onugem.OnuID == onuID {
+			for _, gem := range onuGemData[idx].GemPorts {
+				if gem == gemPort {
+					log.Debugw("Gem already present in onugem info, skpping addition", log.Fields{"gem": gem})
+					return nil
+				}
+			}
+			log.Debugw("Added gem to onugem info", log.Fields{"gem": gemPort})
+			onuGemData[idx].GemPorts = append(onuGemData[idx].GemPorts, gemPort)
+			break
+		}
+	}
+	err = RsrcMgr.ResourceMgrs[intfID].AddOnuGemInfo(intfID, onuGemData)
+	if err != nil {
+		log.Error("Failed to add onugem to kv store")
+		return err
+	}
+	return err
+}
+
+//GetOnuGemInfo gets onu gem info from the kvstore per interface
+func (RsrcMgr *OpenOltResourceMgr) GetOnuGemInfo(IntfID uint32) ([]OnuGemInfo, error) {
+	var onuGemData []OnuGemInfo
+
+	if err := RsrcMgr.ResourceMgrs[IntfID].GetOnuGemInfo(IntfID, &onuGemData); err != nil {
+		log.Errorf("failed to get onuifo for intfid %d", IntfID)
+		return nil, err
+	}
+
+	return onuGemData, nil
+}
+
+// AddOnuInfo adds onu info on to the kvstore per interface
+func (RsrcMgr *OpenOltResourceMgr) AddOnuInfo(IntfID uint32, onuGem OnuGemInfo) error {
+	var onuGemData []OnuGemInfo
+	var err error
+
+	if err = RsrcMgr.ResourceMgrs[IntfID].GetOnuGemInfo(IntfID, &onuGemData); err != nil {
+		log.Errorf("failed to get onuifo for intfid %d", IntfID)
+		return err
+	}
+	onuGemData = append(onuGemData, onuGem)
+	err = RsrcMgr.ResourceMgrs[IntfID].AddOnuGemInfo(IntfID, onuGemData)
+	if err != nil {
+		log.Error("Failed to add onugem to kv store")
+		return err
+	}
+
+	log.Debugw("added onu to onugeminfo", log.Fields{"intf": IntfID, "onugem": onuGem})
+	return err
+}
+
+// UpdateOnuInfo updates Onuinfo on the kvstore per interface
+func (RsrcMgr *OpenOltResourceMgr) UpdateOnuInfo(IntfID uint32, onuGem []OnuGemInfo) error {
+	var onuGemData []OnuGemInfo
+	var err error
+
+	err = RsrcMgr.ResourceMgrs[IntfID].AddOnuGemInfo(IntfID, onuGemData)
+	if err != nil {
+		log.Error("Failed to add onugem to kv store")
+		return err
+	}
+
+	log.Debugw("updated onugeminfo", log.Fields{"intf": IntfID, "onugem": onuGem})
+	return err
+}
+
+// AddUniPortToOnuInfo adds uni port to the onuinfo kvstore. check if the uni is already present if not update the kv store.
+func (RsrcMgr *OpenOltResourceMgr) AddUniPortToOnuInfo(intfID uint32, onuID uint32, portNo uint32) {
+	var onuGemData []OnuGemInfo
+	var err error
+
+	if err = RsrcMgr.ResourceMgrs[intfID].GetOnuGemInfo(intfID, &onuGemData); err != nil {
+		log.Errorf("failed to get onuifo for intfid %d", intfID)
+		return
+	}
+	for idx, onu := range onuGemData {
+		if onu.OnuID == onuID {
+			for _, uni := range onu.UniPorts {
+				if uni == portNo {
+					log.Debugw("uni already present in onugem info", log.Fields{"uni": portNo})
+					return
+				}
+			}
+			onuGemData[idx].UniPorts = append(onuGemData[idx].UniPorts, portNo)
+			break
+		}
+	}
+	err = RsrcMgr.ResourceMgrs[intfID].AddOnuGemInfo(intfID, onuGemData)
+	if err != nil {
+		log.Errorw("Failed to add uin port in onugem to kv store", log.Fields{"uni": portNo})
+		return
+	}
+	return
+}
+
+//UpdateGemPortForPktIn updates gemport for pkt in path to kvstore, path being intfid, onuid, portno
+func (RsrcMgr *OpenOltResourceMgr) UpdateGemPortForPktIn(pktIn PacketInInfoKey, gemPort uint32) {
+
+	path := fmt.Sprintf(OnuPacketINPath, pktIn.IntfID, pktIn.OnuID, pktIn.LogicalPort)
+	Value, err := json.Marshal(gemPort)
+	if err != nil {
+		log.Error("Failed to marshal data")
+		return
+	}
+	if err = RsrcMgr.KVStore.Put(path, Value); err != nil {
+		log.Errorw("Failed to put to kvstore", log.Fields{"path": path, "value": gemPort})
+		return
+	}
+	log.Debugw("added gem packet in successfully", log.Fields{"path": path, "gem": gemPort})
+
+	return
+}
+
+// GetGemPortFromOnuPktIn gets the gem port from onu pkt in path, path being intfid, onuid, portno
+func (RsrcMgr *OpenOltResourceMgr) GetGemPortFromOnuPktIn(intfID uint32, onuID uint32, logicalPort uint32) (uint32, error) {
+
+	var Val []byte
+	var gemPort uint32
+
+	path := fmt.Sprintf(OnuPacketINPath, intfID, onuID, logicalPort)
+
+	value, err := RsrcMgr.KVStore.Get(path)
+	if err != nil {
+		log.Errorw("Failed to get from kv store", log.Fields{"path": path})
+		return uint32(0), err
+	} else if value == nil {
+		log.Debugw("No pkt in gem found", log.Fields{"path": path})
+		return uint32(0), nil
+	}
+
+	if Val, err = kvstore.ToByte(value.Value); err != nil {
+		log.Error("Failed to convert to byte array")
+		return uint32(0), err
+	}
+	if err = json.Unmarshal(Val, &gemPort); err != nil {
+		log.Error("Failed to unmarshall")
+		return uint32(0), err
+	}
+	log.Debugw("found packein gemport from path", log.Fields{"path": path, "gem": gemPort})
+
+	return gemPort, nil
+}
+
+// DelGemPortPktIn deletes the gemport from the pkt in path
+func (RsrcMgr *OpenOltResourceMgr) DelGemPortPktIn(intfID uint32, onuID uint32, logicalPort uint32) error {
+
+	path := fmt.Sprintf(OnuPacketINPath, intfID, onuID, logicalPort)
+	if err := RsrcMgr.KVStore.Delete(path); err != nil {
+		log.Errorf("Falied to remove resource %s", path)
+		return err
+	}
+	return nil
+}
+
+// DelOnuGemInfoForIntf deletes the onugem info from kvstore per interface
+func (RsrcMgr *OpenOltResourceMgr) DelOnuGemInfoForIntf(intfID uint32) error {
+	if err := RsrcMgr.ResourceMgrs[intfID].DelOnuGemInfoForIntf(intfID); err != nil {
+		log.Errorw("failed to delete onu gem info for", log.Fields{"intfid": intfID})
+		return err
+	}
+	return nil
+}
+
+//GetNNIFromKVStore gets NNi intfids from kvstore. path being per device
+func (RsrcMgr *OpenOltResourceMgr) GetNNIFromKVStore() ([]uint32, error) {
+
+	var nni []uint32
+	var Val []byte
+
+	path := fmt.Sprintf(NnniIntfID)
+	value, err := RsrcMgr.KVStore.Get(path)
+	if err != nil {
+		log.Error("failed to get data from kv store")
+		return nil, err
+	}
+	if value != nil {
+		if Val, err = kvstore.ToByte(value.Value); err != nil {
+			log.Error("Failed to convert to byte array")
+			return nil, err
+		}
+		if err = json.Unmarshal(Val, &nni); err != nil {
+			log.Error("Failed to unmarshall")
+			return nil, err
+		}
+	}
+	return nni, err
+}
+
+// AddNNIToKVStore adds Nni interfaces to kvstore, path being per device.
+func (RsrcMgr *OpenOltResourceMgr) AddNNIToKVStore(nniIntf uint32) error {
+	var Value []byte
+
+	nni, err := RsrcMgr.GetNNIFromKVStore()
+	if err != nil {
+		log.Error("failed to fetch nni interfaces from kv store")
+		return err
+	}
+
+	path := fmt.Sprintf(NnniIntfID)
+	nni = append(nni, nniIntf)
+	Value, err = json.Marshal(nni)
+	if err != nil {
+		log.Error("Failed to marshal data")
+	}
+	if err = RsrcMgr.KVStore.Put(path, Value); err != nil {
+		log.Errorw("Failed to put to kvstore", log.Fields{"path": path, "value": Value})
+		return err
+	}
+	log.Debugw("added nni to kv successfully", log.Fields{"path": path, "nni": nniIntf})
+	return nil
+}
+
+// DelNNiFromKVStore deletes nni interface list from kv store.
+func (RsrcMgr *OpenOltResourceMgr) DelNNiFromKVStore() error {
+
+	path := fmt.Sprintf(NnniIntfID)
+
+	if err := RsrcMgr.KVStore.Delete(path); err != nil {
+		log.Errorw("Failed to delete nni interfaces from kv store", log.Fields{"path": path})
+		return err
+	}
+	return nil
+}
diff --git a/adaptercore/resourcemanager/resourcemanager_test.go b/adaptercore/resourcemanager/resourcemanager_test.go
index 3c45df5..1eac99f 100644
--- a/adaptercore/resourcemanager/resourcemanager_test.go
+++ b/adaptercore/resourcemanager/resourcemanager_test.go
@@ -428,8 +428,8 @@
 
 	type args struct {
 		PONIntfID uint32
-		ONUID     uint32
-		UNIID     uint32
+		ONUID     int32
+		UNIID     int32
 	}
 	tests := []struct {
 		name   string
@@ -477,8 +477,8 @@
 
 	type args struct {
 		ponIntfID       uint32
-		ONUID           uint32
-		uniID           uint32
+		ONUID           int32
+		uniID           int32
 		gemportID       uint32
 		flowStoreCookie uint64
 		flowCategory    string
@@ -625,8 +625,8 @@
 func TestOpenOltResourceMgr_IsFlowCookieOnKVStore(t *testing.T) {
 	type args struct {
 		ponIntfID       uint32
-		onuID           uint32
-		uniID           uint32
+		onuID           int32
+		uniID           int32
 		flowStoreCookie uint64
 	}
 	tests := []struct {
@@ -907,6 +907,7 @@
 				}},
 			1,
 			"HSIA_FLOW",
+			2000,
 		},
 		{
 			&openolt.Flow{
@@ -914,6 +915,7 @@
 			},
 			1,
 			"EAPOL",
+			3000,
 		},
 	}
 	tests := []struct {