# Copyright 2017-present Open Networking Foundation
#
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
#
# http://www.apache.org/licenses/LICENSE-2.0
#
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
14
15import confluent_kafka
16import functools
17import unittest
18
19from mock import patch, PropertyMock, ANY
20
21import os
22import sys
23import time
24
# Module-level logger shared by the tests; it stays None until
# TestEventEngine.setUpClass creates it.
log = None

# Directory holding this test file, plus the locations derived from it.
test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__)))
sync_lib_dir = os.path.join(test_path, "..", "xossynchronizer")
xos_dir = os.path.join(test_path, "..", "..", "..", "xos")

# Parenthesized so the statement parses under both Python 2 and Python 3;
# the printed output is the same in either.
print(os.getcwd())
32
def config_get_mock(orig, overrides, key):
    """Look up `key`, preferring an override over the original getter.

    orig: callable invoked with `key` when no override exists.
    overrides: dict mapping keys to the values that should be returned
        instead of whatever `orig` would produce.
    key: the key being requested.

    Returns the overriding value when `key` is present in `overrides`
    (even if that value is None); otherwise returns `orig(key)`.
    """
    if key not in overrides:
        return orig(key)
    return overrides[key]
38
39
class FakeKafkaConsumer:
    """In-memory stand-in for a Kafka consumer.

    poll() hands out the queued values one at a time, each wrapped in a
    FakeKafkaMessage, and sleeps once the queue is empty.
    """

    def __init__(self, values=None):
        """Create a consumer that will serve `values`.

        values: list of payloads to return from poll(). The list is used
            as-is (not copied), so poll() consumes the caller's list.
            Defaults to a fresh empty list per instance.
        """
        # A literal `[]` default would be one list shared by every instance
        # created without arguments, so build a new one here instead.
        self.values = [] if values is None else values

    def subscribe(self, topics):
        """Accept a subscription request and ignore it."""
        pass

    def poll(self, timeout=1.0):
        """Return the next queued payload as a FakeKafkaMessage.

        Payloads are taken from the end of `self.values`. When none remain,
        sleep for 1000 seconds and then return None. `timeout` is accepted
        but not used.
        """
        if self.values:
            # Pass the payload by keyword: FakeKafkaMessage's first positional
            # parameter is `timestamp`, so a positional call would store the
            # payload as the timestamp and leave value() at its default.
            return FakeKafkaMessage(value=self.values.pop())
        # block forever
        time.sleep(1000)
52
53
class FakeKafkaMessage:
    """ Works like Message in confluent_kafka
        https://docs.confluent.io/current/clients/confluent-kafka-python/#message

        Each accessor method returns the corresponding value handed to the
        constructor.
    """

    def __init__(
        self,
        timestamp=None,
        topic="faketopic",
        key="fakekey",
        value="fakevalue",
        error=False,
    ):
        # Without a timestamp the message reports TIMESTAMP_NOT_AVAILABLE;
        # otherwise the supplied value is reported as a create-time timestamp.
        have_timestamp = timestamp is not None
        self.fake_ts_type = (
            confluent_kafka.TIMESTAMP_CREATE_TIME
            if have_timestamp
            else confluent_kafka.TIMESTAMP_NOT_AVAILABLE
        )
        # When no timestamp was given this stores None.
        self.fake_timestamp = timestamp

        self.fake_error = error
        self.fake_value = value
        self.fake_key = key
        self.fake_topic = topic

    def error(self):
        """Return the error given at construction (False by default)."""
        return self.fake_error

    def timestamp(self):
        """Return a (timestamp_type, timestamp) tuple."""
        ts_pair = (self.fake_ts_type, self.fake_timestamp)
        return ts_pair

    def topic(self):
        """Return the topic given at construction."""
        return self.fake_topic

    def key(self):
        """Return the key given at construction."""
        return self.fake_key

    def value(self):
        """Return the payload given at construction."""
        return self.fake_value
94
95
class TestEventEngine(unittest.TestCase):
    """Tests for XOSEventEngine, using FakeKafkaConsumer in place of a real Kafka consumer."""

    @classmethod
    def setUpClass(cls):
        """Initialize Config from the test config file and create the module-level logger if needed."""

        global log

        config = os.path.join(test_path, "test_config.yaml")
        from xosconfig import Config

        Config.clear()
        Config.init(config, "synchronizer-config-schema.yaml")

        if not log:
            from multistructlog import create_logger

            log = create_logger(Config().get("logging"))

    def setUp(self):
        """Reset Config, build the mock model accessor and create a fresh XOSEventEngine."""
        global XOSKafkaThread, Config, log

        # Snapshot a *copy* of sys.path. Storing the list object itself would only alias it, so any in-place
        # change made during the test (append/insert) could not be undone in tearDown.
        self.sys_path_save = list(sys.path)
        self.cwd_save = os.getcwd()

        config = os.path.join(test_path, "test_config.yaml")
        from xosconfig import Config

        Config.clear()
        Config.init(config, "synchronizer-config-schema.yaml")

        from xossynchronizer.mock_modelaccessor_build import (
            build_mock_modelaccessor,
        )

        build_mock_modelaccessor(sync_lib_dir, xos_dir, services_dir=None, service_xprotos=[])

        # The test config.yaml references files in `test/` so make sure we're in the parent directory of the
        # test directory.
        os.chdir(os.path.join(test_path, ".."))

        from xossynchronizer.event_engine import XOSKafkaThread, XOSEventEngine

        self.event_steps_dir = Config.get("event_steps_dir")
        self.event_engine = XOSEventEngine(log)

    def tearDown(self):
        """Restore the sys.path contents and working directory captured in setUp."""
        # Slice assignment restores the saved entries in whichever list sys.path currently refers to.
        sys.path[:] = self.sys_path_save
        os.chdir(self.cwd_save)

    def test_load_event_step_modules(self):
        """ Loading the configured event_steps_dir should yield exactly one event step.
        """
        self.event_engine.load_event_step_modules(self.event_steps_dir)
        self.assertEqual(len(self.event_engine.event_steps), 1)

    def test_start(self):
        """ Start the engine with a fake consumer holding one event. One thread should be created, the consumer
            should be subscribed to once, and the event step should process one event.
        """
        self.event_engine.load_event_step_modules(self.event_steps_dir)

        with patch.object(
            XOSKafkaThread, "create_kafka_consumer"
        ) as create_kafka_consumer, patch.object(
            FakeKafkaConsumer, "subscribe"
        ) as fake_subscribe, patch.object(
            self.event_engine.event_steps[0], "process_event"
        ) as process_event:

            create_kafka_consumer.return_value = FakeKafkaConsumer(
                values=["sampleevent"]
            )
            self.event_engine.start()

            self.assertEqual(len(self.event_engine.threads), 1)

            # Since event_engine.start() launches threads, give them a hundred milliseconds to do something...
            time.sleep(0.1)

            # We should have subscribed to the fake consumer
            fake_subscribe.assert_called_once()

            # The fake consumer will have returned one event
            process_event.assert_called_once()

    def test_start_with_pattern(self):
        """ Give the event step a pattern and no topics. The consumer should be subscribed with the pattern and
            the event step should process one event.
        """
        self.event_engine.load_event_step_modules(self.event_steps_dir)

        with patch.object(
            XOSKafkaThread, "create_kafka_consumer"
        ) as create_kafka_consumer, patch.object(
            FakeKafkaConsumer, "subscribe"
        ) as fake_subscribe, patch.object(
            self.event_engine.event_steps[0], "process_event"
        ) as process_event, patch.object(
            self.event_engine.event_steps[0], "pattern", new_callable=PropertyMock
        ) as pattern, patch.object(
            self.event_engine.event_steps[0], "topics", new_callable=PropertyMock
        ) as topics:

            pattern.return_value = "somepattern"
            topics.return_value = []

            create_kafka_consumer.return_value = FakeKafkaConsumer(
                values=["sampleevent"]
            )
            self.event_engine.start()

            self.assertEqual(len(self.event_engine.threads), 1)

            # Since event_engine.start() launches threads, give them a hundred milliseconds to do something...
            time.sleep(0.1)

            # We should have subscribed to the fake consumer
            fake_subscribe.assert_called_with("somepattern")

            # The fake consumer will have returned one event
            process_event.assert_called_once()

    def test_start_bad_tech(self):
        """ Set an unknown Technology in the event_step. XOSEventEngine.start() should print an error message and
            not create any threads.
        """

        self.event_engine.load_event_step_modules(self.event_steps_dir)

        with patch.object(
            XOSKafkaThread, "create_kafka_consumer"
        ) as create_kafka_consumer, patch.object(
            log, "error"
        ) as log_error, patch.object(
            self.event_engine.event_steps[0], "technology"
        ) as technology:
            # `technology` is replaced by a mock object here, which is why the assertion below matches it with ANY.
            technology.return_value = "not_kafka"
            create_kafka_consumer.return_value = FakeKafkaConsumer()
            self.event_engine.start()

            self.assertEqual(len(self.event_engine.threads), 0)

            log_error.assert_called_with(
                "Unknown technology. Skipping step",
                step="TestEventStep",
                technology=ANY,
            )

    def test_start_bad_no_topics(self):
        """ Set no topics in the event_step. XOSEventEngine.start() will launch a thread, but the thread will fail
            with an exception before calling subscribe.
        """

        self.event_engine.load_event_step_modules(self.event_steps_dir)

        with patch.object(
            XOSKafkaThread, "create_kafka_consumer"
        ) as create_kafka_consumer, patch.object(
            FakeKafkaConsumer, "subscribe"
        ) as fake_subscribe, patch.object(
            self.event_engine.event_steps[0], "topics", new_callable=PropertyMock
        ) as topics:
            topics.return_value = []
            create_kafka_consumer.return_value = FakeKafkaConsumer()
            self.event_engine.start()

            # the thread does get launched, but it will fail with an exception
            self.assertEqual(len(self.event_engine.threads), 1)

            time.sleep(0.1)

            fake_subscribe.assert_not_called()

    def test_start_bad_topics_and_pattern(self):
        """ Set a pattern on the event_step while leaving its topics untouched (presumably non-empty in the test
            step, so that both are set). XOSEventEngine.start() will launch a thread, but the thread will fail
            with an exception before calling subscribe.
        """

        self.event_engine.load_event_step_modules(self.event_steps_dir)

        with patch.object(
            XOSKafkaThread, "create_kafka_consumer"
        ) as create_kafka_consumer, patch.object(
            FakeKafkaConsumer, "subscribe"
        ) as fake_subscribe, patch.object(
            self.event_engine.event_steps[0], "pattern", new_callable=PropertyMock
        ) as pattern:
            pattern.return_value = "foo"
            create_kafka_consumer.return_value = FakeKafkaConsumer()
            self.event_engine.start()

            # the thread does get launched, but it will fail with an exception
            self.assertEqual(len(self.event_engine.threads), 1)

            time.sleep(0.1)

            fake_subscribe.assert_not_called()

    def test_start_config_no_eventbus_kind(self):
        """ Set a blank event_bus.kind in Config. XOSEventEngine.start() should print an error message and
            not create any threads.
        """

        self.event_engine.load_event_step_modules(self.event_steps_dir)

        # Keep the real getter so that every key other than the overridden one still resolves normally.
        config_get_orig = Config.get
        with patch.object(
            XOSKafkaThread, "create_kafka_consumer"
        ) as create_kafka_consumer, patch.object(
            log, "error"
        ) as log_error, patch.object(
            Config,
            "get",
            new=functools.partial(
                config_get_mock, config_get_orig, {"event_bus.kind": None}
            ),
        ):

            create_kafka_consumer.return_value = FakeKafkaConsumer()
            self.event_engine.start()

            self.assertEqual(len(self.event_engine.threads), 0)

            log_error.assert_called_with(
                "Eventbus kind is not configured in synchronizer config file."
            )

    def test_start_config_bad_eventbus_kind(self):
        """ Set an unknown event_bus.kind in Config. XOSEventEngine.start() should print an error message and
            not create any threads.
        """

        self.event_engine.load_event_step_modules(self.event_steps_dir)

        # Keep the real getter so that every key other than the overridden one still resolves normally.
        config_get_orig = Config.get
        with patch.object(
            XOSKafkaThread, "create_kafka_consumer"
        ) as create_kafka_consumer, patch.object(
            log, "error"
        ) as log_error, patch.object(
            Config,
            "get",
            new=functools.partial(
                config_get_mock, config_get_orig, {"event_bus.kind": "not_kafka"}
            ),
        ):
            create_kafka_consumer.return_value = FakeKafkaConsumer()
            self.event_engine.start()

            self.assertEqual(len(self.event_engine.threads), 0)

            log_error.assert_called_with(
                "Eventbus kind is set to a technology we do not implement.",
                eventbus_kind="not_kafka",
            )
342
343
# Run the tests in this module when the file is executed directly as a script.
if __name__ == "__main__":
    unittest.main()