VOL-4029 Fetch ONU stats on demand

Change-Id: I7248ab65f320858e9bdf9bdf6a60a27a734b8640
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index d998dbb..493c83a 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -2581,3 +2581,40 @@
 	}
 	return errResp(extension.GetValueResponse_ERROR, extension.GetValueResponse_INTERNAL_ERROR)
 }
+
+func (dh *DeviceHandler) getOnuPonCounters(ctx context.Context, onuPonInfo *extension.GetOnuCountersRequest) *extension.SingleGetValueResponse {
+
+	singleValResp := extension.SingleGetValueResponse{
+		Response: &extension.GetValueResponse{
+			Response: &extension.GetValueResponse_OnuPonCounters{
+				OnuPonCounters: &extension.GetOnuCountersResponse{},
+			},
+		},
+	}
+
+	errResp := func(status extension.GetValueResponse_Status,
+		reason extension.GetValueResponse_ErrorReason) *extension.SingleGetValueResponse {
+		return &extension.SingleGetValueResponse{
+			Response: &extension.GetValueResponse{
+				Status:    status,
+				ErrReason: reason,
+			},
+		}
+	}
+	intfID := onuPonInfo.IntfId
+	onuID := onuPonInfo.OnuId
+	onuKey := dh.formOnuKey(intfID, onuID)
+
+	if _, ok := dh.onus.Load(onuKey); !ok {
+		logger.Errorw(ctx, "get-onui-pon-counters-request-invalid-request-received", log.Fields{"intfID": intfID, "onuID": onuID})
+		return errResp(extension.GetValueResponse_ERROR, extension.GetValueResponse_INVALID_DEVICE)
+	}
+	logger.Debugw(ctx, "get-onui-pon-counters-request-received", log.Fields{"intfID": intfID, "onuID": onuID})
+	cmnni := dh.portStats.collectOnDemandOnuStats(ctx, intfID, onuID)
+	if cmnni == nil {
+		return errResp(extension.GetValueResponse_ERROR, extension.GetValueResponse_INTERNAL_ERROR)
+	}
+	dh.portStats.updateGetOnuPonCountersResponse(ctx, &singleValResp, cmnni)
+	return &singleValResp
+
+}
diff --git a/internal/pkg/core/openolt.go b/internal/pkg/core/openolt.go
index 34e681e..51ce314 100644
--- a/internal/pkg/core/openolt.go
+++ b/internal/pkg/core/openolt.go
@@ -431,6 +431,8 @@
 		switch reqType := request.GetRequest().GetRequest().(type) {
 		case *extension.GetValueRequest_OltPortInfo:
 			return handler.getOltPortCounters(ctx, reqType.OltPortInfo), nil
+		case *extension.GetValueRequest_OnuPonInfo:
+			return handler.getOnuPonCounters(ctx, reqType.OnuPonInfo), nil
 		default:
 			return errResp(extension.GetValueResponse_ERROR, extension.GetValueResponse_UNSUPPORTED), nil
 		}
diff --git a/internal/pkg/core/statsmanager.go b/internal/pkg/core/statsmanager.go
index 6fc3e53..42c60d7 100755
--- a/internal/pkg/core/statsmanager.go
+++ b/internal/pkg/core/statsmanager.go
@@ -88,6 +88,8 @@
 	LcdgErrors = "LcdgErrors"
 	//RdiErrors constant
 	RdiErrors = "RdiErrors"
+	//Timestamp constant
+	Timestamp = "Timestamp"
 )
 
 var mutex = &sync.Mutex{}
@@ -492,6 +494,7 @@
 	onuStatsVal[BerReported] = float32(onuStats.BerReported)
 	onuStatsVal[LcdgErrors] = float32(onuStats.LcdgErrors)
 	onuStatsVal[RdiErrors] = float32(onuStats.RdiErrors)
+	onuStatsVal[Timestamp] = float32(onuStats.Timestamp)
 	return onuStatsVal
 }
 
@@ -506,6 +509,21 @@
 	}
 }
 
+// collectOnDemandOnuStats will collect the onui-pon metrics
+func (StatMgr *OpenOltStatisticsMgr) collectOnDemandOnuStats(ctx context.Context, intfID uint32, onuID uint32) map[string]float32 {
+	onu := &openolt.Onu{IntfId: intfID, OnuId: onuID}
+	var stats *openolt.OnuStatistics
+	var err error
+	logger.Debugw(ctx, "pulling-onu-stats-on-demand", log.Fields{"IntfID": intfID, "OnuID": onuID})
+	if stats, err = StatMgr.Device.Client.GetOnuStatistics(context.Background(), onu); err == nil {
+		statValue := StatMgr.convertONUStats(stats)
+		return statValue
+
+	}
+	logger.Errorw(ctx, "error-while-getting-onu-stats-for-onu", log.Fields{"IntfID": intfID, "OnuID": onuID, "err": err})
+	return nil
+}
+
 // collectOnuAndGemStats will collect both onu and gem metrics
 func (StatMgr *OpenOltStatisticsMgr) collectOnuAndGemStats(ctx context.Context, onuGemInfo []rsrcMgr.OnuGemInfo) {
 	if !StatMgr.Device.openOLT.enableONUStats && !StatMgr.Device.openOLT.enableGemStats {
@@ -778,3 +796,89 @@
 	}
 
 }
+
+func (StatMgr *OpenOltStatisticsMgr) updateGetOnuPonCountersResponse(ctx context.Context, singleValResp *extension.SingleGetValueResponse, stats map[string]float32) {
+
+	metrics := singleValResp.GetResponse().GetOnuPonCounters()
+	metrics.IsIntfId = &extension.GetOnuCountersResponse_IntfId{
+		IntfId: uint32(stats[IntfID]),
+	}
+	metrics.IsOnuId = &extension.GetOnuCountersResponse_OnuId{
+		OnuId: uint32(stats[OnuID]),
+	}
+	metrics.IsPositiveDrift = &extension.GetOnuCountersResponse_PositiveDrift{
+		PositiveDrift: uint64(stats[PositiveDrift]),
+	}
+	metrics.IsNegativeDrift = &extension.GetOnuCountersResponse_NegativeDrift{
+		NegativeDrift: uint64(stats[NegativeDrift]),
+	}
+	metrics.IsDelimiterMissDetection = &extension.GetOnuCountersResponse_DelimiterMissDetection{
+		DelimiterMissDetection: uint64(stats[DelimiterMissDetection]),
+	}
+	metrics.IsBipErrors = &extension.GetOnuCountersResponse_BipErrors{
+		BipErrors: uint64(stats[BipErrors]),
+	}
+	metrics.IsBipUnits = &extension.GetOnuCountersResponse_BipUnits{
+		BipUnits: uint64(stats[BipUnits]),
+	}
+	metrics.IsFecCorrectedSymbols = &extension.GetOnuCountersResponse_FecCorrectedSymbols{
+		FecCorrectedSymbols: uint64(stats[FecCorrectedSymbols]),
+	}
+	metrics.IsFecCodewordsCorrected = &extension.GetOnuCountersResponse_FecCodewordsCorrected{
+		FecCodewordsCorrected: uint64(stats[FecCodewordsCorrected]),
+	}
+	metrics.IsFecCodewordsUncorrectable = &extension.GetOnuCountersResponse_FecCodewordsUncorrectable{
+		FecCodewordsUncorrectable: uint64(stats[fecCodewordsUncorrectable]),
+	}
+	metrics.IsFecCodewords = &extension.GetOnuCountersResponse_FecCodewords{
+		FecCodewords: uint64(stats[FecCodewords]),
+	}
+	metrics.IsFecCorrectedUnits = &extension.GetOnuCountersResponse_FecCorrectedUnits{
+		FecCorrectedUnits: uint64(stats[FecCorrectedUnits]),
+	}
+	metrics.IsXgemKeyErrors = &extension.GetOnuCountersResponse_XgemKeyErrors{
+		XgemKeyErrors: uint64(stats[XGEMKeyErrors]),
+	}
+	metrics.IsXgemLoss = &extension.GetOnuCountersResponse_XgemLoss{
+		XgemLoss: uint64(stats[XGEMLoss]),
+	}
+	metrics.IsRxPloamsError = &extension.GetOnuCountersResponse_RxPloamsError{
+		RxPloamsError: uint64(stats[RxPloamsError]),
+	}
+	metrics.IsRxPloamsNonIdle = &extension.GetOnuCountersResponse_RxPloamsNonIdle{
+		RxPloamsNonIdle: uint64(stats[RxPloamsNonIdle]),
+	}
+	metrics.IsRxOmci = &extension.GetOnuCountersResponse_RxOmci{
+		RxOmci: uint64(stats[RxOmci]),
+	}
+	metrics.IsRxOmciPacketsCrcError = &extension.GetOnuCountersResponse_RxOmciPacketsCrcError{
+		RxOmciPacketsCrcError: uint64(stats[RxOmciPacketsCrcError]),
+	}
+	metrics.IsRxBytes = &extension.GetOnuCountersResponse_RxBytes{
+		RxBytes: uint64(stats[RxBytes]),
+	}
+	metrics.IsRxPackets = &extension.GetOnuCountersResponse_RxPackets{
+		RxPackets: uint64(stats[RxPackets]),
+	}
+	metrics.IsTxBytes = &extension.GetOnuCountersResponse_TxBytes{
+		TxBytes: uint64(stats[TxBytes]),
+	}
+	metrics.IsTxPackets = &extension.GetOnuCountersResponse_TxPackets{
+		TxPackets: uint64(stats[TxPackets]),
+	}
+	metrics.IsBerReported = &extension.GetOnuCountersResponse_BerReported{
+		BerReported: uint64(stats[BerReported]),
+	}
+	metrics.IsLcdgErrors = &extension.GetOnuCountersResponse_LcdgErrors{
+		LcdgErrors: uint64(stats[LcdgErrors]),
+	}
+	metrics.IsRdiErrors = &extension.GetOnuCountersResponse_RdiErrors{
+		RdiErrors: uint64(stats[RdiErrors]),
+	}
+	metrics.IsTimestamp = &extension.GetOnuCountersResponse_Timestamp{
+		Timestamp: uint32(stats[Timestamp]),
+	}
+
+	singleValResp.Response.Status = extension.GetValueResponse_OK
+	logger.Debugw(ctx, "updateGetOnuPonCountersResponse", log.Fields{"resp": singleValResp})
+}