Add KPI collection/submission loops to Tibit adapters
Change-Id: I5d09f59b826ac9687398688bb221f8ecdf5e1d9e
diff --git a/voltha/adapters/simulated_olt/simulated_olt.py b/voltha/adapters/simulated_olt/simulated_olt.py
index c328c13..3eedf9d 100644
--- a/voltha/adapters/simulated_olt/simulated_olt.py
+++ b/voltha/adapters/simulated_olt/simulated_olt.py
@@ -526,6 +526,7 @@
egress_port_no=egress_port_no, msg_len=len(msg))
def start_kpi_collection(self, device_id):
+
"""Simulate periodic KPI metric collection from the device"""
import random
diff --git a/voltha/adapters/tibit_olt/tibit_olt.py b/voltha/adapters/tibit_olt/tibit_olt.py
index eb7e415..42c082e 100644
--- a/voltha/adapters/tibit_olt/tibit_olt.py
+++ b/voltha/adapters/tibit_olt/tibit_olt.py
@@ -20,12 +20,14 @@
import json
from uuid import uuid4
+import arrow
import structlog
from scapy.fields import StrField
from scapy.layers.l2 import Ether, Dot1Q
from scapy.packet import Packet, bind_layers
from twisted.internet import reactor
from twisted.internet.defer import DeferredQueue, inlineCallbacks
+from twisted.internet.task import LoopingCall
from zope.interface import implementer
from common.frameio.frameio import BpfProgramFilter, hexify
@@ -49,6 +51,8 @@
from voltha.protos.common_pb2 import OperStatus, AdminState
from voltha.protos.device_pb2 import Device, Port
from voltha.protos.device_pb2 import DeviceType, DeviceTypes
+from voltha.protos.events_pb2 import KpiEvent, MetricValuePairs
+from voltha.protos.events_pb2 import KpiEventType
from voltha.protos.health_pb2 import HealthStatus
from voltha.protos.logical_device_pb2 import LogicalDevice, LogicalPort
from voltha.protos.openflow_13_pb2 import ofp_desc, ofp_port, OFPPF_10GB_FD, \
@@ -266,6 +270,8 @@
# before checking for ONUs
reactor.callLater(0.1, self._detect_onus, device)
+ self.start_kpi_collection(device.id)
+
@inlineCallbacks
def _detect_onus(self, device):
# send out get 'links' to the OLT device
@@ -720,3 +726,57 @@
tmp.payload
self.io_port.send(str(frame))
+
+ def start_kpi_collection(self, device_id):
+
+ """TMP Simulate periodic KPI metric collection from the device"""
+ import random
+
+ @inlineCallbacks # pretend that we need to do async calls
+ def _collect(device_id, prefix):
+
+ try:
+ # Step 1: gather metrics from device (pretend it here) - examples
+ nni_port_metrics = yield dict(
+ tx_pkts=random.randint(0, 100),
+ rx_pkts=random.randint(0, 100),
+ tx_bytes=random.randint(0, 100000),
+ rx_bytes=random.randint(0, 100000),
+ )
+ pon_port_metrics = yield dict(
+ tx_pkts=nni_port_metrics['rx_pkts'],
+ rx_pkts=nni_port_metrics['tx_pkts'],
+ tx_bytes=nni_port_metrics['rx_bytes'],
+ rx_bytes=nni_port_metrics['tx_bytes'],
+ )
+ olt_metrics = yield dict(
+ cpu_util=20 + 5 * random.random(),
+ buffer_util=10 + 10 * random.random()
+ )
+
+ # Step 2: prepare the KpiEvent for submission
+ # we can time-stamp them here (or could use time derived from OLT
+ ts = arrow.utcnow().timestamp
+ kpi_event = KpiEvent(
+ type=KpiEventType.slice,
+ ts=ts,
+ prefixes={
+ # OLT-level
+ prefix: MetricValuePairs(metrics=olt_metrics),
+ # OLT NNI port
+ prefix + '.nni': MetricValuePairs(metrics=nni_port_metrics),
+ # OLT PON port
+ prefix + '.pon': MetricValuePairs(metrics=pon_port_metrics)
+ }
+ )
+
+ # Step 3: submit
+ self.adapter_agent.submit_kpis(kpi_event)
+
+ except Exception as e:
+ log.exception('failed-to-submit-kpis', e=e)
+
+ prefix = 'voltha.{}.{}'.format(self.name, device_id)
+ lc = LoopingCall(_collect, device_id, prefix)
+ lc.start(interval=15) # TODO make this configurable
+
diff --git a/voltha/adapters/tibit_onu/tibit_onu.py b/voltha/adapters/tibit_onu/tibit_onu.py
index fe2b76b..5c11245 100644
--- a/voltha/adapters/tibit_onu/tibit_onu.py
+++ b/voltha/adapters/tibit_onu/tibit_onu.py
@@ -22,7 +22,9 @@
from uuid import uuid4
+import arrow
import structlog
+from twisted.internet.task import LoopingCall
from zope.interface import implementer
from scapy.layers.inet import ICMP, IP
@@ -37,6 +39,8 @@
from voltha.protos.adapter_pb2 import Adapter, AdapterConfig
from voltha.protos.device_pb2 import Port
from voltha.protos.device_pb2 import DeviceType, DeviceTypes
+from voltha.protos.events_pb2 import KpiEventType
+from voltha.protos.events_pb2 import MetricValuePairs, KpiEvent
from voltha.protos.health_pb2 import HealthStatus
from voltha.protos.common_pb2 import LogLevel, ConnectStatus
from voltha.protos.common_pb2 import OperStatus, AdminState
@@ -193,6 +197,8 @@
device.oper_status = OperStatus.ACTIVE
self.adapter_agent.update_device(device)
+ self.start_kpi_collection(device.id)
+
def abandon_device(self, device):
raise NotImplementedError(0
)
@@ -422,3 +428,57 @@
def receive_packet_out(self, logical_device_id, egress_port_no, msg):
log.info('packet-out', logical_device_id=logical_device_id,
egress_port_no=egress_port_no, msg_len=len(msg))
+
+ def start_kpi_collection(self, device_id):
+
+ """TMP Simulate periodic KPI metric collection from the device"""
+ import random
+
+ @inlineCallbacks # pretend that we need to do async calls
+ def _collect(device_id, prefix):
+
+ try:
+ # Step 1: gather metrics from device (pretend it here) - examples
+ uni_port_metrics = yield dict(
+ tx_pkts=random.randint(0, 100),
+ rx_pkts=random.randint(0, 100),
+ tx_bytes=random.randint(0, 100000),
+ rx_bytes=random.randint(0, 100000),
+ )
+ pon_port_metrics = yield dict(
+ tx_pkts=uni_port_metrics['rx_pkts'],
+ rx_pkts=uni_port_metrics['tx_pkts'],
+ tx_bytes=uni_port_metrics['rx_bytes'],
+ rx_bytes=uni_port_metrics['tx_bytes'],
+ )
+ onu_metrics = yield dict(
+ cpu_util=20 + 5 * random.random(),
+ buffer_util=10 + 10 * random.random()
+ )
+
+ # Step 2: prepare the KpiEvent for submission
+ # we can time-stamp them here (or could use time derived from OLT
+ ts = arrow.utcnow().timestamp
+ kpi_event = KpiEvent(
+ type=KpiEventType.slice,
+ ts=ts,
+ prefixes={
+ # OLT-level
+ prefix: MetricValuePairs(metrics=onu_metrics),
+ # OLT NNI port
+ prefix + '.nni': MetricValuePairs(metrics=uni_port_metrics),
+ # OLT PON port
+ prefix + '.pon': MetricValuePairs(metrics=pon_port_metrics)
+ }
+ )
+
+ # Step 3: submit
+ self.adapter_agent.submit_kpis(kpi_event)
+
+ except Exception as e:
+ log.exception('failed-to-submit-kpis', e=e)
+
+ prefix = 'voltha.{}.{}'.format(self.name, device_id)
+ lc = LoopingCall(_collect, device_id, prefix)
+ lc.start(interval=15) # TODO make this configurable
+
diff --git a/voltha/core/adapter_agent.py b/voltha/core/adapter_agent.py
index 748872b..5a0e1de 100644
--- a/voltha/core/adapter_agent.py
+++ b/voltha/core/adapter_agent.py
@@ -308,6 +308,7 @@
self.event_bus.publish(topic, (logical_port_no, packet))
# ~~~~~~~~~~~~~~~~~~~ Handling KPI metric submissions ~~~~~~~~~~~~~~~~~~~~~
+
def submit_kpis(self, kpi_event_msg):
try:
assert isinstance(kpi_event_msg, KpiEvent)