blob: 352c7e3ac6b98daccc705a5aac8e146163646b3b [file] [log] [blame]
Scott Bakerbba67b62019-01-28 17:38:21 -08001# Copyright 2017-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
Zack Williams5c2ea232019-01-30 15:23:01 -070015from __future__ import absolute_import, print_function
Scott Bakerbba67b62019-01-28 17:38:21 -080016import confluent_kafka
17import functools
18import unittest
19
20from mock import patch, PropertyMock, ANY
21
22import os
23import sys
24import time
25
26log = None
27
28test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
29sync_lib_dir = os.path.join(test_path, "..", "xossynchronizer")
30xos_dir = os.path.join(test_path, "..", "..", "..", "xos")
31
Zack Williams5c2ea232019-01-30 15:23:01 -070032print(os.getcwd())
Scott Bakerbba67b62019-01-28 17:38:21 -080033
34def config_get_mock(orig, overrides, key):
35 if key in overrides:
36 return overrides[key]
37 else:
38 return orig(key)
39
40
41class FakeKafkaConsumer:
42 def __init__(self, values=[]):
43 self.values = values
44
45 def subscribe(self, topics):
46 pass
47
48 def poll(self, timeout=1.0):
49 if self.values:
50 return FakeKafkaMessage(self.values.pop())
51 # block forever
52 time.sleep(1000)
53
54
55class FakeKafkaMessage:
56 """ Works like Message in confluent_kafka
57 https://docs.confluent.io/current/clients/confluent-kafka-python/#message
58 """
59
60 def __init__(
61 self,
62 timestamp=None,
63 topic="faketopic",
64 key="fakekey",
65 value="fakevalue",
66 error=False,
67 ):
68
69 if timestamp is None:
70 self.fake_ts_type = confluent_kafka.TIMESTAMP_NOT_AVAILABLE
71 self.fake_timestamp = None
72 else:
73 self.fake_ts_type = confluent_kafka.TIMESTAMP_CREATE_TIME
74 self.fake_timestamp = timestamp
75
76 self.fake_topic = topic
77 self.fake_key = key
78 self.fake_value = value
79 self.fake_error = error
80
81 def error(self):
82 return self.fake_error
83
84 def timestamp(self):
85 return (self.fake_ts_type, self.fake_timestamp)
86
87 def topic(self):
88 return self.fake_topic
89
90 def key(self):
91 return self.fake_key
92
93 def value(self):
94 return self.fake_value
95
96
97class TestEventEngine(unittest.TestCase):
98 @classmethod
99 def setUpClass(cls):
100
101 global log
102
103 config = os.path.join(test_path, "test_config.yaml")
104 from xosconfig import Config
105
106 Config.clear()
107 Config.init(config, "synchronizer-config-schema.yaml")
108
109 if not log:
110 from multistructlog import create_logger
111
112 log = create_logger(Config().get("logging"))
113
114 def setUp(self):
115 global XOSKafkaThread, Config, log
116
117 self.sys_path_save = sys.path
118 self.cwd_save = os.getcwd()
119
120 config = os.path.join(test_path, "test_config.yaml")
121 from xosconfig import Config
122
123 Config.clear()
124 Config.init(config, "synchronizer-config-schema.yaml")
125
126 from xossynchronizer.mock_modelaccessor_build import (
127 build_mock_modelaccessor,
128 )
129
130 build_mock_modelaccessor(sync_lib_dir, xos_dir, services_dir=None, service_xprotos=[])
131
Scott Bakerc2fddaa2019-01-30 15:45:03 -0800132 from xossynchronizer.modelaccessor import model_accessor
133
Scott Bakerf0d7e5c2019-02-05 08:35:31 -0800134 # The test config.yaml references files in `xos-synchronizer-tests/` so make sure we're in the parent
135 # directory of the test directory.
Scott Bakerbba67b62019-01-28 17:38:21 -0800136 os.chdir(os.path.join(test_path, ".."))
137
138 from xossynchronizer.event_engine import XOSKafkaThread, XOSEventEngine
139
140 self.event_steps_dir = Config.get("event_steps_dir")
Scott Bakerc2fddaa2019-01-30 15:45:03 -0800141 self.event_engine = XOSEventEngine(model_accessor=model_accessor, log=log)
Scott Bakerbba67b62019-01-28 17:38:21 -0800142
143 def tearDown(self):
144 sys.path = self.sys_path_save
145 os.chdir(self.cwd_save)
146
147 def test_load_event_step_modules(self):
148 self.event_engine.load_event_step_modules(self.event_steps_dir)
149 self.assertEqual(len(self.event_engine.event_steps), 1)
150
151 def test_start(self):
152 self.event_engine.load_event_step_modules(self.event_steps_dir)
153
154 with patch.object(
155 XOSKafkaThread, "create_kafka_consumer"
156 ) as create_kafka_consumer, patch.object(
157 FakeKafkaConsumer, "subscribe"
158 ) as fake_subscribe, patch.object(
159 self.event_engine.event_steps[0], "process_event"
160 ) as process_event:
161
162 create_kafka_consumer.return_value = FakeKafkaConsumer(
163 values=["sampleevent"]
164 )
165 self.event_engine.start()
166
167 self.assertEqual(len(self.event_engine.threads), 1)
168
169 # Since event_engine.start() launches threads, give them a hundred milliseconds to do something...
170 time.sleep(0.1)
171
172 # We should have subscribed to the fake consumer
173 fake_subscribe.assert_called_once()
174
175 # The fake consumer will have returned one event
176 process_event.assert_called_once()
177
178 def test_start_with_pattern(self):
179 self.event_engine.load_event_step_modules(self.event_steps_dir)
180
181 with patch.object(
182 XOSKafkaThread, "create_kafka_consumer"
183 ) as create_kafka_consumer, patch.object(
184 FakeKafkaConsumer, "subscribe"
185 ) as fake_subscribe, patch.object(
186 self.event_engine.event_steps[0], "process_event"
187 ) as process_event, patch.object(
188 self.event_engine.event_steps[0], "pattern", new_callable=PropertyMock
189 ) as pattern, patch.object(
190 self.event_engine.event_steps[0], "topics", new_callable=PropertyMock
191 ) as topics:
192
193 pattern.return_value = "somepattern"
194 topics.return_value = []
195
196 create_kafka_consumer.return_value = FakeKafkaConsumer(
197 values=["sampleevent"]
198 )
199 self.event_engine.start()
200
201 self.assertEqual(len(self.event_engine.threads), 1)
202
203 # Since event_engine.start() launches threads, give them a hundred milliseconds to do something...
204 time.sleep(0.1)
205
206 # We should have subscribed to the fake consumer
207 fake_subscribe.assert_called_with("somepattern")
208
209 # The fake consumer will have returned one event
210 process_event.assert_called_once()
211
212 def test_start_bad_tech(self):
213 """ Set an unknown Technology in the event_step. XOSEventEngine.start() should print an error message and
214 not create any threads.
215 """
216
217 self.event_engine.load_event_step_modules(self.event_steps_dir)
218
219 with patch.object(
220 XOSKafkaThread, "create_kafka_consumer"
221 ) as create_kafka_consumer, patch.object(
222 log, "error"
223 ) as log_error, patch.object(
224 self.event_engine.event_steps[0], "technology"
225 ) as technology:
226 technology.return_value = "not_kafka"
227 create_kafka_consumer.return_value = FakeKafkaConsumer()
228 self.event_engine.start()
229
230 self.assertEqual(len(self.event_engine.threads), 0)
231
232 log_error.assert_called_with(
233 "Unknown technology. Skipping step",
234 step="TestEventStep",
235 technology=ANY,
236 )
237
238 def test_start_bad_no_topics(self):
239 """ Set no topics in the event_step. XOSEventEngine.start() will launch a thread, but the thread will fail
240 with an exception before calling subscribe.
241 """
242
243 self.event_engine.load_event_step_modules(self.event_steps_dir)
244
245 with patch.object(
246 XOSKafkaThread, "create_kafka_consumer"
247 ) as create_kafka_consumer, patch.object(
248 FakeKafkaConsumer, "subscribe"
249 ) as fake_subscribe, patch.object(
250 self.event_engine.event_steps[0], "topics", new_callable=PropertyMock
251 ) as topics:
252 topics.return_value = []
253 create_kafka_consumer.return_value = FakeKafkaConsumer()
254 self.event_engine.start()
255
256 # the thread does get launched, but it will fail with an exception
257 self.assertEqual(len(self.event_engine.threads), 1)
258
259 time.sleep(0.1)
260
261 fake_subscribe.assert_not_called()
262
263 def test_start_bad_topics_and_pattern(self):
264 """ Set no topics in the event_step. XOSEventEngine.start() will launch a thread, but the thread will fail
265 with an exception before calling subscribe.
266 """
267
268 self.event_engine.load_event_step_modules(self.event_steps_dir)
269
270 with patch.object(
271 XOSKafkaThread, "create_kafka_consumer"
272 ) as create_kafka_consumer, patch.object(
273 FakeKafkaConsumer, "subscribe"
274 ) as fake_subscribe, patch.object(
275 self.event_engine.event_steps[0], "pattern", new_callable=PropertyMock
276 ) as pattern:
277 pattern.return_value = "foo"
278 create_kafka_consumer.return_value = FakeKafkaConsumer()
279 self.event_engine.start()
280
281 # the thread does get launched, but it will fail with an exception
282 self.assertEqual(len(self.event_engine.threads), 1)
283
284 time.sleep(0.1)
285
286 fake_subscribe.assert_not_called()
287
288 def test_start_config_no_eventbus_kind(self):
289 """ Set a blank event_bus.kind in Config. XOSEventEngine.start() should print an error message and
290 not create any threads.
291 """
292
293 self.event_engine.load_event_step_modules(self.event_steps_dir)
294
295 config_get_orig = Config.get
296 with patch.object(
297 XOSKafkaThread, "create_kafka_consumer"
298 ) as create_kafka_consumer, patch.object(
299 log, "error"
300 ) as log_error, patch.object(
301 Config,
302 "get",
303 new=functools.partial(
304 config_get_mock, config_get_orig, {"event_bus.kind": None}
305 ),
306 ):
307
308 create_kafka_consumer.return_value = FakeKafkaConsumer()
309 self.event_engine.start()
310
311 self.assertEqual(len(self.event_engine.threads), 0)
312
313 log_error.assert_called_with(
314 "Eventbus kind is not configured in synchronizer config file."
315 )
316
317 def test_start_config_bad_eventbus_kind(self):
318 """ Set an unknown event_bus.kind in Config. XOSEventEngine.start() should print an error message and
319 not create any threads.
320 """
321
322 self.event_engine.load_event_step_modules(self.event_steps_dir)
323
324 config_get_orig = Config.get
325 with patch.object(
326 XOSKafkaThread, "create_kafka_consumer"
327 ) as create_kafka_consumer, patch.object(
328 log, "error"
329 ) as log_error, patch.object(
330 Config,
331 "get",
332 new=functools.partial(
333 config_get_mock, config_get_orig, {"event_bus.kind": "not_kafka"}
334 ),
335 ):
336 create_kafka_consumer.return_value = FakeKafkaConsumer()
337 self.event_engine.start()
338
339 self.assertEqual(len(self.event_engine.threads), 0)
340
341 log_error.assert_called_with(
342 "Eventbus kind is set to a technology we do not implement.",
343 eventbus_kind="not_kafka",
344 )
345
346
347if __name__ == "__main__":
348 unittest.main()