# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
Matteo Scandolo8c096ef2018-02-16 22:42:04 -080015import requests
16from requests.auth import HTTPBasicAuth
17import unittest
18import json
19from time import sleep
20
import os

# Seconds to sleep after each write to XOS before reading the result back
# from ONOS. Despite the name this is a fixed wait passed to sleep(), not an
# HTTP request timeout.
timeout = int(os.environ.get('PROGRAN_TEST_TIMEOUT', 10))

# XOS REST endpoint and credentials.
# NOTE(review): the defaults below (including the password) are kept only so
# existing invocations keep working; prefer supplying them via the environment
# instead of committing real credentials to the repository.
baseUrl = os.environ.get('PROGRAN_TEST_XOS_URL', 'http://10.128.22.3')
username = os.environ.get('PROGRAN_TEST_XOS_USER', 'xosadmin@opencord.org')
password = os.environ.get('PROGRAN_TEST_XOS_PASS', '1UvlBSNmBCbqaHzTC8qV')

# ONOS REST endpoint and credentials.
onosUrl = os.environ.get('PROGRAN_TEST_ONOS_URL', 'http://10.128.22.3:8183')
onosUser = os.environ.get('PROGRAN_TEST_ONOS_USER', 'karaf')
onosPass = os.environ.get('PROGRAN_TEST_ONOS_PASS', 'karaf')
30
class ProgranTest(unittest.TestCase):
    """End-to-end check that Progran models written to XOS show up in ONOS.

    Each step writes a model through the XOS REST API (``baseUrl``), sleeps
    for ``timeout`` seconds, and then reads the matching object back from the
    ONOS progran REST API (``onosUrl``). The test talks to live services and
    creates real objects; it performs no cleanup.
    """

    # def test_connection(self):
    #     url = "%s/xosapi/v1/progran/progranserviceinstances" % baseUrl
    #     r = requests.get(url, auth=HTTPBasicAuth(username, password))
    #     res = r.json()
    #     self.assertTrue(isinstance(res['items'], list))

    def _xos_create(self, path, payload):
        """POST ``payload`` as JSON to XOS and return the decoded response.

        ``path`` is appended to ``baseUrl``. The test fails unless the
        response carries ``backend_code`` equal to 0.
        """
        response = requests.post(
            "%s%s" % (baseUrl, path),
            data=json.dumps(payload),
            auth=HTTPBasicAuth(username, password))
        created = response.json()
        self.assertEqual(created['backend_code'], 0)
        return created

    def _onos_read_first(self, path, array_key):
        """Sleep ``timeout`` seconds, GET ``path`` from ONOS, return the first
        element of the ``array_key`` list in the JSON response.

        The sleep precedes every read; presumably it gives the change made in
        XOS time to reach ONOS -- confirm against the synchronizer behaviour.
        """
        sleep(timeout)
        # ONOS has its own credentials; the XOS account must not be used here.
        response = requests.get(
            "%s%s" % (onosUrl, path),
            auth=HTTPBasicAuth(onosUser, onosPass))
        return response.json()[array_key][0]

    def test_create_profile(self):
        """Create a handover, profile, IMSI and eNodeB in XOS, link them, and
        verify after each step that ONOS reports the expected object."""

        print("\nCreate profile")

        # Create handover
        handover_data = {
            "A3offset": 2,
            "A5Thresh1Rsrp": 97,
            "A5Thresh1Rsrq": 10,
            "A5Thresh2Rsrp": 95,
            "A5Thresh2Rsrq": 8,
            "A5TriggerType": 0,
            "HysteresisA3": 1,
            "HysteresisA5": 1,
            "A3TriggerQuantity": 10,
            "A5TriggerQuantity": 20
        }
        handover_id = self._xos_create(
            "/xosapi/v1/progran/handovers", handover_data)['id']

        # Create profile
        profile_data = {
            "name": "automated_test_1",
            "DlSchedType": "RR",
            "UlSchedType": "RR",
            "DlAllocRBRate": 20,
            "UlAllocRBRate": 20,
            "AdmControl": 1,
            "CellIndividualOffset": 1,
            "mmeip": "192.168.61.30",
            "mmeport": "36412",
            "SubsProfile": "",
            "DlWifiRate": 13,
            "DlUeAllocRbRate": 12,
            "handover_id": handover_id
        }
        profile_path = "/xosapi/v1/progran/progranserviceinstances"
        profile_id = self._xos_create(profile_path, profile_data)['id']

        # Read profile from ONOS
        print("\nRead profile from ONOS")
        onos_profile = self._onos_read_first(
            "/onos/progran/profile/%s" % profile_data['name'], 'ProfileArray')
        self.assertEqual(onos_profile['Name'], profile_data['name'])
        # TODO check for mme details

        # Add IMSI
        print("\nAdd IMSI")
        imsi_data = {
            "imsi_number": "302720100000421",
            "apn_number": "",
            "ue_status": 0
        }
        imsi_id = self._xos_create(
            "/xosapi/v1/mcord/mcordsubscriberinstances", imsi_data)['id']

        # Read IMSI and check status
        print("\nRead IMSI from ONOS")
        onos_imsi = self._onos_read_first(
            "/onos/progran/imsi/%s" % imsi_data['imsi_number'], 'ImsiArray')
        self.assertEqual(onos_imsi['IMSI'], imsi_data['imsi_number'])

        # Add link from Profile to IMSI
        print("\nAdding imsi to profile")
        link_data = {
            "provider_service_instance_id": profile_id,
            "subscriber_service_instance_id": imsi_id,
        }
        self._xos_create("/xosapi/v1/core/serviceinstancelinks", link_data)

        # Check link in ONOS
        print("\nRead IMSI Profiles from ONOS")
        onos_imsi_profile = self._onos_read_first(
            "/onos/progran/imsi/%s/profile" % imsi_data['imsi_number'],
            'ProfileArray')
        self.assertEqual(onos_imsi_profile['Name'], profile_data['name'])

        # Add EnodeB
        print("\nAdding Enodeb")
        enodeb_data = {
            "description": "testcem",
            "enbId": "402",
            "ipAddr": "192.168.32.1"
        }
        enodeb_id = self._xos_create(
            "/xosapi/v1/progran/enodebs", enodeb_data)['id']

        # Check EnodeB in ONOS
        print("\nRead Enodeb from ONOS")
        enodeb_onos_path = "/onos/progran/enodeb/%s" % enodeb_data['enbId']
        onos_enodeb = self._onos_read_first(enodeb_onos_path, 'EnodeBArray')
        self.assertEqual(onos_enodeb['Description'], enodeb_data['description'])

        # Add enodeb to profile
        print("\n Adding EnodeB to profile")
        profile_data["enodeb_id"] = enodeb_id
        update = requests.put(
            "%s%s/%s" % (baseUrl, profile_path, profile_id),
            data=json.dumps(profile_data),
            auth=HTTPBasicAuth(username, password))
        # Decode the reply so a non-JSON response still fails the test; its
        # content is not otherwise inspected.
        update.json()

        # Check that enodeb has been added to profile
        print("\nRead Enodeb to profile from ONOS")
        onos_enodeb = self._onos_read_first(enodeb_onos_path, 'EnodeBArray')
        self.assertIn(profile_data['name'], onos_enodeb['ProfileArray'])

        # TODO change profile UL/DL rates in ONOS and check in XOS (how??)
182
183
# Script entry point: when this file is executed directly (rather than
# imported), hand control to the unittest command-line runner.
if __name__ == '__main__':
    unittest.main()