blob: 396eb35a9abfa291cc4fb9b111d894d677b3facd [file] [log] [blame]
Matteo Scandolo8c096ef2018-02-16 22:42:04 -08001import requests
2from requests.auth import HTTPBasicAuth
3import unittest
4import json
5from time import sleep
6
7timeout = 10
8
9baseUrl = 'http://10.128.22.3'
10username = 'xosadmin@opencord.org'
11password = '1UvlBSNmBCbqaHzTC8qV'
12
13onosUrl = 'http://10.128.22.3:8183'
14onosUser = 'karaf'
15onosPass = 'karaf'
16
17class ProgranTest(unittest.TestCase):
18
19 # def test_connection(self):
20 # url = "%s/xosapi/v1/progran/progranserviceinstances" % baseUrl
21 # r = requests.get(url, auth=HTTPBasicAuth(username, password))
22 # res = r.json()
23 # self.assertTrue(isinstance(res['items'], list))
24
25 def test_create_profile(self):
26
27 handover_id = None
28 profile_id = None
29 imsi_id = None
30 enodeb_id = None
31
32 print "\nCreate profile"
33
34 # Create handover
35 handover_data = {
36 "A3offset": 2,
37 "A5Thresh1Rsrp": 97,
38 "A5Thresh1Rsrq": 10,
39 "A5Thresh2Rsrp": 95,
40 "A5Thresh2Rsrq": 8,
41 "A5TriggerType": 0,
42 "HysteresisA3": 1,
43 "HysteresisA5": 1,
44 "A3TriggerQuantity": 10,
45 "A5TriggerQuantity": 20
46 }
47
48 handover_url = "%s/xosapi/v1/progran/handovers" % baseUrl
49 hr = requests.post(handover_url, data=json.dumps(handover_data), auth=HTTPBasicAuth(username, password))
50 handover_res = hr.json()
51 handover_id = handover_res['id']
52 self.assertEqual(handover_res['backend_code'], 0)
53
54 # Create profile
55 profile_data = {
56 "name": "automated_test_1",
57 "DlSchedType": "RR",
58 "UlSchedType": "RR",
59 "DlAllocRBRate": 20,
60 "UlAllocRBRate": 20,
61 "AdmControl": 1,
62 "CellIndividualOffset": 1,
63 "mmeip": "192.168.61.30",
64 "mmeport": "36412",
65 "SubsProfile": "",
66 "DlWifiRate": 13,
67 "DlUeAllocRbRate": 12,
68 "handover_id": handover_id
69 }
70
71 profile_url = "%s/xosapi/v1/progran/progranserviceinstances" % baseUrl
72 pr = requests.post(profile_url, data=json.dumps(profile_data), auth=HTTPBasicAuth(username, password))
73 profile_res = pr.json()
74 profile_id = profile_res['id']
75 self.assertEqual(profile_res['backend_code'], 0)
76
77 # Read profile from ONOS
78 print "\nRead profile from ONOS"
79 sleep(timeout)
80
81 profile_read_url = "%s/onos/progran/profile/%s" % (onosUrl, profile_data['name'])
82 pr = requests.get(profile_read_url, auth=HTTPBasicAuth(username, password))
83 profile_read_res = pr.json()['ProfileArray'][0]
84 self.assertEqual(profile_read_res['Name'], profile_data['name'])
85 # TODO check for mme details
86
87 print "\nAdd IMSI"
88 # Add IMSI
89 imsi_data = {
90 "imsi_number": "302720100000421",
91 "apn_number": "",
92 "ue_status": 0
93 }
94 imsi_url = "%s/xosapi/v1/mcord/mcordsubscriberinstances" % baseUrl
95 ir = requests.post(imsi_url, data=json.dumps(imsi_data), auth=HTTPBasicAuth(username, password))
96 imsi_res = ir.json()
97 imsi_id = imsi_res['id']
98 self.assertEqual(imsi_res['backend_code'], 0)
99
100 # Read IMSI and check status
101 print "\nRead IMSI from ONOS"
102 sleep(timeout)
103
104 imsi_read_url = "%s/onos/progran/imsi/%s" % (onosUrl, imsi_data['imsi_number'])
105 pr = requests.get(imsi_read_url, auth=HTTPBasicAuth(username, password))
106 imsi_res = pr.json()['ImsiArray'][0]
107 self.assertEqual(imsi_res['IMSI'], imsi_data['imsi_number'])
108
109 # Add link from Profile to IMSI
110 print "\nAdding imsi to profile"
111 link_data = {
112 "provider_service_instance_id": profile_id,
113 "subscriber_service_instance_id": imsi_id,
114 }
115 link_url = "%s/xosapi/v1/core/serviceinstancelinks" % baseUrl
116 ir = requests.post(link_url, data=json.dumps(link_data), auth=HTTPBasicAuth(username, password))
117 link_res = ir.json()
118 self.assertEqual(link_res['backend_code'], 0)
119
120 # check link in ONOS
121 print "\nRead IMSI Profiles from ONOS"
122 sleep(timeout)
123 imsi_profile_url = "%s/onos/progran/imsi/%s/profile" % (onosUrl, imsi_data['imsi_number'])
124 pr = requests.get(imsi_profile_url, auth=HTTPBasicAuth(username, password))
125 imsi_profile_res = pr.json()['ProfileArray'][0]
126 self.assertEqual(imsi_profile_res['Name'], profile_data['name'])
127
128 # Add EnodeB
129 print "\nAdding Enodeb"
130 enodeb_data = {
131 "description": "testcem",
132 "enbId": "402",
133 "ipAddr": "192.168.32.1"
134 }
135 enodeb_url = "%s/xosapi/v1/progran/enodebs" % baseUrl
136 r = requests.post(enodeb_url, data=json.dumps(enodeb_data), auth=HTTPBasicAuth(username, password))
137 enodeb_res = r.json()
138 enodeb_id = enodeb_res['id']
139 self.assertEqual(enodeb_res['backend_code'], 0)
140
141 # Check EnodeB in ONOS
142 print "\nRead Enodeb from ONOS"
143 sleep(timeout)
144
145 enodeb_onos_url = "%s/onos/progran/enodeb/%s" % (onosUrl, enodeb_data['enbId'])
146 pr = requests.get(enodeb_onos_url, auth=HTTPBasicAuth(username, password))
147 enodeb_onos_res = pr.json()['EnodeBArray'][0]
148 self.assertEqual(enodeb_onos_res['Description'], enodeb_data['description'])
149
150 # Add enodeb to profile
151 print "\n Adding EnodeB to profile"
152
153 profile_data["enodeb_id"] = enodeb_id
154 profile_update_url = "%s/%s" % (profile_url, profile_id)
155 r = requests.put(profile_update_url, data=json.dumps(profile_data), auth=HTTPBasicAuth(username, password))
156 update_res = r.json()
157
158 # Check that enodeb has been added to profile
159 print "\nRead Enodeb to profile from ONOS"
160 sleep(timeout)
161
162 enodeb_onos_url = "%s/onos/progran/enodeb/%s" % (onosUrl, enodeb_data['enbId'])
163 pr = requests.get(enodeb_onos_url, auth=HTTPBasicAuth(username, password))
164 enodeb_onos_res = pr.json()['EnodeBArray'][0]
165 self.assertIn(profile_data['name'], enodeb_onos_res['ProfileArray'])
166
167 # TODO change profile UL/DL rates in ONOS and check in XOS (how??)
168
169
170if __name__ == "__main__":
171 unittest.main()