CORD-898: Initial integration for alarms support

- Introduced an AlarmEvent type
- Provided the necessary API to propagate the alarms

Change-Id: I285cab38cd8896508a820c8aea592819e0242ed5
diff --git a/requirements.txt b/requirements.txt
index 054d32e..62f79fd 100755
--- a/requirements.txt
+++ b/requirements.txt
@@ -34,6 +34,7 @@
 scapy>=2.3.2
 service-identity
 simplejson>=3.8.1
+jsonschema>=2.6.0
 six>=1.10.0
 structlog>=16.1.0
 termcolor>=1.1.0
diff --git a/tests/itests/voltha/test_voltha_alarm_events.py b/tests/itests/voltha/test_voltha_alarm_events.py
new file mode 100644
index 0000000..7f932be
--- /dev/null
+++ b/tests/itests/voltha/test_voltha_alarm_events.py
@@ -0,0 +1,155 @@
+from unittest import main
+from common.utils.consulhelpers import get_endpoint_from_consul
+from tests.itests.docutests.test_utils import run_long_running_command_with_timeout
+from tests.itests.voltha.rest_base import RestBase
+from google.protobuf.json_format import MessageToDict
+from voltha.protos.device_pb2 import Device
+import simplejson, jsonschema
+import re
+
+# ~~~~~~~ Common variables ~~~~~~~
+
+LOCAL_CONSUL = "localhost:8500"
+
+COMMANDS = dict(
+    kafka_client_run="kafkacat -b {} -L",
+    kafka_client_alarm_check="kafkacat -o -5 -b {} -C -t voltha.alarms -c 10",
+)
+
+ALARM_SCHEMA = {
+    "type": "object",
+    "properties": {
+        "id": {"type": "string"},
+        "type": {"type": "string"},
+        "category": {"type": "string"},
+        "state": {"type": "string"},
+        "severity": {"type": "string"},
+        "resource_id": {"type": "string"},
+        "raised_ts": {"type": "number"},
+        "reported_ts": {"type": "number"},
+        "changed_ts": {"type": "number"},
+        "description": {"type": "string"},
+        "context": {
+            "type": "object",
+            "additionalProperties": {"type": "string"}
+        }
+    }
+}
+
+
+# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+
+class VolthaAlarmEventTests(RestBase):
+    # Retrieve details on the REST entry point
+    rest_endpoint = get_endpoint_from_consul(LOCAL_CONSUL, 'chameleon-rest')
+
+    # Construct the base_url
+    base_url = 'http://' + rest_endpoint
+
+    # Start by querying consul to get the endpoint details
+    kafka_endpoint = get_endpoint_from_consul(LOCAL_CONSUL, 'kafka')
+
+    # ~~~~~~~~~~~~ Tests ~~~~~~~~~~~~
+
+    def test_alarm_topic_exists(self):
+        # We want to make sure that the topic is available on the system
+        expected_pattern = ['voltha.alarms']
+
+        # Start the kafka client to retrieve details on topics
+        cmd = COMMANDS['kafka_client_run'].format(self.kafka_endpoint)
+        kafka_client_output = run_long_running_command_with_timeout(cmd, 20)
+
+        # Loop through the kafka client output to find the topic
+        found = False
+        for out in kafka_client_output:
+            if all(ep in out for ep in expected_pattern):
+                found = True
+                break
+
+        self.assertTrue(found, 'Failed to find topic {}'.format(expected_pattern))
+
+    def test_alarm_generated_by_adapter(self):
+        # Verify that REST calls can be made
+        self.verify_rest()
+
+        # Create a new device
+        device = self.add_device()
+
+        # Activate the new device
+        self.activate_device(device['id'])
+
+        # The simulated olt device should start generating alarms periodically
+        alarm = self.get_alarm_event(device['id'])
+
+        # Make sure that the schema is valid
+        self.validate_alarm_event_schema(alarm)
+
+        # Validate the constructed alarm id
+        self.verify_alarm_event_id(device['id'], alarm['id'])
+
+    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+    # Make sure the Voltha REST interface is available
+    def verify_rest(self):
+        self.get('/api/v1')
+
+    # Create a new simulated device
+    def add_device(self):
+        device = Device(
+            type='simulated_olt',
+        )
+        device = self.post('/api/v1/local/devices', MessageToDict(device), expected_code=200)
+        return device
+
+    # Active the simulated device.  This will trigger the simulation of random alarms
+    def activate_device(self, device_id):
+        path = '/api/v1/local/devices/{}'.format(device_id)
+        self.post(path + '/activate', expected_code=200)
+        device = self.get(path)
+        self.assertEqual(device['admin_state'], 'ENABLED')
+
+    # Retrieve a sample alarm for a specific device
+    def get_alarm_event(self, device_id):
+        cmd = COMMANDS['kafka_client_alarm_check'].format(self.kafka_endpoint)
+        kafka_client_output = run_long_running_command_with_timeout(cmd, 20)
+
+        # Verify the kafka client output
+        found = False
+        self.alarm_data = None
+
+        for out in kafka_client_output:
+            self.alarm_data = simplejson.loads(out)
+
+            print self.alarm_data
+
+            if 'resource_id' not in self.alarm_data:
+                continue
+            elif self.alarm_data['resource_id'] == device_id:
+                found = True
+                break
+
+        self.assertTrue(found, 'Failed to find kafka alarm with device id:{}'.format(device_id))
+
+        return self.alarm_data
+
+    # Verify that the alarm follows the proper schema structure
+    def validate_alarm_event_schema(self, alarm):
+        try:
+            jsonschema.validate(alarm, ALARM_SCHEMA)
+        except Exception as e:
+            self.assertTrue(False, 'Validation failed for alarm : {}'.format(e.message))
+
+    # Verify that alarm identifier based on the format generated by default.
+    def verify_alarm_event_id(self, device_id, alarm_id):
+        prefix = re.findall(r"(voltha)\.(\w+)\.(\w+)", alarm_id)
+
+        self.assertEqual(len(prefix), 1, 'Failed to parse the alarm id: {}'.format(alarm_id))
+        self.assertEqual(len(prefix[0]), 3, 'Expected id format: voltha.<adapter name>.<device id>')
+        self.assertEqual(prefix[0][0], 'voltha', 'Expected id format: voltha.<adapter name>.<device id>')
+        self.assertEqual(prefix[0][1], 'simulated_olt', 'Expected id format: voltha.<adapter name>.<device id>')
+        self.assertEqual(prefix[0][2], device_id, 'Expected id format: voltha.<adapter name>.<device id>')
+
+
+if __name__ == '__main__':
+    main()
diff --git a/voltha/adapters/interface.py b/voltha/adapters/interface.py
index 4011166..4111735 100644
--- a/voltha/adapters/interface.py
+++ b/voltha/adapters/interface.py
@@ -256,3 +256,10 @@
         :param kpi_event_msg: A protobuf message of KpiEvent type.
         :return: None
         """
+
+    def submit_alarm(alarm_event_msg):
+        """
+        Submit an alarm on behalf of the OLT and its adapter.
+        :param alarm_event_msg: A protobuf message of AlarmEvent type.
+        :return: None
+        """
diff --git a/voltha/adapters/simulated_olt/simulated_olt.py b/voltha/adapters/simulated_olt/simulated_olt.py
index 406e210..01fc8e6 100644
--- a/voltha/adapters/simulated_olt/simulated_olt.py
+++ b/voltha/adapters/simulated_olt/simulated_olt.py
@@ -44,13 +44,13 @@
 from voltha.protos.openflow_13_pb2 import ofp_desc, ofp_port, OFPPF_1GB_FD, \
     OFPPF_FIBER, OFPPS_LIVE, ofp_switch_features, OFPC_PORT_STATS, \
     OFPC_GROUP_STATS, OFPC_TABLE_STATS, OFPC_FLOW_STATS
+from voltha.protos.events_pb2 import AlarmEvent, AlarmEventType, AlarmEventSeverity, AlarmEventState, AlarmEventCategory
 
 log = structlog.get_logger()
 
 
 @implementer(IAdapterInterface)
 class SimulatedOltAdapter(object):
-
     name = 'simulated_olt'
 
     supported_device_types = [
@@ -372,6 +372,8 @@
         # reactor.callLater(0.1, self._simulate_detection_of_onus, device.id)
         self.start_kpi_collection(device.id)
 
+        self.start_alarm_simulation(device.id)
+
     @inlineCallbacks
     def _simulate_detection_of_onus(self, device_id):
         for i in xrange(1, 5):
@@ -546,7 +548,6 @@
             else:
                 raise Exception('Port should be 1 or 2 by our convention')
 
-
     def update_flows_incrementally(self, device, flow_changes, group_changes):
         raise NotImplementedError()
 
@@ -580,46 +581,46 @@
                 # upgraded the metrics to include packet statistics for
                 # testing.
                 nni_port_metrics = yield dict(
-                    tx_pkts=self.nni_tx_pkts+random.randint(90, 100),
-                    rx_pkts=self.nni_rx_pkts+random.randint(90, 100),
-                    tx_bytes=self.nni_tx_bytes+random.randint(90000, 100000),
-                    rx_bytes=self.nni_rx_bytes+random.randint(90000, 100000),
-                    tx_64 = self.nni_tx_64 + random.randint(50,55),
-                    tx_65_127 = self.nni_tx_65_127 + random.randint(55,60),
-                    tx_128_255 = self.nni_tx_128_255 + random.randint(60,65),
-                    tx_256_511 = self.nni_tx_256_511 + random.randint(85,90),
-                    tx_512_1023 = self.nni_tx_512_1023 + random.randint(90,95),
-                    tx_1024_1518 = self.nni_tx_1024_1518 + random.randint(60,65),
-                    tx_1519_9k = self.nni_tx_1519_9k + random.randint(50,55),
+                    tx_pkts=self.nni_tx_pkts + random.randint(90, 100),
+                    rx_pkts=self.nni_rx_pkts + random.randint(90, 100),
+                    tx_bytes=self.nni_tx_bytes + random.randint(90000, 100000),
+                    rx_bytes=self.nni_rx_bytes + random.randint(90000, 100000),
+                    tx_64=self.nni_tx_64 + random.randint(50, 55),
+                    tx_65_127=self.nni_tx_65_127 + random.randint(55, 60),
+                    tx_128_255=self.nni_tx_128_255 + random.randint(60, 65),
+                    tx_256_511=self.nni_tx_256_511 + random.randint(85, 90),
+                    tx_512_1023=self.nni_tx_512_1023 + random.randint(90, 95),
+                    tx_1024_1518=self.nni_tx_1024_1518 + random.randint(60, 65),
+                    tx_1519_9k=self.nni_tx_1519_9k + random.randint(50, 55),
 
-                    rx_64 = self.nni_tx_64 + random.randint(50,55),
-                    rx_65_127 = self.nni_tx_65_127 + random.randint(55,60),
-                    rx_128_255 = self.nni_tx_128_255 + random.randint(60,65),
-                    rx_256_511 = self.nni_tx_256_511 + random.randint(85,90),
-                    rx_512_1023 = self.nni_tx_512_1023 + random.randint(90,95),
-                    rx_1024_1518 = self.nni_tx_1024_1518 + random.randint(60,65),
-                    rx_1519_9k = self.nni_tx_1519_9k + random.randint(50,55)
+                    rx_64=self.nni_tx_64 + random.randint(50, 55),
+                    rx_65_127=self.nni_tx_65_127 + random.randint(55, 60),
+                    rx_128_255=self.nni_tx_128_255 + random.randint(60, 65),
+                    rx_256_511=self.nni_tx_256_511 + random.randint(85, 90),
+                    rx_512_1023=self.nni_tx_512_1023 + random.randint(90, 95),
+                    rx_1024_1518=self.nni_tx_1024_1518 + random.randint(60, 65),
+                    rx_1519_9k=self.nni_tx_1519_9k + random.randint(50, 55)
                 )
                 pon_port_metrics = yield dict(
-                    tx_pkts=self.pon_tx_pkts+random.randint(90, 100),
-                    rx_pkts=self.pon_rx_pkts+random.randint(90, 100),
-                    tx_bytes=self.pon_tx_bytes+random.randint(90000, 100000),
-                    rx_bytes=self.pon_rx_bytes+random.randint(90000, 100000),
-                    tx_64 = self.pon_tx_64 + random.randint(50,55),
-                    tx_65_127 = self.pon_tx_65_127 + random.randint(55,60),
-                    tx_128_255 = self.pon_tx_128_255 + random.randint(60,65),
-                    tx_256_511 = self.pon_tx_256_511 + random.randint(85,90),
-                    tx_512_1023 = self.pon_tx_512_1023 + random.randint(90,95),
-                    tx_1024_1518 = self.pon_tx_1024_1518 + random.randint(60,65),
-                    tx_1519_9k = self.pon_tx_1519_9k + random.randint(50,55),
+                    tx_pkts=self.pon_tx_pkts + random.randint(90, 100),
+                    rx_pkts=self.pon_rx_pkts + random.randint(90, 100),
+                    tx_bytes=self.pon_tx_bytes + random.randint(90000, 100000),
+                    rx_bytes=self.pon_rx_bytes + random.randint(90000, 100000),
+                    tx_64=self.pon_tx_64 + random.randint(50, 55),
+                    tx_65_127=self.pon_tx_65_127 + random.randint(55, 60),
+                    tx_128_255=self.pon_tx_128_255 + random.randint(60, 65),
+                    tx_256_511=self.pon_tx_256_511 + random.randint(85, 90),
+                    tx_512_1023=self.pon_tx_512_1023 + random.randint(90, 95),
+                    tx_1024_1518=self.pon_tx_1024_1518 + random.randint(60, 65),
+                    tx_1519_9k=self.pon_tx_1519_9k + random.randint(50, 55),
 
-                    rx_64 = self.pon_tx_64 + random.randint(50,55),
-                    rx_65_127 = self.pon_tx_65_127 + random.randint(55,60),
-                    rx_128_255 = self.pon_tx_128_255 + random.randint(60,65),
-                    rx_256_511 = self.pon_tx_256_511 + random.randint(85,90),
-                    rx_512_1023 = self.pon_tx_512_1023 + random.randint(90,95),
-                    rx_1024_1518 = self.pon_tx_1024_1518 + random.randint(60,65),
-                    rx_1519_9k = self.pon_tx_1519_9k + random.randint(50,55)
+                    rx_64=self.pon_tx_64 + random.randint(50, 55),
+                    rx_65_127=self.pon_tx_65_127 + random.randint(55, 60),
+                    rx_128_255=self.pon_tx_128_255 + random.randint(60, 65),
+                    rx_256_511=self.pon_tx_256_511 + random.randint(85, 90),
+                    rx_512_1023=self.pon_tx_512_1023 + random.randint(90, 95),
+                    rx_1024_1518=self.pon_tx_1024_1518 + random.randint(60, 65),
+                    rx_1519_9k=self.pon_tx_1519_9k + random.randint(50, 55)
                 )
                 self.pon_tx_pkts = pon_port_metrics['tx_pkts']
                 self.pon_rx_pkts = pon_port_metrics['rx_pkts']
@@ -694,6 +695,45 @@
         lc = LoopingCall(_collect, device_id, prefix)
         lc.start(interval=15)  # TODO make this configurable
 
+    def start_alarm_simulation(self, device_id):
+
+        """Simulate periodic device alarms"""
+        import random
+
+        def _generate_alarm(device_id):
+
+            try:
+                # Randomly choose values for each enum types
+                severity = random.choice(list(v for k, v in AlarmEventSeverity.DESCRIPTOR.enum_values_by_name.items()))
+                state = random.choice(list(v for k, v in AlarmEventState.DESCRIPTOR.enum_values_by_name.items()))
+                type = random.choice(list(v for k, v in AlarmEventType.DESCRIPTOR.enum_values_by_name.items()))
+                category = random.choice(list(v for k, v in AlarmEventCategory.DESCRIPTOR.enum_values_by_name.items()))
+
+                description = "Simulated alarm - device:{} type:{} severity:{} state:{} category:{}".format(device_id,
+                                                                                                            type.name,
+                                                                                                            severity.name,
+                                                                                                            state.name,
+                                                                                                            category.name)
+
+                current_context = {}
+                for key, value in self.__dict__.items():
+                    current_context[key] = str(value)
+
+                alarm_event = self.adapter_agent.create_alarm(resource_id=device_id,
+                                                              type=type.number,
+                                                              category=category.number,
+                                                              severity=severity.number,
+                                                              state=state.number,
+                                                              description=description,
+                                                              context=current_context)
+
+                self.adapter_agent.submit_alarm(alarm_event)
+
+            except Exception as e:
+                log.exception('failed-to-submit-alarm', e=e)
+
+        alarm_lc = LoopingCall(_generate_alarm, device_id)
+        alarm_lc.start(30)
 
     # ~~~~~~~~~~~~~~~~~~~~ Embedded test Klein rest server ~~~~~~~~~~~~~~~~~~~~
 
@@ -713,7 +753,7 @@
         eapol_start = str(
             Ether(src='00:11:22:33:44:55') /
             EAPOL(type=1, len=0) /
-            Padding(load=42*'\x00')
+            Padding(load=42 * '\x00')
         )
         device = self.adapter_agent.get_device(device_id)
         self.adapter_agent.send_packet_in(logical_device_id=device.parent_id,
diff --git a/voltha/core/adapter_agent.py b/voltha/core/adapter_agent.py
index d5f2a7c..485869e 100644
--- a/voltha/core/adapter_agent.py
+++ b/voltha/core/adapter_agent.py
@@ -18,6 +18,8 @@
 Agent to play gateway between CORE and an individual adapter.
 """
 from uuid import uuid4
+import arrow
+import re
 
 import structlog
 from google.protobuf.json_format import MessageToJson
@@ -30,7 +32,7 @@
 from voltha.adapters.interface import IAdapterAgent
 from voltha.protos import third_party
 from voltha.protos.device_pb2 import Device, Port
-from voltha.protos.events_pb2 import KpiEvent
+from voltha.protos.events_pb2 import KpiEvent, AlarmEvent, AlarmEventType, AlarmEventSeverity, AlarmEventState, AlarmEventCategory
 from voltha.protos.voltha_pb2 import DeviceGroup, LogicalDevice, \
     LogicalPort, AdminState
 from voltha.registry import registry
@@ -316,3 +318,37 @@
         except Exception as e:
             self.log.exception('failed-kpi-submission',
                                type=type(kpi_event_msg))
+
+    # ~~~~~~~~~~~~~~~~~~~ Handle alarm submissions ~~~~~~~~~~~~~~~~~~~~~
+
+    def create_alarm(self, id=None, resource_id=None, description=None, raised_ts=0, changed_ts=0,
+                     type=AlarmEventType.EQUIPMENT, category=AlarmEventCategory.GENERAL,
+                     severity=AlarmEventSeverity.MINOR, state=AlarmEventState.RAISED,
+                     context=None):
+
+        # Construct the ID if it is not provided
+        if id == None:
+            id = 'voltha.{}.{}'.format(self.adapter_name, resource_id)
+
+        return AlarmEvent(
+            id=id,
+            resource_id=resource_id,
+            type=type,
+            category=category,
+            severity=severity,
+            state=state,
+            description=description,
+            reported_ts=arrow.utcnow().timestamp,
+            raised_ts=raised_ts,
+            changed_ts=changed_ts,
+            context=context
+        )
+
+    def submit_alarm(self, alarm_event_msg):
+        try:
+            assert isinstance(alarm_event_msg, AlarmEvent)
+            self.event_bus.publish('alarms', alarm_event_msg)
+
+        except Exception as e:
+            self.log.exception('failed-alarm-submission',
+                               type=type(alarm_event_msg))
diff --git a/voltha/protos/events.proto b/voltha/protos/events.proto
index 5052a96..d9fef30 100644
--- a/voltha/protos/events.proto
+++ b/voltha/protos/events.proto
@@ -52,3 +52,83 @@
     map<string, MetricValuePairs> prefixes = 3;
 
 }
+
+/*
+ * Identify to the area of the system impacted by the alarm
+ */
+message AlarmEventType {
+    enum AlarmEventType {
+        COMMUNICATION = 0;
+        ENVIRONMENT = 1;
+        EQUIPMENT = 2;
+        SERVICE = 3;
+        PROCESSING = 4;
+        SECURITY = 5;
+    }
+}
+
+/*
+ * Identify to the functional category originating the alarm
+ */
+message AlarmEventCategory {
+    enum AlarmEventCategory {
+        GENERAL = 0;
+        CONFIGURATION = 1;
+        PON = 2;
+        TCA = 3;
+    }
+}
+
+/*
+ * Active state of the alarm
+ */
+message AlarmEventState {
+    enum AlarmEventState {
+        RAISED = 0;
+        CLEARED = 1;
+    }
+}
+
+/*
+ * Identify the overall impact of the alarm on the system
+ */
+message AlarmEventSeverity {
+    enum AlarmEventSeverity {
+        INDETERMINATE = 0;
+        WARNING = 1;
+        MINOR = 2;
+        MAJOR = 3;
+        CRITICAL = 4;
+    }
+}
+
+/*
+ *
+ */
+message AlarmEvent {
+    string id = 1;                                      // Unique ID for this alarm.  e.g. voltha.some_olt.1234
+
+    AlarmEventType.AlarmEventType type = 2;             // Refers to the area of the system impacted by the alarm
+                                                        // COMMUNICATION, ENVIRONMENT, EQUIPMENT, SERVICE, PROCESSING, SECURITY
+
+    AlarmEventCategory.AlarmEventCategory category = 3; // Refers to the functional category originating the alarm
+                                                        // GENERAL, CONFIG, PON, TCA
+
+    AlarmEventState.AlarmEventState state = 4;          // Current active state of the alarm
+                                                        // RAISED, CLEARED
+
+    AlarmEventSeverity.AlarmEventSeverity severity = 5; // Overall impact of the alarm on the system
+                                                        // INDETERMINATE, WARNING, MINOR, MAJOR, CRITICAL
+
+    float raised_ts = 6;                                // Timestamp at which the alarm was first raised
+
+    float reported_ts = 7;                              // Timestamp at which the alarm was reported
+
+    float changed_ts = 8;                               // Timestamp at which the alarm has changed since it was raised
+
+    string resource_id = 9;                             // Identifier of the originating resource of the alarm
+
+    string description = 10;                            // Textual explanation of the alarm
+
+    map<string,string> context = 11;                    // Key/Value storage for extra information that may give context to the alarm
+}