Add roc.py
Change-Id: I866b95dd7fe98bbe6f787f113393b2375cf427a5
diff --git a/roc.py b/roc.py
new file mode 100644
index 0000000..fd45701
--- /dev/null
+++ b/roc.py
@@ -0,0 +1,131 @@
+"""
+SPDX-FileCopyrightText: 2020-present Open Networking Foundation <info@opennetworking.org>
+SPDX-License-Identifier: LicenseRef-ONF-Member-1.01
+"""
+
+import requests
+import json
+from requests.structures import CaseInsensitiveDict
+import logging as log
+import pyaml
+
+# Base URL of the aether-roc-api connectivity-service-v4 endpoints; resource
+# paths used by the requests in this module are appended to it.
+URL = "https://roc.menlo.aetherproject.org/aether-roc-api/aether/v4.0.0/connectivity-service-v4/"
+
+
+# Same API on the staging host, kept for reference (uncomment to switch):
+# URL = "https://roc.staging.aether.onlab.us/aether-roc-api/aether/v4.0.0/connectivity-service-v4/"
+
+
+class Roc(object):
+    """Small client for the aether-roc-api endpoints rooted at the module URL.
+
+    An access token is requested from keycloak with the supplied
+    user/password when the object is created; it is sent as a bearer token
+    with every request made through this class.
+    """
+
+    def __init__(self, user, password):
+        """Store the keycloak credentials and fetch an initial API key.
+
+        :param user: keycloak username used for the password grant
+        :param password: keycloak password belonging to ``user``
+        """
+        self.user = user
+        self.password = password
+        self.key = self.get_key()
+
+    def headers(self):
+        """Return JSON request headers carrying the current bearer token."""
+        h = CaseInsensitiveDict()
+        h["Content-Type"] = "application/json"
+        h["Authorization"] = "Bearer " + self.key
+        return h
+
+    @staticmethod
+    def _mbr_path(device_group):
+        """Return the slice/mbr resource path for ``device_group``."""
+        return "vcs/vcs/vcs-{}/slice/mbr".format(device_group)
+
+    def _get(self, path, failure):
+        """GET ``URL + path`` and return the decoded JSON body.
+
+        :param path: resource path appended to the module-level URL
+        :param failure: text logged, followed by the status code, on error
+        :return: the parsed response body, or None if the status is not 200
+        """
+        response = requests.get(URL + path, headers=self.headers())
+        if response.status_code != 200:
+            log.error("%s, status_code: %s", failure, response.status_code)
+            return None
+        return json.loads(response.text)
+
+    def get_mbr(self, device_group):
+        """Fetch the slice mbr resource of a device group.
+
+        :param device_group: name inserted into the ``vcs-<name>`` path segment
+        :return: the parsed JSON response, or None on a non-200 status
+        """
+        return self._get(self._mbr_path(device_group), "Failed to get mbr")
+
+    def set_mbr(self, device_group, mbr):
+        """Post a new uplink mbr value for a device group.
+
+        Any answer other than 201 triggers exactly one retry with a freshly
+        fetched API key (on the assumption that the key has expired).
+
+        :param device_group: name inserted into the ``vcs-<name>`` path segment
+        :param mbr: value sent as the ``uplink`` field of the JSON payload
+        :raises SystemExit: with status 1 if the retry does not return 201
+        """
+        log.info("Set %s mbr to %s", device_group, mbr)
+        payload = {'uplink': mbr}
+        url = URL + self._mbr_path(device_group)
+        response = requests.post(url, headers=self.headers(), json=payload)
+        if response.status_code == 201:
+            return
+
+        # If error, renew api key and try again
+        log.info("Renew ROC api key")
+        self.key = self.get_key()
+        response = requests.post(url, headers=self.headers(), json=payload)
+        if response.status_code != 201:
+            log.error("Failed to set mbr, device_group:%s, mbr:%s, status_code: %s",
+                      device_group, mbr, response.status_code)
+            # The module does not import sys, so raise SystemExit directly
+            # (which is what sys.exit() does) and report a non-zero status.
+            raise SystemExit(1)
+
+    def get_key(self):
+        """Request an access token from keycloak using a password grant.
+
+        :return: the ``access_token`` field of the token response
+        :raises requests.HTTPError: if keycloak answers with a 4xx/5xx status
+        """
+        url = "https://keycloak.opennetworking.org/auth/realms/master/protocol/openid-connect/token"
+        headers = CaseInsensitiveDict()
+        headers["Content-Type"] = "application/x-www-form-urlencoded"
+        data = {
+            'grant_type': 'password',
+            'client_id': 'aether-roc-gui',
+            'username': self.user,
+            'password': self.password,
+            'scope': 'openid profile email groups',
+        }
+        # Pass by keyword: the third positional parameter of requests.post()
+        # is ``json``, so a positional header dict is not sent as headers.
+        response = requests.post(url, data=data, headers=headers)
+        if response.status_code != 200:
+            log.error("Failed to get ROC api key, status_code: %s", response.status_code)
+            # Fail here rather than with a KeyError on the missing token below.
+            response.raise_for_status()
+        return json.loads(response.text)['access_token']
+
+    def get_enterprise(self):
+        """Return the ``enterprise`` resource, or None on a non-200 status."""
+        return self._get("enterprise", "get_enterprise() failed")
+
+    def get_site(self):
+        """Return the ``site`` resource, or None on a non-200 status."""
+        return self._get("site", "get_site() failed")
+
+    def get_upf(self):
+        """Return the ``upf`` resource, or None on a non-200 status."""
+        return self._get("upf", "get_upf() failed")
+
+    def get_devicegroup(self):
+        """Return the ``device-group`` resource, or None on a non-200 status."""
+        return self._get("device-group", "get_devicegroup() failed")
+
+    def get_ipdomain(self):
+        """Return the ``ip-domain`` resource, or None on a non-200 status."""
+        return self._get("ip-domain", "get_ipdomain() failed")
+
+
+if __name__ == '__main__':
+    # Manual run against the deployment configured in URL. Note that it
+    # really posts new uplink mbr values and leaves the last one in place.
+    # use valid keycloak user/password
+    roc = Roc("user", "password")
+
+    # Dump every configuration collection as YAML.
+    for fetch in (roc.get_enterprise, roc.get_site, roc.get_upf,
+                  roc.get_devicegroup, roc.get_ipdomain):
+        print(pyaml.dump(fetch()))
+
+    cameras = "menlo-4g-cameras"
+
+    def show_mbr():
+        """Print the current uplink/downlink mbr of the cameras group."""
+        mbr = roc.get_mbr(cameras)
+        if mbr is None:
+            # get_mbr() returns None after logging a failed request;
+            # subscripting it would raise TypeError.
+            print("mbr unavailable for {}".format(cameras))
+            return
+        print("uplink mbr:{}, downlink mbr: {}".format(mbr["uplink"], mbr["downlink"]))
+
+    # Show the current value, then set each new uplink value and re-read it.
+    show_mbr()
+    for uplink in (5000000, 10000000):
+        roc.set_mbr(cameras, uplink)
+        show_mbr()