VOL-804 basic performance metrics
Collect port and flow stats and update the object in Voltha and the OpenFlow equivalent
Change-Id: Ib7c724a5f845ab4fad0147e8b189790ff111ff3d
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index 4244063..4c5c986 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -29,7 +29,10 @@
from voltha.protos.logical_device_pb2 import LogicalDevice
from voltha.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, \
OFPPS_LINK_DOWN, OFPPF_1GB_FD, OFPC_GROUP_STATS, OFPC_PORT_STATS, \
- OFPC_TABLE_STATS, OFPC_FLOW_STATS, ofp_switch_features, ofp_port
+ OFPC_TABLE_STATS, OFPC_FLOW_STATS, ofp_switch_features, ofp_port, \
+ ofp_port_stats
+from voltha.protos.events_pb2 import KpiEvent, MetricValuePairs
+from voltha.protos.events_pb2 import KpiEventType
from voltha.protos.logical_device_pb2 import LogicalPort
from voltha.core.logical_device_agent import mac_str_to_tuple
from voltha.registry import registry
@@ -664,14 +667,102 @@
self.log.debug('stopping-heartbeat-thread', device_id=self.device_id)
def port_statistics_indication(self, port_stats):
- # TODO: send to kafka
- # TODO : update ONOS counters
self.log.info('port-stats-collected', stats=port_stats)
+ self.ports_statistics_kpis(port_stats)
+ #FIXME : only the first uplink is a logical port
+ if port_stats.intf_id == 128:
+ # ONOS update
+ self.update_logical_port_stats(port_stats)
+ # FIXME: Discard other uplinks, they do not exist as an object
+ if port_stats.intf_id in [129, 130, 131]:
+ self.log.debug('those uplinks are not created')
+ return
+ # update port object stats
+ port = self.adapter_agent.get_port(self.device_id,
+ port_no=port_stats.intf_id)
+
+ port.rx_packets = port_stats.rx_packets
+ port.rx_bytes = port_stats.rx_bytes
+ port.rx_errors = port_stats.rx_error_packets
+ port.tx_packets = port_stats.tx_packets
+ port.tx_bytes = port_stats.tx_bytes
+ port.tx_errors = port_stats.tx_error_packets
+
+ # Add port does an update if port exists
+ self.adapter_agent.add_port(self.device_id, port)
def flow_statistics_indication(self, flow_stats):
- # TODO: send to kafka
- # TODO : update ONOS counters
self.log.info('flow-stats-collected', stats=flow_stats)
+ # TODO: send to kafka ?
+ # UNTESTED : the openolt driver does not yet provide flow stats
+ self.adapter_agent.update_flow_stats(self.logical_device_id,
+ flow_id=flow_stats.flow_id, packet_count=flow_stats.tx_packets,
+ byte_count=flow_stats.tx_bytes)
+
+ def ports_statistics_kpis(self, port_stats):
+ pm_data = {}
+ pm_data["rx_bytes"] = port_stats.rx_bytes
+ pm_data["rx_packets"] = port_stats.rx_packets
+ pm_data["rx_ucast_packets"] = port_stats.rx_ucast_packets
+ pm_data["rx_mcast_packets"] = port_stats.rx_mcast_packets
+ pm_data["rx_bcast_packets"] = port_stats.rx_bcast_packets
+ pm_data["rx_error_packets"] = port_stats.rx_error_packets
+ pm_data["tx_bytes"] = port_stats.tx_bytes
+ pm_data["tx_packets"] = port_stats.tx_packets
+ pm_data["tx_ucast_packets"] = port_stats.tx_ucast_packets
+ pm_data["tx_mcast_packets"] = port_stats.tx_mcast_packets
+ pm_data["tx_bcast_packets"] = port_stats.tx_bcast_packets
+ pm_data["tx_error_packets"] = port_stats.tx_error_packets
+ pm_data["rx_crc_errors"] = port_stats.rx_crc_errors
+ pm_data["bip_errors"] = port_stats.bip_errors
+
+
+ prefix = 'voltha.openolt.{}'.format(self.device_id)
+ # FIXME
+ if port_stats.intf_id < 132:
+ prefixes = {
+ prefix + '{}.nni'.format(port_stats.intf_id): MetricValuePairs(
+ metrics=pm_data)
+ }
+ else:
+ prefixes = {
+ prefix + '.pon.{}'.format(platform.intf_id_from_pon_port_no(
+ port_stats.intf_id)): MetricValuePairs(
+ metrics=pm_data)
+ }
+
+ kpi_event = KpiEvent(
+ type=KpiEventType.slice,
+ ts=port_stats.timestamp,
+ prefixes=prefixes)
+ self.adapter_agent.submit_kpis(kpi_event)
+
+ def update_logical_port_stats(self, port_stats):
+ # FIXME
+ label = 'nni-{}'.format(port_stats.intf_id)
+ logical_port = self.adapter_agent.get_logical_port(
+ self.logical_device_id, label)
+
+ if logical_port is None:
+ self.log.error('logical-port-is-None',
+ logical_device_id=self.logical_device_id, label=label,
+ port_stats=port_stats)
+ return
+
+ self.log.debug('before', port=logical_port)
+
+ logical_port.ofp_port_stats.rx_packets = port_stats.rx_packets
+ logical_port.ofp_port_stats.rx_bytes = port_stats.rx_bytes
+ logical_port.ofp_port_stats.tx_packets = port_stats.tx_packets
+ logical_port.ofp_port_stats.tx_bytes = port_stats.tx_bytes
+ logical_port.ofp_port_stats.rx_errors = port_stats.rx_error_packets
+ logical_port.ofp_port_stats.tx_errors = port_stats.tx_error_packets
+ logical_port.ofp_port_stats.rx_crc_err = port_stats.rx_crc_errors
+
+ self.log.debug('after', port=logical_port)
+
+ self.adapter_agent.update_logical_port(self.logical_device_id,
+ logical_port)
def packet_out(self, egress_port, msg):
pkt = Ether(msg)
@@ -791,9 +882,12 @@
advertised=cap, peer=cap, curr_speed=curr_speed,
max_speed=max_speed)
+ ofp_stats = ofp_port_stats(port_no=port_no)
+
logical_port = LogicalPort(
id=label, ofp_port=ofp, device_id=self.device_id,
- device_port_no=port_no, root_port=True)
+ device_port_no=port_no, root_port=True,
+ ofp_port_stats=ofp_stats)
self.adapter_agent.add_logical_port(self.logical_device_id,
logical_port)
diff --git a/voltha/core/adapter_agent.py b/voltha/core/adapter_agent.py
index 037fadb..e1dd1ee 100644
--- a/voltha/core/adapter_agent.py
+++ b/voltha/core/adapter_agent.py
@@ -434,6 +434,13 @@
else:
return [p for p in ports if p.type == port_type]
+ def get_port(self, device_id, port_no=None, label=None):
+ ports = self.root_proxy.get('/devices/{}/ports'.format(device_id))
+ for p in ports:
+ if p.label == label or p.port_no == port_no:
+ return p
+ return None
+
def delete_port(self, device_id, port):
assert isinstance(port, Port)
# for referential integrity, add/augment references
@@ -901,6 +908,28 @@
self.log.exception('failed-kpi-submission',
type=type(kpi_event_msg))
+
+ # # ~~~~~~~~~~~~~~~~~~~~~~~~~~ Handle flow stats ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ def update_flow_stats(self, logical_device_id, flow_id, packet_count=0,
+ byte_count=0):
+ flows = self.root_proxy.get(
+ 'logical_devices/{}/flows'.format(logical_device_id))
+ flow_to_update = None
+ for flow in flows:
+ if flow.id == flow_id:
+ flow_to_update = flow
+ flow_to_update.packet_count = packet_count
+ flow_to_update.byte_count = byte_count
+ break
+ if flow_to_update is not None:
+ self._make_up_to_date(
+ 'logical_devices/{}/flows'.format(logical_device_id),
+ flow_to_update.id, flow_to_update)
+ else:
+ self.log.warn('flow-to-update-not-found', logical_device_id=
+ logical_device_id, flow_id=flow_id)
+
# ~~~~~~~~~~~~~~~~~~~ Handle alarm submissions ~~~~~~~~~~~~~~~~~~~~~
def create_alarm(self, id=None, resource_id=None, description=None,
diff --git a/voltha/core/device_agent.py b/voltha/core/device_agent.py
index 5ed9115..885a139 100644
--- a/voltha/core/device_agent.py
+++ b/voltha/core/device_agent.py
@@ -466,13 +466,15 @@
self.log.debug('flow-table-updated',
logical_device_id=self.last_data.id, flows=flows)
- # if device accepts non-bulk flow update, lets just call that first
- if self.device_type.accepts_add_remove_flow_updates:
- if (len(self.flow_changes.to_remove.items) == 0) and (len(
- self.flow_changes.to_add.items) == 0):
- self.log.debug('no-flow-update-required',
- logical_device_id=self.last_data.id)
- else:
+ if (len(self.flow_changes.to_remove.items) == 0) and (len(
+ self.flow_changes.to_add.items) == 0):
+ self.log.debug('no-flow-update-required',
+ logical_device_id=self.last_data.id)
+ else:
+
+ # if device accepts non-bulk flow update, lets just call that first
+ if self.device_type.accepts_add_remove_flow_updates:
+
try:
yield self.adapter_agent.update_flows_incrementally(
device=self.last_data,
@@ -482,18 +484,18 @@
except Exception as e:
self.log.exception("Failure-updating-flows", e=e)
- # if device accepts bulk flow update, lets just call that
- elif self.device_type.accepts_bulk_flow_update:
- self.log.debug('invoking bulk')
- groups = self.groups_proxy.get('/') # gather flow groups
- yield self.adapter_agent.update_flows_bulk(
- device=self.last_data,
- flows=flows,
- groups=groups)
- # add ability to notify called when an flow update completes
- # see https://jira.opencord.org/browse/CORD-839
- else:
- raise NotImplementedError()
+ # if device accepts bulk flow update, lets just call that
+ elif self.device_type.accepts_bulk_flow_update:
+ self.log.debug('invoking bulk')
+ groups = self.groups_proxy.get('/') # gather flow groups
+ yield self.adapter_agent.update_flows_bulk(
+ device=self.last_data,
+ flows=flows,
+ groups=groups)
+ # add ability to notify called when an flow update completes
+ # see https://jira.opencord.org/browse/CORD-839
+ else:
+ raise NotImplementedError()
## <======================= GROUP TABLE UPDATE HANDLING ===================
diff --git a/voltha/core/logical_device_agent.py b/voltha/core/logical_device_agent.py
index c321da4..f18abc0 100644
--- a/voltha/core/logical_device_agent.py
+++ b/voltha/core/logical_device_agent.py
@@ -58,6 +58,8 @@
'/logical_devices/{}'.format(logical_device.id))
self.flows_proxy.register_callback(
+ CallbackType.PRE_UPDATE, self._pre_process_flows)
+ self.flows_proxy.register_callback(
CallbackType.POST_UPDATE, self._flow_table_updated)
self.groups_proxy.register_callback(
CallbackType.POST_UPDATE, self._group_table_updated)
@@ -76,6 +78,8 @@
self.log = structlog.get_logger(logical_device_id=logical_device.id)
self._routes = None
+ self._no_flow_changes_required = False
+
except Exception, e:
self.log.exception('init-error', e=e)
@@ -523,22 +527,55 @@
# ~~~~~~~~~~~~~~~~~~~~~ FLOW TABLE UPDATE HANDLING ~~~~~~~~~~~~~~~~~~~~~~~~
+ def _pre_process_flows(self, flows):
+ """
+ This method is invoked before a device flow table data model is
+ updated. The resulting data is stored locally and the flow table is
+ updated during the post-processing phase, i.e. via the POST_UPDATE
+ callback
+ :param flows: Desired flows
+ :return: None
+ """
+ current_flows = self.flows_proxy.get('/')
+ self.log.debug('pre-processing-flows',
+ logical_device_id=self.logical_device_id,
+ desired_flows=flows,
+ existing_flows=current_flows)
+
+ current_flow_ids = set(f.id for f in current_flows.items)
+ desired_flow_ids = set(f.id for f in flows.items)
+
+ ids_to_add = desired_flow_ids.difference(current_flow_ids)
+ ids_to_del = current_flow_ids.difference(desired_flow_ids)
+
+ if len(ids_to_add) + len(ids_to_del) == 0:
+ # No changes of flows, just stats are changing
+ self._no_flow_changes_required = True
+ else:
+ self._no_flow_changes_required = False
+
+
def _flow_table_updated(self, flows):
self.log.debug('flow-table-updated',
logical_device_id=self.logical_device_id, flows=flows)
- # TODO we have to evolve this into a policy-based, event based pattern
- # This is a raw implementation of the specific use-case with certain
- # built-in assumptions, and not yet device vendor specific. The policy-
- # based refinement will be introduced that later.
+ if self._no_flow_changes_required:
+ # Stats changes, no need to process further
+ self.log.debug('flow-stats-update')
+ else:
- groups = self.groups_proxy.get('/').items
- device_rules_map = self.decompose_rules(flows.items, groups)
- for device_id, (flows, groups) in device_rules_map.iteritems():
- self.root_proxy.update('/devices/{}/flows'.format(device_id),
- Flows(items=flows.values()))
- self.root_proxy.update('/devices/{}/flow_groups'.format(device_id),
- FlowGroups(items=groups.values()))
+ # TODO we have to evolve this into a policy-based, event based pattern
+ # This is a raw implementation of the specific use-case with certain
+ # built-in assumptions, and not yet device vendor specific. The policy-
+ # based refinement will be introduced that later.
+
+ groups = self.groups_proxy.get('/').items
+ device_rules_map = self.decompose_rules(flows.items, groups)
+ for device_id, (flows, groups) in device_rules_map.iteritems():
+ self.root_proxy.update('/devices/{}/flows'.format(device_id),
+ Flows(items=flows.values()))
+ self.root_proxy.update('/devices/{}/flow_groups'.format(device_id),
+ FlowGroups(items=groups.values()))
# ~~~~~~~~~~~~~~~~~~~~ GROUP TABLE UPDATE HANDLING ~~~~~~~~~~~~~~~~~~~~~~~~
diff --git a/voltha/protos/device.proto b/voltha/protos/device.proto
index 0eb32f1..e0f306f 100644
--- a/voltha/protos/device.proto
+++ b/voltha/protos/device.proto
@@ -200,6 +200,13 @@
}
repeated PeerPort peers = 8;
+ fixed64 rx_packets = 9;
+ fixed64 rx_bytes = 10;
+ fixed64 rx_errors = 11;
+ fixed64 tx_packets = 12;
+ fixed64 tx_bytes = 13;
+ fixed64 tx_errors = 14;
+
}
message Ports {
diff --git a/voltha/protos/logical_device.proto b/voltha/protos/logical_device.proto
index 92b2b4f..0b081d2 100644
--- a/voltha/protos/logical_device.proto
+++ b/voltha/protos/logical_device.proto
@@ -22,6 +22,7 @@
string device_id = 3;
uint32 device_port_no = 4;
bool root_port = 5;
+ openflow_13.ofp_port_stats ofp_port_stats = 6;
}
message LogicalPorts {