CORD-898: Initial integration for alarms support
- Introduced an AlarmEvent type
- Provided the necessary API to propagate the alarms
Change-Id: I285cab38cd8896508a820c8aea592819e0242ed5
diff --git a/requirements.txt b/requirements.txt
index 054d32e..62f79fd 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -34,6 +34,7 @@
scapy>=2.3.2
service-identity
simplejson>=3.8.1
+jsonschema>=2.6.0
six>=1.10.0
structlog>=16.1.0
termcolor>=1.1.0
diff --git a/tests/itests/voltha/test_voltha_alarm_events.py b/tests/itests/voltha/test_voltha_alarm_events.py
new file mode 100644
index 0000000..7f932be
--- /dev/null
+++ b/tests/itests/voltha/test_voltha_alarm_events.py
@@ -0,0 +1,155 @@
+from unittest import main
+from common.utils.consulhelpers import get_endpoint_from_consul
+from tests.itests.docutests.test_utils import run_long_running_command_with_timeout
+from tests.itests.voltha.rest_base import RestBase
+from google.protobuf.json_format import MessageToDict
+from voltha.protos.device_pb2 import Device
+import simplejson, jsonschema
+import re
+
+# ~~~~~~~ Common variables ~~~~~~~
+
+LOCAL_CONSUL = "localhost:8500"
+
+COMMANDS = dict(
+ kafka_client_run="kafkacat -b {} -L",
+ kafka_client_alarm_check="kafkacat -o -5 -b {} -C -t voltha.alarms -c 10",
+)
+
+ALARM_SCHEMA = {
+ "type": "object",
+ "properties": {
+ "id": {"type": "string"},
+ "type": {"type": "string"},
+ "category": {"type": "string"},
+ "state": {"type": "string"},
+ "severity": {"type": "string"},
+ "resource_id": {"type": "string"},
+ "raised_ts": {"type": "number"},
+ "reported_ts": {"type": "number"},
+ "changed_ts": {"type": "number"},
+ "description": {"type": "string"},
+ "context": {
+ "type": "object",
+ "additionalProperties": {"type": "string"}
+ }
+ }
+}
+
+
+# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+
+class VolthaAlarmEventTests(RestBase):
+ # Retrieve details on the REST entry point
+ rest_endpoint = get_endpoint_from_consul(LOCAL_CONSUL, 'chameleon-rest')
+
+ # Construct the base_url
+ base_url = 'http://' + rest_endpoint
+
+ # Start by querying consul to get the endpoint details
+ kafka_endpoint = get_endpoint_from_consul(LOCAL_CONSUL, 'kafka')
+
+ # ~~~~~~~~~~~~ Tests ~~~~~~~~~~~~
+
+ def test_alarm_topic_exists(self):
+ # We want to make sure that the topic is available on the system
+ expected_pattern = ['voltha.alarms']
+
+ # Start the kafka client to retrieve details on topics
+ cmd = COMMANDS['kafka_client_run'].format(self.kafka_endpoint)
+ kafka_client_output = run_long_running_command_with_timeout(cmd, 20)
+
+ # Loop through the kafka client output to find the topic
+ found = False
+ for out in kafka_client_output:
+ if all(ep in out for ep in expected_pattern):
+ found = True
+ break
+
+ self.assertTrue(found, 'Failed to find topic {}'.format(expected_pattern))
+
+ def test_alarm_generated_by_adapter(self):
+ # Verify that REST calls can be made
+ self.verify_rest()
+
+ # Create a new device
+ device = self.add_device()
+
+ # Activate the new device
+ self.activate_device(device['id'])
+
+ # The simulated olt device should start generating alarms periodically
+ alarm = self.get_alarm_event(device['id'])
+
+ # Make sure that the schema is valid
+ self.validate_alarm_event_schema(alarm)
+
+ # Validate the constructed alarm id
+ self.verify_alarm_event_id(device['id'], alarm['id'])
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ # Make sure the Voltha REST interface is available
+ def verify_rest(self):
+ self.get('/api/v1')
+
+ # Create a new simulated device
+ def add_device(self):
+ device = Device(
+ type='simulated_olt',
+ )
+ device = self.post('/api/v1/local/devices', MessageToDict(device), expected_code=200)
+ return device
+
+ # Active the simulated device. This will trigger the simulation of random alarms
+ def activate_device(self, device_id):
+ path = '/api/v1/local/devices/{}'.format(device_id)
+ self.post(path + '/activate', expected_code=200)
+ device = self.get(path)
+ self.assertEqual(device['admin_state'], 'ENABLED')
+
+ # Retrieve a sample alarm for a specific device
+ def get_alarm_event(self, device_id):
+ cmd = COMMANDS['kafka_client_alarm_check'].format(self.kafka_endpoint)
+ kafka_client_output = run_long_running_command_with_timeout(cmd, 20)
+
+ # Verify the kafka client output
+ found = False
+ self.alarm_data = None
+
+ for out in kafka_client_output:
+ self.alarm_data = simplejson.loads(out)
+
+ print self.alarm_data
+
+ if 'resource_id' not in self.alarm_data:
+ continue
+ elif self.alarm_data['resource_id'] == device_id:
+ found = True
+ break
+
+ self.assertTrue(found, 'Failed to find kafka alarm with device id:{}'.format(device_id))
+
+ return self.alarm_data
+
+ # Verify that the alarm follows the proper schema structure
+ def validate_alarm_event_schema(self, alarm):
+ try:
+ jsonschema.validate(alarm, ALARM_SCHEMA)
+ except Exception as e:
+ self.assertTrue(False, 'Validation failed for alarm : {}'.format(e.message))
+
+ # Verify that alarm identifier based on the format generated by default.
+ def verify_alarm_event_id(self, device_id, alarm_id):
+ prefix = re.findall(r"(voltha)\.(\w+)\.(\w+)", alarm_id)
+
+ self.assertEqual(len(prefix), 1, 'Failed to parse the alarm id: {}'.format(alarm_id))
+ self.assertEqual(len(prefix[0]), 3, 'Expected id format: voltha.<adapter name>.<device id>')
+ self.assertEqual(prefix[0][0], 'voltha', 'Expected id format: voltha.<adapter name>.<device id>')
+ self.assertEqual(prefix[0][1], 'simulated_olt', 'Expected id format: voltha.<adapter name>.<device id>')
+ self.assertEqual(prefix[0][2], device_id, 'Expected id format: voltha.<adapter name>.<device id>')
+
+
+if __name__ == '__main__':
+ main()
diff --git a/voltha/adapters/interface.py b/voltha/adapters/interface.py
index 4011166..4111735 100644
--- a/voltha/adapters/interface.py
+++ b/voltha/adapters/interface.py
@@ -256,3 +256,10 @@
:param kpi_event_msg: A protobuf message of KpiEvent type.
:return: None
"""
+
+ def submit_alarm(alarm_event_msg):
+ """
+ Submit an alarm on behalf of the OLT and its adapter.
+ :param alarm_event_msg: A protobuf message of AlarmEvent type.
+ :return: None
+ """
diff --git a/voltha/adapters/simulated_olt/simulated_olt.py b/voltha/adapters/simulated_olt/simulated_olt.py
index 406e210..01fc8e6 100644
--- a/voltha/adapters/simulated_olt/simulated_olt.py
+++ b/voltha/adapters/simulated_olt/simulated_olt.py
@@ -44,13 +44,13 @@
from voltha.protos.openflow_13_pb2 import ofp_desc, ofp_port, OFPPF_1GB_FD, \
OFPPF_FIBER, OFPPS_LIVE, ofp_switch_features, OFPC_PORT_STATS, \
OFPC_GROUP_STATS, OFPC_TABLE_STATS, OFPC_FLOW_STATS
+from voltha.protos.events_pb2 import AlarmEvent, AlarmEventType, AlarmEventSeverity, AlarmEventState, AlarmEventCategory
log = structlog.get_logger()
@implementer(IAdapterInterface)
class SimulatedOltAdapter(object):
-
name = 'simulated_olt'
supported_device_types = [
@@ -372,6 +372,8 @@
# reactor.callLater(0.1, self._simulate_detection_of_onus, device.id)
self.start_kpi_collection(device.id)
+ self.start_alarm_simulation(device.id)
+
@inlineCallbacks
def _simulate_detection_of_onus(self, device_id):
for i in xrange(1, 5):
@@ -546,7 +548,6 @@
else:
raise Exception('Port should be 1 or 2 by our convention')
-
def update_flows_incrementally(self, device, flow_changes, group_changes):
raise NotImplementedError()
@@ -580,46 +581,46 @@
# upgraded the metrics to include packet statistics for
# testing.
nni_port_metrics = yield dict(
- tx_pkts=self.nni_tx_pkts+random.randint(90, 100),
- rx_pkts=self.nni_rx_pkts+random.randint(90, 100),
- tx_bytes=self.nni_tx_bytes+random.randint(90000, 100000),
- rx_bytes=self.nni_rx_bytes+random.randint(90000, 100000),
- tx_64 = self.nni_tx_64 + random.randint(50,55),
- tx_65_127 = self.nni_tx_65_127 + random.randint(55,60),
- tx_128_255 = self.nni_tx_128_255 + random.randint(60,65),
- tx_256_511 = self.nni_tx_256_511 + random.randint(85,90),
- tx_512_1023 = self.nni_tx_512_1023 + random.randint(90,95),
- tx_1024_1518 = self.nni_tx_1024_1518 + random.randint(60,65),
- tx_1519_9k = self.nni_tx_1519_9k + random.randint(50,55),
+ tx_pkts=self.nni_tx_pkts + random.randint(90, 100),
+ rx_pkts=self.nni_rx_pkts + random.randint(90, 100),
+ tx_bytes=self.nni_tx_bytes + random.randint(90000, 100000),
+ rx_bytes=self.nni_rx_bytes + random.randint(90000, 100000),
+ tx_64=self.nni_tx_64 + random.randint(50, 55),
+ tx_65_127=self.nni_tx_65_127 + random.randint(55, 60),
+ tx_128_255=self.nni_tx_128_255 + random.randint(60, 65),
+ tx_256_511=self.nni_tx_256_511 + random.randint(85, 90),
+ tx_512_1023=self.nni_tx_512_1023 + random.randint(90, 95),
+ tx_1024_1518=self.nni_tx_1024_1518 + random.randint(60, 65),
+ tx_1519_9k=self.nni_tx_1519_9k + random.randint(50, 55),
- rx_64 = self.nni_tx_64 + random.randint(50,55),
- rx_65_127 = self.nni_tx_65_127 + random.randint(55,60),
- rx_128_255 = self.nni_tx_128_255 + random.randint(60,65),
- rx_256_511 = self.nni_tx_256_511 + random.randint(85,90),
- rx_512_1023 = self.nni_tx_512_1023 + random.randint(90,95),
- rx_1024_1518 = self.nni_tx_1024_1518 + random.randint(60,65),
- rx_1519_9k = self.nni_tx_1519_9k + random.randint(50,55)
+ rx_64=self.nni_tx_64 + random.randint(50, 55),
+ rx_65_127=self.nni_tx_65_127 + random.randint(55, 60),
+ rx_128_255=self.nni_tx_128_255 + random.randint(60, 65),
+ rx_256_511=self.nni_tx_256_511 + random.randint(85, 90),
+ rx_512_1023=self.nni_tx_512_1023 + random.randint(90, 95),
+ rx_1024_1518=self.nni_tx_1024_1518 + random.randint(60, 65),
+ rx_1519_9k=self.nni_tx_1519_9k + random.randint(50, 55)
)
pon_port_metrics = yield dict(
- tx_pkts=self.pon_tx_pkts+random.randint(90, 100),
- rx_pkts=self.pon_rx_pkts+random.randint(90, 100),
- tx_bytes=self.pon_tx_bytes+random.randint(90000, 100000),
- rx_bytes=self.pon_rx_bytes+random.randint(90000, 100000),
- tx_64 = self.pon_tx_64 + random.randint(50,55),
- tx_65_127 = self.pon_tx_65_127 + random.randint(55,60),
- tx_128_255 = self.pon_tx_128_255 + random.randint(60,65),
- tx_256_511 = self.pon_tx_256_511 + random.randint(85,90),
- tx_512_1023 = self.pon_tx_512_1023 + random.randint(90,95),
- tx_1024_1518 = self.pon_tx_1024_1518 + random.randint(60,65),
- tx_1519_9k = self.pon_tx_1519_9k + random.randint(50,55),
+ tx_pkts=self.pon_tx_pkts + random.randint(90, 100),
+ rx_pkts=self.pon_rx_pkts + random.randint(90, 100),
+ tx_bytes=self.pon_tx_bytes + random.randint(90000, 100000),
+ rx_bytes=self.pon_rx_bytes + random.randint(90000, 100000),
+ tx_64=self.pon_tx_64 + random.randint(50, 55),
+ tx_65_127=self.pon_tx_65_127 + random.randint(55, 60),
+ tx_128_255=self.pon_tx_128_255 + random.randint(60, 65),
+ tx_256_511=self.pon_tx_256_511 + random.randint(85, 90),
+ tx_512_1023=self.pon_tx_512_1023 + random.randint(90, 95),
+ tx_1024_1518=self.pon_tx_1024_1518 + random.randint(60, 65),
+ tx_1519_9k=self.pon_tx_1519_9k + random.randint(50, 55),
- rx_64 = self.pon_tx_64 + random.randint(50,55),
- rx_65_127 = self.pon_tx_65_127 + random.randint(55,60),
- rx_128_255 = self.pon_tx_128_255 + random.randint(60,65),
- rx_256_511 = self.pon_tx_256_511 + random.randint(85,90),
- rx_512_1023 = self.pon_tx_512_1023 + random.randint(90,95),
- rx_1024_1518 = self.pon_tx_1024_1518 + random.randint(60,65),
- rx_1519_9k = self.pon_tx_1519_9k + random.randint(50,55)
+ rx_64=self.pon_tx_64 + random.randint(50, 55),
+ rx_65_127=self.pon_tx_65_127 + random.randint(55, 60),
+ rx_128_255=self.pon_tx_128_255 + random.randint(60, 65),
+ rx_256_511=self.pon_tx_256_511 + random.randint(85, 90),
+ rx_512_1023=self.pon_tx_512_1023 + random.randint(90, 95),
+ rx_1024_1518=self.pon_tx_1024_1518 + random.randint(60, 65),
+ rx_1519_9k=self.pon_tx_1519_9k + random.randint(50, 55)
)
self.pon_tx_pkts = pon_port_metrics['tx_pkts']
self.pon_rx_pkts = pon_port_metrics['rx_pkts']
@@ -694,6 +695,45 @@
lc = LoopingCall(_collect, device_id, prefix)
lc.start(interval=15) # TODO make this configurable
+ def start_alarm_simulation(self, device_id):
+
+ """Simulate periodic device alarms"""
+ import random
+
+ def _generate_alarm(device_id):
+
+ try:
+ # Randomly choose values for each enum types
+ severity = random.choice(list(v for k, v in AlarmEventSeverity.DESCRIPTOR.enum_values_by_name.items()))
+ state = random.choice(list(v for k, v in AlarmEventState.DESCRIPTOR.enum_values_by_name.items()))
+ type = random.choice(list(v for k, v in AlarmEventType.DESCRIPTOR.enum_values_by_name.items()))
+ category = random.choice(list(v for k, v in AlarmEventCategory.DESCRIPTOR.enum_values_by_name.items()))
+
+ description = "Simulated alarm - device:{} type:{} severity:{} state:{} category:{}".format(device_id,
+ type.name,
+ severity.name,
+ state.name,
+ category.name)
+
+ current_context = {}
+ for key, value in self.__dict__.items():
+ current_context[key] = str(value)
+
+ alarm_event = self.adapter_agent.create_alarm(resource_id=device_id,
+ type=type.number,
+ category=category.number,
+ severity=severity.number,
+ state=state.number,
+ description=description,
+ context=current_context)
+
+ self.adapter_agent.submit_alarm(alarm_event)
+
+ except Exception as e:
+ log.exception('failed-to-submit-alarm', e=e)
+
+ alarm_lc = LoopingCall(_generate_alarm, device_id)
+ alarm_lc.start(30)
# ~~~~~~~~~~~~~~~~~~~~ Embedded test Klein rest server ~~~~~~~~~~~~~~~~~~~~
@@ -713,7 +753,7 @@
eapol_start = str(
Ether(src='00:11:22:33:44:55') /
EAPOL(type=1, len=0) /
- Padding(load=42*'\x00')
+ Padding(load=42 * '\x00')
)
device = self.adapter_agent.get_device(device_id)
self.adapter_agent.send_packet_in(logical_device_id=device.parent_id,
diff --git a/voltha/core/adapter_agent.py b/voltha/core/adapter_agent.py
index d5f2a7c..485869e 100644
--- a/voltha/core/adapter_agent.py
+++ b/voltha/core/adapter_agent.py
@@ -18,6 +18,8 @@
Agent to play gateway between CORE and an individual adapter.
"""
from uuid import uuid4
+import arrow
+import re
import structlog
from google.protobuf.json_format import MessageToJson
@@ -30,7 +32,7 @@
from voltha.adapters.interface import IAdapterAgent
from voltha.protos import third_party
from voltha.protos.device_pb2 import Device, Port
-from voltha.protos.events_pb2 import KpiEvent
+from voltha.protos.events_pb2 import KpiEvent, AlarmEvent, AlarmEventType, AlarmEventSeverity, AlarmEventState, AlarmEventCategory
from voltha.protos.voltha_pb2 import DeviceGroup, LogicalDevice, \
LogicalPort, AdminState
from voltha.registry import registry
@@ -316,3 +318,37 @@
except Exception as e:
self.log.exception('failed-kpi-submission',
type=type(kpi_event_msg))
+
+ # ~~~~~~~~~~~~~~~~~~~ Handle alarm submissions ~~~~~~~~~~~~~~~~~~~~~
+
+ def create_alarm(self, id=None, resource_id=None, description=None, raised_ts=0, changed_ts=0,
+ type=AlarmEventType.EQUIPMENT, category=AlarmEventCategory.GENERAL,
+ severity=AlarmEventSeverity.MINOR, state=AlarmEventState.RAISED,
+ context=None):
+
+ # Construct the ID if it is not provided
+ if id == None:
+ id = 'voltha.{}.{}'.format(self.adapter_name, resource_id)
+
+ return AlarmEvent(
+ id=id,
+ resource_id=resource_id,
+ type=type,
+ category=category,
+ severity=severity,
+ state=state,
+ description=description,
+ reported_ts=arrow.utcnow().timestamp,
+ raised_ts=raised_ts,
+ changed_ts=changed_ts,
+ context=context
+ )
+
+ def submit_alarm(self, alarm_event_msg):
+ try:
+ assert isinstance(alarm_event_msg, AlarmEvent)
+ self.event_bus.publish('alarms', alarm_event_msg)
+
+ except Exception as e:
+ self.log.exception('failed-alarm-submission',
+ type=type(alarm_event_msg))
diff --git a/voltha/protos/events.proto b/voltha/protos/events.proto
index 5052a96..d9fef30 100644
--- a/voltha/protos/events.proto
+++ b/voltha/protos/events.proto
@@ -52,3 +52,83 @@
map<string, MetricValuePairs> prefixes = 3;
}
+
+/*
+ * Identify to the area of the system impacted by the alarm
+ */
+message AlarmEventType {
+ enum AlarmEventType {
+ COMMUNICATION = 0;
+ ENVIRONMENT = 1;
+ EQUIPMENT = 2;
+ SERVICE = 3;
+ PROCESSING = 4;
+ SECURITY = 5;
+ }
+}
+
+/*
+ * Identify to the functional category originating the alarm
+ */
+message AlarmEventCategory {
+ enum AlarmEventCategory {
+ GENERAL = 0;
+ CONFIGURATION = 1;
+ PON = 2;
+ TCA = 3;
+ }
+}
+
+/*
+ * Active state of the alarm
+ */
+message AlarmEventState {
+ enum AlarmEventState {
+ RAISED = 0;
+ CLEARED = 1;
+ }
+}
+
+/*
+ * Identify the overall impact of the alarm on the system
+ */
+message AlarmEventSeverity {
+ enum AlarmEventSeverity {
+ INDETERMINATE = 0;
+ WARNING = 1;
+ MINOR = 2;
+ MAJOR = 3;
+ CRITICAL = 4;
+ }
+}
+
+/*
+ *
+ */
+message AlarmEvent {
+ string id = 1; // Unique ID for this alarm. e.g. voltha.some_olt.1234
+
+ AlarmEventType.AlarmEventType type = 2; // Refers to the area of the system impacted by the alarm
+ // COMMUNICATION, ENVIRONMENT, EQUIPMENT, SERVICE, PROCESSING, SECURITY
+
+ AlarmEventCategory.AlarmEventCategory category = 3; // Refers to the functional category originating the alarm
+ // GENERAL, CONFIG, PON, TCA
+
+ AlarmEventState.AlarmEventState state = 4; // Current active state of the alarm
+ // RAISED, CLEARED
+
+ AlarmEventSeverity.AlarmEventSeverity severity = 5; // Overall impact of the alarm on the system
+ // INDETERMINATE, WARNING, MINOR, MAJOR, CRITICAL
+
+ float raised_ts = 6; // Timestamp at which the alarm was first raised
+
+ float reported_ts = 7; // Timestamp at which the alarm was reported
+
+ float changed_ts = 8; // Timestamp at which the alarm has changed since it was raised
+
+ string resource_id = 9; // Identifier of the originating resource of the alarm
+
+ string description = 10; // Textual explanation of the alarm
+
+ map<string,string> context = 11; // Key/Value storage for extra information that may give context to the alarm
+}