blob: 5ba10b43b9c07f547ecf11ccb505f7bb0c46a3f7 [file] [log] [blame]
Zack Williams41513bf2018-07-07 20:08:35 -07001# Copyright 2017-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
khenaidoo032d3302017-06-09 14:50:04 -040014from google.protobuf.json_format import MessageToDict
15from time import time, sleep
16from voltha.core.flow_decomposer import *
17from voltha.protos import openflow_13_pb2 as ofp
18import simplejson, jsonschema
19import os
20import subprocess
21import select
22
Richard Jankowski8f52afb2018-03-29 14:19:11 -040023from tests.itests.test_utils import \
khenaidoo032d3302017-06-09 14:50:04 -040024 run_command_to_completion_with_raw_stdout, \
25 run_command_to_completion_with_stdout_in_list
Stephane Barbariecd51f992017-09-07 16:37:02 -040026from unittest import skip
khenaidoo032d3302017-06-09 14:50:04 -040027
28from common.utils.consulhelpers import verify_all_services_healthy
29
30from voltha.protos.device_pb2 import Device
31from tests.itests.voltha.rest_base import RestBase
32from common.utils.consulhelpers import get_endpoint_from_consul
33from voltha.protos.voltha_pb2 import AlarmFilter
34
35LOCAL_CONSUL = "localhost:8500"
khenaidoo3d3f99a2017-10-10 15:53:42 -040036DOCKER_COMPOSE_FILE = "compose/docker-compose-system-test-persistence.yml"
khenaidoo032d3302017-06-09 14:50:04 -040037
38command_defs = dict(
39 docker_ps="docker ps",
40 docker_compose_start_all="docker-compose -f {} up -d "
41 .format(DOCKER_COMPOSE_FILE),
42 docker_stop_and_remove_all_containers="docker-compose -f {} down"
43 .format(DOCKER_COMPOSE_FILE),
44 docker_compose_start_voltha="docker-compose -f {} up -d voltha "
45 .format(DOCKER_COMPOSE_FILE),
46 docker_compose_stop_voltha="docker-compose -f {} stop voltha"
47 .format(DOCKER_COMPOSE_FILE),
48 docker_compose_remove_voltha="docker-compose -f {} rm -f voltha"
49 .format(DOCKER_COMPOSE_FILE),
50 kafka_topics="kafkacat -b {} -L",
51 kafka_alarms="kafkacat -o end -b {} -C -t voltha.alarms -c 2",
52 kafka_kpis="kafkacat -o end -b {} -C -t voltha.kpis -c 5"
53)
54
55ALARM_SCHEMA = {
56 "type": "object",
57 "properties": {
58 "id": {"type": "string"},
59 "type": {"type": "string"},
60 "category": {"type": "string"},
61 "state": {"type": "string"},
62 "severity": {"type": "string"},
63 "resource_id": {"type": "string"},
64 "raised_ts": {"type": "number"},
65 "reported_ts": {"type": "number"},
66 "changed_ts": {"type": "number"},
67 "description": {"type": "string"},
68 "context": {
69 "type": "object",
70 "additionalProperties": {"type": "string"}
71 }
72 }
73}
74
75
76class TestConsulPersistence(RestBase):
77 t0 = [time()]
78
79 def pt(self, msg=''):
80 t1 = time()
81 print '%20.8f ms - %s' % (1000 * (t1 - TestConsulPersistence.t0[0]),
82 msg)
83 TestConsulPersistence.t0[0] = t1
84
khenaidoo3d3f99a2017-10-10 15:53:42 -040085 # @skip('Test case hangs during execution. Investigation required. Refer to VOL-425 and VOL-427')
khenaidoo032d3302017-06-09 14:50:04 -040086 def test_all_scenarios(self):
87 self.basic_scenario()
88 self.data_integrity()
89
90 def basic_scenario(self):
91 # 1. Setup the test
92 # A. Stop and restart all containers (to start from clean)
93 # B. Setup the REST endpoint
94 self.pt('Test setup - starts')
95 self._stop_and_remove_all_containers()
96 sleep(5) # A small wait for the system to settle down
97 self.start_all_containers()
98 self.set_rest_endpoint()
99 self.set_kafka_endpoint()
100 self.pt('Test setup - ends')
101
102 # 2. Test 1 - Verify no data is present in voltha
103 self.pt('Test 1 - starts')
104 self.verify_instance_has_no_data()
105 self.pt('Test 1 - ends')
106
107 # 3. Test 2 - Verify voltha data is preserved after a restart
108 # A. Add data to voltha
109 # B. Stop voltha instance only (data is in consul)
110 # C. Start a new voltha instance
111 # D. Verify the data is previoulsy set data is in the new voltha
112 # instance
113 self.pt('Test 2 - starts')
114 self.add_data_to_voltha_instance()
115 instance_data_before = self.get_voltha_instance_data()
116 self.stop_remove_start_voltha()
117 instance_data_after = self.get_voltha_instance_data()
118 self.assertEqual(instance_data_before, instance_data_after)
119 self.pt('Test 2 - ends')
120
121 def data_integrity(self):
122 """
123 This test goes through several voltha restarts along with variations
124 of configurations in between to ensure data integrity is preserved.
125
126 During this test, the user will be prompted to start ponsim. Use
127 these commands to run ponsim with 1 OLT and 4 ONUs. THis will also
128 enable alarm at a frequency of 5 seconds:
129 sudo -s
130 . ./env.sh
131 ./ponsim/main.py -v -o 4 -a -f 5
132
133 The user will also be prompted to enable port forwarding on ponmgmt
134 bridge. Use these commands:
135 sudo -s
136 echo 8 > /sys/class/net/ponmgmt/bridge/group_fwd_mask
137 """
138
139 def prompt(input_func, text):
140 val = input_func(text)
141 return val
142
143 def prompt_for_return(text):
144 return raw_input(text)
145
146 # 1. Setup the test
147 # A. Stop and restart all containers (to start from clean)
148 # B. Setup the REST endpoint
149 self.pt('Test setup - starts')
150 self._stop_and_remove_all_containers()
151 sleep(5) # A small wait for the system to settle down
152 self.start_all_containers()
153 self.consume_kafka_message_starting_at = time()
154 self.set_rest_endpoint()
155 self.set_kafka_endpoint()
156 self.pt('Test setup - ends')
157
158 # Get the user to start PONSIM as root
159 prompt(prompt_for_return,
160 '\nStart PONSIM as root with alarms enabled in another window ...')
161
162 prompt(prompt_for_return,
163 '\nEnsure port forwarding is set on ponmgnt ...')
164
165 # 2. Configure some data on the volthainstance
166 self.assert_no_device_present()
khenaidoo3d3f99a2017-10-10 15:53:42 -0400167 host = '172.17.0.1'
168 olt = self.add_olt_device(host)
khenaidoo032d3302017-06-09 14:50:04 -0400169 olt_id = olt['id']
170 self.pt(olt_id)
171 self.verify_device_preprovisioned_state(olt_id)
172 self.enable_device(olt_id)
173 ldev_id = self.wait_for_logical_device(olt_id)
174 onu_ids = self.wait_for_onu_discovery(olt_id)
175 self.verify_logical_ports(ldev_id, 5)
176 self.simulate_eapol_flow_install(ldev_id, olt_id, onu_ids)
177 self.verify_olt_eapol_flow(olt_id)
178 self.assert_kpis_event(olt_id)
179 self.assert_alarm_generation(olt_id)
180 alarm_filter = self.create_device_filter(olt_id)
181 self.consume_kafka_message_starting_at = time()
182 self.assert_alarm_generation(olt_id, event_present=False)
183
184 # 3. Kill and restart the voltha instance
185 self.assert_restart_voltha_successful()
186 self.assert_kpis_event(olt_id)
187 self.remove_device_filter(alarm_filter['id'])
188 self.assert_alarm_generation(olt_id)
189
190 self.pt('Voltha restart with initial set of data - successful')
191
192 # 4. Ensure we can keep doing operation on the new voltha instance
193 # as if nothing happened
194 olt_ids, onu_ids = self.get_olt_onu_devices()
195 self.disable_device(onu_ids[0])
196 self.verify_logical_ports(ldev_id, 4)
197
198 # 5. Kill and restart the voltha instance
199 self.assert_restart_voltha_successful()
200 self.assert_kpis_event(olt_id)
201 alarm_filter = self.create_device_filter(olt_id)
202 self.consume_kafka_message_starting_at = time()
203 self.assert_alarm_generation(olt_id, event_present=False)
204 self.remove_device_filter(alarm_filter['id'])
205 self.assert_alarm_generation(olt_id)
206
207 self.pt('Voltha restart with disabled ONU - successful')
208
209 # 6. Do some more operations
210 self.enable_device(onu_ids[0])
211 self.verify_logical_ports(ldev_id, 5)
212 self.simulate_eapol_flow_install(ldev_id, olt_id, onu_ids)
213 self.verify_olt_eapol_flow(olt_id)
214 self.disable_device(olt_ids[0])
215 self.assert_all_onus_state(olt_ids[0], 'DISABLED', 'UNKNOWN')
216 self.assert_no_logical_device()
217
218 # 6. Kill and restart the voltha instance
219 self.assert_restart_voltha_successful()
220 self.assert_kpis_event(olt_id, event_present=False)
221 self.assert_alarm_generation(olt_id, event_present=False)
222
223 self.pt('Voltha restart with disabled OLT - successful')
224
225 # 7. Enable OLT and very states of ONUs
226 self.enable_device(olt_ids[0])
227 self.assert_all_onus_state(olt_ids[0], 'ENABLED', 'ACTIVE')
228 self.wait_for_logical_device(olt_ids[0])
229
230 # 8. Kill and restart the voltha instance
231 self.assert_restart_voltha_successful()
232 self.assert_kpis_event(olt_id)
233 self.assert_alarm_generation(olt_id)
234
235 self.pt('Voltha restart with re-enabled OLT - successful')
236
237 # 9. Install EAPOL and disable ONU
238 self.simulate_eapol_flow_install(ldev_id, olt_id, onu_ids)
239 self.verify_olt_eapol_flow(olt_id)
240 self.disable_device(onu_ids[0])
241
242 # 10. Kill and restart the voltha instance
243 self.assert_restart_voltha_successful()
244 self.assert_kpis_event(olt_id)
245 self.assert_alarm_generation(olt_id)
246
247 self.pt('Voltha restart with EAPOL and disabled ONU - successful')
248
249 # 11. Delete the OLT and ONU
250 self.delete_device(onu_ids[0])
251 self.verify_logical_ports(ldev_id, 4)
252 self.disable_device(olt_ids[0])
253 self.delete_device(olt_ids[0])
254 self.assert_no_device_present()
255
256 # 13. Kill and restart the voltha instance
257 self.assert_restart_voltha_successful()
258 self.assert_kpis_event(olt_id, event_present=False)
259 self.assert_alarm_generation(olt_id, event_present=False)
260
261 self.pt('Voltha restart with no data - successful')
262
263 # 14. Verify no device present
264 self.assert_no_device_present()
265
266 def set_rest_endpoint(self):
267 self.rest_endpoint = get_endpoint_from_consul(LOCAL_CONSUL,
Richard Jankowski461cb972018-04-11 15:36:27 -0400268 'voltha-envoy-8443')
ubuntuc5c83d72017-07-01 17:57:19 -0700269 self.base_url = 'https://' + self.rest_endpoint
khenaidoo032d3302017-06-09 14:50:04 -0400270
271 def set_kafka_endpoint(self):
272 self.kafka_endpoint = get_endpoint_from_consul(LOCAL_CONSUL, 'kafka')
273
274 def assert_restart_voltha_successful(self):
275 self.maxDiff = None
276 instance_data_before = self.get_voltha_instance_data()
277 self.stop_remove_start_voltha()
278 instance_data_after = self.get_voltha_instance_data()
279 self.assertEqual(instance_data_before, instance_data_after)
280
281 def stop_remove_start_voltha(self):
282 self.stop_voltha(remove=True)
283 self.consume_kafka_message_starting_at = time()
284 self.start_voltha()
285 # REST endpoint may have changed after a voltha restart
286 # Send a basic command to trigger the REST endpoint to refresh itself
287 try:
288 self.get_devices()
289 except Exception as e:
290 self.pt('get-devices-fail expected')
291 # Wait for everything to settle
khenaidoo032d3302017-06-09 14:50:04 -0400292 sleep(10)
293 # Update the REST endpoint info
294 self.set_rest_endpoint()
295
296 def wait_till(self, msg, predicate, interval=0.1, timeout=5.0):
297 deadline = time() + timeout
298 while time() < deadline:
299 if predicate():
300 return
301 sleep(interval)
302 self.fail('Timed out while waiting for condition: {}'.format(msg))
303
304 def _stop_and_remove_all_containers(self):
305 # check if there are any running containers first
306 cmd = command_defs['docker_ps']
307 out, err, rc = run_command_to_completion_with_stdout_in_list(cmd)
308 self.assertEqual(rc, 0)
309 if len(out) > 1: # not counting docker ps header
310 cmd = command_defs['docker_stop_and_remove_all_containers']
311 out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
312 self.assertEqual(rc, 0)
313
314 def start_all_containers(self):
315 t0 = time()
316
317 # start all the containers
318 self.pt("Starting all containers ...")
319 cmd = command_defs['docker_compose_start_all']
320 out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
321 self.assertEqual(rc, 0)
322
khenaidoo079a7762017-10-26 21:42:05 -0400323 self.pt("Waiting for voltha container to be ready ...")
khenaidoo032d3302017-06-09 14:50:04 -0400324 self.wait_till('voltha services HEALTHY',
325 lambda: verify_all_services_healthy(LOCAL_CONSUL,
khenaidoo3d3f99a2017-10-10 15:53:42 -0400326 service_name='vcore-grpc') == True,
khenaidoo032d3302017-06-09 14:50:04 -0400327 timeout=10)
khenaidoo032d3302017-06-09 14:50:04 -0400328 sleep(10)
329
330 def start_voltha(self):
331 t0 = time()
332 self.pt("Starting voltha ...")
333 cmd = command_defs['docker_compose_start_voltha']
334 out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
335 self.assertEqual(rc, 0)
336
337 self.pt("Waiting for voltha to be ready ...")
338 self.wait_till('voltha service HEALTHY',
339 lambda: verify_all_services_healthy(LOCAL_CONSUL,
khenaidoo3d3f99a2017-10-10 15:53:42 -0400340 service_name='vcore-grpc') == True,
khenaidoo032d3302017-06-09 14:50:04 -0400341 timeout=30)
342 self.pt("Voltha is ready ...")
343
344 def stop_voltha(self, remove=False):
345 t0 = time()
346 self.pt("Stopping voltha ...")
347 cmd = command_defs['docker_compose_stop_voltha']
348 out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
349 self.assertEqual(rc, 0)
350
351 if remove:
352 cmd = command_defs['docker_compose_remove_voltha']
353 out, err, rc = run_command_to_completion_with_raw_stdout(cmd)
354 self.assertEqual(rc, 0)
355
356 def get_devices(self):
357 devices = self.get('/api/v1/devices')['items']
358 return devices
359
360 def get_logical_devices(self):
361 ldevices = self.get('/api/v1/logical_devices')['items']
362 return ldevices
363
364 def get_adapters(self):
Stephane Barbariecd51f992017-09-07 16:37:02 -0400365 adapters = self.get('/api/v1/adapters')['items']
khenaidoo032d3302017-06-09 14:50:04 -0400366 return adapters
367
368 def verify_instance_has_no_data(self):
369 data = self.get_voltha_instance_data()
Stephane Barbariecd51f992017-09-07 16:37:02 -0400370 self.assertEqual(data['logical_devices']['items'], None or [])
371 self.assertEqual(data['devices']['items'], None or [])
khenaidoo032d3302017-06-09 14:50:04 -0400372
373 def add_data_to_voltha_instance(self):
374 # Preprovision a bunch of ponsim devices
375 self.n_olts = 100
376 self.olt_devices = {}
377 for i in xrange(self.n_olts):
khenaidoo3d3f99a2017-10-10 15:53:42 -0400378 host = '172.17.1.{}'.format(i)
379 d = self.add_olt_device(host)
khenaidoo032d3302017-06-09 14:50:04 -0400380 self.olt_devices[d['id']] = d
381
382 def get_voltha_instance_data(self):
Stephane Barbariecd51f992017-09-07 16:37:02 -0400383 data = {}
384 data['devices'] = self.get('/api/v1/devices')
385 data['logical_devices'] = self.get('/api/v1/logical_devices')
386 return data
khenaidoo032d3302017-06-09 14:50:04 -0400387
khenaidoo3d3f99a2017-10-10 15:53:42 -0400388 def add_olt_device(self, host):
khenaidoo032d3302017-06-09 14:50:04 -0400389 device = Device(
390 type='ponsim_olt',
khenaidoo3d3f99a2017-10-10 15:53:42 -0400391 host_and_port='{}:50060'.format(host)
khenaidoo032d3302017-06-09 14:50:04 -0400392 )
393 device = self.post('/api/v1/devices', MessageToDict(device),
Stephane Barbariecd51f992017-09-07 16:37:02 -0400394 expected_http_code=200)
khenaidoo032d3302017-06-09 14:50:04 -0400395 return device
396
397 def get_olt_onu_devices(self):
398 devices = self.get('/api/v1/devices')['items']
399 olt_ids = []
400 onu_ids = []
401 for d in devices:
402 if d['adapter'] == 'ponsim_olt':
403 olt_ids.append(d['id'])
404 elif d['adapter'] == 'ponsim_onu':
405 onu_ids.append(d['id'])
406 else:
407 onu_ids.append(d['id'])
408 return olt_ids, onu_ids
409
410 def verify_device_preprovisioned_state(self, olt_id):
411 # we also check that so far what we read back is same as what we get
412 # back on create
413 device = self.get('/api/v1/devices/{}'.format(olt_id))
414 self.assertNotEqual(device['id'], '')
415 self.assertEqual(device['adapter'], 'ponsim_olt')
416 self.assertEqual(device['admin_state'], 'PREPROVISIONED')
417 self.assertEqual(device['oper_status'], 'UNKNOWN')
418
419 def enable_device(self, olt_id):
420 path = '/api/v1/devices/{}'.format(olt_id)
Stephane Barbariecd51f992017-09-07 16:37:02 -0400421 self.post(path + '/enable', expected_http_code=200)
khenaidoo032d3302017-06-09 14:50:04 -0400422 device = self.get(path)
423 self.assertEqual(device['admin_state'], 'ENABLED')
424
425 self.wait_till(
426 'admin state moves to ACTIVATING or ACTIVE',
427 lambda: self.get(path)['oper_status'] in ('ACTIVATING', 'ACTIVE'),
428 timeout=0.5)
429
430 # eventually, it shall move to active state and by then we shall have
431 # device details filled, connect_state set, and device ports created
432 self.wait_till(
433 'admin state ACTIVE',
434 lambda: self.get(path)['oper_status'] == 'ACTIVE',
435 timeout=0.5)
436 device = self.get(path)
437 self.assertEqual(device['connect_status'], 'REACHABLE')
438
439 ports = self.get(path + '/ports')['items']
440 self.assertEqual(len(ports), 2)
441
442 def wait_for_logical_device(self, olt_id):
443 # we shall find the logical device id from the parent_id of the olt
444 # (root) device
445 device = self.get(
446 '/api/v1/devices/{}'.format(olt_id))
447 self.assertNotEqual(device['parent_id'], '')
448 logical_device = self.get(
449 '/api/v1/logical_devices/{}'.format(device['parent_id']))
450
451 # the logical device shall be linked back to the hard device,
452 # its ports too
453 self.assertEqual(logical_device['root_device_id'], device['id'])
454
455 logical_ports = self.get(
456 '/api/v1/logical_devices/{}/ports'.format(
457 logical_device['id'])
458 )['items']
459 self.assertGreaterEqual(len(logical_ports), 1)
460 logical_port = logical_ports[0]
461 self.assertEqual(logical_port['id'], 'nni')
462 self.assertEqual(logical_port['ofp_port']['name'], 'nni')
463 self.assertEqual(logical_port['ofp_port']['port_no'], 0)
464 self.assertEqual(logical_port['device_id'], device['id'])
465 self.assertEqual(logical_port['device_port_no'], 2)
466 return logical_device['id']
467
468 def find_onus(self, olt_id):
469 devices = self.get('/api/v1/devices')['items']
470 return [
471 d for d in devices
472 if d['parent_id'] == olt_id
473 ]
474
475 def wait_for_onu_discovery(self, olt_id):
476 # shortly after we shall see the discovery of four new onus, linked to
477 # the olt device
478 self.wait_till(
479 'find four ONUs linked to the olt device',
480 lambda: len(self.find_onus(olt_id)) >= 4,
481 2
482 )
483 # verify that they are properly set
484 onus = self.find_onus(olt_id)
485 for onu in onus:
486 self.assertEqual(onu['admin_state'], 'ENABLED')
487 self.assertEqual(onu['oper_status'], 'ACTIVE')
488
489 return [onu['id'] for onu in onus]
490
491 def assert_all_onus_state(self, olt_id, admin_state, oper_state):
492 # verify all onus are in a given state
493 onus = self.find_onus(olt_id)
494 for onu in onus:
495 self.assertEqual(onu['admin_state'], admin_state)
496 self.assertEqual(onu['oper_status'], oper_state)
497
498 return [onu['id'] for onu in onus]
499
500 def assert_onu_state(self, onu_id, admin_state, oper_state):
501 # Verify the onu states are correctly set
502 onu = self.get('/api/v1/devices/{}'.format(onu_id))
503 self.assertEqual(onu['admin_state'], admin_state)
504 self.assertEqual(onu['oper_status'], oper_state)
505
506 def verify_logical_ports(self, ldev_id, num_ports):
507
508 # at this point we shall see num_ports logical ports on the
509 # logical device
510 logical_ports = self.get(
511 '/api/v1/logical_devices/{}/ports'.format(ldev_id)
512 )['items']
513 self.assertGreaterEqual(len(logical_ports), num_ports)
514
515 # verify that all logical ports are LIVE (state=4)
516 for lport in logical_ports:
517 self.assertEqual(lport['ofp_port']['state'], 4)
518
519 def simulate_eapol_flow_install(self, ldev_id, olt_id, onu_ids):
520
521 # emulate the flow mod requests that shall arrive from the SDN
522 # controller, one for each ONU
523 lports = self.get(
524 '/api/v1/logical_devices/{}/ports'.format(ldev_id)
525 )['items']
526
527 # device_id -> logical port map, which we will use to construct
528 # our flows
529 lport_map = dict((lp['device_id'], lp) for lp in lports)
530 for onu_id in onu_ids:
531 # if eth_type == 0x888e => send to controller
532 _in_port = lport_map[onu_id]['ofp_port']['port_no']
533 req = ofp.FlowTableUpdate(
Stephane Barbariecd51f992017-09-07 16:37:02 -0400534 id=ldev_id,
khenaidoo032d3302017-06-09 14:50:04 -0400535 flow_mod=mk_simple_flow_mod(
536 match_fields=[
537 in_port(_in_port),
538 vlan_vid(ofp.OFPVID_PRESENT | 0),
539 eth_type(0x888e)],
540 actions=[
541 output(ofp.OFPP_CONTROLLER)
542 ],
543 priority=1000
544 )
545 )
546 res = self.post('/api/v1/logical_devices/{}/flows'.format(ldev_id),
547 MessageToDict(req,
548 preserving_proto_field_name=True),
Stephane Barbariecd51f992017-09-07 16:37:02 -0400549 expected_http_code=200)
khenaidoo032d3302017-06-09 14:50:04 -0400550
551 # for sanity, verify that flows are in flow table of logical device
552 flows = self.get(
553 '/api/v1/logical_devices/{}/flows'.format(ldev_id))['items']
554 self.assertGreaterEqual(len(flows), 4)
555
556 def verify_olt_eapol_flow(self, olt_id):
557 # olt shall have two flow rules, one is the default and the
558 # second is the result of eapol forwarding with rule:
559 # if eth_type == 0x888e => push vlan(1000); out_port=nni_port
560 flows = self.get('/api/v1/devices/{}/flows'.format(olt_id))['items']
Richard Jankowski461cb972018-04-11 15:36:27 -0400561 self.assertEqual(len(flows), 8)
khenaidoo032d3302017-06-09 14:50:04 -0400562 flow = flows[1]
563 self.assertEqual(flow['table_id'], 0)
564 self.assertEqual(flow['priority'], 1000)
565
566 # TODO refine this
567 # self.assertEqual(flow['match'], {})
568 # self.assertEqual(flow['instructions'], [])
569
570 def disable_device(self, id):
571 path = '/api/v1/devices/{}'.format(id)
Stephane Barbariecd51f992017-09-07 16:37:02 -0400572 self.post(path + '/disable', expected_http_code=200)
khenaidoo032d3302017-06-09 14:50:04 -0400573 device = self.get(path)
574 self.assertEqual(device['admin_state'], 'DISABLED')
575
576 self.wait_till(
577 'operational state moves to UNKNOWN',
578 lambda: self.get(path)['oper_status'] == 'UNKNOWN',
579 timeout=0.5)
580
581 # eventually, the connect_state should be UNREACHABLE
582 self.wait_till(
583 'connest status UNREACHABLE',
584 lambda: self.get(path)['connect_status'] == 'UNREACHABLE',
585 timeout=0.5)
586
587 # Device's ports should be INACTIVE
588 ports = self.get(path + '/ports')['items']
589 self.assertEqual(len(ports), 2)
590 for p in ports:
591 self.assertEqual(p['admin_state'], 'DISABLED')
592 self.assertEqual(p['oper_status'], 'UNKNOWN')
593
594 def delete_device(self, id):
595 path = '/api/v1/devices/{}'.format(id)
Stephane Barbariecd51f992017-09-07 16:37:02 -0400596 self.delete(path + '/delete', expected_http_code=200)
597 device = self.get(path, expected_http_code=200, grpc_status=5)
khenaidoo032d3302017-06-09 14:50:04 -0400598 self.assertIsNone(device)
599
600 def assert_no_device_present(self):
601 path = '/api/v1/devices'
602 devices = self.get(path)['items']
603 self.assertEqual(devices, [])
604
605 def assert_no_logical_device(self):
606 path = '/api/v1/logical_devices'
607 ld = self.get(path)['items']
608 self.assertEqual(ld, [])
609
610 def delete_device_incorrect_state(self, id):
611 path = '/api/v1/devices/{}'.format(id)
Stephane Barbariecd51f992017-09-07 16:37:02 -0400612 self.delete(path + '/delete', expected_http_code=200, grpc_status=3)
khenaidoo032d3302017-06-09 14:50:04 -0400613
614 def enable_unknown_device(self, id):
615 path = '/api/v1/devices/{}'.format(id)
Stephane Barbariecd51f992017-09-07 16:37:02 -0400616 self.post(path + '/enable', expected_http_code=200, grpc_status=5)
khenaidoo032d3302017-06-09 14:50:04 -0400617
618 def disable_unknown_device(self, id):
619 path = '/api/v1/devices/{}'.format(id)
Stephane Barbariecd51f992017-09-07 16:37:02 -0400620 self.post(path + '/disable', expected_http_code=200, grpc_status=5)
khenaidoo032d3302017-06-09 14:50:04 -0400621
622 def delete_unknown_device(self, id):
623 path = '/api/v1/devices/{}'.format(id)
Stephane Barbariecd51f992017-09-07 16:37:02 -0400624 self.delete(path + '/delete', expected_http_code=200, grpc_status=5)
khenaidoo032d3302017-06-09 14:50:04 -0400625
626 def assert_alarm_generation(self, device_id, event_present=True):
627 # The olt device should start generating alarms periodically
628 alarm = self.assert_alarm_event(device_id, event_present)
629
630 if event_present:
631 self.assertIsNotNone(alarm)
632 # Make sure that the schema is valid
633 self.assert_alarm_event_schema(alarm)
634
635 # Validate a sample alarm for a specific device
636 def assert_alarm_event(self, device_id, event_present=True):
637 self.alarm_data = None
638
639 def validate_output(data):
640 alarm = simplejson.loads(data)
641
642 if not alarm or \
643 'resource_id' not in alarm or \
644 'reported_ts' not in alarm:
645 return False
646
647 # Check for new alarms only
648 if alarm['reported_ts'] > self.consume_kafka_message_starting_at:
649 if alarm['resource_id'] == device_id:
650 self.alarm_data = alarm
651 return True
652
653 cmd = command_defs['kafka_alarms'].format(self.kafka_endpoint)
654
655 self.run_command_and_wait_until(cmd, validate_output, 30, 'alarms',
656 expected_predicate_result=event_present)
657
658 return self.alarm_data
659
660 # Validate a sample kpi for a specific device
661 def assert_kpis_event(self, device_id, event_present=True):
662
663 def validate_output(data):
664 kpis_data = simplejson.loads(data)
665
666 if not kpis_data or \
667 'ts' not in kpis_data or \
668 'prefixes' not in kpis_data:
669 return False
670
671 # Check only new kpis
672 if kpis_data['ts'] > self.consume_kafka_message_starting_at:
673 for key, value in kpis_data['prefixes'].items():
674 if device_id in key:
675 return True
676 return False
677
678 cmd = command_defs['kafka_kpis'].format(self.kafka_endpoint)
679
680 self.run_command_and_wait_until(cmd, validate_output, 60, 'kpis',
681 expected_predicate_result=event_present)
682
683 # Verify that the alarm follows the proper schema structure
684 def assert_alarm_event_schema(self, alarm):
685 try:
686 jsonschema.validate(alarm, ALARM_SCHEMA)
687 except Exception as e:
688 self.assertTrue(
689 False, 'Validation failed for alarm : {}'.format(e.message))
690
691 def create_device_filter(self, device_id):
692 rules = list()
693 rule = dict()
694
695 # Create a filter with a single rule
696 rule['key'] = 'device_id'
697 rule['value'] = device_id
698 rules.append(rule)
699
700 alarm_filter = AlarmFilter(rules=rules)
khenaidoo08d48d22017-06-29 19:42:49 -0400701 alarm_filter = self.post('/api/v1/alarm_filters',
khenaidoo032d3302017-06-09 14:50:04 -0400702 MessageToDict(alarm_filter),
Stephane Barbariecd51f992017-09-07 16:37:02 -0400703 expected_http_code=200)
khenaidoo032d3302017-06-09 14:50:04 -0400704 self.assertIsNotNone(alarm_filter)
705 return alarm_filter
706
707 def remove_device_filter(self, alarm_filter_id):
khenaidoo08d48d22017-06-29 19:42:49 -0400708 path = '/api/v1/alarm_filters/{}'.format(alarm_filter_id)
Stephane Barbariecd51f992017-09-07 16:37:02 -0400709 self.delete(path, expected_http_code=200)
710 alarm_filter = self.get(path, expected_http_code=200, grpc_status=5)
khenaidoo032d3302017-06-09 14:50:04 -0400711 self.assertIsNone(alarm_filter)
712
713 def run_command_and_wait_until(self, cmd, predicate, timeout, msg,
714 expected_predicate_result=True):
715 # Run until the predicate is True or timeout
716 try:
717 deadline = time() + timeout
718 env = os.environ.copy()
719 proc = subprocess.Popen(
720 cmd,
721 env=env,
722 shell=True,
723 stdout=subprocess.PIPE,
724 stderr=subprocess.PIPE,
725 bufsize=1
726 )
727 poll_obj = select.poll()
728 poll_obj.register(proc.stdout, select.POLLIN)
729 while time() < deadline:
730 poll_result = poll_obj.poll(0)
731 if poll_result:
732 line = proc.stdout.readline()
733 if predicate(line):
734 try:
735 proc.terminate()
736 proc.wait()
737 subprocess.Popen(['reset']).wait()
738 except Exception as e:
739 print "Received exception {} when killing process " \
740 "started with {}".format(repr(e), cmd)
741 if not expected_predicate_result:
742 self.fail(
743 'Predicate is True but is should be false:{}'.
744 format(msg))
745 else:
746 return
747 try:
748 proc.terminate()
749 proc.wait()
750 subprocess.Popen(['reset']).wait()
751 except Exception as e:
752 print "Received exception {} when killing process " \
753 "started with {}".format(repr(e), cmd)
754
755 if expected_predicate_result:
756 self.fail(
757 'Timed out while waiting for condition: {}'.format(msg))
758
759 except Exception as e:
760 print 'Exception {} when running command:{}'.format(repr(e), cmd)
761 return