Voltha Alarm Filters: Added new integration test
- Also fixed alarm events test

- Execution should succeed when the topic does not exist
- Send msg to the kafka bus to ensure the topic gets created
- Optimized the executed kafka commands

Change-Id: Iebafddf5da8f46acd4bf77e1a9fae823f085c70e
diff --git a/tests/itests/voltha/test_voltha_alarm_events.py b/tests/itests/voltha/test_voltha_alarm_events.py
index 769b7f2..c5e2858 100644
--- a/tests/itests/voltha/test_voltha_alarm_events.py
+++ b/tests/itests/voltha/test_voltha_alarm_events.py
@@ -14,7 +14,8 @@
 COMMANDS = dict(
     kafka_client_run="kafkacat -b {} -L",
-    kafka_client_alarm_check="kafkacat -o -5 -b {} -C -t voltha.alarms -c 10",
+    kafka_client_send_msg='echo hello | kafkacat -b {} -P -t voltha.alarms -c 1',
+    kafka_client_alarm_check="kafkacat -o end -b {} -C -t voltha.alarms -c 2",
@@ -53,7 +54,11 @@
     # ~~~~~~~~~~~~ Tests ~~~~~~~~~~~~
-    def test_alarm_topic_exists(self):
+    def test_1_alarm_topic_exists(self):
+        # Produce a message to ensure that the topic exists
+        cmd = COMMANDS['kafka_client_send_msg'].format(self.kafka_endpoint)
+        run_long_running_command_with_timeout(cmd, 5)
         # We want to make sure that the topic is available on the system
         expected_pattern = ['voltha.alarms']
@@ -71,7 +76,7 @@
                         'Failed to find topic {}'.format(expected_pattern))
-    def test_alarm_generated_by_adapter(self):
+    def test_2_alarm_generated_by_adapter(self):
         # Verify that REST calls can be made
@@ -109,35 +114,39 @@
     # This will trigger the simulation of random alarms
     def activate_device(self, device_id):
         path = '/api/v1/local/devices/{}'.format(device_id)
-        self.post(path + '/activate', expected_code=200)
+        self.post(path + '/enable', expected_code=200)
         device = self.get(path)
         self.assertEqual(device['admin_state'], 'ENABLED')
     # Retrieve a sample alarm for a specific device
     def get_alarm_event(self, device_id):
         cmd = COMMANDS['kafka_client_alarm_check'].format(self.kafka_endpoint)
-        kafka_client_output = run_long_running_command_with_timeout(cmd, 20)
+        kafka_client_output = run_long_running_command_with_timeout(cmd, 30)
         # Verify the kafka client output
         found = False
-        self.alarm_data = None
+        alarm_data = None
         for out in kafka_client_output:
-            self.alarm_data = simplejson.loads(out)
+            # Catch any error that might occur while reading the kafka messages
+            try:
+                alarm_data = simplejson.loads(out)
+                print alarm_data
-            print self.alarm_data
+                if not alarm_data or 'resource_id' not in alarm_data:
+                    continue
+                elif alarm_data['resource_id'] == device_id:
+                    found = True
+                    break
-            if 'resource_id' not in self.alarm_data:
+            except Exception as e:
-            elif self.alarm_data['resource_id'] == device_id:
-                found = True
-                break
             'Failed to find kafka alarm with device id:{}'.format(device_id))
-        return self.alarm_data
+        return alarm_data
     # Verify that the alarm follows the proper schema structure
     def validate_alarm_event_schema(self, alarm):
diff --git a/tests/itests/voltha/test_voltha_alarm_filters.py b/tests/itests/voltha/test_voltha_alarm_filters.py
new file mode 100644
index 0000000..88744ea
--- /dev/null
+++ b/tests/itests/voltha/test_voltha_alarm_filters.py
@@ -0,0 +1,160 @@
+from unittest import main
+import simplejson
+from google.protobuf.json_format import MessageToDict
+from common.utils.consulhelpers import get_endpoint_from_consul
+from tests.itests.docutests.test_utils import \
+    run_long_running_command_with_timeout
+from tests.itests.voltha.rest_base import RestBase
+from voltha.protos.device_pb2 import Device
+from voltha.protos.voltha_pb2 import AlarmFilter
+# ~~~~~~~ Common variables ~~~~~~~
+LOCAL_CONSUL = "localhost:8500"
+COMMANDS = dict(
+    kafka_client_run="kafkacat -b {} -L",
+    kafka_client_send_msg='echo hello | kafkacat -b {} -P -t voltha.alarms -c 1',
+    kafka_client_alarm_check="kafkacat -o end -b {} -C -t voltha.alarms -c 2",
+# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+class VolthaAlarmFilterTests(RestBase):
+    # Retrieve details on the REST entry point
+    rest_endpoint = get_endpoint_from_consul(LOCAL_CONSUL, 'chameleon-rest')
+    # Construct the base_url
+    base_url = 'http://' + rest_endpoint
+    # Start by querying consul to get the endpoint details
+    kafka_endpoint = get_endpoint_from_consul(LOCAL_CONSUL, 'kafka')
+    # ~~~~~~~~~~~~ Tests ~~~~~~~~~~~~
+    def test_1_alarm_topic_exists(self):
+        # Produce a message to ensure that the topic exists
+        cmd = COMMANDS['kafka_client_send_msg'].format(self.kafka_endpoint)
+        run_long_running_command_with_timeout(cmd, 5)
+        # We want to make sure that the topic is available on the system
+        expected_pattern = ['voltha.alarms']
+        # Start the kafka client to retrieve details on topics
+        cmd = COMMANDS['kafka_client_run'].format(self.kafka_endpoint)
+        kafka_client_output = run_long_running_command_with_timeout(cmd, 20)
+        # Loop through the kafka client output to find the topic
+        found = False
+        for out in kafka_client_output:
+            if all(ep in out for ep in expected_pattern):
+                found = True
+                break
+        self.assertTrue(found,
+                        'Failed to find topic {}'.format(expected_pattern))
+    def test_2_alarm_generated_by_adapter(self):
+        # Verify that REST calls can be made
+        self.verify_rest()
+        # Create a new device
+        device_not_filtered = self.add_device()
+        device_filtered = self.add_device()
+        self.add_device_id_filter(device_filtered['id'])
+        # Activate the new device
+        self.activate_device(device_not_filtered['id'])
+        self.activate_device(device_filtered['id'])
+        # The simulated olt devices should start generating alarms periodically
+        # We should see alarms generated for the non filtered device
+        self.get_alarm_event(device_not_filtered['id'])
+        # We should not see any alarms from the filtered device
+        self.get_alarm_event(device_filtered['id'], True)
+    # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+    # Make sure the Voltha REST interface is available
+    def verify_rest(self):
+        self.get('/api/v1')
+    # Create a new simulated device
+    def add_device(self):
+        device = Device(
+            type='simulated_olt',
+        )
+        device = self.post('/api/v1/local/devices', MessageToDict(device),
+                           expected_code=200)
+        return device
+    # Create a filter against a specific device id
+    def add_device_id_filter(self, device_id):
+        rules = list()
+        rule = dict()
+        # Create a filter with a single rule
+        rule['key'] = 'device_id'
+        rule['value'] = device_id
+        rules.append(rule)
+        alarm_filter = AlarmFilter(rules=rules)
+        alarm_filter = self.post('/api/v1/local/alarm_filters', MessageToDict(alarm_filter),
+                                 expected_code=200)
+        return alarm_filter
+    # Active the simulated device.
+    # This will trigger the simulation of random alarms
+    def activate_device(self, device_id):
+        path = '/api/v1/local/devices/{}'.format(device_id)
+        self.post(path + '/enable', expected_code=200)
+        device = self.get(path)
+        self.assertEqual(device['admin_state'], 'ENABLED')
+    # Retrieve a sample alarm for a specific device
+    def get_alarm_event(self, device_id, expect_failure=False):
+        cmd = COMMANDS['kafka_client_alarm_check'].format(self.kafka_endpoint)
+        kafka_client_output = run_long_running_command_with_timeout(cmd, 30)
+        # Verify the kafka client output
+        found = False
+        alarm_data = None
+        for out in kafka_client_output:
+            # Catch any error that might occur while reading the kafka messages
+            try:
+                alarm_data = simplejson.loads(out)
+                print alarm_data
+                if not alarm_data or 'resource_id' not in alarm_data:
+                    continue
+                elif alarm_data['resource_id'] == device_id:
+                    found = True
+                    break
+            except Exception as e:
+                continue
+        if not expect_failure:
+            self.assertTrue(
+                found,
+                'Failed to find kafka alarm with device id:{}'.format(device_id))
+        else:
+            self.assertFalse(
+                found,
+                'Found a kafka alarm with device id:{}.  It should have been filtered'.format(
+                    device_id))
+        return alarm_data
+if __name__ == '__main__':
+    main()