blob: 398fedf282af21486c4c815b08981063eed9e2c9 [file] [log] [blame]
#!/usr/bin/env python
# Copyright 2020-present Open Networking Foundation
#
# SPDX-License-Identifier: LicenseRef-ONF-Member-Only-1.0
import os
import time
import datetime
import pytz
import threading
from icalevents.icalevents import events
from flask import Flask, jsonify, abort, request, Response
import prometheus_client as prom
import jsonschema
from logging.config import dictConfig
# URL of the maintenance calendar; None when the variable is not set
SECRET_ICAL_URL = os.getenv("SECRET_ICAL_URL")

# Aether environment that the server is monitoring (e.g., "production").
# To schedule downtime, suffix the cluster name with the env, e.g.
# "ace-tucson-production".
AETHER_ENV = os.getenv("AETHER_ENV", "production")

# An edge moves to "no result" status when its agent has not reported for
# this many seconds
NO_RESULT_THRESHOLD = 12 * 60
# Logging is configured before the Flask app object is created. Every record
# at INFO or above, from any logger, goes to Flask's WSGI error stream and is
# rendered as "<LEVEL> <message>".
dictConfig({
    'version': 1,
    'formatters': {
        'default': {
            'format': '%(levelname)s %(message)s',
        },
    },
    'handlers': {
        'wsgi': {
            'class': 'logging.StreamHandler',
            'stream': 'ext://flask.logging.wsgi_errors_stream',
            'formatter': 'default',
        },
    },
    'root': {
        'level': 'INFO',
        'handlers': ['wsgi'],
    },
})

app = Flask(__name__)
def _numeric_object(*fields):
    """Return the schema of an object whose *fields* are all required numbers."""
    return {
        "type": "object",
        "properties": {field: {"type": "number"} for field in fields},
        "required": list(fields),
    }


# Statistics reported for every ping target
_PING_STATS = ("min", "avg", "max", "stddev")

# JSON schema of the test report posted by an edge agent. Only "name" and
# "status" are mandatory; "speedtest" and "signal_quality" are optional, and
# inside "speedtest" each of "ping"/"iperf" (and each ping target) is optional.
edgeSchema = {
    "type": "object",
    "properties": {
        "name": {"type": "string"},
        "status": {
            "type": "object",
            "properties": {
                "control_plane": {"type": "string"},
                "user_plane": {"type": "string"},
            },
            "required": ["control_plane", "user_plane"],
        },
        "speedtest": {
            "type": "object",
            "properties": {
                "ping": {
                    "type": "object",
                    "properties": {
                        "dns": _numeric_object(*_PING_STATS),
                        "iperf_server": _numeric_object(*_PING_STATS),
                    },
                },
                "iperf": {
                    "type": "object",
                    "properties": {
                        "cluster": _numeric_object("downlink", "uplink"),
                    },
                },
            },
        },
        "signal_quality": _numeric_object("rsrq", "rsrp"),
    },
    "required": ["name", "status"],
}
# In-memory list of known edges, seeded with a single example entry whose
# results are all zero and whose timestamp is the module load time.
edges = [
    {
        'name': 'ace-example',
        'status': {
            'control_plane': 'connected',
            'user_plane': 'connected',
        },
        'speedtest': {
            'ping': {
                # each ping target gets its own, independent stats dict
                'dns': {stat: 0.0 for stat in ('min', 'avg', 'max', 'stddev')},
                'iperf_server': {stat: 0.0 for stat in ('min', 'avg', 'max', 'stddev')},
            },
            'iperf': {
                'cluster': {'downlink': 0.0, 'uplink': 0.0},
            },
        },
        'signal_quality': {'rsrq': 0, 'rsrp': 0},
        'last_update': time.time(),
    },
]
# Numeric code exported for each status string. The statuses are listed in
# ascending code order, starting at -2 ("no result") and ending at 2
# ("connected").
status_codes = {
    status: code
    for code, status in enumerate(
        ("no result", "error", "disconnected", "connecting", "connected"),
        start=-2,
    )
}
# Alternative text that identifies a cluster in a calendar event, keyed by
# the cluster's full name (cluster name suffixed with the environment).
room_mapping = dict([
    ("ace-menlo-pixel-production", "(Compute)-MP-1-Aether Production"),
    ("ace-menlo-staging", "(Compute)-MP-1-Aether Staging"),
])
# Legacy test status metrics, reporting a status code between -2 and 2
cp_status = prom.Gauge(
    "aetheredge_status_control_plane",
    "Control plane status code",
    labelnames=["name"],
)
up_status = prom.Gauge(
    "aetheredge_status_user_plane",
    "User plane status code",
    labelnames=["name"],
)

# Simplified binary (0/1) test result metrics
e2e_tests_ok = prom.Gauge(
    "aetheredge_e2e_tests_ok",
    "Last connect and ping test both passed",
    labelnames=["name"],
)
connect_test_ok = prom.Gauge(
    "aetheredge_connect_test_ok",
    "Last connect test passed",
    labelnames=["name"],
)
ping_test_ok = prom.Gauge(
    "aetheredge_ping_test_ok",
    "Last ping test passed",
    labelnames=["name"],
)
e2e_tests_down = prom.Gauge(
    "aetheredge_e2e_tests_down",
    "E2E tests not reporting",
    labelnames=["name"],
)
# Speedtest ping metrics for the dns target, one gauge per statistic
ping_dns_min = prom.Gauge(
    "aetheredge_ping_dns_test_min",
    "Last ping test to dns minimum value",
    labelnames=["name"],
)
ping_dns_avg = prom.Gauge(
    "aetheredge_ping_dns_test_avg",
    "Last ping test to dns average",
    labelnames=["name"],
)
ping_dns_max = prom.Gauge(
    "aetheredge_ping_dns_test_max",
    "Last ping test to dns maximum value",
    labelnames=["name"],
)
ping_dns_stddev = prom.Gauge(
    "aetheredge_ping_dns_test_stddev",
    "Last ping test to dns standard deviation",
    labelnames=["name"],
)

# Speedtest ping metrics for the iperf server target, one gauge per statistic
ping_iperf_server_min = prom.Gauge(
    "aetheredge_ping_iperf_server_test_min",
    "Last ping test to iperf_server minimum value",
    labelnames=["name"],
)
ping_iperf_server_avg = prom.Gauge(
    "aetheredge_ping_iperf_server_test_avg",
    "Last ping test to iperf_server average",
    labelnames=["name"],
)
ping_iperf_server_max = prom.Gauge(
    "aetheredge_ping_iperf_server_test_max",
    "Last ping test to iperf_server maximum value",
    labelnames=["name"],
)
ping_iperf_server_stddev = prom.Gauge(
    "aetheredge_ping_iperf_server_test_stddev",
    "Last ping test to iperf_server standard deviation",
    labelnames=["name"],
)
# Speedtest iperf metrics, one gauge per direction
iperf_cluster_downlink = prom.Gauge(
    "aetheredge_iperf_cluster_downlink_test",
    "Last iperf test downlink result",
    ["name"],
)
# The help text used to say "downlink" here as well (copy-paste error), which
# made both gauges read the same in the exposition output.
iperf_cluster_uplink = prom.Gauge(
    "aetheredge_iperf_cluster_uplink_test",
    "Last iperf test uplink result",
    ["name"],
)
# Signal quality metrics in CESQ format not dB
# RSRQ: >=53 excellent, 43 ~ 53 good, 33 ~ 43 mid, <=33 bad, 0 no signal
# RSRP: >=20 excellent, 10 ~ 20 good, 0 ~ 10 mid, 0 no signal
signal_quality_rsrq = prom.Gauge(
    "aetheredge_signal_quality_rsrq",
    "Quality of the received signal",
    labelnames=["name"],
)
signal_quality_rsrp = prom.Gauge(
    "aetheredge_signal_quality_rsrp",
    "Power of the received signal",
    labelnames=["name"],
)

# Other metrics
last_update = prom.Gauge(
    "aetheredge_last_update",
    "Last reported test result",
    labelnames=["name"],
)
maint_window = prom.Gauge(
    "aetheredge_in_maintenance_window",
    "Currently in a maintenance window",
    labelnames=["name"],
)
def is_my_event(event, name):
    """Return True if the calendar *event* mentions the edge called *name*.

    Names starting with "ace-" are suffixed with the Aether environment
    (``<name>-<AETHER_ENV>``) before matching; other names are used as is.
    The event matches when that full name, or the text ``room_mapping``
    associates with it, occurs in the event's summary, location or
    description.

    Args:
        event: object that may carry ``summary``, ``location`` and
            ``description`` attributes; each may be missing or None.
        name: edge name as reported by the agent.

    Returns:
        bool: whether the event refers to this edge.
    """
    # The full name does not depend on the field, so resolve it once.
    fullname = name
    if name.startswith("ace-"):
        fullname = "%s-%s" % (name, AETHER_ENV)

    needles = [fullname]
    if fullname in room_mapping:
        needles.append(room_mapping[fullname])

    for field in ("summary", "location", "description"):
        # getattr's default only covers a *missing* attribute; an attribute
        # that exists but is None would make the `in` test raise TypeError.
        text = getattr(event, field, None) or ""
        if any(needle in text for needle in needles):
            return True
    return False
def is_naive_datetime(d):
    """Return True when *d* carries no usable timezone information.

    That is the case when it has no tzinfo at all, or when its tzinfo
    reports no UTC offset for it.
    """
    tz = d.tzinfo
    if tz is None:
        return True
    return tz.utcoffset(d) is None
def process_all_day_events(es):
    """Give all-day events timezone-aware start/end times, in place.

    All-day events have naive datetimes, which breaks comparisons against
    an aware "now". Any naive start or end of such an event is localized to
    US/Pacific; events that are not all-day are left untouched.
    """
    for event in es:
        if not event.all_day:
            continue
        pacific = pytz.timezone('US/Pacific')
        for attr in ("start", "end"):
            moment = getattr(event, attr)
            if is_naive_datetime(moment):
                setattr(event, attr, pacific.localize(moment))
def in_maintenance_window(events, name, now):
    """Return True if any event that belongs to edge *name* is in progress.

    An event is in progress when *now* lies strictly between its start and
    its end; ownership is decided by is_my_event.
    """
    return any(
        ev.start < now and ev.end > now and is_my_event(ev, name)
        for ev in events
    )
def pull_maintenance_events():
    """Poll the maintenance calendar once a minute, forever.

    Each round fetches the events starting from the current UTC time and
    records, for every known edge, whether it is inside a maintenance
    window and when that was last evaluated. A failed fetch is logged and
    leaves the edges as they were until the next round. Never returns.
    """
    while True:
        now = datetime.datetime.now(pytz.utc)
        try:
            es = events(SECRET_ICAL_URL, start=now)
            process_all_day_events(es)
        except Exception as e:
            app.logger.error(e)
        else:
            for edge in edges:
                maintenance = edge.setdefault('maintenance', {})
                maintenance['in_window'] = in_maintenance_window(es, edge['name'], now)
                maintenance['last_update'] = time.time()
        time.sleep(60)
def time_out_stale_results():
    """Reset every edge that has not reported for too long.

    An edge whose last update is more than NO_RESULT_THRESHOLD seconds old
    gets both plane statuses set to "no result", all speedtest figures
    zeroed and its signal quality entry dropped.
    """
    for edge in edges:
        if time.time() - edge["last_update"] <= NO_RESULT_THRESHOLD:
            continue

        status = edge['status']
        status['control_plane'] = "no result"
        status['user_plane'] = "no result"

        ping = edge['speedtest']['ping']
        for target in ('dns', 'iperf_server'):
            # a fresh dict per target, so the two never share state
            ping[target] = {'min': 0.0, 'avg': 0.0, 'max': 0.0, 'stddev': 0.0}

        edge['speedtest']['iperf'] = {
            'cluster': {'downlink': 0.0, 'uplink': 0.0},
        }
        edge.pop('signal_quality', None)
def remove_edge_from_metrics(name):
    """Drop the samples labelled with edge *name* from every gauge.

    Removal is best effort and done per gauge: a gauge that never recorded
    a value for this edge is skipped without affecting the others.

    Args:
        name: edge name used as the "name" label value.
    """
    gauges = (
        cp_status,
        up_status,
        last_update,
        e2e_tests_ok,
        connect_test_ok,
        ping_test_ok,
        e2e_tests_down,
        ping_dns_min,
        ping_dns_avg,
        ping_dns_max,
        ping_dns_stddev,
        ping_iperf_server_min,
        ping_iperf_server_avg,
        ping_iperf_server_max,
        ping_iperf_server_stddev,
        iperf_cluster_downlink,
        iperf_cluster_uplink,
        signal_quality_rsrq,
        signal_quality_rsrp,
        maint_window,
    )
    for gauge in gauges:
        try:
            gauge.remove(name)
        except KeyError:
            # This gauge holds no sample for the edge; nothing to remove.
            # Catching only KeyError (instead of a bare except around a
            # group of gauges) keeps real errors visible and stops one
            # missing label from leaving the rest of the group behind.
            pass
def _known_status(edge_name, status):
    """Return *status* if it has a status code, otherwise "error" (logged)."""
    if status in status_codes:
        return status
    app.logger.warning("edge %s reported unknown status %r", edge_name, status)
    return "error"


@app.route('/edges/metrics', methods=['GET'])
def get_prometheus_metrics():
    """Refresh all gauges from the edge list and return them as text.

    Stale edges are timed out first. The "ace-example" entry is skipped.
    Speedtest gauges are only updated when the corresponding result is
    present and non-zero, so they otherwise keep their previous value.
    A status string without an entry in status_codes is reported as
    "error" instead of failing the whole scrape with a KeyError.

    Returns:
        A text/plain response with the exposition output of every gauge.
    """
    time_out_stale_results()

    for edge in edges:
        name = edge['name']
        if name == "ace-example":
            continue

        # The schema only guarantees strings here, so normalize before any
        # status_codes lookup.
        connect_status = _known_status(name, edge['status']['control_plane'])
        ping_status = _known_status(name, edge['status']['user_plane'])

        # Add ping dns latency results if available
        try:
            dns = edge['speedtest']['ping']['dns']
            if dns['avg']:
                ping_dns_min.labels(name).set(dns['min'])
                ping_dns_avg.labels(name).set(dns['avg'])
                ping_dns_max.labels(name).set(dns['max'])
                ping_dns_stddev.labels(name).set(dns['stddev'])
        except KeyError:
            pass

        # Add ping iperf_server latency results if available
        try:
            iperf_server = edge['speedtest']['ping']['iperf_server']
            if iperf_server['avg']:
                ping_iperf_server_min.labels(name).set(iperf_server['min'])
                ping_iperf_server_avg.labels(name).set(iperf_server['avg'])
                ping_iperf_server_max.labels(name).set(iperf_server['max'])
                ping_iperf_server_stddev.labels(name).set(iperf_server['stddev'])
        except KeyError:
            pass

        # Add iperf bandwidth results if available
        try:
            cluster = edge['speedtest']['iperf']['cluster']
            if cluster['downlink']:
                iperf_cluster_downlink.labels(name).set(cluster['downlink'])
                iperf_cluster_uplink.labels(name).set(cluster['uplink'])
        except KeyError:
            pass

        cp_status.labels(name).set(status_codes[connect_status])
        up_status.labels(name).set(status_codes[ping_status])
        last_update.labels(name).set(edge['last_update'])
        if 'maintenance' in edge:
            maint_window.labels(name).set(int(edge['maintenance']['in_window']))

        # Binary results: the tests count as "down" when either one errored
        # or did not report; only otherwise can a test be "ok".
        not_reporting = ("error", "no result")
        down = connect_status in not_reporting or ping_status in not_reporting
        connect_ok = not down and connect_status == "connected"
        ping_ok = not down and ping_status == "connected"
        connect_test_ok.labels(name).set(int(connect_ok))
        ping_test_ok.labels(name).set(int(ping_ok))
        e2e_tests_ok.labels(name).set(int(connect_ok and ping_ok))
        e2e_tests_down.labels(name).set(int(down))

        if 'signal_quality' in edge:
            signal_quality_rsrq.labels(name).set(edge['signal_quality']['rsrq'])
            signal_quality_rsrp.labels(name).set(edge['signal_quality']['rsrp'])

    # Output order is part of the response body; keep it stable.
    exported = (
        cp_status,
        up_status,
        ping_dns_min,
        ping_dns_avg,
        ping_dns_max,
        ping_dns_stddev,
        ping_iperf_server_min,
        ping_iperf_server_avg,
        ping_iperf_server_max,
        ping_iperf_server_stddev,
        iperf_cluster_downlink,
        iperf_cluster_uplink,
        last_update,
        maint_window,
        connect_test_ok,
        ping_test_ok,
        e2e_tests_ok,
        e2e_tests_down,
        signal_quality_rsrq,
        signal_quality_rsrp,
    )
    res = [prom.generate_latest(gauge) for gauge in exported]
    return Response(res, mimetype="text/plain")
@app.route('/edges/healthz', methods=['GET'])
def get_health():
    """Health probe: always answers with a fixed "healthy" message."""
    body = {'message': 'healthy'}
    return body
@app.route('/edges', methods=['GET'])
def get_edges():
    """Return all known edges as JSON, after timing out stale results."""
    time_out_stale_results()
    payload = {'edges': edges}
    return jsonify(payload)
@app.route('/edges/<string:name>', methods=['GET'])
def get_edge(name):
    """Return the edge called *name* as JSON, or 404 when it is unknown.

    Stale results are timed out before the lookup.
    """
    time_out_stale_results()
    for candidate in edges:
        if candidate['name'] == name:
            return jsonify({'edge': candidate})
    abort(404)
@app.route('/edges', methods=['POST'])
@app.route('/testresults', methods=['POST'])
def create_or_update_edge():
    """Store the test report posted by an edge agent.

    The JSON body is validated against edgeSchema; an invalid body is
    logged and rejected with 400. Speedtest sections missing from the
    report are stored as zeros. An unknown edge is appended to the edge
    list; a known one has its status, speedtest results, signal quality
    (when reported) and timestamp updated in place.

    Returns:
        The stored report as JSON with status 201.
    """
    payload = request.json
    try:
        jsonschema.validate(instance=payload, schema=edgeSchema)
    except jsonschema.exceptions.ValidationError as err:
        # Logger.warn is a deprecated alias; warning is the supported name.
        app.logger.warning(err)
        abort(400)

    req_edge = {
        'name': payload['name'],
        'status': {
            'control_plane': payload['status']['control_plane'],
            'user_plane': payload['status']['user_plane'],
        },
        # zeroed defaults, overridden below by whatever the report contains
        'speedtest': {
            'ping': {
                'dns': {'min': 0.0, 'avg': 0.0, 'max': 0.0, 'stddev': 0.0},
                'iperf_server': {'min': 0.0, 'avg': 0.0, 'max': 0.0, 'stddev': 0.0},
            },
            'iperf': {
                'cluster': {'downlink': 0.0, 'uplink': 0.0},
            },
        },
        'last_update': time.time(),
    }

    speedtest = payload.get('speedtest', {})
    for section in ('ping', 'iperf'):
        if section in speedtest:
            req_edge['speedtest'][section] = speedtest[section]
    if 'signal_quality' in payload:
        req_edge['signal_quality'] = payload['signal_quality']

    existing = next((e for e in edges if e['name'] == req_edge['name']), None)
    if existing is None:
        app.logger.info("new edge request " + req_edge['name'])
        edges.append(req_edge)
    else:
        # Update field by field so keys the report does not carry (such as
        # maintenance data) survive on the stored edge.
        existing['status']['control_plane'] = req_edge['status']['control_plane']
        existing['status']['user_plane'] = req_edge['status']['user_plane']
        existing['speedtest']['ping'] = req_edge['speedtest']['ping']
        existing['speedtest']['iperf'] = req_edge['speedtest']['iperf']
        if 'signal_quality' in req_edge:
            existing['signal_quality'] = req_edge['signal_quality']
        existing['last_update'] = req_edge['last_update']

    return jsonify({'edge': req_edge}), 201
@app.route('/edges/<string:name>', methods=['DELETE'])
@app.route('/testresults/<string:name>', methods=['DELETE'])
def delete_edge(name):
    """Delete the edge called *name* and its metric samples.

    Answers 404 when no edge has that name, otherwise a JSON success flag.
    """
    app.logger.info("delete edge request " + name)
    for index, edge in enumerate(edges):
        if edge['name'] == name:
            del edges[index]
            remove_edge_from_metrics(name)
            break
    else:
        # loop finished without finding the edge
        abort(404)
    return jsonify({'result': True})
if __name__ == '__main__':
    # Calendar polling is optional: it needs both the calendar URL and the
    # environment name.
    if SECRET_ICAL_URL and AETHER_ENV:
        app.logger.info(" * Starting maintenance calendar polling thread (Aether env: %s)" % AETHER_ENV)
        # The poller loops forever, so run it as a daemon thread; otherwise
        # it keeps the process alive after the server has stopped.
        t = threading.Thread(target=pull_maintenance_events, daemon=True)
        t.start()

    # Debug mode used to be hard-coded on while listening on all interfaces,
    # which exposes the interactive debugger to anyone who can reach the
    # port. It is now off unless FLASK_DEBUG is explicitly set.
    debug = os.environ.get("FLASK_DEBUG", "").lower() in ("1", "true", "yes", "on")
    app.run(debug=debug, host='0.0.0.0', port=80)