| #!/usr/bin/env python |
| |
| # Copyright 2020-present Open Networking Foundation |
| # |
| # SPDX-License-Identifier: LicenseRef-ONF-Member-Only-1.0 |
| |
| import os |
| import time |
| import datetime |
| import pytz |
| import threading |
| from icalevents.icalevents import events |
| from flask import Flask, jsonify, abort, request, Response |
| import prometheus_client as prom |
| |
# URL of the maintenance calendar (iCal feed); calendar polling is only
# started when this is set
SECRET_ICAL_URL = os.getenv("SECRET_ICAL_URL")

# Aether environment that this server is monitoring (e.g., "production").
# Downtime is scheduled by postfixing the cluster name with the environment,
# for instance "ace-tucson-production".
AETHER_ENV = os.getenv("AETHER_ENV", "production")

# Seconds of silence from an agent after which its results are moved to the
# "no result" status (12 minutes)
NO_RESULT_THRESHOLD = 12 * 60
| |
# Flask application object on which all routes below are registered
app = Flask(import_name=__name__)
# In-memory list holding the latest report of every known edge. It starts
# with a single placeholder entry named "ace-example".
edges = [
    dict(
        name='ace-example',
        status=dict(control_plane='connected', user_plane='connected'),
        speedtest=dict(
            ping=dict(
                dns=dict(min=0.0, avg=0.0, max=0.0, stddev=0.0),
            ),
            iperf=dict(
                cluster=dict(downlink=0.0, uplink=0.0),
            ),
        ),
        signal_quality=dict(rsrq=0, rsrp=0),
        last_update=time.time(),
    ),
]
| |
# Numeric code for every textual test status, counting up from -2
# ("no result") to 2 ("connected")
status_codes = {
    status: code
    for code, status in enumerate(
        ("no result", "error", "disconnected", "connecting", "connected"),
        start=-2,
    )
}

# Alternative text that identifies a cluster in a calendar event, keyed by the
# cluster name including its environment suffix (presumably the name of the
# room that gets booked -- confirm against the calendar)
room_mapping = {
    "ace-menlo-pixel-production": "(Compute)-MP-1-Aether Production",
    "ace-menlo-staging": "(Compute)-MP-1-Aether Staging",
}
| |
# All gauges below carry a single label, "name", holding the edge name.

# Legacy test status metrics, reporting a status code between -2 and 2
cp_status = prom.Gauge("aetheredge_status_control_plane", "Control plane status code", ["name"])
up_status = prom.Gauge("aetheredge_status_user_plane", "User plane status code", ["name"])

# Simplified binary test result metrics
e2e_tests_ok = prom.Gauge("aetheredge_e2e_tests_ok", "Last connect and ping test both passed", ["name"])
connect_test_ok = prom.Gauge("aetheredge_connect_test_ok", "Last connect test passed", ["name"])
ping_test_ok = prom.Gauge("aetheredge_ping_test_ok", "Last ping test passed", ["name"])
e2e_tests_down = prom.Gauge("aetheredge_e2e_tests_down", "E2E tests not reporting", ["name"])

# Speedtest ping metrics
ping_dns_min = prom.Gauge("aetheredge_ping_dns_test_min", "Last ping test minimum value", ["name"])
ping_dns_avg = prom.Gauge("aetheredge_ping_dns_test_avg", "Last ping test average", ["name"])
ping_dns_max = prom.Gauge("aetheredge_ping_dns_test_max", "Last ping test maximum value", ["name"])
ping_dns_stddev = prom.Gauge("aetheredge_ping_dns_test_stddev", "Last ping test standard deviation", ["name"])

# Speedtest iperf metrics
iperf_cluster_downlink = prom.Gauge("aetheredge_iperf_cluster_downlink_test", "Last iperf test downlink result", ["name"])
# The help text used to say "downlink" here as well (copy-paste slip).
iperf_cluster_uplink = prom.Gauge("aetheredge_iperf_cluster_uplink_test", "Last iperf test uplink result", ["name"])

# Signal quality metrics in CESQ format, not dB
# RSRQ: >=53 excellent, 43 ~ 53 good, 33 ~ 43 mid, <=33 bad, 0 no signal
# RSRP: >=20 excellent, 10 ~ 20 good, 0 ~ 10 mid, 0 no signal
signal_quality_rsrq = prom.Gauge("aetheredge_signal_quality_rsrq", "Quality of the received signal", ["name"])
signal_quality_rsrp = prom.Gauge("aetheredge_signal_quality_rsrp", "Power of the received signal", ["name"])

# Other metrics
last_update = prom.Gauge("aetheredge_last_update", "Last reported test result", ["name"])
maint_window = prom.Gauge("aetheredge_in_maintenance_window", "Currently in a maintenance window", ["name"])
| |
def is_my_event(event, name):
    """Tell whether a calendar event refers to the edge called ``name``.

    Names starting with "ace-" are first extended to
    ``"<name>-<AETHER_ENV>"``. The event matches when that full name, or the
    text registered for it in ``room_mapping``, occurs as a substring of the
    event's summary, location or description.

    Args:
        event: Calendar event object; only its ``summary``, ``location`` and
            ``description`` attributes are read. Attributes that are missing
            or ``None`` are treated as empty text.
        name: Edge name as stored in ``edges``.

    Returns:
        True if one of the three fields mentions the edge, False otherwise.
    """
    fullname = name
    if name.startswith("ace-"):
        fullname = "%s-%s" % (name, AETHER_ENV)

    # "or ''" covers attributes that exist but hold None; a substring test
    # against None would raise TypeError.
    texts = [
        getattr(event, field, "") or ""
        for field in ("summary", "location", "description")
    ]
    if any(fullname in text for text in texts):
        return True

    room = room_mapping.get(fullname)
    return room is not None and any(room in text for text in texts)
| |
def is_naive_datetime(d):
    """Return True when ``d`` has no usable timezone information.

    A datetime counts as naive when it has no ``tzinfo`` at all, or when its
    ``tzinfo`` reports no UTC offset for it.
    """
    zone = d.tzinfo
    if zone is None:
        return True
    return zone.utcoffset(d) is None
| |
def process_all_day_events(es):
    """Make the start and end of all-day events timezone-aware, in place.

    Events whose ``all_day`` flag is false are left untouched. For all-day
    events, a naive ``start`` or ``end`` is localized to US/Pacific.

    Args:
        es: Iterable of calendar events exposing ``all_day``, ``start`` and
            ``end``.
    """
    for entry in es:
        if not entry.all_day:
            continue
        # All day events have naive datetimes, which breaks comparisons
        zone = pytz.timezone('US/Pacific')
        for bound in ("start", "end"):
            moment = getattr(entry, bound)
            if is_naive_datetime(moment):
                setattr(entry, bound, zone.localize(moment))
| |
def in_maintenance_window(events, name, now):
    """Check whether edge ``name`` is inside a maintenance window at ``now``.

    Args:
        events: Iterable of calendar events with ``start`` and ``end``.
        name: Edge name, matched against each event by ``is_my_event``.
        now: Point in time to test; must be comparable with the event bounds.

    Returns:
        True if some event strictly contains ``now`` (both bounds exclusive)
        and belongs to the edge, False otherwise.
    """
    return any(
        entry.start < now and entry.end > now and is_my_event(entry, name)
        for entry in events
    )
| |
def pull_maintenance_events():
    """Poll the maintenance calendar forever and flag edges under maintenance.

    Every 60 seconds the calendar at ``SECRET_ICAL_URL`` is queried (with
    ``start`` set to the current UTC time) and each entry of ``edges`` gets
    its ``'maintenance'`` dict replaced with fresh ``in_window`` and
    ``last_update`` values. Any failure in a round is printed and the round
    is skipped. This function never returns; it is meant to run in its own
    thread.
    """
    while True:
        now = datetime.datetime.now(pytz.utc)
        try:
            es = events(SECRET_ICAL_URL, start=now)
            process_all_day_events(es)
            # The per-edge update stays inside the try block: one event that
            # cannot be evaluated (for instance datetimes that cannot be
            # compared with ``now``) must not terminate the polling thread.
            for edge in edges:
                in_window = in_maintenance_window(es, edge['name'], now)
                # Assign a complete dict in one step so readers never see a
                # 'maintenance' entry without 'in_window'.
                edge['maintenance'] = {
                    'in_window': in_window,
                    'last_update': time.time(),
                }
        except Exception as e:
            print(e)
        time.sleep(60)
| |
def _reset_to_no_result(edge):
    """Overwrite the stored results of ``edge`` with "no result" defaults."""
    status = edge['status']
    status['control_plane'] = "no result"
    status['user_plane'] = "no result"

    speedtest = edge['speedtest']
    speedtest['ping']['dns'] = dict(min=0.0, avg=0.0, max=0.0, stddev=0.0)
    speedtest['iperf'] = {'cluster': dict(downlink=0.0, uplink=0.0)}

    edge.pop('signal_quality', None)


def time_out_stale_results():
    """Mark every edge that has not reported recently as "no result".

    An edge is stale when more than ``NO_RESULT_THRESHOLD`` seconds have
    passed since its ``last_update``. Stale edges get both statuses set to
    "no result", their ping and iperf numbers zeroed and their signal quality
    removed.
    """
    for edge in edges:
        if time.time() - edge["last_update"] > NO_RESULT_THRESHOLD:
            _reset_to_no_result(edge)
| |
def remove_edge_from_metrics(name):
    """Drop the samples labelled ``name`` from every exported gauge.

    Each gauge is handled on its own, so a gauge that never had a sample for
    this edge does not prevent the remaining gauges from being cleaned up.

    Args:
        name: Edge name used as the ``name`` label value.
    """
    gauges = (
        cp_status,
        up_status,
        last_update,
        e2e_tests_ok,
        connect_test_ok,
        ping_test_ok,
        e2e_tests_down,
        ping_dns_min,
        ping_dns_avg,
        ping_dns_max,
        ping_dns_stddev,
        iperf_cluster_downlink,
        iperf_cluster_uplink,
        signal_quality_rsrq,
        signal_quality_rsrp,
        maint_window,
    )
    for gauge in gauges:
        try:
            gauge.remove(name)
        except KeyError:
            # Nothing was ever exported for this edge on this gauge.
            pass
| |
@app.route('/edges/metrics', methods=['GET'])
def get_prometheus_metrics():
    """Refresh all gauges from ``edges`` and return them as Prometheus text.

    Stale results are timed out first. The placeholder edge "ace-example" is
    never exported. Ping and iperf gauges are only written when the stored
    ping average / iperf downlink value is truthy, and the signal quality
    gauges only when the edge has a ``signal_quality`` entry; otherwise they
    keep whatever was set before.

    Returns:
        A ``text/plain`` response made of the serialized gauges.
    """
    time_out_stale_results()
    for edge in edges:
        name = edge['name']
        if name == "ace-example":
            continue

        connect_status = edge['status']['control_plane']
        ping_status = edge['status']['user_plane']

        # Read the speedtest numbers up front, before any gauge is touched.
        ping_stats = edge['speedtest']['ping']['dns']
        have_ping_results = bool(ping_stats['avg'])
        if have_ping_results:
            ping_values = (
                ping_stats['min'],
                ping_stats['avg'],
                ping_stats['max'],
                ping_stats['stddev'],
            )

        iperf_stats = edge['speedtest']['iperf']['cluster']
        have_iperf_results = bool(iperf_stats['downlink'])
        if have_iperf_results:
            iperf_values = (iperf_stats['downlink'], iperf_stats['uplink'])

        cp_status.labels(name).set(status_codes[connect_status])
        up_status.labels(name).set(status_codes[ping_status])

        last_update.labels(name).set(edge['last_update'])
        if 'maintenance' in edge:
            maint_window.labels(name).set(int(edge['maintenance']['in_window']))

        # Binary results start at 0 and are raised to 1 below where it applies.
        for gauge in (connect_test_ok, ping_test_ok, e2e_tests_ok, e2e_tests_down):
            gauge.labels(name).set(0)

        not_reporting = ("error", "no result")
        connect_ok = connect_status == "connected"
        ping_ok = ping_status == "connected"
        if connect_status in not_reporting or ping_status in not_reporting:
            e2e_tests_down.labels(name).set(1)
        else:
            if connect_ok:
                connect_test_ok.labels(name).set(1)
            if ping_ok:
                ping_test_ok.labels(name).set(1)
            if connect_ok and ping_ok:
                e2e_tests_ok.labels(name).set(1)

        if have_ping_results:
            ping_gauges = (ping_dns_min, ping_dns_avg, ping_dns_max, ping_dns_stddev)
            for gauge, value in zip(ping_gauges, ping_values):
                gauge.labels(name).set(value)

        if have_iperf_results:
            iperf_gauges = (iperf_cluster_downlink, iperf_cluster_uplink)
            for gauge, value in zip(iperf_gauges, iperf_values):
                gauge.labels(name).set(value)

        if 'signal_quality' in edge:
            quality = edge['signal_quality']
            signal_quality_rsrq.labels(name).set(quality['rsrq'])
            signal_quality_rsrp.labels(name).set(quality['rsrp'])

    # The order below is the order in which the gauges appear in the response.
    exported = (
        cp_status,
        up_status,
        ping_dns_min,
        ping_dns_avg,
        ping_dns_max,
        ping_dns_stddev,
        iperf_cluster_downlink,
        iperf_cluster_uplink,
        last_update,
        maint_window,
        connect_test_ok,
        ping_test_ok,
        e2e_tests_ok,
        e2e_tests_down,
        signal_quality_rsrq,
        signal_quality_rsrp,
    )
    res = [prom.generate_latest(gauge) for gauge in exported]

    return Response(res, mimetype="text/plain")
| |
| |
@app.route('/edges/healthz', methods=['GET'])
def get_health():
    """Health check endpoint; always answers with a fixed "healthy" message."""
    return dict(message='healthy')
| |
| |
@app.route('/edges', methods=['GET'])
def get_edges():
    """Return all known edges as JSON, after timing out stale results."""
    time_out_stale_results()
    return jsonify(edges=edges)
| |
| |
@app.route('/edges/<string:name>', methods=['GET'])
def get_edge(name):
    """Return the edge called ``name`` as JSON, or 404 when it is unknown.

    Stale results are timed out before the lookup.
    """
    time_out_stale_results()
    found = next((item for item in edges if item['name'] == name), None)
    if found is None:
        abort(404)
    return jsonify(edge=found)
| |
| |
@app.route('/edges', methods=['POST'])
@app.route('/testresults', methods=['POST'])
def create_or_update_edge():
    """Store a test report, creating the edge on first contact.

    The JSON body must be an object with a string ``name`` and a ``status``
    object whose ``control_plane`` and ``user_plane`` values are keys of
    ``status_codes``. Optional ``speedtest.ping``, ``speedtest.iperf`` and
    ``signal_quality`` entries are stored as sent; missing speedtest parts
    default to zeros.

    Returns:
        The stored report as JSON with status 201, for new and existing edges
        alike.

    Raises:
        400 (via ``abort``) when the body is missing, malformed or carries an
        unknown status.
    """
    payload = request.json
    if not payload or not isinstance(payload, dict):
        abort(400)
    if 'name' not in payload or 'status' not in payload:
        abort(400)
    # The name is concatenated into log lines and used as a metric label.
    if not isinstance(payload['name'], str):
        abort(400)

    # Reject statuses the metrics endpoint cannot translate. Storing them
    # would make every later scrape fail on the status_codes lookup.
    status = payload['status']
    if not isinstance(status, dict):
        abort(400)
    reported_status = {}
    for plane in ('control_plane', 'user_plane'):
        value = status.get(plane)
        if not isinstance(value, str) or value not in status_codes:
            abort(400)
        reported_status[plane] = value

    req_edge = {
        'name': payload['name'],
        'status': reported_status,
        'speedtest': {
            'ping': {
                'dns': {
                    'min': 0.0,
                    'avg': 0.0,
                    'max': 0.0,
                    'stddev': 0.0
                }
            },
            'iperf': {
                'cluster': {
                    'downlink': 0.0,
                    'uplink': 0.0
                }
            }
        },
        'last_update': time.time()
    }

    # NOTE(review): the contents of 'speedtest' and 'signal_quality' are taken
    # over unchecked, while the metrics endpoint reads ping.dns.* and
    # iperf.cluster.* from them -- consider validating their shape as well.
    if 'speedtest' in payload:
        if 'ping' in payload['speedtest']:
            req_edge['speedtest']['ping'] = payload['speedtest']['ping']
        if 'iperf' in payload['speedtest']:
            req_edge['speedtest']['iperf'] = payload['speedtest']['iperf']

    if 'signal_quality' in payload:
        req_edge['signal_quality'] = payload['signal_quality']

    existing = next((e for e in edges if e['name'] == req_edge['name']), None)
    if existing is None:
        print("new edge request " + req_edge['name'])
        edges.append(req_edge)
    else:
        existing['status']['control_plane'] = req_edge['status']['control_plane']
        existing['status']['user_plane'] = req_edge['status']['user_plane']
        existing['speedtest']['ping'] = req_edge['speedtest']['ping']
        existing['speedtest']['iperf'] = req_edge['speedtest']['iperf']
        # A report without signal quality keeps the previously stored value.
        if 'signal_quality' in req_edge:
            existing['signal_quality'] = req_edge['signal_quality']
        existing['last_update'] = req_edge['last_update']

    return jsonify({'edge': req_edge}), 201
| |
| |
@app.route('/edges/<string:name>', methods=['DELETE'])
@app.route('/testresults/<string:name>', methods=['DELETE'])
def delete_edge(name):
    """Forget the edge called ``name`` and drop its metric samples.

    Only the first entry with that name is removed. Answers 404 when no such
    edge exists.
    """
    print("delete edge request " + name)
    for position, entry in enumerate(edges):
        if entry['name'] == name:
            del edges[position]
            remove_edge_from_metrics(name)
            return jsonify(result=True)
    abort(404)
| |
| |
if __name__ == '__main__':
    # Calendar polling is optional: it needs both a calendar URL and an
    # environment name.
    if SECRET_ICAL_URL and AETHER_ENV:
        print(" * Starting maintenance calendar polling thread (Aether env: %s)" % AETHER_ENV)
        # Daemon thread: the polling loop never returns, so a regular thread
        # would keep the process alive after the web server has stopped.
        t = threading.Thread(target=pull_maintenance_events, daemon=True)
        t.start()
    # NOTE(review): debug=True together with host 0.0.0.0 exposes Flask's
    # debug mode on all interfaces -- confirm this entry point is only used
    # for development and not for production deployments.
    app.run(debug=True, host='0.0.0.0', port=80)