VOL-716: Publish PM to Kafka and support enable/disable
Change-Id: I8556d2891290276f779d164a082b311aa1f6cc29
diff --git a/voltha/extensions/omci/onu_device_entry.py b/voltha/extensions/omci/onu_device_entry.py
index 569bb97..40f2678 100644
--- a/voltha/extensions/omci/onu_device_entry.py
+++ b/voltha/extensions/omci/onu_device_entry.py
@@ -72,6 +72,7 @@
self._runner = TaskRunner(device_id) # OMCI_CC Task runner
self._deferred = None
self._first_in_sync = False
+ self._first_capabilities = False
# OMCI related databases are on a per-agent basis. State machines and tasks
# are per ONU Vendor
@@ -125,6 +126,8 @@
self._alarm_sync_sm,
]
self._on_sync_state_machines = [ # Run after first in_sync event
+ ]
+ self._on_capabilities_state_machines = [ # Run after first capabilities event
self._pm_intervals_sm
]
self._custom_me_map = custom_me_map
@@ -183,6 +186,15 @@
"""
return self._pm_intervals_sm
+ def set_pm_config(self, pm_config):
+ """
+ Set PM interval configuration
+
+ :param pm_config: (OnuPmIntervalMetrics) PM Interval configuration
+ :return:
+ """
+ self._pm_intervals_sm.set_pm_config(pm_config)
+
@property
def alarm_synchronizer(self):
"""
@@ -281,6 +293,7 @@
self._started = True
self._omci_cc.enabled = True
self._first_in_sync = True
+ self._first_capabilities = True
self._runner.start()
self._configuration = OnuConfiguration(self._omci_agent, self._device_id)
@@ -353,6 +366,25 @@
self._deferred = reactor.callLater(0, start_state_machines,
self._on_sync_state_machines)
+ def first_in_capabilities_event(self):
+ """
+ This event is called on the first capabilities event after
+ OpenOMCI has been started. It is responsible for starting any
+ other state machines. These are often state machines whose tasks
+ depend upon knowing which MEs the ONU supports
+ """
+ if self._first_capabilities:
+ self._first_capabilities = False
+
+ # Start up any other remaining OpenOMCI state machines
+ def start_state_machines(machines):
+ for sm in machines:
+ self._state_machines.append(sm)
+ reactor.callLater(0, sm.start)
+
+ self._deferred = reactor.callLater(0, start_state_machines,
+ self._on_capabilities_state_machines)
+
def _publish_device_status_event(self):
"""
Publish the ONU Device start/stop status.
@@ -366,6 +398,9 @@
"""
Publish the ONU Device OMCI capabilities event.
"""
+ if self._first_capabilities:
+ self.first_in_capabilities_event()
+
topic = OnuDeviceEntry.event_bus_topic(self.device_id,
OnuDeviceEvents.OmciCapabilitiesEvent)
msg = {
diff --git a/voltha/extensions/omci/state_machines/performance_intervals.py b/voltha/extensions/omci/state_machines/performance_intervals.py
index 7f81ea3..417a706 100644
--- a/voltha/extensions/omci/state_machines/performance_intervals.py
+++ b/voltha/extensions/omci/state_machines/performance_intervals.py
@@ -113,6 +113,7 @@
self._agent = agent
self._device_id = device_id
self._device = None
+ self._pm_config = None
self._timeout_delay = timeout_delay
self._tick_delay = tick_delay
self._interval_skew = interval_skew
@@ -161,6 +162,7 @@
transitions=transitions,
initial=initial_state,
queued=True,
+ ignore_invalid_triggers=True,
name='{}-{}'.format(self.__class__.__name__,
device_id))
@@ -217,6 +219,15 @@
'next': str(self._next_interval)
})
+ def set_pm_config(self, pm_config):
+ """
+ Set PM interval configuration
+
+ :param pm_config: (OnuPmIntervalMetrics) PM Interval configuration
+ :return:
+ """
+ self._pm_config = pm_config
+
def _me_is_supported(self, class_id):
"""
Check to see if ONU supports this ME
@@ -395,8 +406,8 @@
self.advertise(OpenOmciEventType.state_change, self.state)
self._cancel_deferred()
- def success(results):
- self.log.debug('sync-time-success: {}'.format(results))
+ def success(_results):
+ self.log.debug('sync-time-success')
self._current_task = None
self._deferred = reactor.callLater(0, self.success)
# Calculate next interval time
@@ -443,7 +454,7 @@
mes, self._add_pm_me = self._add_pm_me, dict()
def success(results):
- self.log.debug('create-me-success: {}'.format(results))
+ self.log.debug('create-me-success', results=results)
# Check if already here. The create request could have received
# an already-exists status code which we consider successful
@@ -476,7 +487,7 @@
mes, self._del_pm_me = self._del_pm_me, set()
def success(results):
- self.log.debug('delete-me-success: {}'.format(results))
+ self.log.debug('delete-me-success', results=results)
self._current_task = None
for me in mes:
self._pm_me_collect_retries.pop(me)
@@ -518,7 +529,9 @@
# Collect the data ?
if self._pm_me_collect_retries[key] > 0:
def success(results):
- self.log.info('collect-success', results=results)
+ self.log.info('collect-success', results=results,
+ class_id=results.get('class_id'),
+ entity_id=results.get('entity_id'))
self._current_task = None
self._pm_me_collect_retries[key] = 0
self._deferred = reactor.callLater(0, self.success)
@@ -605,8 +618,11 @@
:param results: (dict) PM results
"""
self.log.debug('collect-publish', results=results)
- pass # TODO: Publish it here
- pass # TODO: Save off last time interval fetched to persistent storage
+
+ if self._pm_config is not None:
+ self._pm_config.publish_metrics(results)
+
+ pass # TODO: Save off last time interval fetched to persistent storage?
def on_mib_reset_response(self, _topic, msg):
"""
diff --git a/voltha/extensions/omci/tasks/interval_data_task.py b/voltha/extensions/omci/tasks/interval_data_task.py
index cb02187..04cc8eb 100644
--- a/voltha/extensions/omci/tasks/interval_data_task.py
+++ b/voltha/extensions/omci/tasks/interval_data_task.py
@@ -112,19 +112,19 @@
attr_names = self._counter_attributes.keys()
final_results = {
- 'class-id': self._class_id,
- 'entity-id': self._entity_id,
- 'me-name': self._entity.__name__, # Mostly for debugging...
- 'interval-utc-time': None,
+ 'class_id': self._class_id,
+ 'entity_id': self._entity_id,
+ 'me_name': self._entity.__name__, # Mostly for debugging...
+ 'interval_utc_time': None,
# Counters added here as they are retrieved
}
last_end_time = None
while len(attr_names) > 0:
# Get as many attributes that will fit. Always include the 1 octet
- # Interval End Time Attribute
+ # Interval End Time Attribute and 2 octets for the Entity ID
- remaining_payload = self._max_payload - 1
+ remaining_payload = self._max_payload - 3
attributes = list()
for name in attr_names:
if self._counter_attributes[name] > remaining_payload:
@@ -145,6 +145,8 @@
attributes_mask=self._entity.mask_for(*attributes)
)
)
+ self.log.debug('interval-get-request', class_id=self._class_id,
+ entity_id=self._entity_id)
try:
results = yield device.omci_cc.send(frame)
@@ -157,28 +159,27 @@
end_time=end_time)
if status != ReasonCodes.Success:
- raise IntervalDataTaskFailure('Unexpected Response Status: {}'.
- format(status))
-
+ raise IntervalDataTaskFailure('Unexpected Response Status: {}, Class ID: {}'.
+ format(status, self._class_id))
if last_end_time is None:
last_end_time = end_time
elif end_time != last_end_time:
msg = 'Interval End Time Changed during retrieval from {} to {}'\
.format(last_end_time, end_time)
- self.log.info('interval-roll-over', msg=msg)
+ self.log.info('interval-roll-over', msg=msg, class_id=self._class_id)
raise IntervalDataTaskFailure(msg)
- final_results['interval-utc-time'] = datetime.utcnow()
+ final_results['interval_utc_time'] = datetime.utcnow()
for attribute in attributes:
final_results[attribute] = omci_msg['data'].get(attribute)
except TimeoutError as e:
- self.log.warn('interval-get-timeout', e=e)
+ self.log.warn('interval-get-timeout', e=e, class_id=self._class_id)
self.deferred.errback(failure.Failure(e))
except Exception as e:
- self.log.exception('interval-get-failure', e=e)
+ self.log.exception('interval-get-failure', e=e, class_id=self._class_id)
self.deferred.errback(failure.Failure(e))
# Successful if here
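The payload budgeting change above reserves 3 octets per OMCI GET (1 for the Interval End Time attribute plus 2 for the Entity ID) before packing counter attributes. A standalone sketch of that packing loop, using made-up attribute sizes and a hypothetical 25-octet payload budget:

```python
# Illustrative only: attribute names/sizes and the payload budget are made up.
counter_attributes = {              # attribute name -> size in octets
    'drop_events': 4, 'octets': 4, 'packets': 4,
    'broadcast_packets': 4, 'multicast_packets': 4,
    'crc_errored_packets': 4, 'undersize_packets': 4,
}
max_payload = 25                    # hypothetical OMCI GET payload budget

attr_names = list(counter_attributes.keys())
batches = []
while attr_names:
    remaining_payload = max_payload - 3   # Interval End Time (1) + Entity ID (2)
    attributes = []
    for name in attr_names:
        if counter_attributes[name] > remaining_payload:
            break
        attributes.append(name)
        remaining_payload -= counter_attributes[name]
    if not attributes:                    # nothing fits; avoid spinning forever
        break
    attr_names = attr_names[len(attributes):]
    batches.append(attributes)            # one OMCI GET request per batch

print(batches)   # two GET requests: 5 counters in the first, 2 in the second
```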
diff --git a/voltha/extensions/omci/tasks/omci_create_pm_task.py b/voltha/extensions/omci/tasks/omci_create_pm_task.py
index d2f41ec..e818d87 100644
--- a/voltha/extensions/omci/tasks/omci_create_pm_task.py
+++ b/voltha/extensions/omci/tasks/omci_create_pm_task.py
@@ -77,7 +77,6 @@
@inlineCallbacks
def perform_create(self):
""" Perform the create requests """
- self.log.info('perform-create')
try:
for pm, me in self._me_dict.items():
@@ -86,6 +85,7 @@
me_class_id = me[0]
me_entity_id = me[1]
upstream = me[2]
+ self.log.debug('create-pm-me', class_id=pm_class_id, entity_id=pm_entity_id)
if me_class_id == 0:
# Typical/common PM interval format
@@ -111,7 +111,6 @@
bitmap, # Control fields bitmap
0, # TCI
0 # Reserved
-
]}
frame = OmciFrame(
transaction_id=None, # OMCI-CC will set
diff --git a/voltha/extensions/pki/README.md b/voltha/extensions/pki/README.md
index 903c457..b6bce83 100644
--- a/voltha/extensions/pki/README.md
+++ b/voltha/extensions/pki/README.md
@@ -97,7 +97,7 @@
| :-: | :----- | :---- |
| type | string | "slice" or "ts". A "slice" is a set of path/metric data for the same time-stamp. A "ts" is a time-series: an array of data for the same metric |
| ts | float | UTC time-stamp of data in slice mode (seconds since the epoch of January 1, 1970) |
-| prefixes | map | One or more prefix_name - value pairs as described below |
+| prefixes | list | One or more prefixes. A prefix is a key-value pair as described below |
**NOTE**: The timestamp is currently reported as a whole-second value. A floating-point
timestamp, including the fractional seconds since the epoch, is also easily available. **Is this of use**?
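For reference, the publish path added in this change builds a 'slice' KpiEvent roughly as follows; the device prefix and metric names here are illustrative only.

```python
# Illustrative sketch of the 'slice' KpiEvent built by publish_metrics();
# the device prefix and metric names are made up for this example.
import arrow
from voltha.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs

metrics = {'nni.0': {'rx_packets': 1234, 'tx_packets': 5678}}    # example data
prefix = 'voltha.example_olt.0001deadbeef0001'                   # hypothetical

kpi_event = KpiEvent(
    type=KpiEventType.slice,
    ts=arrow.utcnow().timestamp,
    prefixes={prefix + '.{}'.format(k): MetricValuePairs(metrics=metrics[k])
              for k in metrics.keys()})
# adapter_agent.submit_kpis(kpi_event) then publishes the event to Kafka
```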
@@ -208,20 +208,15 @@
This initial code is only a preliminary sample. The following tasks need to be
added to the VOLTHA JIRA or performed in the SEBA group:
-- Get a list from SEBA/VOLTHA on required metrics.
-
-- Provide example JSON output and verify that it meets SEBA's requirements
+- Get a list from SEBA/VOLTHA on required metrics.
- Get feedback from other OLT/ONU developers on any needed changes
-- Test PM group enable/disable
-
- Allow PM groups to have different collection times
-- Solicit VOLTHA/SEBA if support for disabling of individual items in a PM group would be useful
-
- Support calling a 'get-data' method before collecting the metrics. Currently metrics are collected
in a device-adapter-independent way and the PM code just reports whatever values the attributes happen to hold.
-- TODO: Probably a few more. Look through code for more 'TODO' Notes
+- For statistics groups that have more than one instance, do we need to be able to
+ enable/disable specific instances? If so, a major refactor of the code would be needed (database work, ...)
diff --git a/voltha/extensions/pki/adapter_pm_metrics.py b/voltha/extensions/pki/adapter_pm_metrics.py
index e5b2c96..d9432a0 100644
--- a/voltha/extensions/pki/adapter_pm_metrics.py
+++ b/voltha/extensions/pki/adapter_pm_metrics.py
@@ -45,6 +45,7 @@
self.freq_override = grouped and freq_override
self.lc = None
self.prefix = 'voltha.{}.{}'.format(self.name, self.device_id)
+ self.pm_group_metrics = dict() # name -> PmGroupConfig
def update(self, pm_config):
# TODO: Move any common steps into base class
@@ -74,7 +75,7 @@
if self.lc is not None and self.default_freq > 0:
self.lc.stop()
- def collect_metrics(self, group, names, config):
+ def collect_group_metrics(self, group, names, config):
"""
Collect the metrics for a specific PM group.
@@ -95,14 +96,35 @@
for (metric, t) in names:
if config[metric].enabled and hasattr(group, metric):
metrics[metric] = getattr(group, metric)
+
return metrics
- def collect_group_metrics(self, metrics=None):
+ def collect_metrics(self, metrics=None):
+ """
+ Collect metrics for this adapter.
+
+ This method is called for each adapter at a fixed frequency. The adapter type
+ (OLT, ONU, ..) should provide a derived class where this method iterates
+ through all metrics and collects them up in a dictionary with the group/metric
+ name as the key, and the metric values as the contents.
+
+ For a group, the value is a map of metric_name -> metric_value
+ For an individual metric, the value is the metric value itself
+
+ TODO: Currently all group metrics are collected on a single timer tick. This needs to be fixed.
+
+ :param metrics: (dict) Existing map to add collected metrics to. This is
+ provided to allow derived classes to call into further
+ encapsulated classes
+
+ :return: (dict) metrics - see description above
+ """
raise NotImplementedError('Your derived class should override this method')
def collect_and_publish_metrics(self):
+ """ Request collection of all enabled metrics and publish them """
try:
- metrics = self.collect_group_metrics()
+ metrics = self.collect_metrics()
self.publish_metrics(metrics)
except Exception as e:
@@ -112,24 +134,27 @@
"""
Publish the metrics during a collection
- :param metrics: (dict) Metrics to publish
+ :param metrics: (dict) Metrics to publish. If empty, no metrics will be published
"""
- import arrow
- from voltha.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
+ self.log.debug('publish-metrics', metrics=metrics)
- try:
- ts = arrow.utcnow().timestamp
- kpi_event = KpiEvent(
- type=KpiEventType.slice,
- ts=ts,
- prefixes={
- self.prefix + '.{}'.format(k): MetricValuePairs(metrics=metrics[k])
- for k in metrics.keys()}
- )
- self.adapter_agent.submit_kpis(kpi_event)
+ if len(metrics):
+ import arrow
+ from voltha.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
- except Exception as e:
- self.log.exception('failed-to-submit-kpis', e=e)
+ try:
+ ts = arrow.utcnow().timestamp
+ kpi_event = KpiEvent(
+ type=KpiEventType.slice,
+ ts=ts,
+ prefixes={
+ self.prefix + '.{}'.format(k): MetricValuePairs(metrics=metrics[k])
+ for k in metrics.keys()}
+ )
+ self.adapter_agent.submit_kpis(kpi_event)
+
+ except Exception as e:
+ self.log.exception('failed-to-submit-kpis', e=e)
# TODO: Need to support on-demand counter update if provided by the PM 'group'.
# Currently we expect PM data to be periodically polled by a separate
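To make the new collect_metrics()/collect_group_metrics() split concrete, here is a minimal sketch of a derived class; the 'Ethernet' group name and the _nni_ports/nni_* attributes are assumptions modelled on the OLT implementation below, not part of the base class.

```python
# Minimal sketch of a derived adapter PM class (assumed names modelled on
# olt_pm_metrics.py): collect_metrics() gates each group on its cached
# PmGroupConfig.enabled flag and delegates the per-object attribute scrape
# to the base class collect_group_metrics().
class ExamplePmMetrics(AdapterPmMetrics):
    def collect_metrics(self, metrics=None):
        if metrics is None:
            metrics = dict()

        if self.pm_group_metrics['Ethernet'].enabled:
            for port in self._nni_ports:
                name = 'nni.{}'.format(port.port_no)
                metrics[name] = self.collect_group_metrics(port,
                                                           self.nni_pm_names,
                                                           self.nni_metrics_config)
        return metrics
```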
diff --git a/voltha/extensions/pki/olt/olt_pm_metrics.py b/voltha/extensions/pki/olt/olt_pm_metrics.py
index 04e403f..932aa7d 100644
--- a/voltha/extensions/pki/olt/olt_pm_metrics.py
+++ b/voltha/extensions/pki/olt/olt_pm_metrics.py
@@ -123,29 +123,29 @@
self._pon_ports = kwargs.pop('pon-ports', None)
def update(self, pm_config):
- # TODO: Test both 'group' and 'non-group' functionality
- # TODO: Test frequency override capability for a particular group
- if self.default_freq != pm_config.default_freq:
- # Update the callback to the new frequency.
- self.default_freq = pm_config.default_freq
- self.lc.stop()
- self.lc.start(interval=self.default_freq / 10)
+ try:
+ # TODO: Test frequency override capability for a particular group
+ if self.default_freq != pm_config.default_freq:
+ # Update the callback to the new frequency.
+ self.default_freq = pm_config.default_freq
+ self.lc.stop()
+ self.lc.start(interval=self.default_freq / 10)
- if pm_config.grouped:
- for m in pm_config.groups:
- # TODO: Need to support individual group enable/disable
- pass
- # self.pm_group_metrics[m.group_name].config.enabled = m.enabled
- # if m.enabled is True:
- # self.enable_pm_collection(m.group_name, remote)
- # else:
- # self.disable_pm_collection(m.group_name, remote)
- else:
- for m in pm_config.metrics:
- self.nni_metrics_config[m.name].enabled = m.enabled
- self.pon_metrics_config[m.name].enabled = m.enabled
- self.onu_metrics_config[m.name].enabled = m.enabled
- self.gem_metrics_config[m.name].enabled = m.enabled
+ if pm_config.grouped:
+ for group in pm_config.groups:
+ group_config = self.pm_group_metrics.get(group.group_name)
+ if group_config is not None:
+ group_config.enabled = group.enabled
+ else:
+ for m in pm_config.metrics:
+ self.nni_metrics_config[m.name].enabled = m.enabled
+ self.pon_metrics_config[m.name].enabled = m.enabled
+ self.onu_metrics_config[m.name].enabled = m.enabled
+ self.gem_metrics_config[m.name].enabled = m.enabled
+
+ except Exception as e:
+ self.log.exception('update-failure', e=e)
+ raise
def make_proto(self, pm_config=None):
if pm_config is None:
@@ -161,6 +161,8 @@
pm_ether_stats = PmGroupConfig(group_name='Ethernet',
group_freq=self.default_freq,
enabled=True)
+ self.pm_group_metrics[pm_ether_stats.group_name] = pm_ether_stats
+
else:
pm_ether_stats = None
@@ -169,22 +171,26 @@
group_freq=self.default_freq,
enabled=True)
- pm_ont_stats = PmGroupConfig(group_name='ONT',
+ pm_onu_stats = PmGroupConfig(group_name='ONU',
group_freq=self.default_freq,
enabled=True)
pm_gem_stats = PmGroupConfig(group_name='GEM',
group_freq=self.default_freq,
enabled=True)
+
+ self.pm_group_metrics[pm_pon_stats.group_name] = pm_pon_stats
+ self.pm_group_metrics[pm_onu_stats.group_name] = pm_onu_stats
+ self.pm_group_metrics[pm_gem_stats.group_name] = pm_gem_stats
else:
pm_pon_stats = None
- pm_ont_stats = None
+ pm_onu_stats = None
pm_gem_stats = None
else:
pm_ether_stats = pm_config if have_nni else None
pm_pon_stats = pm_config if have_pon else None
- pm_ont_stats = pm_config if have_pon else None
+ pm_onu_stats = pm_config if have_pon else None
pm_gem_stats = pm_config if have_pon else None
if have_nni:
@@ -214,7 +220,7 @@
if pm.name in metrics:
continue
metrics.add(pm.name)
- pm_ont_stats.metrics.extend([PmConfig(name=pm.name,
+ pm_onu_stats.metrics.extend([PmConfig(name=pm.name,
type=pm.type,
enabled=pm.enabled)])
@@ -228,57 +234,44 @@
type=pm.type,
enabled=pm.enabled)])
if self.grouped:
- pm_groups = [stats for stats in (pm_ether_stats,
- pm_pon_stats,
- pm_ont_stats,
- pm_gem_stats) if stats is not None]
- pm_config.groups.extend(pm_groups)
+ pm_config.groups.extend([stats for stats in
+ self.pm_group_metrics.itervalues()])
return pm_config
- def collect_group_metrics(self, metrics=None):
+ def collect_metrics(self, metrics=None):
# TODO: Currently PM collection is done for all metrics/groups on a single timer
if metrics is None:
metrics = dict()
- for port in self._nni_ports:
- metrics['nni.{}'.format(port.port_no)] = self.collect_nni_metrics(port)
-
+ if self.pm_group_metrics['Ethernet'].enabled:
+ for port in self._nni_ports:
+ name = 'nni.{}'.format(port.port_no)
+ metrics[name] = self.collect_group_metrics(port,
+ self.nni_pm_names,
+ self.nni_metrics_config)
for port in self._pon_ports:
- metrics['pon.{}'.format(port.pon_id)] = self.collect_pon_metrics(port)
-
+ if self.pm_group_metrics['PON'].enabled:
+ name = 'pon.{}'.format(port.pon_id)
+ metrics[name] = self.collect_group_metrics(port,
+ self.pon_pm_names,
+ self.pon_metrics_config)
for onu_id in port.onu_ids:
onu = port.onu(onu_id)
if onu is not None:
- metrics['pon.{}.onu.{}'.format(port.pon_id, onu.onu_id)] = \
- self.collect_onu_metrics(onu)
- for gem in onu.gem_ports:
- if gem.multicast:
- continue
-
- metrics['pon.{}.onu.{}.gem.{}'.format(port.pon_id,
- onu.onu_id,
- gem.gem_id)] = \
- self.collect_gem_metrics(gem)
- # TODO: Do any multicast GEM PORT metrics here...
+ if self.pm_group_metrics['ONU'].enabled:
+ name = 'pon.{}.onu.{}'.format(port.pon_id, onu.onu_id)
+ metrics[name] = self.collect_group_metrics(onu,
+ self.onu_pm_names,
+ self.onu_metrics_config)
+ if self.pm_group_metrics['GEM'].enabled:
+ for gem in onu.gem_ports:
+ if not gem.multicast:
+ name = 'pon.{}.onu.{}.gem.{}'.format(port.pon_id,
+ onu.onu_id,
+ gem.gem_id)
+ metrics[name] = self.collect_group_metrics(onu,
+ self.gem_pm_names,
+ self.gem_metrics_config)
+ # TODO: Do any multicast GEM PORT metrics here...
return metrics
-
- def collect_nni_metrics(self, nni_port):
- stats = {metric: getattr(nni_port, metric) for (metric, t) in self.nni_pm_names}
- return {metric: value for metric, value in stats.iteritems()
- if self.nni_metrics_config[metric].enabled}
-
- def collect_pon_metrics(self, pon_port):
- stats = {metric: getattr(pon_port, metric) for (metric, t) in self.pon_pm_names}
- return {metric: value for metric, value in stats.iteritems()
- if self.pon_metrics_config[metric].enabled}
-
- def collect_onu_metrics(self, onu):
- stats = {metric: getattr(onu, metric) for (metric, t) in self.onu_pm_names}
- return {metric: value for metric, value in stats.iteritems()
- if self.onu_metrics_config[metric].enabled}
-
- def collect_gem_metrics(self, gem):
- stats = {metric: getattr(gem, metric) for (metric, t) in self.gem_pm_names}
- return {metric: value for metric, value in stats.iteritems()
- if self.gem_metrics_config[metric].enabled}
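With the group configs now cached in pm_group_metrics, enabling or disabling a whole group is a matter of pushing an updated PmConfigs through update(). A hedged sketch follows; 'olt_pm' is a hypothetical OltPmMetrics instance and 'GEM' is one of the group names registered by make_proto() above.

```python
# Hedged sketch of the group enable/disable path. 'olt_pm' is a hypothetical
# OltPmMetrics instance; 'GEM' is one of the group names registered in
# pm_group_metrics by make_proto() above.
def disable_gem_group(olt_pm):
    pm_config = olt_pm.make_proto()          # current PmConfigs proto
    for group in pm_config.groups:
        if group.group_name == 'GEM':
            group.enabled = False            # request that GEM stats be skipped
    olt_pm.update(pm_config)                 # collect_metrics() now skips GEM ports
```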
diff --git a/voltha/extensions/pki/onu/IntervalMetrics.md b/voltha/extensions/pki/onu/IntervalMetrics.md
index 1f5bc53..cdf4e43 100644
--- a/voltha/extensions/pki/onu/IntervalMetrics.md
+++ b/voltha/extensions/pki/onu/IntervalMetrics.md
@@ -262,3 +262,7 @@
| key_report_message_count | This attribute counts key_report PLOAM messages transmitted. |
| acknowledge_message_count | This attribute counts acknowledge PLOAM messages transmitted. It includes all forms of acknowledgement, including those transmitted in response to a PLOAM grant when the ONU has nothing to send. |
| sleep_request_message_count | This attribute counts sleep_request PLOAM messages transmitted. |
+
+# Remaining Work Items
+
+- The enable/disable of a PM group (CLI/NBI) should control whether or not a PM interval is collected
\ No newline at end of file
diff --git a/voltha/extensions/pki/onu/onu_omci_pm.py b/voltha/extensions/pki/onu/onu_omci_pm.py
index 90a68a0..a1a3687 100644
--- a/voltha/extensions/pki/onu/onu_omci_pm.py
+++ b/voltha/extensions/pki/onu/onu_omci_pm.py
@@ -18,6 +18,12 @@
class OnuOmciPmMetrics(AdapterPmMetrics):
+ """ ONU OMCI related metrics """
+
+ # Metric default settings
+ DEFAULT_OMCI_CC_ENABLED = False
+ DEFAULT_OMCI_CC_FREQUENCY = 1200 # units of 1/10ths of a second (i.e. 120 seconds)
+
def __init__(self, adapter_agent, device_id,
grouped=False, freq_override=False, **kwargs):
"""
@@ -41,9 +47,7 @@
**kwargs)
self._omci_cc = kwargs.pop('omci-cc', None)
- # PM Config Types are COUNTER, GUAGE, and STATE # Note: GAUGE is misspelled in device.proto
self.omci_pm_names = {
- ('enabled', PmConfig.STATE),
('tx_frames', PmConfig.COUNTER),
('tx_errors', PmConfig.COUNTER),
('rx_frames', PmConfig.COUNTER),
@@ -65,7 +69,6 @@
self.openomci_interval_pm = OnuPmIntervalMetrics(adapter_agent, device_id)
def update(self, pm_config):
- # TODO: Test both 'group' and 'non-group' functionality
# TODO: Test frequency override capability for a particular group
if self.default_freq != pm_config.default_freq:
# Update the callback to the new frequency.
@@ -74,14 +77,10 @@
self.lc.start(interval=self.default_freq / 10)
if pm_config.grouped:
- for m in pm_config.groups:
- # TODO: Need to support individual group enable/disable
- pass
- # self.pm_group_metrics[m.group_name].config.enabled = m.enabled
- # if m.enabled is True:,
- # self.enable_pm_collection(m.group_name, remote)
- # else:
- # self.disable_pm_collection(m.group_name, remote)
+ for group in pm_config.groups:
+ group_config = self.pm_group_metrics.get(group.group_name)
+ if group_config is not None:
+ group_config.enabled = group.enabled
else:
for m in pm_config.metrics:
self.omci_metrics_config[m.name].enabled = m.enabled
@@ -93,9 +92,10 @@
if self._omci_cc is not None:
if self.grouped:
- pm_omci_stats = PmGroupConfig(group_name='OMCI',
- group_freq=self.default_freq,
- enabled=True)
+ pm_omci_stats = PmGroupConfig(group_name='OMCI-CC',
+ group_freq=OnuOmciPmMetrics.DEFAULT_OMCI_CC_FREQUENCY,
+ enabled=OnuOmciPmMetrics.DEFAULT_OMCI_CC_ENABLED)
+ self.pm_group_metrics[pm_omci_stats.group_name] = pm_omci_stats
else:
pm_omci_stats = pm_config
@@ -115,7 +115,26 @@
return self.openomci_interval_pm.make_proto(pm_config)
- def collect_device_metrics(self, metrics=None):
+ def collect_metrics(self, metrics=None):
+ """
+ Collect metrics for this adapter.
+
+ This method is called for each adapter at a fixed frequency. The adapter type
+ (OLT, ONU, ..) should provide a derived class where this method iterates
+ through all metrics and collects them up in a dictionary with the group/metric
+ name as the key, and the metric values as the contents.
+
+ For a group, the value is a map of metric_name -> metric_value
+ For an individual metric, the value is the metric value itself
+
+ TODO: Currently all group metrics are collected on a single timer tick. This needs to be fixed.
+
+ :param metrics: (dict) Existing map to add collected metrics to. This is
+ provided to allow derived classes to call into further
+ encapsulated classes
+
+ :return: (dict) metrics - see description above
+ """
# TODO: Currently PM collection is done for all metrics/groups on a single timer
if metrics is None:
metrics = dict()
@@ -123,7 +142,8 @@
# Note: Interval PM is collection done autonomously, not through this method
if self._omci_cc is not None:
- metrics['omci-cc'] = self.collect_metrics(self._omci_cc,
- self.omci_pm_names,
- self.omci_metrics_config)
+ if self.pm_group_metrics['OMCI-CC'].enabled:
+ metrics['OMCI-CC'] = self.collect_group_metrics(self._omci_cc,
+ self.omci_pm_names,
+ self.omci_metrics_config)
return metrics
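Since the OMCI-CC group now defaults to disabled, turning it on goes through the same PmConfigs update path. A hedged sketch; 'onu_pm' is a hypothetical OnuPmMetrics instance whose make_proto() registered the 'OMCI-CC' group shown above.

```python
# Hedged sketch: the OMCI-CC group defaults to disabled; enabling it uses the
# same PmConfigs update path. 'onu_pm' is a hypothetical OnuPmMetrics instance.
def enable_omci_cc_group(onu_pm):
    pm_config = onu_pm.make_proto()
    for group in pm_config.groups:
        if group.group_name == 'OMCI-CC':
            group.enabled = True
    onu_pm.update(pm_config)     # collect_metrics() will now include 'OMCI-CC'
```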
diff --git a/voltha/extensions/pki/onu/onu_pm_interval_metrics.py b/voltha/extensions/pki/onu/onu_pm_interval_metrics.py
index c34c6e3..1f52e56 100644
--- a/voltha/extensions/pki/onu/onu_pm_interval_metrics.py
+++ b/voltha/extensions/pki/onu/onu_pm_interval_metrics.py
@@ -33,12 +33,24 @@
result of receipt of OMCI get responses on various PM History MEs. They are
also always managed as a group with a fixed frequency of 15 minutes.
"""
+ ME_ID_INFO = {
+ EthernetFrameUpstreamPerformanceMonitoringHistoryData.class_id: 'Ethernet Bridge Port History',
+ EthernetFrameDownstreamPerformanceMonitoringHistoryData.class_id: 'Ethernet Bridge Port History',
+ EthernetFrameExtendedPerformanceMonitoring.class_id: 'Ethernet Bridge Port History',
+ EthernetFrameExtendedPerformanceMonitoring64Bit.class_id: 'Ethernet Bridge Port History',
+ EthernetPMMonitoringHistoryData.class_id: 'Ethernet UNI History',
+ FecPerformanceMonitoringHistoryData.class_id: 'FEC History',
+ GemPortNetworkCtpMonitoringHistoryData.class_id: 'GEM Port History',
+ XgPonTcPerformanceMonitoringHistoryData.class_id: 'xgPON TC History',
+ XgPonDownstreamPerformanceMonitoringHistoryData.class_id: 'xgPON Downstream History',
+ XgPonUpstreamPerformanceMonitoringHistoryData.class_id: 'xgPON Upstream History'
+ }
+
def __init__(self, adapter_agent, device_id, **kwargs):
super(OnuPmIntervalMetrics, self).__init__(adapter_agent, device_id,
grouped=True, freq_override=False,
**kwargs)
ethernet_bridge_history = {
- ('enabled', PmConfig.STATE),
('class_id', PmConfig.GUAGE),
('entity_id', PmConfig.GUAGE),
("interval_end_time", PmConfig.GUAGE),
@@ -61,7 +73,6 @@
for (m, t) in ethernet_bridge_history}
ethernet_uni_history = { # Ethernet History Data (Class ID 24)
- ('enabled', PmConfig.STATE),
('class_id', PmConfig.GUAGE),
('entity_id', PmConfig.GUAGE),
("interval_end_time", PmConfig.GUAGE),
@@ -84,7 +95,6 @@
for (m, t) in ethernet_uni_history}
fec_history = { # FEC History Data (Class ID 312)
- ('enabled', PmConfig.STATE),
('class_id', PmConfig.GUAGE),
('entity_id', PmConfig.GUAGE),
("interval_end_time", PmConfig.GUAGE),
@@ -98,7 +108,6 @@
for (m, t) in fec_history}
gem_port_history = { # GEM Port Network CTP History Data (Class ID 341)
- ('enabled', PmConfig.STATE),
('class_id', PmConfig.GUAGE),
('entity_id', PmConfig.GUAGE),
("interval_end_time", PmConfig.GUAGE),
@@ -112,7 +121,6 @@
for (m, t) in gem_port_history}
xgpon_tc_history = { # XgPon TC History Data (Class ID 344)
- ('enabled', PmConfig.STATE),
('class_id', PmConfig.GUAGE),
('entity_id', PmConfig.GUAGE),
("interval_end_time", PmConfig.GUAGE),
@@ -129,7 +137,6 @@
for (m, t) in xgpon_tc_history}
xgpon_downstream_history = { # XgPon Downstream History Data (Class ID 345)
- ('enabled', PmConfig.STATE),
('class_id', PmConfig.GUAGE),
('entity_id', PmConfig.GUAGE),
("interval_end_time", PmConfig.GUAGE),
@@ -152,7 +159,6 @@
for (m, t) in xgpon_downstream_history}
xgpon_upstream_history = { # XgPon Upstream History Data (Class ID 346)
- ('enabled', PmConfig.STATE),
('class_id', PmConfig.GUAGE),
('entity_id', PmConfig.GUAGE),
("interval_end_time", PmConfig.GUAGE),
@@ -187,14 +193,18 @@
:param pm_config:
"""
- for m in pm_config.groups:
- # TODO: Need to support individual group enable/disable
- pass
- # self.pm_group_metrics[m.group_name].config.enabled = m.enabled
- # if m.enabled is True:,
- # self.enable_pm_collection(m.group_name, remote)
- # else:
- # self.disable_pm_collection(m.group_name, remote)
+ self.log.debug('update')
+
+ try:
+ for group in pm_config.groups:
+ group_config = self.pm_group_metrics.get(group.group_name)
+ if group_config is not None and group_config.enabled != group.enabled:
+ group_config.enabled = group.enabled
+ # TODO: For OMCI PM Metrics, tie this into add/remove of the PM Interval ME itself
+
+ except Exception as e:
+ self.log.exception('update-failure', e=e)
+ raise
def make_proto(self, pm_config=None):
"""
@@ -210,9 +220,10 @@
"""
assert pm_config is not None
- pm_ethernet_bridge_history = PmGroupConfig(group_name='Ethernet Bridge Port History',
+ pm_ethernet_bridge_history = PmGroupConfig(group_name=OnuPmIntervalMetrics.ME_ID_INFO[EthernetFrameUpstreamPerformanceMonitoringHistoryData.class_id],
group_freq=0,
enabled=True)
+ self.pm_group_metrics[pm_ethernet_bridge_history.group_name] = pm_ethernet_bridge_history
for m in sorted(self._ethernet_bridge_history_config):
pm = self._ethernet_bridge_history_config[m]
@@ -220,9 +231,10 @@
type=pm.type,
enabled=pm.enabled)])
- pm_ethernet_uni_history = PmGroupConfig(group_name='Ethernet UNI History',
+ pm_ethernet_uni_history = PmGroupConfig(group_name=OnuPmIntervalMetrics.ME_ID_INFO[EthernetPMMonitoringHistoryData.class_id],
group_freq=0,
enabled=True)
+ self.pm_group_metrics[pm_ethernet_uni_history.group_name] = pm_ethernet_uni_history
for m in sorted(self._ethernet_uni_history_config):
pm = self._ethernet_uni_history_config[m]
@@ -230,9 +242,10 @@
type=pm.type,
enabled=pm.enabled)])
- pm_fec_history = PmGroupConfig(group_name='Upstream Ethernet History',
+ pm_fec_history = PmGroupConfig(group_name=OnuPmIntervalMetrics.ME_ID_INFO[FecPerformanceMonitoringHistoryData.class_id],
group_freq=0,
enabled=True)
+ self.pm_group_metrics[pm_fec_history.group_name] = pm_fec_history
for m in sorted(self._fec_history_config):
pm = self._fec_history_config[m]
@@ -240,9 +253,10 @@
type=pm.type,
enabled=pm.enabled)])
- pm_gem_port_history = PmGroupConfig(group_name='GEM Port History',
+ pm_gem_port_history = PmGroupConfig(group_name=OnuPmIntervalMetrics.ME_ID_INFO[GemPortNetworkCtpMonitoringHistoryData.class_id],
group_freq=0,
enabled=True)
+ self.pm_group_metrics[pm_gem_port_history.group_name] = pm_gem_port_history
for m in sorted(self._gem_port_history_config):
pm = self._gem_port_history_config[m]
@@ -250,9 +264,10 @@
type=pm.type,
enabled=pm.enabled)])
- pm_xgpon_tc_history = PmGroupConfig(group_name='xgPON TC History',
+ pm_xgpon_tc_history = PmGroupConfig(group_name=OnuPmIntervalMetrics.ME_ID_INFO[XgPonTcPerformanceMonitoringHistoryData.class_id],
group_freq=0,
enabled=True)
+ self.pm_group_metrics[pm_xgpon_tc_history.group_name] = pm_xgpon_tc_history
for m in sorted(self._xgpon_tc_history_config):
pm = self._xgpon_tc_history_config[m]
@@ -260,9 +275,10 @@
type=pm.type,
enabled=pm.enabled)])
- pm_xgpon_downstream_history = PmGroupConfig(group_name='xgPON Downstream History',
+ pm_xgpon_downstream_history = PmGroupConfig(group_name=OnuPmIntervalMetrics.ME_ID_INFO[XgPonDownstreamPerformanceMonitoringHistoryData.class_id],
group_freq=0,
enabled=True)
+ self.pm_group_metrics[pm_xgpon_downstream_history.group_name] = pm_xgpon_downstream_history
for m in sorted(self._xgpon_downstream_history_config):
pm = self._xgpon_downstream_history_config[m]
@@ -270,9 +286,10 @@
type=pm.type,
enabled=pm.enabled)])
- pm_xgpon_upstream_history = PmGroupConfig(group_name='xgPON Upstream History',
+ pm_xgpon_upstream_history = PmGroupConfig(group_name=OnuPmIntervalMetrics.ME_ID_INFO[XgPonUpstreamPerformanceMonitoringHistoryData.class_id],
group_freq=0,
enabled=True)
+ self.pm_group_metrics[pm_xgpon_upstream_history.group_name] = pm_xgpon_upstream_history
for m in sorted(self._xgpon_upstream_history_config):
pm = self._xgpon_upstream_history_config[m]
@@ -280,14 +297,8 @@
type=pm.type,
enabled=pm.enabled)])
- pm_config.groups.extend([pm_ethernet_bridge_history,
- pm_ethernet_uni_history,
- pm_fec_history,
- pm_gem_port_history,
- pm_xgpon_tc_history,
- pm_xgpon_downstream_history,
- pm_xgpon_upstream_history
- ])
+ pm_config.groups.extend([stats for stats in self.pm_group_metrics.itervalues()])
+
return pm_config
def publish_metrics(self, interval_data):
@@ -305,14 +316,18 @@
:return: (dict) Key/Value of metric data
"""
+ self.log.debug('publish-metrics', metrics=interval_data)
+
try:
import arrow
from voltha.protos.events_pb2 import KpiEvent, KpiEventType, MetricValuePairs
# Locate config
- config = self._configs.get(interval_data['class_id'])
+ class_id = interval_data['class_id']
+ config = self._configs.get(class_id)
+ group = self.pm_group_metrics.get(OnuPmIntervalMetrics.ME_ID_INFO.get(class_id, ''))
- if config is not None and config['enabled'].enabled:
+ if config is not None and group is not None and group.enabled:
# Extract only the metrics we need to publish
config_keys = config.keys()
metrics = {
diff --git a/voltha/extensions/pki/onu/onu_pm_metrics.py b/voltha/extensions/pki/onu/onu_pm_metrics.py
index f1f08f1..9cede63 100644
--- a/voltha/extensions/pki/onu/onu_pm_metrics.py
+++ b/voltha/extensions/pki/onu/onu_pm_metrics.py
@@ -21,10 +21,15 @@
"""
Shared ONU Device Adapter PM Metrics Manager
- This class specifically addresses ONU genernal PM (health, ...) area
+ This class specifically addresses ONU general PM (health, ...). Area-
specific PM (OMCI, PON, UNI) is supported in encapsulated classes accessible
from this object
"""
+
+ # Metric default settings
+ DEFAULT_HEARTBEAT_ENABLED = False
+ DEFAULT_HEARTBEAT_FREQUENCY = 1200 # units of 1/10ths of a second (i.e. 120 seconds)
+
def __init__(self, adapter_agent, device_id, grouped=False, freq_override=False, **kwargs):
"""
Initializer for shared ONU Device Adapter PM metrics
@@ -52,7 +57,6 @@
# there is not yet a common 'heartbeat' object
#
self.health_pm_names = {
- ('enabled', PmConfig.STATE),
('alarm_active', PmConfig.STATE),
('heartbeat_count', PmConfig.COUNTER),
('heartbeat_miss', PmConfig.COUNTER),
@@ -70,26 +74,26 @@
freq_override=freq_override, **kwargs)
def update(self, pm_config):
- # TODO: Test both 'group' and 'non-group' functionality
- # TODO: Test frequency override capability for a particular group
- if self.default_freq != pm_config.default_freq:
- # Update the callback to the new frequency.
- self.default_freq = pm_config.default_freq
- self.lc.stop()
- self.lc.start(interval=self.default_freq / 10)
+ try:
+ # TODO: Test frequency override capability for a particular group
+ if self.default_freq != pm_config.default_freq:
+ # Update the callback to the new frequency.
+ self.default_freq = pm_config.default_freq
+ self.lc.stop()
+ self.lc.start(interval=self.default_freq / 10)
- if pm_config.grouped:
- for m in pm_config.groups:
- # TODO: Need to support individual group enable/disable
- pass
- # self.pm_group_metrics[m.group_name].config.enabled = m.enabled
- # if m.enabled is True:,
- # self.enable_pm_collection(m.group_name, remote)
- # else:
- # self.disable_pm_collection(m.group_name, remote)
- else:
- for m in pm_config.metrics:
- self.health_metrics_config[m.name].enabled = m.enabled
+ if pm_config.grouped:
+ for group in pm_config.groups:
+ group_config = self.pm_group_metrics.get(group.group_name)
+ if group_config is not None:
+ group_config.enabled = group.enabled
+ else:
+ for m in pm_config.metrics:
+ self.health_metrics_config[m.name].enabled = m.enabled
+
+ except Exception as e:
+ self.log.exception('update-failure', e=e)
+ raise
self.omci_pm.update(pm_config)
@@ -104,8 +108,9 @@
if self._heartbeat is not None:
if self.grouped:
pm_health_stats = PmGroupConfig(group_name='Heartbeat',
- group_freq=self.default_freq,
- enabled=True)
+ group_freq=OnuPmMetrics.DEFAULT_HEARTBEAT_FREQUENCY,
+ enabled=OnuPmMetrics.DEFAULT_HEARTBEAT_ENABLED)
+ self.pm_group_metrics[pm_health_stats.group_name] = pm_health_stats
else:
pm_health_stats = pm_config
@@ -128,7 +133,12 @@
pm_config = self.omci_pm.make_proto(pm_config)
return pm_config
- def collect_group_metrics(self, metrics=None):
+ def collect_metrics(self, metrics=None):
+ """
+ Collect metrics for this ONU adapter and any encapsulated PM classes
+
+ :param metrics: (dict) Existing map to add collected metrics to
+ :return: (dict) collected metrics
+ """
# TODO: Currently PM collection is done for all metrics/groups on a single timer
if metrics is None:
metrics = dict()
@@ -138,7 +148,7 @@
# metrics['heartbeat'] = self.collect_metrics(self._heartbeat,
# self.health_pm_names,
# self.health_metrics_config)
- self.omci_pm.collect_device_metrics(metrics=metrics)
+ self.omci_pm.collect_metrics(metrics=metrics)
# TODO Add PON Port PM
# TODO Add UNI Port PM
return metrics
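Finally, a note on the frequency units used by the new defaults: PmConfigs frequencies are expressed in tenths of a second, which is why update() divides by 10 when (re)starting the collection LoopingCall. A hedged sketch of that restart logic; 'pm_metrics' is a hypothetical OnuPmMetrics instance owned by the device adapter.

```python
# Hedged sketch of the collection-loop restart performed in update():
# PmConfigs frequencies are in tenths of a second, so a default_freq of 1200
# yields a 120-second LoopingCall interval.
from twisted.internet.task import LoopingCall

def restart_collection(pm_metrics, pm_config):
    if pm_metrics.lc is not None:
        pm_metrics.lc.stop()
    pm_metrics.default_freq = pm_config.default_freq
    pm_metrics.lc = LoopingCall(pm_metrics.collect_and_publish_metrics)
    pm_metrics.lc.start(interval=pm_metrics.default_freq / 10)
```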