Added support for pm configuration and reporting to ponsim.

Change-Id: I047627253f60beb624d38bde1e5da31adc42312e
diff --git a/ponsim/grpc_server.py b/ponsim/grpc_server.py
index 20b5dab..cea0cc4 100644
--- a/ponsim/grpc_server.py
+++ b/ponsim/grpc_server.py
@@ -53,6 +53,10 @@
             self.ponsim.onu_install_flows(request.port, request.flows)
         return Empty()
 
+    def GetStats(self, request, context):
+        return self.ponsim.get_stats()
+
+
 class GrpcServer(object):
 
     def __init__(self, port, ponsim):
diff --git a/ponsim/ponsim.py b/ponsim/ponsim.py
index f041171..b888ba2 100644
--- a/ponsim/ponsim.py
+++ b/ponsim/ponsim.py
@@ -26,7 +26,11 @@
 
 from common.frameio.frameio import hexify
 from voltha.protos import third_party
+from voltha.protos.ponsim_pb2 import PonSimMetrics, PonSimPortMetrics, \
+PonSimPacketCounter
+#from voltha.protos.ponsim_pb2 import
 from voltha.core.flow_decomposer import *
+from twisted.internet.task import LoopingCall
 _ = third_party
 
 
@@ -39,6 +43,94 @@
     )
 
 
+class FrameIOCounter(object):
+    class SingleFrameCounter(object):
+        def __init__(self,name,min,max):
+            # Currently there are 2 values, one for the PON interface (port 1)
+            # and one for the Network Interface (port 2). This can be extended if
+            # the virtual devices extend the number of ports. 
+            self.value = [0,0] #{PON,NI}
+            self.name = name
+            self.min =  min
+            self.max = max
+
+    def __init__(self, device):
+        self.device = device
+        self.tx_counters = dict (
+           tx_64=self.SingleFrameCounter("tx_64", 1, 64),
+           tx_65_127=self.SingleFrameCounter("tx_65_127", 65, 127),
+           tx_128_255=self.SingleFrameCounter("tx_128_255", 128, 255),
+           tx_256_511=self.SingleFrameCounter("tx_256_511", 256, 511),
+           tx_512_1023=self.SingleFrameCounter("tx_512_1023", 512, 1024),
+           tx_1024_1518=self.SingleFrameCounter("tx_1024_1518", 1024, 1518),
+           tx_1519_9k=self.SingleFrameCounter("tx_1519_9k", 1519, 9216),
+        )
+        self.rx_counters = dict(
+           rx_64=self.SingleFrameCounter("rx_64", 1, 64),
+           rx_65_127=self.SingleFrameCounter("rx_65_127", 65, 127),
+           rx_128_255=self.SingleFrameCounter("rx_128_255", 128, 255),
+           rx_256_511=self.SingleFrameCounter("rx_256_511", 256, 511),
+           rx_512_1023=self.SingleFrameCounter("rx_512_1023", 512, 1024),
+           rx_1024_1518=self.SingleFrameCounter("rx_1024_1518", 1024, 1518),
+           rx_1519_9k=self.SingleFrameCounter("rx_1519_9k", 1519, 9216)
+        )
+
+    def count_rx_frame(self, port, size):
+        log.info("counting-rx-frame", size=size, port=port)
+        for k,v in self.rx_counters.iteritems():
+            if size >= v.min and size <= v.max:
+                self.rx_counters[k].value[port-1] += 1
+                return
+        log.warn("unsupported-packet-size", size=size)
+
+    def count_tx_frame(self, port, size):
+        for k, v in self.tx_counters.iteritems():
+            if size >= v.min and size <= v.max:
+                self.tx_counters[k].value[port-1] += 1
+                return
+        log.warn("unsupported-packet-size", size=size)
+
+    def log_counts(self):
+        rx_ct_list = [(v.name, v.value[0], v.value[1]) for v in self.rx_counters.values()]
+        tx_ct_list = [(v.name, v.value[0], v.value[1]) for v in self.tx_counters.values()]
+        log.info("rx-counts",rx_ct_list=rx_ct_list)
+        log.info("tx-counts",tx_ct_list=tx_ct_list)
+
+    def make_proto(self):
+        sim_metrics = PonSimMetrics(
+            device = self.device
+        )
+        pon_port_metrics = PonSimPortMetrics (
+            port_name = "pon"
+        )
+        ni_port_metrics = PonSimPortMetrics (
+            port_name = "nni"
+        )
+        for c in sorted(self.rx_counters):
+            ctr = self.rx_counters[c]
+            pon_port_metrics.packets.extend([
+                PonSimPacketCounter(name=ctr.name,value=ctr.value[0])
+            ])
+            # Since they're identical keys, save some time and cheat
+            ni_port_metrics.packets.extend([
+                PonSimPacketCounter(name=ctr.name,value=ctr.value[1])
+            ])
+
+        for c in sorted(self.tx_counters):
+            ctr = self.tx_counters[c]
+            pon_port_metrics.packets.extend([
+                PonSimPacketCounter(name=ctr.name,value=ctr.value[0])
+            ])
+            # Since they're identical keys, save some time and cheat
+            ni_port_metrics.packets.extend([
+                PonSimPacketCounter(name=ctr.name,value=ctr.value[1])
+            ])
+        sim_metrics.metrics.extend([pon_port_metrics])
+        sim_metrics.metrics.extend([ni_port_metrics])
+
+        return sim_metrics
+
+
 class SimDevice(object):
 
     def __init__(self, name, logical_port_no):
@@ -48,18 +140,21 @@
         self.flows = list()
         self.log = structlog.get_logger(name=name,
                                         logical_port_no=logical_port_no)
+        self.counter = FrameIOCounter(name)
 
     def link(self, port, egress_fun):
         self.links.setdefault(port, []).append(egress_fun)
 
     def ingress(self, port, frame):
-        self.log.debug('ingress', ingress_port=port)
+        self.log.debug('ingress', ingress_port=port, name=self.name)
+        self.counter.count_rx_frame(port, len(frame["Ether"].payload))
         outcome = self.process_frame(port, frame)
         if outcome is not None:
             egress_port, egress_frame = outcome
             forwarded = 0
             links = self.links.get(egress_port)
             if links is not None:
+                self.counter.count_tx_frame(egress_port, len(egress_frame["Ether"].payload))
                 for fun in links:
                     forwarded += 1
                     self.log.debug('forwarding', egress_port=egress_port)
@@ -209,11 +304,15 @@
     def __init__(self, onus, egress_fun):
         self.egress_fun = egress_fun
 
+        self.log = structlog.get_logger()
         # Create OLT and hook NNI port up for egress
         self.olt = SimDevice('olt', 0)
         self.olt.link(2, lambda _, frame: self.egress_fun(0, frame))
         self.devices = dict()
         self.devices[0] = self.olt
+        # TODO: This can be removed, it's for debugging purposes
+        self.lc = LoopingCall(self.olt.counter.log_counts)
+        self.lc.start(90) # To correlate with Kafka 
 
         # Create ONUs of the requested number and hook them up with OLT
         # and with egress fun
@@ -226,14 +325,20 @@
         for i in range(onus):
             port_no = 128 + i
             onu = SimDevice('onu%d' % i, port_no)
-            onu.link(1, lambda _, frame: self.olt.ingress(1, frame))
-            onu.link(2, mk_egress_fun(port_no))
-            self.olt.link(1, mk_onu_ingress(onu))
+            onu.link(1, lambda _, frame: self.olt.ingress(1, frame)) # Send to the OLT
+            onu.link(2, mk_egress_fun(port_no)) # Send from the ONU to the world
+            self.olt.link(1, mk_onu_ingress(onu)) # Internal send to the ONU
             self.devices[port_no] = onu
+        for d in self.devices:
+            self.log.info("pon-sim-init", port=d, name=self.devices[d].name,
+                          links=self.devices[d].links)
 
     def get_ports(self):
         return sorted(self.devices.keys())
 
+    def get_stats(self):
+        return self.olt.counter.make_proto()
+
     def olt_install_flows(self, flows):
         self.olt.install_flows(flows)
 
diff --git a/voltha/adapters/ponsim_olt/ponsim_olt.py b/voltha/adapters/ponsim_olt/ponsim_olt.py
index aecac9f..dd347c0 100644
--- a/voltha/adapters/ponsim_olt/ponsim_olt.py
+++ b/voltha/adapters/ponsim_olt/ponsim_olt.py
@@ -19,6 +19,7 @@
 """
 from uuid import uuid4
 
+import arrow
 import grpc
 import structlog
 from scapy.layers.l2 import Ether, Dot1Q
@@ -28,6 +29,7 @@
 
 from common.frameio.frameio import BpfProgramFilter, hexify
 from common.utils.asleep import asleep
+from twisted.internet.task import LoopingCall
 from voltha.adapters.interface import IAdapterInterface
 from voltha.core.logical_device_agent import mac_str_to_tuple
 from voltha.protos import third_party
@@ -36,7 +38,9 @@
 from voltha.protos.adapter_pb2 import AdapterConfig
 from voltha.protos.common_pb2 import LogLevel, OperStatus, ConnectStatus, \
     AdminState
-from voltha.protos.device_pb2 import DeviceType, DeviceTypes, Port, Device
+from voltha.protos.device_pb2 import DeviceType, DeviceTypes, Port, Device, \
+PmConfig, PmConfigs
+from voltha.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
 from voltha.protos.health_pb2 import HealthStatus
 from google.protobuf.empty_pb2 import Empty
 
@@ -56,6 +60,87 @@
 is_inband_frame = BpfProgramFilter('(ether[14:2] & 0xfff) = 0x{:03x}'.format(
     PACKET_IN_VLAN))
 
+class AdapterPmMetrics:
+    def __init__(self,device):
+        self.pm_names = {'tx_64','tx_65_127', 'tx_128_255', 'tx_256_511',
+                        'tx_512_1023', 'tx_1024_1518', 'tx_1519_9k', 'rx_64',
+                        'rx_65_127', 'rx_128_255', 'rx_256_511', 'rx_512_1023',
+                        'rx_1024_1518', 'rx_1519_9k'}
+        self.device = device
+        self.id = device.id
+        self.name = 'ponsim_olt'
+        #self.id = "abc"
+        self.default_freq = 150
+        self.grouped = False
+        self.freq_override = False
+        self.pon_metrics_config = dict()
+        self.nni_metrics_config = dict()
+        self.lc = None
+        for m in self.pm_names:
+            self.pon_metrics_config[m] = PmConfig(name=m,
+                                                  type=PmConfig.COUNTER,
+                                                  enabled=True)
+            self.nni_metrics_config[m] = PmConfig(name=m,
+                                                  type=PmConfig.COUNTER,
+                                                  enabled=True)
+
+    def update(self, pm_config):
+        if self.default_freq != pm_config.default_freq:
+            # Update the callback to the new frequency.
+            self.default_freq = pm_config.default_freq
+            self.lc.stop()
+            self.lc.start(interval=self.default_freq/10)
+        for m in pm_config.metrics:
+            self.pon_metrics_config[m.name].enabled = m.enabled
+            self.nni_metrics_config[m.name].enabled = m.enabled
+
+    def make_proto(self):
+        pm_config = PmConfigs(
+            id=self.id,
+            default_freq=self.default_freq,
+            grouped = False,
+            freq_override = False)
+        for m in sorted(self.pon_metrics_config):
+            pm=self.pon_metrics_config[m] # Either will do they're the same
+            pm_config.metrics.extend([PmConfig(name=pm.name,
+                                               type=pm.type,
+                                               enabled=pm.enabled)])
+        return pm_config
+
+    def collect_port_metrics(self, channel):
+        rtrn_port_metrics = dict()
+        stub = ponsim_pb2.PonSimStub(channel)
+        stats = stub.GetStats(Empty())
+        rtrn_port_metrics['pon'] = self.extract_pon_metrics(stats)
+        rtrn_port_metrics['nni'] = self.extract_nni_metrics(stats)
+        return rtrn_port_metrics
+
+
+    def extract_pon_metrics(self, stats):
+        rtrn_pon_metrics = dict()
+        for m in stats.metrics:
+            if m.port_name == "pon":
+                for p in m.packets:
+                    if self.pon_metrics_config[p.name].enabled:
+                        rtrn_pon_metrics[p.name] = p.value
+                return rtrn_pon_metrics
+
+    def extract_nni_metrics(self, stats):
+        rtrn_pon_metrics = dict()
+        for m in stats.metrics:
+            if m.port_name == "nni":
+                for p in m.packets:
+                    if self.pon_metrics_config[p.name].enabled:
+                        rtrn_pon_metrics[p.name] = p.value
+                return rtrn_pon_metrics
+
+    def start_collector(self, callback):
+        log.info("starting-pm-collection", device_name=self.name,
+                 device_id=self.device.id)
+        prefix = 'voltha.{}.{}'.format(self.name, self.device.id)
+        self.lc = LoopingCall(callback, self.device.id, prefix)
+        self.lc.start(interval=self.default_freq/10)
+
 
 @implementer(IAdapterInterface)
 class PonSimOltAdapter(object):
@@ -107,8 +192,10 @@
     def change_master_state(self, master):
         raise NotImplementedError()
 
-    def update_pm_config(self, device, pm_configs):
-        raise NotImplementedError()
+    def update_pm_config(self, device, pm_config):
+        log.info("adapter-update-pm-config", device=device, pm_config=pm_config)
+        handler = self.devices_handlers[device.id]
+        handler.update_pm_config(device, pm_config)
 
     def adopt_device(self, device):
         self.devices_handlers[device.id] = PonSimOltHandler(self, device.id)
@@ -187,6 +274,7 @@
         self.nni_port = None
         self.ofp_port_no = None
         self.interface = registry('main').get_args().interface
+        self.pm_metrics = None
 
     def __del__(self):
         if self.io_port is not None:
@@ -218,6 +306,12 @@
         device.connect_status = ConnectStatus.REACHABLE
         self.adapter_agent.update_device(device)
 
+        # Now set the initial PM configuration for this device
+        self.pm_metrics=AdapterPmMetrics(device)
+        pm_config = self.pm_metrics.make_proto()
+        log.info("initial-pm-config", pm_config=pm_config)
+        self.adapter_agent.update_device_pm_config(pm_config,init=True)
+
         nni_port = Port(
             port_no=2,
             label='NNI facing Ethernet port',
@@ -304,6 +398,9 @@
             self.interface, self.rcv_io, is_inband_frame)
         self.log.info('registered-frameio')
 
+        # Start collecting stats from the device after a brief pause
+        self.start_kpi_collection(device.id)
+
     def rcv_io(self, port, frame):
         self.log.info('reveived', iface_name=port.iface_name,
                        frame_len=len(frame))
@@ -337,6 +434,10 @@
         ))
         self.log.info('success')
 
+    def update_pm_config(self, device, pm_config):
+        log.info("handler-update-pm-config", device=device, pm_config=pm_config)
+        self.pm_metrics.update(pm_config)
+
     def send_proxied_message(self, proxy_address, msg):
         self.log.info('sending-proxied-message')
         if isinstance(msg, FlowTable):
@@ -504,3 +605,36 @@
         # 2) Remove the device from ponsim
 
         self.log.info('deleted', device_id=self.device_id)
+
+    def start_kpi_collection(self, device_id):
+
+        def _collect(device_id, prefix):
+
+            try:
+                # Step 1: gather metrics from device
+                port_metrics = \
+                self.pm_metrics.collect_port_metrics(self.get_channel())
+
+                # Step 2: prepare the KpiEvent for submission
+                # we can time-stamp them here (or could use time derived from OLT
+                ts = arrow.utcnow().timestamp
+                kpi_event = KpiEvent(
+                    type=KpiEventType.slice,
+                    ts=ts,
+                    prefixes={
+                        # OLT NNI port
+                        prefix + '.nni': MetricValuePairs(
+                            metrics=port_metrics['nni']),
+                        # OLT PON port
+                        prefix + '.pon': MetricValuePairs(
+                            metrics=port_metrics['pon'])
+                    }
+                )
+
+                # Step 3: submit
+                self.adapter_agent.submit_kpis(kpi_event)
+
+            except Exception as e:
+                log.exception('failed-to-submit-kpis', e=e)
+        self.pm_metrics.start_collector(_collect)
+
diff --git a/voltha/core/adapter_agent.py b/voltha/core/adapter_agent.py
index e926d9d..c822c42 100644
--- a/voltha/core/adapter_agent.py
+++ b/voltha/core/adapter_agent.py
@@ -218,7 +218,8 @@
         device_agent = self.core.get_device_agent(device_pm_config.id)
         device_agent.update_device_pm_config(device_pm_config,init)
 
-    def update_adapter_pm_config(self, device, device_pm_config):
+    def update_adapter_pm_config(self, device_id, device_pm_config):
+        device = self.get_device(device_id)
         self.adapter.update_pm_config(device, device_pm_config)
 
     def _add_peer_reference(self, device_id, port):
diff --git a/voltha/core/device_agent.py b/voltha/core/device_agent.py
index ca402f6..3c2e267 100644
--- a/voltha/core/device_agent.py
+++ b/voltha/core/device_agent.py
@@ -228,8 +228,9 @@
     def _pm_config_updated(self, pm_configs):
         self.log.debug('pm-config-updated', pm_configs=pm_configs,
                        callback_data=self.callback_data)
+        device_id = self.proxy.get('/').id
         if not self.callback_data:
-            self.adapter_agent.update_adapter_pm_config(self.proxy.get('/'), pm_configs)
+            self.adapter_agent.update_adapter_pm_config(device_id, pm_configs)
         self.callback_data = None
 
     ## <======================= FLOW TABLE UPDATE HANDLING ====================
diff --git a/voltha/protos/ponsim.proto b/voltha/protos/ponsim.proto
index 3bc4d36..a3741a0 100644
--- a/voltha/protos/ponsim.proto
+++ b/voltha/protos/ponsim.proto
@@ -16,6 +16,22 @@
     repeated openflow_13.ofp_flow_stats flows = 2;
 }
 
+message PonSimPacketCounter {
+    string name = 1;
+    int64 value = 2;
+}
+
+message PonSimPortMetrics {
+    string port_name = 1;
+    repeated PonSimPacketCounter packets = 2;
+}
+
+message PonSimMetrics {
+    string device = 1;
+    repeated PonSimPortMetrics metrics = 2;
+}
+
+
 service PonSim {
 
     rpc GetDeviceInfo(google.protobuf.Empty)
@@ -24,4 +40,6 @@
     rpc UpdateFlowTable(FlowTable)
         returns(google.protobuf.Empty) {}
 
+    rpc GetStats(google.protobuf.Empty)
+        returns(PonSimMetrics) {}
 }