VOL-1381 and VOL-1091: Add a feature to collect PM metrics of the OLT. It collects Tx and Rx statistics and posts them to Kafka. This commit is also related to the changes in voltha-go commit 72b25abd4e804596413558ef10f098c2262dd65d.
Addressed review comments
Updated with code optimization
Updated with dep files
Updated with fix for sanity test
Updated with gomod changes
Change-Id: I899e855812a6eda812c64664a9154321cdb16876
diff --git a/adaptercore/device_handler.go b/adaptercore/device_handler.go
index 7f2b9cf..dbf301f 100644
--- a/adaptercore/device_handler.go
+++ b/adaptercore/device_handler.go
@@ -35,6 +35,7 @@
"github.com/golang/protobuf/ptypes"
"github.com/opencord/voltha-lib-go/v2/pkg/adapters/adapterif"
"github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "github.com/opencord/voltha-lib-go/v2/pkg/pmmetrics"
rsrcMgr "github.com/opencord/voltha-openolt-adapter/adaptercore/resourcemanager"
"github.com/opencord/voltha-protos/go/common"
ic "github.com/opencord/voltha-protos/go/inter_container"
@@ -76,6 +77,9 @@
discOnus map[string]bool
onus map[string]*OnuDevice
nniIntfID int
+ portStats *OpenOltStatisticsMgr
+ metrics *pmmetrics.PmMetrics
+ stopCollector chan bool
}
//OnuDevice represents ONU related info
@@ -89,6 +93,17 @@
uniPorts map[uint32]struct{}
}
+var pmNames = []string{
+ "rx_bytes",
+ "rx_packets",
+ "rx_mcast_packets",
+ "rx_bcast_packets",
+ "tx_bytes",
+ "tx_packets",
+ "tx_mcast_packets",
+ "tx_bcast_packets",
+}
+
//NewOnuDevice creates a new Onu Device
func NewOnuDevice(devID, deviceTp, serialNum string, onuID, intfID uint32, proxyDevID string) *OnuDevice {
var device OnuDevice
@@ -122,7 +137,8 @@
// when the first IntfOperInd with status as "up" is received for
// any one of the available NNI port on the OLT device.
dh.nniIntfID = -1
-
+ dh.stopCollector = make(chan bool, 2)
+ dh.metrics = pmmetrics.NewPmMetrics(cloned.Id, pmmetrics.Frequency(150), pmmetrics.FrequencyOverride(false), pmmetrics.Grouped(false), pmmetrics.Metrics(pmNames))
//TODO initialize the support classes.
return &dh
}
@@ -352,15 +368,14 @@
go dh.handlePacketIndication(pktInd)
case *oop.Indication_PortStats:
portStats := indication.GetPortStats()
- log.Infow("Received port stats indication", log.Fields{"PortStats": portStats})
+ go dh.portStats.PortStatisticsIndication(portStats, dh.resourceMgr.DevInfo.GetPonPorts())
case *oop.Indication_FlowStats:
flowStats := indication.GetFlowStats()
log.Infow("Received flow stats", log.Fields{"FlowStats": flowStats})
case *oop.Indication_AlarmInd:
alarmInd := indication.GetAlarmInd()
log.Infow("Received alarm indication ", log.Fields{"AlarmInd": alarmInd})
- dh.eventMgr.ProcessEvents(alarmInd, dh.deviceID, raisedTs)
-
+ go dh.eventMgr.ProcessEvents(alarmInd, dh.deviceID, raisedTs)
}
}
@@ -515,6 +530,8 @@
/* TODO: Instantiate Alarm , stats , BW managers */
/* Instantiating Event Manager to handle Alarms and KPIs */
dh.eventMgr = NewEventMgr(dh.EventProxy, dh)
+ // Stats config for new device
+ dh.portStats = NewOpenOltStatsMgr(dh)
// Start reading indications
go dh.readIndications()
@@ -566,11 +583,51 @@
return deviceInfo, nil
}
+func startCollector(dh *DeviceHandler) {
+ // Initial delay for OLT initialization
+ time.Sleep(1 * time.Minute)
+ log.Debugf("Starting-Collector")
+ context := make(map[string]string)
+ for {
+ select {
+ case <-dh.stopCollector:
+ log.Debugw("Stopping-Collector-for-OLT", log.Fields{"deviceID:": dh.deviceID})
+ return
+ default:
+ freq := dh.metrics.ToPmConfigs().DefaultFreq
+ time.Sleep(time.Duration(freq) * time.Second)
+ context["oltid"] = dh.deviceID
+ context["devicetype"] = dh.deviceType
+ // NNI Stats
+ cmnni := dh.portStats.collectNNIMetrics(uint32(0))
+ log.Debugf("Collect-NNI-Metrics %v", cmnni)
+ go dh.portStats.publishMetrics("NNIStats", cmnni, uint32(0), context, dh.deviceID)
+ log.Debugf("Publish-NNI-Metrics")
+ // PON Stats
+ NumPonPORTS := dh.resourceMgr.DevInfo.GetPonPorts()
+ for i := uint32(0); i < NumPonPORTS; i++ {
+ cmpon := dh.portStats.collectPONMetrics(i)
+ log.Debugf("Collect-PON-Metrics %v", cmpon)
+
+ go dh.portStats.publishMetrics("PONStats", cmpon, i, context, dh.deviceID)
+ log.Debugf("Publish-PON-Metrics")
+ }
+ }
+ }
+}
+
//AdoptDevice adopts the OLT device
func (dh *DeviceHandler) AdoptDevice(device *voltha.Device) {
dh.transitionMap = NewTransitionMap(dh)
log.Infow("Adopt_device", log.Fields{"deviceID": device.Id, "Address": device.GetHostAndPort()})
dh.transitionMap.Handle(DeviceInit)
+
+ // Now, set the initial PM configuration for that device
+ if err := dh.coreProxy.DevicePMConfigUpdate(nil, dh.metrics.ToPmConfigs()); err != nil {
+ log.Errorw("error-updating-PMs", log.Fields{"deviceId": device.Id, "error": err})
+ }
+
+ go startCollector(dh)
}
//GetOfpDeviceInfo Gets the Ofp information of the given device
@@ -893,7 +950,7 @@
"openolt", onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
if err != nil {
log.Errorw("Failed to send inter-adapter-message", log.Fields{"OnuInd": onuInd,
- "From Adapter": "openolt", "DevieType": onuDevice.Type, "DeviceID": onuDevice.Id, "Error": err})
+ "From Adapter": "openolt", "DevieType": onuDevice.Type, "DeviceID": onuDevice.Id})
return
}
} else {
@@ -1199,6 +1256,8 @@
delete(dh.onus, onu)
}
log.Debug("Removed-device-from-Resource-manager-KV-store")
+ // Stop the Stats collector
+ dh.stopCollector <- true
//Reset the state
if _, err := dh.Client.Reboot(context.Background(), new(oop.Empty)); err != nil {
log.Errorw("Failed-to-reboot-olt ", log.Fields{"err": err})
diff --git a/adaptercore/device_handler_test.go b/adaptercore/device_handler_test.go
index 6e94215..924430d 100644
--- a/adaptercore/device_handler_test.go
+++ b/adaptercore/device_handler_test.go
@@ -22,6 +22,9 @@
"net"
"reflect"
"testing"
+ "time"
+
+ "github.com/opencord/voltha-lib-go/v2/pkg/pmmetrics"
"github.com/golang/protobuf/ptypes"
"github.com/golang/protobuf/ptypes/any"
@@ -43,6 +46,7 @@
func newMockCoreProxy() *mocks.MockCoreProxy {
mcp := mocks.MockCoreProxy{}
mcp.Devices = make(map[string]*voltha.Device)
+ var pm []*voltha.PmConfig
mcp.Devices["olt"] = &voltha.Device{
Id: "olt",
@@ -61,6 +65,13 @@
ChannelGroupId: 1,
},
ConnectStatus: 1,
+ PmConfigs: &voltha.PmConfigs{
+ DefaultFreq: 10,
+ Id: "olt",
+ FreqOverride: false,
+ Grouped: false,
+ Metrics: pm,
+ },
}
mcp.Devices["onu1"] = &voltha.Device{
@@ -80,6 +91,13 @@
ChannelGroupId: 1,
},
ConnectStatus: 1,
+ PmConfigs: &voltha.PmConfigs{
+ DefaultFreq: 10,
+ Id: "olt",
+ FreqOverride: false,
+ Grouped: false,
+ Metrics: pm,
+ },
}
mcp.Devices["onu2"] = &voltha.Device{
Id: "2",
@@ -99,6 +117,13 @@
ChannelGroupId: 1,
},
ConnectStatus: 1,
+ PmConfigs: &voltha.PmConfigs{
+ DefaultFreq: 10,
+ Id: "olt",
+ FreqOverride: false,
+ Grouped: false,
+ Metrics: pm,
+ },
}
return &mcp
}
@@ -131,6 +156,8 @@
dh.Client = &mocks.MockOpenoltClient{}
dh.eventMgr = &OpenOltEventMgr{eventProxy: &mocks.MockEventProxy{}}
dh.transitionMap = &TransitionMap{}
+ dh.portStats = &OpenOltStatisticsMgr{}
+ dh.metrics = &pmmetrics.PmMetrics{}
return dh
}
@@ -701,7 +728,7 @@
}{
// TODO: Add test cases.
{"AdoptDevice-1", dh1, args{device: dh1.device}},
- {"AdoptDevice-1", dh2, args{device: dh2.device}},
+ {"AdoptDevice-2", dh2, args{device: dh2.device}},
}
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
@@ -985,3 +1012,33 @@
})
}
}
+
+func Test_startCollector(t *testing.T) {
+ type args struct {
+ dh *DeviceHandler
+ }
+ dh := newMockDeviceHandler()
+ dh.portStats.NorthBoundPort = make(map[uint32]*NniPort)
+ dh.portStats.NorthBoundPort[0] = &NniPort{Name: "OLT-1"}
+ dh.portStats.SouthBoundPort = make(map[uint32]*PonPort)
+ dh.portStats.Device = dh
+ for i := 0; i < 16; i++ {
+ dh.portStats.SouthBoundPort[uint32(i)] = &PonPort{DeviceID: "OLT-1"}
+ }
+ tests := []struct {
+ name string
+ args args
+ }{
+ // TODO: Add test cases.
+ {"StartCollector-1", args{dh}},
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ go func() {
+ time.Sleep(2 * time.Minute)
+ tt.args.dh.stopCollector <- true
+ }()
+ startCollector(tt.args.dh)
+ })
+ }
+}
diff --git a/adaptercore/olt_platform.go b/adaptercore/olt_platform.go
index 5f48bc6..ed6d489 100644
--- a/adaptercore/olt_platform.go
+++ b/adaptercore/olt_platform.go
@@ -127,6 +127,17 @@
return 0
}
+//PortNoToIntfID returns the interface ID derived from the given port number and interface type
+func PortNoToIntfID(portno uint32, intfType voltha.Port_PortType) uint32 {
+ if (intfType) == voltha.Port_ETHERNET_NNI {
+ return (1 << 16) ^ portno
+ }
+ if (intfType) == voltha.Port_PON_OLT {
+ return (2 << 28) ^ portno
+ }
+ return 0
+}
+
//IntfIDFromNniPortNum returns Intf ID derived from portNum
func IntfIDFromNniPortNum(portNum uint32) uint32 {
return portNum & 0xFFFF
diff --git a/adaptercore/statsmanager.go b/adaptercore/statsmanager.go
index 778accc..6a67f15 100755
--- a/adaptercore/statsmanager.go
+++ b/adaptercore/statsmanager.go
@@ -20,12 +20,16 @@
import (
"errors"
"fmt"
+ "sync"
+ "time"
"github.com/opencord/voltha-lib-go/v2/pkg/log"
"github.com/opencord/voltha-protos/go/openolt"
"github.com/opencord/voltha-protos/go/voltha"
)
+var mutex = &sync.Mutex{}
+
// PonPort representation
type PonPort struct {
/*
@@ -74,7 +78,7 @@
PON.IntfID = IntfID
PON.PortNum = PortNum
PON.PortID = 0
- PON.Label = fmt.Sprintf("%s,%d", "pon-", PONID)
+ PON.Label = fmt.Sprintf("%s%d", "pon-", PONID)
PON.ONUs = make(map[uint32]interface{})
PON.ONUsByID = make(map[uint32]interface{})
@@ -145,7 +149,7 @@
var NNI NniPort
NNI.PortNum = PortNum
- NNI.Name = fmt.Sprintf("%s,%d", "nni-", PortNum)
+ NNI.Name = fmt.Sprintf("%s%d", "nni-", PortNum)
NNI.IntfID = IntfID
NNI.RxBytes = 0
@@ -166,8 +170,8 @@
// OpenOltStatisticsMgr structure
type OpenOltStatisticsMgr struct {
Device *DeviceHandler
- NorthBoundPort map[uint32]NniPort
- SouthBoundPort map[uint32]PonPort
+ NorthBoundPort map[uint32]*NniPort
+ SouthBoundPort map[uint32]*PonPort
// TODO PMMetrics Metrics
}
@@ -181,16 +185,16 @@
// Northbound and Southbound ports
// added to initialize the pm_metrics
var Ports interface{}
- Ports, _ = InitPorts("nni", Dev.deviceID)
- StatMgr.NorthBoundPort, _ = Ports.(map[uint32]NniPort)
- Ports, _ = InitPorts("pon", Dev.deviceID)
- StatMgr.SouthBoundPort, _ = Ports.(map[uint32]PonPort)
-
+ Ports, _ = InitPorts("nni", Dev.deviceID, 1)
+ StatMgr.NorthBoundPort, _ = Ports.(map[uint32]*NniPort)
+ NumPonPorts := Dev.resourceMgr.DevInfo.GetPonPorts()
+ Ports, _ = InitPorts("pon", Dev.deviceID, NumPonPorts)
+ StatMgr.SouthBoundPort, _ = Ports.(map[uint32]*PonPort)
return &StatMgr
}
// InitPorts collects the port objects: nni and pon that are updated with the current data from the OLT
-func InitPorts(Intftype string, DeviceID string) (interface{}, error) {
+func InitPorts(Intftype string, DeviceID string, numOfPorts uint32) (interface{}, error) {
/*
This method collects the port objects: nni and pon that are updated with the
current data from the OLT
@@ -204,17 +208,17 @@
*/
var i uint32
if Intftype == "nni" {
- NniPorts := make(map[uint32]NniPort)
- for i = 0; i <= 1; i++ {
+ NniPorts := make(map[uint32]*NniPort)
+ for i = 0; i < numOfPorts; i++ {
Port := BuildPortObject(i, "nni", DeviceID).(*NniPort)
- NniPorts[Port.IntfID] = *Port
+ NniPorts[Port.IntfID] = Port
}
return NniPorts, nil
} else if Intftype == "pon" {
- PONPorts := make(map[uint32]PonPort)
- for i = 0; i <= 16; i++ {
+ PONPorts := make(map[uint32]*PonPort)
+ for i = 0; i < numOfPorts; i++ {
PONPort := BuildPortObject(i, "pon", DeviceID).(*PonPort)
- PONPorts[PONPort.IntfID] = *PONPort
+ PONPorts[PortNoToIntfID(PONPort.IntfID, voltha.Port_PON_OLT)] = PONPort
}
return PONPorts, nil
} else {
@@ -237,13 +241,16 @@
//This builds a port object which is added to the
//appropriate northbound or southbound values
if IntfType == "nni" {
- IntfID := IntfIDToPortNo(PortNum, voltha.Port_ETHERNET_UNI)
- return NewNniPort(PortNum, IntfID)
+ IntfID := IntfIDToPortNo(PortNum, voltha.Port_ETHERNET_NNI)
+ nniID := PortNoToIntfID(IntfID, voltha.Port_ETHERNET_NNI)
+ log.Debugf("NniID %v", nniID)
+ return NewNniPort(PortNum, nniID)
} else if IntfType == "pon" {
// PON ports require a different configuration
// intf_id and pon_id are currently equal.
- IntfID := IntfIDToPortNo(PortNum, voltha.Port_ETHERNET_NNI)
- PONID := IntfID
+ IntfID := IntfIDToPortNo(PortNum, voltha.Port_PON_OLT)
+ PONID := PortNoToIntfID(IntfID, voltha.Port_PON_OLT)
+ log.Debugf("PonID %v", PONID)
return NewPONPort(PONID, DeviceID, IntfID, PortNum)
} else {
log.Errorf("Invalid type of interface %s", IntfType)
@@ -251,10 +258,114 @@
}
}
+// collectNNIMetrics will collect the nni port metrics
+func (StatMgr *OpenOltStatisticsMgr) collectNNIMetrics(nniID uint32) map[string]float32 {
+
+ nnival := make(map[string]float32)
+ mutex.Lock()
+ cm := StatMgr.Device.portStats.NorthBoundPort[nniID]
+ mutex.Unlock()
+ metricName := StatMgr.Device.metrics.GetSubscriberMetrics()
+
+ if metricName != nil && len(metricName) > 0 {
+ for mName := range metricName {
+ switch mName {
+ case "rx_bytes":
+ nnival["RxBytes"] = float32(cm.RxBytes)
+ case "rx_packets":
+ nnival["RxPackets"] = float32(cm.RxPackets)
+ case "rx_mcast_packets":
+ nnival["RxMcastPackets"] = float32(cm.RxMcastPackets)
+ case "rx_bcast_packets":
+ nnival["RxBcastPackets"] = float32(cm.RxBcastPackets)
+ case "tx_bytes":
+ nnival["TxBytes"] = float32(cm.TxBytes)
+ case "tx_packets":
+ nnival["TxPackets"] = float32(cm.TxPackets)
+ case "tx_mcast_packets":
+ nnival["TxMcastPackets"] = float32(cm.TxMcastPackets)
+ case "tx_bcast_packets":
+ nnival["TxBcastPackets"] = float32(cm.TxBcastPackets)
+ }
+ }
+ }
+ return nnival
+}
+
+// collectPONMetrics will collect the pon port metrics
+func (StatMgr *OpenOltStatisticsMgr) collectPONMetrics(pID uint32) map[string]float32 {
+
+ ponval := make(map[string]float32)
+ mutex.Lock()
+ cm := StatMgr.Device.portStats.SouthBoundPort[pID]
+ mutex.Unlock()
+ metricName := StatMgr.Device.metrics.GetSubscriberMetrics()
+
+ if metricName != nil && len(metricName) > 0 {
+ for mName := range metricName {
+ switch mName {
+ case "rx_bytes":
+ ponval["RxBytes"] = float32(cm.RxBytes)
+ case "rx_packets":
+ ponval["RxPackets"] = float32(cm.RxPackets)
+ case "rx_mcast_packets":
+ ponval["RxMcastPackets"] = float32(cm.RxMcastPackets)
+ case "rx_bcast_packets":
+ ponval["RxBcastPackets"] = float32(cm.RxBcastPackets)
+ case "tx_bytes":
+ ponval["TxBytes"] = float32(cm.TxBytes)
+ case "tx_packets":
+ ponval["TxPackets"] = float32(cm.TxPackets)
+ case "tx_mcast_packets":
+ ponval["TxMcastPackets"] = float32(cm.TxMcastPackets)
+ case "tx_bcast_packets":
+ ponval["TxBcastPackets"] = float32(cm.TxBcastPackets)
+ }
+ }
+ }
+ return ponval
+}
+
+// publishMetrics will publish the NNI or PON port metrics as a KPI event
+func (StatMgr OpenOltStatisticsMgr) publishMetrics(portType string, val map[string]float32, portnum uint32, context map[string]string, devID string) {
+ log.Debugf("Post-%v %v", portType, val)
+
+ var metricInfo voltha.MetricInformation
+ var ke voltha.KpiEvent2
+ var volthaEventSubCatgry voltha.EventSubCategory_EventSubCategory
+
+ if portType == "NNIStats" {
+ volthaEventSubCatgry = voltha.EventSubCategory_NNI
+ } else {
+ volthaEventSubCatgry = voltha.EventSubCategory_PON
+ }
+
+ raisedTs := time.Now().UnixNano()
+ mmd := voltha.MetricMetaData{
+ Title: portType,
+ Ts: float64(raisedTs),
+ Context: context,
+ DeviceId: devID,
+ }
+
+ metricInfo.Metadata = &mmd
+ metricInfo.Metrics = val
+
+ ke.SliceData = []*voltha.MetricInformation{&metricInfo}
+ ke.Type = voltha.KpiEventType_slice
+ ke.Ts = float64(time.Now().UnixNano())
+
+ if err := StatMgr.Device.EventProxy.SendKpiEvent("STATS_EVENT", &ke, voltha.EventCategory_EQUIPMENT, volthaEventSubCatgry, raisedTs); err != nil {
+ log.Errorw("Failed to send Pon stats", log.Fields{"err": err})
+ }
+
+}
+
// PortStatisticsIndication handles the port statistics indication
-func (StatMgr *OpenOltStatisticsMgr) PortStatisticsIndication(PortStats *openolt.PortStatistics) {
+func (StatMgr *OpenOltStatisticsMgr) PortStatisticsIndication(PortStats *openolt.PortStatistics, NumPonPorts uint32) {
log.Debugf("port-stats-collected %v", PortStats)
- StatMgr.PortsStatisticsKpis(PortStats)
+ StatMgr.PortsStatisticsKpis(PortStats, NumPonPorts)
+ log.Infow("Received port stats indication", log.Fields{"PortStats": PortStats})
// TODO send stats to core topic to the voltha kafka or a different kafka ?
}
@@ -265,7 +376,7 @@
}
// PortsStatisticsKpis map the port stats values into a dictionary, creates the kpiEvent and then publish to Kafka
-func (StatMgr *OpenOltStatisticsMgr) PortsStatisticsKpis(PortStats *openolt.PortStatistics) {
+func (StatMgr *OpenOltStatisticsMgr) PortsStatisticsKpis(PortStats *openolt.PortStatistics, NumPonPorts uint32) {
/*map the port stats values into a dictionary
Create a kpoEvent and publish to Kafka
@@ -276,7 +387,7 @@
//var err error
IntfID := PortStats.IntfId
- if (IntfIDToPortNo(0, voltha.Port_ETHERNET_NNI) < IntfID) &&
+ if (IntfIDToPortNo(1, voltha.Port_ETHERNET_NNI) < IntfID) &&
(IntfID < IntfIDToPortNo(4, voltha.Port_ETHERNET_NNI)) {
/*
for this release we are only interested in the first NNI for
@@ -284,24 +395,45 @@
we are not using the other 3
*/
return
- }
- PMData := make(map[string]uint64)
- PMData["rx_bytes"] = PortStats.RxBytes
- PMData["rx_packets"] = PortStats.RxPackets
- PMData["rx_ucast_packets"] = PortStats.RxUcastPackets
- PMData["rx_mcast_packets"] = PortStats.RxMcastPackets
- PMData["rx_bcast_packets"] = PortStats.RxBcastPackets
- PMData["rx_error_packets"] = PortStats.RxErrorPackets
- PMData["tx_bytes"] = PortStats.TxBytes
- PMData["tx_packets"] = PortStats.TxPackets
- PMData["tx_ucast_packets"] = PortStats.TxUcastPackets
- PMData["tx_mcast_packets"] = PortStats.TxMcastPackets
- PMData["tx_bcast_packets"] = PortStats.TxBcastPackets
- PMData["tx_error_packets"] = PortStats.TxErrorPackets
- PMData["rx_crc_errors"] = PortStats.RxCrcErrors
- PMData["bip_errors"] = PortStats.BipErrors
+ } else if IntfIDToPortNo(0, voltha.Port_ETHERNET_NNI) == IntfID {
- PMData["intf_id"] = uint64(PortStats.IntfId)
+ var portNNIStat NniPort
+ portNNIStat.IntfID = IntfID
+ portNNIStat.PortNum = uint32(0)
+ portNNIStat.RxBytes = PortStats.RxBytes
+ portNNIStat.RxPackets = PortStats.RxPackets
+ portNNIStat.RxMcastPackets = PortStats.RxMcastPackets
+ portNNIStat.RxBcastPackets = PortStats.RxBcastPackets
+ portNNIStat.TxBytes = PortStats.TxBytes
+ portNNIStat.TxPackets = PortStats.TxPackets
+ portNNIStat.TxMcastPackets = PortStats.TxMcastPackets
+ portNNIStat.TxBcastPackets = PortStats.TxBcastPackets
+ mutex.Lock()
+ StatMgr.NorthBoundPort[0] = &portNNIStat
+ mutex.Unlock()
+ log.Debugf("Received-NNI-Stats: %v", StatMgr.NorthBoundPort)
+ }
+ for i := uint32(0); i < NumPonPorts; i++ {
+
+ if IntfIDToPortNo(i, voltha.Port_PON_OLT) == IntfID {
+ var portPonStat PonPort
+ portPonStat.IntfID = IntfID
+ portPonStat.PortNum = i
+ portPonStat.PONID = i
+ portPonStat.RxBytes = PortStats.RxBytes
+ portPonStat.RxPackets = PortStats.RxPackets
+ portPonStat.RxMcastPackets = PortStats.RxMcastPackets
+ portPonStat.RxBcastPackets = PortStats.RxBcastPackets
+ portPonStat.TxBytes = PortStats.TxBytes
+ portPonStat.TxPackets = PortStats.TxPackets
+ portPonStat.TxMcastPackets = PortStats.TxMcastPackets
+ portPonStat.TxBcastPackets = PortStats.TxBcastPackets
+ mutex.Lock()
+ StatMgr.SouthBoundPort[i] = &portPonStat
+ mutex.Unlock()
+ log.Debugf("Received-PON-Stats-for-Port %v : %v", i, StatMgr.SouthBoundPort[i])
+ }
+ }
/*
Based upon the intf_id map to an nni port or a pon port
diff --git a/adaptercore/statsmanager_test.go b/adaptercore/statsmanager_test.go
index 88f3703..91f6c87 100644
--- a/adaptercore/statsmanager_test.go
+++ b/adaptercore/statsmanager_test.go
@@ -18,6 +18,7 @@
package adaptercore
import (
+ "reflect"
"testing"
"github.com/opencord/voltha-lib-go/v2/pkg/log"
@@ -45,7 +46,7 @@
},
ConnectStatus: 1,
}
- dh := &DeviceHandler{}
+ dh := newMockDeviceHandler()
dh.device = device
StatMgr := NewOpenOltStatsMgr(dh)
@@ -62,7 +63,215 @@
for _, tt := range tests {
t.Run(tt.name, func(t *testing.T) {
- StatMgr.PortStatisticsIndication(tt.args.PortStats)
+ StatMgr.PortStatisticsIndication(tt.args.PortStats, 16)
+ })
+ }
+}
+
+func TestOpenOltStatisticsMgr_publishMetrics(t *testing.T) {
+ type fields struct {
+ Device *DeviceHandler
+ NorthBoundPort map[uint32]*NniPort
+ SouthBoundPort map[uint32]*PonPort
+ }
+ type args struct {
+ portType string
+ val map[string]float32
+ portnum uint32
+ context map[string]string
+ }
+ ctx := map[string]string{}
+ ctx["deviceID"] = "Test"
+ ponmap := map[uint32]*PonPort{}
+ ponmap[0] = &PonPort{
+ PONID: 0,
+ DeviceID: "onu1",
+ IntfID: 0,
+ PortNum: 0,
+ PortID: 0,
+ Label: "",
+ ONUs: nil,
+ ONUsByID: nil,
+ RxBytes: 0,
+ RxPackets: 0,
+ RxMcastPackets: 0,
+ RxBcastPackets: 0,
+ RxErrorPackets: 0,
+ TxBytes: 0,
+ TxPackets: 0,
+ TxUcastPackets: 0,
+ TxMcastPackets: 0,
+ TxBcastPackets: 0,
+ TxErrorPackets: 0,
+ }
+ nnimap := map[uint32]*NniPort{}
+ nnimap[0] = &NniPort{
+ PortNum: 0,
+ Name: "olt1",
+ LogicalPort: 0,
+ IntfID: 0,
+ RxBytes: 0,
+ RxPackets: 0,
+ RxMcastPackets: uint64(1111),
+ RxBcastPackets: 0,
+ RxErrorPackets: 0,
+ TxBytes: 0,
+ TxPackets: 0,
+ TxUcastPackets: 0,
+ TxMcastPackets: 0,
+ TxBcastPackets: 0,
+ TxErrorPackets: 0,
+ }
+ pval := make(map[string]float32)
+ pval["rx_bytes"] = float32(111)
+ nval := make(map[string]float32)
+ nval["rx_bytes"] = float32(111)
+ dhandlerNNI := newMockDeviceHandler()
+ dhandlerNNI.portStats = &OpenOltStatisticsMgr{Device: nil, SouthBoundPort: nil, NorthBoundPort: nnimap}
+ dhandlerPON := newMockDeviceHandler()
+ dhandlerPON.portStats = &OpenOltStatisticsMgr{Device: nil, SouthBoundPort: ponmap, NorthBoundPort: nil}
+ tests := []struct {
+ name string
+ fields fields
+ args args
+ }{
+ {
+ name: "PublishNNIMetrics-1",
+ fields: fields{
+ Device: dhandlerNNI,
+ NorthBoundPort: nnimap,
+ SouthBoundPort: nil,
+ },
+ args: args{
+ portType: "NNIStats",
+ val: nval,
+ portnum: 0,
+ context: ctx,
+ },
+ },
+ {
+ name: "PublishPONMetrics-1",
+ fields: fields{
+ Device: dhandlerPON,
+ NorthBoundPort: nil,
+ SouthBoundPort: ponmap,
+ },
+ args: args{
+ portType: "PONStats",
+ val: pval,
+ portnum: 0,
+ context: ctx,
+ },
+ },
+ // TODO: Add test cases.
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ StatMgr := &OpenOltStatisticsMgr{
+ Device: tt.fields.Device,
+ NorthBoundPort: tt.fields.NorthBoundPort,
+ SouthBoundPort: tt.fields.SouthBoundPort,
+ }
+ StatMgr.publishMetrics(tt.args.portType, tt.args.val, tt.args.portnum, tt.args.context, "onu1")
+
+ })
+ }
+}
+
+func TestOpenOltStatisticsMgr_collectNNIMetrics(t *testing.T) {
+ type fields struct {
+ Device *DeviceHandler
+ NorthBoundPort map[uint32]*NniPort
+ SouthBoundPort map[uint32]*PonPort
+ }
+ type args struct {
+ nniID uint32
+ }
+ dhandler := newMockDeviceHandler()
+ pmconfig := make(map[string]*voltha.PmConfig)
+ pmconfig["rx_bytes"] = &voltha.PmConfig{Name: "olt"}
+
+ var res map[string]float32
+ nnimap := map[uint32]*NniPort{}
+ nnimap[0] = &NniPort{Name: "olt"}
+ nnimap[1] = &NniPort{Name: "olt"}
+ dh := &DeviceHandler{portStats: &OpenOltStatisticsMgr{Device: dhandler, SouthBoundPort: nil, NorthBoundPort: nnimap}}
+ tests := []struct {
+ name string
+ fields fields
+ args args
+ want map[string]float32
+ }{
+ {"CollectNNIMetrics-1", fields{
+ Device: dh,
+ NorthBoundPort: nnimap,
+ SouthBoundPort: nil,
+ }, args{0}, res},
+ {"CollectNNIMetrics-2", fields{
+ Device: dh,
+ NorthBoundPort: nnimap,
+ SouthBoundPort: nil,
+ }, args{1}, res},
+ // TODO: Add test cases.
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ StatMgr := &OpenOltStatisticsMgr{
+ Device: tt.fields.Device,
+ NorthBoundPort: tt.fields.NorthBoundPort,
+ SouthBoundPort: tt.fields.SouthBoundPort,
+ }
+ got := StatMgr.collectNNIMetrics(tt.args.nniID)
+ if reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
+ t.Errorf("collectNNIMetrics() = %v, want %v", got, tt.want)
+ }
+ })
+ }
+}
+
+func TestOpenOltStatisticsMgr_collectPONMetrics(t *testing.T) {
+ type fields struct {
+ Device *DeviceHandler
+ NorthBoundPort map[uint32]*NniPort
+ SouthBoundPort map[uint32]*PonPort
+ }
+ type args struct {
+ pID uint32
+ }
+ dhandler := newMockDeviceHandler()
+ pmconfig := make(map[string]*voltha.PmConfig)
+ pmconfig["rx_bytes"] = &voltha.PmConfig{Name: "olt"}
+
+ var res map[string]float32
+ ponmap := map[uint32]*PonPort{}
+ ponmap[0] = &PonPort{DeviceID: "olt"}
+ ponmap[1] = &PonPort{DeviceID: "olt"}
+ dh := &DeviceHandler{portStats: &OpenOltStatisticsMgr{Device: dhandler, SouthBoundPort: ponmap, NorthBoundPort: nil}}
+
+ tests := []struct {
+ name string
+ fields fields
+ args args
+ want map[string]float32
+ }{
+ {"CollectPONMetrics-1", fields{
+ Device: dh,
+ NorthBoundPort: nil,
+ SouthBoundPort: ponmap,
+ }, args{0}, res},
+ // TODO: Add test cases.
+ }
+ for _, tt := range tests {
+ t.Run(tt.name, func(t *testing.T) {
+ StatMgr := &OpenOltStatisticsMgr{
+ Device: tt.fields.Device,
+ NorthBoundPort: tt.fields.NorthBoundPort,
+ SouthBoundPort: tt.fields.SouthBoundPort,
+ }
+ got := StatMgr.collectPONMetrics(tt.args.pID)
+ if reflect.TypeOf(got) != reflect.TypeOf(tt.want) {
+ t.Errorf("collectPONMetrics() = %v, want %v", got, tt.want)
+ }
})
}
}