#!/usr/bin/env python

# Copyright 2020-present Open Networking Foundation
#
# SPDX-License-Identifier: LicenseRef-ONF-Member-Only-1.0

import unittest
import edge_monitoring_server as ems
import datetime
import pytz
import json
import time
from copy import deepcopy


# ---------------------------------------------------------------------------
# Report fixtures
#
# Every fixture describes the same edge ('ace-menlo-pixel') and differs only
# in which optional sections of the report are present.  The section builders
# below return a brand-new dict on each call, so no fixture shares nested
# state with another.
# ---------------------------------------------------------------------------

def _fixture_status():
    """Return the mandatory 'status' section with both planes connected."""
    return {
        'control_plane': 'connected',
        'user_plane': 'connected'
    }


def _fixture_ping():
    """Return the 'ping' part of the 'speedtest' section."""
    return {
        'dns': {
            'min': 2.0,
            'avg': 4.0,
            'max': 6.0,
            'stddev': 1.0
        }
    }


def _fixture_iperf():
    """Return the 'iperf' part of the 'speedtest' section."""
    return {
        'cluster': {
            'downlink': 100.0,
            'uplink': 10.0
        }
    }


def _fixture_signal_quality():
    """Return the 'signal_quality' section."""
    return {
        'rsrq': 30,
        'rsrp': 80
    }


# Complete report: status, speedtest (ping + iperf) and signal quality.
test_edge = {
    'name': 'ace-menlo-pixel',
    'status': _fixture_status(),
    'speedtest': {
        'ping': _fixture_ping(),
        'iperf': _fixture_iperf()
    },
    'signal_quality': _fixture_signal_quality(),
    'last_update': time.time()
}

# Only the status section, with no speedtest or signal quality.
test_edge_status_only = {
    'name': 'ace-menlo-pixel',
    'status': _fixture_status(),
    'last_update': time.time()
}

# Status and signal quality, without any speedtest section.
test_edge_no_speedtest = {
    'name': 'ace-menlo-pixel',
    'status': _fixture_status(),
    'signal_quality': _fixture_signal_quality(),
    'last_update': time.time()
}

# Status and full speedtest, without the signal quality section.
test_edge_no_signal_quality = {
    'name': 'ace-menlo-pixel',
    'status': _fixture_status(),
    'speedtest': {
        'ping': _fixture_ping(),
        'iperf': _fixture_iperf()
    },
    'last_update': time.time()
}

# Speedtest contains ping results only; the iperf part is missing.
test_edge_no_iperf = {
    'name': 'ace-menlo-pixel',
    'status': _fixture_status(),
    'speedtest': {
        'ping': _fixture_ping()
    },
    'signal_quality': _fixture_signal_quality(),
    'last_update': time.time()
}


class MyEvent:
    """Minimal stand-in for a calendar event handed to the ``ems`` helpers.

    It only stores the attributes the tests set: free-text ``location``,
    ``description`` and ``summary`` fields, the ``start``/``end`` of the
    event, and an ``all_day`` flag.
    """

    def __init__(
        self,
        location="",
        description="",
        summary="",
        start=None,
        end=None,
        all_day=False,
    ):
        # Free-text fields.
        self.location, self.description, self.summary = location, description, summary
        # Time span of the event.
        self.start, self.end = start, end
        self.all_day = all_day

class MyEventNoLoc:
    """Event stand-in that has no ``location`` attribute at all.

    Used to check that event matching copes with a missing field.
    """

    def __init__(self, description="", summary=""):
        self.description, self.summary = description, summary


class TestEdgeMonitoringServer(unittest.TestCase):
    def setUp(self):
        self.app = ems.app.test_client()
        self.emulated_time = time.mktime(time.strptime("2021-04-05 00:00:00", "%Y-%m-%d %H:%M:%S"))
        self.time_method = time.time
        time.time = self._get_time

    def tearDown(self):
        time.time = self.time_method

    def _get_time(self):
        return self.emulated_time

    def _assert_status_metrics_exist(self, data):
        self.assertTrue('aetheredge_status_control_plane{name="ace-menlo-pixel"} 2.0' in data)
        self.assertTrue('aetheredge_status_user_plane{name="ace-menlo-pixel"} 2.0' in data)
        self.assertTrue('aetheredge_last_update{name="ace-menlo-pixel"}' in data)
        self.assertTrue('aetheredge_connect_test_ok{name="ace-menlo-pixel"} 1.0' in data)
        self.assertTrue('aetheredge_ping_test_ok{name="ace-menlo-pixel"} 1.0' in data)
        self.assertTrue('aetheredge_e2e_tests_ok{name="ace-menlo-pixel"} 1.0' in data)
        self.assertTrue('aetheredge_e2e_tests_down{name="ace-menlo-pixel"} 0.0' in data)

    def _assert_speedtest_metrics_exist(self, data):
        self.assertTrue('aetheredge_ping_dns_test_min{name="ace-menlo-pixel"} 2.0' in data)
        self.assertTrue('aetheredge_ping_dns_test_avg{name="ace-menlo-pixel"} 4.0' in data)
        self.assertTrue('aetheredge_ping_dns_test_max{name="ace-menlo-pixel"} 6.0' in data)
        self.assertTrue('aetheredge_ping_dns_test_stddev{name="ace-menlo-pixel"} 1.0' in data)
        self.assertTrue('aetheredge_iperf_cluster_downlink_test{name="ace-menlo-pixel"} 100.0' in data)
        self.assertTrue('aetheredge_iperf_cluster_uplink_test{name="ace-menlo-pixel"} 10.0' in data)

    def _assert_signal_quality_metrics_exist(self, data):
        self.assertTrue('aetheredge_signal_quality_rsrq{name="ace-menlo-pixel"} 30' in data)
        self.assertTrue('aetheredge_signal_quality_rsrp{name="ace-menlo-pixel"} 80' in data)

    def test_match_location(self):
        event = MyEvent(location = "ace-menlo-pixel-production")
        self.assertTrue(ems.is_my_event(event, "ace-menlo-pixel"))
        event = MyEvent(location = "(Compute)-MP-1-Aether Production")
        self.assertTrue(ems.is_my_event(event, "ace-menlo-pixel"))

    def test_match_description(self):
        event = MyEvent(description = "ace-menlo-pixel-production")
        self.assertTrue(ems.is_my_event(event, "ace-menlo-pixel"))
        event = MyEvent(description = "(Compute)-MP-1-Aether Production")
        self.assertTrue(ems.is_my_event(event, "ace-menlo-pixel"))

    def test_match_summary(self):
        event = MyEvent(summary = "ace-menlo-pixel-production")
        self.assertTrue(ems.is_my_event(event, "ace-menlo-pixel"))
        event = MyEvent(summary = "(Compute)-MP-1-Aether Production")
        self.assertTrue(ems.is_my_event(event, "ace-menlo-pixel"))

    def test_no_match(self):
        event = MyEvent(summary = "ace-menlo-pixel-production, (Compute)-MP-1-Aether Production")
        self.assertFalse(ems.is_my_event(event, "ace-intel"))
        self.assertFalse(ems.is_my_event(event, "(Compute)-MP-1-Aether Staging"))
        self.assertFalse(ems.is_my_event(event, "ace-menlo"))

    def test_missing_field(self):
        event = MyEventNoLoc(description = "(Compute)-MP-1-Aether Production")
        self.assertTrue(ems.is_my_event(event, "ace-menlo-pixel"))

    def test_in_window(self):
        events = []
        now = datetime.datetime.now(pytz.utc)
        events.append(MyEvent(location = "(Compute)-MP-1-Aether Production",
            start = now - datetime.timedelta(hours=1),
            end = now + datetime.timedelta(hours=1)))
        self.assertTrue(ems.in_maintenance_window(events, "ace-menlo-pixel", now))
        self.assertFalse(ems.in_maintenance_window(events, "ace-tucson", now))

    def test_not_in_window(self):
        events = []
        now = datetime.datetime.now(pytz.utc)
        events.append(MyEvent(location = "ace-menlo-pixel-production",
            start = now + datetime.timedelta(hours=1),
            end = now + datetime.timedelta(hours=2)))
        self.assertFalse(ems.in_maintenance_window(events, "ace-menlo-pixel", now))

    def test_no_events(self):
        events = []
        now = datetime.datetime.now(pytz.utc)
        self.assertFalse(ems.in_maintenance_window(events, "ace-menlo-pixel", now))

    def test_all_day_events(self):
        events = []
        events.append(MyEvent(location = "ace-menlo-pixel-production",
            start = datetime.datetime(2020, 9, 2, 0, 0),
            end = datetime.datetime(2020, 9, 3, 0, 0),
            all_day = True))

        ems.process_all_day_events(events)

        now = datetime.datetime(2020, 9, 2, 12, 0, tzinfo=pytz.utc)
        self.assertTrue(ems.in_maintenance_window(events, "ace-menlo-pixel", now))

        now = datetime.datetime(2020, 9, 3, 12, 0, tzinfo=pytz.utc)
        self.assertFalse(ems.in_maintenance_window(events, "ace-menlo-pixel", now))

    def test_get_edges(self):
        response = self.app.get('/edges')
        data = json.loads(response.get_data(as_text=True))
        self.assertEqual(len(data['edges']), 1)
        self.assertEqual(data['edges'][0]['name'], 'ace-example')

    def test_create_and_delete_edge_legacy(self):
        response = self.app.post('/edges', json=test_edge)
        data = json.loads(response.get_data(as_text=True))
        self.assertEqual(data['edge']['name'], 'ace-menlo-pixel')

        response = self.app.get('/edges')
        data = json.loads(response.get_data(as_text=True))
        self.assertEqual(len(data['edges']), 2)

        response = self.app.get('/edges/metrics')
        data = response.get_data(as_text=True)
        # print(data)

        self._assert_status_metrics_exist(data)
        self._assert_speedtest_metrics_exist(data)
        self._assert_signal_quality_metrics_exist(data)

        response = self.app.delete('/edges/ace-menlo-pixel')
        data = json.loads(response.get_data(as_text=True))
        self.assertEqual(data['result'], True)

        response = self.app.get('/edges/metrics')
        data = response.get_data(as_text=True)
        # print(data)
        self.assertFalse('ace-menlo-pixel' in data)

        response = self.app.get('/edges')
        data = json.loads(response.get_data(as_text=True))
        self.assertEqual(len(data['edges']), 1)

    def test_create_and_delete_edge(self):
        response = self.app.post('/testresults', json=test_edge)
        data = json.loads(response.get_data(as_text=True))
        self.assertEqual(data['edge']['name'], 'ace-menlo-pixel')

        response = self.app.get('/edges')
        data = json.loads(response.get_data(as_text=True))
        self.assertEqual(len(data['edges']), 2)

        response = self.app.get('/edges/metrics')
        data = response.get_data(as_text=True)
        # print(data)

        self._assert_status_metrics_exist(data)
        self._assert_speedtest_metrics_exist(data)
        self._assert_signal_quality_metrics_exist(data)

        response = self.app.delete('/testresults/ace-menlo-pixel')
        data = json.loads(response.get_data(as_text=True))
        self.assertEqual(data['result'], True)

        response = self.app.get('/edges/metrics')
        data = response.get_data(as_text=True)
        # print(data)
        self.assertFalse('ace-menlo-pixel' in data)

        response = self.app.get('/edges')
        data = json.loads(response.get_data(as_text=True))
        self.assertEqual(len(data['edges']), 1)

    def test_create_and_delete_edge_speed_test(self):
        response = self.app.post('/testresults', json=test_edge)
        data = json.loads(response.get_data(as_text=True))
        self.assertEqual(data['edge']['name'], 'ace-menlo-pixel')

        response = self.app.get('/edges/metrics')
        data = response.get_data(as_text=True)
        # print(data)

        self._assert_speedtest_metrics_exist(data)

        response = self.app.delete('/testresults/ace-menlo-pixel')
        data = json.loads(response.get_data(as_text=True))
        self.assertEqual(data['result'], True)

    def test_backwards_compatible_status_only(self):
        response = self.app.post('/testresults', json=test_edge_status_only)
        data = json.loads(response.get_data(as_text=True))
        self.assertEqual(data['edge']['name'], 'ace-menlo-pixel')

        response = self.app.get('/edges/metrics')
        data = response.get_data(as_text=True)
        # print(data)

        self.assertFalse('aetheredge_signal_quality_rsrq{name="ace-menlo-pixel"}' in data)
        self.assertFalse('aetheredge_signal_quality_rsrp{name="ace-menlo-pixel"}' in data)

        self._assert_status_metrics_exist(data)

        response = self.app.delete('/testresults/ace-menlo-pixel')
        data = json.loads(response.get_data(as_text=True))
        self.assertEqual(data['result'], True)

    def test_backwards_compatible_no_speedtest(self):
        response = self.app.post('/testresults', json=test_edge_no_speedtest)
        data = json.loads(response.get_data(as_text=True))
        self.assertEqual(data['edge']['name'], 'ace-menlo-pixel')

        response = self.app.get('/edges/metrics')
        data = response.get_data(as_text=True)
        # print(data)

        self.assertFalse('aetheredge_ping_dns_test_min{name="ace-menlo-pixel"}' in data)
        self.assertFalse('aetheredge_ping_dns_test_avg{name="ace-menlo-pixel"}' in data)
        self.assertFalse('aetheredge_ping_dns_test_max{name="ace-menlo-pixel"}' in data)
        self.assertFalse('aetheredge_ping_dns_test_stddev{name="ace-menlo-pixel"}' in data)
        self.assertFalse('aetheredge_iperf_cluster_downlink_test{name="ace-menlo-pixel"}' in data)
        self.assertFalse('aetheredge_iperf_cluster_uplink_test{name="ace-menlo-pixel"}' in data)

        self._assert_status_metrics_exist(data)
        self._assert_signal_quality_metrics_exist(data)

        response = self.app.delete('/testresults/ace-menlo-pixel')
        data = json.loads(response.get_data(as_text=True))
        self.assertEqual(data['result'], True)

    def test_backwards_compatible_no_signal_quality(self):
        response = self.app.post('/testresults', json=test_edge_no_signal_quality)
        data = json.loads(response.get_data(as_text=True))
        self.assertEqual(data['edge']['name'], 'ace-menlo-pixel')

        response = self.app.get('/edges/metrics')
        data = response.get_data(as_text=True)
        # print(data)

        self.assertFalse('aetheredge_signal_quality_rsrq{name="ace-menlo-pixel"}' in data)
        self.assertFalse('aetheredge_signal_quality_rsrp{name="ace-menlo-pixel"}' in data)

        self._assert_status_metrics_exist(data)
        self._assert_speedtest_metrics_exist(data)

        response = self.app.delete('/testresults/ace-menlo-pixel')
        data = json.loads(response.get_data(as_text=True))
        self.assertEqual(data['result'], True)

    def test_timeout_stale_result(self):
        response = self.app.post('/testresults', json=test_edge)
        data = json.loads(response.get_data(as_text=True))
        # print(json.dumps(data, indent=2))

        self.assertEqual(data['edge']['status']['control_plane'], 'connected')
        self.assertEqual(data['edge']['status']['user_plane'], 'connected')
        self.assertEqual(data['edge']['speedtest']['ping']['dns']['avg'], 4.0)
        self.assertTrue('signal_quality' in data['edge'])

        self.emulated_time += (ems.NO_RESULT_THRESHOLD + 1)

        response = self.app.get('/edges/ace-menlo-pixel')
        data = json.loads(response.get_data(as_text=True))
        # print(json.dumps(data, indent=2))

        self.assertEqual(data['edge']['status']['control_plane'], 'no result')
        self.assertEqual(data['edge']['status']['user_plane'], 'no result')
        self.assertEqual(data['edge']['speedtest']['ping']['dns']['avg'], 0.0)
        self.assertFalse('signal_quality' in data['edge'])

    def test_backwards_compatible_no_iperf(self):
        response = self.app.post('/testresults', json=test_edge_no_iperf)
        data = json.loads(response.get_data(as_text=True))
        self.assertEqual(data['edge']['name'], 'ace-menlo-pixel')

        response = self.app.get('/edges/metrics')
        data = response.get_data(as_text=True)
        # print(data)

        self.assertFalse('iperf_cluster_downlink{name="ace-menlo-pixel"}' in data)
        self.assertFalse('iperf_cluster_uplink{name="ace-menlo-pixel"}' in data)

        response = self.app.delete('/testresults/ace-menlo-pixel')
        data = json.loads(response.get_data(as_text=True))
        self.assertEqual(data['result'], True)

    def test_handle_invalid_schema(self):
        response = self.app.post('/testresults', json="")
        self.assertEqual(response.status_code, 400)

        no_name = deepcopy(test_edge)
        del no_name['name']
        response = self.app.post('/testresults', json=no_name)
        self.assertEqual(response.status_code, 400)
        response = self.app.get('/edges/metrics')
        data = response.get_data(as_text=True)
        self.assertFalse('ace-menlo-pixel' in data)

        no_status = deepcopy(test_edge)
        del no_status['status']
        response = self.app.post('/testresults', json=no_status)
        self.assertEqual(response.status_code, 400)
        response = self.app.get('/edges/metrics')
        data = response.get_data(as_text=True)
        self.assertFalse('ace-menlo-pixel' in data)

        bad_status = deepcopy(test_edge)
        bad_status['status']['control_plane'] = 1
        response = self.app.post('/testresults', json=bad_status)
        self.assertEqual(response.status_code, 400)
        response = self.app.get('/edges/metrics')
        data = response.get_data(as_text=True)
        self.assertFalse('ace-menlo-pixel' in data)

        bad_ping_result = deepcopy(test_edge)
        bad_ping_result['speedtest']['ping']['dns']['min'] = "foo"
        response = self.app.post('/testresults', json=bad_ping_result)
        self.assertEqual(response.status_code, 400)
        response = self.app.get('/edges/metrics')
        data = response.get_data(as_text=True)
        self.assertFalse('ace-menlo-pixel' in data)

        bad_iperf_result = deepcopy(test_edge)
        bad_iperf_result['speedtest']['iperf']['cluster']['uplink'] = "foo"
        response = self.app.post('/testresults', json=bad_iperf_result)
        self.assertEqual(response.status_code, 400)
        response = self.app.get('/edges/metrics')
        data = response.get_data(as_text=True)
        self.assertFalse('ace-menlo-pixel' in data)

        bad_signal_quality = deepcopy(test_edge)
        del bad_signal_quality['signal_quality']['rsrq']
        response = self.app.post('/testresults', json=bad_signal_quality)
        self.assertEqual(response.status_code, 400)
        response = self.app.get('/edges/metrics')
        data = response.get_data(as_text=True)
        self.assertFalse('ace-menlo-pixel' in data)

if __name__ == '__main__':
    # Run this module's test case verbosely when executed as a script.
    suite = unittest.TestLoader().loadTestsFromTestCase(TestEdgeMonitoringServer)
    result = unittest.TextTestRunner(verbosity=2).run(suite)
    # Propagate the outcome so callers (CI, make) see failures as a
    # non-zero exit status instead of an unconditional 0.
    raise SystemExit(0 if result.wasSuccessful() else 1)
