[VOL-3957]: Maintain meter reference counter and free up the meter from
KV store as soon as last flow referencing it is removed.

Change-Id: I1b32690e1d65a35e0d03a65aa7b2e38a38997521
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 31ca07d..4e10522 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -440,7 +440,7 @@
 	 */
 
 	var SchedCfg *tp_pb.SchedulerConfig
-	KvStoreMeter, err := f.resourceMgr.GetMeterIDForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
+	meterInfo, err := f.resourceMgr.GetMeterInfoForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
 	if err != nil {
 		return olterrors.NewErrNotFound("meter",
 			log.Fields{"intf-id": sq.intfID,
@@ -449,14 +449,17 @@
 				"device-id": f.deviceHandler.device.Id}, err)
 	}
 
-	if KvStoreMeter != nil {
-		if KvStoreMeter.MeterId == sq.meterID {
-			logger.Debugw(ctx, "scheduler-already-created-for-upstream", log.Fields{"device-id": f.deviceHandler.device.Id})
+	if meterInfo != nil {
+		logger.Debugw(ctx, "scheduler-already-created-for-upstream", log.Fields{"device-id": f.deviceHandler.device.Id, "meter-id": sq.meterID})
+		if meterInfo.MeterConfig.MeterId == sq.meterID {
+			if err := f.resourceMgr.HandleMeterInfoRefCntUpdate(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID, true); err != nil {
+				return err
+			}
 			return nil
 		}
 		return olterrors.NewErrInvalidValue(log.Fields{
 			"unsupported":       "meter-id",
-			"kv-store-meter-id": KvStoreMeter.MeterId,
+			"kv-store-meter-id": meterInfo.MeterConfig.MeterId,
 			"meter-id-in-flow":  sq.meterID,
 			"device-id":         f.deviceHandler.device.Id}, nil)
 	}
@@ -482,42 +485,48 @@
 				"device-id": f.deviceHandler.device.Id}, err)
 	}
 
-	var meterConfig *ofp.OfpMeterConfig
+	found := false
+	meterInfo = &rsrcMgr.MeterInfo{}
 	if sq.flowMetadata != nil {
 		for _, meter := range sq.flowMetadata.Meters {
 			if sq.meterID == meter.MeterId {
-				meterConfig = meter
+				meterInfo.MeterConfig = ofp.OfpMeterConfig{}
+				meterInfo.MeterConfig.MeterId = meter.MeterId
+				meterInfo.MeterConfig.Flags = meter.Flags
+				meterInfo.RefCnt = 1 // initialize it to 1, since this is the first flow that referenced the meter id.
+				meterInfo.MeterConfig.Bands = append(meterInfo.MeterConfig.Bands, meter.Bands...)
 				logger.Debugw(ctx, "found-meter-config-from-flowmetadata",
-					log.Fields{"meterConfig": meterConfig,
+					log.Fields{"meterConfig": meterInfo.MeterConfig,
 						"device-id": f.deviceHandler.device.Id})
+				found = true
 				break
 			}
 		}
 	} else {
 		logger.Errorw(ctx, "flow-metadata-not-present-in-flow", log.Fields{"device-id": f.deviceHandler.device.Id})
 	}
-	if meterConfig == nil {
+	if !found {
 		return olterrors.NewErrNotFound("meterbands", log.Fields{
 			"reason":        "Could-not-get-meterbands-from-flowMetadata",
 			"flow-metadata": sq.flowMetadata,
 			"meter-id":      sq.meterID,
 			"device-id":     f.deviceHandler.device.Id}, nil)
-	} else if len(meterConfig.Bands) < MaxMeterBand {
+	} else if len(meterInfo.MeterConfig.Bands) < MaxMeterBand {
 		logger.Errorw(ctx, "invalid-number-of-bands-in-meter",
-			log.Fields{"Bands": meterConfig.Bands,
+			log.Fields{"Bands": meterInfo.MeterConfig.Bands,
 				"meter-id":  sq.meterID,
 				"device-id": f.deviceHandler.device.Id})
 		return olterrors.NewErrInvalidValue(log.Fields{
 			"reason":          "Invalid-number-of-bands-in-meter",
-			"meterband-count": len(meterConfig.Bands),
-			"metabands":       meterConfig.Bands,
+			"meterband-count": len(meterInfo.MeterConfig.Bands),
+			"metabands":       meterInfo.MeterConfig.Bands,
 			"meter-id":        sq.meterID,
 			"device-id":       f.deviceHandler.device.Id}, nil)
 	}
-	cir := meterConfig.Bands[0].Rate
-	cbs := meterConfig.Bands[0].BurstSize
-	eir := meterConfig.Bands[1].Rate
-	ebs := meterConfig.Bands[1].BurstSize
+	cir := meterInfo.MeterConfig.Bands[0].Rate
+	cbs := meterInfo.MeterConfig.Bands[0].BurstSize
+	eir := meterInfo.MeterConfig.Bands[1].Rate
+	ebs := meterInfo.MeterConfig.Bands[1].BurstSize
 	pir := cir + eir
 	pbs := cbs + ebs
 	TrafficShaping := &tp_pb.TrafficShapingInfo{Cir: cir, Cbs: cbs, Pir: pir, Pbs: pbs}
@@ -535,7 +544,7 @@
 	/* After we successfully applied the scheduler configuration on the OLT device,
 	 * store the meter id on the KV store, for further reference.
 	 */
-	if err := f.resourceMgr.UpdateMeterIDForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID, meterConfig); err != nil {
+	if err := f.resourceMgr.StoreMeterInfoForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID, meterInfo); err != nil {
 		return olterrors.NewErrAdapter("failed-updating-meter-id",
 			log.Fields{"onu-id": sq.onuID,
 				"meter-id":  sq.meterID,
@@ -543,8 +552,8 @@
 	}
 	logger.Infow(ctx, "updated-meter-info-into-kv-store-successfully",
 		log.Fields{"direction": Direction,
-			"Meter":     meterConfig,
-			"device-id": f.deviceHandler.device.Id})
+			"meter-info": meterInfo,
+			"device-id":  f.deviceHandler.device.Id})
 	return nil
 }
 
@@ -649,31 +658,7 @@
 				"device-id": f.deviceHandler.device.Id}, err)
 	}
 
-	KVStoreMeter, err := f.resourceMgr.GetMeterIDForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
-	if err != nil {
-		return olterrors.NewErrNotFound("meter",
-			log.Fields{
-				"onu-id":    sq.onuID,
-				"device-id": f.deviceHandler.device.Id}, err)
-	}
-	if KVStoreMeter == nil {
-		logger.Warnw(ctx, "no-meter-installed-yet",
-			log.Fields{
-				"direction": Direction,
-				"intf-id":   sq.intfID,
-				"onu-id":    sq.onuID,
-				"uni-id":    sq.uniID,
-				"device-id": f.deviceHandler.device.Id})
-		return nil
-	}
-	cir := KVStoreMeter.Bands[0].Rate
-	cbs := KVStoreMeter.Bands[0].BurstSize
-	eir := KVStoreMeter.Bands[1].Rate
-	ebs := KVStoreMeter.Bands[1].BurstSize
-	pir := cir + eir
-	pbs := cbs + ebs
-
-	TrafficShaping := &tp_pb.TrafficShapingInfo{Cir: cir, Cbs: cbs, Pir: pir, Pbs: pbs}
+	TrafficShaping := &tp_pb.TrafficShapingInfo{} // this info is not really useful for the agent during flow removal. Just use default values.
 
 	TrafficSched := []*tp_pb.TrafficScheduler{f.techprofile[sq.intfID].GetTrafficScheduler(sq.tpInst.(*tp.TechProfile), SchedCfg, TrafficShaping)}
 	TrafficSched[0].TechProfileId = sq.tpID
@@ -722,12 +707,11 @@
 	/* After we successfully remove the scheduler configuration on the OLT device,
 	 * delete the meter id on the KV store.
 	 */
-	err = f.resourceMgr.RemoveMeterIDForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
+	err = f.resourceMgr.RemoveMeterInfoForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
 	if err != nil {
 		return olterrors.NewErrAdapter("unable-to-remove-meter",
 			log.Fields{
 				"onu":       sq.onuID,
-				"meter":     KVStoreMeter.MeterId,
 				"device-id": f.deviceHandler.device.Id,
 				"intf-id":   sq.intfID,
 				"onu-id":    sq.onuID,
@@ -736,7 +720,6 @@
 	}
 	logger.Infow(ctx, "removed-meter-from-KV-store-successfully",
 		log.Fields{
-			"meter-id":  KVStoreMeter.MeterId,
 			"dir":       Direction,
 			"device-id": f.deviceHandler.device.Id,
 			"intf-id":   sq.intfID,
@@ -1892,27 +1875,27 @@
 
 //clearResources clears pon resources in kv store and the device
 // nolint: gocyclo
-func (f *OpenOltFlowMgr) clearResources(ctx context.Context, flow *ofp.OfpFlowStats, Intf uint32, onuID int32, uniID int32,
-	gemPortID int32, flowID uint64, portNum uint32) error {
+func (f *OpenOltFlowMgr) clearResources(ctx context.Context, flow *ofp.OfpFlowStats, intfID uint32, onuID int32, uniID int32,
+	gemPortID int32, flowID uint64, portNum uint32, direction string) error {
 
 	tpID, err := getTpIDFromFlow(ctx, flow)
 	if err != nil {
 		return olterrors.NewErrNotFound("tp-id",
 			log.Fields{
 				"flow":      flow,
-				"intf":      Intf,
+				"intf-id":   intfID,
 				"onu-id":    onuID,
 				"uni-id":    uniID,
 				"device-id": f.deviceHandler.device.Id}, err)
 	}
 
-	uni := getUniPortPath(f.deviceHandler.device.Id, Intf, onuID, uniID)
-	tpPath := f.getTPpath(ctx, Intf, uni, tpID)
+	uni := getUniPortPath(f.deviceHandler.device.Id, intfID, onuID, uniID)
+	tpPath := f.getTPpath(ctx, intfID, uni, tpID)
 	logger.Debugw(ctx, "getting-techprofile-instance-for-subscriber",
 		log.Fields{
 			"tpPath":    tpPath,
 			"device-id": f.deviceHandler.device.Id})
-	techprofileInst, err := f.techprofile[Intf].GetTPInstanceFromKVStore(ctx, tpID, tpPath)
+	techprofileInst, err := f.techprofile[intfID].GetTPInstanceFromKVStore(ctx, tpID, tpPath)
 	if err != nil || techprofileInst == nil { // This should not happen, something wrong in KV backend transaction
 		return olterrors.NewErrNotFound("tech-profile-in-kv-store",
 			log.Fields{
@@ -1933,7 +1916,7 @@
 				// everytime flowsUsedByGemPort cache is updated the same should be updated
 				// in kv store by calling UpdateFlowIDsForGem
 				f.flowsUsedByGemPort[uint32(gemPortID)] = flowIDs
-				if err := f.resourceMgr.UpdateFlowIDsForGem(ctx, Intf, uint32(gemPortID), flowIDs); err != nil {
+				if err := f.resourceMgr.UpdateFlowIDsForGem(ctx, intfID, uint32(gemPortID), flowIDs); err != nil {
 					return err
 				}
 				break
@@ -1944,19 +1927,23 @@
 				"gemport-id":  gemPortID,
 				"usedByFlows": flowIDs,
 				"device-id":   f.deviceHandler.device.Id})
+
+		if err := f.resourceMgr.HandleMeterInfoRefCntUpdate(ctx, direction, intfID, uint32(onuID), uint32(uniID), tpID, false); err != nil {
+			return err
+		}
 		return nil
 	}
 	logger.Debugf(ctx, "gem-port-id %d is-not-used-by-another-flow--releasing-the-gem-port", gemPortID)
-	f.resourceMgr.RemoveGemPortIDForOnu(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
+	f.resourceMgr.RemoveGemPortIDForOnu(ctx, intfID, uint32(onuID), uint32(uniID), uint32(gemPortID))
 	// TODO: The TrafficQueue corresponding to this gem-port also should be removed immediately.
 	// But it is anyway eventually  removed later when the TechProfile is freed, so not a big issue for now.
-	f.resourceMgr.RemoveGEMportPonportToOnuMapOnKVStore(ctx, uint32(gemPortID), Intf)
+	f.resourceMgr.RemoveGEMportPonportToOnuMapOnKVStore(ctx, uint32(gemPortID), intfID)
 	// also clear gem to uni cache
 	f.removeFromGemToUniMap(gemPortKey{
-		intfID:  Intf,
+		intfID:  intfID,
 		gemPort: uint32(gemPortID),
 	})
-	f.deleteGemPortFromLocalCache(ctx, Intf, uint32(onuID), uint32(gemPortID))
+	f.deleteGemPortFromLocalCache(ctx, intfID, uint32(onuID), uint32(gemPortID))
 
 	f.onuIdsLock.Lock() // TODO: What is this lock?
 
@@ -1965,17 +1952,17 @@
 	f.flowsUsedByGemPortKey.Lock()
 	delete(f.flowsUsedByGemPort, uint32(gemPortID))
 	f.flowsUsedByGemPortKey.Unlock()
-	f.resourceMgr.DeleteFlowIDsForGem(ctx, Intf, uint32(gemPortID))
-	f.resourceMgr.FreeGemPortID(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID))
+	f.resourceMgr.DeleteFlowIDsForGem(ctx, intfID, uint32(gemPortID))
+	f.resourceMgr.FreeGemPortID(ctx, intfID, uint32(onuID), uint32(uniID), uint32(gemPortID))
 
 	f.onuIdsLock.Unlock()
 
 	// Delete the gem port on the ONU.
-	if err := f.sendDeleteGemPortToChild(ctx, Intf, uint32(onuID), uint32(uniID), uint32(gemPortID), tpPath); err != nil {
+	if err := f.sendDeleteGemPortToChild(ctx, intfID, uint32(onuID), uint32(uniID), uint32(gemPortID), tpPath); err != nil {
 		logger.Errorw(ctx, "error-processing-delete-gem-port-towards-onu",
 			log.Fields{
 				"err":        err,
-				"intf":       Intf,
+				"intfID":     intfID,
 				"onu-id":     onuID,
 				"uni-id":     uniID,
 				"device-id":  f.deviceHandler.device.Id,
@@ -1983,26 +1970,26 @@
 	}
 	switch techprofileInst := techprofileInst.(type) {
 	case *tp.TechProfile:
-		ok, _ := f.isTechProfileUsedByAnotherGem(ctx, Intf, uint32(onuID), uint32(uniID), tpID, techprofileInst, uint32(gemPortID))
+		ok, _ := f.isTechProfileUsedByAnotherGem(ctx, intfID, uint32(onuID), uint32(uniID), tpID, techprofileInst, uint32(gemPortID))
 		if !ok {
-			if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, Intf, uint32(onuID), uint32(uniID), tpID); err != nil {
+			if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, intfID, uint32(onuID), uint32(uniID), tpID); err != nil {
 				logger.Warn(ctx, err)
 			}
-			if err := f.DeleteTechProfileInstance(ctx, Intf, uint32(onuID), uint32(uniID), "", tpID); err != nil {
+			if err := f.DeleteTechProfileInstance(ctx, intfID, uint32(onuID), uint32(uniID), "", tpID); err != nil {
 				logger.Warn(ctx, err)
 			}
-			if err := f.RemoveSchedulerQueues(ctx, schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: Intf, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst}); err != nil {
+			if err := f.RemoveSchedulerQueues(ctx, schedQueue{direction: tp_pb.Direction_UPSTREAM, intfID: intfID, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst}); err != nil {
 				logger.Warn(ctx, err)
 			}
-			if err := f.RemoveSchedulerQueues(ctx, schedQueue{direction: tp_pb.Direction_DOWNSTREAM, intfID: Intf, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst}); err != nil {
+			if err := f.RemoveSchedulerQueues(ctx, schedQueue{direction: tp_pb.Direction_DOWNSTREAM, intfID: intfID, onuID: uint32(onuID), uniID: uint32(uniID), tpID: tpID, uniPort: portNum, tpInst: techprofileInst}); err != nil {
 				logger.Warn(ctx, err)
 			}
-			f.resourceMgr.FreeAllocID(ctx, Intf, uint32(onuID), uint32(uniID), techprofileInst.UsScheduler.AllocID)
+			f.resourceMgr.FreeAllocID(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.UsScheduler.AllocID)
 			// Delete the TCONT on the ONU.
-			if err := f.sendDeleteTcontToChild(ctx, Intf, uint32(onuID), uint32(uniID), techprofileInst.UsScheduler.AllocID, tpPath); err != nil {
+			if err := f.sendDeleteTcontToChild(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.UsScheduler.AllocID, tpPath); err != nil {
 				logger.Errorw(ctx, "error-processing-delete-tcont-towards-onu",
 					log.Fields{
-						"intf":      Intf,
+						"intf":      intfID,
 						"onu-id":    onuID,
 						"uni-id":    uniID,
 						"device-id": f.deviceHandler.device.Id,
@@ -2010,18 +1997,18 @@
 			}
 		}
 	case *tp.EponProfile:
-		if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, Intf, uint32(onuID), uint32(uniID), tpID); err != nil {
+		if err := f.resourceMgr.RemoveTechProfileIDForOnu(ctx, intfID, uint32(onuID), uint32(uniID), tpID); err != nil {
 			logger.Warn(ctx, err)
 		}
-		if err := f.DeleteTechProfileInstance(ctx, Intf, uint32(onuID), uint32(uniID), "", tpID); err != nil {
+		if err := f.DeleteTechProfileInstance(ctx, intfID, uint32(onuID), uint32(uniID), "", tpID); err != nil {
 			logger.Warn(ctx, err)
 		}
-		f.resourceMgr.FreeAllocID(ctx, Intf, uint32(onuID), uint32(uniID), techprofileInst.AllocID)
+		f.resourceMgr.FreeAllocID(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.AllocID)
 		// Delete the TCONT on the ONU.
-		if err := f.sendDeleteTcontToChild(ctx, Intf, uint32(onuID), uint32(uniID), techprofileInst.AllocID, tpPath); err != nil {
+		if err := f.sendDeleteTcontToChild(ctx, intfID, uint32(onuID), uint32(uniID), techprofileInst.AllocID, tpPath); err != nil {
 			logger.Errorw(ctx, "error-processing-delete-tcont-towards-onu",
 				log.Fields{
-					"intf":      Intf,
+					"intf":      intfID,
 					"onu-id":    onuID,
 					"uni-id":    uniID,
 					"device-id": f.deviceHandler.device.Id,
@@ -2100,7 +2087,7 @@
 		return err
 	}
 	if !flowInfo.Flow.ReplicateFlow {
-		if err = f.clearResources(ctx, flow, Intf, onuID, uniID, flowInfo.Flow.GemportId, flowInfo.Flow.FlowId, portNum); err != nil {
+		if err = f.clearResources(ctx, flow, Intf, onuID, uniID, flowInfo.Flow.GemportId, flowInfo.Flow.FlowId, portNum, flowDirection); err != nil {
 			logger.Errorw(ctx, "failed-to-clear-resources-for-flow", log.Fields{
 				"flow-id":        flow.Id,
 				"stored-flow":    flowInfo.Flow,
@@ -2118,7 +2105,7 @@
 		}
 		logger.Debugw(ctx, "gems-to-be-cleared", log.Fields{"gems": gems})
 		for _, gem := range gems {
-			if err = f.clearResources(ctx, flow, Intf, onuID, uniID, int32(gem), flowInfo.Flow.FlowId, portNum); err != nil {
+			if err = f.clearResources(ctx, flow, Intf, onuID, uniID, int32(gem), flowInfo.Flow.FlowId, portNum, flowDirection); err != nil {
 				logger.Errorw(ctx, "failed-to-clear-resources-for-flow", log.Fields{
 					"flow-id":        flow.Id,
 					"stored-flow":    flowInfo.Flow,