VOL-804 basic performance metrics

Collect port and flow statistics, updating both the Voltha objects
and their OpenFlow equivalents.
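Port KPIs are published on the event bus under per-interface metric
prefixes, for example (illustrative device id and port numbers):

    voltha.openolt.000130af6b3b4d1a.nni.128
    voltha.openolt.000130af6b3b4d1a.pon.0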

Change-Id: Ib7c724a5f845ab4fad0147e8b189790ff111ff3d
diff --git a/voltha/adapters/openolt/openolt_device.py b/voltha/adapters/openolt/openolt_device.py
index 4244063..4c5c986 100644
--- a/voltha/adapters/openolt/openolt_device.py
+++ b/voltha/adapters/openolt/openolt_device.py
@@ -29,7 +29,10 @@
 from voltha.protos.logical_device_pb2 import LogicalDevice
 from voltha.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, \
     OFPPS_LINK_DOWN, OFPPF_1GB_FD, OFPC_GROUP_STATS, OFPC_PORT_STATS, \
-    OFPC_TABLE_STATS, OFPC_FLOW_STATS, ofp_switch_features, ofp_port
+    OFPC_TABLE_STATS, OFPC_FLOW_STATS, ofp_switch_features, ofp_port, \
+    ofp_port_stats
+from voltha.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
 from voltha.protos.logical_device_pb2 import LogicalPort
 from voltha.core.logical_device_agent import mac_str_to_tuple
 from voltha.registry import registry
@@ -664,14 +667,102 @@
         self.log.debug('stopping-heartbeat-thread', device_id=self.device_id)
 
     def port_statistics_indication(self, port_stats):
-        # TODO: send to kafka
-        # TODO : update ONOS counters
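+        """Handle a port statistics indication from the OLT."""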
         self.log.info('port-stats-collected', stats=port_stats)
+        self.ports_statistics_kpis(port_stats)
+        # FIXME: only the first uplink is a logical port
+        if port_stats.intf_id == 128:
+            # ONOS update
+            self.update_logical_port_stats(port_stats)
+        # FIXME: Discard other uplinks; they are not created as port objects
+        if port_stats.intf_id in [129, 130, 131]:
+            self.log.debug('uplink-ports-not-created',
+                           intf_id=port_stats.intf_id)
+            return
+        # update port object stats
+        port = self.adapter_agent.get_port(self.device_id,
+            port_no=port_stats.intf_id)
+
+        port.rx_packets = port_stats.rx_packets
+        port.rx_bytes = port_stats.rx_bytes
+        port.rx_errors = port_stats.rx_error_packets
+        port.tx_packets = port_stats.tx_packets
+        port.tx_bytes = port_stats.tx_bytes
+        port.tx_errors = port_stats.tx_error_packets
+
+        # add_port performs an update if the port already exists
+        self.adapter_agent.add_port(self.device_id, port)
 
     def flow_statistics_indication(self, flow_stats):
-        # TODO: send to kafka
-        # TODO : update ONOS counters
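+        """Handle a flow statistics indication from the OLT."""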
         self.log.info('flow-stats-collected', stats=flow_stats)
+        # TODO: send to kafka?
+        # UNTESTED: the openolt driver does not yet provide flow stats
+        self.adapter_agent.update_flow_stats(self.logical_device_id,
+            flow_id=flow_stats.flow_id, packet_count=flow_stats.tx_packets,
+            byte_count=flow_stats.tx_bytes)
+
+    def ports_statistics_kpis(self, port_stats):
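+        """Publish the collected port counters as a KPI event."""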
+        pm_data = {}
+        pm_data["rx_bytes"] = port_stats.rx_bytes
+        pm_data["rx_packets"] = port_stats.rx_packets
+        pm_data["rx_ucast_packets"] = port_stats.rx_ucast_packets
+        pm_data["rx_mcast_packets"] = port_stats.rx_mcast_packets
+        pm_data["rx_bcast_packets"] = port_stats.rx_bcast_packets
+        pm_data["rx_error_packets"] = port_stats.rx_error_packets
+        pm_data["tx_bytes"] = port_stats.tx_bytes
+        pm_data["tx_packets"] = port_stats.tx_packets
+        pm_data["tx_ucast_packets"] = port_stats.tx_ucast_packets
+        pm_data["tx_mcast_packets"] = port_stats.tx_mcast_packets
+        pm_data["tx_bcast_packets"] = port_stats.tx_bcast_packets
+        pm_data["tx_error_packets"] = port_stats.tx_error_packets
+        pm_data["rx_crc_errors"] = port_stats.rx_crc_errors
+        pm_data["bip_errors"] = port_stats.bip_errors
+
+        prefix = 'voltha.openolt.{}'.format(self.device_id)
+        # FIXME: magic number; intf_id < 132 is assumed to be an NNI port
+        if port_stats.intf_id < 132:
+            prefixes = {
+                prefix + '.nni.{}'.format(port_stats.intf_id):
+                    MetricValuePairs(metrics=pm_data)
+            }
+        else:
+            prefixes = {
+                prefix + '.pon.{}'.format(platform.intf_id_from_pon_port_no(
+                    port_stats.intf_id)): MetricValuePairs(
+                    metrics=pm_data)
+            }
+
+        kpi_event = KpiEvent(
+            type=KpiEventType.slice,
+            ts=port_stats.timestamp,
+            prefixes=prefixes)
+        self.adapter_agent.submit_kpis(kpi_event)
+
+    def update_logical_port_stats(self, port_stats):
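+        """Mirror the OLT port counters onto the logical OpenFlow port."""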
+        # FIXME: relies on the 'nni-<intf_id>' port label convention
+        label = 'nni-{}'.format(port_stats.intf_id)
+        logical_port = self.adapter_agent.get_logical_port(
+            self.logical_device_id, label)
+
+        if logical_port is None:
+            self.log.error('logical-port-is-None',
+                logical_device_id=self.logical_device_id, label=label,
+                port_stats=port_stats)
+            return
+
+        self.log.debug('logical-port-before-update', port=logical_port)
+
+        logical_port.ofp_port_stats.rx_packets = port_stats.rx_packets
+        logical_port.ofp_port_stats.rx_bytes = port_stats.rx_bytes
+        logical_port.ofp_port_stats.tx_packets = port_stats.tx_packets
+        logical_port.ofp_port_stats.tx_bytes = port_stats.tx_bytes
+        logical_port.ofp_port_stats.rx_errors = port_stats.rx_error_packets
+        logical_port.ofp_port_stats.tx_errors = port_stats.tx_error_packets
+        logical_port.ofp_port_stats.rx_crc_err = port_stats.rx_crc_errors
+
+        self.log.debug('logical-port-after-update', port=logical_port)
+
+        self.adapter_agent.update_logical_port(self.logical_device_id,
+                                               logical_port)
 
     def packet_out(self, egress_port, msg):
         pkt = Ether(msg)
@@ -791,9 +882,12 @@
             advertised=cap, peer=cap, curr_speed=curr_speed,
             max_speed=max_speed)
 
+        ofp_stats = ofp_port_stats(port_no=port_no)
+
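+        # A new logical port starts with zeroed counters; they are filled in
+        # by update_logical_port_stats() on each port stats indication.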
         logical_port = LogicalPort(
             id=label, ofp_port=ofp, device_id=self.device_id,
-            device_port_no=port_no, root_port=True)
+            device_port_no=port_no, root_port=True,
+            ofp_port_stats=ofp_stats)
 
         self.adapter_agent.add_logical_port(self.logical_device_id,
                                             logical_port)
diff --git a/voltha/core/adapter_agent.py b/voltha/core/adapter_agent.py
index 037fadb..e1dd1ee 100644
--- a/voltha/core/adapter_agent.py
+++ b/voltha/core/adapter_agent.py
@@ -434,6 +434,13 @@
         else:
             return [p for p in ports if p.type == port_type]
 
+    def get_port(self, device_id, port_no=None, label=None):
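+        """Return the first port of the device matching label or port_no."""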
+        ports = self.root_proxy.get('/devices/{}/ports'.format(device_id))
+        for p in ports:
+            if p.label == label or p.port_no == port_no:
+                return p
+        return None
+
     def delete_port(self, device_id, port):
         assert isinstance(port, Port)
         # for referential integrity, add/augment references
@@ -901,6 +908,28 @@
             self.log.exception('failed-kpi-submission',
                                type=type(kpi_event_msg))
 
+
+    # ~~~~~~~~~~~~~~~~~~~~~~~~~~ Handle flow stats ~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+    def update_flow_stats(self, logical_device_id, flow_id, packet_count=0,
+                          byte_count=0):
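+        """Update packet and byte counters on a logical device flow."""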
+        flows = self.root_proxy.get(
+            'logical_devices/{}/flows'.format(logical_device_id))
+        flow_to_update = None
+        for flow in flows.items:
+            if flow.id == flow_id:
+                flow_to_update = flow
+                flow_to_update.packet_count = packet_count
+                flow_to_update.byte_count = byte_count
+                break
+        if flow_to_update is not None:
+            self._make_up_to_date(
+                'logical_devices/{}/flows'.format(logical_device_id),
+                flow_to_update.id, flow_to_update)
+        else:
+            self.log.warn('flow-to-update-not-found',
+                          logical_device_id=logical_device_id,
+                          flow_id=flow_id)
+
     # ~~~~~~~~~~~~~~~~~~~ Handle alarm submissions ~~~~~~~~~~~~~~~~~~~~~
 
     def create_alarm(self, id=None, resource_id=None, description=None,
diff --git a/voltha/core/device_agent.py b/voltha/core/device_agent.py
index 5ed9115..885a139 100644
--- a/voltha/core/device_agent.py
+++ b/voltha/core/device_agent.py
@@ -466,13 +466,15 @@
         self.log.debug('flow-table-updated',
                        logical_device_id=self.last_data.id, flows=flows)
 
-        # if device accepts non-bulk flow update, lets just call that first
-        if self.device_type.accepts_add_remove_flow_updates:
-            if (len(self.flow_changes.to_remove.items) == 0) and (len(
-                    self.flow_changes.to_add.items) == 0):
-                self.log.debug('no-flow-update-required',
-                               logical_device_id=self.last_data.id)
-            else:
+        if len(self.flow_changes.to_remove.items) == 0 and \
+                len(self.flow_changes.to_add.items) == 0:
+            self.log.debug('no-flow-update-required',
+                           logical_device_id=self.last_data.id)
+        else:
+            # if device accepts non-bulk flow updates, let's call that first
+            if self.device_type.accepts_add_remove_flow_updates:
                 try:
                     yield self.adapter_agent.update_flows_incrementally(
                         device=self.last_data,
@@ -482,18 +484,18 @@
                 except Exception as e:
                     self.log.exception("Failure-updating-flows", e=e)
 
-        # if device accepts bulk flow update, lets just call that
-        elif self.device_type.accepts_bulk_flow_update:
-            self.log.debug('invoking bulk')
-            groups = self.groups_proxy.get('/')  # gather flow groups
-            yield self.adapter_agent.update_flows_bulk(
-                device=self.last_data,
-                flows=flows,
-                groups=groups)
-            # add ability to notify called when an flow update completes
-            # see https://jira.opencord.org/browse/CORD-839
-        else:
-            raise NotImplementedError()
+            # if device accepts bulk flow update, let's just call that
+            elif self.device_type.accepts_bulk_flow_update:
+                self.log.debug('invoking bulk')
+                groups = self.groups_proxy.get('/')  # gather flow groups
+                yield self.adapter_agent.update_flows_bulk(
+                    device=self.last_data,
+                    flows=flows,
+                    groups=groups)
+                # add ability to notify the caller when flow updates complete
+                # see https://jira.opencord.org/browse/CORD-839
+            else:
+                raise NotImplementedError()
 
     ## <======================= GROUP TABLE UPDATE HANDLING ===================
 
diff --git a/voltha/core/logical_device_agent.py b/voltha/core/logical_device_agent.py
index c321da4..f18abc0 100644
--- a/voltha/core/logical_device_agent.py
+++ b/voltha/core/logical_device_agent.py
@@ -58,6 +58,8 @@
                 '/logical_devices/{}'.format(logical_device.id))
 
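+            # detect stats-only updates before the flow table is replaced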
             self.flows_proxy.register_callback(
+                CallbackType.PRE_UPDATE, self._pre_process_flows)
+            self.flows_proxy.register_callback(
                 CallbackType.POST_UPDATE, self._flow_table_updated)
             self.groups_proxy.register_callback(
                 CallbackType.POST_UPDATE, self._group_table_updated)
@@ -76,6 +78,8 @@
             self.log = structlog.get_logger(logical_device_id=logical_device.id)
 
             self._routes = None
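+            # set by _pre_process_flows(); True when only flow stats change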
+            self._no_flow_changes_required = False
+
         except Exception, e:
             self.log.exception('init-error', e=e)
 
@@ -523,22 +527,55 @@
 
     # ~~~~~~~~~~~~~~~~~~~~~ FLOW TABLE UPDATE HANDLING ~~~~~~~~~~~~~~~~~~~~~~~~
 
+    def _pre_process_flows(self, flows):
+        """
+        This method is invoked before a device flow table data model is
+        updated. The resulting data is stored locally and the flow table is
+        updated during the post-processing phase, i.e. via the POST_UPDATE
+        callback.
+        :param flows: Desired flows
+        :return: None
+        """
+        current_flows = self.flows_proxy.get('/')
+        self.log.debug('pre-processing-flows',
+                       logical_device_id=self.logical_device_id,
+                       desired_flows=flows,
+                       existing_flows=current_flows)
+
+        current_flow_ids = set(f.id for f in current_flows.items)
+        desired_flow_ids = set(f.id for f in flows.items)
+
+        ids_to_add = desired_flow_ids.difference(current_flow_ids)
+        ids_to_del = current_flow_ids.difference(desired_flow_ids)
+
+        if len(ids_to_add) + len(ids_to_del) == 0:
+            # No flow changes required; only stats are being updated
+            self._no_flow_changes_required = True
+        else:
+            self._no_flow_changes_required = False
+
     def _flow_table_updated(self, flows):
         self.log.debug('flow-table-updated',
                   logical_device_id=self.logical_device_id, flows=flows)
 
-        # TODO we have to evolve this into a policy-based, event based pattern
-        # This is a raw implementation of the specific use-case with certain
-        # built-in assumptions, and not yet device vendor specific. The policy-
-        # based refinement will be introduced that later.
+        if self._no_flow_changes_required:
+            # Stats-only change; no further flow processing needed
+            self.log.debug('flow-stats-update')
+        else:
 
-        groups = self.groups_proxy.get('/').items
-        device_rules_map = self.decompose_rules(flows.items, groups)
-        for device_id, (flows, groups) in device_rules_map.iteritems():
-            self.root_proxy.update('/devices/{}/flows'.format(device_id),
-                                   Flows(items=flows.values()))
-            self.root_proxy.update('/devices/{}/flow_groups'.format(device_id),
-                                   FlowGroups(items=groups.values()))
+            # TODO: we have to evolve this into a policy-based, event-based
+            # pattern. This is a raw implementation of the specific use-case
+            # with certain built-in assumptions, and not yet device vendor
+            # specific. The policy-based refinement will be introduced later.
+
+            groups = self.groups_proxy.get('/').items
+            device_rules_map = self.decompose_rules(flows.items, groups)
+            for device_id, (flows, groups) in device_rules_map.iteritems():
+                self.root_proxy.update('/devices/{}/flows'.format(device_id),
+                                       Flows(items=flows.values()))
+                self.root_proxy.update('/devices/{}/flow_groups'.format(device_id),
+                                       FlowGroups(items=groups.values()))
 
     # ~~~~~~~~~~~~~~~~~~~~ GROUP TABLE UPDATE HANDLING ~~~~~~~~~~~~~~~~~~~~~~~~
 
diff --git a/voltha/protos/device.proto b/voltha/protos/device.proto
index 0eb32f1..e0f306f 100644
--- a/voltha/protos/device.proto
+++ b/voltha/protos/device.proto
@@ -200,6 +200,13 @@
     }
     repeated PeerPort peers = 8;
 
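+    // Port-level traffic counters, updated from OLT statistics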
+    fixed64 rx_packets = 9;
+    fixed64 rx_bytes = 10;
+    fixed64 rx_errors = 11;
+    fixed64 tx_packets = 12;
+    fixed64 tx_bytes = 13;
+    fixed64 tx_errors = 14;
+
 }
 
 message Ports {
diff --git a/voltha/protos/logical_device.proto b/voltha/protos/logical_device.proto
index 92b2b4f..0b081d2 100644
--- a/voltha/protos/logical_device.proto
+++ b/voltha/protos/logical_device.proto
@@ -22,6 +22,7 @@
     string device_id = 3;
     uint32 device_port_no = 4;
     bool root_port = 5;
+    openflow_13.ofp_port_stats ofp_port_stats = 6;
 }
 
 message LogicalPorts {