VOL-3957: Re-org the meter reference count update code to account
for scenarios involving flow replication across multiple p-bits.

Change-Id: I5f97fba1d75e2dc604007a151ed807a05c1165cd
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 4e10522..15c7e9c 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -1876,18 +1876,7 @@
 //clearResources clears pon resources in kv store and the device
 // nolint: gocyclo
 func (f *OpenOltFlowMgr) clearResources(ctx context.Context, flow *ofp.OfpFlowStats, intfID uint32, onuID int32, uniID int32,
-	gemPortID int32, flowID uint64, portNum uint32, direction string) error {
-
-	tpID, err := getTpIDFromFlow(ctx, flow)
-	if err != nil {
-		return olterrors.NewErrNotFound("tp-id",
-			log.Fields{
-				"flow":      flow,
-				"intf-id":   intfID,
-				"onu-id":    onuID,
-				"uni-id":    uniID,
-				"device-id": f.deviceHandler.device.Id}, err)
-	}
+	gemPortID int32, flowID uint64, portNum uint32, tpID uint32) error {
 
 	uni := getUniPortPath(f.deviceHandler.device.Id, intfID, onuID, uniID)
 	tpPath := f.getTPpath(ctx, intfID, uni, tpID)
@@ -1928,9 +1917,6 @@
 				"usedByFlows": flowIDs,
 				"device-id":   f.deviceHandler.device.Id})
 
-		if err := f.resourceMgr.HandleMeterInfoRefCntUpdate(ctx, direction, intfID, uint32(onuID), uint32(uniID), tpID, false); err != nil {
-			return err
-		}
 		return nil
 	}
 	logger.Debugf(ctx, "gem-port-id %d is-not-used-by-another-flow--releasing-the-gem-port", gemPortID)
@@ -2086,8 +2072,19 @@
 		logger.Errorw(ctx, "failed-to-remove-flow-on-kv-store", log.Fields{"error": err})
 		return err
 	}
+	tpID, err := getTpIDFromFlow(ctx, flow)
+	if err != nil {
+		return olterrors.NewErrNotFound("tp-id",
+			log.Fields{
+				"flow":      flow,
+				"intf-id":   Intf,
+				"onu-id":    onuID,
+				"uni-id":    uniID,
+				"device-id": f.deviceHandler.device.Id}, err)
+	}
+
 	if !flowInfo.Flow.ReplicateFlow {
-		if err = f.clearResources(ctx, flow, Intf, onuID, uniID, flowInfo.Flow.GemportId, flowInfo.Flow.FlowId, portNum, flowDirection); err != nil {
+		if err = f.clearResources(ctx, flow, Intf, onuID, uniID, flowInfo.Flow.GemportId, flowInfo.Flow.FlowId, portNum, tpID); err != nil {
 			logger.Errorw(ctx, "failed-to-clear-resources-for-flow", log.Fields{
 				"flow-id":        flow.Id,
 				"stored-flow":    flowInfo.Flow,
@@ -2095,6 +2092,7 @@
 				"stored-flow-id": flowInfo.Flow.FlowId,
 				"onu-id":         onuID,
 				"intf":           Intf,
+				"err":            err,
 			})
 			return err
 		}
@@ -2105,7 +2103,7 @@
 		}
 		logger.Debugw(ctx, "gems-to-be-cleared", log.Fields{"gems": gems})
 		for _, gem := range gems {
-			if err = f.clearResources(ctx, flow, Intf, onuID, uniID, int32(gem), flowInfo.Flow.FlowId, portNum, flowDirection); err != nil {
+			if err = f.clearResources(ctx, flow, Intf, onuID, uniID, int32(gem), flowInfo.Flow.FlowId, portNum, tpID); err != nil {
 				logger.Errorw(ctx, "failed-to-clear-resources-for-flow", log.Fields{
 					"flow-id":        flow.Id,
 					"stored-flow":    flowInfo.Flow,
@@ -2114,6 +2112,7 @@
 					"onu-id":         onuID,
 					"intf":           Intf,
 					"gem":            gem,
+					"err":            err,
 				})
 				return err
 			}
@@ -2136,6 +2135,10 @@
 			f.subscriberDataPathFlowIDMapLock.Unlock()
 		}
 	}
+	// Decrement reference count for the meter associated with the given <(pon_id, onu_id, uni_id)>/<tp_id>/meter_id/<direction>
+	if err := f.resourceMgr.HandleMeterInfoRefCntUpdate(ctx, flowDirection, Intf, uint32(onuID), uint32(uniID), tpID, false); err != nil {
+		return err
+	}
 	return nil
 }
 
diff --git a/internal/pkg/resourcemanager/resourcemanager.go b/internal/pkg/resourcemanager/resourcemanager.go
index 4e7aa79..a501310 100755
--- a/internal/pkg/resourcemanager/resourcemanager.go
+++ b/internal/pkg/resourcemanager/resourcemanager.go
@@ -1107,13 +1107,25 @@
 func (RsrcMgr *OpenOltResourceMgr) HandleMeterInfoRefCntUpdate(ctx context.Context, Direction string,
 	IntfID uint32, OnuID uint32, UniID uint32, TpID uint32, increment bool) error {
 	meterInfo, err := RsrcMgr.GetMeterInfoForOnu(ctx, Direction, IntfID, OnuID, UniID, TpID)
-	if meterInfo == nil || err != nil {
-		return fmt.Errorf("error-fetching-meter-info-for-intf-%d-onu-%d-uni-%d-tp-id-%d-direction-%s", IntfID, OnuID, UniID, TpID, Direction)
+	if err != nil {
+		return err
+	} else if meterInfo == nil {
+		// If we are increasing the reference count, we expect the meter information to be present on KV store.
+		// But if decrementing the reference count, the meter is possibly already cleared from KV store. Just log warn but do not return error.
+		if increment {
+			logger.Errorf(ctx, "error-fetching-meter-info-for-intf-%d-onu-%d-uni-%d-tp-id-%d-direction-%s", IntfID, OnuID, UniID, TpID, Direction)
+			return fmt.Errorf("error-fetching-meter-info-for-intf-%d-onu-%d-uni-%d-tp-id-%d-direction-%s", IntfID, OnuID, UniID, TpID, Direction)
+		}
+		logger.Warnw(ctx, "meter is already cleared",
+			log.Fields{"intfID": IntfID, "onuID": OnuID, "uniID": UniID, "direction": Direction, "increment": increment})
+		return nil
 	}
+
 	if increment {
 		meterInfo.RefCnt++
 	} else {
 		meterInfo.RefCnt--
+		// If RefCnt becomes 0, clear the meter information from the DB.
 		if meterInfo.RefCnt == 0 {
 			if err := RsrcMgr.RemoveMeterInfoForOnu(ctx, Direction, IntfID, OnuID, UniID, TpID); err != nil {
 				return err