# blob: b193a2adb62e977ab1596b0c001df5e1b930b7ba [file] [log] [blame]
"""
SPDX-FileCopyrightText: 2020-present Open Networking Foundation <info@opennetworking.org>
SPDX-License-Identifier: LicenseRef-ONF-Member-1.01
"""
import sys
import requests
import json
# noinspection PyUnresolvedReferences
from requests.structures import CaseInsensitiveDict
import logging as log
import pyaml
from datetime import datetime
from device import Device
# Base URL of the aether-roc-api connectivity-service-v4 endpoint; every
# resource path used by Roc (e.g. "enterprise", "site") is appended to it.
URL = "https://roc.menlo.aetherproject.org/aether-roc-api/aether/v4.0.0/connectivity-service-v4/"
# Alternative (staging) endpoint, kept for reference:
# URL = "https://roc.staging.aether.onlab.us/aether-roc-api/aether/v4.0.0/connectivity-service-v4/"
class Roc(object):
    """Small client for the aether-roc-api REST endpoint rooted at ``URL``.

    An API key (bearer token) is fetched from Keycloak with the given
    user/password when the object is created and is sent in the
    ``Authorization`` header of every request.
    """

    # Seconds to wait on any HTTP request before giving up, so that an
    # unresponsive server cannot block the caller forever.
    TIMEOUT = 10

    def __init__(self, user, password):
        """Store the credentials and fetch the initial API key."""
        self.user = user
        self.password = password
        self.key = self.get_key()

    def headers(self):
        """Return the JSON content-type and bearer-token headers."""
        h = CaseInsensitiveDict()
        h["Content-Type"] = "application/json"
        h["Authorization"] = "Bearer " + self.key
        return h

    def _get_json(self, path, failure):
        """GET ``URL + path`` and return the decoded JSON body.

        :param path: resource path appended to ``URL``.
        :param failure: message prefix logged when the request fails.
        :return: the decoded body, or None on timeout or non-200 status.
        """
        try:
            response = requests.get(URL + path, headers=self.headers(),
                                    timeout=self.TIMEOUT)
        except requests.Timeout as e:
            log.error("ROC request timeout, error={}".format(e))
            return None
        if response.status_code != 200:
            log.error("{}, status_code: {}".format(failure, response.status_code))
            return None
        return json.loads(response.text)

    def get_mbr(self, device_group):
        """Return the mbr object of slice ``vcs-<device_group>``, or None on failure."""
        return self._get_json("vcs/vcs/vcs-{}/slice/mbr".format(device_group),
                              "Failed to get mbr")

    def set_mbr(self, device_group, mbr):
        """Set the uplink mbr of slice ``vcs-<device_group>``.

        If the first POST is not answered with 201 the API key is renewed
        and the request is sent once more. The process exits with a
        non-zero status when the retry fails or a request times out.
        """
        log.info("Set {} mbr to {}".format(device_group, mbr))
        payload = {'uplink': mbr}
        url = URL + "vcs/vcs/vcs-{}/slice/mbr".format(device_group)
        try:
            response = requests.post(url, headers=self.headers(), json=payload,
                                     timeout=self.TIMEOUT)
            # If error, renew api key and try again
            if response.status_code != 201:
                log.info("Renew ROC api key")
                self.key = self.get_key()
                response = requests.post(url, headers=self.headers(), json=payload,
                                         timeout=self.TIMEOUT)
        except requests.Timeout as e:
            log.error("ROC request timeout, error={}".format(e))
            sys.exit(1)
        if response.status_code != 201:
            log.error("Failed to set mbr, device_group:{}, mbr:{}, status_code: {}".format(
                device_group, mbr, response.status_code))
            # Exit non-zero so that the failure is visible to the caller.
            sys.exit(1)

    def get_key(self):
        """Request an access token from Keycloak using the password grant.

        :return: the ``access_token`` field of the token response.
        :raises KeyError: if the response carries no ``access_token``.
        :raises ValueError: if the response body is not valid JSON.
        """
        url = "https://keycloak.opennetworking.org/auth/realms/master/protocol/openid-connect/token"
        headers = CaseInsensitiveDict()
        headers["Content-Type"] = "application/x-www-form-urlencoded"
        data = {
            'grant_type': 'password',
            'client_id': 'aether-roc-gui',
            'username': self.user,
            'password': self.password,
            'scope': 'openid profile email groups'
        }
        # Pass headers by keyword: the third positional parameter of
        # requests.post() is ``json``, not ``headers``.
        response = requests.post(url, data=data, headers=headers,
                                 timeout=self.TIMEOUT)
        if response.status_code != 200:
            log.error("Failed to get ROC api key, status_code: {}".format(
                response.status_code))
        return json.loads(response.text)['access_token']

    def get_enterprise(self):
        """Return the decoded ``enterprise`` resource, or None on failure."""
        return self._get_json("enterprise", "get_enterprise() failed")

    def get_site(self):
        """Return the decoded ``site`` resource, or None on failure."""
        return self._get_json("site", "get_site() failed")

    def get_upf(self):
        """Return the decoded ``upf`` resource, or None on failure."""
        return self._get_json("upf", "get_upf() failed")

    def get_devicegroups(self):
        """Return the ``device-group`` list.

        A non-200 answer triggers one API-key renewal and one retry. The
        process exits with a non-zero status if the retry also fails or if
        a request times out, so this method never returns None.
        """
        url = URL + "device-group"
        try:
            response = requests.get(url, headers=self.headers(), timeout=self.TIMEOUT)
            if response.status_code != 200:
                log.info("Renew ROC api key")
                self.key = self.get_key()
                response = requests.get(url, headers=self.headers(),
                                        timeout=self.TIMEOUT)
        except requests.Timeout as e:
            log.error("ROC request timeout, error={}".format(e))
            sys.exit(1)
        if response.status_code != 200:
            log.error("get_devicegroups() failed, status_code: {}".format(
                response.status_code))
            sys.exit(1)
        return json.loads(response.text)['device-group']

    def get_ipdomain(self):
        """Return the ``ip-domain`` list, or None on failure."""
        body = self._get_json("ip-domain", "get_ipdomain() failed")
        if body is None:
            return None
        return body['ip-domain']

    def get_devices(self):
        """Return a dict mapping each ``imsi-id`` to its ``imsi-range-from``."""
        devices = {}
        for dg in self.get_devicegroups() or []:
            for d in dg['imsis']:
                devices[d['imsi-id']] = d['imsi-range-from']
        return devices

    def update_devices(self, devices):
        """Build a fresh imsi-id -> Device dict from the current device groups.

        :param devices: existing mapping of imsi-id to an object exposing
            ``last_reachable``; that value is carried over for ids that are
            still present. New ids start at ``datetime.min``.
        :return: new dict of imsi-id (str) to Device.
        """
        new = {}
        for dg in self.get_devicegroups() or []:
            for d in dg['imsis']:
                imsi_id = str(d['imsi-id'])
                imsi = str(d['imsi-range-from'])
                last_reachable = datetime.min
                if imsi_id in devices:
                    last_reachable = devices[imsi_id].last_reachable
                new[imsi_id] = Device(imsi_id, imsi, last_reachable)
        return new

    def get_subnet(self):
        """Return a dict mapping each ip-domain ``id`` to its ``subnet``.

        Returns an empty dict when the ip-domain list cannot be fetched.
        """
        ip_domains = self.get_ipdomain()
        if ip_domains is None:
            return {}
        return {ip_domain['id']: ip_domain['subnet'] for ip_domain in ip_domains}

    def dump(self):
        """Print the enterprise, site, upf, device-group and ip-domain data as YAML."""
        print(pyaml.dump(self.get_enterprise()))
        print(pyaml.dump(self.get_site()))
        print(pyaml.dump(self.get_upf()))
        print(pyaml.dump(self.get_devicegroups()))
        print(pyaml.dump(self.get_ipdomain()))
if __name__ == '__main__':
    # Manual smoke test against the live service.
    # use valid keycloak user/password
    roc = Roc("user", "password")

    # Print every top-level resource as YAML, in a fixed order.
    for fetch in (roc.get_enterprise,
                  roc.get_site,
                  roc.get_upf,
                  roc.get_devicegroups,
                  roc.get_ipdomain):
        print(pyaml.dump(fetch()))

    cameras = "menlo-4g-cameras"
    # Show the current mbr first (None = no change), then after each update.
    for new_uplink in (None, 5000000, 10000000):
        if new_uplink is not None:
            roc.set_mbr(cameras, new_uplink)
        mbr = roc.get_mbr(cameras)
        print("uplink mbr:{}, downlink mbr: {}".format(mbr["uplink"], mbr["downlink"]))