VOL-1381 and VOL-1091 Add feature to collect PMmetrics of OLT, this will collect Tx and Rx and posts to kafka. This commit is also related to changes of commit 72b25abd4e804596413558ef10f098c2262dd65d voltha-go
Updated review comments
Updated with code optimization
Updated with dep files
Updated fix for sanity test
Updated with gomod changes
Change-Id: I899e855812a6eda812c64664a9154321cdb16876
diff --git a/adaptercore/device_handler.go b/adaptercore/device_handler.go
index 7f2b9cf..dbf301f 100644
--- a/adaptercore/device_handler.go
+++ b/adaptercore/device_handler.go
@@ -35,6 +35,7 @@
"github.com/golang/protobuf/ptypes"
"github.com/opencord/voltha-lib-go/v2/pkg/adapters/adapterif"
"github.com/opencord/voltha-lib-go/v2/pkg/log"
+ "github.com/opencord/voltha-lib-go/v2/pkg/pmmetrics"
rsrcMgr "github.com/opencord/voltha-openolt-adapter/adaptercore/resourcemanager"
"github.com/opencord/voltha-protos/go/common"
ic "github.com/opencord/voltha-protos/go/inter_container"
@@ -76,6 +77,9 @@
discOnus map[string]bool
onus map[string]*OnuDevice
nniIntfID int
+ portStats *OpenOltStatisticsMgr
+ metrics *pmmetrics.PmMetrics
+ stopCollector chan bool
}
//OnuDevice represents ONU related info
@@ -89,6 +93,17 @@
uniPorts map[uint32]struct{}
}
+var pmNames = []string{
+ "rx_bytes",
+ "rx_packets",
+ "rx_mcast_packets",
+ "rx_bcast_packets",
+ "tx_bytes",
+ "tx_packets",
+ "tx_mcast_packets",
+ "tx_bcast_packets",
+}
+
//NewOnuDevice creates a new Onu Device
func NewOnuDevice(devID, deviceTp, serialNum string, onuID, intfID uint32, proxyDevID string) *OnuDevice {
var device OnuDevice
@@ -122,7 +137,8 @@
// when the first IntfOperInd with status as "up" is received for
// any one of the available NNI port on the OLT device.
dh.nniIntfID = -1
-
+ dh.stopCollector = make(chan bool, 2)
+ dh.metrics = pmmetrics.NewPmMetrics(cloned.Id, pmmetrics.Frequency(150), pmmetrics.FrequencyOverride(false), pmmetrics.Grouped(false), pmmetrics.Metrics(pmNames))
//TODO initialize the support classes.
return &dh
}
@@ -352,15 +368,14 @@
go dh.handlePacketIndication(pktInd)
case *oop.Indication_PortStats:
portStats := indication.GetPortStats()
- log.Infow("Received port stats indication", log.Fields{"PortStats": portStats})
+ go dh.portStats.PortStatisticsIndication(portStats, dh.resourceMgr.DevInfo.GetPonPorts())
case *oop.Indication_FlowStats:
flowStats := indication.GetFlowStats()
log.Infow("Received flow stats", log.Fields{"FlowStats": flowStats})
case *oop.Indication_AlarmInd:
alarmInd := indication.GetAlarmInd()
log.Infow("Received alarm indication ", log.Fields{"AlarmInd": alarmInd})
- dh.eventMgr.ProcessEvents(alarmInd, dh.deviceID, raisedTs)
-
+ go dh.eventMgr.ProcessEvents(alarmInd, dh.deviceID, raisedTs)
}
}
@@ -515,6 +530,8 @@
/* TODO: Instantiate Alarm , stats , BW managers */
/* Instantiating Event Manager to handle Alarms and KPIs */
dh.eventMgr = NewEventMgr(dh.EventProxy, dh)
+ // Stats config for new device
+ dh.portStats = NewOpenOltStatsMgr(dh)
// Start reading indications
go dh.readIndications()
@@ -566,11 +583,51 @@
return deviceInfo, nil
}
+func startCollector(dh *DeviceHandler) {
+ // Initial delay for OLT initialization
+ time.Sleep(1 * time.Minute)
+ log.Debugf("Starting-Collector")
+ context := make(map[string]string)
+ for {
+ select {
+ case <-dh.stopCollector:
+ log.Debugw("Stopping-Collector-for-OLT", log.Fields{"deviceID:": dh.deviceID})
+ return
+ default:
+ freq := dh.metrics.ToPmConfigs().DefaultFreq
+ time.Sleep(time.Duration(freq) * time.Second)
+ context["oltid"] = dh.deviceID
+ context["devicetype"] = dh.deviceType
+ // NNI Stats
+ cmnni := dh.portStats.collectNNIMetrics(uint32(0))
+ log.Debugf("Collect-NNI-Metrics %v", cmnni)
+ go dh.portStats.publishMetrics("NNIStats", cmnni, uint32(0), context, dh.deviceID)
+ log.Debugf("Publish-NNI-Metrics")
+ // PON Stats
+ NumPonPORTS := dh.resourceMgr.DevInfo.GetPonPorts()
+ for i := uint32(0); i < NumPonPORTS; i++ {
+ cmpon := dh.portStats.collectPONMetrics(i)
+ log.Debugf("Collect-PON-Metrics %v", cmpon)
+
+ go dh.portStats.publishMetrics("PONStats", cmpon, i, context, dh.deviceID)
+ log.Debugf("Publish-PON-Metrics")
+ }
+ }
+ }
+}
+
//AdoptDevice adopts the OLT device
func (dh *DeviceHandler) AdoptDevice(device *voltha.Device) {
dh.transitionMap = NewTransitionMap(dh)
log.Infow("Adopt_device", log.Fields{"deviceID": device.Id, "Address": device.GetHostAndPort()})
dh.transitionMap.Handle(DeviceInit)
+
+ // Now, set the initial PM configuration for that device
+ if err := dh.coreProxy.DevicePMConfigUpdate(nil, dh.metrics.ToPmConfigs()); err != nil {
+ log.Errorw("error-updating-PMs", log.Fields{"deviceId": device.Id, "error": err})
+ }
+
+ go startCollector(dh)
}
//GetOfpDeviceInfo Gets the Ofp information of the given device
@@ -893,7 +950,7 @@
"openolt", onuDevice.Type, onuDevice.Id, onuDevice.ProxyAddress.DeviceId, "")
if err != nil {
log.Errorw("Failed to send inter-adapter-message", log.Fields{"OnuInd": onuInd,
- "From Adapter": "openolt", "DevieType": onuDevice.Type, "DeviceID": onuDevice.Id, "Error": err})
+ "From Adapter": "openolt", "DevieType": onuDevice.Type, "DeviceID": onuDevice.Id})
return
}
} else {
@@ -1199,6 +1256,8 @@
delete(dh.onus, onu)
}
log.Debug("Removed-device-from-Resource-manager-KV-store")
+ // Stop the Stats collector
+ dh.stopCollector <- true
//Reset the state
if _, err := dh.Client.Reboot(context.Background(), new(oop.Empty)); err != nil {
log.Errorw("Failed-to-reboot-olt ", log.Fields{"err": err})