blob: 1111c1e2c5894620cf5c4a7e3cb22ab57c5aa457 [file] [log] [blame]
khenaidoo032d3302017-06-09 14:50:04 -04001from google.protobuf.json_format import MessageToDict
2from time import time, sleep
3from voltha.core.flow_decomposer import *
4from voltha.protos import openflow_13_pb2 as ofp
5import simplejson, jsonschema
6import os
7import subprocess
8import select
9
10from tests.itests.docutests.test_utils import \
11 run_command_to_completion_with_raw_stdout, \
12 run_command_to_completion_with_stdout_in_list
13
14from common.utils.consulhelpers import verify_all_services_healthy
15
16from voltha.protos.device_pb2 import Device
17from tests.itests.voltha.rest_base import RestBase
18from common.utils.consulhelpers import get_endpoint_from_consul
19from voltha.protos.voltha_pb2 import AlarmFilter
20
# Address of the local consul agent used for service discovery.
LOCAL_CONSUL = "localhost:8500"
# Compose file that defines the full system-under-test stack.
DOCKER_COMPOSE_FILE = "compose/docker-compose-system-test.yml"

# Shell command templates used throughout the test. The kafkacat templates
# are formatted later with the kafka endpoint resolved from consul; the
# docker-compose templates are pre-formatted with DOCKER_COMPOSE_FILE here.
command_defs = dict(
    docker_ps="docker ps",
    docker_compose_start_all="docker-compose -f {} up -d "
        .format(DOCKER_COMPOSE_FILE),
    docker_stop_and_remove_all_containers="docker-compose -f {} down"
        .format(DOCKER_COMPOSE_FILE),
    docker_compose_start_voltha="docker-compose -f {} up -d voltha "
        .format(DOCKER_COMPOSE_FILE),
    docker_compose_stop_voltha="docker-compose -f {} stop voltha"
        .format(DOCKER_COMPOSE_FILE),
    docker_compose_remove_voltha="docker-compose -f {} rm -f voltha"
        .format(DOCKER_COMPOSE_FILE),
    kafka_topics="kafkacat -b {} -L",
    kafka_alarms="kafkacat -o end -b {} -C -t voltha.alarms -c 2",
    kafka_kpis="kafkacat -o end -b {} -C -t voltha.kpis -c 5"
)

# JSON schema that every alarm published on the voltha.alarms kafka topic
# must conform to (validated by assert_alarm_event_schema below).
ALARM_SCHEMA = {
    "type": "object",
    "properties": {
        "id": {"type": "string"},
        "type": {"type": "string"},
        "category": {"type": "string"},
        "state": {"type": "string"},
        "severity": {"type": "string"},
        "resource_id": {"type": "string"},
        "raised_ts": {"type": "number"},
        "reported_ts": {"type": "number"},
        "changed_ts": {"type": "number"},
        "description": {"type": "string"},
        "context": {
            "type": "object",
            "additionalProperties": {"type": "string"}
        }
    }
}
60
61
class TestConsulPersistence(RestBase):
    # Shared wall-clock reference used by pt() to report the elapsed time
    # between successive progress messages (a list so it can be mutated
    # in place at class level).
    t0 = [time()]

    def pt(self, msg=''):
        """Print the milliseconds elapsed since the previous pt() call."""
        now = time()
        elapsed_ms = 1000 * (now - TestConsulPersistence.t0[0])
        print('%20.8f ms - %s' % (elapsed_ms, msg))
        TestConsulPersistence.t0[0] = now
70
71 def test_all_scenarios(self):
72 self.basic_scenario()
73 self.data_integrity()
74
75 def basic_scenario(self):
76 # 1. Setup the test
77 # A. Stop and restart all containers (to start from clean)
78 # B. Setup the REST endpoint
79 self.pt('Test setup - starts')
80 self._stop_and_remove_all_containers()
81 sleep(5) # A small wait for the system to settle down
82 self.start_all_containers()
83 self.set_rest_endpoint()
84 self.set_kafka_endpoint()
85 self.pt('Test setup - ends')
86
87 # 2. Test 1 - Verify no data is present in voltha
88 self.pt('Test 1 - starts')
89 self.verify_instance_has_no_data()
90 self.pt('Test 1 - ends')
91
92 # 3. Test 2 - Verify voltha data is preserved after a restart
93 # A. Add data to voltha
94 # B. Stop voltha instance only (data is in consul)
95 # C. Start a new voltha instance
96 # D. Verify the data is previoulsy set data is in the new voltha
97 # instance
98 self.pt('Test 2 - starts')
99 self.add_data_to_voltha_instance()
100 instance_data_before = self.get_voltha_instance_data()
101 self.stop_remove_start_voltha()
102 instance_data_after = self.get_voltha_instance_data()
103 self.assertEqual(instance_data_before, instance_data_after)
104 self.pt('Test 2 - ends')
105
    def data_integrity(self):
        """
        This test goes through several voltha restarts along with variations
        of configurations in between to ensure data integrity is preserved.

        During this test, the user will be prompted to start ponsim. Use
        these commands to run ponsim with 1 OLT and 4 ONUs. This will also
        enable alarm at a frequency of 5 seconds:
          sudo -s
          . ./env.sh
          ./ponsim/main.py -v -o 4 -a -f 5

        The user will also be prompted to enable port forwarding on ponmgmt
        bridge. Use these commands:
          sudo -s
          echo 8 > /sys/class/net/ponmgmt/bridge/group_fwd_mask

        NOTE: interactive test — it blocks on raw_input until the operator
        confirms ponsim is running.
        """

        # Thin indirection so the input function could be swapped out
        # (e.g. mocked) without changing the call sites below.
        def prompt(input_func, text):
            val = input_func(text)
            return val

        # Blocks until the operator presses return (Python 2 raw_input).
        def prompt_for_return(text):
            return raw_input(text)

        # 1. Setup the test
        #    A. Stop and restart all containers (to start from clean)
        #    B. Setup the REST endpoint
        self.pt('Test setup - starts')
        self._stop_and_remove_all_containers()
        sleep(5)  # A small wait for the system to settle down
        self.start_all_containers()
        # Reference timestamp: only kafka messages newer than this count.
        self.consume_kafka_message_starting_at = time()
        self.set_rest_endpoint()
        self.set_kafka_endpoint()
        self.pt('Test setup - ends')

        # Get the user to start PONSIM as root
        prompt(prompt_for_return,
               '\nStart PONSIM as root with alarms enabled in another window ...')

        prompt(prompt_for_return,
               '\nEnsure port forwarding is set on ponmgnt ...')

        # 2. Configure some data on the voltha instance: one OLT, wait for
        #    its logical device and the discovery of its 4 ONUs, install
        #    EAPOL flows and confirm KPI/alarm events flow on kafka.
        self.assert_no_device_present()
        olt = self.add_olt_device()
        olt_id = olt['id']
        self.pt(olt_id)
        self.verify_device_preprovisioned_state(olt_id)
        self.enable_device(olt_id)
        ldev_id = self.wait_for_logical_device(olt_id)
        onu_ids = self.wait_for_onu_discovery(olt_id)
        self.verify_logical_ports(ldev_id, 5)
        self.simulate_eapol_flow_install(ldev_id, olt_id, onu_ids)
        self.verify_olt_eapol_flow(olt_id)
        self.assert_kpis_event(olt_id)
        self.assert_alarm_generation(olt_id)
        # An alarm filter on the OLT must suppress its alarms.
        alarm_filter = self.create_device_filter(olt_id)
        self.consume_kafka_message_starting_at = time()
        self.assert_alarm_generation(olt_id, event_present=False)

        # 3. Kill and restart the voltha instance; data must be intact and
        #    removing the filter must re-enable alarms.
        self.assert_restart_voltha_successful()
        self.assert_kpis_event(olt_id)
        self.remove_device_filter(alarm_filter['id'])
        self.assert_alarm_generation(olt_id)

        self.pt('Voltha restart with initial set of data - successful')

        # 4. Ensure we can keep doing operations on the new voltha instance
        #    as if nothing happened
        olt_ids, onu_ids = self.get_olt_onu_devices()
        self.disable_device(onu_ids[0])
        self.verify_logical_ports(ldev_id, 4)

        # 5. Kill and restart the voltha instance (one ONU disabled)
        self.assert_restart_voltha_successful()
        self.assert_kpis_event(olt_id)
        alarm_filter = self.create_device_filter(olt_id)
        self.consume_kafka_message_starting_at = time()
        self.assert_alarm_generation(olt_id, event_present=False)
        self.remove_device_filter(alarm_filter['id'])
        self.assert_alarm_generation(olt_id)

        self.pt('Voltha restart with disabled ONU - successful')

        # 6. Do some more operations: re-enable the ONU, reinstall EAPOL,
        #    then disable the whole OLT (logical device disappears).
        self.enable_device(onu_ids[0])
        self.verify_logical_ports(ldev_id, 5)
        self.simulate_eapol_flow_install(ldev_id, olt_id, onu_ids)
        self.verify_olt_eapol_flow(olt_id)
        self.disable_device(olt_ids[0])
        self.assert_all_onus_state(olt_ids[0], 'DISABLED', 'UNKNOWN')
        self.assert_no_logical_device()

        # 7. Kill and restart the voltha instance (OLT disabled: no
        #    KPI/alarm events expected)
        self.assert_restart_voltha_successful()
        self.assert_kpis_event(olt_id, event_present=False)
        self.assert_alarm_generation(olt_id, event_present=False)

        self.pt('Voltha restart with disabled OLT - successful')

        # 8. Enable OLT and verify states of ONUs
        self.enable_device(olt_ids[0])
        self.assert_all_onus_state(olt_ids[0], 'ENABLED', 'ACTIVE')
        self.wait_for_logical_device(olt_ids[0])

        # 9. Kill and restart the voltha instance (re-enabled OLT)
        self.assert_restart_voltha_successful()
        self.assert_kpis_event(olt_id)
        self.assert_alarm_generation(olt_id)

        self.pt('Voltha restart with re-enabled OLT - successful')

        # 10. Install EAPOL and disable an ONU
        self.simulate_eapol_flow_install(ldev_id, olt_id, onu_ids)
        self.verify_olt_eapol_flow(olt_id)
        self.disable_device(onu_ids[0])

        # 11. Kill and restart the voltha instance
        self.assert_restart_voltha_successful()
        self.assert_kpis_event(olt_id)
        self.assert_alarm_generation(olt_id)

        self.pt('Voltha restart with EAPOL and disabled ONU - successful')

        # 12. Delete the ONU then the OLT (OLT must be disabled first)
        self.delete_device(onu_ids[0])
        self.verify_logical_ports(ldev_id, 4)
        self.disable_device(olt_ids[0])
        self.delete_device(olt_ids[0])
        self.assert_no_device_present()

        # 13. Kill and restart the voltha instance (empty: no events)
        self.assert_restart_voltha_successful()
        self.assert_kpis_event(olt_id, event_present=False)
        self.assert_alarm_generation(olt_id, event_present=False)

        self.pt('Voltha restart with no data - successful')

        # 14. Verify no device present
        self.assert_no_device_present()
249
250 def set_rest_endpoint(self):
251 self.rest_endpoint = get_endpoint_from_consul(LOCAL_CONSUL,
252 'chameleon-rest')
253 self.base_url = 'http://' + self.rest_endpoint
254
255 def set_kafka_endpoint(self):
256 self.kafka_endpoint = get_endpoint_from_consul(LOCAL_CONSUL, 'kafka')
257
258 def assert_restart_voltha_successful(self):
259 self.maxDiff = None
260 instance_data_before = self.get_voltha_instance_data()
261 self.stop_remove_start_voltha()
262 instance_data_after = self.get_voltha_instance_data()
263 self.assertEqual(instance_data_before, instance_data_after)
264
265 def stop_remove_start_voltha(self):
266 self.stop_voltha(remove=True)
267 self.consume_kafka_message_starting_at = time()
268 self.start_voltha()
269 # REST endpoint may have changed after a voltha restart
270 # Send a basic command to trigger the REST endpoint to refresh itself
271 try:
272 self.get_devices()
273 except Exception as e:
274 self.pt('get-devices-fail expected')
275 # Wait for everything to settle
276 self.wait_till('chameleon service HEALTHY',
277 lambda: verify_all_services_healthy(LOCAL_CONSUL,
278 service_name='chameleon-rest') == True,
279 timeout=30)
280 # Chameleon takes some time to compile the protos and make them
281 # available. So let's wait 10 seconds
282 sleep(10)
283 # Update the REST endpoint info
284 self.set_rest_endpoint()
285
286 def wait_till(self, msg, predicate, interval=0.1, timeout=5.0):
287 deadline = time() + timeout
288 while time() < deadline:
289 if predicate():
290 return
291 sleep(interval)
292 self.fail('Timed out while waiting for condition: {}'.format(msg))
293
294 def _stop_and_remove_all_containers(self):
295 # check if there are any running containers first
296 cmd = command_defs['docker_ps']
297 out, err, rc = run_command_to_completion_with_stdout_in_list(cmd)
298 self.assertEqual(rc, 0)
299 if len(out) > 1: # not counting docker ps header
300 cmd = command_defs['docker_stop_and_remove_all_containers']
301 out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
302 self.assertEqual(rc, 0)
303
304 def start_all_containers(self):
305 t0 = time()
306
307 # start all the containers
308 self.pt("Starting all containers ...")
309 cmd = command_defs['docker_compose_start_all']
310 out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
311 self.assertEqual(rc, 0)
312
313 self.pt("Waiting for voltha and chameleon containers to be ready ...")
314 self.wait_till('voltha services HEALTHY',
315 lambda: verify_all_services_healthy(LOCAL_CONSUL,
316 service_name='voltha-grpc') == True,
317 timeout=10)
318 self.wait_till('chameleon services HEALTHY',
319 lambda: verify_all_services_healthy(LOCAL_CONSUL,
320 service_name='chameleon-rest') == True,
321 timeout=10)
322
323 # Chameleon takes some time to compile the protos and make them
324 # available. So let's wait 10 seconds
325 sleep(10)
326
327 def start_voltha(self):
328 t0 = time()
329 self.pt("Starting voltha ...")
330 cmd = command_defs['docker_compose_start_voltha']
331 out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
332 self.assertEqual(rc, 0)
333
334 self.pt("Waiting for voltha to be ready ...")
335 self.wait_till('voltha service HEALTHY',
336 lambda: verify_all_services_healthy(LOCAL_CONSUL,
337 service_name='voltha-grpc') == True,
338 timeout=30)
339 self.pt("Voltha is ready ...")
340
341 def stop_voltha(self, remove=False):
342 t0 = time()
343 self.pt("Stopping voltha ...")
344 cmd = command_defs['docker_compose_stop_voltha']
345 out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
346 self.assertEqual(rc, 0)
347
348 if remove:
349 cmd = command_defs['docker_compose_remove_voltha']
350 out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
351 self.assertEqual(rc, 0)
352
353 def get_devices(self):
354 devices = self.get('/api/v1/devices')['items']
355 return devices
356
357 def get_logical_devices(self):
358 ldevices = self.get('/api/v1/logical_devices')['items']
359 return ldevices
360
361 def get_adapters(self):
362 adapters = self.get('/api/v1/local/adapters')['items']
363 return adapters
364
365 def verify_instance_has_no_data(self):
366 data = self.get_voltha_instance_data()
367 self.assertEqual(data['logical_devices'], [])
368 self.assertEqual(data['devices'], [])
369
370 def add_data_to_voltha_instance(self):
371 # Preprovision a bunch of ponsim devices
372 self.n_olts = 100
373 self.olt_devices = {}
374 for i in xrange(self.n_olts):
375 d = self.add_olt_device()
376 self.olt_devices[d['id']] = d
377
378 def get_voltha_instance_data(self):
379 return self.get('/api/v1/local', headers={'get-depth': '-1'})
380
381 def add_olt_device(self):
382 device = Device(
383 type='ponsim_olt',
384 host_and_port='172.17.0.1:50060'
385 )
386 device = self.post('/api/v1/devices', MessageToDict(device),
387 expected_code=200)
388 return device
389
390 def get_olt_onu_devices(self):
391 devices = self.get('/api/v1/devices')['items']
392 olt_ids = []
393 onu_ids = []
394 for d in devices:
395 if d['adapter'] == 'ponsim_olt':
396 olt_ids.append(d['id'])
397 elif d['adapter'] == 'ponsim_onu':
398 onu_ids.append(d['id'])
399 else:
400 onu_ids.append(d['id'])
401 return olt_ids, onu_ids
402
403 def verify_device_preprovisioned_state(self, olt_id):
404 # we also check that so far what we read back is same as what we get
405 # back on create
406 device = self.get('/api/v1/devices/{}'.format(olt_id))
407 self.assertNotEqual(device['id'], '')
408 self.assertEqual(device['adapter'], 'ponsim_olt')
409 self.assertEqual(device['admin_state'], 'PREPROVISIONED')
410 self.assertEqual(device['oper_status'], 'UNKNOWN')
411
412 def enable_device(self, olt_id):
413 path = '/api/v1/devices/{}'.format(olt_id)
414 self.post(path + '/enable', expected_code=200)
415 device = self.get(path)
416 self.assertEqual(device['admin_state'], 'ENABLED')
417
418 self.wait_till(
419 'admin state moves to ACTIVATING or ACTIVE',
420 lambda: self.get(path)['oper_status'] in ('ACTIVATING', 'ACTIVE'),
421 timeout=0.5)
422
423 # eventually, it shall move to active state and by then we shall have
424 # device details filled, connect_state set, and device ports created
425 self.wait_till(
426 'admin state ACTIVE',
427 lambda: self.get(path)['oper_status'] == 'ACTIVE',
428 timeout=0.5)
429 device = self.get(path)
430 self.assertEqual(device['connect_status'], 'REACHABLE')
431
432 ports = self.get(path + '/ports')['items']
433 self.assertEqual(len(ports), 2)
434
435 def wait_for_logical_device(self, olt_id):
436 # we shall find the logical device id from the parent_id of the olt
437 # (root) device
438 device = self.get(
439 '/api/v1/devices/{}'.format(olt_id))
440 self.assertNotEqual(device['parent_id'], '')
441 logical_device = self.get(
442 '/api/v1/logical_devices/{}'.format(device['parent_id']))
443
444 # the logical device shall be linked back to the hard device,
445 # its ports too
446 self.assertEqual(logical_device['root_device_id'], device['id'])
447
448 logical_ports = self.get(
449 '/api/v1/logical_devices/{}/ports'.format(
450 logical_device['id'])
451 )['items']
452 self.assertGreaterEqual(len(logical_ports), 1)
453 logical_port = logical_ports[0]
454 self.assertEqual(logical_port['id'], 'nni')
455 self.assertEqual(logical_port['ofp_port']['name'], 'nni')
456 self.assertEqual(logical_port['ofp_port']['port_no'], 0)
457 self.assertEqual(logical_port['device_id'], device['id'])
458 self.assertEqual(logical_port['device_port_no'], 2)
459 return logical_device['id']
460
461 def find_onus(self, olt_id):
462 devices = self.get('/api/v1/devices')['items']
463 return [
464 d for d in devices
465 if d['parent_id'] == olt_id
466 ]
467
468 def wait_for_onu_discovery(self, olt_id):
469 # shortly after we shall see the discovery of four new onus, linked to
470 # the olt device
471 self.wait_till(
472 'find four ONUs linked to the olt device',
473 lambda: len(self.find_onus(olt_id)) >= 4,
474 2
475 )
476 # verify that they are properly set
477 onus = self.find_onus(olt_id)
478 for onu in onus:
479 self.assertEqual(onu['admin_state'], 'ENABLED')
480 self.assertEqual(onu['oper_status'], 'ACTIVE')
481
482 return [onu['id'] for onu in onus]
483
484 def assert_all_onus_state(self, olt_id, admin_state, oper_state):
485 # verify all onus are in a given state
486 onus = self.find_onus(olt_id)
487 for onu in onus:
488 self.assertEqual(onu['admin_state'], admin_state)
489 self.assertEqual(onu['oper_status'], oper_state)
490
491 return [onu['id'] for onu in onus]
492
493 def assert_onu_state(self, onu_id, admin_state, oper_state):
494 # Verify the onu states are correctly set
495 onu = self.get('/api/v1/devices/{}'.format(onu_id))
496 self.assertEqual(onu['admin_state'], admin_state)
497 self.assertEqual(onu['oper_status'], oper_state)
498
499 def verify_logical_ports(self, ldev_id, num_ports):
500
501 # at this point we shall see num_ports logical ports on the
502 # logical device
503 logical_ports = self.get(
504 '/api/v1/logical_devices/{}/ports'.format(ldev_id)
505 )['items']
506 self.assertGreaterEqual(len(logical_ports), num_ports)
507
508 # verify that all logical ports are LIVE (state=4)
509 for lport in logical_ports:
510 self.assertEqual(lport['ofp_port']['state'], 4)
511
    def simulate_eapol_flow_install(self, ldev_id, olt_id, onu_ids):
        """Emulate the SDN controller installing one EAPOL trap flow per
        ONU on the logical device, then sanity-check the flow table.

        Flow helpers (mk_simple_flow_mod, in_port, vlan_vid, eth_type,
        output) come from the voltha.core.flow_decomposer star import.

        :param ldev_id: logical device id the flows are posted to
        :param olt_id: OLT device id (unused here; kept for call symmetry)
        :param onu_ids: ONU device ids, one flow installed per id
        """

        # emulate the flow mod requests that shall arrive from the SDN
        # controller, one for each ONU
        lports = self.get(
            '/api/v1/logical_devices/{}/ports'.format(ldev_id)
        )['items']

        # device_id -> logical port map, which we will use to construct
        # our flows
        lport_map = dict((lp['device_id'], lp) for lp in lports)
        for onu_id in onu_ids:
            # if eth_type == 0x888e => send to controller
            _in_port = lport_map[onu_id]['ofp_port']['port_no']
            req = ofp.FlowTableUpdate(
                id='ponsim1',
                flow_mod=mk_simple_flow_mod(
                    match_fields=[
                        in_port(_in_port),
                        vlan_vid(ofp.OFPVID_PRESENT | 0),
                        eth_type(0x888e)],
                    actions=[
                        output(ofp.OFPP_CONTROLLER)
                    ],
                    priority=1000
                )
            )
            res = self.post('/api/v1/logical_devices/{}/flows'.format(ldev_id),
                            MessageToDict(req,
                                          preserving_proto_field_name=True),
                            expected_code=200)

        # for sanity, verify that flows are in flow table of logical device
        flows = self.get(
            '/api/v1/logical_devices/{}/flows'.format(ldev_id))['items']
        self.assertGreaterEqual(len(flows), 4)
548
549 def verify_olt_eapol_flow(self, olt_id):
550 # olt shall have two flow rules, one is the default and the
551 # second is the result of eapol forwarding with rule:
552 # if eth_type == 0x888e => push vlan(1000); out_port=nni_port
553 flows = self.get('/api/v1/devices/{}/flows'.format(olt_id))['items']
554 self.assertEqual(len(flows), 2)
555 flow = flows[1]
556 self.assertEqual(flow['table_id'], 0)
557 self.assertEqual(flow['priority'], 1000)
558
559 # TODO refine this
560 # self.assertEqual(flow['match'], {})
561 # self.assertEqual(flow['instructions'], [])
562
563 def disable_device(self, id):
564 path = '/api/v1/devices/{}'.format(id)
565 self.post(path + '/disable', expected_code=200)
566 device = self.get(path)
567 self.assertEqual(device['admin_state'], 'DISABLED')
568
569 self.wait_till(
570 'operational state moves to UNKNOWN',
571 lambda: self.get(path)['oper_status'] == 'UNKNOWN',
572 timeout=0.5)
573
574 # eventually, the connect_state should be UNREACHABLE
575 self.wait_till(
576 'connest status UNREACHABLE',
577 lambda: self.get(path)['connect_status'] == 'UNREACHABLE',
578 timeout=0.5)
579
580 # Device's ports should be INACTIVE
581 ports = self.get(path + '/ports')['items']
582 self.assertEqual(len(ports), 2)
583 for p in ports:
584 self.assertEqual(p['admin_state'], 'DISABLED')
585 self.assertEqual(p['oper_status'], 'UNKNOWN')
586
587 def delete_device(self, id):
588 path = '/api/v1/devices/{}'.format(id)
589 self.delete(path + '/delete', expected_code=200)
590 device = self.get(path, expected_code=404)
591 self.assertIsNone(device)
592
593 def assert_no_device_present(self):
594 path = '/api/v1/devices'
595 devices = self.get(path)['items']
596 self.assertEqual(devices, [])
597
598 def assert_no_logical_device(self):
599 path = '/api/v1/logical_devices'
600 ld = self.get(path)['items']
601 self.assertEqual(ld, [])
602
603 def delete_device_incorrect_state(self, id):
604 path = '/api/v1/devices/{}'.format(id)
605 self.delete(path + '/delete', expected_code=400)
606
607 def enable_unknown_device(self, id):
608 path = '/api/v1/devices/{}'.format(id)
609 self.post(path + '/enable', expected_code=404)
610
611 def disable_unknown_device(self, id):
612 path = '/api/v1/devices/{}'.format(id)
613 self.post(path + '/disable', expected_code=404)
614
615 def delete_unknown_device(self, id):
616 path = '/api/v1/devices/{}'.format(id)
617 self.delete(path + '/delete', expected_code=404)
618
619 def assert_alarm_generation(self, device_id, event_present=True):
620 # The olt device should start generating alarms periodically
621 alarm = self.assert_alarm_event(device_id, event_present)
622
623 if event_present:
624 self.assertIsNotNone(alarm)
625 # Make sure that the schema is valid
626 self.assert_alarm_event_schema(alarm)
627
628 # Validate a sample alarm for a specific device
629 def assert_alarm_event(self, device_id, event_present=True):
630 self.alarm_data = None
631
632 def validate_output(data):
633 alarm = simplejson.loads(data)
634
635 if not alarm or \
636 'resource_id' not in alarm or \
637 'reported_ts' not in alarm:
638 return False
639
640 # Check for new alarms only
641 if alarm['reported_ts'] > self.consume_kafka_message_starting_at:
642 if alarm['resource_id'] == device_id:
643 self.alarm_data = alarm
644 return True
645
646 cmd = command_defs['kafka_alarms'].format(self.kafka_endpoint)
647
648 self.run_command_and_wait_until(cmd, validate_output, 30, 'alarms',
649 expected_predicate_result=event_present)
650
651 return self.alarm_data
652
653 # Validate a sample kpi for a specific device
654 def assert_kpis_event(self, device_id, event_present=True):
655
656 def validate_output(data):
657 kpis_data = simplejson.loads(data)
658
659 if not kpis_data or \
660 'ts' not in kpis_data or \
661 'prefixes' not in kpis_data:
662 return False
663
664 # Check only new kpis
665 if kpis_data['ts'] > self.consume_kafka_message_starting_at:
666 for key, value in kpis_data['prefixes'].items():
667 if device_id in key:
668 return True
669 return False
670
671 cmd = command_defs['kafka_kpis'].format(self.kafka_endpoint)
672
673 self.run_command_and_wait_until(cmd, validate_output, 60, 'kpis',
674 expected_predicate_result=event_present)
675
676 # Verify that the alarm follows the proper schema structure
677 def assert_alarm_event_schema(self, alarm):
678 try:
679 jsonschema.validate(alarm, ALARM_SCHEMA)
680 except Exception as e:
681 self.assertTrue(
682 False, 'Validation failed for alarm : {}'.format(e.message))
683
684 def create_device_filter(self, device_id):
685 rules = list()
686 rule = dict()
687
688 # Create a filter with a single rule
689 rule['key'] = 'device_id'
690 rule['value'] = device_id
691 rules.append(rule)
692
693 alarm_filter = AlarmFilter(rules=rules)
khenaidoo08d48d22017-06-29 19:42:49 -0400694 alarm_filter = self.post('/api/v1/alarm_filters',
khenaidoo032d3302017-06-09 14:50:04 -0400695 MessageToDict(alarm_filter),
696 expected_code=200)
697 self.assertIsNotNone(alarm_filter)
698 return alarm_filter
699
700 def remove_device_filter(self, alarm_filter_id):
khenaidoo08d48d22017-06-29 19:42:49 -0400701 path = '/api/v1/alarm_filters/{}'.format(alarm_filter_id)
khenaidoo032d3302017-06-09 14:50:04 -0400702 self.delete(path, expected_code=200)
703 alarm_filter = self.get(path, expected_code=404)
704 self.assertIsNone(alarm_filter)
705
706 def run_command_and_wait_until(self, cmd, predicate, timeout, msg,
707 expected_predicate_result=True):
708 # Run until the predicate is True or timeout
709 try:
710 deadline = time() + timeout
711 env = os.environ.copy()
712 proc = subprocess.Popen(
713 cmd,
714 env=env,
715 shell=True,
716 stdout=subprocess.PIPE,
717 stderr=subprocess.PIPE,
718 bufsize=1
719 )
720 poll_obj = select.poll()
721 poll_obj.register(proc.stdout, select.POLLIN)
722 while time() < deadline:
723 poll_result = poll_obj.poll(0)
724 if poll_result:
725 line = proc.stdout.readline()
726 if predicate(line):
727 try:
728 proc.terminate()
729 proc.wait()
730 subprocess.Popen(['reset']).wait()
731 except Exception as e:
732 print "Received exception {} when killing process " \
733 "started with {}".format(repr(e), cmd)
734 if not expected_predicate_result:
735 self.fail(
736 'Predicate is True but is should be false:{}'.
737 format(msg))
738 else:
739 return
740 try:
741 proc.terminate()
742 proc.wait()
743 subprocess.Popen(['reset']).wait()
744 except Exception as e:
745 print "Received exception {} when killing process " \
746 "started with {}".format(repr(e), cmd)
747
748 if expected_predicate_result:
749 self.fail(
750 'Timed out while waiting for condition: {}'.format(msg))
751
752 except Exception as e:
753 print 'Exception {} when running command:{}'.format(repr(e), cmd)
754 return