VOL-4522: If meter mismatch occurs for any direction, the corresponding
direction's schedulers/queues are not installed again

Change-Id: I923dd5f1ea120f9a71cede005e5dec1b8541cc85
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index 53b25e9..50c1707 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -401,7 +401,7 @@
 			"flowmetadata": sq.flowMetadata,
 			"device-id":    f.deviceHandler.device.Id})
 
-	Direction, err := verifyMeterIDAndGetDirection(sq.meterID, sq.direction)
+	direction, err := verifyMeterIDAndGetDirection(sq.meterID, sq.direction)
 	if err != nil {
 		return err
 	}
@@ -412,7 +412,7 @@
 	 */
 
 	var SchedCfg *tp_pb.SchedulerConfig
-	meterInfo, err := f.resourceMgr.GetMeterInfoForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
+	meterInfo, err := f.resourceMgr.GetMeterInfoForOnu(ctx, direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID)
 	if err != nil {
 		return olterrors.NewErrNotFound("meter",
 			log.Fields{"intf-id": sq.intfID,
@@ -421,29 +421,17 @@
 				"device-id": f.deviceHandler.device.Id}, err)
 	}
 
-	if meterInfo != nil {
-		// If RefCnt become 0 clear the meter information from the DB.
-		if meterInfo.MeterID != sq.meterID && meterInfo.RefCnt == 0 {
-			if err := f.resourceMgr.RemoveMeterInfoForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID); err != nil {
-				return err
-			}
-		} else {
-			logger.Debugw(ctx, "scheduler-already-created-for-upstream", log.Fields{"device-id": f.deviceHandler.device.Id, "meter-id": sq.meterID})
-			if meterInfo.MeterID == sq.meterID {
-				return f.resourceMgr.HandleMeterInfoRefCntUpdate(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID, true)
-			}
-			return olterrors.NewErrInvalidValue(log.Fields{
-				"unsupported":       "meter-id",
-				"kv-store-meter-id": meterInfo.MeterID,
-				"meter-id-in-flow":  sq.meterID,
-				"device-id":         f.deviceHandler.device.Id}, nil)
-		}
+	// update referernce count and return if the meter was already installed before
+	if meterInfo != nil && meterInfo.MeterID == sq.meterID {
+		logger.Debugw(ctx, "scheduler-already-created-for-direction",
+			log.Fields{"device-id": f.deviceHandler.device.Id, "direction": direction, "meter-id": sq.meterID})
+		return f.resourceMgr.HandleMeterInfoRefCntUpdate(ctx, direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID, true)
 	}
 
 	logger.Debugw(ctx, "meter-does-not-exist-creating-new",
 		log.Fields{
 			"meter-id":  sq.meterID,
-			"direction": Direction,
+			"direction": direction,
 			"device-id": f.deviceHandler.device.Id})
 
 	if sq.direction == tp_pb.Direction_UPSTREAM {
@@ -498,14 +486,14 @@
 	/* After we successfully applied the scheduler configuration on the OLT device,
 	 * store the meter id on the KV store, for further reference.
 	 */
-	if err := f.resourceMgr.StoreMeterInfoForOnu(ctx, Direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID, meterInfo); err != nil {
+	if err := f.resourceMgr.StoreMeterInfoForOnu(ctx, direction, sq.intfID, sq.onuID, sq.uniID, sq.tpID, meterInfo); err != nil {
 		return olterrors.NewErrAdapter("failed-updating-meter-id",
 			log.Fields{"onu-id": sq.onuID,
 				"meter-id":  sq.meterID,
 				"device-id": f.deviceHandler.device.Id}, err)
 	}
 	logger.Infow(ctx, "updated-meter-info-into-kv-store-successfully",
-		log.Fields{"direction": Direction,
+		log.Fields{"direction": direction,
 			"meter-info": meterInfo,
 			"device-id":  f.deviceHandler.device.Id})
 	return nil
@@ -2380,7 +2368,7 @@
 	}
 	f.onuGemInfoLock.Unlock()
 
-	TpID, err := getTpIDFromFlow(ctx, flow)
+	tpID, err := getTpIDFromFlow(ctx, flow)
 	if err != nil {
 		return olterrors.NewErrNotFound("tpid-for-flow",
 			log.Fields{
@@ -2391,19 +2379,26 @@
 	}
 	logger.Debugw(ctx, "tpid-for-this-subcriber",
 		log.Fields{
-			"tp-id":   TpID,
+			"tp-id":   tpID,
 			"intf-id": intfID,
 			"onu-id":  onuID,
 			"uni-id":  uniID})
 	if plt.IsUpstream(actionInfo[Output].(uint32)) {
 		UsMeterID = flows.GetMeterIdFromFlow(flow)
 		logger.Debugw(ctx, "upstream-flow-meter-id", log.Fields{"us-meter-id": UsMeterID})
+		if err := f.validateMeter(ctx, Upstream, UsMeterID, intfID, onuID, uniID, tpID); err != nil {
+			logger.Errorw(ctx, "meter-validation-failed", log.Fields{"err": err})
+			return err
+		}
 	} else {
 		DsMeterID = flows.GetMeterIdFromFlow(flow)
 		logger.Debugw(ctx, "downstream-flow-meter-id", log.Fields{"ds-meter-id": DsMeterID})
-
+		if err := f.validateMeter(ctx, Downstream, DsMeterID, intfID, onuID, uniID, tpID); err != nil {
+			logger.Errorw(ctx, "meter-validation-failed", log.Fields{"err": err})
+			return err
+		}
 	}
-	return f.processAddFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, TpID, UsMeterID, DsMeterID, flowMetadata)
+	return f.processAddFlow(ctx, intfID, onuID, uniID, portNo, classifierInfo, actionInfo, flow, tpID, UsMeterID, DsMeterID, flowMetadata)
 }
 
 // handleFlowWithGroup adds multicast flow to the device.
@@ -3518,3 +3513,36 @@
 		UniId: sq.uniID, PortNo: sq.uniPort,
 		TrafficScheds: TrafficSched})
 }
+
+// validateMeter validates if there is a meter mismatch for the given direction. It also clears the stale meter if the reference count is zero
+func (f *OpenOltFlowMgr) validateMeter(ctx context.Context, direction string, meterID uint32, intfID uint32, onuID uint32, uniID uint32, tpID uint32) error {
+	meterInfo, err := f.resourceMgr.GetMeterInfoForOnu(ctx, direction, intfID, onuID, uniID, tpID)
+	if err != nil {
+		return olterrors.NewErrNotFound("meter",
+			log.Fields{"intf-id": intfID,
+				"onu-id":    onuID,
+				"uni-id":    uniID,
+				"device-id": f.deviceHandler.device.Id}, err)
+	}
+
+	if meterInfo != nil {
+		// If RefCnt become 0 clear the meter information from the DB.
+		if meterInfo.MeterID != meterID && meterInfo.RefCnt == 0 {
+			if err := f.resourceMgr.RemoveMeterInfoForOnu(ctx, direction, intfID, onuID, uniID, tpID); err != nil {
+				return err
+			}
+		} else if meterInfo.MeterID != meterID {
+			logger.Errorw(ctx, "meter-mismatch-for-direction",
+				log.Fields{"direction": direction,
+					"kv-store-meter-id": meterInfo.MeterID,
+					"meter-id-in-flow":  meterID,
+					"device-id":         f.deviceHandler.device.Id})
+			return olterrors.NewErrInvalidValue(log.Fields{
+				"unsupported":       "meter-id",
+				"kv-store-meter-id": meterInfo.MeterID,
+				"meter-id-in-flow":  meterID,
+				"device-id":         f.deviceHandler.device.Id}, nil)
+		}
+	}
+	return nil
+}