SEBA-316 Add performance metrics to brcm_openomci_onu
Change-Id: I369eedb1efc5b924aefb3869e1065a5542fff659
diff --git a/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu.py b/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu.py
index 0f07fa4..c7fce17 100644
--- a/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu.py
+++ b/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu.py
@@ -194,8 +194,12 @@
def get_device_details(self, device):
raise NotImplementedError()
- def update_pm_config(self, device, pm_configs):
- raise NotImplementedError()
+    # TODO(smbaker): When BrcmOpenomciOnuAdapter is updated to inherit from OnuAdapter, this function can be deleted
+    def update_pm_config(self, device, pm_config):
+        """
+        Forward a PM configuration update to the handler of the given device.
+
+        :param device: device whose id selects the handler in self.devices_handlers
+        :param pm_config: PM configuration handed to the handler unchanged
+        """
+        log.info("adapter-update-pm-config", device=device, pm_config=pm_config)
+        self.devices_handlers[device.id].update_pm_config(device, pm_config)
def update_flows_bulk(self, device, flows, groups):
'''
diff --git a/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py b/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
index dcc123e..fa15830 100644
--- a/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
+++ b/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
@@ -22,6 +22,10 @@
from twisted.internet import reactor, task
from twisted.internet.defer import DeferredQueue, inlineCallbacks, returnValue, TimeoutError
+from heartbeat import HeartBeat
+from voltha.extensions.kpi.onu.onu_pm_metrics import OnuPmMetrics
+from voltha.extensions.kpi.onu.onu_omci_pm import OnuOmciPmMetrics
+
from common.utils.indexpool import IndexPool
import voltha.core.flow_decomposer as fd
from voltha.registry import registry
@@ -78,6 +82,7 @@
self.proxy_address = None
self.tx_id = 0
self._enabled = False
+ self.pm_metrics = None
self._omcc_version = OMCCVersion.Unknown
self._total_tcont_count = 0 # From ANI-G ME
self._qos_flexibility = 0 # From ONT2_G ME
@@ -91,6 +96,8 @@
self._pon_port_number = 100
self.logical_device_id = None
+ self._heartbeat = HeartBeat.create(self, device_id)
+
# Set up OpenOMCI environment
self._onu_omci_device = None
self._dev_info_loaded = False
@@ -118,6 +125,10 @@
return self._onu_omci_device.omci_cc if self._onu_omci_device is not None else None
@property
+ def heartbeat(self):
+ # Accessor for the object stored in self._heartbeat; no setter is added in this change
+ return self._heartbeat
+
+ @property
def uni_ports(self):
return self._unis.values()
@@ -169,12 +180,34 @@
device.connect_status = ConnectStatus.REACHABLE
device.oper_status = OperStatus.DISCOVERED
device.reason = 'activating-onu'
+
+ # pm_metrics requires a logical device id
+ parent_device = self.adapter_agent.get_device(device.parent_id)
+ self.logical_device_id = parent_device.parent_id
+ assert self.logical_device_id, 'Invalid logical device ID'
+
self.adapter_agent.update_device(device)
self.log.debug('set-device-discovered')
self._init_pon_state(device)
+ ############################################################################
+ # Setup PM configuration for this device
+ # Pass in ONU specific options
+ kwargs = {
+ OnuPmMetrics.DEFAULT_FREQUENCY_KEY: OnuPmMetrics.DEFAULT_ONU_COLLECTION_FREQUENCY,
+ 'heartbeat': self.heartbeat,
+ OnuOmciPmMetrics.OMCI_DEV_KEY: self._onu_omci_device
+ }
+ self.pm_metrics = OnuPmMetrics(self.adapter_agent, self.device_id,
+ self.logical_device_id, grouped=True,
+ freq_override=False, **kwargs)
+ pm_config = self.pm_metrics.make_proto()
+ self._onu_omci_device.set_pm_config(self.pm_metrics.omci_pm.openomci_interval_pm)
+ self.log.info("initial-pm-config", pm_config=pm_config)
+ self.adapter_agent.update_device_pm_config(pm_config, init=True)
+
self.enabled = True
else:
self.log.info('onu-already-activated')
@@ -257,6 +290,10 @@
else:
self.log.debug("parent-adapter-not-available")
+    def update_pm_config(self, device, pm_config):
+        """
+        Apply an updated PM configuration to this handler's PM metrics object.
+
+        :param device: device the update is addressed to (not used here)
+        :param pm_config: new PM configuration, passed to self.pm_metrics.update()
+        """
+        # TODO: This has not been tested
+        self.log.info('update_pm_config', pm_config=pm_config)
+
+        # pm_metrics starts out as None and is only assigned once the handler
+        # builds its OnuPmMetrics instance; an update arriving before that
+        # would otherwise fail with AttributeError on None
+        if self.pm_metrics is None:
+            self.log.warn('update-pm-config-ignored-no-pm-metrics')
+            return
+
+        self.pm_metrics.update(pm_config)
# Calling this assumes the onu is active/ready and had at least an initial mib downloaded. This gets called from
# flow decomposition that ultimately comes from onos
@@ -440,6 +477,7 @@
reactor.callLater(1, self._onu_omci_device.start)
onu_device.reason = "starting-openomci"
self.adapter_agent.update_device(onu_device)
+ self._heartbeat.enabled = True
# Currently called each time there is an onu "down" indication from the olt handler
# TODO: possibly other reasons to "update" from the olt?
@@ -593,6 +631,7 @@
device.reason = "restarting-openomci"
self.adapter_agent.update_device(device)
reactor.callLater(1, self._onu_omci_device.start)
+ self._heartbeat.enabled = True
except Exception as e:
log.exception('exception-in-onu-reenable', exception=e)
@@ -669,13 +708,6 @@
OnuDeviceEvents.OmciCapabilitiesEvent)
self._capabilities_subscription = bus.subscribe(topic, self.capabilties_handler)
- def _unsubscribe_to_events(self):
- self.log.debug('function-entry')
- if self._in_sync_subscription is not None:
- bus = self._onu_omci_device.event_bus
- bus.unsubscribe(self._in_sync_subscription)
- self._in_sync_subscription = None
-
# Called when the mib is in sync
def in_sync_handler(self, _topic, msg):
self.log.debug('function-entry', _topic=_topic, msg=msg)
diff --git a/voltha/adapters/brcm_openomci_onu/heartbeat.py b/voltha/adapters/brcm_openomci_onu/heartbeat.py
new file mode 100644
index 0000000..4a7ab1f
--- /dev/null
+++ b/voltha/adapters/brcm_openomci_onu/heartbeat.py
@@ -0,0 +1,209 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+#     http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import structlog
+from twisted.internet import reactor
+from voltha.protos.common_pb2 import OperStatus, ConnectStatus
+from voltha.extensions.omci.omci_me import OntGFrame
+
+
+class HeartBeat(object):
+    """
+    Wraps health-check support for ONU
+
+    check_pulse() sends an OMCI get for check_item using OntGFrame and the
+    response is compared against check_value. heartbeat_check_status() then
+    updates the device state, raises or clears the heartbeat alarm and,
+    while enabled, schedules the next check.
+    """
+    INITIAL_DELAY = 60     # Delay after start until first check
+    TICK_DELAY = 2         # Heartbeat interval
+
+    def __init__(self, handler, device_id):
+        """
+        :param handler: device handler, its adapter_agent, openomci and
+                        alarms attributes are used by this class
+        :param device_id: ID of the device being monitored
+        """
+        self.log = structlog.get_logger(device_id=device_id)
+        self._enabled = False
+        self._handler = handler
+        self._device_id = device_id
+        self._defer = None      # Pending delayed call or OMCI request
+        self._alarm_active = False
+        self._heartbeat_count = 0
+        self._heartbeat_miss = 0
+        self._alarms_raised_count = 0
+        self.heartbeat_failed_limit = 5
+        self.heartbeat_last_reason = ''
+        self.heartbeat_interval = self.TICK_DELAY
+
+    def __str__(self):
+        return "HeartBeat: count:{}, miss: {}".format(self._heartbeat_count,
+                                                      self._heartbeat_miss)
+
+    @staticmethod
+    def create(handler, device_id):
+        """Factory method, returns a new HeartBeat for the handler and device"""
+        return HeartBeat(handler, device_id)
+
+    def _start(self, delay=INITIAL_DELAY):
+        """Schedule the first check_pulse() call 'delay' seconds from now"""
+        self._defer = reactor.callLater(delay, self.check_pulse)
+
+    def _stop(self):
+        """Cancel the pending delayed call or OMCI request, if there is one"""
+        # The pending object is kept in self._defer; 'called' is a plain
+        # attribute (not a method) on a Deferred and on the DelayedCall
+        # returned by reactor.callLater
+        d, self._defer = self._defer, None
+        if d is not None and not d.called:
+            d.cancel()
+
+    @property
+    def enabled(self):
+        return self._enabled
+
+    @enabled.setter
+    def enabled(self, value):
+        if self._enabled != value:
+            self._enabled = value
+
+            # NOTE(review): with the calls below commented out, nothing in
+            # this class invokes _start() or _stop()
+            # if value:
+            #     self._start()
+            # else:
+            #     self._stop()
+
+    @property
+    def check_item(self):
+        """Name of the attribute requested by each heartbeat"""
+        return 'vendor_id'
+
+    @property
+    def check_value(self):
+        """Value the check_item attribute must have for a heartbeat to pass"""
+        # NOTE(review): hard-coded vendor ID, confirm it matches the ONUs this adapter manages
+        # device = self._handler.adapter_agent.get_device(self._device_id)
+        # return device.serial_number
+        return 'ADTN'
+
+    @property
+    def alarm_active(self):
+        return self._alarm_active
+
+    @property
+    def heartbeat_count(self):
+        return self._heartbeat_count
+
+    @property
+    def heartbeat_miss(self):
+        return self._heartbeat_miss
+
+    @property
+    def alarms_raised_count(self):
+        return self._alarms_raised_count
+
+    def check_pulse(self):
+        """Send one heartbeat request, does nothing unless enabled"""
+        if self.enabled:
+            try:
+                self._defer = self._handler.openomci.omci_cc.send(OntGFrame(self.check_item).get())
+                self._defer.addCallbacks(self._heartbeat_success, self._heartbeat_fail)
+
+            except Exception as e:
+                # The request could not be sent, report it as a miss in 5 seconds
+                self._defer = reactor.callLater(5, self._heartbeat_fail, e)
+
+    def _heartbeat_success(self, results):
+        """Validate the heartbeat response and update the miss count"""
+        self.log.debug('heartbeat-success')
+
+        try:
+            omci_response = results.getfieldval("omci_message")
+            data = omci_response.getfieldval("data")
+            value = data[self.check_item]
+
+            if value != self.check_value:
+                self._heartbeat_miss = self.heartbeat_failed_limit
+                self.heartbeat_last_reason = "Invalid {}, got '{}' but expected '{}'".\
+                    format(self.check_item, value, self.check_value)
+            else:
+                self._heartbeat_miss = 0
+                self.heartbeat_last_reason = ''
+
+        except Exception as e:
+            self._heartbeat_miss = self.heartbeat_failed_limit
+            # str(e) rather than e.message, which is deprecated since
+            # Python 2.6 and removed in Python 3
+            self.heartbeat_last_reason = str(e)
+
+        self.heartbeat_check_status(results)
+
+    def _heartbeat_fail(self, failure):
+        """Record a missed heartbeat and re-evaluate the device status"""
+        self._heartbeat_miss += 1
+        self.log.info('heartbeat-miss', failure=failure,
+                      count=self._heartbeat_count,
+                      miss=self._heartbeat_miss)
+        self.heartbeat_last_reason = 'OMCI connectivity error'
+        self.heartbeat_check_status(None)
+
+    def on_heartbeat_alarm(self, active):
+        # TODO: Do something here ?
+        #
+        # TODO: If failed (active = true) due to bad serial-number shut off the UNI port?
+        pass
+
+    def heartbeat_check_status(self, results):
+        """
+        Check the number of heartbeat failures against the limit and emit an alarm if needed
+        """
+        device = self._handler.adapter_agent.get_device(self._device_id)
+
+        try:
+            from voltha.extensions.alarms.heartbeat_alarm import HeartbeatAlarm
+
+            if self._heartbeat_miss >= self.heartbeat_failed_limit:
+                if device.connect_status == ConnectStatus.REACHABLE:
+                    self.log.warning('heartbeat-failed', count=self._heartbeat_miss)
+                    device.connect_status = ConnectStatus.UNREACHABLE
+                    device.oper_status = OperStatus.FAILED
+                    device.reason = self.heartbeat_last_reason
+                    self._handler.adapter_agent.update_device(device)
+                    HeartbeatAlarm(self._handler.alarms, 'onu', self._heartbeat_miss).raise_alarm()
+                    self._alarm_active = True
+                    self.on_heartbeat_alarm(True)
+            else:
+                # Update device states
+                if device.connect_status != ConnectStatus.REACHABLE and self._alarm_active:
+                    device.connect_status = ConnectStatus.REACHABLE
+                    device.oper_status = OperStatus.ACTIVE
+                    device.reason = ''
+                    self._handler.adapter_agent.update_device(device)
+                    HeartbeatAlarm(self._handler.alarms, 'onu').clear_alarm()
+
+                    self._alarm_active = False
+                    # NOTE(review): the counter is bumped when the alarm clears, not when it is raised - confirm
+                    self._alarms_raised_count += 1
+                    self.on_heartbeat_alarm(False)
+
+        except Exception as e:
+            self.log.exception('heartbeat-check', e=e)
+
+        # Reschedule next heartbeat
+        if self.enabled:
+            self._heartbeat_count += 1
+            self._defer = reactor.callLater(self.heartbeat_interval, self.check_pulse)
diff --git a/voltha/extensions/omci/state_machines/performance_intervals.py b/voltha/extensions/omci/state_machines/performance_intervals.py
index 1d5da4d..486eba8 100644
--- a/voltha/extensions/omci/state_machines/performance_intervals.py
+++ b/voltha/extensions/omci/state_machines/performance_intervals.py
@@ -84,7 +84,7 @@
DEFAULT_TICK_DELAY = 15 # Seconds between checks for collection tick
DEFAULT_INTERVAL_SKEW = 10 * 60 # Seconds to skew past interval boundary
DEFAULT_COLLECT_ATTEMPTS = 3 # Maximum number of collection fetch attempts
- DEFAULT_CREATE_ATTEMPTS = 5 # Maximum number of attempts to create a PM Managed Entities
+ DEFAULT_CREATE_ATTEMPTS = 15 # Maximum number of attempts to create a PM Managed Entities
def __init__(self, agent, device_id, tasks,
advertise_events=False,
@@ -508,7 +508,7 @@
self._deferred = reactor.callLater(0, self.success)
def failure(reason):
- self.log.info('create-me-failure', reason=reason)
+ self.log.info('create-me-failure', reason=reason, retries=self._add_pm_me_retry)
self._current_task = None
if self._add_pm_me_retry <= self._create_attempts:
for pm, me in mes.items():
@@ -517,7 +517,7 @@
self._deferred = reactor.callLater(self._timeout_delay, self.failure)
else:
# we cant seem to create any collection me, no point in doing anything
- self.log.warn('unable-to-create-pm-me-disabling-collection', reason=reason)
+ self.log.warn('unable-to-create-pm-me-disabling-collection', reason=reason, device_id=self._device_id)
self._deferred = reactor.callLater(self._timeout_delay, self.stop)
self._current_task = self._create_pm_task(self._agent, self._device_id, mes)
@@ -558,7 +558,9 @@
"""
State machine has just transitioned to the collect_data state
"""
+
if self._next_interval is not None and self._next_interval > datetime.utcnow():
+ self.log.info('wait-next-interval')
# Not ready for next interval, transition back to idle and we should get
# called again after a short delay
reactor.callLater(0, self.success)
@@ -574,6 +576,8 @@
class_id = key[0]
entity_id = key[1]
+ self.log.info("in-enter-collect-data", key=key, retries=self._pm_me_collect_retries[key])
+
# Collect the data ?
if self._pm_me_collect_retries[key] > 0:
def success(results):
diff --git a/voltha/extensions/omci/tasks/omci_create_pm_task.py b/voltha/extensions/omci/tasks/omci_create_pm_task.py
index e0e92c3..4743b03 100644
--- a/voltha/extensions/omci/tasks/omci_create_pm_task.py
+++ b/voltha/extensions/omci/tasks/omci_create_pm_task.py
@@ -15,7 +15,7 @@
#
from task import Task
from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks, failure
+from twisted.internet.defer import inlineCallbacks, failure, TimeoutError
from voltha.extensions.omci.omci_defs import ReasonCodes, EntityOperations
from voltha.extensions.omci.omci_frame import OmciFrame
from voltha.extensions.omci.omci_messages import OmciCreate
@@ -118,7 +118,12 @@
)
)
self.strobe_watchdog()
- results = yield self._device.omci_cc.send(frame)
+ try:
+ results = yield self._device.omci_cc.send(frame)
+ except TimeoutError:
+ self.log.warning('perform-create-timeout', me_class_id=me_class_id, me_entity_id=me_entity_id,
+ pm_class_id=pm_class_id, pm_entity_id=pm_entity_id)
+ raise
status = results.fields['omci_message'].fields['success_code']
self.log.debug('perform-create-status', status=status)
@@ -132,6 +137,7 @@
self.log.debug('create-pm-success', class_id=pm_class_id,
entity_id=pm_entity_id)
+
self.deferred.callback(self)
except Exception as e: