Shad Ansari | 6fcfa29 | 2022-01-28 00:34:13 +0000 | [diff] [blame] | 1 | """ |
| 2 | SPDX-FileCopyrightText: 2020-present Open Networking Foundation <info@opennetworking.org> |
| 3 | SPDX-License-Identifier: LicenseRef-ONF-Member-1.01 |
| 4 | """ |
| 5 | |
| 6 | import requests |
| 7 | import json |
Shad Ansari | d88692c | 2022-02-01 22:47:43 +0000 | [diff] [blame^] | 8 | # noinspection PyUnresolvedReferences |
Shad Ansari | 6fcfa29 | 2022-01-28 00:34:13 +0000 | [diff] [blame] | 9 | from requests.structures import CaseInsensitiveDict |
| 10 | import logging as log |
| 11 | import pyaml |
| 12 | |
Shad Ansari | d88692c | 2022-02-01 22:47:43 +0000 | [diff] [blame^] | 13 | from device import Device |
| 14 | |
# Base URL of the ROC connectivity-service v4 REST API. The Roc methods below
# build their request URLs by appending a resource path to this string.
URL = "https://roc.menlo.aetherproject.org/aether-roc-api/aether/v4.0.0/connectivity-service-v4/"


# Alternate endpoint, currently disabled (a staging deployment, judging by the hostname):
# URL = "https://roc.staging.aether.onlab.us/aether-roc-api/aether/v4.0.0/connectivity-service-v4/"
| 19 | |
| 20 | |
class Roc(object):
    """Small client for the ROC REST API rooted at the module-level ``URL``.

    An API key is fetched from Keycloak with the given user/password when the
    object is created and is sent as a bearer token on every request.
    """

    def __init__(self, user, password):
        """Store the credentials and fetch an initial API key.

        Note that this performs an HTTP request (see ``get_key``).
        """
        self.user = user
        self.password = password
        self.key = self.get_key()

    def headers(self):
        """Return the JSON request headers carrying the current bearer token."""
        h = CaseInsensitiveDict()
        h["Content-Type"] = "application/json"
        h["Authorization"] = "Bearer " + self.key
        return h

    def _get_json(self, path, failure):
        """GET ``URL + path`` and return the decoded JSON body.

        On any status other than 200, logs ``failure`` together with the
        status code and returns None.
        """
        response = requests.get(URL + path, headers=self.headers())
        if response.status_code != 200:
            log.error("{}, status_code: {}".format(failure, response.status_code))
            return None
        return json.loads(response.text)

    def get_mbr(self, device_group):
        """Return the decoded ``slice/mbr`` resource of ``vcs-<device_group>``.

        Returns None (after logging) if the request does not succeed.
        """
        return self._get_json("vcs/vcs/vcs-{}/slice/mbr".format(device_group),
                              "Failed to get mbr")

    def set_mbr(self, device_group, mbr):
        """Set the uplink MBR of ``vcs-<device_group>`` to ``mbr``.

        If the first POST is not answered with 201, the API key is renewed and
        the POST is retried once. If the retry fails too, the error is logged
        and the process is terminated via ``SystemExit``.
        """
        # The file has no top-level "import sys"; without this local import the
        # failure path below raised NameError instead of exiting.
        import sys

        log.info("Set {} mbr to {}".format(device_group, mbr))
        payload = {'uplink': mbr}
        url = URL + "vcs/vcs/vcs-{}/slice/mbr".format(device_group)
        response = requests.post(url, headers=self.headers(), json=payload)

        # If error, renew api key and try again
        if response.status_code != 201:
            log.info("Renew ROC api key")
            self.key = self.get_key()
            response = requests.post(url, headers=self.headers(), json=payload)
            if response.status_code != 201:
                log.error("Failed to set mbr, device_group:{}, mbr:{}, status_code: {}".format(device_group, mbr,
                                                                                               response.status_code))
                # Non-zero status: this is a failure exit. (The previous
                # uncaught NameError also ended the process with status 1.)
                sys.exit(1)

    def get_key(self):
        """Request an access token from Keycloak using the password grant.

        Returns the ``access_token`` field of the JSON response. A response
        without that field raises ``KeyError``, as before.
        """
        url = "https://keycloak.opennetworking.org/auth/realms/master/protocol/openid-connect/token"
        headers = CaseInsensitiveDict()
        headers["Content-Type"] = "application/x-www-form-urlencoded"
        data = {
            'grant_type': 'password',
            'client_id': 'aether-roc-gui',
            'username': self.user,
            'password': self.password,
            'scope': 'openid profile email groups'
        }
        # Pass by keyword: the third positional parameter of requests.post is
        # ``json``, so the header dict was previously never sent as headers.
        response = requests.post(url, data=data, headers=headers)
        if response.status_code != 200:
            # Leave a trace of the HTTP status before the lookup below fails.
            log.error("Failed to get ROC api key, status_code: {}".format(response.status_code))
        return json.loads(response.text)['access_token']

    def get_enterprise(self):
        """Return the decoded ``enterprise`` resource, or None on failure."""
        return self._get_json("enterprise", "get_enterprise() failed")

    def get_site(self):
        """Return the decoded ``site`` resource, or None on failure."""
        return self._get_json("site", "get_site() failed")

    def get_upf(self):
        """Return the decoded ``upf`` resource, or None on failure."""
        return self._get_json("upf", "get_upf() failed")

    def get_devicegroups(self):
        """Return the ``device-group`` entry of the response, or None on failure."""
        body = self._get_json("device-group", "get_devicegroups() failed")
        return None if body is None else body['device-group']

    def get_ipdomain(self):
        """Return the ``ip-domain`` entry of the response, or None on failure."""
        body = self._get_json("ip-domain", "get_ipdomain() failed")
        return None if body is None else body['ip-domain']

    def get_devices(self):
        """Return a dict mapping each ``imsi-id`` to its ``imsi-range-from``.

        Entries are collected from the ``imsis`` list of every device group.
        Returns an empty dict when the device groups cannot be fetched.
        """
        imsis = {}
        devicegroups = self.get_devicegroups()
        if devicegroups is None:
            # get_devicegroups() already logged the failure; nothing to iterate.
            return imsis
        for devicegroup in devicegroups:
            for device in devicegroup['imsis']:
                imsis[device['imsi-id']] = device['imsi-range-from']
        return imsis

    def update_devices(self, devices):
        """Add a ``Device`` to ``devices`` for every imsi entry of every group.

        ``devices`` is modified in place; keys are ``str(imsi-range-from)``.
        It is left untouched when the device groups cannot be fetched.
        """
        dgs = self.get_devicegroups()
        if dgs is None:
            return
        for dg in dgs:
            for d in dg['imsis']:
                devices[str(d['imsi-range-from'])] = Device(str(d['imsi-id']), d['imsi-range-from'])

    def get_subnet(self):
        """Return a dict mapping each ip-domain ``id`` to its ``subnet``.

        Returns an empty dict when the ip-domains cannot be fetched.
        """
        ip_domains = self.get_ipdomain()
        if ip_domains is None:
            return {}
        return {ip_domain['id']: ip_domain['subnet'] for ip_domain in ip_domains}

    def dump(self):
        """Print enterprise, site, upf, device groups and ip-domains as YAML."""
        print(pyaml.dump(self.get_enterprise()))
        print(pyaml.dump(self.get_site()))
        print(pyaml.dump(self.get_upf()))
        print(pyaml.dump(self.get_devicegroups()))
        print(pyaml.dump(self.get_ipdomain()))
| 143 | |
Shad Ansari | 6fcfa29 | 2022-01-28 00:34:13 +0000 | [diff] [blame] | 144 | |
| 145 | |
if __name__ == '__main__':
    # use valid keycloak user/password
    roc = Roc("user", "password")

    # dump() prints enterprise, site, upf, device groups and ip-domains,
    # in that order.
    roc.dump()

    cameras = "menlo-4g-cameras"

    def show_mbr():
        """Print the current uplink/downlink MBR of the cameras device group."""
        mbr = roc.get_mbr(cameras)
        if mbr is None:
            # get_mbr() returns None (and logs the status code) on failure;
            # subscripting it would raise TypeError.
            print("mbr unavailable")
            return
        print("uplink mbr:{}, downlink mbr: {}".format(mbr["uplink"], mbr["downlink"]))

    # Show the initial value, then the value after each change.
    show_mbr()
    for uplink in (5000000, 10000000):
        roc.set_mbr(cameras, uplink)
        show_mbr()