Shad Ansari | 6fcfa29 | 2022-01-28 00:34:13 +0000 | [diff] [blame] | 1 | """ |
| 2 | SPDX-FileCopyrightText: 2020-present Open Networking Foundation <info@opennetworking.org> |
| 3 | SPDX-License-Identifier: LicenseRef-ONF-Member-1.01 |
| 4 | """ |
| 5 | |
| 6 | import requests |
| 7 | import json |
| 8 | from requests.structures import CaseInsensitiveDict |
| 9 | import logging as log |
| 10 | import pyaml |
| 11 | |
# Base URL of the Aether ROC connectivity-service v4 REST API. Every request
# path used by the Roc class below is appended to this prefix.
URL = "https://roc.menlo.aetherproject.org/aether-roc-api/aether/v4.0.0/connectivity-service-v4/"


# Alternative endpoint kept for reference (presumably the staging deployment,
# judging by the host name); uncomment to target it instead of the one above.
# URL = "https://roc.staging.aether.onlab.us/aether-roc-api/aether/v4.0.0/connectivity-service-v4/"
| 16 | |
| 17 | |
class Roc(object):
    """Small client for the Aether ROC connectivity-service v4 REST API.

    An API key (bearer token) is obtained from Keycloak with the supplied
    user name and password when the object is created, and is sent in the
    ``Authorization`` header of every subsequent request.
    """

    def __init__(self, user, password):
        """Store the credentials and fetch an initial API key.

        :param user: Keycloak user name.
        :param password: Keycloak password.
        """
        self.user = user
        self.password = password
        self.key = self.get_key()

    def headers(self):
        """Return the JSON + bearer-token headers used for ROC requests."""
        h = CaseInsensitiveDict()
        h["Content-Type"] = "application/json"
        h["Authorization"] = "Bearer " + self.key
        return h

    def _get_json(self, path, error_prefix):
        """GET ``URL + path`` and return the decoded JSON body.

        :param path: path relative to the module-level ``URL``.
        :param error_prefix: text logged in front of the status code when
            the request does not return 200.
        :return: the decoded JSON document, or ``None`` on a non-200 reply.
        """
        response = requests.get(URL + path, headers=self.headers())
        if response.status_code != 200:
            log.error("{}, status_code: {}".format(error_prefix, response.status_code))
            return None
        return json.loads(response.text)

    def get_mbr(self, device_group):
        """Return the slice MBR document of ``vcs-<device_group>``.

        :return: the decoded JSON reply, or ``None`` if the request failed.
        """
        return self._get_json("vcs/vcs/vcs-{}/slice/mbr".format(device_group),
                              "Failed to get mbr")

    def set_mbr(self, device_group, mbr):
        """Set the uplink MBR of ``vcs-<device_group>``.

        Only the ``uplink`` field is posted. If the first attempt does not
        return 201 the API key is renewed and the request is retried once;
        if the retry fails as well the error is logged and the process
        exits.

        :param device_group: device-group name (without the ``vcs-`` prefix).
        :param mbr: new uplink MBR value.
        :raises SystemExit: when both attempts fail.
        """
        # Imported here because the module does not import sys at file level;
        # without it the failure path below raised NameError instead of exiting.
        import sys

        log.info("Set {} mbr to {}".format(device_group, mbr))
        m = {'uplink': mbr}
        url = URL + "vcs/vcs/vcs-{}/slice/mbr".format(device_group)
        response = requests.post(url, headers=self.headers(), json=m)

        # If error, renew api key and try again
        if response.status_code != 201:
            log.info("Renew ROC api key")
            self.key = self.get_key()
            response = requests.post(url, headers=self.headers(), json=m)
            if response.status_code != 201:
                log.error("Failed to set mbr, device_group:{}, mbr:{}, status_code: {}".format(
                    device_group, mbr, response.status_code))
                # Non-zero status so that callers/shells can see the failure.
                sys.exit(1)

    def get_key(self):
        """Request a fresh access token from Keycloak (password grant).

        :return: the ``access_token`` string of the token reply.
        :raises KeyError: if the reply carries no ``access_token`` (for
            example because the credentials were rejected).
        """
        url = "https://keycloak.opennetworking.org/auth/realms/master/protocol/openid-connect/token"
        headers = CaseInsensitiveDict()
        headers["Content-Type"] = "application/x-www-form-urlencoded"
        data = {
            'grant_type': 'password',
            'client_id': 'aether-roc-gui',
            'username': self.user,
            'password': self.password,
            'scope': 'openid profile email groups'
        }
        # headers must be passed by keyword: the third positional parameter
        # of requests.post() is ``json``, not ``headers``.
        response = requests.post(url, data=data, headers=headers)
        if response.status_code != 200:
            # Log the status first; the lookup below still raises as before.
            log.error("Failed to get api key, status_code: {}".format(response.status_code))
        return json.loads(response.text)['access_token']

    def get_enterprise(self):
        """Return the ``enterprise`` document, or ``None`` on failure."""
        return self._get_json("enterprise", "get_enterprise() failed")

    def get_site(self):
        """Return the ``site`` document, or ``None`` on failure."""
        return self._get_json("site", "get_site() failed")

    def get_upf(self):
        """Return the ``upf`` document, or ``None`` on failure."""
        return self._get_json("upf", "get_upf() failed")

    def get_devicegroup(self):
        """Return the ``device-group`` entry of the reply, or ``None`` on failure."""
        reply = self._get_json("device-group", "get_devicegroup() failed")
        if reply is None:
            return None
        return reply['device-group']

    def get_ipdomain(self):
        """Return the ``ip-domain`` entry of the reply, or ``None`` on failure."""
        reply = self._get_json("ip-domain", "get_ipdomain() failed")
        if reply is None:
            return None
        return reply['ip-domain']

    def get_imsi(self):
        """Map every ``imsi-id`` to its ``imsi-range-from`` across device groups.

        :return: dict ``{imsi-id: imsi-range-from}``; empty when the device
            groups could not be fetched. Groups without an ``imsis`` entry
            are skipped.
        """
        imsis = {}  # imsi-id:imsi
        # get_devicegroup() returns None on failure (already logged there).
        for devicegroup in self.get_devicegroup() or []:
            for imsi in devicegroup.get('imsis') or []:
                imsis[imsi['imsi-id']] = imsi['imsi-range-from']
        return imsis

    def get_subnet(self):
        """Map every ip-domain ``id`` to its ``subnet``.

        :return: dict ``{id: subnet}``; empty when the ip-domains could not
            be fetched.
        """
        # get_ipdomain() returns None on failure (already logged there).
        ip_domains = self.get_ipdomain() or []
        return {ip_domain['id']: ip_domain['subnet'] for ip_domain in ip_domains}

    def dump(self):
        """Print the enterprise, site, upf, device-group and ip-domain data as YAML."""
        getters = (self.get_enterprise, self.get_site, self.get_upf,
                   self.get_devicegroup, self.get_ipdomain)
        for getter in getters:
            print(pyaml.dump(getter()))
| 133 | |
Shad Ansari | 6fcfa29 | 2022-01-28 00:34:13 +0000 | [diff] [blame] | 134 | |
| 135 | |
if __name__ == '__main__':
    # Manual smoke test: dump the ROC configuration, then change the uplink
    # MBR of one device group twice, printing the MBR before and after.

    # use valid keycloak user/password
    roc = Roc("user", "password")

    # Same five YAML dumps the script used to print one by one.
    roc.dump()

    cameras = "menlo-4g-cameras"

    def show_mbr():
        """Print the current uplink/downlink MBR of the cameras device group."""
        mbr = roc.get_mbr(cameras)
        if mbr is None:
            # get_mbr() returns None (and logs the status code) on failure;
            # indexing it would raise TypeError.
            print("mbr not available for {}".format(cameras))
            return
        print("uplink mbr:{}, downlink mbr: {}".format(mbr["uplink"], mbr["downlink"]))

    show_mbr()
    for uplink in (5000000, 10000000):
        roc.set_mbr(cameras, uplink)
        show_mbr()