blob: b23aefab5d79bb0c923ffee52fa3c1206a5c2b10 [file] [log] [blame]
A R Karthick35495c32017-05-11 14:58:32 -07001import requests
2import json
3import time
4from CordTestUtils import log_test as log
5
6class VolthaCtrl(object):
7
8 def __init__(self, host, rest_port = 8881):
9 self.host = host
10 self.rest_port = rest_port
11 self.rest_url = 'http://{}:{}/api/v1'.format(host, rest_port)
12
13 def get_devices(self):
14 url = '{}/local/devices'.format(self.rest_url)
15 resp = requests.get(url)
16 if resp.ok is not True or resp.status_code != 200:
17 return None
18 return resp.json()
19
20 def enable_device(self, olt_type, olt_mac):
21 url = '{}/local/devices'.format(self.rest_url)
22 device_config = { 'type' : olt_type, 'mac_address' : olt_mac }
23 #pre-provision
24 log.info('Pre-provisioning %s with mac %s' %(olt_type, olt_mac))
25 resp = requests.post(url, data = json.dumps(device_config))
26 if resp.ok is not True or resp.status_code != 200:
27 return False
28 device_id = resp.json()['id']
29 log.info('Enabling device %s' %(device_id))
30 enable_url = '{}/{}/enable'.format(url, device_id)
31 resp = requests.post(enable_url)
32 if resp.ok is not True or resp.status_code != 200:
33 return False
34 #get operational status
35 time.sleep(5)
36 log.info('Checking operational status for device %s' %(device_id))
37 resp = requests.get('{}/{}'.format(url, device_id))
38 if resp.ok is not True or resp.status_code != 200:
39 return False
40 device_info = resp.json()
41 if device_info['oper_status'] != 'ACTIVE' or \
42 device_info['admin_state'] != 'ENABLED' or \
43 device_info['connect_status'] != 'REACHABLE':
44 return False
45
46 return True
Chetan Gaonker3620a112017-05-23 06:10:15 +000047
48 def get_operational_status(self, device_id):
49 url = '{}/local/devices'.format(self.rest_url)
50 log.info('Checking operational status for device %s' %(device_id))
51 resp = requests.get('{}/{}'.format(url, device_id))
52 if resp.ok is not True or resp.status_code != 200:
53 return False
54 device_info = resp.json()
55 if device_info['oper_status'] != 'ACTIVE' or \
56 device_info['admin_state'] != 'ENABLED' or \
57 device_info['connect_status'] != 'REACHABLE':
58 return False
59 return True
60
61 def check_preprovision_status(self, device_id):
62 url = '{}/local/devices'.format(self.rest_url)
63 log.info('Check if device %s is in Preprovisioning state'%(device_id))
64 resp = requests.get('{}/{}'.format(url, device_id))
65 if resp.ok is not True or resp.status_code != 200:
66 return False
67 device_info = resp.json()
68 if device_info['admin_status'] == 'PREPROVISIONED':
69 return True
70 return False