blob: 386d0ab48498092438c26e234d789d745da70b0e [file] [log] [blame]
# Copyright 2017-present Adtran, Inc.
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
from __future__ import absolute_import
import structlog
from twisted.internet import reactor
from twisted.internet.defer import inlineCallbacks
from voltha_protos.common_pb2 import OperStatus, ConnectStatus
from pyvoltha.adapters.extensions.omci.omci_me import OntGFrame
class HeartBeat(object):
"""Wraps health-check support for ONU"""
INITIAL_DELAY = 60 # Delay after start until first check
TICK_DELAY = 2 # Heartbeat interval
def __init__(self, handler, device_id):
self.log = structlog.get_logger(device_id=device_id)
self._enabled = False
self._handler = handler
self._device_id = device_id
self._defer = None
self._alarm_active = False
self._heartbeat_count = 0
self._heartbeat_miss = 0
self._alarms_raised_count = 0
self.heartbeat_failed_limit = 5
self.heartbeat_last_reason = ''
self.heartbeat_interval = self.TICK_DELAY
def __str__(self):
return "HeartBeat: count:{}, miss: {}".format(self._heartbeat_count,
self._heartbeat_miss)
@staticmethod
def create(handler, device_id):
return HeartBeat(handler, device_id)
def _start(self, delay=INITIAL_DELAY):
self._defer = reactor.callLater(delay, self.check_pulse)
def _stop(self):
d, self._defer = self._defer, None
if d is not None and not d.called():
d.cancel()
@property
def enabled(self):
return self._enabled
@enabled.setter
def enabled(self, value):
if self._enabled != value:
self._enabled = value
# if value:
# self._start()
# else:
# self._stop()
@property
def check_item(self):
return 'vendor_id'
@property
def check_value(self):
# device = self._handler.core_proxy.get_device(self._device_id)
# return device.serial_number
return 'ADTN'
@property
def alarm_active(self):
return self._alarm_active
@property
def heartbeat_count(self):
return self._heartbeat_count
@property
def heartbeat_miss(self):
return self._heartbeat_miss
@property
def alarms_raised_count(self):
return self._alarms_raised_count
def check_pulse(self):
if self.enabled:
try:
self._defer = self._handler.openomci.omci_cc.send(OntGFrame(self.check_item).get())
self._defer.addCallbacks(self._heartbeat_success, self._heartbeat_fail)
except Exception as e:
self._defer = reactor.callLater(5, self._heartbeat_fail, e)
def _heartbeat_success(self, results):
self.log.debug('heartbeat-success')
try:
omci_response = results.getfieldval("omci_message")
data = omci_response.getfieldval("data")
value = data[self.check_item]
if value != self.check_value:
self._heartbeat_miss = self.heartbeat_failed_limit
self.heartbeat_last_reason = "Invalid {}, got '{}' but expected '{}'".\
format(self.check_item, value, self.check_value)
else:
self._heartbeat_miss = 0
self.heartbeat_last_reason = ''
except Exception as e:
self._heartbeat_miss = self.heartbeat_failed_limit
self.heartbeat_last_reason = e.message
self.heartbeat_check_status(results)
def _heartbeat_fail(self, failure):
self._heartbeat_miss += 1
self.log.info('heartbeat-miss', failure=failure,
count=self._heartbeat_count,
miss=self._heartbeat_miss)
self.heartbeat_last_reason = 'OMCI connectivity error'
self.heartbeat_check_status(None)
def on_heartbeat_alarm(self, active):
# TODO: Do something here ?
#
# TODO: If failed (active = true) due to bad serial-number shut off the UNI port?
pass
@inlineCallbacks
def heartbeat_check_status(self, results):
"""
Check the number of heartbeat failures against the limit and emit an alarm if needed
"""
device = self._handler.core_proxy.get_device(self._device_id)
try:
from pyvoltha.adapters.extensions.alarms.heartbeat_alarm import HeartbeatAlarm
if self._heartbeat_miss >= self.heartbeat_failed_limit:
if device.connect_status == ConnectStatus.REACHABLE:
self.log.warning('heartbeat-failed', count=self._heartbeat_miss)
device.connect_status = ConnectStatus.UNREACHABLE
device.oper_status = OperStatus.FAILED
device.reason = self.heartbeat_last_reason
yield self._handler.core_proxy.device_update(device)
HeartbeatAlarm(self._handler.alarms, 'onu', self._heartbeat_miss).raise_alarm()
self._alarm_active = True
self.on_heartbeat_alarm(True)
else:
# Update device states
if device.connect_status != ConnectStatus.REACHABLE and self._alarm_active:
device.connect_status = ConnectStatus.REACHABLE
device.oper_status = OperStatus.ACTIVE
device.reason = ''
yield self._handler.core_proxy.device_update(device)
HeartbeatAlarm(self._handler.alarms, 'onu').clear_alarm()
self._alarm_active = False
self._alarms_raised_count += 1
self.on_heartbeat_alarm(False)
except Exception as e:
self.log.exception('heartbeat-check', e=e)
# Reschedule next heartbeat
if self.enabled:
self._heartbeat_count += 1
self._defer = reactor.callLater(self.heartbeat_interval, self.check_pulse)