[VOL-1034] This commit consists of:
1) Implement PM collections from the ONU
2) Update the Registration method to include for the adapter type
and its supported device types.
Change-Id: Id984468546328b6ebf2ca47578675c69b2b66f01
diff --git a/adapters/ponsim_onu/ponsim_onu.py b/adapters/ponsim_onu/ponsim_onu.py
index dfac1d3..e15d0a9 100644
--- a/adapters/ponsim_onu/ponsim_onu.py
+++ b/adapters/ponsim_onu/ponsim_onu.py
@@ -18,21 +18,31 @@
Represents an ONU device
"""
+from uuid import uuid4
+
+import arrow
import structlog
-from twisted.internet.defer import DeferredQueue, inlineCallbacks, returnValue
+from google.protobuf.json_format import MessageToDict
+from google.protobuf.message import Message
+from simplejson import dumps
+from twisted.internet.defer import DeferredQueue, inlineCallbacks, \
+ returnValue, Deferred
+from twisted.internet.task import LoopingCall
from adapters.common.utils.asleep import asleep
from adapters.iadapter import OnuAdapter
+from adapters.kafka.kafka_proxy import get_kafka_proxy
from adapters.protos import third_party
from adapters.protos.common_pb2 import OperStatus, ConnectStatus, AdminState
from adapters.protos.core_adapter_pb2 import PortCapability, \
InterAdapterMessageType, InterAdapterResponseBody
-from adapters.protos.device_pb2 import Port
+from adapters.protos.device_pb2 import Port, PmConfig, PmConfigs
+from adapters.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
from adapters.protos.logical_device_pb2 import LogicalPort
from adapters.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, \
OFPPF_1GB_FD
from adapters.protos.openflow_13_pb2 import ofp_port
-from adapters.protos.ponsim_pb2 import FlowTable
+from adapters.protos.ponsim_pb2 import FlowTable, PonSimMetricsRequest, PonSimMetrics
_ = third_party
log = structlog.get_logger()
@@ -42,9 +52,97 @@
return tuple(int(d, 16) for d in mac.split(':'))
+class AdapterPmMetrics:
+ def __init__(self, device):
+ self.pm_names = {'tx_64_pkts', 'tx_65_127_pkts', 'tx_128_255_pkts',
+ 'tx_256_511_pkts', 'tx_512_1023_pkts',
+ 'tx_1024_1518_pkts', 'tx_1519_9k_pkts',
+ 'rx_64_pkts', 'rx_65_127_pkts',
+ 'rx_128_255_pkts', 'rx_256_511_pkts',
+ 'rx_512_1023_pkts', 'rx_1024_1518_pkts',
+ 'rx_1519_9k_pkts'}
+ self.device = device
+ self.id = device.id
+ self.name = 'ponsim_onu'
+ self.default_freq = 150
+ self.grouped = False
+ self.freq_override = False
+ self.pm_metrics = None
+ self.pon_metrics_config = dict()
+ self.uni_metrics_config = dict()
+ self.lc = None
+ for m in self.pm_names:
+ self.pon_metrics_config[m] = PmConfig(name=m,
+ type=PmConfig.COUNTER,
+ enabled=True)
+ self.uni_metrics_config[m] = PmConfig(name=m,
+ type=PmConfig.COUNTER,
+ enabled=True)
+
+ def update(self, pm_config):
+ if self.default_freq != pm_config.default_freq:
+ # Update the callback to the new frequency.
+ self.default_freq = pm_config.default_freq
+ self.lc.stop()
+ self.lc.start(interval=self.default_freq / 10)
+ for m in pm_config.metrics:
+ self.pon_metrics_config[m.name].enabled = m.enabled
+ self.uni_metrics_config[m.name].enabled = m.enabled
+
+ def make_proto(self):
+ pm_config = PmConfigs(
+ id=self.id,
+ default_freq=self.default_freq,
+ grouped=False,
+ freq_override=False)
+ for m in sorted(self.pon_metrics_config):
+ pm = self.pon_metrics_config[m] # Either will do they're the same
+ pm_config.metrics.extend([PmConfig(name=pm.name,
+ type=pm.type,
+ enabled=pm.enabled)])
+ return pm_config
+
+ def extract_metrics(self, stats):
+ rtrn_port_metrics = dict()
+ rtrn_port_metrics['pon'] = self.extract_pon_metrics(stats)
+ rtrn_port_metrics['uni'] = self.extract_uni_metrics(stats)
+ return rtrn_port_metrics
+
+ def extract_pon_metrics(self, stats):
+ rtrn_pon_metrics = dict()
+ for m in stats.metrics:
+ if m.port_name == "pon":
+ for p in m.packets:
+ if self.pon_metrics_config[p.name].enabled:
+ rtrn_pon_metrics[p.name] = p.value
+ return rtrn_pon_metrics
+
+ def extract_uni_metrics(self, stats):
+ rtrn_pon_metrics = dict()
+ for m in stats.metrics:
+ if m.port_name == "uni":
+ for p in m.packets:
+ if self.pon_metrics_config[p.name].enabled:
+ rtrn_pon_metrics[p.name] = p.value
+ return rtrn_pon_metrics
+
+ def start_collector(self, callback):
+ log.info("starting-pm-collection", device_name=self.name,
+ device_id=self.device.id)
+ prefix = 'voltha.{}.{}'.format(self.name, self.device.id)
+ self.lc = LoopingCall(callback, self.device.id, prefix)
+ self.lc.start(interval=self.default_freq / 10)
+
+ def stop_collector(self):
+ log.info("stopping-pm-collection", device_name=self.name,
+ device_id=self.device.id)
+ self.lc.stop()
+
+
class PonSimOnuAdapter(OnuAdapter):
def __init__(self, core_proxy, adapter_proxy, config):
- # DeviceType of ONU should be same as VENDOR ID of ONU Serial Number as specified by standard
+ # DeviceType of ONU should be same as VENDOR ID of ONU Serial Number
+ # as specified by standard
# requires for identifying correct adapter or ranged ONU
super(PonSimOnuAdapter, self).__init__(core_proxy=core_proxy,
adapter_proxy=adapter_proxy,
@@ -68,14 +166,27 @@
self.device_parent_id = None
self.log = structlog.get_logger(device_id=device_id)
self.incoming_messages = DeferredQueue()
+ self.inter_adapter_message_deferred_map = {}
self.proxy_address = None
# reference of uni_port is required when re-enabling the device if
# it was disabled previously
self.uni_port = None
self.pon_port = None
+ def _to_string(self, unicode_str):
+ if unicode_str is not None:
+ if type(unicode_str) == unicode:
+ return unicode_str.encode('ascii', 'ignore')
+ else:
+ return unicode_str
+ else:
+ return ""
+
def receive_message(self, msg):
- self.incoming_messages.put(msg)
+ trns_id = self._to_string(msg.header.id)
+ if trns_id in self.inter_adapter_message_deferred_map:
+ self.inter_adapter_message_deferred_map[trns_id].callback(msg)
+ # self.incoming_messages.put(msg)
@inlineCallbacks
def activate(self, device):
@@ -90,6 +201,12 @@
device.model = 'n/a'
yield self.core_proxy.device_update(device)
+ # Now set the initial PM configuration for this device
+ self.pm_metrics = AdapterPmMetrics(device)
+ pm_config = self.pm_metrics.make_proto()
+ log.info("initial-pm-config", pm_config=pm_config)
+ self.core_proxy.device_pm_config_update(pm_config, init=True)
+
# register physical ports
self.uni_port = Port(
port_no=2,
@@ -118,10 +235,15 @@
connect_status=ConnectStatus.REACHABLE,
oper_status=OperStatus.ACTIVE)
+ # Start collecting stats from the device after a brief pause
+ self.start_kpi_collection(device.id)
+
# TODO: Return only port specific info
def get_ofp_port_info(self, device, port_no):
- # Since the adapter created the device port then it has the reference of the port to
- # return the capability. TODO: Do a lookup on the UNI port number and return the
+ # Since the adapter created the device port then it has the reference
+ # of the port to
+ # return the capability. TODO: Do a lookup on the UNI port number
+ # and return the
# appropriate attributes
self.log.info('get_ofp_port_info', port_no=port_no, device_id=device.id)
cap = OFPPF_1GB_FD | OFPPF_FIBER
@@ -159,18 +281,22 @@
@inlineCallbacks
def update_flow_table(self, flows):
+ trnsId = None
try:
self.log.info('update_flow_table', flows=flows)
# we need to proxy through the OLT to get to the ONU
- # reset response queue
- while self.incoming_messages.pending:
- yield self.incoming_messages.get()
-
fb = FlowTable(
port=self.proxy_address.channel_id,
flows=flows
)
+
+ # Create a deferred to wait for the result as well as a transid
+ wait_for_result = Deferred()
+ trnsId = uuid4().hex
+ self.inter_adapter_message_deferred_map[
+ self._to_string(trnsId)] = wait_for_result
+
# Sends the request via proxy and wait for an ACK
yield self.adapter_proxy.send_inter_adapter_message(
msg=fb,
@@ -178,21 +304,25 @@
from_adapter=self.adapter.name,
to_adapter=self.proxy_address.device_type,
to_device_id=self.device_id,
- proxy_device_id=self.proxy_address.device_id
+ proxy_device_id=self.proxy_address.device_id,
+ message_id=trnsId
)
# Wait for the full response from the proxied adapter
- res = yield self.incoming_messages.get()
- self.log.info('response-received', result=res)
+ res = yield wait_for_result
+ if res.header.type == InterAdapterMessageType.FLOW_RESPONSE:
+ body = InterAdapterResponseBody()
+ res.body.Unpack(body)
+ self.log.info('response-received', result=body.status)
except Exception as e:
self.log.exception("update-flow-error", e=e)
+ finally:
+ if trnsId in self.inter_adapter_message_deferred_map:
+ del self.inter_adapter_message_deferred_map[trnsId]
def process_inter_adapter_message(self, msg):
+ # We expect only responses on the ONU side
self.log.info('process-inter-adapter-message', msg=msg)
- if msg.header.type == InterAdapterMessageType.FLOW_RESPONSE:
- body = InterAdapterResponseBody()
- msg.body.Unpack(body)
- self.log.info('received-response', status=body.success)
- self.receive_message(msg)
+ self.receive_message(msg)
def remove_from_flow_table(self, flows):
self.log.debug('remove-from-flow-table', flows=flows)
@@ -243,6 +373,8 @@
oper_status=OperStatus.UNKNOWN,
connect_status=ConnectStatus.UNREACHABLE)
+ self.stop_kpi_collection()
+
# TODO:
# 1) Remove all flows from the device
# 2) Remove the device from ponsim
@@ -280,6 +412,8 @@
oper_status=OperStatus.ACTIVE,
connect_status=ConnectStatus.REACHABLE)
+ self.start_kpi_collection(self.device_id)
+
self.log.info('re-enabled', device_id=self.device_id)
except Exception, e:
self.log.exception('error-reenabling', e=e)
@@ -292,3 +426,83 @@
# 2) Remove the device from ponsim
self.log.info('deleted', device_id=self.device_id)
+
+ def start_kpi_collection(self, device_id):
+ kafka_cluster_proxy = get_kafka_proxy()
+
+ @inlineCallbacks
+ def _collect(device_id, prefix):
+ try:
+ self.log.debug("pm-collection-interval")
+ # Proxy a message to ponsim_olt. The OLT will then query the ONU
+ # for statistics. The reply will
+ # arrive proxied back to us in self.receive_message().
+ msg = PonSimMetricsRequest(port=self.proxy_address.channel_id)
+
+ # Create a deferred to wait for the result as well as a transid
+ wait_for_result = Deferred()
+ trnsId = uuid4().hex
+ self.inter_adapter_message_deferred_map[
+ self._to_string(trnsId)] = wait_for_result
+
+ # Sends the request via proxy and wait for an ACK
+ yield self.adapter_proxy.send_inter_adapter_message(
+ msg=msg,
+ type=InterAdapterMessageType.METRICS_REQUEST,
+ from_adapter=self.adapter.name,
+ to_adapter=self.proxy_address.device_type,
+ to_device_id=self.device_id,
+ proxy_device_id=self.proxy_address.device_id,
+ message_id=trnsId
+ )
+ # Wait for the full response from the proxied adapter
+ res = yield wait_for_result
+ # Remove the transaction from the transaction map
+ del self.inter_adapter_message_deferred_map[self._to_string(trnsId)]
+
+ # Message is a reply to an ONU statistics request. Push it out to
+ # Kafka via adapter.submit_kpis().
+ if res.header.type == InterAdapterMessageType.METRICS_RESPONSE:
+ msg = InterAdapterResponseBody()
+ res.body.Unpack(msg)
+ self.log.debug('metrics-response-received', result=msg.status)
+ if self.pm_metrics:
+ self.log.debug('Handling incoming ONU metrics')
+ response = PonSimMetrics()
+ msg.body.Unpack(response)
+ port_metrics = self.pm_metrics.extract_metrics(response)
+ try:
+ ts = arrow.utcnow().timestamp
+ kpi_event = KpiEvent(
+ type=KpiEventType.slice,
+ ts=ts,
+ prefixes={
+ # OLT NNI port
+ prefix + '.uni': MetricValuePairs(
+ metrics=port_metrics['uni']),
+ # OLT PON port
+ prefix + '.pon': MetricValuePairs(
+ metrics=port_metrics['pon'])
+ }
+ )
+
+ self.log.debug(
+ 'Submitting KPI for incoming ONU mnetrics')
+
+ # Step 3: submit directly to the kafka bus
+ if kafka_cluster_proxy:
+ if isinstance(kpi_event, Message):
+ kpi_event = dumps(
+ MessageToDict(kpi_event, True, True))
+ kafka_cluster_proxy.send_message("voltha.kpis",
+ kpi_event)
+
+ except Exception as e:
+ log.exception('failed-to-submit-kpis', e=e)
+ except Exception as e:
+ log.exception('failed-to-collect-metrics', e=e)
+
+ self.pm_metrics.start_collector(_collect)
+
+ def stop_kpi_collection(self):
+ self.pm_metrics.stop_collector()