[VOL-3957]: Maintain meter reference counter and free up the meter from
KV store as soon as last flow referencing it is removed.

Change-Id: I1b32690e1d65a35e0d03a65aa7b2e38a38997521
diff --git a/VERSION b/VERSION
index 944880f..8ee49b2 100644
--- a/VERSION
+++ b/VERSION
@@ -1 +1 @@
-3.2.0
+3.2.1-dev
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index 8b3947e..53b33f5 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -1677,11 +1677,11 @@
 		logger.Debugw(ctx, "deleted-tech-profile-instance-for-onu", log.Fields{"onu-id": onu.OnuID})
 		tpIDList := dh.resourceMgr.GetTechProfileIDForOnu(ctx, onu.IntfID, onu.OnuID, uniID)
 		for _, tpID := range tpIDList {
-			if err = dh.resourceMgr.RemoveMeterIDForOnu(ctx, "upstream", onu.IntfID, onu.OnuID, uniID, tpID); err != nil {
+			if err = dh.resourceMgr.RemoveMeterInfoForOnu(ctx, "upstream", onu.IntfID, onu.OnuID, uniID, tpID); err != nil {
 				logger.Debugw(ctx, "failed-to-remove-meter-id-for-onu-upstream", log.Fields{"onu-id": onu.OnuID})
 			}
 			logger.Debugw(ctx, "removed-meter-id-for-onu-upstream", log.Fields{"onu-id": onu.OnuID})
-			if err = dh.resourceMgr.RemoveMeterIDForOnu(ctx, "downstream", onu.IntfID, onu.OnuID, uniID, tpID); err != nil {
+			if err = dh.resourceMgr.RemoveMeterInfoForOnu(ctx, "downstream", onu.IntfID, onu.OnuID, uniID, tpID); err != nil {
 				logger.Debugw(ctx, "failed-to-remove-meter-id-for-onu-downstream", log.Fields{"onu-id": onu.OnuID})
 			}
 			logger.Debugw(ctx, "removed-meter-id-for-onu-downstream", log.Fields{"onu-id": onu.OnuID})
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 31ca07d..4e10522 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -440,7 +440,7 @@
 	 */
 
 	var SchedCfg *tp_pb.SchedulerConfig
-	KvStoreMeter, err := f.resourceMgr.GetMeterIDForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
+	meterInfo, err := f.resourceMgr.GetMeterInfoForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
 	if err != nil {
 		return olterrors.NewErrNotFound("meter",
 			log.Fields{"intf-id": sq.intfID,
@@ -449,14 +449,17 @@
 				"device-id": f.deviceHandler.device.Id}, err)
 	}
 
-	if KvStoreMeter != nil {
-		if KvStoreMeter.MeterId == sq.meterID {
-			logger.Debugw(ctx, "scheduler-already-created-for-upstream", log.Fields{"device-id": f.deviceHandler.device.Id})
+	if meterInfo != nil {
+		logger.Debugw(ctx, "scheduler-already-created-for-upstream", log.Fields{"device-id": f.deviceHandler.device.Id, "meter-id": sq.meterID})
+		if meterInfo.MeterConfig.MeterId == sq.meterID {
+			if err := f.resourceMgr.HandleMeterInfoRefCntUpdate(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID, true); err != nil {
+				return err
+			}
 			return nil
 		}
 		return olterrors.NewErrInvalidValue(log.Fields{
 			"unsupported":       "meter-id",
-			"kv-store-meter-id": KvStoreMeter.MeterId,
+			"kv-store-meter-id": meterInfo.MeterConfig.MeterId,
 			"meter-id-in-flow":  sq.meterID,
 			"device-id":         f.deviceHandler.device.Id}, nil)
 	}
@@ -482,42 +485,48 @@
 				"device-id": f.deviceHandler.device.Id}, err)
 	}
 
-	var meterConfig *ofp.OfpMeterConfig
+	found := false
+	meterInfo = &rsrcMgr.MeterInfo{}
 	if sq.flowMetadata != nil {
 		for _, meter := range sq.flowMetadata.Meters {
 			if sq.meterID == meter.MeterId {
-				meterConfig = meter
+				meterInfo.MeterConfig = ofp.OfpMeterConfig{}
+				meterInfo.MeterConfig.MeterId = meter.MeterId
+				meterInfo.MeterConfig.Flags = meter.Flags
+				meterInfo.RefCnt = 1 // initialize it to 1, since this is the first flow that referenced the meter id.
+				meterInfo.MeterConfig.Bands = append(meterInfo.MeterConfig.Bands, meter.Bands...)
 				logger.Debugw(ctx, "found-meter-config-from-flowmetadata",
-					log.Fields{"meterConfig": meterConfig,
+					log.Fields{"meterConfig": meterInfo.MeterConfig,
 						"device-id": f.deviceHandler.device.Id})
+				found = true
 				break
 			}
 		}
 	} else {
 		logger.Errorw(ctx, "flow-metadata-not-present-in-flow", log.Fields{"device-id": f.deviceHandler.device.Id})
 	}
-	if meterConfig == nil {
+	if !found {
 		return olterrors.NewErrNotFound("meterbands", log.Fields{
 			"reason":        "Could-not-get-meterbands-from-flowMetadata",
 			"flow-metadata": sq.flowMetadata,
 			"meter-id":      sq.meterID,
 			"device-id":     f.deviceHandler.device.Id}, nil)
-	} else if len(meterConfig.Bands) < MaxMeterBand {
+	} else if len(meterInfo.MeterConfig.Bands) < MaxMeterBand {
 		logger.Errorw(ctx, "invalid-number-of-bands-in-meter",
-			log.Fields{"Bands": meterConfig.Bands,
+			log.Fields{"Bands": meterInfo.MeterConfig.Bands,
 				"meter-id":  sq.meterID,
 				"device-id": f.deviceHandler.device.Id})
 		return olterrors.NewErrInvalidValue(log.Fields{
 			"reason":          "Invalid-number-of-bands-in-meter",
-			"meterband-count": len(meterConfig.Bands),
-			"metabands":       meterConfig.Bands,
+			"meterband-count": len(meterInfo.MeterConfig.Bands),
+			"metabands":       meterInfo.MeterConfig.Bands,
 			"meter-id":        sq.meterID,
 			"device-id":       f.deviceHandler.device.Id}, nil)
 	}
-	cir := meterConfig.Bands[0].Rate
-	cbs := meterConfig.Bands[0].BurstSize
-	eir := meterConfig.Bands[1].Rate
-	ebs := meterConfig.Bands[1].BurstSize
+	cir := meterInfo.MeterConfig.Bands[0].Rate
+	cbs := meterInfo.MeterConfig.Bands[0].BurstSize
+	eir := meterInfo.MeterConfig.Bands[1].Rate
+	ebs := meterInfo.MeterConfig.Bands[1].BurstSize
 	pir := cir + eir
 	pbs := cbs + ebs
 	TrafficShaping := &tp_pb.TrafficShapingInfo{Cir: cir, Cbs: cbs, Pir: pir, Pbs: pbs}
@@ -535,7 +544,7 @@
 	/* After we successfully applied the scheduler configuration on the OLT device,
 	 * store the meter id on the KV store, for further reference.
 	 */
-	if err := f.resourceMgr.UpdateMeterIDForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID, meterConfig); err != nil {
+	if err := f.resourceMgr.StoreMeterInfoForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID, meterInfo); err != nil {
 		return olterrors.NewErrAdapter("failed-updating-meter-id",
 			log.Fields{"onu-id": sq.onuID,
 				"meter-id":  sq.meterID,
@@ -543,8 +552,8 @@
 	}
 	logger.Infow(ctx, "updated-meter-info-into-kv-store-successfully",
 		log.Fields{"direction": Direction,
-			"Meter":     meterConfig,
-			"device-id": f.deviceHandler.device.Id})
+			"meter-info": meterInfo,
+			"device-id":  f.deviceHandler.device.Id})
 	return nil
 }
 
@@ -649,31 +658,7 @@
 				"device-id": f.deviceHandler.device.Id}, err)
 	}
 
-	KVStoreMeter, err := f.resourceMgr.GetMeterIDForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
-	if err != nil {
-		return olterrors.NewErrNotFound("meter",
-			log.Fields{
-				"onu-id":    sq.onuID,
-				"device-id": f.deviceHandler.device.Id}, err)
-	}
-	if KVStoreMeter == nil {
-		logger.Warnw(ctx, "no-meter-installed-yet",
-			log.Fields{
-				"direction": Direction,
-				"intf-id":   sq.intfID,
-				"onu-id":    sq.onuID,
-				"uni-id":    sq.uniID,
-				"device-id": f.deviceHandler.device.Id})
-		return nil
-	}
-	cir := KVStoreMeter.Bands[0].Rate
-	cbs := KVStoreMeter.Bands[0].BurstSize
-	eir := KVStoreMeter.Bands[1].Rate
-	ebs := KVStoreMeter.Bands[1].BurstSize
-	pir := cir + eir
-	pbs := cbs + ebs
-
-	TrafficShaping := &tp_pb.TrafficShapingInfo{Cir: cir, Cbs: cbs, Pir: pir, Pbs: pbs}
+	TrafficShaping := &tp_pb.TrafficShapingInfo{} // this info is not really useful for the agent during flow removal. Just use default values.
 
 	TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile[sq.intfID].GetTrafficScheduler(sq.tpInst.(*tp.TechProfile), SchedCfg, TrafficShaping)}
 	TrafficSched[0].TechProfileId = sq.tpID
@@ -722,12 +707,11 @@
 	/* After we successfully remove the scheduler configuration on the OLT device,
 	 * delete the meter id on the KV store.
 	 */
-	err = f.resourceMgr.RemoveMeterIDForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
+	err = f.resourceMgr.RemoveMeterInfoForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
 	if err != nil {
 		return olterrors.NewErrAdapter("unable-to-remove-meter",
 			log.Fields{
 				"onu":       sq.onuID,
-				"meter":     KVStoreMeter.MeterId,
 				"device-id": f.deviceHandler.device.Id,
 				"intf-id":   sq.intfID,
 				"onu-id":    sq.onuID,
@@ -736,7 +720,6 @@
 	}
 	logger.Infow(ctx, "removed-meter-from-KV-store-successfully",
 		log.Fields{
-			"meter-id":  KVStoreMeter.MeterId,
 			"dir":       Direction,
 			"device-id": f.deviceHandler.device.Id,
 			"intf-id":   sq.intfID,
@@ -1892,27 +1875,27 @@
 
 //clearResources clears pon resources in kv store and the device
 // nolint: gocyclo
-func (f *OpenOltFlowMgr) clearResources(ctx context.Context, flow *ofp.OfpFlowStats, Intf uint32, onuID int32, uniID int32,
-	gemPortID int32, flowID uint64, portNum uint32) error {
+func (f *OpenOltFlowMgr) clearResources(ctx context.Context, flow *ofp.OfpFlowStats, intfID uint32, onuID int32, uniID int32,
+	gemPortID int32, flowID uint64, portNum uint32, direction string) error {
 
 	tpID, err := getTpIDFromFlow(ctx, flow)
 	if err != nil {
 		return olterrors.NewErrNotFound("tp-id",
 			log.Fields{
 				"flow":      flow,
-				"intf":      Intf,
+				"intf-id":   intfID,
 				"onu-id":    onuID,
 				"uni-id":    uniID,
 				"device-id": f.deviceHandler.device.Id}, err)
 	}
 
-	uni := getUniPortPath(f.deviceHandler.device.Id, Intf, onuID, uniID)
-	tpPath := f.getTPpath(ctx, Intf, uni, tpID)
+	uni := getUniPortPath(f.deviceHandler.device.Id, intfID, onuID, uniID)
+	tpPath := f.getTPpath(ctx, intfID, uni, tpID)
 	logger.Debugw(ctx, "getting-techprofile-instance-for-subscriber",
 		log.Fields{
 			"tpPath":    tpPath,
 			"device-id": f.deviceHandler.device.Id})
-	techprofileInst, err := f.techprofile[Intf].GetTPInstanceFromKVStore(ctx, tpID, tpPath)
+	techprofileInst, err := f.techprofile[intfID].GetTPInstanceFromKVStore(ctx, tpID, tpPath)
 	if err != nil || techprofileInst == nil { // This should not happen, something wrong in KV backend transaction
 		return olterrors.NewErrNotFound("tech-profile-in-kv-store",
 			log.Fields{
@@ -1933,7 +1916,7 @@
 				// everytime flowsUsedByGemPort cache is updated the same should be updated
 				// in kv store by calling UpdateFlowIDsForGem
 				f.flowsUsedByGemPort[uint32(gemPortID)] = flowIDs
-				if err := f.resourceMgr.UpdateFlowIDsForGem(ctx, Intf, uint32(gemPortID), flowIDs); err != nil {
+				if err := f.resourceMgr.UpdateFlowIDsForGem(ctx, intfID, uint32(gemPortID), flowIDs); err != nil {
 					return err
 				}
 				break
@@ -1944,19 +1927,23 @@
 				"gemport-id":  gemPortID,
 				"usedByFlows": flowIDs,
 				"device-id":   f.deviceHandler.device.Id})
+
+		if err := f.resourceMgr.HandleMeterInfoRefCntUpdate(ctx, direction, intfID, uint32(onuID), uint32(uniID), tpID, false); err != nil {
+			return err
+		}
 		return nil
 	}
 	logger.Debugf(ctx, "gem-port-id %d is-not-used-by-another-flow--releasing-the-gem-port", gemPortID)
-	f.resourceMgr.RemoveGemPortIDForOnu(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
+	f.resourceMgr.RemoveGemPortIDForOnu(ctx, intfID, uint32(onuID), uint32(uniID), uint32(gemPortID))
 	// TODO: The TrafficQueue corresponding to this gem-port also should be removed immediately.
 	// But it is anyway eventually  removed later when the TechProfile is freed, so not a big issue for now.
-	f.resourceMgr.RemoveGEMportPonportToOnuMapOnKVStore(ctx, uint32(gemPortID), Intf)
+	f.resourceMgr.RemoveGEMportPonportToOnuMapOnKVStore(ctx, uint32(gemPortID), intfID)
 	// also clear gem to uni cache
 	f.removeFromGemToUniMap(gemPortKey{
-		intfID:  Intf,
+		intfID:  intfID,
 		gemPort: uint32(gemPortID),
 	})
-	f.deleteGemPortFromLocalCache(ctx, Intf, uint32(onuID), uint32(gemPortID))
+	f.deleteGemPortFromLocalCache(ctx, intfID, uint32(onuID), uint32(gemPortID))
 
 	f.onuIdsLock.Lock() // TODO: What is this lock?
 
@@ -1965,17 +1952,17 @@
 	f.flowsUsedByGemPortKey.Lock()
 	delete(f.flowsUsedByGemPort, uint32(gemPortID))
 	f.flowsUsedByGemPortKey.Unlock()
-	f.resourceMgr.DeleteFlowIDsForGem(ctx, Intf, uint32(gemPortID))
-	f.resourceMgr.FreeGemPortID(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
+	f.resourceMgr.DeleteFlowIDsForGem(ctx, intfID, uint32(gemPortID))
+	f.resourceMgr.FreeGemPortID(ctx, intfID, uint32(onuID), uint32(uniID), uint32(gemPortID))
 
 	f.onuIdsLock.Unlock()
 
 	// Delete the gem port on the ONU.
-	if err := f.sendDeleteGemPortToChild(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID), tpPath); err != nil {
+	if err := f.sendDeleteGemPortToChild(ctx, intfID, uint32(onuID), uint32(uniID), uint32(gemPortID), tpPath); err != nil {
 		logger.Errorw(ctx, "error-processing-delete-gem-port-towards-onu",
 			log.Fields{
 				"err":        err,
-				"intf":       Intf,
+				"intfID":     intfID,
 				"onu-id":     onuID,
 				"uni-id":     uniID,
 				"device-id":  f.deviceHandler.device.Id,
@@ -1983,26 +1970,26 @@
 	}
 	switch techprofileInst := techprofileInst.(type) {
 	case *tp.TechProfile:
-		ok, _ := f.isTechProfileUsedByAnotherGem(ctx, Intf, uint32(onuID), uint32(uniID), tpID, techprofileInst, uint32(gemPortID))
+		ok, _ := f.isTechProfileUsedByAnotherGem(ctx, intfID, uint32(onuID), uint32(uniID), tpID, techprofileInst, uint32(gemPortID))
 		if !ok {
-			if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, Intf, uint32(onuID), uint32(uniID), tpID); err != nil {
+			if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, intfID, uint32(onuID), uint32(uniID), tpID); err != nil {
 				logger.Warn(ctx, err)
 			}
-			if err := f.DeleteTechProfileInstance(ctx, Intf, uint32(onuID), uint32(uniID), "", tpID); err != nil {
+			if err := f.DeleteTechProfileInstance(ctx, intfID, uint32(onuID), uint32(uniID), "", tpID); err != nil {
 				logger.Warn(ctx, err)
 			}
-			if err := f.RemoveSchedulerQueues(ctx, schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: Intf, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst}); err != nil {
+			if err := f.RemoveSchedulerQueues(ctx, schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: intfID, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst}); err != nil {
 				logger.Warn(ctx, err)
 			}
-			if err := f.RemoveSchedulerQueues(ctx, schedQueue{direction: tp_pb.Direction_DOWNSTREAM, intfID: Intf, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst}); err != nil {
+			if err := f.RemoveSchedulerQueues(ctx, schedQueue{direction: tp_pb.Direction_DOWNSTREAM, intfID: intfID, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst}); err != nil {
 				logger.Warn(ctx, err)
 			}
-			f.resourceMgr.FreeAllocID(ctx, Intf, uint32(onuID), uint32(uniID), techprofileInst.UsScheduler.AllocID)
+			f.resourceMgr.FreeAllocID(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.UsScheduler.AllocID)
 			// Delete the TCONT on the ONU.
-			if err := f.sendDeleteTcontToChild(ctx, Intf, uint32(onuID), uint32(uniID), techprofileInst.UsScheduler.AllocID, tpPath); err != nil {
+			if err := f.sendDeleteTcontToChild(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.UsScheduler.AllocID, tpPath); err != nil {
 				logger.Errorw(ctx, "error-processing-delete-tcont-towards-onu",
 					log.Fields{
-						"intf":      Intf,
+						"intf":      intfID,
 						"onu-id":    onuID,
 						"uni-id":    uniID,
 						"device-id": f.deviceHandler.device.Id,
@@ -2010,18 +1997,18 @@
 			}
 		}
 	case *tp.EponProfile:
-		if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, Intf, uint32(onuID), uint32(uniID), tpID); err != nil {
+		if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, intfID, uint32(onuID), uint32(uniID), tpID); err != nil {
 			logger.Warn(ctx, err)
 		}
-		if err := f.DeleteTechProfileInstance(ctx, Intf, uint32(onuID), uint32(uniID), "", tpID); err != nil {
+		if err := f.DeleteTechProfileInstance(ctx, intfID, uint32(onuID), uint32(uniID), "", tpID); err != nil {
 			logger.Warn(ctx, err)
 		}
-		f.resourceMgr.FreeAllocID(ctx, Intf, uint32(onuID), uint32(uniID), techprofileInst.AllocID)
+		f.resourceMgr.FreeAllocID(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.AllocID)
 		// Delete the TCONT on the ONU.
-		if err := f.sendDeleteTcontToChild(ctx, Intf, uint32(onuID), uint32(uniID), techprofileInst.AllocID, tpPath); err != nil {
+		if err := f.sendDeleteTcontToChild(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.AllocID, tpPath); err != nil {
 			logger.Errorw(ctx, "error-processing-delete-tcont-towards-onu",
 				log.Fields{
-					"intf":      Intf,
+					"intf":      intfID,
 					"onu-id":    onuID,
 					"uni-id":    uniID,
 					"device-id": f.deviceHandler.device.Id,
@@ -2100,7 +2087,7 @@
 		return err
 	}
 	if !flowInfo.Flow.ReplicateFlow {
-		if err = f.clearResources(ctx, flow, Intf, onuID, uniID, flowInfo.Flow.GemportId, flowInfo.Flow.FlowId, portNum); err != nil {
+		if err = f.clearResources(ctx, flow, Intf, onuID, uniID, flowInfo.Flow.GemportId, flowInfo.Flow.FlowId, portNum, flowDirection); err != nil {
 			logger.Errorw(ctx, "failed-to-clear-resources-for-flow", log.Fields{
 				"flow-id":        flow.Id,
 				"stored-flow":    flowInfo.Flow,
@@ -2118,7 +2105,7 @@
 		}
 		logger.Debugw(ctx, "gems-to-be-cleared", log.Fields{"gems": gems})
 		for _, gem := range gems {
-			if err = f.clearResources(ctx, flow, Intf, onuID, uniID, int32(gem), flowInfo.Flow.FlowId, portNum); err != nil {
+			if err = f.clearResources(ctx, flow, Intf, onuID, uniID, int32(gem), flowInfo.Flow.FlowId, portNum, flowDirection); err != nil {
 				logger.Errorw(ctx, "failed-to-clear-resources-for-flow", log.Fields{
 					"flow-id":        flow.Id,
 					"stored-flow":    flowInfo.Flow,
diff --git a/internal/pkg/resourcemanager/resourcemanager.go b/internal/pkg/resourcemanager/resourcemanager.go
index 7a3f8ec..4e7aa79 100755
--- a/internal/pkg/resourcemanager/resourcemanager.go
+++ b/internal/pkg/resourcemanager/resourcemanager.go
@@ -109,6 +109,12 @@
 	OutPorts []uint32
 }
 
+// MeterInfo store meter information at path <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
+type MeterInfo struct {
+	RefCnt      uint8 // number of flow references for this meter. When RefCnt is 0, the MeterInfo should be deleted.
+	MeterConfig ofp.OfpMeterConfig
+}
+
 // OpenOltResourceMgr holds resource related information as provided below for each field
 type OpenOltResourceMgr struct {
 	DeviceID   string      // OLT device id
@@ -1046,15 +1052,14 @@
 	return err
 }
 
-// UpdateMeterIDForOnu updates the meter id in the KV-Store for the given onu based on the path
+// StoreMeterInfoForOnu updates the meter id in the KV-Store for the given onu based on the path
 // This path is formed as the following: <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
-func (RsrcMgr *OpenOltResourceMgr) UpdateMeterIDForOnu(ctx context.Context, Direction string, IntfID uint32, OnuID uint32,
-	UniID uint32, TpID uint32, MeterConfig *ofp.OfpMeterConfig) error {
+func (RsrcMgr *OpenOltResourceMgr) StoreMeterInfoForOnu(ctx context.Context, Direction string, IntfID uint32, OnuID uint32,
+	UniID uint32, TpID uint32, meterInfo *MeterInfo) error {
 	var Value []byte
 	var err error
-
 	IntfOnuUniID := fmt.Sprintf(MeterIDPathSuffix, IntfID, OnuID, UniID, TpID, Direction)
-	Value, err = json.Marshal(*MeterConfig)
+	Value, err = json.Marshal(*meterInfo)
 	if err != nil {
 		logger.Error(ctx, "failed to Marshal meter config")
 		return err
@@ -1063,26 +1068,27 @@
 		logger.Errorf(ctx, "Failed to store meter into KV store %s", IntfOnuUniID)
 		return err
 	}
+	logger.Debugw(ctx, "meter info updated successfully", log.Fields{"path": IntfOnuUniID, "meter-info": meterInfo})
 	return err
 }
 
-// GetMeterIDForOnu fetches the meter id from the kv store for the given onu based on the path
+// GetMeterInfoForOnu fetches the meter id from the kv store for the given onu based on the path
 // This path is formed as the following: <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
-func (RsrcMgr *OpenOltResourceMgr) GetMeterIDForOnu(ctx context.Context, Direction string, IntfID uint32, OnuID uint32,
-	UniID uint32, TpID uint32) (*ofp.OfpMeterConfig, error) {
+func (RsrcMgr *OpenOltResourceMgr) GetMeterInfoForOnu(ctx context.Context, Direction string, IntfID uint32, OnuID uint32,
+	UniID uint32, TpID uint32) (*MeterInfo, error) {
 	Path := fmt.Sprintf(MeterIDPathSuffix, IntfID, OnuID, UniID, TpID, Direction)
-	var meterConfig ofp.OfpMeterConfig
+	var meterInfo MeterInfo
 	Value, err := RsrcMgr.KVStore.Get(ctx, Path)
 	if err == nil {
 		if Value != nil {
-			logger.Debug(ctx, "Found meter in KV store", log.Fields{"Direction": Direction})
+			logger.Debug(ctx, "Found meter info in KV store", log.Fields{"Direction": Direction})
 			Val, er := kvstore.ToByte(Value.Value)
 			if er != nil {
 				logger.Errorw(ctx, "Failed to convert into byte array", log.Fields{"error": er})
 				return nil, er
 			}
-			if er = json.Unmarshal(Val, &meterConfig); er != nil {
-				logger.Error(ctx, "Failed to unmarshal meterconfig", log.Fields{"error": er})
+			if er = json.Unmarshal(Val, &meterInfo); er != nil {
+				logger.Error(ctx, "Failed to unmarshal meter info", log.Fields{"error": er})
 				return nil, er
 			}
 		} else {
@@ -1093,12 +1099,37 @@
 		logger.Errorf(ctx, "Failed to get Meter config from kvstore for path %s", Path)
 
 	}
-	return &meterConfig, err
+	return &meterInfo, err
 }
 
-// RemoveMeterIDForOnu deletes the meter id from the kV-Store for the given onu based on the path
+// HandleMeterInfoRefCntUpdate increments or decrements the reference counter for a given meter.
+// When reference count becomes 0, it clears the meter information from the kv store
+func (RsrcMgr *OpenOltResourceMgr) HandleMeterInfoRefCntUpdate(ctx context.Context, Direction string,
+	IntfID uint32, OnuID uint32, UniID uint32, TpID uint32, increment bool) error {
+	meterInfo, err := RsrcMgr.GetMeterInfoForOnu(ctx, Direction, IntfID, OnuID, UniID, TpID)
+	if meterInfo == nil || err != nil {
+		return fmt.Errorf("error-fetching-meter-info-for-intf-%d-onu-%d-uni-%d-tp-id-%d-direction-%s", IntfID, OnuID, UniID, TpID, Direction)
+	}
+	if increment {
+		meterInfo.RefCnt++
+	} else {
+		meterInfo.RefCnt--
+		if meterInfo.RefCnt == 0 {
+			if err := RsrcMgr.RemoveMeterInfoForOnu(ctx, Direction, IntfID, OnuID, UniID, TpID); err != nil {
+				return err
+			}
+			return nil
+		}
+	}
+	if err := RsrcMgr.StoreMeterInfoForOnu(ctx, Direction, IntfID, OnuID, UniID, TpID, meterInfo); err != nil {
+		return err
+	}
+	return nil
+}
+
+// RemoveMeterInfoForOnu deletes the meter id from the kV-Store for the given onu based on the path
 // This path is formed as the following: <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
-func (RsrcMgr *OpenOltResourceMgr) RemoveMeterIDForOnu(ctx context.Context, Direction string, IntfID uint32, OnuID uint32,
+func (RsrcMgr *OpenOltResourceMgr) RemoveMeterInfoForOnu(ctx context.Context, Direction string, IntfID uint32, OnuID uint32,
 	UniID uint32, TpID uint32) error {
 	Path := fmt.Sprintf(MeterIDPathSuffix, IntfID, OnuID, UniID, TpID, Direction)
 	if err := RsrcMgr.KVStore.Delete(ctx, Path); err != nil {
diff --git a/internal/pkg/resourcemanager/resourcemanager_test.go b/internal/pkg/resourcemanager/resourcemanager_test.go
index e71d07e..53f8898 100644
--- a/internal/pkg/resourcemanager/resourcemanager_test.go
+++ b/internal/pkg/resourcemanager/resourcemanager_test.go
@@ -518,7 +518,7 @@
 	}
 }
 
-func TestOpenOltResourceMgr_GetMeterIDForOnu(t *testing.T) {
+func TestOpenOltResourceMgr_GetMeterInfoForOnu(t *testing.T) {
 	type args struct {
 		Direction string
 		IntfID    uint32
@@ -530,22 +530,22 @@
 		name    string
 		fields  *fields
 		args    args
-		want    *ofp.OfpMeterConfig
+		want    *MeterInfo
 		wantErr error
 	}{
-		{"GetMeterIDOnu", getResMgr(), args{"DOWNSTREAM", 0, 1, 1, 64},
-			&ofp.OfpMeterConfig{}, errors.New("failed to get Meter config from kvstore for path")},
-		{"GetMeterIDOnu", getResMgr(), args{"DOWNSTREAM", 1, 2, 2, 65},
-			&ofp.OfpMeterConfig{}, errors.New("failed to get Meter config from kvstore for path")},
+		{"GetMeterInfoForOnu", getResMgr(), args{"DOWNSTREAM", 0, 1, 1, 64},
+			&MeterInfo{}, errors.New("failed to get Meter config from kvstore for path")},
+		{"GetMeterInfoForOnu", getResMgr(), args{"DOWNSTREAM", 1, 2, 2, 65},
+			&MeterInfo{}, errors.New("failed to get Meter config from kvstore for path")},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			RsrcMgr := testResMgrObject(tt.fields)
 			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 			defer cancel()
-			got, err := RsrcMgr.GetMeterIDForOnu(ctx, tt.args.Direction, tt.args.IntfID, tt.args.OnuID, tt.args.UniID, tt.args.tpID)
+			got, err := RsrcMgr.GetMeterInfoForOnu(ctx, tt.args.Direction, tt.args.IntfID, tt.args.OnuID, tt.args.UniID, tt.args.tpID)
 			if reflect.TypeOf(got) != reflect.TypeOf(tt.want) && err != nil {
-				t.Errorf("GetMeterIDForOnu() got = %v, want %v", got, tt.want)
+				t.Errorf("GetMeterInfoForOnu() got = %v, want %v", got, tt.want)
 			}
 		})
 	}
@@ -628,7 +628,7 @@
 			RsrcMgr := testResMgrObject(tt.fields)
 			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 			defer cancel()
-			if err := RsrcMgr.RemoveMeterIDForOnu(ctx, tt.args.Direction, tt.args.IntfID, tt.args.OnuID, tt.args.UniID,
+			if err := RsrcMgr.RemoveMeterInfoForOnu(ctx, tt.args.Direction, tt.args.IntfID, tt.args.OnuID, tt.args.UniID,
 				tt.args.tpID); reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) && err != nil {
 				t.Errorf("RemoveMeterIDForOnu() error = %v, wantErr %v", err, tt.wantErr)
 			}
@@ -781,12 +781,12 @@
 
 func TestOpenOltResourceMgr_UpdateMeterIDForOnu(t *testing.T) {
 	type args struct {
-		Direction   string
-		IntfID      uint32
-		OnuID       uint32
-		UniID       uint32
-		tpID        uint32
-		MeterConfig *ofp.OfpMeterConfig
+		Direction string
+		IntfID    uint32
+		OnuID     uint32
+		UniID     uint32
+		tpID      uint32
+		MeterInfo *MeterInfo
 	}
 	tests := []struct {
 		name    string
@@ -795,15 +795,15 @@
 		wantErr error
 	}{
 		{"UpdateMeterIDForOnu-1", getResMgr(), args{"DOWNSTREAM", 1, 2,
-			2, 64, &ofp.OfpMeterConfig{}}, errors.New("failed to get Meter config from kvstore for path")},
+			2, 64, &MeterInfo{}}, errors.New("failed to get Meter config from kvstore for path")},
 	}
 	for _, tt := range tests {
 		t.Run(tt.name, func(t *testing.T) {
 			RsrcMgr := testResMgrObject(tt.fields)
 			ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
 			defer cancel()
-			if err := RsrcMgr.UpdateMeterIDForOnu(ctx, tt.args.Direction, tt.args.IntfID, tt.args.OnuID, tt.args.UniID,
-				tt.args.tpID, tt.args.MeterConfig); reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) && err != nil {
+			if err := RsrcMgr.StoreMeterInfoForOnu(ctx, tt.args.Direction, tt.args.IntfID, tt.args.OnuID, tt.args.UniID,
+				tt.args.tpID, tt.args.MeterInfo); reflect.TypeOf(err) != reflect.TypeOf(tt.wantErr) && err != nil {
 				t.Errorf("UpdateMeterIDForOnu() got = %v, want %v", err, tt.wantErr)
 			}
 		})