blob: e40cb08107209ffc4319feed7a1643bc392989c6 [file] [log] [blame]
# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
import requests
from requests.auth import HTTPBasicAuth
import unittest
import json
from time import sleep
timeout = 10
baseUrl = 'http://10.128.22.3'
username = 'xosadmin@opencord.org'
password = '1UvlBSNmBCbqaHzTC8qV'
onosUrl = 'http://10.128.22.3:8183'
onosUser = 'karaf'
onosPass = 'karaf'
class ProgranTest(unittest.TestCase):
# def test_connection(self):
# url = "%s/xosapi/v1/progran/progranserviceinstances" % baseUrl
# r = requests.get(url, auth=HTTPBasicAuth(username, password))
# res = r.json()
# self.assertTrue(isinstance(res['items'], list))
def test_create_profile(self):
handover_id = None
profile_id = None
imsi_id = None
enodeb_id = None
print "\nCreate profile"
# Create handover
handover_data = {
"A3offset": 2,
"A5Thresh1Rsrp": 97,
"A5Thresh1Rsrq": 10,
"A5Thresh2Rsrp": 95,
"A5Thresh2Rsrq": 8,
"A5TriggerType": 0,
"HysteresisA3": 1,
"HysteresisA5": 1,
"A3TriggerQuantity": 10,
"A5TriggerQuantity": 20
}
handover_url = "%s/xosapi/v1/progran/handovers" % baseUrl
hr = requests.post(handover_url, data=json.dumps(handover_data), auth=HTTPBasicAuth(username, password))
handover_res = hr.json()
handover_id = handover_res['id']
self.assertEqual(handover_res['backend_code'], 0)
# Create profile
profile_data = {
"name": "automated_test_1",
"DlSchedType": "RR",
"UlSchedType": "RR",
"DlAllocRBRate": 20,
"UlAllocRBRate": 20,
"AdmControl": 1,
"CellIndividualOffset": 1,
"mmeip": "192.168.61.30",
"mmeport": "36412",
"SubsProfile": "",
"DlWifiRate": 13,
"DlUeAllocRbRate": 12,
"handover_id": handover_id
}
profile_url = "%s/xosapi/v1/progran/progranserviceinstances" % baseUrl
pr = requests.post(profile_url, data=json.dumps(profile_data), auth=HTTPBasicAuth(username, password))
profile_res = pr.json()
profile_id = profile_res['id']
self.assertEqual(profile_res['backend_code'], 0)
# Read profile from ONOS
print "\nRead profile from ONOS"
sleep(timeout)
profile_read_url = "%s/onos/progran/profile/%s" % (onosUrl, profile_data['name'])
pr = requests.get(profile_read_url, auth=HTTPBasicAuth(username, password))
profile_read_res = pr.json()['ProfileArray'][0]
self.assertEqual(profile_read_res['Name'], profile_data['name'])
# TODO check for mme details
print "\nAdd IMSI"
# Add IMSI
imsi_data = {
"imsi_number": "302720100000421",
"apn_number": "",
"ue_status": 0
}
imsi_url = "%s/xosapi/v1/mcord/mcordsubscriberinstances" % baseUrl
ir = requests.post(imsi_url, data=json.dumps(imsi_data), auth=HTTPBasicAuth(username, password))
imsi_res = ir.json()
imsi_id = imsi_res['id']
self.assertEqual(imsi_res['backend_code'], 0)
# Read IMSI and check status
print "\nRead IMSI from ONOS"
sleep(timeout)
imsi_read_url = "%s/onos/progran/imsi/%s" % (onosUrl, imsi_data['imsi_number'])
pr = requests.get(imsi_read_url, auth=HTTPBasicAuth(username, password))
imsi_res = pr.json()['ImsiArray'][0]
self.assertEqual(imsi_res['IMSI'], imsi_data['imsi_number'])
# Add link from Profile to IMSI
print "\nAdding imsi to profile"
link_data = {
"provider_service_instance_id": profile_id,
"subscriber_service_instance_id": imsi_id,
}
link_url = "%s/xosapi/v1/core/serviceinstancelinks" % baseUrl
ir = requests.post(link_url, data=json.dumps(link_data), auth=HTTPBasicAuth(username, password))
link_res = ir.json()
self.assertEqual(link_res['backend_code'], 0)
# check link in ONOS
print "\nRead IMSI Profiles from ONOS"
sleep(timeout)
imsi_profile_url = "%s/onos/progran/imsi/%s/profile" % (onosUrl, imsi_data['imsi_number'])
pr = requests.get(imsi_profile_url, auth=HTTPBasicAuth(username, password))
imsi_profile_res = pr.json()['ProfileArray'][0]
self.assertEqual(imsi_profile_res['Name'], profile_data['name'])
# Add EnodeB
print "\nAdding Enodeb"
enodeb_data = {
"description": "testcem",
"enbId": "402",
"ipAddr": "192.168.32.1"
}
enodeb_url = "%s/xosapi/v1/progran/enodebs" % baseUrl
r = requests.post(enodeb_url, data=json.dumps(enodeb_data), auth=HTTPBasicAuth(username, password))
enodeb_res = r.json()
enodeb_id = enodeb_res['id']
self.assertEqual(enodeb_res['backend_code'], 0)
# Check EnodeB in ONOS
print "\nRead Enodeb from ONOS"
sleep(timeout)
enodeb_onos_url = "%s/onos/progran/enodeb/%s" % (onosUrl, enodeb_data['enbId'])
pr = requests.get(enodeb_onos_url, auth=HTTPBasicAuth(username, password))
enodeb_onos_res = pr.json()['EnodeBArray'][0]
self.assertEqual(enodeb_onos_res['Description'], enodeb_data['description'])
# Add enodeb to profile
print "\n Adding EnodeB to profile"
profile_data["enodeb_id"] = enodeb_id
profile_update_url = "%s/%s" % (profile_url, profile_id)
r = requests.put(profile_update_url, data=json.dumps(profile_data), auth=HTTPBasicAuth(username, password))
update_res = r.json()
# Check that enodeb has been added to profile
print "\nRead Enodeb to profile from ONOS"
sleep(timeout)
enodeb_onos_url = "%s/onos/progran/enodeb/%s" % (onosUrl, enodeb_data['enbId'])
pr = requests.get(enodeb_onos_url, auth=HTTPBasicAuth(username, password))
enodeb_onos_res = pr.json()['EnodeBArray'][0]
self.assertIn(profile_data['name'], enodeb_onos_res['ProfileArray'])
# TODO change profile UL/DL rates in ONOS and check in XOS (how??)
if __name__ == "__main__":
unittest.main()