VOL-3735 Retrieve port counters using the GetExtValue(SingleGetValue) rpc from the openolt adapter
Change-Id: I195f7c9fdc4b677c333ae9e4bb3547bb95312fdf
diff --git a/internal/pkg/core/statsmanager.go b/internal/pkg/core/statsmanager.go
index 1b52e7c..e13abed 100755
--- a/internal/pkg/core/statsmanager.go
+++ b/internal/pkg/core/statsmanager.go
@@ -18,6 +18,7 @@
package core
import (
+ "container/list"
"context"
"fmt"
rsrcMgr "github.com/opencord/voltha-openolt-adapter/internal/pkg/resourcemanager"
@@ -26,6 +27,7 @@
"github.com/opencord/voltha-lib-go/v4/pkg/log"
"github.com/opencord/voltha-openolt-adapter/internal/pkg/olterrors"
+ "github.com/opencord/voltha-protos/v4/go/extension"
"github.com/opencord/voltha-protos/v4/go/openolt"
"github.com/opencord/voltha-protos/v4/go/voltha"
)
@@ -93,6 +95,14 @@
var onuStats = make(chan *openolt.OnuStatistics, 100)
var gemStats = make(chan *openolt.GemPortStatistics, 100)
+// statRegInfo describes one listener waiting for a statistics indication:
+// the channel to signal and the port the listener is interested in.
+type statRegInfo struct {
+ // chn is the channel on which processStatIndication sends true when a matching indication arrives
+ chn chan bool
+ // portNo is compared against the port number carried by the indication
+ portNo uint32
+ // portType is recorded at registration time; processStatIndication matches on portNo only
+ portType extension.GetOltPortCounters_PortType
+}
+
// PonPort representation
type PonPort struct {
/*
@@ -242,12 +252,23 @@
return &NNI
}
+// StatType identifies the kind of statistics indication a listener can register for.
+type StatType int
+
+const (
+	// portStatsType selects port statistics indications.
+	portStatsType StatType = iota
+	// flowStatsType selects flow statistics indications.
+	flowStatsType
+)
+
+// String returns a readable name for t so that log lines formatted with %v
+// show the statistics kind instead of a bare integer. Values that are not one
+// of the declared constants render as "StatType(n)".
+func (t StatType) String() string {
+	switch t {
+	case portStatsType:
+		return "portStats"
+	case flowStatsType:
+		return "flowStats"
+	default:
+		return fmt.Sprintf("StatType(%d)", int(t))
+	}
+}
+
// OpenOltStatisticsMgr structure
type OpenOltStatisticsMgr struct {
Device *DeviceHandler
NorthBoundPort map[uint32]*NniPort
SouthBoundPort map[uint32]*PonPort
// TODO PMMetrics Metrics
+ // statIndListnerMu guards statIndListners. Because the struct holds a mutex by value,
+ // it must be used through a pointer and never copied.
+ statIndListnerMu sync.Mutex
+ // statIndListners holds, per statistics type, the list of statRegInfo entries to be
+ // notified when a port or flow stats indication of that type is received.
+ statIndListners map[StatType]*list.List
}
// NewOpenOltStatsMgr returns a new instance of the OpenOltStatisticsMgr
@@ -271,6 +292,9 @@
if StatMgr.Device.openOLT.enableGemStats {
go StatMgr.publishGemStats()
}
+ StatMgr.statIndListners = make(map[StatType]*list.List)
+ StatMgr.statIndListners[portStatsType] = list.New()
+ StatMgr.statIndListners[flowStatsType] = list.New()
return &StatMgr
}
@@ -529,7 +553,7 @@
}
// publishMetrics will publish the pon port metrics
-func (StatMgr OpenOltStatisticsMgr) publishMetrics(ctx context.Context, statType string, val map[string]float32,
+func (StatMgr *OpenOltStatisticsMgr) publishMetrics(ctx context.Context, statType string, val map[string]float32,
port *voltha.Port, devID string, devType string) {
logger.Debugw(ctx, "publish-metrics",
log.Fields{
@@ -576,6 +600,9 @@
func (StatMgr *OpenOltStatisticsMgr) PortStatisticsIndication(ctx context.Context, PortStats *openolt.PortStatistics, NumPonPorts uint32) {
StatMgr.PortsStatisticsKpis(ctx, PortStats, NumPonPorts)
logger.Debugw(ctx, "received-port-stats-indication", log.Fields{"port-stats": PortStats})
+ // The indication has been handled: signal the listeners registered for port statistics.
+ // PortStats.IntfId is actually the port number, so it is what listeners are matched against.
+ StatMgr.processStatIndication(ctx, portStatsType, PortStats.IntfId)
// TODO send stats to core topic to the voltha kafka or a different kafka ?
}
@@ -673,3 +700,81 @@
*/
}
+
+// updateGetOltPortCountersResponse copies the port counters found in stats into the
+// port-counters payload of singleValResp and marks the response as OK.
+//
+// stats is looked up by counter name ("TxBytes", "RxBytes", ...); a missing key
+// yields 0. The values are float32, so counters too large to be represented
+// exactly in a float32 are reported with reduced precision.
+//
+// If singleValResp carries no port-counters payload, the response is left
+// untouched and an error is logged instead of dereferencing a nil pointer.
+func (StatMgr *OpenOltStatisticsMgr) updateGetOltPortCountersResponse(ctx context.Context, singleValResp *extension.SingleGetValueResponse, stats map[string]float32) {
+	metrics := singleValResp.GetResponse().GetPortCoutners()
+	if metrics == nil {
+		logger.Errorw(ctx, "updateGetOltPortCountersResponse-port-counters-missing-in-response", log.Fields{"resp": singleValResp})
+		return
+	}
+
+	// counter converts the named float32 statistic to the uint64 used by the response.
+	counter := func(name string) uint64 { return uint64(stats[name]) }
+
+	metrics.TxBytes = counter("TxBytes")
+	metrics.RxBytes = counter("RxBytes")
+	metrics.TxPackets = counter("TxPackets")
+	metrics.RxPackets = counter("RxPackets")
+	metrics.TxErrorPackets = counter("TxErrorPackets")
+	metrics.RxErrorPackets = counter("RxErrorPackets")
+	metrics.TxBcastPackets = counter("TxBcastPackets")
+	metrics.RxBcastPackets = counter("RxBcastPackets")
+	metrics.TxUcastPackets = counter("TxUcastPackets")
+	metrics.RxUcastPackets = counter("RxUcastPackets")
+	metrics.TxMcastPackets = counter("TxMcastPackets")
+	metrics.RxMcastPackets = counter("RxMcastPackets")
+
+	// metrics is non-nil, so singleValResp.Response is non-nil as well.
+	singleValResp.Response.Status = extension.GetValueResponse_OK
+	logger.Debugw(ctx, "updateGetOltPortCountersResponse", log.Fields{"resp": singleValResp})
+}
+
+// RegisterForStatIndication registers ch as a channel on which an indication is sent
+// when statistics of type t are received for port portNo.
+//
+// The registration is one-shot: processStatIndication removes the entry once it
+// has sent on ch. Callers that stop waiting before being notified should call
+// DeRegisterFromStatIndication with the same t and ch.
+// portType is recorded with the registration.
+//
+// The listener map and the per-type list are created on demand, so registering
+// for a type that has no list yet does not panic.
+func (StatMgr *OpenOltStatisticsMgr) RegisterForStatIndication(ctx context.Context, t StatType, ch chan bool, portNo uint32, portType extension.GetOltPortCounters_PortType) {
+	statInd := statRegInfo{
+		chn:      ch,
+		portNo:   portNo,
+		portType: portType,
+	}
+
+	logger.Debugf(ctx, "RegisterForStatIndication stat type %v portno %v porttype %v chan %v", t, portNo, portType, ch)
+
+	StatMgr.statIndListnerMu.Lock()
+	defer StatMgr.statIndListnerMu.Unlock()
+
+	if StatMgr.statIndListners == nil {
+		StatMgr.statIndListners = make(map[StatType]*list.List)
+	}
+	listeners := StatMgr.statIndListners[t]
+	if listeners == nil {
+		// PushBack on a nil *list.List would panic; create the list lazily.
+		listeners = list.New()
+		StatMgr.statIndListners[t] = listeners
+	}
+	listeners.PushBack(statInd)
+}
+
+// DeRegisterFromStatIndication removes the previously registered channel ch for
+// statistics of type t. Only the first entry whose channel equals ch is removed.
+// It is a no-op when ch is not registered or when no list exists for t.
+func (StatMgr *OpenOltStatisticsMgr) DeRegisterFromStatIndication(ctx context.Context, t StatType, ch chan bool) {
+	StatMgr.statIndListnerMu.Lock()
+	defer StatMgr.statIndListnerMu.Unlock()
+
+	listeners := StatMgr.statIndListners[t]
+	if listeners == nil {
+		// Nothing was ever registered for this type; Front on a nil list would panic.
+		return
+	}
+
+	for e := listeners.Front(); e != nil; e = e.Next() {
+		if statInd := e.Value.(statRegInfo); statInd.chn == ch {
+			listeners.Remove(e)
+			return
+		}
+	}
+}
+
+// processStatIndication notifies every listener registered for statistics of
+// type t on port portNo and removes the notified listeners from the list.
+//
+// The notification is a non-blocking send because it happens while
+// statIndListnerMu is held: a blocking send to a channel nobody reads from
+// would stall indication handling and deadlock with a caller that is trying to
+// deregister. A listener whose channel cannot accept the value right now stays
+// registered and is retried on the next indication; listeners should therefore
+// use a channel with a buffer of at least one.
+//
+// NOTE(review): listeners are matched on portNo only; the portType stored at
+// registration is not compared here. Confirm port numbers are unique across types.
+func (StatMgr *OpenOltStatisticsMgr) processStatIndication(ctx context.Context, t StatType, portNo uint32) {
+	StatMgr.statIndListnerMu.Lock()
+	defer StatMgr.statIndListnerMu.Unlock()
+
+	listeners := StatMgr.statIndListners[t]
+	if listeners == nil || listeners.Len() == 0 {
+		logger.Debugf(ctx, "processStatIndication %v list is empty ", t)
+		return
+	}
+
+	// Elements are collected first and removed afterwards: removing an element
+	// while iterating would invalidate its Next pointer.
+	var notified []*list.Element
+	for e := listeners.Front(); e != nil; e = e.Next() {
+		statInd := e.Value.(statRegInfo)
+		if statInd.portNo != portNo {
+			logger.Debugf(ctx, "processStatIndication skipping listener for port %v port type %v", statInd.portNo, statInd.portType)
+			continue
+		}
+		select {
+		case statInd.chn <- true:
+			notified = append(notified, e)
+		default:
+			logger.Warnf(ctx, "processStatIndication could not notify listener for stat type %v port %v, channel not ready", t, portNo)
+		}
+	}
+	for _, e := range notified {
+		listeners.Remove(e)
+	}
+}