SEBA-300 Add ponsim_onu performance metrics

Change-Id: I8d43293511d38006d1a5495d99bf2e968cdb6241
diff --git a/ponsim/v2/core/ponsim_metric.go b/ponsim/v2/core/ponsim_metric.go
index 953a438..e9e1585 100644
--- a/ponsim/v2/core/ponsim_metric.go
+++ b/ponsim/v2/core/ponsim_metric.go
@@ -114,6 +114,7 @@
  */
 type PonSimMetricCounter struct {
 	Name       string
+	Device_Type string
 	TxCounters map[txMetricCounterType]*metricCounter
 	RxCounters map[rxMetricCounterType]*metricCounter
 }
@@ -121,8 +122,8 @@
 /*
 NewPonSimMetricCounter instantiates new metric counters for a PON device
 */
-func NewPonSimMetricCounter(name string) *PonSimMetricCounter {
-	counter := &PonSimMetricCounter{Name: name}
+func NewPonSimMetricCounter(name string, device_type string) *PonSimMetricCounter {
+	counter := &PonSimMetricCounter{Name: name, Device_Type: device_type}
 
 	counter.TxCounters = map[txMetricCounterType]*metricCounter{
 		tx_64_pkts:        newTxMetricCounter(tx_64_pkts, 1, 64),
@@ -186,7 +187,18 @@
 func (mc *PonSimMetricCounter) MakeProto() *voltha.PonSimMetrics {
 	simMetrics := &voltha.PonSimMetrics{Device: mc.Name}
 	ponMetrics := &voltha.PonSimPortMetrics{PortName: "pon"}
-	nniMetrics := &voltha.PonSimPortMetrics{PortName: "nni"}
+	portMetrics := &voltha.PonSimPortMetrics{}
+
+	if (mc.Device_Type == "ONU") {
+	    portMetrics.PortName = "uni"
+	} else if (mc.Device_Type == "OLT") {
+	    portMetrics.PortName = "nni"
+	} else {
+	    common.Logger().WithFields(logrus.Fields{
+		"counters": mc.RxCounters,
+	    }).Error("Unknown Device_Type in PonSimMetricCounter")
+	    portMetrics.PortName = "unknown"
+	}
 
 	// Collect RX metrics
 	for _, c := range mc.RxCounters {
@@ -198,9 +210,9 @@
 				Value: int64(c.Value[0]),
 			},
 		)
-		// NNI values
-		nniMetrics.Packets = append(
-			nniMetrics.Packets,
+		// NNI/UNI values
+		portMetrics.Packets = append(
+			portMetrics.Packets,
 			&voltha.PonSimPacketCounter{
 				Name:  c.Name,
 				Value: int64(c.Value[1]),
@@ -217,9 +229,9 @@
 				Value: int64(c.Value[0]),
 			},
 		)
-		// NNI values
-		nniMetrics.Packets = append(
-			nniMetrics.Packets,
+		// NNI/UNI values
+		portMetrics.Packets = append(
+			portMetrics.Packets,
 			&voltha.PonSimPacketCounter{
 				Name:  c.Name,
 				Value: int64(c.Value[1]),
@@ -229,7 +241,7 @@
 
 	// Populate GRPC proto structure
 	simMetrics.Metrics = append(simMetrics.Metrics, ponMetrics)
-	simMetrics.Metrics = append(simMetrics.Metrics, nniMetrics)
+	simMetrics.Metrics = append(simMetrics.Metrics, portMetrics)
 
 	return simMetrics
 }
diff --git a/ponsim/v2/grpc/nbi/ponsim_handler.go b/ponsim/v2/grpc/nbi/ponsim_handler.go
index ec64d5c..df73a34 100644
--- a/ponsim/v2/grpc/nbi/ponsim_handler.go
+++ b/ponsim/v2/grpc/nbi/ponsim_handler.go
@@ -266,7 +266,7 @@
 */
 func (handler *PonSimHandler) GetStats(
 	ctx context.Context,
-	empty *empty.Empty,
+	req *voltha.PonSimMetricsRequest,
 ) (*voltha.PonSimMetrics, error) {
 	common.Logger().WithFields(logrus.Fields{
 		"handler": handler,
@@ -280,35 +280,47 @@
 			"olt":     olt,
 		}).Debug("Retrieving stats for OLT")
 
-		// Get stats for current device
+		if req.Port == 0 {
+			// port == 0, return the OLT statistics
+			metrics = (handler.device).(*core.PonSimOltDevice).Counter.MakeProto()
+		} else {
+			common.Logger().WithFields(logrus.Fields{
+			    "handler": handler,
+			    "port":   req.Port,
+			}).Debug("Request is for ONU")
 
-		// Loop through each onus to get stats from those as well?
-		// send grpc request to each onu
-		for _, child := range (handler.device).(*core.PonSimOltDevice).GetOnus() {
+			// port != 0, contact the ONU, retrieve onu statistics, and return to the caller
+			if child, ok := (handler.device).(*core.PonSimOltDevice).GetOnus()[req.Port]; ok {
+				host := strings.Join([]string{child.Device.Address, strconv.Itoa(int(child.Device.Port))}, ":")
+				conn, err := grpc.Dial(
+					host,
+					grpc.WithInsecure(),
+				)
+				if err != nil {
+					common.Logger().WithFields(logrus.Fields{
+					    "handler": handler,
+					    "error":   err.Error(),
+					}).Error("GRPC Connection problem")
+				}
+				defer conn.Close()
+				client := voltha.NewPonSimClient(conn)
 
-			host := strings.Join([]string{child.Device.Address, strconv.Itoa(int(child.Device.Port))}, ":")
-			conn, err := grpc.Dial(
-				host,
-				grpc.WithInsecure(),
-			)
-			if err != nil {
+				if onu_stats, err := client.GetStats(ctx, req); err != nil {
+					common.Logger().WithFields(logrus.Fields{
+					    "handler": handler,
+					    "host":    host,
+					    "error":   err.Error(),
+					}).Error("Problem forwarding stats request to ONU")
+				} else {
+					metrics = onu_stats
+				}
+			} else {
 				common.Logger().WithFields(logrus.Fields{
 					"handler": handler,
-					"error":   err.Error(),
-				}).Error("GRPC Connection problem")
-			}
-			defer conn.Close()
-			client := voltha.NewPonSimClient(conn)
-
-			if _, err = client.GetStats(ctx, empty); err != nil {
-				common.Logger().WithFields(logrus.Fields{
-					"handler": handler,
-					"host":    host,
-					"error":   err.Error(),
-				}).Error("Problem forwarding stats request to ONU")
+					"port":    req.Port,
+				}).Warn("Unable to find ONU")
 			}
 		}
-		metrics = (handler.device).(*core.PonSimOltDevice).Counter.MakeProto()
 
 		common.Logger().WithFields(logrus.Fields{
 			"handler": handler,
@@ -320,6 +332,11 @@
 			"handler": handler,
 			"onu":     onu,
 		}).Debug("Retrieving stats for ONU")
+		metrics = (handler.device).(*core.PonSimOnuDevice).Counter.MakeProto()
+		common.Logger().WithFields(logrus.Fields{
+			"handler": handler,
+			"metrics": metrics,
+		}).Debug("ONU Metrics")
 	} else {
 		common.Logger().WithFields(logrus.Fields{
 			"handler": handler,
diff --git a/ponsim/v2/ponsim.go b/ponsim/v2/ponsim.go
index cbb863d..0981351 100644
--- a/ponsim/v2/ponsim.go
+++ b/ponsim/v2/ponsim.go
@@ -263,7 +263,7 @@
 		Port:        int32(grpc_port),
 		AlarmsOn:    alarm_sim,
 		AlarmsFreq:  alarm_freq,
-		Counter:     core.NewPonSimMetricCounter(name),
+		Counter:     core.NewPonSimMetricCounter(name, device_type),
 
 		// TODO: pass certificates
 		//GrpcSecurity: certs,
diff --git a/voltha/adapters/ponsim_olt/ponsim_olt.py b/voltha/adapters/ponsim_olt/ponsim_olt.py
index b18b7e2..29aa0bb 100644
--- a/voltha/adapters/ponsim_olt/ponsim_olt.py
+++ b/voltha/adapters/ponsim_olt/ponsim_olt.py
@@ -50,7 +50,7 @@
     OFPC_GROUP_STATS, OFPC_PORT_STATS, OFPC_TABLE_STATS, OFPC_FLOW_STATS, \
     ofp_switch_features, ofp_desc
 from voltha.protos.openflow_13_pb2 import ofp_port
-from voltha.protos.ponsim_pb2 import FlowTable, PonSimFrame
+from voltha.protos.ponsim_pb2 import FlowTable, PonSimFrame, PonSimMetricsRequest
 from voltha.registry import registry
 
 from voltha.protos.bbf_fiber_base_pb2 import \
@@ -132,7 +132,7 @@
     def collect_port_metrics(self, channel):
         rtrn_port_metrics = dict()
         stub = ponsim_pb2.PonSimStub(channel)
-        stats = stub.GetStats(Empty())
+        stats = stub.GetStats(ponsim_pb2.PonSimMetricsRequest(port=0))
         rtrn_port_metrics['pon'] = self.extract_pon_metrics(stats)
         rtrn_port_metrics['nni'] = self.extract_nni_metrics(stats)
         return rtrn_port_metrics
@@ -706,6 +706,11 @@
             self.log.info('pushing-onu-flow-table', port=msg.port)
             res = stub.UpdateFlowTable(msg)
             self.adapter_agent.receive_proxied_message(proxy_address, res)
+        elif isinstance(msg, PonSimMetricsRequest):
+            stub = ponsim_pb2.PonSimStub(self.get_channel())
+            self.log.info('proxying onu stats request', port=msg.port)
+            res = stub.GetStats(msg)
+            self.adapter_agent.receive_proxied_message(proxy_address, res)
 
     def packet_out(self, egress_port, msg):
         self.log.info('sending-packet-out', egress_port=egress_port,
diff --git a/voltha/adapters/ponsim_onu/ponsim_onu.py b/voltha/adapters/ponsim_onu/ponsim_onu.py
index 332845b..77ab382 100644
--- a/voltha/adapters/ponsim_onu/ponsim_onu.py
+++ b/voltha/adapters/ponsim_onu/ponsim_onu.py
@@ -18,21 +18,24 @@
 Fully simulated OLT/ONU adapter.
 """
 
+import arrow
 import sys
 import structlog
 from twisted.internet.defer import DeferredQueue, inlineCallbacks
 from common.utils.asleep import asleep
 
+from twisted.internet.task import LoopingCall
 from voltha.adapters.iadapter import OnuAdapter
 from voltha.core.logical_device_agent import mac_str_to_tuple
 from voltha.protos import third_party
 from voltha.protos.common_pb2 import OperStatus, ConnectStatus, AdminState
-from voltha.protos.device_pb2 import Port
+from voltha.protos.device_pb2 import Port, PmConfig, PmConfigs
+from voltha.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
 from voltha.protos.logical_device_pb2 import LogicalPort
 from voltha.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, \
     OFPPF_1GB_FD
 from voltha.protos.openflow_13_pb2 import ofp_port
-from voltha.protos.ponsim_pb2 import FlowTable
+from voltha.protos.ponsim_pb2 import FlowTable, PonSimMetricsRequest, PonSimMetrics
 from voltha.protos.ponsim_pb2 import InterfaceConfig
 from voltha.protos.bbf_fiber_base_pb2 import OntaniConfig, VOntaniConfig, \
     VEnetConfig
@@ -81,6 +84,92 @@
         'log': 'remove-multicast-distribution-set-data'},
 }
 
+class AdapterPmMetrics:
+    def __init__(self, device):
+        self.pm_names = {'tx_64_pkts', 'tx_65_127_pkts', 'tx_128_255_pkts',
+                         'tx_256_511_pkts', 'tx_512_1023_pkts',
+                         'tx_1024_1518_pkts', 'tx_1519_9k_pkts',
+                         'rx_64_pkts', 'rx_65_127_pkts',
+                         'rx_128_255_pkts', 'rx_256_511_pkts',
+                         'rx_512_1023_pkts', 'rx_1024_1518_pkts',
+                         'rx_1519_9k_pkts'}
+        self.device = device
+        self.id = device.id
+        self.name = 'ponsim_onu'
+        # self.id = "abc"
+        self.default_freq = 150
+        self.grouped = False
+        self.freq_override = False
+        self.pon_metrics_config = dict()
+        self.uni_metrics_config = dict()
+        self.lc = None
+        for m in self.pm_names:
+            self.pon_metrics_config[m] = PmConfig(name=m,
+                                                  type=PmConfig.COUNTER,
+                                                  enabled=True)
+            self.uni_metrics_config[m] = PmConfig(name=m,
+                                                  type=PmConfig.COUNTER,
+                                                  enabled=True)
+
+    def update(self, pm_config):
+        if self.default_freq != pm_config.default_freq:
+            # Update the callback to the new frequency.
+            self.default_freq = pm_config.default_freq
+            self.lc.stop()
+            self.lc.start(interval=self.default_freq / 10)
+        for m in pm_config.metrics:
+            self.pon_metrics_config[m.name].enabled = m.enabled
+            self.uni_metrics_config[m.name].enabled = m.enabled
+
+    def make_proto(self):
+        pm_config = PmConfigs(
+            id=self.id,
+            default_freq=self.default_freq,
+            grouped=False,
+            freq_override=False)
+        for m in sorted(self.pon_metrics_config):
+            pm = self.pon_metrics_config[m]  # Either will do they're the same
+            pm_config.metrics.extend([PmConfig(name=pm.name,
+                                               type=pm.type,
+                                               enabled=pm.enabled)])
+        return pm_config
+
+    def extract_metrics(self, stats):
+        rtrn_port_metrics = dict()
+        rtrn_port_metrics['pon'] = self.extract_pon_metrics(stats)
+        rtrn_port_metrics['uni'] = self.extract_uni_metrics(stats)
+        return rtrn_port_metrics
+
+    def extract_pon_metrics(self, stats):
+        rtrn_pon_metrics = dict()
+        for m in stats.metrics:
+            if m.port_name == "pon":
+                for p in m.packets:
+                    if self.pon_metrics_config[p.name].enabled:
+                        rtrn_pon_metrics[p.name] = p.value
+                return rtrn_pon_metrics
+
+    def extract_uni_metrics(self, stats):
+        rtrn_pon_metrics = dict()
+        for m in stats.metrics:
+            if m.port_name == "uni":
+                for p in m.packets:
+                    if self.pon_metrics_config[p.name].enabled:
+                        rtrn_pon_metrics[p.name] = p.value
+                return rtrn_pon_metrics
+
+    def start_collector(self, callback):
+        log.info("starting-pm-collection", device_name=self.name,
+                 device_id=self.device.id)
+        prefix = 'voltha.{}.{}'.format(self.name, self.device.id)
+        self.lc = LoopingCall(callback, self.device.id, prefix)
+        self.lc.start(interval=self.default_freq / 10)
+
+    def stop_collector(self):
+        log.info("stopping-pm-collection", device_name=self.name,
+                 device_id=self.device.id)
+        self.lc.stop()
+
 
 class PonSimOnuAdapter(OnuAdapter):
     def __init__(self, adapter_agent, config):
@@ -110,6 +199,12 @@
                 else:
                     _method(data)
 
+    def update_pm_config(self, device, pm_config):
+        log.info("adapter-update-pm-config", device=device,
+                 pm_config=pm_config)
+        handler = self.devices_handlers[device.id]
+        handler.update_pm_config(device, pm_config)
+
     def create_interface(self, device, data):
         _method_name = sys._getframe().f_code.co_name
         self.xpon_ponsim_onu_adapter_interface(_method_name, device, data)
@@ -191,7 +286,40 @@
         self.pon_port = None
 
     def receive_message(self, msg):
-        self.incoming_messages.put(msg)
+        if isinstance(msg, PonSimMetrics):
+            # Message is a reply to an ONU statistics request. Push it out to Kafka via adapter.submit_kpis().
+            if self.pm_metrics:
+                self.log.info('Handling incoming ONU metrics')
+                prefix = 'voltha.{}.{}'.format("ponsim_onu", self.device_id)
+                port_metrics = self.pm_metrics.extract_metrics(msg)
+                try:
+                    ts = arrow.utcnow().timestamp
+                    kpi_event = KpiEvent(
+                        type=KpiEventType.slice,
+                        ts=ts,
+                        prefixes={
+                            # OLT NNI port
+                            prefix + '.uni': MetricValuePairs(
+                                metrics=port_metrics['uni']),
+                            # OLT PON port
+                            prefix + '.pon': MetricValuePairs(
+                                metrics=port_metrics['pon'])
+                        }
+                    )
+
+                    self.log.info('Submitting KPI for incoming ONU mnetrics')
+
+                    # Step 3: submit
+                    self.adapter_agent.submit_kpis(kpi_event)
+                except Exception as e:
+                   log.exception('failed-to-submit-kpis', e=e)
+            else:
+                # We received a statistics message, but we don't have pm_metrics set up. This shouldn't happen.
+                self.log.warning('received unexpected PonSimMetrics')
+        else:
+            # The message is probably a reply to a FlowTable update. self.update_flow_table() will pop it off this
+            # queue and return it to its caller.
+            self.incoming_messages.put(msg)
 
     def activate(self, device):
         self.log.info('activating')
@@ -212,6 +340,12 @@
         device.connect_status = ConnectStatus.REACHABLE
         self.adapter_agent.update_device(device)
 
+        # Now set the initial PM configuration for this device
+        self.pm_metrics = AdapterPmMetrics(device)
+        pm_config = self.pm_metrics.make_proto()
+        log.info("initial-pm-config", pm_config=pm_config)
+        self.adapter_agent.update_device_pm_config(pm_config, init=True)
+
         # register physical ports
         self.uni_port = Port(
             port_no=2,
@@ -264,6 +398,9 @@
         device.oper_status = OperStatus.ACTIVE
         self.adapter_agent.update_device(device)
 
+        # Start collecting stats from the device after a brief pause
+        self.start_kpi_collection(device.id)
+
     def _get_uni_port(self):
         ports = self.adapter_agent.get_ports(self.device_id, Port.ETHERNET_UNI)
         if ports:
@@ -292,6 +429,12 @@
         device.connect_status = ConnectStatus.REACHABLE
         self.adapter_agent.update_device(device)
 
+        # Now set the initial PM configuration for this device
+        self.pm_metrics = AdapterPmMetrics(device)
+        pm_config = self.pm_metrics.make_proto()
+        log.info("initial-pm-config", pm_config=pm_config)
+        self.adapter_agent.update_device_pm_config(pm_config, init=True)
+
         # TODO: Verify that the uni, pon and logical ports exists
 
         # Mark the device as REACHABLE and ACTIVE
@@ -300,6 +443,9 @@
         device.oper_status = OperStatus.ACTIVE
         self.adapter_agent.update_device(device)
 
+        # Start collecting stats from the device after a brief pause
+        self.start_kpi_collection(device.id)
+
         self.log.info('reconciling-ONU-device-ends')
 
     @inlineCallbacks
@@ -331,6 +477,11 @@
         # Once completed, the accepts_add_remove_flow_updates for this
         # device type can be set to True
 
+    def update_pm_config(self, device, pm_config):
+        log.info("handler-update-pm-config", device=device,
+                 pm_config=pm_config)
+        self.pm_metrics.update(pm_config)
+
     @inlineCallbacks
     def reboot(self):
         self.log.info('rebooting', device_id=self.device_id)
@@ -370,6 +521,8 @@
     def disable(self):
         self.log.info('disabling', device_id=self.device_id)
 
+        self.stop_kpi_collection()
+
         # Get the latest device reference
         device = self.adapter_agent.get_device(self.device_id)
 
@@ -474,6 +627,8 @@
             device.oper_status = OperStatus.ACTIVE
             self.adapter_agent.update_device(device)
 
+            self.start_kpi_collection(device.id)
+
             self.log.info('re-enabled', device_id=device.id)
         except Exception, e:
             self.log.exception('error-reenabling', e=e)
@@ -489,6 +644,19 @@
 
         self.log.info('deleted', device_id=self.device_id)
 
+    def start_kpi_collection(self, device_id):
+        def _collect(device_id, prefix):
+            # Proxy a message to ponsim_olt. The OLT will then query the ONU for statistics. The reply will
+            # arrive proxied back to us in self.receive_message().
+            msg = PonSimMetricsRequest(port=self.proxy_address.channel_id)
+            self.adapter_agent.send_proxied_message(self.proxy_address, msg)
+
+        self.pm_metrics.start_collector(_collect)
+
+    def stop_kpi_collection(self):
+        self.pm_metrics.stop_collector()
+
+
     def get_interface_config(self, data):
         interfaceConfig = InterfaceConfig()
         if isinstance(data, OntaniConfig):
diff --git a/voltha/protos/ponsim.proto b/voltha/protos/ponsim.proto
index 9d62b63..5890ea4 100644
--- a/voltha/protos/ponsim.proto
+++ b/voltha/protos/ponsim.proto
@@ -49,6 +49,10 @@
     repeated PonSimPortMetrics metrics = 2;
 }
 
+message PonSimMetricsRequest {
+    int32 port = 1;
+}
+
 message TcontInterfaceConfig {
     bbf_fiber.TrafficDescriptorProfileData
         traffic_descriptor_profile_config_data = 1;
@@ -88,7 +92,7 @@
     rpc UpdateFlowTable(FlowTable)
         returns(google.protobuf.Empty) {}
 
-    rpc GetStats(google.protobuf.Empty)
+    rpc GetStats(PonSimMetricsRequest)
         returns(PonSimMetrics) {}
 
 }