SEBA-316 Add performance metrics to brcm_openomci_onu

Change-Id: I369eedb1efc5b924aefb3869e1065a5542fff659
diff --git a/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu.py b/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu.py
index 0f07fa4..c7fce17 100644
--- a/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu.py
+++ b/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu.py
@@ -194,8 +194,12 @@
     def get_device_details(self, device):
         raise NotImplementedError()
 
-    def update_pm_config(self, device, pm_configs):
-        raise NotImplementedError()
+    # TODO(smbaker): When BrcmOpenomciOnuAdapter is updated to inherit from OnuAdapter, this function can be deleted
+    def update_pm_config(self, device, pm_config):
+        log.info("adapter-update-pm-config", device=device,
+                 pm_config=pm_config)
+        handler = self.devices_handlers[device.id]
+        handler.update_pm_config(device, pm_config)
 
     def update_flows_bulk(self, device, flows, groups):
         '''
diff --git a/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py b/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
index dcc123e..fa15830 100644
--- a/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
+++ b/voltha/adapters/brcm_openomci_onu/brcm_openomci_onu_handler.py
@@ -22,6 +22,10 @@
 from twisted.internet import reactor, task
 from twisted.internet.defer import DeferredQueue, inlineCallbacks, returnValue, TimeoutError
 
+from heartbeat import HeartBeat
+from voltha.extensions.kpi.onu.onu_pm_metrics import OnuPmMetrics
+from voltha.extensions.kpi.onu.onu_omci_pm import OnuOmciPmMetrics
+
 from common.utils.indexpool import IndexPool
 import voltha.core.flow_decomposer as fd
 from voltha.registry import registry
@@ -78,6 +82,7 @@
         self.proxy_address = None
         self.tx_id = 0
         self._enabled = False
+        self.pm_metrics = None
         self._omcc_version = OMCCVersion.Unknown
         self._total_tcont_count = 0  # From ANI-G ME
         self._qos_flexibility = 0  # From ONT2_G ME
@@ -91,6 +96,8 @@
         self._pon_port_number = 100
         self.logical_device_id = None
 
+        self._heartbeat = HeartBeat.create(self, device_id)
+
         # Set up OpenOMCI environment
         self._onu_omci_device = None
         self._dev_info_loaded = False
@@ -118,6 +125,10 @@
         return self._onu_omci_device.omci_cc if self._onu_omci_device is not None else None
 
     @property
+    def heartbeat(self):
+        return self._heartbeat
+
+    @property
     def uni_ports(self):
         return self._unis.values()
 
@@ -169,12 +180,34 @@
             device.connect_status = ConnectStatus.REACHABLE
             device.oper_status = OperStatus.DISCOVERED
             device.reason = 'activating-onu'
+
+            # pm_metrics requires a logical device id
+            parent_device = self.adapter_agent.get_device(device.parent_id)
+            self.logical_device_id = parent_device.parent_id
+            assert self.logical_device_id, 'Invalid logical device ID'
+
             self.adapter_agent.update_device(device)
 
             self.log.debug('set-device-discovered')
 
             self._init_pon_state(device)
 
+            ############################################################################
+            # Setup PM configuration for this device
+            # Pass in ONU specific options
+            kwargs = {
+                OnuPmMetrics.DEFAULT_FREQUENCY_KEY: OnuPmMetrics.DEFAULT_ONU_COLLECTION_FREQUENCY,
+                'heartbeat': self.heartbeat,
+                OnuOmciPmMetrics.OMCI_DEV_KEY: self._onu_omci_device
+            }
+            self.pm_metrics = OnuPmMetrics(self.adapter_agent, self.device_id,
+                                           self.logical_device_id, grouped=True,
+                                           freq_override=False, **kwargs)
+            pm_config = self.pm_metrics.make_proto()
+            self._onu_omci_device.set_pm_config(self.pm_metrics.omci_pm.openomci_interval_pm)
+            self.log.info("initial-pm-config", pm_config=pm_config)
+            self.adapter_agent.update_device_pm_config(pm_config, init=True)
+
             self.enabled = True
         else:
             self.log.info('onu-already-activated')
@@ -257,6 +290,10 @@
         else:
             self.log.debug("parent-adapter-not-available")
 
+    def update_pm_config(self, device, pm_config):
+        # TODO: This has not been tested
+        self.log.info('update_pm_config', pm_config=pm_config)
+        self.pm_metrics.update(pm_config)
 
     # Calling this assumes the onu is active/ready and had at least an initial mib downloaded.   This gets called from
     # flow decomposition that ultimately comes from onos
@@ -440,6 +477,7 @@
         reactor.callLater(1, self._onu_omci_device.start)
         onu_device.reason = "starting-openomci"
         self.adapter_agent.update_device(onu_device)
+        self._heartbeat.enabled = True
 
     # Currently called each time there is an onu "down" indication from the olt handler
     # TODO: possibly other reasons to "update" from the olt?
@@ -593,6 +631,7 @@
             device.reason = "restarting-openomci"
             self.adapter_agent.update_device(device)
             reactor.callLater(1, self._onu_omci_device.start)
+            self._heartbeat.enabled = True
         except Exception as e:
             log.exception('exception-in-onu-reenable', exception=e)
 
@@ -669,13 +708,6 @@
                                                OnuDeviceEvents.OmciCapabilitiesEvent)
         self._capabilities_subscription = bus.subscribe(topic, self.capabilties_handler)
 
-    def _unsubscribe_to_events(self):
-        self.log.debug('function-entry')
-        if self._in_sync_subscription is not None:
-            bus = self._onu_omci_device.event_bus
-            bus.unsubscribe(self._in_sync_subscription)
-            self._in_sync_subscription = None
-
     # Called when the mib is in sync
     def in_sync_handler(self, _topic, msg):
         self.log.debug('function-entry', _topic=_topic, msg=msg)
diff --git a/voltha/adapters/brcm_openomci_onu/heartbeat.py b/voltha/adapters/brcm_openomci_onu/heartbeat.py
new file mode 100644
index 0000000..4a7ab1f
--- /dev/null
+++ b/voltha/adapters/brcm_openomci_onu/heartbeat.py
@@ -0,0 +1,179 @@
+# Copyright 2017-present Adtran, Inc.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+import structlog
+from twisted.internet import reactor
+from voltha.protos.common_pb2 import OperStatus, ConnectStatus
+from voltha.extensions.omci.omci_me import OntGFrame
+
+
+class HeartBeat(object):
+    """Wraps health-check support for ONU"""
+    INITIAL_DELAY = 60                      # Delay after start until first check
+    TICK_DELAY = 2                          # Heartbeat interval
+
+    def __init__(self, handler, device_id):
+        self.log = structlog.get_logger(device_id=device_id)
+        self._enabled = False
+        self._handler = handler
+        self._device_id = device_id
+        self._defer = None
+        self._alarm_active = False
+        self._heartbeat_count = 0
+        self._heartbeat_miss = 0
+        self._alarms_raised_count = 0
+        self.heartbeat_failed_limit = 5
+        self.heartbeat_last_reason = ''
+        self.heartbeat_interval = self.TICK_DELAY
+
+    def __str__(self):
+        return "HeartBeat: count:{}, miss: {}".format(self._heartbeat_count,
+                                                      self._heartbeat_miss)
+
+    @staticmethod
+    def create(handler, device_id):
+        return HeartBeat(handler, device_id)
+
+    def _start(self, delay=INITIAL_DELAY):
+        self._defer = reactor.callLater(delay, self.check_pulse)
+
+    def _stop(self):
+        d, self._defeered = self._defeered, None
+        if d is not None and not d.called():
+            d.cancel()
+
+    @property
+    def enabled(self):
+        return self._enabled
+
+    @enabled.setter
+    def enabled(self, value):
+        if self._enabled != value:
+            self._enabled = value
+
+            # if value:
+            #     self._start()
+            # else:
+            #     self._stop()
+
+    @property
+    def check_item(self):
+        return 'vendor_id'
+
+    @property
+    def check_value(self):
+        # device = self._handler.adapter_agent.get_device(self._device_id)
+        # return device.serial_number
+        return 'ADTN'
+
+    @property
+    def alarm_active(self):
+        return self._alarm_active
+
+    @property
+    def heartbeat_count(self):
+        return self._heartbeat_count
+
+    @property
+    def heartbeat_miss(self):
+        return self._heartbeat_miss
+
+    @property
+    def alarms_raised_count(self):
+        return self._alarms_raised_count
+
+    def check_pulse(self):
+        if self.enabled:
+            try:
+                self._defer = self._handler.openomci.omci_cc.send(OntGFrame(self.check_item).get())
+                self._defer.addCallbacks(self._heartbeat_success, self._heartbeat_fail)
+
+            except Exception as e:
+                self._defer = reactor.callLater(5, self._heartbeat_fail, e)
+
+    def _heartbeat_success(self, results):
+        self.log.debug('heartbeat-success')
+
+        try:
+            omci_response = results.getfieldval("omci_message")
+            data = omci_response.getfieldval("data")
+            value = data[self.check_item]
+
+            if value != self.check_value:
+                self._heartbeat_miss = self.heartbeat_failed_limit
+                self.heartbeat_last_reason = "Invalid {}, got '{}' but expected '{}'".\
+                    format(self.check_item, value, self.check_value)
+            else:
+                self._heartbeat_miss = 0
+                self.heartbeat_last_reason = ''
+
+        except Exception as e:
+            self._heartbeat_miss = self.heartbeat_failed_limit
+            self.heartbeat_last_reason = e.message
+
+        self.heartbeat_check_status(results)
+
+    def _heartbeat_fail(self, failure):
+        self._heartbeat_miss += 1
+        self.log.info('heartbeat-miss', failure=failure,
+                      count=self._heartbeat_count,
+                      miss=self._heartbeat_miss)
+        self.heartbeat_last_reason = 'OMCI connectivity error'
+        self.heartbeat_check_status(None)
+
+    def on_heartbeat_alarm(self, active):
+        # TODO: Do something here ?
+        #
+        #  TODO: If failed (active = true) due to bad serial-number shut off the UNI port?
+        pass
+
+    def heartbeat_check_status(self, results):
+        """
+        Check the number of heartbeat failures against the limit and emit an alarm if needed
+        """
+        device = self._handler.adapter_agent.get_device(self._device_id)
+
+        try:
+            from voltha.extensions.alarms.heartbeat_alarm import HeartbeatAlarm
+
+            if self._heartbeat_miss >= self.heartbeat_failed_limit:
+                if device.connect_status == ConnectStatus.REACHABLE:
+                    self.log.warning('heartbeat-failed', count=self._heartbeat_miss)
+                    device.connect_status = ConnectStatus.UNREACHABLE
+                    device.oper_status = OperStatus.FAILED
+                    device.reason = self.heartbeat_last_reason
+                    self._handler.adapter_agent.update_device(device)
+                    HeartbeatAlarm(self._handler.alarms, 'onu', self._heartbeat_miss).raise_alarm()
+                    self._alarm_active = True
+                    self.on_heartbeat_alarm(True)
+            else:
+                # Update device states
+                if device.connect_status != ConnectStatus.REACHABLE and self._alarm_active:
+                    device.connect_status = ConnectStatus.REACHABLE
+                    device.oper_status = OperStatus.ACTIVE
+                    device.reason = ''
+                    self._handler.adapter_agent.update_device(device)
+                    HeartbeatAlarm(self._handler.alarms, 'onu').clear_alarm()
+
+                    self._alarm_active = False
+                    self._alarms_raised_count += 1
+                    self.on_heartbeat_alarm(False)
+
+        except Exception as e:
+            self.log.exception('heartbeat-check', e=e)
+
+        # Reschedule next heartbeat
+        if self.enabled:
+            self._heartbeat_count += 1
+            self._defer = reactor.callLater(self.heartbeat_interval, self.check_pulse)
diff --git a/voltha/extensions/omci/state_machines/performance_intervals.py b/voltha/extensions/omci/state_machines/performance_intervals.py
index 1d5da4d..486eba8 100644
--- a/voltha/extensions/omci/state_machines/performance_intervals.py
+++ b/voltha/extensions/omci/state_machines/performance_intervals.py
@@ -84,7 +84,7 @@
     DEFAULT_TICK_DELAY = 15          # Seconds between checks for collection tick
     DEFAULT_INTERVAL_SKEW = 10 * 60  # Seconds to skew past interval boundary
     DEFAULT_COLLECT_ATTEMPTS = 3     # Maximum number of collection fetch attempts
-    DEFAULT_CREATE_ATTEMPTS = 5      # Maximum number of attempts to create a PM Managed Entities
+    DEFAULT_CREATE_ATTEMPTS = 15     # Maximum number of attempts to create a PM Managed Entities
 
     def __init__(self, agent, device_id, tasks,
                  advertise_events=False,
@@ -508,7 +508,7 @@
             self._deferred = reactor.callLater(0, self.success)
 
         def failure(reason):
-            self.log.info('create-me-failure', reason=reason)
+            self.log.info('create-me-failure', reason=reason, retries=self._add_pm_me_retry)
             self._current_task = None
             if self._add_pm_me_retry <= self._create_attempts:
               for pm, me in mes.items():
@@ -517,7 +517,7 @@
               self._deferred = reactor.callLater(self._timeout_delay, self.failure)
             else:
               # we cant seem to create any collection me, no point in doing anything
-              self.log.warn('unable-to-create-pm-me-disabling-collection', reason=reason)
+              self.log.warn('unable-to-create-pm-me-disabling-collection', reason=reason, device_id=self._device_id)
               self._deferred = reactor.callLater(self._timeout_delay, self.stop)
 
         self._current_task = self._create_pm_task(self._agent, self._device_id, mes)
@@ -558,7 +558,9 @@
         """
         State machine has just transitioned to the collect_data state
         """
+
         if self._next_interval is not None and self._next_interval > datetime.utcnow():
+            self.log.info('wait-next-interval')
             # Not ready for next interval, transition back to idle and we should get
             # called again after a short delay
             reactor.callLater(0, self.success)
@@ -574,6 +576,8 @@
             class_id = key[0]
             entity_id = key[1]
 
+            self.log.info("in-enter-collect-data", key=key, retries=self._pm_me_collect_retries[key])
+
             # Collect the data ?
             if self._pm_me_collect_retries[key] > 0:
                 def success(results):
diff --git a/voltha/extensions/omci/tasks/omci_create_pm_task.py b/voltha/extensions/omci/tasks/omci_create_pm_task.py
index e0e92c3..4743b03 100644
--- a/voltha/extensions/omci/tasks/omci_create_pm_task.py
+++ b/voltha/extensions/omci/tasks/omci_create_pm_task.py
@@ -15,7 +15,7 @@
 #
 from task import Task
 from twisted.internet import reactor
-from twisted.internet.defer import inlineCallbacks, failure
+from twisted.internet.defer import inlineCallbacks, failure, TimeoutError
 from voltha.extensions.omci.omci_defs import ReasonCodes, EntityOperations
 from voltha.extensions.omci.omci_frame import OmciFrame
 from voltha.extensions.omci.omci_messages import OmciCreate
@@ -118,7 +118,12 @@
                         )
                     )
                 self.strobe_watchdog()
-                results = yield self._device.omci_cc.send(frame)
+                try:
+                    results = yield self._device.omci_cc.send(frame)
+                except TimeoutError:
+                    self.log.warning('perform-create-timeout', me_class_id=me_class_id, me_entity_id=me_entity_id,
+                                     pm_class_id=pm_class_id, pm_entity_id=pm_entity_id)
+                    raise
 
                 status = results.fields['omci_message'].fields['success_code']
                 self.log.debug('perform-create-status', status=status)
@@ -132,6 +137,7 @@
 
                 self.log.debug('create-pm-success', class_id=pm_class_id,
                                entity_id=pm_entity_id)
+
             self.deferred.callback(self)
 
         except Exception as e: