AETHER-1751 Add CI/CD

Change-Id: I29daba05c7f261e159384f94381bb8a397bd0400
diff --git a/edge-monitoring-server/edge_monitoring_server.py b/edge-monitoring-server/edge_monitoring_server.py
new file mode 100755
index 0000000..6f74a53
--- /dev/null
+++ b/edge-monitoring-server/edge_monitoring_server.py
@@ -0,0 +1,461 @@
+#!/usr/bin/env python
+
+# Copyright 2020-present Open Networking Foundation
+#
+# SPDX-License-Identifier: LicenseRef-ONF-Member-Only-1.0
+
+import os
+import time
+import datetime
+import pytz
+import threading
+from icalevents.icalevents import events
+from flask import Flask, jsonify, abort, request, Response
+import prometheus_client as prom
+import jsonschema
+from logging.config import dictConfig
+
+# Secret iCal URL of the maintenance calendar; polling is skipped when unset.
+SECRET_ICAL_URL = os.getenv("SECRET_ICAL_URL")
+
+# Aether environment being monitored (e.g. "production"). Maintenance events for
+# an "ace-*" cluster must suffix its name with this env: "ace-tucson-production".
+AETHER_ENV = os.getenv("AETHER_ENV", "production")
+
+# Seconds without an agent report before an edge is reported as "no result".
+NO_RESULT_THRESHOLD = 12 * 60
+
+# Route root-logger output (INFO and above) to Flask's WSGI error stream.
+dictConfig({
+    'version': 1,
+    'formatters': {
+        'default': {'format': '[%(asctime)s] %(levelname)s  %(message)s'},
+    },
+    'handlers': {
+        'wsgi': {
+            'class': 'logging.StreamHandler',
+            'stream': 'ext://flask.logging.wsgi_errors_stream',
+            'formatter': 'default',
+        },
+    },
+    'root': {'level': 'INFO', 'handlers': ['wsgi']},
+})
+
+app = Flask(__name__)
+
+# Status strings must be keys of status_codes, which /edges/metrics indexes.
+_STATUS_VALUES = ["no result", "error", "disconnected", "connecting", "connected"]
+edgeSchema = {
+    "type": "object",
+    "properties": {
+        "name": {"type": "string"},
+        "status": {
+            "type": "object",
+            "properties": {
+                "control_plane": {"type": "string", "enum": _STATUS_VALUES},
+                "user_plane": {"type": "string", "enum": _STATUS_VALUES}
+            },
+            "required": ["control_plane", "user_plane"]
+        },
+        "speedtest": {
+            "type": "object",
+            "properties": {
+                "ping": {
+                    "type": "object",
+                    "properties": {
+                        "dns": {
+                            "type": "object",
+                            "properties": {
+                                "min": {"type": "number"}, "avg": {"type": "number"},
+                                "max": {"type": "number"}, "stddev": {"type": "number"}
+                            },
+                            "required": ["min", "avg", "max", "stddev"]
+                        }
+                    },
+                    "required": ["dns"]  # /edges/metrics reads ping.dns unconditionally
+                },
+                "iperf": {
+                    "type": "object",
+                    "properties": {
+                        "cluster": {
+                            "type": "object",
+                            "properties": {
+                                "downlink": {"type": "number"}, "uplink": {"type": "number"}
+                            },
+                            "required": ["downlink", "uplink"]
+                        }
+                    },
+                    "required": ["cluster"]  # likewise for iperf.cluster
+                }
+            }
+        },
+        "signal_quality": {
+            "type": "object",
+            "properties": {
+                "rsrq": {"type": "number"}, "rsrp": {"type": "number"}
+            },
+            "required": ["rsrq", "rsrp"]
+        }
+    },
+    "required": ["name", "status"]
+}
+
+# In-memory edge registry (not persisted). "ace-example" is a placeholder
+# entry that the /edges/metrics handler skips.
+edges = [
+    dict(
+        name='ace-example',
+        status=dict(
+            control_plane='connected',
+            user_plane='connected',
+        ),
+        speedtest=dict(
+            ping=dict(
+                dns=dict(
+                    min=0.0,
+                    avg=0.0,
+                    max=0.0,
+                    stddev=0.0,
+                ),
+            ),
+            iperf=dict(
+                cluster=dict(
+                    downlink=0.0,
+                    uplink=0.0,
+                ),
+            ),
+        ),
+        signal_quality=dict(rsrq=0, rsrp=0),
+        # Stamped at import time, so this entry ages out like any silent edge.
+        last_update=time.time(),
+    ),
+]
+
+# Numeric codes exported by the legacy aetheredge_status_* gauges, from
+# "no result" (-2) up to "connected" (2).
+status_codes = dict(zip(
+    ["no result", "error", "disconnected", "connecting", "connected"],
+    range(-2, 3),
+))
+
+# Calendar room names that also mark maintenance for a cluster, keyed by the
+# "<cluster>-<AETHER_ENV>" name that is_my_event() builds.
+room_mapping = {
+    "ace-menlo-pixel-production": "(Compute)-MP-1-Aether Production",
+    "ace-menlo-staging": "(Compute)-MP-1-Aether Staging"}
+
+# Legacy test status metrics, reporting a status code between -2 and 2
+cp_status = prom.Gauge("aetheredge_status_control_plane", "Control plane status code", ["name"])
+up_status = prom.Gauge("aetheredge_status_user_plane", "User plane status code", ["name"])
+
+# Simplified binary (0/1) test result metrics
+e2e_tests_ok = prom.Gauge("aetheredge_e2e_tests_ok", "Last connect and ping test both passed", ["name"])
+connect_test_ok = prom.Gauge("aetheredge_connect_test_ok", "Last connect test passed", ["name"])
+ping_test_ok = prom.Gauge("aetheredge_ping_test_ok", "Last ping test passed", ["name"])
+e2e_tests_down = prom.Gauge("aetheredge_e2e_tests_down", "E2E tests not reporting", ["name"])
+
+# Speedtest ping (DNS) metrics
+ping_dns_min = prom.Gauge("aetheredge_ping_dns_test_min", "Last ping test minimum value", ["name"])
+ping_dns_avg = prom.Gauge("aetheredge_ping_dns_test_avg", "Last ping test average", ["name"])
+ping_dns_max = prom.Gauge("aetheredge_ping_dns_test_max", "Last ping test maximum value", ["name"])
+ping_dns_stddev = prom.Gauge("aetheredge_ping_dns_test_stddev", "Last ping test standard deviation", ["name"])
+
+# Speedtest iperf metrics
+iperf_cluster_downlink = prom.Gauge("aetheredge_iperf_cluster_downlink_test", "Last iperf test downlink result", ["name"])
+iperf_cluster_uplink = prom.Gauge("aetheredge_iperf_cluster_uplink_test", "Last iperf test uplink result", ["name"])
+
+# Signal quality metrics, reported in CESQ format (not dB)
+# RSRQ: >=53 excellent, 43 ~ 53 good, 33 ~ 43 mid, <=33 bad, 0 no signal
+# RSRP: >=20 excellent, 10 ~ 20 good, 0 ~ 10 mid, 0 no signal
+signal_quality_rsrq = prom.Gauge("aetheredge_signal_quality_rsrq", "Quality of the received signal", ["name"])
+signal_quality_rsrp = prom.Gauge("aetheredge_signal_quality_rsrp", "Power of the received signal", ["name"])
+
+# Unix time of each edge's last report, and a 0/1 maintenance-window flag
+last_update = prom.Gauge("aetheredge_last_update", "Last reported test result", ["name"])
+maint_window = prom.Gauge("aetheredge_in_maintenance_window", "Currently in a maintenance window", ["name"])
+
+def is_my_event(event, name):
+    """Return True if a calendar event's summary/location/description names this edge."""
+    # "ace-*" clusters appear as "<name>-<AETHER_ENV>"; room_mapping adds room aliases.
+    fullname = "%s-%s" % (name, AETHER_ENV) if name.startswith("ace-") else name
+    room = room_mapping.get(fullname)
+    for field in ["summary", "location", "description"]:
+        text = getattr(event, field, None) or ""  # tolerate fields present but None
+        if fullname in text or (room is not None and room in text):
+            return True
+    return False
+
+def is_naive_datetime(d):  # naive per the datetime docs: no tzinfo, or utcoffset() is None
+    return not (d.tzinfo is not None and d.tzinfo.utcoffset(d) is not None)
+
+def process_all_day_events(es):
+    # All-day events carry naive datetimes, which cannot be ordered against
+    # timezone-aware ones; localize them to US/Pacific in place.
+    pacific = pytz.timezone('US/Pacific')
+    for event in (e for e in es if e.all_day):
+        if is_naive_datetime(event.start):
+            event.start = pacific.localize(event.start)
+        if is_naive_datetime(event.end):
+            event.end = pacific.localize(event.end)
+
+def in_maintenance_window(events, name, now):
+    # True when an event for edge `name` is in progress at `now` (exclusive bounds).
+    return any(
+        ev.start < now < ev.end and is_my_event(ev, name)
+        for ev in events
+    )
+
+def pull_maintenance_events():
+    # Poll the calendar every 60 s and flag edges currently in a maintenance window.
+    while True:
+        now = datetime.datetime.now(pytz.utc)
+        try:
+            es = events(SECRET_ICAL_URL, start=now)
+            process_all_day_events(es)
+        except Exception:
+            app.logger.exception("failed to fetch maintenance calendar")  # keep traceback
+        else:
+            for edge in edges:
+                maint = edge.setdefault('maintenance', {})
+                maint['in_window'] = in_maintenance_window(es, edge['name'], now)
+                maint['last_update'] = time.time()
+        time.sleep(60)
+
+def time_out_stale_results():
+    # Reset edges that have not reported within NO_RESULT_THRESHOLD seconds:
+    # both planes become "no result", speedtest numbers are zeroed and any
+    # signal_quality reading is dropped. Edge dicts are mutated in place.
+    for edge in edges:
+        if time.time() - edge["last_update"] <= NO_RESULT_THRESHOLD:
+            continue  # still fresh
+
+        edge['status'].update({
+            'control_plane': "no result",
+            'user_plane': "no result",
+        })
+        speedtest = edge['speedtest']
+        speedtest['ping']['dns'] = dict(min=0.0, avg=0.0, max=0.0, stddev=0.0)
+        speedtest['iperf'] = {'cluster': dict(downlink=0.0, uplink=0.0)}
+        edge.pop('signal_quality', None)
+
+def remove_edge_from_metrics(name):
+    # Drop every per-edge series labelled `name` so a deleted edge stops being
+    # exported. Each gauge is handled separately: removing a series that was
+    # never set is expected to raise KeyError, and that must not prevent the
+    # remaining gauges from being cleaned up.
+    gauges = [
+        # status codes, binary e2e results and last-report time
+        cp_status,
+        up_status,
+        last_update,
+        e2e_tests_ok,
+        connect_test_ok,
+        ping_test_ok,
+        e2e_tests_down,
+        # speedtest ping results
+        ping_dns_min,
+        ping_dns_avg,
+        ping_dns_max,
+        ping_dns_stddev,
+        # speedtest iperf results
+        iperf_cluster_downlink,
+        iperf_cluster_uplink,
+        # radio signal quality
+        signal_quality_rsrq,
+        signal_quality_rsrp,
+        # maintenance calendar flag
+        maint_window,
+    ]
+
+    for gauge in gauges:
+        try:
+            gauge.remove(name)
+        except KeyError:
+            # Nothing exported for this edge on this gauge (e.g. no
+            # speedtest or signal report yet).
+            pass
+
+def _set_or_clear(name, values, gauge_keys):
+    # Export values[key] on each (gauge, key) pair for edge `name`. When `values`
+    # is empty/None, remove those series instead so a missing result is not
+    # reported with the last value that was ever set.
+    for gauge, key in gauge_keys:
+        if values:
+            gauge.labels(name).set(values[key])
+        else:
+            try:
+                gauge.remove(name)
+            except KeyError:
+                pass  # series was never set for this edge
+
+
+@app.route('/edges/metrics', methods=['GET'])
+def get_prometheus_metrics():
+    # Refresh the per-edge gauges from `edges` and return them in Prometheus
+    # text exposition format. Stale edges are timed out first, and the
+    # "ace-example" placeholder is never exported.
+    time_out_stale_results()
+    for edge in edges:
+        name = edge['name']
+        if name == "ace-example":
+            continue
+
+        connect_status = edge['status']['control_plane']
+        ping_status = edge['status']['user_plane']
+        cp_status.labels(name).set(status_codes[connect_status])
+        up_status.labels(name).set(status_codes[ping_status])
+
+        last_update.labels(name).set(edge['last_update'])
+        if 'maintenance' in edge:
+            maint_window.labels(name).set(int(edge['maintenance']['in_window']))
+
+        # "error"/"no result" on either plane means the e2e tests are down;
+        # otherwise each test passes when its plane reports "connected".
+        not_reporting = ("error", "no result")
+        tests_down = connect_status in not_reporting or ping_status in not_reporting
+        connect_ok = not tests_down and connect_status == "connected"
+        ping_ok = not tests_down and ping_status == "connected"
+        e2e_tests_down.labels(name).set(int(tests_down))
+        connect_test_ok.labels(name).set(int(connect_ok))
+        ping_test_ok.labels(name).set(int(ping_ok))
+        e2e_tests_ok.labels(name).set(int(connect_ok and ping_ok))
+
+        # A zero ping average / iperf downlink is the "no result" placeholder
+        # stored by time_out_stale_results() and create_or_update_edge().
+        dns = edge['speedtest']['ping']['dns']
+        _set_or_clear(name, dns if dns['avg'] else None, [
+            (ping_dns_min, 'min'), (ping_dns_avg, 'avg'),
+            (ping_dns_max, 'max'), (ping_dns_stddev, 'stddev')])
+        cluster = edge['speedtest']['iperf']['cluster']
+        _set_or_clear(name, cluster if cluster['downlink'] else None, [
+            (iperf_cluster_downlink, 'downlink'),
+            (iperf_cluster_uplink, 'uplink')])
+        _set_or_clear(name, edge.get('signal_quality'), [
+            (signal_quality_rsrq, 'rsrq'),
+            (signal_quality_rsrp, 'rsrp')])
+
+    # Only these gauges are serialised (one generate_latest call each), in
+    # this order; nothing else from the default registry is included.
+    exported = [
+        cp_status,
+        up_status,
+        ping_dns_min,
+        ping_dns_avg,
+        ping_dns_max,
+        ping_dns_stddev,
+        iperf_cluster_downlink,
+        iperf_cluster_uplink,
+        last_update,
+        maint_window,
+        connect_test_ok,
+        ping_test_ok,
+        e2e_tests_ok,
+        e2e_tests_down,
+        signal_quality_rsrq,
+        signal_quality_rsrp,
+    ]
+    res = [prom.generate_latest(gauge) for gauge in exported]
+    return Response(res, content_type=prom.CONTENT_TYPE_LATEST)
+
+
+@app.route('/edges/healthz', methods=['GET'])
+def get_health():
+    return dict(message='healthy')  # static health-check body, serialised to JSON by Flask
+
+
+@app.route('/edges', methods=['GET'])
+def get_edges():
+    time_out_stale_results()  # mark silent edges "no result" before listing them
+    return jsonify(edges=edges)
+
+
+@app.route('/edges/<string:name>', methods=['GET'])
+def get_edge(name):
+    time_out_stale_results()
+    match = next((e for e in edges if e['name'] == name), None)  # first edge with this name
+    if match is None:
+        abort(404)
+    return jsonify(edge=match)
+
+
+@app.route('/edges', methods=['POST'])
+@app.route('/testresults', methods=['POST'])
+def create_or_update_edge():
+    # Validate a test report against edgeSchema, then either register a new
+    # edge or overwrite the stored results of the edge with the same name.
+    # Responds 400 on a schema violation, otherwise 201 with the parsed report.
+    data = request.json
+    try:
+        jsonschema.validate(instance=data, schema=edgeSchema)
+    except jsonschema.exceptions.ValidationError as err:
+        app.logger.warning(err)  # Logger.warn is a deprecated alias
+        abort(400)
+
+    # Speedtest sections missing from the report default to zeros, which the
+    # metrics endpoint treats as "no result".
+    req_edge = {
+        'name': data['name'],
+        'status': {
+            'control_plane': data['status']['control_plane'],
+            'user_plane': data['status']['user_plane']
+        },
+        'speedtest': {
+            'ping': {
+                'dns': {'min': 0.0, 'avg': 0.0, 'max': 0.0, 'stddev': 0.0}
+            },
+            'iperf': {
+                'cluster': {
+                    'downlink': 0.0,
+                    'uplink': 0.0
+                }
+            }
+        },
+        'last_update': time.time()
+    }
+
+    speedtest = data.get('speedtest', {})
+    for section in ('ping', 'iperf'):
+        if section in speedtest:
+            req_edge['speedtest'][section] = speedtest[section]
+    if 'signal_quality' in data:
+        req_edge['signal_quality'] = data['signal_quality']
+
+    # Look up an already-registered edge with the same name.
+    existing = next((e for e in edges if e['name'] == req_edge['name']), None)
+    if existing is None:
+        app.logger.info("new edge request " + req_edge['name'])
+        edges.append(req_edge)
+    else:
+        # Only reported fields are overwritten; 'maintenance' and an earlier
+        # signal_quality (when this report has none) are kept.
+        existing['status'].update(req_edge['status'])
+        existing['speedtest'].update(req_edge['speedtest'])
+        if 'signal_quality' in req_edge:
+            existing['signal_quality'] = req_edge['signal_quality']
+        existing['last_update'] = req_edge['last_update']
+
+    return jsonify({'edge': req_edge}), 201
+
+
+@app.route('/edges/<string:name>', methods=['DELETE'])
+@app.route('/testresults/<string:name>', methods=['DELETE'])
+def delete_edge(name):
+    # Forget the first edge called `name` and drop its exported metrics;
+    # responds 404 when no such edge is registered.
+    app.logger.info("delete edge request " + name)
+    for index, edge in enumerate(edges):
+        if edge['name'] != name:
+            continue
+        del edges[index]
+        remove_edge_from_metrics(name)
+        return jsonify({'result': True})
+
+    abort(404)
+
+
+if __name__ == '__main__':
+    if SECRET_ICAL_URL and AETHER_ENV:
+        app.logger.info(" * Starting maintenance calendar polling thread (Aether env: %s)" % AETHER_ENV)
+        # Daemon thread: the infinite polling loop must not keep the process alive.
+        threading.Thread(target=pull_maintenance_events, daemon=True).start()
+    app.run(debug=False, host='0.0.0.0', port=80)  # no debugger/reloader on a public bind