A R Karthick | 35495c3 | 2017-05-11 14:58:32 -0700 | [diff] [blame] | 1 | import requests |
| 2 | import json |
| 3 | import time |
| 4 | from CordTestUtils import log_test as log |
| 5 | |
class VolthaCtrl(object):
    """Small client for the Voltha REST API reachable at ``host:rest_port``.

    All requests are sent to ``http://<host>:<rest_port>/api/v1``.
    """

    def __init__(self, host, rest_port=8881):
        """Remember the endpoint and precompute the REST base URL.

        host      -- address of the REST endpoint
        rest_port -- TCP port of the REST endpoint (default 8881)
        """
        self.host = host
        self.rest_port = rest_port
        self.rest_url = 'http://{}:{}/api/v1'.format(host, rest_port)

    @staticmethod
    def _request_ok(resp):
        """Return True only when the response carries HTTP status 200."""
        # A 200 status already implies ``resp.ok``, so one check suffices.
        return resp.status_code == 200

    @staticmethod
    def _is_operational(device_info):
        """Return True when the device record reports a fully usable device.

        A record qualifies only if it is a dict whose ``oper_status`` is
        'ACTIVE', ``admin_state`` is 'ENABLED' and ``connect_status`` is
        'REACHABLE'. Missing fields count as "not operational" instead of
        raising KeyError.
        """
        return (isinstance(device_info, dict) and
                device_info.get('oper_status') == 'ACTIVE' and
                device_info.get('admin_state') == 'ENABLED' and
                device_info.get('connect_status') == 'REACHABLE')

    def get_devices(self):
        """Fetch ``/local/devices``.

        Returns the decoded JSON body, or None when the request does not
        come back with HTTP 200.
        """
        url = '{}/local/devices'.format(self.rest_url)
        resp = requests.get(url)
        if not self._request_ok(resp):
            return None
        return resp.json()

    def enable_device(self, olt_type, olt_mac, status_delay=5):
        """Pre-provision a device, enable it and verify that it came up.

        olt_type     -- value sent as the device ``type``
        olt_mac      -- value sent as the device ``mac_address``
        status_delay -- seconds to wait after enabling before the status
                        is queried (default 5, the previously fixed value)

        Returns True only if every request returns HTTP 200 and the device
        then reports ACTIVE / ENABLED / REACHABLE. Any HTTP failure or an
        incomplete reply yields False.
        """
        url = '{}/local/devices'.format(self.rest_url)
        device_config = {'type': olt_type, 'mac_address': olt_mac}

        # Step 1: pre-provision the device; the reply carries its id.
        log.info('Pre-provisioning %s with mac %s' %(olt_type, olt_mac))
        resp = requests.post(url, data=json.dumps(device_config))
        if not self._request_ok(resp):
            return False
        created = resp.json()
        device_id = created.get('id') if isinstance(created, dict) else None
        if device_id is None:
            # Without an id there is nothing to enable; report failure the
            # same way as an HTTP error instead of raising KeyError.
            return False

        # Step 2: enable the freshly created device.
        log.info('Enabling device %s' %(device_id))
        enable_url = '{}/{}/enable'.format(url, device_id)
        resp = requests.post(enable_url)
        if not self._request_ok(resp):
            return False

        # Step 3: wait, then read the device back and check its state.
        time.sleep(status_delay)
        log.info('Checking operational status for device %s' %(device_id))
        resp = requests.get('{}/{}'.format(url, device_id))
        if not self._request_ok(resp):
            return False
        return self._is_operational(resp.json())