VOL-3727 Support in OLT adapter to send periodic onu stats and gem stats

If the enable_onu_stats and/or enable_gem_stats flags are set, the OLT adapter periodically collects the corresponding ONU and GEM port stats from the agent and sends them as KPIs to Kafka. Both flags default to false.

Change-Id: Ifa96140518921d3d79118118a971d77c798dc2ab
diff --git a/internal/pkg/config/config.go b/internal/pkg/config/config.go
index e720890..4e6fd5f 100644
--- a/internal/pkg/config/config.go
+++ b/internal/pkg/config/config.go
@@ -55,6 +55,8 @@
 	defaultTraceAgentAddress     = "127.0.0.1:6831"
 	defaultLogCorrelationEnabled = true
 	defaultOmccEncryption        = false
+	defaultEnableONUStats        = false
+	defaultEnableGEMStats        = false
 )
 
 // AdapterFlags represents the set of configurations used by the read-write adaptercore service
@@ -86,6 +88,8 @@
 	TraceAgentAddress           string
 	LogCorrelationEnabled       bool
 	OmccEncryption              bool
+	EnableONUStats              bool
+	EnableGEMStats              bool
 }
 
 // NewAdapterFlags returns a new RWCore config
@@ -114,6 +118,8 @@
 		TraceAgentAddress:           defaultTraceAgentAddress,
 		LogCorrelationEnabled:       defaultLogCorrelationEnabled,
 		OmccEncryption:              defaultOmccEncryption,
+		EnableONUStats:              defaultEnableONUStats,
+		EnableGEMStats:              defaultEnableGEMStats,
 	}
 	return &adapterFlags
 }
@@ -193,6 +199,12 @@
 	help = fmt.Sprintf("OMCI Channel encryption status")
 	flag.BoolVar(&(so.OmccEncryption), "omcc_encryption", defaultOmccEncryption, help)
 
+	help = fmt.Sprintf("Enable ONU Statistics")
+	flag.BoolVar(&(so.EnableONUStats), "enable_onu_stats", defaultEnableONUStats, help)
+
+	help = fmt.Sprintf("Enable GEM Statistics")
+	flag.BoolVar(&(so.EnableGEMStats), "enable_gem_stats", defaultEnableGEMStats, help)
+
 	flag.Parse()
 	containerName := getContainerInfo()
 	if len(containerName) > 0 {
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index e288838..fa8dd52 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -899,7 +899,7 @@
 					intfID := PortNoToIntfID(port.PortNo, voltha.Port_ETHERNET_NNI)
 					cmnni := dh.portStats.collectNNIMetrics(intfID)
 					logger.Debugw(ctx, "collect-nni-metrics", log.Fields{"metrics": cmnni})
-					go dh.portStats.publishMetrics(ctx, cmnni, port, dh.device.Id, dh.device.Type)
+					go dh.portStats.publishMetrics(ctx, NNIStats, cmnni, port, dh.device.Id, dh.device.Type)
 					logger.Debugw(ctx, "publish-nni-metrics", log.Fields{"nni-port": port.Label})
 				}
 				// PON Stats
@@ -908,9 +908,15 @@
 					if val, ok := dh.activePorts.Load(intfID); ok && val == true {
 						cmpon := dh.portStats.collectPONMetrics(intfID)
 						logger.Debugw(ctx, "collect-pon-metrics", log.Fields{"metrics": cmpon})
-						go dh.portStats.publishMetrics(ctx, cmpon, port, dh.device.Id, dh.device.Type)
+						go dh.portStats.publishMetrics(ctx, PONStats, cmpon, port, dh.device.Id, dh.device.Type)
 					}
 					logger.Debugw(ctx, "publish-pon-metrics", log.Fields{"pon-port": port.Label})
+
+					//ONU & Gem Stats
+					onuGemInfo := dh.flowMgr[intfID].onuGemInfo
+					if len(onuGemInfo) != 0 {
+						go dh.portStats.collectOnuAndGemStats(ctx, onuGemInfo)
+					}
 				}
 			}
 		}
diff --git a/internal/pkg/core/openolt.go b/internal/pkg/core/openolt.go
index a22aac4..49c5675 100644
--- a/internal/pkg/core/openolt.go
+++ b/internal/pkg/core/openolt.go
@@ -51,6 +51,8 @@
 	HeartbeatFailReportInterval time.Duration
 	GrpcTimeoutInterval         time.Duration
 	lockDeviceHandlersMap       sync.RWMutex
+	enableONUStats              bool
+	enableGemStats              bool
 }
 
 //NewOpenOLT returns a new instance of OpenOLT
@@ -73,6 +75,8 @@
 	openOLT.GrpcTimeoutInterval = cfg.GrpcTimeoutInterval
 	openOLT.lockDeviceHandlersMap = sync.RWMutex{}
 	openOLT.configManager = cm
+	openOLT.enableONUStats = cfg.EnableONUStats
+	openOLT.enableGemStats = cfg.EnableGEMStats
 	return &openOLT
 }
 
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index abaa97b..618a3f4 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -135,6 +135,8 @@
 	PortNo = "portNo"
 	//AllocID constant
 	AllocID = "allocId"
+	//GemID constant
+	GemID = "gemId"
 
 	//NoneOnuID constant
 	NoneOnuID = -1
diff --git a/internal/pkg/core/statsmanager.go b/internal/pkg/core/statsmanager.go
index cf6af12..1b52e7c 100755
--- a/internal/pkg/core/statsmanager.go
+++ b/internal/pkg/core/statsmanager.go
@@ -20,7 +20,7 @@
 import (
 	"context"
 	"fmt"
-	"strconv"
+	rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
 	"sync"
 	"time"
 
@@ -30,8 +30,69 @@
 	"github.com/opencord/voltha-protos/v4/go/voltha"
 )
 
+const (
+	//NNIStats statType constant
+	NNIStats = "NNIStats"
+	//PONStats statType constant
+	PONStats = "PONStats"
+	//ONUStats statType constant
+	ONUStats = "ONUStats"
+	//GEMStats statType constant
+	GEMStats = "GEMStats"
+
+	//RxBytes constant
+	RxBytes = "RxBytes"
+	//RxPackets constant
+	RxPackets = "RxPackets"
+	//TxBytes constant
+	TxBytes = "TxBytes"
+	//TxPackets constant
+	TxPackets = "TxPackets"
+	//FecCodewords constant
+	FecCodewords = "FecCodewords"
+	//BipUnits constant
+	BipUnits = "BipUnits"
+	//BipErrors constant
+	BipErrors = "BipErrors"
+	//RxPloamsNonIdle constant
+	RxPloamsNonIdle = "RxPloamsNonIdle"
+	//RxPloamsError constant
+	RxPloamsError = "RxPloamsError"
+	//RxOmci constant
+	RxOmci = "RxOmci"
+	//RxOmciPacketsCrcError constant
+	RxOmciPacketsCrcError = "RxOmciPacketsCrcError"
+	//PositiveDrift constant
+	PositiveDrift = "PositiveDrift"
+	//NegativeDrift constant
+	NegativeDrift = "NegativeDrift"
+	//DelimiterMissDetection constant
+	DelimiterMissDetection = "DelimiterMissDetection"
+	//FecCorrectedSymbols constant
+	FecCorrectedSymbols = "FecCorrectedSymbols"
+	//FecCodewordsCorrected constant
+	FecCodewordsCorrected = "FecCodewordsCorrected"
+	//fecCodewordsUncorrectable constant
+	fecCodewordsUncorrectable = "fec_codewords_uncorrectable"
+	//FecCorrectedUnits constant
+	FecCorrectedUnits = "FecCorrectedUnits"
+	//XGEMKeyErrors constant
+	XGEMKeyErrors = "XGEMKeyErrors"
+	//XGEMLoss constant
+	XGEMLoss = "XGEMLOSS"
+	//BerReported constant
+	BerReported = "BerReported"
+	//LcdgErrors constant
+	LcdgErrors = "LcdgErrors"
+	//RdiErrors constant
+	RdiErrors = "RdiErrors"
+)
+
 var mutex = &sync.Mutex{}
 
+var onuStats = make(chan *openolt.OnuStatistics, 100)
+var gemStats = make(chan *openolt.GemPortStatistics, 100)
+
 // PonPort representation
 type PonPort struct {
 	/*
@@ -204,6 +265,12 @@
 	NumPonPorts := Dev.resourceMgr.DevInfo.GetPonPorts()
 	Ports, _ = InitPorts(ctx, "pon", Dev.device.Id, NumPonPorts)
 	StatMgr.SouthBoundPort, _ = Ports.(map[uint32]*PonPort)
+	if StatMgr.Device.openOLT.enableONUStats {
+		go StatMgr.publishOnuStats()
+	}
+	if StatMgr.Device.openOLT.enableGemStats {
+		go StatMgr.publishGemStats()
+	}
 	return &StatMgr
 }
 
@@ -361,8 +428,108 @@
 	return ponval
 }
 
-// publishMatrics will publish the pon port metrics
-func (StatMgr OpenOltStatisticsMgr) publishMetrics(ctx context.Context, val map[string]float32,
+// convertGemStats will convert gem stats response to kpi context
+func (StatMgr *OpenOltStatisticsMgr) convertGemStats(gemStats *openolt.GemPortStatistics) map[string]float32 {
+	gemStatsVal := make(map[string]float32)
+	gemStatsVal[IntfID] = float32(gemStats.IntfId)
+	gemStatsVal[GemID] = float32(gemStats.GemportId)
+	gemStatsVal[RxPackets] = float32(gemStats.RxPackets)
+	gemStatsVal[RxBytes] = float32(gemStats.RxBytes)
+	gemStatsVal[TxPackets] = float32(gemStats.TxPackets)
+	gemStatsVal[TxBytes] = float32(gemStats.TxBytes)
+	return gemStatsVal
+}
+
+// convertONUStats will convert onu stats response to kpi context
+func (StatMgr *OpenOltStatisticsMgr) convertONUStats(onuStats *openolt.OnuStatistics) map[string]float32 {
+	onuStatsVal := make(map[string]float32)
+	onuStatsVal[IntfID] = float32(onuStats.IntfId)
+	onuStatsVal[OnuID] = float32(onuStats.OnuId)
+	onuStatsVal[PositiveDrift] = float32(onuStats.PositiveDrift)
+	onuStatsVal[NegativeDrift] = float32(onuStats.NegativeDrift)
+	onuStatsVal[DelimiterMissDetection] = float32(onuStats.DelimiterMissDetection)
+	onuStatsVal[BipErrors] = float32(onuStats.BipErrors)
+	onuStatsVal[BipUnits] = float32(onuStats.BipUnits)
+	onuStatsVal[FecCorrectedSymbols] = float32(onuStats.FecCorrectedSymbols)
+	onuStatsVal[FecCodewordsCorrected] = float32(onuStats.FecCodewordsCorrected)
+	onuStatsVal[fecCodewordsUncorrectable] = float32(onuStats.FecCodewordsUncorrectable)
+	onuStatsVal[FecCodewords] = float32(onuStats.FecCodewords)
+	onuStatsVal[FecCorrectedUnits] = float32(onuStats.FecCorrectedUnits)
+	onuStatsVal[XGEMKeyErrors] = float32(onuStats.XgemKeyErrors)
+	onuStatsVal[XGEMLoss] = float32(onuStats.XgemLoss)
+	onuStatsVal[RxPloamsError] = float32(onuStats.RxPloamsError)
+	onuStatsVal[RxPloamsNonIdle] = float32(onuStats.RxPloamsNonIdle)
+	onuStatsVal[RxOmci] = float32(onuStats.RxOmci)
+	onuStatsVal[RxOmciPacketsCrcError] = float32(onuStats.RxOmciPacketsCrcError)
+	onuStatsVal[RxBytes] = float32(onuStats.RxBytes)
+	onuStatsVal[RxPackets] = float32(onuStats.RxPackets)
+	onuStatsVal[TxBytes] = float32(onuStats.TxBytes)
+	onuStatsVal[TxPackets] = float32(onuStats.TxPackets)
+	onuStatsVal[BerReported] = float32(onuStats.BerReported)
+	onuStatsVal[LcdgErrors] = float32(onuStats.LcdgErrors)
+	onuStatsVal[RdiErrors] = float32(onuStats.RdiErrors)
+	return onuStatsVal
+}
+
+// collectOnuStats will collect the onu metrics
+func (StatMgr *OpenOltStatisticsMgr) collectOnuStats(ctx context.Context, onuGemInfo rsrcMgr.OnuGemInfo) {
+	onu := &openolt.Onu{IntfId: onuGemInfo.IntfID, OnuId: onuGemInfo.OnuID}
+	logger.Debugw(ctx, "pulling-onu-stats", log.Fields{"IntfID": onuGemInfo.IntfID, "OnuID": onuGemInfo.OnuID})
+	if stats, err := StatMgr.Device.Client.GetOnuStatistics(context.Background(), onu); err == nil {
+		onuStats <- stats
+	} else {
+		logger.Errorw(ctx, "error-while-getting-onu-stats-for-onu", log.Fields{"IntfID": onuGemInfo.IntfID, "OnuID": onuGemInfo.OnuID, "err": err})
+	}
+}
+
+// collectOnuAndGemStats will collect onu and/or gem metrics, whichever are enabled, for each entry in onuGemInfo
+func (StatMgr *OpenOltStatisticsMgr) collectOnuAndGemStats(ctx context.Context, onuGemInfo []rsrcMgr.OnuGemInfo) {
+	if !StatMgr.Device.openOLT.enableONUStats && !StatMgr.Device.openOLT.enableGemStats {
+		return
+	}
+
+	for _, onuInfo := range onuGemInfo {
+		if StatMgr.Device.openOLT.enableONUStats {
+			go StatMgr.collectOnuStats(ctx, onuInfo)
+		}
+		if StatMgr.Device.openOLT.enableGemStats {
+			go StatMgr.collectGemStats(ctx, onuInfo)
+		}
+	}
+}
+
+// collectGemStats will collect gem metrics
+func (StatMgr *OpenOltStatisticsMgr) collectGemStats(ctx context.Context, onuGemInfo rsrcMgr.OnuGemInfo) {
+	for _, gem := range onuGemInfo.GemPorts {
+		logger.Debugw(ctx, "pulling-gem-stats", log.Fields{"IntfID": onuGemInfo.IntfID, "OnuID": onuGemInfo.OnuID, "GemID": gem})
+		onuPacket := &openolt.OnuPacket{IntfId: onuGemInfo.IntfID, OnuId: onuGemInfo.OnuID, GemportId: gem}
+		if stats, err := StatMgr.Device.Client.GetGemPortStatistics(context.Background(), onuPacket); err == nil {
+			gemStats <- stats
+		} else {
+			logger.Errorw(ctx, "error-while-getting-gem-stats-for-onu",
+				log.Fields{"IntfID": onuGemInfo.IntfID, "OnuID": onuGemInfo.OnuID, "GemID": gem, "err": err})
+		}
+	}
+}
+
+// publishGemStats will publish the gem metrics
+func (StatMgr *OpenOltStatisticsMgr) publishGemStats() {
+	for {
+		statValue := StatMgr.convertGemStats(<-gemStats)
+		StatMgr.publishMetrics(context.Background(), GEMStats, statValue, &voltha.Port{Label: "GEM"}, StatMgr.Device.device.Id, StatMgr.Device.device.Type)
+	}
+}
+
+// publishOnuStats will publish the onu metrics
+func (StatMgr *OpenOltStatisticsMgr) publishOnuStats() {
+	for {
+		statValue := StatMgr.convertONUStats(<-onuStats)
+		StatMgr.publishMetrics(context.Background(), ONUStats, statValue, &voltha.Port{Label: "ONU"}, StatMgr.Device.device.Id, StatMgr.Device.device.Type)
+	}
+}
+
+// publishMetrics will publish the NNI, PON, ONU or GEM metrics identified by statType as a KPI event
+func (StatMgr OpenOltStatisticsMgr) publishMetrics(ctx context.Context, statType string, val map[string]float32,
 	port *voltha.Port, devID string, devType string) {
 	logger.Debugw(ctx, "publish-metrics",
 		log.Fields{
@@ -376,17 +543,18 @@
 	metricsContext["oltid"] = devID
 	metricsContext["devicetype"] = devType
 	metricsContext["portlabel"] = port.Label
-	metricsContext["portno"] = strconv.Itoa(int(port.PortNo))
 
-	if port.Type == voltha.Port_ETHERNET_NNI {
+	if statType == NNIStats {
 		volthaEventSubCatgry = voltha.EventSubCategory_NNI
-	} else {
+	} else if statType == PONStats {
 		volthaEventSubCatgry = voltha.EventSubCategory_PON
+	} else if statType == GEMStats || statType == ONUStats {
+		volthaEventSubCatgry = voltha.EventSubCategory_ONT
 	}
 
 	raisedTs := time.Now().UnixNano()
 	mmd := voltha.MetricMetaData{
-		Title:    port.Type.String(),
+		Title:    statType,
 		Ts:       float64(raisedTs),
 		Context:  metricsContext,
 		DeviceId: devID,
@@ -400,7 +568,7 @@
 	ke.Ts = float64(time.Now().UnixNano())
 
 	if err := StatMgr.Device.EventProxy.SendKpiEvent(ctx, "STATS_EVENT", &ke, voltha.EventCategory_EQUIPMENT, volthaEventSubCatgry, raisedTs); err != nil {
-		logger.Errorw(ctx, "failed-to-send-pon-stats", log.Fields{"err": err})
+		logger.Errorw(ctx, "failed-to-send-stats", log.Fields{"err": err})
 	}
 }
 
diff --git a/internal/pkg/core/statsmanager_test.go b/internal/pkg/core/statsmanager_test.go
index 438d117..1ea2ddd 100644
--- a/internal/pkg/core/statsmanager_test.go
+++ b/internal/pkg/core/statsmanager_test.go
@@ -69,8 +69,9 @@
 		SouthBoundPort map[uint32]*PonPort
 	}
 	type args struct {
-		val  map[string]float32
-		port *voltha.Port
+		val      map[string]float32
+		port     *voltha.Port
+		statType string
 	}
 	ponmap := map[uint32]*PonPort{}
 	ponmap[0] = &PonPort{
@@ -126,6 +127,9 @@
 	dhandlerNNI.portStats = &OpenOltStatisticsMgr{Device: nil, SouthBoundPort: nil, NorthBoundPort: nnimap}
 	dhandlerPON := newMockDeviceHandler()
 	dhandlerPON.portStats = &OpenOltStatisticsMgr{Device: nil, SouthBoundPort: ponmap, NorthBoundPort: nil}
+	dhandlerONU := newMockDeviceHandler()
+	dhandlerGEM := newMockDeviceHandler()
+
 	tests := []struct {
 		name   string
 		fields fields
@@ -139,8 +143,9 @@
 				SouthBoundPort: nil,
 			},
 			args: args{
-				val:  nval,
-				port: &voltha.Port{PortNo: 0, Label: fmt.Sprintf("%s%d", "nni-", 0), Type: voltha.Port_ETHERNET_NNI},
+				val:      nval,
+				port:     &voltha.Port{PortNo: 0, Label: fmt.Sprintf("%s%d", "nni-", 0), Type: voltha.Port_ETHERNET_NNI},
+				statType: NNIStats,
 			},
 		},
 		{
@@ -151,8 +156,33 @@
 				SouthBoundPort: ponmap,
 			},
 			args: args{
-				val:  pval,
-				port: &voltha.Port{PortNo: 1, Label: fmt.Sprintf("%s%d", "pon-", 1), Type: voltha.Port_PON_OLT},
+				val:      pval,
+				port:     &voltha.Port{PortNo: 1, Label: fmt.Sprintf("%s%d", "pon-", 1), Type: voltha.Port_PON_OLT},
+				statType: PONStats,
+			},
+		},
+		{
+			name: "PublishONUMetrics-1",
+			fields: fields{
+				Device:         dhandlerONU,
+				NorthBoundPort: nil,
+				SouthBoundPort: nil,
+			},
+			args: args{
+				port:     &voltha.Port{Label: "ONU"},
+				statType: ONUStats,
+			},
+		},
+		{
+			name: "PublishGEMMetrics-1",
+			fields: fields{
+				Device:         dhandlerGEM,
+				NorthBoundPort: nil,
+				SouthBoundPort: nil,
+			},
+			args: args{
+				port:     &voltha.Port{Label: "GEM"},
+				statType: GEMStats,
 			},
 		},
 		// TODO: Add test cases.
@@ -164,7 +194,12 @@
 				NorthBoundPort: tt.fields.NorthBoundPort,
 				SouthBoundPort: tt.fields.SouthBoundPort,
 			}
-			StatMgr.publishMetrics(context.Background(), tt.args.val, tt.args.port, "onu1", "openolt")
+			if tt.args.statType == ONUStats {
+				tt.args.val = StatMgr.convertONUStats(&openolt.OnuStatistics{IntfId: 1, OnuId: 1, PositiveDrift: 123, BipErrors: 22})
+			} else if tt.args.statType == GEMStats {
+				tt.args.val = StatMgr.convertGemStats(&openolt.GemPortStatistics{IntfId: 1, GemportId: 1024, RxPackets: 12, TxBytes: 12})
+			}
+			StatMgr.publishMetrics(context.Background(), tt.args.statType, tt.args.val, tt.args.port, "onu1", "openolt")
 		})
 	}
 }