VOL-3419: OpenOLT adapter at scale constantly takes more than 10 seconds to react to flows
- Pass information to agent to do the flow replication
- Consolidate various locks in the adapter and remove redundant locks
- Use voltha-protos version 4.0.2 and voltha-lib-go version 4.0.0
- Bump adapter version to 3.0.0

Change-Id: Ic053c54e5319bb1736ec74facfc79dd10058ecf5
diff --git a/internal/pkg/resourcemanager/resourcemanager.go b/internal/pkg/resourcemanager/resourcemanager.go
index 3f642cf..abfb73d 100755
--- a/internal/pkg/resourcemanager/resourcemanager.go
+++ b/internal/pkg/resourcemanager/resourcemanager.go
@@ -23,17 +23,18 @@
 	"errors"
 	"fmt"
 	"strconv"
+	"strings"
 	"sync"
 	"time"
 
 	"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
 
-	"github.com/opencord/voltha-lib-go/v3/pkg/db"
-	"github.com/opencord/voltha-lib-go/v3/pkg/db/kvstore"
-	"github.com/opencord/voltha-lib-go/v3/pkg/log"
-	ponrmgr "github.com/opencord/voltha-lib-go/v3/pkg/ponresourcemanager"
-	ofp "github.com/opencord/voltha-protos/v3/go/openflow_13"
-	"github.com/opencord/voltha-protos/v3/go/openolt"
+	"github.com/opencord/voltha-lib-go/v4/pkg/db"
+	"github.com/opencord/voltha-lib-go/v4/pkg/db/kvstore"
+	"github.com/opencord/voltha-lib-go/v4/pkg/log"
+	ponrmgr "github.com/opencord/voltha-lib-go/v4/pkg/ponresourcemanager"
+	ofp "github.com/opencord/voltha-protos/v4/go/openflow_13"
+	"github.com/opencord/voltha-protos/v4/go/openolt"
 )
 
 const (
@@ -47,11 +48,12 @@
 	MeterIDPathSuffix = "{%d,%d,%d}/{%d}/meter_id/{%s}"
 	//NnniIntfID - nniintfids
 	NnniIntfID = "nniintfids"
-	//OnuPacketINPathPrefix to be used as prefix for keys to be used to store packet-in gem ports
-	OnuPacketINPathPrefix = "onu_packetin/{%d,%d,%d"
-	// OnuPacketINPath path on the kvstore to store packetin gemport,which will be used for packetin, pcketout
-	//format: onu_packetin/<intfid>,<onuid>,<logicalport>,<vlanId>,<priority>
-	OnuPacketINPath = OnuPacketINPathPrefix + ",%d,%d}"
+	// OnuPacketINPathPrefix - path prefix where ONU packet-in vlanID/PCP is stored
+	//format: onu_packetin/{<intfid>,<onuid>,<logicalport>}
+	OnuPacketINPathPrefix = "onu_packetin/{%d,%d,%d}"
+	// OnuPacketINPath path on the kvstore to store packetin gemport,which will be used for packetin, packetout
+	//format: onu_packetin/{<intfid>,<onuid>,<logicalport>}/{<vlanId>,<priority>}
+	OnuPacketINPath = OnuPacketINPathPrefix + "/{%d,%d}"
 	//FlowIDsForGem flowids_per_gem/<intfid>
 	FlowIDsForGem = "flowids_per_gem/{%d}"
 	//McastQueuesForIntf multicast queues for pon interfaces
@@ -68,14 +70,19 @@
 	// We preserve the groups under "FlowGroupsCached" directory in the KV store temporarily. Having set members,
 	// we remove the group from the cached group store.
 	FlowGroupCached = "flow_groups_cached/{%d}"
+
+	//FlowIDPath - Path on the KV store for storing list of Flow IDs for a given subscriber
+	//Format: BasePathKvStore/<(pon_intf_id, onu_id, uni_id)>/flow_ids
+	FlowIDPath = "{%s}/flow_ids"
+	//FlowIDInfoPath - Used to store more metadata associated with the flow_id
+	//Format: BasePathKvStore/<(pon_intf_id, onu_id, uni_id)>/flow_id_info/<flow_id>
+	FlowIDInfoPath = "{%s}/flow_id_info/{%d}"
 )
 
 // FlowInfo holds the flow information
 type FlowInfo struct {
-	Flow            *openolt.Flow
-	FlowStoreCookie uint64
-	FlowCategory    string
-	LogicalFlowID   uint64
+	Flow           *openolt.Flow
+	IsSymmtricFlow bool
 }
 
 // OnuGemInfo holds onu information along with gem port list and uni port list
@@ -119,12 +126,6 @@
 	AllocIDMgmtLock []sync.RWMutex
 	// This protects concurrent onu_id allocate/delete calls on a per PON port basis
 	OnuIDMgmtLock []sync.RWMutex
-	// This protects concurrent flow_id allocate/delete calls. We do not need this on a
-	// per PON port basis as flow IDs are unique across the OLT.
-	FlowIDMgmtLock sync.RWMutex
-
-	// This protects concurrent access to flowids_per_gem info stored on KV store
-	flowIDToGemInfoLock sync.RWMutex
 }
 
 func newKVClient(ctx context.Context, storeType string, address string, timeout time.Duration) (kvstore.Client, error) {
@@ -164,6 +165,7 @@
 func NewResourceMgr(ctx context.Context, deviceID string, KVStoreAddress string, kvStoreType string, deviceType string, devInfo *openolt.DeviceInfo) *OpenOltResourceMgr {
 	var ResourceMgr OpenOltResourceMgr
 	logger.Debugf(ctx, "Init new resource manager , address: %s, device-id: %s", KVStoreAddress, deviceID)
+	ResourceMgr.DeviceID = deviceID
 	ResourceMgr.Address = KVStoreAddress
 	ResourceMgr.DeviceType = deviceType
 	ResourceMgr.DevInfo = devInfo
@@ -249,7 +251,7 @@
 			GlobalPONRsrcMgr = RsrcMgrsByTech[technology]
 		}
 		for _, IntfID := range TechRange.IntfIds {
-			ResourceMgr.ResourceMgrs[uint32(IntfID)] = RsrcMgrsByTech[technology]
+			ResourceMgr.ResourceMgrs[IntfID] = RsrcMgrsByTech[technology]
 		}
 		// self.initialize_device_resource_range_and_pool(resource_mgr, global_resource_mgr, arange)
 		InitializeDeviceResourceRangeAndPool(ctx, RsrcMgrsByTech[technology], GlobalPONRsrcMgr,
@@ -446,16 +448,16 @@
 		return 0, err
 	}
 	// Get ONU id for a provided pon interface ID.
-	ONUID, err := RsrcMgr.ResourceMgrs[ponIntfID].GetResourceID(ctx, ponIntfID,
+	onuID, err := RsrcMgr.ResourceMgrs[ponIntfID].GetResourceID(ctx, ponIntfID,
 		ponrmgr.ONU_ID, 1)
 	if err != nil {
 		logger.Errorf(ctx, "Failed to get resource for interface %d for type %s",
 			ponIntfID, ponrmgr.ONU_ID)
 		return 0, err
 	}
-	if ONUID != nil {
-		RsrcMgr.ResourceMgrs[ponIntfID].InitResourceMap(ctx, fmt.Sprintf("%d,%d", ponIntfID, ONUID[0]))
-		return ONUID[0], err
+	if onuID != nil {
+		RsrcMgr.ResourceMgrs[ponIntfID].InitResourceMap(ctx, fmt.Sprintf("%d,%d", ponIntfID, onuID[0]))
+		return onuID[0], err
 	}
 
 	return 0, err // return OnuID 0 on error
@@ -464,81 +466,170 @@
 // GetFlowIDInfo returns the slice of flow info of the given pon-port
 // Note: For flows which trap from the NNI and not really associated with any particular
 // ONU (like LLDP), the onu_id and uni_id is set as -1. The intf_id is the NNI intf_id.
-func (RsrcMgr *OpenOltResourceMgr) GetFlowIDInfo(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32, flowID uint32) *[]FlowInfo {
-	var flows []FlowInfo
+func (RsrcMgr *OpenOltResourceMgr) GetFlowIDInfo(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32, flowID uint64) *FlowInfo {
+	var flowInfo FlowInfo
 
-	FlowPath := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
-	if err := RsrcMgr.ResourceMgrs[ponIntfID].GetFlowIDInfo(ctx, FlowPath, flowID, &flows); err != nil {
-		logger.Errorw(ctx, "Error while getting flows from KV store", log.Fields{"flowId": flowID})
+	subs := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
+	Path := fmt.Sprintf(FlowIDInfoPath, subs, flowID)
+	value, err := RsrcMgr.KVStore.Get(ctx, Path)
+	if err == nil {
+		if value != nil {
+			Val, err := toByte(value.Value)
+			if err != nil {
+				logger.Errorw(ctx, "Failed to convert flowinfo into byte array", log.Fields{"error": err, "subs": subs})
+				return nil
+			}
+			if err = json.Unmarshal(Val, &flowInfo); err != nil {
+				logger.Errorw(ctx, "Failed to unmarshal", log.Fields{"error": err, "subs": subs})
+				return nil
+			}
+		}
+	}
+	if flowInfo.Flow == nil {
+		logger.Debugw(ctx, "No flowInfo found in KV store", log.Fields{"subs": subs})
 		return nil
 	}
-	if len(flows) == 0 {
-		logger.Debugw(ctx, "No flowInfo found in KV store", log.Fields{"flowPath": FlowPath})
-		return nil
-	}
-	return &flows
+	return &flowInfo
 }
 
 // GetCurrentFlowIDsForOnu fetches flow ID from the resource manager
 // Note: For flows which trap from the NNI and not really associated with any particular
 // ONU (like LLDP), the onu_id and uni_id is set as -1. The intf_id is the NNI intf_id.
-func (RsrcMgr *OpenOltResourceMgr) GetCurrentFlowIDsForOnu(ctx context.Context, PONIntfID uint32, ONUID int32, UNIID int32) []uint32 {
+func (RsrcMgr *OpenOltResourceMgr) GetCurrentFlowIDsForOnu(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32) ([]uint64, error) {
 
-	FlowPath := fmt.Sprintf("%d,%d,%d", PONIntfID, ONUID, UNIID)
-	if mgrs, exist := RsrcMgr.ResourceMgrs[PONIntfID]; exist {
-		return mgrs.GetCurrentFlowIDsForOnu(ctx, FlowPath)
+	subs := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
+	path := fmt.Sprintf(FlowIDPath, subs)
+
+	var data []uint64
+	value, err := RsrcMgr.KVStore.Get(ctx, path)
+	if err == nil {
+		if value != nil {
+			Val, _ := toByte(value.Value)
+			if err = json.Unmarshal(Val, &data); err != nil {
+				logger.Error(ctx, "Failed to unmarshal")
+				return nil, err
+			}
+		}
 	}
-	return nil
+	return data, nil
 }
 
 // UpdateFlowIDInfo updates flow info for the given pon interface, onu id, and uni id
 // Note: For flows which trap from the NNI and not really associated with any particular
 // ONU (like LLDP), the onu_id and uni_id is set as -1. The intf_id is the NNI intf_id.
-func (RsrcMgr *OpenOltResourceMgr) UpdateFlowIDInfo(ctx context.Context, ponIntfID int32, onuID int32, uniID int32,
-	flowID uint32, flowData *[]FlowInfo) error {
-	FlowPath := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
-	return RsrcMgr.ResourceMgrs[uint32(ponIntfID)].UpdateFlowIDInfoForOnu(ctx, FlowPath, flowID, *flowData)
+func (RsrcMgr *OpenOltResourceMgr) UpdateFlowIDInfo(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32,
+	flowID uint64, flowData FlowInfo) error {
+
+	subs := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
+	path := fmt.Sprintf(FlowIDInfoPath, subs, flowID)
+
+	var value []byte
+	var err error
+	value, err = json.Marshal(flowData)
+	if err != nil {
+		logger.Errorf(ctx, "failed to Marshal, resource path %s", path)
+		return err
+	}
+
+	if err = RsrcMgr.KVStore.Put(ctx, path, value); err != nil {
+		logger.Errorf(ctx, "Failed to update resource %s", path)
+	}
+
+	// Update the flowID list for the ONU
+	if err = RsrcMgr.UpdateFlowIDForOnu(ctx, ponIntfID, onuID, uniID, flowID, true); err != nil {
+		// If the operation fails, try to remove FlowInfo from the KV store
+		_ = RsrcMgr.KVStore.Delete(ctx, path)
+		return err
+	}
+	return err
 }
 
-// GetFlowID return flow ID for a given pon interface id, onu id and uni id
-func (RsrcMgr *OpenOltResourceMgr) GetFlowID(ctx context.Context, ponIntfID uint32, ONUID int32, uniID int32,
-	gemportID uint32,
-	flowStoreCookie uint64,
-	flowCategory string, vlanVid uint32, vlanPcp ...uint32) (uint32, error) {
+// UpdateFlowIDForOnu updates the flow_id list of the ONU (add or remove flow_id from the list)
+func (RsrcMgr *OpenOltResourceMgr) UpdateFlowIDForOnu(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32, flowID uint64, add bool) error {
+	/*
+	   Update the flow_id list of the ONU (add or remove flow_id from the list)
+	   :param ponIntfID, onuID, uniID: PON interface id, onu id and uni id that form the KV store key
+	   :param flowID: flow ID
+	   :param add: Boolean flag to indicate whether the flow_id should be
+	               added (true) or removed (false) from the list.
+	*/
+	var Value []byte
+	var err error
+	var retVal bool
+	var idx uint64
+	subs := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
+	path := fmt.Sprintf(FlowIDPath, subs)
+	flowIDs, err := RsrcMgr.GetCurrentFlowIDsForOnu(ctx, ponIntfID, onuID, uniID)
+	if err != nil {
+		// Error logged in the called function
+		return err
+	}
+
+	if add {
+		if retVal, _ = checkForFlowIDInList(flowIDs, flowID); retVal {
+			return nil
+		}
+		flowIDs = append(flowIDs, flowID)
+	} else {
+		if retVal, idx = checkForFlowIDInList(flowIDs, flowID); !retVal {
+			return nil
+		}
+		// delete the index and shift
+		flowIDs = append(flowIDs[:idx], flowIDs[idx+1:]...)
+	}
+	Value, err = json.Marshal(flowIDs)
+	if err != nil {
+		logger.Error(ctx, "Failed to Marshal")
+		return err
+	}
+
+	if err = RsrcMgr.KVStore.Put(ctx, path, Value); err != nil {
+		logger.Errorf(ctx, "Failed to update resource %s", path)
+		return err
+	}
+	return err
+}
+
+// RemoveFlowIDInfo removes flow info for the given pon interface, onu id, and uni id
+// Note: For flows which trap from the NNI and not really associated with any particular
+// ONU (like LLDP), the onu_id and uni_id is set as -1. The intf_id is the NNI intf_id.
+func (RsrcMgr *OpenOltResourceMgr) RemoveFlowIDInfo(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32,
+	flowID uint64) error {
+
+	subs := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
+	path := fmt.Sprintf(FlowIDInfoPath, subs, flowID)
 
 	var err error
-	FlowPath := fmt.Sprintf("%d,%d,%d", ponIntfID, ONUID, uniID)
+	if err = RsrcMgr.KVStore.Delete(ctx, path); err != nil {
+		logger.Errorf(ctx, "Failed to delete resource %s", path)
+		return err
+	}
 
-	RsrcMgr.FlowIDMgmtLock.Lock()
-	defer RsrcMgr.FlowIDMgmtLock.Unlock()
+	// Update the flowID list for the ONU
+	err = RsrcMgr.UpdateFlowIDForOnu(ctx, ponIntfID, onuID, uniID, flowID, false)
 
-	FlowIDs := RsrcMgr.ResourceMgrs[ponIntfID].GetCurrentFlowIDsForOnu(ctx, FlowPath)
-	if FlowIDs != nil {
-		logger.Debugw(ctx, "Found flowId(s) for this ONU", log.Fields{"pon": ponIntfID, "ONUID": ONUID, "uniID": uniID, "KVpath": FlowPath})
-		for _, flowID := range FlowIDs {
-			FlowInfo := RsrcMgr.GetFlowIDInfo(ctx, ponIntfID, int32(ONUID), int32(uniID), uint32(flowID))
-			er := getFlowIDFromFlowInfo(ctx, FlowInfo, flowID, gemportID, flowStoreCookie, flowCategory, vlanVid, vlanPcp...)
-			if er == nil {
-				logger.Debugw(ctx, "Found flowid for the vlan, pcp, and gem",
-					log.Fields{"flowID": flowID, "vlanVid": vlanVid, "vlanPcp": vlanPcp, "gemPortID": gemportID})
-				return flowID, er
-			}
+	return err
+}
+
+// RemoveAllFlowsForIntfOnuUniKey removes flow info for the given interface, onu id, and uni id
+func (RsrcMgr *OpenOltResourceMgr) RemoveAllFlowsForIntfOnuUniKey(ctx context.Context, intf uint32, onuID int32, uniID int32) error {
+	flowIDs, err := RsrcMgr.GetCurrentFlowIDsForOnu(ctx, intf, onuID, uniID)
+	if err != nil {
+		// error logged in the called function
+		return err
+	}
+	for _, flID := range flowIDs {
+		if err := RsrcMgr.RemoveFlowIDInfo(ctx, intf, onuID, uniID, flID); err != nil {
+			logger.Errorw(ctx, "failed-to-delete-flow-id-info", log.Fields{"intf": intf, "onuID": onuID, "uniID": uniID, "flowID": flID})
 		}
 	}
-	logger.Debug(ctx, "No matching flows with flow cookie or flow category, allocating new flowid")
-	FlowIDs, err = RsrcMgr.ResourceMgrs[ponIntfID].GetResourceID(ctx, ponIntfID,
-		ponrmgr.FLOW_ID, 1)
-	if err != nil {
-		logger.Errorf(ctx, "Failed to get resource for interface %d for type %s",
-			ponIntfID, ponrmgr.FLOW_ID)
-		return uint32(0), err
+	subs := fmt.Sprintf("%d,%d,%d", intf, onuID, uniID)
+	path := fmt.Sprintf(FlowIDPath, subs)
+	if err := RsrcMgr.KVStore.Delete(ctx, path); err != nil {
+		logger.Errorf(ctx, "Failed to delete resource %s", path)
+		return err
 	}
-	if FlowIDs != nil {
-		_ = RsrcMgr.ResourceMgrs[ponIntfID].UpdateFlowIDForOnu(ctx, FlowPath, FlowIDs[0], true)
-		return FlowIDs[0], err
-	}
-
-	return 0, err
+	return nil
 }
 
 // GetAllocID return the first Alloc ID for a given pon interface id and onu id and then update the resource map on
@@ -738,47 +829,6 @@
 	}
 }
 
-// FreeFlowID returns the free flow id for a given interface, onu id and uni id
-func (RsrcMgr *OpenOltResourceMgr) FreeFlowID(ctx context.Context, IntfID uint32, onuID int32,
-	uniID int32, FlowID uint32) {
-	var IntfONUID string
-	var err error
-
-	RsrcMgr.FlowIDMgmtLock.Lock()
-	defer RsrcMgr.FlowIDMgmtLock.Unlock()
-
-	FlowIds := make([]uint32, 0)
-	FlowIds = append(FlowIds, FlowID)
-	IntfONUID = fmt.Sprintf("%d,%d,%d", IntfID, onuID, uniID)
-	err = RsrcMgr.ResourceMgrs[IntfID].UpdateFlowIDForOnu(ctx, IntfONUID, FlowID, false)
-	if err != nil {
-		logger.Errorw(ctx, "Failed to Update flow id  for", log.Fields{"intf": IntfONUID})
-	}
-	RsrcMgr.ResourceMgrs[IntfID].RemoveFlowIDInfo(ctx, IntfONUID, FlowID)
-
-	RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(ctx, IntfID, ponrmgr.FLOW_ID, FlowIds)
-}
-
-// FreeFlowIDs releases the flow Ids
-func (RsrcMgr *OpenOltResourceMgr) FreeFlowIDs(ctx context.Context, IntfID uint32, onuID uint32,
-	uniID uint32, FlowID []uint32) {
-	RsrcMgr.FlowIDMgmtLock.Lock()
-	defer RsrcMgr.FlowIDMgmtLock.Unlock()
-
-	RsrcMgr.ResourceMgrs[IntfID].FreeResourceID(ctx, IntfID, ponrmgr.FLOW_ID, FlowID)
-
-	var IntfOnuIDUniID string
-	var err error
-	for _, flow := range FlowID {
-		IntfOnuIDUniID = fmt.Sprintf("%d,%d,%d", IntfID, onuID, uniID)
-		err = RsrcMgr.ResourceMgrs[IntfID].UpdateFlowIDForOnu(ctx, IntfOnuIDUniID, flow, false)
-		if err != nil {
-			logger.Errorw(ctx, "Failed to Update flow id for", log.Fields{"intf": IntfOnuIDUniID})
-		}
-		RsrcMgr.ResourceMgrs[IntfID].RemoveFlowIDInfo(ctx, IntfOnuIDUniID, flow)
-	}
-}
-
 // FreeAllocID frees AllocID on the PON resource pool and also frees the allocID association
 // for the given OLT device.
 func (RsrcMgr *OpenOltResourceMgr) FreeAllocID(ctx context.Context, IntfID uint32, onuID uint32,
@@ -826,13 +876,6 @@
 		GEMPortIDs)
 	RsrcMgr.GemPortIDMgmtLock[intfID].Unlock()
 
-	RsrcMgr.FlowIDMgmtLock.Lock()
-	FlowIDs := RsrcMgr.ResourceMgrs[intfID].GetCurrentFlowIDsForOnu(ctx, IntfOnuIDUniID)
-	RsrcMgr.ResourceMgrs[intfID].FreeResourceID(ctx, intfID,
-		ponrmgr.FLOW_ID,
-		FlowIDs)
-	RsrcMgr.FlowIDMgmtLock.Unlock()
-
 	// Clear resource map associated with (pon_intf_id, gemport_id) tuple.
 	RsrcMgr.ResourceMgrs[intfID].RemoveResourceMap(ctx, IntfOnuIDUniID)
 	// Clear the ONU Id associated with the (pon_intf_id, gemport_id) tuple.
@@ -841,25 +884,21 @@
 	}
 }
 
-// IsFlowCookieOnKVStore checks if the given flow cookie is present on the kv store
-// Returns true if the flow cookie is found, otherwise it returns false
-func (RsrcMgr *OpenOltResourceMgr) IsFlowCookieOnKVStore(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32,
-	flowStoreCookie uint64) bool {
+// IsFlowOnKvStore checks if the given flowID is present on the kv store
+// Returns true if the flowID is found, otherwise it returns false
+func (RsrcMgr *OpenOltResourceMgr) IsFlowOnKvStore(ctx context.Context, ponIntfID uint32, onuID int32, uniID int32,
+	flowID uint64) bool {
 
-	FlowPath := fmt.Sprintf("%d,%d,%d", ponIntfID, onuID, uniID)
-	FlowIDs := RsrcMgr.ResourceMgrs[ponIntfID].GetCurrentFlowIDsForOnu(ctx, FlowPath)
+	FlowIDs, err := RsrcMgr.GetCurrentFlowIDsForOnu(ctx, ponIntfID, onuID, uniID)
+	if err != nil {
+		// error logged in the called function
+		return false
+	}
 	if FlowIDs != nil {
-		logger.Debugw(ctx, "Found flowId(s) for this ONU", log.Fields{"pon": ponIntfID, "onuID": onuID, "uniID": uniID, "KVpath": FlowPath})
-		for _, flowID := range FlowIDs {
-			FlowInfo := RsrcMgr.GetFlowIDInfo(ctx, ponIntfID, int32(onuID), int32(uniID), uint32(flowID))
-			if FlowInfo != nil {
-				logger.Debugw(ctx, "Found flows", log.Fields{"flows": *FlowInfo, "flowId": flowID})
-				for _, Info := range *FlowInfo {
-					if Info.FlowStoreCookie == flowStoreCookie {
-						logger.Debug(ctx, "Found flow matching with flowStore cookie", log.Fields{"flowId": flowID, "flowStoreCookie": flowStoreCookie})
-						return true
-					}
-				}
+		logger.Debugw(ctx, "Found flowId(s) for this ONU", log.Fields{"pon": ponIntfID, "onuID": onuID, "uniID": uniID})
+		for _, id := range FlowIDs {
+			if flowID == id {
+				return true
 			}
 		}
 	}
@@ -1017,57 +1056,6 @@
 	return nil
 }
 
-func getFlowIDFromFlowInfo(ctx context.Context, FlowInfo *[]FlowInfo, flowID, gemportID uint32, flowStoreCookie uint64, flowCategory string,
-	vlanVid uint32, vlanPcp ...uint32) error {
-	if FlowInfo != nil {
-		for _, Info := range *FlowInfo {
-			if int32(gemportID) == Info.Flow.GemportId && flowCategory != "" && Info.FlowCategory == flowCategory {
-				logger.Debug(ctx, "Found flow matching with flow category", log.Fields{"flowId": flowID, "FlowCategory": flowCategory})
-				if Info.FlowCategory == "HSIA_FLOW" {
-					if err := checkVlanAndPbitEqualityForFlows(vlanVid, Info, vlanPcp[0]); err == nil {
-						return nil
-					}
-				}
-			}
-			if int32(gemportID) == Info.Flow.GemportId && flowStoreCookie != 0 && Info.FlowStoreCookie == flowStoreCookie {
-				if flowCategory != "" && Info.FlowCategory == flowCategory {
-					logger.Debug(ctx, "Found flow matching with flow category", log.Fields{"flowId": flowID, "FlowCategory": flowCategory})
-					return nil
-				}
-			}
-		}
-	}
-	logger.Debugw(ctx, "the flow can be related to a different service", log.Fields{"flow_info": FlowInfo})
-	return errors.New("invalid flow-info")
-}
-
-func checkVlanAndPbitEqualityForFlows(vlanVid uint32, Info FlowInfo, vlanPcp uint32) error {
-	if err := checkVlanEqualityForFlows(vlanVid, Info); err != nil {
-		return err
-	}
-
-	//flow has remark action and pbits
-	if Info.Flow.Action.Cmd.RemarkInnerPbits || Info.Flow.Action.Cmd.RemarkOuterPbits {
-		if vlanPcp == Info.Flow.Action.OPbits || vlanPcp == Info.Flow.Action.IPbits {
-			return nil
-		}
-	} else if vlanPcp == Info.Flow.Classifier.OPbits {
-		//no remark action but flow has pbits
-		return nil
-	} else if vlanPcp == 0xff || Info.Flow.Classifier.OPbits == 0xff {
-		// no pbit found
-		return nil
-	}
-	return errors.New("not found in terms of pbit equality")
-}
-
-func checkVlanEqualityForFlows(vlanVid uint32, Info FlowInfo) error {
-	if vlanVid == Info.Flow.Action.OVid || vlanVid == Info.Flow.Classifier.IVid {
-		return nil
-	}
-	return errors.New("not found in terms of vlan_id equality")
-}
-
 //AddGemToOnuGemInfo adds gemport to onugem info kvstore
 func (RsrcMgr *OpenOltResourceMgr) AddGemToOnuGemInfo(ctx context.Context, intfID uint32, onuID uint32, gemPort uint32) error {
 	var onuGemData []OnuGemInfo
@@ -1122,14 +1110,14 @@
 
 	if err = RsrcMgr.ResourceMgrs[IntfID].GetOnuGemInfo(ctx, IntfID, &onuGemData); err != nil {
 		logger.Errorf(ctx, "failed to get onuifo for intfid %d", IntfID)
-		return olterrors.NewErrPersistence("get", "OnuGemInfo", IntfID,
+		return olterrors.NewErrPersistence("get", "OnuGemInfo", uint64(IntfID),
 			log.Fields{"onuGem": onuGem, "intfID": IntfID}, err)
 	}
 	onuGemData = append(onuGemData, onuGem)
 	err = RsrcMgr.ResourceMgrs[IntfID].AddOnuGemInfo(ctx, IntfID, onuGemData)
 	if err != nil {
 		logger.Error(ctx, "Failed to add onugem to kv store")
-		return olterrors.NewErrPersistence("set", "OnuGemInfo", IntfID,
+		return olterrors.NewErrPersistence("set", "OnuGemInfo", uint64(IntfID),
 			log.Fields{"onuGemData": onuGemData, "intfID": IntfID}, err)
 	}
 
@@ -1212,26 +1200,30 @@
 	return gemPort, nil
 }
 
-//DelGemPortPktInOfAllServices deletes the gemports from  pkt in path for all services
-func (RsrcMgr *OpenOltResourceMgr) DelGemPortPktInOfAllServices(ctx context.Context, intfID uint32, onuID uint32, logicalPort uint32) error {
+//DeletePacketInGemPortForOnu deletes the packet-in gemport for ONU
+func (RsrcMgr *OpenOltResourceMgr) DeletePacketInGemPortForOnu(ctx context.Context, intfID uint32, onuID uint32, logicalPort uint32) error {
 
-	//retrieve stored gem port from the store first.
-	Path := fmt.Sprintf(OnuPacketINPathPrefix, intfID, onuID, logicalPort)
-	logger.Debugf(ctx, "getting flows from the path:%s", Path)
-	Value, err := RsrcMgr.KVStore.List(ctx, Path)
+	path := fmt.Sprintf(OnuPacketINPathPrefix, intfID, onuID, logicalPort)
+	value, err := RsrcMgr.KVStore.List(ctx, path)
 	if err != nil {
-		logger.Errorf(ctx, "failed to get flows from kvstore for path %s", Path)
-		return errors.New("failed to get flows from kvstore for path " + Path)
+		logger.Errorf(ctx, "failed-to-read-value-from-path-%s", path)
+		return errors.New("failed-to-read-value-from-path-" + path)
 	}
-	logger.Debugf(ctx, "%d flows retrieved from the path:%s", len(Value), Path)
 
 	//remove them one by one
-	for key := range Value {
+	for key := range value {
+		// Formulate the right key path suffix to be deleted
+		stringToBeReplaced := fmt.Sprintf(BasePathKvStore, RsrcMgr.DeviceID) + "/"
+		replacedWith := ""
+		key = strings.Replace(key, stringToBeReplaced, replacedWith, 1)
+
+		logger.Debugf(ctx, "removing-key-%s", key)
 		if err := RsrcMgr.KVStore.Delete(ctx, key); err != nil {
-			logger.Errorf(ctx, "Falied to remove resource %s", key)
+			logger.Errorf(ctx, "failed-to-remove-resource-%s", key)
 			return err
 		}
 	}
+
 	return nil
 }
 
@@ -1306,7 +1298,7 @@
 }
 
 //UpdateFlowIDsForGem updates flow id per gemport
-func (RsrcMgr *OpenOltResourceMgr) UpdateFlowIDsForGem(ctx context.Context, intf uint32, gem uint32, flowIDs []uint32) error {
+func (RsrcMgr *OpenOltResourceMgr) UpdateFlowIDsForGem(ctx context.Context, intf uint32, gem uint32, flowIDs []uint64) error {
 	var val []byte
 	path := fmt.Sprintf(FlowIDsForGem, intf)
 
@@ -1316,7 +1308,7 @@
 		return err
 	}
 	if flowsForGem == nil {
-		flowsForGem = make(map[uint32][]uint32)
+		flowsForGem = make(map[uint32][]uint64)
 	}
 	flowsForGem[gem] = flowIDs
 	val, err = json.Marshal(flowsForGem)
@@ -1325,8 +1317,6 @@
 		return err
 	}
 
-	RsrcMgr.flowIDToGemInfoLock.Lock()
-	defer RsrcMgr.flowIDToGemInfoLock.Unlock()
 	if err = RsrcMgr.KVStore.Put(ctx, path, val); err != nil {
 		logger.Errorw(ctx, "Failed to put to kvstore", log.Fields{"error": err, "path": path, "value": val})
 		return err
@@ -1358,21 +1348,17 @@
 		return
 	}
 
-	RsrcMgr.flowIDToGemInfoLock.Lock()
-	defer RsrcMgr.flowIDToGemInfoLock.Unlock()
 	if err = RsrcMgr.KVStore.Put(ctx, path, val); err != nil {
 		logger.Errorw(ctx, "Failed to put to kvstore", log.Fields{"error": err, "path": path, "value": val})
 	}
 }
 
 //GetFlowIDsGemMapForInterface gets flowids per gemport and interface
-func (RsrcMgr *OpenOltResourceMgr) GetFlowIDsGemMapForInterface(ctx context.Context, intf uint32) (map[uint32][]uint32, error) {
+func (RsrcMgr *OpenOltResourceMgr) GetFlowIDsGemMapForInterface(ctx context.Context, intf uint32) (map[uint32][]uint64, error) {
 	path := fmt.Sprintf(FlowIDsForGem, intf)
-	var flowsForGem map[uint32][]uint32
+	var flowsForGem map[uint32][]uint64
 	var val []byte
-	RsrcMgr.flowIDToGemInfoLock.RLock()
 	value, err := RsrcMgr.KVStore.Get(ctx, path)
-	RsrcMgr.flowIDToGemInfoLock.RUnlock()
 	if err != nil {
 		logger.Error(ctx, "failed to get data from kv store")
 		return nil, err
@@ -1393,8 +1379,7 @@
 //DeleteIntfIDGempMapPath deletes the intf id path used to store flow ids per gem to kvstore.
 func (RsrcMgr *OpenOltResourceMgr) DeleteIntfIDGempMapPath(ctx context.Context, intf uint32) {
 	path := fmt.Sprintf(FlowIDsForGem, intf)
-	RsrcMgr.flowIDToGemInfoLock.Lock()
-	defer RsrcMgr.flowIDToGemInfoLock.Unlock()
+
 	if err := RsrcMgr.KVStore.Delete(ctx, path); err != nil {
 		logger.Errorw(ctx, "Failed to delete nni interfaces from kv store", log.Fields{"path": path})
 	}
@@ -1538,3 +1523,32 @@
 	}
 	return false, groupInfo, nil
 }
+
+// toByte converts an interface value to a []byte.  The interface should either be of
+// a string type or []byte.  Otherwise, an error is returned.
+func toByte(value interface{}) ([]byte, error) {
+	switch t := value.(type) {
+	case []byte:
+		return value.([]byte), nil
+	case string:
+		return []byte(value.(string)), nil
+	default:
+		return nil, fmt.Errorf("unexpected-type-%T", t)
+	}
+}
+
+func checkForFlowIDInList(FlowIDList []uint64, FlowID uint64) (bool, uint64) {
+	/*
+	   Check for a flow id in a given list of flow IDs.
+	   :param FlowIDList: List of Flow IDs
+	   :param FlowID: Flow ID to check in the list
+	   :return: true and the index if present, false and 0 otherwise.
+	*/
+
+	for idx := range FlowIDList {
+		if FlowID == FlowIDList[idx] {
+			return true, uint64(idx)
+		}
+	}
+	return false, 0
+}