| # Copyright 2017-present Open Networking Foundation |
| # |
| # Licensed under the Apache License, Version 2.0 (the "License"); |
| # you may not use this file except in compliance with the License. |
| # You may obtain a copy of the License at |
| # |
| # http://www.apache.org/licenses/LICENSE-2.0 |
| # |
| # Unless required by applicable law or agreed to in writing, software |
| # distributed under the License is distributed on an "AS IS" BASIS, |
| # WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. |
| # See the License for the specific language governing permissions and |
| # limitations under the License. |
| |
| import functools |
| import unittest |
| from mock import patch, PropertyMock, ANY |
| |
| import os, sys |
| import time |
| |
| test_path = os.path.abspath(os.path.dirname(os.path.realpath(__file__))) |
| xos_dir = os.path.join(test_path, '..', '..', '..') |
| |
| def config_get_mock(orig, overrides, key): |
| if key in overrides: |
| return overrides[key] |
| else: |
| return orig(key) |
| |
| class FakeKafkaConsumer(): |
| def __init__(self, values=["sampleevent"]): |
| self.values = values |
| |
| def subscribe(self, topics): |
| pass |
| |
| def poll(self): |
| for x in self.values: |
| yield x |
| # block forever |
| time.sleep(1000) |
| |
| class TestEventEngine(unittest.TestCase): |
| def setUp(self): |
| global XOSKafkaThread, Config, event_engine_log |
| |
| self.sys_path_save = sys.path |
| self.cwd_save = os.getcwd() |
| sys.path.append(xos_dir) |
| sys.path.append(os.path.join(xos_dir, 'synchronizers', 'new_base')) |
| sys.path.append(os.path.join(xos_dir, 'synchronizers', 'new_base', 'tests', 'event_steps')) |
| |
| config = os.path.join(test_path, "test_config.yaml") |
| from xosconfig import Config |
| Config.clear() |
| Config.init(config, 'synchronizer-config-schema.yaml') |
| |
| from synchronizers.new_base.mock_modelaccessor_build import build_mock_modelaccessor |
| build_mock_modelaccessor(xos_dir, services_dir=None, service_xprotos=[]) |
| |
| os.chdir(os.path.join(test_path, '..')) # config references tests/model-deps |
| |
| from event_engine import XOSKafkaThread, XOSEventEngine |
| from event_engine import log as event_engine_log |
| |
| self.event_steps_dir = Config.get("event_steps_dir") |
| self.event_engine = XOSEventEngine() |
| |
| def tearDown(self): |
| sys.path = self.sys_path_save |
| os.chdir(self.cwd_save) |
| |
| def test_load_event_step_modules(self): |
| self.event_engine.load_event_step_modules(self.event_steps_dir) |
| self.assertEqual(len(self.event_engine.event_steps), 1) |
| |
| def test_start(self): |
| self.event_engine.load_event_step_modules(self.event_steps_dir) |
| |
| with patch.object(XOSKafkaThread, "create_kafka_consumer") as create_kafka_consumer, \ |
| patch.object(FakeKafkaConsumer, "subscribe") as fake_subscribe, \ |
| patch.object(self.event_engine.event_steps[0], "process_event") as process_event: |
| create_kafka_consumer.return_value = FakeKafkaConsumer() |
| self.event_engine.start() |
| |
| self.assertEqual(len(self.event_engine.threads), 1) |
| |
| # Since event_engine.start() launches threads, give them a hundred milliseconds to do something... |
| time.sleep(0.1) |
| |
| # We should have subscribed to the fake consumer |
| fake_subscribe.assert_called_with(["sometopic"]) |
| |
| # The fake consumer will have returned one event, and that event will have been passed to our step |
| process_event.assert_called_with("sampleevent") |
| |
| def test_start_with_pattern(self): |
| self.event_engine.load_event_step_modules(self.event_steps_dir) |
| |
| with patch.object(XOSKafkaThread, "create_kafka_consumer") as create_kafka_consumer, \ |
| patch.object(FakeKafkaConsumer, "subscribe") as fake_subscribe, \ |
| patch.object(self.event_engine.event_steps[0], "process_event") as process_event, \ |
| patch.object(self.event_engine.event_steps[0], "pattern", new_callable=PropertyMock) as pattern, \ |
| patch.object(self.event_engine.event_steps[0], "topics", new_callable=PropertyMock) as topics: |
| |
| pattern.return_value = "somepattern" |
| topics.return_value = [] |
| |
| create_kafka_consumer.return_value = FakeKafkaConsumer() |
| self.event_engine.start() |
| |
| self.assertEqual(len(self.event_engine.threads), 1) |
| |
| # Since event_engine.start() launches threads, give them a hundred milliseconds to do something... |
| time.sleep(0.1) |
| |
| # We should have subscribed to the fake consumer |
| fake_subscribe.assert_called_with("somepattern") |
| |
| # The fake consumer will have returned one event, and that event will have been passed to our step |
| process_event.assert_called_with("sampleevent") |
| |
| |
| def test_start_bad_tech(self): |
| """ Set an unknown Technology in the event_step. XOSEventEngine.start() should print an error message and |
| not create any threads. |
| """ |
| |
| self.event_engine.load_event_step_modules(self.event_steps_dir) |
| |
| with patch.object(XOSKafkaThread, "create_kafka_consumer") as create_kafka_consumer, \ |
| patch.object(event_engine_log, "error") as log_error, \ |
| patch.object(self.event_engine.event_steps[0], "technology") as technology: |
| technology.return_value = "not_kafka" |
| create_kafka_consumer.return_value = FakeKafkaConsumer() |
| self.event_engine.start() |
| |
| self.assertEqual(len(self.event_engine.threads), 0) |
| |
| log_error.assert_called_with('Unknown technology. Skipping step', step="TestEventStep", |
| technology=ANY) |
| |
| def test_start_bad_no_topics(self): |
| """ Set no topics in the event_step. XOSEventEngine.start() will launch a thread, but the thread will fail |
| with an exception before calling subscribe. |
| """ |
| |
| self.event_engine.load_event_step_modules(self.event_steps_dir) |
| |
| with patch.object(XOSKafkaThread, "create_kafka_consumer") as create_kafka_consumer, \ |
| patch.object(FakeKafkaConsumer, "subscribe") as fake_subscribe, \ |
| patch.object(self.event_engine.event_steps[0], "topics", new_callable=PropertyMock) as topics: |
| topics.return_value = [] |
| create_kafka_consumer.return_value = FakeKafkaConsumer() |
| self.event_engine.start() |
| |
| # the thread does get launched, but it will fail with an exception |
| self.assertEqual(len(self.event_engine.threads), 1) |
| |
| time.sleep(0.1) |
| |
| fake_subscribe.assert_not_called() |
| |
| def test_start_bad_topics_and_pattern(self): |
| """ Set no topics in the event_step. XOSEventEngine.start() will launch a thread, but the thread will fail |
| with an exception before calling subscribe. |
| """ |
| |
| self.event_engine.load_event_step_modules(self.event_steps_dir) |
| |
| with patch.object(XOSKafkaThread, "create_kafka_consumer") as create_kafka_consumer, \ |
| patch.object(FakeKafkaConsumer, "subscribe") as fake_subscribe, \ |
| patch.object(self.event_engine.event_steps[0], "pattern", new_callable=PropertyMock) as pattern: |
| pattern.return_value = "foo" |
| create_kafka_consumer.return_value = FakeKafkaConsumer() |
| self.event_engine.start() |
| |
| # the thread does get launched, but it will fail with an exception |
| self.assertEqual(len(self.event_engine.threads), 1) |
| |
| time.sleep(0.1) |
| |
| fake_subscribe.assert_not_called() |
| |
| def test_start_config_no_eventbus_kind(self): |
| """ Set a blank event_bus.kind in Config. XOSEventEngine.start() should print an error message and |
| not create any threads. |
| """ |
| |
| self.event_engine.load_event_step_modules(self.event_steps_dir) |
| |
| config_get_orig = Config.get |
| with patch.object(XOSKafkaThread, "create_kafka_consumer") as create_kafka_consumer, \ |
| patch.object(event_engine_log, "error") as log_error, \ |
| patch.object(Config, "get", new=functools.partial(config_get_mock, config_get_orig, {"event_bus.kind": None})): |
| |
| create_kafka_consumer.return_value = FakeKafkaConsumer() |
| self.event_engine.start() |
| |
| self.assertEqual(len(self.event_engine.threads), 0) |
| |
| log_error.assert_called_with('Eventbus kind is not configured in synchronizer config file.') |
| |
| def test_start_config_bad_eventbus_kind(self): |
| """ Set an unknown event_bus.kind in Config. XOSEventEngine.start() should print an error message and |
| not create any threads. |
| """ |
| |
| self.event_engine.load_event_step_modules(self.event_steps_dir) |
| |
| config_get_orig = Config.get |
| with patch.object(XOSKafkaThread, "create_kafka_consumer") as create_kafka_consumer, \ |
| patch.object(event_engine_log, "error") as log_error, \ |
| patch.object(Config, "get", |
| new=functools.partial(config_get_mock, config_get_orig, {"event_bus.kind": "not_kafka"})): |
| create_kafka_consumer.return_value = FakeKafkaConsumer() |
| self.event_engine.start() |
| |
| self.assertEqual(len(self.event_engine.threads), 0) |
| |
| log_error.assert_called_with('Eventbus kind is set to a technology we do not implement.', |
| eventbus_kind='not_kafka') |
| |
| if __name__ == '__main__': |
| unittest.main() |