blob: cb3a78aab703ef50303e58b665d98d89fca4f823 [file] [log] [blame]
#!/usr/bin/env python

# Copyright 2020-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
#     http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
16
17import unittest
18import edge_monitoring_server as ems
19import datetime
20import pytz
Hyunsun Moon5f237ec2020-09-29 14:45:52 -070021import json
22import time
23
24
# Edge record POSTed to /edges by test_create_and_delete_edge.
# 'last_update' is captured once, at import time, as a Unix timestamp.
test_edge = {
    'name': 'production-edge-onf-menlo',
    'status': {
        'control_plane': 'connected',
        'user_plane': 'connected',
    },
    'last_update': time.time(),
}
Andy Bavier614af142020-08-07 14:49:56 -070033
34
class MyEvent:
    """Lightweight stand-in for a calendar event used by the tests below.

    It only stores the attributes the tests hand to ``ems.is_my_event``,
    ``ems.in_maintenance_window`` and ``ems.process_all_day_events``.
    """

    def __init__(self, location="", description="", summary="",
                 start=None, end=None, all_day=False):
        """Store the given event fields verbatim.

        Args:
            location: Text of the event's location field.
            description: Text of the event's description field.
            summary: Text of the event's summary field.
            start: Start of the event (the tests pass ``datetime`` objects).
            end: End of the event (the tests pass ``datetime`` objects).
            all_day: Whether the event is flagged as an all-day event.
        """
        self.location = location
        self.description = description
        self.summary = summary
        self.start = start
        self.end = end
        self.all_day = all_day
Andy Bavier614af142020-08-07 14:49:56 -070043
class MyEventNoLoc:
    """Event stand-in that deliberately has no ``location`` attribute.

    Used by ``test_missing_field`` to check that event matching still works
    when a field is absent from the event object.
    """

    def __init__(self, description="", summary=""):
        """Store only the description and summary fields.

        Args:
            description: Text of the event's description field.
            summary: Text of the event's summary field.
        """
        self.description = description
        self.summary = summary
48
49
50class TestEdgeMonitoringServer(unittest.TestCase):
Hyunsun Moon5f237ec2020-09-29 14:45:52 -070051 def setUp(self):
52 self.app = ems.app.test_client()
53
Andy Bavier614af142020-08-07 14:49:56 -070054 def test_match_location(self):
55 event = MyEvent(location = "production-edge-onf-menlo, (Compute)-MP-1-Aether Production")
56 self.assertTrue(ems.is_my_event(event, "production-edge-onf-menlo"))
57 self.assertTrue(ems.is_my_event(event, "(Compute)-MP-1-Aether Production"))
58
59 def test_match_description(self):
60 event = MyEvent(description = "production-edge-onf-menlo, (Compute)-MP-1-Aether Production")
61 self.assertTrue(ems.is_my_event(event, "production-edge-onf-menlo"))
62 self.assertTrue(ems.is_my_event(event, "(Compute)-MP-1-Aether Production"))
63
64 def test_match_summary(self):
65 event = MyEvent(summary = "production-edge-onf-menlo, (Compute)-MP-1-Aether Production")
66 self.assertTrue(ems.is_my_event(event, "production-edge-onf-menlo"))
67 self.assertTrue(ems.is_my_event(event, "(Compute)-MP-1-Aether Production"))
68
69 def test_no_match(self):
70 event = MyEvent(summary = "production-edge-onf-menlo, (Compute)-MP-1-Aether Production")
71 self.assertFalse(ems.is_my_event(event, "production-edge-intel"))
72 self.assertFalse(ems.is_my_event(event, "(Compute)-MP-1-Aether Staging"))
73
74 def test_missing_field(self):
75 event = MyEventNoLoc(description = "production-edge-onf-menlo, (Compute)-MP-1-Aether Production")
76 self.assertTrue(ems.is_my_event(event, "production-edge-onf-menlo"))
77
78 def test_in_window(self):
79 events = []
80 now = datetime.datetime.now(pytz.utc)
81 events.append(MyEvent(location = "(Compute)-MP-1-Aether Production",
82 start = now - datetime.timedelta(hours=1),
83 end = now + datetime.timedelta(hours=1)))
84 self.assertTrue(ems.in_maintenance_window(events, "production-edge-onf-menlo", now))
85 self.assertFalse(ems.in_maintenance_window(events, "production-edge-onf-tucson", now))
86
87 def test_not_in_window(self):
88 events = []
89 now = datetime.datetime.now(pytz.utc)
90 events.append(MyEvent(location = "production-edge-onf-menlo",
91 start = now + datetime.timedelta(hours=1),
92 end = now + datetime.timedelta(hours=2)))
93 self.assertFalse(ems.in_maintenance_window(events, "production-edge-onf-menlo", now))
94
95 def test_no_events(self):
96 events = []
97 now = datetime.datetime.now(pytz.utc)
98 self.assertFalse(ems.in_maintenance_window(events, "production-edge-onf-menlo", now))
99
Andy Bavierc41cf0c2020-09-02 14:49:21 -0700100 def test_all_day_events(self):
101 events = []
102 events.append(MyEvent(location = "production-edge-onf-menlo",
103 start = datetime.datetime(2020, 9, 2, 0, 0),
104 end = datetime.datetime(2020, 9, 3, 0, 0),
105 all_day = True))
106
107 ems.process_all_day_events(events)
108
109 now = datetime.datetime(2020, 9, 2, 12, 0, tzinfo=pytz.utc)
110 self.assertTrue(ems.in_maintenance_window(events, "production-edge-onf-menlo", now))
111
112 now = datetime.datetime(2020, 9, 3, 12, 0, tzinfo=pytz.utc)
113 self.assertFalse(ems.in_maintenance_window(events, "production-edge-onf-menlo", now))
114
Hyunsun Moon5f237ec2020-09-29 14:45:52 -0700115 def test_get_edges(self):
116 response = self.app.get('/edges')
117 data = json.loads(response.get_data(as_text=True))
118 self.assertEqual(len(data['edges']), 1)
119 self.assertEqual(data['edges'][0]['name'], 'production-edge-example')
120
121 def test_create_and_delete_edge(self):
122 response = self.app.post('/edges', json=test_edge)
123 data = json.loads(response.get_data(as_text=True))
124 self.assertEqual(data['edge']['name'], 'production-edge-onf-menlo')
125
126 response = self.app.get('/edges')
127 data = json.loads(response.get_data(as_text=True))
128 self.assertEqual(len(data['edges']), 2)
129
130 response = self.app.delete('/edges/production-edge-onf-menlo')
131 data = json.loads(response.get_data(as_text=True))
132 self.assertEqual(data['result'], True)
133 response = self.app.get('/edges', json=test_edge)
134
135 response = self.app.get('/edges')
136 data = json.loads(response.get_data(as_text=True))
137 self.assertEqual(len(data['edges']), 1)
Andy Bavierc41cf0c2020-09-02 14:49:21 -0700138
Andy Bavier614af142020-08-07 14:49:56 -0700139
if __name__ == '__main__':
    # Run only this module's test case, with per-test verbose output.
    suite = unittest.TestLoader().loadTestsFromTestCase(TestEdgeMonitoringServer)
    result = unittest.TextTestRunner(verbosity=2).run(suite)
    # Propagate failures to the caller: without this the script always
    # exits 0, so CI cannot tell a failing run from a passing one.
    raise SystemExit(0 if result.wasSuccessful() else 1)