VOL-4077: Improve storage usage on etcd
- Do away with unnecessary data storage on etcd if it can be
  reconciled on adapter restart
- For data that needs storage, use lesser footprint if possible
- Use write-through-cache for all data stored on etcd via
  resource manager module
- Use ResourceManager module per interface to localize lock
  contention per PON port

Change-Id: I21d38216fab195d738a446b3f96a00251569e38b
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 4ae86ac..9c1cb09 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -22,14 +22,15 @@
 	"encoding/hex"
 	"errors"
 	"fmt"
-	"github.com/opencord/voltha-lib-go/v4/pkg/meters"
+	"github.com/opencord/voltha-lib-go/v5/pkg/meters"
 	"strconv"
 	"strings"
 	"sync"
+	"time"
 
-	"github.com/opencord/voltha-lib-go/v4/pkg/flows"
-	"github.com/opencord/voltha-lib-go/v4/pkg/log"
-	tp "github.com/opencord/voltha-lib-go/v4/pkg/techprofile"
+	"github.com/opencord/voltha-lib-go/v5/pkg/flows"
+	"github.com/opencord/voltha-lib-go/v5/pkg/log"
+	tp "github.com/opencord/voltha-lib-go/v5/pkg/techprofile"
 	rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
 	"github.com/opencord/voltha-protos/v4/go/common"
 	ic "github.com/opencord/voltha-protos/v4/go/inter_container"
@@ -154,11 +155,6 @@
 	pbit1        = '1'
 )
 
-type gemPortKey struct {
-	intfID  uint32
-	gemPort uint32
-}
-
 type schedQueue struct {
 	direction    tp_pb.Direction
 	intfID       uint32
@@ -186,15 +182,6 @@
 	gemToAes    map[uint32]bool
 }
 
-// subscriberDataPathFlowIDKey is key to subscriberDataPathFlowIDMap map
-type subscriberDataPathFlowIDKey struct {
-	intfID    uint32
-	onuID     uint32
-	uniID     uint32
-	direction string
-	tpID      uint32
-}
-
 // This control block is created per flow add/remove and pushed on the incomingFlows channel slice
 // The flowControlBlock is then picked by the perOnuFlowHandlerRoutine for further processing.
 // There is on perOnuFlowHandlerRoutine routine per ONU that constantly monitors for any incoming
@@ -210,37 +197,28 @@
 //OpenOltFlowMgr creates the Structure of OpenOltFlowMgr obj
 type OpenOltFlowMgr struct {
 	ponPortIdx    uint32 // Pon Port this FlowManager is responsible for
-	techprofile   map[uint32]tp.TechProfileIf
+	techprofile   tp.TechProfileIf
 	deviceHandler *DeviceHandler
 	grpMgr        *OpenOltGroupMgr
 	resourceMgr   *rsrcMgr.OpenOltResourceMgr
 
-	onuIdsLock sync.RWMutex // TODO: Do we need this?
-
-	flowsUsedByGemPort    map[uint32][]uint64 // gem port id to flow ids
-	flowsUsedByGemPortKey sync.RWMutex        // lock to be used to access the flowsUsedByGemPort map
+	gemToFlowIDs    map[uint32][]uint64 // gem port id to flow ids
+	gemToFlowIDsKey sync.RWMutex        // lock to be used to access the gemToFlowIDs map
 
 	packetInGemPort     map[rsrcMgr.PacketInInfoKey]uint32 //packet in gem port local cache
 	packetInGemPortLock sync.RWMutex
 
 	// TODO create a type rsrcMgr.OnuGemInfos to be used instead of []rsrcMgr.OnuGemInfo
-	onuGemInfo []rsrcMgr.OnuGemInfo //onu, gem and uni info local cache
+	onuGemInfoMap map[uint32]*rsrcMgr.OnuGemInfo //onu, gem and uni info local cache -> map of onuID to OnuGemInfo
 	// We need to have a global lock on the onuGemInfo map
 	onuGemInfoLock sync.RWMutex
 
-	// Map of voltha flowID associated with subscriberDataPathFlowIDKey
-	// This information is not persisted on Kv store and hence should be reconciled on adapter restart
-	subscriberDataPathFlowIDMap     map[subscriberDataPathFlowIDKey]uint64
-	subscriberDataPathFlowIDMapLock sync.RWMutex
+	flowIDToGems     map[uint64][]uint32
+	flowIDToGemsLock sync.RWMutex
 
 	// Slice of channels. Each channel in slice, index by ONU ID, queues flows per ONU.
 	// A go routine per ONU, waits on the unique channel (indexed by ONU ID) for incoming flows (add/remove)
 	incomingFlows []chan flowControlBlock
-
-	//this map keeps uni port info by gem and pon port. This relation shall be used for packet-out operations
-	gemToUniMap map[gemPortKey][]uint32
-	//We need to have a global lock on the gemToUniLock map
-	gemToUniLock sync.RWMutex
 }
 
 //NewFlowManager creates OpenOltFlowMgr object and initializes the parameters
@@ -248,23 +226,18 @@
 	logger.Infow(ctx, "initializing-flow-manager", log.Fields{"device-id": dh.device.Id})
 	var flowMgr OpenOltFlowMgr
 	var err error
-	var idx uint32
 
 	flowMgr.deviceHandler = dh
+	flowMgr.ponPortIdx = ponPortIdx
 	flowMgr.grpMgr = grpMgr
 	flowMgr.resourceMgr = rMgr
-	flowMgr.techprofile = make(map[uint32]tp.TechProfileIf)
 	if err = flowMgr.populateTechProfilePerPonPort(ctx); err != nil {
-		logger.Errorw(ctx, "error-while-populating-tech-profile-mgr", log.Fields{"error": err})
+		logger.Errorw(ctx, "error-while-populating-tech-profile-mgr", log.Fields{"err": err})
 		return nil
 	}
-	flowMgr.onuIdsLock = sync.RWMutex{}
-	flowMgr.flowsUsedByGemPort = make(map[uint32][]uint64)
+	flowMgr.gemToFlowIDs = make(map[uint32][]uint64)
 	flowMgr.packetInGemPort = make(map[rsrcMgr.PacketInInfoKey]uint32)
-	flowMgr.packetInGemPortLock = sync.RWMutex{}
-	flowMgr.onuGemInfoLock = sync.RWMutex{}
-	flowMgr.subscriberDataPathFlowIDMap = make(map[subscriberDataPathFlowIDKey]uint64)
-	flowMgr.subscriberDataPathFlowIDMapLock = sync.RWMutex{}
+	flowMgr.flowIDToGems = make(map[uint64][]uint32)
 
 	// Create a slice of buffered channels for handling concurrent flows per ONU.
 	// The additional entry (+1) is to handle the NNI trap flows on a separate channel from individual ONUs channel
@@ -276,54 +249,35 @@
 		// This routine will be blocked on the flowMgr.incomingFlows[onu-id] channel for incoming flows.
 		go flowMgr.perOnuFlowHandlerRoutine(flowMgr.incomingFlows[i])
 	}
-
+	flowMgr.onuGemInfoMap = make(map[uint32]*rsrcMgr.OnuGemInfo)
 	//Load the onugem info cache from kv store on flowmanager start
-	if flowMgr.onuGemInfo, err = rMgr.GetOnuGemInfo(ctx, ponPortIdx); err != nil {
-		logger.Error(ctx, "failed-to-load-onu-gem-info-cache")
+	onuIDStart := flowMgr.deviceHandler.deviceInfo.OnuIdStart
+	onuIDEnd := flowMgr.deviceHandler.deviceInfo.OnuIdEnd
+	for onuID := onuIDStart; onuID <= onuIDEnd; onuID++ {
+		// check for a valid serial number in onuGem as GetOnuGemInfo can return nil error in case of nothing found in the path.
+		onugem, err := rMgr.GetOnuGemInfo(ctx, onuID, ponPortIdx)
+		if err == nil && onugem != nil && onugem.SerialNumber != "" {
+			flowMgr.onuGemInfoMap[onuID] = onugem
+		}
 	}
-	//Load flowID list per gem map per interface from the kvstore.
-	flowMgr.loadFlowIDlistForGem(ctx, idx)
+
+	//Load flowID list per gem map And gemIDs per flow per interface from the kvstore.
+	flowMgr.loadFlowIDsForGemAndGemIDsForFlow(ctx)
+
 	//load interface to multicast queue map from kv store
-
-	flowMgr.gemToUniMap = make(map[gemPortKey][]uint32)
-	flowMgr.gemToUniLock = sync.RWMutex{}
-
 	flowMgr.grpMgr.LoadInterfaceToMulticastQueueMap(ctx)
-	flowMgr.reconcileSubscriberDataPathFlowIDMap(ctx)
 	logger.Info(ctx, "initialization-of-flow-manager-success")
 	return &flowMgr
 }
 
-// toGemToUniMap adds uni info consisting of onu and uni ID to the map and associates it with a gem port
-func (f *OpenOltFlowMgr) toGemToUniMap(ctx context.Context, gemPK gemPortKey, onuID uint32, uniID uint32) {
-	f.gemToUniLock.Lock()
-	f.gemToUniMap[gemPK] = []uint32{onuID, uniID}
-	f.gemToUniLock.Unlock()
-}
-
-// fromGemToUniMap returns onu and uni ID associated with the given key
-func (f *OpenOltFlowMgr) fromGemToUniMap(key gemPortKey) ([]uint32, bool) {
-	f.gemToUniLock.RLock()
-	defer f.gemToUniLock.RUnlock()
-	val, ok := f.gemToUniMap[key]
-	return val, ok
-}
-
-// removeFromGemToUniMap removes an entry associated with the given key from gemToUniMap
-func (f *OpenOltFlowMgr) removeFromGemToUniMap(key gemPortKey) {
-	f.gemToUniLock.Lock()
-	defer f.gemToUniLock.Unlock()
-	delete(f.gemToUniMap, key)
-}
-
 func (f *OpenOltFlowMgr) registerFlow(ctx context.Context, flowFromCore *ofp.OfpFlowStats, deviceFlow *openoltpb2.Flow) error {
 	if !deviceFlow.ReplicateFlow && deviceFlow.GemportId > 0 {
 		// Flow is not replicated in this case, we need to register the flow for a single gem-port
-		return f.registerFlowIDForGem(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowFromCore)
+		return f.registerFlowIDForGemAndGemIDForFlow(ctx, uint32(deviceFlow.AccessIntfId), uint32(deviceFlow.GemportId), flowFromCore)
 	} else if deviceFlow.ReplicateFlow && len(deviceFlow.PbitToGemport) > 0 {
 		// Flow is replicated in this case. We need to register the flow for all the gem-ports it is replicated to.
 		for _, gemPort := range deviceFlow.PbitToGemport {
-			if err := f.registerFlowIDForGem(ctx, uint32(deviceFlow.AccessIntfId), gemPort, flowFromCore); err != nil {
+			if err := f.registerFlowIDForGemAndGemIDForFlow(ctx, uint32(deviceFlow.AccessIntfId), gemPort, flowFromCore); err != nil {
 				return err
 			}
 		}
@@ -331,15 +285,26 @@
 	return nil
 }
 
-func (f *OpenOltFlowMgr) registerFlowIDForGem(ctx context.Context, accessIntfID uint32, gemPortID uint32, flowFromCore *ofp.OfpFlowStats) error {
-	f.flowsUsedByGemPortKey.Lock()
-	flowIDList, ok := f.flowsUsedByGemPort[gemPortID]
+func (f *OpenOltFlowMgr) registerFlowIDForGemAndGemIDForFlow(ctx context.Context, accessIntfID uint32, gemPortID uint32, flowFromCore *ofp.OfpFlowStats) error {
+	// update gem->flows map
+	f.gemToFlowIDsKey.Lock()
+	flowIDList, ok := f.gemToFlowIDs[gemPortID]
 	if !ok {
 		flowIDList = []uint64{flowFromCore.Id}
+	} else {
+		flowIDList = appendUnique64bit(flowIDList, flowFromCore.Id)
 	}
-	flowIDList = appendUnique64bit(flowIDList, flowFromCore.Id)
-	f.flowsUsedByGemPort[gemPortID] = flowIDList
-	f.flowsUsedByGemPortKey.Unlock()
+	f.gemToFlowIDs[gemPortID] = flowIDList
+	f.gemToFlowIDsKey.Unlock()
+
+	// update flow->gems map
+	f.flowIDToGemsLock.Lock()
+	if _, ok := f.flowIDToGems[flowFromCore.Id]; !ok {
+		f.flowIDToGems[flowFromCore.Id] = []uint32{gemPortID}
+	} else {
+		f.flowIDToGems[flowFromCore.Id] = appendUnique32bit(f.flowIDToGems[flowFromCore.Id], gemPortID)
+	}
+	f.flowIDToGemsLock.Unlock()
 
 	// update the flowids for a gem to the KVstore
 	return f.resourceMgr.UpdateFlowIDsForGem(ctx, accessIntfID, gemPortID, flowIDList)
@@ -452,7 +417,7 @@
 
 	if meterInfo != nil {
 		logger.Debugw(ctx, "scheduler-already-created-for-upstream", log.Fields{"device-id": f.deviceHandler.device.Id, "meter-id": sq.meterID})
-		if meterInfo.MeterConfig.MeterId == sq.meterID {
+		if meterInfo.MeterID == sq.meterID {
 			if err := f.resourceMgr.HandleMeterInfoRefCntUpdate(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID, true); err != nil {
 				return err
 			}
@@ -460,7 +425,7 @@
 		}
 		return olterrors.NewErrInvalidValue(log.Fields{
 			"unsupported":       "meter-id",
-			"kv-store-meter-id": meterInfo.MeterConfig.MeterId,
+			"kv-store-meter-id": meterInfo.MeterID,
 			"meter-id-in-flow":  sq.meterID,
 			"device-id":         f.deviceHandler.device.Id}, nil)
 	}
@@ -472,18 +437,9 @@
 			"device-id": f.deviceHandler.device.Id})
 
 	if sq.direction == tp_pb.Direction_UPSTREAM {
-		SchedCfg, err = f.techprofile[sq.intfID].GetUsScheduler(ctx, sq.tpInst.(*tp.TechProfile))
+		SchedCfg = f.techprofile.GetUsScheduler(sq.tpInst.(*tp_pb.TechProfileInstance))
 	} else if sq.direction == tp_pb.Direction_DOWNSTREAM {
-		SchedCfg, err = f.techprofile[sq.intfID].GetDsScheduler(ctx, sq.tpInst.(*tp.TechProfile))
-	}
-
-	if err != nil {
-		return olterrors.NewErrNotFound("scheduler-config",
-			log.Fields{
-				"intf-id":   sq.intfID,
-				"direction": sq.direction,
-				"tp-inst":   sq.tpInst,
-				"device-id": f.deviceHandler.device.Id}, err)
+		SchedCfg = f.techprofile.GetDsScheduler(sq.tpInst.(*tp_pb.TechProfileInstance))
 	}
 
 	found := false
@@ -491,13 +447,10 @@
 	if sq.flowMetadata != nil {
 		for _, meter := range sq.flowMetadata.Meters {
 			if sq.meterID == meter.MeterId {
-				meterInfo.MeterConfig = ofp.OfpMeterConfig{}
-				meterInfo.MeterConfig.MeterId = meter.MeterId
-				meterInfo.MeterConfig.Flags = meter.Flags
+				meterInfo.MeterID = meter.MeterId
 				meterInfo.RefCnt = 1 // initialize it to 1, since this is the first flow that referenced the meter id.
-				meterInfo.MeterConfig.Bands = append(meterInfo.MeterConfig.Bands, meter.Bands...)
 				logger.Debugw(ctx, "found-meter-config-from-flowmetadata",
-					log.Fields{"meterConfig": meterInfo.MeterConfig,
+					log.Fields{"meter": meter,
 						"device-id": f.deviceHandler.device.Id})
 				found = true
 				break
@@ -515,14 +468,14 @@
 	}
 
 	var TrafficShaping *tp_pb.TrafficShapingInfo
-	if TrafficShaping, err = meters.GetTrafficShapingInfo(ctx, &meterInfo.MeterConfig); err != nil {
+	if TrafficShaping, err = meters.GetTrafficShapingInfo(ctx, sq.flowMetadata.Meters[0]); err != nil {
 		return olterrors.NewErrInvalidValue(log.Fields{
 			"reason":    "invalid-meter-config",
 			"meter-id":  sq.meterID,
 			"device-id": f.deviceHandler.device.Id}, nil)
 	}
 
-	TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile[sq.intfID].GetTrafficScheduler(sq.tpInst.(*tp.TechProfile), SchedCfg, TrafficShaping)}
+	TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile.GetTrafficScheduler(sq.tpInst.(*tp_pb.TechProfileInstance), SchedCfg, TrafficShaping)}
 	TrafficSched[0].TechProfileId = sq.tpID
 
 	if err := f.pushSchedulerQueuesToDevice(ctx, sq, TrafficSched); err != nil {
@@ -549,7 +502,7 @@
 }
 
 func (f *OpenOltFlowMgr) pushSchedulerQueuesToDevice(ctx context.Context, sq schedQueue, TrafficSched []*tp_pb.TrafficScheduler) error {
-	trafficQueues, err := f.techprofile[sq.intfID].GetTrafficQueues(ctx, sq.tpInst.(*tp.TechProfile), sq.direction)
+	trafficQueues, err := f.techprofile.GetTrafficQueues(ctx, sq.tpInst.(*tp_pb.TechProfileInstance), sq.direction)
 
 	if err != nil {
 		return olterrors.NewErrAdapter("unable-to-construct-traffic-queue-configuration",
@@ -598,10 +551,9 @@
 		"device-id":      f.deviceHandler.device.Id})
 
 	if sq.direction == tp_pb.Direction_DOWNSTREAM {
-		multicastTrafficQueues := f.techprofile[sq.intfID].GetMulticastTrafficQueues(ctx, sq.tpInst.(*tp.TechProfile))
+		multicastTrafficQueues := f.techprofile.GetMulticastTrafficQueues(ctx, sq.tpInst.(*tp_pb.TechProfileInstance))
 		if len(multicastTrafficQueues) > 0 {
-			if _, present := f.grpMgr.GetInterfaceToMcastQueueMap(sq.intfID); !present {
-				//assumed that there is only one queue per PON for the multicast service
+			if _, present := f.grpMgr.GetInterfaceToMcastQueueMap(sq.intfID); !present { //assumed that there is only one queue per PON for the multicast service
 				//the default queue with multicastQueuePerPonPort.Priority per a pon interface is used for multicast service
 				//just put it in interfaceToMcastQueueMap to use for building group members
 				logger.Debugw(ctx, "multicast-traffic-queues", log.Fields{"device-id": f.deviceHandler.device.Id})
@@ -613,7 +565,7 @@
 				f.grpMgr.UpdateInterfaceToMcastQueueMap(sq.intfID, val)
 				//also store the queue info in kv store
 				if err := f.resourceMgr.AddMcastQueueForIntf(ctx, sq.intfID, multicastQueuePerPonPort.GemportId, multicastQueuePerPonPort.Priority); err != nil {
-					logger.Errorw(ctx, "failed-to-add-mcast-queue", log.Fields{"error": err})
+					logger.Errorw(ctx, "failed-to-add-mcast-queue", log.Fields{"err": err})
 					return err
 				}
 
@@ -639,27 +591,19 @@
 			"uni-port":  sq.uniPort,
 			"device-id": f.deviceHandler.device.Id})
 	if sq.direction == tp_pb.Direction_UPSTREAM {
-		SchedCfg, err = f.techprofile[sq.intfID].GetUsScheduler(ctx, sq.tpInst.(*tp.TechProfile))
+		SchedCfg = f.techprofile.GetUsScheduler(sq.tpInst.(*tp_pb.TechProfileInstance))
 		Direction = "upstream"
 	} else if sq.direction == tp_pb.Direction_DOWNSTREAM {
-		SchedCfg, err = f.techprofile[sq.intfID].GetDsScheduler(ctx, sq.tpInst.(*tp.TechProfile))
+		SchedCfg = f.techprofile.GetDsScheduler(sq.tpInst.(*tp_pb.TechProfileInstance))
 		Direction = "downstream"
 	}
 
-	if err != nil {
-		return olterrors.NewErrNotFound("scheduler-config",
-			log.Fields{
-				"int-id":    sq.intfID,
-				"direction": sq.direction,
-				"device-id": f.deviceHandler.device.Id}, err)
-	}
-
 	TrafficShaping := &tp_pb.TrafficShapingInfo{} // this info is not really useful for the agent during flow removal. Just use default values.
 
-	TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile[sq.intfID].GetTrafficScheduler(sq.tpInst.(*tp.TechProfile), SchedCfg, TrafficShaping)}
+	TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile.GetTrafficScheduler(sq.tpInst.(*tp_pb.TechProfileInstance), SchedCfg, TrafficShaping)}
 	TrafficSched[0].TechProfileId = sq.tpID
 
-	TrafficQueues, err := f.techprofile[sq.intfID].GetTrafficQueues(ctx, sq.tpInst.(*tp.TechProfile), sq.direction)
+	TrafficQueues, err := f.techprofile.GetTrafficQueues(ctx, sq.tpInst.(*tp_pb.TechProfileInstance), sq.direction)
 	if err != nil {
 		return olterrors.NewErrAdapter("unable-to-construct-traffic-queue-configuration",
 			log.Fields{
@@ -703,7 +647,7 @@
 				"uni-port": sq.uniPort})
 
 		if sq.direction == tp_pb.Direction_UPSTREAM {
-			allocID := sq.tpInst.(*tp.TechProfile).UsScheduler.AllocID
+			allocID := sq.tpInst.(*tp_pb.TechProfileInstance).UsScheduler.AllocId
 			f.resourceMgr.FreeAllocID(ctx, sq.intfID, sq.onuID, sq.uniID, allocID)
 			// Delete the TCONT on the ONU.
 			uni := getUniPortPath(f.deviceHandler.device.Id, sq.intfID, int32(sq.onuID), int32(sq.uniID))
@@ -752,7 +696,6 @@
 	var gemPortIDs []uint32
 	tpInstanceExists := false
 	var err error
-
 	allocIDs = f.resourceMgr.GetCurrentAllocIDsForOnu(ctx, intfID, onuID, uniID)
 	allgemPortIDs = f.resourceMgr.GetCurrentGEMPortIDsForOnu(ctx, intfID, onuID, uniID)
 	tpPath := f.getTPpath(ctx, intfID, uni, TpID)
@@ -765,24 +708,24 @@
 		"tp-id":     TpID})
 
 	// Check tech profile instance already exists for derived port name
-	techProfileInstance, _ := f.techprofile[intfID].GetTPInstanceFromKVStore(ctx, TpID, tpPath)
+	techProfileInstance, _ := f.techprofile.GetTPInstance(ctx, tpPath)
 	if techProfileInstance == nil {
 		logger.Infow(ctx, "tp-instance-not-found--creating-new",
 			log.Fields{
 				"path":      tpPath,
 				"device-id": f.deviceHandler.device.Id})
-		techProfileInstance, err = f.techprofile[intfID].CreateTechProfInstance(ctx, TpID, uni, intfID)
+		techProfileInstance, err = f.techprofile.CreateTechProfileInstance(ctx, TpID, uni, intfID)
 		if err != nil {
 			// This should not happen, something wrong in KV backend transaction
 			logger.Errorw(ctx, "tp-instance-create-failed",
 				log.Fields{
-					"error":     err,
+					"err":       err,
 					"tp-id":     TpID,
 					"device-id": f.deviceHandler.device.Id})
 			return 0, nil, nil
 		}
 		if err := f.resourceMgr.UpdateTechProfileIDForOnu(ctx, intfID, onuID, uniID, TpID); err != nil {
-			logger.Warnw(ctx, "failed-to-update-tech-profile-id", log.Fields{"error": err})
+			logger.Warnw(ctx, "failed-to-update-tech-profile-id", log.Fields{"err": err})
 		}
 	} else {
 		logger.Debugw(ctx, "tech-profile-instance-already-exist-for-given port-name",
@@ -793,14 +736,14 @@
 	}
 
 	switch tpInst := techProfileInstance.(type) {
-	case *tp.TechProfile:
+	case *tp_pb.TechProfileInstance:
 		if UsMeterID != 0 {
 			sq := schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: intfID, onuID: onuID, uniID: uniID, tpID: TpID,
 				uniPort: uniPort, tpInst: techProfileInstance, meterID: UsMeterID, flowMetadata: flowMetadata}
 			if err := f.CreateSchedulerQueues(ctx, sq); err != nil {
 				logger.Errorw(ctx, "CreateSchedulerQueues-failed-upstream",
 					log.Fields{
-						"error":     err,
+						"err":       err,
 						"onu-id":    onuID,
 						"uni-id":    uniID,
 						"intf-id":   intfID,
@@ -815,7 +758,7 @@
 			if err := f.CreateSchedulerQueues(ctx, sq); err != nil {
 				logger.Errorw(ctx, "CreateSchedulerQueues-failed-downstream",
 					log.Fields{
-						"error":     err,
+						"err":       err,
 						"onu-id":    onuID,
 						"uni-id":    uniID,
 						"intf-id":   intfID,
@@ -824,9 +767,9 @@
 				return 0, nil, nil
 			}
 		}
-		allocID := tpInst.UsScheduler.AllocID
+		allocID := tpInst.UsScheduler.AllocId
 		for _, gem := range tpInst.UpstreamGemPortAttributeList {
-			gemPortIDs = append(gemPortIDs, gem.GemportID)
+			gemPortIDs = append(gemPortIDs, gem.GemportId)
 		}
 		allocIDs = appendUnique32bit(allocIDs, allocID)
 
@@ -848,12 +791,12 @@
 		// Send Tconts and GEM ports to KV store
 		f.storeTcontsGEMPortsIntoKVStore(ctx, intfID, onuID, uniID, allocIDs, allgemPortIDs)
 		return allocID, gemPortIDs, techProfileInstance
-	case *tp.EponProfile:
+	case *openoltpb2.EponTechProfileInstance:
 		// CreateSchedulerQueues for EPON needs to be implemented here
 		// when voltha-protos for EPON is completed.
-		allocID := tpInst.AllocID
+		allocID := tpInst.AllocId
 		for _, gem := range tpInst.UpstreamQueueAttributeList {
-			gemPortIDs = append(gemPortIDs, gem.GemportID)
+			gemPortIDs = append(gemPortIDs, gem.GemportId)
 		}
 		allocIDs = appendUnique32bit(allocIDs, allocID)
 
@@ -897,36 +840,18 @@
 	if err := f.resourceMgr.UpdateGEMPortIDsForOnu(ctx, intfID, onuID, uniID, gemPortIDs); err != nil {
 		logger.Errorw(ctx, "error-while-uploading-gemports-to-kv-store", log.Fields{"device-id": f.deviceHandler.device.Id})
 	}
-	if err := f.resourceMgr.UpdateGEMportsPonportToOnuMapOnKVStore(ctx, gemPortIDs, intfID, onuID, uniID); err != nil {
-		logger.Error(ctx, "error-while-uploading-gemtopon-map-to-kv-store", log.Fields{"device-id": f.deviceHandler.device.Id})
-	} else {
-		//add to gem to uni cache
-		f.addGemPortUniAssociationsToCache(ctx, intfID, onuID, uniID, gemPortIDs)
-	}
+
 	logger.Infow(ctx, "stored-tconts-and-gem-into-kv-store-successfully", log.Fields{"device-id": f.deviceHandler.device.Id})
 	for _, gemPort := range gemPortIDs {
 		f.addGemPortToOnuInfoMap(ctx, intfID, onuID, gemPort)
 	}
 }
 
-//addGemPortUniAssociationsToCache
-func (f *OpenOltFlowMgr) addGemPortUniAssociationsToCache(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, gemPortIDs []uint32) {
-	for _, gemPortID := range gemPortIDs {
-		key := gemPortKey{
-			intfID:  intfID,
-			gemPort: gemPortID,
-		}
-		f.toGemToUniMap(ctx, key, onuID, uniID)
-	}
-	logger.Debugw(ctx, "gem-to-uni-info-added-to-cache", log.Fields{"device-id": f.deviceHandler.device.Id, "intfID": intfID,
-		"gemPortIDs": gemPortIDs, "onuID": onuID, "uniID": uniID})
-}
-
 func (f *OpenOltFlowMgr) populateTechProfilePerPonPort(ctx context.Context) error {
 	var tpCount int
 	for _, techRange := range f.resourceMgr.DevInfo.Ranges {
 		for _, intfID := range techRange.IntfIds {
-			f.techprofile[intfID] = f.resourceMgr.ResourceMgrs[intfID].TechProfileMgr
+			f.techprofile = f.resourceMgr.PonRsrMgr.TechProfileMgr
 			tpCount++
 			logger.Debugw(ctx, "init-tech-profile-done",
 				log.Fields{
@@ -1004,13 +929,6 @@
 
 func (f *OpenOltFlowMgr) addSymmetricDataPathFlow(ctx context.Context, flowContext *flowContext, direction string) error {
 
-	var inverseDirection string
-	if direction == Upstream {
-		inverseDirection = Downstream
-	} else {
-		inverseDirection = Upstream
-	}
-
 	intfID := flowContext.intfID
 	onuID := flowContext.onuID
 	uniID := flowContext.uniID
@@ -1067,33 +985,23 @@
 			}, err).Log()
 	}
 
-	// Get symmetric flowID if it exists
-	// This symmetric flowID will be needed by agent software to use the same device flow-id that was used for the
-	// symmetric flow earlier
-	// symmetric flowID 0 is considered by agent as non-existent symmetric flow
-	keySymm := subscriberDataPathFlowIDKey{intfID: intfID, onuID: onuID, uniID: uniID, direction: inverseDirection, tpID: tpID}
-	f.subscriberDataPathFlowIDMapLock.RLock()
-	symmFlowID := f.subscriberDataPathFlowIDMap[keySymm]
-	f.subscriberDataPathFlowIDMapLock.RUnlock()
-
 	flow := openoltpb2.Flow{AccessIntfId: int32(intfID),
-		OnuId:           int32(onuID),
-		UniId:           int32(uniID),
-		FlowId:          logicalFlow.Id,
-		FlowType:        direction,
-		AllocId:         int32(allocID),
-		NetworkIntfId:   int32(networkIntfID),
-		GemportId:       int32(gemPortID),
-		Classifier:      classifierProto,
-		Action:          actionProto,
-		Priority:        int32(logicalFlow.Priority),
-		Cookie:          logicalFlow.Cookie,
-		PortNo:          flowContext.portNo,
-		TechProfileId:   tpID,
-		ReplicateFlow:   len(flowContext.pbitToGem) > 0,
-		PbitToGemport:   flowContext.pbitToGem,
-		SymmetricFlowId: symmFlowID,
-		GemportToAes:    flowContext.gemToAes,
+		OnuId:         int32(onuID),
+		UniId:         int32(uniID),
+		FlowId:        logicalFlow.Id,
+		FlowType:      direction,
+		AllocId:       int32(allocID),
+		NetworkIntfId: int32(networkIntfID),
+		GemportId:     int32(gemPortID),
+		Classifier:    classifierProto,
+		Action:        actionProto,
+		Priority:      int32(logicalFlow.Priority),
+		Cookie:        logicalFlow.Cookie,
+		PortNo:        flowContext.portNo,
+		TechProfileId: tpID,
+		ReplicateFlow: len(flowContext.pbitToGem) > 0,
+		PbitToGemport: flowContext.pbitToGem,
+		GemportToAes:  flowContext.gemToAes,
 	}
 	if err := f.addFlowToDevice(ctx, logicalFlow, &flow); err != nil {
 		return olterrors.NewErrFlowOp("add", logicalFlow.Id, nil, err).Log()
@@ -1104,21 +1012,6 @@
 			"flow":      flow,
 			"intf-id":   intfID,
 			"onu-id":    onuID})
-	flowInfo := rsrcMgr.FlowInfo{Flow: &flow, IsSymmtricFlow: true}
-	if err := f.resourceMgr.UpdateFlowIDInfo(ctx, uint32(flow.AccessIntfId), flow.OnuId, flow.UniId, flow.FlowId, flowInfo); err != nil {
-		return olterrors.NewErrPersistence("update", "flow", logicalFlow.Id,
-			log.Fields{
-				"flow":      flow,
-				"device-id": f.deviceHandler.device.Id,
-				"intf-id":   intfID,
-				"onu-id":    onuID}, err).Log()
-	}
-
-	// Update the current flowID to the map
-	keyCurr := subscriberDataPathFlowIDKey{intfID: intfID, onuID: onuID, uniID: uniID, direction: direction, tpID: tpID}
-	f.subscriberDataPathFlowIDMapLock.Lock()
-	f.subscriberDataPathFlowIDMap[keyCurr] = logicalFlow.Id
-	f.subscriberDataPathFlowIDMapLock.Unlock()
 
 	return nil
 }
@@ -1206,13 +1099,6 @@
 			"flow-id":   logicalFlow.Id,
 			"intf-id":   intfID,
 			"onu-id":    onuID})
-	flowInfo := rsrcMgr.FlowInfo{Flow: &dhcpFlow}
-	if err := f.resourceMgr.UpdateFlowIDInfo(ctx, uint32(dhcpFlow.AccessIntfId), dhcpFlow.OnuId, dhcpFlow.UniId, dhcpFlow.FlowId, flowInfo); err != nil {
-		return olterrors.NewErrPersistence("update", "flow", dhcpFlow.FlowId,
-			log.Fields{
-				"flow":      dhcpFlow,
-				"device-id": f.deviceHandler.device.Id}, err).Log()
-	}
 
 	return nil
 }
@@ -1301,11 +1187,6 @@
 		return olterrors.NewErrFlowOp("add", logicalFlow.Id, log.Fields{"flow": flow, "device-id": f.deviceHandler.device.Id}, err).Log()
 	}
 
-	flowInfo := rsrcMgr.FlowInfo{Flow: &flow}
-	if err := f.resourceMgr.UpdateFlowIDInfo(ctx, uint32(flow.AccessIntfId), flow.OnuId, flow.UniId, flow.FlowId, flowInfo); err != nil {
-		return olterrors.NewErrPersistence("update", "flow", flow.FlowId, log.Fields{"flow": flow, "device-id": f.deviceHandler.device.Id}, err).Log()
-	}
-
 	return nil
 }
 
@@ -1336,7 +1217,7 @@
 	uplinkAction := make(map[string]interface{})
 
 	// Fill Classfier
-	uplinkClassifier[EthType] = uint32(ethType)
+	uplinkClassifier[EthType] = ethType
 	uplinkClassifier[PacketTagType] = SingleTag
 	uplinkClassifier[VlanVid] = vlanID
 	uplinkClassifier[VlanPcp] = classifier[VlanPcp]
@@ -1415,13 +1296,7 @@
 			"intf-id":   intfID,
 			"ethType":   ethType,
 		})
-	flowInfo := rsrcMgr.FlowInfo{Flow: &upstreamFlow}
-	if err := f.resourceMgr.UpdateFlowIDInfo(ctx, uint32(upstreamFlow.AccessIntfId), upstreamFlow.OnuId, upstreamFlow.UniId, upstreamFlow.FlowId, flowInfo); err != nil {
-		return olterrors.NewErrPersistence("update", "flow", upstreamFlow.FlowId,
-			log.Fields{
-				"flow":      upstreamFlow,
-				"device-id": f.deviceHandler.device.Id}, err).Log()
-	}
+
 	return nil
 }
 
@@ -1502,7 +1377,7 @@
 
 // getTPpath return the ETCD path for a given UNI port
 func (f *OpenOltFlowMgr) getTPpath(ctx context.Context, intfID uint32, uniPath string, TpID uint32) string {
-	return f.techprofile[intfID].GetTechProfileInstanceKVPath(ctx, TpID, uniPath)
+	return f.techprofile.GetTechProfileInstanceKey(ctx, TpID, uniPath)
 }
 
 // DeleteTechProfileInstances removes the tech profile instances from persistent storage
@@ -1512,11 +1387,11 @@
 
 	for _, tpID := range tpIDList {
 		if err := f.DeleteTechProfileInstance(ctx, intfID, onuID, uniID, uniPortName, tpID); err != nil {
-			_ = olterrors.NewErrAdapter("delete-tech-profile-failed", log.Fields{"device-id": f.deviceHandler.device.Id}, err).Log()
+			logger.Errorw(ctx, "delete-tech-profile-failed", log.Fields{"err": err, "device-id": f.deviceHandler.device.Id})
 			// return err
 			// We should continue to delete tech-profile instances for other TP IDs
 		}
-		logger.Debugw(ctx, "tech-profile-deleted", log.Fields{"device-id": f.deviceHandler.device.Id, "tp-id": tpID})
+		logger.Debugw(ctx, "tech-profile-instance-deleted", log.Fields{"device-id": f.deviceHandler.device.Id, "uniPortName": uniPortName, "tp-id": tpID})
 	}
 	return nil
 }
@@ -1526,7 +1401,7 @@
 	if uniPortName == "" {
 		uniPortName = getUniPortPath(f.deviceHandler.device.Id, intfID, int32(onuID), int32(uniID))
 	}
-	if err := f.techprofile[intfID].DeleteTechProfileInstance(ctx, tpID, uniPortName); err != nil {
+	if err := f.techprofile.DeleteTechProfileInstance(ctx, tpID, uniPortName); err != nil {
 		return olterrors.NewErrAdapter("failed-to-delete-tp-instance-from-kv-store",
 			log.Fields{
 				"tp-id":         tpID,
@@ -1698,13 +1573,7 @@
 			"device-id": f.deviceHandler.device.Id,
 			"onu-id":    onuID,
 			"flow-id":   flow.Id})
-	flowInfo := rsrcMgr.FlowInfo{Flow: &downstreamflow}
-	if err := f.resourceMgr.UpdateFlowIDInfo(ctx, networkInterfaceID, int32(onuID), int32(uniID), flow.Id, flowInfo); err != nil {
-		return olterrors.NewErrPersistence("update", "flow", flow.Id,
-			log.Fields{
-				"flow":      downstreamflow,
-				"device-id": f.deviceHandler.device.Id}, err)
-	}
+
 	return nil
 }
 
@@ -1781,8 +1650,8 @@
 		return err
 	}
 
-	delGemPortMsg := &ic.InterAdapterDeleteGemPortMessage{UniId: uniID, TpPath: tpPath, GemPortId: gemPortID}
-	logger.Debugw(ctx, "sending-gem-port-delete-to-openonu-adapter",
+	delGemPortMsg := &ic.InterAdapterDeleteGemPortMessage{UniId: uniID, TpInstancePath: tpPath, GemPortId: gemPortID}
+	logger.Infow(ctx, "sending-gem-port-delete-to-openonu-adapter",
 		log.Fields{
 			"msg":       *delGemPortMsg,
 			"device-id": f.deviceHandler.device.Id})
@@ -1822,7 +1691,7 @@
 		return err
 	}
 
-	delTcontMsg := &ic.InterAdapterDeleteTcontMessage{UniId: uniID, TpPath: tpPath, AllocId: allocID}
+	delTcontMsg := &ic.InterAdapterDeleteTcontMessage{UniId: uniID, TpInstancePath: tpPath, AllocId: allocID}
 	logger.Debugw(ctx, "sending-tcont-delete-to-openonu-adapter",
 		log.Fields{
 			"msg":       *delTcontMsg,
@@ -1853,37 +1722,38 @@
 // Otherwise stale info continues to exist after gemport is freed and wrong logicalPortNo
 // is conveyed to ONOS during packet-in OF message.
 func (f *OpenOltFlowMgr) deleteGemPortFromLocalCache(ctx context.Context, intfID uint32, onuID uint32, gemPortID uint32) {
-
-	f.onuGemInfoLock.Lock()
-	defer f.onuGemInfoLock.Unlock()
-
 	logger.Infow(ctx, "deleting-gem-from-local-cache",
 		log.Fields{
 			"gem-port-id": gemPortID,
 			"intf-id":     intfID,
 			"onu-id":      onuID,
-			"device-id":   f.deviceHandler.device.Id,
-			"onu-gem":     f.onuGemInfo})
-
-	onugem := f.onuGemInfo
+			"device-id":   f.deviceHandler.device.Id})
+	f.onuGemInfoLock.RLock()
+	onugem, ok := f.onuGemInfoMap[onuID]
+	f.onuGemInfoLock.RUnlock()
+	if !ok {
+		logger.Warnw(ctx, "onu gem info already cleared from cache", log.Fields{
+			"gem-port-id": gemPortID,
+			"intf-id":     intfID,
+			"onu-id":      onuID,
+			"device-id":   f.deviceHandler.device.Id})
+		return
+	}
 deleteLoop:
-	for i, onu := range onugem {
-		if onu.OnuID == onuID {
-			for j, gem := range onu.GemPorts {
-				// If the gemport is found, delete it from local cache.
-				if gem == gemPortID {
-					onu.GemPorts = append(onu.GemPorts[:j], onu.GemPorts[j+1:]...)
-					onugem[i] = onu
-					logger.Infow(ctx, "removed-gemport-from-local-cache",
-						log.Fields{
-							"intf-id":           intfID,
-							"onu-id":            onuID,
-							"deletedgemport-id": gemPortID,
-							"gemports":          onu.GemPorts,
-							"device-id":         f.deviceHandler.device.Id})
-					break deleteLoop
-				}
-			}
+	for j, gem := range onugem.GemPorts {
+		// If the gemport is found, delete it from local cache.
+		if gem == gemPortID {
+			onugem.GemPorts = append(onugem.GemPorts[:j], onugem.GemPorts[j+1:]...)
+			f.onuGemInfoLock.Lock()
+			f.onuGemInfoMap[onuID] = onugem
+			f.onuGemInfoLock.Unlock()
+			logger.Infow(ctx, "removed-gemport-from-local-cache",
+				log.Fields{
+					"intf-id":           intfID,
+					"onu-id":            onuID,
+					"deletedgemport-id": gemPortID,
+					"gemports":          onugem.GemPorts,
+					"device-id":         f.deviceHandler.device.Id})
 			break deleteLoop
 		}
 	}
@@ -1891,7 +1761,7 @@
 
 //clearResources clears pon resources in kv store and the device
 // nolint: gocyclo
-func (f *OpenOltFlowMgr) clearResources(ctx context.Context, flow *ofp.OfpFlowStats, intfID uint32, onuID int32, uniID int32,
+func (f *OpenOltFlowMgr) clearResources(ctx context.Context, intfID uint32, onuID int32, uniID int32,
 	gemPortID int32, flowID uint64, portNum uint32, tpID uint32) error {
 
 	uni := getUniPortPath(f.deviceHandler.device.Id, intfID, onuID, uniID)
@@ -1900,27 +1770,22 @@
 		log.Fields{
 			"tpPath":    tpPath,
 			"device-id": f.deviceHandler.device.Id})
-	techprofileInst, err := f.techprofile[intfID].GetTPInstanceFromKVStore(ctx, tpID, tpPath)
-	if err != nil || techprofileInst == nil { // This should not happen, something wrong in KV backend transaction
-		return olterrors.NewErrNotFound("tech-profile-in-kv-store",
-			log.Fields{
-				"tp-id": tpID,
-				"path":  tpPath}, err)
-	}
 
 	used := f.isGemPortUsedByAnotherFlow(uint32(gemPortID))
 
 	if used {
-		f.flowsUsedByGemPortKey.Lock()
-		defer f.flowsUsedByGemPortKey.Unlock()
+		f.gemToFlowIDsKey.RLock()
+		flowIDs := f.gemToFlowIDs[uint32(gemPortID)]
+		f.gemToFlowIDsKey.RUnlock()
 
-		flowIDs := f.flowsUsedByGemPort[uint32(gemPortID)]
 		for i, flowIDinMap := range flowIDs {
 			if flowIDinMap == flowID {
 				flowIDs = append(flowIDs[:i], flowIDs[i+1:]...)
-				// everytime flowsUsedByGemPort cache is updated the same should be updated
+				f.gemToFlowIDsKey.Lock()
+				f.gemToFlowIDs[uint32(gemPortID)] = flowIDs
+				f.gemToFlowIDsKey.Unlock()
+				// everytime gemToFlowIDs cache is updated the same should be updated
 				// in kv store by calling UpdateFlowIDsForGem
-				f.flowsUsedByGemPort[uint32(gemPortID)] = flowIDs
 				if err := f.resourceMgr.UpdateFlowIDsForGem(ctx, intfID, uint32(gemPortID), flowIDs); err != nil {
 					return err
 				}
@@ -1936,28 +1801,17 @@
 		return nil
 	}
 	logger.Debugf(ctx, "gem-port-id %d is-not-used-by-another-flow--releasing-the-gem-port", gemPortID)
-	f.resourceMgr.RemoveGemPortIDForOnu(ctx, intfID, uint32(onuID), uint32(uniID), uint32(gemPortID))
-	// TODO: The TrafficQueue corresponding to this gem-port also should be removed immediately.
-	// But it is anyway eventually  removed later when the TechProfile is freed, so not a big issue for now.
-	f.resourceMgr.RemoveGEMportPonportToOnuMapOnKVStore(ctx, uint32(gemPortID), intfID)
-	// also clear gem to uni cache
-	f.removeFromGemToUniMap(gemPortKey{
-		intfID:  intfID,
-		gemPort: uint32(gemPortID),
-	})
 	f.deleteGemPortFromLocalCache(ctx, intfID, uint32(onuID), uint32(gemPortID))
-
-	f.onuIdsLock.Lock() // TODO: What is this lock?
-
-	//everytime an entry is deleted from flowsUsedByGemPort cache, the same should be updated in kv as well
+	_ = f.resourceMgr.RemoveGemFromOnuGemInfo(ctx, intfID, uint32(onuID), uint32(gemPortID)) // ignore error and proceed.
+	//everytime an entry is deleted from gemToFlowIDs cache, the same should be updated in kv as well
 	// by calling DeleteFlowIDsForGem
-	f.flowsUsedByGemPortKey.Lock()
-	delete(f.flowsUsedByGemPort, uint32(gemPortID))
-	f.flowsUsedByGemPortKey.Unlock()
-	f.resourceMgr.DeleteFlowIDsForGem(ctx, intfID, uint32(gemPortID))
-	f.resourceMgr.FreeGemPortID(ctx, intfID, uint32(onuID), uint32(uniID), uint32(gemPortID))
+	f.gemToFlowIDsKey.Lock()
+	delete(f.gemToFlowIDs, uint32(gemPortID))
+	f.gemToFlowIDsKey.Unlock()
 
-	f.onuIdsLock.Unlock()
+	f.resourceMgr.DeleteFlowIDsForGem(ctx, intfID, uint32(gemPortID))
+
+	f.resourceMgr.FreeGemPortID(ctx, intfID, uint32(onuID), uint32(uniID), uint32(gemPortID))
 
 	// Delete the gem port on the ONU.
 	if err := f.sendDeleteGemPortToChild(ctx, intfID, uint32(onuID), uint32(uniID), uint32(gemPortID), tpPath); err != nil {
@@ -1970,8 +1824,15 @@
 				"device-id":  f.deviceHandler.device.Id,
 				"gemport-id": gemPortID})
 	}
+	techprofileInst, err := f.techprofile.GetTPInstance(ctx, tpPath)
+	if err != nil || techprofileInst == nil { // This should not happen, something wrong in KV backend transaction
+		return olterrors.NewErrNotFound("tech-profile-in-kv-store",
+			log.Fields{
+				"tp-id": tpID,
+				"path":  tpPath}, err)
+	}
 	switch techprofileInst := techprofileInst.(type) {
-	case *tp.TechProfile:
+	case *tp_pb.TechProfileInstance:
 		ok, _ := f.isTechProfileUsedByAnotherGem(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst, uint32(gemPortID))
 		if !ok {
 			if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, intfID, uint32(onuID), uint32(uniID), tpID); err != nil {
@@ -1987,23 +1848,23 @@
 				logger.Warn(ctx, err)
 			}
 		}
-	case *tp.EponProfile:
+	case *tp_pb.EponTechProfileInstance:
 		if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, intfID, uint32(onuID), uint32(uniID), tpID); err != nil {
 			logger.Warn(ctx, err)
 		}
 		if err := f.DeleteTechProfileInstance(ctx, intfID, uint32(onuID), uint32(uniID), "", tpID); err != nil {
 			logger.Warn(ctx, err)
 		}
-		f.resourceMgr.FreeAllocID(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.AllocID)
+		f.resourceMgr.FreeAllocID(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.AllocId)
 		// Delete the TCONT on the ONU.
-		if err := f.sendDeleteTcontToChild(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.AllocID, tpPath); err != nil {
+		if err := f.sendDeleteTcontToChild(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.AllocId, tpPath); err != nil {
 			logger.Errorw(ctx, "error-processing-delete-tcont-towards-onu",
 				log.Fields{
 					"intf":      intfID,
 					"onu-id":    onuID,
 					"uni-id":    uniID,
 					"device-id": f.deviceHandler.device.Id,
-					"alloc-id":  techprofileInst.AllocID})
+					"alloc-id":  techprofileInst.AllocId})
 		}
 	default:
 		logger.Errorw(ctx, "error-unknown-tech",
@@ -2016,7 +1877,6 @@
 
 // nolint: gocyclo
 func (f *OpenOltFlowMgr) clearFlowFromDeviceAndResourceManager(ctx context.Context, flow *ofp.OfpFlowStats, flowDirection string) error {
-	var flowInfo *rsrcMgr.FlowInfo
 	logger.Infow(ctx, "clear-flow-from-resource-manager",
 		log.Fields{
 			"flowDirection": flowDirection,
@@ -2037,6 +1897,16 @@
 
 	onuID := int32(onu)
 	uniID := int32(uni)
+	tpID, err := getTpIDFromFlow(ctx, flow)
+	if err != nil {
+		return olterrors.NewErrNotFound("tp-id",
+			log.Fields{
+				"flow":      flow,
+				"intf-id":   Intf,
+				"onu-id":    onuID,
+				"uni-id":    uniID,
+				"device-id": f.deviceHandler.device.Id}, err)
+	}
 
 	for _, field := range flows.GetOfbFields(flow) {
 		if field.Type == flows.IP_PROTO {
@@ -2060,86 +1930,45 @@
 			logger.Errorw(ctx, "invalid-in-port-number",
 				log.Fields{
 					"port-number": inPort,
-					"error":       err})
+					"err":         err})
 			return err
 		}
 	}
-	if flowInfo = f.resourceMgr.GetFlowIDInfo(ctx, Intf, onuID, uniID, flow.Id); flowInfo == nil {
-		logger.Errorw(ctx, "flow-info-not-found-for-flow-to-be-removed", log.Fields{"flow-id": flow.Id, "intf-id": Intf, "onu-id": onuID, "uni-id": uniID})
-		return olterrors.NewErrPersistence("remove", "flow", flow.Id, log.Fields{"flow": flow}, err)
-	}
-	removeFlowMessage := openoltpb2.Flow{FlowId: flowInfo.Flow.FlowId, FlowType: flowInfo.Flow.FlowType}
-	logger.Debugw(ctx, "flow-to-be-deleted", log.Fields{"flow": flowInfo.Flow})
+
+	removeFlowMessage := openoltpb2.Flow{FlowId: flow.Id, AccessIntfId: int32(Intf), OnuId: onuID, UniId: uniID, TechProfileId: tpID, FlowType: flowDirection}
+	logger.Debugw(ctx, "flow-to-be-deleted", log.Fields{"flow": flow})
 	if err = f.removeFlowFromDevice(ctx, &removeFlowMessage, flow.Id); err != nil {
 		return err
 	}
-	if err = f.resourceMgr.RemoveFlowIDInfo(ctx, Intf, onuID, uniID, flow.Id); err != nil {
-		logger.Errorw(ctx, "failed-to-remove-flow-on-kv-store", log.Fields{"error": err})
-		return err
-	}
-	tpID, err := getTpIDFromFlow(ctx, flow)
-	if err != nil {
-		return olterrors.NewErrNotFound("tp-id",
-			log.Fields{
-				"flow":      flow,
-				"intf-id":   Intf,
-				"onu-id":    onuID,
-				"uni-id":    uniID,
-				"device-id": f.deviceHandler.device.Id}, err)
-	}
 
-	if !flowInfo.Flow.ReplicateFlow {
-		if err = f.clearResources(ctx, flow, Intf, onuID, uniID, flowInfo.Flow.GemportId, flowInfo.Flow.FlowId, portNum, tpID); err != nil {
+	f.flowIDToGemsLock.Lock()
+	gems, ok := f.flowIDToGems[flow.Id]
+	if !ok {
+		logger.Errorw(ctx, "flow-id-to-gem-map-not-found", log.Fields{"flowID": flow.Id})
+		f.flowIDToGemsLock.Unlock()
+		return olterrors.NewErrNotFound("flow-id-to-gem-map-not-found", log.Fields{"flowID": flow.Id}, nil)
+	}
+	copyOfGems := make([]uint32, len(gems))
+	_ = copy(copyOfGems, gems)
+	// Delete the flow-id to gemport list entry from the map now the flow is deleted.
+	delete(f.flowIDToGems, flow.Id)
+	f.flowIDToGemsLock.Unlock()
+
+	logger.Debugw(ctx, "gems-to-be-cleared", log.Fields{"gems": copyOfGems})
+	for _, gem := range copyOfGems {
+		if err = f.clearResources(ctx, Intf, onuID, uniID, int32(gem), flow.Id, portNum, tpID); err != nil {
 			logger.Errorw(ctx, "failed-to-clear-resources-for-flow", log.Fields{
-				"flow-id":        flow.Id,
-				"stored-flow":    flowInfo.Flow,
-				"device-id":      f.deviceHandler.device.Id,
-				"stored-flow-id": flowInfo.Flow.FlowId,
-				"onu-id":         onuID,
-				"intf":           Intf,
-				"err":            err,
+				"flow-id":   flow.Id,
+				"device-id": f.deviceHandler.device.Id,
+				"onu-id":    onuID,
+				"intf":      Intf,
+				"gem":       gem,
+				"err":       err,
 			})
 			return err
 		}
-	} else {
-		gems := make([]uint32, 0)
-		for _, gem := range flowInfo.Flow.PbitToGemport {
-			gems = appendUnique32bit(gems, gem)
-		}
-		logger.Debugw(ctx, "gems-to-be-cleared", log.Fields{"gems": gems})
-		for _, gem := range gems {
-			if err = f.clearResources(ctx, flow, Intf, onuID, uniID, int32(gem), flowInfo.Flow.FlowId, portNum, tpID); err != nil {
-				logger.Errorw(ctx, "failed-to-clear-resources-for-flow", log.Fields{
-					"flow-id":        flow.Id,
-					"stored-flow":    flowInfo.Flow,
-					"device-id":      f.deviceHandler.device.Id,
-					"stored-flow-id": flowInfo.Flow.FlowId,
-					"onu-id":         onuID,
-					"intf":           Intf,
-					"gem":            gem,
-					"err":            err,
-				})
-				return err
-			}
-		}
 	}
 
-	// If datapath flow, clear the symmetric flow data from the subscriberDataPathFlowIDMap map
-	if isDatapathFlow(flow) {
-		if tpID, err := getTpIDFromFlow(ctx, flow); err != nil {
-			var inverseDirection string
-			if flowDirection == Upstream {
-				inverseDirection = Downstream
-			} else {
-				inverseDirection = Upstream
-			}
-
-			keySymm := subscriberDataPathFlowIDKey{intfID: Intf, onuID: uint32(onuID), uniID: uint32(uniID), direction: inverseDirection, tpID: tpID}
-			f.subscriberDataPathFlowIDMapLock.Lock()
-			delete(f.subscriberDataPathFlowIDMap, keySymm)
-			f.subscriberDataPathFlowIDMapLock.Unlock()
-		}
-	}
 	// Decrement reference count for the meter associated with the given <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
 	if err := f.resourceMgr.HandleMeterInfoRefCntUpdate(ctx, flowDirection, Intf, uint32(onuID), uint32(uniID), tpID, false); err != nil {
 		return err
@@ -2203,7 +2032,8 @@
 	// Step2 : Push the flowControlBlock to ONU channel
 	// Step3 : Wait on response channel for response
 	// Step4 : Return error value
-	logger.Debugw(ctx, "process-flow", log.Fields{"flow": flow, "addFlow": addFlow})
+	startTime := time.Now()
+	logger.Infow(ctx, "process-flow", log.Fields{"flow": flow, "addFlow": addFlow})
 	errChan := make(chan error)
 	flowCb := flowControlBlock{
 		ctx:          ctx,
@@ -2223,7 +2053,7 @@
 	f.incomingFlows[onuID] <- flowCb
 	// Wait on the channel for flow handlers return value
 	err := <-errChan
-	logger.Debugw(ctx, "process-flow--received-resp", log.Fields{"flow": flow, "addFlow": addFlow, "err": err})
+	logger.Infow(ctx, "process-flow--received-resp", log.Fields{"err": err, "totalTimeSeconds": time.Since(startTime).Seconds()})
 	return err
 }
 
@@ -2235,17 +2065,17 @@
 		// process the flow completely before proceeding to handle the next flow
 		flowCb := <-subscriberFlowChannel
 		if flowCb.addFlow {
-			logger.Debugw(flowCb.ctx, "adding-flow",
-				log.Fields{"device-id": f.deviceHandler.device.Id,
-					"flowToAdd": flowCb.flow})
+			logger.Info(flowCb.ctx, "adding-flow-start")
+			startTime := time.Now()
 			err := f.AddFlow(flowCb.ctx, flowCb.flow, flowCb.flowMetadata)
+			logger.Infow(flowCb.ctx, "adding-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
 			// Pass the return value over the return channel
 			*flowCb.errChan <- err
 		} else {
-			logger.Debugw(flowCb.ctx, "removing-flow",
-				log.Fields{"device-id": f.deviceHandler.device.Id,
-					"flowToRemove": flowCb.flow})
+			logger.Info(flowCb.ctx, "removing-flow-start")
+			startTime := time.Now()
 			err := f.RemoveFlow(flowCb.ctx, flowCb.flow)
+			logger.Infow(flowCb.ctx, "removing-flow-complete", log.Fields{"processTimeSecs": time.Since(startTime).Seconds()})
 			// Pass the return value over the return channel
 			*flowCb.errChan <- err
 		}
@@ -2393,14 +2223,10 @@
 		}
 		//cached group can be removed now
 		if err := f.resourceMgr.RemoveFlowGroupFromKVStore(ctx, groupID, true); err != nil {
-			logger.Warnw(ctx, "failed-to-remove-flow-group", log.Fields{"group-id": groupID, "error": err})
+			logger.Warnw(ctx, "failed-to-remove-flow-group", log.Fields{"group-id": groupID, "err": err})
 		}
 	}
 
-	flowInfo := rsrcMgr.FlowInfo{Flow: &multicastFlow}
-	if err = f.resourceMgr.UpdateFlowIDInfo(ctx, networkInterfaceID, int32(onuID), int32(uniID), flow.Id, flowInfo); err != nil {
-		return olterrors.NewErrPersistence("update", "flow", flow.Id, log.Fields{"flow": multicastFlow}, err)
-	}
 	return nil
 }
 
@@ -2413,16 +2239,13 @@
 		}
 		return nniInterfaceID, nil
 	}
-	// find the first NNI interface id of the device
-	nniPorts, e := f.resourceMgr.GetNNIFromKVStore(ctx)
-	if e == nil && len(nniPorts) > 0 {
-		return nniPorts[0], nil
-	}
-	return 0, olterrors.NewErrNotFound("nni-port", nil, e).Log()
+
+	// TODO: For now we support only one NNI port in VOLTHA. We shall use only the first NNI port, i.e., interface-id 0.
+	return 0, nil
 }
 
 //sendTPDownloadMsgToChild send payload
-func (f *OpenOltFlowMgr) sendTPDownloadMsgToChild(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, uni string, TpID uint32) error {
+func (f *OpenOltFlowMgr) sendTPDownloadMsgToChild(ctx context.Context, intfID uint32, onuID uint32, uniID uint32, uni string, TpID uint32, tpInst tp_pb.TechProfileInstance) error {
 
 	onuDev, err := f.getOnuDevice(ctx, intfID, onuID)
 	if err != nil {
@@ -2436,7 +2259,11 @@
 	logger.Debugw(ctx, "got-child-device-from-olt-device-handler", log.Fields{"onu-id": onuDev.deviceID})
 
 	tpPath := f.getTPpath(ctx, intfID, uni, TpID)
-	tpDownloadMsg := &ic.InterAdapterTechProfileDownloadMessage{UniId: uniID, Path: tpPath}
+	tpDownloadMsg := &ic.InterAdapterTechProfileDownloadMessage{
+		UniId:          uniID,
+		TpInstancePath: tpPath,
+		TechTpInstance: &ic.InterAdapterTechProfileDownloadMessage_TpInstance{TpInstance: &tpInst},
+	}
 	logger.Debugw(ctx, "sending-load-tech-profile-request-to-brcm-onu-adapter", log.Fields{"tpDownloadMsg": *tpDownloadMsg})
 	sendErr := f.deviceHandler.AdapterProxy.SendInterAdapterMessage(log.WithSpanFromContext(context.Background(), ctx),
 		tpDownloadMsg,
@@ -2458,24 +2285,25 @@
 }
 
 //UpdateOnuInfo function adds onu info to cache and kvstore
+//UpdateOnuInfo function adds onu info to cache and kvstore
 func (f *OpenOltFlowMgr) UpdateOnuInfo(ctx context.Context, intfID uint32, onuID uint32, serialNum string) error {
 
-	f.onuGemInfoLock.Lock()
-	defer f.onuGemInfoLock.Unlock()
-	onugem := f.onuGemInfo
+	f.onuGemInfoLock.RLock()
+	_, ok := f.onuGemInfoMap[onuID]
+	f.onuGemInfoLock.RUnlock()
 	// If the ONU already exists in onuGemInfo list, nothing to do
-	for _, onu := range onugem {
-		if onu.OnuID == onuID && onu.SerialNumber == serialNum {
-			logger.Debugw(ctx, "onu-id-already-exists-in-cache",
-				log.Fields{"onuID": onuID,
-					"serialNum": serialNum})
-			return nil
-		}
+	if ok {
+		logger.Debugw(ctx, "onu-id-already-exists-in-cache",
+			log.Fields{"onuID": onuID,
+				"serialNum": serialNum})
+		return nil
 	}
 
-	onu := rsrcMgr.OnuGemInfo{OnuID: onuID, SerialNumber: serialNum, IntfID: intfID}
-	f.onuGemInfo = append(f.onuGemInfo, onu)
-	if err := f.resourceMgr.AddOnuGemInfo(ctx, intfID, onu); err != nil {
+	onuGemInfo := rsrcMgr.OnuGemInfo{OnuID: onuID, SerialNumber: serialNum, IntfID: intfID}
+	f.onuGemInfoLock.Lock()
+	f.onuGemInfoMap[onuID] = &onuGemInfo
+	f.onuGemInfoLock.Unlock()
+	if err := f.resourceMgr.AddOnuGemInfo(ctx, intfID, onuID, onuGemInfo); err != nil {
 		return err
 	}
 	logger.Infow(ctx, "updated-onuinfo",
@@ -2483,7 +2311,7 @@
 			"intf-id":    intfID,
 			"onu-id":     onuID,
 			"serial-num": serialNum,
-			"onu":        onu,
+			"onu":        onuGemInfo,
 			"device-id":  f.deviceHandler.device.Id})
 	return nil
 }
@@ -2491,34 +2319,46 @@
 //addGemPortToOnuInfoMap function adds GEMport to ONU map
 func (f *OpenOltFlowMgr) addGemPortToOnuInfoMap(ctx context.Context, intfID uint32, onuID uint32, gemPort uint32) {
 
-	f.onuGemInfoLock.Lock()
-	defer f.onuGemInfoLock.Unlock()
-
 	logger.Infow(ctx, "adding-gem-to-onu-info-map",
 		log.Fields{
 			"gem-port-id": gemPort,
 			"intf-id":     intfID,
 			"onu-id":      onuID,
-			"device-id":   f.deviceHandler.device.Id,
-			"onu-gem":     f.onuGemInfo})
-	onugem := f.onuGemInfo
-	// update the gem to the local cache as well as to kv strore
-	for idx, onu := range onugem {
-		if onu.OnuID == onuID {
-			// check if gem already exists , else update the cache and kvstore
-			for _, gem := range onu.GemPorts {
-				if gem == gemPort {
-					logger.Debugw(ctx, "gem-already-in-cache-no-need-to-update-cache-and-kv-store",
-						log.Fields{
-							"gem":       gemPort,
-							"device-id": f.deviceHandler.device.Id})
-					return
-				}
+			"device-id":   f.deviceHandler.device.Id})
+	f.onuGemInfoLock.RLock()
+	onugem, ok := f.onuGemInfoMap[onuID]
+	f.onuGemInfoLock.RUnlock()
+	if !ok {
+		logger.Warnw(ctx, "onu gem info is missing", log.Fields{
+			"gem-port-id": gemPort,
+			"intf-id":     intfID,
+			"onu-id":      onuID,
+			"device-id":   f.deviceHandler.device.Id})
+		return
+	}
+
+	if onugem.OnuID == onuID {
+		// check if gem already exists , else update the cache and kvstore
+		for _, gem := range onugem.GemPorts {
+			if gem == gemPort {
+				logger.Debugw(ctx, "gem-already-in-cache-no-need-to-update-cache-and-kv-store",
+					log.Fields{
+						"gem":       gemPort,
+						"device-id": f.deviceHandler.device.Id})
+				return
 			}
-			onugem[idx].GemPorts = append(onugem[idx].GemPorts, gemPort)
-			f.onuGemInfo = onugem
-			break
 		}
+		onugem.GemPorts = append(onugem.GemPorts, gemPort)
+		f.onuGemInfoLock.Lock()
+		f.onuGemInfoMap[onuID] = onugem
+		f.onuGemInfoLock.Unlock()
+	} else {
+		logger.Warnw(ctx, "mismatched onu id", log.Fields{
+			"gem-port-id": gemPort,
+			"intf-id":     intfID,
+			"onu-id":      onuID,
+			"device-id":   f.deviceHandler.device.Id})
+		return
 	}
 	err := f.resourceMgr.AddGemToOnuGemInfo(ctx, intfID, onuID, gemPort)
 	if err != nil {
@@ -2535,24 +2375,18 @@
 			"gem-port-id": gemPort,
 			"intf-id":     intfID,
 			"onu-id":      onuID,
-			"device-id":   f.deviceHandler.device.Id,
-			"onu-gem":     f.onuGemInfo})
+			"device-id":   f.deviceHandler.device.Id})
 }
 
 //GetLogicalPortFromPacketIn function computes logical port UNI/NNI port from packet-in indication and returns the same
 func (f *OpenOltFlowMgr) GetLogicalPortFromPacketIn(ctx context.Context, packetIn *openoltpb2.PacketIndication) (uint32, error) {
 	var logicalPortNum uint32
-	var onuID, uniID uint32
-	var err error
 
 	if packetIn.IntfType == "pon" {
 		// packet indication does not have serial number , so sending as nil
 		// get onu and uni ids associated with the given pon and gem ports
-		if onuID, uniID, err = f.GetUniPortByPonPortGemPort(ctx, packetIn.IntfId, packetIn.GemportId); err != nil {
-			// Called method is returning error with all data populated; just return the same
-			return logicalPortNum, err
-		}
-		logger.Debugf(ctx, "retrieved ONU and UNI IDs [%d, %d] by interface:%d, gem:%d")
+		onuID, uniID := packetIn.OnuId, packetIn.UniId
+		logger.Debugf(ctx, "retrieved ONU and UNI IDs [%d, %d] by interface:%d, gem:%d", packetIn.OnuId, packetIn.UniId, packetIn.GemportId)
 
 		if packetIn.PortNo != 0 {
 			logicalPortNum = packetIn.PortNo
@@ -2576,40 +2410,6 @@
 	return logicalPortNum, nil
 }
 
-//GetUniPortByPonPortGemPort return onu and uni IDs associated with given pon and gem ports
-func (f *OpenOltFlowMgr) GetUniPortByPonPortGemPort(ctx context.Context, intfID uint32, gemPortID uint32) (uint32, uint32, error) {
-	key := gemPortKey{
-		intfID:  intfID,
-		gemPort: gemPortID,
-	}
-	uniPortInfo, ok := f.fromGemToUniMap(key) //try to get from the cache first
-	if ok {
-		if len(uniPortInfo) > 1 {
-			//return onu ID and uni port from the cache
-			logger.Debugw(ctx, "found-uni-port-by-pon-and-gem-ports",
-				log.Fields{
-					"intfID":       intfID,
-					"gemPortID":    gemPortID,
-					"onuID, uniID": uniPortInfo})
-			return uniPortInfo[0], uniPortInfo[1], nil
-		}
-	}
-	//If uni port is not found in cache try to get it from kv store. if it is found in kv store, update the cache and return.
-	onuID, uniID, err := f.resourceMgr.GetUniPortByPonPortGemPortFromKVStore(ctx, intfID, gemPortID)
-	if err == nil {
-		f.toGemToUniMap(ctx, key, onuID, uniID)
-		logger.Infow(ctx, "found-uni-port-by-pon-and-gem-port-from-kv-store-and-updating-cache-with-uni-port",
-			log.Fields{
-				"gemPortKey": key,
-				"onuID":      onuID,
-				"uniID":      uniID})
-		return onuID, uniID, nil
-	}
-	return uint32(0), uint32(0), olterrors.NewErrNotFound("uni-id",
-		log.Fields{"interfaceID": intfID, "gemPortID": gemPortID},
-		errors.New("no uni port found"))
-}
-
 //GetPacketOutGemPortID returns gemPortId
 func (f *OpenOltFlowMgr) GetPacketOutGemPortID(ctx context.Context, intfID uint32, onuID uint32, portNum uint32, packet []byte) (uint32, error) {
 	var gemPortID uint32
@@ -2721,10 +2521,6 @@
 		return olterrors.NewErrFlowOp("add", logicalFlow.Id, log.Fields{"flow": downstreamflow}, err)
 	}
 	logger.Info(ctx, "trap-on-nni-flow-added–to-device-successfully")
-	flowInfo := rsrcMgr.FlowInfo{Flow: &downstreamflow}
-	if err := f.resourceMgr.UpdateFlowIDInfo(ctx, networkInterfaceID, int32(onuID), int32(uniID), logicalFlow.Id, flowInfo); err != nil {
-		return olterrors.NewErrPersistence("update", "flow", logicalFlow.Id, log.Fields{"flow": downstreamflow}, err)
-	}
 	return nil
 }
 
@@ -2814,10 +2610,7 @@
 		return olterrors.NewErrFlowOp("add", logicalFlow.Id, log.Fields{"flow": downstreamflow}, err)
 	}
 	logger.Info(ctx, "igmp-trap-on-nni-flow-added-to-device-successfully")
-	flowInfo := rsrcMgr.FlowInfo{Flow: &downstreamflow}
-	if err := f.resourceMgr.UpdateFlowIDInfo(ctx, networkInterfaceID, int32(onuID), int32(uniID), logicalFlow.Id, flowInfo); err != nil {
-		return olterrors.NewErrPersistence("update", "flow", logicalFlow.Id, log.Fields{"flow": downstreamflow}, err)
-	}
+
 	return nil
 }
 
@@ -2846,10 +2639,10 @@
 	pbitToGem := make(map[uint32]uint32)
 	gemToAes := make(map[uint32]bool)
 
-	var attributes []tp.IGemPortAttribute
+	var attributes []*tp_pb.GemPortAttributes
 	var direction = tp_pb.Direction_UPSTREAM
 	switch TpInst := TpInst.(type) {
-	case *tp.TechProfile:
+	case *tp_pb.TechProfileInstance:
 		if IsUpstream(actionInfo[Output].(uint32)) {
 			attributes = TpInst.UpstreamGemPortAttributeList
 		} else {
@@ -2881,9 +2674,9 @@
 			}
 		}
 	} else { // Extract the exact gemport which maps to the PCP classifier in the flow
-		if gem := f.techprofile[intfID].GetGemportForPbit(ctx, TpInst, direction, pcp.(uint32)); gem != nil {
-			gemPortID = gem.(tp.IGemPortAttribute).GemportID
-			gemToAes[gemPortID], _ = strconv.ParseBool(gem.(tp.IGemPortAttribute).AesEncryption)
+		if gem := f.techprofile.GetGemportForPbit(ctx, TpInst, direction, pcp.(uint32)); gem != nil {
+			gemPortID = gem.(*tp_pb.GemPortAttributes).GemportId
+			gemToAes[gemPortID], _ = strconv.ParseBool(gem.(*tp_pb.GemPortAttributes).AesEncryption)
 		}
 	}
 
@@ -2981,26 +2774,26 @@
 	}
 	// Send Techprofile download event to child device in go routine as it takes time
 	go func() {
-		if err := f.sendTPDownloadMsgToChild(ctx, intfID, onuID, uniID, uni, tpID); err != nil {
+		if err := f.sendTPDownloadMsgToChild(ctx, intfID, onuID, uniID, uni, tpID, *(TpInst.(*tp_pb.TechProfileInstance))); err != nil {
 			logger.Warn(ctx, err)
 		}
 	}()
 }
 
 func (f *OpenOltFlowMgr) isGemPortUsedByAnotherFlow(gemPortID uint32) bool {
-	f.flowsUsedByGemPortKey.RLock()
-	flowIDList := f.flowsUsedByGemPort[gemPortID]
-	f.flowsUsedByGemPortKey.RUnlock()
+	f.gemToFlowIDsKey.RLock()
+	flowIDList := f.gemToFlowIDs[gemPortID]
+	f.gemToFlowIDsKey.RUnlock()
 	return len(flowIDList) > 1
 
 }
 
-func (f *OpenOltFlowMgr) isTechProfileUsedByAnotherGem(ctx context.Context, ponIntf uint32, onuID uint32, uniID uint32, tpInst *tp.TechProfile, gemPortID uint32) (bool, uint32) {
+func (f *OpenOltFlowMgr) isTechProfileUsedByAnotherGem(ctx context.Context, ponIntf uint32, onuID uint32, uniID uint32, tpInst *tp_pb.TechProfileInstance, gemPortID uint32) (bool, uint32) {
 	currentGemPorts := f.resourceMgr.GetCurrentGEMPortIDsForOnu(ctx, ponIntf, onuID, uniID)
 	tpGemPorts := tpInst.UpstreamGemPortAttributeList
 	for _, currentGemPort := range currentGemPorts {
 		for _, tpGemPort := range tpGemPorts {
-			if (currentGemPort == tpGemPort.GemportID) && (currentGemPort != gemPortID) {
+			if (currentGemPort == tpGemPort.GemportId) && (currentGemPort != gemPortID) {
 				return true, currentGemPort
 			}
 		}
@@ -3010,21 +2803,21 @@
 }
 
 func (f *OpenOltFlowMgr) isAllocUsedByAnotherUNI(ctx context.Context, sq schedQueue) bool {
-	tpInst := sq.tpInst.(*tp.TechProfile)
-	if tpInst.InstanceCtrl.Onu == "single-instance" && sq.direction == tp_pb.Direction_UPSTREAM {
-		tpInstances := f.techprofile[sq.intfID].FindAllTpInstances(ctx, f.deviceHandler.device.Id, sq.tpID, sq.intfID, sq.onuID).([]tp.TechProfile)
+	tpInst := sq.tpInst.(*tp_pb.TechProfileInstance)
+	if tpInst.InstanceControl.Onu == "single-instance" && sq.direction == tp_pb.Direction_UPSTREAM {
+		tpInstances := f.techprofile.FindAllTpInstances(ctx, f.deviceHandler.device.Id, sq.tpID, sq.intfID, sq.onuID).([]tp_pb.TechProfileInstance)
 		logger.Debugw(ctx, "got-single-instance-tp-instances", log.Fields{"tp-instances": tpInstances})
 		for i := 0; i < len(tpInstances); i++ {
 			tpI := tpInstances[i]
 			if tpI.SubscriberIdentifier != tpInst.SubscriberIdentifier &&
-				tpI.UsScheduler.AllocID == tpInst.UsScheduler.AllocID {
+				tpI.UsScheduler.AllocId == tpInst.UsScheduler.AllocId {
 				logger.Debugw(ctx, "alloc-is-in-use",
 					log.Fields{
 						"device-id": f.deviceHandler.device.Id,
 						"intfID":    sq.intfID,
 						"onuID":     sq.onuID,
 						"uniID":     sq.uniID,
-						"allocID":   tpI.UsScheduler.AllocID,
+						"allocID":   tpI.UsScheduler.AllocId,
 					})
 				return true
 			}
@@ -3250,7 +3043,7 @@
 			logger.Debugw(ctx, "invalid-action-port-number",
 				log.Fields{
 					"port-number": action[Output].(uint32),
-					"error":       err})
+					"err":         err})
 			return uint32(0), err
 		}
 		logger.Infow(ctx, "output-nni-intfId-is", log.Fields{"intf-id": intfID})
@@ -3261,7 +3054,7 @@
 			logger.Debugw(ctx, "invalid-classifier-port-number",
 				log.Fields{
 					"port-number": action[Output].(uint32),
-					"error":       err})
+					"err":         err})
 			return uint32(0), err
 		}
 		logger.Infow(ctx, "input-nni-intfId-is", log.Fields{"intf-id": intfID})
@@ -3331,69 +3124,39 @@
 	return 0, 0, nil
 }
 
-// AddUniPortToOnuInfo adds uni port to the onugem info both in cache and kvstore.
-func (f *OpenOltFlowMgr) AddUniPortToOnuInfo(ctx context.Context, intfID uint32, onuID uint32, portNum uint32) {
-
-	f.onuGemInfoLock.Lock()
-	defer f.onuGemInfoLock.Unlock()
-
-	onugem := f.onuGemInfo
-	for idx, onu := range onugem {
-		if onu.OnuID == onuID {
-			for _, uni := range onu.UniPorts {
-				if uni == portNum {
-					logger.Infow(ctx, "uni-already-in-cache--no-need-to-update-cache-and-kv-store", log.Fields{"uni": portNum})
-					return
+func (f *OpenOltFlowMgr) loadFlowIDsForGemAndGemIDsForFlow(ctx context.Context) {
+	logger.Debug(ctx, "loadFlowIDsForGemAndGemIDsForFlow - start")
+	f.onuGemInfoLock.RLock()
+	f.gemToFlowIDsKey.Lock()
+	f.flowIDToGemsLock.Lock()
+	for _, og := range f.onuGemInfoMap {
+		for _, gem := range og.GemPorts {
+			flowIDs, err := f.resourceMgr.GetFlowIDsForGem(ctx, f.ponPortIdx, gem)
+			if err != nil {
+				f.gemToFlowIDs[gem] = flowIDs
+				for _, flowID := range flowIDs {
+					if _, ok := f.flowIDToGems[flowID]; !ok {
+						f.flowIDToGems[flowID] = []uint32{gem}
+					} else {
+						f.flowIDToGems[flowID] = appendUnique32bit(f.flowIDToGems[flowID], gem)
+					}
 				}
 			}
-			onugem[idx].UniPorts = append(onugem[idx].UniPorts, portNum)
-			f.onuGemInfo = onugem
 		}
 	}
-	f.resourceMgr.AddUniPortToOnuInfo(ctx, intfID, onuID, portNum)
-
-}
-
-func (f *OpenOltFlowMgr) loadFlowIDlistForGem(ctx context.Context, intf uint32) {
-	flowIDsList, err := f.resourceMgr.GetFlowIDsGemMapForInterface(ctx, intf)
-	if err != nil {
-		logger.Error(ctx, "failed-to-get-flowid-list-per-gem", log.Fields{"intf": intf})
-		return
-	}
-	f.flowsUsedByGemPortKey.Lock()
-	for gem, FlowIDs := range flowIDsList {
-		f.flowsUsedByGemPort[gem] = FlowIDs
-	}
-	f.flowsUsedByGemPortKey.Unlock()
+	f.flowIDToGemsLock.Unlock()
+	f.gemToFlowIDsKey.Unlock()
+	f.onuGemInfoLock.RUnlock()
+	logger.Debug(ctx, "loadFlowIDsForGemAndGemIDsForFlow - end")
 }
 
 //clearMulticastFlowFromResourceManager  removes a multicast flow from the KV store and
 // clears resources reserved for this multicast flow
 func (f *OpenOltFlowMgr) clearMulticastFlowFromResourceManager(ctx context.Context, flow *ofp.OfpFlowStats) error {
-	classifierInfo := make(map[string]interface{})
-	var flowInfo *rsrcMgr.FlowInfo
-	formulateClassifierInfoFromFlow(ctx, classifierInfo, flow)
-	networkInterfaceID, err := f.getNNIInterfaceIDOfMulticastFlow(ctx, classifierInfo)
-
-	if err != nil {
-		logger.Warnw(ctx, "no-inport-found--cannot-release-resources-of-the-multicast-flow", log.Fields{"flowId:": flow.Id})
-		return err
-	}
-
-	var onuID = int32(NoneOnuID)
-	var uniID = int32(NoneUniID)
-	if flowInfo = f.resourceMgr.GetFlowIDInfo(ctx, networkInterfaceID, onuID, uniID, flow.Id); flowInfo == nil {
-		return olterrors.NewErrPersistence("remove", "flow", flow.Id,
-			log.Fields{
-				"flow":      flow,
-				"device-id": f.deviceHandler.device.Id,
-				"intf-id":   networkInterfaceID,
-				"onu-id":    onuID}, err).Log()
-	}
-	removeFlowMessage := openoltpb2.Flow{FlowId: flow.Id, FlowType: flowInfo.Flow.FlowType}
+	removeFlowMessage := openoltpb2.Flow{FlowId: flow.Id, FlowType: Multicast}
 	logger.Debugw(ctx, "multicast-flow-to-be-deleted",
 		log.Fields{
-			"flow":      flowInfo.Flow,
+			"flow":      flow,
 			"flow-id":   flow.Id,
 			"device-id": f.deviceHandler.device.Id})
 	// Remove from device
@@ -3402,60 +3165,44 @@
 		logger.Errorw(ctx, "failed-to-remove-multicast-flow",
 			log.Fields{
 				"flow-id": flow.Id,
-				"error":   err})
+				"err":     err})
 		return err
 	}
-	// Remove flow from KV store
-	return f.resourceMgr.RemoveFlowIDInfo(ctx, networkInterfaceID, onuID, uniID, flow.Id)
+
+	return nil
 }
 
-// reconcileSubscriberDataPathFlowIDMap reconciles subscriberDataPathFlowIDMap from KV store
-func (f *OpenOltFlowMgr) reconcileSubscriberDataPathFlowIDMap(ctx context.Context) {
-	onuGemInfo, err := f.resourceMgr.GetOnuGemInfo(ctx, f.ponPortIdx)
+func (f *OpenOltFlowMgr) getTechProfileDownloadMessage(ctx context.Context, tpPath string, ponID uint32, onuID uint32, uniID uint32) *ic.InterAdapterTechProfileDownloadMessage {
+	tpInst, err := f.techprofile.GetTPInstance(ctx, tpPath)
 	if err != nil {
-		_ = olterrors.NewErrNotFound("onu", log.Fields{
-			"pon-port": f.ponPortIdx}, err).Log()
-		return
+		logger.Errorw(ctx, "error-fetching-tp-instance", log.Fields{"tpPath": tpPath})
+		return nil
 	}
 
-	f.subscriberDataPathFlowIDMapLock.Lock()
-	defer f.subscriberDataPathFlowIDMapLock.Unlock()
-
-	for _, onu := range onuGemInfo {
-		for _, uniID := range onu.UniPorts {
-			flowIDs, err := f.resourceMgr.GetCurrentFlowIDsForOnu(ctx, onu.IntfID, int32(onu.OnuID), int32(uniID))
-			if err != nil {
-				logger.Fatalf(ctx, "failed-to-read-flow-ids-of-onu-during-reconciliation")
-			}
-			for _, flowID := range flowIDs {
-				flowInfo := f.resourceMgr.GetFlowIDInfo(ctx, onu.IntfID, int32(onu.OnuID), int32(uniID), flowID)
-				if flowInfo == nil {
-					// Error is already logged in the called function
-					continue
-				}
-				if flowInfo.Flow.Classifier.PktTagType == DoubleTag &&
-					flowInfo.Flow.FlowType == Downstream &&
-					flowInfo.Flow.Classifier.OVid > 0 &&
-					flowInfo.Flow.TechProfileId > 0 {
-					key := subscriberDataPathFlowIDKey{intfID: onu.IntfID, onuID: onu.OnuID, uniID: uniID, direction: flowInfo.Flow.FlowType, tpID: flowInfo.Flow.TechProfileId}
-					if _, ok := f.subscriberDataPathFlowIDMap[key]; !ok {
-						f.subscriberDataPathFlowIDMap[key] = flowInfo.Flow.FlowId
-					}
-				} else if flowInfo.Flow.Classifier.PktTagType == SingleTag &&
-					flowInfo.Flow.FlowType == Upstream &&
-					flowInfo.Flow.Action.OVid > 0 &&
-					flowInfo.Flow.TechProfileId > 0 {
-					key := subscriberDataPathFlowIDKey{intfID: onu.IntfID, onuID: onu.OnuID, uniID: uniID, direction: flowInfo.Flow.FlowType, tpID: flowInfo.Flow.TechProfileId}
-					if _, ok := f.subscriberDataPathFlowIDMap[key]; !ok {
-						f.subscriberDataPathFlowIDMap[key] = flowInfo.Flow.FlowId
-					}
-				}
-			}
+	switch tpInst := tpInst.(type) {
+	case *tp_pb.TechProfileInstance:
+		logger.Debugw(ctx, "fetched-tp-instance-successfully--formulating-tp-download-msg", log.Fields{"tpPath": tpPath})
+		return &ic.InterAdapterTechProfileDownloadMessage{UniId: uniID,
+			TpInstancePath: tpPath,
+			TechTpInstance: &ic.InterAdapterTechProfileDownloadMessage_TpInstance{TpInstance: tpInst},
 		}
+	case *openoltpb2.EponTechProfileInstance:
+		return &ic.InterAdapterTechProfileDownloadMessage{UniId: uniID,
+			TpInstancePath: tpPath,
+			TechTpInstance: &ic.InterAdapterTechProfileDownloadMessage_EponTpInstance{EponTpInstance: tpInst},
+		}
+	default:
+		logger.Errorw(ctx, "unknown-tech", log.Fields{"tpPath": tpPath})
 	}
+	return nil
 }
 
-// isDatapathFlow declares a flow as datapath flow if it is not a controller bound flow and the flow does not have group
-func isDatapathFlow(flow *ofp.OfpFlowStats) bool {
-	return !IsControllerBoundFlow(flows.GetOutPort(flow)) && !flows.HasGroup(flow)
+func (f *OpenOltFlowMgr) getOnuGemInfoList() []rsrcMgr.OnuGemInfo {
+	var onuGemInfoLst []rsrcMgr.OnuGemInfo
+	f.onuGemInfoLock.RLock()
+	defer f.onuGemInfoLock.RUnlock()
+	for _, v := range f.onuGemInfoMap {
+		onuGemInfoLst = append(onuGemInfoLst, *v)
+	}
+	return onuGemInfoLst
 }