blob: a09b3d052268ef56a29465ab9c12bad4fe10641d [file] [log] [blame]
Scott Bakerbba67b62019-01-28 17:38:21 -08001# Copyright 2017-present Open Networking Foundation
2#
3# Licensed under the Apache License, Version 2.0 (the "License");
4# you may not use this file except in compliance with the License.
5# You may obtain a copy of the License at
6#
7# http://www.apache.org/licenses/LICENSE-2.0
8#
9# Unless required by applicable law or agreed to in writing, software
10# distributed under the License is distributed on an "AS IS" BASIS,
11# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
12# See the License for the specific language governing permissions and
13# limitations under the License.
14
15import confluent_kafka
16import functools
17import unittest
18
19from mock import patch, PropertyMock, ANY
20
21import os
22import sys
23import time
24
25log = None
26
27test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
28sync_lib_dir = os.path.join(test_path, "..", "xossynchronizer")
29xos_dir = os.path.join(test_path, "..", "..", "..", "xos")
30
31print os.getcwd()
32
33def config_get_mock(orig, overrides, key):
34 if key in overrides:
35 return overrides[key]
36 else:
37 return orig(key)
38
39
40class FakeKafkaConsumer:
41 def __init__(self, values=[]):
42 self.values = values
43
44 def subscribe(self, topics):
45 pass
46
47 def poll(self, timeout=1.0):
48 if self.values:
49 return FakeKafkaMessage(self.values.pop())
50 # block forever
51 time.sleep(1000)
52
53
54class FakeKafkaMessage:
55 """ Works like Message in confluent_kafka
56 https://docs.confluent.io/current/clients/confluent-kafka-python/#message
57 """
58
59 def __init__(
60 self,
61 timestamp=None,
62 topic="faketopic",
63 key="fakekey",
64 value="fakevalue",
65 error=False,
66 ):
67
68 if timestamp is None:
69 self.fake_ts_type = confluent_kafka.TIMESTAMP_NOT_AVAILABLE
70 self.fake_timestamp = None
71 else:
72 self.fake_ts_type = confluent_kafka.TIMESTAMP_CREATE_TIME
73 self.fake_timestamp = timestamp
74
75 self.fake_topic = topic
76 self.fake_key = key
77 self.fake_value = value
78 self.fake_error = error
79
80 def error(self):
81 return self.fake_error
82
83 def timestamp(self):
84 return (self.fake_ts_type, self.fake_timestamp)
85
86 def topic(self):
87 return self.fake_topic
88
89 def key(self):
90 return self.fake_key
91
92 def value(self):
93 return self.fake_value
94
95
96class TestEventEngine(unittest.TestCase):
97 @classmethod
98 def setUpClass(cls):
99
100 global log
101
102 config = os.path.join(test_path, "test_config.yaml")
103 from xosconfig import Config
104
105 Config.clear()
106 Config.init(config, "synchronizer-config-schema.yaml")
107
108 if not log:
109 from multistructlog import create_logger
110
111 log = create_logger(Config().get("logging"))
112
113 def setUp(self):
114 global XOSKafkaThread, Config, log
115
116 self.sys_path_save = sys.path
117 self.cwd_save = os.getcwd()
118
119 config = os.path.join(test_path, "test_config.yaml")
120 from xosconfig import Config
121
122 Config.clear()
123 Config.init(config, "synchronizer-config-schema.yaml")
124
125 from xossynchronizer.mock_modelaccessor_build import (
126 build_mock_modelaccessor,
127 )
128
129 build_mock_modelaccessor(sync_lib_dir, xos_dir, services_dir=None, service_xprotos=[])
130
Scott Bakerc2fddaa2019-01-30 15:45:03 -0800131 from xossynchronizer.modelaccessor import model_accessor
132
Scott Bakerbba67b62019-01-28 17:38:21 -0800133 # The test config.yaml references files in `test/` so make sure we're in the parent directory of the
134 # test directory.
135 os.chdir(os.path.join(test_path, ".."))
136
137 from xossynchronizer.event_engine import XOSKafkaThread, XOSEventEngine
138
139 self.event_steps_dir = Config.get("event_steps_dir")
Scott Bakerc2fddaa2019-01-30 15:45:03 -0800140 self.event_engine = XOSEventEngine(model_accessor=model_accessor, log=log)
Scott Bakerbba67b62019-01-28 17:38:21 -0800141
142 def tearDown(self):
143 sys.path = self.sys_path_save
144 os.chdir(self.cwd_save)
145
146 def test_load_event_step_modules(self):
147 self.event_engine.load_event_step_modules(self.event_steps_dir)
148 self.assertEqual(len(self.event_engine.event_steps), 1)
149
150 def test_start(self):
151 self.event_engine.load_event_step_modules(self.event_steps_dir)
152
153 with patch.object(
154 XOSKafkaThread, "create_kafka_consumer"
155 ) as create_kafka_consumer, patch.object(
156 FakeKafkaConsumer, "subscribe"
157 ) as fake_subscribe, patch.object(
158 self.event_engine.event_steps[0], "process_event"
159 ) as process_event:
160
161 create_kafka_consumer.return_value = FakeKafkaConsumer(
162 values=["sampleevent"]
163 )
164 self.event_engine.start()
165
166 self.assertEqual(len(self.event_engine.threads), 1)
167
168 # Since event_engine.start() launches threads, give them a hundred milliseconds to do something...
169 time.sleep(0.1)
170
171 # We should have subscribed to the fake consumer
172 fake_subscribe.assert_called_once()
173
174 # The fake consumer will have returned one event
175 process_event.assert_called_once()
176
177 def test_start_with_pattern(self):
178 self.event_engine.load_event_step_modules(self.event_steps_dir)
179
180 with patch.object(
181 XOSKafkaThread, "create_kafka_consumer"
182 ) as create_kafka_consumer, patch.object(
183 FakeKafkaConsumer, "subscribe"
184 ) as fake_subscribe, patch.object(
185 self.event_engine.event_steps[0], "process_event"
186 ) as process_event, patch.object(
187 self.event_engine.event_steps[0], "pattern", new_callable=PropertyMock
188 ) as pattern, patch.object(
189 self.event_engine.event_steps[0], "topics", new_callable=PropertyMock
190 ) as topics:
191
192 pattern.return_value = "somepattern"
193 topics.return_value = []
194
195 create_kafka_consumer.return_value = FakeKafkaConsumer(
196 values=["sampleevent"]
197 )
198 self.event_engine.start()
199
200 self.assertEqual(len(self.event_engine.threads), 1)
201
202 # Since event_engine.start() launches threads, give them a hundred milliseconds to do something...
203 time.sleep(0.1)
204
205 # We should have subscribed to the fake consumer
206 fake_subscribe.assert_called_with("somepattern")
207
208 # The fake consumer will have returned one event
209 process_event.assert_called_once()
210
211 def test_start_bad_tech(self):
212 """ Set an unknown Technology in the event_step. XOSEventEngine.start() should print an error message and
213 not create any threads.
214 """
215
216 self.event_engine.load_event_step_modules(self.event_steps_dir)
217
218 with patch.object(
219 XOSKafkaThread, "create_kafka_consumer"
220 ) as create_kafka_consumer, patch.object(
221 log, "error"
222 ) as log_error, patch.object(
223 self.event_engine.event_steps[0], "technology"
224 ) as technology:
225 technology.return_value = "not_kafka"
226 create_kafka_consumer.return_value = FakeKafkaConsumer()
227 self.event_engine.start()
228
229 self.assertEqual(len(self.event_engine.threads), 0)
230
231 log_error.assert_called_with(
232 "Unknown technology. Skipping step",
233 step="TestEventStep",
234 technology=ANY,
235 )
236
237 def test_start_bad_no_topics(self):
238 """ Set no topics in the event_step. XOSEventEngine.start() will launch a thread, but the thread will fail
239 with an exception before calling subscribe.
240 """
241
242 self.event_engine.load_event_step_modules(self.event_steps_dir)
243
244 with patch.object(
245 XOSKafkaThread, "create_kafka_consumer"
246 ) as create_kafka_consumer, patch.object(
247 FakeKafkaConsumer, "subscribe"
248 ) as fake_subscribe, patch.object(
249 self.event_engine.event_steps[0], "topics", new_callable=PropertyMock
250 ) as topics:
251 topics.return_value = []
252 create_kafka_consumer.return_value = FakeKafkaConsumer()
253 self.event_engine.start()
254
255 # the thread does get launched, but it will fail with an exception
256 self.assertEqual(len(self.event_engine.threads), 1)
257
258 time.sleep(0.1)
259
260 fake_subscribe.assert_not_called()
261
262 def test_start_bad_topics_and_pattern(self):
263 """ Set no topics in the event_step. XOSEventEngine.start() will launch a thread, but the thread will fail
264 with an exception before calling subscribe.
265 """
266
267 self.event_engine.load_event_step_modules(self.event_steps_dir)
268
269 with patch.object(
270 XOSKafkaThread, "create_kafka_consumer"
271 ) as create_kafka_consumer, patch.object(
272 FakeKafkaConsumer, "subscribe"
273 ) as fake_subscribe, patch.object(
274 self.event_engine.event_steps[0], "pattern", new_callable=PropertyMock
275 ) as pattern:
276 pattern.return_value = "foo"
277 create_kafka_consumer.return_value = FakeKafkaConsumer()
278 self.event_engine.start()
279
280 # the thread does get launched, but it will fail with an exception
281 self.assertEqual(len(self.event_engine.threads), 1)
282
283 time.sleep(0.1)
284
285 fake_subscribe.assert_not_called()
286
287 def test_start_config_no_eventbus_kind(self):
288 """ Set a blank event_bus.kind in Config. XOSEventEngine.start() should print an error message and
289 not create any threads.
290 """
291
292 self.event_engine.load_event_step_modules(self.event_steps_dir)
293
294 config_get_orig = Config.get
295 with patch.object(
296 XOSKafkaThread, "create_kafka_consumer"
297 ) as create_kafka_consumer, patch.object(
298 log, "error"
299 ) as log_error, patch.object(
300 Config,
301 "get",
302 new=functools.partial(
303 config_get_mock, config_get_orig, {"event_bus.kind": None}
304 ),
305 ):
306
307 create_kafka_consumer.return_value = FakeKafkaConsumer()
308 self.event_engine.start()
309
310 self.assertEqual(len(self.event_engine.threads), 0)
311
312 log_error.assert_called_with(
313 "Eventbus kind is not configured in synchronizer config file."
314 )
315
316 def test_start_config_bad_eventbus_kind(self):
317 """ Set an unknown event_bus.kind in Config. XOSEventEngine.start() should print an error message and
318 not create any threads.
319 """
320
321 self.event_engine.load_event_step_modules(self.event_steps_dir)
322
323 config_get_orig = Config.get
324 with patch.object(
325 XOSKafkaThread, "create_kafka_consumer"
326 ) as create_kafka_consumer, patch.object(
327 log, "error"
328 ) as log_error, patch.object(
329 Config,
330 "get",
331 new=functools.partial(
332 config_get_mock, config_get_orig, {"event_bus.kind": "not_kafka"}
333 ),
334 ):
335 create_kafka_consumer.return_value = FakeKafkaConsumer()
336 self.event_engine.start()
337
338 self.assertEqual(len(self.event_engine.threads), 0)
339
340 log_error.assert_called_with(
341 "Eventbus kind is set to a technology we do not implement.",
342 eventbus_kind="not_kafka",
343 )
344
345
346if __name__ == "__main__":
347 unittest.main()