Voltha Alarm Filters: Added new integration test
- Also fixed alarm events test
Amendments:
- Execution should succeed when the topic does not exist
- Send msg to the kafka bus to ensure the topic gets created
- Optimized the executed kafka commands
Change-Id: Iebafddf5da8f46acd4bf77e1a9fae823f085c70e
diff --git a/tests/itests/voltha/test_voltha_alarm_events.py b/tests/itests/voltha/test_voltha_alarm_events.py
index 769b7f2..c5e2858 100644
--- a/tests/itests/voltha/test_voltha_alarm_events.py
+++ b/tests/itests/voltha/test_voltha_alarm_events.py
@@ -14,7 +14,8 @@
COMMANDS = dict(
kafka_client_run="kafkacat -b {} -L",
- kafka_client_alarm_check="kafkacat -o -5 -b {} -C -t voltha.alarms -c 10",
+ kafka_client_send_msg='echo hello | kafkacat -b {} -P -t voltha.alarms -c 1',
+ kafka_client_alarm_check="kafkacat -o end -b {} -C -t voltha.alarms -c 2",
)
ALARM_SCHEMA = {
@@ -53,7 +54,11 @@
# ~~~~~~~~~~~~ Tests ~~~~~~~~~~~~
- def test_alarm_topic_exists(self):
+ def test_1_alarm_topic_exists(self):
+ # Produce a message to ensure that the topic exists
+ cmd = COMMANDS['kafka_client_send_msg'].format(self.kafka_endpoint)
+ run_long_running_command_with_timeout(cmd, 5)
+
# We want to make sure that the topic is available on the system
expected_pattern = ['voltha.alarms']
@@ -71,7 +76,7 @@
self.assertTrue(found,
'Failed to find topic {}'.format(expected_pattern))
- def test_alarm_generated_by_adapter(self):
+ def test_2_alarm_generated_by_adapter(self):
# Verify that REST calls can be made
self.verify_rest()
@@ -109,35 +114,39 @@
# This will trigger the simulation of random alarms
def activate_device(self, device_id):
path = '/api/v1/local/devices/{}'.format(device_id)
- self.post(path + '/activate', expected_code=200)
+ self.post(path + '/enable', expected_code=200)
device = self.get(path)
self.assertEqual(device['admin_state'], 'ENABLED')
# Retrieve a sample alarm for a specific device
def get_alarm_event(self, device_id):
cmd = COMMANDS['kafka_client_alarm_check'].format(self.kafka_endpoint)
- kafka_client_output = run_long_running_command_with_timeout(cmd, 20)
+ kafka_client_output = run_long_running_command_with_timeout(cmd, 30)
# Verify the kafka client output
found = False
- self.alarm_data = None
+ alarm_data = None
for out in kafka_client_output:
- self.alarm_data = simplejson.loads(out)
+ # Catch any error that might occur while reading the kafka messages
+ try:
+ alarm_data = simplejson.loads(out)
+ print alarm_data
- print self.alarm_data
+ if not alarm_data or 'resource_id' not in alarm_data:
+ continue
+ elif alarm_data['resource_id'] == device_id:
+ found = True
+ break
- if 'resource_id' not in self.alarm_data:
+ except Exception as e:
continue
- elif self.alarm_data['resource_id'] == device_id:
- found = True
- break
self.assertTrue(
found,
'Failed to find kafka alarm with device id:{}'.format(device_id))
- return self.alarm_data
+ return alarm_data
# Verify that the alarm follows the proper schema structure
def validate_alarm_event_schema(self, alarm):
diff --git a/tests/itests/voltha/test_voltha_alarm_filters.py b/tests/itests/voltha/test_voltha_alarm_filters.py
new file mode 100644
index 0000000..88744ea
--- /dev/null
+++ b/tests/itests/voltha/test_voltha_alarm_filters.py
@@ -0,0 +1,160 @@
+from unittest import main
+
+import simplejson
+from google.protobuf.json_format import MessageToDict
+
+from common.utils.consulhelpers import get_endpoint_from_consul
+from tests.itests.docutests.test_utils import \
+ run_long_running_command_with_timeout
+from tests.itests.voltha.rest_base import RestBase
+from voltha.protos.device_pb2 import Device
+from voltha.protos.voltha_pb2 import AlarmFilter
+
+# ~~~~~~~ Common variables ~~~~~~~
+
+LOCAL_CONSUL = "localhost:8500"
+
+COMMANDS = dict(
+ kafka_client_run="kafkacat -b {} -L",
+ kafka_client_send_msg='echo hello | kafkacat -b {} -P -t voltha.alarms -c 1',
+ kafka_client_alarm_check="kafkacat -o end -b {} -C -t voltha.alarms -c 2",
+)
+
+
+# ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+
+class VolthaAlarmFilterTests(RestBase):
+ # Retrieve details on the REST entry point
+ rest_endpoint = get_endpoint_from_consul(LOCAL_CONSUL, 'chameleon-rest')
+
+ # Construct the base_url
+ base_url = 'http://' + rest_endpoint
+
+ # Start by querying consul to get the endpoint details
+ kafka_endpoint = get_endpoint_from_consul(LOCAL_CONSUL, 'kafka')
+
+ # ~~~~~~~~~~~~ Tests ~~~~~~~~~~~~
+
+ def test_1_alarm_topic_exists(self):
+ # Produce a message to ensure that the topic exists
+ cmd = COMMANDS['kafka_client_send_msg'].format(self.kafka_endpoint)
+ run_long_running_command_with_timeout(cmd, 5)
+
+ # We want to make sure that the topic is available on the system
+ expected_pattern = ['voltha.alarms']
+
+ # Start the kafka client to retrieve details on topics
+ cmd = COMMANDS['kafka_client_run'].format(self.kafka_endpoint)
+ kafka_client_output = run_long_running_command_with_timeout(cmd, 20)
+
+ # Loop through the kafka client output to find the topic
+ found = False
+ for out in kafka_client_output:
+ if all(ep in out for ep in expected_pattern):
+ found = True
+ break
+
+ self.assertTrue(found,
+ 'Failed to find topic {}'.format(expected_pattern))
+
+ def test_2_alarm_generated_by_adapter(self):
+ # Verify that REST calls can be made
+ self.verify_rest()
+
+ # Create a new device
+ device_not_filtered = self.add_device()
+ device_filtered = self.add_device()
+
+ self.add_device_id_filter(device_filtered['id'])
+
+ # Activate the new device
+ self.activate_device(device_not_filtered['id'])
+ self.activate_device(device_filtered['id'])
+
+ # The simulated olt devices should start generating alarms periodically
+
+ # We should see alarms generated for the non filtered device
+ self.get_alarm_event(device_not_filtered['id'])
+
+ # We should not see any alarms from the filtered device
+ self.get_alarm_event(device_filtered['id'], True)
+
+ # ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
+
+ # Make sure the Voltha REST interface is available
+ def verify_rest(self):
+ self.get('/api/v1')
+
+ # Create a new simulated device
+ def add_device(self):
+ device = Device(
+ type='simulated_olt',
+ )
+ device = self.post('/api/v1/local/devices', MessageToDict(device),
+ expected_code=200)
+ return device
+
+ # Create a filter against a specific device id
+ def add_device_id_filter(self, device_id):
+ rules = list()
+ rule = dict()
+
+ # Create a filter with a single rule
+ rule['key'] = 'device_id'
+ rule['value'] = device_id
+ rules.append(rule)
+
+ alarm_filter = AlarmFilter(rules=rules)
+ alarm_filter = self.post('/api/v1/local/alarm_filters', MessageToDict(alarm_filter),
+ expected_code=200)
+
+ return alarm_filter
+
+ # Active the simulated device.
+ # This will trigger the simulation of random alarms
+ def activate_device(self, device_id):
+ path = '/api/v1/local/devices/{}'.format(device_id)
+ self.post(path + '/enable', expected_code=200)
+ device = self.get(path)
+ self.assertEqual(device['admin_state'], 'ENABLED')
+
+ # Retrieve a sample alarm for a specific device
+ def get_alarm_event(self, device_id, expect_failure=False):
+ cmd = COMMANDS['kafka_client_alarm_check'].format(self.kafka_endpoint)
+ kafka_client_output = run_long_running_command_with_timeout(cmd, 30)
+
+ # Verify the kafka client output
+ found = False
+ alarm_data = None
+
+ for out in kafka_client_output:
+ # Catch any error that might occur while reading the kafka messages
+ try:
+ alarm_data = simplejson.loads(out)
+ print alarm_data
+
+ if not alarm_data or 'resource_id' not in alarm_data:
+ continue
+ elif alarm_data['resource_id'] == device_id:
+ found = True
+ break
+
+ except Exception as e:
+ continue
+
+ if not expect_failure:
+ self.assertTrue(
+ found,
+ 'Failed to find kafka alarm with device id:{}'.format(device_id))
+ else:
+ self.assertFalse(
+ found,
+ 'Found a kafka alarm with device id:{}. It should have been filtered'.format(
+ device_id))
+
+ return alarm_data
+
+
+if __name__ == '__main__':
+ main()