SEBA-300 Add ponsim_onu performance metrics
Change-Id: I8d43293511d38006d1a5495d99bf2e968cdb6241
diff --git a/ponsim/v2/core/ponsim_metric.go b/ponsim/v2/core/ponsim_metric.go
index 953a438..e9e1585 100644
--- a/ponsim/v2/core/ponsim_metric.go
+++ b/ponsim/v2/core/ponsim_metric.go
@@ -114,6 +114,7 @@
*/
type PonSimMetricCounter struct {
Name string
+ Device_Type string
TxCounters map[txMetricCounterType]*metricCounter
RxCounters map[rxMetricCounterType]*metricCounter
}
@@ -121,8 +122,8 @@
/*
NewPonSimMetricCounter instantiates new metric counters for a PON device
*/
-func NewPonSimMetricCounter(name string) *PonSimMetricCounter {
- counter := &PonSimMetricCounter{Name: name}
+func NewPonSimMetricCounter(name string, device_type string) *PonSimMetricCounter {
+ counter := &PonSimMetricCounter{Name: name, Device_Type: device_type}
counter.TxCounters = map[txMetricCounterType]*metricCounter{
tx_64_pkts: newTxMetricCounter(tx_64_pkts, 1, 64),
@@ -186,7 +187,18 @@
func (mc *PonSimMetricCounter) MakeProto() *voltha.PonSimMetrics {
simMetrics := &voltha.PonSimMetrics{Device: mc.Name}
ponMetrics := &voltha.PonSimPortMetrics{PortName: "pon"}
- nniMetrics := &voltha.PonSimPortMetrics{PortName: "nni"}
+ portMetrics := &voltha.PonSimPortMetrics{}
+
+ if (mc.Device_Type == "ONU") {
+ portMetrics.PortName = "uni"
+ } else if (mc.Device_Type == "OLT") {
+ portMetrics.PortName = "nni"
+ } else {
+ common.Logger().WithFields(logrus.Fields{
+ "counters": mc.RxCounters,
+ }).Error("Unknown Device_Type in PonSimMetricCounter")
+ portMetrics.PortName = "unknown"
+ }
// Collect RX metrics
for _, c := range mc.RxCounters {
@@ -198,9 +210,9 @@
Value: int64(c.Value[0]),
},
)
- // NNI values
- nniMetrics.Packets = append(
- nniMetrics.Packets,
+ // NNI/UNI values
+ portMetrics.Packets = append(
+ portMetrics.Packets,
&voltha.PonSimPacketCounter{
Name: c.Name,
Value: int64(c.Value[1]),
@@ -217,9 +229,9 @@
Value: int64(c.Value[0]),
},
)
- // NNI values
- nniMetrics.Packets = append(
- nniMetrics.Packets,
+ // NNI/UNI values
+ portMetrics.Packets = append(
+ portMetrics.Packets,
&voltha.PonSimPacketCounter{
Name: c.Name,
Value: int64(c.Value[1]),
@@ -229,7 +241,7 @@
// Populate GRPC proto structure
simMetrics.Metrics = append(simMetrics.Metrics, ponMetrics)
- simMetrics.Metrics = append(simMetrics.Metrics, nniMetrics)
+ simMetrics.Metrics = append(simMetrics.Metrics, portMetrics)
return simMetrics
}
diff --git a/ponsim/v2/grpc/nbi/ponsim_handler.go b/ponsim/v2/grpc/nbi/ponsim_handler.go
index ec64d5c..df73a34 100644
--- a/ponsim/v2/grpc/nbi/ponsim_handler.go
+++ b/ponsim/v2/grpc/nbi/ponsim_handler.go
@@ -266,7 +266,7 @@
*/
func (handler *PonSimHandler) GetStats(
ctx context.Context,
- empty *empty.Empty,
+ req *voltha.PonSimMetricsRequest,
) (*voltha.PonSimMetrics, error) {
common.Logger().WithFields(logrus.Fields{
"handler": handler,
@@ -280,35 +280,47 @@
"olt": olt,
}).Debug("Retrieving stats for OLT")
- // Get stats for current device
+ if req.Port == 0 {
+ // port == 0, return the OLT statistics
+ metrics = (handler.device).(*core.PonSimOltDevice).Counter.MakeProto()
+ } else {
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ "port": req.Port,
+ }).Debug("Request is for ONU")
- // Loop through each onus to get stats from those as well?
- // send grpc request to each onu
- for _, child := range (handler.device).(*core.PonSimOltDevice).GetOnus() {
+ // port != 0, contact the ONU, retrieve onu statistics, and return to the caller
+ if child, ok := (handler.device).(*core.PonSimOltDevice).GetOnus()[req.Port]; ok {
+ host := strings.Join([]string{child.Device.Address, strconv.Itoa(int(child.Device.Port))}, ":")
+ conn, err := grpc.Dial(
+ host,
+ grpc.WithInsecure(),
+ )
+ if err != nil {
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ "error": err.Error(),
+ }).Error("GRPC Connection problem")
+ }
+ defer conn.Close()
+ client := voltha.NewPonSimClient(conn)
- host := strings.Join([]string{child.Device.Address, strconv.Itoa(int(child.Device.Port))}, ":")
- conn, err := grpc.Dial(
- host,
- grpc.WithInsecure(),
- )
- if err != nil {
+ if onu_stats, err := client.GetStats(ctx, req); err != nil {
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ "host": host,
+ "error": err.Error(),
+ }).Error("Problem forwarding stats request to ONU")
+ } else {
+ metrics = onu_stats
+ }
+ } else {
common.Logger().WithFields(logrus.Fields{
"handler": handler,
- "error": err.Error(),
- }).Error("GRPC Connection problem")
- }
- defer conn.Close()
- client := voltha.NewPonSimClient(conn)
-
- if _, err = client.GetStats(ctx, empty); err != nil {
- common.Logger().WithFields(logrus.Fields{
- "handler": handler,
- "host": host,
- "error": err.Error(),
- }).Error("Problem forwarding stats request to ONU")
+ "port": req.Port,
+ }).Warn("Unable to find ONU")
}
}
- metrics = (handler.device).(*core.PonSimOltDevice).Counter.MakeProto()
common.Logger().WithFields(logrus.Fields{
"handler": handler,
@@ -320,6 +332,11 @@
"handler": handler,
"onu": onu,
}).Debug("Retrieving stats for ONU")
+ metrics = (handler.device).(*core.PonSimOnuDevice).Counter.MakeProto()
+ common.Logger().WithFields(logrus.Fields{
+ "handler": handler,
+ "metrics": metrics,
+ }).Debug("ONU Metrics")
} else {
common.Logger().WithFields(logrus.Fields{
"handler": handler,
diff --git a/ponsim/v2/ponsim.go b/ponsim/v2/ponsim.go
index cbb863d..0981351 100644
--- a/ponsim/v2/ponsim.go
+++ b/ponsim/v2/ponsim.go
@@ -263,7 +263,7 @@
Port: int32(grpc_port),
AlarmsOn: alarm_sim,
AlarmsFreq: alarm_freq,
- Counter: core.NewPonSimMetricCounter(name),
+ Counter: core.NewPonSimMetricCounter(name, device_type),
// TODO: pass certificates
//GrpcSecurity: certs,
diff --git a/voltha/adapters/ponsim_olt/ponsim_olt.py b/voltha/adapters/ponsim_olt/ponsim_olt.py
index b18b7e2..29aa0bb 100644
--- a/voltha/adapters/ponsim_olt/ponsim_olt.py
+++ b/voltha/adapters/ponsim_olt/ponsim_olt.py
@@ -50,7 +50,7 @@
OFPC_GROUP_STATS, OFPC_PORT_STATS, OFPC_TABLE_STATS, OFPC_FLOW_STATS, \
ofp_switch_features, ofp_desc
from voltha.protos.openflow_13_pb2 import ofp_port
-from voltha.protos.ponsim_pb2 import FlowTable, PonSimFrame
+from voltha.protos.ponsim_pb2 import FlowTable, PonSimFrame, PonSimMetricsRequest
from voltha.registry import registry
from voltha.protos.bbf_fiber_base_pb2 import \
@@ -132,7 +132,7 @@
def collect_port_metrics(self, channel):
rtrn_port_metrics = dict()
stub = ponsim_pb2.PonSimStub(channel)
- stats = stub.GetStats(Empty())
+ stats = stub.GetStats(ponsim_pb2.PonSimMetricsRequest(port=0))
rtrn_port_metrics['pon'] = self.extract_pon_metrics(stats)
rtrn_port_metrics['nni'] = self.extract_nni_metrics(stats)
return rtrn_port_metrics
@@ -706,6 +706,11 @@
self.log.info('pushing-onu-flow-table', port=msg.port)
res = stub.UpdateFlowTable(msg)
self.adapter_agent.receive_proxied_message(proxy_address, res)
+ elif isinstance(msg, PonSimMetricsRequest):
+ stub = ponsim_pb2.PonSimStub(self.get_channel())
+ self.log.info('proxying onu stats request', port=msg.port)
+ res = stub.GetStats(msg)
+ self.adapter_agent.receive_proxied_message(proxy_address, res)
def packet_out(self, egress_port, msg):
self.log.info('sending-packet-out', egress_port=egress_port,
diff --git a/voltha/adapters/ponsim_onu/ponsim_onu.py b/voltha/adapters/ponsim_onu/ponsim_onu.py
index 332845b..77ab382 100644
--- a/voltha/adapters/ponsim_onu/ponsim_onu.py
+++ b/voltha/adapters/ponsim_onu/ponsim_onu.py
@@ -18,21 +18,24 @@
Fully simulated OLT/ONU adapter.
"""
+import arrow
import sys
import structlog
from twisted.internet.defer import DeferredQueue, inlineCallbacks
from common.utils.asleep import asleep
+from twisted.internet.task import LoopingCall
from voltha.adapters.iadapter import OnuAdapter
from voltha.core.logical_device_agent import mac_str_to_tuple
from voltha.protos import third_party
from voltha.protos.common_pb2 import OperStatus, ConnectStatus, AdminState
-from voltha.protos.device_pb2 import Port
+from voltha.protos.device_pb2 import Port, PmConfig, PmConfigs
+from voltha.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
from voltha.protos.logical_device_pb2 import LogicalPort
from voltha.protos.openflow_13_pb2 import OFPPS_LIVE, OFPPF_FIBER, \
OFPPF_1GB_FD
from voltha.protos.openflow_13_pb2 import ofp_port
-from voltha.protos.ponsim_pb2 import FlowTable
+from voltha.protos.ponsim_pb2 import FlowTable, PonSimMetricsRequest, PonSimMetrics
from voltha.protos.ponsim_pb2 import InterfaceConfig
from voltha.protos.bbf_fiber_base_pb2 import OntaniConfig, VOntaniConfig, \
VEnetConfig
@@ -81,6 +84,92 @@
'log': 'remove-multicast-distribution-set-data'},
}
+class AdapterPmMetrics:
+ def __init__(self, device):
+ self.pm_names = {'tx_64_pkts', 'tx_65_127_pkts', 'tx_128_255_pkts',
+ 'tx_256_511_pkts', 'tx_512_1023_pkts',
+ 'tx_1024_1518_pkts', 'tx_1519_9k_pkts',
+ 'rx_64_pkts', 'rx_65_127_pkts',
+ 'rx_128_255_pkts', 'rx_256_511_pkts',
+ 'rx_512_1023_pkts', 'rx_1024_1518_pkts',
+ 'rx_1519_9k_pkts'}
+ self.device = device
+ self.id = device.id
+ self.name = 'ponsim_onu'
+ # self.id = "abc"
+ self.default_freq = 150
+ self.grouped = False
+ self.freq_override = False
+ self.pon_metrics_config = dict()
+ self.uni_metrics_config = dict()
+ self.lc = None
+ for m in self.pm_names:
+ self.pon_metrics_config[m] = PmConfig(name=m,
+ type=PmConfig.COUNTER,
+ enabled=True)
+ self.uni_metrics_config[m] = PmConfig(name=m,
+ type=PmConfig.COUNTER,
+ enabled=True)
+
+ def update(self, pm_config):
+ if self.default_freq != pm_config.default_freq:
+ # Update the callback to the new frequency.
+ self.default_freq = pm_config.default_freq
+ self.lc.stop()
+ self.lc.start(interval=self.default_freq / 10)
+ for m in pm_config.metrics:
+ self.pon_metrics_config[m.name].enabled = m.enabled
+ self.uni_metrics_config[m.name].enabled = m.enabled
+
+ def make_proto(self):
+ pm_config = PmConfigs(
+ id=self.id,
+ default_freq=self.default_freq,
+ grouped=False,
+ freq_override=False)
+ for m in sorted(self.pon_metrics_config):
+ pm = self.pon_metrics_config[m] # Either will do they're the same
+ pm_config.metrics.extend([PmConfig(name=pm.name,
+ type=pm.type,
+ enabled=pm.enabled)])
+ return pm_config
+
+ def extract_metrics(self, stats):
+ rtrn_port_metrics = dict()
+ rtrn_port_metrics['pon'] = self.extract_pon_metrics(stats)
+ rtrn_port_metrics['uni'] = self.extract_uni_metrics(stats)
+ return rtrn_port_metrics
+
+ def extract_pon_metrics(self, stats):
+ rtrn_pon_metrics = dict()
+ for m in stats.metrics:
+ if m.port_name == "pon":
+ for p in m.packets:
+ if self.pon_metrics_config[p.name].enabled:
+ rtrn_pon_metrics[p.name] = p.value
+ return rtrn_pon_metrics
+
+ def extract_uni_metrics(self, stats):
+ rtrn_pon_metrics = dict()
+ for m in stats.metrics:
+ if m.port_name == "uni":
+ for p in m.packets:
+ if self.pon_metrics_config[p.name].enabled:
+ rtrn_pon_metrics[p.name] = p.value
+ return rtrn_pon_metrics
+
+ def start_collector(self, callback):
+ log.info("starting-pm-collection", device_name=self.name,
+ device_id=self.device.id)
+ prefix = 'voltha.{}.{}'.format(self.name, self.device.id)
+ self.lc = LoopingCall(callback, self.device.id, prefix)
+ self.lc.start(interval=self.default_freq / 10)
+
+ def stop_collector(self):
+ log.info("stopping-pm-collection", device_name=self.name,
+ device_id=self.device.id)
+ self.lc.stop()
+
class PonSimOnuAdapter(OnuAdapter):
def __init__(self, adapter_agent, config):
@@ -110,6 +199,12 @@
else:
_method(data)
+ def update_pm_config(self, device, pm_config):
+ log.info("adapter-update-pm-config", device=device,
+ pm_config=pm_config)
+ handler = self.devices_handlers[device.id]
+ handler.update_pm_config(device, pm_config)
+
def create_interface(self, device, data):
_method_name = sys._getframe().f_code.co_name
self.xpon_ponsim_onu_adapter_interface(_method_name, device, data)
@@ -191,7 +286,40 @@
self.pon_port = None
def receive_message(self, msg):
- self.incoming_messages.put(msg)
+ if isinstance(msg, PonSimMetrics):
+ # Message is a reply to an ONU statistics request. Push it out to Kafka via adapter.submit_kpis().
+ if self.pm_metrics:
+ self.log.info('Handling incoming ONU metrics')
+ prefix = 'voltha.{}.{}'.format("ponsim_onu", self.device_id)
+ port_metrics = self.pm_metrics.extract_metrics(msg)
+ try:
+ ts = arrow.utcnow().timestamp
+ kpi_event = KpiEvent(
+ type=KpiEventType.slice,
+ ts=ts,
+ prefixes={
+ # OLT NNI port
+ prefix + '.uni': MetricValuePairs(
+ metrics=port_metrics['uni']),
+ # OLT PON port
+ prefix + '.pon': MetricValuePairs(
+ metrics=port_metrics['pon'])
+ }
+ )
+
+ self.log.info('Submitting KPI for incoming ONU mnetrics')
+
+ # Step 3: submit
+ self.adapter_agent.submit_kpis(kpi_event)
+ except Exception as e:
+ log.exception('failed-to-submit-kpis', e=e)
+ else:
+ # We received a statistics message, but we don't have pm_metrics set up. This shouldn't happen.
+ self.log.warning('received unexpected PonSimMetrics')
+ else:
+ # The message is probably a reply to a FlowTable update. self.update_flow_table() will pop it off this
+ # queue and return it to its caller.
+ self.incoming_messages.put(msg)
def activate(self, device):
self.log.info('activating')
@@ -212,6 +340,12 @@
device.connect_status = ConnectStatus.REACHABLE
self.adapter_agent.update_device(device)
+ # Now set the initial PM configuration for this device
+ self.pm_metrics = AdapterPmMetrics(device)
+ pm_config = self.pm_metrics.make_proto()
+ log.info("initial-pm-config", pm_config=pm_config)
+ self.adapter_agent.update_device_pm_config(pm_config, init=True)
+
# register physical ports
self.uni_port = Port(
port_no=2,
@@ -264,6 +398,9 @@
device.oper_status = OperStatus.ACTIVE
self.adapter_agent.update_device(device)
+ # Start collecting stats from the device after a brief pause
+ self.start_kpi_collection(device.id)
+
def _get_uni_port(self):
ports = self.adapter_agent.get_ports(self.device_id, Port.ETHERNET_UNI)
if ports:
@@ -292,6 +429,12 @@
device.connect_status = ConnectStatus.REACHABLE
self.adapter_agent.update_device(device)
+ # Now set the initial PM configuration for this device
+ self.pm_metrics = AdapterPmMetrics(device)
+ pm_config = self.pm_metrics.make_proto()
+ log.info("initial-pm-config", pm_config=pm_config)
+ self.adapter_agent.update_device_pm_config(pm_config, init=True)
+
# TODO: Verify that the uni, pon and logical ports exists
# Mark the device as REACHABLE and ACTIVE
@@ -300,6 +443,9 @@
device.oper_status = OperStatus.ACTIVE
self.adapter_agent.update_device(device)
+ # Start collecting stats from the device after a brief pause
+ self.start_kpi_collection(device.id)
+
self.log.info('reconciling-ONU-device-ends')
@inlineCallbacks
@@ -331,6 +477,11 @@
# Once completed, the accepts_add_remove_flow_updates for this
# device type can be set to True
+ def update_pm_config(self, device, pm_config):
+ log.info("handler-update-pm-config", device=device,
+ pm_config=pm_config)
+ self.pm_metrics.update(pm_config)
+
@inlineCallbacks
def reboot(self):
self.log.info('rebooting', device_id=self.device_id)
@@ -370,6 +521,8 @@
def disable(self):
self.log.info('disabling', device_id=self.device_id)
+ self.stop_kpi_collection()
+
# Get the latest device reference
device = self.adapter_agent.get_device(self.device_id)
@@ -474,6 +627,8 @@
device.oper_status = OperStatus.ACTIVE
self.adapter_agent.update_device(device)
+ self.start_kpi_collection(device.id)
+
self.log.info('re-enabled', device_id=device.id)
except Exception, e:
self.log.exception('error-reenabling', e=e)
@@ -489,6 +644,19 @@
self.log.info('deleted', device_id=self.device_id)
+ def start_kpi_collection(self, device_id):
+ def _collect(device_id, prefix):
+ # Proxy a message to ponsim_olt. The OLT will then query the ONU for statistics. The reply will
+ # arrive proxied back to us in self.receive_message().
+ msg = PonSimMetricsRequest(port=self.proxy_address.channel_id)
+ self.adapter_agent.send_proxied_message(self.proxy_address, msg)
+
+ self.pm_metrics.start_collector(_collect)
+
+ def stop_kpi_collection(self):
+ self.pm_metrics.stop_collector()
+
+
def get_interface_config(self, data):
interfaceConfig = InterfaceConfig()
if isinstance(data, OntaniConfig):
diff --git a/voltha/protos/ponsim.proto b/voltha/protos/ponsim.proto
index 9d62b63..5890ea4 100644
--- a/voltha/protos/ponsim.proto
+++ b/voltha/protos/ponsim.proto
@@ -49,6 +49,10 @@
repeated PonSimPortMetrics metrics = 2;
}
+message PonSimMetricsRequest {
+ int32 port = 1;
+}
+
message TcontInterfaceConfig {
bbf_fiber.TrafficDescriptorProfileData
traffic_descriptor_profile_config_data = 1;
@@ -88,7 +92,7 @@
rpc UpdateFlowTable(FlowTable)
returns(google.protobuf.Empty) {}
- rpc GetStats(google.protobuf.Empty)
+ rpc GetStats(PonSimMetricsRequest)
returns(PonSimMetrics) {}
}