VOLTHA Alarms: Added filtering capabilities
- Filters are constructed based on 1 or more rules as key:value pairs
- A key:value rule uses the available alarm attributes
- NBI and cli commands were added
Amendment:
- Moved filter protos to voltha proto to fix possible circular dependency
Change-Id: Ic72125e9d35135d75be175638341b0c08fd97f95
diff --git a/voltha/adapters/interface.py b/voltha/adapters/interface.py
index 24a4085..fcba8af 100644
--- a/voltha/adapters/interface.py
+++ b/voltha/adapters/interface.py
@@ -298,7 +298,7 @@
:return: None
"""
- def submit_alarm(alarm_event_msg):
+ def submit_alarm(device_id, alarm_event_msg):
"""
Submit an alarm on behalf of the OLT and its adapter.
:param alarm_event_msg: A protobuf message of AlarmEvent type.
diff --git a/voltha/adapters/maple_olt/maple_olt.py b/voltha/adapters/maple_olt/maple_olt.py
index 583aafa..7d08414 100644
--- a/voltha/adapters/maple_olt/maple_olt.py
+++ b/voltha/adapters/maple_olt/maple_olt.py
@@ -328,7 +328,7 @@
context=alarm_data,
raised_ts = ts)
- self.adapter_agent.submit_alarm(alarm_event)
+ self.adapter_agent.submit_alarm(self.device_id, alarm_event)
except Exception as e:
log.exception('failed-to-submit-alarm', e=e)
@@ -789,7 +789,7 @@
context=alarm_data,
raised_ts = ts)
- self.adapter_agent.submit_alarm(alarm_event)
+ self.adapter_agent.submit_alarm(device_id, alarm_event)
except Exception as e:
log.exception('failed-to-submit-alarm', e=e)
diff --git a/voltha/adapters/ponsim_olt/ponsim_olt.py b/voltha/adapters/ponsim_olt/ponsim_olt.py
index 26283af..c05d7fd 100644
--- a/voltha/adapters/ponsim_olt/ponsim_olt.py
+++ b/voltha/adapters/ponsim_olt/ponsim_olt.py
@@ -171,7 +171,7 @@
context=current_context
)
- self.adapter.adapter_agent.submit_alarm(alarm_event)
+ self.adapter.adapter_agent.submit_alarm(self.device.id, alarm_event)
except Exception as e:
log.exception('failed-to-send-alarm', e=e)
diff --git a/voltha/adapters/simulated_olt/simulated_olt.py b/voltha/adapters/simulated_olt/simulated_olt.py
index 01d448f..6ed10a9 100644
--- a/voltha/adapters/simulated_olt/simulated_olt.py
+++ b/voltha/adapters/simulated_olt/simulated_olt.py
@@ -765,7 +765,7 @@
description=description,
context=current_context)
- self.adapter_agent.submit_alarm(alarm_event)
+ self.adapter_agent.submit_alarm(device_id, alarm_event)
except Exception as e:
log.exception('failed-to-submit-alarm', e=e)
diff --git a/voltha/core/adapter_agent.py b/voltha/core/adapter_agent.py
index 06688ed..7ef9a69 100644
--- a/voltha/core/adapter_agent.py
+++ b/voltha/core/adapter_agent.py
@@ -18,9 +18,8 @@
Agent to play gateway between CORE and an individual adapter.
"""
from uuid import uuid4
-import arrow
-import re
+import arrow
import structlog
from google.protobuf.json_format import MessageToJson
from scapy.packet import Packet
@@ -31,16 +30,14 @@
from common.frameio.frameio import hexify
from voltha.adapters.interface import IAdapterAgent
from voltha.protos import third_party
-from voltha.protos.device_pb2 import Device, Port
-from voltha.protos.events_pb2 import KpiEvent, AlarmEvent, AlarmEventType, \
- AlarmEventSeverity, AlarmEventState, AlarmEventCategory
+from voltha.core.flow_decomposer import OUTPUT
from voltha.protos.device_pb2 import Device, Port, PmConfigs
+from voltha.protos.events_pb2 import AlarmEvent, AlarmEventType, \
+ AlarmEventSeverity, AlarmEventState, AlarmEventCategory
from voltha.protos.events_pb2 import KpiEvent
from voltha.protos.voltha_pb2 import DeviceGroup, LogicalDevice, \
- LogicalPort, AdminState, OperStatus, ConnectStatus
+ LogicalPort, AdminState, OperStatus, AlarmFilterRuleKey
from voltha.registry import registry
-from voltha.core.flow_decomposer import OUTPUT
-import sys
@implementer(IAdapterAgent)
@@ -180,7 +177,7 @@
return self.update_flows_incrementally(
device, flow_changes, group_changes)
- #def update_pm_collection(self, device, pm_collection_config):
+ # def update_pm_collection(self, device, pm_collection_config):
# return self.adapter.update_pm_collection(device, pm_collection_config)
@@ -216,7 +213,7 @@
# we run the update through the device_agent so that the change
# does not loop back to the adapter unnecessarily
device_agent = self.core.get_device_agent(device_pm_config.id)
- device_agent.update_device_pm_config(device_pm_config,init)
+ device_agent.update_device_pm_config(device_pm_config, init)
def update_adapter_pm_config(self, device_id, device_pm_config):
device = self.get_device(device_id)
@@ -271,7 +268,6 @@
self._make_up_to_date('/devices/{}/ports'.format(device_id),
port.port_no, port)
-
def enable_all_ports(self, device_id):
"""
Re-enable all ports on that device, i.e. change the admin status to
@@ -361,7 +357,6 @@
return logical_device
-
def delete_logical_device(self, logical_device):
"""
This will remove the logical device as well as all logical ports
@@ -379,7 +374,6 @@
# callbacks' if present
self._remove_node('/logical_devices', logical_device.id)
-
def receive_packet_out(self, logical_device_id, ofp_packet_out):
def get_port_out(opo):
@@ -563,10 +557,43 @@
context=context
)
- def submit_alarm(self, alarm_event_msg):
+ def filter_alarm(self, device_id, alarm_event):
+ alarm_filters = self.root_proxy.get('/alarm_filters')
+
+ rule_values = {
+ 'id': alarm_event.id,
+ 'type': AlarmEventType.AlarmEventType.Name(alarm_event.type),
+ 'category': AlarmEventCategory.AlarmEventCategory.Name(alarm_event.category),
+ 'severity': AlarmEventSeverity.AlarmEventSeverity.Name(alarm_event.severity),
+ 'resource_id': alarm_event.resource_id,
+ 'device_id': device_id
+ }
+
+ for alarm_filter in alarm_filters:
+ if alarm_filter.rules:
+ exclude = True
+ for rule in alarm_filter.rules:
+ self.log.debug("compare-alarm-event",
+ key=AlarmFilterRuleKey.AlarmFilterRuleKey.Name(rule.key),
+ actual=rule_values[AlarmFilterRuleKey.AlarmFilterRuleKey.Name(rule.key).lower()],
+ expected=rule.value.lower())
+ exclude = exclude and \
+ (rule_values[AlarmFilterRuleKey.AlarmFilterRuleKey.Name(
+ rule.key).lower()] == rule.value.lower())
+ if not exclude:
+ break
+
+ if exclude:
+ self.log.info("filtered-alarm-event", alarm=alarm_event)
+ return True
+
+ return False
+
+ def submit_alarm(self, device_id, alarm_event_msg):
try:
assert isinstance(alarm_event_msg, AlarmEvent)
- self.event_bus.publish('alarms', alarm_event_msg)
+ if not self.filter_alarm(device_id, alarm_event_msg):
+ self.event_bus.publish('alarms', alarm_event_msg)
except Exception as e:
self.log.exception('failed-alarm-submission',
diff --git a/voltha/core/global_handler.py b/voltha/core/global_handler.py
index b42a554..7df3f29 100644
--- a/voltha/core/global_handler.py
+++ b/voltha/core/global_handler.py
@@ -503,3 +503,57 @@
context)
+ @twisted_async
+ def CreateAlarmFilter(self, request, context):
+ log.info('grpc-request', request=request)
+ # TODO dispatching to local instead of passing it to leader
+ return self.dispatcher.dispatch(
+ self.instance_id,
+ VolthaLocalServiceStub,
+ 'CreateAlarmFilter',
+ request,
+ context)
+
+ @twisted_async
+ def GetAlarmFilter(self, request, context):
+ log.warning('temp-limited-implementation')
+ # TODO dispatching to local instead of collecting all
+ return self.dispatcher.dispatch(
+ self.instance_id,
+ VolthaLocalServiceStub,
+ 'GetAlarmFilter',
+ request,
+ context)
+
+ @twisted_async
+ def UpdateAlarmFilter(self, request, context):
+ log.info('grpc-request', request=request)
+ # TODO dispatching to local instead of passing it to leader
+ return self.dispatcher.dispatch(
+ self.instance_id,
+ VolthaLocalServiceStub,
+ 'UpdateAlarmFilter',
+ request,
+ context)
+
+ @twisted_async
+ def DeleteAlarmFilter(self, request, context):
+ log.info('grpc-request', request=request)
+ # TODO dispatching to local instead of passing it to leader
+ return self.dispatcher.dispatch(
+ self.instance_id,
+ VolthaLocalServiceStub,
+ 'DeleteAlarmFilter',
+ request,
+ context)
+
+ @twisted_async
+ def ListAlarmFilters(self, request, context):
+ log.warning('temp-limited-implementation')
+ # TODO dispatching to local instead of collecting all
+ return self.dispatcher.dispatch(
+ self.instance_id,
+ VolthaLocalServiceStub,
+ 'ListAlarmFilters',
+ Empty(),
+ context)
diff --git a/voltha/core/local_handler.py b/voltha/core/local_handler.py
index 4a0fde8..13e8b68 100644
--- a/voltha/core/local_handler.py
+++ b/voltha/core/local_handler.py
@@ -28,7 +28,8 @@
add_VolthaLocalServiceServicer_to_server, VolthaLocalServiceServicer, \
VolthaInstance, Adapters, LogicalDevices, LogicalDevice, Ports, \
LogicalPorts, Devices, Device, DeviceType, \
- DeviceTypes, DeviceGroups, DeviceGroup, AdminState, OperStatus, ChangeEvent
+ DeviceTypes, DeviceGroups, DeviceGroup, AdminState, OperStatus, ChangeEvent, \
+ AlarmFilter, AlarmFilters
from voltha.protos.device_pb2 import PmConfigs
from voltha.registry import registry
@@ -588,3 +589,87 @@
assert isinstance(port_status, ofp_port_status)
event = ChangeEvent(id=device_id, port_status=port_status)
self.core.change_event_queue.put(event)
+
+
+ @twisted_async
+ def ListAlarmFilters(self, request, context):
+ try:
+ filters = self.root.get('/alarm_filters')
+ return AlarmFilters(filters=filters)
+ except KeyError:
+ context.set_code(StatusCode.NOT_FOUND)
+ return AlarmFilters()
+
+ @twisted_async
+ def GetAlarmFilter(self, request, context):
+ if '/' in request.id:
+ context.set_details(
+ 'Malformed alarm filter id \'{}\''.format(request.id))
+ context.set_code(StatusCode.INVALID_ARGUMENT)
+ return AlarmFilter()
+
+ try:
+ alarm_filter = self.root.get('/alarm_filters/{}'.format(request.id))
+ return alarm_filter
+ except KeyError:
+ context.set_details(
+ 'Alarm filter \'{}\' not found'.format(request.id))
+ context.set_code(StatusCode.NOT_FOUND)
+ return AlarmFilter()
+
+ @twisted_async
+ def DeleteAlarmFilter(self, request, context):
+ if '/' in request.id:
+ context.set_details(
+ 'Malformed alarm filter id \'{}\''.format(request.id))
+ context.set_code(StatusCode.INVALID_ARGUMENT)
+ return Empty()
+
+ try:
+ self.root.remove('/alarm_filters/{}'.format(request.id))
+ except KeyError:
+ context.set_code(StatusCode.NOT_FOUND)
+
+ return Empty()
+
+ @twisted_async
+ def CreateAlarmFilter(self, request, context):
+ log.info('grpc-request', request=request)
+
+ try:
+ assert isinstance(request, AlarmFilter)
+ alarm_filter = request
+ assert alarm_filter.id == '', 'Alarm filter to be created cannot have id yet'
+
+ except AssertionError, e:
+ context.set_details(e.message)
+ context.set_code(StatusCode.INVALID_ARGUMENT)
+ return AlarmFilter()
+
+ # fill additional data
+ alarm_filter.id = uuid4().hex[:12]
+
+ # add device to tree
+ self.root.add('/alarm_filters', alarm_filter)
+
+ return request
+
+ @twisted_async
+ def UpdateAlarmFilter(self, request, context):
+ if '/' in request.id:
+ context.set_details(
+ 'Malformed alarm filter id \'{}\''.format(request.id))
+ context.set_code(StatusCode.INVALID_ARGUMENT)
+ return AlarmFilter()
+
+ try:
+ assert isinstance(request, AlarmFilter)
+ alarm_filter = self.root.get('/alarm_filters/{}'.format(request.id))
+ self.root.update('/alarm_filters/{}'.format(request.id), request)
+
+ return request
+ except KeyError:
+ context.set_details(
+ 'Alarm filter \'{}\' not found'.format(request.id))
+ context.set_code(StatusCode.NOT_FOUND)
+ return AlarmFilter()
diff --git a/voltha/protos/events.proto b/voltha/protos/events.proto
index 0c5ac04..c6d8979 100644
--- a/voltha/protos/events.proto
+++ b/voltha/protos/events.proto
@@ -133,4 +133,4 @@
// Key/Value storage for extra information that may give context to the alarm
map<string, string> context = 11;
-}
+}
\ No newline at end of file
diff --git a/voltha/protos/voltha.proto b/voltha/protos/voltha.proto
index 703f62b..d871d8a 100644
--- a/voltha/protos/voltha.proto
+++ b/voltha/protos/voltha.proto
@@ -39,6 +39,32 @@
repeated DeviceGroup items = 1;
}
+
+message AlarmFilterRuleKey {
+ enum AlarmFilterRuleKey {
+ id = 0;
+ type = 1;
+ severity = 2;
+ resource_id = 3;
+ category = 4;
+ device_id = 5;
+ }
+}
+
+message AlarmFilterRule {
+ AlarmFilterRuleKey.AlarmFilterRuleKey key = 1;
+ string value = 2;
+}
+message AlarmFilter {
+ string id = 1 [(access) = READ_ONLY];
+
+ repeated AlarmFilterRule rules = 2;
+}
+
+message AlarmFilters {
+ repeated AlarmFilter filters = 1;
+}
+
// Top-level (root) node for a Voltha Instance
message VolthaInstance {
option (yang_message_rule) = CREATE_BOTH_GROUPING_AND_CONTAINER;
@@ -60,6 +86,8 @@
repeated DeviceType device_types = 14 [(child_node) = {key: "id"}];
repeated DeviceGroup device_groups = 15 [(child_node) = {key: "id"}];
+
+ repeated AlarmFilter alarm_filters = 16 [(child_node) = {key: "id"}];
}
message VolthaInstances {
@@ -303,6 +331,37 @@
};
}
+ rpc CreateAlarmFilter(AlarmFilter) returns(AlarmFilter) {
+ option (google.api.http) = {
+ post: "/api/v1/alarm_filters"
+ body: "*"
+ };
+ }
+
+ rpc GetAlarmFilter(ID) returns(AlarmFilter) {
+ option (google.api.http) = {
+ get: "/api/v1/alarm_filters/{id}"
+ };
+ }
+
+ rpc UpdateAlarmFilter(AlarmFilter) returns(AlarmFilter) {
+ option (google.api.http) = {
+ put: "/api/v1/alarm_filters/{id}"
+ body: "*"
+ };
+ }
+
+ rpc DeleteAlarmFilter(ID) returns(google.protobuf.Empty) {
+ option (google.api.http) = {
+ delete: "/api/v1/alarm_filters/{id}"
+ };
+ }
+
+ rpc ListAlarmFilters(google.protobuf.Empty) returns(AlarmFilters) {
+ option (google.api.http) = {
+ get: "/api/v1/alarm_filters"
+ };
+ }
}
/*
@@ -536,4 +595,35 @@
// This does not have an HTTP representation
}
+ rpc CreateAlarmFilter(AlarmFilter) returns(AlarmFilter) {
+ option (google.api.http) = {
+ post: "/api/v1/local/alarm_filters"
+ body: "*"
+ };
+ }
+
+ rpc GetAlarmFilter(ID) returns(AlarmFilter) {
+ option (google.api.http) = {
+ get: "/api/v1/local/alarm_filters/{id}"
+ };
+ }
+
+ rpc UpdateAlarmFilter(AlarmFilter) returns(AlarmFilter) {
+ option (google.api.http) = {
+ put: "/api/v1/local/alarm_filters/{id}"
+ body: "*"
+ };
+ }
+
+ rpc DeleteAlarmFilter(ID) returns(google.protobuf.Empty) {
+ option (google.api.http) = {
+ delete: "/api/v1/local/alarm_filters/{id}"
+ };
+ }
+
+ rpc ListAlarmFilters(google.protobuf.Empty) returns(AlarmFilters) {
+ option (google.api.http) = {
+ get: "/api/v1/local/alarm_filters"
+ };
+ }
}