| # SPDX-FileCopyrightText: 2021 Open Networking Foundation <info@opennetworking.org> |
| # |
| # SPDX-License-Identifier: LicenseRef-ONF-Member-Only-1.0 |
| |
| import json |
| import urllib.request |
| import time |
| |
| def main(): |
| config = json.loads(open("aml_staging_config.json", 'r').read()) |
| |
| # set active edges |
| activeEdges = config['activeEdges'] |
| |
| # set refresh interval (seconds) |
| refreshInterval = config['refreshInterval'] |
| |
| # set url to fetch edge status from |
| url = config['url'] |
| |
| # for debugging |
| debug = config['debug'] |
| debugCase = True |
| |
| while True: |
| # fetch edge status |
| response = urllib.request.urlopen(url) |
| data = response.read() |
| edges = json.loads(data.decode('utf-8')) |
| |
| allEdgesUp = True |
| |
| # loop through all edges |
| for e in edges['edges']: |
| # only check active edges specified in config file |
| if e['name'] not in activeEdges: |
| continue |
| |
| if isEdgeUp(e): |
| print(e['name'] + '\nok\n') |
| else: |
| print(e['name'] + '\nDOWN\n') |
| allEdgesUp = False |
| |
| if allEdgesUp: |
| # Run Green Light Scene |
| # os.system("uiopen shortcuts://run-shortcut?name=edgeOK") |
| print("** All edges are up. **") |
| else: |
| # Run Red Light Scene |
| # os.system("uiopen shortcuts://run-shortcut?name=edgeDOWN") |
| print("** AT LEAST 1 EDGE IS DOWN! **") |
| print("Sleeping for " + str(refreshInterval) + " seconds...") |
| |
| time.sleep(refreshInterval) |
| |
| def isEdgeUp(edge): |
| return edge['status']['control_plane'] == "connected" and \ |
| edge['status']['user_plane'] == "connected" |
| |
| if __name__ == "__main__": |
| main() |