# blob: a9477f1e2915af4576cee1e5055da0e68411beb9 [file] [log] [blame]
"""
SPDX-FileCopyrightText: 2020-present Open Networking Foundation <info@opennetworking.org>
SPDX-License-Identifier: LicenseRef-ONF-Member-1.01
"""
import requests
import json
from requests.structures import CaseInsensitiveDict
# Base URL of the aether-roc-api connectivity-service-v4 endpoint; the
# functions below append their resource path to it.
URL = "https://roc.menlo.aetherproject.org/aether-roc-api/aether/v4.0.0/connectivity-service-v4/"
# Alternative base URL (hostname suggests a staging deployment -- confirm
# before use); uncomment to target it instead.
#URL = "https://roc.staging.aether.onlab.us/aether-roc-api/aether/v4.0.0/connectivity-service-v4/"
def headers(key):
    """Return the HTTP headers used for every request in this module.

    Args:
        key: Token string placed after ``"Bearer "`` in the
            ``Authorization`` header.

    Returns:
        A ``CaseInsensitiveDict`` holding the JSON ``Content-Type`` and the
        bearer ``Authorization`` header.
    """
    return CaseInsensitiveDict({
        "Content-Type": "application/json",
        "Authorization": "Bearer " + key,
    })
def get_mbr(key, device_group):
    """Fetch the slice ``mbr`` resource of a device group's VCS.

    Issues a GET on ``vcs/vcs/vcs-<device_group>/slice/mbr`` under ``URL``.

    Args:
        key: Bearer token passed to ``headers``.
        device_group: Device group name interpolated into the resource path.

    Returns:
        The JSON-decoded response body when the server answers 200. The
        script section of this file reads ``"uplink"`` and ``"downlink"``
        from it. Returns ``None`` (after printing the status code) for any
        other status.
    """
    endpoint = URL + "vcs/vcs/vcs-{}/slice/mbr".format(device_group)
    reply = requests.get(endpoint, headers=headers(key))
    if reply.status_code == 200:
        return json.loads(reply.text)
    # Any non-200 status is reported and signalled to the caller with None.
    print("Failed to get mbr, status_code: {}".format(reply.status_code))
    return None
def set_mbr(key, device_group, mbr):
    """Set the uplink mbr of a device group's VCS slice.

    Issues a POST on ``vcs/vcs/vcs-<device_group>/slice/mbr`` under ``URL``
    with the JSON body ``{"uplink": mbr}``.

    Args:
        key: Bearer token passed to ``headers``.
        device_group: Device group name interpolated into the resource path.
        mbr: New uplink value sent to the server.

    Raises:
        AssertionError: If the server does not answer with status 201. The
            exception type matches the original ``assert`` so existing
            callers keep working, but it is raised explicitly so the check
            is not stripped when Python runs with ``-O``.
    """
    payload = {'uplink': mbr}
    url = URL + "vcs/vcs/vcs-{}/slice/mbr".format(device_group)
    response = requests.post(url, headers=headers(key), json=payload)
    if response.status_code != 201:
        # Carry the full diagnostic in the exception; previously it sat in
        # a branch that was unreachable whenever assertions were enabled.
        raise AssertionError(
            "Failed to set mbr, device_group:{}, mbr:{}, status_code: {}".format(
                device_group, mbr, response.status_code))
if __name__ == '__main__':
    # Manual smoke test: read the current mbr, set the uplink to two values
    # in turn, and read it back after each change.
    key = ""  # bearer token; fill in before running
    cameras = "menlo-4g-cameras"
    #cameras = "cameras-4g"
    # None means "just read"; every other entry is written before reading.
    for new_uplink in (None, 5000000, 10000000):
        if new_uplink is not None:
            set_mbr(key, cameras, new_uplink)
        mbr = get_mbr(key, cameras)
        if mbr is None:
            # get_mbr has already printed the HTTP status; stop here rather
            # than fail with a TypeError when subscripting None.
            raise SystemExit(
                "Unable to read mbr for device group {}".format(cameras))
        print("uplink mbr:{}, downlink mbr: {}".format(mbr["uplink"], mbr["downlink"]))