Voltha Alarm Filters: Added hooks to suppress alarms at the adapter
- Updates based on review comments
- Fixed indentation
Change-Id: Icd1c35919d1c23b86094d4684aaaafb7bae57754
diff --git a/voltha/adapters/broadcom_onu/broadcom_onu.py b/voltha/adapters/broadcom_onu/broadcom_onu.py
index b8f7d7f..e02c9ed 100644
--- a/voltha/adapters/broadcom_onu/broadcom_onu.py
+++ b/voltha/adapters/broadcom_onu/broadcom_onu.py
@@ -144,6 +144,11 @@
def receive_inter_adapter_message(self, msg):
log.info('receive_inter_adapter_message', msg=msg)
+ def suppress_alarm(self, filter):
+ raise NotImplementedError()
+
+ def unsuppress_alarm(self, filter):
+ raise NotImplementedError()
class BroadcomOnuHandler(object):
diff --git a/voltha/adapters/dpoe_onu/dpoe_onu.py b/voltha/adapters/dpoe_onu/dpoe_onu.py
index 6084bc4..5aa8838 100644
--- a/voltha/adapters/dpoe_onu/dpoe_onu.py
+++ b/voltha/adapters/dpoe_onu/dpoe_onu.py
@@ -393,6 +393,12 @@
proxy_address=proxy_address, msg=msg)
self.incoming_messages.put(msg)
+ def suppress_alarm(self, filter):
+ raise NotImplementedError()
+
+ def unsuppress_alarm(self, filter):
+ raise NotImplementedError()
+
@inlineCallbacks
def _message_exchange(self, device):
diff --git a/voltha/adapters/interface.py b/voltha/adapters/interface.py
index 60c27f1..e44537b 100644
--- a/voltha/adapters/interface.py
+++ b/voltha/adapters/interface.py
@@ -203,6 +203,20 @@
:return: None
"""
+ def suppress_alarm(filter):
+ """
+ Inform an adapter that all incoming alarms should be suppressed
+ :param filter: A Voltha.AlarmFilter object.
+ :return: (Deferred) Shall be fired to acknowledge the suppression.
+ """
+
+ def unsuppress_alarm(filter):
+ """
+ Inform an adapter that all incoming alarms should resume
+ :param filter: A Voltha.AlarmFilter object.
+ :return: (Deferred) Shall be fired to acknowledge the unsuppression.
+ """
+
# TODO work in progress
# ...
diff --git a/voltha/adapters/maple_olt/maple_olt.py b/voltha/adapters/maple_olt/maple_olt.py
index 09e4440..89204b7 100644
--- a/voltha/adapters/maple_olt/maple_olt.py
+++ b/voltha/adapters/maple_olt/maple_olt.py
@@ -479,6 +479,13 @@
def receive_inter_adapter_message(self, msg):
pass
+ def suppress_alarm(self, filter):
+ raise NotImplementedError()
+
+ def unsuppress_alarm(self, filter):
+ raise NotImplementedError()
+
+
class MaplePBClientFactory(pb.PBClientFactory, ReconnectingClientFactory):
channel = None
maxDelay = 60
diff --git a/voltha/adapters/microsemi_olt/microsemi_olt.py b/voltha/adapters/microsemi_olt/microsemi_olt.py
index 8ac93c5..42dc56f 100644
--- a/voltha/adapters/microsemi_olt/microsemi_olt.py
+++ b/voltha/adapters/microsemi_olt/microsemi_olt.py
@@ -168,6 +168,12 @@
def receive_inter_adapter_message(self, msg):
raise NotImplementedError()
+ def suppress_alarm(self, filter):
+ raise NotImplementedError()
+
+ def unsuppress_alarm(self, filter):
+ raise NotImplementedError()
+
##
# Private methods
##
diff --git a/voltha/adapters/pmcs_onu/pmcs_onu.py b/voltha/adapters/pmcs_onu/pmcs_onu.py
index 50aada1..a58353f 100644
--- a/voltha/adapters/pmcs_onu/pmcs_onu.py
+++ b/voltha/adapters/pmcs_onu/pmcs_onu.py
@@ -145,6 +145,12 @@
def receive_inter_adapter_message(self, msg):
raise NotImplementedError()
+ def suppress_alarm(self, filter):
+ raise NotImplementedError()
+
+ def unsuppress_alarm(self, filter):
+ raise NotImplementedError()
+
@inlineCallbacks
def _onu_device_activation(self, device):
# first we verify that we got parent reference and proxy info
diff --git a/voltha/adapters/ponsim_olt/ponsim_olt.py b/voltha/adapters/ponsim_olt/ponsim_olt.py
index 58cba69..138e248 100644
--- a/voltha/adapters/ponsim_olt/ponsim_olt.py
+++ b/voltha/adapters/ponsim_olt/ponsim_olt.py
@@ -297,6 +297,12 @@
def receive_inter_adapter_message(self, msg):
raise NotImplementedError()
+ def suppress_alarm(self, filter):
+ raise NotImplementedError()
+
+ def unsuppress_alarm(self, filter):
+ raise NotImplementedError()
+
class PonSimOltHandler(object):
def __init__(self, adapter, device_id):
diff --git a/voltha/adapters/ponsim_onu/ponsim_onu.py b/voltha/adapters/ponsim_onu/ponsim_onu.py
index 2098a22..cbaf3ab 100644
--- a/voltha/adapters/ponsim_onu/ponsim_onu.py
+++ b/voltha/adapters/ponsim_onu/ponsim_onu.py
@@ -146,6 +146,12 @@
def receive_inter_adapter_message(self, msg):
raise NotImplementedError()
+ def suppress_alarm(self, filter):
+ raise NotImplementedError()
+
+ def unsuppress_alarm(self, filter):
+ raise NotImplementedError()
+
class PonSimOnuHandler(object):
def __init__(self, adapter, device_id):
diff --git a/voltha/adapters/simulated_olt/simulated_olt.py b/voltha/adapters/simulated_olt/simulated_olt.py
index d6421f0..f2e056b 100644
--- a/voltha/adapters/simulated_olt/simulated_olt.py
+++ b/voltha/adapters/simulated_olt/simulated_olt.py
@@ -670,6 +670,12 @@
def receive_inter_adapter_message(self, msg):
raise NotImplementedError()
+ def suppress_alarm(self, filter):
+ raise NotImplementedError()
+
+ def unsuppress_alarm(self, filter):
+ raise NotImplementedError()
+
def start_kpi_collection(self, device_id):
"""Simulate periodic KPI metric collection from the device"""
diff --git a/voltha/adapters/simulated_onu/simulated_onu.py b/voltha/adapters/simulated_onu/simulated_onu.py
index 8abe3a3..83dfd85 100644
--- a/voltha/adapters/simulated_onu/simulated_onu.py
+++ b/voltha/adapters/simulated_onu/simulated_onu.py
@@ -354,6 +354,12 @@
# just place incoming message to a list
self.incoming_messages.put((proxy_address, msg))
+ def suppress_alarm(self, filter):
+ raise NotImplementedError()
+
+ def unsuppress_alarm(self, filter):
+ raise NotImplementedError()
+
@inlineCallbacks
def _simulate_message_exchange(self, device):
diff --git a/voltha/adapters/tibit_olt/tibit_olt.py b/voltha/adapters/tibit_olt/tibit_olt.py
index 914c47b..233f048 100644
--- a/voltha/adapters/tibit_olt/tibit_olt.py
+++ b/voltha/adapters/tibit_olt/tibit_olt.py
@@ -922,6 +922,12 @@
def receive_inter_adapter_message(self, msg):
raise NotImplementedError()
+ def suppress_alarm(self, filter):
+ raise NotImplementedError()
+
+ def unsuppress_alarm(self, filter):
+ raise NotImplementedError()
+
def start_kpi_collection(self, device_id):
""" Periodic KPI metric collection from the device """
import random
diff --git a/voltha/adapters/tibit_onu/tibit_onu.py b/voltha/adapters/tibit_onu/tibit_onu.py
index 00477b4..6062007 100644
--- a/voltha/adapters/tibit_onu/tibit_onu.py
+++ b/voltha/adapters/tibit_onu/tibit_onu.py
@@ -534,6 +534,12 @@
def receive_inter_adapter_message(self, msg):
raise NotImplementedError()
+ def suppress_alarm(self, filter):
+ raise NotImplementedError()
+
+ def unsuppress_alarm(self, filter):
+ raise NotImplementedError()
+
def start_kpi_collection(self, device_id):
"""TMP Simulate periodic KPI metric collection from the device"""
diff --git a/voltha/core/adapter_agent.py b/voltha/core/adapter_agent.py
index 5798045..7daa0a7 100644
--- a/voltha/core/adapter_agent.py
+++ b/voltha/core/adapter_agent.py
@@ -177,6 +177,12 @@
return self.update_flows_incrementally(
device, flow_changes, group_changes)
+ def suppress_alarm(self, filter):
+ return self.adapter.suppress_alarm(filter)
+
+ def unsuppress_alarm(self, filter):
+ return self.adapter.unsuppress_alarm(filter)
+
# def update_pm_collection(self, device, pm_collection_config):
# return self.adapter.update_pm_collection(device, pm_collection_config)
@@ -628,11 +634,11 @@
for rule in alarm_filter.rules:
self.log.debug("compare-alarm-event",
key=AlarmFilterRuleKey.AlarmFilterRuleKey.Name(rule.key),
- actual=rule_values[AlarmFilterRuleKey.AlarmFilterRuleKey.Name(rule.key).lower()],
+ actual=rule_values[AlarmFilterRuleKey.AlarmFilterRuleKey.Name(rule.key)].lower(),
expected=rule.value.lower())
exclude = exclude and \
(rule_values[AlarmFilterRuleKey.AlarmFilterRuleKey.Name(
- rule.key).lower()] == rule.value.lower())
+ rule.key)].lower() == rule.value.lower())
if not exclude:
break
diff --git a/voltha/core/alarm_filter_agent.py b/voltha/core/alarm_filter_agent.py
new file mode 100644
index 0000000..51afba9
--- /dev/null
+++ b/voltha/core/alarm_filter_agent.py
@@ -0,0 +1,135 @@
+#
+# Copyright 2017 the original author or authors.
+#
+# Licensed under the Apache License, Version 2.0 (the "License");
+# you may not use this file except in compliance with the License.
+# You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+#
+
+"""
+Singleton agent responsible for managing alarm filters
+"""
+import structlog
+from twisted.internet.defer import inlineCallbacks
+
+from voltha.core.config.config_proxy import CallbackType
+from voltha.protos.voltha_pb2 import \
+ AlarmFilter, AlarmFilterRuleKey
+
+
+class AlarmFilterAgent(object):
+ __instance = None
+
+ def __new__(cls, core):
+ if cls.__instance == None:
+ cls.__instance = object.__new__(cls, core)
+ return cls.__instance
+
+ def __init__(self, core):
+ self.core = core
+ self.root_proxy = self.core.get_proxy('/')
+ self.self_proxies = {}
+
+ self.log = structlog.get_logger()
+
+ @inlineCallbacks
+ def add_filter(self, alarm_filter):
+ self.log.debug('starting')
+
+ self.self_proxies[alarm_filter.id] = self.core.get_proxy(
+ '/alarm_filters/{}'.format(alarm_filter.id))
+
+ self.self_proxies[alarm_filter.id].register_callback(
+ CallbackType.PRE_UPDATE, self._review_filter)
+ self.self_proxies[alarm_filter.id].register_callback(
+ CallbackType.POST_UPDATE, self._process_filter)
+
+ yield self._process_filter(alarm_filter)
+
+ self.log.info('started')
+
+ @inlineCallbacks
+ def remove_filter(self, alarm_filter):
+ self.log.debug('stopping')
+
+ yield self._review_filter(alarm_filter)
+
+ self.self_proxies[alarm_filter.id].unregister_callback(
+ CallbackType.PRE_UPDATE, self._review_filter)
+ self.self_proxies[alarm_filter.id].unregister_callback(
+ CallbackType.POST_UPDATE, self._process_filter)
+
+ del self.self_proxies[alarm_filter.id]
+
+ self.log.info('stopped', alarm_filter=alarm_filter.id)
+
+ # Remove filters referencing the specified device
+ def remove_device_filters(self, device):
+ self.log.debug('cleaning')
+ all_filters = self.root_proxy.get('/alarm_filters')
+ for filter in all_filters:
+ for r in filter.rules:
+ if r.key == AlarmFilterRuleKey.device_id and r.value == device.id:
+ self.root_proxy.remove('/alarm_filters/{}'.format(filter.id))
+ break
+
+ self.log.debug('cleaned')
+
+ def _review_filter(self, alarm_filter=None):
+ # UPDATE scenario:
+ #
+ # When a filter is updated, we need to review the content of the previous filter
+ # to ensure that nothing is left un-managed and not cleaned up.
+ #
+ # TEAR DOWN scenario:
+ # When a filter is deleted, we need to go through the rules
+ # and revert anything that was configured when the filter was first created.
+
+ if alarm_filter is not None:
+ current_filter = self.self_proxies[alarm_filter.id].get()
+
+ # Find any device id rules contained in the filter that might have changed
+ rules_to_clean = [r for r in current_filter.rules if
+ r.key == AlarmFilterRuleKey.device_id and r not in alarm_filter.rules]
+
+ if not rules_to_clean:
+ # There are no rules from the current filter that require a clean up
+ return
+
+ # Un-suppress devices that are no longer referenced
+ self._manage_suppression(current_filter)
+
+ def _process_filter(self, alarm_filter):
+ assert isinstance(alarm_filter, AlarmFilter)
+ self._manage_suppression(alarm_filter, True)
+
+ @inlineCallbacks
+ def _manage_suppression(self, alarm_filter, suppress=False):
+ # Pull out all filters
+ all_filters = self.root_proxy.get('/alarm_filters')
+
+ # Build a list of all devices that have filters against them
+ all_device_ids = set()
+ for filter in all_filters:
+ if filter.id != alarm_filter.id:
+ ids = [r.value for r in filter.rules if r.key == AlarmFilterRuleKey.device_id]
+ all_device_ids.update(ids)
+
+ for rule in alarm_filter.rules:
+ # Check if it's a device_id rule and that it wasn't already processed by another filter
+ if rule.key == AlarmFilterRuleKey.device_id and rule.value not in all_device_ids:
+ # A suppression call will occur only for device ids that exist on the system
+ assert rule.value in self.core.device_agents
+
+ if suppress:
+ yield self.core.device_agents[rule.value].suppress_alarm(alarm_filter)
+ else:
+ yield self.core.device_agents[rule.value].unsuppress_alarm(alarm_filter)
\ No newline at end of file
diff --git a/voltha/core/core.py b/voltha/core/core.py
index c11136b..88271a4 100644
--- a/voltha/core/core.py
+++ b/voltha/core/core.py
@@ -24,6 +24,7 @@
from twisted.internet.defer import inlineCallbacks, returnValue
from zope.interface import implementer
+from voltha.core.alarm_filter_agent import AlarmFilterAgent
from voltha.core.config.config_proxy import CallbackType
from voltha.core.device_agent import DeviceAgent
from voltha.core.dispatcher import Dispatcher
@@ -31,8 +32,7 @@
from voltha.core.local_handler import LocalHandler
from voltha.core.logical_device_agent import LogicalDeviceAgent
from voltha.protos.voltha_pb2 import \
- VolthaLocalServiceStub, \
- Device, LogicalDevice
+ Device, LogicalDevice, AlarmFilter
from voltha.registry import IComponent
log = structlog.get_logger()
@@ -40,7 +40,6 @@
@implementer(IComponent)
class VolthaCore(object):
-
def __init__(self, instance_id, version, log_level):
self.instance_id = instance_id
self.stopped = False
@@ -58,6 +57,7 @@
self.local_root_proxy = None
self.device_agents = {}
self.logical_device_agents = {}
+ self.alarm_filter_agent = None
self.packet_in_queue = Queue()
self.change_event_queue = Queue()
@@ -72,6 +72,7 @@
CallbackType.POST_ADD, self._post_add_callback)
self.local_root_proxy.register_callback(
CallbackType.POST_REMOVE, self._post_remove_callback)
+
log.info('started')
returnValue(self)
@@ -92,6 +93,8 @@
self._handle_add_device(data)
elif isinstance(data, LogicalDevice):
self._handle_add_logical_device(data)
+ elif isinstance(data, AlarmFilter):
+ self._handle_add_alarm_filter(data)
else:
pass # ignore others
@@ -101,6 +104,8 @@
self._handle_remove_device(data)
elif isinstance(data, LogicalDevice):
self._handle_remove_logical_device(data)
+ elif isinstance(data, AlarmFilter):
+ self._handle_remove_alarm_filter(data)
else:
pass # ignore others
@@ -118,6 +123,8 @@
@inlineCallbacks
def _handle_remove_device(self, device):
if device.id in self.device_agents:
+ AlarmFilterAgent(self).remove_device_filters(device)
+
yield self.device_agents[device.id].stop(device)
del self.device_agents[device.id]
@@ -141,3 +148,15 @@
def get_logical_device_agent(self, logical_device_id):
return self.logical_device_agents[logical_device_id]
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~ AlarmFilterAgent Mgmt ~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ @inlineCallbacks
+ def _handle_add_alarm_filter(self, alarm_filter):
+ assert isinstance(alarm_filter, AlarmFilter)
+ yield AlarmFilterAgent(self).add_filter(alarm_filter)
+
+ @inlineCallbacks
+ def _handle_remove_alarm_filter(self, alarm_filter):
+ assert isinstance(alarm_filter, AlarmFilter)
+ yield AlarmFilterAgent(self).remove_filter(alarm_filter)
diff --git a/voltha/core/device_agent.py b/voltha/core/device_agent.py
index b1cf67b..b00bebf 100644
--- a/voltha/core/device_agent.py
+++ b/voltha/core/device_agent.py
@@ -102,6 +102,22 @@
if not dry_run:
yield self.adapter_agent.get_device_details(device)
+ @inlineCallbacks
+ def suppress_alarm(self, filter):
+ self.log.debug('suppress-alarms')
+ try:
+ yield self.adapter_agent.suppress_alarm(filter)
+ except Exception as e:
+ self.log.exception(e.message)
+
+ @inlineCallbacks
+ def unsuppress_alarm(self, filter):
+ self.log.debug('unsuppress-alarms')
+ try:
+ yield self.adapter_agent.unsuppress_alarm(filter)
+ except Exception as e:
+ self.log.exception(e.message)
+
def _set_adapter_agent(self):
adapter_name = self._tmp_initial_data.adapter
if adapter_name == '':