VOL-3727 Support in OLT adapter to send periodic onu stats and gem stats
If enableOnuStats and enableGemStats flags are set, olt adapter collects these stats from agent and sends kpis to kafka. Default values are false.
Change-Id: Ifa96140518921d3d79118118a971d77c798dc2ab
diff --git a/internal/pkg/core/device_handler.go b/internal/pkg/core/device_handler.go
index e288838..fa8dd52 100644
--- a/internal/pkg/core/device_handler.go
+++ b/internal/pkg/core/device_handler.go
@@ -899,7 +899,7 @@
intfID := PortNoToIntfID(port.PortNo, voltha.Port_ETHERNET_NNI)
cmnni := dh.portStats.collectNNIMetrics(intfID)
logger.Debugw(ctx, "collect-nni-metrics", log.Fields{"metrics": cmnni})
- go dh.portStats.publishMetrics(ctx, cmnni, port, dh.device.Id, dh.device.Type)
+ go dh.portStats.publishMetrics(ctx, NNIStats, cmnni, port, dh.device.Id, dh.device.Type)
logger.Debugw(ctx, "publish-nni-metrics", log.Fields{"nni-port": port.Label})
}
// PON Stats
@@ -908,9 +908,15 @@
if val, ok := dh.activePorts.Load(intfID); ok && val == true {
cmpon := dh.portStats.collectPONMetrics(intfID)
logger.Debugw(ctx, "collect-pon-metrics", log.Fields{"metrics": cmpon})
- go dh.portStats.publishMetrics(ctx, cmpon, port, dh.device.Id, dh.device.Type)
+ go dh.portStats.publishMetrics(ctx, PONStats, cmpon, port, dh.device.Id, dh.device.Type)
}
logger.Debugw(ctx, "publish-pon-metrics", log.Fields{"pon-port": port.Label})
+
+ //ONU & Gem Stats
+ onuGemInfo := dh.flowMgr[intfID].onuGemInfo
+ if len(onuGemInfo) != 0 {
+ go dh.portStats.collectOnuAndGemStats(ctx, onuGemInfo)
+ }
}
}
}
diff --git a/internal/pkg/core/openolt.go b/internal/pkg/core/openolt.go
index a22aac4..49c5675 100644
--- a/internal/pkg/core/openolt.go
+++ b/internal/pkg/core/openolt.go
@@ -51,6 +51,8 @@
HeartbeatFailReportInterval time.Duration
GrpcTimeoutInterval time.Duration
lockDeviceHandlersMap sync.RWMutex
+ enableONUStats bool
+ enableGemStats bool
}
//NewOpenOLT returns a new instance of OpenOLT
@@ -73,6 +75,8 @@
openOLT.GrpcTimeoutInterval = cfg.GrpcTimeoutInterval
openOLT.lockDeviceHandlersMap = sync.RWMutex{}
openOLT.configManager = cm
+ openOLT.enableONUStats = cfg.EnableONUStats
+ openOLT.enableGemStats = cfg.EnableGEMStats
return &openOLT
}
diff --git a/internal/pkg/core/openolt_flowmgr.go b/internal/pkg/core/openolt_flowmgr.go
index abaa97b..618a3f4 100644
--- a/internal/pkg/core/openolt_flowmgr.go
+++ b/internal/pkg/core/openolt_flowmgr.go
@@ -135,6 +135,8 @@
PortNo = "portNo"
//AllocID constant
AllocID = "allocId"
+ //GemID constant
+ GemID = "gemId"
//NoneOnuID constant
NoneOnuID = -1
diff --git a/internal/pkg/core/statsmanager.go b/internal/pkg/core/statsmanager.go
index cf6af12..1b52e7c 100755
--- a/internal/pkg/core/statsmanager.go
+++ b/internal/pkg/core/statsmanager.go
@@ -20,7 +20,7 @@
import (
"context"
"fmt"
- "strconv"
+ rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
"sync"
"time"
@@ -30,8 +30,69 @@
"github.com/opencord/voltha-protos/v4/go/voltha"
)
+const (
+ //NNIStats statType constant
+ NNIStats = "NNIStats"
+ //PONStats statType constant
+ PONStats = "PONStats"
+ //ONUStats statType constant
+ ONUStats = "ONUStats"
+ //GEMStats statType constant
+ GEMStats = "GEMStats"
+
+ //RxBytes constant
+ RxBytes = "RxBytes"
+ //RxPackets constant
+ RxPackets = "RxPackets"
+ //TxBytes constant
+ TxBytes = "TxBytes"
+ //TxPackets constant
+ TxPackets = "TxPackets"
+ //FecCodewords constant
+ FecCodewords = "FecCodewords"
+ //BipUnits constant
+ BipUnits = "BipUnits"
+ //BipErrors constant
+ BipErrors = "BipErrors"
+ //RxPloamsNonIdle constant
+ RxPloamsNonIdle = "RxPloamsNonIdle"
+ //RxPloamsError constant
+ RxPloamsError = "RxPloamsError"
+ //RxOmci constant
+ RxOmci = "RxOmci"
+ //RxOmciPacketsCrcError constant
+ RxOmciPacketsCrcError = "RxOmciPacketsCrcError"
+ //PositiveDrift constant
+ PositiveDrift = "PositiveDrift"
+ //NegativeDrift constant
+ NegativeDrift = "NegativeDrift"
+ //DelimiterMissDetection constant
+ DelimiterMissDetection = "DelimiterMissDetection"
+ //FecCorrectedSymbols constant
+ FecCorrectedSymbols = "FecCorrectedSymbols"
+ //FecCodewordsCorrected constant
+ FecCodewordsCorrected = "FecCodewordsCorrected"
+ //fecCodewordsUncorrectable constant
+ fecCodewordsUncorrectable = "fec_codewords_uncorrectable"
+ //FecCorrectedUnits constant
+ FecCorrectedUnits = "FecCorrectedUnits"
+ //XGEMKeyErrors constant
+ XGEMKeyErrors = "XGEMKeyErrors"
+ //XGEMLoss constant
+ XGEMLoss = "XGEMLOSS"
+ //BerReported constant
+ BerReported = "BerReported"
+ //LcdgErrors constant
+ LcdgErrors = "LcdgErrors"
+ //RdiErrors constant
+ RdiErrors = "RdiErrors"
+)
+
var mutex = &sync.Mutex{}
+var onuStats = make(chan *openolt.OnuStatistics, 100)
+var gemStats = make(chan *openolt.GemPortStatistics, 100)
+
// PonPort representation
type PonPort struct {
/*
@@ -204,6 +265,12 @@
NumPonPorts := Dev.resourceMgr.DevInfo.GetPonPorts()
Ports, _ = InitPorts(ctx, "pon", Dev.device.Id, NumPonPorts)
StatMgr.SouthBoundPort, _ = Ports.(map[uint32]*PonPort)
+ if StatMgr.Device.openOLT.enableONUStats {
+ go StatMgr.publishOnuStats()
+ }
+ if StatMgr.Device.openOLT.enableGemStats {
+ go StatMgr.publishGemStats()
+ }
return &StatMgr
}
@@ -361,8 +428,108 @@
return ponval
}
-// publishMatrics will publish the pon port metrics
-func (StatMgr OpenOltStatisticsMgr) publishMetrics(ctx context.Context, val map[string]float32,
+// converGemStats will convert gem stats response to kpi context
+func (StatMgr *OpenOltStatisticsMgr) convertGemStats(gemStats *openolt.GemPortStatistics) map[string]float32 {
+ gemStatsVal := make(map[string]float32)
+ gemStatsVal[IntfID] = float32(gemStats.IntfId)
+ gemStatsVal[GemID] = float32(gemStats.GemportId)
+ gemStatsVal[RxPackets] = float32(gemStats.RxPackets)
+ gemStatsVal[RxBytes] = float32(gemStats.RxBytes)
+ gemStatsVal[TxPackets] = float32(gemStats.TxPackets)
+ gemStatsVal[TxBytes] = float32(gemStats.TxBytes)
+ return gemStatsVal
+}
+
+// convertONUStats will convert onu stats response to kpi context
+func (StatMgr *OpenOltStatisticsMgr) convertONUStats(onuStats *openolt.OnuStatistics) map[string]float32 {
+ onuStatsVal := make(map[string]float32)
+ onuStatsVal[IntfID] = float32(onuStats.IntfId)
+ onuStatsVal[OnuID] = float32(onuStats.OnuId)
+ onuStatsVal[PositiveDrift] = float32(onuStats.PositiveDrift)
+ onuStatsVal[NegativeDrift] = float32(onuStats.NegativeDrift)
+ onuStatsVal[DelimiterMissDetection] = float32(onuStats.DelimiterMissDetection)
+ onuStatsVal[BipErrors] = float32(onuStats.BipErrors)
+ onuStatsVal[BipUnits] = float32(onuStats.BipUnits)
+ onuStatsVal[FecCorrectedSymbols] = float32(onuStats.FecCorrectedSymbols)
+ onuStatsVal[FecCodewordsCorrected] = float32(onuStats.FecCodewordsCorrected)
+ onuStatsVal[fecCodewordsUncorrectable] = float32(onuStats.FecCodewordsUncorrectable)
+ onuStatsVal[FecCodewords] = float32(onuStats.FecCodewords)
+ onuStatsVal[FecCorrectedUnits] = float32(onuStats.FecCorrectedUnits)
+ onuStatsVal[XGEMKeyErrors] = float32(onuStats.XgemKeyErrors)
+ onuStatsVal[XGEMLoss] = float32(onuStats.XgemLoss)
+ onuStatsVal[RxPloamsError] = float32(onuStats.RxPloamsError)
+ onuStatsVal[RxPloamsNonIdle] = float32(onuStats.RxPloamsNonIdle)
+ onuStatsVal[RxOmci] = float32(onuStats.RxOmci)
+ onuStatsVal[RxOmciPacketsCrcError] = float32(onuStats.RxOmciPacketsCrcError)
+ onuStatsVal[RxBytes] = float32(onuStats.RxBytes)
+ onuStatsVal[RxPackets] = float32(onuStats.RxPackets)
+ onuStatsVal[TxBytes] = float32(onuStats.TxBytes)
+ onuStatsVal[TxPackets] = float32(onuStats.TxPackets)
+ onuStatsVal[BerReported] = float32(onuStats.BerReported)
+ onuStatsVal[LcdgErrors] = float32(onuStats.LcdgErrors)
+ onuStatsVal[RdiErrors] = float32(onuStats.RdiErrors)
+ return onuStatsVal
+}
+
+// collectOnuStats will collect the onu metrics
+func (StatMgr *OpenOltStatisticsMgr) collectOnuStats(ctx context.Context, onuGemInfo rsrcMgr.OnuGemInfo) {
+ onu := &openolt.Onu{IntfId: onuGemInfo.IntfID, OnuId: onuGemInfo.OnuID}
+ logger.Debugw(ctx, "pulling-onu-stats", log.Fields{"IntfID": onuGemInfo.IntfID, "OnuID": onuGemInfo.OnuID})
+ if stats, err := StatMgr.Device.Client.GetOnuStatistics(context.Background(), onu); err == nil {
+ onuStats <- stats
+ } else {
+ logger.Errorw(ctx, "error-while-getting-onu-stats-for-onu", log.Fields{"IntfID": onuGemInfo.IntfID, "OnuID": onuGemInfo.OnuID, "err": err})
+ }
+}
+
+// collectOnuAndGemStats will collect both onu and gem metrics
+func (StatMgr *OpenOltStatisticsMgr) collectOnuAndGemStats(ctx context.Context, onuGemInfo []rsrcMgr.OnuGemInfo) {
+ if !StatMgr.Device.openOLT.enableONUStats && !StatMgr.Device.openOLT.enableGemStats {
+ return
+ }
+
+ for _, onuInfo := range onuGemInfo {
+ if StatMgr.Device.openOLT.enableONUStats {
+ go StatMgr.collectOnuStats(ctx, onuInfo)
+ }
+ if StatMgr.Device.openOLT.enableGemStats {
+ go StatMgr.collectGemStats(ctx, onuInfo)
+ }
+ }
+}
+
+// collectGemStats will collect gem metrics
+func (StatMgr *OpenOltStatisticsMgr) collectGemStats(ctx context.Context, onuGemInfo rsrcMgr.OnuGemInfo) {
+ for _, gem := range onuGemInfo.GemPorts {
+ logger.Debugw(ctx, "pulling-gem-stats", log.Fields{"IntfID": onuGemInfo.IntfID, "OnuID": onuGemInfo.OnuID, "GemID": gem})
+ onuPacket := &openolt.OnuPacket{IntfId: onuGemInfo.IntfID, OnuId: onuGemInfo.OnuID, GemportId: gem}
+ if stats, err := StatMgr.Device.Client.GetGemPortStatistics(context.Background(), onuPacket); err == nil {
+ gemStats <- stats
+ } else {
+ logger.Errorw(ctx, "error-while-getting-gem-stats-for-onu",
+ log.Fields{"IntfID": onuGemInfo.IntfID, "OnuID": onuGemInfo.OnuID, "GemID": gem, "err": err})
+ }
+ }
+}
+
+// publishGemStats will publish the gem metrics
+func (StatMgr *OpenOltStatisticsMgr) publishGemStats() {
+ for {
+ statValue := StatMgr.convertGemStats(<-gemStats)
+ StatMgr.publishMetrics(context.Background(), GEMStats, statValue, &voltha.Port{Label: "GEM"}, StatMgr.Device.device.Id, StatMgr.Device.device.Type)
+ }
+}
+
+// publishOnuStats will publish the onu metrics
+func (StatMgr *OpenOltStatisticsMgr) publishOnuStats() {
+ for {
+ statValue := StatMgr.convertONUStats(<-onuStats)
+ StatMgr.publishMetrics(context.Background(), ONUStats, statValue, &voltha.Port{Label: "ONU"}, StatMgr.Device.device.Id, StatMgr.Device.device.Type)
+ }
+}
+
+// publishMetrics will publish the pon port metrics
+func (StatMgr OpenOltStatisticsMgr) publishMetrics(ctx context.Context, statType string, val map[string]float32,
port *voltha.Port, devID string, devType string) {
logger.Debugw(ctx, "publish-metrics",
log.Fields{
@@ -376,17 +543,18 @@
metricsContext["oltid"] = devID
metricsContext["devicetype"] = devType
metricsContext["portlabel"] = port.Label
- metricsContext["portno"] = strconv.Itoa(int(port.PortNo))
- if port.Type == voltha.Port_ETHERNET_NNI {
+ if statType == NNIStats {
volthaEventSubCatgry = voltha.EventSubCategory_NNI
- } else {
+ } else if statType == PONStats {
volthaEventSubCatgry = voltha.EventSubCategory_PON
+ } else if statType == GEMStats || statType == ONUStats {
+ volthaEventSubCatgry = voltha.EventSubCategory_ONT
}
raisedTs := time.Now().UnixNano()
mmd := voltha.MetricMetaData{
- Title: port.Type.String(),
+ Title: statType,
Ts: float64(raisedTs),
Context: metricsContext,
DeviceId: devID,
@@ -400,7 +568,7 @@
ke.Ts = float64(time.Now().UnixNano())
if err := StatMgr.Device.EventProxy.SendKpiEvent(ctx, "STATS_EVENT", &ke, voltha.EventCategory_EQUIPMENT, volthaEventSubCatgry, raisedTs); err != nil {
- logger.Errorw(ctx, "failed-to-send-pon-stats", log.Fields{"err": err})
+ logger.Errorw(ctx, "failed-to-send-stats", log.Fields{"err": err})
}
}
diff --git a/internal/pkg/core/statsmanager_test.go b/internal/pkg/core/statsmanager_test.go
index 438d117..1ea2ddd 100644
--- a/internal/pkg/core/statsmanager_test.go
+++ b/internal/pkg/core/statsmanager_test.go
@@ -69,8 +69,9 @@
SouthBoundPort map[uint32]*PonPort
}
type args struct {
- val map[string]float32
- port *voltha.Port
+ val map[string]float32
+ port *voltha.Port
+ statType string
}
ponmap := map[uint32]*PonPort{}
ponmap[0] = &PonPort{
@@ -126,6 +127,9 @@
dhandlerNNI.portStats = &OpenOltStatisticsMgr{Device: nil, SouthBoundPort: nil, NorthBoundPort: nnimap}
dhandlerPON := newMockDeviceHandler()
dhandlerPON.portStats = &OpenOltStatisticsMgr{Device: nil, SouthBoundPort: ponmap, NorthBoundPort: nil}
+ dhandlerONU := newMockDeviceHandler()
+ dhandlerGEM := newMockDeviceHandler()
+
tests := []struct {
name string
fields fields
@@ -139,8 +143,9 @@
SouthBoundPort: nil,
},
args: args{
- val: nval,
- port: &voltha.Port{PortNo: 0, Label: fmt.Sprintf("%s%d", "nni-", 0), Type: voltha.Port_ETHERNET_NNI},
+ val: nval,
+ port: &voltha.Port{PortNo: 0, Label: fmt.Sprintf("%s%d", "nni-", 0), Type: voltha.Port_ETHERNET_NNI},
+ statType: NNIStats,
},
},
{
@@ -151,8 +156,33 @@
SouthBoundPort: ponmap,
},
args: args{
- val: pval,
- port: &voltha.Port{PortNo: 1, Label: fmt.Sprintf("%s%d", "pon-", 1), Type: voltha.Port_PON_OLT},
+ val: pval,
+ port: &voltha.Port{PortNo: 1, Label: fmt.Sprintf("%s%d", "pon-", 1), Type: voltha.Port_PON_OLT},
+ statType: PONStats,
+ },
+ },
+ {
+ name: "PublishONUMetrics-1",
+ fields: fields{
+ Device: dhandlerONU,
+ NorthBoundPort: nil,
+ SouthBoundPort: nil,
+ },
+ args: args{
+ port: &voltha.Port{Label: "ONU"},
+ statType: ONUStats,
+ },
+ },
+ {
+ name: "PublishGEMMetrics-1",
+ fields: fields{
+ Device: dhandlerGEM,
+ NorthBoundPort: nil,
+ SouthBoundPort: nil,
+ },
+ args: args{
+ port: &voltha.Port{Label: "GEM"},
+ statType: GEMStats,
},
},
// TODO: Add test cases.
@@ -164,7 +194,12 @@
NorthBoundPort: tt.fields.NorthBoundPort,
SouthBoundPort: tt.fields.SouthBoundPort,
}
- StatMgr.publishMetrics(context.Background(), tt.args.val, tt.args.port, "onu1", "openolt")
+ if tt.args.statType == ONUStats {
+ tt.args.val = StatMgr.convertONUStats(&openolt.OnuStatistics{IntfId: 1, OnuId: 1, PositiveDrift: 123, BipErrors: 22})
+ } else if tt.args.statType == GEMStats {
+ tt.args.val = StatMgr.convertGemStats(&openolt.GemPortStatistics{IntfId: 1, GemportId: 1024, RxPackets: 12, TxBytes: 12})
+ }
+ StatMgr.publishMetrics(context.Background(), tt.args.statType, tt.args.val, tt.args.port, "onu1", "openolt")
})
}
}